--- /dev/null
+import sys
+import time
+import logging
+import threading
+import traceback
+from collections import deque
+
+from .exception import NotImplementedException
+
+log = logging.getLogger(__name__)
+
+class JobThread(threading.Thread):
+    # this is "not" configurable and there is no need for it to be
+    # configurable. if a thread encounters an exception, we retry until
+    # it hits this many consecutive exceptions, after which the thread
+    # bails out and logs a warning to the cluster log.
+ MAX_RETRIES_ON_EXCEPTION = 10
+
+ def __init__(self, async_job, volume_client, name):
+ self.vc = volume_client
+ self.async_job = async_job
+ # event object to cancel jobs
+ self.cancel_event = threading.Event()
+ threading.Thread.__init__(self, name=name)
+
+ def run(self):
+ retries = 0
+ thread_id = threading.currentThread()
+ thread_name = thread_id.getName()
+
+ while retries < JobThread.MAX_RETRIES_ON_EXCEPTION:
+ try:
+ # fetch next job to execute
+ with self.async_job.lock:
+ while True:
+ vol_job = self.async_job.get_job()
+ if vol_job:
+ break
+ self.async_job.cv.wait()
+ self.async_job.register_async_job(vol_job[0], vol_job[1], thread_id)
+
+ # execute the job (outside lock)
+ self.async_job.execute_job(vol_job[0], vol_job[1], should_cancel=lambda: thread_id.should_cancel())
+
+ # when done, unregister the job
+ with self.async_job.lock:
+ self.async_job.unregister_async_job(vol_job[0], vol_job[1], thread_id)
+ retries = 0
+ except NotImplementedException:
+ raise
+ except Exception:
+                # anything other than the NotImplementedException handled above
+                # is unexpected -- retry till we hit the cap.
+ retries += 1
+ log.warning("thread [{0}] encountered fatal error: (attempt#" \
+ " {1}/{2})".format(thread_name, retries, JobThread.MAX_RETRIES_ON_EXCEPTION))
+ exc_type, exc_value, exc_traceback = sys.exc_info()
+ log.warning("traceback: {0}".format("".join(
+ traceback.format_exception(exc_type, exc_value, exc_traceback))))
+ time.sleep(1)
+ log.error("thread [{0}] reached exception limit, bailing out...".format(thread_name))
+ self.vc.cluster_log("thread {0} bailing out due to exception".format(thread_name))
+
+ def cancel_job(self):
+ self.cancel_event.set()
+
+ def should_cancel(self):
+ return self.cancel_event.isSet()
+
+ def reset_cancel(self):
+ self.cancel_event.clear()
+
+class AsyncJobs(object):
+ """
+ Class providing asynchronous execution of jobs via worker threads.
+ `jobs` are grouped by `volume`, so a `volume` can have N number of
+ `jobs` executing concurrently (capped by number of concurrent jobs).
+
+ Usability is simple: subclass this and implement the following:
+ - get_next_job(volname, running_jobs)
+ - execute_job(volname, job, should_cancel)
+
+ ... and do not forget to invoke base class constructor.
+
+ Job cancelation is for a volume as a whole, i.e., all executing jobs
+ for a volume are canceled. Cancelation is poll based -- jobs need to
+ periodically check if cancelation is requested, after which the job
+ should return as soon as possible. Cancelation check is provided
+ via `should_cancel()` lambda passed to `execute_job()`.
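+
+    A rough sketch of a subclass (the names below are illustrative only, not
+    part of this module):
+
+        class MyJobs(AsyncJobs):
+            def __init__(self, volume_client, pending_jobs):
+                # pending_jobs: {volname: [job, ...]} maintained by the caller
+                self.pending_jobs = pending_jobs
+                super(MyJobs, self).__init__(volume_client, "myjob", 4)
+
+            def get_next_job(self, volname, running_jobs):
+                # called under self.lock -- pick a job that is not running yet
+                for job in self.pending_jobs.get(volname, []):
+                    if job not in running_jobs:
+                        return 0, job
+                return 0, None
+
+            def execute_job(self, volname, job, should_cancel):
+                # called outside self.lock -- poll should_cancel() periodically;
+                # do_one_unit_of_work() is a hypothetical stand-in for the real work
+                while not should_cancel() and do_one_unit_of_work(job):
+                    pass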
+ """
+
+ def __init__(self, volume_client, name_pfx, nr_concurrent_jobs):
+ self.vc = volume_client
+ # queue of volumes for starting async jobs
+ self.q = deque()
+ # volume => job tracking
+ self.jobs = {}
+ # lock, cv for kickstarting jobs
+ self.lock = threading.Lock()
+ self.cv = threading.Condition(self.lock)
+ # cv for job cancelation
+ self.waiting = False
+ self.cancel_cv = threading.Condition(self.lock)
+
+ self.threads = []
+ for i in range(nr_concurrent_jobs):
+ self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(name_pfx, i)))
+ self.threads[-1].start()
+
+ def get_job(self):
+ log.debug("processing {0} volume entries".format(len(self.q)))
+ nr_vols = len(self.q)
+ to_remove = []
+ next_job = None
+ while nr_vols > 0:
+ volname = self.q[0]
+            # do this now so that the other threads pick up jobs for other volumes
+ self.q.rotate(1)
+ running_jobs = [j[0] for j in self.jobs[volname]]
+ (ret, job) = self.get_next_job(volname, running_jobs)
+ if job:
+ next_job = (volname, job)
+ break
+            # this is an optimization: when a given volume has no more jobs and
+            # no jobs are in progress, we remove the volume from the tracking
+            # list so as to:
+            #
+            # a. not query the filesystem for jobs over and over again
+            # b. keep the filesystem connection idle so that it can be freed
+            #    from the connection pool
+            #
+            # if jobs show up for a volume later, the volume gets added back to
+            # the tracking list and the jobs get kickstarted.
+            # note that we do not iterate the volume list fully if there is a
+            # job to process (that will happen eventually).
+ if ret == 0 and not job and not running_jobs:
+ to_remove.append(volname)
+ nr_vols -= 1
+ for vol in to_remove:
+ log.debug("auto removing volume '{0}' from tracked volumes".format(vol))
+ self.q.remove(vol)
+ self.jobs.pop(vol)
+ return next_job
+
+ def register_async_job(self, volname, job, thread_id):
+ log.debug("registering async job {0}.{1} with thread {2}".format(volname, job, thread_id))
+ self.jobs[volname].append((job, thread_id))
+
+ def unregister_async_job(self, volname, job, thread_id):
+ log.debug("unregistering async job {0}.{1} from thread {2}".format(volname, job, thread_id))
+ self.jobs[volname].remove((job, thread_id))
+
+ cancelled = thread_id.should_cancel()
+ thread_id.reset_cancel()
+
+ # wake up cancellation waiters if needed
+ if not self.jobs[volname] and cancelled:
+            log.info("waking up cancellation waiters")
+ self.cancel_cv.notifyAll()
+
+ def queue_job(self, volname):
+ """
+ queue a volume for asynchronous job execution.
+ """
+ log.info("queuing job for volume '{0}'".format(volname))
+ with self.lock:
+            if volname not in self.q:
+ self.q.append(volname)
+ self.jobs[volname] = []
+ self.cv.notifyAll()
+
+ def _cancel_jobs(self, volname):
+ """
+        cancel all jobs for the volume. do nothing if no jobs are executing
+        for the given volume. this waits until all in-progress jobs get
+        interrupted and finish execution.
+ """
+ log.info("cancelling jobs for volume '{0}'".format(volname))
+ try:
+            if volname not in self.q and volname not in self.jobs:
+ return
+ self.q.remove(volname)
+ # cancel in-progress operation and wait until complete
+ for j in self.jobs[volname]:
+ j[1].cancel_job()
+ # wait for cancellation to complete
+ while self.jobs[volname]:
+ log.debug("waiting for {0} in-progress jobs for volume '{1}' to " \
+ "cancel".format(len(self.jobs[volname]), volname))
+ self.cancel_cv.wait()
+ self.jobs.pop(volname)
+ except (KeyError, ValueError):
+ pass
+
+ def cancel_jobs(self, volname):
+ """
+ cancel all executing jobs for a given volume.
+ """
+ with self.lock:
+ self._cancel_jobs(volname)
+
+ def cancel_all_jobs(self):
+ """
+        cancel all executing jobs for all volumes.
+ """
+ with self.lock:
+ for volname in list(self.q):
+ self._cancel_jobs(volname)
+
+ def get_next_job(self, volname, running_jobs):
+ """
+        return the next job for asynchronous execution as a (retcode, job) tuple.
+        if no jobs are available, return (0, None); else return (0, job). on error,
+        return (-errno, None). called with `self.lock` held.
+ """
+ raise NotImplementedException()
+
+ def execute_job(self, volname, job, should_cancel):
+ """
+ execute a job for a volume. the job can block on I/O operations, sleep for long
+ hours and do all kinds of synchronous work. called outside `self.lock`.
+ """
+ raise NotImplementedException()
+
def __str__(self):
return "{0} ({1})".format(self.errno, self.error_str)
+
+class NotImplementedException(Exception):
+ pass
-import sys
-import time
import errno
import logging
-import threading
-import traceback
-from collections import deque
+from .async_job import AsyncJobs
from .exception import VolumeException
from .operations.volume import open_volume, open_volume_lockless
from .operations.trash import open_trashcan
log = logging.getLogger(__name__)
-class PurgeQueueBase(object):
- """
- Base class for implementing purge queue strategies.
- """
-
- # this is "not" configurable and there is no need for it to be
- # configurable. if a purge thread encounters an exception, we
- # retry, till it hits this many consecutive exceptions after
- # which a warning is sent to `ceph status`.
- MAX_RETRIES_ON_EXCEPTION = 10
-
- class PurgeThread(threading.Thread):
- def __init__(self, volume_client, name, purge_fn):
- self.vc = volume_client
- self.purge_fn = purge_fn
- # event object to cancel ongoing purge
- self.cancel_event = threading.Event()
- threading.Thread.__init__(self, name=name)
-
- def run(self):
- retries = 0
- thread_name = threading.currentThread().getName()
- while retries < PurgeQueueBase.MAX_RETRIES_ON_EXCEPTION:
- try:
- self.purge_fn()
- retries = 0
- except Exception:
- retries += 1
- log.warning("purge thread [{0}] encountered fatal error: (attempt#" \
- " {1}/{2})".format(thread_name, retries,
- PurgeQueueBase.MAX_RETRIES_ON_EXCEPTION))
- exc_type, exc_value, exc_traceback = sys.exc_info()
- log.warning("traceback: {0}".format("".join(
- traceback.format_exception(exc_type, exc_value, exc_traceback))))
- time.sleep(1)
- log.error("purge thread [{0}] reached exception limit, bailing out...".format(thread_name))
- self.vc.cluster_log("purge thread {0} bailing out due to exception".format(thread_name))
-
- def cancel_job(self):
- self.cancel_event.set()
-
- def should_cancel(self):
- return self.cancel_event.isSet()
-
- def reset_cancel(self):
- self.cancel_event.clear()
-
- def __init__(self, volume_client):
- self.vc = volume_client
- # volumes whose subvolumes need to be purged
- self.q = deque()
- # job tracking
- self.jobs = {}
- # lock, cv for kickstarting purge
- self.lock = threading.Lock()
- self.cv = threading.Condition(self.lock)
- # lock, cv for purge cancellation
- self.waiting = False
- self.c_lock = threading.Lock()
- self.c_cv = threading.Condition(self.c_lock)
-
- def queue_purge_job(self, volname):
- with self.lock:
- if not self.q.count(volname):
- self.q.append(volname)
- self.jobs[volname] = []
- self.cv.notifyAll()
-
- def _cancel_purge_job(self, volname):
- log.info("cancelling purge jobs for volume '{0}'".format(volname))
- locked = True
- try:
- if not self.q.count(volname):
- return
- self.q.remove(volname)
- if not self.jobs.get(volname, []):
- return
- # cancel in-progress purge operation and wait until complete
- for j in self.jobs[volname]:
- j[1].cancel_job()
- # wait for cancellation to complete
- with self.c_lock:
- locked = False
- self.waiting = True
- self.lock.release()
- while self.waiting:
- log.debug("waiting for {0} in-progress purge jobs for volume '{1}' to " \
- "cancel".format(len(self.jobs[volname]), volname))
- self.c_cv.wait()
- finally:
- if not locked:
- self.lock.acquire()
-
- def cancel_purge_job(self, volname):
- self.lock.acquire()
- self._cancel_purge_job(volname)
- self.lock.release()
-
- def cancel_all_jobs(self):
- self.lock.acquire()
- for volname in list(self.q):
- self._cancel_purge_job(volname)
- self.lock.release()
-
- def register_job(self, volname, purge_dir):
- log.debug("registering purge job: {0}.{1}".format(volname, purge_dir))
-
- thread_id = threading.currentThread()
- self.jobs[volname].append((purge_dir, thread_id))
-
- def unregister_job(self, volname, purge_dir):
- log.debug("unregistering purge job: {0}.{1}".format(volname, purge_dir))
-
- thread_id = threading.currentThread()
- self.jobs[volname].remove((purge_dir, thread_id))
-
- cancelled = thread_id.should_cancel()
- thread_id.reset_cancel()
-
- # wake up cancellation waiters if needed
- if not self.jobs[volname] and cancelled:
- logging.info("waking up cancellation waiters")
- self.jobs.pop(volname)
- with self.c_lock:
- self.waiting = False
- self.c_cv.notifyAll()
-
- def get_trash_entry_for_volume(self, volname):
- log.debug("fetching trash entry for volume '{0}'".format(volname))
-
- exclude_entries = [v[0] for v in self.jobs[volname]]
- fs_handle = None
- try:
- with open_volume(self.vc, volname) as fs_handle:
- with open_trashcan(fs_handle, self.vc.volspec) as trashcan:
- path = trashcan.get_trash_entry(exclude_entries)
- ret = 0, path
- except VolumeException as ve:
- if ve.errno == -errno.ENOENT and fs_handle:
- ret = 0, None
- else:
- log.error("error fetching trash entry for volume '{0}' ({1})".format(volname), ve)
- ret = ve.errno, None
- return ret
-
- def purge_trash_entry_for_volume(self, volname, purge_dir):
- log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_dir, volname))
-
- ret = 0
- thread_id = threading.currentThread()
- try:
- with open_volume_lockless(self.vc, volname) as fs_handle:
- with open_trashcan(fs_handle, self.vc.volspec) as trashcan:
- trashcan.purge(purge_dir, should_cancel=lambda: thread_id.should_cancel())
- except VolumeException as ve:
- ret = ve.errno
- return ret
-
-class ThreadPoolPurgeQueueMixin(PurgeQueueBase):
+# helper for fetching a trash entry for a given volume
+def get_trash_entry_for_volume(volume_client, volname, running_jobs):
+ log.debug("fetching trash entry for volume '{0}'".format(volname))
+
+ try:
+ with open_volume(volume_client, volname) as fs_handle:
+ try:
+ with open_trashcan(fs_handle, volume_client.volspec) as trashcan:
+ path = trashcan.get_trash_entry(running_jobs)
+ return 0, path
+ except VolumeException as ve:
+ if ve.errno == -errno.ENOENT:
+ return 0, None
+ raise ve
+ except VolumeException as ve:
+        log.error("error fetching trash entry for volume '{0}' ({1})".format(volname, ve))
+        return ve.errno, None
+
+# helper for starting a purge operation on a trash entry
+def purge_trash_entry_for_volume(volume_client, volname, purge_dir, should_cancel):
+ log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_dir, volname))
+
+ ret = 0
+ try:
+ with open_volume_lockless(volume_client, volname) as fs_handle:
+ with open_trashcan(fs_handle, volume_client.volspec) as trashcan:
+ trashcan.purge(purge_dir, should_cancel)
+ except VolumeException as ve:
+ ret = ve.errno
+ return ret
+
+class ThreadPoolPurgeQueueMixin(AsyncJobs):
"""
Purge queue mixin class maintaining a pool of threads for purging trash entries.
    Subvolumes are chosen from volumes in a round robin fashion. If some of the purge
    entries (belonging to a set of volumes) have huge directory trees, this model may
    lead to _all_ threads purging entries for one volume (starving other volumes).
"""
def __init__(self, volume_client, tp_size):
- super(ThreadPoolPurgeQueueMixin, self).__init__(volume_client)
- self.threads = []
- for i in range(tp_size):
- self.threads.append(
- PurgeQueueBase.PurgeThread(volume_client, name="purgejob.{}".format(i), purge_fn=self.run))
- self.threads[-1].start()
-
- def pick_purge_dir_from_volume(self):
- log.debug("processing {0} purge job entries".format(len(self.q)))
- nr_vols = len(self.q)
- to_remove = []
- to_purge = None, None
- while nr_vols > 0:
- volname = self.q[0]
- # do this now so that the other thread picks up trash entry
- # for next volume.
- self.q.rotate(1)
- ret, purge_dir = self.get_trash_entry_for_volume(volname)
- if purge_dir:
- to_purge = volname, purge_dir
- break
- # this is an optimization when for a given volume there are no more
- # entries in trash and no purge operations are in progress. in such
- # a case we remove the volume from the tracking list so as to:
- #
- # a. not query the filesystem for trash entries over and over again
- # b. keep the filesystem connection idle so that it can be freed
- # from the connection pool
- #
- # if at all there are subvolume deletes, the volume gets added again
- # to the tracking list and the purge operations kickstarts.
- # note that, we do not iterate the volume list fully if there is a
- # purge entry to process (that will take place eventually).
- if ret == 0 and not purge_dir and not self.jobs[volname]:
- to_remove.append(volname)
- nr_vols -= 1
- for vol in to_remove:
- log.debug("auto removing volume '{0}' from purge job".format(vol))
- self.q.remove(vol)
- self.jobs.pop(vol)
- return to_purge
+ self.vc = volume_client
+        super(ThreadPoolPurgeQueueMixin, self).__init__(volume_client, "purgejob", tp_size)
+
- def get_next_trash_entry(self):
- while True:
- # wait till there's a purge job
- while not self.q:
- log.debug("purge job list empty, waiting...")
- self.cv.wait()
- volname, purge_dir = self.pick_purge_dir_from_volume()
- if purge_dir:
- return volname, purge_dir
- log.debug("no purge jobs available, waiting...")
- self.cv.wait()
+ def get_next_job(self, volname, running_jobs):
+        return get_trash_entry_for_volume(self.vc, volname, running_jobs)
+
- def run(self):
- while True:
- with self.lock:
- volname, purge_dir = self.get_next_trash_entry()
- self.register_job(volname, purge_dir)
- ret = self.purge_trash_entry_for_volume(volname, purge_dir)
- if ret != 0:
- log.warn("failed to purge {0}.{1} ({2})".format(volname, purge_dir, ret))
- with self.lock:
- self.unregister_job(volname, purge_dir)
- time.sleep(1)
+ def execute_job(self, volname, job, should_cancel):
+ purge_trash_entry_for_volume(self.vc, volname, job, should_cancel)
# job list.
fs_map = self.mgr.get('fs_map')
for fs in fs_map['filesystems']:
- self.purge_queue.queue_purge_job(fs['mdsmap']['fs_name'])
+ self.purge_queue.queue_job(fs['mdsmap']['fs_name'])
def is_stopping(self):
return self.stopping.isSet()
"that is what you want, re-issue the command followed by " \
"--yes-i-really-mean-it.".format(volname)
- self.purge_queue.cancel_purge_job(volname)
+ self.purge_queue.cancel_jobs(volname)
self.connection_pool.del_fs_handle(volname, wait=True)
return delete_volume(self.mgr, volname)
except VolumeException as ve:
# kick the purge threads for async removal -- note that this
# assumes that the subvolume is moved to trashcan for cleanup on error.
- self.purge_queue.queue_purge_job(volname)
+ self.purge_queue.queue_job(volname)
raise ve
def create_subvolume(self, **kwargs):
# assumes that the subvolume is moved to trash can.
# TODO: make purge queue as singleton so that trash can kicks
# the purge threads on dump.
- self.purge_queue.queue_purge_job(volname)
+ self.purge_queue.queue_job(volname)
except VolumeException as ve:
if not (ve.errno == -errno.ENOENT and force):
ret = self.volume_exception_to_retval(ve)
except VolumeException as ve:
try:
target_subvolume.remove()
- self.purge_queue.queue_purge_job(volname)
+ self.purge_queue.queue_job(volname)
except Exception as e:
log.warn("failed to cleanup clone subvolume '{0}' ({1})".format(target_subvolname, e))
raise ve