import errno
import rados
from contextlib import contextmanager
-from mgr_util import CephfsClient, CephfsConnectionException, connection_pool_wrap
-from datetime import datetime, timedelta
+import re
+from mgr_util import CephfsClient, CephfsConnectionException, \
+ open_filesystem
+from collections import OrderedDict
+from datetime import datetime
+import logging
+from operator import attrgetter, itemgetter
from threading import Timer
import sqlite3
SNAP_SCHEDULE_NAMESPACE = 'cephfs-snap-schedule'
SNAP_DB_OBJECT_NAME = 'snap_db'
+TS_FORMAT = '%Y-%m-%d-%H_%M_%S'
+
+log = logging.getLogger(__name__)
@contextmanager
ioctx.set_namespace(SNAP_SCHEDULE_NAMESPACE)
yield ioctx
except rados.ObjectNotFound:
- self.log.error("Failed to locate pool {}".format(pool))
+ log.error("Failed to locate pool {}".format(pool))
raise
self.rel_path = rel_path
self.schedule = schedule
self.retention = retention_policy
+ self._ret = {}
self.first_run = start
self.last_run = None
raise Exception('schedule multiplier not recognized')
def parse_retention(retention):
    """Parse a retention policy string into a dict of period -> count.

    Example: '7d4w' -> {'d': 7, 'w': 4}. Period specifiers are single
    letters and case-significant ('M' != 'm'). Returns an empty dict for
    an empty or unparsable string.
    """
    # one case-insensitive scan replaces the original two passes over
    # lowercase and uppercase specifiers; on duplicate specifiers the
    # last occurrence wins, as before
    return {m[-1]: int(m[0:-1])
            for m in re.findall(r'\d+[a-zA-Z]', retention)}
