from .common import get_rbd_pools
from .schedule import LevelSpec, Interval, StartTime, Schedule, Schedules
-MIRRORING_OID = "rbd_mirroring"
-
def namespace_validator(ioctx):
mode = rbd.RBD().mirror_mode_get(ioctx)
if mode != rbd.RBD_MIRROR_MODE_IMAGE:
if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
raise rbd.InvalidArgument("Invalid mirror image mode")
-class Watchers:
-
- lock = Lock()
-
- def __init__(self, handler):
- self.rados = handler.module.rados
- self.log = handler.log
- self.watchers = {}
- self.updated = {}
- self.error = {}
- self.epoch = {}
-
- def __del__(self):
- self.unregister_all()
-
- def _clean_watcher(self, pool_id, namespace, watch_id):
- assert self.lock.locked()
-
- del self.watchers[pool_id, namespace]
- self.updated.pop(watch_id, None)
- self.error.pop(watch_id, None)
- self.epoch.pop(watch_id, None)
-
- def check(self, pool_id, namespace, epoch):
- error = None
- with self.lock:
- watch = self.watchers.get((pool_id, namespace))
- if watch is not None:
- error = self.error.get(watch.get_id())
- if not error:
- updated = self.updated[watch.get_id()]
- self.updated[watch.get_id()] = False
- self.epoch[watch.get_id()] = epoch
- return updated
- if error:
- self.unregister(pool_id, namespace)
-
- if self.register(pool_id, namespace):
- return self.check(pool_id, namespace, epoch)
- else:
- return True
-
- def register(self, pool_id, namespace):
-
- def callback(notify_id, notifier_id, watch_id, data):
- self.log.debug("watcher {}: got notify {} from {}".format(
- watch_id, notify_id, notifier_id))
-
- with self.lock:
- self.updated[watch_id] = True
-
- def error_callback(watch_id, error):
- self.log.debug("watcher {}: got errror {}".format(
- watch_id, error))
-
- with self.lock:
- self.error[watch_id] = error
-
- try:
- ioctx = self.rados.open_ioctx2(int(pool_id))
- ioctx.set_namespace(namespace)
- watch = ioctx.watch(MIRRORING_OID, callback, error_callback)
- except rados.ObjectNotFound:
- self.log.debug(
- "{}/{}/{} watcher not registered: object not found".format(
- pool_id, namespace, MIRRORING_OID))
- return False
-
- self.log.debug("{}/{}/{} watcher {} registered".format(
- pool_id, namespace, MIRRORING_OID, watch.get_id()))
-
- with self.lock:
- self.watchers[pool_id, namespace] = watch
- self.updated[watch.get_id()] = True
- return True
-
- def unregister(self, pool_id, namespace):
-
- with self.lock:
- watch = self.watchers[pool_id, namespace]
-
- watch_id = watch.get_id()
-
- try:
- watch.close()
-
- self.log.debug("{}/{}/{} watcher {} unregistered".format(
- pool_id, namespace, MIRRORING_OID, watch_id))
-
- except rados.Error as e:
- self.log.debug(
- "exception when unregistering {}/{} watcher: {}".format(
- pool_id, namespace, e))
-
- with self.lock:
- self._clean_watcher(pool_id, namespace, watch_id)
-
- def unregister_all(self):
- with self.lock:
- watchers = list(self.watchers)
-
- for pool_id, namespace in watchers:
- self.unregister(pool_id, namespace)
-
- def unregister_stale(self, current_epoch):
- with self.lock:
- watchers = list(self.watchers)
-
- for pool_id, namespace in watchers:
- with self.lock:
- watch = self.watchers[pool_id, namespace]
- if self.epoch.get(watch.get_id()) == current_epoch:
- continue
-
- self.log.debug("{}/{}/{} watcher {} stale".format(
- pool_id, namespace, MIRRORING_OID, watch_id))
-
- self.unregister(pool_id, namespace)
-
class CreateSnapshotRequests:
self.thread.start()
def _cleanup(self):
- self.watchers.unregister_all()
self.create_snapshot_requests.wait_for_pending()
def run(self):
def init_schedule_queue(self):
self.queue = {}
self.images = {}
- self.watchers = Watchers(self)
self.refresh_images()
self.log.debug("MirrorSnapshotScheduleHandler: queue is initialized")
self.load_schedules()
if not self.schedules:
self.log.debug("MirrorSnapshotScheduleHandler: no schedules")
- self.watchers.unregister_all()
self.images = {}
self.queue = {}
self.last_refresh_images = datetime.now()
return self.REFRESH_DELAY_SECONDS
- epoch = int(datetime.now().strftime('%s'))
images = {}
for pool_id, pool_name in get_rbd_pools(self.module).items():
LevelSpec.from_pool_spec(pool_id, pool_name)):
continue
with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
- self.load_pool_images(ioctx, epoch, images)
+ self.load_pool_images(ioctx, images)
with self.lock:
self.refresh_queue(images)
self.images = images
- self.watchers.unregister_stale(epoch)
self.last_refresh_images = datetime.now()
return self.REFRESH_DELAY_SECONDS
- def load_pool_images(self, ioctx, epoch, images):
+ def load_pool_images(self, ioctx, images):
pool_id = str(ioctx.get_pool_id())
pool_name = ioctx.get_pool_name()
images[pool_id] = {}
pool_name, namespace))
images[pool_id][namespace] = {}
ioctx.set_namespace(namespace)
- updated = self.watchers.check(pool_id, namespace, epoch)
- if not updated:
- self.log.debug("load_pool_images: {}/{} not updated".format(
- pool_name, namespace))
- with self.lock:
- images[pool_id][namespace] = \
- self.images[pool_id][namespace]
- continue
mirror_images = dict(rbd.RBD().mirror_image_info_list(
ioctx, rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT))
if not mirror_images: