import re
import traceback
-from datetime import datetime, timedelta, time
+from datetime import datetime
from threading import Condition, Lock, Thread
from .common import get_rbd_pools
+from .schedule import LevelSpec, Interval, StartTime, Schedule, Schedules
-SCHEDULE_OID = "rbd_mirror_snapshot_schedule"
-
-SCHEDULE_INTERVAL = "interval"
-SCHEDULE_START_TIME = "start_time"
-
-
-class Interval:
-
- def __init__(self, minutes):
- self.minutes = minutes
-
- def __eq__(self, interval):
- return self.minutes == interval.minutes
-
- def __hash__(self):
- return hash(self.minutes)
-
- def to_string(self):
- if self.minutes % (60 * 24) == 0:
- interval = int(self.minutes / (60 * 24))
- units = 'd'
- elif self.minutes % 60 == 0:
- interval = int(self.minutes / 60)
- units = 'h'
- else:
- interval = int(self.minutes)
- units = 'm'
-
- return "{}{}".format(interval, units)
-
- @classmethod
- def from_string(cls, interval):
- match = re.match(r'^(\d+)(d|h|m)?$', interval)
- if not match:
- raise ValueError("Invalid interval ({})".format(interval))
-
- minutes = int(match.group(1))
- if match.group(2) == 'd':
- minutes *= 60 * 24
- elif match.group(2) == 'h':
- minutes *= 60
-
- return Interval(minutes)
-
-
-class StartTime:
-
- def __init__(self, hour, minute, tzinfo):
- self.time = time(hour, minute, tzinfo=tzinfo)
- self.minutes = self.time.hour * 60 + self.time.minute
- if self.time.tzinfo:
- self.minutes += int(self.time.utcoffset().seconds / 60)
-
- def __eq__(self, start_time):
- return self.minutes == start_time.minutes
-
- def __hash__(self):
- return hash(self.minutes)
-
- def to_string(self):
- return self.time.isoformat()
-
- @classmethod
- def from_string(cls, start_time):
- if not start_time:
- return None
-
- try:
- t = time.fromisoformat(start_time)
- except ValueError as e:
- raise ValueError("Invalid start time {}: {}".format(start_time, e))
-
- return StartTime(t.hour, t.minute, tzinfo=t.tzinfo)
-
-class LevelSpec:
-
- def __init__(self, name, id, pool_id, namespace, image_id):
- self.name = name
- self.id = id
- self.pool_id = pool_id
- self.namespace = namespace
- self.image_id = image_id
-
- def __eq__(self, level_spec):
- return self.id == level_spec.id
-
- def is_child_of(self, level_spec):
- if level_spec.is_global():
- return not self.is_global()
- if level_spec.pool_id != self.pool_id:
- return False
- if level_spec.namespace is None:
- return self.namespace is not None
- if level_spec.namespace != self.namespace:
- return False
- if level_spec.image_id is None:
- return self.image_id is not None
- return False
-
- def is_global(self):
- return self.pool_id is None
-
- def get_pool_id(self):
- return self.pool_id
-
- def matches(self, pool_id, namespace, image_id):
- if self.pool_id and self.pool_id != pool_id:
- return False
- if self.namespace and self.namespace != namespace:
- return False
- if self.image_id and self.image_id != image_id:
- return False
- return True
-
- @classmethod
- def from_name(cls, handler, name):
- # parse names like:
- # '', 'rbd/', 'rbd/ns/', 'rbd//image', 'rbd/image', 'rbd/ns/image'
- match = re.match(r'^(?:([^/]+)/(?:(?:([^/]*)/|)(?:([^/@]+))?)?)?$',
- name)
- if not match:
- raise ValueError("failed to parse {}".format(name))
-
- id = ""
- pool_id = None
- namespace = None
- image_name = None
- image_id = None
- if match.group(1):
- pool_name = match.group(1)
- try:
- pool_id = handler.module.rados.pool_lookup(pool_name)
- if pool_id is None:
- raise ValueError("pool {} does not exist".format(pool_name))
- pool_id = str(pool_id)
- id += pool_id
- if match.group(2) is not None or match.group(3):
- id += "/"
- with handler.module.rados.open_ioctx(pool_name) as ioctx:
- namespace = match.group(2) or ""
- if namespace:
- namespaces = rbd.RBD().namespace_list(ioctx)
- if namespace not in namespaces:
- raise ValueError(
- "namespace {} does not exist".format(
- namespace))
- id += namespace
- ioctx.set_namespace(namespace)
- pool_mode = rbd.RBD().mirror_mode_get(ioctx)
- if pool_mode != rbd.RBD_MIRROR_MODE_IMAGE:
- raise ValueError(
- "namespace {} is not in mirror image mode".format(
- namespace))
- if match.group(3):
- image_name = match.group(3)
- try:
- with rbd.Image(ioctx, image_name,
- read_only=True) as image:
- image_id = image.id()
- id += "/" + image_id
- mode = image.mirror_image_get_mode()
- if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
- raise rbd.InvalidArgument(
- "Invalid mirror mode")
- except rbd.ImageNotFound:
- raise ValueError("image {} does not exist".format(
- image_name))
- except rbd.InvalidArgument:
- raise ValueError(
- "image {} is not in snapshot mirror mode".format(
- image_name))
-
- except rados.ObjectNotFound:
- raise ValueError("pool {} does not exist".format(pool_name))
-
- # normalize possible input name like 'rbd//image'
- if not namespace and image_name:
- name = "{}/{}".format(pool_name, image_name)
-
- return LevelSpec(name, id, pool_id, namespace, image_id)
-
- @classmethod
- def from_id(cls, handler, id):
- # parse ids like:
- # '', '123', '123/', '123/ns', '123//image_id', '123/ns/image_id'
- match = re.match(r'^(?:(\d+)(?:/([^/]*)(?:/([^/@]+))?)?)?$', id)
- if not match:
- raise ValueError("failed to parse: {}".format(id))
-
- name = ""
- pool_id = None
- namespace = None
- image_id = None
- if match.group(1):
- pool_id = match.group(1)
- try:
- pool_name = handler.module.rados.pool_reverse_lookup(
- int(pool_id))
- if pool_name is None:
- raise ValueError("pool {} does not exist".format(pool_name))
- name += pool_name + "/"
- if match.group(2) is not None or match.group(3):
- with handler.module.rados.open_ioctx(pool_name) as ioctx:
- namespace = match.group(2) or ""
- if namespace:
- namespaces = rbd.RBD().namespace_list(ioctx)
- if namespace not in namespaces:
- raise ValueError(
- "namespace {} does not exist".format(
- namespace))
- name += namespace + "/"
- ioctx.set_namespace(namespace)
- pool_mode = rbd.RBD().mirror_mode_get(ioctx)
- if pool_mode != rbd.RBD_MIRROR_MODE_IMAGE:
- raise ValueError(
- "namespace {} is not in mirror image mode".format(
- namespace))
- elif not match.group(3):
- name += "/"
- if match.group(3):
- image_id = match.group(3)
- try:
- with rbd.Image(ioctx, image_id=image_id,
- read_only=True) as image:
- image_name = image.get_name()
- name += image_name
- mode = image.mirror_image_get_mode()
- if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
- raise rbd.InvalidArgument(
- "Invalid mirror mode")
- except rbd.ImageNotFound:
- raise ValueError("image {} does not exist".format(
- image_id))
- except rbd.InvalidArgument:
- raise ValueError(
- "image {} is not in snapshot mirror mode".format(
- image_id))
-
- except rados.ObjectNotFound:
- raise ValueError("pool {} does not exist".format(pool_id))
-
- return LevelSpec(name, id, pool_id, namespace, image_id)
-
-
-class Schedule:
-
- def __init__(self, name):
- self.name = name
- self.items = set()
-
- def __len__(self):
- return len(self.items)
-
- def add(self, interval, start_time=None):
- self.items.add((interval, start_time))
-
- def remove(self, interval, start_time=None):
- self.items.discard((interval, start_time))
-
- def to_list(self):
- return [{SCHEDULE_INTERVAL: i[0].to_string(),
- SCHEDULE_START_TIME: i[1] and i[1].to_string() or None}
- for i in self.items]
-
- def to_json(self):
- return json.dumps(self.to_list(), indent=4, sort_keys=True)
-
- @classmethod
- def from_json(cls, name, val):
- try:
- items = json.loads(val)
- schedule = Schedule(name)
- for item in items:
- interval = Interval.from_string(item[SCHEDULE_INTERVAL])
- start_time = item[SCHEDULE_START_TIME] and \
- StartTime.from_string(item[SCHEDULE_START_TIME]) or None
- schedule.add(interval, start_time)
- return schedule
- except json.JSONDecodeError as e:
- raise ValueError("Invalid JSON ({})".format(str(e)))
- except KeyError as e:
- raise ValueError(
- "Invalid schedule format (missing key {})".format(str(e)))
- except TypeError as e:
- raise ValueError("Invalid schedule format ({})".format(str(e)))
+def namespace_validator(ioctx):
+ mode = rbd.RBD().mirror_mode_get(ioctx)
+ if mode != rbd.RBD_MIRROR_MODE_IMAGE:
+ raise ValueError("namespace {} is not in mirror image mode".format(
+ ioctx.get_namespace()))
+def image_validator(image):
+ mode = image.mirror_image_get_mode()
+ if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
+ raise rbd.InvalidArgument("Invalid mirror image mode")
class MirrorSnapshotScheduleHandler:
+ MODULE_OPTION_NAME = "mirror_snapshot_schedule"
+ SCHEDULE_OID = "rbd_mirror_snapshot_schedule"
+
lock = Lock()
condition = Condition(lock)
thread = None
pool_id, namespace, image_id, e))
- @classmethod
- def format_image_spec(cls, image_spec):
- image = image_spec[2]
- if image_spec[1]:
- image = "{}/{}".format(image_spec[1], image)
- if image_spec[0]:
- image = "{}/{}".format(image_spec[0], image)
- return image
-
def init_schedule_queue(self):
self.queue = {}
self.images = {}
def load_schedules(self):
self.log.info("MirrorSnapshotScheduleHandler: load_schedules")
- schedules = {}
- schedule_cfg = self.module.get_localized_module_option(
- 'mirror_snapshot_schedule', '')
- if schedule_cfg:
- try:
- schedule = Schedule.from_json('', schedule_cfg)
- schedules[''] = schedule
- except ValueError:
- self.log.error("Failed to decode configured schedule {}".format(
- schedule_cfg))
-
- for pool_id, pool_name in get_rbd_pools(self.module).items():
- with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
- self.load_pool_schedules(ioctx, schedules)
-
+ schedules = Schedules(self)
+ schedules.load(namespace_validator, image_validator)
with self.lock:
self.schedules = schedules
- def load_pool_schedules(self, ioctx, schedules):
- pool_id = ioctx.get_pool_id()
- pool_name = ioctx.get_pool_name()
- stale_keys = ()
- start_after = ''
- try:
- while True:
- with rados.ReadOpCtx() as read_op:
- self.log.info("load_schedules: {}, start_after={}".format(
- pool_name, start_after))
- it, ret = ioctx.get_omap_vals(read_op, start_after, "", 128)
- ioctx.operate_read_op(read_op, SCHEDULE_OID)
-
- it = list(it)
- for k, v in it:
- start_after = k
- v = v.decode()
- self.log.info("load_schedule: {} {}".format(k, v))
-
- try:
- try:
- level_spec = LevelSpec.from_id(self, k)
- except ValueError:
- self.log.debug(
- "Stail schedule key {} in pool".format(
- k, pool_name))
- stale_keys += (k,)
- continue
-
- schedule = Schedule.from_json(level_spec.name, v)
- schedules[k] = schedule
- except ValueError:
- self.log.error(
- "Failed to decode schedule: pool={}, {} {}".format(
- pool_name, k, v))
-
- if not it:
- break
-
- except StopIteration:
- pass
- except rados.ObjectNotFound:
- # rbd_mirror_snapshot_schedule DNE
- pass
-
- if stale_keys:
- with rados.WriteOpCtx() as write_op:
- ioctx.remove_omap_keys(write_op, stale_keys)
- ioctx.operate_write_op(write_op, SCHEDULE_OID)
-
def refresh_images(self):
if (datetime.now() - self.last_refresh_images).seconds < 60:
return
pool_name, e))
pass
- def find_schedule(self, pool_id, namespace, image_id):
- levels = [None, pool_id, namespace, image_id]
- while levels:
- level_spec_id = "/".join(levels[1:])
- if level_spec_id in self.schedules:
- return self.schedules[level_spec_id]
- del levels[-1]
- return None
-
- def calc_schedule_time(self, schedule, now):
- schedule_time = None
- for item in schedule.items:
- period = timedelta(minutes=item[0].minutes)
- start_time = datetime(1970, 1, 1)
- if item[1]:
- start_time += timedelta(minutes=item[1].minutes)
- time = start_time + \
- (int((now - start_time) / period) + 1) * period
- if schedule_time is None or time < schedule_time:
- schedule_time = time
- return datetime.strftime(schedule_time, "%Y-%m-%d %H:%M:00")
-
def rebuild_queue(self):
with self.lock:
now = datetime.now()
def enqueue(self, now, pool_id, namespace, image_id):
- schedule = self.find_schedule(pool_id, namespace, image_id)
+ schedule = self.schedules.find(pool_id, namespace, image_id)
if not schedule:
return
- schedule_time = self.calc_schedule_time(schedule, now)
+ schedule_time = schedule.next_run(now)
if schedule_time not in self.queue:
self.queue[schedule_time] = []
self.log.debug("schedule image {}/{}/{} at {}".format(
return None, wait_time.total_seconds()
images = self.queue[schedule_time]
- image = images[0]
image = images.pop(0)
if not images:
del self.queue[schedule_time]
for schedule_time in empty_slots:
del self.queue[schedule_time]
- def save_schedule(self, level_spec, schedule):
- if level_spec.is_global():
- schedule_cfg = schedule and schedule.to_json() or ''
- self.module.set_localized_module_option('mirror_snapshot_schedule',
- schedule_cfg)
- return
-
- pool_id = level_spec.get_pool_id()
- with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
- with rados.WriteOpCtx() as write_op:
- if schedule:
- ioctx.set_omap(write_op, (level_spec.id, ),
- (schedule.to_json(), ))
- else:
- ioctx.remove_omap_keys(write_op, (level_spec.id, ))
- ioctx.operate_write_op(write_op, SCHEDULE_OID)
-
-
- def add_schedule(self, level_spec_name, interval, start_time):
+ def add_schedule(self, level_spec, interval, start_time):
self.log.debug(
"add_schedule: level_spec={}, interval={}, start_time={}".format(
- level_spec_name, interval, start_time))
- try:
- level_spec = LevelSpec.from_name(self, level_spec_name)
- except ValueError as e:
- return -errno.EINVAL, '', "Invalid level spec {}: {}".format(
- level_spec_name, e)
+ level_spec.name, interval, start_time))
with self.lock:
- schedule = self.schedules.get(level_spec.id, Schedule(level_spec_name))
- schedule.add(Interval.from_string(interval),
- StartTime.from_string(start_time))
- self.save_schedule(level_spec, schedule)
- self.schedules[level_spec.id] = schedule
+ self.schedules.add(level_spec, interval, start_time)
# TODO: optimize to rebuild only affected part of the queue
self.rebuild_queue()
return 0, "", ""
- def remove_schedule(self, level_spec_name, interval, start_time):
- try:
- level_spec = LevelSpec.from_name(self, level_spec_name)
- except ValueError as e:
- return -errno.EINVAL, '', "Invalid level spec {}: {}".format(
- level_spec_name, e)
+ def remove_schedule(self, level_spec, interval, start_time):
+ self.log.debug(
+ "remove_schedule: level_spec={}, interval={}, start_time={}".format(
+ level_spec.name, interval, start_time))
with self.lock:
- schedule = self.schedules.pop(level_spec.id, None)
- if not schedule:
- return -errno.ENOENT, '', "No schedule for {}".format(level_spec_name)
-
- if interval is None:
- schedule = None
- else:
- schedule.remove(Interval.from_string(interval),
- StartTime.from_string(start_time))
- if schedule:
- self.schedules[level_spec.id] = schedule
- self.save_schedule(level_spec, schedule)
+ self.schedules.remove(level_spec, interval, start_time)
# TODO: optimize to rebuild only affected part of the queue
self.rebuild_queue()
return 0, "", ""
- def list(self, level_spec_name):
- self.log.debug("list: level_spec={}".format(level_spec_name))
-
- if not level_spec_name:
- level_spec_name = ""
-
- try:
- level_spec = LevelSpec.from_name(self, level_spec_name)
- except ValueError as e:
- return -errno.EINVAL, '', "Invalid level spec {}: {}".format(
- level_spec_name, e)
+ def list(self, level_spec):
+ self.log.debug("list: level_spec={}".format(level_spec.name))
- result = {}
with self.lock:
- parent = LevelSpec.from_id(self, "")
- if not level_spec.is_global():
- for level_spec_id in self.schedules:
- ls = LevelSpec.from_id(self, level_spec_id)
- if ls == level_spec:
- parent = ls
- break
- if level_spec.is_child_of(ls) and ls.is_child_of(parent):
- parent = ls
- for level_spec_id, schedule in self.schedules.items():
- ls = LevelSpec.from_id(self, level_spec_id)
- if ls == parent or ls.is_child_of(parent):
- result[level_spec_id] = {
- 'name' : schedule.name,
- 'schedule' : schedule.to_list(),
- }
- return 0, json.dumps(result, indent=4, sort_keys=True), ""
+ result = self.schedules.to_list(level_spec)
- def status(self, level_spec_name):
- self.log.debug("status: level_spec={}".format(level_spec_name))
+ return 0, json.dumps(result, indent=4, sort_keys=True), ""
- if not level_spec_name:
- level_spec_name = ""
-
- try:
- level_spec = LevelSpec.from_name(self, level_spec_name)
- except ValueError as e:
- return -errno.EINVAL, '', "Invalid level spec {}: {}".format(
- level_spec_name, e)
+ def status(self, level_spec):
+ self.log.debug("status: level_spec={}".format(level_spec.name))
scheduled_images = []
with self.lock:
indent=4, sort_keys=True), ""
def handle_command(self, inbuf, prefix, cmd):
+ level_spec_name = cmd.get('level_spec', "")
+
+ try:
+ level_spec = LevelSpec.from_name(self, level_spec_name,
+ namespace_validator,
+ image_validator)
+ except ValueError as e:
+ return -errno.EINVAL, '', "Invalid level spec {}: {}".format(
+ level_spec_name, e)
+
if prefix == 'add':
- return self.add_schedule(cmd['level_spec'], cmd['interval'],
+ return self.add_schedule(level_spec, cmd['interval'],
cmd.get('start_time'))
elif prefix == 'remove':
- return self.remove_schedule(cmd['level_spec'], cmd.get('interval'),
+ return self.remove_schedule(level_spec, cmd.get('interval'),
cmd.get('start_time'))
elif prefix == 'list':
- return self.list(cmd.get('level_spec', None))
+ return self.list(level_spec)
elif prefix == 'status':
- return self.status(cmd.get('level_spec', None))
+ return self.status(level_spec)
raise NotImplementedError(cmd['prefix'])
--- /dev/null
+import json
+import rados
+import rbd
+import re
+
+from datetime import datetime, timedelta, time
+
+from .common import get_rbd_pools
+
+SCHEDULE_INTERVAL = "interval"
+SCHEDULE_START_TIME = "start_time"
+
+
+class LevelSpec:
+
+ def __init__(self, name, id, pool_id, namespace, image_id=None):
+ self.name = name
+ self.id = id
+ self.pool_id = pool_id
+ self.namespace = namespace
+ self.image_id = image_id
+
+ def __eq__(self, level_spec):
+ return self.id == level_spec.id
+
+ def is_child_of(self, level_spec):
+ if level_spec.is_global():
+ return not self.is_global()
+ if level_spec.pool_id != self.pool_id:
+ return False
+ if level_spec.namespace is None:
+ return self.namespace is not None
+ if level_spec.namespace != self.namespace:
+ return False
+ if level_spec.image_id is None:
+ return self.image_id is not None
+ return False
+
+ def is_global(self):
+ return self.pool_id is None
+
+ def get_pool_id(self):
+ return self.pool_id
+
+ def matches(self, pool_id, namespace, image_id=None):
+ if self.pool_id and self.pool_id != pool_id:
+ return False
+ if self.namespace and self.namespace != namespace:
+ return False
+ if self.image_id and self.image_id != image_id:
+ return False
+ return True
+
+ @classmethod
+ def make_global(cls):
+ return LevelSpec("", "", None, None, None)
+
+ @classmethod
+ def from_name(cls, handler, name, namespace_validator=None,
+ image_validator=None, allow_image_level=True):
+ # parse names like:
+ # '', 'rbd/', 'rbd/ns/', 'rbd//image', 'rbd/image', 'rbd/ns/image'
+ match = re.match(r'^(?:([^/]+)/(?:(?:([^/]*)/|)(?:([^/@]+))?)?)?$',
+ name)
+ if not match:
+ raise ValueError("failed to parse {}".format(name))
+ if match.group(3) and not allow_image_level:
+ raise ValueError(
+ "invalid name {}: image level is not allowed".format(name))
+
+ id = ""
+ pool_id = None
+ namespace = None
+ image_name = None
+ image_id = None
+ if match.group(1):
+ pool_name = match.group(1)
+ try:
+ pool_id = handler.module.rados.pool_lookup(pool_name)
+ if pool_id is None:
+ raise ValueError("pool {} does not exist".format(pool_name))
+ pool_id = str(pool_id)
+ id += pool_id
+ if match.group(2) is not None or match.group(3):
+ id += "/"
+ with handler.module.rados.open_ioctx(pool_name) as ioctx:
+ namespace = match.group(2) or ""
+ if namespace:
+ namespaces = rbd.RBD().namespace_list(ioctx)
+ if namespace not in namespaces:
+ raise ValueError(
+ "namespace {} does not exist".format(
+ namespace))
+ id += namespace
+ ioctx.set_namespace(namespace)
+ if namespace_validator:
+ namespace_validator(ioctx)
+ if match.group(3):
+ image_name = match.group(3)
+ try:
+ with rbd.Image(ioctx, image_name,
+ read_only=True) as image:
+ image_id = image.id()
+ id += "/" + image_id
+ if image_validator:
+ image_validator(image)
+ except rbd.ImageNotFound:
+ raise ValueError("image {} does not exist".format(
+ image_name))
+ except rbd.InvalidArgument:
+ raise ValueError(
+ "image {} is not in snapshot mirror mode".format(
+ image_name))
+
+ except rados.ObjectNotFound:
+ raise ValueError("pool {} does not exist".format(pool_name))
+
+ # normalize possible input name like 'rbd//image'
+ if not namespace and image_name:
+ name = "{}/{}".format(pool_name, image_name)
+
+ return LevelSpec(name, id, pool_id, namespace, image_id)
+
+ @classmethod
+ def from_id(cls, handler, id, namespace_validator=None,
+ image_validator=None):
+ # parse ids like:
+ # '', '123', '123/', '123/ns', '123//image_id', '123/ns/image_id'
+ match = re.match(r'^(?:(\d+)(?:/([^/]*)(?:/([^/@]+))?)?)?$', id)
+ if not match:
+ raise ValueError("failed to parse: {}".format(id))
+
+ name = ""
+ pool_id = None
+ namespace = None
+ image_id = None
+ if match.group(1):
+ pool_id = match.group(1)
+ try:
+ pool_name = handler.module.rados.pool_reverse_lookup(
+ int(pool_id))
+ if pool_name is None:
+ raise ValueError("pool {} does not exist".format(pool_name))
+ name += pool_name + "/"
+ if match.group(2) is not None or match.group(3):
+ with handler.module.rados.open_ioctx(pool_name) as ioctx:
+ namespace = match.group(2) or ""
+ if namespace:
+ namespaces = rbd.RBD().namespace_list(ioctx)
+ if namespace not in namespaces:
+ raise ValueError(
+ "namespace {} does not exist".format(
+ namespace))
+ name += namespace + "/"
+ if namespace_validator:
+ ioctx.set_namespace(namespace)
+ elif not match.group(3):
+ name += "/"
+ if match.group(3):
+ image_id = match.group(3)
+ try:
+ with rbd.Image(ioctx, image_id=image_id,
+ read_only=True) as image:
+ image_name = image.get_name()
+ name += image_name
+ if image_validator:
+ image_validator(image)
+ except rbd.ImageNotFound:
+ raise ValueError("image {} does not exist".format(
+ image_id))
+ except rbd.InvalidArgument:
+ raise ValueError(
+ "image {} is not in snapshot mirror mode".format(
+ image_id))
+
+ except rados.ObjectNotFound:
+ raise ValueError("pool {} does not exist".format(pool_id))
+
+ return LevelSpec(name, id, pool_id, namespace, image_id)
+
+
+class Interval:
+
+ def __init__(self, minutes):
+ self.minutes = minutes
+
+ def __eq__(self, interval):
+ return self.minutes == interval.minutes
+
+ def __hash__(self):
+ return hash(self.minutes)
+
+ def to_string(self):
+ if self.minutes % (60 * 24) == 0:
+ interval = int(self.minutes / (60 * 24))
+ units = 'd'
+ elif self.minutes % 60 == 0:
+ interval = int(self.minutes / 60)
+ units = 'h'
+ else:
+ interval = int(self.minutes)
+ units = 'm'
+
+ return "{}{}".format(interval, units)
+
+ @classmethod
+ def from_string(cls, interval):
+ match = re.match(r'^(\d+)(d|h|m)?$', interval)
+ if not match:
+ raise ValueError("Invalid interval ({})".format(interval))
+
+ minutes = int(match.group(1))
+ if match.group(2) == 'd':
+ minutes *= 60 * 24
+ elif match.group(2) == 'h':
+ minutes *= 60
+
+ return Interval(minutes)
+
+
+class StartTime:
+
+ def __init__(self, hour, minute, tzinfo):
+ self.time = time(hour, minute, tzinfo=tzinfo)
+ self.minutes = self.time.hour * 60 + self.time.minute
+ if self.time.tzinfo:
+ self.minutes += int(self.time.utcoffset().seconds / 60)
+
+ def __eq__(self, start_time):
+ return self.minutes == start_time.minutes
+
+ def __hash__(self):
+ return hash(self.minutes)
+
+ def to_string(self):
+ return self.time.isoformat()
+
+ @classmethod
+ def from_string(cls, start_time):
+ if not start_time:
+ return None
+
+ try:
+ t = time.fromisoformat(start_time)
+ except ValueError as e:
+ raise ValueError("Invalid start time {}: {}".format(start_time, e))
+
+ return StartTime(t.hour, t.minute, tzinfo=t.tzinfo)
+
+
+class Schedule:
+
+ def __init__(self, name):
+ self.name = name
+ self.items = set()
+
+ def __len__(self):
+ return len(self.items)
+
+ def add(self, interval, start_time=None):
+ self.items.add((interval, start_time))
+
+ def remove(self, interval, start_time=None):
+ self.items.discard((interval, start_time))
+
+ def next_run(self, now):
+ schedule_time = None
+ for item in self.items:
+ period = timedelta(minutes=item[0].minutes)
+ start_time = datetime(1970, 1, 1)
+ if item[1]:
+ start_time += timedelta(minutes=item[1].minutes)
+ time = start_time + \
+ (int((now - start_time) / period) + 1) * period
+ if schedule_time is None or time < schedule_time:
+ schedule_time = time
+ return datetime.strftime(schedule_time, "%Y-%m-%d %H:%M:00")
+
+ def to_list(self):
+ return [{SCHEDULE_INTERVAL: i[0].to_string(),
+ SCHEDULE_START_TIME: i[1] and i[1].to_string() or None}
+ for i in self.items]
+
+ def to_json(self):
+ return json.dumps(self.to_list(), indent=4, sort_keys=True)
+
+ @classmethod
+ def from_json(cls, name, val):
+ try:
+ items = json.loads(val)
+ schedule = Schedule(name)
+ for item in items:
+ interval = Interval.from_string(item[SCHEDULE_INTERVAL])
+ start_time = item[SCHEDULE_START_TIME] and \
+ StartTime.from_string(item[SCHEDULE_START_TIME]) or None
+ schedule.add(interval, start_time)
+ return schedule
+ except json.JSONDecodeError as e:
+ raise ValueError("Invalid JSON ({})".format(str(e)))
+ except KeyError as e:
+ raise ValueError(
+ "Invalid schedule format (missing key {})".format(str(e)))
+ except TypeError as e:
+ raise ValueError("Invalid schedule format ({})".format(str(e)))
+
+class Schedules:
+
+ def __init__(self, handler):
+ self.handler = handler
+ self.level_specs = {}
+ self.schedules = {}
+
+ def __len__(self):
+ return len(self.schedules)
+
+ def load(self, namespace_validator=None, image_validator=None):
+
+ schedule_cfg = self.handler.module.get_localized_module_option(
+ self.handler.MODULE_OPTION_NAME, '')
+ if schedule_cfg:
+ try:
+ level_spec = LevelSpec.make_global()
+ self.level_specs[level_spec.id] = level_spec
+ schedule = Schedule.from_json(level_spec.name, schedule_cfg)
+ self.schedules[level_spec.id] = schedule
+ except ValueError:
+ self.handler.log.error(
+ "Failed to decode configured schedule {}".format(
+ schedule_cfg))
+
+ for pool_id, pool_name in get_rbd_pools(self.handler.module).items():
+ with self.handler.module.rados.open_ioctx2(int(pool_id)) as ioctx:
+ self.load_from_pool(ioctx, namespace_validator, image_validator)
+
+ def load_from_pool(self, ioctx, namespace_validator, image_validator):
+ pool_id = ioctx.get_pool_id()
+ pool_name = ioctx.get_pool_name()
+ stale_keys = ()
+ start_after = ''
+ try:
+ while True:
+ with rados.ReadOpCtx() as read_op:
+ self.handler.log.info(
+ "load_schedules: {}, start_after={}".format(
+ pool_name, start_after))
+ it, ret = ioctx.get_omap_vals(read_op, start_after, "", 128)
+ ioctx.operate_read_op(read_op, self.handler.SCHEDULE_OID)
+
+ it = list(it)
+ for k, v in it:
+ start_after = k
+ v = v.decode()
+ self.handler.log.info(
+ "load_schedule: {} {}".format(k, v))
+ try:
+ try:
+ level_spec = LevelSpec.from_id(
+ self.handler, k, namespace_validator,
+ image_validator)
+ except ValueError:
+ self.handler.log.debug(
+ "Stail schedule key {} in pool".format(
+ k, pool_name))
+ stale_keys += (k,)
+ continue
+
+ self.level_specs[level_spec.id] = level_spec
+ schedule = Schedule.from_json(level_spec.name, v)
+ self.schedules[level_spec.id] = schedule
+ except ValueError:
+ self.handler.log.error(
+ "Failed to decode schedule: pool={}, {} {}".format(
+ pool_name, k, v))
+ if not it:
+ break
+
+ except StopIteration:
+ pass
+ except rados.ObjectNotFound:
+ pass
+
+ if stale_keys:
+ with rados.WriteOpCtx() as write_op:
+ ioctx.remove_omap_keys(write_op, stale_keys)
+ ioctx.operate_write_op(write_op, self.handler.SCHEDULE_OID)
+
+ def save(self, level_spec, schedule):
+ if level_spec.is_global():
+ schedule_cfg = schedule and schedule.to_json() or ''
+ self.handler.module.set_localized_module_option(
+ self.handler.MODULE_OPTION_NAME, schedule_cfg)
+ return
+
+ pool_id = level_spec.get_pool_id()
+ with self.handler.module.rados.open_ioctx2(int(pool_id)) as ioctx:
+ with rados.WriteOpCtx() as write_op:
+ if schedule:
+ ioctx.set_omap(write_op, (level_spec.id, ),
+ (schedule.to_json(), ))
+ else:
+ ioctx.remove_omap_keys(write_op, (level_spec.id, ))
+ ioctx.operate_write_op(write_op, self.handler.SCHEDULE_OID)
+
+
+ def add(self, level_spec, interval, start_time):
+ schedule = self.schedules.get(level_spec.id, Schedule(level_spec.name))
+ schedule.add(Interval.from_string(interval),
+ StartTime.from_string(start_time))
+ self.schedules[level_spec.id] = schedule
+ self.level_specs[level_spec.id] = level_spec
+ self.save(level_spec, schedule)
+
+ def remove(self, level_spec, interval, start_time):
+ schedule = self.schedules.pop(level_spec.id, None)
+ if schedule:
+ if interval is None:
+ schedule = None
+ else:
+ schedule.remove(Interval.from_string(interval),
+ StartTime.from_string(start_time))
+ if schedule:
+ self.schedules[level_spec.id] = schedule
+ if not schedule:
+ del self.level_specs[level_spec.id]
+ self.save(level_spec, schedule)
+
+ def find(self, pool_id, namespace, image_id=None):
+ levels = [None, pool_id, namespace]
+ if image_id:
+ levels.append(image_id)
+
+ while levels:
+ level_spec_id = "/".join(levels[1:])
+ if level_spec_id in self.schedules:
+ return self.schedules[level_spec_id]
+ del levels[-1]
+ return None
+
+ def to_list(self, level_spec):
+ if level_spec.id in self.schedules:
+ parent = level_spec
+ else:
+ # try to find existing parent
+ parent = None
+ for level_spec_id in self.schedules:
+ ls = self.level_specs[level_spec_id]
+ if ls == level_spec:
+ parent = ls
+ break
+ if level_spec.is_child_of(ls) and \
+ (not parent or ls.is_child_of(parent)):
+ parent = ls
+ if not parent:
+ # set to non-existing parent so we still could list its children
+ parent = level_spec
+
+ result = {}
+ for level_spec_id, schedule in self.schedules.items():
+ ls = self.level_specs[level_spec_id]
+ if ls == parent or ls.is_child_of(parent):
+ result[level_spec_id] = {
+ 'name' : schedule.name,
+ 'schedule' : schedule.to_list(),
+ }
+ return result
+