]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
pybind/mgr: add sqlite3 db module helpers
authorPatrick Donnelly <pdonnell@redhat.com>
Fri, 9 Apr 2021 21:58:47 +0000 (14:58 -0700)
committerPatrick Donnelly <pdonnell@redhat.com>
Sat, 12 Jun 2021 02:35:16 +0000 (19:35 -0700)
This creates a new '.mgr' pool for storing a default sqlite3 database
for each mgr module. Each module's database is stored in:

        file:///.mgr:<mgr module name>/main.db?vfs=ceph

The "main.db" is the only one used presently but perhaps a module may
want extra databases for some reason. The module name is used for the
RADOS namespace.

Databases are versioned in a common table called MgrModuleKV using the
"__version" key. A mechanism is in place (SCHEMA_VERSIONED) to allow
modules to upgrade their databases over time in a consistent way.

Fixes: https://tracker.ceph.com/issues/50278
Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
src/pybind/mgr/mgr_module.py

index fa3a85c5305e1981c042f2b6bac935c73cca2047..79f5aa8e66b3896d3e0446b08c1764f99fff43b8 100644 (file)
@@ -20,6 +20,7 @@ from enum import IntEnum
 import rados
 import re
 import socket
+import sqlite3
 import sys
 import time
 from ceph_argparse import CephArgtype
@@ -122,6 +123,7 @@ class HandleCommandResult(NamedTuple):
 
 
 class MonCommandFailed(RuntimeError): pass
+class MgrDBNotReady(RuntimeError): pass
 
 
 class OSDMap(ceph_module.BasePyOSDMap):
@@ -444,6 +446,14 @@ def CLICheckNonemptyFileInput(desc: str) -> Callable[[HandlerFuncType], HandlerF
         return check
     return CheckFileInput
 
+def CLIRequiresDB(func: HandlerFuncType) -> HandlerFuncType:
+    @functools.wraps(func)
+    def check(self, *args: Any, **kwargs: Any) -> Tuple[int, str, str]:
+        if not self.db_ready():
+            return -errno.EAGAIN, "", "mgr db not yet available"
+        return func(self, *args, **kwargs)
+    check.__signature__ = inspect.signature(func)  # type: ignore[attr-defined]
+    return check
 
 def _get_localized_key(prefix: str, key: str) -> str:
     return '{}/{}'.format(prefix, key)
@@ -835,10 +845,16 @@ PerfCounterT = Dict[str, Any]
 
 
 class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
+    MGR_POOL_NAME = ".mgr"
+
     COMMANDS = []  # type: List[Any]
     MODULE_OPTIONS: List[Option] = []
     MODULE_OPTION_DEFAULTS = {}  # type: Dict[str, Any]
 
+    # Database Schema
+    SCHEMA = None # type: Optional[str]
+    SCHEMA_VERSIONED = None # type: Optional[List[str]]
+
     # Priority definitions for perf counters
     PRIO_CRITICAL = 10
     PRIO_INTERESTING = 8
@@ -897,6 +913,8 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         # for backwards compatibility
         self._logger = self.getLogger()
 
+        self._db = None # type: Optional[sqlite3.Connection]
+
         self._version = self._ceph_get_version()
 
         self._perf_schema_cache = None
@@ -904,6 +922,8 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         # Keep a librados instance for those that need it.
         self._rados: Optional[rados.Rados] = None
 
+        self._db_lock = threading.Lock()
+
     def __del__(self) -> None:
         self._unconfigure_logging()
 
@@ -945,6 +965,173 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
     def version(self) -> str:
         return self._version
 
+    def pool_exists(self, name: str) -> bool:
+        pools = [p['pool_name'] for p in self.get('osd_map')['pools']]
+        return name in pools
+
+    def have_enough_osds(self) -> bool:
+        # wait until we have enough OSDs to allow the pool to be healthy
+        ready = 0
+        for osd in self.get("osd_map")["osds"]:
+            if osd["up"] and osd["in"]:
+                ready += 1
+
+        need = cast(int, self.get_ceph_option("osd_pool_default_size"))
+        return ready >= need
+
+    def rename_pool(self, srcpool: str, destpool: str) -> None:
+        c = {
+            'prefix': 'osd pool rename',
+            'format': 'json',
+            'srcpool': srcpool,
+            'destpool': destpool,
+        }
+        self.check_mon_command(c)
+
+    def create_pool(self, pool: str) -> None:
+        c = {
+            'prefix': 'osd pool create',
+            'format': 'json',
+            'pool': pool,
+            'pg_num': 1,
+            'pg_num_min': 1,
+        }
+        self.check_mon_command(c)
+
+    def appify_pool(self, pool: str, app: str) -> None:
+        c = {
+            'prefix': 'osd pool application enable',
+            'format': 'json',
+            'pool': pool,
+            'app': app,
+            'yes_i_really_mean_it': True
+        }
+        self.check_mon_command(c)
+
+    def create_mgr_pool(self) -> None:
+        self.log.info("creating mgr pool")
+
+        ov = self.get_module_option_ex('devicehealth', 'pool_name', 'device_health_metrics')
+        devhealth = cast(str, ov)
+        if devhealth is not None and self.pool_exists(devhealth):
+            self.log.debug("reusing devicehealth pool")
+            self.rename_pool(devhealth, self.MGR_POOL_NAME)
+            self.appify_pool(self.MGR_POOL_NAME, 'mgr')
+        else:
+            self.log.debug("creating new mgr pool")
+            self.create_pool(self.MGR_POOL_NAME)
+            self.appify_pool(self.MGR_POOL_NAME, 'mgr')
+
+    def create_skeleton_schema(self, db: sqlite3.Connection) -> None:
+        SQL = """
+        CREATE TABLE IF NOT EXISTS MgrModuleKV (
+          key TEXT PRIMARY KEY,
+          value NOT NULL
+        ) WITHOUT ROWID;
+        INSERT OR IGNORE INTO MgrModuleKV (key, value) VALUES ('__version', 0);
+        """
+
+        db.executescript(SQL)
+
+    def update_schema_version(self, db: sqlite3.Connection, version: int) -> None:
+        SQL = "UPDATE OR ROLLBACK MgrModuleKV SET value = ? WHERE key = '__version';"
+
+        db.execute(SQL, (version,))
+
+    def set_kv(self, key: str, value: Any) -> None:
+        SQL = "INSERT OR REPLACE INTO MgrModuleKV (key, value) VALUES (?, ?);"
+
+        assert key[:2] != "__"
+
+        self.log.debug(f"set_kv('{key}', '{value}')")
+
+        with self._db_lock, self.db:
+            self.db.execute(SQL, (key, value))
+
+    def get_kv(self, key: str) -> Any:
+        SQL = "SELECT value FROM MgrModuleKV WHERE key = ?;"
+
+        assert key[:2] != "__"
+
+        self.log.debug(f"get_kv('{key}')")
+
+        with self._db_lock, self.db:
+            cur = self.db.execute(SQL, (key,))
+            row = cur.fetchone()
+            if row is None:
+                return None
+            else:
+                v = row['value']
+                self.log.debug(f" = {v}")
+                return v
+
+    def maybe_upgrade(self, db: sqlite3.Connection, version: int) -> None:
+        if version <= 0:
+            self.log.info(f"creating main.db for {self.module_name}")
+            assert self.SCHEMA is not None
+            cur = db.executescript(self.SCHEMA)
+            self.update_schema_version(db, 1)
+        else:
+            assert self.SCHEMA_VERSIONED is not None
+            latest = len(self.SCHEMA_VERSIONED)
+            if latest < version:
+                raise RuntimeError(f"main.db version is newer ({version}) than module ({latest})")
+            for i in range(version, latest):
+                self.log.info(f"upgrading main.db for {self.module_name} from {i-1}:{i}")
+                SQL = self.SCHEMA_VERSIONED[i]
+                db.executescript(SQL)
+            if version < latest:
+                self.update_schema_version(db, latest)
+
+    def load_schema(self, db: sqlite3.Connection) -> None:
+        SQL = """
+        SELECT value FROM MgrModuleKV WHERE key = '__version';
+        """
+
+        with db:
+            self.create_skeleton_schema(db)
+            cur = db.execute(SQL)
+            row = cur.fetchone()
+            self.maybe_upgrade(db, int(row['value']))
+            assert cur.fetchone() is None
+            cur.close()
+
+    def configure_db(self, db: sqlite3.Connection) -> None:
+        db.execute('PRAGMA FOREIGN_KEYS = 1')
+        db.execute('PRAGMA JOURNAL_MODE = PERSIST')
+        db.execute('PRAGMA PAGE_SIZE = 65536')
+        db.execute('PRAGMA CACHE_SIZE = 64')
+        db.row_factory = sqlite3.Row
+        self.load_schema(db)
+
+    def open_db(self) -> Optional[sqlite3.Connection]:
+        if not self.pool_exists(self.MGR_POOL_NAME):
+            if not self.have_enough_osds():
+                return None
+            self.create_mgr_pool()
+        uri = f"file:///{self.MGR_POOL_NAME}:{self.module_name}/main.db?vfs=ceph";
+        self.log.debug(f"using uri {uri}")
+        db = sqlite3.connect(uri, check_same_thread=False, uri=True)
+        self.configure_db(db)
+        return db
+
+    def db_ready(self) -> bool:
+        with self._db_lock:
+            try:
+                return self.db is not None
+            except MgrDBNotReady:
+                return False
+
+    @property
+    def db(self) -> sqlite3.Connection:
+        assert self._db_lock.locked()
+        if self._db is not None:
+            return self._db
+        self._db = self.open_db()
+        if self._db is None:
+            raise MgrDBNotReady();
+        return self._db
+
     @property
     def release_name(self) -> str:
         """