git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/rbd_support: create mirror snapshots asynchronously 39376/head
author: Mykola Golub <mgolub@suse.com>
Wed, 21 Oct 2020 17:07:16 +0000 (18:07 +0100)
committer: Jason Dillaman <dillaman@redhat.com>
Wed, 10 Feb 2021 18:37:16 +0000 (13:37 -0500)
This lets snapshot creation scale with the number of images.

Fixes: https://tracker.ceph.com/issues/47827
Signed-off-by: Mykola Golub <mgolub@suse.com>
(cherry picked from commit 08151a191fd41427a751d966d523cacead5fe8fe)

Conflicts:
src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py: fixed close_image bug

src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py
src/pybind/mgr/rbd_support/module.py

index e9cde79b6e205b489b18a4af7ae23ecc9f7c8799..81ad51d3fbdb2794bb2c83b3e19f80ee9197d717 100644 (file)
@@ -144,8 +144,270 @@ class Watchers:
             self.unregister(pool_id, namespace)
 
 
+class CreateSnapshotRequests:
+    """Asynchronous pipeline that creates mirror snapshots for images.
+
+    A request is identified by an ``image_spec`` tuple
+    ``(pool_id, namespace, image_id)`` and walks a chain of rbd aio
+    calls: open image -> get mirror mode -> get mirror info -> create
+    snapshot -> close image -> finish.  Every ``handle_*`` method is the
+    completion callback of the step before it.  Requests beyond the
+    configured concurrency limit are parked in ``queue`` and started by
+    ``finish()`` as earlier requests complete.
+    """
+
+    # NOTE: class attributes, hence shared by every instance.
+    lock = Lock()
+    condition = Condition(lock)
+
+    def __init__(self, handler):
+        """Bind to *handler*, which supplies the module, rados and log."""
+        self.handler = handler
+        self.rados = handler.module.rados
+        self.log = handler.log
+        # image_specs that were accepted and have not finished yet
+        # (this includes the ones still waiting in ``queue``).
+        self.pending = set()
+        # image_specs waiting for a free concurrency slot, in FIFO order.
+        self.queue = []
+        # (pool_id, namespace) -> (ioctx, set of image_specs using it)
+        self.ioctxs = {}
+
+    def __del__(self):
+        # Blocks until every accepted request has called finish().
+        self.wait_for_pending()
+
+    def wait_for_pending(self):
+        """Block until ``pending`` is empty.
+
+        Relies on ``finish()`` notifying ``condition`` each time a
+        request is removed from ``pending``.
+        """
+        with self.lock:
+            while self.pending:
+                self.condition.wait()
+
+    def add(self, pool_id, namespace, image_id):
+        """Accept a snapshot request for one image.
+
+        The request is dropped if one for the same image is still in
+        progress, queued if the concurrency limit is exceeded, and
+        started immediately otherwise.
+        """
+        image_spec = (pool_id, namespace, image_id)
+
+        self.log.debug("CreateSnapshotRequests.add: {}/{}/{}".format(
+            pool_id, namespace, image_id))
+
+        max_concurrent = self.handler.module.get_localized_module_option(
+            self.handler.MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE)
+
+        with self.lock:
+            if image_spec in self.pending:
+                self.log.info(
+                    "CreateSnapshotRequests.add: {}/{}/{}: {}".format(
+                        pool_id, namespace, image_id,
+                        "previous request is still in progress"))
+                return
+            self.pending.add(image_spec)
+
+            if len(self.pending) > max_concurrent:
+                self.queue.append(image_spec)
+                return
+
+        # Start outside the lock: open_image() takes it again through
+        # get_ioctx() and, on failure, finish().
+        self.open_image(image_spec)
+
+    def open_image(self, image_spec):
+        """Start the request by opening the image asynchronously.
+
+        Any exception finishes the request immediately.
+        """
+        pool_id, namespace, image_id = image_spec
+
+        self.log.debug("CreateSnapshotRequests.open_image: {}/{}/{}".format(
+            pool_id, namespace, image_id))
+
+        try:
+            ioctx = self.get_ioctx(image_spec)
+
+            def cb(comp, image):
+                self.handle_open_image(image_spec, comp, image)
+
+            rbd.RBD().aio_open_image(cb, ioctx, image_id=image_id)
+        except Exception as e:
+            self.log.error(
+                "exception when opening {}/{}/{}: {}".format(
+                    pool_id, namespace, image_id, e))
+            self.finish(image_spec)
+
+    def handle_open_image(self, image_spec, comp, image):
+        """Completion of open_image(): continue, or finish on error."""
+        pool_id, namespace, image_id = image_spec
+
+        self.log.debug(
+            "CreateSnapshotRequests.handle_open_image {}/{}/{}: r={}".format(
+                pool_id, namespace, image_id, comp.get_return_value()))
+
+        if comp.get_return_value() < 0:
+            self.log.error(
+                "error when opening {}/{}/{}: {}".format(
+                    pool_id, namespace, image_id, comp.get_return_value()))
+            # Nothing was opened, so go straight to finish().
+            self.finish(image_spec)
+            return
+
+        self.get_mirror_mode(image_spec, image)
+
+    def get_mirror_mode(self, image_spec, image):
+        """Request the image's mirror mode; close the image on exception."""
+        pool_id, namespace, image_id = image_spec
+
+        self.log.debug("CreateSnapshotRequests.get_mirror_mode: {}/{}/{}".format(
+            pool_id, namespace, image_id))
+
+        def cb(comp, mode):
+            self.handle_get_mirror_mode(image_spec, image, comp, mode)
+
+        try:
+            image.aio_mirror_image_get_mode(cb)
+        except Exception as e:
+            self.log.error(
+                "exception when getting mirror mode for {}/{}/{}: {}".format(
+                    pool_id, namespace, image_id, e))
+            self.close_image(image_spec, image)
+
+    def handle_get_mirror_mode(self, image_spec, image, comp, mode):
+        """Completion of get_mirror_mode().
+
+        Continues only when the call succeeded and the image uses
+        snapshot-based mirroring; otherwise the image is closed.
+        """
+        pool_id, namespace, image_id = image_spec
+
+        self.log.debug(
+            "CreateSnapshotRequests.handle_get_mirror_mode {}/{}/{}: r={} mode={}".format(
+                pool_id, namespace, image_id, comp.get_return_value(), mode))
+
+        if comp.get_return_value() < 0:
+            self.log.error(
+                "error when getting mirror mode for {}/{}/{}: {}".format(
+                    pool_id, namespace, image_id, comp.get_return_value()))
+            self.close_image(image_spec, image)
+            return
+
+        if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
+            self.log.debug(
+                "CreateSnapshotRequests.handle_get_mirror_mode: {}/{}/{}: {}".format(
+                    pool_id, namespace, image_id,
+                    "snapshot mirroring is not enabled"))
+            self.close_image(image_spec, image)
+            return
+
+        self.get_mirror_info(image_spec, image)
+
+    def get_mirror_info(self, image_spec, image):
+        """Request the image's mirror info; close the image on exception."""
+        pool_id, namespace, image_id = image_spec
+
+        self.log.debug("CreateSnapshotRequests.get_mirror_info: {}/{}/{}".format(
+            pool_id, namespace, image_id))
+
+        def cb(comp, info):
+            self.handle_get_mirror_info(image_spec, image, comp, info)
+
+        try:
+            image.aio_mirror_image_get_info(cb)
+        except Exception as e:
+            self.log.error(
+                "exception when getting mirror info for {}/{}/{}: {}".format(
+                    pool_id, namespace, image_id, e))
+            self.close_image(image_spec, image)
+
+    def handle_get_mirror_info(self, image_spec, image, comp, info):
+        """Completion of get_mirror_info().
+
+        Continues to snapshot creation only when the call succeeded and
+        the image is an enabled primary; otherwise the image is closed.
+        """
+        pool_id, namespace, image_id = image_spec
+
+        self.log.debug(
+            "CreateSnapshotRequests.handle_get_mirror_info {}/{}/{}: r={} info={}".format(
+                pool_id, namespace, image_id, comp.get_return_value(), info))
+
+        if comp.get_return_value() < 0:
+            self.log.error(
+                "error when getting mirror info for {}/{}/{}: {}".format(
+                    pool_id, namespace, image_id, comp.get_return_value()))
+            self.close_image(image_spec, image)
+            return
+
+        # Only an enabled, primary image gets a mirror snapshot.
+        if not info['primary'] or \
+           info['state'] != rbd.RBD_MIRROR_IMAGE_ENABLED:
+            self.log.debug(
+                "CreateSnapshotRequests.handle_get_mirror_info: {}/{}/{}: {}".format(
+                    pool_id, namespace, image_id,
+                    "mirroring is not enabled or image is not primary"))
+            self.close_image(image_spec, image)
+            return
+
+        self.create_snapshot(image_spec, image)
+
+    def create_snapshot(self, image_spec, image):
+        """Request the mirror snapshot; close the image on exception."""
+        pool_id, namespace, image_id = image_spec
+
+        self.log.debug(
+            "CreateSnapshotRequests.create_snapshot for {}/{}/{}".format(
+                pool_id, namespace, image_id))
+
+        def cb(comp, snap_id):
+            self.handle_create_snapshot(image_spec, image, comp, snap_id)
+
+        try:
+            image.aio_mirror_image_create_snapshot(0, cb)
+        except Exception as e:
+            self.log.error(
+                "exception when creating snapshot for {}/{}/{}: {}".format(
+                    pool_id, namespace, image_id, e))
+            self.close_image(image_spec, image)
+
+    def handle_create_snapshot(self, image_spec, image, comp, snap_id):
+        """Completion of create_snapshot(): log errors, then close."""
+        pool_id, namespace, image_id = image_spec
+
+        self.log.debug(
+            "CreateSnapshotRequests.handle_create_snapshot for {}/{}/{}: r={}, snap_id={}".format(
+                pool_id, namespace, image_id, comp.get_return_value(), snap_id))
+
+        if comp.get_return_value() < 0:
+            self.log.error(
+                "error when creating snapshot for {}/{}/{}: {}".format(
+                    pool_id, namespace, image_id, comp.get_return_value()))
+
+        # The image is closed whether or not the snapshot was created.
+        self.close_image(image_spec, image)
+
+    def close_image(self, image_spec, image):
+        """Close the image asynchronously; finish directly on exception."""
+        pool_id, namespace, image_id = image_spec
+
+        self.log.debug(
+            "CreateSnapshotRequests.close_image {}/{}/{}".format(
+                pool_id, namespace, image_id))
+
+        def cb(comp):
+            self.handle_close_image(image_spec, comp)
+
+        try:
+            image.aio_close(cb)
+        except Exception as e:
+            self.log.error(
+                "exception when closing {}/{}/{}: {}".format(
+                    pool_id, namespace, image_id, e))
+            self.finish(image_spec)
+
+    def handle_close_image(self, image_spec, comp):
+        """Completion of close_image(): log errors, then finish."""
+        pool_id, namespace, image_id = image_spec
+
+        self.log.debug(
+            "CreateSnapshotRequests.handle_close_image {}/{}/{}: r={}".format(
+                pool_id, namespace, image_id, comp.get_return_value()))
+
+        if comp.get_return_value() < 0:
+            self.log.error(
+                "error when closing {}/{}/{}: {}".format(
+                    pool_id, namespace, image_id, comp.get_return_value()))
+
+        self.finish(image_spec)
+
+    def finish(self, image_spec):
+        """Retire a request and start the next queued one, if any.
+
+        Releases the request's ioctx reference, removes it from
+        ``pending`` and wakes any thread blocked in wait_for_pending().
+        """
+        pool_id, namespace, image_id = image_spec
+
+        self.log.debug("CreateSnapshotRequests.finish: {}/{}/{}".format(
+            pool_id, namespace, image_id))
+
+        self.put_ioctx(image_spec)
+
+        with self.lock:
+            self.pending.remove(image_spec)
+            # Without this wake-up wait_for_pending() would never see
+            # ``pending`` drain once it has started waiting.
+            self.condition.notify_all()
+            if not self.queue:
+                return
+            image_spec = self.queue.pop(0)
+
+        self.open_image(image_spec)
+
+    def get_ioctx(self, image_spec):
+        """Return the ioctx for the image's pool/namespace.
+
+        One ioctx is opened per ``(pool_id, namespace)`` and shared by
+        all requests that use it; *image_spec* is recorded as a user.
+        Exceptions from opening the ioctx propagate to the caller, in
+        which case nothing is recorded for *image_spec*.
+        """
+        pool_id, namespace, image_id = image_spec
+        nspec = (pool_id, namespace)
+
+        with self.lock:
+            ioctx, images = self.ioctxs.get(nspec, (None, None))
+            if not ioctx:
+                ioctx = self.rados.open_ioctx2(int(pool_id))
+                ioctx.set_namespace(namespace)
+                images = set()
+                self.ioctxs[nspec] = (ioctx, images)
+            images.add(image_spec)
+
+        return ioctx
+
+    def put_ioctx(self, image_spec):
+        """Drop *image_spec*'s use of its ioctx.
+
+        The cached entry is forgotten once no request uses it.  Safe to
+        call for a request whose get_ioctx() failed before registering.
+        """
+        pool_id, namespace, image_id = image_spec
+        nspec = (pool_id, namespace)
+
+        with self.lock:
+            entry = self.ioctxs.get(nspec)
+            if entry is None:
+                # get_ioctx() raised before anything was cached.
+                return
+            _, images = entry
+            # discard(): the spec may never have been added if
+            # get_ioctx() failed while other images share the entry.
+            images.discard(image_spec)
+            if not images:
+                del self.ioctxs[nspec]
+
+
+
+
 class MirrorSnapshotScheduleHandler:
     MODULE_OPTION_NAME = "mirror_snapshot_schedule"
