from contextlib import contextmanager
from mgr_util import CephfsClient, CephfsConnectionException, connection_pool_wrap
from datetime import datetime, timedelta
+from threading import Timer
import sqlite3
raise
+def updates_schedule_db(func):
+    """Decorate a schedule-mutating method so the snapshot timers follow it.
+
+    The wrapped method is called first; if it does not raise,
+    ``self.refresh_snap_timers(fs)`` is invoked for the same ``fs``. The
+    wrapped method's return value is handed back to the caller.
+    """
+    # Local import: keeps this helper independent of the module-level imports.
+    from functools import wraps
+
+    @wraps(func)
+    def f(self, fs, *args):
+        result = func(self, fs, *args)
+        self.refresh_snap_timers(fs)
+        return result
+    return f
+
+
class Schedule(object):
'''
Wrapper to work with schedules stored in Rados objects
path,
schedule,
retention_policy,
+ start,
fs_name,
subvol,
- first_run=None,
+ rel_path,
):
self.fs = fs_name
self.subvol = subvol
self.path = path
+ self.rel_path = rel_path
self.schedule = schedule
self.retention = retention_policy
- self.first_run = None
+ self.first_run = start
self.last_run = None
def __str__(self):
+ # Rendered as "<rel_path>: <schedule>; <retention>".
- return f'''{self.path}: {self.schedule}; {self.retention}'''
+ return f'''{self.rel_path}: {self.schedule}; {self.retention}'''
@classmethod
def from_db_row(cls, table_row, fs):
- return cls(table_row[0], table_row[1], table_row[2], fs, None)
+        """Build a Schedule from a joined schedules/schedules_meta row.
+
+        ``table_row`` is read in the column order
+        (path, schedule, retention, start, subvol, rel_path); ``fs`` is the
+        file system name, which is supplied by the caller rather than the row.
+        """
+        # Positional order has to follow the constructor: path, schedule,
+        # retention_policy, start, fs_name, subvol, rel_path.
+        return cls(table_row[0],
+                   table_row[1],
+                   table_row[2],
+                   table_row[3],
+                   fs,
+                   table_row[4],
+                   table_row[5])
def repeat_in_s(self):
mult = self.schedule[-1]
class SnapSchedClient(CephfsClient):
- CREATE_TABLES = '''CREATE TABLE SCHEDULES(
- id INTEGER PRIMARY KEY ASC,
+ # Schema of the schedule database: "schedules" has one row per unique path
+ # (plus subvol, rel_path and an active flag); "schedules_meta" carries
+ # start, repeat and retention and points back via schedule_id.
+ # NOTE(review): ON DELETE CASCADE presumably needs foreign key enforcement
+ # enabled on the connection (PRAGMA foreign_keys) -- confirm where the
+ # connection is created.
+ CREATE_TABLES = '''CREATE TABLE schedules(
+ id integer PRIMARY KEY ASC,
path text NOT NULL UNIQUE,
schedule text NOT NULL,
- retention text
+ subvol text,
+ rel_path text NOT NULL,
+ active int NOT NULL
);
- CREATE TABLE SCHEDULES_META(
- id INTEGER PRIMARY KEY ASC,
+ CREATE TABLE schedules_meta(
+ id integer PRIMARY KEY ASC,
schedule_id int,
start bigint NOT NULL,
repeat bigint NOT NULL,
+ retention text,
FOREIGN KEY(schedule_id) REFERENCES schedules(id) ON DELETE CASCADE
);'''
- INSERT_SCHEDULE = '''INSERT INTO schedules(path, schedule, retention) Values(?, ?, ?);'''
- INSERT_SCHEDULE_META = ''' INSERT INTO schedules_meta(schedule_id, start, repeat) Values(last_insert_rowid(), ?, ?)'''
- UPDATE_SCHEDULE = '''UPDATE schedules SET schedule = ?, retention = ? where path = ?;'''
- UPDATE_SCHEDULE_META = '''UPDATE schedules_meta SET start = ?, repeat = ? where schedule_id = (SELECT id FROM SCHEDULES WHERE path = ?);'''
- DELETE_SCHEDULE = '''DELETE FROM schedules WHERE id = ?;'''
- exec_query = '''select s.id, s.path, sm.repeat - ((strftime("%s", "now") - sm.start) % sm.repeat) "until" from schedules s inner join schedules_meta sm on sm.event_id = s.id order by until;'''
def __init__(self, mgr):
super(SnapSchedClient, self).__init__(mgr)
+ # TODO maybe iterate over all fs instances in fsmap and load snap dbs?
+ # Schedule DB connections, keyed by fs; entries are looked up in
+ # get_schedule_db.
self.sqlite_connections = {}
+ # Maps fs -> list of threading.Timer objects created by
+ # refresh_snap_timers.
+ self.active_timers = {}
+
+    # Yields (path, retention, until) per schedule, ordered by "until": the
+    # seconds remaining in the current repeat interval, derived from the
+    # schedule's start and repeat columns and the current time.
+    EXEC_QUERY = '''SELECT
+ s.path, sm.retention,
+ sm.repeat - (strftime("%s", "now") - sm.start) % sm.repeat "until"
+ FROM schedules s
+ INNER JOIN schedules_meta sm ON sm.schedule_id = s.id
+ ORDER BY until;'''
+
+    def refresh_snap_timers(self, fs):
+        """Replace the timers of ``fs`` with one for the next due schedule.
+
+        Reads the first row of EXEC_QUERY from the schedule DB of ``fs``,
+        cancels every timer currently recorded for ``fs`` and starts a new
+        Timer that calls ``create_scheduled_snapshot(fs, path, retention)``
+        after the row's "until" seconds. The new timers are stored in
+        ``self.active_timers[fs]``.
+        """
+        self.log.debug(f'SnapDB on {fs} changed, updating next Timer')
+        db = self.get_schedule_db(fs)
+        with db:
+            # Only the soonest schedule is needed; the query is sorted.
+            next_due = db.execute(self.EXEC_QUERY).fetchmany(1)
+        for stale in self.active_timers.get(fs, []):
+            stale.cancel()
+        fresh = []
+        for path, retention, delay in next_due:
+            self.log.debug('Creating new snapshot timer')
+            timer = Timer(delay,
+                          self.create_scheduled_snapshot,
+                          args=[fs, path, retention])
+            timer.start()
+            fresh.append(timer)
+            self.log.debug(f'Will snapshot {path} in fs {fs} in {delay}s')
+        self.active_timers[fs] = fresh
def get_schedule_db(self, fs):
if fs not in self.sqlite_connections:
db = ioctx.read('SNAP_DB_OBJECT_NAME', size).decode('utf-8')
con.executescript(db)
except rados.ObjectNotFound:
- self.log.info('No schedule DB found in {}'.format(fs))
+ self.log.info(f'No schedule DB found in {fs}')
con.executescript(self.CREATE_TABLES)
return self.sqlite_connections[fs]
if fs in self.sqlite_connections:
db_content = []
# TODO maybe do this in a transaction?
- for row in self.sqlite_connections[fs].iterdump():
- db_content.append(row)
+ db = self.sqlite_connections[fs]
+ with db:
+ for row in db.iterdump():
+ db_content.append(row)
with open_ioctx(self, metadata_pool) as ioctx:
ioctx.write_full("SNAP_DB_OBJECT_NAME",
'\n'.join(db_content).encode('utf-8'))
@connection_pool_wrap
- def validate_schedule(self, fs_handle, **kwargs):
+    def create_scheduled_snapshot(self, fs_info, path, retention):
+        """Create a timestamped snapshot of ``path`` and re-arm the timers.
+
+        Makes the directory ``<path>/.snap/scheduled-<UTC timestamp>`` through
+        ``fs_info[1]`` and afterwards, whether or not that succeeded, calls
+        ``refresh_snap_timers(fs_info[0])``.
+        NOTE(review): fs_info is presumably a (fs name, fs handle) pair
+        provided by the connection pool wrapper -- confirm.
+
+        Raises CephfsConnectionException when mkdir fails with cephfs.Error.
+        ``retention`` is not used yet (see TODO below).
+        """
+        self.log.debug(f'Scheduled snapshot of {path} triggered')
+        try:
+            # ``datetime`` here is the class (from datetime import datetime),
+            # so utcnow() is called on it directly.
+            timestamp = datetime.utcnow().strftime('%Y-%m-%d-%H_%M_%S')
+            fs_info[1].mkdir(f'{path}/.snap/scheduled-{timestamp}', 0o755)
+            self.log.info(f'created scheduled snapshot of {path}')
+        except cephfs.Error as e:
+            self.log.info(f'scheduled snapshot creating of {path} failed')
+            raise CephfsConnectionException(-e.args[0], e.args[1])
+        finally:
+            self.refresh_snap_timers(fs_info[0])
+        # TODO: handle snap pruning according to retention
+
+    @connection_pool_wrap
+    def validate_schedule(self, fs_handle, sched):
+        """Return True if ``sched.path`` can be stat'ed via ``fs_handle``.
+
+        A cephfs.ObjectNotFound from stat is logged as an error and reported
+        as False; other exceptions are not handled here.
+        """
try:
- fs_handle.stat(kwargs['sched'].path)
+            fs_handle.stat(sched.path)
except cephfs.ObjectNotFound:
- self.log.error('Path {} not found'.format(kwargs['sched'].path))
+            self.log.error(f'Path {sched.path} not found')
return False
return True
scheds.append(row)
return scheds
+    UPDATE_SCHEDULE = '''UPDATE schedules
+ SET
+ schedule = ?,
+ active = 1
+ WHERE path = ?;'''
+    # strftime() is part of the statement so SQLite evaluates it; only the
+    # time string (e.g. 'now') is bound. Binding the whole expression as a
+    # parameter would store its text instead of epoch seconds.
+    UPDATE_SCHEDULE_META = '''UPDATE schedules_meta
+ SET
+ start = strftime('%s', ?),
+ repeat = ?,
+ retention = ?
+ WHERE schedule_id = (
+ SELECT id FROM SCHEDULES WHERE path = ?);'''
+
+    @updates_schedule_db
def update_snap_schedule(self, fs, sched):
+        # Rewrites the schedule string (and marks it active) for sched.path,
+        # then updates start, repeat and retention of its meta row, all in
+        # one transaction on the schedule DB of fs.
db = self.get_schedule_db(fs)
with db:
db.execute(self.UPDATE_SCHEDULE,
(sched.schedule,
- sched.retention,
sched.path))
db.execute(self.UPDATE_SCHEDULE_META,
- ('strftime("%s", "now")',
+                       (sched.first_run,
sched.repeat_in_s(),
+                        sched.retention,
sched.path))
+    INSERT_SCHEDULE = '''INSERT INTO
+ schedules(path, schedule, subvol, rel_path, active)
+ Values(?, ?, ?, ?, ?);'''
+    # strftime() is part of the statement so SQLite evaluates it; only the
+    # time string (e.g. 'now') is bound. Binding the whole expression as a
+    # parameter would store its text instead of epoch seconds.
+    INSERT_SCHEDULE_META = '''INSERT INTO
+ schedules_meta(schedule_id, start, repeat, retention)
+ Values(last_insert_rowid(), strftime('%s', ?), ?, ?)'''
+
+    @updates_schedule_db
def store_snap_schedule(self, fs, sched):
+        # Inserts the schedule row (active = 1) and, in the same transaction,
+        # its meta row linked through last_insert_rowid(); afterwards the DB
+        # is persisted with store_schedule_db(sched.fs).
db = self.get_schedule_db(fs)
with db:
db.execute(self.INSERT_SCHEDULE,
(sched.path,
sched.schedule,
- sched.retention))
+                        sched.subvol,
+                        sched.rel_path,
+                        1))
db.execute(self.INSERT_SCHEDULE_META,
- ('strftime("%s", "now")',
- sched.repeat_in_s()))
+                       (sched.first_run,
+                        sched.repeat_in_s(),
+                        sched.retention))
self.store_schedule_db(sched.fs)
+    # Column order: path, schedule, retention, start, subvol, rel_path.
+    GET_SCHEDULE = '''SELECT
+ s.path, s.schedule, sm.retention, sm.start, s.subvol, s.rel_path
+ FROM schedules s
+ INNER JOIN schedules_meta sm ON sm.schedule_id = s.id
+ WHERE s.path = ?'''
+
def get_snap_schedule(self, fs, path):
+        # Looks up the schedule stored for path in the schedule DB of fs and
+        # wraps the row in a Schedule. If no row matches, fetchone() returns
+        # None, which is passed to Schedule.from_db_row unchanged.
db = self.get_schedule_db(fs)
- c = db.execute('SELECT path, schedule, retention FROM SCHEDULES WHERE path = ?', (path,))
+        c = db.execute(self.GET_SCHEDULE, (path,))
return Schedule.from_db_row(c.fetchone(), fs)
+    @updates_schedule_db
def rm_snap_schedule(self, fs, path):
+        # Deletes the schedules row whose path matches, inside one
+        # transaction on the schedule DB of fs. An unknown path is a no-op.
db = self.get_schedule_db(fs)
with db:
cur = db.execute('SELECT id FROM SCHEDULES WHERE path = ?',
(path,))
id_ = cur.fetchone()
- db.execute(self.DELETE_SCHEDULE, id_)
+            # fetchone() returns a one-element row (id,) or None. The row is
+            # already a parameter sequence; wrapping it in another tuple
+            # would try to bind a tuple as the id.
+            if id_ is not None:
+                db.execute('DELETE FROM schedules WHERE id = ?;', id_)
from mgr_util import CephfsConnectionException
from rados import ObjectNotFound
from threading import Event
-try:
- import queue as Queue
-except ImportError:
- import Queue
class Module(MgrModule):
self._initialized = Event()
self.client = SnapSchedClient(self)
- self._background_jobs = Queue.Queue()
+    def resolve_subvolume_path(self, fs, subvol, path):
+        """Return ``path`` prefixed with the path of ``subvol``, if given.
+
+        Without a subvolume ``path`` is returned as is. Otherwise the
+        subvolume's path is requested through ``self.remote`` and ``path`` is
+        appended to it. Raises Exception when that request reports a
+        non-zero return code.
+        """
+        if subvol:
+            rc, subvol_path, _ = self.remote('fs', 'subvolume', 'getpath',
+                                             fs, subvol)
+            if rc != 0:
+                # TODO custom exception
+                raise Exception(f'Could not resolve {path} in {fs}, {subvol}')
+            return subvol_path + path
+
+        return path
@property
def default_fs(self):
return -1, '', f'SnapSchedule for {path} not found'
return 0, str(sched), ''
- prune_schedule_options = ('name=keep-minutely,type=CephString,req=false '
- 'name=keep-hourly,type=CephString,req=false '
- 'name=keep-daily,type=CephString,req=false '
- 'name=keep-weekly,type=CephString,req=false '
- 'name=keep-monthly,type=CephString,req=false '
- 'name=keep-yearly,type=CephString,req=false '
- 'name=keep-last,type=CephString,req=false '
- 'name=keep-within,type=CephString,req=false')
-
@CLIWriteCommand('fs snap-schedule set',
'name=path,type=CephString '
'name=snap-schedule,type=CephString '
'name=retention-policy,type=CephString,req=false '
+ 'name=start,type=CephString,req=false '
'name=fs,type=CephString,req=false '
'name=subvol,type=CephString,req=false',
'Set a snapshot schedule for <path>')
path,
snap_schedule,
retention_policy='',
+ start='now',
fs=None,
subvol=None):
try:
use_fs = fs if fs else self.default_fs
- # TODO should we allow empty retention policies?
- sched = Schedule(path, snap_schedule, retention_policy,
- use_fs, subvol)
+ abs_path = self.resolve_subvolume_path(fs, subvol, path)
+ sched = Schedule(abs_path, snap_schedule, retention_policy,
+ start, use_fs, subvol, path)
# TODO allow schedules on non-existent paths?
- # self.client.validate_schedule(None, sched=sched, fs_name=fs)
+ # self.client.validate_schedule(fs, sched)
self.client.store_snap_schedule(use_fs, sched)
suc_msg = f'Schedule set for path {path}'
except sqlite3.IntegrityError:
from fs import schedule
-class TestScheduleIndex(object):
-
- in_parameters = [
- (['/'], ['/'], []),
- (['/a'], ['/a'], ['/']),
- (['/a'], ['/a'], ['/', '/bar']),
- (['/a/b'], ['/a/b'], ['/', '/a']),
- (['/a/b', '/'], ['/a/b', '/'], ['/a']),
- (['/a/b/c', '/a/d'], ['/a/b/c', '/a/d'], ['/a', '/', 'a/b']),
- (['/a/b', '/a/b/c/d'], ['/a/b', '/a/b/c/d'], ['/a', '/', 'a/b/c']),
- (['/' + '/'.join(['x' for _ in range(1000)])],
- ['/' + '/'.join(['x' for _ in range(1000)])], ['/a']),
- ]
-
- def test_index_ctor(self):
- s = schedule.ScheduleIndex()
- assert '/' in s.root
-
- @pytest.mark.parametrize("add,in_,not_in", in_parameters)
- def test_add_in(self, add, in_, not_in):
- if len(add[0]) > 999:
- pytest.xfail("will reach max recursion depth, deal with tail recursion to get this fixed")
- s = schedule.ScheduleIndex()
- [s.add(p) for p in add]
- for p in in_:
- assert p in s, '{} not found in schedule'.format(p)
- for p in not_in:
- assert p not in s, '{} found in schedule'.format(p)
-
- descend_parameters = [
- (['/'], '/', '/', []),
- (['/a'], '/a', 'a', ['/b']),
- (['/a/b'], '/a/b', 'b', ['/a/a']),
- (['/a/b', '/a/b/c'], '/a/b', 'b', ['/a/a']),
- ]
-
- @pytest.mark.parametrize("add,descend,pos,not_in", descend_parameters)
- def test_descend(self, add, descend, pos, not_in):
- # TODO improve this
- s = schedule.ScheduleIndex()
- [s.add(p) for p in add]
- prefix, result = s._descend(PurePosixPath(descend).parts, s.root)
- assert pos in result, '{} not in {}'.format(pos, result)
- for p in not_in:
- result = s._descend(PurePosixPath(p).parts, s.root)
- assert result == (False, False)
-
- def test_gather_subdirs(self):
- s = schedule.ScheduleIndex()
- s.add('/a/b')
- s.add('/a/b/c/d')
- ret = s._gather_subdirs(s.root['/'].children,
- [],
- PurePosixPath('/'))
- assert ret == [PurePosixPath('/a/b'), PurePosixPath('/a/b/c/d')]
-
- def test_get(self):
- s = schedule.ScheduleIndex()
- s.add('/a/b')
- s.add('/a/b/c/d')
- ret = s.get('/')
- assert ret == ['/a/b', '/a/b/c/d']
- ret = s.get('/a/b/c/')
- assert ret == ['/a/b/c/d']