from datetime import datetime, timezone
import json
import logging
+import re
import sqlite3
log = logging.getLogger(__name__)
def parse_retention(retention):
    '''Parse a retention spec string like "7d12m" into {"d": 7, "m": 12}.

    A spec is a concatenation of <count><period> items.  Period
    designators are single, case-sensitive letters ("M" and "m" denote
    different periods); counts are non-negative integers.  Sections that
    do not match <digits><letter> are silently ignored, and a repeated
    period letter keeps the last count seen.
    '''
    logger = logging.getLogger(__name__)
    logger.debug(f'parse_retention({retention})')
    # One pass with a combined [a-zA-Z] class replaces the original two
    # separate lower-/upper-case passes; the resulting dict is the same
    # because the period letter itself is the key.
    ret = {period: int(count)
           for count, period in re.findall(r'(\d+)([a-zA-Z])', retention)}
    logger.debug(f'parse_retention({retention}) -> {ret}')
    return ret
+
# Canonical ordering of retention period designators, finest first.
RETENTION_MULTIPLIERS = ['n', 'M', 'h', 'd', 'w', 'm', 'y']


def dump_retention(retention):
    '''Serialize a retention dict back into its compact spec string,
    emitting periods in canonical RETENTION_MULTIPLIERS order.

    Periods not listed in RETENTION_MULTIPLIERS are omitted.
    '''
    parts = [f'{retention[mult]}{mult}'
             for mult in RETENTION_MULTIPLIERS
             if mult in retention]
    return ''.join(parts)
