From: Venky Shankar
Date: Wed, 4 Dec 2019 04:49:12 +0000 (-0500)
Subject: mgr/volumes: purge thread uses new async interface
X-Git-Tag: v14.2.8~49^2~5
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b05daa5519e97ecf7197654b098adeac3200d51b;p=ceph.git

mgr/volumes: purge thread uses new async interface

This also makes `_cancel_jobs()` thread safe, which was not the case
earlier (with `_cancel_purge_job()`) -- it also simplifies the code by
sharing a single lock between the two condition variables.

Signed-off-by: Venky Shankar
(cherry picked from commit f16cc1e8eebb0868a1e07c25f8d8e4656b11b7bf)
---
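For context, the one-lock/two-condition-variable arrangement the message
describes can be sketched independently of the patch. Both conditions are
built over the same mutex, so the thread that finishes a job can wake
cancellation waiters under the very lock that guards the job table. All
names below are illustrative only, not code from this commit:

    import threading

    lock = threading.Lock()
    work_cv = threading.Condition(lock)    # workers wait here for new jobs
    cancel_cv = threading.Condition(lock)  # cancellers wait here for drain
    jobs = []

    def finish(job):
        with lock:                    # one mutex guards both conditions
            jobs.remove(job)
            if not jobs:
                cancel_cv.notify_all()

    def wait_until_drained():
        with lock:
            while jobs:
                cancel_cv.wait()      # atomically releases the shared lock

With two separate locks (as in the old `_cancel_purge_job()` removed
below), the canceller had to drop one lock while waiting on the other,
which is the thread-safety gap the message refers to.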
diff --git a/src/pybind/mgr/volumes/fs/async_job.py b/src/pybind/mgr/volumes/fs/async_job.py
new file mode 100644
index 000000000000..b47b45ca87bf
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/async_job.py
@@ -0,0 +1,223 @@
+import sys
+import time
+import logging
+import threading
+import traceback
+from collections import deque
+
+from .exception import NotImplementedException
+
+log = logging.getLogger(__name__)
+
+class JobThread(threading.Thread):
+    # this is deliberately not configurable and there is no need for it
+    # to be. if a thread encounters an exception, we retry until it hits
+    # this many consecutive exceptions.
+    MAX_RETRIES_ON_EXCEPTION = 10
+
+    def __init__(self, async_job, volume_client, name):
+        self.vc = volume_client
+        self.async_job = async_job
+        # event object to cancel jobs
+        self.cancel_event = threading.Event()
+        threading.Thread.__init__(self, name=name)
+
+    def run(self):
+        retries = 0
+        thread_id = threading.currentThread()
+        thread_name = thread_id.getName()
+
+        while retries < JobThread.MAX_RETRIES_ON_EXCEPTION:
+            try:
+                # fetch next job to execute
+                with self.async_job.lock:
+                    while True:
+                        vol_job = self.async_job.get_job()
+                        if vol_job:
+                            break
+                        self.async_job.cv.wait()
+                    self.async_job.register_async_job(vol_job[0], vol_job[1], thread_id)
+
+                # execute the job (outside lock)
+                self.async_job.execute_job(vol_job[0], vol_job[1], should_cancel=lambda: thread_id.should_cancel())
+
+                # when done, unregister the job
+                with self.async_job.lock:
+                    self.async_job.unregister_async_job(vol_job[0], vol_job[1], thread_id)
+                retries = 0
+            except NotImplementedException:
+                raise
+            except Exception:
+                # for anything other than unimplemented job fetch/execution
+                # routines (re-raised above), retry till we hit the cap limit.
+                retries += 1
+                log.warning("thread [{0}] encountered fatal error: (attempt#" \
+                            " {1}/{2})".format(thread_name, retries, JobThread.MAX_RETRIES_ON_EXCEPTION))
+                exc_type, exc_value, exc_traceback = sys.exc_info()
+                log.warning("traceback: {0}".format("".join(
+                    traceback.format_exception(exc_type, exc_value, exc_traceback))))
+                time.sleep(1)
+        log.error("thread [{0}] reached exception limit, bailing out...".format(thread_name))
+        self.vc.cluster_log("thread {0} bailing out due to exception".format(thread_name))
+
+    def cancel_job(self):
+        self.cancel_event.set()
+
+    def should_cancel(self):
+        return self.cancel_event.isSet()
+
+    def reset_cancel(self):
+        self.cancel_event.clear()
+
+class AsyncJobs(object):
+    """
+    Class providing asynchronous execution of jobs via worker threads.
+    `jobs` are grouped by `volume`, so a volume can have multiple jobs
+    executing concurrently (capped by the number of worker threads).
+
+    Usage is simple: subclass this and implement the following:
+      - get_next_job(volname, running_jobs)
+      - execute_job(volname, job, should_cancel)
+
+    ... and do not forget to invoke the base class constructor.
+
+    Job cancelation is for a volume as a whole, i.e., all executing jobs
+    for a volume are canceled. Cancelation is poll based -- jobs need to
+    periodically check if cancelation is requested, after which the job
+    should return as soon as possible. The cancelation check is provided
+    via the `should_cancel()` lambda passed to `execute_job()`.
+    """
+
+    def __init__(self, volume_client, name_pfx, nr_concurrent_jobs):
+        self.vc = volume_client
+        # queue of volumes for starting async jobs
+        self.q = deque()
+        # volume => job tracking
+        self.jobs = {}
+        # lock, cv for kickstarting jobs
+        self.lock = threading.Lock()
+        self.cv = threading.Condition(self.lock)
+        # cv for job cancelation (shares the same lock)
+        self.waiting = False
+        self.cancel_cv = threading.Condition(self.lock)
+
+        self.threads = []
+        for i in range(nr_concurrent_jobs):
+            self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(name_pfx, i)))
+            self.threads[-1].start()
+
+    def get_job(self):
+        log.debug("processing {0} volume entries".format(len(self.q)))
+        nr_vols = len(self.q)
+        to_remove = []
+        next_job = None
+        while nr_vols > 0:
+            volname = self.q[0]
+            # rotate now so that other threads pick up jobs for other volumes
+            self.q.rotate(1)
+            running_jobs = [j[0] for j in self.jobs[volname]]
+            (ret, job) = self.get_next_job(volname, running_jobs)
+            if job:
+                next_job = (volname, job)
+                break
+            # this is an optimization: when a given volume has no more jobs
+            # and no jobs are in progress, we remove the volume from the
+            # tracking list so as to:
+            #
+            #   a. not query the filesystem for jobs over and over again
+            #   b. keep the filesystem connection idle so that it can be freed
+            #      from the connection pool
+            #
+            # if jobs show up for a volume again, the volume gets re-added to
+            # the tracking list and the jobs get kickstarted.
+            # note that we do not iterate the volume list fully if there is a
+            # job to process (that will take place eventually).
+            if ret == 0 and not job and not running_jobs:
+                to_remove.append(volname)
+            nr_vols -= 1
+        for vol in to_remove:
+            log.debug("auto removing volume '{0}' from tracked volumes".format(vol))
+            self.q.remove(vol)
+            self.jobs.pop(vol)
+        return next_job
+
+    def register_async_job(self, volname, job, thread_id):
+        log.debug("registering async job {0}.{1} with thread {2}".format(volname, job, thread_id))
+        self.jobs[volname].append((job, thread_id))
+
+    def unregister_async_job(self, volname, job, thread_id):
+        log.debug("unregistering async job {0}.{1} from thread {2}".format(volname, job, thread_id))
+        self.jobs[volname].remove((job, thread_id))
+
+        cancelled = thread_id.should_cancel()
+        thread_id.reset_cancel()
+
+        # wake up cancellation waiters if needed
+        if not self.jobs[volname] and cancelled:
+            log.info("waking up cancellation waiters")
+            self.cancel_cv.notifyAll()
+
+    def queue_job(self, volname):
+        """
+        queue a volume for asynchronous job execution.
+        """
+        log.info("queuing job for volume '{0}'".format(volname))
+        with self.lock:
+            if not volname in self.q:
+                self.q.append(volname)
+                self.jobs[volname] = []
+            self.cv.notifyAll()
+
+    def _cancel_jobs(self, volname):
+        """
+        cancel all jobs for the volume. do nothing if no jobs are executing
+        for the given volume. this would wait until all jobs get interrupted
+        and finish execution.
+        """
+        log.info("cancelling jobs for volume '{0}'".format(volname))
+        try:
+            if not volname in self.q and not volname in self.jobs:
+                return
+            self.q.remove(volname)
+            # cancel in-progress operation and wait until complete
+            for j in self.jobs[volname]:
+                j[1].cancel_job()
+            # wait for cancellation to complete
+            while self.jobs[volname]:
+                log.debug("waiting for {0} in-progress jobs for volume '{1}' to " \
+                          "cancel".format(len(self.jobs[volname]), volname))
+                self.cancel_cv.wait()
+            self.jobs.pop(volname)
+        except (KeyError, ValueError):
+            pass
+
+    def cancel_jobs(self, volname):
+        """
+        cancel all executing jobs for a given volume.
+        """
+        with self.lock:
+            self._cancel_jobs(volname)
+
+    def cancel_all_jobs(self):
+        """
+        cancel all executing jobs for all volumes.
+        """
+        with self.lock:
+            for volname in list(self.q):
+                self._cancel_jobs(volname)
+
+    def get_next_job(self, volname, running_jobs):
+        """
+        get the next job for asynchronous execution as a (retcode, job) tuple.
+        if no jobs are available return (0, None), else return (0, job). on
+        error return (-ret, None). called under `self.lock`.
+        """
+        raise NotImplementedException()
+
+    def execute_job(self, volname, job, should_cancel):
+        """
+        execute a job for a volume. the job can block on I/O operations, sleep
+        for long hours and do all kinds of synchronous work. called outside
+        `self.lock`.
+        """
+        raise NotImplementedException()
+
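The contract the new file defines is small: implement `get_next_job()` and
`execute_job()`, call the base constructor, and poll `should_cancel()` from
long-running work. A toy subclass, as a sketch only (the class name and its
job source are made up, not part of this patch; the relative import assumes
the same package layout as above):

    import time

    from .async_job import AsyncJobs

    class SleepJobs(AsyncJobs):
        def __init__(self, volume_client, pending):
            # pending: hypothetical dict of volume -> list of sleep durations.
            # set it before the base constructor runs, since that starts the
            # worker threads, which call get_next_job() right away.
            self.pending = pending
            super(SleepJobs, self).__init__(volume_client, "sleepjob", 4)

        def get_next_job(self, volname, running_jobs):
            # called under self.lock -- hand out the next queued entry, if any
            queue = self.pending.get(volname, [])
            if queue:
                return 0, queue.pop(0)
            return 0, None

        def execute_job(self, volname, job, should_cancel):
            # called outside self.lock -- poll should_cancel() periodically
            slept = 0.0
            while slept < job and not should_cancel():
                time.sleep(0.1)
                slept += 0.1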
+ """ + log.info("cancelling jobs for volume '{0}'".format(volname)) + try: + if not volname in self.q and not volname in self.jobs: + return + self.q.remove(volname) + # cancel in-progress operation and wait until complete + for j in self.jobs[volname]: + j[1].cancel_job() + # wait for cancellation to complete + while self.jobs[volname]: + log.debug("waiting for {0} in-progress jobs for volume '{1}' to " \ + "cancel".format(len(self.jobs[volname]), volname)) + self.cancel_cv.wait() + self.jobs.pop(volname) + except (KeyError, ValueError): + pass + + def cancel_jobs(self, volname): + """ + cancel all executing jobs for a given volume. + """ + with self.lock: + self._cancel_jobs(volname) + + def cancel_all_jobs(self): + """ + call all executing jobs for all volumes. + """ + with self.lock: + for volname in list(self.q): + self._cancel_jobs(volname) + + def get_next_job(self, volname, running_jobs): + """ + get the next job for asynchronous execution as (retcode, job) tuple. if no + jobs are available return (0, None) else return (0, job). on error return + (-ret, None). called under `self.lock`. + """ + raise NotImplementedException() + + def execute_job(self, volname, job, should_cancel): + """ + execute a job for a volume. the job can block on I/O operations, sleep for long + hours and do all kinds of synchronous work. called outside `self.lock`. + """ + raise NotImplementedException() + diff --git a/src/pybind/mgr/volumes/fs/exception.py b/src/pybind/mgr/volumes/fs/exception.py index 841b35202eec..de0b19109f35 100644 --- a/src/pybind/mgr/volumes/fs/exception.py +++ b/src/pybind/mgr/volumes/fs/exception.py @@ -32,3 +32,6 @@ class OpSmException(Exception): def __str__(self): return "{0} ({1})".format(self.errno, self.error_str) + +class NotImplementedException(Exception): + pass diff --git a/src/pybind/mgr/volumes/fs/purge_queue.py b/src/pybind/mgr/volumes/fs/purge_queue.py index 7c079250d6e2..922c8f002768 100644 --- a/src/pybind/mgr/volumes/fs/purge_queue.py +++ b/src/pybind/mgr/volumes/fs/purge_queue.py @@ -1,176 +1,46 @@ -import sys -import time import errno import logging -import threading -import traceback -from collections import deque +from .async_job import AsyncJobs from .exception import VolumeException from .operations.volume import open_volume, open_volume_lockless from .operations.trash import open_trashcan log = logging.getLogger(__name__) -class PurgeQueueBase(object): - """ - Base class for implementing purge queue strategies. - """ - - # this is "not" configurable and there is no need for it to be - # configurable. if a purge thread encounters an exception, we - # retry, till it hits this many consecutive exceptions after - # which a warning is sent to `ceph status`. 
diff --git a/src/pybind/mgr/volumes/fs/purge_queue.py b/src/pybind/mgr/volumes/fs/purge_queue.py
index 7c079250d6e2..922c8f002768 100644
--- a/src/pybind/mgr/volumes/fs/purge_queue.py
+++ b/src/pybind/mgr/volumes/fs/purge_queue.py
@@ -1,176 +1,46 @@
-import sys
-import time
 import errno
 import logging
-import threading
-import traceback
-from collections import deque
+from .async_job import AsyncJobs
 
 from .exception import VolumeException
 from .operations.volume import open_volume, open_volume_lockless
 from .operations.trash import open_trashcan
 
 log = logging.getLogger(__name__)
 
-class PurgeQueueBase(object):
-    """
-    Base class for implementing purge queue strategies.
-    """
-
-    # this is "not" configurable and there is no need for it to be
-    # configurable. if a purge thread encounters an exception, we
-    # retry, till it hits this many consecutive exceptions after
-    # which a warning is sent to `ceph status`.
-    MAX_RETRIES_ON_EXCEPTION = 10
-
-    class PurgeThread(threading.Thread):
-        def __init__(self, volume_client, name, purge_fn):
-            self.vc = volume_client
-            self.purge_fn = purge_fn
-            # event object to cancel ongoing purge
-            self.cancel_event = threading.Event()
-            threading.Thread.__init__(self, name=name)
-
-        def run(self):
-            retries = 0
-            thread_name = threading.currentThread().getName()
-            while retries < PurgeQueueBase.MAX_RETRIES_ON_EXCEPTION:
-                try:
-                    self.purge_fn()
-                    retries = 0
-                except Exception:
-                    retries += 1
-                    log.warning("purge thread [{0}] encountered fatal error: (attempt#" \
-                                " {1}/{2})".format(thread_name, retries,
-                                                   PurgeQueueBase.MAX_RETRIES_ON_EXCEPTION))
-                    exc_type, exc_value, exc_traceback = sys.exc_info()
-                    log.warning("traceback: {0}".format("".join(
-                        traceback.format_exception(exc_type, exc_value, exc_traceback))))
-                    time.sleep(1)
-            log.error("purge thread [{0}] reached exception limit, bailing out...".format(thread_name))
-            self.vc.cluster_log("purge thread {0} bailing out due to exception".format(thread_name))
-
-        def cancel_job(self):
-            self.cancel_event.set()
-
-        def should_cancel(self):
-            return self.cancel_event.isSet()
-
-        def reset_cancel(self):
-            self.cancel_event.clear()
-
-    def __init__(self, volume_client):
-        self.vc = volume_client
-        # volumes whose subvolumes need to be purged
-        self.q = deque()
-        # job tracking
-        self.jobs = {}
-        # lock, cv for kickstarting purge
-        self.lock = threading.Lock()
-        self.cv = threading.Condition(self.lock)
-        # lock, cv for purge cancellation
-        self.waiting = False
-        self.c_lock = threading.Lock()
-        self.c_cv = threading.Condition(self.c_lock)
-
-    def queue_purge_job(self, volname):
-        with self.lock:
-            if not self.q.count(volname):
-                self.q.append(volname)
-                self.jobs[volname] = []
-            self.cv.notifyAll()
-
-    def _cancel_purge_job(self, volname):
-        log.info("cancelling purge jobs for volume '{0}'".format(volname))
-        locked = True
-        try:
-            if not self.q.count(volname):
-                return
-            self.q.remove(volname)
-            if not self.jobs.get(volname, []):
-                return
-            # cancel in-progress purge operation and wait until complete
-            for j in self.jobs[volname]:
-                j[1].cancel_job()
-            # wait for cancellation to complete
-            with self.c_lock:
-                locked = False
-                self.waiting = True
-                self.lock.release()
-                while self.waiting:
-                    log.debug("waiting for {0} in-progress purge jobs for volume '{1}' to " \
-                              "cancel".format(len(self.jobs[volname]), volname))
-                    self.c_cv.wait()
-        finally:
-            if not locked:
-                self.lock.acquire()
-
-    def cancel_purge_job(self, volname):
-        self.lock.acquire()
-        self._cancel_purge_job(volname)
-        self.lock.release()
-
-    def cancel_all_jobs(self):
-        self.lock.acquire()
-        for volname in list(self.q):
-            self._cancel_purge_job(volname)
-        self.lock.release()
-
-    def register_job(self, volname, purge_dir):
-        log.debug("registering purge job: {0}.{1}".format(volname, purge_dir))
-
-        thread_id = threading.currentThread()
-        self.jobs[volname].append((purge_dir, thread_id))
-
-    def unregister_job(self, volname, purge_dir):
-        log.debug("unregistering purge job: {0}.{1}".format(volname, purge_dir))
-
-        thread_id = threading.currentThread()
-        self.jobs[volname].remove((purge_dir, thread_id))
-
-        cancelled = thread_id.should_cancel()
-        thread_id.reset_cancel()
-
-        # wake up cancellation waiters if needed
-        if not self.jobs[volname] and cancelled:
-            logging.info("waking up cancellation waiters")
-            self.jobs.pop(volname)
-            with self.c_lock:
-                self.waiting = False
-                self.c_cv.notifyAll()
-
-    def get_trash_entry_for_volume(self, volname):
log.debug("fetching trash entry for volume '{0}'".format(volname)) - - exclude_entries = [v[0] for v in self.jobs[volname]] - fs_handle = None - try: - with open_volume(self.vc, volname) as fs_handle: - with open_trashcan(fs_handle, self.vc.volspec) as trashcan: - path = trashcan.get_trash_entry(exclude_entries) - ret = 0, path - except VolumeException as ve: - if ve.errno == -errno.ENOENT and fs_handle: - ret = 0, None - else: - log.error("error fetching trash entry for volume '{0}' ({1})".format(volname), ve) - ret = ve.errno, None - return ret - - def purge_trash_entry_for_volume(self, volname, purge_dir): - log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_dir, volname)) - - ret = 0 - thread_id = threading.currentThread() - try: - with open_volume_lockless(self.vc, volname) as fs_handle: - with open_trashcan(fs_handle, self.vc.volspec) as trashcan: - trashcan.purge(purge_dir, should_cancel=lambda: thread_id.should_cancel()) - except VolumeException as ve: - ret = ve.errno - return ret - -class ThreadPoolPurgeQueueMixin(PurgeQueueBase): +# helper for fetching a trash entry for a given volume +def get_trash_entry_for_volume(volume_client, volname, running_jobs): + log.debug("fetching trash entry for volume '{0}'".format(volname)) + + try: + with open_volume(volume_client, volname) as fs_handle: + try: + with open_trashcan(fs_handle, volume_client.volspec) as trashcan: + path = trashcan.get_trash_entry(running_jobs) + return 0, path + except VolumeException as ve: + if ve.errno == -errno.ENOENT: + return 0, None + raise ve + except VolumeException as ve: + log.error("error fetching trash entry for volume '{0}' ({1})".format(volname), ve) + return ve.errno, None + return ret + +# helper for starting a purge operation on a trash entry +def purge_trash_entry_for_volume(volume_client, volname, purge_dir, should_cancel): + log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_dir, volname)) + + ret = 0 + try: + with open_volume_lockless(volume_client, volname) as fs_handle: + with open_trashcan(fs_handle, volume_client.volspec) as trashcan: + trashcan.purge(purge_dir, should_cancel) + except VolumeException as ve: + ret = ve.errno + return ret + +class ThreadPoolPurgeQueueMixin(AsyncJobs): """ Purge queue mixin class maintaining a pool of threads for purging trash entries. Subvolumes are chosen from volumes in a round robin fashion. If some of the purge @@ -179,68 +49,11 @@ class ThreadPoolPurgeQueueMixin(PurgeQueueBase): _all_ threads purging entries for one volume (starving other volumes). """ def __init__(self, volume_client, tp_size): - super(ThreadPoolPurgeQueueMixin, self).__init__(volume_client) - self.threads = [] - for i in range(tp_size): - self.threads.append( - PurgeQueueBase.PurgeThread(volume_client, name="purgejob.{}".format(i), purge_fn=self.run)) - self.threads[-1].start() - - def pick_purge_dir_from_volume(self): - log.debug("processing {0} purge job entries".format(len(self.q))) - nr_vols = len(self.q) - to_remove = [] - to_purge = None, None - while nr_vols > 0: - volname = self.q[0] - # do this now so that the other thread picks up trash entry - # for next volume. - self.q.rotate(1) - ret, purge_dir = self.get_trash_entry_for_volume(volname) - if purge_dir: - to_purge = volname, purge_dir - break - # this is an optimization when for a given volume there are no more - # entries in trash and no purge operations are in progress. in such - # a case we remove the volume from the tracking list so as to: - # - # a. 
diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py
index f5c0102d979b..e384db4745a3 100644
--- a/src/pybind/mgr/volumes/fs/volume.py
+++ b/src/pybind/mgr/volumes/fs/volume.py
@@ -50,7 +50,7 @@ class VolumeClient(object):
         # job list.
         fs_map = self.mgr.get('fs_map')
         for fs in fs_map['filesystems']:
-            self.purge_queue.queue_purge_job(fs['mdsmap']['fs_name'])
+            self.purge_queue.queue_job(fs['mdsmap']['fs_name'])
 
     def is_stopping(self):
         return self.stopping.isSet()
@@ -95,7 +95,7 @@ class VolumeClient(object):
                   "that is what you want, re-issue the command followed by " \
                   "--yes-i-really-mean-it.".format(volname)
 
-        self.purge_queue.cancel_purge_job(volname)
+        self.purge_queue.cancel_jobs(volname)
         self.connection_pool.del_fs_handle(volname, wait=True)
         return delete_volume(self.mgr, volname)
@@ -121,7 +121,7 @@ class VolumeClient(object):
         except VolumeException as ve:
             # kick the purge threads for async removal -- note that this
             # assumes that the subvolume is moved to trashcan for cleanup on error.
-            self.purge_queue.queue_purge_job(volname)
+            self.purge_queue.queue_job(volname)
             raise ve
 
     def create_subvolume(self, **kwargs):
@@ -162,7 +162,7 @@ class VolumeClient(object):
             # assumes that the subvolume is moved to trash can.
             # TODO: make purge queue as singleton so that trash can kicks
             # the purge threads on dump.
-            self.purge_queue.queue_purge_job(volname)
+            self.purge_queue.queue_job(volname)
         except VolumeException as ve:
             if not (ve.errno == -errno.ENOENT and force):
                 ret = self.volume_exception_to_retval(ve)
@@ -311,7 +311,7 @@ class VolumeClient(object):
         except VolumeException as ve:
             try:
                 target_subvolume.remove()
-                self.purge_queue.queue_purge_job(volname)
+                self.purge_queue.queue_job(volname)
             except Exception as e:
                 log.warn("failed to cleanup clone subvolume '{0}' ({1})".format(target_subvolname, e))
             raise ve