]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
pybind/mgr/rbd_support: add scheduler for mirror group snapshots
author: Ramana Raja <rraja@redhat.com>
Sun, 20 Oct 2024 15:54:55 +0000 (11:54 -0400)
committer: Prasanna Kumar Kalever <prasanna.kalever@redhat.com>
Thu, 24 Apr 2025 15:56:27 +0000 (21:26 +0530)
Signed-off-by: Ramana Raja <rraja@redhat.com>
src/pybind/mgr/rbd_support/mirror_group_snapshot_schedule.py [new file with mode: 0644]
src/pybind/mgr/rbd_support/module.py
src/pybind/mgr/rbd_support/schedule.py

diff --git a/src/pybind/mgr/rbd_support/mirror_group_snapshot_schedule.py b/src/pybind/mgr/rbd_support/mirror_group_snapshot_schedule.py
new file mode 100644 (file)
index 0000000..8114b72
--- /dev/null
@@ -0,0 +1,352 @@
+import errno
+import json
+import rados
+import rbd
+import traceback
+
+from datetime import datetime
+from threading import Condition, Lock, Thread
+from typing import Any, Dict, List, NamedTuple, Optional, Set, Tuple, Union
+
+from .common import get_rbd_pools
+from .schedule import LevelSpec, Schedules
+
+
+def namespace_validator(ioctx: rados.Ioctx) -> None:
+    """Ensure the namespace selected on ``ioctx`` uses image mirror mode.
+
+    :param ioctx: I/O context whose mirror mode is queried.
+    :raises ValueError: if the reported mirror mode is anything other than
+        ``rbd.RBD_MIRROR_MODE_IMAGE``.
+    """
+    mirror_mode = rbd.RBD().mirror_mode_get(ioctx)
+    if mirror_mode == rbd.RBD_MIRROR_MODE_IMAGE:
+        return
+    message = "namespace {} is not in mirror image mode".format(
+        ioctx.get_namespace())
+    raise ValueError(message)
+
+def group_validator(group: rbd.Group) -> None:
+    """Ensure ``group`` is mirrored in snapshot mode.
+
+    :param group: group whose mirror info is inspected.
+    :raises rbd.InvalidArgument: if the mirror info cannot be found, or if
+        its ``image_mode`` is not ``rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT``.
+    """
+    try:
+        mirror_info = group.mirror_group_get_info()
+    except rbd.ObjectNotFound:
+        # Reported as InvalidArgument so callers handle a single error type
+        # for "this group is not usable for snapshot scheduling".
+        raise rbd.InvalidArgument("Error getting mirror group info")
+    image_mode = mirror_info['image_mode']
+    if image_mode == rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
+        return
+    raise rbd.InvalidArgument("Invalid mirror group mode")
+
+class GroupSpec(NamedTuple):
+    """Identifies one group as a (pool_id, namespace, group_id) triple.
+
+    Instances are the entries stored in the handler's schedule queue.
+    """
+    pool_id: str
+    namespace: str
+    group_id: str
+
+
+class MirrorGroupSnapshotScheduleHandler:
+    """Scheduler that periodically creates mirror group snapshots.
+
+    A worker thread (see run()) pops due groups from a time-keyed queue,
+    creates a mirror group snapshot for each and re-enqueues the group for
+    its next scheduled time.
+    """
+    # Name under which module.py registers this handler's module option.
+    MODULE_OPTION_NAME = "mirror_group_snapshot_schedule"
+    # Not referenced in this file; presumably the object name the Schedules
+    # helper uses to persist schedules -- TODO confirm against schedule.py.
+    SCHEDULE_OID = "rbd_mirror_group_snapshot_schedule"
+    # Minimum number of seconds between two full rescans in refresh_groups().
+    REFRESH_DELAY_SECONDS = 60.0
+
+    def __init__(self, module: Any) -> None:
+        """Prepare handler state; the worker thread is created, not started.
+
+        :param module: owning mgr module; its ``log`` attribute is reused.
+        """
+        self.module = module
+        self.log = module.log
+
+        # The condition shares the handler lock so that queue changes made
+        # under the lock can wake the worker thread.
+        self.lock = Lock()
+        self.condition = Condition(self.lock)
+
+        # Starting at the epoch makes the first refresh_groups() call run a
+        # full refresh instead of being throttled.
+        self.last_refresh_groups = datetime(1970, 1, 1)
+        # self.create_snapshot_requests = CreateSnapshotRequests(self)
+
+        self.stop_thread = False
+        self.thread = Thread(target=self.run)
+
+    def setup(self) -> None:
+        """Build the initial schedule queue, then start the worker thread."""
+        # The queue must exist before run() starts dequeuing from it.
+        self.init_schedule_queue()
+        self.thread.start()
+
+    def shutdown(self) -> None:
+        """Ask the worker thread to stop and wait until it has exited.
+
+        The stop flag is set and the condition is signalled while holding
+        the handler lock, so a worker that is currently blocked in
+        ``condition.wait()`` wakes up immediately and re-checks the flag
+        instead of sleeping until its wait timeout expires.
+        """
+        self.log.info("MirrorGroupSnapshotScheduleHandler: shutting down")
+        with self.lock:
+            self.stop_thread = True
+            self.condition.notify()
+        if self.thread.is_alive():
+            self.log.debug("MirrorGroupSnapshotScheduleHandler: joining thread")
+            self.thread.join()
+        # self.create_snapshot_requests.wait_for_pending()
+        self.log.info("MirrorGroupSnapshotScheduleHandler: shut down")
+
+    def run(self) -> None:
+        """Worker thread main loop.
+
+        Each iteration refreshes the known groups (throttled inside
+        refresh_groups()), then either sleeps until the next slot is due or
+        snapshots the next due group and re-enqueues it.  A connection
+        shutdown is reported through ``module.client_blocklisted``; any other
+        exception is logged as fatal and ends the thread.
+        """
+        try:
+            self.log.info("MirrorGroupSnapshotScheduleHandler: starting")
+            while not self.stop_thread:
+                refresh_delay = self.refresh_groups()
+                with self.lock:
+                    due_group, wait_time = self.dequeue()
+                    if not due_group:
+                        # Nothing due yet: sleep until the next slot or the
+                        # next refresh, whichever comes first.
+                        timeout = min(wait_time, refresh_delay)
+                        self.condition.wait(timeout)
+                        continue
+                # self.create_snapshot_requests.add(pool_id, namespace, group_id)
+                # The snapshot is created without holding the lock.
+                self.create_group_snapshot(due_group.pool_id,
+                                           due_group.namespace,
+                                           due_group.group_id)
+                with self.lock:
+                    self.enqueue(datetime.now(),
+                                 due_group.pool_id,
+                                 due_group.namespace,
+                                 due_group.group_id)
+
+        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
+            self.log.exception("MirrorGroupSnapshotScheduleHandler: client blocklisted")
+            self.module.client_blocklisted.set()
+        except Exception as ex:
+            details = traceback.format_exc()
+            self.log.fatal("Fatal runtime error: {}\n{}".format(ex, details))
+
+    def create_group_snapshot(self,
+                              pool_id: str,
+                              namespace: str,
+                              group_id: str) -> None:
+        """Create one mirror group snapshot, if the group still qualifies.
+
+        Nothing is done when the group name cannot be resolved, when the
+        group is not mirrored in snapshot mode, when group mirroring is not
+        enabled, or when the group is not primary.
+
+        :param pool_id: id of the pool holding the group (converted to int
+            to open the I/O context).
+        :param namespace: namespace set on the I/O context.
+        :param group_id: id of the group to snapshot.
+        :raises rados.ConnectionShutdown, rbd.ConnectionShutdown: re-raised
+            so run() can flag the client as blocklisted; every other
+            exception is logged and suppressed.
+        """
+        try:
+            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
+                ioctx.set_namespace(namespace)
+                group_name = rbd.RBD().group_get_name(ioctx, group_id)
+                if group_name is None:
+                    return
+                group = rbd.Group(ioctx, group_name)
+                mirror_info = group.mirror_group_get_info()
+                if mirror_info['image_mode'] != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
+                    return
+                if mirror_info['state'] != rbd.RBD_MIRROR_GROUP_ENABLED or \
+                   not mirror_info['primary']:
+                    return
+                snap_id = group.mirror_group_create_snapshot()
+                self.log.debug(
+                    "create_group_snapshot: {}/{}/{}: snap_id={}".format(
+                        ioctx.get_pool_name(), namespace, group_name,
+                        snap_id))
+        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
+            # Must not be swallowed by the generic handler below: run()
+            # relies on seeing these to set module.client_blocklisted.
+            raise
+        except Exception as e:
+            # Best effort: one failing group must not stop the scheduler.
+            self.log.error(
+                "exception when creating group snapshot for {}/{}/{}: {}".format(
+                    pool_id, namespace, group_id, e))
+
+    def init_schedule_queue(self) -> None:
+        """Create empty queue/group maps and populate them once."""
+        # schedule_time => group_spec
+        self.queue: Dict[str, List[GroupSpec]] = {}
+        # pool_id => {namespace => {group_id => group_name}}
+        self.groups: Dict[str, Dict[str, Dict[str, str]]] = {}
+        self.schedules = Schedules(self)
+        # Loads the schedules and fills queue/groups from the cluster state.
+        self.refresh_groups()
+        self.log.debug("MirrorGroupSnapshotScheduleHandler: queue is initialized")
+
+    def load_schedules(self) -> None:
+        """Reload all schedules through the Schedules helper."""
+        self.log.info("MirrorGroupSnapshotScheduleHandler: load_schedules")
+        # group_validator is passed by keyword; no image validator is given.
+        self.schedules.load(namespace_validator, group_validator=group_validator)
+
+    def refresh_groups(self) -> float:
+        """Reload schedules and rescan pools for schedulable groups.
+
+        The refresh is throttled to once per REFRESH_DELAY_SECONDS.
+
+        :return: number of seconds until the next refresh is due.
+        """
+        elapsed = (datetime.now() - self.last_refresh_groups).total_seconds()
+        if elapsed < self.REFRESH_DELAY_SECONDS:
+            # Too early: report how long the caller may still wait.
+            return self.REFRESH_DELAY_SECONDS - elapsed
+
+        self.log.debug("MirrorGroupSnapshotScheduleHandler: refresh_groups")
+
+        with self.lock:
+            self.load_schedules()
+            if not self.schedules:
+                # Without schedules nothing can be queued; drop all state.
+                self.log.debug("MirrorGroupSnapshotScheduleHandler: no schedules")
+                self.groups = {}
+                self.queue = {}
+                self.last_refresh_groups = datetime.now()
+                return self.REFRESH_DELAY_SECONDS
+
+        # Filled by load_pool_groups(); same layout as self.groups.
+        groups: Dict[str, Dict[str, Dict[str, str]]] = {}
+
+        # The pool scan below runs without self.lock held.
+        for pool_id, pool_name in get_rbd_pools(self.module).items():
+            if not self.schedules.intersects(
+                    LevelSpec.from_pool_spec(pool_id, pool_name)):
+                continue
+            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
+                self.load_pool_groups(ioctx, groups)
+
+        with self.lock:
+            # refresh_queue() diffs against the old self.groups, so it must
+            # run before self.groups is replaced.
+            self.refresh_queue(groups)
+            self.groups = groups
+
+        self.last_refresh_groups = datetime.now()
+        return self.REFRESH_DELAY_SECONDS
+
+    def load_pool_groups(self,
+                         ioctx: rados.Ioctx,
+                         groups: Dict[str, Dict[str, Dict[str, str]]]) -> None:
+        """Collect the primary snapshot-mirrored groups of one pool.
+
+        Results are written into ``groups`` as
+        ``groups[pool_id][namespace][group_id] = display name``, where the
+        display name is ``pool/group`` for the default namespace and
+        ``pool/namespace/group`` otherwise.  Namespaces that do not
+        intersect any schedule are skipped.
+
+        :param ioctx: I/O context of the pool; its namespace is changed
+            while scanning.
+        :param groups: output mapping, updated in place.
+        :raises rbd.ConnectionShutdown: re-raised to the caller; any other
+            exception is logged and ends the scan of this pool.
+        """
+        pool_id = str(ioctx.get_pool_id())
+        pool_name = ioctx.get_pool_name()
+        groups[pool_id] = {}
+
+        self.log.debug("load_pool_groups: pool={}".format(pool_name))
+
+        try:
+            # '' stands for the default namespace.
+            namespaces = [''] + rbd.RBD().namespace_list(ioctx)
+            for namespace in namespaces:
+                if not self.schedules.intersects(
+                        LevelSpec.from_pool_spec(int(pool_id), pool_name, namespace)):
+                    continue
+                self.log.debug("load_pool_groups: pool={}, namespace={}".format(
+                    pool_name, namespace))
+                groups[pool_id][namespace] = {}
+                ioctx.set_namespace(namespace)
+                mirror_groups = dict(rbd.RBD().mirror_group_info_list(
+                    ioctx, rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT))
+                if not mirror_groups:
+                    continue
+                # group id => group name, restricted to mirrored groups
+                group_names = {
+                    entry['id']: entry['name']
+                    for entry in rbd.RBD().group_list2(ioctx)
+                    if entry['id'] in mirror_groups
+                }
+                for group_id, info in mirror_groups.items():
+                    if not info['primary']:
+                        continue
+                    group_name = group_names.get(group_id)
+                    if not group_name:
+                        continue
+                    if namespace:
+                        name = "{}/{}/{}".format(pool_name, namespace,
+                                                 group_name)
+                    else:
+                        name = "{}/{}".format(pool_name, group_name)
+                    self.log.debug(
+                        "load_pool_groups: adding group {}".format(name))
+                    groups[pool_id][namespace][group_id] = name
+        except rbd.ConnectionShutdown:
+            raise
+        except Exception as e:
+            self.log.error(
+                "load_pool_groups: exception when scanning pool {}: {}".format(
+                    pool_name, e))
+
+    def rebuild_queue(self) -> None:
+        """Drop all not-yet-due slots and re-enqueue every known group.
+
+        Called with the handler lock held (see add_schedule() and
+        remove_schedule()).  Waiters on the condition are notified only when
+        schedules exist.
+        """
+        current_time = datetime.now()
+
+        # Slots at or before the current minute are "due" and are kept;
+        # only strictly later slots are discarded.
+        due_cutoff = current_time.strftime("%Y-%m-%d %H:%M:00")
+        future_slots = [slot for slot in self.queue if slot > due_cutoff]
+        for slot in future_slots:
+            del self.queue[slot]
+
+        if not self.schedules:
+            return
+
+        for pool_id, namespaces in self.groups.items():
+            for namespace, group_ids in namespaces.items():
+                for group_id in group_ids:
+                    self.enqueue(current_time, pool_id, namespace, group_id)
+
+        self.condition.notify()
+
+    def refresh_queue(self,
+                      current_groups: Dict[str, Dict[str, Dict[str, str]]]) -> None:
+        """Bring the queue in line with a freshly scanned group map.
+
+        Groups present in ``self.groups`` but absent from ``current_groups``
+        are descheduled; groups that are new in ``current_groups`` are
+        enqueued.  ``self.groups`` itself is not modified here.
+
+        :param current_groups: pool_id => {namespace => {group_id => name}}
+        """
+        def contains(tree: Dict[str, Dict[str, Dict[str, str]]],
+                     pool_id: str, namespace: str, group_id: str) -> bool:
+            return group_id in tree.get(pool_id, {}).get(namespace, {})
+
+        now = datetime.now()
+        previous_groups = self.groups
+
+        # Pass 1: deschedule groups that disappeared.
+        for pool_id, namespaces in previous_groups.items():
+            for namespace, group_ids in namespaces.items():
+                for group_id in group_ids:
+                    if not contains(current_groups, pool_id, namespace,
+                                    group_id):
+                        self.remove_from_queue(pool_id, namespace, group_id)
+
+        # Pass 2: schedule groups that were not known before.
+        for pool_id, namespaces in current_groups.items():
+            for namespace, group_ids in namespaces.items():
+                for group_id in group_ids:
+                    if not contains(previous_groups, pool_id, namespace,
+                                    group_id):
+                        self.enqueue(now, pool_id, namespace, group_id)
+
+        self.condition.notify()
+
+    def enqueue(self, now: datetime, pool_id: str, namespace: str, group_id: str) -> None:
+        """Queue a group at the next run time of its matching schedule.
+
+        Does nothing except log when no schedule applies to the group.  A
+        group is never added twice to the same time slot.
+
+        :param now: reference time handed to the schedule's ``next_run()``.
+        :param pool_id: pool id of the group.
+        :param namespace: namespace of the group.
+        :param group_id: id of the group.
+        """
+        # Schedules.find() takes image_id as its third positional parameter,
+        # so the group id is passed by keyword rather than positionally.
+        schedule = self.schedules.find(pool_id, namespace, group_id=group_id)
+        if not schedule:
+            self.log.debug(
+                "MirrorGroupSnapshotScheduleHandler: no schedule for {}/{}/{}".format(
+                    pool_id, namespace, group_id))
+            return
+
+        schedule_time = schedule.next_run(now)
+        self.log.debug(
+            "MirrorGroupSnapshotScheduleHandler: scheduling {}/{}/{} at {}".format(
+                pool_id, namespace, group_id, schedule_time))
+        group_spec = GroupSpec(pool_id, namespace, group_id)
+        slot = self.queue.setdefault(schedule_time, [])
+        if group_spec not in slot:
+            slot.append(group_spec)
+
+    def dequeue(self) -> Tuple[Optional[GroupSpec], float]:
+        """Pop the next due group from the queue, if any.
+
+        :return: ``(group_spec, 0.0)`` when a group is due; otherwise
+            ``(None, seconds)`` where ``seconds`` is the time until the
+            earliest slot, or 1000.0 when the queue is empty.
+        """
+        if not self.queue:
+            return None, 1000.0
+
+        time_format = "%Y-%m-%d %H:%M:%S"
+        current_time = datetime.now()
+        # Queue keys are timestamp strings, so the smallest key is the
+        # earliest slot.
+        earliest_slot = min(self.queue)
+
+        if current_time.strftime(time_format) < earliest_slot:
+            remaining = datetime.strptime(earliest_slot,
+                                          time_format) - current_time
+            return None, remaining.total_seconds()
+
+        pending = self.queue[earliest_slot]
+        next_group = pending.pop(0)
+        if not pending:
+            # Remove drained slots so the queue only holds non-empty lists.
+            del self.queue[earliest_slot]
+        return next_group, 0.0
+
+    def remove_from_queue(self, pool_id: str, namespace: str, group_id: str) -> None:
+        """Remove a group from every queue slot and drop emptied slots."""
+        self.log.debug(
+            "MirrorGroupSnapshotScheduleHandler: descheduling {}/{}/{}".format(
+                pool_id, namespace, group_id))
+
+        target = GroupSpec(pool_id, namespace, group_id)
+        drained_slots = []
+        for slot, pending in self.queue.items():
+            if target not in pending:
+                continue
+            pending.remove(target)
+            if not pending:
+                drained_slots.append(slot)
+        # Deleted in a second pass: the dict cannot shrink while iterated.
+        for slot in drained_slots:
+            del self.queue[slot]
+
+    def add_schedule(self,
+                     level_spec: LevelSpec,
+                     interval: str,
+                     start_time: Optional[str]) -> Tuple[int, str, str]:
+        """Add a schedule for ``level_spec`` and rebuild the queue.
+
+        :return: always ``(0, "", "")``.
+        """
+        self.log.debug(
+            "MirrorGroupSnapshotScheduleHandler: add_schedule: level_spec={}, interval={}, start_time={}".format(
+                level_spec.name, interval, start_time))
+
+        # TODO: optimize to rebuild only affected part of the queue
+        with self.lock:
+            self.schedules.add(level_spec, interval, start_time)
+            self.rebuild_queue()
+        return 0, "", ""
+
+    def remove_schedule(self,
+                        level_spec: LevelSpec,
+                        interval: Optional[str],
+                        start_time: Optional[str]) -> Tuple[int, str, str]:
+        """Remove a schedule for ``level_spec`` and rebuild the queue.
+
+        :return: always ``(0, "", "")``.
+        """
+        self.log.debug(
+            "MirrorGroupSnapshotScheduleHandler: remove_schedule: level_spec={}, interval={}, start_time={}".format(
+                level_spec.name, interval, start_time))
+
+        # TODO: optimize to rebuild only affected part of the queue
+        with self.lock:
+            self.schedules.remove(level_spec, interval, start_time)
+            self.rebuild_queue()
+        return 0, "", ""
+
+    def list(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
+        """Return the schedules matching ``level_spec`` as a JSON string.
+
+        :return: ``(0, json_text, "")``.
+        """
+        self.log.debug(
+            "MirrorGroupSnapshotScheduleHandler: list: level_spec={}".format(
+                level_spec.name))
+
+        # Only the schedule lookup needs the lock; serialization happens
+        # after it is released.
+        with self.lock:
+            result = self.schedules.to_list(level_spec)
+
+        return 0, json.dumps(result, indent=4, sort_keys=True), ""
+
+    def status(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
+        """Report the queued groups matching ``level_spec`` as JSON.
+
+        Entries are listed in ascending schedule-time order, each with its
+        ``schedule_time`` and the group's display name.
+
+        :return: ``(0, json_text, "")``.
+        """
+        self.log.debug(
+            "MirrorGroupSnapshotScheduleHandler: status: level_spec={}".format(
+                level_spec.name))
+
+        entries = []
+        with self.lock:
+            for slot in sorted(self.queue):
+                for spec in self.queue[slot]:
+                    matched = level_spec.matches(spec.pool_id,
+                                                 spec.namespace,
+                                                 group_id=spec.group_id)
+                    if not matched:
+                        continue
+                    display_name = \
+                        self.groups[spec.pool_id][spec.namespace][spec.group_id]
+                    entries.append({
+                        'schedule_time': slot,
+                        'group': display_name
+                    })
+        payload = {'scheduled_groups': entries}
+        return 0, json.dumps(payload, indent=4, sort_keys=True), ""
index 369face038ad2a4d0899821adf1118432d88bd01..0446edf44da63b89b09cd6dd0e93ca6d80270996 100644 (file)
@@ -20,6 +20,8 @@ from .mirror_snapshot_schedule import image_validator, namespace_validator, \
 from .perf import PerfHandler, OSD_PERF_QUERY_COUNTERS
 from .task import TaskHandler
 from .trash_purge_schedule import TrashPurgeScheduleHandler
+from .mirror_group_snapshot_schedule import group_validator, \
+    MirrorGroupSnapshotScheduleHandler
 
 
 class ImageSortBy(enum.Enum):
@@ -79,6 +81,7 @@ class Module(MgrModule):
                type='int',
                default=10),
         Option(name=TrashPurgeScheduleHandler.MODULE_OPTION_NAME),
+        Option(name=MirrorGroupSnapshotScheduleHandler.MODULE_OPTION_NAME),
     ]
 
     def __init__(self, *args: Any, **kwargs: Any) -> None:
@@ -91,6 +94,7 @@ class Module(MgrModule):
 
     def init_handlers(self) -> None:
         self.mirror_snapshot_schedule = MirrorSnapshotScheduleHandler(self)
+        self.mirror_group_snapshot_schedule = MirrorGroupSnapshotScheduleHandler(self)
         self.perf = PerfHandler(self)
         self.task = TaskHandler(self)
         self.trash_purge_schedule = TrashPurgeScheduleHandler(self)
@@ -101,6 +105,7 @@ class Module(MgrModule):
         # implicitly here as 'rados' is a property attribute.
         self.rados.wait_for_latest_osdmap()
         self.mirror_snapshot_schedule.setup()
+        self.mirror_group_snapshot_schedule.setup()
         self.perf.setup()
         self.task.setup()
         self.trash_purge_schedule.setup()
@@ -130,6 +135,7 @@ class Module(MgrModule):
     def shutdown(self) -> None:
         self.module_ready = False
         self.mirror_snapshot_schedule.shutdown()
+        self.mirror_group_snapshot_schedule.shutdown()
         self.trash_purge_schedule.shutdown()
         self.task.shutdown()
         self.perf.shutdown()
@@ -180,6 +186,50 @@ class Module(MgrModule):
         spec = LevelSpec.from_name(self, level_spec, namespace_validator, image_validator)
         return self.mirror_snapshot_schedule.status(spec)
 
+    @CLIWriteCommand('rbd mirror group snapshot schedule add')
+    @with_latest_osdmap
+    def mirror_group_snapshot_schedule_add(self,
+                                           level_spec: str,
+                                           interval: str,
+                                           start_time: Optional[str] = None) -> Tuple[int, str, str]:
+        """
+        Add rbd mirror group snapshot schedule
+        """
+        # Passing group_validator makes LevelSpec.from_name() resolve the
+        # last path component as a group rather than an image.
+        spec = LevelSpec.from_name(self, level_spec, namespace_validator, group_validator=group_validator)
+        return self.mirror_group_snapshot_schedule.add_schedule(spec, interval, start_time)
+
+    @CLIWriteCommand('rbd mirror group snapshot schedule remove')
+    @with_latest_osdmap
+    def mirror_group_snapshot_schedule_remove(self,
+                                              level_spec: str,
+                                              interval: Optional[str] = None,
+                                              start_time: Optional[str] = None) -> Tuple[int, str, str]:
+        """
+        Remove rbd mirror group snapshot schedule
+        """
+        # Passing group_validator makes LevelSpec.from_name() resolve the
+        # last path component as a group rather than an image.
+        spec = LevelSpec.from_name(self, level_spec, namespace_validator, group_validator=group_validator)
+        return self.mirror_group_snapshot_schedule.remove_schedule(spec, interval, start_time)
+
+    @CLIReadCommand('rbd mirror group snapshot schedule list')
+    @with_latest_osdmap
+    def mirror_group_snapshot_schedule_list(self,
+                                            level_spec: str = '') -> Tuple[int, str, str]:
+        """
+        List rbd mirror group snapshot schedules
+        """
+        # level_spec defaults to '', which is passed through to from_name()
+        # unchanged.
+        spec = LevelSpec.from_name(self, level_spec, namespace_validator, group_validator=group_validator)
+        return self.mirror_group_snapshot_schedule.list(spec)
+
+    @CLIReadCommand('rbd mirror group snapshot schedule status')
+    @with_latest_osdmap
+    def mirror_group_snapshot_schedule_status(self,
+                                              level_spec: str = '') -> Tuple[int, str, str]:
+        """
+        Show rbd mirror group snapshot schedule status
+        """
+        # level_spec defaults to '', which is passed through to from_name()
+        # unchanged.
+        spec = LevelSpec.from_name(self, level_spec, namespace_validator, group_validator=group_validator)
+        return self.mirror_group_snapshot_schedule.status(spec)
+
     @CLIReadCommand('rbd perf image stats')
     @with_latest_osdmap
     def perf_image_stats(self,
index 4968700ad82aec3dc13684781381f57129823dd7..479323742f231ad738701fbf6a080e98604f8ce7 100644 (file)
@@ -22,12 +22,16 @@ class LevelSpec:
                  id: str,
                  pool_id: Optional[str],
                  namespace: Optional[str],
-                 image_id: Optional[str] = None) -> None:
+                 image_id: Optional[str] = None,
+                 group_id: Optional [str] = None) -> None:
+        if image_id is not None and group_id is not None:
+            raise ValueError("LevelSpec cannot have both image_id and group_id")
         self.name = name
         self.id = id
         self.pool_id = pool_id
         self.namespace = namespace
         self.image_id = image_id
+        self.group_id = group_id
 
     def __eq__(self, level_spec: Any) -> bool:
         return self.id == level_spec.id
@@ -41,8 +45,10 @@ class LevelSpec:
             return self.namespace is not None
         if level_spec.namespace != self.namespace:
             return False
-        if level_spec.image_id is None:
-            return self.image_id is not None
+        if level_spec.image_id is not None or level_spec.group_id is not None:
+            return False
+        if self.image_id is not None or self.group_id is not None:
+            return True
         return False
 
     def is_global(self) -> bool:
@@ -54,13 +60,16 @@ class LevelSpec:
     def matches(self,
                 pool_id: str,
                 namespace: str,
-                image_id: Optional[str] = None) -> bool:
+                image_id: Optional[str] = None,
+                group_id: Optional[str] = None) -> bool:
         if self.pool_id and self.pool_id != pool_id:
             return False
         if self.namespace and self.namespace != namespace:
             return False
         if self.image_id and self.image_id != image_id:
             return False
+        if self.group_id and self.group_id != group_id:
+            return False
         return True
 
     def intersects(self, level_spec: 'LevelSpec') -> bool:
@@ -72,10 +81,12 @@ class LevelSpec:
             return True
         if self.namespace != level_spec.namespace:
             return False
-        if self.image_id is None or level_spec.image_id is None:
+        if (self.image_id is None and self.group_id is None) or (level_spec.image_id is None and level_spec.group_id is None):
             return True
         if self.image_id != level_spec.image_id:
             return False
+        if self.group_id != level_spec.group_id:
+            return False
         return True
 
     @classmethod
@@ -101,7 +112,8 @@ class LevelSpec:
                   name: str,
                   namespace_validator: Optional[Callable] = None,
                   image_validator: Optional[Callable] = None,
-                  allow_image_level: bool = True) -> 'LevelSpec':
+                  allow_image_level: bool = True,
+                  group_validator: Optional[Callable] = None) -> 'LevelSpec':
         # parse names like:
         # '', 'rbd/', 'rbd/ns/', 'rbd//image', 'rbd/image', 'rbd/ns/image'
         match = re.match(r'^(?:([^/]+)/(?:(?:([^/]*)/|)(?:([^/@]+))?)?)?$',
@@ -116,7 +128,9 @@ class LevelSpec:
         pool_id = None
         namespace = None
         image_name = None
+        group_name = None
         image_id = None
+        group_id = None
         if match.group(1):
             pool_name = match.group(1)
             try:
@@ -142,37 +156,56 @@ class LevelSpec:
                             if namespace_validator:
                                 namespace_validator(ioctx)
                         if match.group(3):
-                            image_name = match.group(3)
-                            try:
-                                with rbd.Image(ioctx, image_name,
-                                               read_only=True) as image:
-                                    image_id = image.id()
-                                    id += "/" + image_id
-                                    if image_validator:
-                                        image_validator(image)
-                            except rbd.ImageNotFound:
-                                raise ValueError("image {} does not exist".format(
-                                    image_name))
-                            except rbd.InvalidArgument:
-                                raise ValueError(
-                                    "image {} is not in snapshot mirror mode".format(
+                            if group_validator:
+                                group_name = match.group(3)
+                                try:
+                                    group = rbd.Group(ioctx, group_name)
+                                    group_id = group.id()
+                                    id += "/" + group_id
+                                    group_validator(group)
+                                except rbd.ObjectNotFound:
+                                    raise ValueError("group {} does not exist".format(
+                                        group_name))
+                                except rbd.InvalidArgument:
+                                    raise ValueError(
+                                        "group {} is not in snapshot mirror mode".format(
+                                        group_id))
+                            else:
+                                image_name = match.group(3)
+                                try:
+                                    with rbd.Image(ioctx, image_name,
+                                                read_only=True) as image:
+                                        image_id = image.id()
+                                        id += "/" + image_id
+                                        if image_validator:
+                                            image_validator(image)
+                                except rbd.ImageNotFound:
+                                    raise ValueError("image {} does not exist".format(
                                         image_name))
+                                except rbd.InvalidArgument:
+                                    raise ValueError(
+                                        "image {} is not in snapshot mirror mode".format(
+                                            image_name))
 
             except rados.ObjectNotFound:
                 raise ValueError("pool {} does not exist".format(pool_name))
 
-        # normalize possible input name like 'rbd//image'
-        if not namespace and image_name:
-            name = "{}/{}".format(pool_name, image_name)
+        # normalize possible input name like 'rbd//image' or 'rbd//group'
+        if not namespace:
+            if image_name:
+                name = "{}/{}".format(pool_name, image_name)
+            elif group_name:
+                name = "{}/{}".format(pool_name, group_name)
 
-        return LevelSpec(name, id, pool_id, namespace, image_id)
+        return LevelSpec(name, id, pool_id, namespace, image_id, group_id)
 
     @classmethod
     def from_id(cls,
                 handler: Any,
                 id: str,
                 namespace_validator: Optional[Callable] = None,
-                image_validator: Optional[Callable] = None) -> 'LevelSpec':
+                image_validator: Optional[Callable] = None,
+                group_validator: Optional[Callable] = None) -> 'LevelSpec':
         # parse ids like:
         # '', '123', '123/', '123/ns', '123//image_id', '123/ns/image_id'
         match = re.match(r'^(?:(\d+)(?:/([^/]*)(?:/([^/@]+))?)?)?$', id)
@@ -183,6 +216,7 @@ class LevelSpec:
         pool_id = None
         namespace = None
         image_id = None
+        group_id = None
         if match.group(1):
             pool_id = match.group(1)
             try:
@@ -206,26 +240,42 @@ class LevelSpec:
                         elif not match.group(3):
                             name += "/"
                         if match.group(3):
-                            image_id = match.group(3)
-                            try:
-                                with rbd.Image(ioctx, image_id=image_id,
+                            if group_validator:
+                                group_id = match.group(3)
+                                try:
+                                    group_name = rbd.RBD().group_get_name(
+                                        ioctx, group_id)
+                                    name += group_name
+                                    group = rbd.Group(ioctx, group_name)
+                                    group_validator(group)
+                                except rbd.ObjectNotFound:
+                                    raise ValueError("group {} does not exist".format(
+                                        group_id))
+                                except rbd.InvalidArgument:
+                                    raise ValueError(
+                                        "group {} is not in snapshot mirror mode".format(
+                                            group_id))
+                            else:
+                                image_id = match.group(3)
+                                try:
+                                    with rbd.Image(ioctx, image_id=image_id,
                                                read_only=True) as image:
-                                    image_name = image.get_name()
-                                    name += image_name
-                                    if image_validator:
-                                        image_validator(image)
-                            except rbd.ImageNotFound:
-                                raise ValueError("image {} does not exist".format(
-                                    image_id))
-                            except rbd.InvalidArgument:
-                                raise ValueError(
-                                    "image {} is not in snapshot mirror mode".format(
-                                        image_id))
-
+                                        image_name = image.get_name()
+                                        name += image_name
+                                        if image_validator:
+                                            image_validator(image)
+                                except rbd.ImageNotFound:
+                                    raise ValueError(
+                                        "image {} does not exist".format(
+                                         image_id))
+                                except rbd.InvalidArgument:
+                                    raise ValueError(
+                                        "image {} is not in snapshot mirror mode".format(
+                                            image_id))
             except rados.ObjectNotFound:
                 raise ValueError("pool {} does not exist".format(pool_id))
 
-        return LevelSpec(name, id, pool_id, namespace, image_id)
+        return LevelSpec(name, id, pool_id, namespace, image_id, group_id)
 
 
 class Interval:
@@ -396,7 +446,8 @@ class Schedules:
 
     def load(self,
              namespace_validator: Optional[Callable] = None,
-             image_validator: Optional[Callable] = None) -> None:
+             image_validator: Optional[Callable] = None,
+             group_validator: Optional[Callable] = None) -> None:
         self.level_specs = {}
         self.schedules = {}
 
@@ -417,7 +468,7 @@ class Schedules:
             try:
                 with self.handler.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                     self.load_from_pool(ioctx, namespace_validator,
-                                        image_validator)
+                                        image_validator, group_validator)
             except rados.ConnectionShutdown:
                 raise
             except rados.Error as e:
@@ -428,7 +479,8 @@ class Schedules:
     def load_from_pool(self,
                        ioctx: rados.Ioctx,
                        namespace_validator: Optional[Callable],
-                       image_validator: Optional[Callable]) -> None:
+                       image_validator: Optional[Callable],
+                       group_validator: Optional[Callable]) -> None:
         pool_name = ioctx.get_pool_name()
         stale_keys = []
         start_after = ''
@@ -451,7 +503,7 @@ class Schedules:
                             try:
                                 level_spec = LevelSpec.from_id(
                                     self.handler, k, namespace_validator,
-                                    image_validator)
+                                    image_validator, group_validator)
                             except ValueError:
                                 self.handler.log.debug(
                                     "Stale schedule key %s in pool %s",
@@ -532,10 +584,14 @@ class Schedules:
     def find(self,
              pool_id: str,
              namespace: str,
-             image_id: Optional[str] = None) -> Optional['Schedule']:
+             image_id: Optional[str] = None,
+             group_id: Optional[str] = None) -> Optional['Schedule']:
         levels = [pool_id, namespace]
         if image_id:
             levels.append(image_id)
+        elif group_id:
+            levels.append(group_id)
+
         nr_levels = len(levels)
         while nr_levels >= 0:
             # an empty spec id implies global schedule