--- /dev/null
+# -*- coding: utf-8 -*-
+
+import os
+
+if 'CEPH_SNAP_SCHEDULE_UNITTEST' not in os.environ:
+ from .module import Module
--- /dev/null
+"""
+Copyright (C) 2019 SUSE
+
+LGPL2.1. See file COPYING.
+"""
+import cephfs
+import errno
+import rados
+from contextlib import contextmanager
+from mgr_util import CephfsClient, CephfsConnectionException, connection_pool_wrap
+from datetime import datetime, timedelta
+import sqlite3
+
+
+SNAP_SCHEDULE_NAMESPACE = 'cephfs-snap-schedule'
+SNAP_DB_OBJECT_NAME = 'snap_db'
+
+
+@contextmanager
+def open_ioctx(self, pool):
+ try:
+ if type(pool) is int:
+ with self.mgr.rados.open_ioctx2(pool) as ioctx:
+ ioctx.set_namespace(SNAP_SCHEDULE_NAMESPACE)
+ yield ioctx
+ else:
+ with self.mgr.rados.open_ioctx(pool) as ioctx:
+ ioctx.set_namespace(SNAP_SCHEDULE_NAMESPACE)
+ yield ioctx
+ except rados.ObjectNotFound:
+ self.log.error("Failed to locate pool {}".format(pool))
+ raise
+
+
+class Schedule(object):
+ '''
+ Wrapper to work with schedules stored in Rados objects
+ '''
+ def __init__(self,
+ path,
+ schedule,
+ retention_policy,
+ fs_name,
+ subvol,
+ first_run=None,
+ ):
+ self.fs = fs_name
+ self.subvol = subvol
+ self.path = path
+ self.schedule = schedule
+ self.retention = retention_policy
+ self.first_run = None
+ self.last_run = None
+
+ def __str__(self):
+ return f'''{self.path}: {self.schedule}; {self.retention}'''
+
+ @classmethod
+ def from_db_row(cls, table_row, fs):
+ return cls(table_row[0], table_row[1], table_row[2], fs, None)
+
+ def repeat_in_s(self):
+ mult = self.schedule[-1]
+ period = int(self.schedule[0:-1])
+ if mult == 'm':
+ return period * 60
+ elif mult == 'h':
+ return period * 60 * 60
+ elif mult == 'd':
+ return period * 60 * 60 * 24
+ elif mult == 'w':
+ return period * 60 * 60 * 24 * 7
+ else:
+ raise Exception('schedule multiplier not recognized')
+
+
+class SnapSchedClient(CephfsClient):
+
+ CREATE_TABLES = '''CREATE TABLE SCHEDULES(
+ id INTEGER PRIMARY KEY ASC,
+ path text NOT NULL UNIQUE,
+ schedule text NOT NULL,
+ retention text
+ );
+ CREATE TABLE SCHEDULES_META(
+ id INTEGER PRIMARY KEY ASC,
+ schedule_id int,
+ start bigint NOT NULL,
+ repeat bigint NOT NULL,
+ FOREIGN KEY(schedule_id) REFERENCES schedules(id) ON DELETE CASCADE
+ );'''
+ INSERT_SCHEDULE = '''INSERT INTO schedules(path, schedule, retention) Values(?, ?, ?);'''
+ INSERT_SCHEDULE_META = ''' INSERT INTO schedules_meta(schedule_id, start, repeat) Values(last_insert_rowid(), ?, ?)'''
+ UPDATE_SCHEDULE = '''UPDATE schedules SET schedule = ?, retention = ? where path = ?;'''
+ UPDATE_SCHEDULE_META = '''UPDATE schedules_meta SET start = ?, repeat = ? where schedule_id = (SELECT id FROM SCHEDULES WHERE path = ?);'''
+ DELETE_SCHEDULE = '''DELETE FROM schedules WHERE id = ?;'''
+ exec_query = '''select s.id, s.path, sm.repeat - ((strftime("%s", "now") - sm.start) % sm.repeat) "until" from schedules s inner join schedules_meta sm on sm.event_id = s.id order by until;'''
+
+ def __init__(self, mgr):
+ super(SnapSchedClient, self).__init__(mgr)
+ self.sqlite_connections = {}
+
+ def get_schedule_db(self, fs):
+ if fs not in self.sqlite_connections:
+ self.sqlite_connections[fs] = sqlite3.connect(':memory:')
+ with self.sqlite_connections[fs] as con:
+ con.execute("PRAGMA FOREIGN_KEYS = 1")
+ pool = self.get_metadata_pool(fs)
+ with open_ioctx(self, pool) as ioctx:
+ try:
+ size, _mtime = ioctx.stat('SNAP_DB_OBJECT_NAME')
+ db = ioctx.read('SNAP_DB_OBJECT_NAME', size).decode('utf-8')
+ con.executescript(db)
+ except rados.ObjectNotFound:
+ self.log.info('No schedule DB found in {}'.format(fs))
+ con.executescript(self.CREATE_TABLES)
+ return self.sqlite_connections[fs]
+
+ def store_schedule_db(self, fs):
+ # only store db is it exists, otherwise nothing to do
+ metadata_pool = self.get_metadata_pool(fs)
+ if not metadata_pool:
+ raise CephfsConnectionException(
+ -errno.ENOENT, "Filesystem {} does not exist".format(fs))
+ if fs in self.sqlite_connections:
+ db_content = []
+ # TODO maybe do this in a transaction?
+ for row in self.sqlite_connections[fs].iterdump():
+ db_content.append(row)
+ with open_ioctx(self, metadata_pool) as ioctx:
+ ioctx.write_full("SNAP_DB_OBJECT_NAME",
+ '\n'.join(db_content).encode('utf-8'))
+
+ @connection_pool_wrap
+ def validate_schedule(self, fs_handle, **kwargs):
+ try:
+ fs_handle.stat(kwargs['sched'].path)
+ except cephfs.ObjectNotFound:
+ self.log.error('Path {} not found'.format(kwargs['sched'].path))
+ return False
+ return True
+
+ def list_snap_schedule(self, fs, path):
+ db = self.get_schedule_db(fs)
+ # with db:
+ scheds = []
+ for row in db.execute('SELECT * FROM SCHEDULES WHERE path LIKE ?',
+ (f'{path}%',)):
+ scheds.append(row)
+ return scheds
+
+ def update_snap_schedule(self, fs, sched):
+ db = self.get_schedule_db(fs)
+ with db:
+ db.execute(self.UPDATE_SCHEDULE,
+ (sched.schedule,
+ sched.retention,
+ sched.path))
+ db.execute(self.UPDATE_SCHEDULE_META,
+ ('strftime("%s", "now")',
+ sched.repeat_in_s(),
+ sched.path))
+
+ def store_snap_schedule(self, fs, sched):
+ db = self.get_schedule_db(fs)
+ with db:
+ db.execute(self.INSERT_SCHEDULE,
+ (sched.path,
+ sched.schedule,
+ sched.retention))
+ db.execute(self.INSERT_SCHEDULE_META,
+ ('strftime("%s", "now")',
+ sched.repeat_in_s()))
+ self.store_schedule_db(sched.fs)
+
+ def get_snap_schedule(self, fs, path):
+ db = self.get_schedule_db(fs)
+ c = db.execute('SELECT path, schedule, retention FROM SCHEDULES WHERE path = ?', (path,))
+ return Schedule.from_db_row(c.fetchone(), fs)
+
+ def rm_snap_schedule(self, fs, path):
+ db = self.get_schedule_db(fs)
+ with db:
+ cur = db.execute('SELECT id FROM SCHEDULES WHERE path = ?',
+ (path,))
+ id_ = cur.fetchone()
+ db.execute(self.DELETE_SCHEDULE, id_)
--- /dev/null
+"""
+Copyright (C) 2019 SUSE
+
+LGPL2.1. See file COPYING.
+"""
+import errno
+import sqlite3
+from .fs.schedule import SnapSchedClient, Schedule
+from mgr_module import MgrModule, CLIReadCommand, CLIWriteCommand
+from mgr_util import CephfsConnectionException
+from rados import ObjectNotFound
+from threading import Event
+try:
+ import queue as Queue
+except ImportError:
+ import Queue
+
+
+class Module(MgrModule):
+
+ def __init__(self, *args, **kwargs):
+ super(Module, self).__init__(*args, **kwargs)
+ self._initialized = Event()
+ self.client = SnapSchedClient(self)
+
+ self._background_jobs = Queue.Queue()
+
+ @property
+ def default_fs(self):
+ fs_map = self.get('fs_map')
+ if fs_map['filesystems']:
+ return fs_map['filesystems'][0]['mdsmap']['fs_name']
+ else:
+ self.log.error('No filesystem instance could be found.')
+ raise CephfsConnectionException(
+ -errno.ENOENT, "no filesystem found")
+
+ def serve(self):
+ self._initialized.set()
+
+ def handle_command(self, inbuf, cmd):
+ self._initialized.wait()
+ return -errno.EINVAL, "", "Unknown command"
+
+ @CLIReadCommand('fs snap-schedule ls',
+ 'name=path,type=CephString,req=false '
+ 'name=subvol,type=CephString,req=false '
+ 'name=fs,type=CephString,req=false',
+ 'List current snapshot schedules')
+ def snap_schedule_ls(self, path='/', subvol=None, fs=None):
+ use_fs = fs if fs else self.default_fs
+ try:
+ ret_scheds = self.client.list_snap_schedule(use_fs, path)
+ except CephfsConnectionException as e:
+ return e.to_tuple()
+ return 0, ' '.join(str(ret_scheds)), ''
+
+ @CLIReadCommand('fs snap-schedule get',
+ 'name=path,type=CephString '
+ 'name=subvol,type=CephString,req=false '
+ 'name=fs,type=CephString,req=false',
+ 'Get current snapshot schedule for <path>')
+ def snap_schedule_get(self, path, subvol=None, fs=None):
+ try:
+ use_fs = fs if fs else self.default_fs
+ sched = self.client.get_snap_schedule(use_fs, path)
+ except CephfsConnectionException as e:
+ return e.to_tuple()
+ if not sched:
+ return -1, '', f'SnapSchedule for {path} not found'
+ return 0, str(sched), ''
+
+ prune_schedule_options = ('name=keep-minutely,type=CephString,req=false '
+ 'name=keep-hourly,type=CephString,req=false '
+ 'name=keep-daily,type=CephString,req=false '
+ 'name=keep-weekly,type=CephString,req=false '
+ 'name=keep-monthly,type=CephString,req=false '
+ 'name=keep-yearly,type=CephString,req=false '
+ 'name=keep-last,type=CephString,req=false '
+ 'name=keep-within,type=CephString,req=false')
+
+ @CLIWriteCommand('fs snap-schedule set',
+ 'name=path,type=CephString '
+ 'name=snap-schedule,type=CephString '
+ 'name=retention-policy,type=CephString,req=false '
+ 'name=fs,type=CephString,req=false '
+ 'name=subvol,type=CephString,req=false',
+ 'Set a snapshot schedule for <path>')
+ def snap_schedule_set(self,
+ path,
+ snap_schedule,
+ retention_policy='',
+ fs=None,
+ subvol=None):
+ try:
+ use_fs = fs if fs else self.default_fs
+ # TODO should we allow empty retention policies?
+ sched = Schedule(path, snap_schedule, retention_policy,
+ use_fs, subvol)
+ # TODO allow schedules on non-existent paths?
+ # self.client.validate_schedule(None, sched=sched, fs_name=fs)
+ self.client.store_snap_schedule(use_fs, sched)
+ suc_msg = f'Schedule set for path {path}'
+ except sqlite3.IntegrityError:
+ existing_sched = self.client.get_snap_schedule(use_fs, path)
+ self.log.info(f'Found existing schedule {existing_sched}...updating')
+ self.client.update_snap_schedule(use_fs, sched)
+ suc_msg = f'Schedule set for path {path}, updated existing schedule {existing_sched}'
+ except CephfsConnectionException as e:
+ return e.to_tuple()
+ return 0, suc_msg, ''
+
+ @CLIWriteCommand('fs snap-schedule rm',
+ 'name=path,type=CephString '
+ 'name=subvol,type=CephString,req=false '
+ 'name=fs,type=CephString,req=false',
+ 'Remove a snapshot schedule for <path>')
+ def snap_schedule_rm(self, path, subvol=None, fs=None):
+ try:
+ use_fs = fs if fs else self.default_fs
+ self.client.rm_snap_schedule(use_fs, path)
+ except CephfsConnectionException as e:
+ return e.to_tuple()
+ except ObjectNotFound as e:
+ return e.errno, '', 'SnapSchedule for {} not found'.format(path)
+ return 0, 'Schedule removed for path {}'.format(path), ''
--- /dev/null
+import pytest
+from pathlib import PurePosixPath
+from fs import schedule
+
+
+class TestScheduleIndex(object):
+
+ in_parameters = [
+ (['/'], ['/'], []),
+ (['/a'], ['/a'], ['/']),
+ (['/a'], ['/a'], ['/', '/bar']),
+ (['/a/b'], ['/a/b'], ['/', '/a']),
+ (['/a/b', '/'], ['/a/b', '/'], ['/a']),
+ (['/a/b/c', '/a/d'], ['/a/b/c', '/a/d'], ['/a', '/', 'a/b']),
+ (['/a/b', '/a/b/c/d'], ['/a/b', '/a/b/c/d'], ['/a', '/', 'a/b/c']),
+ (['/' + '/'.join(['x' for _ in range(1000)])],
+ ['/' + '/'.join(['x' for _ in range(1000)])], ['/a']),
+ ]
+
+ def test_index_ctor(self):
+ s = schedule.ScheduleIndex()
+ assert '/' in s.root
+
+ @pytest.mark.parametrize("add,in_,not_in", in_parameters)
+ def test_add_in(self, add, in_, not_in):
+ if len(add[0]) > 999:
+ pytest.xfail("will reach max recursion depth, deal with tail recursion to get this fixed")
+ s = schedule.ScheduleIndex()
+ [s.add(p) for p in add]
+ for p in in_:
+ assert p in s, '{} not found in schedule'.format(p)
+ for p in not_in:
+ assert p not in s, '{} found in schedule'.format(p)
+
+ descend_parameters = [
+ (['/'], '/', '/', []),
+ (['/a'], '/a', 'a', ['/b']),
+ (['/a/b'], '/a/b', 'b', ['/a/a']),
+ (['/a/b', '/a/b/c'], '/a/b', 'b', ['/a/a']),
+ ]
+
+ @pytest.mark.parametrize("add,descend,pos,not_in", descend_parameters)
+ def test_descend(self, add, descend, pos, not_in):
+ # TODO improve this
+ s = schedule.ScheduleIndex()
+ [s.add(p) for p in add]
+ prefix, result = s._descend(PurePosixPath(descend).parts, s.root)
+ assert pos in result, '{} not in {}'.format(pos, result)
+ for p in not_in:
+ result = s._descend(PurePosixPath(p).parts, s.root)
+ assert result == (False, False)
+
+ def test_gather_subdirs(self):
+ s = schedule.ScheduleIndex()
+ s.add('/a/b')
+ s.add('/a/b/c/d')
+ ret = s._gather_subdirs(s.root['/'].children,
+ [],
+ PurePosixPath('/'))
+ assert ret == [PurePosixPath('/a/b'), PurePosixPath('/a/b/c/d')]
+
+ def test_get(self):
+ s = schedule.ScheduleIndex()
+ s.add('/a/b')
+ s.add('/a/b/c/d')
+ ret = s.get('/')
+ assert ret == ['/a/b', '/a/b/c/d']
+ ret = s.get('/a/b/c/')
+ assert ret == ['/a/b/c/d']
--- /dev/null
+[tox]
+envlist = py27,py3
+skipsdist = true
+; toxworkdir = {env:CEPH_BUILD_DIR}/snap-schedule
+; minversion = 2.8.1
+
+[testenv]
+setenv=
+ LD_LIBRARY_PATH = {toxinidir}/../../../../build/lib
+ PATH = {toxinidir}/../../../../build/bin:$PATH
+ py27: PYTHONPATH = {toxinidir}/../../../../build/lib/cython_modules/lib.2:..
+ py3: PYTHONPATH = {toxinidir}/../../../../build/lib/cython_modules/lib.3:{toxinidir}
+ CEPH_SNAP_SCHEDULE_UNITTEST = true
+deps =
+ pytest
+ mock
+ py27: pathlib
+commands=
+ pytest {posargs}