From: Mykola Golub Date: Tue, 18 Feb 2020 13:24:37 +0000 (+0000) Subject: mgr/rbd_support: generalize schedule reusable code X-Git-Tag: v15.1.1~214^2~9 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f1b1ea7041e3c18d77935cf40c4dd3cd6cda0aa8;p=ceph.git mgr/rbd_support: generalize schedule reusable code Also fix the bug in recursive list, when global schedule is not set and listing a subtree without schedules. Signed-off-by: Mykola Golub --- diff --git a/src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py b/src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py index 220eab01c0ce..a74357404101 100644 --- a/src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py +++ b/src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py @@ -5,299 +5,27 @@ import rbd import re import traceback -from datetime import datetime, timedelta, time +from datetime import datetime from threading import Condition, Lock, Thread from .common import get_rbd_pools +from .schedule import LevelSpec, Interval, StartTime, Schedule, Schedules -SCHEDULE_OID = "rbd_mirror_snapshot_schedule" - -SCHEDULE_INTERVAL = "interval" -SCHEDULE_START_TIME = "start_time" - - -class Interval: - - def __init__(self, minutes): - self.minutes = minutes - - def __eq__(self, interval): - return self.minutes == interval.minutes - - def __hash__(self): - return hash(self.minutes) - - def to_string(self): - if self.minutes % (60 * 24) == 0: - interval = int(self.minutes / (60 * 24)) - units = 'd' - elif self.minutes % 60 == 0: - interval = int(self.minutes / 60) - units = 'h' - else: - interval = int(self.minutes) - units = 'm' - - return "{}{}".format(interval, units) - - @classmethod - def from_string(cls, interval): - match = re.match(r'^(\d+)(d|h|m)?$', interval) - if not match: - raise ValueError("Invalid interval ({})".format(interval)) - - minutes = int(match.group(1)) - if match.group(2) == 'd': - minutes *= 60 * 24 - elif match.group(2) == 'h': - minutes *= 60 - - return Interval(minutes) - - -class StartTime: - - def __init__(self, hour, minute, tzinfo): - self.time = time(hour, minute, tzinfo=tzinfo) - self.minutes = self.time.hour * 60 + self.time.minute - if self.time.tzinfo: - self.minutes += int(self.time.utcoffset().seconds / 60) - - def __eq__(self, start_time): - return self.minutes == start_time.minutes - - def __hash__(self): - return hash(self.minutes) - - def to_string(self): - return self.time.isoformat() - - @classmethod - def from_string(cls, start_time): - if not start_time: - return None - - try: - t = time.fromisoformat(start_time) - except ValueError as e: - raise ValueError("Invalid start time {}: {}".format(start_time, e)) - - return StartTime(t.hour, t.minute, tzinfo=t.tzinfo) - -class LevelSpec: - - def __init__(self, name, id, pool_id, namespace, image_id): - self.name = name - self.id = id - self.pool_id = pool_id - self.namespace = namespace - self.image_id = image_id - - def __eq__(self, level_spec): - return self.id == level_spec.id - - def is_child_of(self, level_spec): - if level_spec.is_global(): - return not self.is_global() - if level_spec.pool_id != self.pool_id: - return False - if level_spec.namespace is None: - return self.namespace is not None - if level_spec.namespace != self.namespace: - return False - if level_spec.image_id is None: - return self.image_id is not None - return False - - def is_global(self): - return self.pool_id is None - - def get_pool_id(self): - return self.pool_id - - def matches(self, pool_id, namespace, image_id): - if self.pool_id and self.pool_id != pool_id: - return False - if self.namespace and self.namespace != namespace: - return False - if self.image_id and self.image_id != image_id: - return False - return True - - @classmethod - def from_name(cls, handler, name): - # parse names like: - # '', 'rbd/', 'rbd/ns/', 'rbd//image', 'rbd/image', 'rbd/ns/image' - match = re.match(r'^(?:([^/]+)/(?:(?:([^/]*)/|)(?:([^/@]+))?)?)?$', - name) - if not match: - raise ValueError("failed to parse {}".format(name)) - - id = "" - pool_id = None - namespace = None - image_name = None - image_id = None - if match.group(1): - pool_name = match.group(1) - try: - pool_id = handler.module.rados.pool_lookup(pool_name) - if pool_id is None: - raise ValueError("pool {} does not exist".format(pool_name)) - pool_id = str(pool_id) - id += pool_id - if match.group(2) is not None or match.group(3): - id += "/" - with handler.module.rados.open_ioctx(pool_name) as ioctx: - namespace = match.group(2) or "" - if namespace: - namespaces = rbd.RBD().namespace_list(ioctx) - if namespace not in namespaces: - raise ValueError( - "namespace {} does not exist".format( - namespace)) - id += namespace - ioctx.set_namespace(namespace) - pool_mode = rbd.RBD().mirror_mode_get(ioctx) - if pool_mode != rbd.RBD_MIRROR_MODE_IMAGE: - raise ValueError( - "namespace {} is not in mirror image mode".format( - namespace)) - if match.group(3): - image_name = match.group(3) - try: - with rbd.Image(ioctx, image_name, - read_only=True) as image: - image_id = image.id() - id += "/" + image_id - mode = image.mirror_image_get_mode() - if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT: - raise rbd.InvalidArgument( - "Invalid mirror mode") - except rbd.ImageNotFound: - raise ValueError("image {} does not exist".format( - image_name)) - except rbd.InvalidArgument: - raise ValueError( - "image {} is not in snapshot mirror mode".format( - image_name)) - - except rados.ObjectNotFound: - raise ValueError("pool {} does not exist".format(pool_name)) - - # normalize possible input name like 'rbd//image' - if not namespace and image_name: - name = "{}/{}".format(pool_name, image_name) - - return LevelSpec(name, id, pool_id, namespace, image_id) - - @classmethod - def from_id(cls, handler, id): - # parse ids like: - # '', '123', '123/', '123/ns', '123//image_id', '123/ns/image_id' - match = re.match(r'^(?:(\d+)(?:/([^/]*)(?:/([^/@]+))?)?)?$', id) - if not match: - raise ValueError("failed to parse: {}".format(id)) - - name = "" - pool_id = None - namespace = None - image_id = None - if match.group(1): - pool_id = match.group(1) - try: - pool_name = handler.module.rados.pool_reverse_lookup( - int(pool_id)) - if pool_name is None: - raise ValueError("pool {} does not exist".format(pool_name)) - name += pool_name + "/" - if match.group(2) is not None or match.group(3): - with handler.module.rados.open_ioctx(pool_name) as ioctx: - namespace = match.group(2) or "" - if namespace: - namespaces = rbd.RBD().namespace_list(ioctx) - if namespace not in namespaces: - raise ValueError( - "namespace {} does not exist".format( - namespace)) - name += namespace + "/" - ioctx.set_namespace(namespace) - pool_mode = rbd.RBD().mirror_mode_get(ioctx) - if pool_mode != rbd.RBD_MIRROR_MODE_IMAGE: - raise ValueError( - "namespace {} is not in mirror image mode".format( - namespace)) - elif not match.group(3): - name += "/" - if match.group(3): - image_id = match.group(3) - try: - with rbd.Image(ioctx, image_id=image_id, - read_only=True) as image: - image_name = image.get_name() - name += image_name - mode = image.mirror_image_get_mode() - if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT: - raise rbd.InvalidArgument( - "Invalid mirror mode") - except rbd.ImageNotFound: - raise ValueError("image {} does not exist".format( - image_id)) - except rbd.InvalidArgument: - raise ValueError( - "image {} is not in snapshot mirror mode".format( - image_id)) - - except rados.ObjectNotFound: - raise ValueError("pool {} does not exist".format(pool_id)) - - return LevelSpec(name, id, pool_id, namespace, image_id) - - -class Schedule: - - def __init__(self, name): - self.name = name - self.items = set() - - def __len__(self): - return len(self.items) - - def add(self, interval, start_time=None): - self.items.add((interval, start_time)) - - def remove(self, interval, start_time=None): - self.items.discard((interval, start_time)) - - def to_list(self): - return [{SCHEDULE_INTERVAL: i[0].to_string(), - SCHEDULE_START_TIME: i[1] and i[1].to_string() or None} - for i in self.items] - - def to_json(self): - return json.dumps(self.to_list(), indent=4, sort_keys=True) - - @classmethod - def from_json(cls, name, val): - try: - items = json.loads(val) - schedule = Schedule(name) - for item in items: - interval = Interval.from_string(item[SCHEDULE_INTERVAL]) - start_time = item[SCHEDULE_START_TIME] and \ - StartTime.from_string(item[SCHEDULE_START_TIME]) or None - schedule.add(interval, start_time) - return schedule - except json.JSONDecodeError as e: - raise ValueError("Invalid JSON ({})".format(str(e))) - except KeyError as e: - raise ValueError( - "Invalid schedule format (missing key {})".format(str(e))) - except TypeError as e: - raise ValueError("Invalid schedule format ({})".format(str(e))) +def namespace_validator(ioctx): + mode = rbd.RBD().mirror_mode_get(ioctx) + if mode != rbd.RBD_MIRROR_MODE_IMAGE: + raise ValueError("namespace {} is not in mirror image mode".format( + ioctx.get_namespace())) +def image_validator(image): + mode = image.mirror_image_get_mode() + if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT: + raise rbd.InvalidArgument("Invalid mirror image mode") class MirrorSnapshotScheduleHandler: + MODULE_OPTION_NAME = "mirror_snapshot_schedule" + SCHEDULE_OID = "rbd_mirror_snapshot_schedule" + lock = Lock() condition = Condition(lock) thread = None @@ -354,15 +82,6 @@ class MirrorSnapshotScheduleHandler: pool_id, namespace, image_id, e)) - @classmethod - def format_image_spec(cls, image_spec): - image = image_spec[2] - if image_spec[1]: - image = "{}/{}".format(image_spec[1], image) - if image_spec[0]: - image = "{}/{}".format(image_spec[0], image) - return image - def init_schedule_queue(self): self.queue = {} self.images = {} @@ -372,74 +91,11 @@ class MirrorSnapshotScheduleHandler: def load_schedules(self): self.log.info("MirrorSnapshotScheduleHandler: load_schedules") - schedules = {} - schedule_cfg = self.module.get_localized_module_option( - 'mirror_snapshot_schedule', '') - if schedule_cfg: - try: - schedule = Schedule.from_json('', schedule_cfg) - schedules[''] = schedule - except ValueError: - self.log.error("Failed to decode configured schedule {}".format( - schedule_cfg)) - - for pool_id, pool_name in get_rbd_pools(self.module).items(): - with self.module.rados.open_ioctx2(int(pool_id)) as ioctx: - self.load_pool_schedules(ioctx, schedules) - + schedules = Schedules(self) + schedules.load(namespace_validator, image_validator) with self.lock: self.schedules = schedules - def load_pool_schedules(self, ioctx, schedules): - pool_id = ioctx.get_pool_id() - pool_name = ioctx.get_pool_name() - stale_keys = () - start_after = '' - try: - while True: - with rados.ReadOpCtx() as read_op: - self.log.info("load_schedules: {}, start_after={}".format( - pool_name, start_after)) - it, ret = ioctx.get_omap_vals(read_op, start_after, "", 128) - ioctx.operate_read_op(read_op, SCHEDULE_OID) - - it = list(it) - for k, v in it: - start_after = k - v = v.decode() - self.log.info("load_schedule: {} {}".format(k, v)) - - try: - try: - level_spec = LevelSpec.from_id(self, k) - except ValueError: - self.log.debug( - "Stail schedule key {} in pool".format( - k, pool_name)) - stale_keys += (k,) - continue - - schedule = Schedule.from_json(level_spec.name, v) - schedules[k] = schedule - except ValueError: - self.log.error( - "Failed to decode schedule: pool={}, {} {}".format( - pool_name, k, v)) - - if not it: - break - - except StopIteration: - pass - except rados.ObjectNotFound: - # rbd_mirror_snapshot_schedule DNE - pass - - if stale_keys: - with rados.WriteOpCtx() as write_op: - ioctx.remove_omap_keys(write_op, stale_keys) - ioctx.operate_write_op(write_op, SCHEDULE_OID) - def refresh_images(self): if (datetime.now() - self.last_refresh_images).seconds < 60: return @@ -501,28 +157,6 @@ class MirrorSnapshotScheduleHandler: pool_name, e)) pass - def find_schedule(self, pool_id, namespace, image_id): - levels = [None, pool_id, namespace, image_id] - while levels: - level_spec_id = "/".join(levels[1:]) - if level_spec_id in self.schedules: - return self.schedules[level_spec_id] - del levels[-1] - return None - - def calc_schedule_time(self, schedule, now): - schedule_time = None - for item in schedule.items: - period = timedelta(minutes=item[0].minutes) - start_time = datetime(1970, 1, 1) - if item[1]: - start_time += timedelta(minutes=item[1].minutes) - time = start_time + \ - (int((now - start_time) / period) + 1) * period - if schedule_time is None or time < schedule_time: - schedule_time = time - return datetime.strftime(schedule_time, "%Y-%m-%d %H:%M:00") - def rebuild_queue(self): with self.lock: now = datetime.now() @@ -567,11 +201,11 @@ class MirrorSnapshotScheduleHandler: def enqueue(self, now, pool_id, namespace, image_id): - schedule = self.find_schedule(pool_id, namespace, image_id) + schedule = self.schedules.find(pool_id, namespace, image_id) if not schedule: return - schedule_time = self.calc_schedule_time(schedule, now) + schedule_time = schedule.next_run(now) if schedule_time not in self.queue: self.queue[schedule_time] = [] self.log.debug("schedule image {}/{}/{} at {}".format( @@ -593,7 +227,6 @@ class MirrorSnapshotScheduleHandler: return None, wait_time.total_seconds() images = self.queue[schedule_time] - image = images[0] image = images.pop(0) if not images: del self.queue[schedule_time] @@ -609,113 +242,40 @@ class MirrorSnapshotScheduleHandler: for schedule_time in empty_slots: del self.queue[schedule_time] - def save_schedule(self, level_spec, schedule): - if level_spec.is_global(): - schedule_cfg = schedule and schedule.to_json() or '' - self.module.set_localized_module_option('mirror_snapshot_schedule', - schedule_cfg) - return - - pool_id = level_spec.get_pool_id() - with self.module.rados.open_ioctx2(int(pool_id)) as ioctx: - with rados.WriteOpCtx() as write_op: - if schedule: - ioctx.set_omap(write_op, (level_spec.id, ), - (schedule.to_json(), )) - else: - ioctx.remove_omap_keys(write_op, (level_spec.id, )) - ioctx.operate_write_op(write_op, SCHEDULE_OID) - - - def add_schedule(self, level_spec_name, interval, start_time): + def add_schedule(self, level_spec, interval, start_time): self.log.debug( "add_schedule: level_spec={}, interval={}, start_time={}".format( - level_spec_name, interval, start_time)) - try: - level_spec = LevelSpec.from_name(self, level_spec_name) - except ValueError as e: - return -errno.EINVAL, '', "Invalid level spec {}: {}".format( - level_spec_name, e) + level_spec.name, interval, start_time)) with self.lock: - schedule = self.schedules.get(level_spec.id, Schedule(level_spec_name)) - schedule.add(Interval.from_string(interval), - StartTime.from_string(start_time)) - self.save_schedule(level_spec, schedule) - self.schedules[level_spec.id] = schedule + self.schedules.add(level_spec, interval, start_time) # TODO: optimize to rebuild only affected part of the queue self.rebuild_queue() return 0, "", "" - def remove_schedule(self, level_spec_name, interval, start_time): - try: - level_spec = LevelSpec.from_name(self, level_spec_name) - except ValueError as e: - return -errno.EINVAL, '', "Invalid level spec {}: {}".format( - level_spec_name, e) + def remove_schedule(self, level_spec, interval, start_time): + self.log.debug( + "remove_schedule: level_spec={}, interval={}, start_time={}".format( + level_spec.name, interval, start_time)) with self.lock: - schedule = self.schedules.pop(level_spec.id, None) - if not schedule: - return -errno.ENOENT, '', "No schedule for {}".format(level_spec_name) - - if interval is None: - schedule = None - else: - schedule.remove(Interval.from_string(interval), - StartTime.from_string(start_time)) - if schedule: - self.schedules[level_spec.id] = schedule - self.save_schedule(level_spec, schedule) + self.schedules.remove(level_spec, interval, start_time) # TODO: optimize to rebuild only affected part of the queue self.rebuild_queue() return 0, "", "" - def list(self, level_spec_name): - self.log.debug("list: level_spec={}".format(level_spec_name)) - - if not level_spec_name: - level_spec_name = "" - - try: - level_spec = LevelSpec.from_name(self, level_spec_name) - except ValueError as e: - return -errno.EINVAL, '', "Invalid level spec {}: {}".format( - level_spec_name, e) + def list(self, level_spec): + self.log.debug("list: level_spec={}".format(level_spec.name)) - result = {} with self.lock: - parent = LevelSpec.from_id(self, "") - if not level_spec.is_global(): - for level_spec_id in self.schedules: - ls = LevelSpec.from_id(self, level_spec_id) - if ls == level_spec: - parent = ls - break - if level_spec.is_child_of(ls) and ls.is_child_of(parent): - parent = ls - for level_spec_id, schedule in self.schedules.items(): - ls = LevelSpec.from_id(self, level_spec_id) - if ls == parent or ls.is_child_of(parent): - result[level_spec_id] = { - 'name' : schedule.name, - 'schedule' : schedule.to_list(), - } - return 0, json.dumps(result, indent=4, sort_keys=True), "" + result = self.schedules.to_list(level_spec) - def status(self, level_spec_name): - self.log.debug("status: level_spec={}".format(level_spec_name)) + return 0, json.dumps(result, indent=4, sort_keys=True), "" - if not level_spec_name: - level_spec_name = "" - - try: - level_spec = LevelSpec.from_name(self, level_spec_name) - except ValueError as e: - return -errno.EINVAL, '', "Invalid level spec {}: {}".format( - level_spec_name, e) + def status(self, level_spec): + self.log.debug("status: level_spec={}".format(level_spec.name)) scheduled_images = [] with self.lock: @@ -732,15 +292,25 @@ class MirrorSnapshotScheduleHandler: indent=4, sort_keys=True), "" def handle_command(self, inbuf, prefix, cmd): + level_spec_name = cmd.get('level_spec', "") + + try: + level_spec = LevelSpec.from_name(self, level_spec_name, + namespace_validator, + image_validator) + except ValueError as e: + return -errno.EINVAL, '', "Invalid level spec {}: {}".format( + level_spec_name, e) + if prefix == 'add': - return self.add_schedule(cmd['level_spec'], cmd['interval'], + return self.add_schedule(level_spec, cmd['interval'], cmd.get('start_time')) elif prefix == 'remove': - return self.remove_schedule(cmd['level_spec'], cmd.get('interval'), + return self.remove_schedule(level_spec, cmd.get('interval'), cmd.get('start_time')) elif prefix == 'list': - return self.list(cmd.get('level_spec', None)) + return self.list(level_spec) elif prefix == 'status': - return self.status(cmd.get('level_spec', None)) + return self.status(level_spec) raise NotImplementedError(cmd['prefix']) diff --git a/src/pybind/mgr/rbd_support/module.py b/src/pybind/mgr/rbd_support/module.py index 789d61e11676..5cf1d54d4049 100644 --- a/src/pybind/mgr/rbd_support/module.py +++ b/src/pybind/mgr/rbd_support/module.py @@ -115,7 +115,7 @@ class Module(MgrModule): } ] MODULE_OPTIONS = [ - {'name': 'mirror_snapshot_schedule'}, + {'name': MirrorSnapshotScheduleHandler.MODULE_OPTION_NAME}, ] mirror_snapshot_schedule = None diff --git a/src/pybind/mgr/rbd_support/schedule.py b/src/pybind/mgr/rbd_support/schedule.py new file mode 100644 index 000000000000..4c13914e022d --- /dev/null +++ b/src/pybind/mgr/rbd_support/schedule.py @@ -0,0 +1,466 @@ +import json +import rados +import rbd +import re + +from datetime import datetime, timedelta, time + +from .common import get_rbd_pools + +SCHEDULE_INTERVAL = "interval" +SCHEDULE_START_TIME = "start_time" + + +class LevelSpec: + + def __init__(self, name, id, pool_id, namespace, image_id=None): + self.name = name + self.id = id + self.pool_id = pool_id + self.namespace = namespace + self.image_id = image_id + + def __eq__(self, level_spec): + return self.id == level_spec.id + + def is_child_of(self, level_spec): + if level_spec.is_global(): + return not self.is_global() + if level_spec.pool_id != self.pool_id: + return False + if level_spec.namespace is None: + return self.namespace is not None + if level_spec.namespace != self.namespace: + return False + if level_spec.image_id is None: + return self.image_id is not None + return False + + def is_global(self): + return self.pool_id is None + + def get_pool_id(self): + return self.pool_id + + def matches(self, pool_id, namespace, image_id=None): + if self.pool_id and self.pool_id != pool_id: + return False + if self.namespace and self.namespace != namespace: + return False + if self.image_id and self.image_id != image_id: + return False + return True + + @classmethod + def make_global(cls): + return LevelSpec("", "", None, None, None) + + @classmethod + def from_name(cls, handler, name, namespace_validator=None, + image_validator=None, allow_image_level=True): + # parse names like: + # '', 'rbd/', 'rbd/ns/', 'rbd//image', 'rbd/image', 'rbd/ns/image' + match = re.match(r'^(?:([^/]+)/(?:(?:([^/]*)/|)(?:([^/@]+))?)?)?$', + name) + if not match: + raise ValueError("failed to parse {}".format(name)) + if match.group(3) and not allow_image_level: + raise ValueError( + "invalid name {}: image level is not allowed".format(name)) + + id = "" + pool_id = None + namespace = None + image_name = None + image_id = None + if match.group(1): + pool_name = match.group(1) + try: + pool_id = handler.module.rados.pool_lookup(pool_name) + if pool_id is None: + raise ValueError("pool {} does not exist".format(pool_name)) + pool_id = str(pool_id) + id += pool_id + if match.group(2) is not None or match.group(3): + id += "/" + with handler.module.rados.open_ioctx(pool_name) as ioctx: + namespace = match.group(2) or "" + if namespace: + namespaces = rbd.RBD().namespace_list(ioctx) + if namespace not in namespaces: + raise ValueError( + "namespace {} does not exist".format( + namespace)) + id += namespace + ioctx.set_namespace(namespace) + if namespace_validator: + namespace_validator(ioctx) + if match.group(3): + image_name = match.group(3) + try: + with rbd.Image(ioctx, image_name, + read_only=True) as image: + image_id = image.id() + id += "/" + image_id + if image_validator: + image_validator(image) + except rbd.ImageNotFound: + raise ValueError("image {} does not exist".format( + image_name)) + except rbd.InvalidArgument: + raise ValueError( + "image {} is not in snapshot mirror mode".format( + image_name)) + + except rados.ObjectNotFound: + raise ValueError("pool {} does not exist".format(pool_name)) + + # normalize possible input name like 'rbd//image' + if not namespace and image_name: + name = "{}/{}".format(pool_name, image_name) + + return LevelSpec(name, id, pool_id, namespace, image_id) + + @classmethod + def from_id(cls, handler, id, namespace_validator=None, + image_validator=None): + # parse ids like: + # '', '123', '123/', '123/ns', '123//image_id', '123/ns/image_id' + match = re.match(r'^(?:(\d+)(?:/([^/]*)(?:/([^/@]+))?)?)?$', id) + if not match: + raise ValueError("failed to parse: {}".format(id)) + + name = "" + pool_id = None + namespace = None + image_id = None + if match.group(1): + pool_id = match.group(1) + try: + pool_name = handler.module.rados.pool_reverse_lookup( + int(pool_id)) + if pool_name is None: + raise ValueError("pool {} does not exist".format(pool_name)) + name += pool_name + "/" + if match.group(2) is not None or match.group(3): + with handler.module.rados.open_ioctx(pool_name) as ioctx: + namespace = match.group(2) or "" + if namespace: + namespaces = rbd.RBD().namespace_list(ioctx) + if namespace not in namespaces: + raise ValueError( + "namespace {} does not exist".format( + namespace)) + name += namespace + "/" + if namespace_validator: + ioctx.set_namespace(namespace) + elif not match.group(3): + name += "/" + if match.group(3): + image_id = match.group(3) + try: + with rbd.Image(ioctx, image_id=image_id, + read_only=True) as image: + image_name = image.get_name() + name += image_name + if image_validator: + image_validator(image) + except rbd.ImageNotFound: + raise ValueError("image {} does not exist".format( + image_id)) + except rbd.InvalidArgument: + raise ValueError( + "image {} is not in snapshot mirror mode".format( + image_id)) + + except rados.ObjectNotFound: + raise ValueError("pool {} does not exist".format(pool_id)) + + return LevelSpec(name, id, pool_id, namespace, image_id) + + +class Interval: + + def __init__(self, minutes): + self.minutes = minutes + + def __eq__(self, interval): + return self.minutes == interval.minutes + + def __hash__(self): + return hash(self.minutes) + + def to_string(self): + if self.minutes % (60 * 24) == 0: + interval = int(self.minutes / (60 * 24)) + units = 'd' + elif self.minutes % 60 == 0: + interval = int(self.minutes / 60) + units = 'h' + else: + interval = int(self.minutes) + units = 'm' + + return "{}{}".format(interval, units) + + @classmethod + def from_string(cls, interval): + match = re.match(r'^(\d+)(d|h|m)?$', interval) + if not match: + raise ValueError("Invalid interval ({})".format(interval)) + + minutes = int(match.group(1)) + if match.group(2) == 'd': + minutes *= 60 * 24 + elif match.group(2) == 'h': + minutes *= 60 + + return Interval(minutes) + + +class StartTime: + + def __init__(self, hour, minute, tzinfo): + self.time = time(hour, minute, tzinfo=tzinfo) + self.minutes = self.time.hour * 60 + self.time.minute + if self.time.tzinfo: + self.minutes += int(self.time.utcoffset().seconds / 60) + + def __eq__(self, start_time): + return self.minutes == start_time.minutes + + def __hash__(self): + return hash(self.minutes) + + def to_string(self): + return self.time.isoformat() + + @classmethod + def from_string(cls, start_time): + if not start_time: + return None + + try: + t = time.fromisoformat(start_time) + except ValueError as e: + raise ValueError("Invalid start time {}: {}".format(start_time, e)) + + return StartTime(t.hour, t.minute, tzinfo=t.tzinfo) + + +class Schedule: + + def __init__(self, name): + self.name = name + self.items = set() + + def __len__(self): + return len(self.items) + + def add(self, interval, start_time=None): + self.items.add((interval, start_time)) + + def remove(self, interval, start_time=None): + self.items.discard((interval, start_time)) + + def next_run(self, now): + schedule_time = None + for item in self.items: + period = timedelta(minutes=item[0].minutes) + start_time = datetime(1970, 1, 1) + if item[1]: + start_time += timedelta(minutes=item[1].minutes) + time = start_time + \ + (int((now - start_time) / period) + 1) * period + if schedule_time is None or time < schedule_time: + schedule_time = time + return datetime.strftime(schedule_time, "%Y-%m-%d %H:%M:00") + + def to_list(self): + return [{SCHEDULE_INTERVAL: i[0].to_string(), + SCHEDULE_START_TIME: i[1] and i[1].to_string() or None} + for i in self.items] + + def to_json(self): + return json.dumps(self.to_list(), indent=4, sort_keys=True) + + @classmethod + def from_json(cls, name, val): + try: + items = json.loads(val) + schedule = Schedule(name) + for item in items: + interval = Interval.from_string(item[SCHEDULE_INTERVAL]) + start_time = item[SCHEDULE_START_TIME] and \ + StartTime.from_string(item[SCHEDULE_START_TIME]) or None + schedule.add(interval, start_time) + return schedule + except json.JSONDecodeError as e: + raise ValueError("Invalid JSON ({})".format(str(e))) + except KeyError as e: + raise ValueError( + "Invalid schedule format (missing key {})".format(str(e))) + except TypeError as e: + raise ValueError("Invalid schedule format ({})".format(str(e))) + +class Schedules: + + def __init__(self, handler): + self.handler = handler + self.level_specs = {} + self.schedules = {} + + def __len__(self): + return len(self.schedules) + + def load(self, namespace_validator=None, image_validator=None): + + schedule_cfg = self.handler.module.get_localized_module_option( + self.handler.MODULE_OPTION_NAME, '') + if schedule_cfg: + try: + level_spec = LevelSpec.make_global() + self.level_specs[level_spec.id] = level_spec + schedule = Schedule.from_json(level_spec.name, schedule_cfg) + self.schedules[level_spec.id] = schedule + except ValueError: + self.handler.log.error( + "Failed to decode configured schedule {}".format( + schedule_cfg)) + + for pool_id, pool_name in get_rbd_pools(self.handler.module).items(): + with self.handler.module.rados.open_ioctx2(int(pool_id)) as ioctx: + self.load_from_pool(ioctx, namespace_validator, image_validator) + + def load_from_pool(self, ioctx, namespace_validator, image_validator): + pool_id = ioctx.get_pool_id() + pool_name = ioctx.get_pool_name() + stale_keys = () + start_after = '' + try: + while True: + with rados.ReadOpCtx() as read_op: + self.handler.log.info( + "load_schedules: {}, start_after={}".format( + pool_name, start_after)) + it, ret = ioctx.get_omap_vals(read_op, start_after, "", 128) + ioctx.operate_read_op(read_op, self.handler.SCHEDULE_OID) + + it = list(it) + for k, v in it: + start_after = k + v = v.decode() + self.handler.log.info( + "load_schedule: {} {}".format(k, v)) + try: + try: + level_spec = LevelSpec.from_id( + self.handler, k, namespace_validator, + image_validator) + except ValueError: + self.handler.log.debug( + "Stail schedule key {} in pool".format( + k, pool_name)) + stale_keys += (k,) + continue + + self.level_specs[level_spec.id] = level_spec + schedule = Schedule.from_json(level_spec.name, v) + self.schedules[level_spec.id] = schedule + except ValueError: + self.handler.log.error( + "Failed to decode schedule: pool={}, {} {}".format( + pool_name, k, v)) + if not it: + break + + except StopIteration: + pass + except rados.ObjectNotFound: + pass + + if stale_keys: + with rados.WriteOpCtx() as write_op: + ioctx.remove_omap_keys(write_op, stale_keys) + ioctx.operate_write_op(write_op, self.handler.SCHEDULE_OID) + + def save(self, level_spec, schedule): + if level_spec.is_global(): + schedule_cfg = schedule and schedule.to_json() or '' + self.handler.module.set_localized_module_option( + self.handler.MODULE_OPTION_NAME, schedule_cfg) + return + + pool_id = level_spec.get_pool_id() + with self.handler.module.rados.open_ioctx2(int(pool_id)) as ioctx: + with rados.WriteOpCtx() as write_op: + if schedule: + ioctx.set_omap(write_op, (level_spec.id, ), + (schedule.to_json(), )) + else: + ioctx.remove_omap_keys(write_op, (level_spec.id, )) + ioctx.operate_write_op(write_op, self.handler.SCHEDULE_OID) + + + def add(self, level_spec, interval, start_time): + schedule = self.schedules.get(level_spec.id, Schedule(level_spec.name)) + schedule.add(Interval.from_string(interval), + StartTime.from_string(start_time)) + self.schedules[level_spec.id] = schedule + self.level_specs[level_spec.id] = level_spec + self.save(level_spec, schedule) + + def remove(self, level_spec, interval, start_time): + schedule = self.schedules.pop(level_spec.id, None) + if schedule: + if interval is None: + schedule = None + else: + schedule.remove(Interval.from_string(interval), + StartTime.from_string(start_time)) + if schedule: + self.schedules[level_spec.id] = schedule + if not schedule: + del self.level_specs[level_spec.id] + self.save(level_spec, schedule) + + def find(self, pool_id, namespace, image_id=None): + levels = [None, pool_id, namespace] + if image_id: + levels.append(image_id) + + while levels: + level_spec_id = "/".join(levels[1:]) + if level_spec_id in self.schedules: + return self.schedules[level_spec_id] + del levels[-1] + return None + + def to_list(self, level_spec): + if level_spec.id in self.schedules: + parent = level_spec + else: + # try to find existing parent + parent = None + for level_spec_id in self.schedules: + ls = self.level_specs[level_spec_id] + if ls == level_spec: + parent = ls + break + if level_spec.is_child_of(ls) and \ + (not parent or ls.is_child_of(parent)): + parent = ls + if not parent: + # set to non-existing parent so we still could list its children + parent = level_spec + + result = {} + for level_spec_id, schedule in self.schedules.items(): + ls = self.level_specs[level_spec_id] + if ls == parent or ls.is_child_of(parent): + result[level_spec_id] = { + 'name' : schedule.name, + 'schedule' : schedule.to_list(), + } + return result +