From: Kefu Chai Date: Fri, 29 Jan 2021 05:37:54 +0000 (+0800) Subject: pybind/mgr/snap-schedule: add typing annotations X-Git-Tag: v17.1.0~3116^2~3 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=80cbf407c2d91c4fbe0a4f432497165984832b79;p=ceph.git pybind/mgr/snap-schedule: add typing annotations please note, the behavior of `updates_schedule_db()` is changed so that it now returns the return value of the decorated function. this change makes it a real decorator which keeps the signature of the decorated function unchanged. the reason why we need this change is that mypy does not allow us to match the parameter pack of a Callable at the time of writing. Signed-off-by: Kefu Chai --- diff --git a/src/mypy.ini b/src/mypy.ini index 5ad34b0f7cb9a..829406a9d411b 100755 --- a/src/mypy.ini +++ b/src/mypy.ini @@ -33,6 +33,9 @@ disallow_untyped_defs = True [mypy-orchestrator.*] disallow_untyped_defs = True +[mypy-snap_schedule.*] +disallow_untyped_defs = True + [mypy-status.*] disallow_untyped_defs = True diff --git a/src/pybind/mgr/snap_schedule/fs/schedule.py b/src/pybind/mgr/snap_schedule/fs/schedule.py index 233795fac835d..714be62f74817 100644 --- a/src/pybind/mgr/snap_schedule/fs/schedule.py +++ b/src/pybind/mgr/snap_schedule/fs/schedule.py @@ -9,7 +9,7 @@ import logging from os import environ import re import sqlite3 -from typing import Tuple, Any +from typing import cast, Any, Dict, List, Tuple, Optional, Union log = logging.getLogger(__name__) @@ -30,12 +30,12 @@ except AttributeError: log.info(('Couldn\'t find datetime.fromisoformat, falling back to ' f'static timestamp parsing ({SNAP_DB_TS_FORMAT}')) - def ts_parser(ts): + def ts_parser(data_string: str) -> datetime: # type: ignore try: - date = datetime.strptime(ts, SNAP_DB_TS_FORMAT) + date = datetime.strptime(data_string, SNAP_DB_TS_FORMAT) return date except ValueError: - msg = f'''The date string {ts} does not match the required format + msg = f'''The date string {data_string} does not match the required format {SNAP_DB_TS_FORMAT}. For more flexibel date parsing upgrade to python3.7 or install https://github.com/movermeyer/backports.datetime_fromisoformat''' @@ -43,7 +43,7 @@ except AttributeError: raise ValueError(msg) -def parse_timestamp(ts): +def parse_timestamp(ts: str) -> datetime: date = ts_parser(ts) # normalize any non utc timezone to utc. If no tzinfo is supplied, assume # its already utc @@ -53,7 +53,7 @@ def parse_timestamp(ts): return date -def parse_retention(retention): +def parse_retention(retention: str) -> Dict[str, int]: ret = {} log.debug(f'parse_retention({retention})') matches = re.findall(r'\d+[a-z]', retention) @@ -69,7 +69,7 @@ def parse_retention(retention): RETENTION_MULTIPLIERS = ['n', 'M', 'h', 'd', 'w', 'm', 'y'] -def dump_retention(retention): +def dump_retention(retention: Dict[str, str]) -> str: ret = '' for mult in RETENTION_MULTIPLIERS: if mult in retention: @@ -82,21 +82,21 @@ class Schedule(object): Wrapper to work with schedules stored in sqlite ''' def __init__(self, - path, - schedule, - fs_name, - rel_path, - start=None, - subvol=None, - retention_policy='{}', - created=None, - first=None, - last=None, - last_pruned=None, - created_count=0, - pruned_count=0, - active=True, - ): + path: str, + schedule: str, + fs_name: str, + rel_path: str, + start: Optional[str] = None, + subvol: Optional[str] = None, + retention_policy: str = '{}', + created: Optional[str] = None, + first: Optional[str] = None, + last: Optional[str] = None, + last_pruned: Optional[str] = None, + created_count: int = 0, + pruned_count: int = 0, + active: bool = True, + ) -> None: self.fs = fs_name self.subvol = subvol self.path = path @@ -112,47 +112,47 @@ class Schedule(object): else: self.start = parse_timestamp(start) if created is None: - self.created = datetime.now(timezone.utc) + self.created: Optional[datetime] = datetime.now(timezone.utc) else: self.created = parse_timestamp(created) if first: - self.first = parse_timestamp(first) + self.first: Optional[datetime] = parse_timestamp(first) else: - self.first = first + self.first = None if last: - self.last = parse_timestamp(last) + self.last: Optional[datetime] = parse_timestamp(last) else: - self.last = last + self.last = None if last_pruned: - self.last_pruned = parse_timestamp(last_pruned) + self.last_pruned: Optional[datetime] = parse_timestamp(last_pruned) else: - self.last_pruned = last_pruned + self.last_pruned = None self.created_count = created_count self.pruned_count = pruned_count self.active = bool(active) @classmethod - def _from_db_row(cls, table_row, fs): - return cls(table_row['path'], - table_row['schedule'], + def _from_db_row(cls, table_row: Dict[str, Union[int, str]], fs: str) -> 'Schedule': + return cls(cast(str, table_row['path']), + cast(str, table_row['schedule']), fs, - table_row['rel_path'], - table_row['start'], - table_row['subvol'], - table_row['retention'], - table_row['created'], - table_row['first'], - table_row['last'], - table_row['last_pruned'], - table_row['created_count'], - table_row['pruned_count'], - table_row['active'], + cast(str, table_row['rel_path']), + cast(str, table_row['start']), + cast(str, table_row['subvol']), + cast(str, table_row['retention']), + cast(str, table_row['created']), + cast(str, table_row['first']), + cast(str, table_row['last']), + cast(str, table_row['last_pruned']), + cast(int, table_row['created_count']), + cast(int, table_row['pruned_count']), + cast(bool, table_row['active']), ) - def __str__(self): + def __str__(self) -> str: return f'''{self.path} {self.schedule} {dump_retention(self.retention)}''' - def json_list(self): + def json_list(self) -> str: return json.dumps({'path': self.path, 'schedule': self.schedule, 'retention': dump_retention(self.retention)}) @@ -204,10 +204,13 @@ class Schedule(object): GET_SCHEDULES = PROTO_GET_SCHEDULES + ' s.path = ?''' @classmethod - def get_db_schedules(cls, path, db, fs, - schedule=None, - start=None, - repeat=None): + def get_db_schedules(cls, + path: str, + db: sqlite3.Connection, + fs: str, + schedule: Optional[str] = None, + start: Optional[str] = None, + repeat: Optional[str] = None) -> List['Schedule']: query = cls.GET_SCHEDULES data: Tuple[Any, ...] = (path,) if repeat: @@ -224,7 +227,10 @@ class Schedule(object): return [cls._from_db_row(row, fs) for row in c.fetchall()] @classmethod - def list_schedules(cls, path, db, fs, recursive): + def list_schedules(cls, + path: str, + db: sqlite3.Connection, + fs: str, recursive: bool) -> List['Schedule']: with db: if recursive: c = db.execute(cls.PROTO_GET_SCHEDULES + ' path LIKE ?', @@ -242,7 +248,7 @@ class Schedule(object): active) SELECT ?, ?, ?, ?, ?, ?''' - def store_schedule(self, db): + def store_schedule(self, db: sqlite3.Connection) -> None: sched_id = None with db: try: @@ -260,6 +266,7 @@ class Schedule(object): (self.path,)) sched_id = c.fetchone()[0] pass + assert self.created, "self.created should be set" db.execute(self.INSERT_SCHEDULE_META, (sched_id, self.start.strftime(SNAP_DB_TS_FORMAT), @@ -269,7 +276,11 @@ class Schedule(object): 1)) @classmethod - def rm_schedule(cls, db, path, repeat, start): + def rm_schedule(cls, + db: sqlite3.Connection, + path: str, + repeat: Optional[str], + start: Optional[str]) -> None: with db: cur = db.execute('SELECT id FROM schedules WHERE path = ?', (path,)) @@ -317,7 +328,7 @@ class Schedule(object): WHERE path = ?''' @classmethod - def add_retention(cls, db, path, retention_spec): + def add_retention(cls, db: sqlite3.Connection, path: str, retention_spec: str) -> None: with db: row = db.execute(cls.GET_RETENTION, (path,)).fetchone() if not row: @@ -336,7 +347,7 @@ class Schedule(object): db.execute(cls.UPDATE_RETENTION, (json.dumps(current_retention), path)) @classmethod - def rm_retention(cls, db, path, retention_spec): + def rm_retention(cls, db: sqlite3.Connection, path: str, retention_spec: str) -> None: with db: row = db.execute(cls.GET_RETENTION, (path,)).fetchone() if not row: @@ -351,19 +362,19 @@ class Schedule(object): current_retention.pop(r) db.execute(cls.UPDATE_RETENTION, (json.dumps(current_retention), path)) - def report(self): + def report(self) -> str: return self.report_json() - def report_json(self): + def report_json(self) -> str: return json.dumps(dict(self.__dict__), default=lambda o: o.strftime(SNAP_DB_TS_FORMAT)) @classmethod - def parse_schedule(cls, schedule): + def parse_schedule(cls, schedule: str) -> Tuple[int, str]: return int(schedule[0:-1]), schedule[-1] @property - def repeat(self): + def repeat(self) -> int: period, mult = self.parse_schedule(self.schedule) if mult == 'M': return period * 60 @@ -389,7 +400,7 @@ class Schedule(object): AND schedules_meta.start = ? AND schedules_meta.repeat = ?);''' - def update_last(self, time, db): + def update_last(self, time: datetime, db: sqlite3.Connection) -> None: with db: db.execute(self.UPDATE_LAST, (time.strftime(SNAP_DB_TS_FORMAT), time.strftime(SNAP_DB_TS_FORMAT), @@ -412,7 +423,7 @@ class Schedule(object): AND schedules_meta.start = ? AND schedules_meta.repeat = ?);''' - def set_inactive(self, db): + def set_inactive(self, db: sqlite3.Connection) -> None: with db: log.debug(f'Deactivating schedule ({self.repeat}, {self.start}) on path {self.path}') db.execute(self.UPDATE_INACTIVE, (self.path, @@ -431,7 +442,7 @@ class Schedule(object): AND schedules_meta.start = ? AND schedules_meta.repeat = ?);''' - def set_active(self, db): + def set_active(self, db: sqlite3.Connection) -> None: with db: log.debug(f'Activating schedule ({self.repeat}, {self.start}) on path {self.path}') db.execute(self.UPDATE_ACTIVE, (self.path, @@ -451,7 +462,10 @@ class Schedule(object): AND schedules_meta.start = ? AND schedules_meta.repeat = ?);''' - def update_pruned(self, time, db, pruned): + def update_pruned(self, + time: datetime, + db: sqlite3.Connection, + pruned: int) -> None: with db: db.execute(self.UPDATE_PRUNED, (time.strftime(SNAP_DB_TS_FORMAT), pruned, self.path, diff --git a/src/pybind/mgr/snap_schedule/fs/schedule_client.py b/src/pybind/mgr/snap_schedule/fs/schedule_client.py index f02d94cc5b6fe..1f8da07e8ee40 100644 --- a/src/pybind/mgr/snap_schedule/fs/schedule_client.py +++ b/src/pybind/mgr/snap_schedule/fs/schedule_client.py @@ -13,6 +13,8 @@ from collections import OrderedDict from datetime import datetime, timezone import logging from threading import Timer +from typing import cast, Any, Callable, Dict, Iterator, List, Set, Optional, \ + Tuple, TypeVar, Union import sqlite3 from .schedule import Schedule, parse_retention import traceback @@ -30,8 +32,10 @@ SNAPSHOT_PREFIX = 'scheduled' log = logging.getLogger(__name__) +CephfsClientT = TypeVar('CephfsClientT', bound=CephfsClient) + @contextmanager -def open_ioctx(self, pool): +def open_ioctx(self: CephfsClientT, pool: Union[int, str]) -> Iterator[rados.Ioctx]: try: if type(pool) is int: with self.mgr.rados.open_ioctx2(pool) as ioctx: @@ -46,17 +50,21 @@ def open_ioctx(self, pool): raise -def updates_schedule_db(func): - def f(self, fs, schedule_or_path, *args): - func(self, fs, schedule_or_path, *args) +FuncT = TypeVar('FuncT', bound=Callable[..., None]) + +def updates_schedule_db(func: FuncT) -> FuncT: + def f(self: 'SnapSchedClient', fs: str, schedule_or_path: str, *args: Any) -> None: + ret = func(self, fs, schedule_or_path, *args) path = schedule_or_path if isinstance(schedule_or_path, Schedule): path = schedule_or_path.path self.refresh_snap_timers(fs, path) - return f + return ret + return cast(FuncT, f) -def get_prune_set(candidates, retention): +def get_prune_set(candidates: Set[Tuple[cephfs.DirEntry, datetime]], + retention: Dict[str, int]) -> Set: PRUNING_PATTERNS = OrderedDict([ # n is for keep last n snapshots, uses the snapshot name timestamp # format for lowest granularity @@ -98,17 +106,17 @@ def get_prune_set(candidates, retention): class SnapSchedClient(CephfsClient): - def __init__(self, mgr): + def __init__(self, mgr: Any) -> None: super(SnapSchedClient, self).__init__(mgr) # TODO maybe iterate over all fs instance in fsmap and load snap dbs? - self.sqlite_connections = {} - self.active_timers = {} + self.sqlite_connections: Dict[str, sqlite3.Connection] = {} + self.active_timers: Dict[Tuple[str, str], List[Timer]] = {} @property - def allow_minute_snaps(self): + def allow_minute_snaps(self) -> None: return self.mgr.get_module_option('allow_m_granularity') - def get_schedule_db(self, fs): + def get_schedule_db(self, fs: str) -> sqlite3.Connection: if fs not in self.sqlite_connections: self.sqlite_connections[fs] = sqlite3.connect( ':memory:', @@ -117,6 +125,7 @@ class SnapSchedClient(CephfsClient): con.row_factory = sqlite3.Row con.execute("PRAGMA FOREIGN_KEYS = 1") pool = self.get_metadata_pool(fs) + assert pool, f'fs "{fs}" not found' with open_ioctx(self, pool) as ioctx: try: size, _mtime = ioctx.stat(SNAP_DB_OBJECT_NAME) @@ -128,7 +137,7 @@ class SnapSchedClient(CephfsClient): con.executescript(Schedule.CREATE_TABLES) return self.sqlite_connections[fs] - def store_schedule_db(self, fs): + def store_schedule_db(self, fs: str) -> None: # only store db is it exists, otherwise nothing to do metadata_pool = self.get_metadata_pool(fs) if not metadata_pool: @@ -144,7 +153,7 @@ class SnapSchedClient(CephfsClient): ioctx.write_full(SNAP_DB_OBJECT_NAME, '\n'.join(db_content).encode('utf-8')) - def _is_allowed_repeat(self, exec_row, path): + def _is_allowed_repeat(self, exec_row: Dict[str, str], path: str) -> bool: if Schedule.parse_schedule(exec_row['schedule'])[1] == 'M': if self.allow_minute_snaps: log.debug(f'Minute repeats allowed, scheduling snapshot on path {path}') @@ -156,7 +165,7 @@ class SnapSchedClient(CephfsClient): return True - def refresh_snap_timers(self, fs, path): + def refresh_snap_timers(self, fs: str, path: str) -> None: try: log.debug(f'SnapDB on {fs} changed for {path}, updating next Timer') db = self.get_schedule_db(fs) @@ -181,11 +190,13 @@ class SnapSchedClient(CephfsClient): except Exception: self._log_exception('refresh_snap_timers') - def _log_exception(self, fct): + def _log_exception(self, fct: str) -> None: log.error(f'{fct} raised an exception:') log.error(traceback.format_exc()) - def create_scheduled_snapshot(self, fs_name, path, retention, start, repeat): + def create_scheduled_snapshot(self, + fs_name: str, path: str, + retention: str, start: str, repeat: str) -> None: log.debug(f'Scheduled snapshot of {path} triggered') try: db = self.get_schedule_db(fs_name) @@ -213,7 +224,7 @@ class SnapSchedClient(CephfsClient): self.refresh_snap_timers(fs_name, path) self.prune_snapshots(sched) - def prune_snapshots(self, sched): + def prune_snapshots(self, sched: Schedule) -> None: try: log.debug('Pruning snapshots') ret = sched.retention @@ -244,17 +255,20 @@ class SnapSchedClient(CephfsClient): except Exception: self._log_exception('prune_snapshots') - def get_snap_schedules(self, fs, path): + def get_snap_schedules(self, fs: str, path: str) -> List[Schedule]: db = self.get_schedule_db(fs) return Schedule.get_db_schedules(path, db, fs) - def list_snap_schedules(self, fs, path, recursive): + def list_snap_schedules(self, fs: str, path: str, recursive: bool) -> List[Schedule]: db = self.get_schedule_db(fs) return Schedule.list_schedules(path, db, fs, recursive) @updates_schedule_db # TODO improve interface - def store_snap_schedule(self, fs, path_, args): + def store_snap_schedule(self, + fs: str, path_: str, + args: Tuple[str, str, str, str, + Optional[str], Optional[str]]) -> None: sched = Schedule(*args) log.debug(f'repeat is {sched.repeat}') if sched.parse_schedule(sched.schedule)[1] == 'M' and not self.allow_minute_snaps: @@ -266,16 +280,19 @@ class SnapSchedClient(CephfsClient): self.store_schedule_db(sched.fs) @updates_schedule_db - def rm_snap_schedule(self, fs, path, schedule, start): + def rm_snap_schedule(self, + fs: str, path: str, + schedule: Optional[str], + start: Optional[str]) -> None: db = self.get_schedule_db(fs) Schedule.rm_schedule(db, path, schedule, start) @updates_schedule_db def add_retention_spec(self, - fs, - path, - retention_spec_or_period, - retention_count): + fs: str, + path: str, + retention_spec_or_period: str, + retention_count: Optional[str]) -> None: retention_spec = retention_spec_or_period if retention_count: retention_spec = retention_count + retention_spec @@ -284,10 +301,10 @@ class SnapSchedClient(CephfsClient): @updates_schedule_db def rm_retention_spec(self, - fs, - path, - retention_spec_or_period, - retention_count): + fs: str, + path: str, + retention_spec_or_period: str, + retention_count: Optional[str]) -> None: retention_spec = retention_spec_or_period if retention_count: retention_spec = retention_count + retention_spec @@ -295,17 +312,26 @@ class SnapSchedClient(CephfsClient): Schedule.rm_retention(db, path, retention_spec) @updates_schedule_db - def activate_snap_schedule(self, fs, path, schedule, start): + def activate_snap_schedule(self, + fs: str, + path: str, + schedule: Optional[str], + start: Optional[str]) -> None: db = self.get_schedule_db(fs) schedules = Schedule.get_db_schedules(path, db, fs, schedule=schedule, start=start) - [s.set_active(db) for s in schedules] + for s in schedules: + s.set_active(db) @updates_schedule_db - def deactivate_snap_schedule(self, fs, path, schedule, start): + def deactivate_snap_schedule(self, + fs: str, path: str, + schedule: Optional[str], + start: Optional[str]) -> None: db = self.get_schedule_db(fs) schedules = Schedule.get_db_schedules(path, db, fs, schedule=schedule, start=start) - [s.set_inactive(db) for s in schedules] + for s in schedules: + s.set_inactive(db) diff --git a/src/pybind/mgr/snap_schedule/module.py b/src/pybind/mgr/snap_schedule/module.py index a1a34e08b07ac..45ae13593f9d5 100644 --- a/src/pybind/mgr/snap_schedule/module.py +++ b/src/pybind/mgr/snap_schedule/module.py @@ -6,7 +6,7 @@ LGPL2.1. See file COPYING. import errno import json import sqlite3 -from typing import Sequence, Optional +from typing import Any, Dict, Optional, Tuple from .fs.schedule_client import SnapSchedClient from mgr_module import MgrModule, CLIReadCommand, CLIWriteCommand, Option from mgr_util import CephfsConnectionException @@ -24,12 +24,12 @@ class Module(MgrModule): ), ] - def __init__(self, *args, **kwargs): + def __init__(self, *args: Any, **kwargs: Any) -> None: super(Module, self).__init__(*args, **kwargs) self._initialized = Event() self.client = SnapSchedClient(self) - def resolve_subvolume_path(self, fs, subvol, path): + def resolve_subvolume_path(self, fs: str, subvol: Optional[str], path: str) -> str: if not subvol: return path @@ -41,7 +41,7 @@ class Module(MgrModule): return subvol_path + path @property - def default_fs(self): + def default_fs(self) -> str: fs_map = self.get('fs_map') if fs_map['filesystems']: return fs_map['filesystems'][0]['mdsmap']['fs_name'] @@ -50,19 +50,19 @@ class Module(MgrModule): raise CephfsConnectionException( -errno.ENOENT, "no filesystem found") - def serve(self): + def serve(self) -> None: self._initialized.set() - def handle_command(self, inbuf, cmd): + def handle_command(self, inbuf: str, cmd: Dict[str, str]) -> Tuple[int, str, str]: self._initialized.wait() return -errno.EINVAL, "", "Unknown command" @CLIReadCommand('fs snap-schedule status') def snap_schedule_get(self, - path: Optional[str] = '/', + path: str = '/', subvol: Optional[str] = None, fs: Optional[str] = None, - format: Optional[str] = 'plain'): + format: Optional[str] = 'plain') -> Tuple[int, str, str]: ''' List current snapshot schedules ''' @@ -79,9 +79,9 @@ class Module(MgrModule): @CLIReadCommand('fs snap-schedule list') def snap_schedule_list(self, path: str, subvol: Optional[str] = None, - recursive: Optional[bool] = False, + recursive: bool = False, fs: Optional[str] = None, - format: Optional[str] = 'plain'): + format: Optional[str] = 'plain') -> Tuple[int, str, str]: ''' Get current snapshot schedule for ''' @@ -104,10 +104,10 @@ class Module(MgrModule): @CLIWriteCommand('fs snap-schedule add') def snap_schedule_add(self, path: str, - snap_schedule: Optional[str], + snap_schedule: str, start: Optional[str] = None, fs: Optional[str] = None, - subvol: Optional[str] = None): + subvol: Optional[str] = None) -> Tuple[int, str, str]: ''' Set a snapshot schedule for ''' @@ -137,7 +137,7 @@ class Module(MgrModule): repeat: Optional[str] = None, start: Optional[str] = None, subvol: Optional[str] = None, - fs: Optional[str] = None): + fs: Optional[str] = None) -> Tuple[int, str, str]: ''' Remove a snapshot schedule for ''' @@ -157,7 +157,7 @@ class Module(MgrModule): retention_spec_or_period: str, retention_count: Optional[str] = None, fs: Optional[str] = None, - subvol: Optional[str] = None): + subvol: Optional[str] = None) -> Tuple[int, str, str]: ''' Set a retention specification for ''' @@ -179,7 +179,7 @@ class Module(MgrModule): retention_spec_or_period: str, retention_count: Optional[str] = None, fs: Optional[str] = None, - subvol: Optional[str] = None): + subvol: Optional[str] = None) -> Tuple[int, str, str]: ''' Remove a retention specification for ''' @@ -201,7 +201,7 @@ class Module(MgrModule): repeat: Optional[str] = None, start: Optional[str] = None, subvol: Optional[str] = None, - fs: Optional[str] = None): + fs: Optional[str] = None) -> Tuple[int, str, str]: ''' Activate a snapshot schedule for ''' @@ -221,7 +221,7 @@ class Module(MgrModule): repeat: Optional[str] = None, start: Optional[str] = None, subvol: Optional[str] = None, - fs: Optional[str] = None): + fs: Optional[str] = None) -> Tuple[int, str, str]: ''' Deactivate a snapshot schedule for '''