self.jobs[volname] = []
self.cv.notifyAll()
- def cancel_purge_job(self, volname):
+ def _cancel_purge_job(self, volname):
log.info("cancelling purge jobs for volume '{0}'".format(volname))
- self.lock.acquire()
- unlock = True
+ locked = True
try:
if not self.q.count(volname):
return
j[1].cancel_job()
# wait for cancellation to complete
with self.c_lock:
- unlock = False
+ locked = False
self.waiting = True
self.lock.release()
while self.waiting:
"cancel".format(len(self.jobs[volname]), volname))
self.c_cv.wait()
finally:
- if unlock:
- self.lock.release()
+ if not locked:
+ self.lock.acquire()
+
+    def cancel_purge_job(self, volname):
+        """
+        Cancel purge jobs for volume `volname`.
+
+        Public, locked entry point: `_cancel_purge_job()` expects to be
+        called with `self.lock` held and hands the lock back to us held
+        again when it returns.
+        """
+        # use the context manager so the lock is released even when the
+        # helper raises -- a bare acquire()/release() pair would leave the
+        # lock held forever on error.
+        with self.lock:
+            self._cancel_purge_job(volname)
+
+    def cancel_all_jobs(self):
+        """
+        Cancel purge jobs for every volume currently present in the queue.
+        """
+        # the context manager guarantees the lock is dropped if cancelling
+        # any single volume raises part way through the loop.
+        with self.lock:
+            # walk a snapshot of the queue: the helper temporarily drops
+            # the lock while it waits, so self.q may change underneath us.
+            for volname in list(self.q):
+                self._cancel_purge_job(volname)
def register_job(self, volname, purge_dir):
log.debug("registering purge job: {0}.{1}".format(volname, purge_dir))
import time
import errno
import logging
-from threading import Lock
+from threading import Lock, Condition, Event
try:
# py2
from threading import _Timer as Timer
self.ops_in_progress += 1
return self.fs
- def put_fs_handle(self):
+ def put_fs_handle(self, notify):
+ """
+ Record that one operation using this connection has finished.
+
+ :param notify: callable taking no arguments; invoked when the count of
+ in-progress operations drops to zero
+ """
assert self.ops_in_progress > 0
self.ops_in_progress -= 1
+ # last in-progress operation is done -- signal whoever is waiting
+ # for this connection to become idle.
+ if self.ops_in_progress == 0:
+ notify()
- def del_fs_handle(self):
+ def del_fs_handle(self, waiter):
+ if waiter:
+ while self.ops_in_progress != 0:
+ waiter()
if self.is_connection_valid():
self.disconnect()
else:
self.mgr = mgr
self.connections = {}
self.lock = Lock()
+ self.cond = Condition(self.lock)
self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL,
self.cleanup_connections)
self.timer_task.start()
with self.lock:
conn = self.connections.get(fs_name, None)
if conn:
- conn.put_fs_handle()
+ conn.put_fs_handle(notify=lambda: self.cond.notifyAll())
-    def _del_fs_handle(self, fs_name):
+    def _del_fs_handle(self, fs_name, wait=False):
+        """
+        Tear down the pooled connection for `fs_name`, if there is one.
+        Must be called with `self.lock` held.
+
+        :param fs_name: name of the filesystem whose connection is removed
+        :param wait: when True, block on `self.cond` until the connection
+                     has no operations in progress before tearing it down
+        """
-        conn = self.connections.pop(fs_name, None)
+        # look the connection up but keep it in the pool while waiting:
+        # put_fs_handle() finds the connection through self.connections, so
+        # popping it first would make the in-progress count impossible to
+        # drain and the wait below would never finish.
+        conn = self.connections.get(fs_name, None)
         if conn:
-            conn.del_fs_handle()
-    def del_fs_handle(self, fs_name):
+            try:
+                conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())
+            finally:
+                # forget the connection whether or not teardown succeeded
+                self.connections.pop(fs_name, None)
+
+ def del_fs_handle(self, fs_name, wait=False):
+ """
+ Locked wrapper around `_del_fs_handle()`.
+
+ :param fs_name: name of the filesystem whose connection is removed
+ :param wait: passed through unchanged to `_del_fs_handle()`
+ """
+ with self.lock:
+ self._del_fs_handle(fs_name, wait)
+
+ def del_all_handles(self):
+ """
+ Delete the connection of every filesystem known to the pool, asking
+ `_del_fs_handle()` to wait for pending operations on each of them.
+ """
with self.lock:
- self._del_fs_handle(fs_name)
+ # iterate over a copy of the keys since entries are removed as we go
+ for fs_name in list(self.connections.keys()):
+ log.info("waiting for pending ops for '{}'".format(fs_name))
+ self._del_fs_handle(fs_name, wait=True)
+ log.info("pending ops completed for '{}'".format(fs_name))
+ # no new connections should have been initialized since it is
+ # guarded on shutdown.
+ assert len(self.connections) == 0
class VolumeClient(object):
def __init__(self, mgr):
self.mgr = mgr
+ self.stopping = Event()
self.connection_pool = ConnectionPool(self.mgr)
# TODO: make thread pool size configurable
self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
for fs in fs_map['filesystems']:
self.purge_queue.queue_purge_job(fs['mdsmap']['fs_name'])
+ def shutdown(self):
+ """
+ Stop the volume client: set the `stopping` event, cancel all purge
+ jobs and then delete every handle held by the connection pool.
+
+ The order matters -- the `stopping` event is set first so that the
+ entry points which check it return ESHUTDOWN instead of starting new
+ work while the teardown below is running.
+ """
+ log.info("shutting down")
+ # first, note that we're shutting down
+ self.stopping.set()
+ # second, ask purge threads to quit
+ self.purge_queue.cancel_all_jobs()
+ # third, delete all libcephfs handles from connection pool
+ self.connection_pool.del_all_handles()
+
def cluster_log(self, msg, lvl=None):
"""
log to cluster log with default log level as WARN.
"""
create volume (pool, filesystem and mds)
"""
+ if self.stopping.isSet():
+ return -errno.ESHUTDOWN, "", "shutdown in progress"
+
metadata_pool, data_pool = self.gen_pool_names(volname)
# create pools
r, outs, outb = self.create_pool(metadata_pool)
"""
delete the given module (tear down mds, remove filesystem)
"""
+ if self.stopping.isSet():
+ return -errno.ESHUTDOWN, "", "shutdown in progress"
+
self.purge_queue.cancel_purge_job(volname)
self.connection_pool.del_fs_handle(volname)
# Tear down MDS daemons
return self.remove_pool(data_pool)
def list_volumes(self):
+ if self.stopping.isSet():
+ return -errno.ESHUTDOWN, "", "shutdown in progress"
+
result = []
fs_map = self.mgr.get("fs_map")
for f in fs_map['filesystems']:
# note that force arg is available for remove type commands
force = kwargs.get('force', False)
+ if self.stopping.isSet():
+ return -errno.ESHUTDOWN, "", "shutdown in progress"
+
# fetch the connection from the pool
if not fs_handle:
try: