import rados
import re
import socket
+import sqlite3
import sys
import time
from ceph_argparse import CephArgtype
class MonCommandFailed(RuntimeError): pass
+class MgrDBNotReady(RuntimeError): pass
class OSDMap(ceph_module.BasePyOSDMap):
return check
return CheckFileInput
+def CLIRequiresDB(func: HandlerFuncType) -> HandlerFuncType:
+ @functools.wraps(func)
+ def check(self, *args: Any, **kwargs: Any) -> Tuple[int, str, str]:
+ if not self.db_ready():
+ return -errno.EAGAIN, "", "mgr db not yet available"
+ return func(self, *args, **kwargs)
+ check.__signature__ = inspect.signature(func) # type: ignore[attr-defined]
+ return check
def _get_localized_key(prefix: str, key: str) -> str:
return '{}/{}'.format(prefix, key)
class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
+ MGR_POOL_NAME = ".mgr"
+
COMMANDS = [] # type: List[Any]
MODULE_OPTIONS: List[Option] = []
MODULE_OPTION_DEFAULTS = {} # type: Dict[str, Any]
+ # Database Schema
+ SCHEMA = None # type: Optional[str]
+ SCHEMA_VERSIONED = None # type: Optional[List[str]]
+
# Priority definitions for perf counters
PRIO_CRITICAL = 10
PRIO_INTERESTING = 8
# for backwards compatibility
self._logger = self.getLogger()
+ self._db = None # type: Optional[sqlite3.Connection]
+
self._version = self._ceph_get_version()
self._perf_schema_cache = None
# Keep a librados instance for those that need it.
self._rados: Optional[rados.Rados] = None
+ self._db_lock = threading.Lock()
+
def __del__(self) -> None:
self._unconfigure_logging()
def version(self) -> str:
return self._version
+ def pool_exists(self, name: str) -> bool:
+ pools = [p['pool_name'] for p in self.get('osd_map')['pools']]
+ return name in pools
+
+ def have_enough_osds(self) -> bool:
+ # wait until we have enough OSDs to allow the pool to be healthy
+ ready = 0
+ for osd in self.get("osd_map")["osds"]:
+ if osd["up"] and osd["in"]:
+ ready += 1
+
+ need = cast(int, self.get_ceph_option("osd_pool_default_size"))
+ return ready >= need
+
+ def rename_pool(self, srcpool: str, destpool: str) -> None:
+ c = {
+ 'prefix': 'osd pool rename',
+ 'format': 'json',
+ 'srcpool': srcpool,
+ 'destpool': destpool,
+ }
+ self.check_mon_command(c)
+
+ def create_pool(self, pool: str) -> None:
+ c = {
+ 'prefix': 'osd pool create',
+ 'format': 'json',
+ 'pool': pool,
+ 'pg_num': 1,
+ 'pg_num_min': 1,
+ }
+ self.check_mon_command(c)
+
+ def appify_pool(self, pool: str, app: str) -> None:
+ c = {
+ 'prefix': 'osd pool application enable',
+ 'format': 'json',
+ 'pool': pool,
+ 'app': app,
+ 'yes_i_really_mean_it': True
+ }
+ self.check_mon_command(c)
+
+ def create_mgr_pool(self) -> None:
+ self.log.info("creating mgr pool")
+
+ ov = self.get_module_option_ex('devicehealth', 'pool_name', 'device_health_metrics')
+ devhealth = cast(str, ov)
+ if devhealth is not None and self.pool_exists(devhealth):
+ self.log.debug("reusing devicehealth pool")
+ self.rename_pool(devhealth, self.MGR_POOL_NAME)
+ self.appify_pool(self.MGR_POOL_NAME, 'mgr')
+ else:
+ self.log.debug("creating new mgr pool")
+ self.create_pool(self.MGR_POOL_NAME)
+ self.appify_pool(self.MGR_POOL_NAME, 'mgr')
+
+ def create_skeleton_schema(self, db: sqlite3.Connection) -> None:
+ SQL = """
+ CREATE TABLE IF NOT EXISTS MgrModuleKV (
+ key TEXT PRIMARY KEY,
+ value NOT NULL
+ ) WITHOUT ROWID;
+ INSERT OR IGNORE INTO MgrModuleKV (key, value) VALUES ('__version', 0);
+ """
+
+ db.executescript(SQL)
+
+ def update_schema_version(self, db: sqlite3.Connection, version: int) -> None:
+ SQL = "UPDATE OR ROLLBACK MgrModuleKV SET value = ? WHERE key = '__version';"
+
+ db.execute(SQL, (version,))
+
+ def set_kv(self, key: str, value: Any) -> None:
+ SQL = "INSERT OR REPLACE INTO MgrModuleKV (key, value) VALUES (?, ?);"
+
+ assert key[:2] != "__"
+
+ self.log.debug(f"set_kv('{key}', '{value}')")
+
+ with self._db_lock, self.db:
+ self.db.execute(SQL, (key, value))
+
+ def get_kv(self, key: str) -> Any:
+ SQL = "SELECT value FROM MgrModuleKV WHERE key = ?;"
+
+ assert key[:2] != "__"
+
+ self.log.debug(f"get_kv('{key}')")
+
+ with self._db_lock, self.db:
+ cur = self.db.execute(SQL, (key,))
+ row = cur.fetchone()
+ if row is None:
+ return None
+ else:
+ v = row['value']
+ self.log.debug(f" = {v}")
+ return v
+
+ def maybe_upgrade(self, db: sqlite3.Connection, version: int) -> None:
+ if version <= 0:
+ self.log.info(f"creating main.db for {self.module_name}")
+ assert self.SCHEMA is not None
+ cur = db.executescript(self.SCHEMA)
+ self.update_schema_version(db, 1)
+ else:
+ assert self.SCHEMA_VERSIONED is not None
+ latest = len(self.SCHEMA_VERSIONED)
+ if latest < version:
+ raise RuntimeError(f"main.db version is newer ({version}) than module ({latest})")
+ for i in range(version, latest):
+ self.log.info(f"upgrading main.db for {self.module_name} from {i-1}:{i}")
+ SQL = self.SCHEMA_VERSIONED[i]
+ db.executescript(SQL)
+ if version < latest:
+ self.update_schema_version(db, latest)
+
+ def load_schema(self, db: sqlite3.Connection) -> None:
+ SQL = """
+ SELECT value FROM MgrModuleKV WHERE key = '__version';
+ """
+
+ with db:
+ self.create_skeleton_schema(db)
+ cur = db.execute(SQL)
+ row = cur.fetchone()
+ self.maybe_upgrade(db, int(row['value']))
+ assert cur.fetchone() is None
+ cur.close()
+
+ def configure_db(self, db: sqlite3.Connection) -> None:
+ db.execute('PRAGMA FOREIGN_KEYS = 1')
+ db.execute('PRAGMA JOURNAL_MODE = PERSIST')
+ db.execute('PRAGMA PAGE_SIZE = 65536')
+ db.execute('PRAGMA CACHE_SIZE = 64')
+ db.row_factory = sqlite3.Row
+ self.load_schema(db)
+
+ def open_db(self) -> Optional[sqlite3.Connection]:
+ if not self.pool_exists(self.MGR_POOL_NAME):
+ if not self.have_enough_osds():
+ return None
+ self.create_mgr_pool()
+ uri = f"file:///{self.MGR_POOL_NAME}:{self.module_name}/main.db?vfs=ceph";
+ self.log.debug(f"using uri {uri}")
+ db = sqlite3.connect(uri, check_same_thread=False, uri=True)
+ self.configure_db(db)
+ return db
+
+ def db_ready(self) -> bool:
+ with self._db_lock:
+ try:
+ return self.db is not None
+ except MgrDBNotReady:
+ return False
+
+ @property
+ def db(self) -> sqlite3.Connection:
+ assert self._db_lock.locked()
+ if self._db is not None:
+ return self._db
+ self._db = self.open_db()
+ if self._db is None:
+ raise MgrDBNotReady();
+ return self._db
+
@property
def release_name(self) -> str:
"""