+
class Schedule(object):
'''
Wrapper to work with schedules stored in sqlite
def __init__(self,
path,
schedule,
- retention_policy,
fs_name,
rel_path,
start=None,
subvol=None,
+ retention_policy='{}',
created=None,
first=None,
last=None,
self.path = path
self.rel_path = rel_path
self.schedule = schedule
- self.retention = retention_policy
+ self.retention = json.loads(retention_policy)
if start is None:
now = datetime.now(timezone.utc)
self.start = datetime(now.year,
self.last_pruned = last_pruned
self.created_count = created_count
self.pruned_count = pruned_count
- self.active = active
+ self.active = bool(active)
@classmethod
- def _from_get_query(cls, table_row, fs):
+ def _from_db_row(cls, table_row, fs):
return cls(table_row['path'],
table_row['schedule'],
- table_row['retention'],
fs,
table_row['rel_path'],
table_row['start'],
table_row['subvol'],
+ table_row['retention'],
table_row['created'],
table_row['first'],
table_row['last'],
table_row['active'])
def __str__(self):
- return f'''{self.path} {self.schedule} {self.retention}'''
+ return f'''{self.path} {self.schedule} {dump_retention(self.retention)}'''
def json_list(self):
return json.dumps({'path': self.path, 'schedule': self.schedule,
- 'retention': self.retention})
+ 'retention': dump_retention(self.retention)})
CREATE_TABLES = '''CREATE TABLE schedules(
id INTEGER PRIMARY KEY ASC,
path TEXT NOT NULL UNIQUE,
subvol TEXT,
+ retention TEXT DEFAULT '{}',
rel_path TEXT NOT NULL
);
CREATE TABLE schedules_meta(
schedule TEXT NOT NULL,
created_count INT DEFAULT 0,
pruned_count INT DEFAULT 0,
- retention TEXT,
active INT NOT NULL,
FOREIGN KEY(schedule_id) REFERENCES schedules(id) ON DELETE CASCADE,
UNIQUE (schedule_id, start, repeat)
);'''
EXEC_QUERY = '''SELECT
- sm.retention,
+ s.retention,
sm.repeat - (strftime("%s", "now") - strftime("%s", sm.start)) %
sm.repeat "until",
sm.start, sm.repeat
PROTO_GET_SCHEDULES = '''SELECT
s.path, s.subvol, s.rel_path, sm.active,
- sm.schedule, sm.retention, sm.start, sm.first, sm.last,
+ sm.schedule, s.retention, sm.start, sm.first, sm.last,
sm.last_pruned, sm.created, sm.created_count, sm.pruned_count
FROM schedules s
INNER JOIN schedules_meta sm ON sm.schedule_id = s.id
data += (start,)
with db:
c = db.execute(query, data)
- return [cls._from_get_query(row, fs) for row in c.fetchall()]
+ return [cls._from_db_row(row, fs) for row in c.fetchall()]
@classmethod
def list_schedules(cls, path, db, fs, recursive):
else:
c = db.execute(cls.PROTO_GET_SCHEDULES + ' path = ?',
(f'{path}',))
- return [cls._from_get_query(row, fs) for row in c.fetchall()]
+ return [cls._from_db_row(row, fs) for row in c.fetchall()]
INSERT_SCHEDULE = '''INSERT INTO
- schedules(path, subvol, rel_path)
- Values(?, ?, ?);'''
+ schedules(path, subvol, retention, rel_path)
+ Values(?, ?, ?, ?);'''
INSERT_SCHEDULE_META = '''INSERT INTO
schedules_meta(schedule_id, start, created, repeat, schedule,
- retention, active)
- SELECT ?, ?, ?, ?, ?, ?, ?'''
+ active)
+ SELECT ?, ?, ?, ?, ?, ?'''
def store_schedule(self, db):
sched_id = None
with db:
try:
+ log.debug(f'schedule with retention {self.retention}')
c = db.execute(self.INSERT_SCHEDULE,
(self.path,
self.subvol,
+ json.dumps(self.retention),
self.rel_path,))
sched_id = c.lastrowid
except sqlite3.IntegrityError:
self.created.isoformat(),
self.repeat,
self.schedule,
- self.retention,
1))
@classmethod
# rest
db.execute('DELETE FROM schedules WHERE id = ?;', id_)
+ GET_RETENTION = '''SELECT retention FROM schedules
+ WHERE path = ?'''
+ UPDATE_RETENTION = '''UPDATE schedules
+ SET retention = ?
+ WHERE path = ?'''
+
+ @classmethod
+ def add_retention(cls, db, path, retention_spec):
+ with db:
+ row = db.execute(cls.GET_RETENTION, (path,)).fetchone()
+ if not row:
+ raise ValueError(f'No schedule found for {path}')
+ retention = parse_retention(retention_spec)
+ log.debug(f'db result is {tuple(row)}')
+ current = row['retention']
+ current_retention = json.loads(current)
+ for r, v in retention.items():
+ if r in current_retention:
+ raise ValueError((f'Retention for {r} is already present '
+ 'with value {current_retention[r]}. Please remove first'))
+ current_retention.update(retention)
+ db.execute(cls.UPDATE_RETENTION, (json.dumps(current_retention), path))
+
+ @classmethod
+ def rm_retention(cls, db, path, retention_spec):
+ with db:
+ row = db.execute(cls.GET_RETENTION, (path,)).fetchone()
+ if not row:
+ raise ValueError(f'No schedule found for {path}')
+ retention = parse_retention(retention_spec)
+ current = row['retention']
+ current_retention = json.loads(current)
+ for r, v in retention.items():
+ if r not in current_retention or current_retention[r] != v:
+ raise ValueError((f'Retention for {r}: {v} was not set for {path} '
+ 'can\'t remove'))
+ current_retention.pop(r)
+ db.execute(cls.UPDATE_RETENTION, (json.dumps(current_retention), path))
+
def report(self):
return self.report_json()
def repeat(self):
mult = self.schedule[-1]
period = int(self.schedule[0:-1])
- if mult == 'm':
+ if mult == 'M':
return period * 60
elif mult == 'h':
return period * 60 * 60
elif mult == 'w':
return period * 60 * 60 * 24 * 7
else:
- raise Exception('schedule multiplier not recognized')
+ raise ValueError(f'schedule multiplier "{mult}" not recognized')
UPDATE_LAST = '''UPDATE schedules_meta AS sm
SET
self.repeat))
self.created_count += 1
self.last = time
+ if not self.first:
+ self.first = time
UPDATE_INACTIVE = '''UPDATE schedules_meta AS sm
SET
db.execute(self.UPDATE_INACTIVE, (self.path,
self.start.isoformat(),
self.repeat))
+ self.active = False
UPDATE_ACTIVE = '''UPDATE schedules_meta AS sm
SET
db.execute(self.UPDATE_ACTIVE, (self.path,
self.start.isoformat(),
self.repeat))
+ self.active = True
UPDATE_PRUNED = '''UPDATE schedules_meta AS sm
SET
import errno
import rados
from contextlib import contextmanager
-import re
from mgr_util import CephfsClient, CephfsConnectionException, \
open_filesystem
from collections import OrderedDict
import logging
from threading import Timer
import sqlite3
-from .schedule import Schedule
+from .schedule import Schedule, parse_retention
import traceback
return f
-def parse_retention(retention):
- ret = {}
- matches = re.findall(r'\d+[a-z]', retention)
- for m in matches:
- ret[m[-1]] = int(m[0:-1])
- matches = re.findall(r'\d+[A-Z]', retention)
- for m in matches:
- ret[m[-1]] = int(m[0:-1])
- return ret
-
-
class SnapSchedClient(CephfsClient):
def __init__(self, mgr):
def prune_snapshots(self, sched):
try:
log.debug('Pruning snapshots')
- ret = parse_retention(sched.retention)
+ ret = sched.retention
path = sched.path
- if not ret or "n" not in ret or ret["n"] > MAX_SNAPS_PER_PATH:
- log.debug(f'Adding n: { MAX_SNAPS_PER_PATH} limit to {path}')
- ret["n"] = MAX_SNAPS_PER_PATH
prune_candidates = set()
time = datetime.now(timezone.utc)
with open_filesystem(self, sched.fs) as fs_handle:
with fs_handle.opendir(f'{path}/.snap') as d_handle:
dir_ = fs_handle.readdir(d_handle)
while dir_:
- if dir_.d_name.startswith(b'{SNAPSHOT_PREFIX}-'):
+ if dir_.d_name.decode('utf-8').startswith(f'{SNAPSHOT_PREFIX}-'):
log.debug(f'add {dir_.d_name} to pruning')
ts = datetime.strptime(
- dir_.d_name.lstrip(b'{SNAPSHOT_PREFIX}-').decode('utf-8'),
+ dir_.d_name.decode('utf-8').lstrip(f'{SNAPSHOT_PREFIX}-'),
SNAPSHOT_TS_FORMAT)
prune_candidates.add((dir_, ts))
else:
PRUNING_PATTERNS = OrderedDict([
# n is for keep last n snapshots, uses the snapshot name timestamp
# format for lowest granularity
- ("n", SNAPSHOT_TS_FORMAT)
+ ("n", SNAPSHOT_TS_FORMAT),
# TODO remove M for release
("M", '%Y-%m-%d-%H_%M'),
("h", '%Y-%m-%d-%H'),
("m", '%Y-%m'),
("y", '%Y'),
])
- keep = set()
+ keep = []
for period, date_pattern in PRUNING_PATTERNS.items():
+ log.debug(f'compiling keep set for period {period}')
period_count = retention.get(period, 0)
if not period_count:
continue
last = snap_ts
if snap not in keep:
log.debug(f'keeping {snap[0].d_name} due to {period_count}{period}')
- keep.add(snap)
+ keep.append(snap)
if len(keep) == period_count:
log.debug(f'found enough snapshots for {period_count}{period}')
break
- return candidates - keep
+ if len(keep) > MAX_SNAPS_PER_PATH:
+ log.info(f'Would keep more then {MAX_SNAPS_PER_PATH}, pruning keep set')
+ keep = keep[:MAX_SNAPS_PER_PATH]
+ return candidates - set(keep)
def get_snap_schedules(self, fs, path):
db = self.get_schedule_db(fs)
db = self.get_schedule_db(fs)
Schedule.rm_schedule(db, path, repeat, start)
+ @updates_schedule_db
+ def add_retention_spec(self,
+ fs,
+ path,
+ retention_spec_or_period,
+ retention_count):
+ retention_spec = retention_spec_or_period
+ if retention_count:
+ retention_spec = retention_count + retention_spec
+ db = self.get_schedule_db(fs)
+ Schedule.add_retention(db, path, retention_spec)
+
+ @updates_schedule_db
+ def rm_retention_spec(self,
+ fs,
+ path,
+ retention_spec_or_period,
+ retention_count):
+ retention_spec = retention_spec_or_period
+ if retention_count:
+ retention_spec = retention_count + retention_spec
+ db = self.get_schedule_db(fs)
+ Schedule.rm_retention(db, path, retention_spec)
+
@updates_schedule_db
def activate_snap_schedule(self, fs, path, repeat, start):
db = self.get_schedule_db(fs)
@CLIWriteCommand('fs snap-schedule add',
'name=path,type=CephString '
'name=snap-schedule,type=CephString '
- 'name=retention-policy,type=CephString,req=false '
'name=start,type=CephString,req=false '
'name=fs,type=CephString,req=false '
'name=subvol,type=CephString,req=false',
def snap_schedule_add(self,
path,
snap_schedule,
- retention_policy='',
start=None,
fs=None,
subvol=None):
try:
use_fs = fs if fs else self.default_fs
abs_path = self.resolve_subvolume_path(fs, subvol, path)
- self.client.store_snap_schedule(use_fs, abs_path, (abs_path, snap_schedule,
- retention_policy, use_fs, path, start, subvol))
+ self.client.store_snap_schedule(use_fs,
+ abs_path,
+ (abs_path, snap_schedule,
+ use_fs, path, start, subvol))
suc_msg = f'Schedule set for path {path}'
except sqlite3.IntegrityError:
existing_scheds = self.client.get_snap_schedules(use_fs, path)
error_msg = f'Found existing schedule {report}'
self.log.error(error_msg)
return -errno.EEXIST, '', error_msg
+ except ValueError as e:
+ return -errno.ENOENT, '', str(e)
except CephfsConnectionException as e:
return e.to_tuple()
return 0, suc_msg, ''
return -errno.ENOENT, '', str(e)
return 0, 'Schedule removed for path {}'.format(path), ''
+ @CLIWriteCommand('fs snap-schedule retention add',
+ 'name=path,type=CephString '
+ 'name=retention-spec-or-period,type=CephString '
+ 'name=retention-count,type=CephString,req=false '
+ 'name=fs,type=CephString,req=false '
+ 'name=subvol,type=CephString,req=false',
+ 'Set a retention specification for <path>')
+ def snap_schedule_retention_add(self,
+ path,
+ retention_spec_or_period,
+ retention_count=None,
+ fs=None,
+ subvol=None):
+ try:
+ use_fs = fs if fs else self.default_fs
+ abs_path = self.resolve_subvolume_path(fs, subvol, path)
+ self.client.add_retention_spec(use_fs, abs_path,
+ retention_spec_or_period,
+ retention_count)
+ except CephfsConnectionException as e:
+ return e.to_tuple()
+ except ValueError as e:
+ return -errno.ENOENT, '', str(e)
+ return 0, 'Retention added to path {}'.format(path), ''
+
+ @CLIWriteCommand('fs snap-schedule retention remove',
+ 'name=path,type=CephString '
+ 'name=retention-spec-or-period,type=CephString '
+ 'name=retention-count,type=CephString,req=false '
+ 'name=fs,type=CephString,req=false '
+ 'name=subvol,type=CephString,req=false',
+ 'Remove a retention specification for <path>')
+ def snap_schedule_retention_rm(self,
+ path,
+ retention_spec_or_period,
+ retention_count=None,
+ fs=None,
+ subvol=None):
+ try:
+ use_fs = fs if fs else self.default_fs
+ abs_path = self.resolve_subvolume_path(fs, subvol, path)
+ self.client.rm_retention_spec(use_fs, abs_path,
+ retention_spec_or_period,
+ retention_count)
+ except CephfsConnectionException as e:
+ return e.to_tuple()
+ except ValueError as e:
+ return -errno.ENOENT, '', str(e)
+ return 0, 'Retention removed from path {}'.format(path), ''
+
@CLIWriteCommand('fs snap-schedule activate',
'name=path,type=CephString '
'name=repeat,type=CephString,req=false '
# simple_schedule fixture returns schedules without any timing arguments
# the tuple values correspond to ctor args for Schedule
_simple_schedules = [
- ('/foo', '6h', '', 'fs_name', '/foo'),
- ('/foo', '24h', '10d', 'fs_name', '/foo'),
- ('/bar', '1d', '30d', 'fs_name', '/bar'),
- ('/fnord', '1w', '4w1m', 'fs_name', '/fnord'),
+ ('/foo', '6h', 'fs_name', '/foo'),
+ ('/foo', '24h', 'fs_name', '/foo'),
+ ('/bar', '1d', 'fs_name', '/bar'),
+ ('/fnord', '1w', 'fs_name', '/fnord'),
]
import datetime
-from fs.schedule import Schedule
+import json
+import pytest
+import random
+import sqlite3
+from fs.schedule import Schedule, parse_retention
SELECT_ALL = ('select * from schedules s'
' INNER JOIN schedules_meta sm'
' ON sm.schedule_id = s.id')
def assert_updated(new, old, update_expected=None):
    '''
    Assert that new changed exactly in the attributes listed in
    update_expected (a dict attribute-name -> expected value) and is
    equal to old in every other attribute.

    With update_expected empty or None, plain attribute equality between
    new and old is checked.
    '''
    # None sentinel instead of the original mutable default `={}` --
    # shared mutable defaults are a classic Python pitfall
    if update_expected is None:
        update_expected = {}
    for var in vars(new):
        if var in update_expected:
            assert getattr(new, var) == update_expected.get(var), \
                f'new did not update value for {var}'
        else:
            assert getattr(new, var) == getattr(old, var), \
                f'new changed unexpectedly in value for {var}'
+
+
class TestSchedule(object):
+ '''
+ Test the schedule class basics and that its methods update self as expected
+ '''
def test_start_default_midnight(self, simple_schedule):
now = datetime.datetime.now(datetime.timezone.utc)
with db:
row = db.execute(SELECT_ALL).fetchone()
- db_schedule = Schedule._from_get_query(row, simple_schedule.fs)
+ db_schedule = Schedule._from_db_row(row, simple_schedule.fs)
- for var in vars(db_schedule):
- assert getattr(simple_schedule, var) == getattr(db_schedule, var)
+ assert_updated(db_schedule, simple_schedule)
def test_store_multiple(self, db, simple_schedules):
[s.store_schedule(db) for s in simple_schedules]
def test_update_last(self, db, simple_schedule):
simple_schedule.store_schedule(db)
- row = ()
with db:
- row = db.execute(SELECT_ALL).fetchone()
- assert row['last'] is None
+ _ = db.execute(SELECT_ALL).fetchone()
first_time = datetime.datetime.now(datetime.timezone.utc)
simple_schedule.update_last(first_time, db)
with db:
- row = db.execute(SELECT_ALL).fetchone()
- assert datetime.datetime.fromisoformat(row['last']) == first_time
+ after = db.execute(SELECT_ALL).fetchone()
+ assert_updated(Schedule._from_db_row(after, simple_schedule.fs),
+ simple_schedule)
second_time = datetime.datetime.now(datetime.timezone.utc)
simple_schedule.update_last(second_time, db)
with db:
- row = db.execute(SELECT_ALL).fetchone()
- assert datetime.datetime.fromisoformat(row['last']) == second_time
+ after2 = db.execute(SELECT_ALL).fetchone()
+ assert_updated(Schedule._from_db_row(after2, simple_schedule.fs),
+ simple_schedule)
- def test_update_created_count(self, db, simple_schedule):
+ def test_set_inactive_active(self, db, simple_schedule):
simple_schedule.store_schedule(db)
- row = ()
with db:
- row = db.execute(SELECT_ALL).fetchone()
- assert row['created_count'] == 0
+ _ = db.execute(SELECT_ALL).fetchone()
- first_time = datetime.datetime.now(datetime.timezone.utc)
- simple_schedule.update_last(first_time, db)
+ simple_schedule.set_inactive(db)
with db:
- row = db.execute(SELECT_ALL).fetchone()
- assert row['created_count'] == 1
+ after = db.execute(SELECT_ALL).fetchone()
+ assert_updated(Schedule._from_db_row(after, simple_schedule.fs),
+ simple_schedule)
- second_time = datetime.datetime.now(datetime.timezone.utc)
- simple_schedule.update_last(second_time, db)
+ simple_schedule.set_active(db)
with db:
- row = db.execute(SELECT_ALL).fetchone()
- assert row['created_count'] == 2
+ after2 = db.execute(SELECT_ALL).fetchone()
+ assert_updated(Schedule._from_db_row(after2, simple_schedule.fs),
+ simple_schedule)
- def test_update_first(self, db, simple_schedule):
+ def test_update_pruned(self, db, simple_schedule):
simple_schedule.store_schedule(db)
- row = ()
with db:
- row = db.execute(SELECT_ALL).fetchone()
- assert row['first'] is None
+ _ = db.execute(SELECT_ALL).fetchone()
+
+ now = datetime.datetime.now(datetime.timezone.utc)
+ pruned_count = random.randint(1, 1000)
+
+ simple_schedule.update_pruned(now, db, pruned_count)
+
+ with db:
+ after = db.execute(SELECT_ALL).fetchone()
+
+ assert_updated(Schedule._from_db_row(after, simple_schedule.fs),
+ simple_schedule)
+
+ # TODO test get_schedules and list_schedules
+
+
+class TestScheduleDB(object):
+ '''
+ This class tests that Schedules methods update the DB correctly
+ '''
+
+ def test_update_last(self, db, simple_schedule):
+ simple_schedule.store_schedule(db)
+
+ with db:
+ before = db.execute(SELECT_ALL).fetchone()
first_time = datetime.datetime.now(datetime.timezone.utc)
simple_schedule.update_last(first_time, db)
with db:
- row = db.execute(SELECT_ALL).fetchone()
- assert datetime.datetime.fromisoformat(row['first']) == first_time
+ after = db.execute(SELECT_ALL).fetchone()
+ assert_updated(Schedule._from_db_row(after, simple_schedule.fs),
+ Schedule._from_db_row(before, simple_schedule.fs),
+ {'created_count': 1,
+ 'last': first_time,
+ 'first': first_time})
second_time = datetime.datetime.now(datetime.timezone.utc)
simple_schedule.update_last(second_time, db)
with db:
- row = db.execute(SELECT_ALL).fetchone()
- assert datetime.datetime.fromisoformat(row['first']) == first_time
+ after2 = db.execute(SELECT_ALL).fetchone()
+ assert_updated(Schedule._from_db_row(after2, simple_schedule.fs),
+ Schedule._from_db_row(after, simple_schedule.fs),
+ {'created_count': 2, 'last': second_time})
+
+ def test_set_inactive_active(self, db, simple_schedule):
+ simple_schedule.store_schedule(db)
+
+ with db:
+ before = db.execute(SELECT_ALL).fetchone()
+
+ simple_schedule.set_inactive(db)
+
+ with db:
+ after = db.execute(SELECT_ALL).fetchone()
+ assert_updated(Schedule._from_db_row(after, simple_schedule.fs),
+ Schedule._from_db_row(before, simple_schedule.fs),
+ {'active': 0})
+
+ simple_schedule.set_active(db)
+
+ with db:
+ after2 = db.execute(SELECT_ALL).fetchone()
+ assert_updated(Schedule._from_db_row(after2, simple_schedule.fs),
+ Schedule._from_db_row(after, simple_schedule.fs),
+ {'active': 1})
+
+ def test_update_pruned(self, db, simple_schedule):
+ simple_schedule.store_schedule(db)
+
+ with db:
+ before = db.execute(SELECT_ALL).fetchone()
+
+ now = datetime.datetime.now(datetime.timezone.utc)
+ pruned_count = random.randint(1, 1000)
+
+ simple_schedule.update_pruned(now, db, pruned_count)
+
+ with db:
+ after = db.execute(SELECT_ALL).fetchone()
+
+ assert_updated(Schedule._from_db_row(after, simple_schedule.fs),
+ Schedule._from_db_row(before, simple_schedule.fs),
+ {'last_pruned': now, 'pruned_count': pruned_count})
+
+ def test_add_retention(self, db, simple_schedule):
+ simple_schedule.store_schedule(db)
+
+ with db:
+ before = db.execute(SELECT_ALL).fetchone()
+
+ retention = "7d12m"
+ simple_schedule.add_retention(db, simple_schedule.path, retention)
+
+ with db:
+ after = db.execute(SELECT_ALL).fetchone()
+
+ assert after['retention'] == json.dumps(parse_retention(retention))
+
+ retention2 = "4w"
+ simple_schedule.add_retention(db, simple_schedule.path, retention2)
+
+ with db:
+ after = db.execute(SELECT_ALL).fetchone()
+
+ assert after['retention'] == json.dumps(parse_retention(retention + retention2))
+
+ def test_per_path_and_repeat_uniqness(self, db):
+ s1 = Schedule(*('/foo', '24h', 'fs_name', '/foo'))
+ s2 = Schedule(*('/foo', '1d', 'fs_name', '/foo'))
+ s1.store_schedule(db)
+ with pytest.raises(sqlite3.IntegrityError):
+ s2.store_schedule(db)