import errno
import json
-from mgr_module import MgrModule, CommandResult, CLIRequiresDB, CLICommand, CLIReadCommand, Option, MgrDBNotReady
+from mgr_module import MgrModule, CommandResult, MgrModuleRecoverDB, CLIRequiresDB, CLICommand, CLIReadCommand, Option, MgrDBNotReady
import operator
import rados
import re
@CLIRequiresDB
@CLIReadCommand('device scrape-daemon-health-metrics')
+ @MgrModuleRecoverDB
def do_scrape_daemon_health_metrics(self, who: str) -> Tuple[int, str, str]:
'''
Scrape and store device health metrics for a given daemon
@CLIRequiresDB
@CLIReadCommand('device scrape-health-metrics')
+ @MgrModuleRecoverDB
def do_scrape_health_metrics(self, devid: Optional[str] = None) -> Tuple[int, str, str]:
'''
Scrape and store device health metrics
@CLIRequiresDB
@CLIReadCommand('device get-health-metrics')
+ @MgrModuleRecoverDB
def do_get_health_metrics(self, devid: str, sample: Optional[str] = None) -> Tuple[int, str, str]:
'''
Show stored device metrics for the device
@CLIRequiresDB
@CLICommand('device check-health')
+ @MgrModuleRecoverDB
def do_check_health(self) -> Tuple[int, str, str]:
'''
Check life expectancy of devices
@CLIRequiresDB
@CLIReadCommand('device predict-life-expectancy')
+ @MgrModuleRecoverDB
def do_predict_life_expectancy(self, devid: str) -> Tuple[int, str, str]:
'''
Predict life expectancy with local predictor
self.log.debug(f"finished reading legacy pool, complete = {done}")
return done
- def serve(self) -> None:
- self.log.info("Starting")
- self.config_notify()
-
+ @MgrModuleRecoverDB
+ def _do_serve(self) -> None:
last_scrape = None
finished_loading_legacy = False
+
while self.run:
+ # sleep first, so that a retry after an exception (see MgrModuleRecoverDB) waits instead of re-running immediately:
+ sleep_interval = self.sleep_interval or 60
+ if not finished_loading_legacy:
+ sleep_interval = 2
+ self.log.debug('Sleeping for %d seconds', sleep_interval)
+ self.event.wait(sleep_interval)
+ self.event.clear()
+
if self.db_ready() and self.enable_monitoring:
self.log.debug('Running')
last_scrape = now
self.set_kv('last_scrape', last_scrape.strftime(TIME_FORMAT))
- # sleep
- sleep_interval = self.sleep_interval or 60
- if not finished_loading_legacy:
- sleep_interval = 2
- self.log.debug('Sleeping for %d seconds', sleep_interval)
- self.event.wait(sleep_interval)
- self.event.clear()
+ def serve(self) -> None:
+ '''
+ Module entry point: log startup, call config_notify(), then hand
+ off to _do_serve(), which holds the `while self.run` loop and is
+ wrapped by MgrModuleRecoverDB.
+ '''
+ self.log.info("Starting")
+ self.config_notify()
+
+ self._do_serve()
def shutdown(self) -> None:
self.log.info('Stopping')
return check
return CheckFileInput
+# If the mgr loses its lock on the database because e.g. the pgs were
+# transiently down, then close it and allow it to be reopened.
+MAX_DBCLEANUP_RETRIES = 3
+def MgrModuleRecoverDB(func: Callable) -> Callable:
+ @functools.wraps(func)
+ def check(self: MgrModule, *args: Any, **kwargs: Any) -> Any:
+ retries = 0
+ while True:
+ try:
+ return func(self, *args, **kwargs)
+ except sqlite3.DatabaseError as e:
+ self.log.error(f"Caught fatal database error: {e}")
+ retries = retries+1
+ if retries > MAX_DBCLEANUP_RETRIES:
+ raise
+ self.log.debug(f"attempting reopen of database")
+ self.close_db()
+ self.open_db();
+ # allow retry of func(...)
+ check.__signature__ = inspect.signature(func) # type: ignore[attr-defined]
+ return check
+
def CLIRequiresDB(func: HandlerFuncType) -> HandlerFuncType:
@functools.wraps(func)
def check(self: MgrModule, *args: Any, **kwargs: Any) -> Tuple[int, str, str]:
db.row_factory = sqlite3.Row
self.load_schema(db)
+ def close_db(self) -> None:
+ with self._db_lock:
+ if self._db is not None:
+ self._db.close()
+ self._db = None
+
def open_db(self) -> Optional[sqlite3.Connection]:
if not self.pool_exists(self.MGR_POOL_NAME):
if not self.have_enough_osds():
uri = f"file:///{self.MGR_POOL_NAME}:{self.module_name}/main.db?vfs=ceph";
self.log.debug(f"using uri {uri}")
db = sqlite3.connect(uri, check_same_thread=False, uri=True)
+ # if libcephsqlite reconnects, update the addrv for blocklist
+ with db:
+ cur = db.execute('SELECT json_extract(ceph_status(), "$.addr");')
+ (addrv,) = cur.fetchone()
+ assert addrv is not None
+ self.log.debug(f"new libcephsqlite addrv = {addrv}")
+ self._ceph_register_client("libcephsqlite", addrv, True)
self.configure_db(db)
return db