]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
pybind/mgr/snap_schedule: take snapshots according to schedule.
authorJan Fajerski <jfajerski@suse.com>
Tue, 19 Nov 2019 12:35:20 +0000 (13:35 +0100)
committerJan Fajerski <jfajerski@suse.com>
Thu, 27 Aug 2020 13:55:45 +0000 (15:55 +0200)
Timers are scheduled for the next snapshot to be taken. The next Timer
is canceled in case the snapshot DB is changed. When a Timer triggers it
also schedules the next Timer.

Signed-off-by: Jan Fajerski <jfajerski@suse.com>
src/pybind/mgr/snap_schedule/fs/schedule.py
src/pybind/mgr/snap_schedule/module.py
src/pybind/mgr/snap_schedule/tests/fs/test_schedule.py

index d0e84f8523544bb95d6e1367539e4c1e81b28976..970ede4b2bb2412c6567090f484bfed159a3dc6a 100644 (file)
@@ -9,6 +9,7 @@ import rados
 from contextlib import contextmanager
 from mgr_util import CephfsClient, CephfsConnectionException, connection_pool_wrap
 from datetime import datetime, timedelta
+from threading import Timer
 import sqlite3
 
 
@@ -32,6 +33,13 @@ def open_ioctx(self, pool):
         raise
 
 
+def updates_schedule_db(func):
+    def f(self, fs, *args):
+        func(self, fs, *args)
+        self.refresh_snap_timers(fs)
+    return f
+
+
 class Schedule(object):
     '''
     Wrapper to work with schedules stored in Rados objects
@@ -40,24 +48,32 @@ class Schedule(object):
                  path,
                  schedule,
                  retention_policy,
+                 start,
                  fs_name,
                  subvol,
-                 first_run=None,
+                 rel_path,
                  ):
         self.fs = fs_name
         self.subvol = subvol
         self.path = path
+        self.rel_path = rel_path
         self.schedule = schedule
         self.retention = retention_policy
-        self.first_run = None
+        self.first_run = start
         self.last_run = None
 
     def __str__(self):
-        return f'''{self.path}: {self.schedule}; {self.retention}'''
+        return f'''{self.rel_path}: {self.schedule}; {self.retention}'''
 
     @classmethod
     def from_db_row(cls, table_row, fs):
-        return cls(table_row[0], table_row[1], table_row[2], fs, None)
+        return cls(table_row[0],
+                   table_row[1],
+                   table_row[2],
+                   fs,
+                   table_row[3],
+                   table_row[4],
+                   None)
 
     def repeat_in_s(self):
         mult = self.schedule[-1]
@@ -76,29 +92,56 @@ class Schedule(object):
 
 class SnapSchedClient(CephfsClient):
 
-    CREATE_TABLES = '''CREATE TABLE SCHEDULES(
-        id INTEGER PRIMARY KEY ASC,
+    CREATE_TABLES = '''CREATE TABLE schedules(
+        id integer PRIMARY KEY ASC,
         path text NOT NULL UNIQUE,
         schedule text NOT NULL,
-        retention text
+        subvol text,
+        rel_path text NOT NULL,
+        active int NOT NULL
     );
-    CREATE TABLE SCHEDULES_META(
-        id INTEGER PRIMARY KEY ASC,
+    CREATE TABLE schedules_meta(
+        id integer PRIMARY KEY ASC,
         schedule_id int,
         start bigint NOT NULL,
         repeat bigint NOT NULL,
+        retention text,
         FOREIGN KEY(schedule_id) REFERENCES schedules(id) ON DELETE CASCADE
     );'''
-    INSERT_SCHEDULE = '''INSERT INTO schedules(path, schedule, retention) Values(?, ?, ?);'''
-    INSERT_SCHEDULE_META = ''' INSERT INTO schedules_meta(schedule_id, start, repeat) Values(last_insert_rowid(), ?, ?)'''
-    UPDATE_SCHEDULE = '''UPDATE schedules SET schedule = ?, retention = ? where path = ?;'''
-    UPDATE_SCHEDULE_META = '''UPDATE schedules_meta SET start = ?, repeat = ? where schedule_id = (SELECT id FROM SCHEDULES WHERE path = ?);'''
-    DELETE_SCHEDULE = '''DELETE FROM schedules WHERE id = ?;'''
-    exec_query = '''select s.id, s.path, sm.repeat - ((strftime("%s", "now") - sm.start) % sm.repeat) "until" from schedules s inner join schedules_meta sm on sm.event_id = s.id order by until;'''
 
     def __init__(self, mgr):
         super(SnapSchedClient, self).__init__(mgr)
+        # TODO maybe iterate over all fs instance in fsmap and load snap dbs?
         self.sqlite_connections = {}
+        self.active_timers = {}
+
+    EXEC_QUERY = '''SELECT
+        s.path, sm.retention,
+        sm.repeat - (strftime("%s", "now") - sm.start) % sm.repeat "until"
+        FROM schedules s
+            INNER JOIN schedules_meta sm ON sm.schedule_id = s.id
+        ORDER BY until;'''
+
+    def refresh_snap_timers(self, fs):
+        self.log.debug(f'SnapDB on {fs} changed, updating next Timer')
+        db = self.get_schedule_db(fs)
+        rows = []
+        with db:
+            cur = db.execute(self.EXEC_QUERY)
+            rows = cur.fetchmany(1)
+        timers = self.active_timers.get(fs, [])
+        for timer in timers:
+            timer.cancel()
+        timers = []
+        for row in rows:
+            self.log.debug(f'Creating new snapshot timer')
+            t = Timer(row[2],
+                      self.create_scheduled_snapshot,
+                      args=[fs, row[0], row[1]])
+            t.start()
+            timers.append(t)
+            self.log.debug(f'Will snapshot {row[0]} in fs {fs} in {row[2]}s')
+        self.active_timers[fs] = timers
 
     def get_schedule_db(self, fs):
         if fs not in self.sqlite_connections:
@@ -112,7 +155,7 @@ class SnapSchedClient(CephfsClient):
                         db = ioctx.read('SNAP_DB_OBJECT_NAME', size).decode('utf-8')
                         con.executescript(db)
                     except rados.ObjectNotFound:
-                        self.log.info('No schedule DB found in {}'.format(fs))
+                        self.log.info(f'No schedule DB found in {fs}')
                         con.executescript(self.CREATE_TABLES)
         return self.sqlite_connections[fs]
 
@@ -125,18 +168,34 @@ class SnapSchedClient(CephfsClient):
         if fs in self.sqlite_connections:
             db_content = []
             # TODO maybe do this in a transaction?
-            for row in self.sqlite_connections[fs].iterdump():
-                db_content.append(row)
+            db = self.sqlite_connections[fs]
+            with db:
+                for row in db.iterdump():
+                    db_content.append(row)
         with open_ioctx(self, metadata_pool) as ioctx:
             ioctx.write_full("SNAP_DB_OBJECT_NAME",
                              '\n'.join(db_content).encode('utf-8'))
 
     @connection_pool_wrap
-    def validate_schedule(self, fs_handle, **kwargs):
+    def create_scheduled_snapshot(self, fs_info, path, retention):
+        self.log.debug(f'Scheduled snapshot of {path} triggered')
+        try:
+            time = datetime.datetime.utcnow().strftime('%Y-%m-%d-%H_%M_%S')
+            fs_info[1].mkdir(f'{path}/.snap/scheduled-{time}', 0o755)
+            self.log.info(f'created scheduled snapshot of {path}')
+        except cephfs.Error as e:
+            self.log.info(f'scheduled snapshot creating of {path} failed')
+            raise CephfsConnectionException(-e.args[0], e.args[1])
+        finally:
+            self.refresh_snap_timers(fs_info[0])
+        # TODO: handle snap pruning accoring to retention
+
+    @connection_pool_wrap
+    def validate_schedule(self, fs_handle, sched):
         try:
-            fs_handle.stat(kwargs['sched'].path)
+            fs_handle.stat(sched.path)
         except cephfs.ObjectNotFound:
-            self.log.error('Path {} not found'.format(kwargs['sched'].path))
+            self.log.error('Path {} not found'.format(sched.path))
             return False
         return True
 
@@ -149,39 +208,71 @@ class SnapSchedClient(CephfsClient):
             scheds.append(row)
         return scheds
 
+    UPDATE_SCHEDULE = '''UPDATE schedules
+        SET
+            schedule = ?,
+            active = 1
+        WHERE path = ?;'''
+    UPDATE_SCHEDULE_META = '''UPDATE schedules_meta
+        SET
+            start = ?,
+            repeat = ?,
+            retention = ?
+        WHERE schedule_id = (
+            SELECT id FROM SCHEDULES WHERE path = ?);'''
+
+    @updates_schedule_db
     def update_snap_schedule(self, fs, sched):
         db = self.get_schedule_db(fs)
         with db:
             db.execute(self.UPDATE_SCHEDULE,
                        (sched.schedule,
-                        sched.retention,
                         sched.path))
             db.execute(self.UPDATE_SCHEDULE_META,
-                       ('strftime("%s", "now")',
+                       (f'strftime("%s", "{sched.first_run}")',
                         sched.repeat_in_s(),
+                        sched.retention,
                         sched.path))
 
+    INSERT_SCHEDULE = '''INSERT INTO
+        schedules(path, schedule, subvol, rel_path, active)
+        Values(?, ?, ?, ?, ?);'''
+    INSERT_SCHEDULE_META = '''INSERT INTO
+        schedules_meta(schedule_id, start, repeat, retention)
+        Values(last_insert_rowid(), ?, ?, ?)'''
+
+    @updates_schedule_db
     def store_snap_schedule(self, fs, sched):
         db = self.get_schedule_db(fs)
         with db:
             db.execute(self.INSERT_SCHEDULE,
                        (sched.path,
                         sched.schedule,
-                        sched.retention))
+                        sched.subvol,
+                        sched.rel_path,
+                        1))
             db.execute(self.INSERT_SCHEDULE_META,
-                       ('strftime("%s", "now")',
-                        sched.repeat_in_s()))
+                       (f'strftime("%s", "{sched.first_run}")',
+                        sched.repeat_in_s(),
+                        sched.retention))
             self.store_schedule_db(sched.fs)
 
+    GET_SCHEULE = '''SELECT
+        s.path, s.schedule, sm.retention, sm.start, s.subvol, s.rel_path
+        FROM schedules s
+            INNER JOIN schedules_meta sm ON sm.schedule_id = s.id
+        WHERE s.path = ?'''
+
     def get_snap_schedule(self, fs, path):
         db = self.get_schedule_db(fs)
-        c = db.execute('SELECT path, schedule, retention FROM SCHEDULES WHERE path = ?', (path,))
+        c = db.execute(self.GET_SCHEDULE, (path,))
         return Schedule.from_db_row(c.fetchone(), fs)
 
+    @updates_schedule_db
     def rm_snap_schedule(self, fs, path):
         db = self.get_schedule_db(fs)
         with db:
             cur = db.execute('SELECT id FROM SCHEDULES WHERE path = ?',
                              (path,))
             id_ = cur.fetchone()
-            db.execute(self.DELETE_SCHEDULE, id_)
+            db.execute('DELETE FROM schedules WHERE id = ?;', (id_,))
index ac092231bef954fe305071c853667b836590a65c..4f685b798f2eaceeaaf105fe370a6d42462a2436 100644 (file)
@@ -10,10 +10,6 @@ from mgr_module import MgrModule, CLIReadCommand, CLIWriteCommand
 from mgr_util import CephfsConnectionException
 from rados import ObjectNotFound
 from threading import Event
-try:
-    import queue as Queue
-except ImportError:
-    import Queue
 
 
 class Module(MgrModule):
@@ -23,7 +19,16 @@ class Module(MgrModule):
         self._initialized = Event()
         self.client = SnapSchedClient(self)
 
-        self._background_jobs = Queue.Queue()
+    def resolve_subvolume_path(self, fs, subvol, path):
+        if not subvol:
+            return path
+
+        rc, subvol_path, err = self.remote('fs', 'subvolume', 'getpath',
+                                           fs, subvol)
+        if rc != 0:
+            # TODO custom exception
+            raise Exception(f'Could not resolve {path} in {fs}, {subvol}')
+        return subvol_path + path
 
     @property
     def default_fs(self):
@@ -70,19 +75,11 @@ class Module(MgrModule):
             return -1, '', f'SnapSchedule for {path} not found'
         return 0, str(sched), ''
 
-    prune_schedule_options = ('name=keep-minutely,type=CephString,req=false '
-                              'name=keep-hourly,type=CephString,req=false '
-                              'name=keep-daily,type=CephString,req=false '
-                              'name=keep-weekly,type=CephString,req=false '
-                              'name=keep-monthly,type=CephString,req=false '
-                              'name=keep-yearly,type=CephString,req=false '
-                              'name=keep-last,type=CephString,req=false '
-                              'name=keep-within,type=CephString,req=false')
-
     @CLIWriteCommand('fs snap-schedule set',
                      'name=path,type=CephString '
                      'name=snap-schedule,type=CephString '
                      'name=retention-policy,type=CephString,req=false '
+                     'name=start,type=CephString,req=false '
                      'name=fs,type=CephString,req=false '
                      'name=subvol,type=CephString,req=false',
                      'Set a snapshot schedule for <path>')
@@ -90,15 +87,16 @@ class Module(MgrModule):
                           path,
                           snap_schedule,
                           retention_policy='',
+                          start='now',
                           fs=None,
                           subvol=None):
         try:
             use_fs = fs if fs else self.default_fs
-            # TODO should we allow empty retention policies?
-            sched = Schedule(path, snap_schedule, retention_policy,
-                             use_fs, subvol)
+            abs_path = self.resolve_subvolume_path(fs, subvol, path)
+            sched = Schedule(abs_path, snap_schedule, retention_policy,
+                             start, use_fs, subvol, path)
             # TODO allow schedules on non-existent paths?
-            # self.client.validate_schedule(None, sched=sched, fs_name=fs)
+            # self.client.validate_schedule(fs, sched)
             self.client.store_snap_schedule(use_fs, sched)
             suc_msg = f'Schedule set for path {path}'
         except sqlite3.IntegrityError:
index 5b08962366e2d1a91e5b94532c4e9199bb204d25..156f9c49e45f27290cb49212868f1c48307448ca 100644 (file)
@@ -3,67 +3,3 @@ from pathlib import PurePosixPath
 from fs import schedule
 
 
-class TestScheduleIndex(object):
-
-    in_parameters = [
-        (['/'], ['/'], []),
-        (['/a'], ['/a'], ['/']),
-        (['/a'], ['/a'], ['/', '/bar']),
-        (['/a/b'], ['/a/b'], ['/', '/a']),
-        (['/a/b', '/'], ['/a/b', '/'], ['/a']),
-        (['/a/b/c', '/a/d'], ['/a/b/c', '/a/d'], ['/a', '/', 'a/b']),
-        (['/a/b', '/a/b/c/d'], ['/a/b', '/a/b/c/d'], ['/a', '/', 'a/b/c']),
-        (['/' + '/'.join(['x' for _ in range(1000)])],
-         ['/' + '/'.join(['x' for _ in range(1000)])], ['/a']),
-    ]
-
-    def test_index_ctor(self):
-        s = schedule.ScheduleIndex()
-        assert '/' in s.root
-
-    @pytest.mark.parametrize("add,in_,not_in", in_parameters)
-    def test_add_in(self, add, in_, not_in):
-        if len(add[0]) > 999:
-            pytest.xfail("will reach max recursion depth, deal with tail recursion to get this fixed")
-        s = schedule.ScheduleIndex()
-        [s.add(p) for p in add]
-        for p in in_:
-            assert p in s, '{} not found in schedule'.format(p)
-        for p in not_in:
-            assert p not in s, '{} found in schedule'.format(p)
-
-    descend_parameters = [
-        (['/'], '/', '/', []),
-        (['/a'], '/a', 'a', ['/b']),
-        (['/a/b'], '/a/b', 'b', ['/a/a']),
-        (['/a/b', '/a/b/c'], '/a/b', 'b', ['/a/a']),
-    ]
-
-    @pytest.mark.parametrize("add,descend,pos,not_in", descend_parameters)
-    def test_descend(self, add, descend, pos, not_in):
-        # TODO improve this
-        s = schedule.ScheduleIndex()
-        [s.add(p) for p in add]
-        prefix, result = s._descend(PurePosixPath(descend).parts, s.root)
-        assert pos in result, '{} not in {}'.format(pos, result)
-        for p in not_in:
-            result = s._descend(PurePosixPath(p).parts, s.root)
-            assert result == (False, False)
-
-    def test_gather_subdirs(self):
-        s = schedule.ScheduleIndex()
-        s.add('/a/b')
-        s.add('/a/b/c/d')
-        ret = s._gather_subdirs(s.root['/'].children,
-                                [],
-                                PurePosixPath('/'))
-        assert ret == [PurePosixPath('/a/b'), PurePosixPath('/a/b/c/d')]
-
-    def test_get(self):
-        s = schedule.ScheduleIndex()
-        s.add('/a/b')
-        s.add('/a/b/c/d')
-        ret = s.get('/')
-        assert ret == ['/a/b', '/a/b/c/d']
-        ret = s.get('/a/b/c/')
-        assert ret == ['/a/b/c/d']