+
class SnapSchedClient(CephfsClient):
CREATE_TABLES = '''CREATE TABLE schedules(
id integer PRIMARY KEY ASC,
path text NOT NULL UNIQUE,
- schedule text NOT NULL,
subvol text,
rel_path text NOT NULL,
active int NOT NULL
schedule_id int,
start bigint NOT NULL,
repeat bigint NOT NULL,
+ schedule text NOT NULL,
retention text,
- FOREIGN KEY(schedule_id) REFERENCES schedules(id) ON DELETE CASCADE
+ FOREIGN KEY(schedule_id) REFERENCES schedules(id) ON DELETE CASCADE,
+ UNIQUE (start, repeat)
);'''
def __init__(self, mgr):
ORDER BY until;'''
def refresh_snap_timers(self, fs):
- self.log.debug(f'SnapDB on {fs} changed, updating next Timer')
- db = self.get_schedule_db(fs)
- rows = []
- with db:
- cur = db.execute(self.EXEC_QUERY)
- rows = cur.fetchmany(1)
- timers = self.active_timers.get(fs, [])
- for timer in timers:
- timer.cancel()
- timers = []
- for row in rows:
- self.log.debug(f'Creating new snapshot timer')
- t = Timer(row[2],
- self.create_scheduled_snapshot,
- args=[fs, row[0], row[1]])
- t.start()
- timers.append(t)
- self.log.debug(f'Will snapshot {row[0]} in fs {fs} in {row[2]}s')
- self.active_timers[fs] = timers
+ try:
+ log.debug(f'SnapDB on {fs} changed, updating next Timer')
+ db = self.get_schedule_db(fs)
+ rows = []
+ with db:
+ cur = db.execute(self.EXEC_QUERY)
+ rows = cur.fetchmany(1)
+ log.debug(f'retrieved {cur.rowcount} rows')
+ timers = self.active_timers.get(fs, [])
+ for timer in timers:
+ timer.cancel()
+ timers = []
+ for row in rows:
+ log.debug(f'adding timer for {row}')
+ log.debug(f'Creating new snapshot timer')
+ t = Timer(row[2],
+ self.create_scheduled_snapshot,
+ args=[fs, row[0], row[1]])
+ t.start()
+ timers.append(t)
+ log.debug(f'Will snapshot {row[0]} in fs {fs} in {row[2]}s')
+ self.active_timers[fs] = timers
+ except Exception as e:
+ log.error(f'refresh raised {e}')
    def get_schedule_db(self, fs):
        """Return the (cached) in-memory SQLite DB for file system <fs>.

        On first access the DB is created and seeded from the serialized
        copy stored as an object in the fs metadata pool; if no such
        object exists yet, a fresh schema is created.
        """
        if fs not in self.sqlite_connections:
            # check_same_thread=False: the connection is shared with the
            # Timer threads created in refresh_snap_timers
            self.sqlite_connections[fs] = sqlite3.connect(':memory:',
                check_same_thread=False)
            with self.sqlite_connections[fs] as con:
                con.execute("PRAGMA FOREIGN_KEYS = 1")
                pool = self.get_metadata_pool(fs)
                with open_ioctx(self, pool) as ioctx:
                    try:
                        # the DB dump is stored as a plain rados object
                        size, _mtime = ioctx.stat(SNAP_DB_OBJECT_NAME)
                        db = ioctx.read(SNAP_DB_OBJECT_NAME,
                                        size).decode('utf-8')
                        con.executescript(db)
                    except rados.ObjectNotFound:
                        # first use on this fs: start with an empty schema
                        log.info(f'No schedule DB found in {fs}')
                        con.executescript(self.CREATE_TABLES)
        return self.sqlite_connections[fs]
-errno.ENOENT, "Filesystem {} does not exist".format(fs))
if fs in self.sqlite_connections:
db_content = []
- # TODO maybe do this in a transaction?
db = self.sqlite_connections[fs]
with db:
for row in db.iterdump():
db_content.append(row)
with open_ioctx(self, metadata_pool) as ioctx:
- ioctx.write_full("SNAP_DB_OBJECT_NAME",
+ ioctx.write_full(SNAP_DB_OBJECT_NAME,
'\n'.join(db_content).encode('utf-8'))
- @connection_pool_wrap
- def create_scheduled_snapshot(self, fs_info, path, retention):
- self.log.debug(f'Scheduled snapshot of {path} triggered')
+ def create_scheduled_snapshot(self, fs_name, path, retention):
+ log.debug(f'Scheduled snapshot of {path} triggered')
try:
- time = datetime.datetime.utcnow().strftime('%Y-%m-%d-%H_%M_%S')
- fs_info[1].mkdir(f'{path}/.snap/scheduled-{time}', 0o755)
- self.log.info(f'created scheduled snapshot of {path}')
+ time = datetime.utcnow().strftime(TS_FORMAT)
+ with open_filesystem(self, fs_name) as fs_handle:
+ fs_handle.mkdir(f'{path}/.snap/scheduled-{time}', 0o755)
+ log.info(f'created scheduled snapshot of {path}')
except cephfs.Error as e:
- self.log.info(f'scheduled snapshot creating of {path} failed')
- raise CephfsConnectionException(-e.args[0], e.args[1])
+ log.info(f'scheduled snapshot creating of {path} failed: {e}')
+ except Exception as e:
+ # catch all exceptions cause otherwise we'll never know since this
+ # is running in a thread
+ log.error(f'ERROR create_scheduled_snapshot raised{e}')
finally:
- self.refresh_snap_timers(fs_info[0])
+ log.info(f'finally branch')
+ self.refresh_snap_timers(fs_name)
+ log.info(f'calling prune')
+ self.prune_snapshots(fs_name, path, retention)
+
+ def prune_snapshots(self, fs_name, path, retention):
+ log.debug('Pruning snapshots')
+ ret = parse_retention(retention)
+ try:
+ prune_candidates = set()
+ with open_filesystem(self, fs_name) as fs_handle:
+ with fs_handle.opendir(f'{path}/.snap') as d_handle:
+ dir_ = fs_handle.readdir(d_handle)
+ while dir_:
+ if dir_.d_name.startswith(b'scheduled-'):
+ log.debug(f'add {dir_.d_name} to pruning')
+ ts = datetime.strptime(
+ dir_.d_name.lstrip(b'scheduled-').decode('utf-8'),
+ TS_FORMAT)
+ prune_candidates.add((dir_, ts))
+ else:
+ log.debug(f'skipping dir entry {dir_.d_name}')
+ dir_ = fs_handle.readdir(d_handle)
+ to_keep = self.get_prune_set(prune_candidates, ret)
+ for k in prune_candidates - to_keep:
+ dirname = k[0].d_name.decode('utf-8')
+ log.debug(f'rmdir on {dirname}')
+ fs_handle.rmdir(f'{path}/.snap/{dirname}')
+ log.debug(f'keeping {to_keep}')
+ except Exception as e:
+ log.debug(f'prune_snapshots threw {e}')
# TODO: handle snap pruning accoring to retention
- @connection_pool_wrap
- def validate_schedule(self, fs_handle, sched):
+ def get_prune_set(self, candidates, retention):
+ PRUNING_PATTERNS = OrderedDict([
+ #TODO remove M for release
+ ("M", '%Y-%m-%d-%H_%M'),
+ ("h", '%Y-%m-%d-%H'),
+ ("d", '%Y-%m-%d'),
+ ("w", '%G-%V'),
+ ("m", '%Y-%m'),
+ ("y", '%Y'),
+ ])
+ keep = set()
+ log.debug(retention)
+ for period, date_pattern in PRUNING_PATTERNS.items():
+ period_count = retention.get(period, 0)
+ if not period_count:
+ log.debug(f'skipping period {period}')
+ continue
+ last = None
+ for snap in sorted(candidates, key=lambda x: x[0].d_name,
+ reverse=True):
+ snap_ts = snap[1].strftime(date_pattern)
+ log.debug(f'{snap_ts} : {last}')
+ if snap_ts != last:
+ last = snap_ts
+ if snap not in keep:
+ log.debug(f'keeping {snap[0].d_name} due to {period_count}{period}')
+ keep.add(snap)
+ if len(keep) == period_count:
+ log.debug(f'found enough snapshots for {period_count}{period}')
+ break
+ # TODO maybe do candidates - keep here? we want snaps counting it
+ # hours not be considered for days and it cuts down on iterations
+ return keep
+
+ def validate_schedule(self, fs_name, sched):
try:
- fs_handle.stat(sched.path)
+ with open_filesystem(self, fs_name) as fs_handle:
+ fs_handle.stat(sched.path)
except cephfs.ObjectNotFound:
- self.log.error('Path {} not found'.format(sched.path))
+ log.error('Path {} not found'.format(sched.path))
return False
return True
- def list_snap_schedule(self, fs, path):
    # all schedules (plus their meta rows) for an exact path
    LIST_SCHEDULES = '''SELECT
        s.path, sm.schedule, sm.retention, sm.start, s.subvol, s.rel_path
        FROM schedules s
        INNER JOIN schedules_meta sm ON sm.schedule_id = s.id
        WHERE s.path = ?'''

    def list_snap_schedules(self, fs, path):
        """Return all Schedule objects stored for <path> in <fs>."""
        db = self.get_schedule_db(fs)
        c = db.execute(self.LIST_SCHEDULES, (path,))
        return [Schedule.from_db_row(row, fs) for row in c.fetchall()]
+
+ def dump_snap_schedule(self, fs, path):
+ db = self.get_schedule_db(fs)
+ # TODO retrieve multiple schedules per path from schedule_meta
# with db:
- scheds = []
- for row in db.execute('SELECT * FROM SCHEDULES WHERE path LIKE ?',
- (f'{path}%',)):
- scheds.append(row)
- return scheds
+ c = db.execute('SELECT * FROM SCHEDULES WHERE path LIKE ?',
+ (f'{path}%',))
+ return [row for row in c.fetchall()]
+# TODO currently not used, probably broken
UPDATE_SCHEDULE = '''UPDATE schedules
SET
schedule = ?,
SET
start = ?,
repeat = ?,
+ schedule = ?
retention = ?
WHERE schedule_id = (
SELECT id FROM SCHEDULES WHERE path = ?);'''
sched.path))
INSERT_SCHEDULE = '''INSERT INTO
- schedules(path, schedule, subvol, rel_path, active)
- Values(?, ?, ?, ?, ?);'''
+ schedules(path, subvol, rel_path, active)
+ Values(?, ?, ?, ?);'''
INSERT_SCHEDULE_META = '''INSERT INTO
- schedules_meta(schedule_id, start, repeat, retention)
- Values(last_insert_rowid(), ?, ?, ?)'''
+ schedules_meta(schedule_id, start, repeat, schedule, retention)
+ Values(last_insert_rowid(), ?, ?, ?, ?)'''
@updates_schedule_db
def store_snap_schedule(self, fs, sched):
db = self.get_schedule_db(fs)
with db:
- db.execute(self.INSERT_SCHEDULE,
- (sched.path,
- sched.schedule,
- sched.subvol,
- sched.rel_path,
- 1))
+ try:
+ db.execute(self.INSERT_SCHEDULE,
+ (sched.path,
+ sched.subvol,
+ sched.rel_path,
+ 1))
+ except sqlite3.IntegrityError:
+ # might be adding another schedule
+ pass
db.execute(self.INSERT_SCHEDULE_META,
(f'strftime("%s", "{sched.first_run}")',
sched.repeat_in_s(),
+ sched.schedule,
sched.retention))
self.store_schedule_db(sched.fs)
- GET_SCHEULE = '''SELECT
- s.path, s.schedule, sm.retention, sm.start, s.subvol, s.rel_path
- FROM schedules s
- INNER JOIN schedules_meta sm ON sm.schedule_id = s.id
- WHERE s.path = ?'''
-
- def get_snap_schedule(self, fs, path):
- db = self.get_schedule_db(fs)
- c = db.execute(self.GET_SCHEDULE, (path,))
- return Schedule.from_db_row(c.fetchone(), fs)
-
@updates_schedule_db
def rm_snap_schedule(self, fs, path):
db = self.get_schedule_db(fs)
cur = db.execute('SELECT id FROM SCHEDULES WHERE path = ?',
(path,))
id_ = cur.fetchone()
- db.execute('DELETE FROM schedules WHERE id = ?;', (id_,))
+ # TODO check for repeat-start pair and delete that if present, all
+ # otherwise. If repeat-start was speced, delete only those
+ db.execute('DELETE FROM schedules WHERE id = ?;', id_)
self._initialized.wait()
return -errno.EINVAL, "", "Unknown command"
- @CLIReadCommand('fs snap-schedule ls',
+ @CLIReadCommand('fs snap-schedule dump',
'name=path,type=CephString,req=false '
'name=subvol,type=CephString,req=false '
'name=fs,type=CephString,req=false',
'List current snapshot schedules')
- def snap_schedule_ls(self, path='/', subvol=None, fs=None):
+ def snap_schedule_dump(self, path='/', subvol=None, fs=None):
use_fs = fs if fs else self.default_fs
try:
- ret_scheds = self.client.list_snap_schedule(use_fs, path)
+ ret_scheds = self.client.dump_snap_schedule(use_fs, path)
except CephfsConnectionException as e:
return e.to_tuple()
return 0, ' '.join(str(ret_scheds)), ''
    # NOTE(review): help text below still says "Get current snapshot
    # schedule" after the rename from 'get' to 'list' -- confirm intended
    @CLIReadCommand('fs snap-schedule list',
                    'name=path,type=CephString '
                    'name=subvol,type=CephString,req=false '
                    'name=fs,type=CephString,req=false',
                    'Get current snapshot schedule for <path>')
    def snap_schedule_list(self, path, subvol=None, fs=None):
        """CLI handler: list all schedules stored for an exact <path>."""
        try:
            use_fs = fs if fs else self.default_fs
            scheds = self.client.list_snap_schedules(use_fs, path)
        except CephfsConnectionException as e:
            return e.to_tuple()
        if not scheds:
            return -1, '', f'SnapSchedule for {path} not found'
        return 0, str([str(sched) for sched in scheds]), ''
@CLIWriteCommand('fs snap-schedule set',
'name=path,type=CephString '
path,
snap_schedule,
retention_policy='',
- start='now',
+ start='00:00',
fs=None,
subvol=None):
try:
self.client.store_snap_schedule(use_fs, sched)
suc_msg = f'Schedule set for path {path}'
except sqlite3.IntegrityError:
- existing_sched = self.client.get_snap_schedule(use_fs, path)
- self.log.info(f'Found existing schedule {existing_sched}...updating')
- self.client.update_snap_schedule(use_fs, sched)
- suc_msg = f'Schedule set for path {path}, updated existing schedule {existing_sched}'
+ existing_sched = self.client.list_snap_schedule(use_fs, path)
+ error_msg = f'Found existing schedule {existing_sched}'
+ self.log.error(error_msg)
+ return 1, '', error_msg
except CephfsConnectionException as e:
return e.to_tuple()
return 0, suc_msg, ''
- @CLIWriteCommand('fs snap-schedule rm',
+ @CLIWriteCommand('fs snap-schedule remove',
'name=path,type=CephString '
'name=subvol,type=CephString,req=false '
'name=fs,type=CephString,req=false',