]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/rbd_support: generalize schedule reusable code
authorMykola Golub <mgolub@suse.com>
Tue, 18 Feb 2020 13:24:37 +0000 (13:24 +0000)
committerMykola Golub <mgolub@suse.com>
Mon, 24 Feb 2020 13:40:28 +0000 (13:40 +0000)
Also fix the bug in recursive list, when global schedule is not
set and listing a subtree without schedules.

Signed-off-by: Mykola Golub <mgolub@suse.com>
src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py
src/pybind/mgr/rbd_support/module.py
src/pybind/mgr/rbd_support/schedule.py [new file with mode: 0644]

index 220eab01c0ceb7493abd63860909355a60da37ea..a7435740410172dc161afe8f5da9e411ca799a33 100644 (file)
@@ -5,299 +5,27 @@ import rbd
 import re
 import traceback
 
-from datetime import datetime, timedelta, time
+from datetime import datetime
 from threading import Condition, Lock, Thread
 
 from .common import get_rbd_pools
+from .schedule import LevelSpec, Interval, StartTime, Schedule, Schedules
 
-SCHEDULE_OID = "rbd_mirror_snapshot_schedule"
-
-SCHEDULE_INTERVAL = "interval"
-SCHEDULE_START_TIME = "start_time"
-
-
-class Interval:
-
-    def __init__(self, minutes):
-        self.minutes = minutes
-
-    def __eq__(self, interval):
-        return self.minutes == interval.minutes
-
-    def __hash__(self):
-        return hash(self.minutes)
-
-    def to_string(self):
-        if self.minutes % (60 * 24) == 0:
-            interval = int(self.minutes / (60 * 24))
-            units = 'd'
-        elif self.minutes % 60 == 0:
-            interval = int(self.minutes / 60)
-            units = 'h'
-        else:
-            interval = int(self.minutes)
-            units = 'm'
-
-        return "{}{}".format(interval, units)
-
-    @classmethod
-    def from_string(cls, interval):
-        match = re.match(r'^(\d+)(d|h|m)?$', interval)
-        if not match:
-            raise ValueError("Invalid interval ({})".format(interval))
-
-        minutes = int(match.group(1))
-        if match.group(2) == 'd':
-            minutes *= 60 * 24
-        elif match.group(2) == 'h':
-            minutes *= 60
-
-        return Interval(minutes)
-
-
-class StartTime:
-
-    def __init__(self, hour, minute, tzinfo):
-        self.time = time(hour, minute, tzinfo=tzinfo)
-        self.minutes = self.time.hour * 60 + self.time.minute
-        if self.time.tzinfo:
-            self.minutes += int(self.time.utcoffset().seconds / 60)
-
-    def __eq__(self, start_time):
-        return self.minutes == start_time.minutes
-
-    def __hash__(self):
-        return hash(self.minutes)
-
-    def to_string(self):
-        return self.time.isoformat()
-
-    @classmethod
-    def from_string(cls, start_time):
-        if not start_time:
-            return None
-
-        try:
-            t = time.fromisoformat(start_time)
-        except ValueError as e:
-            raise ValueError("Invalid start time {}: {}".format(start_time, e))
-
-        return StartTime(t.hour, t.minute, tzinfo=t.tzinfo)
-
-class LevelSpec:
-
-    def __init__(self, name, id, pool_id, namespace, image_id):
-        self.name = name
-        self.id = id
-        self.pool_id = pool_id
-        self.namespace = namespace
-        self.image_id = image_id
-
-    def __eq__(self, level_spec):
-        return self.id == level_spec.id
-
-    def is_child_of(self, level_spec):
-        if level_spec.is_global():
-            return not self.is_global()
-        if level_spec.pool_id != self.pool_id:
-            return False
-        if level_spec.namespace is None:
-            return self.namespace is not None
-        if level_spec.namespace != self.namespace:
-            return False
-        if level_spec.image_id is None:
-            return self.image_id is not None
-        return False
-
-    def is_global(self):
-        return self.pool_id is None
-
-    def get_pool_id(self):
-        return self.pool_id
-
-    def matches(self, pool_id, namespace, image_id):
-        if self.pool_id and self.pool_id != pool_id:
-            return False
-        if self.namespace and self.namespace != namespace:
-            return False
-        if self.image_id and self.image_id != image_id:
-            return False
-        return True
-
-    @classmethod
-    def from_name(cls, handler, name):
-        # parse names like:
-        # '', 'rbd/', 'rbd/ns/', 'rbd//image', 'rbd/image', 'rbd/ns/image'
-        match = re.match(r'^(?:([^/]+)/(?:(?:([^/]*)/|)(?:([^/@]+))?)?)?$',
-                         name)
-        if not match:
-            raise ValueError("failed to parse {}".format(name))
-
-        id = ""
-        pool_id = None
-        namespace = None
-        image_name = None
-        image_id = None
-        if match.group(1):
-            pool_name = match.group(1)
-            try:
-                pool_id = handler.module.rados.pool_lookup(pool_name)
-                if pool_id is None:
-                    raise ValueError("pool {} does not exist".format(pool_name))
-                pool_id = str(pool_id)
-                id += pool_id
-                if match.group(2) is not None or match.group(3):
-                    id += "/"
-                    with handler.module.rados.open_ioctx(pool_name) as ioctx:
-                        namespace = match.group(2) or ""
-                        if namespace:
-                            namespaces = rbd.RBD().namespace_list(ioctx)
-                            if namespace not in namespaces:
-                                raise ValueError(
-                                    "namespace {} does not exist".format(
-                                        namespace))
-                            id += namespace
-                            ioctx.set_namespace(namespace)
-                            pool_mode = rbd.RBD().mirror_mode_get(ioctx)
-                            if pool_mode != rbd.RBD_MIRROR_MODE_IMAGE:
-                                raise ValueError(
-                                    "namespace {} is not in mirror image mode".format(
-                                        namespace))
-                        if match.group(3):
-                            image_name = match.group(3)
-                            try:
-                                with rbd.Image(ioctx, image_name,
-                                               read_only=True) as image:
-                                    image_id = image.id()
-                                    id += "/" + image_id
-                                    mode = image.mirror_image_get_mode()
-                                    if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
-                                        raise rbd.InvalidArgument(
-                                            "Invalid mirror mode")
-                            except rbd.ImageNotFound:
-                                raise ValueError("image {} does not exist".format(
-                                    image_name))
-                            except rbd.InvalidArgument:
-                                raise ValueError(
-                                    "image {} is not in snapshot mirror mode".format(
-                                        image_name))
-
-            except rados.ObjectNotFound:
-                raise ValueError("pool {} does not exist".format(pool_name))
-
-        # normalize possible input name like 'rbd//image'
-        if not namespace and image_name:
-            name = "{}/{}".format(pool_name, image_name)
-
-        return LevelSpec(name, id, pool_id, namespace, image_id)
-
-    @classmethod
-    def from_id(cls, handler, id):
-        # parse ids like:
-        # '', '123', '123/', '123/ns', '123//image_id', '123/ns/image_id'
-        match = re.match(r'^(?:(\d+)(?:/([^/]*)(?:/([^/@]+))?)?)?$', id)
-        if not match:
-            raise ValueError("failed to parse: {}".format(id))
-
-        name = ""
-        pool_id = None
-        namespace = None
-        image_id = None
-        if match.group(1):
-            pool_id = match.group(1)
-            try:
-                pool_name = handler.module.rados.pool_reverse_lookup(
-                    int(pool_id))
-                if pool_name is None:
-                    raise ValueError("pool {} does not exist".format(pool_name))
-                name += pool_name + "/"
-                if match.group(2) is not None or match.group(3):
-                    with handler.module.rados.open_ioctx(pool_name) as ioctx:
-                        namespace = match.group(2) or ""
-                        if namespace:
-                            namespaces = rbd.RBD().namespace_list(ioctx)
-                            if namespace not in namespaces:
-                                raise ValueError(
-                                    "namespace {} does not exist".format(
-                                        namespace))
-                            name += namespace + "/"
-                            ioctx.set_namespace(namespace)
-                            pool_mode = rbd.RBD().mirror_mode_get(ioctx)
-                            if pool_mode != rbd.RBD_MIRROR_MODE_IMAGE:
-                                raise ValueError(
-                                    "namespace {} is not in mirror image mode".format(
-                                        namespace))
-                        elif not match.group(3):
-                            name += "/"
-                        if match.group(3):
-                            image_id = match.group(3)
-                            try:
-                                with rbd.Image(ioctx, image_id=image_id,
-                                               read_only=True) as image:
-                                    image_name = image.get_name()
-                                    name += image_name
-                                    mode = image.mirror_image_get_mode()
-                                    if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
-                                        raise rbd.InvalidArgument(
-                                            "Invalid mirror mode")
-                            except rbd.ImageNotFound:
-                                raise ValueError("image {} does not exist".format(
-                                    image_id))
-                            except rbd.InvalidArgument:
-                                raise ValueError(
-                                    "image {} is not in snapshot mirror mode".format(
-                                        image_id))
-
-            except rados.ObjectNotFound:
-                raise ValueError("pool {} does not exist".format(pool_id))
-
-        return LevelSpec(name, id, pool_id, namespace, image_id)
-
-
-class Schedule:
-
-    def __init__(self, name):
-        self.name = name
-        self.items = set()
-
-    def __len__(self):
-        return len(self.items)
-
-    def add(self, interval, start_time=None):
-        self.items.add((interval, start_time))
-
-    def remove(self, interval, start_time=None):
-        self.items.discard((interval, start_time))
-
-    def to_list(self):
-        return [{SCHEDULE_INTERVAL: i[0].to_string(),
-                 SCHEDULE_START_TIME: i[1] and i[1].to_string() or None}
-                for i in self.items]
-
-    def to_json(self):
-        return json.dumps(self.to_list(), indent=4, sort_keys=True)
-
-    @classmethod
-    def from_json(cls, name, val):
-        try:
-            items = json.loads(val)
-            schedule = Schedule(name)
-            for item in items:
-                interval = Interval.from_string(item[SCHEDULE_INTERVAL])
-                start_time = item[SCHEDULE_START_TIME] and \
-                    StartTime.from_string(item[SCHEDULE_START_TIME]) or None
-                schedule.add(interval, start_time)
-            return schedule
-        except json.JSONDecodeError as e:
-            raise ValueError("Invalid JSON ({})".format(str(e)))
-        except KeyError as e:
-            raise ValueError(
-                "Invalid schedule format (missing key {})".format(str(e)))
-        except TypeError as e:
-            raise ValueError("Invalid schedule format ({})".format(str(e)))
+def namespace_validator(ioctx):
+    mode = rbd.RBD().mirror_mode_get(ioctx)
+    if mode != rbd.RBD_MIRROR_MODE_IMAGE:
+        raise ValueError("namespace {} is not in mirror image mode".format(
+            ioctx.get_namespace()))
 
