From 39cbd137858251f9f79f245bd37fffeba907e199 Mon Sep 17 00:00:00 2001
From: Jan Fajerski
Date: Tue, 19 Nov 2019 13:35:20 +0100
Subject: [PATCH] pybind/mgr/snap_schedule: take snapshots according to
 schedule.

A Timer is scheduled for the next snapshot that is due. Pending Timers
are canceled whenever the snapshot DB changes. When a Timer fires, it
takes the snapshot and schedules the next Timer.

Signed-off-by: Jan Fajerski
---
 src/pybind/mgr/snap_schedule/fs/schedule.py   | 147 ++++++++++++++----
 src/pybind/mgr/snap_schedule/module.py        |  34 ++--
 .../snap_schedule/tests/fs/test_schedule.py   |  64 --------
 3 files changed, 135 insertions(+), 110 deletions(-)

diff --git a/src/pybind/mgr/snap_schedule/fs/schedule.py b/src/pybind/mgr/snap_schedule/fs/schedule.py
index d0e84f8523544..970ede4b2bb24 100644
--- a/src/pybind/mgr/snap_schedule/fs/schedule.py
+++ b/src/pybind/mgr/snap_schedule/fs/schedule.py
@@ -9,6 +9,7 @@ import rados
 from contextlib import contextmanager
 from mgr_util import CephfsClient, CephfsConnectionException, connection_pool_wrap
 from datetime import datetime, timedelta
+from threading import Timer
 
 import sqlite3
 
@@ -32,6 +33,13 @@ def open_ioctx(self, pool):
         raise
 
 
+def updates_schedule_db(func):
+    def f(self, fs, *args):
+        func(self, fs, *args)
+        self.refresh_snap_timers(fs)
+    return f
+
+
 class Schedule(object):
     '''
     Wrapper to work with schedules stored in Rados objects
@@ -40,24 +48,32 @@ class Schedule(object):
                  path,
                  schedule,
                  retention_policy,
+                 start,
                  fs_name,
                  subvol,
-                 first_run=None,
+                 rel_path,
                  ):
         self.fs = fs_name
         self.subvol = subvol
         self.path = path
+        self.rel_path = rel_path
         self.schedule = schedule
         self.retention = retention_policy
-        self.first_run = None
+        self.first_run = start
         self.last_run = None
 
     def __str__(self):
-        return f'''{self.path}: {self.schedule}; {self.retention}'''
+        return f'''{self.rel_path}: {self.schedule}; {self.retention}'''
 
     @classmethod
     def from_db_row(cls, table_row, fs):
-        return cls(table_row[0], table_row[1], table_row[2], fs, None)
+        return cls(table_row[0],
+                   table_row[1],
+                   table_row[2],
+                   table_row[3],
+                   fs,
+                   table_row[4],
+                   table_row[5])
 
     def repeat_in_s(self):
         mult = self.schedule[-1]
@@ -76,29 +92,56 @@ class Schedule(object):
 
 class SnapSchedClient(CephfsClient):
 
-    CREATE_TABLES = '''CREATE TABLE SCHEDULES(
-        id INTEGER PRIMARY KEY ASC,
+    CREATE_TABLES = '''CREATE TABLE schedules(
+        id integer PRIMARY KEY ASC,
         path text NOT NULL UNIQUE,
         schedule text NOT NULL,
-        retention text
+        subvol text,
+        rel_path text NOT NULL,
+        active int NOT NULL
         );
-        CREATE TABLE SCHEDULES_META(
-        id INTEGER PRIMARY KEY ASC,
+        CREATE TABLE schedules_meta(
+        id integer PRIMARY KEY ASC,
         schedule_id int,
         start bigint NOT NULL,
         repeat bigint NOT NULL,
+        retention text,
         FOREIGN KEY(schedule_id) REFERENCES schedules(id) ON DELETE CASCADE
         );'''
 
-    INSERT_SCHEDULE = '''INSERT INTO schedules(path, schedule, retention) Values(?, ?, ?);'''
-    INSERT_SCHEDULE_META = ''' INSERT INTO schedules_meta(schedule_id, start, repeat) Values(last_insert_rowid(), ?, ?)'''
-    UPDATE_SCHEDULE = '''UPDATE schedules SET schedule = ?, retention = ? where path = ?;'''
-    UPDATE_SCHEDULE_META = '''UPDATE schedules_meta SET start = ?, repeat = ?
-        where schedule_id = (SELECT id FROM SCHEDULES WHERE path = ?);'''
-    DELETE_SCHEDULE = '''DELETE FROM schedules WHERE id = ?;'''
-    exec_query = '''select s.id, s.path, sm.repeat - ((strftime("%s", "now") - sm.start) % sm.repeat) "until" from schedules s inner join schedules_meta sm on sm.event_id = s.id order by until;'''
 
     def __init__(self, mgr):
         super(SnapSchedClient, self).__init__(mgr)
+        # TODO maybe iterate over all fs instances in fsmap and load snap dbs?
         self.sqlite_connections = {}
+        self.active_timers = {}
+
+    EXEC_QUERY = '''SELECT
+        s.path, sm.retention,
+        sm.repeat - (strftime("%s", "now") - sm.start) % sm.repeat "until"
+        FROM schedules s
+        INNER JOIN schedules_meta sm ON sm.schedule_id = s.id
+        ORDER BY until;'''
+
+    def refresh_snap_timers(self, fs):
+        self.log.debug(f'SnapDB on {fs} changed, updating next Timer')
+        db = self.get_schedule_db(fs)
+        rows = []
+        with db:
+            cur = db.execute(self.EXEC_QUERY)
+            rows = cur.fetchmany(1)
+        timers = self.active_timers.get(fs, [])
+        for timer in timers:
+            timer.cancel()
+        timers = []
+        for row in rows:
+            self.log.debug('Creating new snapshot timer')
+            t = Timer(row[2],
+                      self.create_scheduled_snapshot,
+                      args=[fs, row[0], row[1]])
+            t.start()
+            timers.append(t)
+            self.log.debug(f'Will snapshot {row[0]} in fs {fs} in {row[2]}s')
+        self.active_timers[fs] = timers
 
     def get_schedule_db(self, fs):
         if fs not in self.sqlite_connections:
@@ -112,7 +155,7 @@ class SnapSchedClient(CephfsClient):
                         db = ioctx.read('SNAP_DB_OBJECT_NAME', size).decode('utf-8')
                         con.executescript(db)
                     except rados.ObjectNotFound:
-                        self.log.info('No schedule DB found in {}'.format(fs))
+                        self.log.info(f'No schedule DB found in {fs}')
                         con.executescript(self.CREATE_TABLES)
         return self.sqlite_connections[fs]
 
@@ -125,18 +168,34 @@ class SnapSchedClient(CephfsClient):
         if fs in self.sqlite_connections:
             db_content = []
             # TODO maybe do this in a transaction?
-            for row in self.sqlite_connections[fs].iterdump():
-                db_content.append(row)
+            db = self.sqlite_connections[fs]
+            with db:
+                for row in db.iterdump():
+                    db_content.append(row)
         with open_ioctx(self, metadata_pool) as ioctx:
             ioctx.write_full("SNAP_DB_OBJECT_NAME",
                              '\n'.join(db_content).encode('utf-8'))
 
     @connection_pool_wrap
-    def validate_schedule(self, fs_handle, **kwargs):
+    def create_scheduled_snapshot(self, fs_info, path, retention):
+        self.log.debug(f'Scheduled snapshot of {path} triggered')
+        try:
+            time = datetime.utcnow().strftime('%Y-%m-%d-%H_%M_%S')
+            fs_info[1].mkdir(f'{path}/.snap/scheduled-{time}', 0o755)
+            self.log.info(f'created scheduled snapshot of {path}')
+        except cephfs.Error as e:
+            self.log.info(f'scheduled snapshot creation of {path} failed')
+            raise CephfsConnectionException(-e.args[0], e.args[1])
+        finally:
+            self.refresh_snap_timers(fs_info[0])
+        # TODO: handle snap pruning according to retention
+
+    @connection_pool_wrap
+    def validate_schedule(self, fs_handle, sched):
         try:
-            fs_handle.stat(kwargs['sched'].path)
+            fs_handle.stat(sched.path)
         except cephfs.ObjectNotFound:
-            self.log.error('Path {} not found'.format(kwargs['sched'].path))
+            self.log.error('Path {} not found'.format(sched.path))
             return False
         return True
 
@@ -149,39 +208,71 @@ class SnapSchedClient(CephfsClient):
                 scheds.append(row)
         return scheds
 
+    UPDATE_SCHEDULE = '''UPDATE schedules
+        SET
+            schedule = ?,
+            active = 1
+        WHERE path = ?;'''
+    UPDATE_SCHEDULE_META = '''UPDATE schedules_meta
+        SET
+            start = ?,
+            repeat = ?,
+            retention = ?
+        WHERE schedule_id = (
+            SELECT id FROM SCHEDULES WHERE path = ?);'''
+
+    @updates_schedule_db
     def update_snap_schedule(self, fs, sched):
         db = self.get_schedule_db(fs)
         with db:
             db.execute(self.UPDATE_SCHEDULE,
                        (sched.schedule,
-                        sched.retention,
                         sched.path))
             db.execute(self.UPDATE_SCHEDULE_META,
-                       ('strftime("%s", "now")',
+                       (f'strftime("%s", "{sched.first_run}")',
                         sched.repeat_in_s(),
+                        sched.retention,
                         sched.path))
 
+    INSERT_SCHEDULE = '''INSERT INTO
+        schedules(path, schedule, subvol, rel_path, active)
+        Values(?, ?, ?, ?, ?);'''
+    INSERT_SCHEDULE_META = '''INSERT INTO
+        schedules_meta(schedule_id, start, repeat, retention)
+        Values(last_insert_rowid(), ?, ?, ?)'''
+
+    @updates_schedule_db
     def store_snap_schedule(self, fs, sched):
         db = self.get_schedule_db(fs)
         with db:
             db.execute(self.INSERT_SCHEDULE,
                        (sched.path,
                         sched.schedule,
-                        sched.retention))
+                        sched.subvol,
+                        sched.rel_path,
+                        1))
             db.execute(self.INSERT_SCHEDULE_META,
-                       ('strftime("%s", "now")',
-                        sched.repeat_in_s()))
+                       (f'strftime("%s", "{sched.first_run}")',
+                        sched.repeat_in_s(),
+                        sched.retention))
             self.store_schedule_db(sched.fs)
 
+    GET_SCHEDULE = '''SELECT
+        s.path, s.schedule, sm.retention, sm.start, s.subvol, s.rel_path
+        FROM schedules s
+        INNER JOIN schedules_meta sm ON sm.schedule_id = s.id
+        WHERE s.path = ?'''
+
     def get_snap_schedule(self, fs, path):
         db = self.get_schedule_db(fs)
-        c = db.execute('SELECT path, schedule, retention FROM SCHEDULES WHERE path = ?', (path,))
+        c = db.execute(self.GET_SCHEDULE, (path,))
         return Schedule.from_db_row(c.fetchone(), fs)
 
+    @updates_schedule_db
     def rm_snap_schedule(self, fs, path):
         db = self.get_schedule_db(fs)
         with db:
             cur = db.execute('SELECT id FROM SCHEDULES WHERE path = ?', (path,))
             id_ = cur.fetchone()
-            db.execute(self.DELETE_SCHEDULE, id_)
+            db.execute('DELETE FROM schedules WHERE id = ?;', id_)
diff --git a/src/pybind/mgr/snap_schedule/module.py b/src/pybind/mgr/snap_schedule/module.py
index ac092231bef95..4f685b798f2ea 100644
--- a/src/pybind/mgr/snap_schedule/module.py
+++ b/src/pybind/mgr/snap_schedule/module.py
@@ -10,10 +10,6 @@ from mgr_module import MgrModule, CLIReadCommand, CLIWriteCommand
 from mgr_util import CephfsConnectionException
 from rados import ObjectNotFound
 from threading import Event
-try:
-    import queue as Queue
-except ImportError:
-    import Queue
 
 
 class Module(MgrModule):
@@ -23,7 +19,16 @@ class Module(MgrModule):
         self._initialized = Event()
         self.client = SnapSchedClient(self)
-        self._background_jobs = Queue.Queue()
+
+    def resolve_subvolume_path(self, fs, subvol, path):
+        if not subvol:
+            return path
+
+        rc, subvol_path, err = self.remote('fs', 'subvolume', 'getpath',
+                                           fs, subvol)
+        if rc != 0:
+            # TODO custom exception
+            raise Exception(f'Could not resolve {path} in {fs}, {subvol}')
+        return subvol_path + path
 
     @property
     def default_fs(self):
@@ -70,19 +75,11 @@ class Module(MgrModule):
             return -1, '', f'SnapSchedule for {path} not found'
         return 0, str(sched), ''
 
-    prune_schedule_options = ('name=keep-minutely,type=CephString,req=false '
-                              'name=keep-hourly,type=CephString,req=false '
-                              'name=keep-daily,type=CephString,req=false '
-                              'name=keep-weekly,type=CephString,req=false '
-                              'name=keep-monthly,type=CephString,req=false '
-                              'name=keep-yearly,type=CephString,req=false '
-                              'name=keep-last,type=CephString,req=false '
-                              'name=keep-within,type=CephString,req=false')
-
     @CLIWriteCommand('fs snap-schedule set',
                      'name=path,type=CephString '
                      'name=snap-schedule,type=CephString '
                      'name=retention-policy,type=CephString,req=false '
+                     'name=start,type=CephString,req=false '
                      'name=fs,type=CephString,req=false '
                      'name=subvol,type=CephString,req=false',
                      'Set a snapshot schedule for <path>')
     def snap_schedule_set(self,
                           path,
                           snap_schedule,
                           retention_policy='',
+                          start='now',
                           fs=None,
                           subvol=None):
         try:
             use_fs = fs if fs else self.default_fs
-            # TODO should we allow empty retention policies?
-            sched = Schedule(path, snap_schedule, retention_policy,
-                             use_fs, subvol)
+            abs_path = self.resolve_subvolume_path(use_fs, subvol, path)
+            sched = Schedule(abs_path, snap_schedule, retention_policy,
+                             start, use_fs, subvol, path)
             # TODO allow schedules on non-existent paths?
-            # self.client.validate_schedule(None, sched=sched, fs_name=fs)
+            # self.client.validate_schedule(fs, sched)
             self.client.store_snap_schedule(use_fs, sched)
             suc_msg = f'Schedule set for path {path}'
         except sqlite3.IntegrityError:
diff --git a/src/pybind/mgr/snap_schedule/tests/fs/test_schedule.py b/src/pybind/mgr/snap_schedule/tests/fs/test_schedule.py
index 5b08962366e2d..156f9c49e45f2 100644
--- a/src/pybind/mgr/snap_schedule/tests/fs/test_schedule.py
+++ b/src/pybind/mgr/snap_schedule/tests/fs/test_schedule.py
@@ -3,67 +3,3 @@
 from pathlib import PurePosixPath
 
 from fs import schedule
-
-
-class TestScheduleIndex(object):
-
-    in_parameters = [
-        (['/'], ['/'], []),
-        (['/a'], ['/a'], ['/']),
-        (['/a'], ['/a'], ['/', '/bar']),
-        (['/a/b'], ['/a/b'], ['/', '/a']),
-        (['/a/b', '/'], ['/a/b', '/'], ['/a']),
-        (['/a/b/c', '/a/d'], ['/a/b/c', '/a/d'], ['/a', '/', 'a/b']),
-        (['/a/b', '/a/b/c/d'], ['/a/b', '/a/b/c/d'], ['/a', '/', 'a/b/c']),
-        (['/' + '/'.join(['x' for _ in range(1000)])],
-         ['/' + '/'.join(['x' for _ in range(1000)])], ['/a']),
-    ]
-
-    def test_index_ctor(self):
-        s = schedule.ScheduleIndex()
-        assert '/' in s.root
-
-    @pytest.mark.parametrize("add,in_,not_in", in_parameters)
-    def test_add_in(self, add, in_, not_in):
-        if len(add[0]) > 999:
-            pytest.xfail("will reach max recursion depth, deal with tail recursion to get this fixed")
-        s = schedule.ScheduleIndex()
-        [s.add(p) for p in add]
-        for p in in_:
-            assert p in s, '{} not found in schedule'.format(p)
-        for p in not_in:
-            assert p not in s, '{} found in schedule'.format(p)
-
-    descend_parameters = [
-        (['/'], '/', '/', []),
-        (['/a'], '/a', 'a', ['/b']),
-        (['/a/b'], '/a/b', 'b', ['/a/a']),
-        (['/a/b', '/a/b/c'], '/a/b', 'b', ['/a/a']),
-    ]
-
-    @pytest.mark.parametrize("add,descend,pos,not_in", descend_parameters)
-    def test_descend(self, add, descend, pos, not_in):
-        # TODO improve this
-        s = schedule.ScheduleIndex()
-        [s.add(p) for p in add]
-        prefix, result = s._descend(PurePosixPath(descend).parts, s.root)
-        assert pos in result, '{} not in {}'.format(pos, result)
-        for p in not_in:
-            result = s._descend(PurePosixPath(p).parts, s.root)
-            assert result == (False, False)
-
-    def test_gather_subdirs(self):
-        s = schedule.ScheduleIndex()
-        s.add('/a/b')
-        s.add('/a/b/c/d')
-        ret = s._gather_subdirs(s.root['/'].children,
-                                [],
-                                PurePosixPath('/'))
-        assert ret == [PurePosixPath('/a/b'), PurePosixPath('/a/b/c/d')]
-
-    def test_get(self):
-        s = schedule.ScheduleIndex()
-        s.add('/a/b')
-        s.add('/a/b/c/d')
-        ret = s.get('/')
-        assert ret == ['/a/b', '/a/b/c/d']
-        ret = s.get('/a/b/c/')
-        assert ret == ['/a/b/c/d']
-- 
2.39.5
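
A note on the mechanism for reviewers: the patch keeps at most one pending
Timer per file system; every snap DB change (and every fired Timer) re-runs
EXEC_QUERY and re-arms a single Timer for whichever schedule is due next.
Below is a minimal standalone sketch of that re-arming pattern, for
illustration only and not part of the patch: it uses plain stdlib threading,
the SCHEDULES list and take_snapshot() stub are made up, and the real code
recomputes the delay from SQLite instead of a static list.

import threading
import time

# Hypothetical stand-in for EXEC_QUERY rows: (path, retention, seconds until due).
SCHEDULES = [('/data', '7d', 5.0), ('/home', '24h', 12.0)]

def take_snapshot(path, retention):
    # Placeholder for fs_handle.mkdir(f'{path}/.snap/scheduled-<ts>', 0o755).
    print(f'{time.strftime("%H:%M:%S")} snapshot of {path} (retention {retention})')

class SnapTimerLoop:
    def __init__(self):
        self.active_timer = None

    def refresh(self):
        # Cancel the pending Timer and arm a new one for the soonest schedule,
        # mirroring refresh_snap_timers() after every snap DB change.
        if self.active_timer:
            self.active_timer.cancel()
        path, retention, until = min(SCHEDULES, key=lambda row: row[2])
        self.active_timer = threading.Timer(until, self.fire, args=[path, retention])
        self.active_timer.start()

    def fire(self, path, retention):
        try:
            take_snapshot(path, retention)
        finally:
            # Re-arm unconditionally, like the finally: refresh_snap_timers(...)
            # in create_scheduled_snapshot().
            self.refresh()

loop = SnapTimerLoop()
loop.refresh()  # arm the first Timer; any schedule change would call refresh() again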