group_id: str
+class GroupCreateSnapshotRequests:
+
+ def __init__(self, handler: Any) -> None:
+ self.lock = Lock()
+ self.condition = Condition(self.lock)
+ self.handler = handler
+ self.rados = handler.module.rados
+ self.log = handler.log
+ self.pending: Set[GroupSpec] = set()
+ self.queue: List[GroupSpec] = []
+ self.ioctxs: Dict[Tuple[str, str], Tuple[rados.Ioctx, Set[GroupSpec]]] = {}
+
+ def wait_for_pending(self) -> None:
+ with self.lock:
+ while self.pending:
+ self.log.debug(
+ "GroupCreateSnapshotRequests.wait_for_pending: "
+ "{} groups".format(len(self.pending)))
+ self.condition.wait()
+ self.log.debug("GroupCreateSnapshotRequests.wait_for_pending: done")
+
+ def add(self, pool_id: str, namespace: str, group_id: str) -> None:
+ group_spec = GroupSpec(pool_id, namespace, group_id)
+
+ self.log.debug("GroupCreateSnapshotRequests.add: {}/{}/{}".format(
+ pool_id, namespace, group_id))
+
+ max_concurrent = self.handler.module.get_localized_module_option(
+ self.handler.MODULE_OPTION_NAME_MAX_CONCURRENT_GROUP_SNAP_CREATE)
+
+ self.log.debug("GroupCreateSnapshotRequests.add: {}/{}/{} "
+ "max concurrent snap create {}".format(
+ pool_id, namespace, group_id, max_concurrent))
+
+ with self.lock:
+ if group_spec in self.pending:
+ self.log.info(
+ "GroupCreateSnapshotRequests.add: {}/{}/{}: {}".format(
+ pool_id, namespace, group_id,
+ "previous request is still in progress"))
+ return
+ self.pending.add(group_spec)
+
+ if len(self.pending) > max_concurrent:
+ self.queue.append(group_spec)
+ return
+
+ self.open_group(group_spec)
+
+ def open_group(self, group_spec: GroupSpec) -> None:
+ pool_id, namespace, group_id = group_spec
+
+ self.log.debug(
+ "GroupCreateSnapshotRequests.open_group for {}/{}/{}".format(
+ pool_id, namespace, group_id))
+
+ try:
+ ioctx = self.get_ioctx(group_spec)
+ group_name = rbd.RBD().group_get_name(ioctx, group_id)
+ group = rbd.Group(ioctx, group_name)
+ except Exception as e:
+ self.log.error(
+ "exception when opening group {}/{}/{}: {}".format(
+ pool_id, namespace, group_id, e))
+ self.finish(group_spec)
+ return
+
+ self.get_mirror_info(group_spec, group)
+
+ def get_mirror_info(self, group_spec: GroupSpec, group: rbd.Group) -> None:
+ pool_id, namespace, group_id = group_spec
+
+ self.log.debug(
+ "GroupCreateSnapshotRequests.get_mirror_info: {}/{}/{}".format(
+ pool_id, namespace, group_id))
+
+ def cb(comp: rados.Completion,
+ info: Optional[Dict[str, Union[str, int]]]) -> None:
+ self.handle_get_mirror_info(group_spec, group, comp, info)
+
+ try:
+ group.aio_mirror_group_get_info(cb)
+ except Exception as e:
+ self.log.error(
+ "exception when getting mirror group info for "
+ "{}/{}/{}: {}".format(pool_id, namespace, group_id, e))
+ self.finish(group_spec)
+
+ def handle_get_mirror_info(self,
+ group_spec: GroupSpec,
+ group: rbd.Group,
+ comp: rados.Completion,
+ info: Optional[Dict[str, Union[str, int]]]) -> None:
+ pool_id, namespace, group_id = group_spec
+
+ self.log.debug(
+ "GroupCreateSnapshotRequests.handle_get_mirror_info {}/{}/{}: "
+ "r={} info={}".format(pool_id, namespace, group_id,
+ comp.get_return_value(), info))
+
+ if info is None:
+ if comp.get_return_value() != -errno.ENOENT:
+ self.log.error(
+ "error when getting mirror group info for "
+ "{}/{}/{}: {}".format(
+ pool_id, namespace, group_id, comp.get_return_value()))
+ self.finish(group_spec)
+ return
+
+ if info['state'] != rbd.RBD_MIRROR_GROUP_ENABLED:
+ self.log.debug(
+ "GroupCreateSnapshotRequests.handle_get_mirror_info: "
+ "{}/{}/{}: {}".format(
+ pool_id, namespace, group_id,
+ "is not enabled for mirroring"))
+ self.finish(group_spec)
+ return
+
+ if info['image_mode'] != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
+ self.log.debug(
+ "GroupCreateSnapshotRequests.handle_get_mirror_info: "
+ "{}/{}/{}: {}".format(
+ pool_id, namespace, group_id,
+ "is not in snapshot-based mirroring mode"))
+ self.finish(group_spec)
+ return
+
+ if not info['primary'] :
+ self.log.debug(
+ "GroupCreateSnapshotRequests.handle_get_mirror_info: "
+ "{}/{}/{}: {}".format(
+ pool_id, namespace, group_id,
+ "is not primary"))
+ self.finish(group_spec)
+ return
+
+ self.create_snapshot(group_spec, group)
+
+ def create_snapshot(self, group_spec: GroupSpec, group: rbd.Group) -> None:
+ pool_id, namespace, group_id = group_spec
+
+ self.log.debug(
+ "GroupCreateSnapshotRequests.create_snapshot for {}/{}/{}".format(
+ pool_id, namespace, group_id))
+
+ def cb(comp: rados.Completion, snap_id: Optional[str]) -> None:
+ self.handle_create_snapshot(group_spec, comp, snap_id)
+
+ try:
+ group.aio_mirror_group_create_snapshot(0, cb)
+ except Exception as e:
+ self.log.error(
+ "exception when creating group snapshot for "
+ "{}/{}/{}: {}".format(pool_id, namespace, group_id, e))
+ self.finish(group_spec)
+
+ def handle_create_snapshot(self,
+ group_spec: GroupSpec,
+ comp: rados.Completion,
+ snap_id: Optional[str]) -> None:
+ pool_id, namespace, group_id = group_spec
+
+ self.log.debug(
+ "GroupCreateSnapshotRequests.handle_create_snapshot for "
+ "{}/{}/{}: r={}, snap_id={}".format(
+ pool_id, namespace, group_id, comp.get_return_value(),
+ snap_id))
+
+ if snap_id is None and comp.get_return_value() != -errno.ENOENT:
+ self.log.error(
+ "error when creating group snapshot for {}/{}/{}: {}".format(
+ pool_id, namespace, group_id, comp.get_return_value()))
+
+ self.finish(group_spec)
+
+ def finish(self, group_spec: GroupSpec) -> None:
+ pool_id, namespace, group_id = group_spec
+
+ self.log.debug("GroupCreateSnapshotRequests.finish: {}/{}/{}".format(
+ pool_id, namespace, group_id))
+
+ self.put_ioctx(group_spec)
+
+ with self.lock:
+ self.pending.remove(group_spec)
+ self.condition.notify()
+ if not self.queue:
+ return
+ group_spec = self.queue.pop(0)
+
+ self.open_group(group_spec)
+
+ def get_ioctx(self, group_spec: GroupSpec) -> rados.Ioctx:
+ pool_id, namespace, group_id = group_spec
+ nspec = (pool_id, namespace)
+
+ with self.lock:
+ ioctx, groups = self.ioctxs.get(nspec, (None, None))
+ if not ioctx:
+ ioctx = self.rados.open_ioctx2(int(pool_id))
+ ioctx.set_namespace(namespace)
+ groups = set()
+ self.ioctxs[nspec] = (ioctx, groups)
+ assert groups is not None
+ groups.add(group_spec)
+
+ return ioctx
+
+ def put_ioctx(self, group_spec: GroupSpec) -> None:
+ pool_id, namespace, group_id = group_spec
+ nspec = (pool_id, namespace)
+
+ with self.lock:
+ ioctx, groups = self.ioctxs[nspec]
+ groups.remove(group_spec)
+ if not groups:
+ del self.ioctxs[nspec]
+
+
class MirrorGroupSnapshotScheduleHandler:
MODULE_OPTION_NAME = "mirror_group_snapshot_schedule"
+ MODULE_OPTION_NAME_MAX_CONCURRENT_GROUP_SNAP_CREATE = "max_concurrent_group_snap_create"
SCHEDULE_OID = "rbd_mirror_group_snapshot_schedule"
REFRESH_DELAY_SECONDS = 60.0
self.module = module
self.log = module.log
self.last_refresh_groups = datetime(1970, 1, 1)
- # self.create_snapshot_requests = CreateSnapshotRequests(self)
+ self.create_snapshot_requests = GroupCreateSnapshotRequests(self)
self.stop_thread = False
self.thread = Thread(target=self.run)
if self.thread.is_alive():
self.log.debug("MirrorGroupSnapshotScheduleHandler: joining thread")
self.thread.join()
- # self.create_snapshot_requests.wait_for_pending()
+ self.create_snapshot_requests.wait_for_pending()
self.log.info("MirrorGroupSnapshotScheduleHandler: shut down")
def run(self) -> None:
self.condition.wait(min(wait_time, refresh_delay))
continue
pool_id, namespace, group_id = group_spec
- # self.create_snapshot_requests.add(pool_id, namespace, group_id)
- self.create_group_snapshot(pool_id, namespace, group_id)
+ self.create_snapshot_requests.add(pool_id, namespace, group_id)
with self.lock:
self.enqueue(datetime.now(), pool_id, namespace, group_id)
self.log.fatal("Fatal runtime error: {}\n{}".format(
ex, traceback.format_exc()))
- def create_group_snapshot(self, pool_id, namespace, group_id):
- try:
- with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
- ioctx.set_namespace(namespace)
- group_name = rbd.RBD().group_get_name(ioctx, group_id)
- if group_name is None:
- return
- group = rbd.Group(ioctx, group_name)
- mirror_info = group.mirror_group_get_info()
- if mirror_info['image_mode'] != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
- return
- if mirror_info['state'] != rbd.RBD_MIRROR_GROUP_ENABLED or \
- not mirror_info['primary']:
- return
- snap_id = group.mirror_group_create_snapshot()
- self.log.debug(
- "create_group_snapshot: {}/{}/{}: snap_id={}".format(
- ioctx.get_pool_name(), namespace, group_name,
- snap_id))
- except Exception as e:
- self.log.error(
- "exception when creating group snapshot for {}/{}/{}: {}".format(
- pool_id, namespace, group_id, e))
-
def init_schedule_queue(self) -> None:
# schedule_time => group_spec
self.queue: Dict[str, List[GroupSpec]] = {}