+def image_validator(image):
+    mode = image.mirror_image_get_mode()
+    if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
+        raise rbd.InvalidArgument("Invalid mirror image mode")
 
 class MirrorSnapshotScheduleHandler:
+    MODULE_OPTION_NAME = "mirror_snapshot_schedule"
+    SCHEDULE_OID = "rbd_mirror_snapshot_schedule"
+
     lock = Lock()
     condition = Condition(lock)
     thread = None
@@ -354,15 +82,6 @@ class MirrorSnapshotScheduleHandler:
                     pool_id, namespace, image_id, e))
 
 
-    @classmethod
-    def format_image_spec(cls, image_spec):
-        image = image_spec[2]
-        if image_spec[1]:
-            image = "{}/{}".format(image_spec[1], image)
-        if image_spec[0]:
-            image = "{}/{}".format(image_spec[0], image)
-        return image
-
     def init_schedule_queue(self):
         self.queue = {}
         self.images = {}
@@ -372,74 +91,11 @@ class MirrorSnapshotScheduleHandler:
     def load_schedules(self):
         self.log.info("MirrorSnapshotScheduleHandler: load_schedules")
 
-        schedules = {}
-        schedule_cfg = self.module.get_localized_module_option(
-            'mirror_snapshot_schedule', '')
-        if schedule_cfg:
-            try:
-                schedule = Schedule.from_json('', schedule_cfg)
-                schedules[''] = schedule
-            except ValueError:
-                self.log.error("Failed to decode configured schedule {}".format(
-                    schedule_cfg))
-
-        for pool_id, pool_name in get_rbd_pools(self.module).items():
-            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
-                self.load_pool_schedules(ioctx, schedules)
-
+        schedules = Schedules(self)
+        schedules.load(namespace_validator, image_validator)
         with self.lock:
             self.schedules = schedules
 
-    def load_pool_schedules(self, ioctx, schedules):
-        pool_id = ioctx.get_pool_id()
-        pool_name = ioctx.get_pool_name()
-        stale_keys = ()
-        start_after = ''
-        try:
-            while True:
-                with rados.ReadOpCtx() as read_op:
-                    self.log.info("load_schedules: {}, start_after={}".format(
-                        pool_name, start_after))
-                    it, ret = ioctx.get_omap_vals(read_op, start_after, "", 128)
-                    ioctx.operate_read_op(read_op, SCHEDULE_OID)
-
-                    it = list(it)
-                    for k, v in it:
-                        start_after = k
-                        v = v.decode()
-                        self.log.info("load_schedule: {} {}".format(k, v))
-
-                        try:
-                            try:
-                                level_spec = LevelSpec.from_id(self, k)
-                            except ValueError:
-                                self.log.debug(
-                                    "Stail schedule key {} in pool".format(
-                                        k, pool_name))
-                                stale_keys += (k,)
-                                continue
-
-                            schedule = Schedule.from_json(level_spec.name, v)
-                            schedules[k] = schedule
-                        except ValueError:
-                            self.log.error(
-                                "Failed to decode schedule: pool={}, {} {}".format(
-                                    pool_name, k, v))
-
-                    if not it:
-                        break
-
-        except StopIteration:
-            pass
-        except rados.ObjectNotFound:
-            # rbd_mirror_snapshot_schedule DNE
-            pass
-
-        if stale_keys:
-            with rados.WriteOpCtx() as write_op:
-                ioctx.remove_omap_keys(write_op, stale_keys)
-                ioctx.operate_write_op(write_op, SCHEDULE_OID)
-
     def refresh_images(self):
         if (datetime.now() - self.last_refresh_images).seconds < 60:
             return
@@ -501,28 +157,6 @@ class MirrorSnapshotScheduleHandler:
                 pool_name, e))
             pass
 
-    def find_schedule(self, pool_id, namespace, image_id):
-        levels = [None, pool_id, namespace, image_id]
-        while levels:
-            level_spec_id = "/".join(levels[1:])
-            if level_spec_id in self.schedules:
-                return self.schedules[level_spec_id]
-            del levels[-1]
-        return None
-
-    def calc_schedule_time(self, schedule, now):
-        schedule_time = None
-        for item in schedule.items:
-            period = timedelta(minutes=item[0].minutes)
-            start_time = datetime(1970, 1, 1)
-            if item[1]:
-                start_time += timedelta(minutes=item[1].minutes)
-            time = start_time + \
-                (int((now - start_time) / period) + 1) * period
-            if schedule_time is None or time < schedule_time:
-                schedule_time = time
-        return datetime.strftime(schedule_time, "%Y-%m-%d %H:%M:00")
-
     def rebuild_queue(self):
         with self.lock:
             now = datetime.now()
@@ -567,11 +201,11 @@ class MirrorSnapshotScheduleHandler:
 
     def enqueue(self, now, pool_id, namespace, image_id):
 
-        schedule = self.find_schedule(pool_id, namespace, image_id)
+        schedule = self.schedules.find(pool_id, namespace, image_id)
         if not schedule:
             return
 
-        schedule_time = self.calc_schedule_time(schedule, now)
+        schedule_time = schedule.next_run(now)
         if schedule_time not in self.queue:
             self.queue[schedule_time] = []
         self.log.debug("schedule image {}/{}/{} at {}".format(
@@ -593,7 +227,6 @@ class MirrorSnapshotScheduleHandler:
             return None, wait_time.total_seconds()
 
         images = self.queue[schedule_time]
-        image = images[0]
         image = images.pop(0)
         if not images:
             del self.queue[schedule_time]
@@ -609,113 +242,40 @@ class MirrorSnapshotScheduleHandler:
         for schedule_time in empty_slots:
             del self.queue[schedule_time]
 
-    def save_schedule(self, level_spec, schedule):
-        if level_spec.is_global():
-            schedule_cfg = schedule and schedule.to_json() or ''
-            self.module.set_localized_module_option('mirror_snapshot_schedule',
-                                                    schedule_cfg)
-            return
-
-        pool_id = level_spec.get_pool_id()
-        with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
-            with rados.WriteOpCtx() as write_op:
-                if schedule:
-                    ioctx.set_omap(write_op, (level_spec.id, ),
-                                   (schedule.to_json(), ))
-                else:
-                    ioctx.remove_omap_keys(write_op, (level_spec.id, ))
-                ioctx.operate_write_op(write_op, SCHEDULE_OID)
-
-
-    def add_schedule(self, level_spec_name, interval, start_time):
+    def add_schedule(self, level_spec, interval, start_time):
         self.log.debug(
             "add_schedule: level_spec={}, interval={}, start_time={}".format(
-                level_spec_name, interval, start_time))
-        try:
-            level_spec = LevelSpec.from_name(self, level_spec_name)
-        except ValueError as e:
-            return -errno.EINVAL, '', "Invalid level spec {}: {}".format(
-                level_spec_name, e)
+                level_spec.name, interval, start_time))
 
         with self.lock:
-            schedule = self.schedules.get(level_spec.id, Schedule(level_spec_name))
-            schedule.add(Interval.from_string(interval),
-                         StartTime.from_string(start_time))
-            self.save_schedule(level_spec, schedule)
-            self.schedules[level_spec.id] = schedule
+            self.schedules.add(level_spec, interval, start_time)
 
         # TODO: optimize to rebuild only affected part of the queue
         self.rebuild_queue()
         return 0, "", ""
 
-    def remove_schedule(self, level_spec_name, interval, start_time):
-        try:
-            level_spec = LevelSpec.from_name(self, level_spec_name)
-        except ValueError as e:
-            return -errno.EINVAL, '', "Invalid level spec {}: {}".format(
-                level_spec_name, e)
+    def remove_schedule(self, level_spec, interval, start_time):
+        self.log.debug(
+            "remove_schedule: level_spec={}, interval={}, start_time={}".format(
+                level_spec.name, interval, start_time))
 
         with self.lock:
-            schedule = self.schedules.pop(level_spec.id, None)
-            if not schedule:
-                return -errno.ENOENT, '', "No schedule for {}".format(level_spec_name)
-
-            if interval is None:
-                schedule = None
-            else:
-                schedule.remove(Interval.from_string(interval),
-                                StartTime.from_string(start_time))
-            if schedule:
-                self.schedules[level_spec.id] = schedule
-            self.save_schedule(level_spec, schedule)
+            self.schedules.remove(level_spec, interval, start_time)
 
         # TODO: optimize to rebuild only affected part of the queue
         self.rebuild_queue()
         return 0, "", ""
 
-    def list(self, level_spec_name):
-        self.log.debug("list: level_spec={}".format(level_spec_name))
-
-        if not level_spec_name:
-            level_spec_name = ""
-
-        try:
-            level_spec = LevelSpec.from_name(self, level_spec_name)
-        except ValueError as e:
-            return -errno.EINVAL, '', "Invalid level spec {}: {}".format(
-                level_spec_name, e)
+    def list(self, level_spec):
+        self.log.debug("list: level_spec={}".format(level_spec.name))
 
-        result = {}
         with self.lock:
-            parent = LevelSpec.from_id(self, "")
-            if not level_spec.is_global():
-                for level_spec_id in self.schedules:
-                    ls = LevelSpec.from_id(self, level_spec_id)
-                    if ls == level_spec:
-                        parent = ls
-                        break
-                    if level_spec.is_child_of(ls) and ls.is_child_of(parent):
-                        parent = ls
-            for level_spec_id, schedule in self.schedules.items():
-                ls = LevelSpec.from_id(self, level_spec_id)
-                if ls == parent or ls.is_child_of(parent):
-                    result[level_spec_id] = {
-                        'name' : schedule.name,
-                        'schedule' : schedule.to_list(),
-                    }
-        return 0, json.dumps(result, indent=4, sort_keys=True), ""
+            result = self.schedules.to_list(level_spec)
 
-    def status(self, level_spec_name):
-        self.log.debug("status: level_spec={}".format(level_spec_name))
+        return 0, json.dumps(result, indent=4, sort_keys=True), ""
 
-        if not level_spec_name:
-            level_spec_name = ""
-
-        try:
-            level_spec = LevelSpec.from_name(self, level_spec_name)
-        except ValueError as e:
-            return -errno.EINVAL, '', "Invalid level spec {}: {}".format(
-                level_spec_name, e)
+    def status(self, level_spec):
+        self.log.debug("status: level_spec={}".format(level_spec.name))
 
         scheduled_images = []
         with self.lock:
@@ -732,15 +292,25 @@ class MirrorSnapshotScheduleHandler:
                              indent=4, sort_keys=True), ""
 
     def handle_command(self, inbuf, prefix, cmd):
+        level_spec_name = cmd.get('level_spec', "")
+
+        try:
+            level_spec = LevelSpec.from_name(self, level_spec_name,
+                                             namespace_validator,
+                                             image_validator)
+        except ValueError as e:
+            return -errno.EINVAL, '', "Invalid level spec {}: {}".format(
+                level_spec_name, e)
+
         if prefix == 'add':
-            return self.add_schedule(cmd['level_spec'], cmd['interval'],
+            return self.add_schedule(level_spec, cmd['interval'],
                                      cmd.get('start_time'))
         elif prefix == 'remove':
-            return self.remove_schedule(cmd['level_spec'], cmd.get('interval'),
+            return self.remove_schedule(level_spec, cmd.get('interval'),
                                         cmd.get('start_time'))
         elif prefix == 'list':
-            return self.list(cmd.get('level_spec', None))
+            return self.list(level_spec)
         elif prefix == 'status':
-            return self.status(cmd.get('level_spec', None))
+            return self.status(level_spec)
 
         raise NotImplementedError(cmd['prefix'])
index 789d61e1167688d6c92b3198bb2be47d9b1338af..5cf1d54d4049d723582a012019e6196a8965c373 100644 (file)
@@ -115,7 +115,7 @@ class Module(MgrModule):
         }
     ]
     MODULE_OPTIONS = [
-        {'name': 'mirror_snapshot_schedule'},
+        {'name': MirrorSnapshotScheduleHandler.MODULE_OPTION_NAME},
     ]
 
     mirror_snapshot_schedule = None
