thread_id.reset_cancel()
# wake up cancellation waiters if needed
- if not self.jobs[volname] and cancelled:
+ if cancelled:
logging.info("waking up cancellation waiters")
self.cancel_cv.notifyAll()
except (KeyError, ValueError):
pass
+ def _cancel_job(self, volname, job):
+ """
+ cancel a executing job for a given volume. return True if canceled, False
+ otherwise (volume/job not found).
+ """
+ canceled = False
+ log.info("canceling job {0} for volume {1}".format(job, volname))
+ try:
+ if not volname in self.q and not volname in self.jobs and not job in self.jobs[volname]:
+ return canceled
+ for j in self.jobs[volname]:
+ if j[0] == job:
+ j[1].cancel_job()
+ # be safe against _cancel_jobs() running concurrently
+ while j in self.jobs.get(volname, []):
+ self.cancel_cv.wait()
+ canceled = True
+ break
+ except (KeyError, ValueError):
+ pass
+ return canceled
+
+ def cancel_job(self, volname, job):
+ with self.lock:
+ return self._cancel_job(volname, job)
+
def cancel_jobs(self, volname):
"""
cancel all executing jobs for a given volume.