MIRRORING_OID = "rbd_mirroring"


class Watchers:
    """Keep watches on the MIRRORING_OID object of pool namespaces.

    A watch is registered per (pool_id, namespace) pair.  Notifications
    received on a watch mark that pair as updated, so the caller can skip
    rescanning namespaces for which nothing was reported since the previous
    check.
    """

    # Class-level lock: shared by all instances, guards the dictionaries
    # below (they are also modified from the watch callbacks).
    lock = Lock()

    def __init__(self, handler):
        """Create an empty registry.

        :param handler: object providing ``module.rados`` (used to open
            I/O contexts) and ``log`` (used for debug messages).
        """
        self.rados = handler.module.rados
        self.log = handler.log
        self.watchers = {}  # (pool_id, namespace) -> watch object
        self.updated = {}   # watch id -> True if notified since last check
        self.error = {}     # watch id -> error passed to the error callback
        self.epoch = {}     # watch id -> epoch given to the latest check()

    def __del__(self):
        self.unregister_all()

    def _clean_watcher(self, pool_id, namespace, watch_id):
        """Drop all bookkeeping for one watcher; caller must hold the lock."""
        assert self.lock.locked()

        del self.watchers[pool_id, namespace]
        self.updated.pop(watch_id, None)
        self.error.pop(watch_id, None)
        self.epoch.pop(watch_id, None)

    def check(self, pool_id, namespace, epoch):
        """Report whether the pool namespace has to be rescanned.

        For a healthy registered watcher the "updated" flag is returned and
        reset, and ``epoch`` is recorded for the watcher.  A watcher that
        reported an error is unregistered and registered again.  If no
        watcher can be registered, True is returned.

        :param pool_id: pool identifier (key part of the registry).
        :param namespace: namespace name (key part of the registry).
        :param epoch: value remembered for unregister_stale().
        :returns: True if a rescan is needed, False otherwise.
        """
        error = None
        with self.lock:
            watch = self.watchers.get((pool_id, namespace))
            if watch is not None:
                watch_id = watch.get_id()
                error = self.error.get(watch_id)
                if not error:
                    updated = self.updated[watch_id]
                    self.updated[watch_id] = False
                    self.epoch[watch_id] = epoch
                    return updated
        if error:
            # The watch is broken: drop it and fall through to re-register.
            self.unregister(pool_id, namespace)

        if self.register(pool_id, namespace):
            # A fresh watcher starts as "updated", so this returns True and
            # records the epoch for it.
            return self.check(pool_id, namespace, epoch)
        else:
            return True

    def register(self, pool_id, namespace):
        """Start watching MIRRORING_OID in the given pool namespace.

        :returns: True when the watch was established, False when the
            object was not found.
        """

        def callback(notify_id, notifier_id, watch_id, data):
            self.log.debug("watcher {}: got notify {} from {}".format(
                watch_id, notify_id, notifier_id))

            with self.lock:
                self.updated[watch_id] = True

        def error_callback(watch_id, error):
            self.log.debug("watcher {}: got error {}".format(
                watch_id, error))

            with self.lock:
                self.error[watch_id] = error

        try:
            ioctx = self.rados.open_ioctx2(int(pool_id))
            ioctx.set_namespace(namespace)
            watch = ioctx.watch(MIRRORING_OID, callback, error_callback)
        except rados.ObjectNotFound:
            self.log.debug(
                "{}/{}/{} watcher not registered: object not found".format(
                    pool_id, namespace, MIRRORING_OID))
            return False

        self.log.debug("{}/{}/{} watcher {} registered".format(
            pool_id, namespace, MIRRORING_OID, watch.get_id()))

        with self.lock:
            self.watchers[pool_id, namespace] = watch
            # Force a scan on the first check after registration.
            self.updated[watch.get_id()] = True
        return True

    def unregister(self, pool_id, namespace):
        """Close the watch of the given pool namespace and forget it.

        A rados.Error raised while closing is logged; the bookkeeping is
        removed in either case.
        """

        with self.lock:
            watch = self.watchers[pool_id, namespace]

        watch_id = watch.get_id()

        try:
            # Closed outside the lock: the callbacks take the same lock.
            watch.close()

            self.log.debug("{}/{}/{} watcher {} unregistered".format(
                pool_id, namespace, MIRRORING_OID, watch_id))

        except rados.Error as e:
            self.log.debug(
                "exception when unregistering {}/{} watcher: {}".format(
                    pool_id, namespace, e))

        with self.lock:
            self._clean_watcher(pool_id, namespace, watch_id)

    def unregister_all(self):
        """Unregister every known watcher."""
        with self.lock:
            watchers = list(self.watchers)

        for pool_id, namespace in watchers:
            self.unregister(pool_id, namespace)

    def unregister_stale(self, current_epoch):
        """Unregister watchers whose last check() used a different epoch.

        :param current_epoch: epoch of the scan that has just finished.
        """
        with self.lock:
            watchers = list(self.watchers)

        for pool_id, namespace in watchers:
            with self.lock:
                watch = self.watchers[pool_id, namespace]
                # Read the id once: it is needed for the epoch lookup and
                # for the log message below.
                watch_id = watch.get_id()
                if self.epoch.get(watch_id) == current_epoch:
                    continue

            self.log.debug("{}/{}/{} watcher {} stale".format(
                pool_id, namespace, MIRRORING_OID, watch_id))

            self.unregister(pool_id, namespace)
mirror_images: @@ -153,12 +294,13 @@ class MirrorSnapshotScheduleHandler: image_name) else: name = "{}/{}".format(pool_name, image_name) - self.log.debug("Adding image {}".format(name)) + self.log.debug( + "load_pool_images: adding image {}".format(name)) images[pool_id][namespace][image_id] = name except Exception as e: - self.log.error("exception when scanning pool {}: {}".format( - pool_name, e)) - pass + self.log.error( + "load_pool_images: exception when scanning pool {}: {}".format( + pool_name, e)) def rebuild_queue(self): with self.lock: