git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
pybind/mgr: new 'rbd task' background task management
authorJason Dillaman <dillaman@redhat.com>
Tue, 2 Jul 2019 23:54:49 +0000 (19:54 -0400)
committerJason Dillaman <dillaman@redhat.com>
Sun, 18 Aug 2019 20:47:13 +0000 (16:47 -0400)
Fixes: http://tracker.ceph.com/issues/40621
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
(cherry picked from commit b7a826df9f7e18db8ea561c08a0571aaba811f0f)

src/include/rbd_types.h
src/pybind/mgr/rbd_support/module.py

index 0277fd923c37637d2f765e568be5e6f25cb5dd05..94799ab6f1de60016409529ec5ef7ecbe1651c5d 100644 (file)
@@ -42,6 +42,7 @@
 #define RBD_DIRECTORY           "rbd_directory"
 #define RBD_INFO                "rbd_info"
 #define RBD_NAMESPACE           "rbd_namespace"
+#define RBD_TASK                "rbd_task"
 
 /*
  * rbd_children object in each pool contains omap entries
index e57aa7090604aa275185bb54298e5737fe0569d1..b4fe1bb829151b2a687ca1761934f03e086c2c8d 100644 (file)
@@ -2,17 +2,23 @@
 RBD support module
 """
 
+import errno
 import json
+import rados
+import rbd
+import re
 import time
 import traceback
+import uuid
 
 from mgr_module import MgrModule
 
+from contextlib import contextmanager
 from datetime import datetime, timedelta
-from rados import ObjectNotFound
-from rbd import RBD
+from functools import partial, wraps
 from threading import Condition, Lock, Thread
 
+
 GLOBAL_POOL_KEY = (None, None)
 
 QUERY_POOL_ID = "pool_id"
@@ -41,32 +47,57 @@ STATS_RATE_INTERVAL = timedelta(minutes=1)
 
 REPORT_MAX_RESULTS = 64
 
+RBD_TASK_OID = "rbd_task"
 
-class Module(MgrModule):
-    COMMANDS = [
-        {
-            "cmd": "rbd perf image stats "
-                   "name=pool_spec,type=CephString,req=false "
-                   "name=sort_by,type=CephChoices,strings="
-                   "write_ops|write_bytes|write_latency|"
-                   "read_ops|read_bytes|read_latency,"
-                   "req=false ",
-            "desc": "Retrieve current RBD IO performance stats",
-            "perm": "r"
-        },
-        {
-            "cmd": "rbd perf image counters "
-                   "name=pool_spec,type=CephString,req=false "
-                   "name=sort_by,type=CephChoices,strings="
-                   "write_ops|write_bytes|write_latency|"
-                   "read_ops|read_bytes|read_latency,"
-                   "req=false ",
-            "desc": "Retrieve current RBD IO performance counters",
-            "perm": "r"
-        }
-    ]
-    MODULE_OPTIONS = []
+TASK_SEQUENCE = "sequence"
+TASK_ID = "id"
+TASK_REFS = "refs"
+TASK_MESSAGE = "message"
+TASK_RETRY_TIME = "retry_time"
+TASK_IN_PROGRESS = "in_progress"
+TASK_PROGRESS = "progress"
+TASK_CANCELED = "canceled"
+
+TASK_REF_POOL_NAME = "pool_name"
+TASK_REF_POOL_NAMESPACE = "pool_namespace"
+TASK_REF_IMAGE_NAME = "image_name"
+TASK_REF_IMAGE_ID = "image_id"
+TASK_REF_ACTION = "action"
+
+TASK_REF_ACTION_FLATTEN = "flatten"
+TASK_REF_ACTION_REMOVE = "remove"
+TASK_REF_ACTION_TRASH_REMOVE = "trash remove"
+TASK_REF_ACTION_MIGRATION_EXECUTE = "migrate execute"
+TASK_REF_ACTION_MIGRATION_COMMIT = "migrate commit"
+TASK_REF_ACTION_MIGRATION_ABORT = "migrate abort"
+
+VALID_TASK_ACTIONS = [TASK_REF_ACTION_FLATTEN,
+                      TASK_REF_ACTION_REMOVE,
+                      TASK_REF_ACTION_TRASH_REMOVE,
+                      TASK_REF_ACTION_MIGRATION_EXECUTE,
+                      TASK_REF_ACTION_MIGRATION_COMMIT,
+                      TASK_REF_ACTION_MIGRATION_ABORT]
+
+TASK_RETRY_INTERVAL = timedelta(seconds=30)
+
+
+def extract_pool_key(pool_spec):
+    if not pool_spec:
+        return GLOBAL_POOL_KEY
 
+    match = re.match(r'^([^/]+)(?:/([^/]+))?$', pool_spec)
+    if not match:
+        raise ValueError("Invalid pool spec: {}".format(pool_spec))
+    return (match.group(1), match.group(2) or '')
+
+
+def get_rbd_pools(module):
+    osd_map = module.get('osd_map')
+    return {pool['pool']: pool['pool_name'] for pool in osd_map['pools']
+            if 'rbd' in pool.get('application_metadata', {})}
+
+
+class PerfHandler:
     user_queries = {}
     image_cache = {}
 
@@ -103,15 +134,6 @@ class Module(MgrModule):
                       'max_count': OSD_PERF_QUERY_MAX_RESULTS},
         }
 
-    @classmethod
-    def extract_pool_key(cls, pool_spec):
-        if not pool_spec:
-            return (None, None)
-        pool_spec = pool_spec.rsplit('/', 1)
-        if len(pool_spec) == 1 or not pool_spec[1]:
-            return (pool_spec[0], '')
-        return (pool_spec[0], pool_spec[1])
-
     @classmethod
     def pool_spec_search_keys(cls, pool_key):
         return [pool_key[0:len(pool_key) - x]
@@ -119,27 +141,29 @@ class Module(MgrModule):
 
     @classmethod
     def submatch_pool_key(cls, pool_key, search_key):
-        return ((pool_key[1] == search_key[1] or not search_key[1]) and
-                (pool_key[0] == search_key[0] or not search_key[0]))
+        return ((pool_key[1] == search_key[1] or not search_key[1])
+                and (pool_key[0] == search_key[0] or not search_key[0]))
+
+    def __init__(self, module):
+        self.module = module
+        self.log = module.log
 
-    def __init__(self, *args, **kwargs):
-        super(Module, self).__init__(*args, **kwargs)
         self.thread = Thread(target=self.run)
         self.thread.start()
 
     def run(self):
         try:
-            self.log.info("run: starting")
+            self.log.info("PerfHandler: starting")
             while True:
                 with self.lock:
                     self.scrub_expired_queries()
                     self.process_raw_osd_perf_counters()
                     self.refresh_condition.notify()
 
-                    stats_period = int(self.get_ceph_option("mgr_stats_period"))
+                    stats_period = int(self.module.get_ceph_option("mgr_stats_period"))
                     self.query_condition.wait(stats_period)
 
-                self.log.debug("run: tick")
+                self.log.debug("PerfHandler: tick")
 
         except Exception as ex:
             self.log.fatal("Fatal runtime error: {}\n{}".format(
@@ -152,7 +176,7 @@ class Module(MgrModule):
         # collect and combine the raw counters from all sort orders
         raw_pool_counters = query.setdefault(QUERY_RAW_POOL_COUNTERS, {})
         for query_id in query[QUERY_IDS]:
-            res = self.get_osd_perf_counters(query_id)
+            res = self.module.get_osd_perf_counters(query_id)
             for counter in res['counters']:
                 # replace pool id from object name if it exists
                 k = counter['k']
@@ -219,13 +243,12 @@ class Module(MgrModule):
         return sum_pool_counters
 
     def refresh_image_names(self, resolve_image_names):
-        rbd = RBD()
         for pool_id, namespace in resolve_image_names:
             image_key = (pool_id, namespace)
             images = self.image_name_cache.setdefault(image_key, {})
-            with self.rados.open_ioctx2(int(pool_id)) as ioctx:
+            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                 ioctx.set_namespace(namespace)
-                for image_meta in rbd.list2(ioctx):
+                for image_meta in rbd.RBD().list2(ioctx):
                     images[image_meta['id']] = image_meta['name']
             self.log.debug("resolve_image_names: {}={}".format(image_key, images))
 
@@ -275,15 +298,10 @@ class Module(MgrModule):
         elif not self.image_name_cache:
             self.scrub_missing_images()
 
-    def get_rbd_pools(self):
-        osd_map = self.get('osd_map')
-        return {pool['pool']: pool['pool_name'] for pool in osd_map['pools']
-                if 'rbd' in pool.get('application_metadata', {})}
-
     def resolve_pool_id(self, pool_name):
-        pool_id = self.rados.pool_lookup(pool_name)
+        pool_id = self.module.rados.pool_lookup(pool_name)
         if not pool_id:
-            raise ObjectNotFound("Pool '{}' not found".format(pool_name))
+            raise rados.ObjectNotFound("Pool '{}' not found".format(pool_name))
         return pool_id
 
     def scrub_expired_queries(self):
@@ -293,7 +311,7 @@ class Module(MgrModule):
         for pool_key in list(self.user_queries.keys()):
             user_query = self.user_queries[pool_key]
             if user_query[QUERY_LAST_REQUEST] < expire_time:
-                self.unregister_osd_perf_queries(pool_key, user_query[QUERY_IDS])
+                self.module.unregister_osd_perf_queries(pool_key, user_query[QUERY_IDS])
                 del self.user_queries[pool_key]
 
     def register_osd_perf_queries(self, pool_id, namespace):
@@ -303,14 +321,14 @@ class Module(MgrModule):
                 query = self.prepare_osd_perf_query(pool_id, namespace, counter)
                 self.log.debug("register_osd_perf_queries: {}".format(query))
 
-                query_id = self.add_osd_perf_query(query)
+                query_id = self.module.add_osd_perf_query(query)
                 if query_id is None:
                     raise RuntimeError('Failed to add OSD perf query: {}'.format(query))
                 query_ids.append(query_id)
 
         except Exception:
             for query_id in query_ids:
-                self.remove_osd_perf_query(query_id)
+                self.module.remove_osd_perf_query(query_id)
             raise
 
         return query_ids
@@ -319,7 +337,7 @@ class Module(MgrModule):
         self.log.info("unregister_osd_perf_queries: pool_key={}, query_ids={}".format(
             pool_key, query_ids))
         for query_id in query_ids:
-            self.remove_osd_perf_query(query_id)
+            self.module.remove_osd_perf_query(query_id)
         query_ids[:] = []
 
     def register_query(self, pool_key):
@@ -352,7 +370,7 @@ class Module(MgrModule):
                 # processing period
                 user_query[QUERY_POOL_ID_MAP] = {
                     pool_id: pool_name for pool_id, pool_name
-                    in self.get_rbd_pools().items()}
+                    in get_rbd_pools(self.module).items()}
 
         self.log.debug("register_query: pool_key={}, query_ids={}".format(
             pool_key, user_query[QUERY_IDS]))
@@ -459,7 +477,7 @@ class Module(MgrModule):
             report, pool_spec, sort_by))
         self.scrub_expired_queries()
 
-        pool_key = self.extract_pool_key(pool_spec)
+        pool_key = extract_pool_key(pool_spec)
         user_query = self.register_query(pool_key)
 
         now = datetime.now()
@@ -483,13 +501,730 @@ class Module(MgrModule):
         return self.get_perf_data(
             "counter", pool_spec, sort_by, self.extract_counter)
 
-    def handle_command(self, inbuf, cmd):
+    def handle_command(self, inbuf, prefix, cmd):
         with self.lock:
-            if cmd['prefix'] == 'rbd perf image stats':
+            if prefix == 'image stats':
                 return self.get_perf_stats(cmd.get('pool_spec', None),
                                            cmd.get('sort_by', OSD_PERF_QUERY_COUNTERS[0]))
-            elif cmd['prefix'] == 'rbd perf image counters':
+            elif prefix == 'image counters':
                 return self.get_perf_counters(cmd.get('pool_spec', None),
                                               cmd.get('sort_by', OSD_PERF_QUERY_COUNTERS[0]))
 
-            raise NotImplementedError(cmd['prefix'])
+        raise NotImplementedError(cmd['prefix'])
+
+
+class Throttle:
+    def __init__(self, throttle_period):
+        self.throttle_period = throttle_period
+        self.time_of_last_call = datetime.min
+
+    def __call__(self, fn):
+        @wraps(fn)
+        def wrapper(*args, **kwargs):
+            now = datetime.now()
+            if self.time_of_last_call + self.throttle_period <= now:
+                self.time_of_last_call = now
+                return fn(*args, **kwargs)
+        return wrapper
+
+
+class Task:
+    def __init__(self, sequence, task_id, message, refs):
+        self.sequence = sequence
+        self.task_id = task_id
+        self.message = message
+        self.refs = refs
+        self.retry_time = None
+        self.in_progress = False
+        self.progress = 0.0
+        self.canceled = False
+
+    def __str__(self):
+        return self.to_json()
+
+    @property
+    def sequence_key(self):
+        return "{0:016X}".format(self.sequence)
+
+    def cancel(self):
+        self.canceled = True
+
+    def to_dict(self):
+        d = {TASK_SEQUENCE: self.sequence,
+             TASK_ID: self.task_id,
+             TASK_MESSAGE: self.message,
+             TASK_REFS: self.refs
+             }
+        if self.retry_time:
+            d[TASK_RETRY_TIME] = self.retry_time.isoformat()
+        if self.in_progress:
+            d[TASK_IN_PROGRESS] = True
+            d[TASK_PROGRESS] = self.progress
+        if self.canceled:
+            d[TASK_CANCELED] = True
+        return d
+
+    def to_json(self):
+        return str(json.dumps(self.to_dict()))
+
+    @classmethod
+    def from_json(cls, val):
+        try:
+            d = json.loads(val)
+            action = d.get(TASK_REFS, {}).get(TASK_REF_ACTION)
+            if action not in VALID_TASK_ACTIONS:
+                raise ValueError("Invalid task action: {}".format(action))
+
+            return Task(d[TASK_SEQUENCE], d[TASK_ID], d[TASK_MESSAGE], d[TASK_REFS])
+        except json.JSONDecodeError as e:
+            raise ValueError("Invalid JSON ({})".format(str(e)))
+        except KeyError as e:
+            raise ValueError("Invalid task format (missing key {})".format(str(e)))
+
+
+class TaskHandler:
+    lock = Lock()
+    condition = Condition(lock)
+    thread = None
+
+    in_progress_task = None
+    tasks_by_sequence = dict()
+    tasks_by_id = dict()
+
+    sequence = 0
+
+    def __init__(self, module):
+        self.module = module
+        self.log = module.log
+
+        with self.lock:
+            self.init_task_queue()
+
+        self.thread = Thread(target=self.run)
+        self.thread.start()
+
+    @property
+    def default_pool_name(self):
+        return self.module.get_ceph_option("rbd_default_pool")
+
+    def extract_pool_spec(self, pool_spec):
+        pool_spec = extract_pool_key(pool_spec)
+        if pool_spec == GLOBAL_POOL_KEY:
+            pool_spec = (self.default_pool_name, '')
+        return pool_spec
+
+    def extract_image_spec(self, image_spec):
+        match = re.match(r'^(?:([^/]+)/(?:([^/]+)/)?)?([^/@]+)$',
+                         image_spec or '')
+        if not match:
+            raise ValueError("Invalid image spec: {}".format(image_spec))
+        return (match.group(1) or self.default_pool_name, match.group(2) or '',
+                match.group(3))
+
+    def run(self):
+        try:
+            self.log.info("TaskHandler: starting")
+            while True:
+                with self.lock:
+                    now = datetime.now()
+                    for sequence in sorted([sequence for sequence, task
+                                            in self.tasks_by_sequence.items()
+                                            if not task.retry_time or task.retry_time <= now]):
+                        self.execute_task(sequence)
+
+                    self.condition.wait(5)
+                    self.log.debug("TaskHandler: tick")
+
+        except Exception as ex:
+            self.log.fatal("Fatal runtime error: {}\n{}".format(
+                ex, traceback.format_exc()))
+
+    @contextmanager
+    def open_ioctx(self, spec):
+        try:
+            with self.module.rados.open_ioctx(spec[0]) as ioctx:
+                ioctx.set_namespace(spec[1])
+                yield ioctx
+        except rados.ObjectNotFound:
+            self.log.error("Failed to locate pool {}".format(spec[0]))
+            raise
+
+    @classmethod
+    def format_image_spec(cls, image_spec):
+        image = image_spec[2]
+        if image_spec[1]:
+            image = "{}/{}".format(image_spec[1], image)
+        if image_spec[0]:
+            image = "{}/{}".format(image_spec[0], image)
+        return image
+
+    def init_task_queue(self):
+        for pool_id, pool_name in get_rbd_pools(self.module).items():
+            try:
+                with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
+                    self.load_task_queue(ioctx, pool_name)
+
+                    for namespace in rbd.RBD().namespace_list(ioctx):
+                        ioctx.set_namespace(namespace)
+                        self.load_task_queue(ioctx, pool_name)
+            except rados.ObjectNotFound:
+                # pool DNE
+                pass
+
+        if self.tasks_by_sequence:
+            self.sequence = list(sorted(self.tasks_by_sequence.keys()))[-1]
+
+        self.log.debug("sequence={}, tasks_by_sequence={}, tasks_by_id={}".format(
+            self.sequence, str(self.tasks_by_sequence), str(self.tasks_by_id)))
+
+    def load_task_queue(self, ioctx, pool_name):
+        pool_spec = pool_name
+        if ioctx.nspace:
+            pool_spec += "/{}".format(ioctx.nspace)
+
+        start_after = ''
+        try:
+            while True:
+                with rados.ReadOpCtx() as read_op:
+                    self.log.info("load_task_task: {}, start_after={}".format(
+                        pool_spec, start_after))
+                    it, ret = ioctx.get_omap_vals(read_op, start_after, "", 128)
+                    ioctx.operate_read_op(read_op, RBD_TASK_OID)
+
+                    it = list(it)
+                    for k, v in it:
+                        start_after = k
+                        v = v.decode()
+                        self.log.info("load_task_task: task={}".format(v))
+
+                        try:
+                            task = Task.from_json(v)
+                            self.append_task(task)
+                        except ValueError:
+                            self.log.error("Failed to decode task: pool_spec={}, task={}".format(pool_spec, v))
+
+                    if not it:
+                        break
+
+        except StopIteration:
+            pass
+        except rados.ObjectNotFound:
+            # rbd_task DNE
+            pass
+
+    def append_task(self, task):
+        self.tasks_by_sequence[task.sequence] = task
+        self.tasks_by_id[task.task_id] = task
+
+    def add_task(self, ioctx, message, refs):
+        self.log.debug("add_task: message={}, refs={}".format(message, refs))
+
+        # search for dups and return the original
+        for task_id, task in self.tasks_by_id.items():
+            if task.refs == refs:
+                return json.dumps(task.to_dict())
+
+        # ensure unique uuid across all pools
+        while True:
+            task_id = str(uuid.uuid4())
+            if task_id not in self.tasks_by_id:
+                break
+
+        self.sequence += 1
+        task = Task(self.sequence, task_id, message, refs)
+
+        # add the task to the rbd_task omap
+        task_json = task.to_json()
+        omap_keys = (task.sequence_key, )
+        omap_vals = (str.encode(task_json), )
+        self.log.info("adding task: {} {}".format(omap_keys[0], omap_vals[0]))
+
+        with rados.WriteOpCtx() as write_op:
+            ioctx.set_omap(write_op, omap_keys, omap_vals)
+            ioctx.operate_write_op(write_op, RBD_TASK_OID)
+        self.append_task(task)
+
+        self.condition.notify()
+        return task_json
+
+    def remove_task(self, ioctx, task, remove_in_memory=True):
+        self.log.info("remove_task: task={}".format(str(task)))
+        omap_keys = (task.sequence_key, )
+        try:
+            with rados.WriteOpCtx() as write_op:
+                ioctx.remove_omap_keys(write_op, omap_keys)
+                ioctx.operate_write_op(write_op, RBD_TASK_OID)
+        except rados.ObjectNotFound:
+            pass
+
+        if remove_in_memory:
+            try:
+                del self.tasks_by_id[task.task_id]
+                del self.tasks_by_sequence[task.sequence]
+            except KeyError:
+                pass
+
+    def execute_task(self, sequence):
+        task = self.tasks_by_sequence[sequence]
+        self.log.info("execute_task: task={}".format(str(task)))
+
+        pool_valid = False
+        try:
+            with self.open_ioctx((task.refs[TASK_REF_POOL_NAME],
+                                  task.refs[TASK_REF_POOL_NAMESPACE])) as ioctx:
+                pool_valid = True
+
+                action = task.refs[TASK_REF_ACTION]
+                execute_fn = {TASK_REF_ACTION_FLATTEN: self.execute_flatten,
+                              TASK_REF_ACTION_REMOVE: self.execute_remove,
+                              TASK_REF_ACTION_TRASH_REMOVE: self.execute_trash_remove,
+                              TASK_REF_ACTION_MIGRATION_EXECUTE: self.execute_migration_execute,
+                              TASK_REF_ACTION_MIGRATION_COMMIT: self.execute_migration_commit,
+                              TASK_REF_ACTION_MIGRATION_ABORT: self.execute_migration_abort
+                              }.get(action)
+                if not execute_fn:
+                    self.log.error("Invalid task action: {}".format(action))
+                else:
+                    task.in_progress = True
+                    self.in_progress_task = task
+                    self.update_progress(task, 0)
+
+                    self.lock.release()
+                    try:
+                        execute_fn(ioctx, task)
+
+                    except rbd.OperationCanceled:
+                        self.log.info("Operation canceled: task={}".format(
+                            str(task)))
+
+                    finally:
+                        self.lock.acquire()
+
+                        task.in_progress = False
+                        self.in_progress_task = None
+
+                    self.complete_progress(task)
+                    self.remove_task(ioctx, task)
+
+        except rados.ObjectNotFound as e:
+            self.log.error("execute_task: {}".format(e))
+            if pool_valid:
+                self.update_progress(task, 0)
+            else:
+                # pool DNE -- remove the task
+                self.complete_progress(task)
+                self.remove_task(ioctx, task)
+
+        except (rados.Error, rbd.Error) as e:
+            self.log.error("execute_task: {}".format(e))
+            self.update_progress(task, 0)
+
+        finally:
+            task.in_progress = False
+            task.retry_time = datetime.now() + TASK_RETRY_INTERVAL
+
+    def progress_callback(self, task, current, total):
+        progress = float(current) / float(total)
+        self.log.debug("progress_callback: task={}, progress={}".format(
+            str(task), progress))
+
+        # avoid deadlocking when a new command comes in during a progress callback
+        if not self.lock.acquire(False):
+            return 0
+
+        try:
+            if not self.in_progress_task or self.in_progress_task.canceled:
+                return -rbd.ECANCELED
+            self.in_progress_task.progress = progress
+        finally:
+            self.lock.release()
+
+        self.throttled_update_progress(task, progress)
+        return 0
+
+    def execute_flatten(self, ioctx, task):
+        self.log.info("execute_flatten: task={}".format(str(task)))
+
+        try:
+            with rbd.Image(ioctx, task.refs[TASK_REF_IMAGE_NAME]) as image:
+                image.flatten(on_progress=partial(self.progress_callback, task))
+        except rbd.InvalidArgument:
+            self.log.info("Image does not have parent: task={}".format(str(task)))
+        except rbd.ImageNotFound:
+            self.log.info("Image does not exist: task={}".format(str(task)))
+
+    def execute_remove(self, ioctx, task):
+        self.log.info("execute_remove: task={}".format(str(task)))
+
+        try:
+            rbd.RBD().remove(ioctx, task.refs[TASK_REF_IMAGE_NAME],
+                             on_progress=partial(self.progress_callback, task))
+        except rbd.ImageNotFound:
+            self.log.info("Image does not exist: task={}".format(str(task)))
+
+    def execute_trash_remove(self, ioctx, task):
+        self.log.info("execute_trash_remove: task={}".format(str(task)))
+
+        try:
+            rbd.RBD().trash_remove(ioctx, task.refs[TASK_REF_IMAGE_ID],
+                                   on_progress=partial(self.progress_callback, task))
+        except rbd.ImageNotFound:
+            self.log.info("Image does not exist: task={}".format(str(task)))
+
+    def execute_migration_execute(self, ioctx, task):
+        self.log.info("execute_migration_execute: task={}".format(str(task)))
+
+        try:
+            rbd.RBD().migration_execute(ioctx, task.refs[TASK_REF_IMAGE_NAME],
+                                        on_progress=partial(self.progress_callback, task))
+        except rbd.ImageNotFound:
+            self.log.info("Image does not exist: task={}".format(str(task)))
+        except rbd.InvalidArgument:
+            self.log.info("Image is not migrating: task={}".format(str(task)))
+
+    def execute_migration_commit(self, ioctx, task):
+        self.log.info("execute_migration_commit: task={}".format(str(task)))
+
+        try:
+            rbd.RBD().migration_commit(ioctx, task.refs[TASK_REF_IMAGE_NAME],
+                                       on_progress=partial(self.progress_callback, task))
+        except rbd.ImageNotFound:
+            self.log.info("Image does not exist: task={}".format(str(task)))
+        except rbd.InvalidArgument:
+            self.log.info("Image is not migrating or migration not executed: task={}".format(str(task)))
+
+    def execute_migration_abort(self, ioctx, task):
+        self.log.info("execute_migration_abort: task={}".format(str(task)))
+
+        try:
+            rbd.RBD().migration_abort(ioctx, task.refs[TASK_REF_IMAGE_NAME],
+                                      on_progress=partial(self.progress_callback, task))
+        except rbd.ImageNotFound:
+            self.log.info("Image does not exist: task={}".format(str(task)))
+        except rbd.InvalidArgument:
+            self.log.info("Image is not migrating: task={}".format(str(task)))
+
+    def complete_progress(self, task):
+        self.log.debug("complete_progress: task={}".format(str(task)))
+        try:
+            self.module.remote("progress", "complete", task.task_id)
+        except ImportError:
+            # progress module is disabled
+            pass
+
+    def update_progress(self, task, progress):
+        self.log.debug("update_progress: task={}, progress={}".format(str(task), progress))
+        try:
+            self.module.remote("progress", "update", task.task_id,
+                               task.message, progress,
+                               ["rbd_support"])
+        except ImportError:
+            # progress module is disabled
+            pass
+
+    @Throttle(timedelta(seconds=1))
+    def throttled_update_progress(self, task, progress):
+        self.update_progress(task, progress)
+
+    def queue_flatten(self, image_spec):
+        image_spec = self.extract_image_spec(image_spec)
+        self.log.info("queue_flatten: {}".format(image_spec))
+
+        with self.open_ioctx(image_spec) as ioctx:
+            with rbd.Image(ioctx, image_spec[2]) as image:
+                try:
+                    image.parent_id()
+                except rbd.ImageNotFound:
+                    raise rbd.ImageNotFound("Image {} does not have a parent".format(
+                        self.format_image_spec(image_spec)), errno=errno.ENOENT)
+
+            return 0, self.add_task(ioctx,
+                                    "Flattening image {}".format(
+                                        self.format_image_spec(image_spec)),
+                                    {TASK_REF_ACTION: TASK_REF_ACTION_FLATTEN,
+                                     TASK_REF_POOL_NAME: image_spec[0],
+                                     TASK_REF_POOL_NAMESPACE: image_spec[1],
+                                     TASK_REF_IMAGE_NAME: image_spec[2]}), ""
+
+    def queue_remove(self, image_spec):
+        image_spec = self.extract_image_spec(image_spec)
+        self.log.info("queue_remove: {}".format(image_spec))
+
+        with self.open_ioctx(image_spec) as ioctx:
+            with rbd.Image(ioctx, image_spec[2]) as image:
+                if list(image.list_snaps()):
+                    raise rbd.ImageBusy("Image {} has snapshots".format(
+                        self.format_image_spec(image_spec)), errno=errno.EBUSY)
+
+            return 0, self.add_task(ioctx,
+                                    "Removing image {}".format(
+                                        self.format_image_spec(image_spec)),
+                                    {TASK_REF_ACTION: TASK_REF_ACTION_REMOVE,
+                                     TASK_REF_POOL_NAME: image_spec[0],
+                                     TASK_REF_POOL_NAMESPACE: image_spec[1],
+                                     TASK_REF_IMAGE_NAME: image_spec[2]}), ""
+
+    def queue_trash_remove(self, image_id_spec):
+        image_id_spec = self.extract_image_spec(image_id_spec)
+        self.log.info("queue_trash_remove: {}".format(image_id_spec))
+
+        # verify that image exists in trash
+        with self.open_ioctx(image_id_spec) as ioctx:
+            rbd.RBD().trash_get(ioctx, image_id_spec[2])
+
+            return 0, self.add_task(ioctx,
+                                    "Removing image {} from trash".format(
+                                        self.format_image_spec(image_id_spec)),
+                                    {TASK_REF_ACTION: TASK_REF_ACTION_TRASH_REMOVE,
+                                     TASK_REF_POOL_NAME: image_id_spec[0],
+                                     TASK_REF_POOL_NAMESPACE: image_id_spec[1],
+                                     TASK_REF_IMAGE_ID: image_id_spec[2]}), ""
+
+    def validate_image_migrating(self, ioctx, image_spec):
+        with rbd.Image(ioctx, image_spec[2]) as image:
+            features = image.features()
+            if features & rbd.RBD_FEATURE_MIGRATING == 0:
+                raise rbd.InvalidArgument("Image {} is not migrating".format(
+                    self.format_image_spec(image_spec)), errno=errno.EINVAL)
+
+    def resolve_pool_name(self, pool_id):
+        osd_map = self.module.get('osd_map')
+        for pool in osd_map['pools']:
+            if pool['pool'] == pool_id:
+                return pool['pool_name']
+        return '<unknown>'
+
+    def queue_migration_execute(self, image_spec):
+        image_spec = self.extract_image_spec(image_spec)
+        self.log.info("queue_migration_execute: {}".format(image_spec))
+
+        with self.open_ioctx(image_spec) as ioctx:
+            self.validate_image_migrating(ioctx, image_spec)
+
+            status = rbd.RBD().migration_status(ioctx, image_spec[2])
+            if status['state'] not in [rbd.RBD_IMAGE_MIGRATION_STATE_PREPARED,
+                                       rbd.RBD_IMAGE_MIGRATION_STATE_EXECUTING]:
+                raise rbd.InvalidArgument("Image {} is not in ready state".format(
+                    self.format_image_spec(image_spec)), errno=errno.EINVAL)
+
+            source_pool = self.resolve_pool_name(status['source_pool_id'])
+            dest_pool = self.resolve_pool_name(status['dest_pool_id'])
+            return 0, self.add_task(ioctx,
+                                    "Migrating image {} to {}".format(
+                                        self.format_image_spec((source_pool,
+                                                                status['source_pool_namespace'],
+                                                                status['source_image_name'])),
+                                        self.format_image_spec((dest_pool,
+                                                                status['dest_pool_namespace'],
+                                                                status['dest_image_name']))),
+                                    {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_EXECUTE,
+                                     TASK_REF_POOL_NAME: image_spec[0],
+                                     TASK_REF_POOL_NAMESPACE: image_spec[1],
+                                     TASK_REF_IMAGE_NAME: image_spec[2]}), ""
+
+    def queue_migration_commit(self, image_spec):
+        image_spec = self.extract_image_spec(image_spec)
+        self.log.info("queue_migration_commit: {}".format(image_spec))
+
+        with self.open_ioctx(image_spec) as ioctx:
+            self.validate_image_migrating(ioctx, image_spec)
+
+            status = rbd.RBD().migration_status(ioctx, image_spec[2])
+            if status['state'] != rbd.RBD_IMAGE_MIGRATION_STATE_EXECUTED:
+                raise rbd.InvalidArgument("Image {} has not completed migration".format(
+                    self.format_image_spec(image_spec)), errno=errno.EINVAL)
+
+            return 0, self.add_task(ioctx,
+                                    "Committing image migration for {}".format(
+                                        self.format_image_spec(image_spec)),
+                                    {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_COMMIT,
+                                     TASK_REF_POOL_NAME: image_spec[0],
+                                     TASK_REF_POOL_NAMESPACE: image_spec[1],
+                                     TASK_REF_IMAGE_NAME: image_spec[2]}), ""
+
+    def queue_migration_abort(self, image_spec):
+        image_spec = self.extract_image_spec(image_spec)
+        self.log.info("queue_migration_abort: {}".format(image_spec))
+
+        with self.open_ioctx(image_spec) as ioctx:
+            self.validate_image_migrating(ioctx, image_spec)
+
+            return 0, self.add_task(ioctx,
+                                    "Aborting image migration for {}".format(
+                                        self.format_image_spec(image_spec)),
+                                    {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_ABORT,
+                                     TASK_REF_POOL_NAME: image_spec[0],
+                                     TASK_REF_POOL_NAMESPACE: image_spec[1],
+                                     TASK_REF_IMAGE_NAME: image_spec[2]}), ""
+
+    def task_cancel(self, task_id):
+        self.log.info("task_cancel: {}".format(task_id))
+
+        if task_id not in self.tasks_by_id:
+            self.log.debug("tasks: {}".format(str(self.tasks_by_id)))
+            raise KeyError("No such task {}".format(task_id))
+
+        task = self.tasks_by_id[task_id]
+        task.cancel()
+
+        remove_in_memory = True
+        if self.in_progress_task and self.in_progress_task.task_id == task_id:
+            self.log.info("Attempting to cancel in-progress task: {}".format(str(self.in_progress_task)))
+            remove_in_memory = False
+
+        # complete any associated event in the progress module
+        self.complete_progress(task)
+
+        # remove from rbd_task omap
+        with self.open_ioctx((task.refs[TASK_REF_POOL_NAME],
+                              task.refs[TASK_REF_POOL_NAMESPACE])) as ioctx:
+            self.remove_task(ioctx, task, remove_in_memory)
+
+        return 0, "", ""
+
+    def task_list(self, task_id):
+        self.log.info("task_list: {}".format(task_id))
+
+        if task_id:
+            if task_id not in self.tasks_by_id:
+                raise KeyError("No such task {}".format(task_id))
+
+            result = self.tasks_by_id[task_id].to_dict()
+        else:
+            result = []
+            for sequence in sorted(self.tasks_by_sequence.keys()):
+                task = self.tasks_by_sequence[sequence]
+                result.append(task.to_dict())
+
+        return 0, json.dumps(result), ""
+
+    def handle_command(self, inbuf, prefix, cmd):
+        with self.lock:
+            if prefix == 'add flatten':
+                return self.queue_flatten(cmd['image_spec'])
+            elif prefix == 'add remove':
+                return self.queue_remove(cmd['image_spec'])
+            elif prefix == 'add trash remove':
+                return self.queue_trash_remove(cmd['image_id_spec'])
+            elif prefix == 'add migration execute':
+                return self.queue_migration_execute(cmd['image_spec'])
+            elif prefix == 'add migration commit':
+                return self.queue_migration_commit(cmd['image_spec'])
+            elif prefix == 'add migration abort':
+                return self.queue_migration_abort(cmd['image_spec'])
+            elif prefix == 'cancel':
+                return self.task_cancel(cmd['task_id'])
+            elif prefix == 'list':
+                return self.task_list(cmd.get('task_id'))
+
+        raise NotImplementedError(cmd['prefix'])
+
+
class Module(MgrModule):
    """ceph-mgr plugin exposing 'rbd perf' statistics and 'rbd task'
    background-task commands, delegated to PerfHandler and TaskHandler."""

    # Command descriptors registered with the mgr.  Each 'cmd' string is the
    # command prefix followed by argument specs (name=...,type=...); 'perm'
    # is the capability required to run it ('r' read, 'w' write).
    COMMANDS = [
        {
            "cmd": "rbd perf image stats "
                   "name=pool_spec,type=CephString,req=false "
                   "name=sort_by,type=CephChoices,strings="
                   "write_ops|write_bytes|write_latency|"
                   "read_ops|read_bytes|read_latency,"
                   "req=false ",
            "desc": "Retrieve current RBD IO performance stats",
            "perm": "r"
        },
        {
            "cmd": "rbd perf image counters "
                   "name=pool_spec,type=CephString,req=false "
                   "name=sort_by,type=CephChoices,strings="
                   "write_ops|write_bytes|write_latency|"
                   "read_ops|read_bytes|read_latency,"
                   "req=false ",
            "desc": "Retrieve current RBD IO performance counters",
            "perm": "r"
        },
        {
            "cmd": "rbd task add flatten "
                   "name=image_spec,type=CephString",
            "desc": "Flatten a cloned image asynchronously in the background",
            "perm": "w"
        },
        {
            "cmd": "rbd task add remove "
                   "name=image_spec,type=CephString",
            "desc": "Remove an image asynchronously in the background",
            "perm": "w"
        },
        {
            "cmd": "rbd task add trash remove "
                   "name=image_id_spec,type=CephString",
            "desc": "Remove an image from the trash asynchronously in the background",
            "perm": "w"
        },
        {
            "cmd": "rbd task add migration execute "
                   "name=image_spec,type=CephString",
            "desc": "Execute an image migration asynchronously in the background",
            "perm": "w"
        },
        {
            "cmd": "rbd task add migration commit "
                   "name=image_spec,type=CephString",
            "desc": "Commit an executed migration asynchronously in the background",
            "perm": "w"
        },
        {
            "cmd": "rbd task add migration abort "
                   "name=image_spec,type=CephString",
            "desc": "Abort a prepared migration asynchronously in the background",
            "perm": "w"
        },
        {
            "cmd": "rbd task cancel "
                   "name=task_id,type=CephString ",
            "desc": "Cancel a pending or running asynchronous task",
            "perm": "r"
        },
        {
            "cmd": "rbd task list "
                   "name=task_id,type=CephString,req=false ",
            "desc": "List pending or running asynchronous tasks",
            "perm": "r"
        }
    ]
    MODULE_OPTIONS = []

    # class-level defaults; replaced with real handler instances in __init__
    perf = None
    task = None

    def __init__(self, *args, **kwargs):
        """Construct the module and instantiate the perf/task sub-handlers."""
        super(Module, self).__init__(*args, **kwargs)
        self.perf = PerfHandler(self)
        self.task = TaskHandler(self)

    def handle_command(self, inbuf, cmd):
        """Route an incoming mgr command to the matching sub-handler.

        Returns an (rc, out, err) tuple.  Handler exceptions are logged with
        their full traceback and translated into negative-errno return codes
        rather than propagated to the CLI user.
        """
        prefix = cmd['prefix']
        try:
            try:
                # strip the 9-character "rbd perf "/"rbd task " lead-in
                # before delegating the remaining prefix
                if prefix.startswith('rbd perf '):
                    return self.perf.handle_command(inbuf, prefix[9:], cmd)
                elif prefix.startswith('rbd task '):
                    return self.task.handle_command(inbuf, prefix[9:], cmd)

            except Exception as ex:
                # log the full traceback but don't send it to the CLI user
                self.log.fatal("Fatal runtime error: {}\n{}".format(
                    ex, traceback.format_exc()))
                raise

        # most specific handlers first: rbd.OSError (carries an errno) is
        # tested before the broader rbd.Error
        except rados.Error as ex:
            return -ex.errno, "", str(ex)
        except rbd.OSError as ex:
            return -ex.errno, "", str(ex)
        except rbd.Error as ex:
            return -errno.EINVAL, "", str(ex)
        except KeyError as ex:
            return -errno.ENOENT, "", str(ex)
        except ValueError as ex:
            return -errno.EINVAL, "", str(ex)

        # reached only when the prefix matched neither sub-handler
        raise NotImplementedError(cmd['prefix'])