]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
pybing/mgr/snap_sched: improve SQL statements, logging, metadata
authorJan Fajerski <jfajerski@suse.com>
Tue, 19 May 2020 15:31:47 +0000 (17:31 +0200)
committerJan Fajerski <jfajerski@suse.com>
Thu, 27 Aug 2020 13:55:46 +0000 (15:55 +0200)
Signed-off-by: Jan Fajerski <jfajerski@suse.com>
src/pybind/mgr/snap_schedule/fs/schedule.py
src/pybind/mgr/snap_schedule/fs/schedule_client.py
src/pybind/mgr/snap_schedule/module.py

index bba5abbe8a9b5ea77feb1597cddbb929bd590ed0..5ffdbff28c0721b13b11033c8b1b32d6849b47f7 100644 (file)
@@ -4,6 +4,7 @@ Copyright (C) 2020 SUSE
 LGPL2.1.  See file COPYING.
 """
 from datetime import datetime, timezone
+import json
 import logging
 import sqlite3
 
@@ -84,12 +85,15 @@ class Schedule(object):
     def __str__(self):
         return f'''{self.path} {self.schedule} {self.retention}'''
 
+    def json_list(self):
+        return json.dumps({'path': self.path, 'schedule': self.schedule,
+                           'retention': self.retention})
+
     CREATE_TABLES = '''CREATE TABLE schedules(
-        id integer PRIMARY KEY ASC,
-        path text NOT NULL UNIQUE,
-        subvol text,
-        rel_path text NOT NULL,
-        active int NOT NULL
+        id INTEGER PRIMARY KEY ASC,
+        path TEXT NOT NULL UNIQUE,
+        subvol TEXT,
+        rel_path TEXT NOT NULL
     );
     CREATE TABLE schedules_meta(
         id INTEGER PRIMARY KEY ASC,
@@ -98,12 +102,13 @@ class Schedule(object):
         first TEXT,
         last TEXT,
         last_pruned TEXT,
-        created TEXT,
-        repeat BIGINT NOT NULL,
+        created TEXT NOT NULL,
+        repeat INT NOT NULL,
         schedule TEXT NOT NULL,
         created_count INT DEFAULT 0,
         pruned_count INT DEFAULT 0,
         retention TEXT,
+        active INT NOT NULL,
         FOREIGN KEY(schedule_id) REFERENCES schedules(id) ON DELETE CASCADE,
         UNIQUE (start, repeat)
     );'''
@@ -118,38 +123,33 @@ class Schedule(object):
         WHERE
             s.path = ? AND
             strftime("%s", "now") - strftime("%s", sm.start) > 0 AND
-            s.active = 1
+            sm.active = 1
         ORDER BY until;'''
 
     PROTO_GET_SCHEDULES = '''SELECT
-          s.path, s.subvol, s.rel_path, s.active,
+          s.path, s.subvol, s.rel_path, sm.active,
           sm.schedule, sm.retention, sm.start, sm.first, sm.last,
           sm.last_pruned, sm.created, sm.created_count, sm.pruned_count
           FROM schedules s
               INNER JOIN schedules_meta sm ON sm.schedule_id = s.id
           WHERE'''
 
-    GET_SCHEDULES = PROTO_GET_SCHEDULES + ' s.path = ?'
-
-    GET_SCHEDULE = PROTO_GET_SCHEDULES + ' s.path = ? and sm.start = ? AND sm.repeat = ?'
+    GET_SCHEDULES = PROTO_GET_SCHEDULES + ' s.path = ?'''
 
-    # TODO merge these two methods
     @classmethod
-    def get_db_schedules(cls, path, db, fs):
+    def get_db_schedules(cls, path, db, fs, repeat=None, start=None):
+        query = cls.GET_SCHEDULES
+        data = (path,)
+        if repeat:
+            query += ' AND sm.repeat = ?'
+            data += (repeat,)
+        if start:
+            query += ' AND sm.start = ?'
+            data += (start,)
         with db:
-            c = db.execute(cls.GET_SCHEDULES, (path,))
+            c = db.execute(query, data)
         return [cls._from_get_query(row, fs) for row in c.fetchall()]
 
-    @classmethod
-    def get_db_schedule(cls, db, fs, path, start, repeat):
-        with db:
-            c = db.execute(cls.GET_SCHEDULE, (path, start, repeat))
-        r = c.fetchone()
-        if r:
-            return cls._from_get_query(r, fs)
-        else:
-            return None
-
     @classmethod
     def list_schedules(cls, path, db, fs, recursive):
         with db:
@@ -162,11 +162,12 @@ class Schedule(object):
         return [cls._from_get_query(row, fs) for row in c.fetchall()]
 
     INSERT_SCHEDULE = '''INSERT INTO
-        schedules(path, subvol, rel_path, active)
-        Values(?, ?, ?, ?);'''
+        schedules(path, subvol, rel_path)
+        Values(?, ?, ?);'''
     INSERT_SCHEDULE_META = '''INSERT INTO
-        schedules_meta(schedule_id, start, created, repeat, schedule, retention)
-        SELECT ?, ?, ?, ?, ?, ?'''
+        schedules_meta(schedule_id, start, created, repeat, schedule,
+        retention, active)
+        SELECT ?, ?, ?, ?, ?, ?, ?'''
 
     def store_schedule(self, db):
         sched_id = None
@@ -175,8 +176,7 @@ class Schedule(object):
                 c = db.execute(self.INSERT_SCHEDULE,
                                (self.path,
                                 self.subvol,
-                                self.rel_path,
-                                1))
+                                self.rel_path,))
                 sched_id = c.lastrowid
             except sqlite3.IntegrityError:
                 # might be adding another schedule, retrieve sched id
@@ -191,7 +191,8 @@ class Schedule(object):
                         self.created.isoformat(),
                         self.repeat,
                         self.schedule,
-                        self.retention))
+                        self.retention,
+                        1))
 
     @classmethod
     def rm_schedule(cls, db, path, repeat, start):
@@ -236,8 +237,10 @@ class Schedule(object):
                 db.execute('DELETE FROM schedules WHERE id = ?;', id_)
 
     def report(self):
-        import pprint
-        return pprint.pformat(self.__dict__)
+        return self.report_json()
+
+    def report_json(self):
+        return json.dumps(dict(self.__dict__), default=lambda o: o.isoformat())
 
     @property
     def repeat(self):
@@ -254,57 +257,81 @@ class Schedule(object):
         else:
             raise Exception('schedule multiplier not recognized')
 
-    UPDATE_LAST = '''UPDATE schedules_meta
+    UPDATE_LAST = '''UPDATE schedules_meta AS sm
     SET
       last = ?,
       created_count = created_count + 1,
       first = CASE WHEN first IS NULL THEN ? ELSE first END
-    WHERE
-      start = ? AND
-      repeat = ?;'''
+    WHERE EXISTS(
+      SELECT id
+      FROM schedules s
+      WHERE s.id = sm.schedule_id
+      AND s.path = ?
+      AND sm.start = ?
+      AND sm.repeat = ?);'''
 
     def update_last(self, time, db):
         with db:
-            db.execute(self.UPDATE_LAST, (time.isoformat(), time.isoformat(),
-                                          self.start.isoformat(), self.repeat))
+            db.execute(self.UPDATE_LAST, (time.isoformat(),
+                                          time.isoformat(),
+                                          self.path,
+                                          self.start.isoformat(),
+                                          self.repeat))
         self.created_count += 1
         self.last = time
 
-    # TODO add option to only change one snapshot in a path, i.e. pass repeat
-    # and start time as well
-    UPDATE_INACTIVE = '''UPDATE schedules
+    UPDATE_INACTIVE = '''UPDATE schedules_meta AS sm
     SET
       active = 0
-    WHERE
-      path = ?;'''
+    WHERE EXISTS(
+      SELECT id
+      FROM schedules s
+      WHERE s.id = sm.schedule_id
+      AND s.path = ?
+      AND sm.start = ?
+      AND sm.repeat = ?);'''
 
     def set_inactive(self, db):
         with db:
-            log.debug(f'Deactivating schedule on path {self.path}')
-            db.execute(self.UPDATE_INACTIVE, (self.path,))
+            log.debug(f'Deactivating schedule ({self.repeat}, {self.start}) on path {self.path}')
+            db.execute(self.UPDATE_INACTIVE, (self.path,
+                                              self.start.isoformat(),
+                                              self.repeat))
 
-    UPDATE_ACTIVE = '''UPDATE schedules
+    UPDATE_ACTIVE = '''UPDATE schedules_meta AS sm
     SET
       active = 1
-    WHERE
-      path = ?;'''
+    WHERE EXISTS(
+      SELECT id
+      FROM schedules s
+      WHERE s.id = sm.schedule_id
+      AND s.path = ?
+      AND sm.start = ?
+      AND sm.repeat = ?);'''
 
     def set_active(self, db):
         with db:
-            log.debug(f'Activating schedule on path {self.path}')
-            db.execute(self.UPDATE_ACTIVE, (self.path,))
+            log.debug(f'Activating schedule ({self.repeat}, {self.start}) on path {self.path}')
+            db.execute(self.UPDATE_ACTIVE, (self.path,
+                                            self.start.isoformat(),
+                                            self.repeat))
 
-    UPDATE_PRUNED = '''UPDATE schedules_meta
+    UPDATE_PRUNED = '''UPDATE schedules_meta AS sm
     SET
       last_pruned = ?,
       pruned_count = pruned_count + ?
-    WHERE
-      start = ? AND
-      repeat = ?;'''
+    WHERE EXISTS(
+      SELECT id
+      FROM schedules s
+      WHERE s.id = sm.schedule_id
+      AND s.path = ?
+      AND sm.start = ?
+      AND sm.repeat = ?);'''
 
     def update_pruned(self, time, db, pruned):
         with db:
             db.execute(self.UPDATE_PRUNED, (time.isoformat(), pruned,
+                                            self.path,
                                             self.start.isoformat(),
                                             self.repeat))
         self.pruned_count += pruned
index 189495f7b3f0335a0e1ad9aa523ae8d01bb28e22..bb02e2e7a5b939c348c95fe4973f075b066f95c2 100644 (file)
@@ -16,14 +16,17 @@ import logging
 from threading import Timer
 import sqlite3
 from .schedule import Schedule
+import traceback
 
 
+MAX_SNAPS_PER_PATH = 50
 SNAP_SCHEDULE_NAMESPACE = 'cephfs-snap-schedule'
 SNAP_DB_PREFIX = 'snap_db'
 # increment this every time the db schema changes and provide upgrade code
 SNAP_DB_VERSION = '0'
 SNAP_DB_OBJECT_NAME = f'{SNAP_DB_PREFIX}_v{SNAP_DB_VERSION}'
 SNAPSHOT_TS_FORMAT = '%Y-%m-%d-%H_%M_%S'
+SNAPSHOT_PREFIX = 'scheduled'
 
 log = logging.getLogger(__name__)
 
@@ -89,7 +92,7 @@ class SnapSchedClient(CephfsClient):
                                         size).decode('utf-8')
                         con.executescript(db)
                     except rados.ObjectNotFound:
-                        log.info(f'No schedule DB found in {fs}')
+                        log.debug(f'No schedule DB found in {fs}, creating one.')
                         con.executescript(Schedule.CREATE_TABLES)
         return self.sqlite_connections[fs]
 
@@ -122,37 +125,44 @@ class SnapSchedClient(CephfsClient):
                 timer.cancel()
             timers = []
             for row in rows:
-                log.debug(f'adding timer for {row}')
-                log.debug(f'Creating new snapshot timer')
+                log.debug(f'Creating new snapshot timer for {path}')
                 t = Timer(row[1],
                           self.create_scheduled_snapshot,
                           args=[fs, path, row[0], row[2], row[3]])
                 t.start()
                 timers.append(t)
-                log.debug(f'Will snapshot {row[0]} in fs {fs} in {row[1]}s')
+                log.debug(f'Will snapshot {path} in fs {fs} in {row[1]}s')
             self.active_timers[(fs, path)] = timers
-        except Exception as e:
-            log.error(f'refresh raised {e}')
+        except Exception:
+            self._log_exception('refresh_snap_timers')
+
+    def _log_exception(self, fct):
+        log.error(f'{fct} raised an exception:')
+        log.error(traceback.format_exc())
 
     def create_scheduled_snapshot(self, fs_name, path, retention, start, repeat):
         log.debug(f'Scheduled snapshot of {path} triggered')
         try:
             db = self.get_schedule_db(fs_name)
-            sched = Schedule.get_db_schedule(db, fs_name, path, start, repeat)
+            sched = Schedule.get_db_schedules(path,
+                                              db,
+                                              fs_name,
+                                              repeat,
+                                              start)[0]
             time = datetime.now(timezone.utc)
             with open_filesystem(self, fs_name) as fs_handle:
-                fs_handle.mkdir(f'{path}/.snap/scheduled-{time.strftime(SNAPSHOT_TS_FORMAT)}', 0o755)
+                snap_ts = time.strftime(SNAPSHOT_TS_FORMAT)
+                snap_name = f'{path}/.snap/{SNAPSHOT_PREFIX}-{snap_ts}'
+                fs_handle.mkdir(snap_name, 0o755)
             log.info(f'created scheduled snapshot of {path}')
-            # TODO change last snap timestamp in db, maybe first
             sched.update_last(time, db)
-        except cephfs.Error as e:
-            log.info(f'scheduled snapshot creating of {path} failed: {e}')
-            # TODO set inactive if path doesn't exist
+        except cephfs.Error:
+            self._log_exception('create_scheduled_snapshot')
             sched.set_inactive(db)
-        except Exception as e:
+        except Exception:
             # catch all exceptions cause otherwise we'll never know since this
             # is running in a thread
-            log.error(f'ERROR create_scheduled_snapshot raised{e}')
+            self._log_exception('create_scheduled_snapshot')
         finally:
             self.refresh_snap_timers(fs_name, path)
             self.prune_snapshots(sched)
@@ -162,20 +172,19 @@ class SnapSchedClient(CephfsClient):
             log.debug('Pruning snapshots')
             ret = parse_retention(sched.retention)
             path = sched.path
-            if not ret:
-                # TODO prune if too many (300?)
-                log.debug(f'schedule on {path} has no retention specified')
-                return
+            if not ret or "n" not in ret or ret["n"] > MAX_SNAPS_PER_PATH:
+                log.debug(f'Adding n: { MAX_SNAPS_PER_PATH} limit to {path}')
+                ret["n"] = MAX_SNAPS_PER_PATH
             prune_candidates = set()
             time = datetime.now(timezone.utc)
             with open_filesystem(self, sched.fs) as fs_handle:
                 with fs_handle.opendir(f'{path}/.snap') as d_handle:
                     dir_ = fs_handle.readdir(d_handle)
                     while dir_:
-                        if dir_.d_name.startswith(b'scheduled-'):
+                        if dir_.d_name.startswith(b'{SNAPSHOT_PREFIX}-'):
                             log.debug(f'add {dir_.d_name} to pruning')
                             ts = datetime.strptime(
-                                dir_.d_name.lstrip(b'scheduled-').decode('utf-8'),
+                                dir_.d_name.lstrip(b'{SNAPSHOT_PREFIX}-').decode('utf-8'),
                                 SNAPSHOT_TS_FORMAT)
                             prune_candidates.add((dir_, ts))
                         else:
@@ -189,12 +198,15 @@ class SnapSchedClient(CephfsClient):
                 if to_prune:
                     sched.update_pruned(time, self.get_schedule_db(sched.fs),
                                         len(to_prune))
-        except Exception as e:
-            log.debug(f'prune_snapshots threw {e}')
+        except Exception:
+            self._log_exception('prune_snapshots')
 
     def get_prune_set(self, candidates, retention):
         PRUNING_PATTERNS = OrderedDict([
-            #TODO remove M for release
+            # n is for keep last n snapshots, uses the snapshot name timestamp
+            # format for lowest granularity
+            ("n", SNAPSHOT_TS_FORMAT)
+            # TODO remove M for release
             ("M", '%Y-%m-%d-%H_%M'),
             ("h", '%Y-%m-%d-%H'),
             ("d", '%Y-%m-%d'),
@@ -203,17 +215,14 @@ class SnapSchedClient(CephfsClient):
             ("y", '%Y'),
         ])
         keep = set()
-        log.debug(retention)
         for period, date_pattern in PRUNING_PATTERNS.items():
             period_count = retention.get(period, 0)
             if not period_count:
-                log.debug(f'skipping period {period}')
                 continue
             last = None
             for snap in sorted(candidates, key=lambda x: x[0].d_name,
                                reverse=True):
                 snap_ts = snap[1].strftime(date_pattern)
-                log.debug(f'{snap_ts} : {last}')
                 if snap_ts != last:
                     last = snap_ts
                     if snap not in keep:
@@ -222,8 +231,6 @@ class SnapSchedClient(CephfsClient):
                         if len(keep) == period_count:
                             log.debug(f'found enough snapshots for {period_count}{period}')
                             break
-            # TODO maybe do candidates - keep here? we want snaps counting it
-            # hours not be considered for days and it cuts down on iterations
         return candidates - keep
 
     def get_snap_schedules(self, fs, path):
@@ -251,11 +258,11 @@ class SnapSchedClient(CephfsClient):
     @updates_schedule_db
     def activate_snap_schedule(self, fs, path, repeat, start):
         db = self.get_schedule_db(fs)
-        schedules = Schedule.get_db_schedules(path, db, fs)
+        schedules = Schedule.get_db_schedules(path, db, fs, repeat, start)
         [s.set_active(db) for s in schedules]
 
     @updates_schedule_db
     def deactivate_snap_schedule(self, fs, path, repeat, start):
         db = self.get_schedule_db(fs)
-        schedules = Schedule.get_db_schedules(path, db, fs)
+        schedules = Schedule.get_db_schedules(path, db, fs, repeat, start)
         [s.set_inactive(db) for s in schedules]
index e87c085f80ab68e372c1c29742b198019b0cb520..8f8d10c8aa71ea5586916ebd198c88e8863c2bc5 100644 (file)
@@ -4,7 +4,6 @@ Copyright (C) 2019 SUSE
 LGPL2.1.  See file COPYING.
 """
 import errno
-import json
 import sqlite3
 from .fs.schedule_client import SnapSchedClient
 from mgr_module import MgrModule, CLIReadCommand, CLIWriteCommand
@@ -50,23 +49,29 @@ class Module(MgrModule):
     @CLIReadCommand('fs snap-schedule status',
                     'name=path,type=CephString,req=false '
                     'name=subvol,type=CephString,req=false '
-                    'name=fs,type=CephString,req=false',
+                    'name=fs,type=CephString,req=false '
+                    'name=format,type=CephString,req=false',
                     'List current snapshot schedules')
-    def snap_schedule_get(self, path='/', subvol=None, fs=None):
+    def snap_schedule_get(self, path='/', subvol=None, fs=None, format='plain'):
         use_fs = fs if fs else self.default_fs
         try:
             ret_scheds = self.client.get_snap_schedules(use_fs, path)
         except CephfsConnectionException as e:
             return e.to_tuple()
+        if format == 'json':
+            json_report = ','.join([ret_sched.report_json() for ret_sched in ret_scheds])
+            return 0, f'{json_report}', ''
         return 0, '\n===\n'.join([ret_sched.report() for ret_sched in ret_scheds]), ''
 
     @CLIReadCommand('fs snap-schedule list',
                     'name=path,type=CephString '
                     'name=recursive,type=CephString,req=false '
                     'name=subvol,type=CephString,req=false '
-                    'name=fs,type=CephString,req=false',
+                    'name=fs,type=CephString,req=false '
+                    'name=format,type=CephString,req=false',
                     'Get current snapshot schedule for <path>')
-    def snap_schedule_list(self, path, subvol=None, recursive=False, fs=None):
+    def snap_schedule_list(self, path, subvol=None, recursive=False, fs=None,
+                           format='plain'):
         try:
             use_fs = fs if fs else self.default_fs
             scheds = self.client.list_snap_schedules(use_fs, path, recursive)
@@ -75,6 +80,9 @@ class Module(MgrModule):
             return e.to_tuple()
         if not scheds:
             return errno.ENOENT, '', f'SnapSchedule for {path} not found'
+        if format == 'json':
+            json_list = ','.join([sched.json_list() for sched in scheds])
+            return 0, f'[{json_list}]', ''
         return 0, '\n'.join([str(sched) for sched in scheds]), ''
 
     @CLIWriteCommand('fs snap-schedule add',