--- /dev/null
+import errno
+import json
+import rados
+import rbd
+import traceback
+
+from datetime import datetime
+from threading import Condition, Lock, Thread
+from typing import Any, Dict, List, NamedTuple, Optional, Set, Tuple, Union
+
+from .common import get_rbd_pools
+from .schedule import LevelSpec, Schedules
+
+
+def namespace_validator(ioctx: rados.Ioctx) -> None:
+ mode = rbd.RBD().mirror_mode_get(ioctx)
+ if mode != rbd.RBD_MIRROR_MODE_IMAGE:
+ raise ValueError("namespace {} is not in mirror image mode".format(
+ ioctx.get_namespace()))
+
+def group_validator(group: rbd.Group) -> None:
+ try:
+ info = group.mirror_group_get_info()
+ except rbd.ObjectNotFound:
+ raise rbd.InvalidArgument("Error getting mirror group info")
+ if info['image_mode'] != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
+ raise rbd.InvalidArgument("Invalid mirror group mode")
+
+class GroupSpec(NamedTuple):
+ pool_id: str
+ namespace: str
+ group_id: str
+
+
+class MirrorGroupSnapshotScheduleHandler:
+ MODULE_OPTION_NAME = "mirror_group_snapshot_schedule"
+ SCHEDULE_OID = "rbd_mirror_group_snapshot_schedule"
+ REFRESH_DELAY_SECONDS = 60.0
+
+ def __init__(self, module: Any) -> None:
+ self.lock = Lock()
+ self.condition = Condition(self.lock)
+ self.module = module
+ self.log = module.log
+ self.last_refresh_groups = datetime(1970, 1, 1)
+ # self.create_snapshot_requests = CreateSnapshotRequests(self)
+
+ self.stop_thread = False
+ self.thread = Thread(target=self.run)
+
+ def setup(self) -> None:
+ self.init_schedule_queue()
+ self.thread.start()
+
+ def shutdown(self) -> None:
+ self.log.info("MirrorGroupSnapshotScheduleHandler: shutting down")
+ self.stop_thread = True
+ if self.thread.is_alive():
+ self.log.debug("MirrorGroupSnapshotScheduleHandler: joining thread")
+ self.thread.join()
+ # self.create_snapshot_requests.wait_for_pending()
+ self.log.info("MirrorGroupSnapshotScheduleHandler: shut down")
+
+ def run(self) -> None:
+ try:
+ self.log.info("MirrorGroupSnapshotScheduleHandler: starting")
+ while not self.stop_thread:
+ refresh_delay = self.refresh_groups()
+ with self.lock:
+ (group_spec, wait_time) = self.dequeue()
+ if not group_spec:
+ self.condition.wait(min(wait_time, refresh_delay))
+ continue
+ pool_id, namespace, group_id = group_spec
+ # self.create_snapshot_requests.add(pool_id, namespace, group_id)
+ self.create_group_snapshot(pool_id, namespace, group_id)
+ with self.lock:
+ self.enqueue(datetime.now(), pool_id, namespace, group_id)
+
+ except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
+ self.log.exception("MirrorGroupSnapshotScheduleHandler: client blocklisted")
+ self.module.client_blocklisted.set()
+ except Exception as ex:
+ self.log.fatal("Fatal runtime error: {}\n{}".format(
+ ex, traceback.format_exc()))
+
+ def create_group_snapshot(self, pool_id, namespace, group_id):
+ try:
+ with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
+ ioctx.set_namespace(namespace)
+ group_name = rbd.RBD().group_get_name(ioctx, group_id)
+ if group_name is None:
+ return
+ group = rbd.Group(ioctx, group_name)
+ mirror_info = group.mirror_group_get_info()
+ if mirror_info['image_mode'] != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
+ return
+ if mirror_info['state'] != rbd.RBD_MIRROR_GROUP_ENABLED or \
+ not mirror_info['primary']:
+ return
+ snap_id = group.mirror_group_create_snapshot()
+ self.log.debug(
+ "create_group_snapshot: {}/{}/{}: snap_id={}".format(
+ ioctx.get_pool_name(), namespace, group_name,
+ snap_id))
+ except Exception as e:
+ self.log.error(
+ "exception when creating group snapshot for {}/{}/{}: {}".format(
+ pool_id, namespace, group_id, e))
+
+ def init_schedule_queue(self) -> None:
+ # schedule_time => group_spec
+ self.queue: Dict[str, List[GroupSpec]] = {}
+ # pool_id => {namespace => {group_id => group_name}}
+ self.groups: Dict[str, Dict[str, Dict[str, str]]] = {}
+ self.schedules = Schedules(self)
+ self.refresh_groups()
+ self.log.debug("MirrorGroupSnapshotScheduleHandler: queue is initialized")
+
+ def load_schedules(self) -> None:
+ self.log.info("MirrorGroupSnapshotScheduleHandler: load_schedules")
+ self.schedules.load(namespace_validator, group_validator=group_validator)
+
+ def refresh_groups(self) -> float:
+ elapsed = (datetime.now() - self.last_refresh_groups).total_seconds()
+ if elapsed < self.REFRESH_DELAY_SECONDS:
+ return self.REFRESH_DELAY_SECONDS - elapsed
+
+ self.log.debug("MirrorGroupSnapshotScheduleHandler: refresh_groups")
+
+ with self.lock:
+ self.load_schedules()
+ if not self.schedules:
+ self.log.debug("MirrorGroupSnapshotScheduleHandler: no schedules")
+ self.groups = {}
+ self.queue = {}
+ self.last_refresh_groups = datetime.now()
+ return self.REFRESH_DELAY_SECONDS
+
+ groups: Dict[str, Dict[str, Dict[str, str]]] = {}
+
+ for pool_id, pool_name in get_rbd_pools(self.module).items():
+ if not self.schedules.intersects(
+ LevelSpec.from_pool_spec(pool_id, pool_name)):
+ continue
+ with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
+ self.load_pool_groups(ioctx, groups)
+
+ with self.lock:
+ self.refresh_queue(groups)
+ self.groups = groups
+
+ self.last_refresh_groups = datetime.now()
+ return self.REFRESH_DELAY_SECONDS
+
+ def load_pool_groups(self,
+ ioctx: rados.Ioctx,
+ groups: Dict[str, Dict[str, Dict[str, str]]]) -> None:
+ pool_id = str(ioctx.get_pool_id())
+ pool_name = ioctx.get_pool_name()
+ groups[pool_id] = {}
+
+ self.log.debug("load_pool_groups: pool={}".format(pool_name))
+
+ try:
+ namespaces = [''] + rbd.RBD().namespace_list(ioctx)
+ for namespace in namespaces:
+ if not self.schedules.intersects(
+ LevelSpec.from_pool_spec(int(pool_id), pool_name, namespace)):
+ continue
+ self.log.debug("load_pool_groups: pool={}, namespace={}".format(
+ pool_name, namespace))
+ groups[pool_id][namespace] = {}
+ ioctx.set_namespace(namespace)
+ mirror_groups = dict(rbd.RBD().mirror_group_info_list(
+ ioctx, rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT))
+ if not mirror_groups:
+ continue
+ group_names = dict(
+ [(x['id'], x['name']) for x in filter(
+ lambda x: x['id'] in mirror_groups,
+ rbd.RBD().group_list2(ioctx))])
+ for group_id, info in mirror_groups.items():
+ if not info['primary']:
+ continue
+ group_name = group_names.get(group_id)
+ if not group_name:
+ continue
+ if namespace:
+ name = "{}/{}/{}".format(pool_name, namespace,
+ group_name)
+ else:
+ name = "{}/{}".format(pool_name, group_name)
+ self.log.debug(
+ "load_pool_groups: adding group {}".format(name))
+ groups[pool_id][namespace][group_id] = name
+ except rbd.ConnectionShutdown:
+ raise
+ except Exception as e:
+ self.log.error(
+ "load_pool_groups: exception when scanning pool {}: {}".format(
+ pool_name, e))
+
+ def rebuild_queue(self) -> None:
+ now = datetime.now()
+
+ # don't remove from queue "due" groups
+ now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00")
+
+ for schedule_time in list(self.queue):
+ if schedule_time > now_string:
+ del self.queue[schedule_time]
+
+ if not self.schedules:
+ return
+
+ for pool_id in self.groups:
+ for namespace in self.groups[pool_id]:
+ for group_id in self.groups[pool_id][namespace]:
+ self.enqueue(now, pool_id, namespace, group_id)
+
+ self.condition.notify()
+
+ def refresh_queue(self,
+ current_groups: Dict[str, Dict[str, Dict[str, str]]]) -> None:
+ now = datetime.now()
+
+ for pool_id in self.groups:
+ for namespace in self.groups[pool_id]:
+ for group_id in self.groups[pool_id][namespace]:
+ if pool_id not in current_groups or \
+ namespace not in current_groups[pool_id] or \
+ group_id not in current_groups[pool_id][namespace]:
+ self.remove_from_queue(pool_id, namespace, group_id)
+
+ for pool_id in current_groups:
+ for namespace in current_groups[pool_id]:
+ for group_id in current_groups[pool_id][namespace]:
+ if pool_id not in self.groups or \
+ namespace not in self.groups[pool_id] or \
+ group_id not in self.groups[pool_id][namespace]:
+ self.enqueue(now, pool_id, namespace, group_id)
+
+ self.condition.notify()
+
+ def enqueue(self, now: datetime, pool_id: str, namespace: str, group_id: str) -> None:
+ schedule = self.schedules.find(pool_id, namespace, group_id)
+ if not schedule:
+ self.log.debug(
+ "MirrorGroupSnapshotScheduleHandler: no schedule for {}/{}/{}".format(
+ pool_id, namespace, group_id))
+ return
+
+ schedule_time = schedule.next_run(now)
+ if schedule_time not in self.queue:
+ self.queue[schedule_time] = []
+ self.log.debug(
+ "MirrorGroupSnapshotScheduleHandler: scheduling {}/{}/{} at {}".format(
+ pool_id, namespace, group_id, schedule_time))
+ group_spec = GroupSpec(pool_id, namespace, group_id)
+ if group_spec not in self.queue[schedule_time]:
+ self.queue[schedule_time].append(group_spec)
+
+ def dequeue(self) -> Tuple[Optional[GroupSpec], float]:
+ if not self.queue:
+ return None, 1000.0
+
+ now = datetime.now()
+ schedule_time = sorted(self.queue)[0]
+
+ if datetime.strftime(now, "%Y-%m-%d %H:%M:%S") < schedule_time:
+ wait_time = (datetime.strptime(schedule_time,
+ "%Y-%m-%d %H:%M:%S") - now)
+ return None, wait_time.total_seconds()
+
+ groups = self.queue[schedule_time]
+ group = groups.pop(0)
+ if not groups:
+ del self.queue[schedule_time]
+ return group, 0.0
+
+ def remove_from_queue(self, pool_id: str, namespace: str, group_id: str) -> None:
+ self.log.debug(
+ "MirrorGroupSnapshotScheduleHandler: descheduling {}/{}/{}".format(
+ pool_id, namespace, group_id))
+
+ empty_slots = []
+ group_spec = GroupSpec(pool_id, namespace, group_id)
+ for schedule_time, groups in self.queue.items():
+ if group_spec in groups:
+ groups.remove(group_spec)
+ if not groups:
+ empty_slots.append(schedule_time)
+ for schedule_time in empty_slots:
+ del self.queue[schedule_time]
+
+ def add_schedule(self,
+ level_spec: LevelSpec,
+ interval: str,
+ start_time: Optional[str]) -> Tuple[int, str, str]:
+ self.log.debug(
+ "MirrorGroupSnapshotScheduleHandler: add_schedule: level_spec={}, interval={}, start_time={}".format(
+ level_spec.name, interval, start_time))
+
+ # TODO: optimize to rebuild only affected part of the queue
+ with self.lock:
+ self.schedules.add(level_spec, interval, start_time)
+ self.rebuild_queue()
+ return 0, "", ""
+
+ def remove_schedule(self,
+ level_spec: LevelSpec,
+ interval: Optional[str],
+ start_time: Optional[str]) -> Tuple[int, str, str]:
+ self.log.debug(
+ "MirrorGroupSnapshotScheduleHandler: remove_schedule: level_spec={}, interval={}, start_time={}".format(
+ level_spec.name, interval, start_time))
+
+ # TODO: optimize to rebuild only affected part of the queue
+ with self.lock:
+ self.schedules.remove(level_spec, interval, start_time)
+ self.rebuild_queue()
+ return 0, "", ""
+
+ def list(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
+ self.log.debug(
+ "MirrorGroupSnapshotScheduleHandler: list: level_spec={}".format(
+ level_spec.name))
+
+ with self.lock:
+ result = self.schedules.to_list(level_spec)
+
+ return 0, json.dumps(result, indent=4, sort_keys=True), ""
+
+ def status(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
+ self.log.debug(
+ "MirrorGroupSnapshotScheduleHandler: status: level_spec={}".format(
+ level_spec.name))
+
+ scheduled_groups = []
+ with self.lock:
+ for schedule_time in sorted(self.queue):
+ for pool_id, namespace, group_id in self.queue[schedule_time]:
+ if not level_spec.matches(pool_id, namespace, group_id=group_id):
+ continue
+ group_name = self.groups[pool_id][namespace][group_id]
+ scheduled_groups.append({
+ 'schedule_time': schedule_time,
+ 'group': group_name
+ })
+ return 0, json.dumps({'scheduled_groups': scheduled_groups},
+ indent=4, sort_keys=True), ""
from .perf import PerfHandler, OSD_PERF_QUERY_COUNTERS
from .task import TaskHandler
from .trash_purge_schedule import TrashPurgeScheduleHandler
+from .mirror_group_snapshot_schedule import group_validator, \
+ MirrorGroupSnapshotScheduleHandler
class ImageSortBy(enum.Enum):
type='int',
default=10),
Option(name=TrashPurgeScheduleHandler.MODULE_OPTION_NAME),
+ Option(name=MirrorGroupSnapshotScheduleHandler.MODULE_OPTION_NAME),
]
def __init__(self, *args: Any, **kwargs: Any) -> None:
def init_handlers(self) -> None:
self.mirror_snapshot_schedule = MirrorSnapshotScheduleHandler(self)
+ self.mirror_group_snapshot_schedule = MirrorGroupSnapshotScheduleHandler(self)
self.perf = PerfHandler(self)
self.task = TaskHandler(self)
self.trash_purge_schedule = TrashPurgeScheduleHandler(self)
# implicitly here as 'rados' is a property attribute.
self.rados.wait_for_latest_osdmap()
self.mirror_snapshot_schedule.setup()
+ self.mirror_group_snapshot_schedule.setup()
self.perf.setup()
self.task.setup()
self.trash_purge_schedule.setup()
def shutdown(self) -> None:
self.module_ready = False
self.mirror_snapshot_schedule.shutdown()
+ self.mirror_group_snapshot_schedule.shutdown()
self.trash_purge_schedule.shutdown()
self.task.shutdown()
self.perf.shutdown()
spec = LevelSpec.from_name(self, level_spec, namespace_validator, image_validator)
return self.mirror_snapshot_schedule.status(spec)
+ @CLIWriteCommand('rbd mirror group snapshot schedule add')
+ @with_latest_osdmap
+ def mirror_group_snapshot_schedule_add(self,
+ level_spec: str,
+ interval: str,
+ start_time: Optional[str] = None) -> Tuple[int, str, str]:
+ """
+ Add rbd mirror group snapshot schedule
+ """
+ spec = LevelSpec.from_name(self, level_spec, namespace_validator, group_validator=group_validator)
+ return self.mirror_group_snapshot_schedule.add_schedule(spec, interval, start_time)
+
+ @CLIWriteCommand('rbd mirror group snapshot schedule remove')
+ @with_latest_osdmap
+ def mirror_group_snapshot_schedule_remove(self,
+ level_spec: str,
+ interval: Optional[str] = None,
+ start_time: Optional[str] = None) -> Tuple[int, str, str]:
+ """
+ Remove rbd mirror group snapshot schedule
+ """
+ spec = LevelSpec.from_name(self, level_spec, namespace_validator, group_validator=group_validator)
+ return self.mirror_group_snapshot_schedule.remove_schedule(spec, interval, start_time)
+
+ @CLIReadCommand('rbd mirror group snapshot schedule list')
+ @with_latest_osdmap
+ def mirror_group_snapshot_schedule_list(self,
+ level_spec: str = '') -> Tuple[int, str, str]:
+ """
+ List rbd mirror group snapshot schedules
+ """
+ spec = LevelSpec.from_name(self, level_spec, namespace_validator, group_validator=group_validator)
+ return self.mirror_group_snapshot_schedule.list(spec)
+
+ @CLIReadCommand('rbd mirror group snapshot schedule status')
+ @with_latest_osdmap
+ def mirror_group_snapshot_schedule_status(self,
+ level_spec: str = '') -> Tuple[int, str, str]:
+ """
+ Show rbd mirror group snapshot schedule status
+ """
+ spec = LevelSpec.from_name(self, level_spec, namespace_validator, group_validator=group_validator)
+ return self.mirror_group_snapshot_schedule.status(spec)
+
@CLIReadCommand('rbd perf image stats')
@with_latest_osdmap
def perf_image_stats(self,
id: str,
pool_id: Optional[str],
namespace: Optional[str],
- image_id: Optional[str] = None) -> None:
+ image_id: Optional[str] = None,
+ group_id: Optional [str] = None) -> None:
+ if image_id is not None and group_id is not None:
+ raise ValueError("LevelSpec cannot have both image_id and group_id")
self.name = name
self.id = id
self.pool_id = pool_id
self.namespace = namespace
self.image_id = image_id
+ self.group_id = group_id
def __eq__(self, level_spec: Any) -> bool:
return self.id == level_spec.id
return self.namespace is not None
if level_spec.namespace != self.namespace:
return False
- if level_spec.image_id is None:
- return self.image_id is not None
+ if level_spec.image_id is not None or level_spec.group_id is not None:
+ return False
+ if self.image_id is not None or self.group_id is not None:
+ return True
return False
def is_global(self) -> bool:
def matches(self,
pool_id: str,
namespace: str,
- image_id: Optional[str] = None) -> bool:
+ image_id: Optional[str] = None,
+ group_id: Optional[str] = None) -> bool:
if self.pool_id and self.pool_id != pool_id:
return False
if self.namespace and self.namespace != namespace:
return False
if self.image_id and self.image_id != image_id:
return False
+ if self.group_id and self.group_id != group_id:
+ return False
return True
def intersects(self, level_spec: 'LevelSpec') -> bool:
return True
if self.namespace != level_spec.namespace:
return False
- if self.image_id is None or level_spec.image_id is None:
+ if (self.image_id is None and self.group_id is None) or (level_spec.image_id is None and level_spec.group_id is None):
return True
if self.image_id != level_spec.image_id:
return False
+ if self.group_id != level_spec.group_id:
+ return False
return True
@classmethod
name: str,
namespace_validator: Optional[Callable] = None,
image_validator: Optional[Callable] = None,
- allow_image_level: bool = True) -> 'LevelSpec':
+ allow_image_level: bool = True,
+ group_validator: Optional[Callable] = None) -> 'LevelSpec':
# parse names like:
# '', 'rbd/', 'rbd/ns/', 'rbd//image', 'rbd/image', 'rbd/ns/image'
match = re.match(r'^(?:([^/]+)/(?:(?:([^/]*)/|)(?:([^/@]+))?)?)?$',
pool_id = None
namespace = None
image_name = None
+ group_name = None
image_id = None
+ group_id = None
if match.group(1):
pool_name = match.group(1)
try:
if namespace_validator:
namespace_validator(ioctx)
if match.group(3):
- image_name = match.group(3)
- try:
- with rbd.Image(ioctx, image_name,
- read_only=True) as image:
- image_id = image.id()
- id += "/" + image_id
- if image_validator:
- image_validator(image)
- except rbd.ImageNotFound:
- raise ValueError("image {} does not exist".format(
- image_name))
- except rbd.InvalidArgument:
- raise ValueError(
- "image {} is not in snapshot mirror mode".format(
+ if group_validator:
+ group_name = match.group(3)
+ try:
+ group = rbd.Group(ioctx, group_name)
+ group_id = group.id()
+ id += "/" + group_id
+ group_validator(group)
+ except rbd.ObjectNotFound:
+ raise ValueError("group {} does not exist".format(
+ group_name))
+ except rbd.InvalidArgument:
+ raise ValueError(
+ "group {} is not in snapshot mirror mode".format(
+ group_id))
+ else:
+ image_name = match.group(3)
+ try:
+ with rbd.Image(ioctx, image_name,
+ read_only=True) as image:
+ image_id = image.id()
+ id += "/" + image_id
+ if image_validator:
+ image_validator(image)
+ except rbd.ImageNotFound:
+ raise ValueError("image {} does not exist".format(
image_name))
+ except rbd.InvalidArgument:
+ raise ValueError(
+ "image {} is not in snapshot mirror mode".format(
+ image_name))
except rados.ObjectNotFound:
raise ValueError("pool {} does not exist".format(pool_name))
- # normalize possible input name like 'rbd//image'
- if not namespace and image_name:
- name = "{}/{}".format(pool_name, image_name)
+ # normalize possible input name like 'rbd//image' or 'rbd//group'
+ if not namespace:
+ if image_name:
+ name = "{}/{}".format(pool_name, image_name)
+ elif group_name:
+ name = "{}/{}".format(pool_name, group_name)
- return LevelSpec(name, id, pool_id, namespace, image_id)
+ return LevelSpec(name, id, pool_id, namespace, image_id, group_id)
@classmethod
def from_id(cls,
handler: Any,
id: str,
namespace_validator: Optional[Callable] = None,
- image_validator: Optional[Callable] = None) -> 'LevelSpec':
+ image_validator: Optional[Callable] = None,
+ group_validator: Optional[Callable] = None) -> 'LevelSpec':
# parse ids like:
# '', '123', '123/', '123/ns', '123//image_id', '123/ns/image_id'
match = re.match(r'^(?:(\d+)(?:/([^/]*)(?:/([^/@]+))?)?)?$', id)
pool_id = None
namespace = None
image_id = None
+ group_id = None
if match.group(1):
pool_id = match.group(1)
try:
elif not match.group(3):
name += "/"
if match.group(3):
- image_id = match.group(3)
- try:
- with rbd.Image(ioctx, image_id=image_id,
+ if group_validator:
+ group_id = match.group(3)
+ try:
+ group_name = rbd.RBD().group_get_name(
+ ioctx, group_id)
+ name += group_name
+ group = rbd.Group(ioctx, group_name)
+ group_validator(group)
+ except rbd.ObjectNotFound:
+ raise ValueError("group {} does not exist".format(
+ group_id))
+ except rbd.InvalidArgument:
+ raise ValueError(
+ "group {} is not in snapshot mirror mode".format(
+ group_id))
+ else:
+ image_id = match.group(3)
+ try:
+ with rbd.Image(ioctx, image_id=image_id,
read_only=True) as image:
- image_name = image.get_name()
- name += image_name
- if image_validator:
- image_validator(image)
- except rbd.ImageNotFound:
- raise ValueError("image {} does not exist".format(
- image_id))
- except rbd.InvalidArgument:
- raise ValueError(
- "image {} is not in snapshot mirror mode".format(
- image_id))
-
+ image_name = image.get_name()
+ name += image_name
+ if image_validator:
+ image_validator(image)
+ except rbd.ImageNotFound:
+ raise ValueError(
+ "image {} does not exist".format(
+ image_id))
+ except rbd.InvalidArgument:
+ raise ValueError(
+ "image {} is not in snapshot mirror mode".format(
+ image_id))
except rados.ObjectNotFound:
raise ValueError("pool {} does not exist".format(pool_id))
- return LevelSpec(name, id, pool_id, namespace, image_id)
+ return LevelSpec(name, id, pool_id, namespace, image_id, group_id)
class Interval:
def load(self,
namespace_validator: Optional[Callable] = None,
- image_validator: Optional[Callable] = None) -> None:
+ image_validator: Optional[Callable] = None,
+ group_validator: Optional[Callable] = None) -> None:
self.level_specs = {}
self.schedules = {}
try:
with self.handler.module.rados.open_ioctx2(int(pool_id)) as ioctx:
self.load_from_pool(ioctx, namespace_validator,
- image_validator)
+ image_validator, group_validator)
except rados.ConnectionShutdown:
raise
except rados.Error as e:
def load_from_pool(self,
ioctx: rados.Ioctx,
namespace_validator: Optional[Callable],
- image_validator: Optional[Callable]) -> None:
+ image_validator: Optional[Callable],
+ group_validator: Optional[Callable]) -> None:
pool_name = ioctx.get_pool_name()
stale_keys = []
start_after = ''
try:
level_spec = LevelSpec.from_id(
self.handler, k, namespace_validator,
- image_validator)
+ image_validator, group_validator)
except ValueError:
self.handler.log.debug(
"Stale schedule key %s in pool %s",
def find(self,
pool_id: str,
namespace: str,
- image_id: Optional[str] = None) -> Optional['Schedule']:
+ image_id: Optional[str] = None,
+ group_id: Optional[str] = None) -> Optional['Schedule']:
levels = [pool_id, namespace]
if image_id:
levels.append(image_id)
+ elif group_id:
+ levels.append(group_id)
+
nr_levels = len(levels)
while nr_levels >= 0:
# an empty spec id implies global schedule