from collections import deque
from mgr_util import lock_timeout_log, CephfsClient
+from .operations.volume import list_volumes
from .exception import NotImplementedException
log = logging.getLogger(__name__)
log.info("thread [{0}] terminating due to reconfigure".format(thread_name))
self.async_job.threads.remove(self)
return
+ timo = self.async_job.wakeup_timeout
+ if timo is not None:
+ vols = [e['name'] for e in list_volumes(self.vc.mgr)]
+ missing = set(vols) - set(self.async_job.q)
+ for m in missing:
+ self.async_job.jobs[m] = []
+ self.async_job.q.append(m)
vol_job = self.async_job.get_job()
if vol_job:
break
- self.async_job.cv.wait()
+ self.async_job.cv.wait(timeout=timo)
self.async_job.register_async_job(vol_job[0], vol_job[1], thread_id)
# execute the job (outside lock)
def reset_cancel(self):
self.cancel_event.clear()
-
class AsyncJobs(threading.Thread):
"""
Class providing asynchronous execution of jobs via worker threads.
via `should_cancel()` lambda passed to `execute_job()`.
"""
+ # not made configurable on purpose
+ WAKEUP_TIMEOUT = 5.0
+
def __init__(self, volume_client, name_pfx, nr_concurrent_jobs):
threading.Thread.__init__(self, name="{0}.tick".format(name_pfx))
self.vc = volume_client
self.name_pfx = name_pfx
# each async job group uses its own libcephfs connection (pool)
self.fs_client = CephfsClient(self.vc.mgr)
+ self.wakeup_timeout = None
self.threads = []
for i in range(self.nr_concurrent_jobs):
self.threads[-1].start()
self.start()
+ def set_wakeup_timeout(self):
+ with self.lock:
+ # not made configurable on purpose
+ self.wakeup_timeout = AsyncJobs.WAKEUP_TIMEOUT
+ self.cv.notifyAll()
+
+ def unset_wakeup_timeout(self):
+ with self.lock:
+ self.wakeup_timeout = None
+ self.cv.notifyAll()
+
def run(self):
log.debug("tick thread {} starting".format(self.name))
with lock_timeout_log(self.lock):
'snapshot_clone_delay',
type='int',
default=0,
- desc='Delay clone begin operation by snapshot_clone_delay seconds')
+ desc='Delay clone begin operation by snapshot_clone_delay seconds'),
+ Option(
+ 'periodic_async_work',
+ type='bool',
+ default=False,
+ desc='Periodically check for async work')
]
def __init__(self, *args, **kwargs):
# for mypy
self.max_concurrent_clones = None
self.snapshot_clone_delay = None
+ self.periodic_async_work = False
self.lock = threading.Lock()
super(Module, self).__init__(*args, **kwargs)
# Initialize config option members
self.vc.cloner.reconfigure_max_concurrent_clones(self.max_concurrent_clones)
elif opt['name'] == "snapshot_clone_delay":
self.vc.cloner.reconfigure_snapshot_clone_delay(self.snapshot_clone_delay)
+ elif opt['name'] == "periodic_async_work":
+ if self.periodic_async_work:
+ self.vc.cloner.set_wakeup_timeout()
+ self.vc.purge_queue.set_wakeup_timeout()
+ else:
+ self.vc.cloner.unset_wakeup_timeout()
+ self.vc.purge_queue.unset_wakeup_timeout()
def handle_command(self, inbuf, cmd):
handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")