import errno
import json
-from mgr_module import MgrModule, CommandResult, CLICommand, Option
+from mgr_module import MgrModule, CommandResult, CLIRequiresDB, CLICommand, Option
import operator
import rados
from threading import Event
DEVICE_HEALTH_TOOMANY: 'Too many daemons are expected to fail soon',
}
-MAX_SAMPLES = 500
-
def get_ata_wear_level(data: Dict[Any, Any]) -> Optional[float]:
"""
class Module(MgrModule):
+
+ # Latest schema, used when the db does not exist yet.
+ # Defines two tables:
+ #   Device              - one row per device id (devid is the primary key).
+ #   DeviceHealthMetrics - one row per (time, devid) sample; `time` defaults
+ #                         to SQLite's epoch seconds at insert and
+ #                         `raw_smart` holds the JSON-encoded SMART data
+ #                         written by put_device_metrics().
+ # NOTE(review): presumably consumed by the MgrModule base class when it
+ # creates the module database - confirm against mgr_module.
+ SCHEMA = """
+CREATE TABLE Device (
+ devid TEXT PRIMARY KEY
+) WITHOUT ROWID;
+CREATE TABLE DeviceHealthMetrics (
+ time DATETIME DEFAULT (strftime('%s', 'now')),
+ devid TEXT NOT NULL REFERENCES Device (devid),
+ raw_smart TEXT NOT NULL,
+ PRIMARY KEY (time, devid)
+);
+"""
+
+ # Schema history, one entry per version in order. The only entry (v1) is
+ # currently identical to SCHEMA above.
+ # NOTE(review): presumably used to upgrade an existing db step by step -
+ # confirm against mgr_module.
+ SCHEMA_VERSIONED = [
+ # v1
+ """
+CREATE TABLE Device (
+ devid TEXT PRIMARY KEY
+) WITHOUT ROWID;
+CREATE TABLE DeviceHealthMetrics (
+ time DATETIME DEFAULT (strftime('%s', 'now')),
+ devid TEXT NOT NULL REFERENCES Device (devid),
+ raw_smart TEXT NOT NULL,
+ PRIMARY KEY (time, devid)
+);
+"""
+ ]
+
MODULE_OPTIONS = [
Option(
name='enable_monitoring',
# other
self.run = True
self.event = Event()
- self.has_device_pool = False
# for mypy which does not run the code
if TYPE_CHECKING:
}), '')
return result.wait()
+ @CLIRequiresDB
@CLICommand('device scrape-daemon-health-metrics',
perm='r')
def do_scrape_daemon_health_metrics(self, who: str) -> Tuple[int, str, str]:
(daemon_type, daemon_id) = who.split('.')
return self.scrape_daemon(daemon_type, daemon_id)
+ @CLIRequiresDB
@CLICommand('device scrape-health-metrics',
perm='r')
def do_scrape_health_metrics(self, devid: Optional[str] = None) -> Tuple[int, str, str]:
else:
return self.scrape_device(devid)
+ @CLIRequiresDB
@CLICommand('device get-health-metrics',
perm='r')
def do_get_health_metrics(self, devid: str, sample: Optional[str] = None) -> Tuple[int, str, str]:
'''
return self.show_device_metrics(devid, sample)
+ @CLIRequiresDB
@CLICommand('device check-health',
perm='rw')
def do_check_health(self) -> Tuple[int, str, str]:
self.set_health_checks({}) # avoid stuck health alerts
return 0, '', ''
+ @CLIRequiresDB
@CLICommand('device predict-life-expectancy',
perm='r')
def do_predict_life_expectancy(self, devid: str) -> Tuple[int, str, str]:
return self.predict_lift_expectancy(devid)
def self_test(self) -> None:
+ assert self.db_ready()
self.config_notify()
osdmap = self.get('osd_map')
osd_id = osdmap['osds'][0]['osd']
devs = osdmeta.get(str(osd_id), {}).get('device_ids')
if devs:
devid = devs.split()[0].split('=')[1]
- (r, before, err) = self.show_device_metrics(devid, '')
+ self.log.debug(f"getting devid {devid}")
+ (r, before, err) = self.show_device_metrics(devid, None)
assert r == 0
+ self.log.debug(f"before: {before}")
(r, out, err) = self.scrape_device(devid)
assert r == 0
- (r, after, err) = self.show_device_metrics(devid, '')
+ (r, after, err) = self.show_device_metrics(devid, None)
assert r == 0
+ self.log.debug(f"after: {after}")
assert before != after
def config_notify(self) -> None:
self.get_module_option(opt['name']))
self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
- def notify(self, notify_type: str, notify_id: str) -> None:
- if notify_type == "osd_map" and self.enable_monitoring:
- # create device_health_metrics pool if it doesn't exist
- self.maybe_create_device_pool()
-
- def have_enough_osds(self) -> bool:
- # wait until we have enough OSDs to allow the pool to be healthy
- up = 0
- for osd in self.get("osd_map")["osds"]:
- if osd["up"]:
- up += 1
-
- need = cast(int, self.get_ceph_option("osd_pool_default_size"))
- return up >= need
-
- def maybe_create_device_pool(self) -> bool:
- if not self.has_device_pool:
- if not self.have_enough_osds():
- self.log.warning("Not enough OSDs yet to create monitoring pool")
- return False
- self.create_device_pool()
- self.has_device_pool = True
- return True
-
- def create_device_pool(self) -> None:
- self.log.debug('create %s pool' % self.pool_name)
- # create pool
- result = CommandResult('')
- self.send_command(result, 'mon', '', json.dumps({
- 'prefix': 'osd pool create',
- 'format': 'json',
- 'pool': self.pool_name,
- 'pg_num': 1,
- 'pg_num_min': 1,
- }), '')
- r, outb, outs = result.wait()
- assert r == 0
- # set pool application
- result = CommandResult('')
- self.send_command(result, 'mon', '', json.dumps({
- 'prefix': 'osd pool application enable',
- 'format': 'json',
- 'pool': self.pool_name,
- 'app': 'mgr_devicehealth',
- }), '')
- r, outb, outs = result.wait()
- assert r == 0
-
def serve(self) -> None:
self.log.info("Starting")
self.config_notify()
last_scrape = None
- ls = self.get_store('last_scrape')
- if ls:
- try:
- last_scrape = datetime.strptime(ls, TIME_FORMAT)
- except ValueError:
- pass
- self.log.debug('Last scrape %s', last_scrape)
-
while self.run:
- if self.enable_monitoring:
+ if self.db_ready() and self.enable_monitoring:
self.log.debug('Running')
+
+ if last_scrape is None:
+ ls = self.get_kv('last_scrape')
+ if ls:
+ try:
+ last_scrape = datetime.strptime(ls, TIME_FORMAT)
+ except ValueError:
+ pass
+ self.log.debug('Last scrape %s', last_scrape)
+
self.check_health()
now = datetime.utcnow()
self.scrape_all()
self.predict_all_devices()
last_scrape = now
- self.set_store('last_scrape', last_scrape.strftime(TIME_FORMAT))
+ self.set_kv('last_scrape', last_scrape.strftime(TIME_FORMAT))
# sleep
sleep_interval = self.sleep_interval or 60
self.run = False
self.event.set()
- def open_connection(self, create_if_missing: bool = True) -> rados.Ioctx:
- if create_if_missing:
- if not self.maybe_create_device_pool():
- return None
- ioctx = self.rados.open_ioctx(self.pool_name)
- return ioctx
-
def scrape_daemon(self, daemon_type: str, daemon_id: str) -> Tuple[int, str, str]:
- ioctx = self.open_connection()
- if not ioctx:
- return -errno.EAGAIN, "", "device_health_metrics pool not yet available"
+ if not self.db_ready():
+ return -errno.EAGAIN, "", "mgr db not yet available"
raw_smart_data = self.do_scrape_daemon(daemon_type, daemon_id)
if raw_smart_data:
for device, raw_data in raw_smart_data.items():
data = self.extract_smart_features(raw_data)
if device and data:
- self.put_device_metrics(ioctx, device, data)
- ioctx.close()
+ self.put_device_metrics(device, data)
return 0, "", ""
def scrape_all(self) -> Tuple[int, str, str]:
+ if not self.db_ready():
+ return -errno.EAGAIN, "", "mgr db not yet available"
osdmap = self.get("osd_map")
assert osdmap is not None
- ioctx = self.open_connection()
- if not ioctx:
- return -errno.EAGAIN, "", "device_health_metrics pool not yet available"
did_device = {}
ids = []
for osd in osdmap['osds']:
did_device[device] = 1
data = self.extract_smart_features(raw_data)
if device and data:
- self.put_device_metrics(ioctx, device, data)
- ioctx.close()
+ self.put_device_metrics(device, data)
return 0, "", ""
def scrape_device(self, devid: str) -> Tuple[int, str, str]:
+ if not self.db_ready():
+ return -errno.EAGAIN, "", "mgr db not yet available"
r = self.get("device " + devid)
if not r or 'device' not in r.keys():
return -errno.ENOENT, '', 'device ' + devid + ' not found'
return (-errno.EAGAIN, '',
'device ' + devid + ' not claimed by any active daemons')
(daemon_type, daemon_id) = daemons[0].split('.')
- ioctx = self.open_connection()
- if not ioctx:
- return -errno.EAGAIN, "", "device_health_metrics pool not yet available"
raw_smart_data = self.do_scrape_daemon(daemon_type, daemon_id,
devid=devid)
if raw_smart_data:
for device, raw_data in raw_smart_data.items():
data = self.extract_smart_features(raw_data)
if device and data:
- self.put_device_metrics(ioctx, device, data)
- ioctx.close()
+ self.put_device_metrics(device, data)
return 0, "", ""
def do_scrape_daemon(self,
daemon_type, daemon_id, outb))
return None
- def put_device_metrics(self, ioctx: rados.Ioctx, devid: str, data: Any) -> None:
- assert devid
- old_key = datetime.utcnow() - timedelta(
- seconds=self.retention_period)
- prune = old_key.strftime(TIME_FORMAT)
- self.log.debug('put_device_metrics device %s prune %s' %
- (devid, prune))
- erase = []
- try:
- with rados.ReadOpCtx() as op:
- # FIXME
- omap_iter, ret = ioctx.get_omap_keys(op, "", MAX_SAMPLES)
- assert ret == 0
- ioctx.operate_read_op(op, devid)
- for key, _ in list(omap_iter):
- if key >= prune:
- break
- erase.append(key)
- except rados.ObjectNotFound:
- # The object doesn't already exist, no problem.
- pass
- except rados.Error as e:
- # Do not proceed with writes if something unexpected
- # went wrong with the reads.
- self.log.exception("Error reading OMAP: {0}".format(e))
- return
-
- key = datetime.utcnow().strftime(TIME_FORMAT)
- self.log.debug('put_device_metrics device %s key %s = %s, erase %s' %
- (devid, key, data, erase))
- with rados.WriteOpCtx() as op:
- ioctx.set_omap(op, (key,), (str(json.dumps(data)),))
- if len(erase):
- ioctx.remove_omap_keys(op, tuple(erase))
- ioctx.operate_write_op(op, devid)
+ def _prune_device_metrics(self) -> None:
+ # Delete every DeviceHealthMetrics row whose `time` lies more than
+ # `self.retention_period` seconds before the database's current epoch
+ # time (strftime('%s', 'now')).
+ # This method neither takes a lock nor opens a transaction itself; the
+ # visible caller, put_device_metrics(), invokes it inside
+ # `with self._db_lock, self.db:`.
+ SQL = """
+ DELETE FROM DeviceHealthMetrics
+ WHERE time < (strftime('%s', 'now') - ?);
+ """
+
+ cursor = self.db.execute(SQL, (self.retention_period,))
+ # Only log when the DELETE actually removed something.
+ if cursor.rowcount >= 1:
+ self.log.info(f"pruned {cursor.rowcount} metrics")
+
+ def _create_device(self, devid: str) -> None:
+ # Make sure a Device row exists for `devid`. INSERT OR IGNORE turns the
+ # statement into a no-op when that primary key is already present.
+ # This method neither takes a lock nor opens a transaction itself; the
+ # visible caller, put_device_metrics(), invokes it inside
+ # `with self._db_lock, self.db:`.
+ SQL = """
+ INSERT OR IGNORE INTO Device VALUES (?);
+ """
+
+ cursor = self.db.execute(SQL, (devid,))
+ # rowcount tells the two outcomes apart: >= 1 means a row was inserted,
+ # anything else means the insert was ignored.
+ # NOTE(review): assumes self.db is a sqlite3 connection - confirm.
+ if cursor.rowcount >= 1:
+ self.log.info(f"created device {devid}")
+ else:
+ self.log.debug(f"device {devid} already exists")
+
+ def put_device_metrics(self, devid: str, data: Any) -> None:
+ SQL = """
+ INSERT INTO DeviceHealthMetrics (devid, raw_smart)
+ VALUES (?, ?);
+ """
+
+ with self._db_lock, self.db:
+ self._create_device(devid)
+ self.db.execute(SQL, (devid, json.dumps(data)))
+ self._prune_device_metrics()
# extract wear level?
wear_level = get_ata_wear_level(data)
self.log.debug(f"removing {devid} wear level")
self.set_device_wear_level(devid, -1.0)
+ def _t2epoch(self, t: Optional[str]) -> int:
+     """
+     Convert a TIME_FORMAT timestamp string to integer epoch seconds.
+
+     The string is interpreted as UTC, matching _get_device_metrics(),
+     which renders stored epochs with datetime.utcfromtimestamp().
+
+     :param t: timestamp formatted with TIME_FORMAT, or None / '' when no
+               timestamp was supplied
+     :return: seconds since the Unix epoch, or 0 when ``t`` is None or empty
+     :raises ValueError: if ``t`` is non-empty and does not match TIME_FORMAT
+     """
+     # Imported locally so this method does not depend on a module-level
+     # import being present.
+     import calendar
+
+     # Treat '' like None: callers may pass an empty string for "no sample",
+     # and strptime('') would raise ValueError.
+     if not t:
+         return 0
+     # strftime("%s") is a non-portable libc extension and reads the naive
+     # datetime as *local* time; timegm() is portable and reads it as UTC.
+     return calendar.timegm(datetime.strptime(t, TIME_FORMAT).timetuple())
+
def _get_device_metrics(self, devid: str,
sample: Optional[str] = None,
min_sample: Optional[str] = None) -> Dict[str, Dict[str, Any]]:
+ """
+ Return the stored SMART samples of one device, newest first.
+
+ :param devid: device id whose DeviceHealthMetrics rows are read
+ :param sample: TIME_FORMAT timestamp; when given, only the sample
+ recorded at exactly that time is returned
+ :param min_sample: TIME_FORMAT timestamp; when given, samples older
+ than it are excluded
+ :return: dict mapping each sample's TIME_FORMAT timestamp (UTC) to its
+ decoded raw_smart JSON; rows whose JSON cannot be parsed are
+ logged at debug level and skipped
+ """
res = {}
- ioctx = self.open_connection(create_if_missing=False)
- if not ioctx:
- return {}
- with ioctx:
- with rados.ReadOpCtx() as op:
- omap_iter, ret = ioctx.get_omap_vals(op, min_sample or '', sample or '',
- MAX_SAMPLES) # fixme
- assert ret == 0
+
+ # Both filters must hold at once. `? = 0` disables the exact-match
+ # filter when no sample was requested (_t2epoch() returns 0 for None);
+ # a missing min_sample becomes 0, which every stored time satisfies.
+ # OR-ing the two conditions would make the default min_sample match
+ # every row and silently defeat the `sample` filter.
+ SQL = """
+ SELECT time, raw_smart
+ FROM DeviceHealthMetrics
+ WHERE devid = ? AND (? = 0 OR time = ?) AND ? <= time
+ ORDER BY time DESC;
+ """
+
+ isample = self._t2epoch(sample)
+ imin_sample = self._t2epoch(min_sample)
+
+ self.log.debug(f"_get_device_metrics: {devid} {sample} {min_sample}")
+
+ with self._db_lock, self.db:
+ # isample is bound twice: once for the `? = 0` switch, once for the match.
+ cursor = self.db.execute(SQL, (devid, isample, isample, imin_sample))
+ for row in cursor:
+ t = row['time']
+ # Stored times are epoch seconds; report them as UTC TIME_FORMAT keys.
+ dt = datetime.utcfromtimestamp(t).strftime(TIME_FORMAT)
try:
- ioctx.operate_read_op(op, devid)
- for key, value in list(omap_iter):
- if sample and key != sample:
- break
- if min_sample and key < min_sample:
- break
- try:
- v = json.loads(value)
- except (ValueError, IndexError):
- self.log.debug('unable to parse value for %s: "%s"' %
- (key, value))
- pass
- res[key] = v
- except rados.ObjectNotFound:
+ res[dt] = json.loads(row['raw_smart'])
+ except (ValueError, IndexError):
+ self.log.debug(f"unable to parse value for {devid}:{t}")
pass
- except rados.Error as e:
- self.log.exception("RADOS error reading omap: {0}".format(e))
- raise
return res
def show_device_metrics(self, devid: str, sample: Optional[str]) -> Tuple[int, str, str]: