]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/snap_schedule: fix db connection concurrent usage
authorMilind Changire <mchangir@redhat.com>
Wed, 24 Nov 2021 05:13:11 +0000 (10:43 +0530)
committerMilind Changire <mchangir@redhat.com>
Wed, 24 Nov 2021 05:13:11 +0000 (10:43 +0530)
Serialize access to DB connection to avoid transaction aborts due to
concurrent use.

Some flake8-3.9 and mypy parsing error cleanups to keep 'make check' happy.

Fixes: https://tracker.ceph.com/issues/52642
Signed-off-by: Milind Changire <mchangir@redhat.com>
src/pybind/mgr/snap_schedule/fs/schedule.py
src/pybind/mgr/snap_schedule/fs/schedule_client.py

index ce07e31c3987f36f0df4a1d6faa18d55d3089ec3..eb347367a64d36827da56a5bcb3419b695c28cfa 100644 (file)
@@ -29,7 +29,7 @@ except AttributeError:
     log.info(('Couldn\'t find datetime.fromisoformat, falling back to '
               f'static timestamp parsing ({SNAP_DB_TS_FORMAT}'))
 
-    def ts_parser(data_string: str) -> datetime: # type: ignore
+    def ts_parser(data_string: str) -> datetime:  # type: ignore
         try:
             date = datetime.strptime(data_string, SNAP_DB_TS_FORMAT)
             return date
@@ -225,7 +225,7 @@ class Schedule(object):
             data += (start,)
         with db:
             c = db.execute(query, data)
-        return [cls._from_db_row(row, fs) for row in c.fetchall()]
+            return [cls._from_db_row(row, fs) for row in c.fetchall()]
 
     @classmethod
     def list_schedules(cls,
@@ -239,7 +239,7 @@ class Schedule(object):
             else:
                 c = db.execute(cls.PROTO_GET_SCHEDULES + ' path = ?',
                                (f'{path}',))
-        return [cls._from_db_row(row, fs) for row in c.fetchall()]
+            return [cls._from_db_row(row, fs) for row in c.fetchall()]
 
     INSERT_SCHEDULE = '''INSERT INTO
         schedules(path, subvol, retention, rel_path)
index a21c7829e3a1423cb28be0afc2d0f9a0e7b33d8a..cf1ba78aac7322e0b5d3975cab3cbafc47d5d2ae 100644 (file)
@@ -4,17 +4,16 @@ Copyright (C) 2020 SUSE
 LGPL2.1.  See file COPYING.
 """
 import cephfs
-import errno
 import rados
 from contextlib import contextmanager
-from mgr_util import CephfsClient, CephfsConnectionException, \
-        open_filesystem
+from mgr_util import CephfsClient, open_filesystem
 from collections import OrderedDict
 from datetime import datetime, timezone
 import logging
-from threading import Timer
+from threading import Timer, Lock
 from typing import cast, Any, Callable, Dict, Iterator, List, Set, Optional, \
-    Tuple, TypeVar, Union
+    Tuple, TypeVar, Union, Type
+from types import TracebackType
 import sqlite3
 from .schedule import Schedule
 import traceback
@@ -112,13 +111,49 @@ def get_prune_set(candidates: Set[Tuple[cephfs.DirEntry, datetime]],
     return candidates - set(keep)
 
 
+class DBInfo():
+    def __init__(self, fs: str, db: sqlite3.Connection):
+        self.fs: str = fs
+        self.lock: Lock = Lock()
+        self.db: sqlite3.Connection = db
+
+
+# context manager for serializing db connection usage
+class DBConnectionManager():
+    def __init__(self, info: DBInfo):
+        self.dbinfo: DBInfo = info
+
+    # using string as return type hint since __future__.annotations is not
+    # available with Python 3.6; its avaialbe starting from Pytohn 3.7
+    def __enter__(self) -> 'DBConnectionManager':
+        log.debug(f'locking db connection for {self.dbinfo.fs}')
+        self.dbinfo.lock.acquire()
+        log.debug(f'locked db connection for {self.dbinfo.fs}')
+        return self
+
+    def __exit__(self,
+                 exception_type: Optional[Type[BaseException]],
+                 exception_value: Optional[BaseException],
+                 traceback: Optional[TracebackType]) -> None:
+        log.debug(f'unlocking db connection for {self.dbinfo.fs}')
+        self.dbinfo.lock.release()
+        log.debug(f'unlocked db connection for {self.dbinfo.fs}')
+
+
 class SnapSchedClient(CephfsClient):
 
     def __init__(self, mgr: Any) -> None:
         super(SnapSchedClient, self).__init__(mgr)
         # TODO maybe iterate over all fs instance in fsmap and load snap dbs?
-        self.sqlite_connections: Dict[str, sqlite3.Connection] = {}
+        #
+        # Each db connection is now guarded by a Lock; this is required to
+        # avoid concurrent DB transactions when more than one paths in a
+        # file-system are scheduled at the same interval eg. 1h; without the
+        # lock, there are races to use the same connection, causing  nested
+        # transactions to be aborted
+        self.sqlite_connections: Dict[str, DBInfo] = {}
         self.active_timers: Dict[Tuple[str, str], List[Timer]] = {}
+        self.conn_lock: Lock = Lock()  # lock to protect add/lookup db connections
 
     @property
     def allow_minute_snaps(self) -> None:
@@ -128,11 +163,13 @@ class SnapSchedClient(CephfsClient):
     def dump_on_update(self) -> None:
         return self.mgr.get_module_option('dump_on_update')
 
-    def get_schedule_db(self, fs: str) -> sqlite3.Connection:
+    def get_schedule_db(self, fs: str) -> DBConnectionManager:
+        dbinfo = None
+        self.conn_lock.acquire()
         if fs not in self.sqlite_connections:
             poolid = self.get_metadata_pool(fs)
             assert poolid, f'fs "{fs}" not found'
-            uri = f"file:///*{poolid}:/{SNAP_DB_OBJECT_NAME}.db?vfs=ceph";
+            uri = f"file:///*{poolid}:/{SNAP_DB_OBJECT_NAME}.db?vfs=ceph"
             log.debug(f"using uri {uri}")
             db = sqlite3.connect(uri, check_same_thread=False, uri=True)
             db.execute('PRAGMA FOREIGN_KEYS = 1')
@@ -141,7 +178,8 @@ class SnapSchedClient(CephfsClient):
             db.execute('PRAGMA CACHE_SIZE = 256')
             db.row_factory = sqlite3.Row
             # check for legacy dump store
-            with open_ioctx(self, poolid) as ioctx:
+            pool_param = cast(Union[int, str], poolid)
+            with open_ioctx(self, pool_param) as ioctx:
                 try:
                     size, _mtime = ioctx.stat(SNAP_DB_OBJECT_NAME)
                     dump = ioctx.read(SNAP_DB_OBJECT_NAME, size).decode('utf-8')
@@ -150,9 +188,10 @@ class SnapSchedClient(CephfsClient):
                 except rados.ObjectNotFound:
                     log.debug(f'No legacy schedule DB found in {fs}')
             db.executescript(Schedule.CREATE_TABLES)
-            self.sqlite_connections[fs] = db
-            return db
-        return self.sqlite_connections[fs]
+            self.sqlite_connections[fs] = DBInfo(fs, db)
+        dbinfo = self.sqlite_connections[fs]
+        self.conn_lock.release()
+        return DBConnectionManager(dbinfo)
 
     def _is_allowed_repeat(self, exec_row: Dict[str, str], path: str) -> bool:
         if Schedule.parse_schedule(exec_row['schedule'])[1] == 'M':
@@ -167,21 +206,31 @@ class SnapSchedClient(CephfsClient):
         else:
             return True
 
-    def refresh_snap_timers(self, fs: str, path: str) -> None:
+    def fetch_schedules(self, db: sqlite3.Connection, path: str) -> List[sqlite3.Row]:
+        with db:
+            if self.dump_on_update:
+                dump = [line for line in db.iterdump()]
+                dump = "\n".join(dump)
+                log.debug(f"db dump:\n{dump}")
+            cur = db.execute(Schedule.EXEC_QUERY, (path,))
+            all_rows = cur.fetchall()
+            rows = [r for r in all_rows
+                    if self._is_allowed_repeat(r, path)][0:1]
+            return rows
+
+    def refresh_snap_timers(self, fs: str, path: str, olddb: Optional[sqlite3.Connection] = None) -> None:
         try:
             log.debug((f'SnapDB on {fs} changed for {path}, '
                        'updating next Timer'))
-            db = self.get_schedule_db(fs)
             rows = []
-            with db:
-                if self.dump_on_update:
-                    dump = [line for line in db.iterdump()]
-                    dump = "\n".join(dump)
-                    log.debug(f"db dump:\n{dump}")
-                cur = db.execute(Schedule.EXEC_QUERY, (path,))
-                all_rows = cur.fetchall()
-                rows = [r for r in all_rows
-                        if self._is_allowed_repeat(r, path)][0:1]
+            # olddb is passed in the case where we land here without a timer
+            # the lock on the db connection has already been taken
+            if olddb:
+                rows = self.fetch_schedules(olddb, path)
+            else:
+                with self.get_schedule_db(fs) as conn_mgr:
+                    db = conn_mgr.dbinfo.db
+                    rows = self.fetch_schedules(db, path)
             timers = self.active_timers.get((fs, path), [])
             for timer in timers:
                 timer.cancel()
@@ -210,29 +259,33 @@ class SnapSchedClient(CephfsClient):
                                   repeat: str) -> None:
         log.debug(f'Scheduled snapshot of {path} triggered')
         try:
-            db = self.get_schedule_db(fs_name)
-            sched = Schedule.get_db_schedules(path,
-                                              db,
-                                              fs_name,
-                                              repeat=repeat,
-                                              start=start)[0]
-            time = datetime.now(timezone.utc)
-            with open_filesystem(self, fs_name) as fs_handle:
-                snap_ts = time.strftime(SNAPSHOT_TS_FORMAT)
-                snap_name = f'{path}/.snap/{SNAPSHOT_PREFIX}-{snap_ts}'
-                fs_handle.mkdir(snap_name, 0o755)
-            log.info(f'created scheduled snapshot of {path}')
-            log.debug(f'created scheduled snapshot {snap_name}')
-            sched.update_last(time, db)
-        except cephfs.Error:
-            self._log_exception('create_scheduled_snapshot')
-            sched.set_inactive(db)
-        except Exception:
-            # catch all exceptions cause otherwise we'll never know since this
-            # is running in a thread
-            self._log_exception('create_scheduled_snapshot')
+            with self.get_schedule_db(fs_name) as conn_mgr:
+                db = conn_mgr.dbinfo.db
+                try:
+                    sched = Schedule.get_db_schedules(path,
+                                                      db,
+                                                      fs_name,
+                                                      repeat=repeat,
+                                                      start=start)[0]
+                    time = datetime.now(timezone.utc)
+                    with open_filesystem(self, fs_name) as fs_handle:
+                        snap_ts = time.strftime(SNAPSHOT_TS_FORMAT)
+                        snap_name = f'{path}/.snap/{SNAPSHOT_PREFIX}-{snap_ts}'
+                        fs_handle.mkdir(snap_name, 0o755)
+                    log.info(f'created scheduled snapshot of {path}')
+                    log.debug(f'created scheduled snapshot {snap_name}')
+                    sched.update_last(time, db)
+                except cephfs.Error:
+                    self._log_exception('create_scheduled_snapshot')
+                    sched.set_inactive(db)
+                except Exception:
+                    # catch all exceptions cause otherwise we'll never know since this
+                    # is running in a thread
+                    self._log_exception('create_scheduled_snapshot')
         finally:
-            self.refresh_snap_timers(fs_name, path)
+            with self.get_schedule_db(fs_name) as conn_mgr:
+                db = conn_mgr.dbinfo.db
+                self.refresh_snap_timers(fs_name, path, db)
             self.prune_snapshots(sched)
 
     def prune_snapshots(self, sched: Schedule) -> None:
@@ -261,21 +314,24 @@ class SnapSchedClient(CephfsClient):
                     log.debug(f'rmdir on {dirname}')
                     fs_handle.rmdir(f'{path}/.snap/{dirname}')
                 if to_prune:
-                    sched.update_pruned(time, self.get_schedule_db(sched.fs),
-                                        len(to_prune))
+                    with self.get_schedule_db(sched.fs) as conn_mgr:
+                        db = conn_mgr.dbinfo.db
+                        sched.update_pruned(time, db, len(to_prune))
         except Exception:
             self._log_exception('prune_snapshots')
 
     def get_snap_schedules(self, fs: str, path: str) -> List[Schedule]:
-        db = self.get_schedule_db(fs)
-        return Schedule.get_db_schedules(path, db, fs)
+        with self.get_schedule_db(fs) as conn_mgr:
+            db = conn_mgr.dbinfo.db
+            return Schedule.get_db_schedules(path, db, fs)
 
     def list_snap_schedules(self,
                             fs: str,
                             path: str,
                             recursive: bool) -> List[Schedule]:
-        db = self.get_schedule_db(fs)
-        return Schedule.list_schedules(path, db, fs, recursive)
+        with self.get_schedule_db(fs) as conn_mgr:
+            db = conn_mgr.dbinfo.db
+            return Schedule.list_schedules(path, db, fs, recursive)
 
     @updates_schedule_db
     # TODO improve interface
@@ -289,16 +345,18 @@ class SnapSchedClient(CephfsClient):
             log.error('not allowed')
             raise ValueError('no minute snaps allowed')
         log.debug(f'attempting to add schedule {sched}')
-        db = self.get_schedule_db(fs)
-        sched.store_schedule(db)
+        with self.get_schedule_db(fs) as conn_mgr:
+            db = conn_mgr.dbinfo.db
+            sched.store_schedule(db)
 
     @updates_schedule_db
     def rm_snap_schedule(self,
                          fs: str, path: str,
                          schedule: Optional[str],
                          start: Optional[str]) -> None:
-        db = self.get_schedule_db(fs)
-        Schedule.rm_schedule(db, path, schedule, start)
+        with self.get_schedule_db(fs) as conn_mgr:
+            db = conn_mgr.dbinfo.db
+            Schedule.rm_schedule(db, path, schedule, start)
 
     @updates_schedule_db
     def add_retention_spec(self,
@@ -309,8 +367,9 @@ class SnapSchedClient(CephfsClient):
         retention_spec = retention_spec_or_period
         if retention_count:
             retention_spec = retention_count + retention_spec
-        db = self.get_schedule_db(fs)
-        Schedule.add_retention(db, path, retention_spec)
+        with self.get_schedule_db(fs) as conn_mgr:
+            db = conn_mgr.dbinfo.db
+            Schedule.add_retention(db, path, retention_spec)
 
     @updates_schedule_db
     def rm_retention_spec(self,
@@ -321,8 +380,9 @@ class SnapSchedClient(CephfsClient):
         retention_spec = retention_spec_or_period
         if retention_count:
             retention_spec = retention_count + retention_spec
-        db = self.get_schedule_db(fs)
-        Schedule.rm_retention(db, path, retention_spec)
+        with self.get_schedule_db(fs) as conn_mgr:
+            db = conn_mgr.dbinfo.db
+            Schedule.rm_retention(db, path, retention_spec)
 
     @updates_schedule_db
     def activate_snap_schedule(self,
@@ -330,21 +390,23 @@ class SnapSchedClient(CephfsClient):
                                path: str,
                                schedule: Optional[str],
                                start: Optional[str]) -> None:
-        db = self.get_schedule_db(fs)
-        schedules = Schedule.get_db_schedules(path, db, fs,
-                                              schedule=schedule,
-                                              start=start)
-        for s in schedules:
-            s.set_active(db)
+        with self.get_schedule_db(fs) as conn_mgr:
+            db = conn_mgr.dbinfo.db
+            schedules = Schedule.get_db_schedules(path, db, fs,
+                                                  schedule=schedule,
+                                                  start=start)
+            for s in schedules:
+                s.set_active(db)
 
     @updates_schedule_db
     def deactivate_snap_schedule(self,
                                  fs: str, path: str,
                                  schedule: Optional[str],
                                  start: Optional[str]) -> None:
-        db = self.get_schedule_db(fs)
-        schedules = Schedule.get_db_schedules(path, db, fs,
-                                              schedule=schedule,
-                                              start=start)
-        for s in schedules:
-            s.set_inactive(db)
+        with self.get_schedule_db(fs) as conn_mgr:
+            db = conn_mgr.dbinfo.db
+            schedules = Schedule.get_db_schedules(path, db, fs,
+                                                  schedule=schedule,
+                                                  start=start)
+            for s in schedules:
+                s.set_inactive(db)