]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/volumes: allow canceling a single asynchronous job for a volume
authorVenky Shankar <vshankar@redhat.com>
Tue, 14 Jan 2020 09:19:42 +0000 (04:19 -0500)
committerRamana Raja <rraja@redhat.com>
Wed, 18 Mar 2020 05:33:06 +0000 (11:03 +0530)
Signed-off-by: Venky Shankar <vshankar@redhat.com>
src/pybind/mgr/volumes/fs/async_job.py

index bb07fda6e3322f7e898f6a90037b363543cbb68b..3bdedb723b9ce4a0c7b60fd4663b31ae0ea048bc 100644 (file)
@@ -156,7 +156,7 @@ class AsyncJobs(object):
         thread_id.reset_cancel()
 
         # wake up cancellation waiters if needed
-        if not self.jobs[volname] and cancelled:
+        if cancelled:
             logging.info("waking up cancellation waiters")
             self.cancel_cv.notifyAll()
 
@@ -194,6 +194,32 @@ class AsyncJobs(object):
         except (KeyError, ValueError):
             pass
 
+    def _cancel_job(self, volname, job):
+        """
+        cancel a executing job for a given volume. return True if canceled, False
+        otherwise (volume/job not found).
+        """
+        canceled = False
+        log.info("canceling job {0} for volume {1}".format(job, volname))
+        try:
+            if not volname in self.q and not volname in self.jobs and not job in self.jobs[volname]:
+                return canceled
+            for j in self.jobs[volname]:
+                if j[0] == job:
+                    j[1].cancel_job()
+                    # be safe against _cancel_jobs() running concurrently
+                    while j in self.jobs.get(volname, []):
+                        self.cancel_cv.wait()
+                    canceled = True
+                    break
+        except (KeyError, ValueError):
+            pass
+        return canceled
+
+    def cancel_job(self, volname, job):
+        with self.lock:
+            return self._cancel_job(volname, job)
+
     def cancel_jobs(self, volname):
         """
         cancel all executing jobs for a given volume.