diff --git a/src/pybind/mgr/rbd_support/schedule.py b/src/pybind/mgr/rbd_support/schedule.py
new file mode 100644 (file)
index 0000000..4c13914
--- /dev/null
@@ -0,0 +1,466 @@
+import json
+import rados
+import rbd
+import re
+
+from datetime import datetime, timedelta, time
+
+from .common import get_rbd_pools
+
+SCHEDULE_INTERVAL = "interval"
+SCHEDULE_START_TIME = "start_time"
+
+
+class LevelSpec:
+
+    def __init__(self, name, id, pool_id, namespace, image_id=None):
+        self.name = name
+        self.id = id
+        self.pool_id = pool_id
+        self.namespace = namespace
+        self.image_id = image_id
+
+    def __eq__(self, level_spec):
+        return self.id == level_spec.id
+
+    def is_child_of(self, level_spec):
+        if level_spec.is_global():
+            return not self.is_global()
+        if level_spec.pool_id != self.pool_id:
+            return False
+        if level_spec.namespace is None:
+            return self.namespace is not None
+        if level_spec.namespace != self.namespace:
+            return False
+        if level_spec.image_id is None:
+            return self.image_id is not None
+        return False
+
+    def is_global(self):
+        return self.pool_id is None
+
+    def get_pool_id(self):
+        return self.pool_id
+
+    def matches(self, pool_id, namespace, image_id=None):
+        if self.pool_id and self.pool_id != pool_id:
+            return False
+        if self.namespace and self.namespace != namespace:
+            return False
+        if self.image_id and self.image_id != image_id:
+            return False
+        return True
+
+    @classmethod
+    def make_global(cls):
+        return LevelSpec("", "", None, None, None)
+
+    @classmethod
+    def from_name(cls, handler, name, namespace_validator=None,
+                  image_validator=None, allow_image_level=True):
+        # parse names like:
+        # '', 'rbd/', 'rbd/ns/', 'rbd//image', 'rbd/image', 'rbd/ns/image'
+        match = re.match(r'^(?:([^/]+)/(?:(?:([^/]*)/|)(?:([^/@]+))?)?)?$',
+                         name)
+        if not match:
+            raise ValueError("failed to parse {}".format(name))
+        if match.group(3) and not allow_image_level:
+            raise ValueError(
+                "invalid name {}: image level is not allowed".format(name))
+
+        id = ""
+        pool_id = None
+        namespace = None
+        image_name = None
+        image_id = None
+        if match.group(1):
+            pool_name = match.group(1)
+            try:
+                pool_id = handler.module.rados.pool_lookup(pool_name)
+                if pool_id is None:
+                    raise ValueError("pool {} does not exist".format(pool_name))
+                pool_id = str(pool_id)
+                id += pool_id
+                if match.group(2) is not None or match.group(3):
+                    id += "/"
+                    with handler.module.rados.open_ioctx(pool_name) as ioctx:
+                        namespace = match.group(2) or ""
+                        if namespace:
+                            namespaces = rbd.RBD().namespace_list(ioctx)
+                            if namespace not in namespaces:
+                                raise ValueError(
+                                    "namespace {} does not exist".format(
+                                        namespace))
+                            id += namespace
+                            ioctx.set_namespace(namespace)
+                            if namespace_validator:
+                                namespace_validator(ioctx)
+                        if match.group(3):
+                            image_name = match.group(3)
+                            try:
+                                with rbd.Image(ioctx, image_name,
+                                               read_only=True) as image:
+                                    image_id = image.id()
+                                    id += "/" + image_id
+                                    if image_validator:
+                                        image_validator(image)
+                            except rbd.ImageNotFound:
+                                raise ValueError("image {} does not exist".format(
+                                    image_name))
+                            except rbd.InvalidArgument:
+                                raise ValueError(
+                                    "image {} is not in snapshot mirror mode".format(
+                                        image_name))
+
+            except rados.ObjectNotFound:
+                raise ValueError("pool {} does not exist".format(pool_name))
+
+        # normalize possible input name like 'rbd//image'
+        if not namespace and image_name:
+            name = "{}/{}".format(pool_name, image_name)
+
+        return LevelSpec(name, id, pool_id, namespace, image_id)
+
+    @classmethod
+    def from_id(cls, handler, id, namespace_validator=None,
+                image_validator=None):
+        # parse ids like:
+        # '', '123', '123/', '123/ns', '123//image_id', '123/ns/image_id'
+        match = re.match(r'^(?:(\d+)(?:/([^/]*)(?:/([^/@]+))?)?)?$', id)
+        if not match:
+            raise ValueError("failed to parse: {}".format(id))
+
+        name = ""
+        pool_id = None
+        namespace = None
+        image_id = None
+        if match.group(1):
+            pool_id = match.group(1)
+            try:
+                pool_name = handler.module.rados.pool_reverse_lookup(
+                    int(pool_id))
+                if pool_name is None:
+                    raise ValueError("pool {} does not exist".format(pool_name))
+                name += pool_name + "/"
+                if match.group(2) is not None or match.group(3):
+                    with handler.module.rados.open_ioctx(pool_name) as ioctx:
+                        namespace = match.group(2) or ""
+                        if namespace:
+                            namespaces = rbd.RBD().namespace_list(ioctx)
+                            if namespace not in namespaces:
+                                raise ValueError(
+                                    "namespace {} does not exist".format(
+                                        namespace))
+                            name += namespace + "/"
+                            if namespace_validator:
+                                ioctx.set_namespace(namespace)
+                        elif not match.group(3):
+                            name += "/"
+                        if match.group(3):
+                            image_id = match.group(3)
+                            try:
+                                with rbd.Image(ioctx, image_id=image_id,
+                                               read_only=True) as image:
+                                    image_name = image.get_name()
+                                    name += image_name
+                                    if image_validator:
+                                        image_validator(image)
+                            except rbd.ImageNotFound:
+                                raise ValueError("image {} does not exist".format(
+                                    image_id))
+                            except rbd.InvalidArgument:
+                                raise ValueError(
+                                    "image {} is not in snapshot mirror mode".format(
+                                        image_id))
+
+            except rados.ObjectNotFound:
+                raise ValueError("pool {} does not exist".format(pool_id))
+
+        return LevelSpec(name, id, pool_id, namespace, image_id)
+
+
+class Interval:
+
+    def __init__(self, minutes):
+        self.minutes = minutes
+
+    def __eq__(self, interval):
+        return self.minutes == interval.minutes
+
+    def __hash__(self):
+        return hash(self.minutes)
+
+    def to_string(self):
+        if self.minutes % (60 * 24) == 0:
+            interval = int(self.minutes / (60 * 24))
+            units = 'd'
+        elif self.minutes % 60 == 0:
+            interval = int(self.minutes / 60)
+            units = 'h'
+        else:
+            interval = int(self.minutes)
+            units = 'm'
+
+        return "{}{}".format(interval, units)
+
+    @classmethod
+    def from_string(cls, interval):
+        match = re.match(r'^(\d+)(d|h|m)?$', interval)
+        if not match:
+            raise ValueError("Invalid interval ({})".format(interval))
+
+        minutes = int(match.group(1))
+        if match.group(2) == 'd':
+            minutes *= 60 * 24
+        elif match.group(2) == 'h':
+            minutes *= 60
+
+        return Interval(minutes)
+
+
+class StartTime:
+
+    def __init__(self, hour, minute, tzinfo):
+        self.time = time(hour, minute, tzinfo=tzinfo)
+        self.minutes = self.time.hour * 60 + self.time.minute
+        if self.time.tzinfo:
+            self.minutes += int(self.time.utcoffset().seconds / 60)
+
+    def __eq__(self, start_time):
+        return self.minutes == start_time.minutes
+
+    def __hash__(self):
+        return hash(self.minutes)
+
+    def to_string(self):
+        return self.time.isoformat()
+
+    @classmethod
+    def from_string(cls, start_time):
+        if not start_time:
+            return None
+
+        try:
+            t = time.fromisoformat(start_time)
+        except ValueError as e:
+            raise ValueError("Invalid start time {}: {}".format(start_time, e))
+
+        return StartTime(t.hour, t.minute, tzinfo=t.tzinfo)
+
+
+class Schedule:
+
+    def __init__(self, name):
+        self.name = name
+        self.items = set()
+
+    def __len__(self):
+        return len(self.items)
+
+    def add(self, interval, start_time=None):
+        self.items.add((interval, start_time))
+
+    def remove(self, interval, start_time=None):
+        self.items.discard((interval, start_time))
+
+    def next_run(self, now):
+        schedule_time = None
+        for item in self.items:
+            period = timedelta(minutes=item[0].minutes)
+            start_time = datetime(1970, 1, 1)
+            if item[1]:
+                start_time += timedelta(minutes=item[1].minutes)
+            time = start_time + \
+                (int((now - start_time) / period) + 1) * period
+            if schedule_time is None or time < schedule_time:
+                schedule_time = time
+        return datetime.strftime(schedule_time, "%Y-%m-%d %H:%M:00")
+
+    def to_list(self):
+        return [{SCHEDULE_INTERVAL: i[0].to_string(),
+                 SCHEDULE_START_TIME: i[1] and i[1].to_string() or None}
+                for i in self.items]
+
+    def to_json(self):
+        return json.dumps(self.to_list(), indent=4, sort_keys=True)
+
+    @classmethod
+    def from_json(cls, name, val):
+        try:
+            items = json.loads(val)
+            schedule = Schedule(name)
+            for item in items:
+                interval = Interval.from_string(item[SCHEDULE_INTERVAL])
+                start_time = item[SCHEDULE_START_TIME] and \
+                    StartTime.from_string(item[SCHEDULE_START_TIME]) or None
+                schedule.add(interval, start_time)
+            return schedule
+        except json.JSONDecodeError as e:
+            raise ValueError("Invalid JSON ({})".format(str(e)))
+        except KeyError as e:
+            raise ValueError(
+                "Invalid schedule format (missing key {})".format(str(e)))
+        except TypeError as e:
+            raise ValueError("Invalid schedule format ({})".format(str(e)))
+
+class Schedules:
+
+    def __init__(self, handler):
+        self.handler = handler
+        self.level_specs = {}
+        self.schedules = {}
+
+    def __len__(self):
+        return len(self.schedules)
+
+    def load(self, namespace_validator=None, image_validator=None):
+
+        schedule_cfg = self.handler.module.get_localized_module_option(
+            self.handler.MODULE_OPTION_NAME, '')
+        if schedule_cfg:
+            try:
+                level_spec = LevelSpec.make_global()
+                self.level_specs[level_spec.id] = level_spec
+                schedule = Schedule.from_json(level_spec.name, schedule_cfg)
+                self.schedules[level_spec.id] = schedule
+            except ValueError:
+                self.handler.log.error(
+                    "Failed to decode configured schedule {}".format(
+                        schedule_cfg))
+
+        for pool_id, pool_name in get_rbd_pools(self.handler.module).items():
+            with self.handler.module.rados.open_ioctx2(int(pool_id)) as ioctx:
+                self.load_from_pool(ioctx, namespace_validator, image_validator)
+
+    def load_from_pool(self, ioctx, namespace_validator, image_validator):
+        pool_id = ioctx.get_pool_id()
+        pool_name = ioctx.get_pool_name()
+        stale_keys = ()
+        start_after = ''
+        try:
+            while True:
+                with rados.ReadOpCtx() as read_op:
+                    self.handler.log.info(
+                        "load_schedules: {}, start_after={}".format(
+                            pool_name, start_after))
+                    it, ret = ioctx.get_omap_vals(read_op, start_after, "", 128)
+                    ioctx.operate_read_op(read_op, self.handler.SCHEDULE_OID)
+
+                    it = list(it)
+                    for k, v in it:
+                        start_after = k
+                        v = v.decode()
+                        self.handler.log.info(
+                            "load_schedule: {} {}".format(k, v))
+                        try:
+                            try:
+                                level_spec = LevelSpec.from_id(
+                                    self.handler, k, namespace_validator,
+                                    image_validator)
+                            except ValueError:
+                                self.handler.log.debug(
+                                    "Stail schedule key {} in pool".format(
+                                        k, pool_name))
+                                stale_keys += (k,)
+                                continue
+
+                            self.level_specs[level_spec.id] = level_spec
+                            schedule = Schedule.from_json(level_spec.name, v)
+                            self.schedules[level_spec.id] = schedule
+                        except ValueError:
+                            self.handler.log.error(
+                                "Failed to decode schedule: pool={}, {} {}".format(
+                                    pool_name, k, v))
+                    if not it:
+                        break
+
+        except StopIteration:
+            pass
+        except rados.ObjectNotFound:
+            pass
+
+        if stale_keys:
+            with rados.WriteOpCtx() as write_op:
+                ioctx.remove_omap_keys(write_op, stale_keys)
+                ioctx.operate_write_op(write_op, self.handler.SCHEDULE_OID)
+
+    def save(self, level_spec, schedule):
+        if level_spec.is_global():
+            schedule_cfg = schedule and schedule.to_json() or ''
+            self.handler.module.set_localized_module_option(
+                self.handler.MODULE_OPTION_NAME, schedule_cfg)
+            return
+
+        pool_id = level_spec.get_pool_id()
+        with self.handler.module.rados.open_ioctx2(int(pool_id)) as ioctx:
+            with rados.WriteOpCtx() as write_op:
+                if schedule:
+                    ioctx.set_omap(write_op, (level_spec.id, ),
+                                   (schedule.to_json(), ))
+                else:
+                    ioctx.remove_omap_keys(write_op, (level_spec.id, ))
+                ioctx.operate_write_op(write_op, self.handler.SCHEDULE_OID)
+
+
+    def add(self, level_spec, interval, start_time):
+        schedule = self.schedules.get(level_spec.id, Schedule(level_spec.name))
+        schedule.add(Interval.from_string(interval),
+                     StartTime.from_string(start_time))
+        self.schedules[level_spec.id] = schedule
+        self.level_specs[level_spec.id] = level_spec
+        self.save(level_spec, schedule)
+
+    def remove(self, level_spec, interval, start_time):
+        schedule = self.schedules.pop(level_spec.id, None)
+        if schedule:
+            if interval is None:
+                schedule = None
+            else:
+                schedule.remove(Interval.from_string(interval),
+                                StartTime.from_string(start_time))
+                if schedule:
+                    self.schedules[level_spec.id] = schedule
+            if not schedule:
+                del self.level_specs[level_spec.id]
+        self.save(level_spec, schedule)
+
+    def find(self, pool_id, namespace, image_id=None):
+        levels = [None, pool_id, namespace]
+        if image_id:
+            levels.append(image_id)
+
+        while levels:
+            level_spec_id = "/".join(levels[1:])
+            if level_spec_id in self.schedules:
+                return self.schedules[level_spec_id]
+            del levels[-1]
+        return None
+
+    def to_list(self, level_spec):
+        if level_spec.id in self.schedules:
+            parent = level_spec
+        else:
+            # try to find existing parent
+            parent = None
+            for level_spec_id in self.schedules:
+                ls = self.level_specs[level_spec_id]
+                if ls == level_spec:
+                    parent = ls
+                    break
+                if level_spec.is_child_of(ls) and \
+                   (not parent or ls.is_child_of(parent)):
+                    parent = ls
+            if not parent:
+                # set to non-existing parent so we still could list its children
+                parent = level_spec
+
+        result = {}
+        for level_spec_id, schedule in self.schedules.items():
+            ls = self.level_specs[level_spec_id]
+            if ls == parent or ls.is_child_of(parent):
+                result[level_spec_id] = {
+                    'name' : schedule.name,
+                    'schedule' : schedule.to_list(),
+                }
+        return result
+