import time
import errno
import logging
-from threading import Lock
+from threading import Lock, Condition, Event
try:
# py2
from threading import _Timer as Timer
self.ops_in_progress += 1
return self.fs
- def put_fs_handle(self):
+ def put_fs_handle(self, notify):
assert self.ops_in_progress > 0
self.ops_in_progress -= 1
+ if self.ops_in_progress == 0:
+ notify()
- def del_fs_handle(self):
+ def del_fs_handle(self, waiter):
+ if waiter:
+ while self.ops_in_progress != 0:
+ waiter()
if self.is_connection_valid():
self.disconnect()
else:
self.mgr = mgr
self.connections = {}
self.lock = Lock()
+ self.cond = Condition(self.lock)
self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL,
self.cleanup_connections)
self.timer_task.start()
with self.lock:
conn = self.connections.get(fs_name, None)
if conn:
- conn.put_fs_handle()
+ conn.put_fs_handle(notify=lambda: self.cond.notifyAll())
- def _del_fs_handle(self, fs_name):
+ def _del_fs_handle(self, fs_name, wait=False):
conn = self.connections.pop(fs_name, None)
if conn:
- conn.del_fs_handle()
- def del_fs_handle(self, fs_name):
+ conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())
+
+ def del_fs_handle(self, fs_name, wait=False):
+ with self.lock:
+ self._del_fs_handle(fs_name, wait)
+
+ def del_all_handles(self):
with self.lock:
- self._del_fs_handle(fs_name)
+ for fs_name in list(self.connections.keys()):
+ log.info("waiting for pending ops for '{}'".format(fs_name))
+ self._del_fs_handle(fs_name, wait=True)
+ log.info("pending ops completed for '{}'".format(fs_name))
+ # no new connections should have been initialized since its
+ # guarded on shutdown.
+ assert len(self.connections) == 0
class VolumeClient(object):
def __init__(self, mgr):
self.mgr = mgr
+ self.stopping = Event()
self.connection_pool = ConnectionPool(self.mgr)
# TODO: make thread pool size configurable
self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
for fs in fs_map['filesystems']:
self.purge_queue.queue_purge_job(fs['mdsmap']['fs_name'])
+ def shutdown(self):
+ log.info("shutting down")
+ # first, note that we're shutting down
+ self.stopping.set()
+ # second, ask purge threads to quit
+ self.purge_queue.cancel_all_jobs()
+ # third, delete all libcephfs handles from connection pool
+ self.connection_pool.del_all_handles()
+
def cluster_log(self, msg, lvl=None):
"""
log to cluster log with default log level as WARN.
"""
create volume (pool, filesystem and mds)
"""
+ if self.stopping.isSet():
+ return -errno.ESHUTDOWN, "", "shutdown in progress"
+
metadata_pool, data_pool = self.gen_pool_names(volname)
# create pools
r, outs, outb = self.create_pool(metadata_pool, 16)
"""
delete the given module (tear down mds, remove filesystem)
"""
+ if self.stopping.isSet():
+ return -errno.ESHUTDOWN, "", "shutdown in progress"
+
self.purge_queue.cancel_purge_job(volname)
self.connection_pool.del_fs_handle(volname)
# Tear down MDS daemons
return self.remove_pool(data_pool)
def list_volumes(self):
+ if self.stopping.isSet():
+ return -errno.ESHUTDOWN, "", "shutdown in progress"
+
result = []
fs_map = self.mgr.get("fs_map")
for f in fs_map['filesystems']:
# note that force arg is available for remove type commands
force = kwargs.get('force', False)
+ if self.stopping.isSet():
+ return -errno.ESHUTDOWN, "", "shutdown in progress"
+
# fetch the connection from the pool
if not fs_handle:
try: