]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
snap-schedule: Implement pruning according to retention specfication
authorJan Fajerski <jfajerski@suse.com>
Wed, 6 May 2020 13:20:32 +0000 (15:20 +0200)
committerJan Fajerski <jfajerski@suse.com>
Thu, 27 Aug 2020 13:55:45 +0000 (15:55 +0200)
Signed-off-by: Jan Fajerski <jfajerski@suse.com>
src/pybind/mgr/snap_schedule/fs/schedule.py
src/pybind/mgr/snap_schedule/module.py

index 970ede4b2bb2412c6567090f484bfed159a3dc6a..bae2cd7d1476a585c58f0e27b97cec929988d3e3 100644 (file)
@@ -7,14 +7,22 @@ import cephfs
 import errno
 import rados
 from contextlib import contextmanager
-from mgr_util import CephfsClient, CephfsConnectionException, connection_pool_wrap
-from datetime import datetime, timedelta
+import re
+from mgr_util import CephfsClient, CephfsConnectionException, \
+        open_filesystem
+from collections import OrderedDict
+from datetime import datetime
+import logging
+from operator import attrgetter, itemgetter
 from threading import Timer
 import sqlite3
 
 
 SNAP_SCHEDULE_NAMESPACE = 'cephfs-snap-schedule'
 SNAP_DB_OBJECT_NAME = 'snap_db'
+TS_FORMAT = '%Y-%m-%d-%H_%M_%S'
+
+log = logging.getLogger(__name__)
 
 
 @contextmanager
@@ -29,7 +37,7 @@ def open_ioctx(self, pool):
                 ioctx.set_namespace(SNAP_SCHEDULE_NAMESPACE)
                 yield ioctx
     except rados.ObjectNotFound:
-        self.log.error("Failed to locate pool {}".format(pool))
+        log.error("Failed to locate pool {}".format(pool))
         raise
 
 
@@ -59,6 +67,7 @@ class Schedule(object):
         self.rel_path = rel_path
         self.schedule = schedule
         self.retention = retention_policy
+        self._ret = {}
         self.first_run = start
         self.last_run = None
 
@@ -90,12 +99,22 @@ class Schedule(object):
             raise Exception('schedule multiplier not recognized')
 
 
+def parse_retention(retention):
+    ret = {}
+    matches = re.findall(r'\d+[a-z]', retention)
+    for m in matches:
+        ret[m[-1]] = int(m[0:-1])
+    matches = re.findall(r'\d+[A-Z]', retention)
+    for m in matches:
+        ret[m[-1]] = int(m[0:-1])
+    return ret
+
+
 class SnapSchedClient(CephfsClient):
 
     CREATE_TABLES = '''CREATE TABLE schedules(
         id integer PRIMARY KEY ASC,
         path text NOT NULL UNIQUE,
-        schedule text NOT NULL,
         subvol text,
         rel_path text NOT NULL,
         active int NOT NULL
@@ -105,8 +124,10 @@ class SnapSchedClient(CephfsClient):
         schedule_id int,
         start bigint NOT NULL,
         repeat bigint NOT NULL,
+        schedule text NOT NULL,
         retention text,
-        FOREIGN KEY(schedule_id) REFERENCES schedules(id) ON DELETE CASCADE
+        FOREIGN KEY(schedule_id) REFERENCES schedules(id) ON DELETE CASCADE,
+        UNIQUE (start, repeat)
     );'''
 
     def __init__(self, mgr):
@@ -123,39 +144,46 @@ class SnapSchedClient(CephfsClient):
         ORDER BY until;'''
 
     def refresh_snap_timers(self, fs):
-        self.log.debug(f'SnapDB on {fs} changed, updating next Timer')
-        db = self.get_schedule_db(fs)
-        rows = []
-        with db:
-            cur = db.execute(self.EXEC_QUERY)
-            rows = cur.fetchmany(1)
-        timers = self.active_timers.get(fs, [])
-        for timer in timers:
-            timer.cancel()
-        timers = []
-        for row in rows:
-            self.log.debug(f'Creating new snapshot timer')
-            t = Timer(row[2],
-                      self.create_scheduled_snapshot,
-                      args=[fs, row[0], row[1]])
-            t.start()
-            timers.append(t)
-            self.log.debug(f'Will snapshot {row[0]} in fs {fs} in {row[2]}s')
-        self.active_timers[fs] = timers
+        try:
+            log.debug(f'SnapDB on {fs} changed, updating next Timer')
+            db = self.get_schedule_db(fs)
+            rows = []
+            with db:
+                cur = db.execute(self.EXEC_QUERY)
+                rows = cur.fetchmany(1)
+                log.debug(f'retrieved {cur.rowcount} rows')
+            timers = self.active_timers.get(fs, [])
+            for timer in timers:
+                timer.cancel()
+            timers = []
+            for row in rows:
+                log.debug(f'adding timer for {row}')
+                log.debug(f'Creating new snapshot timer')
+                t = Timer(row[2],
+                          self.create_scheduled_snapshot,
+                          args=[fs, row[0], row[1]])
+                t.start()
+                timers.append(t)
+                log.debug(f'Will snapshot {row[0]} in fs {fs} in {row[2]}s')
+            self.active_timers[fs] = timers
+        except Exception as e:
+            log.error(f'refresh raised {e}')
 
     def get_schedule_db(self, fs):
         if fs not in self.sqlite_connections:
-            self.sqlite_connections[fs] = sqlite3.connect(':memory:')
+            self.sqlite_connections[fs] = sqlite3.connect(':memory:',
+                                                          check_same_thread=False)
             with self.sqlite_connections[fs] as con:
                 con.execute("PRAGMA FOREIGN_KEYS = 1")
                 pool = self.get_metadata_pool(fs)
                 with open_ioctx(self, pool) as ioctx:
                     try:
-                        size, _mtime = ioctx.stat('SNAP_DB_OBJECT_NAME')
-                        db = ioctx.read('SNAP_DB_OBJECT_NAME', size).decode('utf-8')
+                        size, _mtime = ioctx.stat(SNAP_DB_OBJECT_NAME)
+                        db = ioctx.read(SNAP_DB_OBJECT_NAME,
+                                        size).decode('utf-8')
                         con.executescript(db)
                     except rados.ObjectNotFound:
-                        self.log.info(f'No schedule DB found in {fs}')
+                        log.info(f'No schedule DB found in {fs}')
                         con.executescript(self.CREATE_TABLES)
         return self.sqlite_connections[fs]
 
@@ -167,47 +195,124 @@ class SnapSchedClient(CephfsClient):
                 -errno.ENOENT, "Filesystem {} does not exist".format(fs))
         if fs in self.sqlite_connections:
             db_content = []
-            # TODO maybe do this in a transaction?
             db = self.sqlite_connections[fs]
             with db:
                 for row in db.iterdump():
                     db_content.append(row)
         with open_ioctx(self, metadata_pool) as ioctx:
-            ioctx.write_full("SNAP_DB_OBJECT_NAME",
+            ioctx.write_full(SNAP_DB_OBJECT_NAME,
                              '\n'.join(db_content).encode('utf-8'))
 
-    @connection_pool_wrap
-    def create_scheduled_snapshot(self, fs_info, path, retention):
-        self.log.debug(f'Scheduled snapshot of {path} triggered')
+    def create_scheduled_snapshot(self, fs_name, path, retention):
+        log.debug(f'Scheduled snapshot of {path} triggered')
         try:
-            time = datetime.datetime.utcnow().strftime('%Y-%m-%d-%H_%M_%S')
-            fs_info[1].mkdir(f'{path}/.snap/scheduled-{time}', 0o755)
-            self.log.info(f'created scheduled snapshot of {path}')
+            time = datetime.utcnow().strftime(TS_FORMAT)
+            with open_filesystem(self, fs_name) as fs_handle:
+                fs_handle.mkdir(f'{path}/.snap/scheduled-{time}', 0o755)
+            log.info(f'created scheduled snapshot of {path}')
         except cephfs.Error as e:
-            self.log.info(f'scheduled snapshot creating of {path} failed')
-            raise CephfsConnectionException(-e.args[0], e.args[1])
+            log.info(f'scheduled snapshot creating of {path} failed: {e}')
+        except Exception as e:
+            # catch all exceptions cause otherwise we'll never know since this
+            # is running in a thread
+            log.error(f'ERROR create_scheduled_snapshot raised{e}')
         finally:
-            self.refresh_snap_timers(fs_info[0])
+            log.info(f'finally branch')
+            self.refresh_snap_timers(fs_name)
+            log.info(f'calling prune')
+            self.prune_snapshots(fs_name, path, retention)
+
+    def prune_snapshots(self, fs_name, path, retention):
+        log.debug('Pruning snapshots')
+        ret = parse_retention(retention)
+        try:
+            prune_candidates = set()
+            with open_filesystem(self, fs_name) as fs_handle:
+                with fs_handle.opendir(f'{path}/.snap') as d_handle:
+                    dir_ = fs_handle.readdir(d_handle)
+                    while dir_:
+                        if dir_.d_name.startswith(b'scheduled-'):
+                            log.debug(f'add {dir_.d_name} to pruning')
+                            ts = datetime.strptime(
+                                dir_.d_name.lstrip(b'scheduled-').decode('utf-8'),
+                                TS_FORMAT)
+                            prune_candidates.add((dir_, ts))
+                        else:
+                            log.debug(f'skipping dir entry {dir_.d_name}')
+                        dir_ = fs_handle.readdir(d_handle)
+                to_keep = self.get_prune_set(prune_candidates, ret)
+                for k in prune_candidates - to_keep:
+                    dirname = k[0].d_name.decode('utf-8')
+                    log.debug(f'rmdir on {dirname}')
+                    fs_handle.rmdir(f'{path}/.snap/{dirname}')
+                log.debug(f'keeping {to_keep}')
+        except Exception as e:
+            log.debug(f'prune_snapshots threw {e}')
         # TODO: handle snap pruning accoring to retention
 
-    @connection_pool_wrap
-    def validate_schedule(self, fs_handle, sched):
+    def get_prune_set(self, candidates, retention):
+        PRUNING_PATTERNS = OrderedDict([
+            #TODO remove M for release
+            ("M", '%Y-%m-%d-%H_%M'),
+            ("h", '%Y-%m-%d-%H'),
+            ("d", '%Y-%m-%d'),
+            ("w", '%G-%V'),
+            ("m", '%Y-%m'),
+            ("y", '%Y'),
+        ])
+        keep = set()
+        log.debug(retention)
+        for period, date_pattern in PRUNING_PATTERNS.items():
+            period_count = retention.get(period, 0)
+            if not period_count:
+                log.debug(f'skipping period {period}')
+                continue
+            last = None
+            for snap in sorted(candidates, key=lambda x: x[0].d_name,
+                               reverse=True):
+                snap_ts = snap[1].strftime(date_pattern)
+                log.debug(f'{snap_ts} : {last}')
+                if snap_ts != last:
+                    last = snap_ts
+                    if snap not in keep:
+                        log.debug(f'keeping {snap[0].d_name} due to {period_count}{period}')
+                        keep.add(snap)
+                        if len(keep) == period_count:
+                            log.debug(f'found enough snapshots for {period_count}{period}')
+                            break
+            # TODO maybe do candidates - keep here? we want snaps counting it
+            # hours not be considered for days and it cuts down on iterations
+        return keep
+
+    def validate_schedule(self, fs_name, sched):
         try:
-            fs_handle.stat(sched.path)
+            with open_filesystem(self, fs_name) as fs_handle:
+                fs_handle.stat(sched.path)
         except cephfs.ObjectNotFound:
-            self.log.error('Path {} not found'.format(sched.path))
+            log.error('Path {} not found'.format(sched.path))
             return False
         return True
 
-    def list_snap_schedule(self, fs, path):
+    LIST_SCHEDULES = '''SELECT
+        s.path, sm.schedule, sm.retention, sm.start, s.subvol, s.rel_path
+        FROM schedules s
+            INNER JOIN schedules_meta sm ON sm.schedule_id = s.id
+        WHERE s.path = ?'''
+
+    def list_snap_schedules(self, fs, path):
         db = self.get_schedule_db(fs)
+        c = db.execute(self.LIST_SCHEDULES, (path,))
+        return [Schedule.from_db_row(row, fs) for row in c.fetchall()]
+
+    def dump_snap_schedule(self, fs, path):
+        db = self.get_schedule_db(fs)
+        # TODO retrieve multiple schedules per path from schedule_meta
         # with db:
-        scheds = []
-        for row in db.execute('SELECT * FROM SCHEDULES WHERE path LIKE ?',
-                              (f'{path}%',)):
-            scheds.append(row)
-        return scheds
+        c = db.execute('SELECT * FROM SCHEDULES WHERE path LIKE ?',
+                       (f'{path}%',))
+        return [row for row in c.fetchall()]
 
+# TODO currently not used, probably broken
     UPDATE_SCHEDULE = '''UPDATE schedules
         SET
             schedule = ?,
@@ -217,6 +322,7 @@ class SnapSchedClient(CephfsClient):
         SET
             start = ?,
             repeat = ?,
+            schedule = ?
             retention = ?
         WHERE schedule_id = (
             SELECT id FROM SCHEDULES WHERE path = ?);'''
@@ -235,39 +341,32 @@ class SnapSchedClient(CephfsClient):
                         sched.path))
 
     INSERT_SCHEDULE = '''INSERT INTO
-        schedules(path, schedule, subvol, rel_path, active)
-        Values(?, ?, ?, ?, ?);'''
+        schedules(path, subvol, rel_path, active)
+        Values(?, ?, ?, ?);'''
     INSERT_SCHEDULE_META = '''INSERT INTO
-        schedules_meta(schedule_id, start, repeat, retention)
-        Values(last_insert_rowid(), ?, ?, ?)'''
+        schedules_meta(schedule_id, start, repeat, schedule, retention)
+        Values(last_insert_rowid(), ?, ?, ?, ?)'''
 
     @updates_schedule_db
     def store_snap_schedule(self, fs, sched):
         db = self.get_schedule_db(fs)
         with db:
-            db.execute(self.INSERT_SCHEDULE,
-                       (sched.path,
-                        sched.schedule,
-                        sched.subvol,
-                        sched.rel_path,
-                        1))
+            try:
+                db.execute(self.INSERT_SCHEDULE,
+                           (sched.path,
+                            sched.subvol,
+                            sched.rel_path,
+                            1))
+            except sqlite3.IntegrityError:
+                # might be adding another schedule
+                pass
             db.execute(self.INSERT_SCHEDULE_META,
                        (f'strftime("%s", "{sched.first_run}")',
                         sched.repeat_in_s(),
+                        sched.schedule,
                         sched.retention))
             self.store_schedule_db(sched.fs)
 
-    GET_SCHEULE = '''SELECT
-        s.path, s.schedule, sm.retention, sm.start, s.subvol, s.rel_path
-        FROM schedules s
-            INNER JOIN schedules_meta sm ON sm.schedule_id = s.id
-        WHERE s.path = ?'''
-
-    def get_snap_schedule(self, fs, path):
-        db = self.get_schedule_db(fs)
-        c = db.execute(self.GET_SCHEDULE, (path,))
-        return Schedule.from_db_row(c.fetchone(), fs)
-
     @updates_schedule_db
     def rm_snap_schedule(self, fs, path):
         db = self.get_schedule_db(fs)
@@ -275,4 +374,6 @@ class SnapSchedClient(CephfsClient):
             cur = db.execute('SELECT id FROM SCHEDULES WHERE path = ?',
                              (path,))
             id_ = cur.fetchone()
-            db.execute('DELETE FROM schedules WHERE id = ?;', (id_,))
+            # TODO check for repeat-start pair and delete that if present, all
+            # otherwise. If repeat-start was speced, delete only those
+            db.execute('DELETE FROM schedules WHERE id = ?;', id_)
index 4f685b798f2eaceeaaf105fe370a6d42462a2436..89c78c3ef9759362ca6158b180c0fe34942b4e7e 100644 (file)
@@ -47,33 +47,33 @@ class Module(MgrModule):
         self._initialized.wait()
         return -errno.EINVAL, "", "Unknown command"
 
-    @CLIReadCommand('fs snap-schedule ls',
+    @CLIReadCommand('fs snap-schedule dump',
                     'name=path,type=CephString,req=false '
                     'name=subvol,type=CephString,req=false '
                     'name=fs,type=CephString,req=false',
                     'List current snapshot schedules')
-    def snap_schedule_ls(self, path='/', subvol=None, fs=None):
+    def snap_schedule_dump(self, path='/', subvol=None, fs=None):
         use_fs = fs if fs else self.default_fs
         try:
-            ret_scheds = self.client.list_snap_schedule(use_fs, path)
+            ret_scheds = self.client.dump_snap_schedule(use_fs, path)
         except CephfsConnectionException as e:
             return e.to_tuple()
         return 0, ' '.join(str(ret_scheds)), ''
 
-    @CLIReadCommand('fs snap-schedule get',
+    @CLIReadCommand('fs snap-schedule list',
                     'name=path,type=CephString '
                     'name=subvol,type=CephString,req=false '
                     'name=fs,type=CephString,req=false',
                     'Get current snapshot schedule for <path>')
-    def snap_schedule_get(self, path, subvol=None, fs=None):
+    def snap_schedule_list(self, path, subvol=None, fs=None):
         try:
             use_fs = fs if fs else self.default_fs
-            sched = self.client.get_snap_schedule(use_fs, path)
+            scheds = self.client.list_snap_schedules(use_fs, path)
         except CephfsConnectionException as e:
             return e.to_tuple()
-        if not sched:
+        if not scheds:
             return -1, '', f'SnapSchedule for {path} not found'
-        return 0, str(sched), ''
+        return 0, str([str(sched) for sched in scheds]), ''
 
     @CLIWriteCommand('fs snap-schedule set',
                      'name=path,type=CephString '
@@ -87,7 +87,7 @@ class Module(MgrModule):
                           path,
                           snap_schedule,
                           retention_policy='',
-                          start='now',
+                          start='00:00',
                           fs=None,
                           subvol=None):
         try:
@@ -100,15 +100,15 @@ class Module(MgrModule):
             self.client.store_snap_schedule(use_fs, sched)
             suc_msg = f'Schedule set for path {path}'
         except sqlite3.IntegrityError:
-            existing_sched = self.client.get_snap_schedule(use_fs, path)
-            self.log.info(f'Found existing schedule {existing_sched}...updating')
-            self.client.update_snap_schedule(use_fs, sched)
-            suc_msg = f'Schedule set for path {path}, updated existing schedule {existing_sched}'
+            existing_sched = self.client.list_snap_schedule(use_fs, path)
+            error_msg = f'Found existing schedule {existing_sched}'
+            self.log.error(error_msg)
+            return 1, '', error_msg
         except CephfsConnectionException as e:
             return e.to_tuple()
         return 0, suc_msg, ''
 
-    @CLIWriteCommand('fs snap-schedule rm',
+    @CLIWriteCommand('fs snap-schedule remove',
                      'name=path,type=CephString '
                      'name=subvol,type=CephString,req=false '
                      'name=fs,type=CephString,req=false',