LGPL2.1. See file COPYING.
"""
from datetime import datetime, timezone
+import json
import logging
import sqlite3
def __str__(self):
return f'''{self.path} {self.schedule} {self.retention}'''
+ def json_list(self):
+ return json.dumps({'path': self.path, 'schedule': self.schedule,
+ 'retention': self.retention})
+
CREATE_TABLES = '''CREATE TABLE schedules(
- id integer PRIMARY KEY ASC,
- path text NOT NULL UNIQUE,
- subvol text,
- rel_path text NOT NULL,
- active int NOT NULL
+ id INTEGER PRIMARY KEY ASC,
+ path TEXT NOT NULL UNIQUE,
+ subvol TEXT,
+ rel_path TEXT NOT NULL
);
CREATE TABLE schedules_meta(
id INTEGER PRIMARY KEY ASC,
first TEXT,
last TEXT,
last_pruned TEXT,
- created TEXT,
- repeat BIGINT NOT NULL,
+ created TEXT NOT NULL,
+ repeat INT NOT NULL,
schedule TEXT NOT NULL,
created_count INT DEFAULT 0,
pruned_count INT DEFAULT 0,
retention TEXT,
+ active INT NOT NULL,
FOREIGN KEY(schedule_id) REFERENCES schedules(id) ON DELETE CASCADE,
UNIQUE (start, repeat)
);'''
WHERE
s.path = ? AND
strftime("%s", "now") - strftime("%s", sm.start) > 0 AND
- s.active = 1
+ sm.active = 1
ORDER BY until;'''
PROTO_GET_SCHEDULES = '''SELECT
- s.path, s.subvol, s.rel_path, s.active,
+ s.path, s.subvol, s.rel_path, sm.active,
sm.schedule, sm.retention, sm.start, sm.first, sm.last,
sm.last_pruned, sm.created, sm.created_count, sm.pruned_count
FROM schedules s
INNER JOIN schedules_meta sm ON sm.schedule_id = s.id
WHERE'''
- GET_SCHEDULES = PROTO_GET_SCHEDULES + ' s.path = ?'
-
- GET_SCHEDULE = PROTO_GET_SCHEDULES + ' s.path = ? and sm.start = ? AND sm.repeat = ?'
+ GET_SCHEDULES = PROTO_GET_SCHEDULES + ' s.path = ?'
- # TODO merge these two methods
@classmethod
- def get_db_schedules(cls, path, db, fs):
+ def get_db_schedules(cls, path, db, fs, repeat=None, start=None):
+ query = cls.GET_SCHEDULES
+ data = (path,)
+ if repeat:
+ query += ' AND sm.repeat = ?'
+ data += (repeat,)
+ if start:
+ query += ' AND sm.start = ?'
+ data += (start,)
with db:
- c = db.execute(cls.GET_SCHEDULES, (path,))
+ c = db.execute(query, data)
return [cls._from_get_query(row, fs) for row in c.fetchall()]
- @classmethod
- def get_db_schedule(cls, db, fs, path, start, repeat):
- with db:
- c = db.execute(cls.GET_SCHEDULE, (path, start, repeat))
- r = c.fetchone()
- if r:
- return cls._from_get_query(r, fs)
- else:
- return None
-
@classmethod
def list_schedules(cls, path, db, fs, recursive):
with db:
return [cls._from_get_query(row, fs) for row in c.fetchall()]
INSERT_SCHEDULE = '''INSERT INTO
- schedules(path, subvol, rel_path, active)
- Values(?, ?, ?, ?);'''
+ schedules(path, subvol, rel_path)
+ Values(?, ?, ?);'''
INSERT_SCHEDULE_META = '''INSERT INTO
- schedules_meta(schedule_id, start, created, repeat, schedule, retention)
- SELECT ?, ?, ?, ?, ?, ?'''
+ schedules_meta(schedule_id, start, created, repeat, schedule,
+ retention, active)
+ SELECT ?, ?, ?, ?, ?, ?, ?'''
def store_schedule(self, db):
sched_id = None
c = db.execute(self.INSERT_SCHEDULE,
(self.path,
self.subvol,
- self.rel_path,
- 1))
+ self.rel_path,))
sched_id = c.lastrowid
except sqlite3.IntegrityError:
# might be adding another schedule, retrieve sched id
self.created.isoformat(),
self.repeat,
self.schedule,
- self.retention))
+ self.retention,
+ 1))
@classmethod
def rm_schedule(cls, db, path, repeat, start):
db.execute('DELETE FROM schedules WHERE id = ?;', id_)
def report(self):
- import pprint
- return pprint.pformat(self.__dict__)
+ return self.report_json()
+
+ def report_json(self):
+ return json.dumps(dict(self.__dict__), default=lambda o: o.isoformat())
@property
def repeat(self):
else:
raise Exception('schedule multiplier not recognized')
- UPDATE_LAST = '''UPDATE schedules_meta
+ UPDATE_LAST = '''UPDATE schedules_meta AS sm
SET
last = ?,
created_count = created_count + 1,
first = CASE WHEN first IS NULL THEN ? ELSE first END
- WHERE
- start = ? AND
- repeat = ?;'''
+ WHERE EXISTS(
+ SELECT id
+ FROM schedules s
+ WHERE s.id = sm.schedule_id
+ AND s.path = ?
+ AND sm.start = ?
+ AND sm.repeat = ?);'''
def update_last(self, time, db):
with db:
- db.execute(self.UPDATE_LAST, (time.isoformat(), time.isoformat(),
- self.start.isoformat(), self.repeat))
+ db.execute(self.UPDATE_LAST, (time.isoformat(),
+ time.isoformat(),
+ self.path,
+ self.start.isoformat(),
+ self.repeat))
self.created_count += 1
self.last = time
- # TODO add option to only change one snapshot in a path, i.e. pass repeat
- # and start time as well
- UPDATE_INACTIVE = '''UPDATE schedules
+ UPDATE_INACTIVE = '''UPDATE schedules_meta AS sm
SET
active = 0
- WHERE
- path = ?;'''
+ WHERE EXISTS(
+ SELECT id
+ FROM schedules s
+ WHERE s.id = sm.schedule_id
+ AND s.path = ?
+ AND sm.start = ?
+ AND sm.repeat = ?);'''
def set_inactive(self, db):
with db:
- log.debug(f'Deactivating schedule on path {self.path}')
- db.execute(self.UPDATE_INACTIVE, (self.path,))
+ log.debug(f'Deactivating schedule ({self.repeat}, {self.start}) on path {self.path}')
+ db.execute(self.UPDATE_INACTIVE, (self.path,
+ self.start.isoformat(),
+ self.repeat))
- UPDATE_ACTIVE = '''UPDATE schedules
+ UPDATE_ACTIVE = '''UPDATE schedules_meta AS sm
SET
active = 1
- WHERE
- path = ?;'''
+ WHERE EXISTS(
+ SELECT id
+ FROM schedules s
+ WHERE s.id = sm.schedule_id
+ AND s.path = ?
+ AND sm.start = ?
+ AND sm.repeat = ?);'''
def set_active(self, db):
with db:
- log.debug(f'Activating schedule on path {self.path}')
- db.execute(self.UPDATE_ACTIVE, (self.path,))
+ log.debug(f'Activating schedule ({self.repeat}, {self.start}) on path {self.path}')
+ db.execute(self.UPDATE_ACTIVE, (self.path,
+ self.start.isoformat(),
+ self.repeat))
- UPDATE_PRUNED = '''UPDATE schedules_meta
+ UPDATE_PRUNED = '''UPDATE schedules_meta AS sm
SET
last_pruned = ?,
pruned_count = pruned_count + ?
- WHERE
- start = ? AND
- repeat = ?;'''
+ WHERE EXISTS(
+ SELECT id
+ FROM schedules s
+ WHERE s.id = sm.schedule_id
+ AND s.path = ?
+ AND sm.start = ?
+ AND sm.repeat = ?);'''
def update_pruned(self, time, db, pruned):
with db:
db.execute(self.UPDATE_PRUNED, (time.isoformat(), pruned,
+ self.path,
self.start.isoformat(),
self.repeat))
self.pruned_count += pruned
from threading import Timer
import sqlite3
from .schedule import Schedule
+import traceback
+MAX_SNAPS_PER_PATH = 50
SNAP_SCHEDULE_NAMESPACE = 'cephfs-snap-schedule'
SNAP_DB_PREFIX = 'snap_db'
# increment this every time the db schema changes and provide upgrade code
SNAP_DB_VERSION = '0'
SNAP_DB_OBJECT_NAME = f'{SNAP_DB_PREFIX}_v{SNAP_DB_VERSION}'
SNAPSHOT_TS_FORMAT = '%Y-%m-%d-%H_%M_%S'
+SNAPSHOT_PREFIX = 'scheduled'
log = logging.getLogger(__name__)
size).decode('utf-8')
con.executescript(db)
except rados.ObjectNotFound:
- log.info(f'No schedule DB found in {fs}')
+ log.debug(f'No schedule DB found in {fs}, creating one.')
con.executescript(Schedule.CREATE_TABLES)
return self.sqlite_connections[fs]
timer.cancel()
timers = []
for row in rows:
- log.debug(f'adding timer for {row}')
- log.debug(f'Creating new snapshot timer')
+ log.debug(f'Creating new snapshot timer for {path}')
t = Timer(row[1],
self.create_scheduled_snapshot,
args=[fs, path, row[0], row[2], row[3]])
t.start()
timers.append(t)
- log.debug(f'Will snapshot {row[0]} in fs {fs} in {row[1]}s')
+ log.debug(f'Will snapshot {path} in fs {fs} in {row[1]}s')
self.active_timers[(fs, path)] = timers
- except Exception as e:
- log.error(f'refresh raised {e}')
+ except Exception:
+ self._log_exception('refresh_snap_timers')
+
+ def _log_exception(self, fct):
+ log.error(f'{fct} raised an exception:')
+ log.error(traceback.format_exc())
def create_scheduled_snapshot(self, fs_name, path, retention, start, repeat):
log.debug(f'Scheduled snapshot of {path} triggered')
try:
db = self.get_schedule_db(fs_name)
- sched = Schedule.get_db_schedule(db, fs_name, path, start, repeat)
+ sched = Schedule.get_db_schedules(path,
+ db,
+ fs_name,
+ repeat,
+ start)[0]
time = datetime.now(timezone.utc)
with open_filesystem(self, fs_name) as fs_handle:
- fs_handle.mkdir(f'{path}/.snap/scheduled-{time.strftime(SNAPSHOT_TS_FORMAT)}', 0o755)
+ snap_ts = time.strftime(SNAPSHOT_TS_FORMAT)
+ snap_name = f'{path}/.snap/{SNAPSHOT_PREFIX}-{snap_ts}'
+ fs_handle.mkdir(snap_name, 0o755)
log.info(f'created scheduled snapshot of {path}')
- # TODO change last snap timestamp in db, maybe first
sched.update_last(time, db)
- except cephfs.Error as e:
- log.info(f'scheduled snapshot creating of {path} failed: {e}')
- # TODO set inactive if path doesn't exist
+ except cephfs.Error:
+ self._log_exception('create_scheduled_snapshot')
sched.set_inactive(db)
- except Exception as e:
+ except Exception:
# catch all exceptions cause otherwise we'll never know since this
# is running in a thread
- log.error(f'ERROR create_scheduled_snapshot raised{e}')
+ self._log_exception('create_scheduled_snapshot')
finally:
self.refresh_snap_timers(fs_name, path)
self.prune_snapshots(sched)
log.debug('Pruning snapshots')
ret = parse_retention(sched.retention)
path = sched.path
- if not ret:
- # TODO prune if too many (300?)
- log.debug(f'schedule on {path} has no retention specified')
- return
+ if not ret or "n" not in ret or ret["n"] > MAX_SNAPS_PER_PATH:
+ log.debug(f'Adding n: {MAX_SNAPS_PER_PATH} limit to {path}')
+ ret["n"] = MAX_SNAPS_PER_PATH
prune_candidates = set()
time = datetime.now(timezone.utc)
with open_filesystem(self, sched.fs) as fs_handle:
with fs_handle.opendir(f'{path}/.snap') as d_handle:
dir_ = fs_handle.readdir(d_handle)
while dir_:
- if dir_.d_name.startswith(b'scheduled-'):
+ if dir_.d_name.startswith(f'{SNAPSHOT_PREFIX}-'.encode('utf-8')):
log.debug(f'add {dir_.d_name} to pruning')
ts = datetime.strptime(
- dir_.d_name.lstrip(b'scheduled-').decode('utf-8'),
+ dir_.d_name.lstrip(f'{SNAPSHOT_PREFIX}-'.encode('utf-8')).decode('utf-8'),
SNAPSHOT_TS_FORMAT)
prune_candidates.add((dir_, ts))
else:
if to_prune:
sched.update_pruned(time, self.get_schedule_db(sched.fs),
len(to_prune))
- except Exception as e:
- log.debug(f'prune_snapshots threw {e}')
+ except Exception:
+ self._log_exception('prune_snapshots')
def get_prune_set(self, candidates, retention):
PRUNING_PATTERNS = OrderedDict([
- #TODO remove M for release
+ # n is for keep last n snapshots, uses the snapshot name timestamp
+ # format for lowest granularity
+ ("n", SNAPSHOT_TS_FORMAT),
+ # TODO remove M for release
("M", '%Y-%m-%d-%H_%M'),
("h", '%Y-%m-%d-%H'),
("d", '%Y-%m-%d'),
("y", '%Y'),
])
keep = set()
- log.debug(retention)
for period, date_pattern in PRUNING_PATTERNS.items():
period_count = retention.get(period, 0)
if not period_count:
- log.debug(f'skipping period {period}')
continue
last = None
for snap in sorted(candidates, key=lambda x: x[0].d_name,
reverse=True):
snap_ts = snap[1].strftime(date_pattern)
- log.debug(f'{snap_ts} : {last}')
if snap_ts != last:
last = snap_ts
if snap not in keep:
if len(keep) == period_count:
log.debug(f'found enough snapshots for {period_count}{period}')
break
- # TODO maybe do candidates - keep here? we want snaps counting it
- # hours not be considered for days and it cuts down on iterations
return candidates - keep
def get_snap_schedules(self, fs, path):
@updates_schedule_db
def activate_snap_schedule(self, fs, path, repeat, start):
db = self.get_schedule_db(fs)
- schedules = Schedule.get_db_schedules(path, db, fs)
+ schedules = Schedule.get_db_schedules(path, db, fs, repeat, start)
[s.set_active(db) for s in schedules]
@updates_schedule_db
def deactivate_snap_schedule(self, fs, path, repeat, start):
db = self.get_schedule_db(fs)
- schedules = Schedule.get_db_schedules(path, db, fs)
+ schedules = Schedule.get_db_schedules(path, db, fs, repeat, start)
[s.set_inactive(db) for s in schedules]
LGPL2.1. See file COPYING.
"""
import errno
-import json
import sqlite3
from .fs.schedule_client import SnapSchedClient
from mgr_module import MgrModule, CLIReadCommand, CLIWriteCommand
@CLIReadCommand('fs snap-schedule status',
'name=path,type=CephString,req=false '
'name=subvol,type=CephString,req=false '
- 'name=fs,type=CephString,req=false',
+ 'name=fs,type=CephString,req=false '
+ 'name=format,type=CephString,req=false',
'List current snapshot schedules')
- def snap_schedule_get(self, path='/', subvol=None, fs=None):
+ def snap_schedule_get(self, path='/', subvol=None, fs=None, format='plain'):
use_fs = fs if fs else self.default_fs
try:
ret_scheds = self.client.get_snap_schedules(use_fs, path)
except CephfsConnectionException as e:
return e.to_tuple()
+ if format == 'json':
+ json_report = ','.join([ret_sched.report_json() for ret_sched in ret_scheds])
+ return 0, f'[{json_report}]', ''
return 0, '\n===\n'.join([ret_sched.report() for ret_sched in ret_scheds]), ''
@CLIReadCommand('fs snap-schedule list',
'name=path,type=CephString '
'name=recursive,type=CephString,req=false '
'name=subvol,type=CephString,req=false '
- 'name=fs,type=CephString,req=false',
+ 'name=fs,type=CephString,req=false '
+ 'name=format,type=CephString,req=false',
'Get current snapshot schedule for <path>')
- def snap_schedule_list(self, path, subvol=None, recursive=False, fs=None):
+ def snap_schedule_list(self, path, subvol=None, recursive=False, fs=None,
+ format='plain'):
try:
use_fs = fs if fs else self.default_fs
scheds = self.client.list_snap_schedules(use_fs, path, recursive)
return e.to_tuple()
if not scheds:
return errno.ENOENT, '', f'SnapSchedule for {path} not found'
+ if format == 'json':
+ json_list = ','.join([sched.json_list() for sched in scheds])
+ return 0, f'[{json_list}]', ''
return 0, '\n'.join([str(sched) for sched in scheds]), ''
@CLIWriteCommand('fs snap-schedule add',