]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/snap_schedule: fix db connection concurrent usage
authorMilind Changire <mchangir@redhat.com>
Wed, 24 Nov 2021 05:13:11 +0000 (10:43 +0530)
committerVenky Shankar <vshankar@redhat.com>
Fri, 15 Apr 2022 17:09:40 +0000 (22:39 +0530)
Serialize access to DB connection to avoid transaction aborts due to
concurrent use.

Some flake8-3.9 and mypy parsing error cleanups to keep 'make check' happy.

Fixes: https://tracker.ceph.com/issues/52642
Signed-off-by: Milind Changire <mchangir@redhat.com>
(cherry picked from commit 707543779e24c6bc1489c07f5fa1a239d110d9fb)

Conflicts:
src/pybind/mgr/snap_schedule/fs/schedule.py
src/pybind/mgr/snap_schedule/fs/schedule_client.py
        - changes related to DBConnectionManager to serialize
          db interactions

src/pybind/mgr/snap_schedule/fs/schedule.py
src/pybind/mgr/snap_schedule/fs/schedule_client.py
src/pybind/mgr/snap_schedule/module.py

index 233795fac835d3898e874b02677133454218a15d..7fad53fe8bc560e9840c51e2c9b58f82f3537d65 100644 (file)
@@ -6,7 +6,6 @@ LGPL2.1.  See file COPYING.
 from datetime import datetime, timezone
 import json
 import logging
-from os import environ
 import re
 import sqlite3
 from typing import Tuple, Any
@@ -30,12 +29,12 @@ except AttributeError:
     log.info(('Couldn\'t find datetime.fromisoformat, falling back to '
               f'static timestamp parsing ({SNAP_DB_TS_FORMAT}'))
 
-    def ts_parser(ts):
+    def ts_parser(date_string):  # type: ignore
         try:
-            date = datetime.strptime(ts, SNAP_DB_TS_FORMAT)
+            date = datetime.strptime(date_string, SNAP_DB_TS_FORMAT)
             return date
         except ValueError:
-            msg = f'''The date string {ts} does not match the required format
+            msg = f'''The date string {date_string} does not match the required format
             {SNAP_DB_TS_FORMAT}. For more flexibel date parsing upgrade to
             python3.7 or install
             https://github.com/movermeyer/backports.datetime_fromisoformat'''
@@ -221,7 +220,7 @@ class Schedule(object):
             data += (start,)
         with db:
             c = db.execute(query, data)
-        return [cls._from_db_row(row, fs) for row in c.fetchall()]
+            return [cls._from_db_row(row, fs) for row in c.fetchall()]
 
     @classmethod
     def list_schedules(cls, path, db, fs, recursive):
@@ -232,7 +231,7 @@ class Schedule(object):
             else:
                 c = db.execute(cls.PROTO_GET_SCHEDULES + ' path = ?',
                                (f'{path}',))
-        return [cls._from_db_row(row, fs) for row in c.fetchall()]
+            return [cls._from_db_row(row, fs) for row in c.fetchall()]
 
     INSERT_SCHEDULE = '''INSERT INTO
         schedules(path, subvol, retention, rel_path)
index bb5bcf17f792718bae51db4497ef11368e5f3660..fceaca60e5391b92cb6d9135b867690afb0c276a 100644 (file)
@@ -4,18 +4,20 @@ Copyright (C) 2020 SUSE
 LGPL2.1.  See file COPYING.
 """
 import cephfs
-import errno
 import rados
 from contextlib import contextmanager
-from mgr_util import CephfsClient, CephfsConnectionException, \
-        open_filesystem
+from mgr_util import CephfsClient, open_filesystem, CephfsConnectionException
 from collections import OrderedDict
 from datetime import datetime, timezone
 import logging
-from threading import Timer
+from threading import Timer, Lock
+from typing import cast, Any, Callable, Dict, Iterator, List, Set, Optional, \
+    Tuple, TypeVar, Union, Type
+from types import TracebackType
 import sqlite3
 from .schedule import Schedule, parse_retention
 import traceback
+import errno
 
 
 MAX_SNAPS_PER_PATH = 50
@@ -99,50 +101,90 @@ def get_prune_set(candidates, retention):
     return candidates - set(keep)
 
 
+class DBInfo():
+    def __init__(self, fs: str, db: sqlite3.Connection):
+        self.fs: str = fs
+        self.lock: Lock = Lock()
+        self.db: sqlite3.Connection = db
+
+
+# context manager for serializing db connection usage
+class DBConnectionManager():
+    def __init__(self, info: DBInfo):
+        self.dbinfo: DBInfo = info
+
+    # using string as return type hint since __future__.annotations is not
+    # available with Python 3.6; it's available starting from Python 3.7
+    def __enter__(self) -> 'DBConnectionManager':
+        log.debug(f'locking db connection for {self.dbinfo.fs}')
+        self.dbinfo.lock.acquire()
+        log.debug(f'locked db connection for {self.dbinfo.fs}')
+        return self
+
+    def __exit__(self,
+                 exception_type: Optional[Type[BaseException]],
+                 exception_value: Optional[BaseException],
+                 traceback: Optional[TracebackType]) -> None:
+        log.debug(f'unlocking db connection for {self.dbinfo.fs}')
+        self.dbinfo.lock.release()
+        log.debug(f'unlocked db connection for {self.dbinfo.fs}')
+
+
 class SnapSchedClient(CephfsClient):
 
     def __init__(self, mgr):
         super(SnapSchedClient, self).__init__(mgr)
         # TODO maybe iterate over all fs instances in fsmap and load snap dbs?
-        self.sqlite_connections = {}
-        self.active_timers = {}
+        #
+        # Each db connection is now guarded by a Lock; this is required to
+        # avoid concurrent DB transactions when more than one path in a
+        # file-system is scheduled at the same interval eg. 1h; without the
+        # lock, there are races to use the same connection, causing nested
+        # transactions to be aborted
+        self.sqlite_connections: Dict[str, DBInfo] = {}
+        self.active_timers: Dict[Tuple[str, str], List[Timer]] = {}
+        self.conn_lock: Lock = Lock()  # lock to protect add/lookup db connections
 
     @property
     def allow_minute_snaps(self):
         return self.mgr.get_module_option('allow_m_granularity')
 
+    @property
+    def dump_on_update(self) -> None:
+        return self.mgr.get_module_option('dump_on_update')
+
     def get_schedule_db(self, fs):
+        dbinfo = None
+        self.conn_lock.acquire()
         if fs not in self.sqlite_connections:
-            self.sqlite_connections[fs] = sqlite3.connect(
-                ':memory:',
-                check_same_thread=False)
-            with self.sqlite_connections[fs] as con:
-                con.row_factory = sqlite3.Row
-                con.execute("PRAGMA FOREIGN_KEYS = 1")
+            db = sqlite3.connect(':memory:', check_same_thread=False)
+            with db:
+                db.row_factory = sqlite3.Row
+                db.execute("PRAGMA FOREIGN_KEYS = 1")
                 pool = self.get_metadata_pool(fs)
                 with open_ioctx(self, pool) as ioctx:
                     try:
                         size, _mtime = ioctx.stat(SNAP_DB_OBJECT_NAME)
-                        db = ioctx.read(SNAP_DB_OBJECT_NAME,
-                                        size).decode('utf-8')
-                        con.executescript(db)
+                        ddl = ioctx.read(SNAP_DB_OBJECT_NAME,
+                                         size).decode('utf-8')
+                        db.executescript(ddl)
                     except rados.ObjectNotFound:
                         log.debug(f'No schedule DB found in {fs}, creating one.')
-                        con.executescript(Schedule.CREATE_TABLES)
-        return self.sqlite_connections[fs]
+                        db.executescript(Schedule.CREATE_TABLES)
+            self.sqlite_connections[fs] = DBInfo(fs, db)
+        dbinfo = self.sqlite_connections[fs]
+        self.conn_lock.release()
+        return DBConnectionManager(dbinfo)
 
-    def store_schedule_db(self, fs):
+    def store_schedule_db(self, fs, db):
         # only store db if it exists, otherwise nothing to do
         metadata_pool = self.get_metadata_pool(fs)
         if not metadata_pool:
             raise CephfsConnectionException(
                 -errno.ENOENT, "Filesystem {} does not exist".format(fs))
-        if fs in self.sqlite_connections:
-            db_content = []
-            db = self.sqlite_connections[fs]
-            with db:
-                for row in db.iterdump():
-                    db_content.append(row)
+        db_content = []
+        for row in db.iterdump():
+            db_content.append(row)
         with open_ioctx(self, metadata_pool) as ioctx:
             ioctx.write_full(SNAP_DB_OBJECT_NAME,
                              '\n'.join(db_content).encode('utf-8'))
@@ -158,16 +200,31 @@ class SnapSchedClient(CephfsClient):
         else:
             return True
 
+    def fetch_schedules(self, db: sqlite3.Connection, path: str) -> List[sqlite3.Row]:
+        with db:
+            if self.dump_on_update:
+                dump = [line for line in db.iterdump()]
+                dump = "\n".join(dump)
+                log.debug(f"db dump:\n{dump}")
+            cur = db.execute(Schedule.EXEC_QUERY, (path,))
+            all_rows = cur.fetchall()
+            rows = [r for r in all_rows
+                    if self._is_allowed_repeat(r, path)][0:1]
+            return rows
 
-    def refresh_snap_timers(self, fs, path):
+    def refresh_snap_timers(self, fs: str, path: str, olddb: Optional[sqlite3.Connection] = None) -> None:
         try:
-            log.debug(f'SnapDB on {fs} changed for {path}, updating next Timer')
-            db = self.get_schedule_db(fs)
+            log.debug((f'SnapDB on {fs} changed for {path}, '
+                       'updating next Timer'))
             rows = []
-            with db:
-                cur = db.execute(Schedule.EXEC_QUERY, (path,))
-                all_rows = cur.fetchall()
-                rows = [r for r in all_rows if self._is_allowed_repeat(r, path)][0:1]
+            # olddb is passed in the case where we land here without a timer
+            # the lock on the db connection has already been taken
+            if olddb:
+                rows = self.fetch_schedules(olddb, path)
+            else:
+                with self.get_schedule_db(fs) as conn_mgr:
+                    db = conn_mgr.dbinfo.db
+                    rows = self.fetch_schedules(db, path)
             timers = self.active_timers.get((fs, path), [])
             for timer in timers:
                 timer.cancel()
@@ -191,29 +248,33 @@ class SnapSchedClient(CephfsClient):
     def create_scheduled_snapshot(self, fs_name, path, retention, start, repeat):
         log.debug(f'Scheduled snapshot of {path} triggered')
         try:
-            db = self.get_schedule_db(fs_name)
-            sched = Schedule.get_db_schedules(path,
-                                              db,
-                                              fs_name,
-                                              repeat=repeat,
-                                              start=start)[0]
-            time = datetime.now(timezone.utc)
-            with open_filesystem(self, fs_name) as fs_handle:
-                snap_ts = time.strftime(SNAPSHOT_TS_FORMAT)
-                snap_name = f'{path}/.snap/{SNAPSHOT_PREFIX}-{snap_ts}'
-                fs_handle.mkdir(snap_name, 0o755)
-            log.info(f'created scheduled snapshot of {path}')
-            log.debug(f'created scheduled snapshot {snap_name}')
-            sched.update_last(time, db)
-        except cephfs.Error:
-            self._log_exception('create_scheduled_snapshot')
-            sched.set_inactive(db)
-        except Exception:
-            # catch all exceptions cause otherwise we'll never know since this
-            # is running in a thread
-            self._log_exception('create_scheduled_snapshot')
+            with self.get_schedule_db(fs_name) as conn_mgr:
+                db = conn_mgr.dbinfo.db
+                try:
+                    sched = Schedule.get_db_schedules(path,
+                                                      db,
+                                                      fs_name,
+                                                      repeat=repeat,
+                                                      start=start)[0]
+                    time = datetime.now(timezone.utc)
+                    with open_filesystem(self, fs_name) as fs_handle:
+                        snap_ts = time.strftime(SNAPSHOT_TS_FORMAT)
+                        snap_name = f'{path}/.snap/{SNAPSHOT_PREFIX}-{snap_ts}'
+                        fs_handle.mkdir(snap_name, 0o755)
+                    log.info(f'created scheduled snapshot of {path}')
+                    log.debug(f'created scheduled snapshot {snap_name}')
+                    sched.update_last(time, db)
+                except cephfs.Error:
+                    self._log_exception('create_scheduled_snapshot')
+                    sched.set_inactive(db)
+                except Exception:
+                    # catch all exceptions because otherwise we'll never know since this
+                    # is running in a thread
+                    self._log_exception('create_scheduled_snapshot')
         finally:
-            self.refresh_snap_timers(fs_name, path)
+            with self.get_schedule_db(fs_name) as conn_mgr:
+                db = conn_mgr.dbinfo.db
+                self.refresh_snap_timers(fs_name, path, db)
             self.prune_snapshots(sched)
 
     def prune_snapshots(self, sched):
@@ -242,18 +303,24 @@ class SnapSchedClient(CephfsClient):
                     log.debug(f'rmdir on {dirname}')
                     fs_handle.rmdir(f'{path}/.snap/{dirname}')
                 if to_prune:
-                    sched.update_pruned(time, self.get_schedule_db(sched.fs),
-                                        len(to_prune))
+                    with self.get_schedule_db(sched.fs) as conn_mgr:
+                        db = conn_mgr.dbinfo.db
+                        sched.update_pruned(time, db, len(to_prune))
         except Exception:
             self._log_exception('prune_snapshots')
 
-    def get_snap_schedules(self, fs, path):
-        db = self.get_schedule_db(fs)
-        return Schedule.get_db_schedules(path, db, fs)
+    def get_snap_schedules(self, fs: str, path: str) -> List[Schedule]:
+        with self.get_schedule_db(fs) as conn_mgr:
+            db = conn_mgr.dbinfo.db
+            return Schedule.get_db_schedules(path, db, fs)
 
-    def list_snap_schedules(self, fs, path, recursive):
-        db = self.get_schedule_db(fs)
-        return Schedule.list_schedules(path, db, fs, recursive)
+    def list_snap_schedules(self,
+                            fs: str,
+                            path: str,
+                            recursive: bool) -> List[Schedule]:
+        with self.get_schedule_db(fs) as conn_mgr:
+            db = conn_mgr.dbinfo.db
+            return Schedule.list_schedules(path, db, fs, recursive)
 
     @updates_schedule_db
     # TODO improve interface
@@ -264,14 +331,19 @@ class SnapSchedClient(CephfsClient):
             log.error('not allowed')
             raise ValueError('no minute snaps allowed')
         log.debug(f'attempting to add schedule {sched}')
-        db = self.get_schedule_db(fs)
-        sched.store_schedule(db)
-        self.store_schedule_db(sched.fs)
+        with self.get_schedule_db(fs) as conn_mgr:
+            db = conn_mgr.dbinfo.db
+            sched.store_schedule(db)
+            self.store_schedule_db(sched.fs, db)
 
     @updates_schedule_db
-    def rm_snap_schedule(self, fs, path, schedule, start):
-        db = self.get_schedule_db(fs)
-        Schedule.rm_schedule(db, path, schedule, start)
+    def rm_snap_schedule(self,
+                         fs: str, path: str,
+                         schedule: Optional[str],
+                         start: Optional[str]) -> None:
+        with self.get_schedule_db(fs) as conn_mgr:
+            db = conn_mgr.dbinfo.db
+            Schedule.rm_schedule(db, path, schedule, start)
 
     @updates_schedule_db
     def add_retention_spec(self,
@@ -282,8 +354,9 @@ class SnapSchedClient(CephfsClient):
         retention_spec = retention_spec_or_period
         if retention_count:
             retention_spec = retention_count + retention_spec
-        db = self.get_schedule_db(fs)
-        Schedule.add_retention(db, path, retention_spec)
+        with self.get_schedule_db(fs) as conn_mgr:
+            db = conn_mgr.dbinfo.db
+            Schedule.add_retention(db, path, retention_spec)
 
     @updates_schedule_db
     def rm_retention_spec(self,
@@ -294,21 +367,33 @@ class SnapSchedClient(CephfsClient):
         retention_spec = retention_spec_or_period
         if retention_count:
             retention_spec = retention_count + retention_spec
-        db = self.get_schedule_db(fs)
-        Schedule.rm_retention(db, path, retention_spec)
+        with self.get_schedule_db(fs) as conn_mgr:
+            db = conn_mgr.dbinfo.db
+            Schedule.rm_retention(db, path, retention_spec)
 
     @updates_schedule_db
-    def activate_snap_schedule(self, fs, path, schedule, start):
-        db = self.get_schedule_db(fs)
-        schedules = Schedule.get_db_schedules(path, db, fs,
-                                              schedule=schedule,
-                                              start=start)
-        [s.set_active(db) for s in schedules]
+    def activate_snap_schedule(self,
+                               fs: str,
+                               path: str,
+                               schedule: Optional[str],
+                               start: Optional[str]) -> None:
+        with self.get_schedule_db(fs) as conn_mgr:
+            db = conn_mgr.dbinfo.db
+            schedules = Schedule.get_db_schedules(path, db, fs,
+                                                  schedule=schedule,
+                                                  start=start)
+            for s in schedules:
+                s.set_active(db)
 
     @updates_schedule_db
-    def deactivate_snap_schedule(self, fs, path, schedule, start):
-        db = self.get_schedule_db(fs)
-        schedules = Schedule.get_db_schedules(path, db, fs,
-                                              schedule=schedule,
-                                              start=start)
-        [s.set_inactive(db) for s in schedules]
+    def deactivate_snap_schedule(self,
+                                 fs: str, path: str,
+                                 schedule: Optional[str],
+                                 start: Optional[str]) -> None:
+        with self.get_schedule_db(fs) as conn_mgr:
+            db = conn_mgr.dbinfo.db
+            schedules = Schedule.get_db_schedules(path, db, fs,
+                                                  schedule=schedule,
+                                                  start=start)
+            for s in schedules:
+                s.set_inactive(db)
index 701c1551e4f5d24b67032ba0de4aeb7bad741eb9..e298aad12a55d8791b8a403e6dca6a91d5c403e4 100644 (file)
@@ -6,7 +6,7 @@ LGPL2.1.  See file COPYING.
 import errno
 import json
 import sqlite3
-from typing import Dict, Sequence, Optional
+from typing import Dict, Sequence, Optional, cast
 from .fs.schedule_client import SnapSchedClient
 from mgr_module import MgrModule, CLIReadCommand, CLIWriteCommand, Option
 from mgr_util import CephfsConnectionException
@@ -22,6 +22,13 @@ class Module(MgrModule):
             desc='allow minute scheduled snapshots',
             runtime=True,
         ),
+        Option(
+            'dump_on_update',
+            type='bool',
+            default=False,
+            desc='dump database to debug log on update',
+            runtime=True,
+        ),
     ]
 
     def __init__(self, *args, **kwargs):
@@ -68,6 +75,7 @@ class Module(MgrModule):
         '''
         use_fs = fs if fs else self.default_fs
         try:
+            path = cast(str, path)
             ret_scheds = self.client.get_snap_schedules(use_fs, path)
         except CephfsConnectionException as e:
             return e.to_tuple()
@@ -87,6 +95,7 @@ class Module(MgrModule):
         '''
         try:
             use_fs = fs if fs else self.default_fs
+            recursive = cast(bool, recursive)
             scheds = self.client.list_snap_schedules(use_fs, path, recursive)
             self.log.debug(f'recursive is {recursive}')
         except CephfsConnectionException as e: