]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/rbd_support: create mirror gp snap asynchronously
authorRamana Raja <rraja@redhat.com>
Fri, 24 Jan 2025 20:04:05 +0000 (15:04 -0500)
committerPrasanna Kumar Kalever <prasanna.kalever@redhat.com>
Wed, 30 Jul 2025 17:06:02 +0000 (22:36 +0530)
... in the mirror group snapshot scheduler.

Signed-off-by: Ramana Raja <rraja@redhat.com>
src/pybind/mgr/rbd_support/mirror_group_snapshot_schedule.py
src/pybind/mgr/rbd_support/module.py

index 8114b7211d6257f7767fbe485c4191bffef78406..e2bfe8c61a82fd1403166fe95aa7e8432f4fc877 100644 (file)
@@ -32,8 +32,228 @@ class GroupSpec(NamedTuple):
     group_id: str
 
 
+class GroupCreateSnapshotRequests:
+
+    def __init__(self, handler: Any) -> None:
+        self.lock = Lock()
+        self.condition = Condition(self.lock)
+        self.handler = handler
+        self.rados = handler.module.rados
+        self.log = handler.log
+        self.pending: Set[GroupSpec] = set()
+        self.queue: List[GroupSpec] = []
+        self.ioctxs: Dict[Tuple[str, str], Tuple[rados.Ioctx, Set[GroupSpec]]] = {}
+
+    def wait_for_pending(self) -> None:
+        with self.lock:
+            while self.pending:
+                self.log.debug(
+                    "GroupCreateSnapshotRequests.wait_for_pending: "
+                    "{} groups".format(len(self.pending)))
+                self.condition.wait()
+        self.log.debug("GroupCreateSnapshotRequests.wait_for_pending: done")
+
+    def add(self, pool_id: str, namespace: str, group_id: str) -> None:
+        group_spec = GroupSpec(pool_id, namespace, group_id)
+
+        self.log.debug("GroupCreateSnapshotRequests.add: {}/{}/{}".format(
+            pool_id, namespace, group_id))
+
+        max_concurrent = self.handler.module.get_localized_module_option(
+            self.handler.MODULE_OPTION_NAME_MAX_CONCURRENT_GROUP_SNAP_CREATE)
+
+        self.log.debug("GroupCreateSnapshotRequests.add: {}/{}/{} "
+                       "max concurrent snap create {}".format(
+                            pool_id, namespace, group_id, max_concurrent))
+
+        with self.lock:
+            if group_spec in self.pending:
+                self.log.info(
+                    "GroupCreateSnapshotRequests.add: {}/{}/{}: {}".format(
+                        pool_id, namespace, group_id,
+                        "previous request is still in progress"))
+                return
+            self.pending.add(group_spec)
+
+            if len(self.pending) > max_concurrent:
+                self.queue.append(group_spec)
+                return
+
+        self.open_group(group_spec)
+
+    def open_group(self, group_spec: GroupSpec) -> None:
+        pool_id, namespace, group_id = group_spec
+
+        self.log.debug(
+            "GroupCreateSnapshotRequests.open_group for {}/{}/{}".format(
+                pool_id, namespace, group_id))
+
+        try:
+            ioctx = self.get_ioctx(group_spec)
+            group_name = rbd.RBD().group_get_name(ioctx, group_id)
+            group = rbd.Group(ioctx, group_name)
+        except Exception as e:
+            self.log.error(
+                "exception when opening group {}/{}/{}: {}".format(
+                    pool_id, namespace, group_id, e))
+            self.finish(group_spec)
+            return
+
+        self.get_mirror_info(group_spec, group)
+
+    def get_mirror_info(self, group_spec: GroupSpec, group: rbd.Group) -> None:
+        pool_id, namespace, group_id = group_spec
+
+        self.log.debug(
+            "GroupCreateSnapshotRequests.get_mirror_info: {}/{}/{}".format(
+                pool_id, namespace, group_id))
+
+        def cb(comp: rados.Completion,
+               info: Optional[Dict[str, Union[str, int]]]) -> None:
+            self.handle_get_mirror_info(group_spec, group, comp, info)
+
+        try:
+            group.aio_mirror_group_get_info(cb)
+        except Exception as e:
+            self.log.error(
+                "exception when getting mirror group info for "
+                "{}/{}/{}: {}".format(pool_id, namespace, group_id, e))
+            self.finish(group_spec)
+
+    def handle_get_mirror_info(self,
+                               group_spec: GroupSpec,
+                               group: rbd.Group,
+                               comp: rados.Completion,
+                               info: Optional[Dict[str, Union[str, int]]]) -> None:
+        pool_id, namespace, group_id = group_spec
+
+        self.log.debug(
+            "GroupCreateSnapshotRequests.handle_get_mirror_info {}/{}/{}: "
+            "r={} info={}".format(pool_id, namespace, group_id,
+                                  comp.get_return_value(), info))
+
+        if info is None:
+            if comp.get_return_value() != -errno.ENOENT:
+                self.log.error(
+                    "error when getting mirror group info for "
+                    "{}/{}/{}: {}".format(
+                        pool_id, namespace, group_id, comp.get_return_value()))
+            self.finish(group_spec)
+            return
+
+        if info['state'] != rbd.RBD_MIRROR_GROUP_ENABLED:
+            self.log.debug(
+                "GroupCreateSnapshotRequests.handle_get_mirror_info: "
+                "{}/{}/{}: {}".format(
+                    pool_id, namespace, group_id,
+                    "is not enabled for mirroring"))
+            self.finish(group_spec)
+            return
+
+        if info['image_mode'] != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
+            self.log.debug(
+                "GroupCreateSnapshotRequests.handle_get_mirror_info: "
+                "{}/{}/{}: {}".format(
+                    pool_id, namespace, group_id,
+                    "is not in snapshot-based mirroring mode"))
+            self.finish(group_spec)
+            return
+
+        if not info['primary'] :
+            self.log.debug(
+                "GroupCreateSnapshotRequests.handle_get_mirror_info: "
+                "{}/{}/{}: {}".format(
+                    pool_id, namespace, group_id,
+                    "is not primary"))
+            self.finish(group_spec)
+            return
+
+        self.create_snapshot(group_spec, group)
+
+    def create_snapshot(self, group_spec: GroupSpec, group: rbd.Group) -> None:
+        pool_id, namespace, group_id = group_spec
+
+        self.log.debug(
+            "GroupCreateSnapshotRequests.create_snapshot for {}/{}/{}".format(
+                pool_id, namespace, group_id))
+
+        def cb(comp: rados.Completion, snap_id: Optional[str]) -> None:
+            self.handle_create_snapshot(group_spec, comp, snap_id)
+
+        try:
+            group.aio_mirror_group_create_snapshot(0, cb)
+        except Exception as e:
+            self.log.error(
+                "exception when creating group snapshot for "
+                "{}/{}/{}: {}".format(pool_id, namespace, group_id, e))
+            self.finish(group_spec)
+
+    def handle_create_snapshot(self,
+                               group_spec: GroupSpec,
+                               comp: rados.Completion,
+                               snap_id: Optional[str]) -> None:
+        pool_id, namespace, group_id = group_spec
+
+        self.log.debug(
+            "GroupCreateSnapshotRequests.handle_create_snapshot for "
+            "{}/{}/{}: r={}, snap_id={}".format(
+                pool_id, namespace, group_id, comp.get_return_value(),
+                snap_id))
+
+        if snap_id is None and comp.get_return_value() != -errno.ENOENT:
+            self.log.error(
+                "error when creating group snapshot for {}/{}/{}: {}".format(
+                    pool_id, namespace, group_id, comp.get_return_value()))
+
+        self.finish(group_spec)
+
+    def finish(self, group_spec: GroupSpec) -> None:
+        pool_id, namespace, group_id = group_spec
+
+        self.log.debug("GroupCreateSnapshotRequests.finish: {}/{}/{}".format(
+            pool_id, namespace, group_id))
+
+        self.put_ioctx(group_spec)
+
+        with self.lock:
+            self.pending.remove(group_spec)
+            self.condition.notify()
+            if not self.queue:
+                return
+            group_spec = self.queue.pop(0)
+
+        self.open_group(group_spec)
+
+    def get_ioctx(self, group_spec: GroupSpec) -> rados.Ioctx:
+        pool_id, namespace, group_id = group_spec
+        nspec = (pool_id, namespace)
+
+        with self.lock:
+            ioctx, groups = self.ioctxs.get(nspec, (None, None))
+            if not ioctx:
+                ioctx = self.rados.open_ioctx2(int(pool_id))
+                ioctx.set_namespace(namespace)
+                groups = set()
+                self.ioctxs[nspec] = (ioctx, groups)
+            assert groups is not None
+            groups.add(group_spec)
+
+        return ioctx
+
+    def put_ioctx(self, group_spec: GroupSpec) -> None:
+        pool_id, namespace, group_id = group_spec
+        nspec = (pool_id, namespace)
+
+        with self.lock:
+            ioctx, groups = self.ioctxs[nspec]
+            groups.remove(group_spec)
+            if not groups:
+                del self.ioctxs[nspec]
+
+
 class MirrorGroupSnapshotScheduleHandler:
     MODULE_OPTION_NAME = "mirror_group_snapshot_schedule"
+    MODULE_OPTION_NAME_MAX_CONCURRENT_GROUP_SNAP_CREATE = "max_concurrent_group_snap_create"
     SCHEDULE_OID = "rbd_mirror_group_snapshot_schedule"
     REFRESH_DELAY_SECONDS = 60.0
 
@@ -43,7 +263,7 @@ class MirrorGroupSnapshotScheduleHandler:
         self.module = module
         self.log = module.log
         self.last_refresh_groups = datetime(1970, 1, 1)
-        # self.create_snapshot_requests = CreateSnapshotRequests(self)
+        self.create_snapshot_requests = GroupCreateSnapshotRequests(self)
 
         self.stop_thread = False
         self.thread = Thread(target=self.run)
@@ -58,7 +278,7 @@ class MirrorGroupSnapshotScheduleHandler:
         if self.thread.is_alive():
             self.log.debug("MirrorGroupSnapshotScheduleHandler: joining thread")
             self.thread.join()
-        self.create_snapshot_requests.wait_for_pending()
+        self.create_snapshot_requests.wait_for_pending()
         self.log.info("MirrorGroupSnapshotScheduleHandler: shut down")
 
     def run(self) -> None:
@@ -72,8 +292,7 @@ class MirrorGroupSnapshotScheduleHandler:
                         self.condition.wait(min(wait_time, refresh_delay))
                         continue
                 pool_id, namespace, group_id = group_spec
-                # self.create_snapshot_requests.add(pool_id, namespace, group_id)
-                self.create_group_snapshot(pool_id, namespace, group_id)
+                self.create_snapshot_requests.add(pool_id, namespace, group_id)
                 with self.lock:
                     self.enqueue(datetime.now(), pool_id, namespace, group_id)
 
@@ -84,30 +303,6 @@ class MirrorGroupSnapshotScheduleHandler:
             self.log.fatal("Fatal runtime error: {}\n{}".format(
                 ex, traceback.format_exc()))
 
-    def create_group_snapshot(self, pool_id, namespace, group_id):
-        try:
-            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
-                ioctx.set_namespace(namespace)
-                group_name = rbd.RBD().group_get_name(ioctx, group_id)
-                if group_name is None:
-                    return
-                group = rbd.Group(ioctx, group_name)
-                mirror_info = group.mirror_group_get_info()
-                if mirror_info['image_mode'] != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
-                    return
-                if mirror_info['state'] != rbd.RBD_MIRROR_GROUP_ENABLED or \
-                   not mirror_info['primary']:
-                    return
-                snap_id = group.mirror_group_create_snapshot()
-                self.log.debug(
-                    "create_group_snapshot: {}/{}/{}: snap_id={}".format(
-                        ioctx.get_pool_name(), namespace, group_name,
-                        snap_id))
-        except Exception as e:
-            self.log.error(
-                "exception when creating group snapshot for {}/{}/{}: {}".format(
-                    pool_id, namespace, group_id, e))
-
     def init_schedule_queue(self) -> None:
         # schedule_time => group_spec
         self.queue: Dict[str, List[GroupSpec]] = {}
index 0446edf44da63b89b09cd6dd0e93ca6d80270996..25449973fdf617ed482024e41d4d39d1599d6622 100644 (file)
@@ -82,6 +82,9 @@ class Module(MgrModule):
                default=10),
         Option(name=TrashPurgeScheduleHandler.MODULE_OPTION_NAME),
         Option(name=MirrorGroupSnapshotScheduleHandler.MODULE_OPTION_NAME),
+        Option(name=MirrorGroupSnapshotScheduleHandler.MODULE_OPTION_NAME_MAX_CONCURRENT_GROUP_SNAP_CREATE,
+               type='int',
+               default=10),
     ]
 
     def __init__(self, *args: Any, **kwargs: Any) -> None: