pybind/mgr/snap_schedule: add separate retention interface
author Jan Fajerski <jfajerski@suse.com>
Tue, 9 Jun 2020 12:49:55 +0000 (14:49 +0200)
committer Jan Fajerski <jfajerski@suse.com>
Thu, 27 Aug 2020 13:55:46 +0000 (15:55 +0200)
Signed-off-by: Jan Fajerski <jfajerski@suse.com>
src/pybind/mgr/snap_schedule/fs/schedule.py
src/pybind/mgr/snap_schedule/fs/schedule_client.py
src/pybind/mgr/snap_schedule/module.py
src/pybind/mgr/snap_schedule/tests/conftest.py
src/pybind/mgr/snap_schedule/tests/fs/test_schedule.py
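
This commit moves retention out of the schedule tuple: a retention policy is now stored per path as a JSON-encoded dict and edited through small spec strings such as '7d4w'. A minimal sketch of the new parse/dump helpers added in fs/schedule.py, assuming they are importable the same way the test suite does it (from fs.schedule import ...):

    from fs.schedule import parse_retention, dump_retention

    spec = '7d4w'                        # keep 7 daily and 4 weekly snapshots
    ret = parse_retention(spec)          # -> {'d': 7, 'w': 4}
    assert ret == {'d': 7, 'w': 4}

    # dump_retention emits multipliers in RETENTION_MULTIPLIERS order,
    # so the string form is canonicalized rather than preserved verbatim:
    assert dump_retention(parse_retention('4w7d')) == '7d4w'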

src/pybind/mgr/snap_schedule/fs/schedule.py
index 781eeb67023a90baa025f13c8c54ba9dfecdd527..c919316af24fdc5e76a2b6404c9920f956f53a62 100644 (file)
@@ -6,11 +6,33 @@ LGPL2.1.  See file COPYING.
 from datetime import datetime, timezone
 import json
 import logging
+import re
 import sqlite3
 
 log = logging.getLogger(__name__)
 
 
+def parse_retention(retention):
+    ret = {}
+    log.debug(f'parse_retention({retention})')
+    matches = re.findall(r'\d+[a-z]', retention)
+    for m in matches:
+        ret[m[-1]] = int(m[0:-1])
+    matches = re.findall(r'\d+[A-Z]', retention)
+    for m in matches:
+        ret[m[-1]] = int(m[0:-1])
+    log.debug(f'parse_retention({retention}) -> {ret}')
+    return ret
+
+RETENTION_MULTIPLIERS = ['n', 'M', 'h', 'd', 'w', 'm', 'y']
+
+def dump_retention(retention):
+    ret = ''
+    for mult in RETENTION_MULTIPLIERS:
+        if mult in retention:
+            ret += str(retention[mult]) + mult
+    return ret
+
 class Schedule(object):
     '''
     Wrapper to work with schedules stored in sqlite
@@ -18,11 +40,11 @@ class Schedule(object):
     def __init__(self,
                  path,
                  schedule,
-                 retention_policy,
                  fs_name,
                  rel_path,
                  start=None,
                  subvol=None,
+                 retention_policy='{}',
                  created=None,
                  first=None,
                  last=None,
@@ -36,7 +58,7 @@ class Schedule(object):
         self.path = path
         self.rel_path = rel_path
         self.schedule = schedule
-        self.retention = retention_policy
+        self.retention = json.loads(retention_policy)
         if start is None:
             now = datetime.now(timezone.utc)
             self.start = datetime(now.year,
@@ -63,17 +85,17 @@ class Schedule(object):
             self.last_pruned = last_pruned
         self.created_count = created_count
         self.pruned_count = pruned_count
-        self.active = active
+        self.active = bool(active)
 
     @classmethod
-    def _from_get_query(cls, table_row, fs):
+    def _from_db_row(cls, table_row, fs):
         return cls(table_row['path'],
                    table_row['schedule'],
-                   table_row['retention'],
                    fs,
                    table_row['rel_path'],
                    table_row['start'],
                    table_row['subvol'],
+                   table_row['retention'],
                    table_row['created'],
                    table_row['first'],
                    table_row['last'],
@@ -83,16 +105,17 @@ class Schedule(object):
                    table_row['active'])
 
     def __str__(self):
-        return f'''{self.path} {self.schedule} {self.retention}'''
+        return f'''{self.path} {self.schedule} {dump_retention(self.retention)}'''
 
     def json_list(self):
         return json.dumps({'path': self.path, 'schedule': self.schedule,
-                           'retention': self.retention})
+                           'retention': dump_retention(self.retention)})
 
     CREATE_TABLES = '''CREATE TABLE schedules(
         id INTEGER PRIMARY KEY ASC,
         path TEXT NOT NULL UNIQUE,
         subvol TEXT,
+        retention TEXT DEFAULT '{}',
         rel_path TEXT NOT NULL
     );
     CREATE TABLE schedules_meta(
@@ -107,14 +130,13 @@ class Schedule(object):
         schedule TEXT NOT NULL,
         created_count INT DEFAULT 0,
         pruned_count INT DEFAULT 0,
-        retention TEXT,
         active INT NOT NULL,
         FOREIGN KEY(schedule_id) REFERENCES schedules(id) ON DELETE CASCADE,
         UNIQUE (schedule_id, start, repeat)
     );'''
 
     EXEC_QUERY = '''SELECT
-        sm.retention,
+        s.retention,
         sm.repeat - (strftime("%s", "now") - strftime("%s", sm.start)) %
         sm.repeat "until",
         sm.start, sm.repeat
@@ -128,7 +150,7 @@ class Schedule(object):
 
     PROTO_GET_SCHEDULES = '''SELECT
           s.path, s.subvol, s.rel_path, sm.active,
-          sm.schedule, sm.retention, sm.start, sm.first, sm.last,
+          sm.schedule, s.retention, sm.start, sm.first, sm.last,
           sm.last_pruned, sm.created, sm.created_count, sm.pruned_count
           FROM schedules s
               INNER JOIN schedules_meta sm ON sm.schedule_id = s.id
@@ -148,7 +170,7 @@ class Schedule(object):
             data += (start,)
         with db:
             c = db.execute(query, data)
-        return [cls._from_get_query(row, fs) for row in c.fetchall()]
+        return [cls._from_db_row(row, fs) for row in c.fetchall()]
 
     @classmethod
     def list_schedules(cls, path, db, fs, recursive):
@@ -159,23 +181,25 @@ class Schedule(object):
             else:
                 c = db.execute(cls.PROTO_GET_SCHEDULES + ' path = ?',
                                (f'{path}',))
-        return [cls._from_get_query(row, fs) for row in c.fetchall()]
+        return [cls._from_db_row(row, fs) for row in c.fetchall()]
 
     INSERT_SCHEDULE = '''INSERT INTO
-        schedules(path, subvol, rel_path)
-        Values(?, ?, ?);'''
+        schedules(path, subvol, retention, rel_path)
+        Values(?, ?, ?, ?);'''
     INSERT_SCHEDULE_META = '''INSERT INTO
         schedules_meta(schedule_id, start, created, repeat, schedule,
-        retention, active)
-        SELECT ?, ?, ?, ?, ?, ?, ?'''
+        active)
+        SELECT ?, ?, ?, ?, ?, ?'''
 
     def store_schedule(self, db):
         sched_id = None
         with db:
             try:
+                log.debug(f'schedule with retention {self.retention}')
                 c = db.execute(self.INSERT_SCHEDULE,
                                (self.path,
                                 self.subvol,
+                                json.dumps(self.retention),
                                 self.rel_path,))
                 sched_id = c.lastrowid
             except sqlite3.IntegrityError:
@@ -191,7 +215,6 @@ class Schedule(object):
                         self.created.isoformat(),
                         self.repeat,
                         self.schedule,
-                        self.retention,
                         1))
 
     @classmethod
@@ -236,6 +259,45 @@ class Schedule(object):
                 # rest
                 db.execute('DELETE FROM schedules WHERE id = ?;', id_)
 
+    GET_RETENTION = '''SELECT retention FROM schedules
+    WHERE path = ?'''
+    UPDATE_RETENTION = '''UPDATE schedules
+    SET retention = ?
+    WHERE path = ?'''
+
+    @classmethod
+    def add_retention(cls, db, path, retention_spec):
+        with db:
+            row = db.execute(cls.GET_RETENTION, (path,)).fetchone()
+            if not row:
+                raise ValueError(f'No schedule found for {path}')
+            retention = parse_retention(retention_spec)
+            log.debug(f'db result is {tuple(row)}')
+            current = row['retention']
+            current_retention = json.loads(current)
+            for r, v in retention.items():
+                if r in current_retention:
+                    raise ValueError((f'Retention for {r} is already present '
+                                     f'with value {current_retention[r]}. Please remove first'))
+            current_retention.update(retention)
+            db.execute(cls.UPDATE_RETENTION, (json.dumps(current_retention), path))
+
+    @classmethod
+    def rm_retention(cls, db, path, retention_spec):
+        with db:
+            row = db.execute(cls.GET_RETENTION, (path,)).fetchone()
+            if not row:
+                raise ValueError(f'No schedule found for {path}')
+            retention = parse_retention(retention_spec)
+            current = row['retention']
+            current_retention = json.loads(current)
+            for r, v in retention.items():
+                if r not in current_retention or current_retention[r] != v:
+                    raise ValueError((f'Retention for {r}: {v} was not set for {path}, '
+                                     'cannot remove'))
+                current_retention.pop(r)
+            db.execute(cls.UPDATE_RETENTION, (json.dumps(current_retention), path))
+
     def report(self):
         return self.report_json()
 
@@ -246,7 +308,7 @@ class Schedule(object):
     def repeat(self):
         mult = self.schedule[-1]
         period = int(self.schedule[0:-1])
-        if mult == 'm':
+        if mult == 'M':
             return period * 60
         elif mult == 'h':
             return period * 60 * 60
@@ -255,7 +317,7 @@ class Schedule(object):
         elif mult == 'w':
             return period * 60 * 60 * 24 * 7
         else:
-            raise Exception('schedule multiplier not recognized')
+            raise ValueError(f'schedule multiplier "{mult}" not recognized')
 
     UPDATE_LAST = '''UPDATE schedules_meta AS sm
     SET
@@ -279,6 +341,8 @@ class Schedule(object):
                                           self.repeat))
         self.created_count += 1
         self.last = time
+        if not self.first:
+            self.first = time
 
     UPDATE_INACTIVE = '''UPDATE schedules_meta AS sm
     SET
@@ -297,6 +361,7 @@ class Schedule(object):
             db.execute(self.UPDATE_INACTIVE, (self.path,
                                               self.start.isoformat(),
                                               self.repeat))
+        self.active = False
 
     UPDATE_ACTIVE = '''UPDATE schedules_meta AS sm
     SET
@@ -315,6 +380,7 @@ class Schedule(object):
             db.execute(self.UPDATE_ACTIVE, (self.path,
                                             self.start.isoformat(),
                                             self.repeat))
+        self.active = True
 
     UPDATE_PRUNED = '''UPDATE schedules_meta AS sm
     SET
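
Taken together, retention now lives in the schedules table rather than schedules_meta, and is edited via Schedule.add_retention/rm_retention. A hedged sketch of that flow against an in-memory DB, assuming sqlite3.Row rows (the class reads columns by name) and that the constructor defaults start/created as in the module:

    import json
    import sqlite3
    from fs.schedule import Schedule

    db = sqlite3.connect(':memory:')
    db.row_factory = sqlite3.Row
    db.executescript(Schedule.CREATE_TABLES)

    sched = Schedule('/foo', '1h', 'cephfs', '/foo')   # retention defaults to {}
    sched.store_schedule(db)

    Schedule.add_retention(db, '/foo', '7d4w')   # merged into the stored JSON dict
    Schedule.rm_retention(db, '/foo', '4w')      # removal must match the stored count

    row = db.execute('SELECT retention FROM schedules WHERE path = ?',
                     ('/foo',)).fetchone()
    assert json.loads(row['retention']) == {'d': 7}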
src/pybind/mgr/snap_schedule/fs/schedule_client.py
index bb02e2e7a5b939c348c95fe4973f075b066f95c2..58a57aef181fd39f2521c66f1915747c51263be9 100644 (file)
@@ -7,7 +7,6 @@ import cephfs
 import errno
 import rados
 from contextlib import contextmanager
-import re
 from mgr_util import CephfsClient, CephfsConnectionException, \
         open_filesystem
 from collections import OrderedDict
@@ -15,7 +14,7 @@ from datetime import datetime, timezone
 import logging
 from threading import Timer
 import sqlite3
-from .schedule import Schedule
+from .schedule import Schedule, parse_retention
 import traceback
 
 
@@ -57,17 +56,6 @@ def updates_schedule_db(func):
     return f
 
 
-def parse_retention(retention):
-    ret = {}
-    matches = re.findall(r'\d+[a-z]', retention)
-    for m in matches:
-        ret[m[-1]] = int(m[0:-1])
-    matches = re.findall(r'\d+[A-Z]', retention)
-    for m in matches:
-        ret[m[-1]] = int(m[0:-1])
-    return ret
-
-
 class SnapSchedClient(CephfsClient):
 
     def __init__(self, mgr):
@@ -170,21 +158,18 @@ class SnapSchedClient(CephfsClient):
     def prune_snapshots(self, sched):
         try:
             log.debug('Pruning snapshots')
-            ret = parse_retention(sched.retention)
+            ret = sched.retention
             path = sched.path
-            if not ret or "n" not in ret or ret["n"] > MAX_SNAPS_PER_PATH:
-                log.debug(f'Adding n: { MAX_SNAPS_PER_PATH} limit to {path}')
-                ret["n"] = MAX_SNAPS_PER_PATH
             prune_candidates = set()
             time = datetime.now(timezone.utc)
             with open_filesystem(self, sched.fs) as fs_handle:
                 with fs_handle.opendir(f'{path}/.snap') as d_handle:
                     dir_ = fs_handle.readdir(d_handle)
                     while dir_:
-                        if dir_.d_name.startswith(b'{SNAPSHOT_PREFIX}-'):
+                        if dir_.d_name.decode('utf-8').startswith(f'{SNAPSHOT_PREFIX}-'):
                             log.debug(f'add {dir_.d_name} to pruning')
                             ts = datetime.strptime(
-                                dir_.d_name.lstrip(b'{SNAPSHOT_PREFIX}-').decode('utf-8'),
+                                dir_.d_name.decode('utf-8').lstrip(f'{SNAPSHOT_PREFIX}-'),
                                 SNAPSHOT_TS_FORMAT)
                             prune_candidates.add((dir_, ts))
                         else:
@@ -205,7 +190,7 @@ class SnapSchedClient(CephfsClient):
         PRUNING_PATTERNS = OrderedDict([
             # n is for keep last n snapshots, uses the snapshot name timestamp
             # format for lowest granularity
-            ("n", SNAPSHOT_TS_FORMAT)
+            ("n", SNAPSHOT_TS_FORMAT),
             # TODO remove M for release
             ("M", '%Y-%m-%d-%H_%M'),
             ("h", '%Y-%m-%d-%H'),
@@ -214,8 +199,9 @@ class SnapSchedClient(CephfsClient):
             ("m", '%Y-%m'),
             ("y", '%Y'),
         ])
-        keep = set()
+        keep = []
         for period, date_pattern in PRUNING_PATTERNS.items():
+            log.debug(f'compiling keep set for period {period}')
             period_count = retention.get(period, 0)
             if not period_count:
                 continue
@@ -227,11 +213,14 @@ class SnapSchedClient(CephfsClient):
                     last = snap_ts
                     if snap not in keep:
                         log.debug(f'keeping {snap[0].d_name} due to {period_count}{period}')
-                        keep.add(snap)
+                        keep.append(snap)
                         if len(keep) == period_count:
                             log.debug(f'found enough snapshots for {period_count}{period}')
                             break
-        return candidates - keep
+        if len(keep) > MAX_SNAPS_PER_PATH:
+            log.info(f'Would keep more than {MAX_SNAPS_PER_PATH}, pruning keep set')
+            keep = keep[:MAX_SNAPS_PER_PATH]
+        return candidates - set(keep)
 
     def get_snap_schedules(self, fs, path):
         db = self.get_schedule_db(fs)
@@ -255,6 +244,30 @@ class SnapSchedClient(CephfsClient):
         db = self.get_schedule_db(fs)
         Schedule.rm_schedule(db, path, repeat, start)
 
+    @updates_schedule_db
+    def add_retention_spec(self,
+                           fs,
+                           path,
+                           retention_spec_or_period,
+                           retention_count):
+        retention_spec = retention_spec_or_period
+        if retention_count:
+            retention_spec = retention_count + retention_spec
+        db = self.get_schedule_db(fs)
+        Schedule.add_retention(db, path, retention_spec)
+
+    @updates_schedule_db
+    def rm_retention_spec(self,
+                          fs,
+                          path,
+                          retention_spec_or_period,
+                          retention_count):
+        retention_spec = retention_spec_or_period
+        if retention_count:
+            retention_spec = retention_count + retention_spec
+        db = self.get_schedule_db(fs)
+        Schedule.rm_retention(db, path, retention_spec)
+
     @updates_schedule_db
     def activate_snap_schedule(self, fs, path, repeat, start):
         db = self.get_schedule_db(fs)
src/pybind/mgr/snap_schedule/module.py
index 8f8d10c8aa71ea5586916ebd198c88e8863c2bc5..917aa5b78d29b2c4eb601ac454913320ba375dc1 100644 (file)
@@ -88,7 +88,6 @@ class Module(MgrModule):
     @CLIWriteCommand('fs snap-schedule add',
                      'name=path,type=CephString '
                      'name=snap-schedule,type=CephString '
-                     'name=retention-policy,type=CephString,req=false '
                      'name=start,type=CephString,req=false '
                      'name=fs,type=CephString,req=false '
                      'name=subvol,type=CephString,req=false',
@@ -96,15 +95,16 @@ class Module(MgrModule):
     def snap_schedule_add(self,
                           path,
                           snap_schedule,
-                          retention_policy='',
                           start=None,
                           fs=None,
                           subvol=None):
         try:
             use_fs = fs if fs else self.default_fs
             abs_path = self.resolve_subvolume_path(fs, subvol, path)
-            self.client.store_snap_schedule(use_fs, abs_path, (abs_path, snap_schedule,
-                                                     retention_policy, use_fs, path, start, subvol))
+            self.client.store_snap_schedule(use_fs,
+                                            abs_path,
+                                            (abs_path, snap_schedule,
+                                             use_fs, path, start, subvol))
             suc_msg = f'Schedule set for path {path}'
         except sqlite3.IntegrityError:
             existing_scheds = self.client.get_snap_schedules(use_fs, path)
@@ -112,6 +112,8 @@ class Module(MgrModule):
             error_msg = f'Found existing schedule {report}'
             self.log.error(error_msg)
             return -errno.EEXIST, '', error_msg
+        except ValueError as e:
+            return -errno.ENOENT, '', str(e)
         except CephfsConnectionException as e:
             return e.to_tuple()
         return 0, suc_msg, ''
@@ -139,6 +141,56 @@ class Module(MgrModule):
             return -errno.ENOENT, '', str(e)
         return 0, 'Schedule removed for path {}'.format(path), ''
 
+    @CLIWriteCommand('fs snap-schedule retention add',
+                     'name=path,type=CephString '
+                     'name=retention-spec-or-period,type=CephString '
+                     'name=retention-count,type=CephString,req=false '
+                     'name=fs,type=CephString,req=false '
+                     'name=subvol,type=CephString,req=false',
+                     'Set a retention specification for <path>')
+    def snap_schedule_retention_add(self,
+                                    path,
+                                    retention_spec_or_period,
+                                    retention_count=None,
+                                    fs=None,
+                                    subvol=None):
+        try:
+            use_fs = fs if fs else self.default_fs
+            abs_path = self.resolve_subvolume_path(fs, subvol, path)
+            self.client.add_retention_spec(use_fs, abs_path,
+                                          retention_spec_or_period,
+                                          retention_count)
+        except CephfsConnectionException as e:
+            return e.to_tuple()
+        except ValueError as e:
+            return -errno.ENOENT, '', str(e)
+        return 0, 'Retention added to path {}'.format(path), ''
+
+    @CLIWriteCommand('fs snap-schedule retention remove',
+                     'name=path,type=CephString '
+                     'name=retention-spec-or-period,type=CephString '
+                     'name=retention-count,type=CephString,req=false '
+                     'name=fs,type=CephString,req=false '
+                     'name=subvol,type=CephString,req=false',
+                     'Remove a retention specification for <path>')
+    def snap_schedule_retention_rm(self,
+                                   path,
+                                   retention_spec_or_period,
+                                   retention_count=None,
+                                   fs=None,
+                                   subvol=None):
+        try:
+            use_fs = fs if fs else self.default_fs
+            abs_path = self.resolve_subvolume_path(fs, subvol, path)
+            self.client.rm_retention_spec(use_fs, abs_path,
+                                          retention_spec_or_period,
+                                          retention_count)
+        except CephfsConnectionException as e:
+            return e.to_tuple()
+        except ValueError as e:
+            return -errno.ENOENT, '', str(e)
+        return 0, 'Retention removed from path {}'.format(path), ''
+
     @CLIWriteCommand('fs snap-schedule activate',
                      'name=path,type=CephString '
                      'name=repeat,type=CephString,req=false '
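
The two new commands accept either a combined spec ('7d') or a period plus a separate count; the client glues the pieces back together before handing them to Schedule. A hedged sketch of that normalization, using a hypothetical helper name to mirror the logic in add_retention_spec/rm_retention_spec above:

    def normalize_retention_args(retention_spec_or_period, retention_count=None):
        # hypothetical helper: mirrors add_retention_spec/rm_retention_spec
        spec = retention_spec_or_period
        if retention_count:
            spec = retention_count + spec
        return spec

    assert normalize_retention_args('7d') == '7d'
    assert normalize_retention_args('d', '7') == '7d'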
src/pybind/mgr/snap_schedule/tests/conftest.py
index 12b7a9066edcc84cebfc6ae029eed89b752bd3dd..727d723f3dd906b20eda99a42f866fa1f486f20c 100644 (file)
@@ -6,10 +6,10 @@ from fs.schedule import Schedule
 # simple_schedule fixture returns schedules without any timing arguments
 # the tuple values correspond to ctor args for Schedule
 _simple_schedules = [
-    ('/foo', '6h', '', 'fs_name', '/foo'),
-    ('/foo', '24h', '10d', 'fs_name', '/foo'),
-    ('/bar', '1d', '30d', 'fs_name', '/bar'),
-    ('/fnord', '1w', '4w1m', 'fs_name', '/fnord'),
+    ('/foo', '6h', 'fs_name', '/foo'),
+    ('/foo', '24h', 'fs_name', '/foo'),
+    ('/bar', '1d', 'fs_name', '/bar'),
+    ('/fnord', '1w', 'fs_name', '/fnord'),
 ]
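
The fixtures drop the retention element because retention_policy is now an optional, JSON-encoded keyword argument decoded in Schedule.__init__; passing it explicitly would look like this hedged sketch:

    from fs.schedule import Schedule

    s = Schedule('/foo', '6h', 'fs_name', '/foo',
                 retention_policy='{"d": 7, "w": 4}')   # decoded via json.loads
    assert s.retention == {'d': 7, 'w': 4}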
 
 
src/pybind/mgr/snap_schedule/tests/fs/test_schedule.py
index 4eaa908dd61353ff1fb6b11dccd197410d8e3c98..13683353b9eec0f2e9250bd7595fa60708103f99 100644 (file)
@@ -1,12 +1,34 @@
 import datetime
-from fs.schedule import Schedule
+import json
+import pytest
+import random
+import sqlite3
+from fs.schedule import Schedule, parse_retention
 
 SELECT_ALL = ('select * from schedules s'
               ' INNER JOIN schedules_meta sm'
               ' ON sm.schedule_id = s.id')
 
 
+def assert_updated(new, old, update_expected={}):
+    '''
+    Assert that the object new has been updated in the attributes listed
+    in update_expected AND has not changed in any other attribute compared
+    to old.
+    If update_expected is the empty dict, plain equality is checked.
+    '''
+
+    for var in vars(new):
+        if var in update_expected:
+            assert getattr(new, var) == update_expected.get(var), f'new did not update value for {var}'
+        else:
+            assert getattr(new, var) == getattr(old, var), f'new changed unexpectedly in value for {var}'
+
+
 class TestSchedule(object):
+    '''
+    Test the schedule class basics and that its methods update self as expected
+    '''
 
     def test_start_default_midnight(self, simple_schedule):
         now = datetime.datetime.now(datetime.timezone.utc)
@@ -37,10 +59,9 @@ class TestSchedule(object):
         with db:
             row = db.execute(SELECT_ALL).fetchone()
 
-        db_schedule = Schedule._from_get_query(row, simple_schedule.fs)
+        db_schedule = Schedule._from_db_row(row, simple_schedule.fs)
 
-        for var in vars(db_schedule):
-            assert getattr(simple_schedule, var) == getattr(db_schedule, var)
+        assert_updated(db_schedule, simple_schedule)
 
     def test_store_multiple(self, db, simple_schedules):
         [s.store_schedule(db) for s in simple_schedules]
@@ -54,66 +75,162 @@ class TestSchedule(object):
     def test_update_last(self, db, simple_schedule):
         simple_schedule.store_schedule(db)
 
-        row = ()
         with db:
-            row = db.execute(SELECT_ALL).fetchone()
-        assert row['last'] is None
+            _ = db.execute(SELECT_ALL).fetchone()
 
         first_time = datetime.datetime.now(datetime.timezone.utc)
         simple_schedule.update_last(first_time, db)
 
         with db:
-            row = db.execute(SELECT_ALL).fetchone()
-        assert datetime.datetime.fromisoformat(row['last']) == first_time
+            after = db.execute(SELECT_ALL).fetchone()
+        assert_updated(Schedule._from_db_row(after, simple_schedule.fs),
+                       simple_schedule)
 
         second_time = datetime.datetime.now(datetime.timezone.utc)
         simple_schedule.update_last(second_time, db)
 
         with db:
-            row = db.execute(SELECT_ALL).fetchone()
-        assert datetime.datetime.fromisoformat(row['last']) == second_time
+            after2 = db.execute(SELECT_ALL).fetchone()
+        assert_updated(Schedule._from_db_row(after2, simple_schedule.fs),
+                       simple_schedule)
 
-    def test_update_created_count(self, db, simple_schedule):
+    def test_set_inactive_active(self, db, simple_schedule):
         simple_schedule.store_schedule(db)
 
-        row = ()
         with db:
-            row = db.execute(SELECT_ALL).fetchone()
-        assert row['created_count'] == 0
+            _ = db.execute(SELECT_ALL).fetchone()
 
-        first_time = datetime.datetime.now(datetime.timezone.utc)
-        simple_schedule.update_last(first_time, db)
+        simple_schedule.set_inactive(db)
 
         with db:
-            row = db.execute(SELECT_ALL).fetchone()
-        assert row['created_count'] == 1
+            after = db.execute(SELECT_ALL).fetchone()
+        assert_updated(Schedule._from_db_row(after, simple_schedule.fs),
+                       simple_schedule)
 
-        second_time = datetime.datetime.now(datetime.timezone.utc)
-        simple_schedule.update_last(second_time, db)
+        simple_schedule.set_active(db)
 
         with db:
-            row = db.execute(SELECT_ALL).fetchone()
-        assert row['created_count'] == 2
+            after2 = db.execute(SELECT_ALL).fetchone()
+        assert_updated(Schedule._from_db_row(after2, simple_schedule.fs),
+                       simple_schedule)
 
-    def test_update_first(self, db, simple_schedule):
+    def test_update_pruned(self, db, simple_schedule):
         simple_schedule.store_schedule(db)
 
-        row = ()
         with db:
-            row = db.execute(SELECT_ALL).fetchone()
-        assert row['first'] is None
+            _ = db.execute(SELECT_ALL).fetchone()
+
+        now = datetime.datetime.now(datetime.timezone.utc)
+        pruned_count = random.randint(1, 1000)
+
+        simple_schedule.update_pruned(now, db, pruned_count)
+
+        with db:
+            after = db.execute(SELECT_ALL).fetchone()
+
+        assert_updated(Schedule._from_db_row(after, simple_schedule.fs),
+                       simple_schedule)
+
+    # TODO test get_schedules and list_schedules
+
+
+class TestScheduleDB(object):
+    '''
+    This class tests that Schedules methods update the DB correctly
+    '''
+
+    def test_update_last(self, db, simple_schedule):
+        simple_schedule.store_schedule(db)
+
+        with db:
+            before = db.execute(SELECT_ALL).fetchone()
 
         first_time = datetime.datetime.now(datetime.timezone.utc)
         simple_schedule.update_last(first_time, db)
 
         with db:
-            row = db.execute(SELECT_ALL).fetchone()
-        assert datetime.datetime.fromisoformat(row['first']) == first_time
+            after = db.execute(SELECT_ALL).fetchone()
+        assert_updated(Schedule._from_db_row(after, simple_schedule.fs),
+                       Schedule._from_db_row(before, simple_schedule.fs),
+                       {'created_count': 1,
+                        'last': first_time,
+                        'first': first_time})
 
         second_time = datetime.datetime.now(datetime.timezone.utc)
         simple_schedule.update_last(second_time, db)
 
         with db:
-            row = db.execute(SELECT_ALL).fetchone()
-        assert datetime.datetime.fromisoformat(row['first']) == first_time
+            after2 = db.execute(SELECT_ALL).fetchone()
+        assert_updated(Schedule._from_db_row(after2, simple_schedule.fs),
+                       Schedule._from_db_row(after, simple_schedule.fs),
+                       {'created_count': 2, 'last': second_time})
+
+    def test_set_inactive_active(self, db, simple_schedule):
+        simple_schedule.store_schedule(db)
+
+        with db:
+            before = db.execute(SELECT_ALL).fetchone()
+
+        simple_schedule.set_inactive(db)
+
+        with db:
+            after = db.execute(SELECT_ALL).fetchone()
+        assert_updated(Schedule._from_db_row(after, simple_schedule.fs),
+                       Schedule._from_db_row(before, simple_schedule.fs),
+                       {'active': 0})
+
+        simple_schedule.set_active(db)
+
+        with db:
+            after2 = db.execute(SELECT_ALL).fetchone()
+        assert_updated(Schedule._from_db_row(after2, simple_schedule.fs),
+                       Schedule._from_db_row(after, simple_schedule.fs),
+                       {'active': 1})
+
+    def test_update_pruned(self, db, simple_schedule):
+        simple_schedule.store_schedule(db)
+
+        with db:
+            before = db.execute(SELECT_ALL).fetchone()
+
+        now = datetime.datetime.now(datetime.timezone.utc)
+        pruned_count = random.randint(1, 1000)
+
+        simple_schedule.update_pruned(now, db, pruned_count)
+
+        with db:
+            after = db.execute(SELECT_ALL).fetchone()
+
+        assert_updated(Schedule._from_db_row(after, simple_schedule.fs),
+                       Schedule._from_db_row(before, simple_schedule.fs),
+                       {'last_pruned': now, 'pruned_count': pruned_count})
+
+    def test_add_retention(self, db, simple_schedule):
+        simple_schedule.store_schedule(db)
+
+        with db:
+            before = db.execute(SELECT_ALL).fetchone()
+
+        retention = "7d12m"
+        simple_schedule.add_retention(db, simple_schedule.path, retention)
+
+        with db:
+            after = db.execute(SELECT_ALL).fetchone()
+
+        assert after['retention'] == json.dumps(parse_retention(retention))
+
+        retention2 = "4w"
+        simple_schedule.add_retention(db, simple_schedule.path, retention2)
+
+        with db:
+            after = db.execute(SELECT_ALL).fetchone()
+
+        assert after['retention'] == json.dumps(parse_retention(retention + retention2))
+
+    def test_per_path_and_repeat_uniqness(self, db):
+        s1 = Schedule(*('/foo', '24h', 'fs_name', '/foo'))
+        s2 = Schedule(*('/foo', '1d', 'fs_name', '/foo'))
 
+        s1.store_schedule(db)
+        with pytest.raises(sqlite3.IntegrityError):
+            s2.store_schedule(db)