]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
pybind/mgr/devicehealth: update to store metrics in sqlite
authorPatrick Donnelly <pdonnell@redhat.com>
Fri, 9 Apr 2021 22:00:06 +0000 (15:00 -0700)
committerPatrick Donnelly <pdonnell@redhat.com>
Sat, 12 Jun 2021 02:35:16 +0000 (19:35 -0700)
This commit just modifies the module to use the sqlite3 database for
storing health metrics. A followup commit loads the legacy pool's data.

Fixes: https://tracker.ceph.com/issues/50278
Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
src/pybind/mgr/devicehealth/module.py

index ed752fe15b0c93d7255acf81028e3b334fa75f48..f81099da548d88da299f55f7478ace243f51750a 100644 (file)
@@ -4,7 +4,7 @@ Device health monitoring
 
 import errno
 import json
-from mgr_module import MgrModule, CommandResult, CLICommand, Option
+from mgr_module import MgrModule, CommandResult, CLIRequiresDB, CLICommand, Option
 import operator
 import rados
 from threading import Event
@@ -22,8 +22,6 @@ HEALTH_MESSAGES = {
     DEVICE_HEALTH_TOOMANY: 'Too many daemons are expected to fail soon',
 }
 
-MAX_SAMPLES = 500
-
 
 def get_ata_wear_level(data: Dict[Any, Any]) -> Optional[float]:
     """
@@ -49,6 +47,35 @@ def get_nvme_wear_level(data: Dict[Any, Any]) -> Optional[float]:
 
 
 class Module(MgrModule):
+
+    # latest (if db does not exist)
+    SCHEMA = """
+CREATE TABLE Device (
+  devid TEXT PRIMARY KEY
+) WITHOUT ROWID;
+CREATE TABLE DeviceHealthMetrics (
+  time DATETIME DEFAULT (strftime('%s', 'now')),
+  devid TEXT NOT NULL REFERENCES Device (devid),
+  raw_smart TEXT NOT NULL,
+  PRIMARY KEY (time, devid)
+);
+"""
+
+    SCHEMA_VERSIONED = [
+        # v1
+        """
+CREATE TABLE Device (
+  devid TEXT PRIMARY KEY
+) WITHOUT ROWID;
+CREATE TABLE DeviceHealthMetrics (
+  time DATETIME DEFAULT (strftime('%s', 'now')),
+  devid TEXT NOT NULL REFERENCES Device (devid),
+  raw_smart TEXT NOT NULL,
+  PRIMARY KEY (time, devid)
+);
+"""
+    ]
+
     MODULE_OPTIONS = [
         Option(
             name='enable_monitoring',
@@ -118,7 +145,6 @@ class Module(MgrModule):
         # other
         self.run = True
         self.event = Event()
-        self.has_device_pool = False
 
         # for mypy which does not run the code
         if TYPE_CHECKING:
@@ -154,6 +180,7 @@ class Module(MgrModule):
         }), '')
         return result.wait()
 
+    @CLIRequiresDB
     @CLICommand('device scrape-daemon-health-metrics',
                 perm='r')
     def do_scrape_daemon_health_metrics(self, who: str) -> Tuple[int, str, str]:
@@ -165,6 +192,7 @@ class Module(MgrModule):
         (daemon_type, daemon_id) = who.split('.')
         return self.scrape_daemon(daemon_type, daemon_id)
 
+    @CLIRequiresDB
     @CLICommand('device scrape-health-metrics',
                 perm='r')
     def do_scrape_health_metrics(self, devid: Optional[str] = None) -> Tuple[int, str, str]:
@@ -176,6 +204,7 @@ class Module(MgrModule):
         else:
             return self.scrape_device(devid)
 
+    @CLIRequiresDB
     @CLICommand('device get-health-metrics',
                 perm='r')
     def do_get_health_metrics(self, devid: str, sample: Optional[str] = None) -> Tuple[int, str, str]:
@@ -184,6 +213,7 @@ class Module(MgrModule):
         '''
         return self.show_device_metrics(devid, sample)
 
+    @CLIRequiresDB
     @CLICommand('device check-health',
                 perm='rw')
     def do_check_health(self) -> Tuple[int, str, str]:
@@ -212,6 +242,7 @@ class Module(MgrModule):
         self.set_health_checks({})  # avoid stuck health alerts
         return 0, '', ''
 
+    @CLIRequiresDB
     @CLICommand('device predict-life-expectancy',
                 perm='r')
     def do_predict_life_expectancy(self, devid: str) -> Tuple[int, str, str]:
@@ -221,6 +252,7 @@ class Module(MgrModule):
         return self.predict_lift_expectancy(devid)
 
     def self_test(self) -> None:
+        assert self.db_ready()
         self.config_notify()
         osdmap = self.get('osd_map')
         osd_id = osdmap['osds'][0]['osd']
@@ -228,12 +260,15 @@ class Module(MgrModule):
         devs = osdmeta.get(str(osd_id), {}).get('device_ids')
         if devs:
             devid = devs.split()[0].split('=')[1]
-            (r, before, err) = self.show_device_metrics(devid, '')
+            self.log.debug(f"getting devid {devid}")
+            (r, before, err) = self.show_device_metrics(devid, None)
             assert r == 0
+            self.log.debug(f"before: {before}")
             (r, out, err) = self.scrape_device(devid)
             assert r == 0
-            (r, after, err) = self.show_device_metrics(devid, '')
+            (r, after, err) = self.show_device_metrics(devid, None)
             assert r == 0
+            self.log.debug(f"after: {after}")
             assert before != after
 
     def config_notify(self) -> None:
@@ -243,70 +278,24 @@ class Module(MgrModule):
                     self.get_module_option(opt['name']))
             self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
 
-    def notify(self, notify_type: str, notify_id: str) -> None:
-        if notify_type == "osd_map" and self.enable_monitoring:
-            # create device_health_metrics pool if it doesn't exist
-            self.maybe_create_device_pool()
-
-    def have_enough_osds(self) -> bool:
-        # wait until we have enough OSDs to allow the pool to be healthy
-        up = 0
-        for osd in self.get("osd_map")["osds"]:
-            if osd["up"]:
-                up += 1
-
-        need = cast(int, self.get_ceph_option("osd_pool_default_size"))
-        return up >= need
-
-    def maybe_create_device_pool(self) -> bool:
-        if not self.has_device_pool:
-            if not self.have_enough_osds():
-                self.log.warning("Not enough OSDs yet to create monitoring pool")
-                return False
-            self.create_device_pool()
-            self.has_device_pool = True
-        return True
-
-    def create_device_pool(self) -> None:
-        self.log.debug('create %s pool' % self.pool_name)
-        # create pool
-        result = CommandResult('')
-        self.send_command(result, 'mon', '', json.dumps({
-            'prefix': 'osd pool create',
-            'format': 'json',
-            'pool': self.pool_name,
-            'pg_num': 1,
-            'pg_num_min': 1,
-        }), '')
-        r, outb, outs = result.wait()
-        assert r == 0
-        # set pool application
-        result = CommandResult('')
-        self.send_command(result, 'mon', '', json.dumps({
-            'prefix': 'osd pool application enable',
-            'format': 'json',
-            'pool': self.pool_name,
-            'app': 'mgr_devicehealth',
-        }), '')
-        r, outb, outs = result.wait()
-        assert r == 0
-
     def serve(self) -> None:
         self.log.info("Starting")
         self.config_notify()
 
         last_scrape = None
-        ls = self.get_store('last_scrape')
-        if ls:
-            try:
-                last_scrape = datetime.strptime(ls, TIME_FORMAT)
-            except ValueError:
-                pass
-        self.log.debug('Last scrape %s', last_scrape)
-
         while self.run:
-            if self.enable_monitoring:
+            if self.db_ready() and self.enable_monitoring:
                 self.log.debug('Running')
+
+                if last_scrape is None:
+                    ls = self.get_kv('last_scrape')
+                    if ls:
+                        try:
+                            last_scrape = datetime.strptime(ls, TIME_FORMAT)
+                        except ValueError:
+                            pass
+                    self.log.debug('Last scrape %s', last_scrape)
+
                 self.check_health()
 
                 now = datetime.utcnow()
@@ -330,7 +319,7 @@ class Module(MgrModule):
                     self.scrape_all()
                     self.predict_all_devices()
                     last_scrape = now
-                    self.set_store('last_scrape', last_scrape.strftime(TIME_FORMAT))
+                    self.set_kv('last_scrape', last_scrape.strftime(TIME_FORMAT))
 
             # sleep
             sleep_interval = self.sleep_interval or 60
@@ -343,32 +332,22 @@ class Module(MgrModule):
         self.run = False
         self.event.set()
 
-    def open_connection(self, create_if_missing: bool = True) -> rados.Ioctx:
-        if create_if_missing:
-            if not self.maybe_create_device_pool():
-                return None
-        ioctx = self.rados.open_ioctx(self.pool_name)
-        return ioctx
-
     def scrape_daemon(self, daemon_type: str, daemon_id: str) -> Tuple[int, str, str]:
-        ioctx = self.open_connection()
-        if not ioctx:
-            return -errno.EAGAIN, "", "device_health_metrics pool not yet available"
+        if not self.db_ready():
+            return -errno.EAGAIN, "", "mgr db not yet available"
         raw_smart_data = self.do_scrape_daemon(daemon_type, daemon_id)
         if raw_smart_data:
             for device, raw_data in raw_smart_data.items():
                 data = self.extract_smart_features(raw_data)
                 if device and data:
-                    self.put_device_metrics(ioctx, device, data)
-        ioctx.close()
+                    self.put_device_metrics(device, data)
         return 0, "", ""
 
     def scrape_all(self) -> Tuple[int, str, str]:
+        if not self.db_ready():
+            return -errno.EAGAIN, "", "mgr db not yet available"
         osdmap = self.get("osd_map")
         assert osdmap is not None
-        ioctx = self.open_connection()
-        if not ioctx:
-            return -errno.EAGAIN, "", "device_health_metrics pool not yet available"
         did_device = {}
         ids = []
         for osd in osdmap['osds']:
@@ -387,11 +366,12 @@ class Module(MgrModule):
                 did_device[device] = 1
                 data = self.extract_smart_features(raw_data)
                 if device and data:
-                    self.put_device_metrics(ioctx, device, data)
-        ioctx.close()
+                    self.put_device_metrics(device, data)
         return 0, "", ""
 
     def scrape_device(self, devid: str) -> Tuple[int, str, str]:
+        if not self.db_ready():
+            return -errno.EAGAIN, "", "mgr db not yet available"
         r = self.get("device " + devid)
         if not r or 'device' not in r.keys():
             return -errno.ENOENT, '', 'device ' + devid + ' not found'
@@ -400,17 +380,13 @@ class Module(MgrModule):
             return (-errno.EAGAIN, '',
                     'device ' + devid + ' not claimed by any active daemons')
         (daemon_type, daemon_id) = daemons[0].split('.')
-        ioctx = self.open_connection()
-        if not ioctx:
-            return -errno.EAGAIN, "", "device_health_metrics pool not yet available"
         raw_smart_data = self.do_scrape_daemon(daemon_type, daemon_id,
                                                devid=devid)
         if raw_smart_data:
             for device, raw_data in raw_smart_data.items():
                 data = self.extract_smart_features(raw_data)
                 if device and data:
-                    self.put_device_metrics(ioctx, device, data)
-        ioctx.close()
+                    self.put_device_metrics(device, data)
         return 0, "", ""
 
     def do_scrape_daemon(self,
@@ -437,41 +413,37 @@ class Module(MgrModule):
                     daemon_type, daemon_id, outb))
             return None
 
-    def put_device_metrics(self, ioctx: rados.Ioctx, devid: str, data: Any) -> None:
-        assert devid
-        old_key = datetime.utcnow() - timedelta(
-            seconds=self.retention_period)
-        prune = old_key.strftime(TIME_FORMAT)
-        self.log.debug('put_device_metrics device %s prune %s' %
-                       (devid, prune))
-        erase = []
-        try:
-            with rados.ReadOpCtx() as op:
-                # FIXME
-                omap_iter, ret = ioctx.get_omap_keys(op, "", MAX_SAMPLES)
-                assert ret == 0
-                ioctx.operate_read_op(op, devid)
-                for key, _ in list(omap_iter):
-                    if key >= prune:
-                        break
-                    erase.append(key)
-        except rados.ObjectNotFound:
-            # The object doesn't already exist, no problem.
-            pass
-        except rados.Error as e:
-            # Do not proceed with writes if something unexpected
-            # went wrong with the reads.
-            self.log.exception("Error reading OMAP: {0}".format(e))
-            return
-
-        key = datetime.utcnow().strftime(TIME_FORMAT)
-        self.log.debug('put_device_metrics device %s key %s = %s, erase %s' %
-                       (devid, key, data, erase))
-        with rados.WriteOpCtx() as op:
-            ioctx.set_omap(op, (key,), (str(json.dumps(data)),))
-            if len(erase):
-                ioctx.remove_omap_keys(op, tuple(erase))
-            ioctx.operate_write_op(op, devid)
+    def _prune_device_metrics(self) -> None:
+        SQL = """
+        DELETE FROM DeviceHealthMetrics
+            WHERE time < (strftime('%s', 'now') - ?);
+        """
+
+        cursor = self.db.execute(SQL, (self.retention_period,))
+        if cursor.rowcount >= 1:
+            self.log.info(f"pruned {cursor.rowcount} metrics")
+
+    def _create_device(self, devid: str) -> None:
+        SQL = """
+        INSERT OR IGNORE INTO Device VALUES (?);
+        """
+
+        cursor = self.db.execute(SQL, (devid,))
+        if cursor.rowcount >= 1:
+            self.log.info(f"created device {devid}")
+        else:
+            self.log.debug(f"device {devid} already exists")
+
+    def put_device_metrics(self, devid: str, data: Any) -> None:
+        SQL = """
+        INSERT INTO DeviceHealthMetrics (devid, raw_smart)
+            VALUES (?, ?);
+        """
+
+        with self._db_lock, self.db:
+            self._create_device(devid)
+            self.db.execute(SQL, (devid, json.dumps(data)))
+            self._prune_device_metrics()
 
         # extract wear level?
         wear_level = get_ata_wear_level(data)
@@ -489,37 +461,39 @@ class Module(MgrModule):
                 self.log.debug(f"removing {devid} wear level")
                 self.set_device_wear_level(devid, -1.0)
 
+    def _t2epoch(self, t: Optional[str]) -> int:
+        if t is None:
+            return 0
+        else:
+            return int(datetime.strptime(t, TIME_FORMAT).strftime("%s"))
+
     def _get_device_metrics(self, devid: str,
                             sample: Optional[str] = None,
                             min_sample: Optional[str] = None) -> Dict[str, Dict[str, Any]]:
         res = {}
-        ioctx = self.open_connection(create_if_missing=False)
-        if not ioctx:
-            return {}
-        with ioctx:
-            with rados.ReadOpCtx() as op:
-                omap_iter, ret = ioctx.get_omap_vals(op, min_sample or '', sample or '',
-                                                     MAX_SAMPLES)  # fixme
-                assert ret == 0
+
+        SQL = """
+        SELECT time, raw_smart
+            FROM DeviceHealthMetrics
+            WHERE devid = ? AND (time = ? OR ? <= time)
+            ORDER BY time DESC;
+        """
+
+        isample = self._t2epoch(sample)
+        imin_sample = self._t2epoch(min_sample)
+
+        self.log.debug(f"_get_device_metrics: {devid} {sample} {min_sample}")
+
+        with self._db_lock, self.db:
+            cursor = self.db.execute(SQL, (devid, isample, imin_sample))
+            for row in cursor:
+                t = row['time']
+                dt = datetime.utcfromtimestamp(t).strftime(TIME_FORMAT)
                 try:
-                    ioctx.operate_read_op(op, devid)
-                    for key, value in list(omap_iter):
-                        if sample and key != sample:
-                            break
-                        if min_sample and key < min_sample:
-                            break
-                        try:
-                            v = json.loads(value)
-                        except (ValueError, IndexError):
-                            self.log.debug('unable to parse value for %s: "%s"' %
-                                           (key, value))
-                            pass
-                        res[key] = v
-                except rados.ObjectNotFound:
+                    res[dt] = json.loads(row['raw_smart'])
+                except (ValueError, IndexError):
+                    self.log.debug(f"unable to parse value for {devid}:{t}")
                     pass
-                except rados.Error as e:
-                    self.log.exception("RADOS error reading omap: {0}".format(e))
-                    raise
         return res
 
     def show_device_metrics(self, devid: str, sample: Optional[str]) -> Tuple[int, str, str]: