while retries < JobThread.MAX_RETRIES_ON_EXCEPTION:
vol_job = None
try:
+ if not self.async_job.run_event.is_set():
+ log.debug('will wait for run_event to be set. postponing '
+ 'getting jobs until then')
+ self.async_job.run_event.wait()
+ log.debug('run_event has been set, waiting is complete. '
+ 'proceeding to get jobs now')
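+ # NOTE: the wait above happens before the job queue lock below is
+ # taken, so a paused worker thread does not hold the lock while blocked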
+
# fetch next job to execute
with lock_timeout_log(self.async_job.lock):
while True:
self.stopping = threading.Event()
self.cancel_cv = threading.Condition(self.lock)
+
+ self.run_event = threading.Event()
+ # let async threads run by default
+ self.run_event.set()
+
self.nr_concurrent_jobs = nr_concurrent_jobs
self.name_pfx = name_pfx
# each async job group uses its own libcephfs connection (pool)
self.spawn_more_threads()
self.cv.wait(timeout=self.wakeup_timeout)
+ def pause(self):
+ self.run_event.clear()
+
+ log.debug('pause() cancelling ongoing jobs and their respective worker '
+ 'threads...')
+ self.cancel_all_jobs(update_queue=False)
+
+ # XXX: cancel_all_jobs() sets jobthread.cancel_event, causing all ongoing
+ # jobs to be cancelled. But if there are no jobs (that is, self.q is
+ # empty), cancel_all_jobs() returns without doing anything and
+ # jobthread.cancel_event is never set. This results in future jobs being
+ # executed even though the config option to pause is already set.
+ # Similarly, when there is only 1 ongoing job, jobthread.cancel_event is
+ # set for its thread but not for the others, causing the rest of the
+ # threads to pick up new jobs as they are queued.
+ # Therefore, set jobthread.cancel_event explicitly.
+ log.debug('pause() pausing rest of worker threads')
+ for t in self.threads:
+ # is_set(), although technically redundant, is called to emphasize
+ # that cancel_event may be set on some threads but not on others.
+ # this guards against the call to set() below being removed on the
+ # incomplete observation that cancel_event is already set on (some)
+ # threads.
+ if not t.cancel_event.is_set():
+ t.cancel_event.set()
+ log.debug('pause() all jobs have been cancelled and cancel_event has '
+ 'been set for all threads; queue and threads have been paused')
+
+ def resume(self):
+ if self.run_event.is_set():
+ log.debug('resume() no need to resume, run_event is already set.')
+ return
+
+ log.debug('resume() enabling worker threads')
+ for t in self.threads:
+ t.cancel_event.clear()
+
+ self.run_event.set()
+ log.debug('resume() run_event has been set, queue and threads have been'
+ ' resumed')
+
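+ # A minimal sketch of the gating pattern used by pause()/resume() above
+ # (names are illustrative, not part of this module):
+ #
+ #   import threading
+ #
+ #   gate = threading.Event()
+ #   gate.set()                  # workers run by default
+ #
+ #   def worker():
+ #       while True:
+ #           gate.wait()         # block here while paused
+ #           ...                 # fetch and execute the next job
+ #
+ #   gate.clear()                # pause: workers block before the next job
+ #   gate.set()                  # resume: blocked workers continue
+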
def shutdown(self):
self.stopping.set()
self.cancel_all_jobs()
nr_vols -= 1
for vol in to_remove:
log.debug("auto removing volume '{0}' from tracked volumes".format(vol))
- self.q.remove(vol)
- self.jobs.pop(vol)
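+ # membership is checked first so the removal below stays idempotent;
+ # the volume may already be absent from either structure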
+ if vol in self.q:
+ self.q.remove(vol)
+ if vol in self.jobs:
+ self.jobs.pop(vol)
return next_job
def register_async_job(self, volname, job, thread_id):
self.jobs[volname].remove((job, thread_id))
cancelled = thread_id.should_cancel()
- thread_id.reset_cancel()
+ # don't clear the cancel_event flag if the queue and threads have been
+ # paused (that is, run_event is not set); otherwise a paused worker
+ # could still pick up newly queued jobs (see the XXX note in pause()).
+ if self.run_event.is_set():
+ thread_id.reset_cancel()
# wake up cancellation waiters if needed
if cancelled:
self.jobs[volname] = []
self.cv.notifyAll()
- def _cancel_jobs(self, volname):
+ def _cancel_jobs(self, volname, update_queue=True):
"""
cancel all jobs for the volume. do nothing if no jobs are
executing for the given volume. this would wait until all jobs
try:
if volname not in self.q and volname not in self.jobs:
return
- self.q.remove(volname)
+
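+ # when pausing (update_queue=False), the volume is left on the queue
+ # and in self.jobs so its pending jobs are picked up again on resume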
+ if update_queue:
+ self.q.remove(volname)
+
# cancel in-progress operation and wait until complete
for j in self.jobs[volname]:
j[1].cancel_job()
log.debug("waiting for {0} in-progress jobs for volume '{1}' to "
"cancel".format(len(self.jobs[volname]), volname))
self.cancel_cv.wait()
- self.jobs.pop(volname)
+
+ if update_queue:
+ self.jobs.pop(volname)
except (KeyError, ValueError):
pass
with lock_timeout_log(self.lock):
self._cancel_jobs(volname)
- def cancel_all_jobs(self):
+ def cancel_all_jobs(self, update_queue=True):
"""
cancel all executing jobs for all volumes. when update_queue is False,
volumes are kept on the queue so their jobs can be picked up on resume.
"""
with lock_timeout_log(self.lock):
for volname in list(self.q):
- self._cancel_jobs(volname)
+ self._cancel_jobs(volname, update_queue=update_queue)
def get_next_job(self, volname, running_jobs):
"""
'snapshot_clone_no_wait',
type='bool',
default=True,
- desc='Reject subvolume clone request when cloner threads are busy')
+ desc='Reject subvolume clone request when cloner threads are busy'),
+ Option(
+ 'pause_purging',
+ type='bool',
+ default=False,
+ desc='Pause asynchronous subvolume purge threads'),
+ Option(
+ 'pause_cloning',
+ type='bool',
+ default=False,
+ desc='Pause asynchronous cloner threads')
]
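+ # These options follow the usual mgr module config flow and can be
+ # toggled at runtime, e.g. (illustrative, using the option names added
+ # above):
+ #
+ #   ceph config set mgr mgr/volumes/pause_purging true    # pause purging
+ #   ceph config set mgr mgr/volumes/pause_purging false   # resume purging
+ #   ceph config set mgr mgr/volumes/pause_cloning true    # pause cloning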
def __init__(self, *args, **kwargs):
self.snapshot_clone_delay = None
self.periodic_async_work = False
self.snapshot_clone_no_wait = None
+ self.pause_purging = False
+ self.pause_cloning = False
self.lock = threading.Lock()
super(Module, self).__init__(*args, **kwargs)
# Initialize config option members
self.vc.purge_queue.unset_wakeup_timeout()
elif opt['name'] == "snapshot_clone_no_wait":
self.vc.cloner.reconfigure_reject_clones(self.snapshot_clone_no_wait)
+ elif opt['name'] == "pause_purging":
+ if self.pause_purging:
+ self.vc.purge_queue.pause()
+ else:
+ self.vc.purge_queue.resume()
+ elif opt['name'] == "pause_cloning":
+ if self.pause_cloning:
+ self.vc.cloner.pause()
+ else:
+ self.vc.cloner.resume()
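+ # note: this elif chain runs from the module's config_notify() hook
+ # (assuming the standard MgrModule config flow), so toggling
+ # pause_purging/pause_cloning takes effect at runtime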
+
def handle_command(self, inbuf, cmd):
handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")