]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/rbd_support: make mirror_snapshot_schedule rescan only updated pools
authorMykola Golub <mgolub@suse.com>
Sat, 7 Mar 2020 10:15:44 +0000 (10:15 +0000)
committerMykola Golub <mgolub@suse.com>
Mon, 13 Apr 2020 17:45:41 +0000 (18:45 +0100)
Signed-off-by: Mykola Golub <mgolub@suse.com>
src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py

index 479a967ef923df308f0d94bd5d18f2fc153af18f..fc7a7d986b21a9ddf9218902f0fa8354b90f5174 100644 (file)
@@ -11,6 +11,8 @@ from threading import Condition, Lock, Thread
 from .common import get_rbd_pools
 from .schedule import LevelSpec, Interval, StartTime, Schedule, Schedules
 
+MIRRORING_OID = "rbd_mirroring"
+
 def namespace_validator(ioctx):
     mode = rbd.RBD().mirror_mode_get(ioctx)
     if mode != rbd.RBD_MIRROR_MODE_IMAGE:
@@ -22,6 +24,126 @@ def image_validator(image):
     if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
         raise rbd.InvalidArgument("Invalid mirror image mode")
 
+class Watchers:
+
+    lock = Lock()
+
+    def __init__(self, handler):
+        self.rados = handler.module.rados
+        self.log = handler.log
+        self.watchers = {}
+        self.updated = {}
+        self.error = {}
+        self.epoch = {}
+
+    def __del__(self):
+        self.unregister_all()
+
+    def _clean_watcher(self, pool_id, namespace, watch_id):
+        assert self.lock.locked()
+
+        del self.watchers[pool_id, namespace]
+        self.updated.pop(watch_id, None)
+        self.error.pop(watch_id, None)
+        self.epoch.pop(watch_id, None)
+
+    def check(self, pool_id, namespace, epoch):
+        error = None
+        with self.lock:
+            watch = self.watchers.get((pool_id, namespace))
+            if watch is not None:
+                error = self.error.get(watch.get_id())
+                if not error:
+                    updated = self.updated[watch.get_id()]
+                    self.updated[watch.get_id()] = False
+                    self.epoch[watch.get_id()] = epoch
+                    return updated
+        if error:
+            self.unregister(pool_id, namespace)
+
+        if self.register(pool_id, namespace):
+            return self.check(pool_id, namespace, epoch)
+        else:
+            return True
+
+    def register(self, pool_id, namespace):
+
+        def callback(notify_id, notifier_id, watch_id, data):
+            self.log.debug("watcher {}: got notify {} from {}".format(
+                watch_id, notify_id, notifier_id))
+
+            with self.lock:
+                self.updated[watch_id] = True
+
+        def error_callback(watch_id, error):
+            self.log.debug("watcher {}: got errror {}".format(
+                watch_id, error))
+
+            with self.lock:
+                self.error[watch_id] = error
+
+        try:
+            ioctx = self.rados.open_ioctx2(int(pool_id))
+            ioctx.set_namespace(namespace)
+            watch = ioctx.watch(MIRRORING_OID, callback, error_callback)
+        except rados.ObjectNotFound:
+            self.log.debug(
+                "{}/{}/{} watcher not registered: object not found".format(
+                    pool_id, namespace, MIRRORING_OID))
+            return False
+
+        self.log.debug("{}/{}/{} watcher {} registered".format(
+            pool_id, namespace, MIRRORING_OID, watch.get_id()))
+
+        with self.lock:
+            self.watchers[pool_id, namespace] = watch
+            self.updated[watch.get_id()] = True
+        return True
+
+    def unregister(self, pool_id, namespace):
+
+        with self.lock:
+            watch = self.watchers[pool_id, namespace]
+
+        watch_id = watch.get_id()
+
+        try:
+            watch.close()
+
+            self.log.debug("{}/{}/{} watcher {} unregistered".format(
+                pool_id, namespace, MIRRORING_OID, watch_id))
+
+        except rados.Error as e:
+            self.log.debug(
+                "exception when unregistering {}/{} watcher: {}".format(
+                    pool_id, namespace, e))
+
+        with self.lock:
+            self._clean_watcher(pool_id, namespace, watch_id)
+
+    def unregister_all(self):
+        with self.lock:
+            watchers = list(self.watchers)
+
+        for pool_id, namespace in watchers:
+            self.unregister(pool_id, namespace)
+
+    def unregister_stale(self, current_epoch):
+        with self.lock:
+            watchers = list(self.watchers)
+
+        for pool_id, namespace in watchers:
+            with self.lock:
+                watch = self.watchers[pool_id, namespace]
+                if self.epoch.get(watch.get_id()) == current_epoch:
+                    continue
+
+            self.log.debug("{}/{}/{} watcher {} stale".format(
+                pool_id, namespace, MIRRORING_OID, watch_id))
+
+            self.unregister(pool_id, namespace)
+
+
 class MirrorSnapshotScheduleHandler:
     MODULE_OPTION_NAME = "mirror_snapshot_schedule"
     SCHEDULE_OID = "rbd_mirror_snapshot_schedule"
@@ -85,6 +207,7 @@ class MirrorSnapshotScheduleHandler:
     def init_schedule_queue(self):
         self.queue = {}
         self.images = {}
+        self.watchers = Watchers(self)
         self.refresh_images()
         self.log.debug("scheduler queue is initialized")
 
@@ -104,6 +227,15 @@ class MirrorSnapshotScheduleHandler:
 
         self.load_schedules()
 
+        with self.lock:
+            if not self.schedules:
+                self.watchers.unregister_all()
+                self.images = {}
+                self.queue = {}
+                self.last_refresh_images = datetime.now()
+                return
+
+        epoch = int(datetime.now().strftime('%s'))
         images = {}
 
         for pool_id, pool_name in get_rbd_pools(self.module).items():
@@ -111,15 +243,16 @@ class MirrorSnapshotScheduleHandler:
                     LevelSpec.from_pool_spec(pool_id, pool_name)):
                 continue
             with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
-                self.load_pool_images(ioctx, images)
+                self.load_pool_images(ioctx, epoch, images)
 
         with self.lock:
             self.refresh_queue(images)
             self.images = images
 
+        self.watchers.unregister_stale(epoch)
         self.last_refresh_images = datetime.now()
 
-    def load_pool_images(self, ioctx, images):
+    def load_pool_images(self, ioctx, epoch, images):
         pool_id = str(ioctx.get_pool_id())
         pool_name = ioctx.get_pool_name()
         images[pool_id] = {}
@@ -136,6 +269,14 @@ class MirrorSnapshotScheduleHandler:
                     pool_name, namespace))
                 images[pool_id][namespace] = {}
                 ioctx.set_namespace(namespace)
+                updated = self.watchers.check(pool_id, namespace, epoch)
+                if not updated:
+                    self.log.debug("load_pool_images: {}/{} not updated".format(
+                        pool_name, namespace))
+                    with self.lock:
+                        images[pool_id][namespace] = \
+                            self.images[pool_id][namespace]
+                    continue
                 mirror_images = dict(rbd.RBD().mirror_image_info_list(
                     ioctx, rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT))
                 if not mirror_images:
@@ -153,12 +294,13 @@ class MirrorSnapshotScheduleHandler:
                                                  image_name)
                     else:
                         name = "{}/{}".format(pool_name, image_name)
-                    self.log.debug("Adding image {}".format(name))
+                    self.log.debug(
+                        "load_pool_images: adding image {}".format(name))
                     images[pool_id][namespace][image_id] = name
         except Exception as e:
-            self.log.error("exception when scanning pool {}: {}".format(
-                pool_name, e))
-            pass
+            self.log.error(
+                "load_pool_images: exception when scanning pool {}: {}".format(
+                    pool_name, e))
 
     def rebuild_queue(self):
         with self.lock: