from mgr_module import MgrModule, CommandResult, CLIRequiresDB, CLICommand, Option
import operator
import rados
+import re
from threading import Event
from datetime import datetime, timedelta
from typing import cast, Any, Dict, List, Optional, Sequence, Tuple, TYPE_CHECKING, Union
self.get_module_option(opt['name']))
self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
+ def _legacy_put_device_metrics(self, t: str, devid: str, data: str) -> None:
+ SQL = """
+ INSERT OR IGNORE INTO DeviceHealthMetrics (time, devid, raw_smart)
+ VALUES (?, ?, ?);
+ """
+
+ self._create_device(devid)
+ epoch = self._t2epoch(t)
+ json.loads(data) # valid?
+ self.db.execute(SQL, (epoch, devid, data))
+
+ devre = r"[a-zA-Z0-9-]+[_-][a-zA-Z0-9-]+[_-][a-zA-Z0-9-]+"
+
+ def _load_legacy_object(self, ioctx: rados.Ioctx, oid: str) -> bool:
+ MAX_OMAP = 10000
+ self.log.debug(f"loading object {oid}")
+ if re.search(self.devre, oid) is None:
+ return False
+ with rados.ReadOpCtx() as op:
+ it, rc = ioctx.get_omap_vals(op, None, None, MAX_OMAP)
+ if rc == 0:
+ ioctx.operate_read_op(op, oid)
+ count = 0
+ for t, raw_smart in it:
+ self.log.debug(f"putting {oid} {t}")
+ self._legacy_put_device_metrics(t, oid, raw_smart)
+ count += 1
+ assert count < MAX_OMAP
+ self.log.debug(f"removing object {oid}")
+ ioctx.remove_object(oid)
+ return True
+
+ def check_legacy_pool(self) -> bool:
+ try:
+ # 'device_health_metrics' is automatically renamed '.mgr' in
+ # create_mgr_pool
+ ioctx = self.rados.open_ioctx(self.MGR_POOL_NAME)
+ except rados.ObjectNotFound:
+ return True
+ if not ioctx:
+ return True
+
+ done = False
+ with ioctx, self._db_lock, self.db:
+ count = 0
+ for obj in ioctx.list_objects():
+ try:
+ if self._load_legacy_object(ioctx, obj.key):
+ count += 1
+ except json.decoder.JSONDecodeError:
+ pass
+ if count >= 10:
+ break
+ done = count < 10
+ self.log.debug(f"finished reading legacy pool, complete = {done}")
+ return done
+
def serve(self) -> None:
self.log.info("Starting")
self.config_notify()
last_scrape = None
+ finished_loading_legacy = False
while self.run:
if self.db_ready() and self.enable_monitoring:
self.log.debug('Running')
+ if not finished_loading_legacy:
+ finished_loading_legacy = self.check_legacy_pool()
+
if last_scrape is None:
ls = self.get_kv('last_scrape')
if ls:
# sleep
sleep_interval = self.sleep_interval or 60
+ if not finished_loading_legacy:
+ sleep_interval = 2
self.log.debug('Sleeping for %d seconds', sleep_interval)
self.event.wait(sleep_interval)
self.event.clear()