from datetime import datetime, timezone
import json
import logging
-from os import environ
import re
import sqlite3
from typing import cast, Any, Dict, List, Tuple, Optional, Union
RETENTION_MULTIPLIERS = ['n', 'M', 'h', 'd', 'w', 'm', 'y']
+TableRowT = Dict[str, Union[int, str]]
+
def dump_retention(retention: Dict[str, str]) -> str:
ret = ''
self.active = bool(active)
@classmethod
- def _from_db_row(cls, table_row: Dict[str, Union[int, str]], fs: str) -> 'Schedule':
+ def _from_db_row(cls, table_row: TableRowT, fs: str) -> 'Schedule':
return cls(cast(str, table_row['path']),
cast(str, table_row['schedule']),
fs,
cast(int, table_row['created_count']),
cast(int, table_row['pruned_count']),
cast(bool, table_row['active']),
- )
+ )
def __str__(self) -> str:
- return f'''{self.path} {self.schedule} {dump_retention(self.retention)}'''
+ return f'{self.path} {self.schedule} {dump_retention(self.retention)}'
def json_list(self) -> str:
return json.dumps({'path': self.path, 'schedule': self.schedule,
sched_id = c.lastrowid
except sqlite3.IntegrityError:
# might be adding another schedule, retrieve sched id
- log.debug(f'found schedule entry for {self.path}, trying to add meta')
+ log.debug((f'found schedule entry for {self.path}, '
+ 'trying to add meta'))
c = db.execute('SELECT id FROM schedules where path = ?',
(self.path,))
sched_id = c.fetchone()[0]
id_ = tuple(row)
if repeat or start:
- meta_delete = 'DELETE FROM schedules_meta WHERE schedule_id = ?'
+ meta_delete = ('DELETE FROM schedules_meta '
+ 'WHERE schedule_id = ?')
delete_param = id_
if repeat:
meta_delete += ' AND schedule = ?'
id_)
if meta_count.fetchone() == (0,):
log.debug(
- f'no more schedules left, cleaning up schedules table')
+ 'no more schedules left, cleaning up schedules table')
db.execute('DELETE FROM schedules WHERE id = ?;', id_)
else:
# just delete the schedule CASCADE DELETE takes care of the
WHERE path = ?'''
@classmethod
- def add_retention(cls, db: sqlite3.Connection, path: str, retention_spec: str) -> None:
+ def add_retention(cls,
+ db: sqlite3.Connection,
+ path: str,
+ retention_spec: str) -> None:
with db:
row = db.execute(cls.GET_RETENTION, (path,)).fetchone()
if not row:
current_retention = json.loads(current)
for r, v in retention.items():
if r in current_retention:
- raise ValueError((f'Retention for {r} is already present '
- 'with value {current_retention[r]}. Please remove first'))
+ msg = (f'Retention for {r} is already present with value'
+ f'{current_retention[r]}. Please remove first')
+ raise ValueError(msg)
current_retention.update(retention)
- db.execute(cls.UPDATE_RETENTION, (json.dumps(current_retention), path))
+ db.execute(cls.UPDATE_RETENTION,
+ (json.dumps(current_retention), path))
@classmethod
- def rm_retention(cls, db: sqlite3.Connection, path: str, retention_spec: str) -> None:
+ def rm_retention(cls,
+ db: sqlite3.Connection,
+ path: str,
+ retention_spec: str) -> None:
with db:
row = db.execute(cls.GET_RETENTION, (path,)).fetchone()
if not row:
current_retention = json.loads(current)
for r, v in retention.items():
if r not in current_retention or current_retention[r] != v:
- raise ValueError((f'Retention for {r}: {v} was not set for {path} '
- 'can\'t remove'))
+ msg = (f'Retention for {r}: {v} was not set for {path} '
+ 'can\'t remove')
+ raise ValueError(msg)
current_retention.pop(r)
- db.execute(cls.UPDATE_RETENTION, (json.dumps(current_retention), path))
+ db.execute(cls.UPDATE_RETENTION,
+ (json.dumps(current_retention), path))
def report(self) -> str:
return self.report_json()
def update_last(self, time: datetime, db: sqlite3.Connection) -> None:
with db:
- db.execute(self.UPDATE_LAST, (time.strftime(SNAP_DB_TS_FORMAT),
- time.strftime(SNAP_DB_TS_FORMAT),
- self.path,
- self.start.strftime(SNAP_DB_TS_FORMAT),
- self.repeat))
+ db.execute(self.UPDATE_LAST,
+ (time.strftime(SNAP_DB_TS_FORMAT),
+ time.strftime(SNAP_DB_TS_FORMAT),
+ self.path,
+ self.start.strftime(SNAP_DB_TS_FORMAT),
+ self.repeat))
self.created_count += 1
self.last = time
if not self.first:
def set_inactive(self, db: sqlite3.Connection) -> None:
with db:
- log.debug(f'Deactivating schedule ({self.repeat}, {self.start}) on path {self.path}')
- db.execute(self.UPDATE_INACTIVE, (self.path,
- self.start.strftime(SNAP_DB_TS_FORMAT),
- self.repeat))
+ log.debug((f'Deactivating schedule ({self.repeat}, '
+ f'{self.start}) on path {self.path}'))
+ db.execute(self.UPDATE_INACTIVE,
+ (self.path,
+ self.start.strftime(SNAP_DB_TS_FORMAT),
+ self.repeat))
self.active = False
UPDATE_ACTIVE = '''UPDATE schedules_meta
def set_active(self, db: sqlite3.Connection) -> None:
with db:
- log.debug(f'Activating schedule ({self.repeat}, {self.start}) on path {self.path}')
- db.execute(self.UPDATE_ACTIVE, (self.path,
- self.start.strftime(SNAP_DB_TS_FORMAT),
- self.repeat))
+ log.debug(f'Activating schedule ({self.repeat}, {self.start}) '
+ f'on path {self.path}')
+ db.execute(self.UPDATE_ACTIVE,
+ (self.path,
+ self.start.strftime(SNAP_DB_TS_FORMAT),
+ self.repeat))
self.active = True
UPDATE_PRUNED = '''UPDATE schedules_meta
db: sqlite3.Connection,
pruned: int) -> None:
with db:
- db.execute(self.UPDATE_PRUNED, (time.strftime(SNAP_DB_TS_FORMAT), pruned,
- self.path,
- self.start.strftime(SNAP_DB_TS_FORMAT),
- self.repeat))
+ db.execute(self.UPDATE_PRUNED,
+ (time.strftime(SNAP_DB_TS_FORMAT), pruned,
+ self.path,
+ self.start.strftime(SNAP_DB_TS_FORMAT),
+ self.repeat))
self.pruned_count += pruned
self.last_pruned = time
from typing import cast, Any, Callable, Dict, Iterator, List, Set, Optional, \
Tuple, TypeVar, Union
import sqlite3
-from .schedule import Schedule, parse_retention
+from .schedule import Schedule
import traceback
CephfsClientT = TypeVar('CephfsClientT', bound=CephfsClient)
+
@contextmanager
-def open_ioctx(self: CephfsClientT, pool: Union[int, str]) -> Iterator[rados.Ioctx]:
+def open_ioctx(self: CephfsClientT,
+ pool: Union[int, str]) -> Iterator[rados.Ioctx]:
try:
if type(pool) is int:
with self.mgr.rados.open_ioctx2(pool) as ioctx:
FuncT = TypeVar('FuncT', bound=Callable[..., None])
+
def updates_schedule_db(func: FuncT) -> FuncT:
def f(self: 'SnapSchedClient', fs: str, schedule_or_path: str, *args: Any) -> None:
ret = func(self, fs, schedule_or_path, *args)
if snap_ts != last:
last = snap_ts
if snap not in keep:
- log.debug(f'keeping {snap[0].d_name} due to {period_count}{period}')
+ log.debug((f'keeping {snap[0].d_name} due to '
+ f'{period_count}{period}'))
keep.append(snap)
if len(keep) == period_count:
- log.debug(f'found enough snapshots for {period_count}{period}')
+ log.debug(('found enough snapshots for '
+ f'{period_count}{period}'))
break
if len(keep) > MAX_SNAPS_PER_PATH:
- log.info(f'Would keep more then {MAX_SNAPS_PER_PATH}, pruning keep set')
+ log.info((f'Would keep more then {MAX_SNAPS_PER_PATH}, '
+ 'pruning keep set'))
keep = keep[:MAX_SNAPS_PER_PATH]
return candidates - set(keep)
size).decode('utf-8')
con.executescript(db)
except rados.ObjectNotFound:
- log.debug(f'No schedule DB found in {fs}, creating one.')
+ log.debug((f'No schedule DB found in {fs}, '
+ 'creating one.'))
con.executescript(Schedule.CREATE_TABLES)
return self.sqlite_connections[fs]
def _is_allowed_repeat(self, exec_row: Dict[str, str], path: str) -> bool:
if Schedule.parse_schedule(exec_row['schedule'])[1] == 'M':
if self.allow_minute_snaps:
- log.debug(f'Minute repeats allowed, scheduling snapshot on path {path}')
+ log.debug(('Minute repeats allowed, '
+ f'scheduling snapshot on path {path}'))
return True
else:
- log.info(f'Minute repeats disabled, skipping snapshot on path {path}')
+ log.info(('Minute repeats disabled, '
+ f'skipping snapshot on path {path}'))
return False
else:
return True
-
def refresh_snap_timers(self, fs: str, path: str) -> None:
try:
- log.debug(f'SnapDB on {fs} changed for {path}, updating next Timer')
+ log.debug((f'SnapDB on {fs} changed for {path}, '
+ 'updating next Timer'))
db = self.get_schedule_db(fs)
rows = []
with db:
cur = db.execute(Schedule.EXEC_QUERY, (path,))
all_rows = cur.fetchall()
- rows = [r for r in all_rows if self._is_allowed_repeat(r, path)][0:1]
+ rows = [r for r in all_rows
+ if self._is_allowed_repeat(r, path)][0:1]
timers = self.active_timers.get((fs, path), [])
for timer in timers:
timer.cancel()
log.error(traceback.format_exc())
def create_scheduled_snapshot(self,
- fs_name: str, path: str,
- retention: str, start: str, repeat: str) -> None:
+ fs_name: str,
+ path: str,
+ retention: str,
+ start: str,
+ repeat: str) -> None:
log.debug(f'Scheduled snapshot of {path} triggered')
try:
db = self.get_schedule_db(fs_name)
db = self.get_schedule_db(fs)
return Schedule.get_db_schedules(path, db, fs)
- def list_snap_schedules(self, fs: str, path: str, recursive: bool) -> List[Schedule]:
+ def list_snap_schedules(self,
+ fs: str,
+ path: str,
+ recursive: bool) -> List[Schedule]:
db = self.get_schedule_db(fs)
return Schedule.list_schedules(path, db, fs, recursive)