From b7a826df9f7e18db8ea561c08a0571aaba811f0f Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Tue, 2 Jul 2019 19:54:49 -0400 Subject: [PATCH] pybind/mgr: new 'rbd task' background task management Fixes: http://tracker.ceph.com/issues/40621 Signed-off-by: Jason Dillaman --- src/include/rbd_types.h | 1 + src/pybind/mgr/rbd_support/module.py | 861 +++++++++++++++++++++++++-- 2 files changed, 799 insertions(+), 63 deletions(-) diff --git a/src/include/rbd_types.h b/src/include/rbd_types.h index 0277fd923c3..94799ab6f1d 100644 --- a/src/include/rbd_types.h +++ b/src/include/rbd_types.h @@ -42,6 +42,7 @@ #define RBD_DIRECTORY "rbd_directory" #define RBD_INFO "rbd_info" #define RBD_NAMESPACE "rbd_namespace" +#define RBD_TASK "rbd_task" /* * rbd_children object in each pool contains omap entries diff --git a/src/pybind/mgr/rbd_support/module.py b/src/pybind/mgr/rbd_support/module.py index e57aa709060..b4fe1bb8291 100644 --- a/src/pybind/mgr/rbd_support/module.py +++ b/src/pybind/mgr/rbd_support/module.py @@ -2,17 +2,23 @@ RBD support module """ +import errno import json +import rados +import rbd +import re import time import traceback +import uuid from mgr_module import MgrModule +from contextlib import contextmanager from datetime import datetime, timedelta -from rados import ObjectNotFound -from rbd import RBD +from functools import partial, wraps from threading import Condition, Lock, Thread + GLOBAL_POOL_KEY = (None, None) QUERY_POOL_ID = "pool_id" @@ -41,32 +47,57 @@ STATS_RATE_INTERVAL = timedelta(minutes=1) REPORT_MAX_RESULTS = 64 +RBD_TASK_OID = "rbd_task" -class Module(MgrModule): - COMMANDS = [ - { - "cmd": "rbd perf image stats " - "name=pool_spec,type=CephString,req=false " - "name=sort_by,type=CephChoices,strings=" - "write_ops|write_bytes|write_latency|" - "read_ops|read_bytes|read_latency," - "req=false ", - "desc": "Retrieve current RBD IO performance stats", - "perm": "r" - }, - { - "cmd": "rbd perf image counters " - "name=pool_spec,type=CephString,req=false " - "name=sort_by,type=CephChoices,strings=" - "write_ops|write_bytes|write_latency|" - "read_ops|read_bytes|read_latency," - "req=false ", - "desc": "Retrieve current RBD IO performance counters", - "perm": "r" - } - ] - MODULE_OPTIONS = [] +TASK_SEQUENCE = "sequence" +TASK_ID = "id" +TASK_REFS = "refs" +TASK_MESSAGE = "message" +TASK_RETRY_TIME = "retry_time" +TASK_IN_PROGRESS = "in_progress" +TASK_PROGRESS = "progress" +TASK_CANCELED = "canceled" + +TASK_REF_POOL_NAME = "pool_name" +TASK_REF_POOL_NAMESPACE = "pool_namespace" +TASK_REF_IMAGE_NAME = "image_name" +TASK_REF_IMAGE_ID = "image_id" +TASK_REF_ACTION = "action" + +TASK_REF_ACTION_FLATTEN = "flatten" +TASK_REF_ACTION_REMOVE = "remove" +TASK_REF_ACTION_TRASH_REMOVE = "trash remove" +TASK_REF_ACTION_MIGRATION_EXECUTE = "migrate execute" +TASK_REF_ACTION_MIGRATION_COMMIT = "migrate commit" +TASK_REF_ACTION_MIGRATION_ABORT = "migrate abort" + +VALID_TASK_ACTIONS = [TASK_REF_ACTION_FLATTEN, + TASK_REF_ACTION_REMOVE, + TASK_REF_ACTION_TRASH_REMOVE, + TASK_REF_ACTION_MIGRATION_EXECUTE, + TASK_REF_ACTION_MIGRATION_COMMIT, + TASK_REF_ACTION_MIGRATION_ABORT] + +TASK_RETRY_INTERVAL = timedelta(seconds=30) + + +def extract_pool_key(pool_spec): + if not pool_spec: + return GLOBAL_POOL_KEY + match = re.match(r'^([^/]+)(?:/([^/]+))?$', pool_spec) + if not match: + raise ValueError("Invalid pool spec: {}".format(pool_spec)) + return (match.group(1), match.group(2) or '') + + +def get_rbd_pools(module): + osd_map = module.get('osd_map') + return {pool['pool']: pool['pool_name'] 
for pool in osd_map['pools'] + if 'rbd' in pool.get('application_metadata', {})} + + +class PerfHandler: user_queries = {} image_cache = {} @@ -103,15 +134,6 @@ class Module(MgrModule): 'max_count': OSD_PERF_QUERY_MAX_RESULTS}, } - @classmethod - def extract_pool_key(cls, pool_spec): - if not pool_spec: - return (None, None) - pool_spec = pool_spec.rsplit('/', 1) - if len(pool_spec) == 1 or not pool_spec[1]: - return (pool_spec[0], '') - return (pool_spec[0], pool_spec[1]) - @classmethod def pool_spec_search_keys(cls, pool_key): return [pool_key[0:len(pool_key) - x] @@ -119,27 +141,29 @@ class Module(MgrModule): @classmethod def submatch_pool_key(cls, pool_key, search_key): - return ((pool_key[1] == search_key[1] or not search_key[1]) and - (pool_key[0] == search_key[0] or not search_key[0])) + return ((pool_key[1] == search_key[1] or not search_key[1]) + and (pool_key[0] == search_key[0] or not search_key[0])) + + def __init__(self, module): + self.module = module + self.log = module.log - def __init__(self, *args, **kwargs): - super(Module, self).__init__(*args, **kwargs) self.thread = Thread(target=self.run) self.thread.start() def run(self): try: - self.log.info("run: starting") + self.log.info("PerfHandler: starting") while True: with self.lock: self.scrub_expired_queries() self.process_raw_osd_perf_counters() self.refresh_condition.notify() - stats_period = int(self.get_ceph_option("mgr_stats_period")) + stats_period = int(self.module.get_ceph_option("mgr_stats_period")) self.query_condition.wait(stats_period) - self.log.debug("run: tick") + self.log.debug("PerfHandler: tick") except Exception as ex: self.log.fatal("Fatal runtime error: {}\n{}".format( @@ -152,7 +176,7 @@ class Module(MgrModule): # collect and combine the raw counters from all sort orders raw_pool_counters = query.setdefault(QUERY_RAW_POOL_COUNTERS, {}) for query_id in query[QUERY_IDS]: - res = self.get_osd_perf_counters(query_id) + res = self.module.get_osd_perf_counters(query_id) for counter in res['counters']: # replace pool id from object name if it exists k = counter['k'] @@ -219,13 +243,12 @@ class Module(MgrModule): return sum_pool_counters def refresh_image_names(self, resolve_image_names): - rbd = RBD() for pool_id, namespace in resolve_image_names: image_key = (pool_id, namespace) images = self.image_name_cache.setdefault(image_key, {}) - with self.rados.open_ioctx2(int(pool_id)) as ioctx: + with self.module.rados.open_ioctx2(int(pool_id)) as ioctx: ioctx.set_namespace(namespace) - for image_meta in rbd.list2(ioctx): + for image_meta in rbd.RBD().list2(ioctx): images[image_meta['id']] = image_meta['name'] self.log.debug("resolve_image_names: {}={}".format(image_key, images)) @@ -275,15 +298,10 @@ class Module(MgrModule): elif not self.image_name_cache: self.scrub_missing_images() - def get_rbd_pools(self): - osd_map = self.get('osd_map') - return {pool['pool']: pool['pool_name'] for pool in osd_map['pools'] - if 'rbd' in pool.get('application_metadata', {})} - def resolve_pool_id(self, pool_name): - pool_id = self.rados.pool_lookup(pool_name) + pool_id = self.module.rados.pool_lookup(pool_name) if not pool_id: - raise ObjectNotFound("Pool '{}' not found".format(pool_name)) + raise rados.ObjectNotFound("Pool '{}' not found".format(pool_name)) return pool_id def scrub_expired_queries(self): @@ -293,7 +311,7 @@ class Module(MgrModule): for pool_key in list(self.user_queries.keys()): user_query = self.user_queries[pool_key] if user_query[QUERY_LAST_REQUEST] < expire_time: - 
self.unregister_osd_perf_queries(pool_key, user_query[QUERY_IDS]) + self.module.unregister_osd_perf_queries(pool_key, user_query[QUERY_IDS]) del self.user_queries[pool_key] def register_osd_perf_queries(self, pool_id, namespace): @@ -303,14 +321,14 @@ class Module(MgrModule): query = self.prepare_osd_perf_query(pool_id, namespace, counter) self.log.debug("register_osd_perf_queries: {}".format(query)) - query_id = self.add_osd_perf_query(query) + query_id = self.module.add_osd_perf_query(query) if query_id is None: raise RuntimeError('Failed to add OSD perf query: {}'.format(query)) query_ids.append(query_id) except Exception: for query_id in query_ids: - self.remove_osd_perf_query(query_id) + self.module.remove_osd_perf_query(query_id) raise return query_ids @@ -319,7 +337,7 @@ class Module(MgrModule): self.log.info("unregister_osd_perf_queries: pool_key={}, query_ids={}".format( pool_key, query_ids)) for query_id in query_ids: - self.remove_osd_perf_query(query_id) + self.module.remove_osd_perf_query(query_id) query_ids[:] = [] def register_query(self, pool_key): @@ -352,7 +370,7 @@ class Module(MgrModule): # processing period user_query[QUERY_POOL_ID_MAP] = { pool_id: pool_name for pool_id, pool_name - in self.get_rbd_pools().items()} + in get_rbd_pools(self.module).items()} self.log.debug("register_query: pool_key={}, query_ids={}".format( pool_key, user_query[QUERY_IDS])) @@ -459,7 +477,7 @@ class Module(MgrModule): report, pool_spec, sort_by)) self.scrub_expired_queries() - pool_key = self.extract_pool_key(pool_spec) + pool_key = extract_pool_key(pool_spec) user_query = self.register_query(pool_key) now = datetime.now() @@ -483,13 +501,730 @@ class Module(MgrModule): return self.get_perf_data( "counter", pool_spec, sort_by, self.extract_counter) - def handle_command(self, inbuf, cmd): + def handle_command(self, inbuf, prefix, cmd): with self.lock: - if cmd['prefix'] == 'rbd perf image stats': + if prefix == 'image stats': return self.get_perf_stats(cmd.get('pool_spec', None), cmd.get('sort_by', OSD_PERF_QUERY_COUNTERS[0])) - elif cmd['prefix'] == 'rbd perf image counters': + elif prefix == 'image counters': return self.get_perf_counters(cmd.get('pool_spec', None), cmd.get('sort_by', OSD_PERF_QUERY_COUNTERS[0])) - raise NotImplementedError(cmd['prefix']) + raise NotImplementedError(cmd['prefix']) + + +class Throttle: + def __init__(self, throttle_period): + self.throttle_period = throttle_period + self.time_of_last_call = datetime.min + + def __call__(self, fn): + @wraps(fn) + def wrapper(*args, **kwargs): + now = datetime.now() + if self.time_of_last_call + self.throttle_period <= now: + self.time_of_last_call = now + return fn(*args, **kwargs) + return wrapper + + +class Task: + def __init__(self, sequence, task_id, message, refs): + self.sequence = sequence + self.task_id = task_id + self.message = message + self.refs = refs + self.retry_time = None + self.in_progress = False + self.progress = 0.0 + self.canceled = False + + def __str__(self): + return self.to_json() + + @property + def sequence_key(self): + return "{0:016X}".format(self.sequence) + + def cancel(self): + self.canceled = True + + def to_dict(self): + d = {TASK_SEQUENCE: self.sequence, + TASK_ID: self.task_id, + TASK_MESSAGE: self.message, + TASK_REFS: self.refs + } + if self.retry_time: + d[TASK_RETRY_TIME] = self.retry_time.isoformat() + if self.in_progress: + d[TASK_IN_PROGRESS] = True + d[TASK_PROGRESS] = self.progress + if self.canceled: + d[TASK_CANCELED] = True + return d + + def to_json(self): + return 
str(json.dumps(self.to_dict())) + + @classmethod + def from_json(cls, val): + try: + d = json.loads(val) + action = d.get(TASK_REFS, {}).get(TASK_REF_ACTION) + if action not in VALID_TASK_ACTIONS: + raise ValueError("Invalid task action: {}".format(action)) + + return Task(d[TASK_SEQUENCE], d[TASK_ID], d[TASK_MESSAGE], d[TASK_REFS]) + except json.JSONDecodeError as e: + raise ValueError("Invalid JSON ({})".format(str(e))) + except KeyError as e: + raise ValueError("Invalid task format (missing key {})".format(str(e))) + + +class TaskHandler: + lock = Lock() + condition = Condition(lock) + thread = None + + in_progress_task = None + tasks_by_sequence = dict() + tasks_by_id = dict() + + sequence = 0 + + def __init__(self, module): + self.module = module + self.log = module.log + + with self.lock: + self.init_task_queue() + + self.thread = Thread(target=self.run) + self.thread.start() + + @property + def default_pool_name(self): + return self.module.get_ceph_option("rbd_default_pool") + + def extract_pool_spec(self, pool_spec): + pool_spec = extract_pool_key(pool_spec) + if pool_spec == GLOBAL_POOL_KEY: + pool_spec = (self.default_pool_name, '') + return pool_spec + + def extract_image_spec(self, image_spec): + match = re.match(r'^(?:([^/]+)/(?:([^/]+)/)?)?([^/@]+)$', + image_spec or '') + if not match: + raise ValueError("Invalid image spec: {}".format(image_spec)) + return (match.group(1) or self.default_pool_name, match.group(2) or '', + match.group(3)) + + def run(self): + try: + self.log.info("TaskHandler: starting") + while True: + with self.lock: + now = datetime.now() + for sequence in sorted([sequence for sequence, task + in self.tasks_by_sequence.items() + if not task.retry_time or task.retry_time <= now]): + self.execute_task(sequence) + + self.condition.wait(5) + self.log.debug("TaskHandler: tick") + + except Exception as ex: + self.log.fatal("Fatal runtime error: {}\n{}".format( + ex, traceback.format_exc())) + + @contextmanager + def open_ioctx(self, spec): + try: + with self.module.rados.open_ioctx(spec[0]) as ioctx: + ioctx.set_namespace(spec[1]) + yield ioctx + except rados.ObjectNotFound: + self.log.error("Failed to locate pool {}".format(spec[0])) + raise + + @classmethod + def format_image_spec(cls, image_spec): + image = image_spec[2] + if image_spec[1]: + image = "{}/{}".format(image_spec[1], image) + if image_spec[0]: + image = "{}/{}".format(image_spec[0], image) + return image + + def init_task_queue(self): + for pool_id, pool_name in get_rbd_pools(self.module).items(): + try: + with self.module.rados.open_ioctx2(int(pool_id)) as ioctx: + self.load_task_queue(ioctx, pool_name) + + for namespace in rbd.RBD().namespace_list(ioctx): + ioctx.set_namespace(namespace) + self.load_task_queue(ioctx, pool_name) + except rados.ObjectNotFound: + # pool DNE + pass + + if self.tasks_by_sequence: + self.sequence = list(sorted(self.tasks_by_sequence.keys()))[-1] + + self.log.debug("sequence={}, tasks_by_sequence={}, tasks_by_id={}".format( + self.sequence, str(self.tasks_by_sequence), str(self.tasks_by_id))) + + def load_task_queue(self, ioctx, pool_name): + pool_spec = pool_name + if ioctx.nspace: + pool_spec += "/{}".format(ioctx.nspace) + + start_after = '' + try: + while True: + with rados.ReadOpCtx() as read_op: + self.log.info("load_task_task: {}, start_after={}".format( + pool_spec, start_after)) + it, ret = ioctx.get_omap_vals(read_op, start_after, "", 128) + ioctx.operate_read_op(read_op, RBD_TASK_OID) + + it = list(it) + for k, v in it: + start_after = k + v = v.decode() + 
self.log.info("load_task_task: task={}".format(v)) + + try: + task = Task.from_json(v) + self.append_task(task) + except ValueError: + self.log.error("Failed to decode task: pool_spec={}, task={}".format(pool_spec, v)) + + if not it: + break + + except StopIteration: + pass + except rados.ObjectNotFound: + # rbd_task DNE + pass + + def append_task(self, task): + self.tasks_by_sequence[task.sequence] = task + self.tasks_by_id[task.task_id] = task + + def add_task(self, ioctx, message, refs): + self.log.debug("add_task: message={}, refs={}".format(message, refs)) + + # search for dups and return the original + for task_id, task in self.tasks_by_id.items(): + if task.refs == refs: + return json.dumps(task.to_dict()) + + # ensure unique uuid across all pools + while True: + task_id = str(uuid.uuid4()) + if task_id not in self.tasks_by_id: + break + + self.sequence += 1 + task = Task(self.sequence, task_id, message, refs) + + # add the task to the rbd_task omap + task_json = task.to_json() + omap_keys = (task.sequence_key, ) + omap_vals = (str.encode(task_json), ) + self.log.info("adding task: {} {}".format(omap_keys[0], omap_vals[0])) + + with rados.WriteOpCtx() as write_op: + ioctx.set_omap(write_op, omap_keys, omap_vals) + ioctx.operate_write_op(write_op, RBD_TASK_OID) + self.append_task(task) + + self.condition.notify() + return task_json + + def remove_task(self, ioctx, task, remove_in_memory=True): + self.log.info("remove_task: task={}".format(str(task))) + omap_keys = (task.sequence_key, ) + try: + with rados.WriteOpCtx() as write_op: + ioctx.remove_omap_keys(write_op, omap_keys) + ioctx.operate_write_op(write_op, RBD_TASK_OID) + except rados.ObjectNotFound: + pass + + if remove_in_memory: + try: + del self.tasks_by_id[task.task_id] + del self.tasks_by_sequence[task.sequence] + except KeyError: + pass + + def execute_task(self, sequence): + task = self.tasks_by_sequence[sequence] + self.log.info("execute_task: task={}".format(str(task))) + + pool_valid = False + try: + with self.open_ioctx((task.refs[TASK_REF_POOL_NAME], + task.refs[TASK_REF_POOL_NAMESPACE])) as ioctx: + pool_valid = True + + action = task.refs[TASK_REF_ACTION] + execute_fn = {TASK_REF_ACTION_FLATTEN: self.execute_flatten, + TASK_REF_ACTION_REMOVE: self.execute_remove, + TASK_REF_ACTION_TRASH_REMOVE: self.execute_trash_remove, + TASK_REF_ACTION_MIGRATION_EXECUTE: self.execute_migration_execute, + TASK_REF_ACTION_MIGRATION_COMMIT: self.execute_migration_commit, + TASK_REF_ACTION_MIGRATION_ABORT: self.execute_migration_abort + }.get(action) + if not execute_fn: + self.log.error("Invalid task action: {}".format(action)) + else: + task.in_progress = True + self.in_progress_task = task + self.update_progress(task, 0) + + self.lock.release() + try: + execute_fn(ioctx, task) + + except rbd.OperationCanceled: + self.log.info("Operation canceled: task={}".format( + str(task))) + + finally: + self.lock.acquire() + + task.in_progress = False + self.in_progress_task = None + + self.complete_progress(task) + self.remove_task(ioctx, task) + + except rados.ObjectNotFound as e: + self.log.error("execute_task: {}".format(e)) + if pool_valid: + self.update_progress(task, 0) + else: + # pool DNE -- remove the task + self.complete_progress(task) + self.remove_task(ioctx, task) + + except (rados.Error, rbd.Error) as e: + self.log.error("execute_task: {}".format(e)) + self.update_progress(task, 0) + + finally: + task.in_progress = False + task.retry_time = datetime.now() + TASK_RETRY_INTERVAL + + def progress_callback(self, task, current, 
total): + progress = float(current) / float(total) + self.log.debug("progress_callback: task={}, progress={}".format( + str(task), progress)) + + # avoid deadlocking when a new command comes in during a progress callback + if not self.lock.acquire(False): + return 0 + + try: + if not self.in_progress_task or self.in_progress_task.canceled: + return -rbd.ECANCELED + self.in_progress_task.progress = progress + finally: + self.lock.release() + + self.throttled_update_progress(task, progress) + return 0 + + def execute_flatten(self, ioctx, task): + self.log.info("execute_flatten: task={}".format(str(task))) + + try: + with rbd.Image(ioctx, task.refs[TASK_REF_IMAGE_NAME]) as image: + image.flatten(on_progress=partial(self.progress_callback, task)) + except rbd.InvalidArgument: + self.log.info("Image does not have parent: task={}".format(str(task))) + except rbd.ImageNotFound: + self.log.info("Image does not exist: task={}".format(str(task))) + + def execute_remove(self, ioctx, task): + self.log.info("execute_remove: task={}".format(str(task))) + + try: + rbd.RBD().remove(ioctx, task.refs[TASK_REF_IMAGE_NAME], + on_progress=partial(self.progress_callback, task)) + except rbd.ImageNotFound: + self.log.info("Image does not exist: task={}".format(str(task))) + + def execute_trash_remove(self, ioctx, task): + self.log.info("execute_trash_remove: task={}".format(str(task))) + + try: + rbd.RBD().trash_remove(ioctx, task.refs[TASK_REF_IMAGE_ID], + on_progress=partial(self.progress_callback, task)) + except rbd.ImageNotFound: + self.log.info("Image does not exist: task={}".format(str(task))) + + def execute_migration_execute(self, ioctx, task): + self.log.info("execute_migration_execute: task={}".format(str(task))) + + try: + rbd.RBD().migration_execute(ioctx, task.refs[TASK_REF_IMAGE_NAME], + on_progress=partial(self.progress_callback, task)) + except rbd.ImageNotFound: + self.log.info("Image does not exist: task={}".format(str(task))) + except rbd.InvalidArgument: + self.log.info("Image is not migrating: task={}".format(str(task))) + + def execute_migration_commit(self, ioctx, task): + self.log.info("execute_migration_commit: task={}".format(str(task))) + + try: + rbd.RBD().migration_commit(ioctx, task.refs[TASK_REF_IMAGE_NAME], + on_progress=partial(self.progress_callback, task)) + except rbd.ImageNotFound: + self.log.info("Image does not exist: task={}".format(str(task))) + except rbd.InvalidArgument: + self.log.info("Image is not migrating or migration not executed: task={}".format(str(task))) + + def execute_migration_abort(self, ioctx, task): + self.log.info("execute_migration_abort: task={}".format(str(task))) + + try: + rbd.RBD().migration_abort(ioctx, task.refs[TASK_REF_IMAGE_NAME], + on_progress=partial(self.progress_callback, task)) + except rbd.ImageNotFound: + self.log.info("Image does not exist: task={}".format(str(task))) + except rbd.InvalidArgument: + self.log.info("Image is not migrating: task={}".format(str(task))) + + def complete_progress(self, task): + self.log.debug("complete_progress: task={}".format(str(task))) + try: + self.module.remote("progress", "complete", task.task_id) + except ImportError: + # progress module is disabled + pass + + def update_progress(self, task, progress): + self.log.debug("update_progress: task={}, progress={}".format(str(task), progress)) + try: + self.module.remote("progress", "update", task.task_id, + task.message, progress, + ["rbd_support"]) + except ImportError: + # progress module is disabled + pass + + @Throttle(timedelta(seconds=1)) + def 
throttled_update_progress(self, task, progress): + self.update_progress(task, progress) + + def queue_flatten(self, image_spec): + image_spec = self.extract_image_spec(image_spec) + self.log.info("queue_flatten: {}".format(image_spec)) + + with self.open_ioctx(image_spec) as ioctx: + with rbd.Image(ioctx, image_spec[2]) as image: + try: + image.parent_id() + except rbd.ImageNotFound: + raise rbd.ImageNotFound("Image {} does not have a parent".format( + self.format_image_spec(image_spec)), errno=errno.ENOENT) + + return 0, self.add_task(ioctx, + "Flattening image {}".format( + self.format_image_spec(image_spec)), + {TASK_REF_ACTION: TASK_REF_ACTION_FLATTEN, + TASK_REF_POOL_NAME: image_spec[0], + TASK_REF_POOL_NAMESPACE: image_spec[1], + TASK_REF_IMAGE_NAME: image_spec[2]}), "" + + def queue_remove(self, image_spec): + image_spec = self.extract_image_spec(image_spec) + self.log.info("queue_remove: {}".format(image_spec)) + + with self.open_ioctx(image_spec) as ioctx: + with rbd.Image(ioctx, image_spec[2]) as image: + if list(image.list_snaps()): + raise rbd.ImageBusy("Image {} has snapshots".format( + self.format_image_spec(image_spec)), errno=errno.EBUSY) + + return 0, self.add_task(ioctx, + "Removing image {}".format( + self.format_image_spec(image_spec)), + {TASK_REF_ACTION: TASK_REF_ACTION_REMOVE, + TASK_REF_POOL_NAME: image_spec[0], + TASK_REF_POOL_NAMESPACE: image_spec[1], + TASK_REF_IMAGE_NAME: image_spec[2]}), "" + + def queue_trash_remove(self, image_id_spec): + image_id_spec = self.extract_image_spec(image_id_spec) + self.log.info("queue_trash_remove: {}".format(image_id_spec)) + + # verify that image exists in trash + with self.open_ioctx(image_id_spec) as ioctx: + rbd.RBD().trash_get(ioctx, image_id_spec[2]) + + return 0, self.add_task(ioctx, + "Removing image {} from trash".format( + self.format_image_spec(image_id_spec)), + {TASK_REF_ACTION: TASK_REF_ACTION_TRASH_REMOVE, + TASK_REF_POOL_NAME: image_id_spec[0], + TASK_REF_POOL_NAMESPACE: image_id_spec[1], + TASK_REF_IMAGE_ID: image_id_spec[2]}), "" + + def validate_image_migrating(self, ioctx, image_spec): + with rbd.Image(ioctx, image_spec[2]) as image: + features = image.features() + if features & rbd.RBD_FEATURE_MIGRATING == 0: + raise rbd.InvalidArgument("Image {} is not migrating".format( + self.format_image_spec(image_spec)), errno=errno.EINVAL) + + def resolve_pool_name(self, pool_id): + osd_map = self.module.get('osd_map') + for pool in osd_map['pools']: + if pool['pool'] == pool_id: + return pool['pool_name'] + return '' + + def queue_migration_execute(self, image_spec): + image_spec = self.extract_image_spec(image_spec) + self.log.info("queue_migration_execute: {}".format(image_spec)) + + with self.open_ioctx(image_spec) as ioctx: + self.validate_image_migrating(ioctx, image_spec) + + status = rbd.RBD().migration_status(ioctx, image_spec[2]) + if status['state'] not in [rbd.RBD_IMAGE_MIGRATION_STATE_PREPARED, + rbd.RBD_IMAGE_MIGRATION_STATE_EXECUTING]: + raise rbd.InvalidArgument("Image {} is not in ready state".format( + self.format_image_spec(image_spec)), errno=errno.EINVAL) + + source_pool = self.resolve_pool_name(status['source_pool_id']) + dest_pool = self.resolve_pool_name(status['dest_pool_id']) + return 0, self.add_task(ioctx, + "Migrating image {} to {}".format( + self.format_image_spec((source_pool, + status['source_pool_namespace'], + status['source_image_name'])), + self.format_image_spec((dest_pool, + status['dest_pool_namespace'], + status['dest_image_name']))), + {TASK_REF_ACTION: 
TASK_REF_ACTION_MIGRATION_EXECUTE, + TASK_REF_POOL_NAME: image_spec[0], + TASK_REF_POOL_NAMESPACE: image_spec[1], + TASK_REF_IMAGE_NAME: image_spec[2]}), "" + + def queue_migration_commit(self, image_spec): + image_spec = self.extract_image_spec(image_spec) + self.log.info("queue_migration_commit: {}".format(image_spec)) + + with self.open_ioctx(image_spec) as ioctx: + self.validate_image_migrating(ioctx, image_spec) + + status = rbd.RBD().migration_status(ioctx, image_spec[2]) + if status['state'] != rbd.RBD_IMAGE_MIGRATION_STATE_EXECUTED: + raise rbd.InvalidArgument("Image {} has not completed migration".format( + self.format_image_spec(image_spec)), errno=errno.EINVAL) + + return 0, self.add_task(ioctx, + "Committing image migration for {}".format( + self.format_image_spec(image_spec)), + {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_COMMIT, + TASK_REF_POOL_NAME: image_spec[0], + TASK_REF_POOL_NAMESPACE: image_spec[1], + TASK_REF_IMAGE_NAME: image_spec[2]}), "" + + def queue_migration_abort(self, image_spec): + image_spec = self.extract_image_spec(image_spec) + self.log.info("queue_migration_abort: {}".format(image_spec)) + + with self.open_ioctx(image_spec) as ioctx: + self.validate_image_migrating(ioctx, image_spec) + + return 0, self.add_task(ioctx, + "Aborting image migration for {}".format( + self.format_image_spec(image_spec)), + {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_ABORT, + TASK_REF_POOL_NAME: image_spec[0], + TASK_REF_POOL_NAMESPACE: image_spec[1], + TASK_REF_IMAGE_NAME: image_spec[2]}), "" + + def task_cancel(self, task_id): + self.log.info("task_cancel: {}".format(task_id)) + + if task_id not in self.tasks_by_id: + self.log.debug("tasks: {}".format(str(self.tasks_by_id))) + raise KeyError("No such task {}".format(task_id)) + + task = self.tasks_by_id[task_id] + task.cancel() + + remove_in_memory = True + if self.in_progress_task and self.in_progress_task.task_id == task_id: + self.log.info("Attempting to cancel in-progress task: {}".format(str(self.in_progress_task))) + remove_in_memory = False + + # complete any associated event in the progress module + self.complete_progress(task) + + # remove from rbd_task omap + with self.open_ioctx((task.refs[TASK_REF_POOL_NAME], + task.refs[TASK_REF_POOL_NAMESPACE])) as ioctx: + self.remove_task(ioctx, task, remove_in_memory) + + return 0, "", "" + + def task_list(self, task_id): + self.log.info("task_list: {}".format(task_id)) + + if task_id: + if task_id not in self.tasks_by_id: + raise KeyError("No such task {}".format(task_id)) + + result = self.tasks_by_id[task_id].to_dict() + else: + result = [] + for sequence in sorted(self.tasks_by_sequence.keys()): + task = self.tasks_by_sequence[sequence] + result.append(task.to_dict()) + + return 0, json.dumps(result), "" + + def handle_command(self, inbuf, prefix, cmd): + with self.lock: + if prefix == 'add flatten': + return self.queue_flatten(cmd['image_spec']) + elif prefix == 'add remove': + return self.queue_remove(cmd['image_spec']) + elif prefix == 'add trash remove': + return self.queue_trash_remove(cmd['image_id_spec']) + elif prefix == 'add migration execute': + return self.queue_migration_execute(cmd['image_spec']) + elif prefix == 'add migration commit': + return self.queue_migration_commit(cmd['image_spec']) + elif prefix == 'add migration abort': + return self.queue_migration_abort(cmd['image_spec']) + elif prefix == 'cancel': + return self.task_cancel(cmd['task_id']) + elif prefix == 'list': + return self.task_list(cmd.get('task_id')) + + raise 
NotImplementedError(cmd['prefix']) + + +class Module(MgrModule): + COMMANDS = [ + { + "cmd": "rbd perf image stats " + "name=pool_spec,type=CephString,req=false " + "name=sort_by,type=CephChoices,strings=" + "write_ops|write_bytes|write_latency|" + "read_ops|read_bytes|read_latency," + "req=false ", + "desc": "Retrieve current RBD IO performance stats", + "perm": "r" + }, + { + "cmd": "rbd perf image counters " + "name=pool_spec,type=CephString,req=false " + "name=sort_by,type=CephChoices,strings=" + "write_ops|write_bytes|write_latency|" + "read_ops|read_bytes|read_latency," + "req=false ", + "desc": "Retrieve current RBD IO performance counters", + "perm": "r" + }, + { + "cmd": "rbd task add flatten " + "name=image_spec,type=CephString", + "desc": "Flatten a cloned image asynchronously in the background", + "perm": "w" + }, + { + "cmd": "rbd task add remove " + "name=image_spec,type=CephString", + "desc": "Remove an image asynchronously in the background", + "perm": "w" + }, + { + "cmd": "rbd task add trash remove " + "name=image_id_spec,type=CephString", + "desc": "Remove an image from the trash asynchronously in the background", + "perm": "w" + }, + { + "cmd": "rbd task add migration execute " + "name=image_spec,type=CephString", + "desc": "Execute an image migration asynchronously in the background", + "perm": "w" + }, + { + "cmd": "rbd task add migration commit " + "name=image_spec,type=CephString", + "desc": "Commit an executed migration asynchronously in the background", + "perm": "w" + }, + { + "cmd": "rbd task add migration abort " + "name=image_spec,type=CephString", + "desc": "Abort a prepared migration asynchronously in the background", + "perm": "w" + }, + { + "cmd": "rbd task cancel " + "name=task_id,type=CephString ", + "desc": "Cancel a pending or running asynchronous task", + "perm": "r" + }, + { + "cmd": "rbd task list " + "name=task_id,type=CephString,req=false ", + "desc": "List pending or running asynchronous tasks", + "perm": "r" + } + ] + MODULE_OPTIONS = [] + + perf = None + task = None + + def __init__(self, *args, **kwargs): + super(Module, self).__init__(*args, **kwargs) + self.perf = PerfHandler(self) + self.task = TaskHandler(self) + + def handle_command(self, inbuf, cmd): + prefix = cmd['prefix'] + try: + try: + if prefix.startswith('rbd perf '): + return self.perf.handle_command(inbuf, prefix[9:], cmd) + elif prefix.startswith('rbd task '): + return self.task.handle_command(inbuf, prefix[9:], cmd) + + except Exception as ex: + # log the full traceback but don't send it to the CLI user + self.log.fatal("Fatal runtime error: {}\n{}".format( + ex, traceback.format_exc())) + raise + + except rados.Error as ex: + return -ex.errno, "", str(ex) + except rbd.OSError as ex: + return -ex.errno, "", str(ex) + except rbd.Error as ex: + return -errno.EINVAL, "", str(ex) + except KeyError as ex: + return -errno.ENOENT, "", str(ex) + except ValueError as ex: + return -errno.EINVAL, "", str(ex) + + raise NotImplementedError(cmd['prefix']) -- 2.39.5
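Usage sketch (not part of the patch): the new mgr commands are dispatched through the ceph CLI; the pool/image names, image id, and task id below are hypothetical placeholders.

    # queue long-running operations as background tasks
    ceph rbd task add flatten mypool/myclone
    ceph rbd task add remove mypool/myimage
    ceph rbd task add trash remove mypool/<image_id>
    ceph rbd task add migration execute mypool/myimage
    ceph rbd task add migration commit mypool/myimage
    # inspect and cancel queued or in-progress tasks
    ceph rbd task list
    ceph rbd task cancel <task_id>

Each queued task is persisted as JSON in a per-pool "rbd_task" omap object, so pending work is reloaded after a ceph-mgr restart, and task progress is reported through the progress module when that module is enabled.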