from datetime import datetime
from threading import Condition, Lock, Thread
-from typing import Any, Dict, List, Optional, Sequence, Set, Tuple
+from typing import Any, Dict, List, NamedTuple, Optional, Sequence, Set, Tuple
from .common import get_rbd_pools
from .schedule import LevelSpec, Interval, StartTime, Schedule, Schedules
self.unregister(pool_id, namespace)
-ImageSpecT = Tuple[str, str, str]
+class ImageSpec(NamedTuple):
+ pool_id: str
+ namespace: str
+ image_id: str
+
class CreateSnapshotRequests:
self.handler = handler
self.rados = handler.module.rados
self.log = handler.log
- self.pending: Set[ImageSpecT] = set()
- self.queue: List[ImageSpecT] = []
- self.ioctxs: Dict[Tuple[str, str], Tuple[rados.Ioctx, Set[ImageSpecT]]] = {}
+ self.pending: Set[ImageSpec] = set()
+ self.queue: List[ImageSpec] = []
+ self.ioctxs: Dict[Tuple[str, str], Tuple[rados.Ioctx, Set[ImageSpec]]] = {}
def __del__(self) -> None:
self.wait_for_pending()
self.condition.wait()
def add(self, pool_id: str, namespace: str, image_id: str) -> None:
- image_spec = (pool_id, namespace, image_id)
+ image_spec = ImageSpec(pool_id, namespace, image_id)
self.log.debug("CreateSnapshotRequests.add: {}/{}/{}".format(
pool_id, namespace, image_id))
self.open_image(image_spec)
- def open_image(self, image_spec: ImageSpecT) -> None:
+ def open_image(self, image_spec: ImageSpec) -> None:
pool_id, namespace, image_id = image_spec
self.log.debug("CreateSnapshotRequests.open_image: {}/{}/{}".format(
self.finish(image_spec)
def handle_open_image(self,
- image_spec: ImageSpecT,
+ image_spec: ImageSpec,
comp: rados.Completion,
image: rbd.Image) -> None:
pool_id, namespace, image_id = image_spec
self.get_mirror_mode(image_spec, image)
- def get_mirror_mode(self, image_spec: ImageSpecT, image: rbd.Image) -> None:
+ def get_mirror_mode(self, image_spec: ImageSpec, image: rbd.Image) -> None:
pool_id, namespace, image_id = image_spec
self.log.debug("CreateSnapshotRequests.get_mirror_mode: {}/{}/{}".format(
self.close_image(image_spec, image)
def handle_get_mirror_mode(self,
- image_spec: ImageSpecT,
+ image_spec: ImageSpec,
image: rbd.Image,
comp: rados.Completion,
mode: str) -> None:
self.get_mirror_info(image_spec, image)
- def get_mirror_info(self, image_spec: ImageSpecT, image: rbd.Image) -> None:
+ def get_mirror_info(self, image_spec: ImageSpec, image: rbd.Image) -> None:
pool_id, namespace, image_id = image_spec
self.log.debug("CreateSnapshotRequests.get_mirror_info: {}/{}/{}".format(
self.close_image(image_spec, image)
def handle_get_mirror_info(self,
- image_spec: ImageSpecT,
+ image_spec: ImageSpec,
image: rbd.Image,
comp: rados.Completion,
info: str) -> None:
self.create_snapshot(image_spec, image)
- def create_snapshot(self, image_spec: ImageSpecT, image: rbd.Image) -> None:
+ def create_snapshot(self, image_spec: ImageSpec, image: rbd.Image) -> None:
pool_id, namespace, image_id = image_spec
self.log.debug(
def handle_create_snapshot(self,
- image_spec: ImageSpecT,
+ image_spec: ImageSpec,
image: rbd.Image,
comp: rados.Completion,
snap_id: str) -> None:
self.close_image(image_spec, image)
- def close_image(self, image_spec: ImageSpecT, image: rbd.Image) -> None:
+ def close_image(self, image_spec: ImageSpec, image: rbd.Image) -> None:
pool_id, namespace, image_id = image_spec
self.log.debug(
self.finish(image_spec)
def handle_close_image(self,
- image_spec: ImageSpecT,
+ image_spec: ImageSpec,
comp: rados.Completion) -> None:
pool_id, namespace, image_id = image_spec
self.finish(image_spec)
- def finish(self, image_spec: ImageSpecT) -> None:
+ def finish(self, image_spec: ImageSpec) -> None:
pool_id, namespace, image_id = image_spec
self.log.debug("CreateSnapshotRequests.finish: {}/{}/{}".format(
self.open_image(image_spec)
- def get_ioctx(self, image_spec: ImageSpecT) -> rados.Ioctx:
+ def get_ioctx(self, image_spec: ImageSpec) -> rados.Ioctx:
pool_id, namespace, image_id = image_spec
nspec = (pool_id, namespace)
return ioctx
- def put_ioctx(self, image_spec: ImageSpecT) -> None:
+ def put_ioctx(self, image_spec: ImageSpec) -> None:
pool_id, namespace, image_id = image_spec
nspec = (pool_id, namespace)
def init_schedule_queue(self) -> None:
# schedule_time => image_spec
- self.queue: Dict[str, List[ImageSpecT]] = {}
+ self.queue: Dict[str, List[ImageSpec]] = {}
# pool_id => {namespace => image_id}
self.images: Dict[str, Dict[str, Dict[str, str]]] = {}
self.watchers = Watchers(self)
self.queue[schedule_time] = []
self.log.debug("schedule image {}/{}/{} at {}".format(
pool_id, namespace, image_id, schedule_time))
- image_spec = (pool_id, namespace, image_id)
+ image_spec = ImageSpec(pool_id, namespace, image_id)
if image_spec not in self.queue[schedule_time]:
- self.queue[schedule_time].append((pool_id, namespace, image_id))
+ self.queue[schedule_time].append(image_spec)
- def dequeue(self) -> Tuple[Optional[Tuple[str, str, str]], float]:
+ def dequeue(self) -> Tuple[Optional[ImageSpec], float]:
if not self.queue:
return None, 1000.0
def remove_from_queue(self, pool_id: str, namespace: str, image_id: str) -> None:
empty_slots = []
+ image_spec = ImageSpec(pool_id, namespace, image_id)
for schedule_time, images in self.queue.items():
- if (pool_id, namespace, image_id) in images:
- images.remove((pool_id, namespace, image_id))
+ if image_spec in images:
+ images.remove(image_spec)
if not images:
empty_slots.append(schedule_time)
for schedule_time in empty_slots: