From: Venky Shankar Date: Thu, 10 Oct 2019 13:23:10 +0000 (-0400) Subject: mgr/volumes: cleanup libcephfs handles when stopping X-Git-Tag: v15.1.0~978^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2eb0c503047bdf46b0a884df96819cba8979b624;p=ceph.git mgr/volumes: cleanup libcephfs handles when stopping Signed-off-by: Venky Shankar --- diff --git a/src/pybind/mgr/volumes/fs/purge_queue.py b/src/pybind/mgr/volumes/fs/purge_queue.py index a76065cbab95..d0814ca7817f 100644 --- a/src/pybind/mgr/volumes/fs/purge_queue.py +++ b/src/pybind/mgr/volumes/fs/purge_queue.py @@ -75,10 +75,9 @@ class PurgeQueueBase(object): self.jobs[volname] = [] self.cv.notifyAll() - def cancel_purge_job(self, volname): + def _cancel_purge_job(self, volname): log.info("cancelling purge jobs for volume '{0}'".format(volname)) - self.lock.acquire() - unlock = True + locked = True try: if not self.q.count(volname): return @@ -90,7 +89,7 @@ class PurgeQueueBase(object): j[1].cancel_job() # wait for cancellation to complete with self.c_lock: - unlock = False + locked = False self.waiting = True self.lock.release() while self.waiting: @@ -98,8 +97,19 @@ class PurgeQueueBase(object): "cancel".format(len(self.jobs[volname]), volname)) self.c_cv.wait() finally: - if unlock: - self.lock.release() + if not locked: + self.lock.acquire() + + def cancel_purge_job(self, volname): + self.lock.acquire() + self._cancel_purge_job(volname) + self.lock.release() + + def cancel_all_jobs(self): + self.lock.acquire() + for volname in list(self.q): + self._cancel_purge_job(volname) + self.lock.release() def register_job(self, volname, purge_dir): log.debug("registering purge job: {0}.{1}".format(volname, purge_dir)) diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py index 0187cc1b8902..0d62fe040bf1 100644 --- a/src/pybind/mgr/volumes/fs/volume.py +++ b/src/pybind/mgr/volumes/fs/volume.py @@ -2,7 +2,7 @@ import json import time import errno import 
logging -from threading import Lock +from threading import Lock, Condition, Event try: # py2 from threading import _Timer as Timer @@ -43,11 +43,16 @@ class ConnectionPool(object): self.ops_in_progress += 1 return self.fs - def put_fs_handle(self): + def put_fs_handle(self, notify): assert self.ops_in_progress > 0 self.ops_in_progress -= 1 + if self.ops_in_progress == 0: + notify() - def del_fs_handle(self): + def del_fs_handle(self, waiter): + if waiter: + while self.ops_in_progress != 0: + waiter() if self.is_connection_valid(): self.disconnect() else: @@ -108,6 +113,7 @@ class ConnectionPool(object): self.mgr = mgr self.connections = {} self.lock = Lock() + self.cond = Condition(self.lock) self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL, self.cleanup_connections) self.timer_task.start() @@ -150,19 +156,31 @@ class ConnectionPool(object): with self.lock: conn = self.connections.get(fs_name, None) if conn: - conn.put_fs_handle() + conn.put_fs_handle(notify=lambda: self.cond.notifyAll()) - def _del_fs_handle(self, fs_name): + def _del_fs_handle(self, fs_name, wait=False): conn = self.connections.pop(fs_name, None) if conn: - conn.del_fs_handle() - def del_fs_handle(self, fs_name): + conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait()) + + def del_fs_handle(self, fs_name, wait=False): + with self.lock: + self._del_fs_handle(fs_name, wait) + + def del_all_handles(self): with self.lock: - self._del_fs_handle(fs_name) + for fs_name in list(self.connections.keys()): + log.info("waiting for pending ops for '{}'".format(fs_name)) + self._del_fs_handle(fs_name, wait=True) + log.info("pending ops completed for '{}'".format(fs_name)) + # no new connections should have been initialized since it's + # guarded on shutdown. 
+ assert len(self.connections) == 0 class VolumeClient(object): def __init__(self, mgr): self.mgr = mgr + self.stopping = Event() self.connection_pool = ConnectionPool(self.mgr) # TODO: make thread pool size configurable self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4) @@ -175,6 +193,15 @@ class VolumeClient(object): for fs in fs_map['filesystems']: self.purge_queue.queue_purge_job(fs['mdsmap']['fs_name']) + def shutdown(self): + log.info("shutting down") + # first, note that we're shutting down + self.stopping.set() + # second, ask purge threads to quit + self.purge_queue.cancel_all_jobs() + # third, delete all libcephfs handles from connection pool + self.connection_pool.del_all_handles() + def cluster_log(self, msg, lvl=None): """ log to cluster log with default log level as WARN. @@ -266,6 +293,9 @@ class VolumeClient(object): """ create volume (pool, filesystem and mds) """ + if self.stopping.isSet(): + return -errno.ESHUTDOWN, "", "shutdown in progress" + metadata_pool, data_pool = self.gen_pool_names(volname) # create pools r, outs, outb = self.create_pool(metadata_pool) @@ -286,6 +316,9 @@ class VolumeClient(object): """ delete the given module (tear down mds, remove filesystem) """ + if self.stopping.isSet(): + return -errno.ESHUTDOWN, "", "shutdown in progress" + self.purge_queue.cancel_purge_job(volname) self.connection_pool.del_fs_handle(volname) # Tear down MDS daemons @@ -318,6 +351,9 @@ class VolumeClient(object): return self.remove_pool(data_pool) def list_volumes(self): + if self.stopping.isSet(): + return -errno.ESHUTDOWN, "", "shutdown in progress" + result = [] fs_map = self.mgr.get("fs_map") for f in fs_map['filesystems']: @@ -349,6 +385,9 @@ class VolumeClient(object): # note that force arg is available for remove type commands force = kwargs.get('force', False) + if self.stopping.isSet(): + return -errno.ESHUTDOWN, "", "shutdown in progress" + # fetch the connection from the pool if not fs_handle: try: diff --git 
a/src/pybind/mgr/volumes/module.py b/src/pybind/mgr/volumes/module.py index 367c024cb7c8..1f62f1992cb9 100644 --- a/src/pybind/mgr/volumes/module.py +++ b/src/pybind/mgr/volumes/module.py @@ -171,6 +171,12 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule): super(Module, self).__init__(*args, **kwargs) self.vc = VolumeClient(self) + def __del__(self): + self.vc.shutdown() + + def shutdown(self): + self.vc.shutdown() + def handle_command(self, inbuf, cmd): handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_") try: