From: Milind Changire Date: Wed, 24 Nov 2021 05:13:11 +0000 (+0530) Subject: mgr/snap_schedule: fix db connection concurrent usage X-Git-Tag: v17.1.0~156^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=707543779e24c6bc1489c07f5fa1a239d110d9fb;p=ceph.git mgr/snap_schedule: fix db connection concurrent usage Serialize access to DB connection to avoid transaction aborts due to concurrent use. Some flake8-3.9 and mypy parsing error cleanups to keep 'make check' happy. Fixes: https://tracker.ceph.com/issues/52642 Signed-off-by: Milind Changire --- diff --git a/src/pybind/mgr/snap_schedule/fs/schedule.py b/src/pybind/mgr/snap_schedule/fs/schedule.py index ce07e31c398..eb347367a64 100644 --- a/src/pybind/mgr/snap_schedule/fs/schedule.py +++ b/src/pybind/mgr/snap_schedule/fs/schedule.py @@ -29,7 +29,7 @@ except AttributeError: log.info(('Couldn\'t find datetime.fromisoformat, falling back to ' f'static timestamp parsing ({SNAP_DB_TS_FORMAT}')) - def ts_parser(data_string: str) -> datetime: # type: ignore + def ts_parser(data_string: str) -> datetime: # type: ignore try: date = datetime.strptime(data_string, SNAP_DB_TS_FORMAT) return date @@ -225,7 +225,7 @@ class Schedule(object): data += (start,) with db: c = db.execute(query, data) - return [cls._from_db_row(row, fs) for row in c.fetchall()] + return [cls._from_db_row(row, fs) for row in c.fetchall()] @classmethod def list_schedules(cls, @@ -239,7 +239,7 @@ class Schedule(object): else: c = db.execute(cls.PROTO_GET_SCHEDULES + ' path = ?', (f'{path}',)) - return [cls._from_db_row(row, fs) for row in c.fetchall()] + return [cls._from_db_row(row, fs) for row in c.fetchall()] INSERT_SCHEDULE = '''INSERT INTO schedules(path, subvol, retention, rel_path) diff --git a/src/pybind/mgr/snap_schedule/fs/schedule_client.py b/src/pybind/mgr/snap_schedule/fs/schedule_client.py index a21c7829e3a..cf1ba78aac7 100644 --- a/src/pybind/mgr/snap_schedule/fs/schedule_client.py +++ b/src/pybind/mgr/snap_schedule/fs/schedule_client.py @@ -4,17 +4,16 @@ Copyright (C) 2020 SUSE LGPL2.1. See file COPYING. """ import cephfs -import errno import rados from contextlib import contextmanager -from mgr_util import CephfsClient, CephfsConnectionException, \ - open_filesystem +from mgr_util import CephfsClient, open_filesystem from collections import OrderedDict from datetime import datetime, timezone import logging -from threading import Timer +from threading import Timer, Lock from typing import cast, Any, Callable, Dict, Iterator, List, Set, Optional, \ - Tuple, TypeVar, Union + Tuple, TypeVar, Union, Type +from types import TracebackType import sqlite3 from .schedule import Schedule import traceback @@ -112,13 +111,49 @@ def get_prune_set(candidates: Set[Tuple[cephfs.DirEntry, datetime]], return candidates - set(keep) +class DBInfo(): + def __init__(self, fs: str, db: sqlite3.Connection): + self.fs: str = fs + self.lock: Lock = Lock() + self.db: sqlite3.Connection = db + + +# context manager for serializing db connection usage +class DBConnectionManager(): + def __init__(self, info: DBInfo): + self.dbinfo: DBInfo = info + + # using string as return type hint since __future__.annotations is not + # available with Python 3.6; its avaialbe starting from Pytohn 3.7 + def __enter__(self) -> 'DBConnectionManager': + log.debug(f'locking db connection for {self.dbinfo.fs}') + self.dbinfo.lock.acquire() + log.debug(f'locked db connection for {self.dbinfo.fs}') + return self + + def __exit__(self, + exception_type: Optional[Type[BaseException]], + exception_value: Optional[BaseException], + traceback: Optional[TracebackType]) -> None: + log.debug(f'unlocking db connection for {self.dbinfo.fs}') + self.dbinfo.lock.release() + log.debug(f'unlocked db connection for {self.dbinfo.fs}') + + class SnapSchedClient(CephfsClient): def __init__(self, mgr: Any) -> None: super(SnapSchedClient, self).__init__(mgr) # TODO maybe iterate over all fs instance in fsmap and load snap dbs? - self.sqlite_connections: Dict[str, sqlite3.Connection] = {} + # + # Each db connection is now guarded by a Lock; this is required to + # avoid concurrent DB transactions when more than one paths in a + # file-system are scheduled at the same interval eg. 1h; without the + # lock, there are races to use the same connection, causing nested + # transactions to be aborted + self.sqlite_connections: Dict[str, DBInfo] = {} self.active_timers: Dict[Tuple[str, str], List[Timer]] = {} + self.conn_lock: Lock = Lock() # lock to protect add/lookup db connections @property def allow_minute_snaps(self) -> None: @@ -128,11 +163,13 @@ class SnapSchedClient(CephfsClient): def dump_on_update(self) -> None: return self.mgr.get_module_option('dump_on_update') - def get_schedule_db(self, fs: str) -> sqlite3.Connection: + def get_schedule_db(self, fs: str) -> DBConnectionManager: + dbinfo = None + self.conn_lock.acquire() if fs not in self.sqlite_connections: poolid = self.get_metadata_pool(fs) assert poolid, f'fs "{fs}" not found' - uri = f"file:///*{poolid}:/{SNAP_DB_OBJECT_NAME}.db?vfs=ceph"; + uri = f"file:///*{poolid}:/{SNAP_DB_OBJECT_NAME}.db?vfs=ceph" log.debug(f"using uri {uri}") db = sqlite3.connect(uri, check_same_thread=False, uri=True) db.execute('PRAGMA FOREIGN_KEYS = 1') @@ -141,7 +178,8 @@ class SnapSchedClient(CephfsClient): db.execute('PRAGMA CACHE_SIZE = 256') db.row_factory = sqlite3.Row # check for legacy dump store - with open_ioctx(self, poolid) as ioctx: + pool_param = cast(Union[int, str], poolid) + with open_ioctx(self, pool_param) as ioctx: try: size, _mtime = ioctx.stat(SNAP_DB_OBJECT_NAME) dump = ioctx.read(SNAP_DB_OBJECT_NAME, size).decode('utf-8') @@ -150,9 +188,10 @@ class SnapSchedClient(CephfsClient): except rados.ObjectNotFound: log.debug(f'No legacy schedule DB found in {fs}') db.executescript(Schedule.CREATE_TABLES) - self.sqlite_connections[fs] = db - return db - return self.sqlite_connections[fs] + self.sqlite_connections[fs] = DBInfo(fs, db) + dbinfo = self.sqlite_connections[fs] + self.conn_lock.release() + return DBConnectionManager(dbinfo) def _is_allowed_repeat(self, exec_row: Dict[str, str], path: str) -> bool: if Schedule.parse_schedule(exec_row['schedule'])[1] == 'M': @@ -167,21 +206,31 @@ class SnapSchedClient(CephfsClient): else: return True - def refresh_snap_timers(self, fs: str, path: str) -> None: + def fetch_schedules(self, db: sqlite3.Connection, path: str) -> List[sqlite3.Row]: + with db: + if self.dump_on_update: + dump = [line for line in db.iterdump()] + dump = "\n".join(dump) + log.debug(f"db dump:\n{dump}") + cur = db.execute(Schedule.EXEC_QUERY, (path,)) + all_rows = cur.fetchall() + rows = [r for r in all_rows + if self._is_allowed_repeat(r, path)][0:1] + return rows + + def refresh_snap_timers(self, fs: str, path: str, olddb: Optional[sqlite3.Connection] = None) -> None: try: log.debug((f'SnapDB on {fs} changed for {path}, ' 'updating next Timer')) - db = self.get_schedule_db(fs) rows = [] - with db: - if self.dump_on_update: - dump = [line for line in db.iterdump()] - dump = "\n".join(dump) - log.debug(f"db dump:\n{dump}") - cur = db.execute(Schedule.EXEC_QUERY, (path,)) - all_rows = cur.fetchall() - rows = [r for r in all_rows - if self._is_allowed_repeat(r, path)][0:1] + # olddb is passed in the case where we land here without a timer + # the lock on the db connection has already been taken + if olddb: + rows = self.fetch_schedules(olddb, path) + else: + with self.get_schedule_db(fs) as conn_mgr: + db = conn_mgr.dbinfo.db + rows = self.fetch_schedules(db, path) timers = self.active_timers.get((fs, path), []) for timer in timers: timer.cancel() @@ -210,29 +259,33 @@ class SnapSchedClient(CephfsClient): repeat: str) -> None: log.debug(f'Scheduled snapshot of {path} triggered') try: - db = self.get_schedule_db(fs_name) - sched = Schedule.get_db_schedules(path, - db, - fs_name, - repeat=repeat, - start=start)[0] - time = datetime.now(timezone.utc) - with open_filesystem(self, fs_name) as fs_handle: - snap_ts = time.strftime(SNAPSHOT_TS_FORMAT) - snap_name = f'{path}/.snap/{SNAPSHOT_PREFIX}-{snap_ts}' - fs_handle.mkdir(snap_name, 0o755) - log.info(f'created scheduled snapshot of {path}') - log.debug(f'created scheduled snapshot {snap_name}') - sched.update_last(time, db) - except cephfs.Error: - self._log_exception('create_scheduled_snapshot') - sched.set_inactive(db) - except Exception: - # catch all exceptions cause otherwise we'll never know since this - # is running in a thread - self._log_exception('create_scheduled_snapshot') + with self.get_schedule_db(fs_name) as conn_mgr: + db = conn_mgr.dbinfo.db + try: + sched = Schedule.get_db_schedules(path, + db, + fs_name, + repeat=repeat, + start=start)[0] + time = datetime.now(timezone.utc) + with open_filesystem(self, fs_name) as fs_handle: + snap_ts = time.strftime(SNAPSHOT_TS_FORMAT) + snap_name = f'{path}/.snap/{SNAPSHOT_PREFIX}-{snap_ts}' + fs_handle.mkdir(snap_name, 0o755) + log.info(f'created scheduled snapshot of {path}') + log.debug(f'created scheduled snapshot {snap_name}') + sched.update_last(time, db) + except cephfs.Error: + self._log_exception('create_scheduled_snapshot') + sched.set_inactive(db) + except Exception: + # catch all exceptions cause otherwise we'll never know since this + # is running in a thread + self._log_exception('create_scheduled_snapshot') finally: - self.refresh_snap_timers(fs_name, path) + with self.get_schedule_db(fs_name) as conn_mgr: + db = conn_mgr.dbinfo.db + self.refresh_snap_timers(fs_name, path, db) self.prune_snapshots(sched) def prune_snapshots(self, sched: Schedule) -> None: @@ -261,21 +314,24 @@ class SnapSchedClient(CephfsClient): log.debug(f'rmdir on {dirname}') fs_handle.rmdir(f'{path}/.snap/{dirname}') if to_prune: - sched.update_pruned(time, self.get_schedule_db(sched.fs), - len(to_prune)) + with self.get_schedule_db(sched.fs) as conn_mgr: + db = conn_mgr.dbinfo.db + sched.update_pruned(time, db, len(to_prune)) except Exception: self._log_exception('prune_snapshots') def get_snap_schedules(self, fs: str, path: str) -> List[Schedule]: - db = self.get_schedule_db(fs) - return Schedule.get_db_schedules(path, db, fs) + with self.get_schedule_db(fs) as conn_mgr: + db = conn_mgr.dbinfo.db + return Schedule.get_db_schedules(path, db, fs) def list_snap_schedules(self, fs: str, path: str, recursive: bool) -> List[Schedule]: - db = self.get_schedule_db(fs) - return Schedule.list_schedules(path, db, fs, recursive) + with self.get_schedule_db(fs) as conn_mgr: + db = conn_mgr.dbinfo.db + return Schedule.list_schedules(path, db, fs, recursive) @updates_schedule_db # TODO improve interface @@ -289,16 +345,18 @@ class SnapSchedClient(CephfsClient): log.error('not allowed') raise ValueError('no minute snaps allowed') log.debug(f'attempting to add schedule {sched}') - db = self.get_schedule_db(fs) - sched.store_schedule(db) + with self.get_schedule_db(fs) as conn_mgr: + db = conn_mgr.dbinfo.db + sched.store_schedule(db) @updates_schedule_db def rm_snap_schedule(self, fs: str, path: str, schedule: Optional[str], start: Optional[str]) -> None: - db = self.get_schedule_db(fs) - Schedule.rm_schedule(db, path, schedule, start) + with self.get_schedule_db(fs) as conn_mgr: + db = conn_mgr.dbinfo.db + Schedule.rm_schedule(db, path, schedule, start) @updates_schedule_db def add_retention_spec(self, @@ -309,8 +367,9 @@ class SnapSchedClient(CephfsClient): retention_spec = retention_spec_or_period if retention_count: retention_spec = retention_count + retention_spec - db = self.get_schedule_db(fs) - Schedule.add_retention(db, path, retention_spec) + with self.get_schedule_db(fs) as conn_mgr: + db = conn_mgr.dbinfo.db + Schedule.add_retention(db, path, retention_spec) @updates_schedule_db def rm_retention_spec(self, @@ -321,8 +380,9 @@ class SnapSchedClient(CephfsClient): retention_spec = retention_spec_or_period if retention_count: retention_spec = retention_count + retention_spec - db = self.get_schedule_db(fs) - Schedule.rm_retention(db, path, retention_spec) + with self.get_schedule_db(fs) as conn_mgr: + db = conn_mgr.dbinfo.db + Schedule.rm_retention(db, path, retention_spec) @updates_schedule_db def activate_snap_schedule(self, @@ -330,21 +390,23 @@ class SnapSchedClient(CephfsClient): path: str, schedule: Optional[str], start: Optional[str]) -> None: - db = self.get_schedule_db(fs) - schedules = Schedule.get_db_schedules(path, db, fs, - schedule=schedule, - start=start) - for s in schedules: - s.set_active(db) + with self.get_schedule_db(fs) as conn_mgr: + db = conn_mgr.dbinfo.db + schedules = Schedule.get_db_schedules(path, db, fs, + schedule=schedule, + start=start) + for s in schedules: + s.set_active(db) @updates_schedule_db def deactivate_snap_schedule(self, fs: str, path: str, schedule: Optional[str], start: Optional[str]) -> None: - db = self.get_schedule_db(fs) - schedules = Schedule.get_db_schedules(path, db, fs, - schedule=schedule, - start=start) - for s in schedules: - s.set_inactive(db) + with self.get_schedule_db(fs) as conn_mgr: + db = conn_mgr.dbinfo.db + schedules = Schedule.get_db_schedules(path, db, fs, + schedule=schedule, + start=start) + for s in schedules: + s.set_inactive(db)