From: Patrick Donnelly Date: Fri, 9 Apr 2021 21:58:47 +0000 (-0700) Subject: pybind/mgr: add sqlite3 db module helpers X-Git-Tag: v17.1.0~1668^2~7 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e3d771702da3bb858064b67eb6c710a659bfb08d;p=ceph.git pybind/mgr: add sqlite3 db module helpers This creates a new '.mgr' pool for storing a default sqlite3 database for each mgr module. Each module's database is stored in: file:///.mgr:/main.db?vfs=ceph The "main.db" is the only one used presently but perhaps a module may want extra databases for some reason. The module name is used for the RADOS namespace. Databases are versioned in a common table called MgrModuleKV using the "__version" key. A mechanism is in place (SCHEMA_VERSIONED) to allow modules to upgrade their databases over time in a consistent way. Fixes: https://tracker.ceph.com/issues/50278 Signed-off-by: Patrick Donnelly --- diff --git a/src/pybind/mgr/mgr_module.py b/src/pybind/mgr/mgr_module.py index fa3a85c5305e..79f5aa8e66b3 100644 --- a/src/pybind/mgr/mgr_module.py +++ b/src/pybind/mgr/mgr_module.py @@ -20,6 +20,7 @@ from enum import IntEnum import rados import re import socket +import sqlite3 import sys import time from ceph_argparse import CephArgtype @@ -122,6 +123,7 @@ class HandleCommandResult(NamedTuple): class MonCommandFailed(RuntimeError): pass +class MgrDBNotReady(RuntimeError): pass class OSDMap(ceph_module.BasePyOSDMap): @@ -444,6 +446,14 @@ def CLICheckNonemptyFileInput(desc: str) -> Callable[[HandlerFuncType], HandlerF return check return CheckFileInput +def CLIRequiresDB(func: HandlerFuncType) -> HandlerFuncType: + @functools.wraps(func) + def check(self, *args: Any, **kwargs: Any) -> Tuple[int, str, str]: + if not self.db_ready(): + return -errno.EAGAIN, "", "mgr db not yet available" + return func(self, *args, **kwargs) + check.__signature__ = inspect.signature(func) # type: ignore[attr-defined] + return check def _get_localized_key(prefix: str, key: str) -> str: return '{}/{}'.format(prefix, key) @@ -835,10 +845,16 @@ PerfCounterT = Dict[str, Any] class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): + MGR_POOL_NAME = ".mgr" + COMMANDS = [] # type: List[Any] MODULE_OPTIONS: List[Option] = [] MODULE_OPTION_DEFAULTS = {} # type: Dict[str, Any] + # Database Schema + SCHEMA = None # type: Optional[str] + SCHEMA_VERSIONED = None # type: Optional[List[str]] + # Priority definitions for perf counters PRIO_CRITICAL = 10 PRIO_INTERESTING = 8 @@ -897,6 +913,8 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): # for backwards compatibility self._logger = self.getLogger() + self._db = None # type: Optional[sqlite3.Connection] + self._version = self._ceph_get_version() self._perf_schema_cache = None @@ -904,6 +922,8 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): # Keep a librados instance for those that need it. self._rados: Optional[rados.Rados] = None + self._db_lock = threading.Lock() + def __del__(self) -> None: self._unconfigure_logging() @@ -945,6 +965,173 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): def version(self) -> str: return self._version + def pool_exists(self, name: str) -> bool: + pools = [p['pool_name'] for p in self.get('osd_map')['pools']] + return name in pools + + def have_enough_osds(self) -> bool: + # wait until we have enough OSDs to allow the pool to be healthy + ready = 0 + for osd in self.get("osd_map")["osds"]: + if osd["up"] and osd["in"]: + ready += 1 + + need = cast(int, self.get_ceph_option("osd_pool_default_size")) + return ready >= need + + def rename_pool(self, srcpool: str, destpool: str) -> None: + c = { + 'prefix': 'osd pool rename', + 'format': 'json', + 'srcpool': srcpool, + 'destpool': destpool, + } + self.check_mon_command(c) + + def create_pool(self, pool: str) -> None: + c = { + 'prefix': 'osd pool create', + 'format': 'json', + 'pool': pool, + 'pg_num': 1, + 'pg_num_min': 1, + } + self.check_mon_command(c) + + def appify_pool(self, pool: str, app: str) -> None: + c = { + 'prefix': 'osd pool application enable', + 'format': 'json', + 'pool': pool, + 'app': app, + 'yes_i_really_mean_it': True + } + self.check_mon_command(c) + + def create_mgr_pool(self) -> None: + self.log.info("creating mgr pool") + + ov = self.get_module_option_ex('devicehealth', 'pool_name', 'device_health_metrics') + devhealth = cast(str, ov) + if devhealth is not None and self.pool_exists(devhealth): + self.log.debug("reusing devicehealth pool") + self.rename_pool(devhealth, self.MGR_POOL_NAME) + self.appify_pool(self.MGR_POOL_NAME, 'mgr') + else: + self.log.debug("creating new mgr pool") + self.create_pool(self.MGR_POOL_NAME) + self.appify_pool(self.MGR_POOL_NAME, 'mgr') + + def create_skeleton_schema(self, db: sqlite3.Connection) -> None: + SQL = """ + CREATE TABLE IF NOT EXISTS MgrModuleKV ( + key TEXT PRIMARY KEY, + value NOT NULL + ) WITHOUT ROWID; + INSERT OR IGNORE INTO MgrModuleKV (key, value) VALUES ('__version', 0); + """ + + db.executescript(SQL) + + def update_schema_version(self, db: sqlite3.Connection, version: int) -> None: + SQL = "UPDATE OR ROLLBACK MgrModuleKV SET value = ? WHERE key = '__version';" + + db.execute(SQL, (version,)) + + def set_kv(self, key: str, value: Any) -> None: + SQL = "INSERT OR REPLACE INTO MgrModuleKV (key, value) VALUES (?, ?);" + + assert key[:2] != "__" + + self.log.debug(f"set_kv('{key}', '{value}')") + + with self._db_lock, self.db: + self.db.execute(SQL, (key, value)) + + def get_kv(self, key: str) -> Any: + SQL = "SELECT value FROM MgrModuleKV WHERE key = ?;" + + assert key[:2] != "__" + + self.log.debug(f"get_kv('{key}')") + + with self._db_lock, self.db: + cur = self.db.execute(SQL, (key,)) + row = cur.fetchone() + if row is None: + return None + else: + v = row['value'] + self.log.debug(f" = {v}") + return v + + def maybe_upgrade(self, db: sqlite3.Connection, version: int) -> None: + if version <= 0: + self.log.info(f"creating main.db for {self.module_name}") + assert self.SCHEMA is not None + cur = db.executescript(self.SCHEMA) + self.update_schema_version(db, 1) + else: + assert self.SCHEMA_VERSIONED is not None + latest = len(self.SCHEMA_VERSIONED) + if latest < version: + raise RuntimeError(f"main.db version is newer ({version}) than module ({latest})") + for i in range(version, latest): + self.log.info(f"upgrading main.db for {self.module_name} from {i-1}:{i}") + SQL = self.SCHEMA_VERSIONED[i] + db.executescript(SQL) + if version < latest: + self.update_schema_version(db, latest) + + def load_schema(self, db: sqlite3.Connection) -> None: + SQL = """ + SELECT value FROM MgrModuleKV WHERE key = '__version'; + """ + + with db: + self.create_skeleton_schema(db) + cur = db.execute(SQL) + row = cur.fetchone() + self.maybe_upgrade(db, int(row['value'])) + assert cur.fetchone() is None + cur.close() + + def configure_db(self, db: sqlite3.Connection) -> None: + db.execute('PRAGMA FOREIGN_KEYS = 1') + db.execute('PRAGMA JOURNAL_MODE = PERSIST') + db.execute('PRAGMA PAGE_SIZE = 65536') + db.execute('PRAGMA CACHE_SIZE = 64') + db.row_factory = sqlite3.Row + self.load_schema(db) + + def open_db(self) -> Optional[sqlite3.Connection]: + if not self.pool_exists(self.MGR_POOL_NAME): + if not self.have_enough_osds(): + return None + self.create_mgr_pool() + uri = f"file:///{self.MGR_POOL_NAME}:{self.module_name}/main.db?vfs=ceph"; + self.log.debug(f"using uri {uri}") + db = sqlite3.connect(uri, check_same_thread=False, uri=True) + self.configure_db(db) + return db + + def db_ready(self) -> bool: + with self._db_lock: + try: + return self.db is not None + except MgrDBNotReady: + return False + + @property + def db(self) -> sqlite3.Connection: + assert self._db_lock.locked() + if self._db is not None: + return self._db + self._db = self.open_db() + if self._db is None: + raise MgrDBNotReady(); + return self._db + @property def release_name(self) -> str: """