def reset_cancel(self):
self.cancel_event.clear()
-class AsyncJobs(object):
+class AsyncJobs(threading.Thread):
"""
Class providing asynchronous execution of jobs via worker threads.
`jobs` are grouped by `volume`, so a `volume` can have N number of
`jobs` executing concurrently (capped by the number of concurrent jobs).
"""
def __init__(self, volume_client, name_pfx, nr_concurrent_jobs):
+ threading.Thread.__init__(self, name="{0}.tick".format(name_pfx))
self.vc = volume_client
# queue of volumes for starting async jobs
self.q = deque()
self.cv = threading.Condition(self.lock)
# cv for job cancelation
self.waiting = False
+ self.stopping = threading.Event()
self.cancel_cv = threading.Condition(self.lock)
self.nr_concurrent_jobs = nr_concurrent_jobs
+ self.name_pfx = name_pfx
self.threads = []
- for i in range(nr_concurrent_jobs):
- self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(name_pfx, i)))
+ for i in range(self.nr_concurrent_jobs):
+ self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(self.name_pfx, i)))
self.threads[-1].start()
+ self.start()
- def reconfigure_max_concurrent_clones(self, name_pfx, nr_concurrent_jobs):
+ def run(self):
+ log.debug("tick thread {} starting".format(self.name))
+ with self.lock:
+ while not self.stopping.is_set():
+ c = len(self.threads)
+ if c > self.nr_concurrent_jobs:
+ # Decrease concurrency: notify threads that are waiting for a job so the surplus ones can terminate.
+ log.debug("waking threads to terminate due to job reduction")
+ self.cv.notifyAll()
+ elif c < self.nr_concurrent_jobs:
+ # Increase concurrency: create more threads.
+ log.debug("creating new threads to job increase")
+ for i in range(c, self.nr_concurrent_jobs):
+ self.threads.append(JobThread(self, self.vc, name="{0}.{1}.{2}".format(self.name_pfx, time.time(), i)))
+ self.threads[-1].start()
+ self.cv.wait(timeout=5)
+
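The resizing above relies on the worker threads themselves noticing the new `nr_concurrent_jobs` once the tick thread calls `notifyAll()`. The `JobThread` class is not part of this diff; the following is only a minimal sketch, assuming each worker re-checks the configured concurrency whenever it wakes on the shared condition variable:

import threading

# Illustrative sketch only; not the real JobThread implementation.
class JobThread(threading.Thread):
    def __init__(self, async_jobs, volume_client, name):
        threading.Thread.__init__(self, name=name)
        self.async_jobs = async_jobs
        self.vc = volume_client

    def run(self):
        aj = self.async_jobs
        while not aj.stopping.is_set():
            with aj.lock:
                # concurrency was reduced: retire this surplus thread
                if len(aj.threads) > aj.nr_concurrent_jobs:
                    aj.threads.remove(self)
                    return
                vol_job = aj.get_job()
                if not vol_job:
                    # woken by a new job, the tick thread, or shutdown
                    aj.cv.wait()
                    continue
            # execute the fetched job outside the lock (elided here)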
+ def shutdown(self):
+ self.stopping.set()
+ self.cancel_all_jobs()
+ with self.lock:
+ self.cv.notifyAll()
+ self.join()
+
+ def reconfigure_max_async_threads(self, nr_concurrent_jobs):
"""
reconfigure the maximum number of concurrent worker threads
"""
- with self.lock:
- self.nr_concurrent_jobs = nr_concurrent_jobs
- # Decrease in concurrency. Notify threads which are waiting for a job to terminate.
- if len(self.threads) > nr_concurrent_jobs:
- self.cv.notifyAll()
- # Increase in concurrency
- if len(self.threads) < nr_concurrent_jobs:
- for i in range(len(self.threads), nr_concurrent_jobs):
- self.threads.append(JobThread(self, self.vc, name="{0}.{1}.{2}".format(name_pfx, time.time(), i)))
- self.threads[-1].start()
+ self.nr_concurrent_jobs = nr_concurrent_jobs
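With the tick thread doing the actual resizing, reconfiguration reduces to a plain attribute write; the thread count is reconciled on the tick thread's next wakeup (at most about 5 seconds later). A hedged usage sketch, where the `Cloner` subclass and the config option shown are assumptions, not part of this diff:

# Hypothetical caller side, reacting to a mgr config change.
cloner = Cloner(volume_client, tp_size=4)   # assumed AsyncJobs subclass

# after e.g. `ceph config set mgr mgr/volumes/max_concurrent_clones 8`:
cloner.reconfigure_max_async_threads(8)
# No locking is needed by the caller: the tick thread re-reads
# nr_concurrent_jobs on its next iteration and spawns or retires
# JobThread workers to match.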
def get_job(self):
log.debug("processing {0} volume entries".format(len(self.q)))
log.info("shutting down")
# first, note that we're shutting down
self.stopping.set()
- # second, ask purge threads to quit
- self.purge_queue.cancel_all_jobs()
- # third, delete all libcephfs handles from connection pool
+ # stop clones
+ self.cloner.shutdown()
+ # stop purge threads
+ self.purge_queue.shutdown()
+ # last, delete all libcephfs handles from connection pool
self.connection_pool.del_all_handles()
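The ordering here assumes that `cloner` and `purge_queue` both expose the `AsyncJobs.shutdown()` added above, so each has cancelled its in-flight jobs and stopped its tick thread before the libcephfs handles are dropped. A minimal sketch of the pattern, with placeholder names:

# Illustrative only; 'workers' and 'pool' are placeholders, not the module's API.
def shutdown_module(workers, pool):
    for w in workers:        # e.g. the cloner and the purge queue
        w.shutdown()         # cancel jobs, stop the tick thread, join it
    pool.del_all_handles()   # handles are dropped only after the workers stop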
def cluster_log(self, msg, lvl=None):