From: Jason Dillaman Date: Thu, 25 Jul 2019 16:36:19 +0000 (-0400) Subject: pybind/mgr: handle duplicate rbd task commands X-Git-Tag: v14.2.3~13^2~4 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=db0f4094599e0985a1a8db1e4abcd25acc116c89;p=ceph.git pybind/mgr: handle duplicate rbd task commands The 'ceph' CLI will duplicate commands within teuthology to test the MONs idempotency. This shouldn't be required for the MGR module, but we can keep a fixed set of completed tests to handle this possible command replay. Signed-off-by: Jason Dillaman (cherry picked from commit 9bdc376c5505aa97532ea15ff6286db9beafd1ec) --- diff --git a/src/pybind/mgr/rbd_support/module.py b/src/pybind/mgr/rbd_support/module.py index a95377ff82d4d..bd18ea603d45e 100644 --- a/src/pybind/mgr/rbd_support/module.py +++ b/src/pybind/mgr/rbd_support/module.py @@ -79,6 +79,7 @@ VALID_TASK_ACTIONS = [TASK_REF_ACTION_FLATTEN, TASK_REF_ACTION_MIGRATION_ABORT] TASK_RETRY_INTERVAL = timedelta(seconds=30) +MAX_COMPLETED_TASKS = 50 def extract_pool_key(pool_spec): @@ -597,6 +598,8 @@ class TaskHandler: tasks_by_sequence = dict() tasks_by_id = dict() + completed_tasks = [] + sequence = 0 def __init__(self, module): @@ -722,13 +725,21 @@ class TaskHandler: self.tasks_by_sequence[task.sequence] = task self.tasks_by_id[task.task_id] = task - def add_task(self, ioctx, message, refs): - self.log.debug("add_task: message={}, refs={}".format(message, refs)) + def find_task(self, refs): + self.log.debug("find_task: refs={}".format(refs)) # search for dups and return the original for task_id, task in self.tasks_by_id.items(): if task.refs == refs: - return json.dumps(task.to_dict()) + return task + + # search for a completed task (message replay) + for task in self.completed_tasks: + if task.refs == refs: + return task + + def add_task(self, ioctx, message, refs): + self.log.debug("add_task: message={}, refs={}".format(message, refs)) # ensure unique uuid across all pools while True: @@ -767,6 +778,13 @@ class TaskHandler: try: del self.tasks_by_id[task.task_id] del self.tasks_by_sequence[task.sequence] + + # keep a record of the last N tasks to help avoid command replay + # races + if not task.failed and not task.canceled: + self.completed_tasks.append(task) + self.completed_tasks = self.completed_tasks[-MAX_COMPLETED_TASKS:] + except KeyError: pass @@ -952,6 +970,14 @@ class TaskHandler: image_spec = self.extract_image_spec(image_spec) self.log.info("queue_flatten: {}".format(image_spec)) + refs = {TASK_REF_ACTION: TASK_REF_ACTION_FLATTEN, + TASK_REF_POOL_NAME: image_spec[0], + TASK_REF_POOL_NAMESPACE: image_spec[1], + TASK_REF_IMAGE_NAME: image_spec[2]} + task = self.find_task(refs) + if task: + return 0, task.to_json(), '' + with self.open_ioctx(image_spec) as ioctx: with rbd.Image(ioctx, image_spec[2]) as image: try: @@ -972,6 +998,14 @@ class TaskHandler: image_spec = self.extract_image_spec(image_spec) self.log.info("queue_remove: {}".format(image_spec)) + refs = {TASK_REF_ACTION: TASK_REF_ACTION_REMOVE, + TASK_REF_POOL_NAME: image_spec[0], + TASK_REF_POOL_NAMESPACE: image_spec[1], + TASK_REF_IMAGE_NAME: image_spec[2]} + task = self.find_task(refs) + if task: + return 0, task.to_json(), '' + with self.open_ioctx(image_spec) as ioctx: with rbd.Image(ioctx, image_spec[2]) as image: if list(image.list_snaps()): @@ -981,15 +1015,20 @@ class TaskHandler: return 0, self.add_task(ioctx, "Removing image {}".format( self.format_image_spec(image_spec)), - {TASK_REF_ACTION: TASK_REF_ACTION_REMOVE, - TASK_REF_POOL_NAME: image_spec[0], - TASK_REF_POOL_NAMESPACE: image_spec[1], - TASK_REF_IMAGE_NAME: image_spec[2]}), "" + refs), '' def queue_trash_remove(self, image_id_spec): image_id_spec = self.extract_image_spec(image_id_spec) self.log.info("queue_trash_remove: {}".format(image_id_spec)) + refs = {TASK_REF_ACTION: TASK_REF_ACTION_TRASH_REMOVE, + TASK_REF_POOL_NAME: image_id_spec[0], + TASK_REF_POOL_NAMESPACE: image_id_spec[1], + TASK_REF_IMAGE_ID: image_id_spec[2]} + task = self.find_task(refs) + if task: + return 0, task.to_json(), '' + # verify that image exists in trash with self.open_ioctx(image_id_spec) as ioctx: rbd.RBD().trash_get(ioctx, image_id_spec[2]) @@ -997,10 +1036,7 @@ class TaskHandler: return 0, self.add_task(ioctx, "Removing image {} from trash".format( self.format_image_spec(image_id_spec)), - {TASK_REF_ACTION: TASK_REF_ACTION_TRASH_REMOVE, - TASK_REF_POOL_NAME: image_id_spec[0], - TASK_REF_POOL_NAMESPACE: image_id_spec[1], - TASK_REF_IMAGE_ID: image_id_spec[2]}), "" + refs), '' def validate_image_migrating(self, ioctx, image_spec): with rbd.Image(ioctx, image_spec[2]) as image: @@ -1020,6 +1056,14 @@ class TaskHandler: image_spec = self.extract_image_spec(image_spec) self.log.info("queue_migration_execute: {}".format(image_spec)) + refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_EXECUTE, + TASK_REF_POOL_NAME: image_spec[0], + TASK_REF_POOL_NAMESPACE: image_spec[1], + TASK_REF_IMAGE_NAME: image_spec[2]} + task = self.find_task(refs) + if task: + return 0, task.to_json(), '' + with self.open_ioctx(image_spec) as ioctx: self.validate_image_migrating(ioctx, image_spec) @@ -1039,15 +1083,20 @@ class TaskHandler: self.format_image_spec((dest_pool, status['dest_pool_namespace'], status['dest_image_name']))), - {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_EXECUTE, - TASK_REF_POOL_NAME: image_spec[0], - TASK_REF_POOL_NAMESPACE: image_spec[1], - TASK_REF_IMAGE_NAME: image_spec[2]}), "" + refs), '' def queue_migration_commit(self, image_spec): image_spec = self.extract_image_spec(image_spec) self.log.info("queue_migration_commit: {}".format(image_spec)) + refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_COMMIT, + TASK_REF_POOL_NAME: image_spec[0], + TASK_REF_POOL_NAMESPACE: image_spec[1], + TASK_REF_IMAGE_NAME: image_spec[2]} + task = self.find_task(refs) + if task: + return 0, task.to_json(), '' + with self.open_ioctx(image_spec) as ioctx: self.validate_image_migrating(ioctx, image_spec) @@ -1059,25 +1108,27 @@ class TaskHandler: return 0, self.add_task(ioctx, "Committing image migration for {}".format( self.format_image_spec(image_spec)), - {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_COMMIT, - TASK_REF_POOL_NAME: image_spec[0], - TASK_REF_POOL_NAMESPACE: image_spec[1], - TASK_REF_IMAGE_NAME: image_spec[2]}), "" + refs), '' def queue_migration_abort(self, image_spec): image_spec = self.extract_image_spec(image_spec) self.log.info("queue_migration_abort: {}".format(image_spec)) + refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_ABORT, + TASK_REF_POOL_NAME: image_spec[0], + TASK_REF_POOL_NAMESPACE: image_spec[1], + TASK_REF_IMAGE_NAME: image_spec[2]} + task = self.find_task(refs) + if task: + return 0, task.to_json(), '' + with self.open_ioctx(image_spec) as ioctx: self.validate_image_migrating(ioctx, image_spec) return 0, self.add_task(ioctx, "Aborting image migration for {}".format( self.format_image_spec(image_spec)), - {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_ABORT, - TASK_REF_POOL_NAME: image_spec[0], - TASK_REF_POOL_NAMESPACE: image_spec[1], - TASK_REF_IMAGE_NAME: image_spec[2]}), "" + refs), '' def task_cancel(self, task_id): self.log.info("task_cancel: {}".format(task_id))