from .common import get_rbd_pools
from .schedule import LevelSpec, Interval, StartTime, Schedule, Schedules
-MIRRORING_OID = "rbd_mirroring"
-
def namespace_validator(ioctx: rados.Ioctx) -> None:
mode = rbd.RBD().mirror_mode_get(ioctx)
if mode != rbd.RBD_MIRROR_MODE_IMAGE:
if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
raise rbd.InvalidArgument("Invalid mirror image mode")
-class Watchers:
-
- lock = Lock()
-
- def __init__(self, handler: Any) -> None:
- self.rados = handler.module.rados
- self.log = handler.log
- self.watchers: Dict[Tuple[str, str], rados.Watch] = {}
- self.updated: Dict[int, bool] = {}
- self.error: Dict[int, str] = {}
- self.epoch: Dict[int, int] = {}
-
- def __del__(self) -> None:
- self.unregister_all()
-
- def _clean_watcher(self, pool_id: str, namespace: str, watch_id: int) -> None:
- assert self.lock.locked()
-
- del self.watchers[pool_id, namespace]
- self.updated.pop(watch_id, None)
- self.error.pop(watch_id, None)
- self.epoch.pop(watch_id, None)
-
- def check(self, pool_id: str, namespace: str, epoch: int) -> bool:
- error = None
- with self.lock:
- watch = self.watchers.get((pool_id, namespace))
- if watch is not None:
- error = self.error.get(watch.get_id())
- if not error:
- updated = self.updated[watch.get_id()]
- self.updated[watch.get_id()] = False
- self.epoch[watch.get_id()] = epoch
- return updated
- if error:
- self.unregister(pool_id, namespace)
-
- if self.register(pool_id, namespace):
- return self.check(pool_id, namespace, epoch)
- else:
- return True
-
- def register(self, pool_id: str, namespace: str) -> bool:
-
- def callback(notify_id: str, notifier_id: str, watch_id: int, data: str) -> None:
- self.log.debug("watcher {}: got notify {} from {}".format(
- watch_id, notify_id, notifier_id))
-
- with self.lock:
- self.updated[watch_id] = True
-
- def error_callback(watch_id: int, error: str) -> None:
- self.log.debug("watcher {}: got errror {}".format(
- watch_id, error))
-
- with self.lock:
- self.error[watch_id] = error
-
- try:
- ioctx = self.rados.open_ioctx2(int(pool_id))
- ioctx.set_namespace(namespace)
- watch = ioctx.watch(MIRRORING_OID, callback, error_callback)
- except rados.ObjectNotFound:
- self.log.debug(
- "{}/{}/{} watcher not registered: object not found".format(
- pool_id, namespace, MIRRORING_OID))
- return False
-
- self.log.debug("{}/{}/{} watcher {} registered".format(
- pool_id, namespace, MIRRORING_OID, watch.get_id()))
-
- with self.lock:
- self.watchers[pool_id, namespace] = watch
- self.updated[watch.get_id()] = True
- return True
-
- def unregister(self, pool_id: str, namespace: str) -> None:
-
- with self.lock:
- watch = self.watchers[pool_id, namespace]
-
- watch_id = watch.get_id()
-
- try:
- watch.close()
-
- self.log.debug("{}/{}/{} watcher {} unregistered".format(
- pool_id, namespace, MIRRORING_OID, watch_id))
-
- except rados.Error as e:
- self.log.debug(
- "exception when unregistering {}/{} watcher: {}".format(
- pool_id, namespace, e))
-
- with self.lock:
- self._clean_watcher(pool_id, namespace, watch_id)
-
- def unregister_all(self) -> None:
- with self.lock:
- watchers = list(self.watchers)
-
- for pool_id, namespace in watchers:
- self.unregister(pool_id, namespace)
-
- def unregister_stale(self, current_epoch: int) -> None:
- with self.lock:
- watchers = list(self.watchers)
-
- for pool_id, namespace in watchers:
- with self.lock:
- watch = self.watchers[pool_id, namespace]
- if self.epoch.get(watch.get_id()) == current_epoch:
- continue
-
- self.log.debug("{}/{}/{} watcher {} stale".format(
- pool_id, namespace, MIRRORING_OID, watch.get_id()))
-
- self.unregister(pool_id, namespace)
-
class ImageSpec(NamedTuple):
pool_id: str
self.thread.start()
def _cleanup(self) -> None:
- self.watchers.unregister_all()
self.create_snapshot_requests.wait_for_pending()
def run(self) -> None:
self.queue: Dict[str, List[ImageSpec]] = {}
# pool_id => {namespace => image_id}
self.images: Dict[str, Dict[str, Dict[str, str]]] = {}
- self.watchers = Watchers(self)
self.refresh_images()
self.log.debug("MirrorSnapshotScheduleHandler: queue is initialized")
self.load_schedules()
if not self.schedules:
self.log.debug("MirrorSnapshotScheduleHandler: no schedules")
- self.watchers.unregister_all()
self.images = {}
self.queue = {}
self.last_refresh_images = datetime.now()
return self.REFRESH_DELAY_SECONDS
- epoch = int(datetime.now().strftime('%s'))
images: Dict[str, Dict[str, Dict[str, str]]] = {}
for pool_id, pool_name in get_rbd_pools(self.module).items():
LevelSpec.from_pool_spec(pool_id, pool_name)):
continue
with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
- self.load_pool_images(ioctx, epoch, images)
+ self.load_pool_images(ioctx, images)
with self.lock:
self.refresh_queue(images)
self.images = images
- self.watchers.unregister_stale(epoch)
self.last_refresh_images = datetime.now()
return self.REFRESH_DELAY_SECONDS
def load_pool_images(self,
ioctx: rados.Ioctx,
- epoch: int,
images: Dict[str, Dict[str, Dict[str, str]]]) -> None:
pool_id = str(ioctx.get_pool_id())
pool_name = ioctx.get_pool_name()
pool_name, namespace))
images[pool_id][namespace] = {}
ioctx.set_namespace(namespace)
- updated = self.watchers.check(pool_id, namespace, epoch)
- if not updated:
- self.log.debug("load_pool_images: {}/{} not updated".format(
- pool_name, namespace))
- with self.lock:
- images[pool_id][namespace] = \
- self.images[pool_id][namespace]
- continue
mirror_images = dict(rbd.RBD().mirror_image_info_list(
ioctx, rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT))
if not mirror_images: