From 3f3df8786944d252f1614c7c5e75ff05fcb9f1a5 Mon Sep 17 00:00:00 2001 From: Jan Fajerski Date: Wed, 6 May 2020 15:20:32 +0200 Subject: [PATCH] snap-schedule: Implement pruning according to retention specification Signed-off-by: Jan Fajerski --- src/pybind/mgr/snap_schedule/fs/schedule.py | 243 ++++++++++++++------ src/pybind/mgr/snap_schedule/module.py | 28 +-- 2 files changed, 186 insertions(+), 85 deletions(-) diff --git a/src/pybind/mgr/snap_schedule/fs/schedule.py b/src/pybind/mgr/snap_schedule/fs/schedule.py index 970ede4b2bb24..bae2cd7d1476a 100644 --- a/src/pybind/mgr/snap_schedule/fs/schedule.py +++ b/src/pybind/mgr/snap_schedule/fs/schedule.py @@ -7,14 +7,22 @@ import cephfs import errno import rados from contextlib import contextmanager -from mgr_util import CephfsClient, CephfsConnectionException, connection_pool_wrap -from datetime import datetime, timedelta +import re +from mgr_util import CephfsClient, CephfsConnectionException, \ + open_filesystem +from collections import OrderedDict +from datetime import datetime +import logging +from operator import attrgetter, itemgetter from threading import Timer import sqlite3 SNAP_SCHEDULE_NAMESPACE = 'cephfs-snap-schedule' SNAP_DB_OBJECT_NAME = 'snap_db' +TS_FORMAT = '%Y-%m-%d-%H_%M_%S' + +log = logging.getLogger(__name__) @contextmanager @@ -29,7 +37,7 @@ def open_ioctx(self, pool): ioctx.set_namespace(SNAP_SCHEDULE_NAMESPACE) yield ioctx except rados.ObjectNotFound: - self.log.error("Failed to locate pool {}".format(pool)) + log.error("Failed to locate pool {}".format(pool)) raise @@ -59,6 +67,7 @@ class Schedule(object): self.rel_path = rel_path self.schedule = schedule self.retention = retention_policy + self._ret = {} self.first_run = start self.last_run = None @@ -90,12 +99,22 @@ class Schedule(object): raise Exception('schedule multiplier not recognized') +def parse_retention(retention): + ret = {} + matches = re.findall(r'\d+[a-z]', retention) + for m in matches: + ret[m[-1]] = int(m[0:-1]) + matches 
= re.findall(r'\d+[A-Z]', retention) + for m in matches: + ret[m[-1]] = int(m[0:-1]) + return ret + + class SnapSchedClient(CephfsClient): CREATE_TABLES = '''CREATE TABLE schedules( id integer PRIMARY KEY ASC, path text NOT NULL UNIQUE, - schedule text NOT NULL, subvol text, rel_path text NOT NULL, active int NOT NULL @@ -105,8 +124,10 @@ class SnapSchedClient(CephfsClient): schedule_id int, start bigint NOT NULL, repeat bigint NOT NULL, + schedule text NOT NULL, retention text, - FOREIGN KEY(schedule_id) REFERENCES schedules(id) ON DELETE CASCADE + FOREIGN KEY(schedule_id) REFERENCES schedules(id) ON DELETE CASCADE, + UNIQUE (start, repeat) );''' def __init__(self, mgr): @@ -123,39 +144,46 @@ class SnapSchedClient(CephfsClient): ORDER BY until;''' def refresh_snap_timers(self, fs): - self.log.debug(f'SnapDB on {fs} changed, updating next Timer') - db = self.get_schedule_db(fs) - rows = [] - with db: - cur = db.execute(self.EXEC_QUERY) - rows = cur.fetchmany(1) - timers = self.active_timers.get(fs, []) - for timer in timers: - timer.cancel() - timers = [] - for row in rows: - self.log.debug(f'Creating new snapshot timer') - t = Timer(row[2], - self.create_scheduled_snapshot, - args=[fs, row[0], row[1]]) - t.start() - timers.append(t) - self.log.debug(f'Will snapshot {row[0]} in fs {fs} in {row[2]}s') - self.active_timers[fs] = timers + try: + log.debug(f'SnapDB on {fs} changed, updating next Timer') + db = self.get_schedule_db(fs) + rows = [] + with db: + cur = db.execute(self.EXEC_QUERY) + rows = cur.fetchmany(1) + log.debug(f'retrieved {cur.rowcount} rows') + timers = self.active_timers.get(fs, []) + for timer in timers: + timer.cancel() + timers = [] + for row in rows: + log.debug(f'adding timer for {row}') + log.debug(f'Creating new snapshot timer') + t = Timer(row[2], + self.create_scheduled_snapshot, + args=[fs, row[0], row[1]]) + t.start() + timers.append(t) + log.debug(f'Will snapshot {row[0]} in fs {fs} in {row[2]}s') + self.active_timers[fs] = timers + 
except Exception as e: + log.error(f'refresh raised {e}') def get_schedule_db(self, fs): if fs not in self.sqlite_connections: - self.sqlite_connections[fs] = sqlite3.connect(':memory:') + self.sqlite_connections[fs] = sqlite3.connect(':memory:', + check_same_thread=False) with self.sqlite_connections[fs] as con: con.execute("PRAGMA FOREIGN_KEYS = 1") pool = self.get_metadata_pool(fs) with open_ioctx(self, pool) as ioctx: try: - size, _mtime = ioctx.stat('SNAP_DB_OBJECT_NAME') - db = ioctx.read('SNAP_DB_OBJECT_NAME', size).decode('utf-8') + size, _mtime = ioctx.stat(SNAP_DB_OBJECT_NAME) + db = ioctx.read(SNAP_DB_OBJECT_NAME, + size).decode('utf-8') con.executescript(db) except rados.ObjectNotFound: - self.log.info(f'No schedule DB found in {fs}') + log.info(f'No schedule DB found in {fs}') con.executescript(self.CREATE_TABLES) return self.sqlite_connections[fs] @@ -167,47 +195,124 @@ class SnapSchedClient(CephfsClient): -errno.ENOENT, "Filesystem {} does not exist".format(fs)) if fs in self.sqlite_connections: db_content = [] - # TODO maybe do this in a transaction? 
db = self.sqlite_connections[fs] with db: for row in db.iterdump(): db_content.append(row) with open_ioctx(self, metadata_pool) as ioctx: - ioctx.write_full("SNAP_DB_OBJECT_NAME", + ioctx.write_full(SNAP_DB_OBJECT_NAME, '\n'.join(db_content).encode('utf-8')) - @connection_pool_wrap - def create_scheduled_snapshot(self, fs_info, path, retention): - self.log.debug(f'Scheduled snapshot of {path} triggered') + def create_scheduled_snapshot(self, fs_name, path, retention): + log.debug(f'Scheduled snapshot of {path} triggered') try: - time = datetime.datetime.utcnow().strftime('%Y-%m-%d-%H_%M_%S') - fs_info[1].mkdir(f'{path}/.snap/scheduled-{time}', 0o755) - self.log.info(f'created scheduled snapshot of {path}') + time = datetime.utcnow().strftime(TS_FORMAT) + with open_filesystem(self, fs_name) as fs_handle: + fs_handle.mkdir(f'{path}/.snap/scheduled-{time}', 0o755) + log.info(f'created scheduled snapshot of {path}') except cephfs.Error as e: - self.log.info(f'scheduled snapshot creating of {path} failed') - raise CephfsConnectionException(-e.args[0], e.args[1]) + log.info(f'scheduled snapshot creating of {path} failed: {e}') + except Exception as e: + # catch all exceptions cause otherwise we'll never know since this + # is running in a thread + log.error(f'ERROR create_scheduled_snapshot raised{e}') finally: - self.refresh_snap_timers(fs_info[0]) + log.info(f'finally branch') + self.refresh_snap_timers(fs_name) + log.info(f'calling prune') + self.prune_snapshots(fs_name, path, retention) + + def prune_snapshots(self, fs_name, path, retention): + log.debug('Pruning snapshots') + ret = parse_retention(retention) + try: + prune_candidates = set() + with open_filesystem(self, fs_name) as fs_handle: + with fs_handle.opendir(f'{path}/.snap') as d_handle: + dir_ = fs_handle.readdir(d_handle) + while dir_: + if dir_.d_name.startswith(b'scheduled-'): + log.debug(f'add {dir_.d_name} to pruning') + ts = datetime.strptime( + dir_.d_name.lstrip(b'scheduled-').decode('utf-8'), + 
TS_FORMAT) + prune_candidates.add((dir_, ts)) + else: + log.debug(f'skipping dir entry {dir_.d_name}') + dir_ = fs_handle.readdir(d_handle) + to_keep = self.get_prune_set(prune_candidates, ret) + for k in prune_candidates - to_keep: + dirname = k[0].d_name.decode('utf-8') + log.debug(f'rmdir on {dirname}') + fs_handle.rmdir(f'{path}/.snap/{dirname}') + log.debug(f'keeping {to_keep}') + except Exception as e: + log.debug(f'prune_snapshots threw {e}') # TODO: handle snap pruning accoring to retention - @connection_pool_wrap - def validate_schedule(self, fs_handle, sched): + def get_prune_set(self, candidates, retention): + PRUNING_PATTERNS = OrderedDict([ + #TODO remove M for release + ("M", '%Y-%m-%d-%H_%M'), + ("h", '%Y-%m-%d-%H'), + ("d", '%Y-%m-%d'), + ("w", '%G-%V'), + ("m", '%Y-%m'), + ("y", '%Y'), + ]) + keep = set() + log.debug(retention) + for period, date_pattern in PRUNING_PATTERNS.items(): + period_count = retention.get(period, 0) + if not period_count: + log.debug(f'skipping period {period}') + continue + last = None + for snap in sorted(candidates, key=lambda x: x[0].d_name, + reverse=True): + snap_ts = snap[1].strftime(date_pattern) + log.debug(f'{snap_ts} : {last}') + if snap_ts != last: + last = snap_ts + if snap not in keep: + log.debug(f'keeping {snap[0].d_name} due to {period_count}{period}') + keep.add(snap) + if len(keep) == period_count: + log.debug(f'found enough snapshots for {period_count}{period}') + break + # TODO maybe do candidates - keep here? 
we want snaps counting it + # hours not be considered for days and it cuts down on iterations + return keep + + def validate_schedule(self, fs_name, sched): try: - fs_handle.stat(sched.path) + with open_filesystem(self, fs_name) as fs_handle: + fs_handle.stat(sched.path) except cephfs.ObjectNotFound: - self.log.error('Path {} not found'.format(sched.path)) + log.error('Path {} not found'.format(sched.path)) return False return True - def list_snap_schedule(self, fs, path): + LIST_SCHEDULES = '''SELECT + s.path, sm.schedule, sm.retention, sm.start, s.subvol, s.rel_path + FROM schedules s + INNER JOIN schedules_meta sm ON sm.schedule_id = s.id + WHERE s.path = ?''' + + def list_snap_schedules(self, fs, path): db = self.get_schedule_db(fs) + c = db.execute(self.LIST_SCHEDULES, (path,)) + return [Schedule.from_db_row(row, fs) for row in c.fetchall()] + + def dump_snap_schedule(self, fs, path): + db = self.get_schedule_db(fs) + # TODO retrieve multiple schedules per path from schedule_meta # with db: - scheds = [] - for row in db.execute('SELECT * FROM SCHEDULES WHERE path LIKE ?', - (f'{path}%',)): - scheds.append(row) - return scheds + c = db.execute('SELECT * FROM SCHEDULES WHERE path LIKE ?', + (f'{path}%',)) + return [row for row in c.fetchall()] +# TODO currently not used, probably broken UPDATE_SCHEDULE = '''UPDATE schedules SET schedule = ?, @@ -217,6 +322,7 @@ class SnapSchedClient(CephfsClient): SET start = ?, repeat = ?, + schedule = ? retention = ? 
WHERE schedule_id = ( SELECT id FROM SCHEDULES WHERE path = ?);''' @@ -235,39 +341,32 @@ class SnapSchedClient(CephfsClient): sched.path)) INSERT_SCHEDULE = '''INSERT INTO - schedules(path, schedule, subvol, rel_path, active) - Values(?, ?, ?, ?, ?);''' + schedules(path, subvol, rel_path, active) + Values(?, ?, ?, ?);''' INSERT_SCHEDULE_META = '''INSERT INTO - schedules_meta(schedule_id, start, repeat, retention) - Values(last_insert_rowid(), ?, ?, ?)''' + schedules_meta(schedule_id, start, repeat, schedule, retention) + Values(last_insert_rowid(), ?, ?, ?, ?)''' @updates_schedule_db def store_snap_schedule(self, fs, sched): db = self.get_schedule_db(fs) with db: - db.execute(self.INSERT_SCHEDULE, - (sched.path, - sched.schedule, - sched.subvol, - sched.rel_path, - 1)) + try: + db.execute(self.INSERT_SCHEDULE, + (sched.path, + sched.subvol, + sched.rel_path, + 1)) + except sqlite3.IntegrityError: + # might be adding another schedule + pass db.execute(self.INSERT_SCHEDULE_META, (f'strftime("%s", "{sched.first_run}")', sched.repeat_in_s(), + sched.schedule, sched.retention)) self.store_schedule_db(sched.fs) - GET_SCHEULE = '''SELECT - s.path, s.schedule, sm.retention, sm.start, s.subvol, s.rel_path - FROM schedules s - INNER JOIN schedules_meta sm ON sm.schedule_id = s.id - WHERE s.path = ?''' - - def get_snap_schedule(self, fs, path): - db = self.get_schedule_db(fs) - c = db.execute(self.GET_SCHEDULE, (path,)) - return Schedule.from_db_row(c.fetchone(), fs) - @updates_schedule_db def rm_snap_schedule(self, fs, path): db = self.get_schedule_db(fs) @@ -275,4 +374,6 @@ class SnapSchedClient(CephfsClient): cur = db.execute('SELECT id FROM SCHEDULES WHERE path = ?', (path,)) id_ = cur.fetchone() - db.execute('DELETE FROM schedules WHERE id = ?;', (id_,)) + # TODO check for repeat-start pair and delete that if present, all + # otherwise. 
If repeat-start was speced, delete only those + db.execute('DELETE FROM schedules WHERE id = ?;', id_) diff --git a/src/pybind/mgr/snap_schedule/module.py b/src/pybind/mgr/snap_schedule/module.py index 4f685b798f2ea..89c78c3ef9759 100644 --- a/src/pybind/mgr/snap_schedule/module.py +++ b/src/pybind/mgr/snap_schedule/module.py @@ -47,33 +47,33 @@ class Module(MgrModule): self._initialized.wait() return -errno.EINVAL, "", "Unknown command" - @CLIReadCommand('fs snap-schedule ls', + @CLIReadCommand('fs snap-schedule dump', 'name=path,type=CephString,req=false ' 'name=subvol,type=CephString,req=false ' 'name=fs,type=CephString,req=false', 'List current snapshot schedules') - def snap_schedule_ls(self, path='/', subvol=None, fs=None): + def snap_schedule_dump(self, path='/', subvol=None, fs=None): use_fs = fs if fs else self.default_fs try: - ret_scheds = self.client.list_snap_schedule(use_fs, path) + ret_scheds = self.client.dump_snap_schedule(use_fs, path) except CephfsConnectionException as e: return e.to_tuple() return 0, ' '.join(str(ret_scheds)), '' - @CLIReadCommand('fs snap-schedule get', + @CLIReadCommand('fs snap-schedule list', 'name=path,type=CephString ' 'name=subvol,type=CephString,req=false ' 'name=fs,type=CephString,req=false', 'Get current snapshot schedule for ') - def snap_schedule_get(self, path, subvol=None, fs=None): + def snap_schedule_list(self, path, subvol=None, fs=None): try: use_fs = fs if fs else self.default_fs - sched = self.client.get_snap_schedule(use_fs, path) + scheds = self.client.list_snap_schedules(use_fs, path) except CephfsConnectionException as e: return e.to_tuple() - if not sched: + if not scheds: return -1, '', f'SnapSchedule for {path} not found' - return 0, str(sched), '' + return 0, str([str(sched) for sched in scheds]), '' @CLIWriteCommand('fs snap-schedule set', 'name=path,type=CephString ' @@ -87,7 +87,7 @@ class Module(MgrModule): path, snap_schedule, retention_policy='', - start='now', + start='00:00', fs=None, 
subvol=None): try: @@ -100,15 +100,15 @@ class Module(MgrModule): self.client.store_snap_schedule(use_fs, sched) suc_msg = f'Schedule set for path {path}' except sqlite3.IntegrityError: - existing_sched = self.client.get_snap_schedule(use_fs, path) - self.log.info(f'Found existing schedule {existing_sched}...updating') - self.client.update_snap_schedule(use_fs, sched) - suc_msg = f'Schedule set for path {path}, updated existing schedule {existing_sched}' + existing_sched = self.client.list_snap_schedule(use_fs, path) + error_msg = f'Found existing schedule {existing_sched}' + self.log.error(error_msg) + return 1, '', error_msg except CephfsConnectionException as e: return e.to_tuple() return 0, suc_msg, '' - @CLIWriteCommand('fs snap-schedule rm', + @CLIWriteCommand('fs snap-schedule remove', 'name=path,type=CephString ' 'name=subvol,type=CephString,req=false ' 'name=fs,type=CephString,req=false', -- 2.39.5