self.unregister(pool_id, namespace)
+class CreateSnapshotRequests:
+
+ lock = Lock()
+ condition = Condition(lock)
+
+ def __init__(self, handler):
+ self.handler = handler
+ self.rados = handler.module.rados
+ self.log = handler.log
+ self.pending = set()
+ self.queue = []
+ self.ioctxs = {}
+
+ def __del__(self):
+ self.wait_for_pending()
+
+ def wait_for_pending(self):
+ with self.lock:
+ while self.pending:
+ self.condition.wait()
+
+ def add(self, pool_id, namespace, image_id):
+ image_spec = (pool_id, namespace, image_id)
+
+ self.log.debug("CreateSnapshotRequests.add: {}/{}/{}".format(
+ pool_id, namespace, image_id))
+
+ max_concurrent = self.handler.module.get_localized_module_option(
+ self.handler.MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE)
+
+ with self.lock:
+ if image_spec in self.pending:
+ self.log.info(
+ "CreateSnapshotRequests.add: {}/{}/{}: {}".format(
+ pool_id, namespace, image_id,
+ "previous request is still in progress"))
+ return
+ self.pending.add(image_spec)
+
+ if len(self.pending) > max_concurrent:
+ self.queue.append(image_spec)
+ return
+
+ self.open_image(image_spec)
+
+ def open_image(self, image_spec):
+ pool_id, namespace, image_id = image_spec
+
+ self.log.debug("CreateSnapshotRequests.open_image: {}/{}/{}".format(
+ pool_id, namespace, image_id))
+
+ try:
+ ioctx = self.get_ioctx(image_spec)
+
+ def cb(comp, image):
+ self.handle_open_image(image_spec, comp, image)
+
+ rbd.RBD().aio_open_image(cb, ioctx, image_id=image_id)
+ except Exception as e:
+ self.log.error(
+ "exception when opening {}/{}/{}: {}".format(
+ pool_id, namespace, image_id, e))
+ self.finish(image_spec)
+
+ def handle_open_image(self, image_spec, comp, image):
+ pool_id, namespace, image_id = image_spec
+
+ self.log.debug(
+ "CreateSnapshotRequests.handle_open_image {}/{}/{}: r={}".format(
+ pool_id, namespace, image_id, comp.get_return_value()))
+
+ if comp.get_return_value() < 0:
+ self.log.error(
+ "error when opening {}/{}/{}: {}".format(
+ pool_id, namespace, image_id, comp.get_return_value()))
+ self.finish(image_spec)
+ return
+
+ self.get_mirror_mode(image_spec, image)
+
+ def get_mirror_mode(self, image_spec, image):
+ pool_id, namespace, image_id = image_spec
+
+ self.log.debug("CreateSnapshotRequests.get_mirror_mode: {}/{}/{}".format(
+ pool_id, namespace, image_id))
+
+ def cb(comp, mode):
+ self.handle_get_mirror_mode(image_spec, image, comp, mode)
+
+ try:
+ image.aio_mirror_image_get_mode(cb)
+ except Exception as e:
+ self.log.error(
+ "exception when getting mirror mode for {}/{}/{}: {}".format(
+ pool_id, namespace, image_id, e))
+ self.close_image(image_spec, image)
+
+ def handle_get_mirror_mode(self, image_spec, image, comp, mode):
+ pool_id, namespace, image_id = image_spec
+
+ self.log.debug(
+ "CreateSnapshotRequests.handle_get_mirror_mode {}/{}/{}: r={} mode={}".format(
+ pool_id, namespace, image_id, comp.get_return_value(), mode))
+
+ if comp.get_return_value() < 0:
+ self.log.error(
+ "error when getting mirror mode for {}/{}/{}: {}".format(
+ pool_id, namespace, image_id, comp.get_return_value()))
+ self.close_image(image_spec, image)
+ return
+
+ if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
+ self.log.debug(
+ "CreateSnapshotRequests.handle_get_mirror_mode: {}/{}/{}: {}".format(
+ pool_id, namespace, image_id,
+ "snapshot mirroring is not enabled"))
+ self.close_image(image_spec)
+
+ self.get_mirror_info(image_spec, image)
+
+ def get_mirror_info(self, image_spec, image):
+ pool_id, namespace, image_id = image_spec
+
+ self.log.debug("CreateSnapshotRequests.get_mirror_info: {}/{}/{}".format(
+ pool_id, namespace, image_id))
+
+ def cb(comp, info):
+ self.handle_get_mirror_info(image_spec, image, comp, info)
+
+ try:
+ image.aio_mirror_image_get_info(cb)
+ except Exception as e:
+ self.log.error(
+ "exception when getting mirror info for {}/{}/{}: {}".format(
+ pool_id, namespace, image_id, e))
+ self.close_image(image_spec, image)
+
+ def handle_get_mirror_info(self, image_spec, image, comp, info):
+ pool_id, namespace, image_id = image_spec
+
+ self.log.debug(
+ "CreateSnapshotRequests.handle_get_mirror_info {}/{}/{}: r={} info={}".format(
+ pool_id, namespace, image_id, comp.get_return_value(), info))
+
+ if comp.get_return_value() < 0:
+ self.log.error(
+ "error when getting mirror info for {}/{}/{}: {}".format(
+ pool_id, namespace, image_id, comp.get_return_value()))
+ self.close_image(image_spec, image)
+ return
+
+ self.create_snapshot(image_spec, image)
+
+ def create_snapshot(self, image_spec, image):
+ pool_id, namespace, image_id = image_spec
+
+ self.log.debug(
+ "CreateSnapshotRequests.create_snapshot for {}/{}/{}".format(
+ pool_id, namespace, image_id))
+
+ def cb(comp, snap_id):
+ self.handle_create_snapshot(image_spec, image, comp, snap_id)
+
+ try:
+ image.aio_mirror_image_create_snapshot(0, cb)
+ except Exception as e:
+ self.log.error(
+ "exception when creating snapshot for {}/{}/{}: {}".format(
+ pool_id, namespace, image_id, e))
+ self.close_image(image_spec, image)
+
+
+ def handle_create_snapshot(self, image_spec, image, comp, snap_id):
+ pool_id, namespace, image_id = image_spec
+
+ self.log.debug(
+ "CreateSnapshotRequests.handle_create_snapshot for {}/{}/{}: r={}, snap_id={}".format(
+ pool_id, namespace, image_id, comp.get_return_value(), snap_id))
+
+ if comp.get_return_value() < 0:
+ self.log.error(
+ "error when creating snapshot for {}/{}/{}: {}".format(
+ pool_id, namespace, image_id, comp.get_return_value()))
+
+ self.close_image(image_spec, image)
+
+ def close_image(self, image_spec, image):
+ pool_id, namespace, image_id = image_spec
+
+ self.log.debug(
+ "CreateSnapshotRequests.close_image {}/{}/{}".format(
+ pool_id, namespace, image_id))
+
+ def cb(comp):
+ self.handle_close_image(image_spec, comp)
+
+ try:
+ image.aio_close(cb)
+ except Exception as e:
+ self.log.error(
+ "exception when closing {}/{}/{}: {}".format(
+ pool_id, namespace, image_id, e))
+ self.finish(image_spec)
+
+ def handle_close_image(self, image_spec, comp):
+ pool_id, namespace, image_id = image_spec
+
+ self.log.debug(
+ "CreateSnapshotRequests.handle_close_image {}/{}/{}: r={}".format(
+ pool_id, namespace, image_id, comp.get_return_value()))
+
+ if comp.get_return_value() < 0:
+ self.log.error(
+ "error when closing {}/{}/{}: {}".format(
+ pool_id, namespace, image_id, comp.get_return_value()))
+
+ self.finish(image_spec)
+
+ def finish(self, image_spec):
+ pool_id, namespace, image_id = image_spec
+
+ self.log.debug("CreateSnapshotRequests.finish: {}/{}/{}".format(
+ pool_id, namespace, image_id))
+
+ self.put_ioctx(image_spec)
+
+ with self.lock:
+ self.pending.remove(image_spec)
+ if not self.queue:
+ return
+ image_spec = self.queue.pop(0)
+
+ self.open_image(image_spec)
+
+ def get_ioctx(self, image_spec):
+ pool_id, namespace, image_id = image_spec
+ nspec = (pool_id, namespace)
+
+ with self.lock:
+ ioctx, images = self.ioctxs.get(nspec, (None, None))
+ if not ioctx:
+ ioctx = self.rados.open_ioctx2(int(pool_id))
+ ioctx.set_namespace(namespace)
+ images = set()
+ self.ioctxs[nspec] = (ioctx, images)
+ images.add(image_spec)
+
+ return ioctx
+
+ def put_ioctx(self, image_spec):
+ pool_id, namespace, image_id = image_spec
+ nspec = (pool_id, namespace)
+
+ with self.lock:
+ ioctx, images = self.ioctxs[nspec]
+ images.remove(image_spec)
+ if not images:
+ del self.ioctxs[nspec]
+
+
class MirrorSnapshotScheduleHandler:
MODULE_OPTION_NAME = "mirror_snapshot_schedule"
+ MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE = "max_concurrent_snap_create"
SCHEDULE_OID = "rbd_mirror_snapshot_schedule"
lock = Lock()
self.module = module
self.log = module.log
self.last_refresh_images = datetime(1970, 1, 1)
+ self.create_snapshot_requests = CreateSnapshotRequests(self)
self.init_schedule_queue()
def _cleanup(self):
self.watchers.unregister_all()
+ self.create_snapshot_requests.wait_for_pending()
def run(self):
try:
self.condition.wait(min(wait_time, 60))
continue
pool_id, namespace, image_id = image_spec
- self.create_snapshot(pool_id, namespace, image_id)
+ self.create_snapshot_requests.add(pool_id, namespace, image_id)
with self.lock:
self.enqueue(datetime.now(), pool_id, namespace, image_id)
self.log.fatal("Fatal runtime error: {}\n{}".format(
ex, traceback.format_exc()))
- def create_snapshot(self, pool_id, namespace, image_id):
- try:
- with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
- ioctx.set_namespace(namespace)
- with rbd.Image(ioctx, image_id=image_id) as image:
- mode = image.mirror_image_get_mode()
- if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
- return
- info = image.mirror_image_get_info()
- if info['state'] != rbd.RBD_MIRROR_IMAGE_ENABLED or \
- not info['primary']:
- return
- snap_id = image.mirror_image_create_snapshot()
- self.log.debug(
- "create_snapshot: {}/{}/{}: snap_id={}".format(
- ioctx.get_pool_name(), namespace, image.get_name(),
- snap_id))
- except Exception as e:
- self.log.error(
- "exception when creating snapshot for {}/{}/{}: {}".format(
- pool_id, namespace, image_id, e))
-
-
def init_schedule_queue(self):
self.queue = {}
self.images = {}