From abd35d47696c208990355395d48c1c1e261de95c Mon Sep 17 00:00:00 2001 From: Patrick Donnelly Date: Fri, 9 Apr 2021 15:00:06 -0700 Subject: [PATCH] pybind/mgr/devicehealth: update to store metrics in sqlite This commit just modifies the module to use the sqlite3 database for storing health metrics. A followup commit loads the legacy pool's data. Fixes: https://tracker.ceph.com/issues/50278 Signed-off-by: Patrick Donnelly --- src/pybind/mgr/devicehealth/module.py | 266 ++++++++++++-------------- 1 file changed, 120 insertions(+), 146 deletions(-) diff --git a/src/pybind/mgr/devicehealth/module.py b/src/pybind/mgr/devicehealth/module.py index ed752fe15b0..f81099da548 100644 --- a/src/pybind/mgr/devicehealth/module.py +++ b/src/pybind/mgr/devicehealth/module.py @@ -4,7 +4,7 @@ Device health monitoring import errno import json -from mgr_module import MgrModule, CommandResult, CLICommand, Option +from mgr_module import MgrModule, CommandResult, CLIRequiresDB, CLICommand, Option import operator import rados from threading import Event @@ -22,8 +22,6 @@ HEALTH_MESSAGES = { DEVICE_HEALTH_TOOMANY: 'Too many daemons are expected to fail soon', } -MAX_SAMPLES = 500 - def get_ata_wear_level(data: Dict[Any, Any]) -> Optional[float]: """ @@ -49,6 +47,35 @@ def get_nvme_wear_level(data: Dict[Any, Any]) -> Optional[float]: class Module(MgrModule): + + # latest (if db does not exist) + SCHEMA = """ +CREATE TABLE Device ( + devid TEXT PRIMARY KEY +) WITHOUT ROWID; +CREATE TABLE DeviceHealthMetrics ( + time DATETIME DEFAULT (strftime('%s', 'now')), + devid TEXT NOT NULL REFERENCES Device (devid), + raw_smart TEXT NOT NULL, + PRIMARY KEY (time, devid) +); +""" + + SCHEMA_VERSIONED = [ + # v1 + """ +CREATE TABLE Device ( + devid TEXT PRIMARY KEY +) WITHOUT ROWID; +CREATE TABLE DeviceHealthMetrics ( + time DATETIME DEFAULT (strftime('%s', 'now')), + devid TEXT NOT NULL REFERENCES Device (devid), + raw_smart TEXT NOT NULL, + PRIMARY KEY (time, devid) +); +""" + ] + MODULE_OPTIONS = [ Option( name='enable_monitoring', @@ -118,7 +145,6 @@ class Module(MgrModule): # other self.run = True self.event = Event() - self.has_device_pool = False # for mypy which does not run the code if TYPE_CHECKING: @@ -154,6 +180,7 @@ class Module(MgrModule): }), '') return result.wait() + @CLIRequiresDB @CLICommand('device scrape-daemon-health-metrics', perm='r') def do_scrape_daemon_health_metrics(self, who: str) -> Tuple[int, str, str]: @@ -165,6 +192,7 @@ class Module(MgrModule): (daemon_type, daemon_id) = who.split('.') return self.scrape_daemon(daemon_type, daemon_id) + @CLIRequiresDB @CLICommand('device scrape-health-metrics', perm='r') def do_scrape_health_metrics(self, devid: Optional[str] = None) -> Tuple[int, str, str]: @@ -176,6 +204,7 @@ class Module(MgrModule): else: return self.scrape_device(devid) + @CLIRequiresDB @CLICommand('device get-health-metrics', perm='r') def do_get_health_metrics(self, devid: str, sample: Optional[str] = None) -> Tuple[int, str, str]: @@ -184,6 +213,7 @@ class Module(MgrModule): ''' return self.show_device_metrics(devid, sample) + @CLIRequiresDB @CLICommand('device check-health', perm='rw') def do_check_health(self) -> Tuple[int, str, str]: @@ -212,6 +242,7 @@ class Module(MgrModule): self.set_health_checks({}) # avoid stuck health alerts return 0, '', '' + @CLIRequiresDB @CLICommand('device predict-life-expectancy', perm='r') def do_predict_life_expectancy(self, devid: str) -> Tuple[int, str, str]: @@ -221,6 +252,7 @@ class Module(MgrModule): return self.predict_lift_expectancy(devid) def self_test(self) -> None: + assert self.db_ready() self.config_notify() osdmap = self.get('osd_map') osd_id = osdmap['osds'][0]['osd'] @@ -228,12 +260,15 @@ class Module(MgrModule): devs = osdmeta.get(str(osd_id), {}).get('device_ids') if devs: devid = devs.split()[0].split('=')[1] - (r, before, err) = self.show_device_metrics(devid, '') + self.log.debug(f"getting devid {devid}") + (r, before, err) = self.show_device_metrics(devid, None) assert r == 0 + self.log.debug(f"before: {before}") (r, out, err) = self.scrape_device(devid) assert r == 0 - (r, after, err) = self.show_device_metrics(devid, '') + (r, after, err) = self.show_device_metrics(devid, None) assert r == 0 + self.log.debug(f"after: {after}") assert before != after def config_notify(self) -> None: @@ -243,70 +278,24 @@ class Module(MgrModule): self.get_module_option(opt['name'])) self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name'])) - def notify(self, notify_type: str, notify_id: str) -> None: - if notify_type == "osd_map" and self.enable_monitoring: - # create device_health_metrics pool if it doesn't exist - self.maybe_create_device_pool() - - def have_enough_osds(self) -> bool: - # wait until we have enough OSDs to allow the pool to be healthy - up = 0 - for osd in self.get("osd_map")["osds"]: - if osd["up"]: - up += 1 - - need = cast(int, self.get_ceph_option("osd_pool_default_size")) - return up >= need - - def maybe_create_device_pool(self) -> bool: - if not self.has_device_pool: - if not self.have_enough_osds(): - self.log.warning("Not enough OSDs yet to create monitoring pool") - return False - self.create_device_pool() - self.has_device_pool = True - return True - - def create_device_pool(self) -> None: - self.log.debug('create %s pool' % self.pool_name) - # create pool - result = CommandResult('') - self.send_command(result, 'mon', '', json.dumps({ - 'prefix': 'osd pool create', - 'format': 'json', - 'pool': self.pool_name, - 'pg_num': 1, - 'pg_num_min': 1, - }), '') - r, outb, outs = result.wait() - assert r == 0 - # set pool application - result = CommandResult('') - self.send_command(result, 'mon', '', json.dumps({ - 'prefix': 'osd pool application enable', - 'format': 'json', - 'pool': self.pool_name, - 'app': 'mgr_devicehealth', - }), '') - r, outb, outs = result.wait() - assert r == 0 - def serve(self) -> None: self.log.info("Starting") self.config_notify() last_scrape = None - ls = self.get_store('last_scrape') - if ls: - try: - last_scrape = datetime.strptime(ls, TIME_FORMAT) - except ValueError: - pass - self.log.debug('Last scrape %s', last_scrape) - while self.run: - if self.enable_monitoring: + if self.db_ready() and self.enable_monitoring: self.log.debug('Running') + + if last_scrape is None: + ls = self.get_kv('last_scrape') + if ls: + try: + last_scrape = datetime.strptime(ls, TIME_FORMAT) + except ValueError: + pass + self.log.debug('Last scrape %s', last_scrape) + self.check_health() now = datetime.utcnow() @@ -330,7 +319,7 @@ class Module(MgrModule): self.scrape_all() self.predict_all_devices() last_scrape = now - self.set_store('last_scrape', last_scrape.strftime(TIME_FORMAT)) + self.set_kv('last_scrape', last_scrape.strftime(TIME_FORMAT)) # sleep sleep_interval = self.sleep_interval or 60 @@ -343,32 +332,22 @@ class Module(MgrModule): self.run = False self.event.set() - def open_connection(self, create_if_missing: bool = True) -> rados.Ioctx: - if create_if_missing: - if not self.maybe_create_device_pool(): - return None - ioctx = self.rados.open_ioctx(self.pool_name) - return ioctx - def scrape_daemon(self, daemon_type: str, daemon_id: str) -> Tuple[int, str, str]: - ioctx = self.open_connection() - if not ioctx: - return -errno.EAGAIN, "", "device_health_metrics pool not yet available" + if not self.db_ready(): + return -errno.EAGAIN, "", "mgr db not yet available" raw_smart_data = self.do_scrape_daemon(daemon_type, daemon_id) if raw_smart_data: for device, raw_data in raw_smart_data.items(): data = self.extract_smart_features(raw_data) if device and data: - self.put_device_metrics(ioctx, device, data) - ioctx.close() + self.put_device_metrics(device, data) return 0, "", "" def scrape_all(self) -> Tuple[int, str, str]: + if not self.db_ready(): + return -errno.EAGAIN, "", "mgr db not yet available" osdmap = self.get("osd_map") assert osdmap is not None - ioctx = self.open_connection() - if not ioctx: - return -errno.EAGAIN, "", "device_health_metrics pool not yet available" did_device = {} ids = [] for osd in osdmap['osds']: @@ -387,11 +366,12 @@ class Module(MgrModule): did_device[device] = 1 data = self.extract_smart_features(raw_data) if device and data: - self.put_device_metrics(ioctx, device, data) - ioctx.close() + self.put_device_metrics(device, data) return 0, "", "" def scrape_device(self, devid: str) -> Tuple[int, str, str]: + if not self.db_ready(): + return -errno.EAGAIN, "", "mgr db not yet available" r = self.get("device " + devid) if not r or 'device' not in r.keys(): return -errno.ENOENT, '', 'device ' + devid + ' not found' @@ -400,17 +380,13 @@ class Module(MgrModule): return (-errno.EAGAIN, '', 'device ' + devid + ' not claimed by any active daemons') (daemon_type, daemon_id) = daemons[0].split('.') - ioctx = self.open_connection() - if not ioctx: - return -errno.EAGAIN, "", "device_health_metrics pool not yet available" raw_smart_data = self.do_scrape_daemon(daemon_type, daemon_id, devid=devid) if raw_smart_data: for device, raw_data in raw_smart_data.items(): data = self.extract_smart_features(raw_data) if device and data: - self.put_device_metrics(ioctx, device, data) - ioctx.close() + self.put_device_metrics(device, data) return 0, "", "" def do_scrape_daemon(self, @@ -437,41 +413,37 @@ class Module(MgrModule): daemon_type, daemon_id, outb)) return None - def put_device_metrics(self, ioctx: rados.Ioctx, devid: str, data: Any) -> None: - assert devid - old_key = datetime.utcnow() - timedelta( - seconds=self.retention_period) - prune = old_key.strftime(TIME_FORMAT) - self.log.debug('put_device_metrics device %s prune %s' % - (devid, prune)) - erase = [] - try: - with rados.ReadOpCtx() as op: - # FIXME - omap_iter, ret = ioctx.get_omap_keys(op, "", MAX_SAMPLES) - assert ret == 0 - ioctx.operate_read_op(op, devid) - for key, _ in list(omap_iter): - if key >= prune: - break - erase.append(key) - except rados.ObjectNotFound: - # The object doesn't already exist, no problem. - pass - except rados.Error as e: - # Do not proceed with writes if something unexpected - # went wrong with the reads. - self.log.exception("Error reading OMAP: {0}".format(e)) - return - - key = datetime.utcnow().strftime(TIME_FORMAT) - self.log.debug('put_device_metrics device %s key %s = %s, erase %s' % - (devid, key, data, erase)) - with rados.WriteOpCtx() as op: - ioctx.set_omap(op, (key,), (str(json.dumps(data)),)) - if len(erase): - ioctx.remove_omap_keys(op, tuple(erase)) - ioctx.operate_write_op(op, devid) + def _prune_device_metrics(self) -> None: + SQL = """ + DELETE FROM DeviceHealthMetrics + WHERE time < (strftime('%s', 'now') - ?); + """ + + cursor = self.db.execute(SQL, (self.retention_period,)) + if cursor.rowcount >= 1: + self.log.info(f"pruned {cursor.rowcount} metrics") + + def _create_device(self, devid: str) -> None: + SQL = """ + INSERT OR IGNORE INTO Device VALUES (?); + """ + + cursor = self.db.execute(SQL, (devid,)) + if cursor.rowcount >= 1: + self.log.info(f"created device {devid}") + else: + self.log.debug(f"device {devid} already exists") + + def put_device_metrics(self, devid: str, data: Any) -> None: + SQL = """ + INSERT INTO DeviceHealthMetrics (devid, raw_smart) + VALUES (?, ?); + """ + + with self._db_lock, self.db: + self._create_device(devid) + self.db.execute(SQL, (devid, json.dumps(data))) + self._prune_device_metrics() # extract wear level? wear_level = get_ata_wear_level(data) @@ -489,37 +461,39 @@ class Module(MgrModule): self.log.debug(f"removing {devid} wear level") self.set_device_wear_level(devid, -1.0) + def _t2epoch(self, t: Optional[str]) -> int: + if t is None: + return 0 + else: + return int(datetime.strptime(t, TIME_FORMAT).strftime("%s")) + def _get_device_metrics(self, devid: str, sample: Optional[str] = None, min_sample: Optional[str] = None) -> Dict[str, Dict[str, Any]]: res = {} - ioctx = self.open_connection(create_if_missing=False) - if not ioctx: - return {} - with ioctx: - with rados.ReadOpCtx() as op: - omap_iter, ret = ioctx.get_omap_vals(op, min_sample or '', sample or '', - MAX_SAMPLES) # fixme - assert ret == 0 + + SQL = """ + SELECT time, raw_smart + FROM DeviceHealthMetrics + WHERE devid = ? AND (time = ? OR ? <= time) + ORDER BY time DESC; + """ + + isample = self._t2epoch(sample) + imin_sample = self._t2epoch(min_sample) + + self.log.debug(f"_get_device_metrics: {devid} {sample} {min_sample}") + + with self._db_lock, self.db: + cursor = self.db.execute(SQL, (devid, isample, imin_sample)) + for row in cursor: + t = row['time'] + dt = datetime.utcfromtimestamp(t).strftime(TIME_FORMAT) try: - ioctx.operate_read_op(op, devid) - for key, value in list(omap_iter): - if sample and key != sample: - break - if min_sample and key < min_sample: - break - try: - v = json.loads(value) - except (ValueError, IndexError): - self.log.debug('unable to parse value for %s: "%s"' % - (key, value)) - pass - res[key] = v - except rados.ObjectNotFound: + res[dt] = json.loads(row['raw_smart']) + except (ValueError, IndexError): + self.log.debug(f"unable to parse value for {devid}:{t}") pass - except rados.Error as e: - self.log.exception("RADOS error reading omap: {0}".format(e)) - raise return res def show_device_metrics(self, devid: str, sample: Optional[str]) -> Tuple[int, str, str]: -- 2.39.5