git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
pybind/mgr: handle duplicate rbd task commands
author: Jason Dillaman <dillaman@redhat.com>
Thu, 25 Jul 2019 16:36:19 +0000 (12:36 -0400)
committer: Jason Dillaman <dillaman@redhat.com>
Sun, 18 Aug 2019 20:50:11 +0000 (16:50 -0400)
The 'ceph' CLI will duplicate commands within teuthology to test
the MON's idempotency. This shouldn't be required for the MGR module,
but we can keep a bounded list of recently completed tasks to handle
this possible command replay.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
(cherry picked from commit 9bdc376c5505aa97532ea15ff6286db9beafd1ec)

src/pybind/mgr/rbd_support/module.py

index a95377ff82d4d442ec5a532a380c961c596c79c2..bd18ea603d45e32d91ac251d44766c2dcc62e9b5 100644 (file)
@@ -79,6 +79,7 @@ VALID_TASK_ACTIONS = [TASK_REF_ACTION_FLATTEN,
                       TASK_REF_ACTION_MIGRATION_ABORT]
 
 TASK_RETRY_INTERVAL = timedelta(seconds=30)
+MAX_COMPLETED_TASKS = 50
 
 
 def extract_pool_key(pool_spec):
@@ -597,6 +598,8 @@ class TaskHandler:
     tasks_by_sequence = dict()
     tasks_by_id = dict()
 
+    completed_tasks = []
+
     sequence = 0
 
     def __init__(self, module):
@@ -722,13 +725,21 @@ class TaskHandler:
         self.tasks_by_sequence[task.sequence] = task
         self.tasks_by_id[task.task_id] = task
 
-    def add_task(self, ioctx, message, refs):
-        self.log.debug("add_task: message={}, refs={}".format(message, refs))
+    def find_task(self, refs):
+        self.log.debug("find_task: refs={}".format(refs))
 
         # search for dups and return the original
         for task_id, task in self.tasks_by_id.items():
             if task.refs == refs:
-                return json.dumps(task.to_dict())
+                return task
+
+        # search for a completed task (message replay)
+        for task in self.completed_tasks:
+            if task.refs == refs:
+                return task
+
+    def add_task(self, ioctx, message, refs):
+        self.log.debug("add_task: message={}, refs={}".format(message, refs))
 
         # ensure unique uuid across all pools
         while True:
@@ -767,6 +778,13 @@ class TaskHandler:
             try:
                 del self.tasks_by_id[task.task_id]
                 del self.tasks_by_sequence[task.sequence]
+
+                # keep a record of the last N tasks to help avoid command replay
+                # races
+                if not task.failed and not task.canceled:
+                    self.completed_tasks.append(task)
+                    self.completed_tasks = self.completed_tasks[-MAX_COMPLETED_TASKS:]
+
             except KeyError:
                 pass
 
@@ -952,6 +970,14 @@ class TaskHandler:
         image_spec = self.extract_image_spec(image_spec)
         self.log.info("queue_flatten: {}".format(image_spec))
 
+        refs = {TASK_REF_ACTION: TASK_REF_ACTION_FLATTEN,
+                TASK_REF_POOL_NAME: image_spec[0],
+                TASK_REF_POOL_NAMESPACE: image_spec[1],
+                TASK_REF_IMAGE_NAME: image_spec[2]}
+        task = self.find_task(refs)
+        if task:
+            return 0, task.to_json(), ''
+
         with self.open_ioctx(image_spec) as ioctx:
             with rbd.Image(ioctx, image_spec[2]) as image:
                 try:
@@ -972,6 +998,14 @@ class TaskHandler:
         image_spec = self.extract_image_spec(image_spec)
         self.log.info("queue_remove: {}".format(image_spec))
 
+        refs = {TASK_REF_ACTION: TASK_REF_ACTION_REMOVE,
+                TASK_REF_POOL_NAME: image_spec[0],
+                TASK_REF_POOL_NAMESPACE: image_spec[1],
+                TASK_REF_IMAGE_NAME: image_spec[2]}
+        task = self.find_task(refs)
+        if task:
+            return 0, task.to_json(), ''
+
         with self.open_ioctx(image_spec) as ioctx:
             with rbd.Image(ioctx, image_spec[2]) as image:
                 if list(image.list_snaps()):
@@ -981,15 +1015,20 @@ class TaskHandler:
             return 0, self.add_task(ioctx,
                                     "Removing image {}".format(
                                         self.format_image_spec(image_spec)),
-                                    {TASK_REF_ACTION: TASK_REF_ACTION_REMOVE,
-                                     TASK_REF_POOL_NAME: image_spec[0],
-                                     TASK_REF_POOL_NAMESPACE: image_spec[1],
-                                     TASK_REF_IMAGE_NAME: image_spec[2]}), ""
+                                    refs), ''
 
     def queue_trash_remove(self, image_id_spec):
         image_id_spec = self.extract_image_spec(image_id_spec)
         self.log.info("queue_trash_remove: {}".format(image_id_spec))
 
