]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
snap_schedule: add initial interface and basic CRUD ops
authorJan Fajerski <jfajerski@suse.com>
Wed, 6 May 2020 13:32:17 +0000 (15:32 +0200)
committerJan Fajerski <jfajerski@suse.com>
Thu, 27 Aug 2020 13:55:45 +0000 (15:55 +0200)
Schedules are stored in an in-memory sqlite DB that gets serialized
to a RADOS object.

Signed-off-by: Jan Fajerski <jfajerski@suse.com>
src/pybind/mgr/snap_schedule/.gitignore [new file with mode: 0644]
src/pybind/mgr/snap_schedule/__init__.py [new file with mode: 0644]
src/pybind/mgr/snap_schedule/fs/__init__.py [new file with mode: 0644]
src/pybind/mgr/snap_schedule/fs/schedule.py [new file with mode: 0644]
src/pybind/mgr/snap_schedule/module.py [new file with mode: 0644]
src/pybind/mgr/snap_schedule/requirements.txt [new file with mode: 0644]
src/pybind/mgr/snap_schedule/tests/__init__.py [new file with mode: 0644]
src/pybind/mgr/snap_schedule/tests/fs/__init__.py [new file with mode: 0644]
src/pybind/mgr/snap_schedule/tests/fs/test_schedule.py [new file with mode: 0644]
src/pybind/mgr/snap_schedule/tox.ini [new file with mode: 0644]

diff --git a/src/pybind/mgr/snap_schedule/.gitignore b/src/pybind/mgr/snap_schedule/.gitignore
new file mode 100644 (file)
index 0000000..172bf57
--- /dev/null
@@ -0,0 +1 @@
+.tox
diff --git a/src/pybind/mgr/snap_schedule/__init__.py b/src/pybind/mgr/snap_schedule/__init__.py
new file mode 100644 (file)
index 0000000..7b88dda
--- /dev/null
@@ -0,0 +1,6 @@
+# -*- coding: utf-8 -*-
+
+import os
+
+if 'CEPH_SNAP_SCHEDULE_UNITTEST' not in os.environ:
+    from .module import Module
diff --git a/src/pybind/mgr/snap_schedule/fs/__init__.py b/src/pybind/mgr/snap_schedule/fs/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/pybind/mgr/snap_schedule/fs/schedule.py b/src/pybind/mgr/snap_schedule/fs/schedule.py
new file mode 100644 (file)
index 0000000..d0e84f8
--- /dev/null
@@ -0,0 +1,187 @@
+"""
+Copyright (C) 2019 SUSE
+
+LGPL2.1.  See file COPYING.
+"""
+import cephfs
+import errno
+import rados
+from contextlib import contextmanager
+from mgr_util import CephfsClient, CephfsConnectionException, connection_pool_wrap
+from datetime import datetime, timedelta
+import sqlite3
+
+
+SNAP_SCHEDULE_NAMESPACE = 'cephfs-snap-schedule'
+SNAP_DB_OBJECT_NAME = 'snap_db'
+
+
+@contextmanager
+def open_ioctx(self, pool):
+    try:
+        if type(pool) is int:
+            with self.mgr.rados.open_ioctx2(pool) as ioctx:
+                ioctx.set_namespace(SNAP_SCHEDULE_NAMESPACE)
+                yield ioctx
+        else:
+            with self.mgr.rados.open_ioctx(pool) as ioctx:
+                ioctx.set_namespace(SNAP_SCHEDULE_NAMESPACE)
+                yield ioctx
+    except rados.ObjectNotFound:
+        self.log.error("Failed to locate pool {}".format(pool))
+        raise
+
+
+class Schedule(object):
+    '''
+    Wrapper to work with schedules stored in Rados objects
+    '''
+    def __init__(self,
+                 path,
+                 schedule,
+                 retention_policy,
+                 fs_name,
+                 subvol,
+                 first_run=None,
+                 ):
+        self.fs = fs_name
+        self.subvol = subvol
+        self.path = path
+        self.schedule = schedule
+        self.retention = retention_policy
+        self.first_run = None
+        self.last_run = None
+
+    def __str__(self):
+        return f'''{self.path}: {self.schedule}; {self.retention}'''
+
+    @classmethod
+    def from_db_row(cls, table_row, fs):
+        return cls(table_row[0], table_row[1], table_row[2], fs, None)
+
+    def repeat_in_s(self):
+        mult = self.schedule[-1]
+        period = int(self.schedule[0:-1])
+        if mult == 'm':
+            return period * 60
+        elif mult == 'h':
+            return period * 60 * 60
+        elif mult == 'd':
+            return period * 60 * 60 * 24
+        elif mult == 'w':
+            return period * 60 * 60 * 24 * 7
+        else:
+            raise Exception('schedule multiplier not recognized')
+
+
+class SnapSchedClient(CephfsClient):
+
+    CREATE_TABLES = '''CREATE TABLE SCHEDULES(
+        id INTEGER PRIMARY KEY ASC,
+        path text NOT NULL UNIQUE,
+        schedule text NOT NULL,
+        retention text
+    );
+    CREATE TABLE SCHEDULES_META(
+        id INTEGER PRIMARY KEY ASC,
+        schedule_id int,
+        start bigint NOT NULL,
+        repeat bigint NOT NULL,
+        FOREIGN KEY(schedule_id) REFERENCES schedules(id) ON DELETE CASCADE
+    );'''
+    INSERT_SCHEDULE = '''INSERT INTO schedules(path, schedule, retention) Values(?, ?, ?);'''
+    INSERT_SCHEDULE_META = ''' INSERT INTO schedules_meta(schedule_id, start, repeat) Values(last_insert_rowid(), ?, ?)'''
+    UPDATE_SCHEDULE = '''UPDATE schedules SET schedule = ?, retention = ? where path = ?;'''
+    UPDATE_SCHEDULE_META = '''UPDATE schedules_meta SET start = ?, repeat = ? where schedule_id = (SELECT id FROM SCHEDULES WHERE path = ?);'''
+    DELETE_SCHEDULE = '''DELETE FROM schedules WHERE id = ?;'''
+    exec_query = '''select s.id, s.path, sm.repeat - ((strftime("%s", "now") - sm.start) % sm.repeat) "until" from schedules s inner join schedules_meta sm on sm.event_id = s.id order by until;'''
+
+    def __init__(self, mgr):
+        super(SnapSchedClient, self).__init__(mgr)
+        self.sqlite_connections = {}
+
+    def get_schedule_db(self, fs):
+        if fs not in self.sqlite_connections:
+            self.sqlite_connections[fs] = sqlite3.connect(':memory:')
+            with self.sqlite_connections[fs] as con:
+                con.execute("PRAGMA FOREIGN_KEYS = 1")
+                pool = self.get_metadata_pool(fs)
+                with open_ioctx(self, pool) as ioctx:
+                    try:
+                        size, _mtime = ioctx.stat('SNAP_DB_OBJECT_NAME')
+                        db = ioctx.read('SNAP_DB_OBJECT_NAME', size).decode('utf-8')
+                        con.executescript(db)
+                    except rados.ObjectNotFound:
+                        self.log.info('No schedule DB found in {}'.format(fs))
+                        con.executescript(self.CREATE_TABLES)
+        return self.sqlite_connections[fs]
+
+    def store_schedule_db(self, fs):
+        # only store db is it exists, otherwise nothing to do
+        metadata_pool = self.get_metadata_pool(fs)
+        if not metadata_pool:
+            raise CephfsConnectionException(
+                -errno.ENOENT, "Filesystem {} does not exist".format(fs))
+        if fs in self.sqlite_connections:
+            db_content = []
+            # TODO maybe do this in a transaction?
+            for row in self.sqlite_connections[fs].iterdump():
+                db_content.append(row)
+        with open_ioctx(self, metadata_pool) as ioctx:
+            ioctx.write_full("SNAP_DB_OBJECT_NAME",
+                             '\n'.join(db_content).encode('utf-8'))
+
+    @connection_pool_wrap
+    def validate_schedule(self, fs_handle, **kwargs):
+        try:
+            fs_handle.stat(kwargs['sched'].path)
+        except cephfs.ObjectNotFound:
+            self.log.error('Path {} not found'.format(kwargs['sched'].path))
+            return False
+        return True
+
+    def list_snap_schedule(self, fs, path):
+        db = self.get_schedule_db(fs)
+        # with db:
+        scheds = []
+        for row in db.execute('SELECT * FROM SCHEDULES WHERE path LIKE ?',
+                              (f'{path}%',)):
+            scheds.append(row)
+        return scheds
+
+    def update_snap_schedule(self, fs, sched):
+        db = self.get_schedule_db(fs)
+        with db:
+            db.execute(self.UPDATE_SCHEDULE,
+                       (sched.schedule,
+                        sched.retention,
+                        sched.path))
+            db.execute(self.UPDATE_SCHEDULE_META,
+                       ('strftime("%s", "now")',
+                        sched.repeat_in_s(),
+                        sched.path))
+
+    def store_snap_schedule(self, fs, sched):
+        db = self.get_schedule_db(fs)
+        with db:
+            db.execute(self.INSERT_SCHEDULE,
+                       (sched.path,
+                        sched.schedule,
+                        sched.retention))
+            db.execute(self.INSERT_SCHEDULE_META,
+                       ('strftime("%s", "now")',
+                        sched.repeat_in_s()))
+            self.store_schedule_db(sched.fs)
+
+    def get_snap_schedule(self, fs, path):
+        db = self.get_schedule_db(fs)
+        c = db.execute('SELECT path, schedule, retention FROM SCHEDULES WHERE path = ?', (path,))
+        return Schedule.from_db_row(c.fetchone(), fs)
+
+    def rm_snap_schedule(self, fs, path):
+        db = self.get_schedule_db(fs)
+        with db:
+            cur = db.execute('SELECT id FROM SCHEDULES WHERE path = ?',
+                             (path,))
+            id_ = cur.fetchone()
+            db.execute(self.DELETE_SCHEDULE, id_)
diff --git a/src/pybind/mgr/snap_schedule/module.py b/src/pybind/mgr/snap_schedule/module.py
new file mode 100644 (file)
index 0000000..ac09223
--- /dev/null
@@ -0,0 +1,126 @@
+"""
+Copyright (C) 2019 SUSE
+
+LGPL2.1.  See file COPYING.
+"""
+import errno
+import sqlite3
+from .fs.schedule import SnapSchedClient, Schedule
+from mgr_module import MgrModule, CLIReadCommand, CLIWriteCommand
+from mgr_util import CephfsConnectionException
+from rados import ObjectNotFound
+from threading import Event
+try:
+    import queue as Queue
+except ImportError:
+    import Queue
+
+
+class Module(MgrModule):
+
+    def __init__(self, *args, **kwargs):
+        super(Module, self).__init__(*args, **kwargs)
+        self._initialized = Event()
+        self.client = SnapSchedClient(self)
+
+        self._background_jobs = Queue.Queue()
+
+    @property
+    def default_fs(self):
+        fs_map = self.get('fs_map')
+        if fs_map['filesystems']:
+            return fs_map['filesystems'][0]['mdsmap']['fs_name']
+        else:
+            self.log.error('No filesystem instance could be found.')
+            raise CephfsConnectionException(
+                -errno.ENOENT, "no filesystem found")
+
+    def serve(self):
+        self._initialized.set()
+
+    def handle_command(self, inbuf, cmd):
+        self._initialized.wait()
+        return -errno.EINVAL, "", "Unknown command"
+
+    @CLIReadCommand('fs snap-schedule ls',
+                    'name=path,type=CephString,req=false '
+                    'name=subvol,type=CephString,req=false '
+                    'name=fs,type=CephString,req=false',
+                    'List current snapshot schedules')
+    def snap_schedule_ls(self, path='/', subvol=None, fs=None):
+        use_fs = fs if fs else self.default_fs
+        try:
+            ret_scheds = self.client.list_snap_schedule(use_fs, path)
+        except CephfsConnectionException as e:
+            return e.to_tuple()
+        return 0, ' '.join(str(ret_scheds)), ''
+
+    @CLIReadCommand('fs snap-schedule get',
+                    'name=path,type=CephString '
+                    'name=subvol,type=CephString,req=false '
+                    'name=fs,type=CephString,req=false',
+                    'Get current snapshot schedule for <path>')
+    def snap_schedule_get(self, path, subvol=None, fs=None):
+        try:
+            use_fs = fs if fs else self.default_fs
+            sched = self.client.get_snap_schedule(use_fs, path)
+        except CephfsConnectionException as e:
+            return e.to_tuple()
+        if not sched:
+            return -1, '', f'SnapSchedule for {path} not found'
+        return 0, str(sched), ''
+
+    prune_schedule_options = ('name=keep-minutely,type=CephString,req=false '
+                              'name=keep-hourly,type=CephString,req=false '
+                              'name=keep-daily,type=CephString,req=false '
+                              'name=keep-weekly,type=CephString,req=false '
+                              'name=keep-monthly,type=CephString,req=false '
+                              'name=keep-yearly,type=CephString,req=false '
+                              'name=keep-last,type=CephString,req=false '
+                              'name=keep-within,type=CephString,req=false')
+
+    @CLIWriteCommand('fs snap-schedule set',
+                     'name=path,type=CephString '
+                     'name=snap-schedule,type=CephString '
+                     'name=retention-policy,type=CephString,req=false '
+                     'name=fs,type=CephString,req=false '
+                     'name=subvol,type=CephString,req=false',
+                     'Set a snapshot schedule for <path>')
+    def snap_schedule_set(self,
+                          path,
+                          snap_schedule,
+                          retention_policy='',
+                          fs=None,
+                          subvol=None):
+        try:
+            use_fs = fs if fs else self.default_fs
+            # TODO should we allow empty retention policies?
+            sched = Schedule(path, snap_schedule, retention_policy,
+                             use_fs, subvol)
+            # TODO allow schedules on non-existent paths?
+            # self.client.validate_schedule(None, sched=sched, fs_name=fs)
+            self.client.store_snap_schedule(use_fs, sched)
+            suc_msg = f'Schedule set for path {path}'
+        except sqlite3.IntegrityError:
+            existing_sched = self.client.get_snap_schedule(use_fs, path)
+            self.log.info(f'Found existing schedule {existing_sched}...updating')
+            self.client.update_snap_schedule(use_fs, sched)
+            suc_msg = f'Schedule set for path {path}, updated existing schedule {existing_sched}'
+        except CephfsConnectionException as e:
+            return e.to_tuple()
+        return 0, suc_msg, ''
+
+    @CLIWriteCommand('fs snap-schedule rm',
+                     'name=path,type=CephString '
+                     'name=subvol,type=CephString,req=false '
+                     'name=fs,type=CephString,req=false',
+                     'Remove a snapshot schedule for <path>')
+    def snap_schedule_rm(self, path, subvol=None, fs=None):
+        try:
+            use_fs = fs if fs else self.default_fs
+            self.client.rm_snap_schedule(use_fs, path)
+        except CephfsConnectionException as e:
+            return e.to_tuple()
+        except ObjectNotFound as e:
+            return e.errno, '', 'SnapSchedule for {} not found'.format(path)
+        return 0, 'Schedule removed for path {}'.format(path), ''
diff --git a/src/pybind/mgr/snap_schedule/requirements.txt b/src/pybind/mgr/snap_schedule/requirements.txt
new file mode 100644 (file)
index 0000000..e079f8a
--- /dev/null
@@ -0,0 +1 @@
+pytest
diff --git a/src/pybind/mgr/snap_schedule/tests/__init__.py b/src/pybind/mgr/snap_schedule/tests/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/pybind/mgr/snap_schedule/tests/fs/__init__.py b/src/pybind/mgr/snap_schedule/tests/fs/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/pybind/mgr/snap_schedule/tests/fs/test_schedule.py b/src/pybind/mgr/snap_schedule/tests/fs/test_schedule.py
new file mode 100644 (file)
index 0000000..5b08962
--- /dev/null
@@ -0,0 +1,69 @@
+import pytest
+from pathlib import PurePosixPath
+from fs import schedule
+
+
+class TestScheduleIndex(object):
+
+    in_parameters = [
+        (['/'], ['/'], []),
+        (['/a'], ['/a'], ['/']),
+        (['/a'], ['/a'], ['/', '/bar']),
+        (['/a/b'], ['/a/b'], ['/', '/a']),
+        (['/a/b', '/'], ['/a/b', '/'], ['/a']),
+        (['/a/b/c', '/a/d'], ['/a/b/c', '/a/d'], ['/a', '/', 'a/b']),
+        (['/a/b', '/a/b/c/d'], ['/a/b', '/a/b/c/d'], ['/a', '/', 'a/b/c']),
+        (['/' + '/'.join(['x' for _ in range(1000)])],
+         ['/' + '/'.join(['x' for _ in range(1000)])], ['/a']),
+    ]
+
+    def test_index_ctor(self):
+        s = schedule.ScheduleIndex()
+        assert '/' in s.root
+
+    @pytest.mark.parametrize("add,in_,not_in", in_parameters)
+    def test_add_in(self, add, in_, not_in):
+        if len(add[0]) > 999:
+            pytest.xfail("will reach max recursion depth, deal with tail recursion to get this fixed")
+        s = schedule.ScheduleIndex()
+        [s.add(p) for p in add]
+        for p in in_:
+            assert p in s, '{} not found in schedule'.format(p)
+        for p in not_in:
+            assert p not in s, '{} found in schedule'.format(p)
+
+    descend_parameters = [
+        (['/'], '/', '/', []),
+        (['/a'], '/a', 'a', ['/b']),
+        (['/a/b'], '/a/b', 'b', ['/a/a']),
+        (['/a/b', '/a/b/c'], '/a/b', 'b', ['/a/a']),
+    ]
+
+    @pytest.mark.parametrize("add,descend,pos,not_in", descend_parameters)
+    def test_descend(self, add, descend, pos, not_in):
+        # TODO improve this
+        s = schedule.ScheduleIndex()
+        [s.add(p) for p in add]
+        prefix, result = s._descend(PurePosixPath(descend).parts, s.root)
+        assert pos in result, '{} not in {}'.format(pos, result)
+        for p in not_in:
+            result = s._descend(PurePosixPath(p).parts, s.root)
+            assert result == (False, False)
+
+    def test_gather_subdirs(self):
+        s = schedule.ScheduleIndex()
+        s.add('/a/b')
+        s.add('/a/b/c/d')
+        ret = s._gather_subdirs(s.root['/'].children,
+                                [],
+                                PurePosixPath('/'))
+        assert ret == [PurePosixPath('/a/b'), PurePosixPath('/a/b/c/d')]
+
+    def test_get(self):
+        s = schedule.ScheduleIndex()
+        s.add('/a/b')
+        s.add('/a/b/c/d')
+        ret = s.get('/')
+        assert ret == ['/a/b', '/a/b/c/d']
+        ret = s.get('/a/b/c/')
+        assert ret == ['/a/b/c/d']
diff --git a/src/pybind/mgr/snap_schedule/tox.ini b/src/pybind/mgr/snap_schedule/tox.ini
new file mode 100644 (file)
index 0000000..953895d
--- /dev/null
@@ -0,0 +1,19 @@
+[tox]
+envlist = py27,py3
+skipsdist = true
+; toxworkdir = {env:CEPH_BUILD_DIR}/snap-schedule
+; minversion = 2.8.1
+
+[testenv]
+setenv=
+    LD_LIBRARY_PATH = {toxinidir}/../../../../build/lib
+    PATH = {toxinidir}/../../../../build/bin:$PATH
+    py27: PYTHONPATH = {toxinidir}/../../../../build/lib/cython_modules/lib.2:..
+    py3:  PYTHONPATH = {toxinidir}/../../../../build/lib/cython_modules/lib.3:{toxinidir}
+    CEPH_SNAP_SCHEDULE_UNITTEST = true
+deps =
+    pytest
+    mock
+    py27: pathlib
+commands=
+    pytest {posargs}