From e3bca0110eb0146c61cefe6e414eac0bf489ab27 Mon Sep 17 00:00:00 2001
From: Milind Changire
Date: Wed, 24 Nov 2021 10:43:11 +0530
Subject: [PATCH] mgr/snap_schedule: fix db connection concurrent usage

Serialize access to DB connection to avoid transaction aborts due to
concurrent use.

Some flake8-3.9 and mypy parsing error cleanups to keep 'make check'
happy.

Fixes: https://tracker.ceph.com/issues/52642
Signed-off-by: Milind Changire
(cherry picked from commit 707543779e24c6bc1489c07f5fa1a239d110d9fb)

Conflicts:
	src/pybind/mgr/snap_schedule/fs/schedule.py
	src/pybind/mgr/snap_schedule/fs/schedule_client.py
	- changes related to DBConnectionManager to serialize db interactions
---
 src/pybind/mgr/snap_schedule/fs/schedule.py   |  11 +-
 .../mgr/snap_schedule/fs/schedule_client.py   | 251 ++++++++++++------
 src/pybind/mgr/snap_schedule/module.py        |  11 +-
 3 files changed, 183 insertions(+), 90 deletions(-)

diff --git a/src/pybind/mgr/snap_schedule/fs/schedule.py b/src/pybind/mgr/snap_schedule/fs/schedule.py
index 233795fac835d..7fad53fe8bc56 100644
--- a/src/pybind/mgr/snap_schedule/fs/schedule.py
+++ b/src/pybind/mgr/snap_schedule/fs/schedule.py
@@ -6,7 +6,6 @@ LGPL2.1. See file COPYING.
 from datetime import datetime, timezone
 import json
 import logging
-from os import environ
 import re
 import sqlite3
 from typing import Tuple, Any
@@ -30,12 +29,12 @@ except AttributeError:
     log.info(('Couldn\'t find datetime.fromisoformat, falling back to '
               f'static timestamp parsing ({SNAP_DB_TS_FORMAT}'))
 
-    def ts_parser(ts):
+    def ts_parser(date_string):  # type: ignore
         try:
-            date = datetime.strptime(ts, SNAP_DB_TS_FORMAT)
+            date = datetime.strptime(date_string, SNAP_DB_TS_FORMAT)
             return date
         except ValueError:
-            msg = f'''The date string {ts} does not match the required format
+            msg = f'''The date string {date_string} does not match the required format
             {SNAP_DB_TS_FORMAT}. For more flexibel date parsing upgrade to
             python3.7 or install
             https://github.com/movermeyer/backports.datetime_fromisoformat'''
@@ -221,7 +220,7 @@ class Schedule(object):
             data += (start,)
         with db:
             c = db.execute(query, data)
-        return [cls._from_db_row(row, fs) for row in c.fetchall()]
+        return [cls._from_db_row(row, fs) for row in c.fetchall()]
 
     @classmethod
     def list_schedules(cls, path, db, fs, recursive):
@@ -232,7 +231,7 @@ class Schedule(object):
         else:
             c = db.execute(cls.PROTO_GET_SCHEDULES + ' path = ?',
                            (f'{path}',))
-        return [cls._from_db_row(row, fs) for row in c.fetchall()]
+        return [cls._from_db_row(row, fs) for row in c.fetchall()]
 
     INSERT_SCHEDULE = '''INSERT INTO
         schedules(path, subvol, retention, rel_path)
diff --git a/src/pybind/mgr/snap_schedule/fs/schedule_client.py b/src/pybind/mgr/snap_schedule/fs/schedule_client.py
index bb5bcf17f7927..fceaca60e5391 100644
--- a/src/pybind/mgr/snap_schedule/fs/schedule_client.py
+++ b/src/pybind/mgr/snap_schedule/fs/schedule_client.py
@@ -4,18 +4,20 @@ Copyright (C) 2020 SUSE
 LGPL2.1. See file COPYING.
""" import cephfs -import errno import rados from contextlib import contextmanager -from mgr_util import CephfsClient, CephfsConnectionException, \ - open_filesystem +from mgr_util import CephfsClient, open_filesystem, CephfsConnectionException from collections import OrderedDict from datetime import datetime, timezone import logging -from threading import Timer +from threading import Timer, Lock +from typing import cast, Any, Callable, Dict, Iterator, List, Set, Optional, \ + Tuple, TypeVar, Union, Type +from types import TracebackType import sqlite3 from .schedule import Schedule, parse_retention import traceback +import errno MAX_SNAPS_PER_PATH = 50 @@ -99,50 +101,90 @@ def get_prune_set(candidates, retention): return candidates - set(keep) +class DBInfo(): + def __init__(self, fs: str, db: sqlite3.Connection): + self.fs: str = fs + self.lock: Lock = Lock() + self.db: sqlite3.Connection = db + + +# context manager for serializing db connection usage +class DBConnectionManager(): + def __init__(self, info: DBInfo): + self.dbinfo: DBInfo = info + + # using string as return type hint since __future__.annotations is not + # available with Python 3.6; its avaialbe starting from Pytohn 3.7 + def __enter__(self) -> 'DBConnectionManager': + log.debug(f'locking db connection for {self.dbinfo.fs}') + self.dbinfo.lock.acquire() + log.debug(f'locked db connection for {self.dbinfo.fs}') + return self + + def __exit__(self, + exception_type: Optional[Type[BaseException]], + exception_value: Optional[BaseException], + traceback: Optional[TracebackType]) -> None: + log.debug(f'unlocking db connection for {self.dbinfo.fs}') + self.dbinfo.lock.release() + log.debug(f'unlocked db connection for {self.dbinfo.fs}') + + class SnapSchedClient(CephfsClient): def __init__(self, mgr): super(SnapSchedClient, self).__init__(mgr) # TODO maybe iterate over all fs instance in fsmap and load snap dbs? - self.sqlite_connections = {} - self.active_timers = {} + # + # Each db connection is now guarded by a Lock; this is required to + # avoid concurrent DB transactions when more than one paths in a + # file-system are scheduled at the same interval eg. 
1h; without the + # lock, there are races to use the same connection, causing nested + # transactions to be aborted + self.sqlite_connections: Dict[str, DBInfo] = {} + self.active_timers: Dict[Tuple[str, str], List[Timer]] = {} + self.conn_lock: Lock = Lock() # lock to protect add/lookup db connections @property def allow_minute_snaps(self): return self.mgr.get_module_option('allow_m_granularity') + @property + def dump_on_update(self) -> None: + return self.mgr.get_module_option('dump_on_update') + def get_schedule_db(self, fs): + dbinfo = None + self.conn_lock.acquire() if fs not in self.sqlite_connections: - self.sqlite_connections[fs] = sqlite3.connect( - ':memory:', - check_same_thread=False) - with self.sqlite_connections[fs] as con: - con.row_factory = sqlite3.Row - con.execute("PRAGMA FOREIGN_KEYS = 1") + db = sqlite3.connect(':memory:', check_same_thread=False) + with db: + db.row_factory = sqlite3.Row + db.execute("PRAGMA FOREIGN_KEYS = 1") pool = self.get_metadata_pool(fs) with open_ioctx(self, pool) as ioctx: try: size, _mtime = ioctx.stat(SNAP_DB_OBJECT_NAME) - db = ioctx.read(SNAP_DB_OBJECT_NAME, - size).decode('utf-8') - con.executescript(db) + ddl = ioctx.read(SNAP_DB_OBJECT_NAME, + size).decode('utf-8') + db.executescript(ddl) except rados.ObjectNotFound: log.debug(f'No schedule DB found in {fs}, creating one.') - con.executescript(Schedule.CREATE_TABLES) - return self.sqlite_connections[fs] + db.executescript(Schedule.CREATE_TABLES) + self.sqlite_connections[fs] = DBInfo(fs, db) + dbinfo = self.sqlite_connections[fs] + self.conn_lock.release() + return DBConnectionManager(dbinfo) - def store_schedule_db(self, fs): + def store_schedule_db(self, fs, db): # only store db is it exists, otherwise nothing to do metadata_pool = self.get_metadata_pool(fs) if not metadata_pool: raise CephfsConnectionException( -errno.ENOENT, "Filesystem {} does not exist".format(fs)) - if fs in self.sqlite_connections: - db_content = [] - db = self.sqlite_connections[fs] - with db: - for row in db.iterdump(): - db_content.append(row) + db_content = [] + for row in db.iterdump(): + db_content.append(row) with open_ioctx(self, metadata_pool) as ioctx: ioctx.write_full(SNAP_DB_OBJECT_NAME, '\n'.join(db_content).encode('utf-8')) @@ -158,16 +200,31 @@ class SnapSchedClient(CephfsClient): else: return True + def fetch_schedules(self, db: sqlite3.Connection, path: str) -> List[sqlite3.Row]: + with db: + if self.dump_on_update: + dump = [line for line in db.iterdump()] + dump = "\n".join(dump) + log.debug(f"db dump:\n{dump}") + cur = db.execute(Schedule.EXEC_QUERY, (path,)) + all_rows = cur.fetchall() + rows = [r for r in all_rows + if self._is_allowed_repeat(r, path)][0:1] + return rows - def refresh_snap_timers(self, fs, path): + def refresh_snap_timers(self, fs: str, path: str, olddb: Optional[sqlite3.Connection] = None) -> None: try: - log.debug(f'SnapDB on {fs} changed for {path}, updating next Timer') - db = self.get_schedule_db(fs) + log.debug((f'SnapDB on {fs} changed for {path}, ' + 'updating next Timer')) rows = [] - with db: - cur = db.execute(Schedule.EXEC_QUERY, (path,)) - all_rows = cur.fetchall() - rows = [r for r in all_rows if self._is_allowed_repeat(r, path)][0:1] + # olddb is passed in the case where we land here without a timer + # the lock on the db connection has already been taken + if olddb: + rows = self.fetch_schedules(olddb, path) + else: + with self.get_schedule_db(fs) as conn_mgr: + db = conn_mgr.dbinfo.db + rows = self.fetch_schedules(db, path) timers = 
self.active_timers.get((fs, path), []) for timer in timers: timer.cancel() @@ -191,29 +248,33 @@ class SnapSchedClient(CephfsClient): def create_scheduled_snapshot(self, fs_name, path, retention, start, repeat): log.debug(f'Scheduled snapshot of {path} triggered') try: - db = self.get_schedule_db(fs_name) - sched = Schedule.get_db_schedules(path, - db, - fs_name, - repeat=repeat, - start=start)[0] - time = datetime.now(timezone.utc) - with open_filesystem(self, fs_name) as fs_handle: - snap_ts = time.strftime(SNAPSHOT_TS_FORMAT) - snap_name = f'{path}/.snap/{SNAPSHOT_PREFIX}-{snap_ts}' - fs_handle.mkdir(snap_name, 0o755) - log.info(f'created scheduled snapshot of {path}') - log.debug(f'created scheduled snapshot {snap_name}') - sched.update_last(time, db) - except cephfs.Error: - self._log_exception('create_scheduled_snapshot') - sched.set_inactive(db) - except Exception: - # catch all exceptions cause otherwise we'll never know since this - # is running in a thread - self._log_exception('create_scheduled_snapshot') + with self.get_schedule_db(fs_name) as conn_mgr: + db = conn_mgr.dbinfo.db + try: + sched = Schedule.get_db_schedules(path, + db, + fs_name, + repeat=repeat, + start=start)[0] + time = datetime.now(timezone.utc) + with open_filesystem(self, fs_name) as fs_handle: + snap_ts = time.strftime(SNAPSHOT_TS_FORMAT) + snap_name = f'{path}/.snap/{SNAPSHOT_PREFIX}-{snap_ts}' + fs_handle.mkdir(snap_name, 0o755) + log.info(f'created scheduled snapshot of {path}') + log.debug(f'created scheduled snapshot {snap_name}') + sched.update_last(time, db) + except cephfs.Error: + self._log_exception('create_scheduled_snapshot') + sched.set_inactive(db) + except Exception: + # catch all exceptions cause otherwise we'll never know since this + # is running in a thread + self._log_exception('create_scheduled_snapshot') finally: - self.refresh_snap_timers(fs_name, path) + with self.get_schedule_db(fs_name) as conn_mgr: + db = conn_mgr.dbinfo.db + self.refresh_snap_timers(fs_name, path, db) self.prune_snapshots(sched) def prune_snapshots(self, sched): @@ -242,18 +303,24 @@ class SnapSchedClient(CephfsClient): log.debug(f'rmdir on {dirname}') fs_handle.rmdir(f'{path}/.snap/{dirname}') if to_prune: - sched.update_pruned(time, self.get_schedule_db(sched.fs), - len(to_prune)) + with self.get_schedule_db(sched.fs) as conn_mgr: + db = conn_mgr.dbinfo.db + sched.update_pruned(time, db, len(to_prune)) except Exception: self._log_exception('prune_snapshots') - def get_snap_schedules(self, fs, path): - db = self.get_schedule_db(fs) - return Schedule.get_db_schedules(path, db, fs) + def get_snap_schedules(self, fs: str, path: str) -> List[Schedule]: + with self.get_schedule_db(fs) as conn_mgr: + db = conn_mgr.dbinfo.db + return Schedule.get_db_schedules(path, db, fs) - def list_snap_schedules(self, fs, path, recursive): - db = self.get_schedule_db(fs) - return Schedule.list_schedules(path, db, fs, recursive) + def list_snap_schedules(self, + fs: str, + path: str, + recursive: bool) -> List[Schedule]: + with self.get_schedule_db(fs) as conn_mgr: + db = conn_mgr.dbinfo.db + return Schedule.list_schedules(path, db, fs, recursive) @updates_schedule_db # TODO improve interface @@ -264,14 +331,19 @@ class SnapSchedClient(CephfsClient): log.error('not allowed') raise ValueError('no minute snaps allowed') log.debug(f'attempting to add schedule {sched}') - db = self.get_schedule_db(fs) - sched.store_schedule(db) - self.store_schedule_db(sched.fs) + with self.get_schedule_db(fs) as conn_mgr: + db = conn_mgr.dbinfo.db + 
sched.store_schedule(db) + self.store_schedule_db(sched.fs, db) @updates_schedule_db - def rm_snap_schedule(self, fs, path, schedule, start): - db = self.get_schedule_db(fs) - Schedule.rm_schedule(db, path, schedule, start) + def rm_snap_schedule(self, + fs: str, path: str, + schedule: Optional[str], + start: Optional[str]) -> None: + with self.get_schedule_db(fs) as conn_mgr: + db = conn_mgr.dbinfo.db + Schedule.rm_schedule(db, path, schedule, start) @updates_schedule_db def add_retention_spec(self, @@ -282,8 +354,9 @@ class SnapSchedClient(CephfsClient): retention_spec = retention_spec_or_period if retention_count: retention_spec = retention_count + retention_spec - db = self.get_schedule_db(fs) - Schedule.add_retention(db, path, retention_spec) + with self.get_schedule_db(fs) as conn_mgr: + db = conn_mgr.dbinfo.db + Schedule.add_retention(db, path, retention_spec) @updates_schedule_db def rm_retention_spec(self, @@ -294,21 +367,33 @@ class SnapSchedClient(CephfsClient): retention_spec = retention_spec_or_period if retention_count: retention_spec = retention_count + retention_spec - db = self.get_schedule_db(fs) - Schedule.rm_retention(db, path, retention_spec) + with self.get_schedule_db(fs) as conn_mgr: + db = conn_mgr.dbinfo.db + Schedule.rm_retention(db, path, retention_spec) @updates_schedule_db - def activate_snap_schedule(self, fs, path, schedule, start): - db = self.get_schedule_db(fs) - schedules = Schedule.get_db_schedules(path, db, fs, - schedule=schedule, - start=start) - [s.set_active(db) for s in schedules] + def activate_snap_schedule(self, + fs: str, + path: str, + schedule: Optional[str], + start: Optional[str]) -> None: + with self.get_schedule_db(fs) as conn_mgr: + db = conn_mgr.dbinfo.db + schedules = Schedule.get_db_schedules(path, db, fs, + schedule=schedule, + start=start) + for s in schedules: + s.set_active(db) @updates_schedule_db - def deactivate_snap_schedule(self, fs, path, schedule, start): - db = self.get_schedule_db(fs) - schedules = Schedule.get_db_schedules(path, db, fs, - schedule=schedule, - start=start) - [s.set_inactive(db) for s in schedules] + def deactivate_snap_schedule(self, + fs: str, path: str, + schedule: Optional[str], + start: Optional[str]) -> None: + with self.get_schedule_db(fs) as conn_mgr: + db = conn_mgr.dbinfo.db + schedules = Schedule.get_db_schedules(path, db, fs, + schedule=schedule, + start=start) + for s in schedules: + s.set_inactive(db) diff --git a/src/pybind/mgr/snap_schedule/module.py b/src/pybind/mgr/snap_schedule/module.py index 701c1551e4f5d..e298aad12a55d 100644 --- a/src/pybind/mgr/snap_schedule/module.py +++ b/src/pybind/mgr/snap_schedule/module.py @@ -6,7 +6,7 @@ LGPL2.1. See file COPYING. 
 import errno
 import json
 import sqlite3
-from typing import Dict, Sequence, Optional
+from typing import Dict, Sequence, Optional, cast
 from .fs.schedule_client import SnapSchedClient
 from mgr_module import MgrModule, CLIReadCommand, CLIWriteCommand, Option
 from mgr_util import CephfsConnectionException
@@ -22,6 +22,13 @@ class Module(MgrModule):
             desc='allow minute scheduled snapshots',
             runtime=True,
         ),
+        Option(
+            'dump_on_update',
+            type='bool',
+            default=False,
+            desc='dump database to debug log on update',
+            runtime=True,
+        ),
     ]
 
     def __init__(self, *args, **kwargs):
@@ -68,6 +75,7 @@ class Module(MgrModule):
         '''
         use_fs = fs if fs else self.default_fs
         try:
+            path = cast(str, path)
            ret_scheds = self.client.get_snap_schedules(use_fs, path)
         except CephfsConnectionException as e:
             return e.to_tuple()
@@ -87,6 +95,7 @@ class Module(MgrModule):
         '''
         try:
             use_fs = fs if fs else self.default_fs
+            recursive = cast(bool, recursive)
            scheds = self.client.list_snap_schedules(use_fs, path, recursive)
             self.log.debug(f'recursive is {recursive}')
         except CephfsConnectionException as e:
-- 
2.39.5
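
Reviewer note, not part of the patch: the heart of the change is pairing each sqlite3 connection
with a threading.Lock and only touching the connection while that lock is held, via a context
manager. Below is a minimal standalone sketch of that pattern; the class names mirror the patch,
while the __main__ usage at the bottom is illustrative only.

import sqlite3
from threading import Lock
from types import TracebackType
from typing import Optional, Type


class DBInfo():
    """Pair one sqlite3 connection with the lock that guards it."""
    def __init__(self, fs: str, db: sqlite3.Connection) -> None:
        self.fs = fs
        self.lock = Lock()
        self.db = db


class DBConnectionManager():
    """Hold the per-connection lock for as long as the DB is in use."""
    def __init__(self, info: DBInfo) -> None:
        self.dbinfo = info

    def __enter__(self) -> 'DBConnectionManager':
        self.dbinfo.lock.acquire()
        return self

    def __exit__(self,
                 exc_type: Optional[Type[BaseException]],
                 exc_val: Optional[BaseException],
                 exc_tb: Optional[TracebackType]) -> None:
        self.dbinfo.lock.release()


if __name__ == '__main__':
    info = DBInfo('cephfs', sqlite3.connect(':memory:', check_same_thread=False))
    with DBConnectionManager(info) as conn_mgr:
        # only one thread at a time gets past this point for a given connection
        conn_mgr.dbinfo.db.execute('CREATE TABLE IF NOT EXISTS t (x INTEGER)')

With this shape, callers never share a raw connection directly; every call site rewritten above
goes through "with self.get_schedule_db(fs) as conn_mgr: db = conn_mgr.dbinfo.db".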
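
Also not part of the patch, but useful background for get_schedule_db()/store_schedule_db(): the
schedule DB is an in-memory sqlite3 database, persisted as the SQL script produced by
Connection.iterdump(). The module writes that script to a RADOS object with ioctx.write_full()
and replays it with executescript() when a connection is first opened. The sketch below shows
only the sqlite3 round trip, with a plain string standing in for the object store.

import sqlite3


def dump_db(db: sqlite3.Connection) -> str:
    # serialize the whole database as one SQL script (what store_schedule_db does)
    return '\n'.join(db.iterdump())


def load_db(script: str) -> sqlite3.Connection:
    # rebuild the database from the script (what get_schedule_db does on first open)
    db = sqlite3.connect(':memory:', check_same_thread=False)
    db.row_factory = sqlite3.Row
    db.execute("PRAGMA FOREIGN_KEYS = 1")
    db.executescript(script)
    return db


if __name__ == '__main__':
    src = sqlite3.connect(':memory:')
    src.execute('CREATE TABLE schedules (path TEXT PRIMARY KEY)')
    src.execute("INSERT INTO schedules VALUES ('/volumes/group/subvol')")
    src.commit()
    copy = load_db(dump_db(src))
    print(copy.execute('SELECT path FROM schedules').fetchone()['path'])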
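
Finally, a small illustration of the timer bookkeeping that refresh_snap_timers() maintains in
self.active_timers: for each (fs, path) key, cancel whatever Timer objects are still armed and
arm a new one for the next run. The callback and the fixed 60-second delay below are
placeholders, not the module's actual scheduling logic.

from threading import Timer
from typing import Callable, Dict, List, Tuple

active_timers: Dict[Tuple[str, str], List[Timer]] = {}


def rearm(fs: str, path: str, delay: float,
          callback: Callable[[str, str], None]) -> None:
    key = (fs, path)
    for old in active_timers.get(key, []):
        old.cancel()  # drop whatever was armed before
    t = Timer(delay, callback, (fs, path))
    t.start()
    active_timers[key] = [t]


if __name__ == '__main__':
    rearm('cephfs', '/volumes/group/subvol', 60.0,
          lambda fs, path: print(f'snapshot of {path} on {fs} is due'))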