from ..security import Scope
from ..services.ceph_service import CephService
from ..services.exception import handle_send_command_error
-from ..services.rbd import RbdConfiguration
+from ..services.rbd import RbdConfiguration, RbdMirroringService
from ..tools import TaskManager, str_to_bool
from . import APIDoc, APIRouter, Endpoint, EndpointDoc, ReadPermission, \
RESTController, Task, UIRouter
pool = [p for p in pools if p['pool_name'] == pool_name]
if not pool:
raise cherrypy.NotFound('No such pool')
+
+ schedule_info = RbdMirroringService.get_snapshot_schedule_info()
+ if schedule_info:
+ filtered = [
+ info for info in schedule_info
+ if info["name"].split("/", 1)[0] == pool_name
+ ]
+ pool[0]['schedule_info'] = filtered[0] if filtered else {}
return pool[0]
def get(self, pool_name: str, attrs: Optional[str] = None, stats: bool = False) -> dict:
from .ceph_service import CephService
try:
- from typing import List, Optional
+ from typing import Dict, List, Optional
except ImportError:
pass # For typing only
stat['mirror_mode'] = 'journal'
elif mirror_mode == rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
stat['mirror_mode'] = 'snapshot'
- schedule_status = json.loads(_rbd_support_remote(
- 'mirror_snapshot_schedule_status')[1])
- for scheduled_image in schedule_status['scheduled_images']:
- if scheduled_image['image'] == get_image_spec(pool_name, namespace, image_name):
- stat['schedule_info'] = scheduled_image
+ schedule_info = RbdMirroringService.get_snapshot_schedule_info(
+ get_image_spec(pool_name, namespace, image_name)
+ )
+ if schedule_info:
+ stat['schedule_info'] = schedule_info[0]
stat['name'] = image_name
def snapshot_schedule_remove(cls, image_spec: str):
_rbd_support_remote('mirror_snapshot_schedule_remove', image_spec)
+ @classmethod
+ def snapshot_schedule_list(cls, image_spec: str = ''):
+ return _rbd_support_remote('mirror_snapshot_schedule_list', image_spec)
+
+ @classmethod
+ def snapshot_schedule_status(cls, image_spec: str = ''):
+ return _rbd_support_remote('mirror_snapshot_schedule_status', image_spec)
+
+ @classmethod
+ def get_snapshot_schedule_info(cls, image_spec: str = ''):
+ """
+ Retrieve snapshot schedule information by merging schedule list and status.
+
+ Args:
+ image_spec (str, optional): Specification of an RBD image. If empty,
+ retrieves all schedule information.
+ Format: "<pool_name>/<namespace_name>/<image_name>".
+
+ Returns:
+ Optional[List[Dict[str, Any]]]: A list of merged schedule information
+ dictionaries if found, otherwise None.
+ """
+ schedule_info: List[Dict] = []
+
+ # schedule list and status provide the schedule interval
+ # and schedule timestamp respectively.
+ schedule_list_raw = cls.snapshot_schedule_list(image_spec)
+ schedule_status_raw = cls.snapshot_schedule_status(image_spec)
+
+ try:
+ schedule_list = json.loads(
+ schedule_list_raw[1]) if schedule_list_raw and schedule_list_raw[1] else {}
+ schedule_status = json.loads(
+ schedule_status_raw[1]) if schedule_status_raw and schedule_status_raw[1] else {}
+ except (json.JSONDecodeError, TypeError):
+ return None
+
+ if not schedule_list or not schedule_status:
+ return None
+
+ scheduled_images = schedule_status.get("scheduled_images", [])
+
+ for _, schedule in schedule_list.items():
+ name = schedule.get("name")
+ if not name:
+ continue
+
+ # find status entry for this schedule
+ # by matching with the image name
+ image = next((
+ sched_image for sched_image in scheduled_images
+ if sched_image.get("image") == name), None)
+ if not image:
+ continue
+
+ # eventually we are merging both the list and status entries
+ # all the needed info are fetched above and here we are just mapping
+ # it to the dictionary so that in one function we get
+ # the schedule related information.
+ merged = {
+ "name": name,
+ "schedule_interval": schedule.get("schedule", []),
+ "schedule_time": image.get("schedule_time")
+ }
+ schedule_info.append(merged)
+
+ return schedule_info if schedule_info else None
+
class RbdImageMetadataService(object):
def __init__(self, image):