From a9819fe2802502d7c23cd935bb0ce96d88adfd1d Mon Sep 17 00:00:00 2001 From: Venky Shankar Date: Thu, 10 Oct 2019 09:23:10 -0400 Subject: [PATCH] mgr/volumes: cleanup libcephfs handles when stopping Signed-off-by: Venky Shankar (cherry picked from commit 2eb0c503047bdf46b0a884df96819cba8979b624) --- src/pybind/mgr/volumes/fs/purge_queue.py | 22 +++++++--- src/pybind/mgr/volumes/fs/volume.py | 55 ++++++++++++++++++++---- src/pybind/mgr/volumes/module.py | 6 +++ 3 files changed, 69 insertions(+), 14 deletions(-) diff --git a/src/pybind/mgr/volumes/fs/purge_queue.py b/src/pybind/mgr/volumes/fs/purge_queue.py index a76065cbab953..d0814ca7817f3 100644 --- a/src/pybind/mgr/volumes/fs/purge_queue.py +++ b/src/pybind/mgr/volumes/fs/purge_queue.py @@ -75,10 +75,9 @@ class PurgeQueueBase(object): self.jobs[volname] = [] self.cv.notifyAll() - def cancel_purge_job(self, volname): + def _cancel_purge_job(self, volname): log.info("cancelling purge jobs for volume '{0}'".format(volname)) - self.lock.acquire() - unlock = True + locked = True try: if not self.q.count(volname): return @@ -90,7 +89,7 @@ class PurgeQueueBase(object): j[1].cancel_job() # wait for cancellation to complete with self.c_lock: - unlock = False + locked = False self.waiting = True self.lock.release() while self.waiting: @@ -98,8 +97,19 @@ class PurgeQueueBase(object): "cancel".format(len(self.jobs[volname]), volname)) self.c_cv.wait() finally: - if unlock: - self.lock.release() + if not locked: + self.lock.acquire() + + def cancel_purge_job(self, volname): + self.lock.acquire() + self._cancel_purge_job(volname) + self.lock.release() + + def cancel_all_jobs(self): + self.lock.acquire() + for volname in list(self.q): + self._cancel_purge_job(volname) + self.lock.release() def register_job(self, volname, purge_dir): log.debug("registering purge job: {0}.{1}".format(volname, purge_dir)) diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py index ec3a2199c4ab9..733e7121b8964 100644 --- a/src/pybind/mgr/volumes/fs/volume.py +++ b/src/pybind/mgr/volumes/fs/volume.py @@ -2,7 +2,7 @@ import json import time import errno import logging -from threading import Lock +from threading import Lock, Condition, Event try: # py2 from threading import _Timer as Timer @@ -43,11 +43,16 @@ class ConnectionPool(object): self.ops_in_progress += 1 return self.fs - def put_fs_handle(self): + def put_fs_handle(self, notify): assert self.ops_in_progress > 0 self.ops_in_progress -= 1 + if self.ops_in_progress == 0: + notify() - def del_fs_handle(self): + def del_fs_handle(self, waiter): + if waiter: + while self.ops_in_progress != 0: + waiter() if self.is_connection_valid(): self.disconnect() else: @@ -112,6 +117,7 @@ class ConnectionPool(object): self.mgr = mgr self.connections = {} self.lock = Lock() + self.cond = Condition(self.lock) self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL, self.cleanup_connections) self.timer_task.start() @@ -154,19 +160,31 @@ class ConnectionPool(object): with self.lock: conn = self.connections.get(fs_name, None) if conn: - conn.put_fs_handle() + conn.put_fs_handle(notify=lambda: self.cond.notifyAll()) - def _del_fs_handle(self, fs_name): + def _del_fs_handle(self, fs_name, wait=False): conn = self.connections.pop(fs_name, None) if conn: - conn.del_fs_handle() - def del_fs_handle(self, fs_name): + conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait()) + + def del_fs_handle(self, fs_name, wait=False): + with self.lock: + self._del_fs_handle(fs_name, wait) + + def del_all_handles(self): with self.lock: - self._del_fs_handle(fs_name) + for fs_name in list(self.connections.keys()): + log.info("waiting for pending ops for '{}'".format(fs_name)) + self._del_fs_handle(fs_name, wait=True) + log.info("pending ops completed for '{}'".format(fs_name)) + # no new connections should have been initialized since its + # guarded on shutdown. + assert len(self.connections) == 0 class VolumeClient(object): def __init__(self, mgr): self.mgr = mgr + self.stopping = Event() self.connection_pool = ConnectionPool(self.mgr) # TODO: make thread pool size configurable self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4) @@ -179,6 +197,15 @@ class VolumeClient(object): for fs in fs_map['filesystems']: self.purge_queue.queue_purge_job(fs['mdsmap']['fs_name']) + def shutdown(self): + log.info("shutting down") + # first, note that we're shutting down + self.stopping.set() + # second, ask purge threads to quit + self.purge_queue.cancel_all_jobs() + # third, delete all libcephfs handles from connection pool + self.connection_pool.del_all_handles() + def cluster_log(self, msg, lvl=None): """ log to cluster log with default log level as WARN. @@ -271,6 +298,9 @@ class VolumeClient(object): """ create volume (pool, filesystem and mds) """ + if self.stopping.isSet(): + return -errno.ESHUTDOWN, "", "shutdown in progress" + metadata_pool, data_pool = self.gen_pool_names(volname) # create pools r, outs, outb = self.create_pool(metadata_pool, 16) @@ -291,6 +321,9 @@ class VolumeClient(object): """ delete the given module (tear down mds, remove filesystem) """ + if self.stopping.isSet(): + return -errno.ESHUTDOWN, "", "shutdown in progress" + self.purge_queue.cancel_purge_job(volname) self.connection_pool.del_fs_handle(volname) # Tear down MDS daemons @@ -323,6 +356,9 @@ class VolumeClient(object): return self.remove_pool(data_pool) def list_volumes(self): + if self.stopping.isSet(): + return -errno.ESHUTDOWN, "", "shutdown in progress" + result = [] fs_map = self.mgr.get("fs_map") for f in fs_map['filesystems']: @@ -354,6 +390,9 @@ class VolumeClient(object): # note that force arg is available for remove type commands force = kwargs.get('force', False) + if self.stopping.isSet(): + return -errno.ESHUTDOWN, "", "shutdown in progress" + # fetch the connection from the pool if not fs_handle: try: diff --git a/src/pybind/mgr/volumes/module.py b/src/pybind/mgr/volumes/module.py index eb4c350db312d..a4fdef5981b9d 100644 --- a/src/pybind/mgr/volumes/module.py +++ b/src/pybind/mgr/volumes/module.py @@ -185,6 +185,12 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule): super(Module, self).__init__(*args, **kwargs) self.vc = VolumeClient(self) + def __del__(self): + self.vc.shutdown() + + def shutdown(self): + self.vc.shutdown() + def handle_command(self, inbuf, cmd): handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_") try: -- 2.39.5