From 856e2e073bd90833aef8390c76d5181ea0fb0ac5 Mon Sep 17 00:00:00 2001 From: Ramana Raja Date: Sun, 20 Oct 2024 11:54:55 -0400 Subject: [PATCH] pybind/mgr/rbd_support: add scheduler for mirror group snapshots Signed-off-by: Ramana Raja --- .../mirror_group_snapshot_schedule.py | 352 ++++++++++++++++++ src/pybind/mgr/rbd_support/module.py | 50 +++ src/pybind/mgr/rbd_support/schedule.py | 148 +++++--- 3 files changed, 504 insertions(+), 46 deletions(-) create mode 100644 src/pybind/mgr/rbd_support/mirror_group_snapshot_schedule.py diff --git a/src/pybind/mgr/rbd_support/mirror_group_snapshot_schedule.py b/src/pybind/mgr/rbd_support/mirror_group_snapshot_schedule.py new file mode 100644 index 0000000000000..8114b7211d625 --- /dev/null +++ b/src/pybind/mgr/rbd_support/mirror_group_snapshot_schedule.py @@ -0,0 +1,352 @@ +import errno +import json +import rados +import rbd +import traceback + +from datetime import datetime +from threading import Condition, Lock, Thread +from typing import Any, Dict, List, NamedTuple, Optional, Set, Tuple, Union + +from .common import get_rbd_pools +from .schedule import LevelSpec, Schedules + + +def namespace_validator(ioctx: rados.Ioctx) -> None: + mode = rbd.RBD().mirror_mode_get(ioctx) + if mode != rbd.RBD_MIRROR_MODE_IMAGE: + raise ValueError("namespace {} is not in mirror image mode".format( + ioctx.get_namespace())) + +def group_validator(group: rbd.Group) -> None: + try: + info = group.mirror_group_get_info() + except rbd.ObjectNotFound: + raise rbd.InvalidArgument("Error getting mirror group info") + if info['image_mode'] != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT: + raise rbd.InvalidArgument("Invalid mirror group mode") + +class GroupSpec(NamedTuple): + pool_id: str + namespace: str + group_id: str + + +class MirrorGroupSnapshotScheduleHandler: + MODULE_OPTION_NAME = "mirror_group_snapshot_schedule" + SCHEDULE_OID = "rbd_mirror_group_snapshot_schedule" + REFRESH_DELAY_SECONDS = 60.0 + + def __init__(self, module: Any) -> None: + self.lock = Lock() + self.condition = Condition(self.lock) + self.module = module + self.log = module.log + self.last_refresh_groups = datetime(1970, 1, 1) + # self.create_snapshot_requests = CreateSnapshotRequests(self) + + self.stop_thread = False + self.thread = Thread(target=self.run) + + def setup(self) -> None: + self.init_schedule_queue() + self.thread.start() + + def shutdown(self) -> None: + self.log.info("MirrorGroupSnapshotScheduleHandler: shutting down") + self.stop_thread = True + if self.thread.is_alive(): + self.log.debug("MirrorGroupSnapshotScheduleHandler: joining thread") + self.thread.join() + # self.create_snapshot_requests.wait_for_pending() + self.log.info("MirrorGroupSnapshotScheduleHandler: shut down") + + def run(self) -> None: + try: + self.log.info("MirrorGroupSnapshotScheduleHandler: starting") + while not self.stop_thread: + refresh_delay = self.refresh_groups() + with self.lock: + (group_spec, wait_time) = self.dequeue() + if not group_spec: + self.condition.wait(min(wait_time, refresh_delay)) + continue + pool_id, namespace, group_id = group_spec + # self.create_snapshot_requests.add(pool_id, namespace, group_id) + self.create_group_snapshot(pool_id, namespace, group_id) + with self.lock: + self.enqueue(datetime.now(), pool_id, namespace, group_id) + + except (rados.ConnectionShutdown, rbd.ConnectionShutdown): + self.log.exception("MirrorGroupSnapshotScheduleHandler: client blocklisted") + self.module.client_blocklisted.set() + except Exception as ex: + self.log.fatal("Fatal runtime error: {}\n{}".format( + ex, traceback.format_exc())) + + def create_group_snapshot(self, pool_id, namespace, group_id): + try: + with self.module.rados.open_ioctx2(int(pool_id)) as ioctx: + ioctx.set_namespace(namespace) + group_name = rbd.RBD().group_get_name(ioctx, group_id) + if group_name is None: + return + group = rbd.Group(ioctx, group_name) + mirror_info = group.mirror_group_get_info() + if mirror_info['image_mode'] != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT: + return + if mirror_info['state'] != rbd.RBD_MIRROR_GROUP_ENABLED or \ + not mirror_info['primary']: + return + snap_id = group.mirror_group_create_snapshot() + self.log.debug( + "create_group_snapshot: {}/{}/{}: snap_id={}".format( + ioctx.get_pool_name(), namespace, group_name, + snap_id)) + except Exception as e: + self.log.error( + "exception when creating group snapshot for {}/{}/{}: {}".format( + pool_id, namespace, group_id, e)) + + def init_schedule_queue(self) -> None: + # schedule_time => group_spec + self.queue: Dict[str, List[GroupSpec]] = {} + # pool_id => {namespace => {group_id => group_name}} + self.groups: Dict[str, Dict[str, Dict[str, str]]] = {} + self.schedules = Schedules(self) + self.refresh_groups() + self.log.debug("MirrorGroupSnapshotScheduleHandler: queue is initialized") + + def load_schedules(self) -> None: + self.log.info("MirrorGroupSnapshotScheduleHandler: load_schedules") + self.schedules.load(namespace_validator, group_validator=group_validator) + + def refresh_groups(self) -> float: + elapsed = (datetime.now() - self.last_refresh_groups).total_seconds() + if elapsed < self.REFRESH_DELAY_SECONDS: + return self.REFRESH_DELAY_SECONDS - elapsed + + self.log.debug("MirrorGroupSnapshotScheduleHandler: refresh_groups") + + with self.lock: + self.load_schedules() + if not self.schedules: + self.log.debug("MirrorGroupSnapshotScheduleHandler: no schedules") + self.groups = {} + self.queue = {} + self.last_refresh_groups = datetime.now() + return self.REFRESH_DELAY_SECONDS + + groups: Dict[str, Dict[str, Dict[str, str]]] = {} + + for pool_id, pool_name in get_rbd_pools(self.module).items(): + if not self.schedules.intersects( + LevelSpec.from_pool_spec(pool_id, pool_name)): + continue + with self.module.rados.open_ioctx2(int(pool_id)) as ioctx: + self.load_pool_groups(ioctx, groups) + + with self.lock: + self.refresh_queue(groups) + self.groups = groups + + self.last_refresh_groups = datetime.now() + return self.REFRESH_DELAY_SECONDS + + def load_pool_groups(self, + ioctx: rados.Ioctx, + groups: Dict[str, Dict[str, Dict[str, str]]]) -> None: + pool_id = str(ioctx.get_pool_id()) + pool_name = ioctx.get_pool_name() + groups[pool_id] = {} + + self.log.debug("load_pool_groups: pool={}".format(pool_name)) + + try: + namespaces = [''] + rbd.RBD().namespace_list(ioctx) + for namespace in namespaces: + if not self.schedules.intersects( + LevelSpec.from_pool_spec(int(pool_id), pool_name, namespace)): + continue + self.log.debug("load_pool_groups: pool={}, namespace={}".format( + pool_name, namespace)) + groups[pool_id][namespace] = {} + ioctx.set_namespace(namespace) + mirror_groups = dict(rbd.RBD().mirror_group_info_list( + ioctx, rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT)) + if not mirror_groups: + continue + group_names = dict( + [(x['id'], x['name']) for x in filter( + lambda x: x['id'] in mirror_groups, + rbd.RBD().group_list2(ioctx))]) + for group_id, info in mirror_groups.items(): + if not info['primary']: + continue + group_name = group_names.get(group_id) + if not group_name: + continue + if namespace: + name = "{}/{}/{}".format(pool_name, namespace, + group_name) + else: + name = "{}/{}".format(pool_name, group_name) + self.log.debug( + "load_pool_groups: adding group {}".format(name)) + groups[pool_id][namespace][group_id] = name + except rbd.ConnectionShutdown: + raise + except Exception as e: + self.log.error( + "load_pool_groups: exception when scanning pool {}: {}".format( + pool_name, e)) + + def rebuild_queue(self) -> None: + now = datetime.now() + + # don't remove from queue "due" groups + now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00") + + for schedule_time in list(self.queue): + if schedule_time > now_string: + del self.queue[schedule_time] + + if not self.schedules: + return + + for pool_id in self.groups: + for namespace in self.groups[pool_id]: + for group_id in self.groups[pool_id][namespace]: + self.enqueue(now, pool_id, namespace, group_id) + + self.condition.notify() + + def refresh_queue(self, + current_groups: Dict[str, Dict[str, Dict[str, str]]]) -> None: + now = datetime.now() + + for pool_id in self.groups: + for namespace in self.groups[pool_id]: + for group_id in self.groups[pool_id][namespace]: + if pool_id not in current_groups or \ + namespace not in current_groups[pool_id] or \ + group_id not in current_groups[pool_id][namespace]: + self.remove_from_queue(pool_id, namespace, group_id) + + for pool_id in current_groups: + for namespace in current_groups[pool_id]: + for group_id in current_groups[pool_id][namespace]: + if pool_id not in self.groups or \ + namespace not in self.groups[pool_id] or \ + group_id not in self.groups[pool_id][namespace]: + self.enqueue(now, pool_id, namespace, group_id) + + self.condition.notify() + + def enqueue(self, now: datetime, pool_id: str, namespace: str, group_id: str) -> None: + schedule = self.schedules.find(pool_id, namespace, group_id) + if not schedule: + self.log.debug( + "MirrorGroupSnapshotScheduleHandler: no schedule for {}/{}/{}".format( + pool_id, namespace, group_id)) + return + + schedule_time = schedule.next_run(now) + if schedule_time not in self.queue: + self.queue[schedule_time] = [] + self.log.debug( + "MirrorGroupSnapshotScheduleHandler: scheduling {}/{}/{} at {}".format( + pool_id, namespace, group_id, schedule_time)) + group_spec = GroupSpec(pool_id, namespace, group_id) + if group_spec not in self.queue[schedule_time]: + self.queue[schedule_time].append(group_spec) + + def dequeue(self) -> Tuple[Optional[GroupSpec], float]: + if not self.queue: + return None, 1000.0 + + now = datetime.now() + schedule_time = sorted(self.queue)[0] + + if datetime.strftime(now, "%Y-%m-%d %H:%M:%S") < schedule_time: + wait_time = (datetime.strptime(schedule_time, + "%Y-%m-%d %H:%M:%S") - now) + return None, wait_time.total_seconds() + + groups = self.queue[schedule_time] + group = groups.pop(0) + if not groups: + del self.queue[schedule_time] + return group, 0.0 + + def remove_from_queue(self, pool_id: str, namespace: str, group_id: str) -> None: + self.log.debug( + "MirrorGroupSnapshotScheduleHandler: descheduling {}/{}/{}".format( + pool_id, namespace, group_id)) + + empty_slots = [] + group_spec = GroupSpec(pool_id, namespace, group_id) + for schedule_time, groups in self.queue.items(): + if group_spec in groups: + groups.remove(group_spec) + if not groups: + empty_slots.append(schedule_time) + for schedule_time in empty_slots: + del self.queue[schedule_time] + + def add_schedule(self, + level_spec: LevelSpec, + interval: str, + start_time: Optional[str]) -> Tuple[int, str, str]: + self.log.debug( + "MirrorGroupSnapshotScheduleHandler: add_schedule: level_spec={}, interval={}, start_time={}".format( + level_spec.name, interval, start_time)) + + # TODO: optimize to rebuild only affected part of the queue + with self.lock: + self.schedules.add(level_spec, interval, start_time) + self.rebuild_queue() + return 0, "", "" + + def remove_schedule(self, + level_spec: LevelSpec, + interval: Optional[str], + start_time: Optional[str]) -> Tuple[int, str, str]: + self.log.debug( + "MirrorGroupSnapshotScheduleHandler: remove_schedule: level_spec={}, interval={}, start_time={}".format( + level_spec.name, interval, start_time)) + + # TODO: optimize to rebuild only affected part of the queue + with self.lock: + self.schedules.remove(level_spec, interval, start_time) + self.rebuild_queue() + return 0, "", "" + + def list(self, level_spec: LevelSpec) -> Tuple[int, str, str]: + self.log.debug( + "MirrorGroupSnapshotScheduleHandler: list: level_spec={}".format( + level_spec.name)) + + with self.lock: + result = self.schedules.to_list(level_spec) + + return 0, json.dumps(result, indent=4, sort_keys=True), "" + + def status(self, level_spec: LevelSpec) -> Tuple[int, str, str]: + self.log.debug( + "MirrorGroupSnapshotScheduleHandler: status: level_spec={}".format( + level_spec.name)) + + scheduled_groups = [] + with self.lock: + for schedule_time in sorted(self.queue): + for pool_id, namespace, group_id in self.queue[schedule_time]: + if not level_spec.matches(pool_id, namespace, group_id=group_id): + continue + group_name = self.groups[pool_id][namespace][group_id] + scheduled_groups.append({ + 'schedule_time': schedule_time, + 'group': group_name + }) + return 0, json.dumps({'scheduled_groups': scheduled_groups}, + indent=4, sort_keys=True), "" diff --git a/src/pybind/mgr/rbd_support/module.py b/src/pybind/mgr/rbd_support/module.py index 369face038ad2..0446edf44da63 100644 --- a/src/pybind/mgr/rbd_support/module.py +++ b/src/pybind/mgr/rbd_support/module.py @@ -20,6 +20,8 @@ from .mirror_snapshot_schedule import image_validator, namespace_validator, \ from .perf import PerfHandler, OSD_PERF_QUERY_COUNTERS from .task import TaskHandler from .trash_purge_schedule import TrashPurgeScheduleHandler +from .mirror_group_snapshot_schedule import group_validator, \ + MirrorGroupSnapshotScheduleHandler class ImageSortBy(enum.Enum): @@ -79,6 +81,7 @@ class Module(MgrModule): type='int', default=10), Option(name=TrashPurgeScheduleHandler.MODULE_OPTION_NAME), + Option(name=MirrorGroupSnapshotScheduleHandler.MODULE_OPTION_NAME), ] def __init__(self, *args: Any, **kwargs: Any) -> None: @@ -91,6 +94,7 @@ class Module(MgrModule): def init_handlers(self) -> None: self.mirror_snapshot_schedule = MirrorSnapshotScheduleHandler(self) + self.mirror_group_snapshot_schedule = MirrorGroupSnapshotScheduleHandler(self) self.perf = PerfHandler(self) self.task = TaskHandler(self) self.trash_purge_schedule = TrashPurgeScheduleHandler(self) @@ -101,6 +105,7 @@ class Module(MgrModule): # implicitly here as 'rados' is a property attribute. self.rados.wait_for_latest_osdmap() self.mirror_snapshot_schedule.setup() + self.mirror_group_snapshot_schedule.setup() self.perf.setup() self.task.setup() self.trash_purge_schedule.setup() @@ -130,6 +135,7 @@ class Module(MgrModule): def shutdown(self) -> None: self.module_ready = False self.mirror_snapshot_schedule.shutdown() + self.mirror_group_snapshot_schedule.shutdown() self.trash_purge_schedule.shutdown() self.task.shutdown() self.perf.shutdown() @@ -180,6 +186,50 @@ class Module(MgrModule): spec = LevelSpec.from_name(self, level_spec, namespace_validator, image_validator) return self.mirror_snapshot_schedule.status(spec) + @CLIWriteCommand('rbd mirror group snapshot schedule add') + @with_latest_osdmap + def mirror_group_snapshot_schedule_add(self, + level_spec: str, + interval: str, + start_time: Optional[str] = None) -> Tuple[int, str, str]: + """ + Add rbd mirror group snapshot schedule + """ + spec = LevelSpec.from_name(self, level_spec, namespace_validator, group_validator=group_validator) + return self.mirror_group_snapshot_schedule.add_schedule(spec, interval, start_time) + + @CLIWriteCommand('rbd mirror group snapshot schedule remove') + @with_latest_osdmap + def mirror_group_snapshot_schedule_remove(self, + level_spec: str, + interval: Optional[str] = None, + start_time: Optional[str] = None) -> Tuple[int, str, str]: + """ + Remove rbd mirror group snapshot schedule + """ + spec = LevelSpec.from_name(self, level_spec, namespace_validator, group_validator=group_validator) + return self.mirror_group_snapshot_schedule.remove_schedule(spec, interval, start_time) + + @CLIReadCommand('rbd mirror group snapshot schedule list') + @with_latest_osdmap + def mirror_group_snapshot_schedule_list(self, + level_spec: str = '') -> Tuple[int, str, str]: + """ + List rbd mirror group snapshot schedules + """ + spec = LevelSpec.from_name(self, level_spec, namespace_validator, group_validator=group_validator) + return self.mirror_group_snapshot_schedule.list(spec) + + @CLIReadCommand('rbd mirror group snapshot schedule status') + @with_latest_osdmap + def mirror_group_snapshot_schedule_status(self, + level_spec: str = '') -> Tuple[int, str, str]: + """ + Show rbd mirror group snapshot schedule status + """ + spec = LevelSpec.from_name(self, level_spec, namespace_validator, group_validator=group_validator) + return self.mirror_group_snapshot_schedule.status(spec) + @CLIReadCommand('rbd perf image stats') @with_latest_osdmap def perf_image_stats(self, diff --git a/src/pybind/mgr/rbd_support/schedule.py b/src/pybind/mgr/rbd_support/schedule.py index 4968700ad82ae..479323742f231 100644 --- a/src/pybind/mgr/rbd_support/schedule.py +++ b/src/pybind/mgr/rbd_support/schedule.py @@ -22,12 +22,16 @@ class LevelSpec: id: str, pool_id: Optional[str], namespace: Optional[str], - image_id: Optional[str] = None) -> None: + image_id: Optional[str] = None, + group_id: Optional [str] = None) -> None: + if image_id is not None and group_id is not None: + raise ValueError("LevelSpec cannot have both image_id and group_id") self.name = name self.id = id self.pool_id = pool_id self.namespace = namespace self.image_id = image_id + self.group_id = group_id def __eq__(self, level_spec: Any) -> bool: return self.id == level_spec.id @@ -41,8 +45,10 @@ class LevelSpec: return self.namespace is not None if level_spec.namespace != self.namespace: return False - if level_spec.image_id is None: - return self.image_id is not None + if level_spec.image_id is not None or level_spec.group_id is not None: + return False + if self.image_id is not None or self.group_id is not None: + return True return False def is_global(self) -> bool: @@ -54,13 +60,16 @@ class LevelSpec: def matches(self, pool_id: str, namespace: str, - image_id: Optional[str] = None) -> bool: + image_id: Optional[str] = None, + group_id: Optional[str] = None) -> bool: if self.pool_id and self.pool_id != pool_id: return False if self.namespace and self.namespace != namespace: return False if self.image_id and self.image_id != image_id: return False + if self.group_id and self.group_id != group_id: + return False return True def intersects(self, level_spec: 'LevelSpec') -> bool: @@ -72,10 +81,12 @@ class LevelSpec: return True if self.namespace != level_spec.namespace: return False - if self.image_id is None or level_spec.image_id is None: + if (self.image_id is None and self.group_id is None) or (level_spec.image_id is None and level_spec.group_id is None): return True if self.image_id != level_spec.image_id: return False + if self.group_id != level_spec.group_id: + return False return True @classmethod @@ -101,7 +112,8 @@ class LevelSpec: name: str, namespace_validator: Optional[Callable] = None, image_validator: Optional[Callable] = None, - allow_image_level: bool = True) -> 'LevelSpec': + allow_image_level: bool = True, + group_validator: Optional[Callable] = None) -> 'LevelSpec': # parse names like: # '', 'rbd/', 'rbd/ns/', 'rbd//image', 'rbd/image', 'rbd/ns/image' match = re.match(r'^(?:([^/]+)/(?:(?:([^/]*)/|)(?:([^/@]+))?)?)?$', @@ -116,7 +128,9 @@ class LevelSpec: pool_id = None namespace = None image_name = None + group_name = None image_id = None + group_id = None if match.group(1): pool_name = match.group(1) try: @@ -142,37 +156,56 @@ class LevelSpec: if namespace_validator: namespace_validator(ioctx) if match.group(3): - image_name = match.group(3) - try: - with rbd.Image(ioctx, image_name, - read_only=True) as image: - image_id = image.id() - id += "/" + image_id - if image_validator: - image_validator(image) - except rbd.ImageNotFound: - raise ValueError("image {} does not exist".format( - image_name)) - except rbd.InvalidArgument: - raise ValueError( - "image {} is not in snapshot mirror mode".format( + if group_validator: + group_name = match.group(3) + try: + group = rbd.Group(ioctx, group_name) + group_id = group.id() + id += "/" + group_id + group_validator(group) + except rbd.ObjectNotFound: + raise ValueError("group {} does not exist".format( + group_name)) + except rbd.InvalidArgument: + raise ValueError( + "group {} is not in snapshot mirror mode".format( + group_id)) + else: + image_name = match.group(3) + try: + with rbd.Image(ioctx, image_name, + read_only=True) as image: + image_id = image.id() + id += "/" + image_id + if image_validator: + image_validator(image) + except rbd.ImageNotFound: + raise ValueError("image {} does not exist".format( image_name)) + except rbd.InvalidArgument: + raise ValueError( + "image {} is not in snapshot mirror mode".format( + image_name)) except rados.ObjectNotFound: raise ValueError("pool {} does not exist".format(pool_name)) - # normalize possible input name like 'rbd//image' - if not namespace and image_name: - name = "{}/{}".format(pool_name, image_name) + # normalize possible input name like 'rbd//image' or 'rbd//group' + if not namespace: + if image_name: + name = "{}/{}".format(pool_name, image_name) + elif group_name: + name = "{}/{}".format(pool_name, group_name) - return LevelSpec(name, id, pool_id, namespace, image_id) + return LevelSpec(name, id, pool_id, namespace, image_id, group_id) @classmethod def from_id(cls, handler: Any, id: str, namespace_validator: Optional[Callable] = None, - image_validator: Optional[Callable] = None) -> 'LevelSpec': + image_validator: Optional[Callable] = None, + group_validator: Optional[Callable] = None) -> 'LevelSpec': # parse ids like: # '', '123', '123/', '123/ns', '123//image_id', '123/ns/image_id' match = re.match(r'^(?:(\d+)(?:/([^/]*)(?:/([^/@]+))?)?)?$', id) @@ -183,6 +216,7 @@ class LevelSpec: pool_id = None namespace = None image_id = None + group_id = None if match.group(1): pool_id = match.group(1) try: @@ -206,26 +240,42 @@ class LevelSpec: elif not match.group(3): name += "/" if match.group(3): - image_id = match.group(3) - try: - with rbd.Image(ioctx, image_id=image_id, + if group_validator: + group_id = match.group(3) + try: + group_name = rbd.RBD().group_get_name( + ioctx, group_id) + name += group_name + group = rbd.Group(ioctx, group_name) + group_validator(group) + except rbd.ObjectNotFound: + raise ValueError("group {} does not exist".format( + group_id)) + except rbd.InvalidArgument: + raise ValueError( + "group {} is not in snapshot mirror mode".format( + group_id)) + else: + image_id = match.group(3) + try: + with rbd.Image(ioctx, image_id=image_id, read_only=True) as image: - image_name = image.get_name() - name += image_name - if image_validator: - image_validator(image) - except rbd.ImageNotFound: - raise ValueError("image {} does not exist".format( - image_id)) - except rbd.InvalidArgument: - raise ValueError( - "image {} is not in snapshot mirror mode".format( - image_id)) - + image_name = image.get_name() + name += image_name + if image_validator: + image_validator(image) + except rbd.ImageNotFound: + raise ValueError( + "image {} does not exist".format( + image_id)) + except rbd.InvalidArgument: + raise ValueError( + "image {} is not in snapshot mirror mode".format( + image_id)) except rados.ObjectNotFound: raise ValueError("pool {} does not exist".format(pool_id)) - return LevelSpec(name, id, pool_id, namespace, image_id) + return LevelSpec(name, id, pool_id, namespace, image_id, group_id) class Interval: @@ -396,7 +446,8 @@ class Schedules: def load(self, namespace_validator: Optional[Callable] = None, - image_validator: Optional[Callable] = None) -> None: + image_validator: Optional[Callable] = None, + group_validator: Optional[Callable] = None) -> None: self.level_specs = {} self.schedules = {} @@ -417,7 +468,7 @@ class Schedules: try: with self.handler.module.rados.open_ioctx2(int(pool_id)) as ioctx: self.load_from_pool(ioctx, namespace_validator, - image_validator) + image_validator, group_validator) except rados.ConnectionShutdown: raise except rados.Error as e: @@ -428,7 +479,8 @@ class Schedules: def load_from_pool(self, ioctx: rados.Ioctx, namespace_validator: Optional[Callable], - image_validator: Optional[Callable]) -> None: + image_validator: Optional[Callable], + group_validator: Optional[Callable]) -> None: pool_name = ioctx.get_pool_name() stale_keys = [] start_after = '' @@ -451,7 +503,7 @@ class Schedules: try: level_spec = LevelSpec.from_id( self.handler, k, namespace_validator, - image_validator) + image_validator, group_validator) except ValueError: self.handler.log.debug( "Stale schedule key %s in pool %s", @@ -532,10 +584,14 @@ class Schedules: def find(self, pool_id: str, namespace: str, - image_id: Optional[str] = None) -> Optional['Schedule']: + image_id: Optional[str] = None, + group_id: Optional[str] = None) -> Optional['Schedule']: levels = [pool_id, namespace] if image_id: levels.append(image_id) + elif group_id: + levels.append(group_id) + nr_levels = len(levels) while nr_levels >= 0: # an empty spec id implies global schedule -- 2.39.5