log = logging.getLogger(__name__)
+
class JobThread(threading.Thread):
# this is "not" configurable and there is no need for it to be
# configurable. if a thread encounters an exception, we retry
# unless the jobs fetching and execution routines are not implemented
# retry till we hit cap limit.
retries += 1
- log.warning("thread [{0}] encountered fatal error: (attempt#" \
+ log.warning("thread [{0}] encountered fatal error: (attempt#"
" {1}/{2})".format(thread_name, retries, JobThread.MAX_RETRIES_ON_EXCEPTION))
exc_type, exc_value, exc_traceback = sys.exc_info()
log.warning("traceback: {0}".format("".join(
def reset_cancel(self):
self.cancel_event.clear()
+
class AsyncJobs(threading.Thread):
"""
Class providing asynchronous execution of jobs via worker threads.
threading.Thread.__init__(self, name="{0}.tick".format(name_pfx))
self.vc = volume_client
# queue of volumes for starting async jobs
- self.q = deque() # type: deque
+ self.q = deque() # type: deque
# volume => job tracking
self.jobs = {}
# lock, cv for kickstarting jobs
"""
log.info("queuing job for volume '{0}'".format(volname))
with lock_timeout_log(self.lock):
- if not volname in self.q:
+ if volname not in self.q:
self.q.append(volname)
self.jobs[volname] = []
self.cv.notifyAll()
"""
log.info("cancelling jobs for volume '{0}'".format(volname))
try:
- if not volname in self.q and not volname in self.jobs:
+ if volname not in self.q and volname not in self.jobs:
return
self.q.remove(volname)
# cancel in-progress operation and wait until complete
j[1].cancel_job()
# wait for cancellation to complete
while self.jobs[volname]:
- log.debug("waiting for {0} in-progress jobs for volume '{1}' to " \
+ log.debug("waiting for {0} in-progress jobs for volume '{1}' to "
"cancel".format(len(self.jobs[volname]), volname))
self.cancel_cv.wait()
self.jobs.pop(volname)
log.info("canceling job {0} for volume {1}".format(job, volname))
try:
vol_jobs = [j[0] for j in self.jobs.get(volname, [])]
- if not volname in self.q and not job in vol_jobs:
+ if volname not in self.q and job not in vol_jobs:
return canceled
for j in self.jobs[volname]:
if j[0] == job:
hours and do all kinds of synchronous work. called outside `self.lock`.
"""
raise NotImplementedException()
-