+        refs = {TASK_REF_ACTION: TASK_REF_ACTION_TRASH_REMOVE,
+                TASK_REF_POOL_NAME: image_id_spec[0],
+                TASK_REF_POOL_NAMESPACE: image_id_spec[1],
+                TASK_REF_IMAGE_ID: image_id_spec[2]}
+        task = self.find_task(refs)
+        if task:
+            return 0, task.to_json(), ''
+
         # verify that image exists in trash
         with self.open_ioctx(image_id_spec) as ioctx:
             rbd.RBD().trash_get(ioctx, image_id_spec[2])
@@ -997,10 +1036,7 @@ class TaskHandler:
             return 0, self.add_task(ioctx,
                                     "Removing image {} from trash".format(
                                         self.format_image_spec(image_id_spec)),
-                                    {TASK_REF_ACTION: TASK_REF_ACTION_TRASH_REMOVE,
-                                     TASK_REF_POOL_NAME: image_id_spec[0],
-                                     TASK_REF_POOL_NAMESPACE: image_id_spec[1],
-                                     TASK_REF_IMAGE_ID: image_id_spec[2]}), ""
+                                    refs), ''
 
     def validate_image_migrating(self, ioctx, image_spec):
         with rbd.Image(ioctx, image_spec[2]) as image:
@@ -1020,6 +1056,14 @@ class TaskHandler:
         image_spec = self.extract_image_spec(image_spec)
         self.log.info("queue_migration_execute: {}".format(image_spec))
 
+        refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_EXECUTE,
+                TASK_REF_POOL_NAME: image_spec[0],
+                TASK_REF_POOL_NAMESPACE: image_spec[1],
+                TASK_REF_IMAGE_NAME: image_spec[2]}
+        task = self.find_task(refs)
+        if task:
+            return 0, task.to_json(), ''
+
         with self.open_ioctx(image_spec) as ioctx:
             self.validate_image_migrating(ioctx, image_spec)
 
@@ -1039,15 +1083,20 @@ class TaskHandler:
                                         self.format_image_spec((dest_pool,
                                                                 status['dest_pool_namespace'],
                                                                 status['dest_image_name']))),
-                                    {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_EXECUTE,
-                                     TASK_REF_POOL_NAME: image_spec[0],
-                                     TASK_REF_POOL_NAMESPACE: image_spec[1],
-                                     TASK_REF_IMAGE_NAME: image_spec[2]}), ""
+                                    refs), ''
 
     def queue_migration_commit(self, image_spec):
         image_spec = self.extract_image_spec(image_spec)
         self.log.info("queue_migration_commit: {}".format(image_spec))
 
+        refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_COMMIT,
+                TASK_REF_POOL_NAME: image_spec[0],
+                TASK_REF_POOL_NAMESPACE: image_spec[1],
+                TASK_REF_IMAGE_NAME: image_spec[2]}
+        task = self.find_task(refs)
+        if task:
+            return 0, task.to_json(), ''
+
         with self.open_ioctx(image_spec) as ioctx:
             self.validate_image_migrating(ioctx, image_spec)
 
@@ -1059,25 +1108,27 @@ class TaskHandler:
             return 0, self.add_task(ioctx,
                                     "Committing image migration for {}".format(
                                         self.format_image_spec(image_spec)),
-                                    {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_COMMIT,
-                                     TASK_REF_POOL_NAME: image_spec[0],
-                                     TASK_REF_POOL_NAMESPACE: image_spec[1],
-                                     TASK_REF_IMAGE_NAME: image_spec[2]}), ""
+                                    refs), ''
 
     def queue_migration_abort(self, image_spec):
         image_spec = self.extract_image_spec(image_spec)
         self.log.info("queue_migration_abort: {}".format(image_spec))
 
+        refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_ABORT,
+                TASK_REF_POOL_NAME: image_spec[0],
+                TASK_REF_POOL_NAMESPACE: image_spec[1],
+                TASK_REF_IMAGE_NAME: image_spec[2]}
+        task = self.find_task(refs)
+        if task:
+            return 0, task.to_json(), ''
+
         with self.open_ioctx(image_spec) as ioctx:
             self.validate_image_migrating(ioctx, image_spec)
 
             return 0, self.add_task(ioctx,
                                     "Aborting image migration for {}".format(
                                         self.format_image_spec(image_spec)),
-                                    {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_ABORT,
-                                     TASK_REF_POOL_NAME: image_spec[0],
-                                     TASK_REF_POOL_NAMESPACE: image_spec[1],
-                                     TASK_REF_IMAGE_NAME: image_spec[2]}), ""
+                                    refs), ''
 
     def task_cancel(self, task_id):
         self.log.info("task_cancel: {}".format(task_id))