[mypy-orchestrator.*]
disallow_untyped_defs = True
+[mypy-snap_schedule.*]
+disallow_untyped_defs = True
+
[mypy-status.*]
disallow_untyped_defs = True
from os import environ
import re
import sqlite3
-from typing import Tuple, Any
+from typing import cast, Any, Dict, List, Tuple, Optional, Union
log = logging.getLogger(__name__)
log.info(('Couldn\'t find datetime.fromisoformat, falling back to '
f'static timestamp parsing ({SNAP_DB_TS_FORMAT}'))
- def ts_parser(ts):
+ def ts_parser(data_string: str) -> datetime: # type: ignore
try:
- date = datetime.strptime(ts, SNAP_DB_TS_FORMAT)
+ date = datetime.strptime(data_string, SNAP_DB_TS_FORMAT)
return date
except ValueError:
- msg = f'''The date string {ts} does not match the required format
+ msg = f'''The date string {data_string} does not match the required format
{SNAP_DB_TS_FORMAT}. For more flexibel date parsing upgrade to
python3.7 or install
https://github.com/movermeyer/backports.datetime_fromisoformat'''
raise ValueError(msg)
-def parse_timestamp(ts):
+def parse_timestamp(ts: str) -> datetime:
date = ts_parser(ts)
# normalize any non utc timezone to utc. If no tzinfo is supplied, assume
# its already utc
return date
-def parse_retention(retention):
+def parse_retention(retention: str) -> Dict[str, int]:
ret = {}
log.debug(f'parse_retention({retention})')
matches = re.findall(r'\d+[a-z]', retention)
RETENTION_MULTIPLIERS = ['n', 'M', 'h', 'd', 'w', 'm', 'y']
-def dump_retention(retention):
+def dump_retention(retention: Dict[str, str]) -> str:
ret = ''
for mult in RETENTION_MULTIPLIERS:
if mult in retention:
Wrapper to work with schedules stored in sqlite
'''
def __init__(self,
- path,
- schedule,
- fs_name,
- rel_path,
- start=None,
- subvol=None,
- retention_policy='{}',
- created=None,
- first=None,
- last=None,
- last_pruned=None,
- created_count=0,
- pruned_count=0,
- active=True,
- ):
+ path: str,
+ schedule: str,
+ fs_name: str,
+ rel_path: str,
+ start: Optional[str] = None,
+ subvol: Optional[str] = None,
+ retention_policy: str = '{}',
+ created: Optional[str] = None,
+ first: Optional[str] = None,
+ last: Optional[str] = None,
+ last_pruned: Optional[str] = None,
+ created_count: int = 0,
+ pruned_count: int = 0,
+ active: bool = True,
+ ) -> None:
self.fs = fs_name
self.subvol = subvol
self.path = path
else:
self.start = parse_timestamp(start)
if created is None:
- self.created = datetime.now(timezone.utc)
+ self.created: Optional[datetime] = datetime.now(timezone.utc)
else:
self.created = parse_timestamp(created)
if first:
- self.first = parse_timestamp(first)
+ self.first: Optional[datetime] = parse_timestamp(first)
else:
- self.first = first
+ self.first = None
if last:
- self.last = parse_timestamp(last)
+ self.last: Optional[datetime] = parse_timestamp(last)
else:
- self.last = last
+ self.last = None
if last_pruned:
- self.last_pruned = parse_timestamp(last_pruned)
+ self.last_pruned: Optional[datetime] = parse_timestamp(last_pruned)
else:
- self.last_pruned = last_pruned
+ self.last_pruned = None
self.created_count = created_count
self.pruned_count = pruned_count
self.active = bool(active)
@classmethod
- def _from_db_row(cls, table_row, fs):
- return cls(table_row['path'],
- table_row['schedule'],
+ def _from_db_row(cls, table_row: Dict[str, Union[int, str]], fs: str) -> 'Schedule':
+ return cls(cast(str, table_row['path']),
+ cast(str, table_row['schedule']),
fs,
- table_row['rel_path'],
- table_row['start'],
- table_row['subvol'],
- table_row['retention'],
- table_row['created'],
- table_row['first'],
- table_row['last'],
- table_row['last_pruned'],
- table_row['created_count'],
- table_row['pruned_count'],
- table_row['active'],
+ cast(str, table_row['rel_path']),
+ cast(str, table_row['start']),
+ cast(str, table_row['subvol']),
+ cast(str, table_row['retention']),
+ cast(str, table_row['created']),
+ cast(str, table_row['first']),
+ cast(str, table_row['last']),
+ cast(str, table_row['last_pruned']),
+ cast(int, table_row['created_count']),
+ cast(int, table_row['pruned_count']),
+ cast(bool, table_row['active']),
)
- def __str__(self):
+ def __str__(self) -> str:
return f'''{self.path} {self.schedule} {dump_retention(self.retention)}'''
- def json_list(self):
+ def json_list(self) -> str:
return json.dumps({'path': self.path, 'schedule': self.schedule,
'retention': dump_retention(self.retention)})
GET_SCHEDULES = PROTO_GET_SCHEDULES + ' s.path = ?'''
@classmethod
- def get_db_schedules(cls, path, db, fs,
- schedule=None,
- start=None,
- repeat=None):
+ def get_db_schedules(cls,
+ path: str,
+ db: sqlite3.Connection,
+ fs: str,
+ schedule: Optional[str] = None,
+ start: Optional[str] = None,
+ repeat: Optional[str] = None) -> List['Schedule']:
query = cls.GET_SCHEDULES
data: Tuple[Any, ...] = (path,)
if repeat:
return [cls._from_db_row(row, fs) for row in c.fetchall()]
@classmethod
- def list_schedules(cls, path, db, fs, recursive):
+ def list_schedules(cls,
+ path: str,
+ db: sqlite3.Connection,
+ fs: str, recursive: bool) -> List['Schedule']:
with db:
if recursive:
c = db.execute(cls.PROTO_GET_SCHEDULES + ' path LIKE ?',
active)
SELECT ?, ?, ?, ?, ?, ?'''
- def store_schedule(self, db):
+ def store_schedule(self, db: sqlite3.Connection) -> None:
sched_id = None
with db:
try:
(self.path,))
sched_id = c.fetchone()[0]
pass
+ assert self.created, "self.created should be set"
db.execute(self.INSERT_SCHEDULE_META,
(sched_id,
self.start.strftime(SNAP_DB_TS_FORMAT),
1))
@classmethod
- def rm_schedule(cls, db, path, repeat, start):
+ def rm_schedule(cls,
+ db: sqlite3.Connection,
+ path: str,
+ repeat: Optional[str],
+ start: Optional[str]) -> None:
with db:
cur = db.execute('SELECT id FROM schedules WHERE path = ?',
(path,))
WHERE path = ?'''
@classmethod
- def add_retention(cls, db, path, retention_spec):
+ def add_retention(cls, db: sqlite3.Connection, path: str, retention_spec: str) -> None:
with db:
row = db.execute(cls.GET_RETENTION, (path,)).fetchone()
if not row:
db.execute(cls.UPDATE_RETENTION, (json.dumps(current_retention), path))
@classmethod
- def rm_retention(cls, db, path, retention_spec):
+ def rm_retention(cls, db: sqlite3.Connection, path: str, retention_spec: str) -> None:
with db:
row = db.execute(cls.GET_RETENTION, (path,)).fetchone()
if not row:
current_retention.pop(r)
db.execute(cls.UPDATE_RETENTION, (json.dumps(current_retention), path))
- def report(self):
+ def report(self) -> str:
return self.report_json()
- def report_json(self):
+ def report_json(self) -> str:
return json.dumps(dict(self.__dict__),
default=lambda o: o.strftime(SNAP_DB_TS_FORMAT))
@classmethod
- def parse_schedule(cls, schedule):
+ def parse_schedule(cls, schedule: str) -> Tuple[int, str]:
return int(schedule[0:-1]), schedule[-1]
@property
- def repeat(self):
+ def repeat(self) -> int:
period, mult = self.parse_schedule(self.schedule)
if mult == 'M':
return period * 60
AND schedules_meta.start = ?
AND schedules_meta.repeat = ?);'''
- def update_last(self, time, db):
+ def update_last(self, time: datetime, db: sqlite3.Connection) -> None:
with db:
db.execute(self.UPDATE_LAST, (time.strftime(SNAP_DB_TS_FORMAT),
time.strftime(SNAP_DB_TS_FORMAT),
AND schedules_meta.start = ?
AND schedules_meta.repeat = ?);'''
- def set_inactive(self, db):
+ def set_inactive(self, db: sqlite3.Connection) -> None:
with db:
log.debug(f'Deactivating schedule ({self.repeat}, {self.start}) on path {self.path}')
db.execute(self.UPDATE_INACTIVE, (self.path,
AND schedules_meta.start = ?
AND schedules_meta.repeat = ?);'''
- def set_active(self, db):
+ def set_active(self, db: sqlite3.Connection) -> None:
with db:
log.debug(f'Activating schedule ({self.repeat}, {self.start}) on path {self.path}')
db.execute(self.UPDATE_ACTIVE, (self.path,
AND schedules_meta.start = ?
AND schedules_meta.repeat = ?);'''
- def update_pruned(self, time, db, pruned):
+ def update_pruned(self,
+ time: datetime,
+ db: sqlite3.Connection,
+ pruned: int) -> None:
with db:
db.execute(self.UPDATE_PRUNED, (time.strftime(SNAP_DB_TS_FORMAT), pruned,
self.path,
from datetime import datetime, timezone
import logging
from threading import Timer
+from typing import cast, Any, Callable, Dict, Iterator, List, Set, Optional, \
+ Tuple, TypeVar, Union
import sqlite3
from .schedule import Schedule, parse_retention
import traceback
log = logging.getLogger(__name__)
+CephfsClientT = TypeVar('CephfsClientT', bound=CephfsClient)
+
@contextmanager
-def open_ioctx(self, pool):
+def open_ioctx(self: CephfsClientT, pool: Union[int, str]) -> Iterator[rados.Ioctx]:
try:
if type(pool) is int:
with self.mgr.rados.open_ioctx2(pool) as ioctx:
raise
-def updates_schedule_db(func):
- def f(self, fs, schedule_or_path, *args):
- func(self, fs, schedule_or_path, *args)
+FuncT = TypeVar('FuncT', bound=Callable[..., None])
+
+def updates_schedule_db(func: FuncT) -> FuncT:
+ def f(self: 'SnapSchedClient', fs: str, schedule_or_path: str, *args: Any) -> None:
+ ret = func(self, fs, schedule_or_path, *args)
path = schedule_or_path
if isinstance(schedule_or_path, Schedule):
path = schedule_or_path.path
self.refresh_snap_timers(fs, path)
- return f
+ return ret
+ return cast(FuncT, f)
-def get_prune_set(candidates, retention):
+def get_prune_set(candidates: Set[Tuple[cephfs.DirEntry, datetime]],
+ retention: Dict[str, int]) -> Set:
PRUNING_PATTERNS = OrderedDict([
# n is for keep last n snapshots, uses the snapshot name timestamp
# format for lowest granularity
class SnapSchedClient(CephfsClient):
- def __init__(self, mgr):
+ def __init__(self, mgr: Any) -> None:
super(SnapSchedClient, self).__init__(mgr)
# TODO maybe iterate over all fs instance in fsmap and load snap dbs?
- self.sqlite_connections = {}
- self.active_timers = {}
+ self.sqlite_connections: Dict[str, sqlite3.Connection] = {}
+ self.active_timers: Dict[Tuple[str, str], List[Timer]] = {}
@property
- def allow_minute_snaps(self):
+ def allow_minute_snaps(self) -> None:
return self.mgr.get_module_option('allow_m_granularity')
- def get_schedule_db(self, fs):
+ def get_schedule_db(self, fs: str) -> sqlite3.Connection:
if fs not in self.sqlite_connections:
self.sqlite_connections[fs] = sqlite3.connect(
':memory:',
con.row_factory = sqlite3.Row
con.execute("PRAGMA FOREIGN_KEYS = 1")
pool = self.get_metadata_pool(fs)
+ assert pool, f'fs "{fs}" not found'
with open_ioctx(self, pool) as ioctx:
try:
size, _mtime = ioctx.stat(SNAP_DB_OBJECT_NAME)
con.executescript(Schedule.CREATE_TABLES)
return self.sqlite_connections[fs]
- def store_schedule_db(self, fs):
+ def store_schedule_db(self, fs: str) -> None:
# only store db is it exists, otherwise nothing to do
metadata_pool = self.get_metadata_pool(fs)
if not metadata_pool:
ioctx.write_full(SNAP_DB_OBJECT_NAME,
'\n'.join(db_content).encode('utf-8'))
- def _is_allowed_repeat(self, exec_row, path):
+ def _is_allowed_repeat(self, exec_row: Dict[str, str], path: str) -> bool:
if Schedule.parse_schedule(exec_row['schedule'])[1] == 'M':
if self.allow_minute_snaps:
log.debug(f'Minute repeats allowed, scheduling snapshot on path {path}')
return True
- def refresh_snap_timers(self, fs, path):
+ def refresh_snap_timers(self, fs: str, path: str) -> None:
try:
log.debug(f'SnapDB on {fs} changed for {path}, updating next Timer')
db = self.get_schedule_db(fs)
except Exception:
self._log_exception('refresh_snap_timers')
- def _log_exception(self, fct):
+ def _log_exception(self, fct: str) -> None:
log.error(f'{fct} raised an exception:')
log.error(traceback.format_exc())
- def create_scheduled_snapshot(self, fs_name, path, retention, start, repeat):
+ def create_scheduled_snapshot(self,
+ fs_name: str, path: str,
+ retention: str, start: str, repeat: str) -> None:
log.debug(f'Scheduled snapshot of {path} triggered')
try:
db = self.get_schedule_db(fs_name)
self.refresh_snap_timers(fs_name, path)
self.prune_snapshots(sched)
- def prune_snapshots(self, sched):
+ def prune_snapshots(self, sched: Schedule) -> None:
try:
log.debug('Pruning snapshots')
ret = sched.retention
except Exception:
self._log_exception('prune_snapshots')
- def get_snap_schedules(self, fs, path):
+ def get_snap_schedules(self, fs: str, path: str) -> List[Schedule]:
db = self.get_schedule_db(fs)
return Schedule.get_db_schedules(path, db, fs)
- def list_snap_schedules(self, fs, path, recursive):
+ def list_snap_schedules(self, fs: str, path: str, recursive: bool) -> List[Schedule]:
db = self.get_schedule_db(fs)
return Schedule.list_schedules(path, db, fs, recursive)
@updates_schedule_db
# TODO improve interface
- def store_snap_schedule(self, fs, path_, args):
+ def store_snap_schedule(self,
+ fs: str, path_: str,
+ args: Tuple[str, str, str, str,
+ Optional[str], Optional[str]]) -> None:
sched = Schedule(*args)
log.debug(f'repeat is {sched.repeat}')
if sched.parse_schedule(sched.schedule)[1] == 'M' and not self.allow_minute_snaps:
self.store_schedule_db(sched.fs)
@updates_schedule_db
- def rm_snap_schedule(self, fs, path, schedule, start):
+ def rm_snap_schedule(self,
+ fs: str, path: str,
+ schedule: Optional[str],
+ start: Optional[str]) -> None:
db = self.get_schedule_db(fs)
Schedule.rm_schedule(db, path, schedule, start)
@updates_schedule_db
def add_retention_spec(self,
- fs,
- path,
- retention_spec_or_period,
- retention_count):
+ fs: str,
+ path: str,
+ retention_spec_or_period: str,
+ retention_count: Optional[str]) -> None:
retention_spec = retention_spec_or_period
if retention_count:
retention_spec = retention_count + retention_spec
@updates_schedule_db
def rm_retention_spec(self,
- fs,
- path,
- retention_spec_or_period,
- retention_count):
+ fs: str,
+ path: str,
+ retention_spec_or_period: str,
+ retention_count: Optional[str]) -> None:
retention_spec = retention_spec_or_period
if retention_count:
retention_spec = retention_count + retention_spec
Schedule.rm_retention(db, path, retention_spec)
@updates_schedule_db
- def activate_snap_schedule(self, fs, path, schedule, start):
+ def activate_snap_schedule(self,
+ fs: str,
+ path: str,
+ schedule: Optional[str],
+ start: Optional[str]) -> None:
db = self.get_schedule_db(fs)
schedules = Schedule.get_db_schedules(path, db, fs,
schedule=schedule,
start=start)
- [s.set_active(db) for s in schedules]
+ for s in schedules:
+ s.set_active(db)
@updates_schedule_db
- def deactivate_snap_schedule(self, fs, path, schedule, start):
+ def deactivate_snap_schedule(self,
+ fs: str, path: str,
+ schedule: Optional[str],
+ start: Optional[str]) -> None:
db = self.get_schedule_db(fs)
schedules = Schedule.get_db_schedules(path, db, fs,
schedule=schedule,
start=start)
- [s.set_inactive(db) for s in schedules]
+ for s in schedules:
+ s.set_inactive(db)
import errno
import json
import sqlite3
-from typing import Sequence, Optional
+from typing import Any, Dict, Optional, Tuple
from .fs.schedule_client import SnapSchedClient
from mgr_module import MgrModule, CLIReadCommand, CLIWriteCommand, Option
from mgr_util import CephfsConnectionException
),
]
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
super(Module, self).__init__(*args, **kwargs)
self._initialized = Event()
self.client = SnapSchedClient(self)
- def resolve_subvolume_path(self, fs, subvol, path):
+ def resolve_subvolume_path(self, fs: str, subvol: Optional[str], path: str) -> str:
if not subvol:
return path
return subvol_path + path
@property
- def default_fs(self):
+ def default_fs(self) -> str:
fs_map = self.get('fs_map')
if fs_map['filesystems']:
return fs_map['filesystems'][0]['mdsmap']['fs_name']
raise CephfsConnectionException(
-errno.ENOENT, "no filesystem found")
- def serve(self):
+ def serve(self) -> None:
self._initialized.set()
- def handle_command(self, inbuf, cmd):
+ def handle_command(self, inbuf: str, cmd: Dict[str, str]) -> Tuple[int, str, str]:
self._initialized.wait()
return -errno.EINVAL, "", "Unknown command"
@CLIReadCommand('fs snap-schedule status')
def snap_schedule_get(self,
- path: Optional[str] = '/',
+ path: str = '/',
subvol: Optional[str] = None,
fs: Optional[str] = None,
- format: Optional[str] = 'plain'):
+ format: Optional[str] = 'plain') -> Tuple[int, str, str]:
'''
List current snapshot schedules
'''
@CLIReadCommand('fs snap-schedule list')
def snap_schedule_list(self, path: str,
subvol: Optional[str] = None,
- recursive: Optional[bool] = False,
+ recursive: bool = False,
fs: Optional[str] = None,
- format: Optional[str] = 'plain'):
+ format: Optional[str] = 'plain') -> Tuple[int, str, str]:
'''
Get current snapshot schedule for <path>
'''
@CLIWriteCommand('fs snap-schedule add')
def snap_schedule_add(self,
path: str,
- snap_schedule: Optional[str],
+ snap_schedule: str,
start: Optional[str] = None,
fs: Optional[str] = None,
- subvol: Optional[str] = None):
+ subvol: Optional[str] = None) -> Tuple[int, str, str]:
'''
Set a snapshot schedule for <path>
'''
repeat: Optional[str] = None,
start: Optional[str] = None,
subvol: Optional[str] = None,
- fs: Optional[str] = None):
+ fs: Optional[str] = None) -> Tuple[int, str, str]:
'''
Remove a snapshot schedule for <path>
'''
retention_spec_or_period: str,
retention_count: Optional[str] = None,
fs: Optional[str] = None,
- subvol: Optional[str] = None):
+ subvol: Optional[str] = None) -> Tuple[int, str, str]:
'''
Set a retention specification for <path>
'''
retention_spec_or_period: str,
retention_count: Optional[str] = None,
fs: Optional[str] = None,
- subvol: Optional[str] = None):
+ subvol: Optional[str] = None) -> Tuple[int, str, str]:
'''
Remove a retention specification for <path>
'''
repeat: Optional[str] = None,
start: Optional[str] = None,
subvol: Optional[str] = None,
- fs: Optional[str] = None):
+ fs: Optional[str] = None) -> Tuple[int, str, str]:
'''
Activate a snapshot schedule for <path>
'''
repeat: Optional[str] = None,
start: Optional[str] = None,
subvol: Optional[str] = None,
- fs: Optional[str] = None):
+ fs: Optional[str] = None) -> Tuple[int, str, str]:
'''
Deactivate a snapshot schedule for <path>
'''