+    MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE = "max_concurrent_snap_create"
     SCHEDULE_OID = "rbd_mirror_snapshot_schedule"
 
     lock = Lock()
@@ -156,6 +418,7 @@ class MirrorSnapshotScheduleHandler:
         self.module = module
         self.log = module.log
         self.last_refresh_images = datetime(1970, 1, 1)
+        self.create_snapshot_requests = CreateSnapshotRequests(self)
 
         self.init_schedule_queue()
 
@@ -164,6 +427,7 @@ class MirrorSnapshotScheduleHandler:
 
     def _cleanup(self):
         self.watchers.unregister_all()
+        self.create_snapshot_requests.wait_for_pending()
 
     def run(self):
         try:
@@ -176,7 +440,7 @@ class MirrorSnapshotScheduleHandler:
                         self.condition.wait(min(wait_time, 60))
                         continue
                 pool_id, namespace, image_id = image_spec
-                self.create_snapshot(pool_id, namespace, image_id)
+                self.create_snapshot_requests.add(pool_id, namespace, image_id)
                 with self.lock:
                     self.enqueue(datetime.now(), pool_id, namespace, image_id)
 
@@ -184,29 +448,6 @@ class MirrorSnapshotScheduleHandler:
             self.log.fatal("Fatal runtime error: {}\n{}".format(
                 ex, traceback.format_exc()))
 
-    def create_snapshot(self, pool_id, namespace, image_id):
-        try:
-            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
-                ioctx.set_namespace(namespace)
-                with rbd.Image(ioctx, image_id=image_id) as image:
-                    mode = image.mirror_image_get_mode()
-                    if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
-                        return
-                    info = image.mirror_image_get_info()
-                    if info['state'] != rbd.RBD_MIRROR_IMAGE_ENABLED or \
-                       not info['primary']:
-                        return
-                    snap_id = image.mirror_image_create_snapshot()
-                    self.log.debug(
-                        "create_snapshot: {}/{}/{}: snap_id={}".format(
-                            ioctx.get_pool_name(), namespace, image.get_name(),
-                            snap_id))
-        except Exception as e:
-            self.log.error(
-                "exception when creating snapshot for {}/{}/{}: {}".format(
-                    pool_id, namespace, image_id, e))
-
-
     def init_schedule_queue(self):
         self.queue = {}
         self.images = {}
index 6062ceffa78a51422a4acaed993e9c990f0ae631..82bd06e6238d1035d65542e7571723e906b4ce48 100644 (file)
@@ -145,6 +145,7 @@ class Module(MgrModule):
     ]
     MODULE_OPTIONS = [
         {'name': MirrorSnapshotScheduleHandler.MODULE_OPTION_NAME},
+        {'name': MirrorSnapshotScheduleHandler.MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE, 'type': 'int', 'default': 10},
         {'name': TrashPurgeScheduleHandler.MODULE_OPTION_NAME},
     ]