from .common import get_rbd_pools
from .schedule import LevelSpec, Interval, StartTime, Schedule, Schedules
+MIRRORING_OID = "rbd_mirroring"
+
def namespace_validator(ioctx):
mode = rbd.RBD().mirror_mode_get(ioctx)
if mode != rbd.RBD_MIRROR_MODE_IMAGE:
if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
raise rbd.InvalidArgument("Invalid mirror image mode")
+class Watchers:
+
+ lock = Lock()
+
+ def __init__(self, handler):
+ self.rados = handler.module.rados
+ self.log = handler.log
+ self.watchers = {}
+ self.updated = {}
+ self.error = {}
+ self.epoch = {}
+
+ def __del__(self):
+ self.unregister_all()
+
+ def _clean_watcher(self, pool_id, namespace, watch_id):
+ assert self.lock.locked()
+
+ del self.watchers[pool_id, namespace]
+ self.updated.pop(watch_id, None)
+ self.error.pop(watch_id, None)
+ self.epoch.pop(watch_id, None)
+
+ def check(self, pool_id, namespace, epoch):
+ error = None
+ with self.lock:
+ watch = self.watchers.get((pool_id, namespace))
+ if watch is not None:
+ error = self.error.get(watch.get_id())
+ if not error:
+ updated = self.updated[watch.get_id()]
+ self.updated[watch.get_id()] = False
+ self.epoch[watch.get_id()] = epoch
+ return updated
+ if error:
+ self.unregister(pool_id, namespace)
+
+ if self.register(pool_id, namespace):
+ return self.check(pool_id, namespace, epoch)
+ else:
+ return True
+
+ def register(self, pool_id, namespace):
+
+ def callback(notify_id, notifier_id, watch_id, data):
+ self.log.debug("watcher {}: got notify {} from {}".format(
+ watch_id, notify_id, notifier_id))
+
+ with self.lock:
+ self.updated[watch_id] = True
+
+ def error_callback(watch_id, error):
+ self.log.debug("watcher {}: got errror {}".format(
+ watch_id, error))
+
+ with self.lock:
+ self.error[watch_id] = error
+
+ try:
+ ioctx = self.rados.open_ioctx2(int(pool_id))
+ ioctx.set_namespace(namespace)
+ watch = ioctx.watch(MIRRORING_OID, callback, error_callback)
+ except rados.ObjectNotFound:
+ self.log.debug(
+ "{}/{}/{} watcher not registered: object not found".format(
+ pool_id, namespace, MIRRORING_OID))
+ return False
+
+ self.log.debug("{}/{}/{} watcher {} registered".format(
+ pool_id, namespace, MIRRORING_OID, watch.get_id()))
+
+ with self.lock:
+ self.watchers[pool_id, namespace] = watch
+ self.updated[watch.get_id()] = True
+ return True
+
+ def unregister(self, pool_id, namespace):
+
+ with self.lock:
+ watch = self.watchers[pool_id, namespace]
+
+ watch_id = watch.get_id()
+
+ try:
+ watch.close()
+
+ self.log.debug("{}/{}/{} watcher {} unregistered".format(
+ pool_id, namespace, MIRRORING_OID, watch_id))
+
+ except rados.Error as e:
+ self.log.debug(
+ "exception when unregistering {}/{} watcher: {}".format(
+ pool_id, namespace, e))
+
+ with self.lock:
+ self._clean_watcher(pool_id, namespace, watch_id)
+
+ def unregister_all(self):
+ with self.lock:
+ watchers = list(self.watchers)
+
+ for pool_id, namespace in watchers:
+ self.unregister(pool_id, namespace)
+
+ def unregister_stale(self, current_epoch):
+ with self.lock:
+ watchers = list(self.watchers)
+
+ for pool_id, namespace in watchers:
+ with self.lock:
+ watch = self.watchers[pool_id, namespace]
+ if self.epoch.get(watch.get_id()) == current_epoch:
+ continue
+
+ self.log.debug("{}/{}/{} watcher {} stale".format(
+ pool_id, namespace, MIRRORING_OID, watch_id))
+
+ self.unregister(pool_id, namespace)
+
+
class MirrorSnapshotScheduleHandler:
MODULE_OPTION_NAME = "mirror_snapshot_schedule"
SCHEDULE_OID = "rbd_mirror_snapshot_schedule"
def init_schedule_queue(self):
self.queue = {}
self.images = {}
+ self.watchers = Watchers(self)
self.refresh_images()
self.log.debug("scheduler queue is initialized")
self.load_schedules()
+ with self.lock:
+ if not self.schedules:
+ self.watchers.unregister_all()
+ self.images = {}
+ self.queue = {}
+ self.last_refresh_images = datetime.now()
+ return
+
+ epoch = int(datetime.now().strftime('%s'))
images = {}
for pool_id, pool_name in get_rbd_pools(self.module).items():
LevelSpec.from_pool_spec(pool_id, pool_name)):
continue
with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
- self.load_pool_images(ioctx, images)
+ self.load_pool_images(ioctx, epoch, images)
with self.lock:
self.refresh_queue(images)
self.images = images
+ self.watchers.unregister_stale(epoch)
self.last_refresh_images = datetime.now()
- def load_pool_images(self, ioctx, images):
+ def load_pool_images(self, ioctx, epoch, images):
pool_id = str(ioctx.get_pool_id())
pool_name = ioctx.get_pool_name()
images[pool_id] = {}
pool_name, namespace))
images[pool_id][namespace] = {}
ioctx.set_namespace(namespace)
+ updated = self.watchers.check(pool_id, namespace, epoch)
+ if not updated:
+ self.log.debug("load_pool_images: {}/{} not updated".format(
+ pool_name, namespace))
+ with self.lock:
+ images[pool_id][namespace] = \
+ self.images[pool_id][namespace]
+ continue
mirror_images = dict(rbd.RBD().mirror_image_info_list(
ioctx, rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT))
if not mirror_images:
image_name)
else:
name = "{}/{}".format(pool_name, image_name)
- self.log.debug("Adding image {}".format(name))
+ self.log.debug(
+ "load_pool_images: adding image {}".format(name))
images[pool_id][namespace][image_id] = name
except Exception as e:
- self.log.error("exception when scanning pool {}: {}".format(
- pool_name, e))
- pass
+ self.log.error(
+ "load_pool_images: exception when scanning pool {}: {}".format(
+ pool_name, e))
def rebuild_queue(self):
with self.lock: