self.queue: Dict[str, List[ImageSpec]] = {}
# pool_id => {namespace => image_id}
self.images: Dict[str, Dict[str, Dict[str, str]]] = {}
+ self.schedules = Schedules(self)
self.refresh_images()
self.log.debug("MirrorSnapshotScheduleHandler: queue is initialized")
def load_schedules(self) -> None:
self.log.info("MirrorSnapshotScheduleHandler: load_schedules")
-
- schedules = Schedules(self)
- schedules.load(namespace_validator, image_validator)
- self.schedules = schedules
+ self.schedules.load(namespace_validator, image_validator)
def refresh_images(self) -> float:
elapsed = (datetime.now() - self.last_refresh_images).total_seconds()
self.level_specs: Dict[str, LevelSpec] = {}
self.schedules: Dict[str, Schedule] = {}
- def __len__(self) -> int:
- return len(self.schedules)
-
- def load(self,
- namespace_validator: Optional[Callable] = None,
- image_validator: Optional[Callable] = None) -> None:
-
- schedule_cfg = self.handler.module.get_module_option(
- self.handler.MODULE_OPTION_NAME, '')
-
# Previous versions incorrectly stored the global config in
# the localized module option. Check the config is here and fix it.
+ schedule_cfg = self.handler.module.get_module_option(
+ self.handler.MODULE_OPTION_NAME, '')
if not schedule_cfg:
schedule_cfg = self.handler.module.get_localized_module_option(
self.handler.MODULE_OPTION_NAME, '')
self.handler.module.set_localized_module_option(
self.handler.MODULE_OPTION_NAME, None)
+ def __len__(self) -> int:
+ return len(self.schedules)
+
+ def load(self,
+ namespace_validator: Optional[Callable] = None,
+ image_validator: Optional[Callable] = None) -> None:
+ self.level_specs = {}
+ self.schedules = {}
+
+ schedule_cfg = self.handler.module.get_module_option(
+ self.handler.MODULE_OPTION_NAME, '')
if schedule_cfg:
try:
level_spec = LevelSpec.make_global()
self.queue: Dict[str, List[Tuple[str, str]]] = {}
# pool_id => {namespace => pool_name}
self.pools: Dict[str, Dict[str, str]] = {}
+ self.schedules = Schedules(self)
self.refresh_pools()
self.log.debug("TrashPurgeScheduleHandler: queue is initialized")
def load_schedules(self) -> None:
self.log.info("TrashPurgeScheduleHandler: load_schedules")
-
- schedules = Schedules(self)
- schedules.load()
- self.schedules = schedules
+ self.schedules.load()
def refresh_pools(self) -> float:
elapsed = (datetime.now() - self.last_refresh_pools).total_seconds()