mgr/volumes: cleanup libcephfs handles when stopping
author    Venky Shankar <vshankar@redhat.com>
          Thu, 10 Oct 2019 13:23:10 +0000 (09:23 -0400)
committer Ramana Raja <rraja@redhat.com>
          Wed, 12 Feb 2020 10:11:59 +0000 (05:11 -0500)
Signed-off-by: Venky Shankar <vshankar@redhat.com>
(cherry picked from commit 2eb0c503047bdf46b0a884df96819cba8979b624)

src/pybind/mgr/volumes/fs/purge_queue.py
src/pybind/mgr/volumes/fs/volume.py
src/pybind/mgr/volumes/module.py
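
The commit wires a shutdown path through the volumes module: a threading.Event marks the module as stopping, queued purge jobs are cancelled, and every cached libcephfs handle is released. A minimal standalone sketch of that ordering follows; MiniVolumeClient, SimplePurgeQueue and SimpleConnectionPool are illustrative stand-ins for the classes patched below, not the real ones.

import threading

class SimplePurgeQueue(object):
    # illustrative stand-in for ThreadPoolPurgeQueueMixin
    def __init__(self):
        self.jobs = {'vol1': ['purge-dir-a']}

    def cancel_all_jobs(self):
        # ask every queued purge job to stop and forget it
        self.jobs.clear()

class SimpleConnectionPool(object):
    # illustrative stand-in for ConnectionPool
    def __init__(self):
        self.connections = {'vol1': object()}

    def del_all_handles(self):
        # disconnect and drop every cached libcephfs handle
        self.connections.clear()

class MiniVolumeClient(object):
    def __init__(self):
        self.stopping = threading.Event()
        self.purge_queue = SimplePurgeQueue()
        self.connection_pool = SimpleConnectionPool()

    def shutdown(self):
        # 1. refuse new work: command handlers check self.stopping first
        self.stopping.set()
        # 2. stop background purge threads
        self.purge_queue.cancel_all_jobs()
        # 3. tear down cached libcephfs mounts
        self.connection_pool.del_all_handles()

MiniVolumeClient().shutdown()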

src/pybind/mgr/volumes/fs/purge_queue.py
index a76065cbab953da9f724c6ce538faf2df547f170..d0814ca7817f3cb8ce4121e2309cc6756bb7e2fb 100644 (file)
@@ -75,10 +75,9 @@ class PurgeQueueBase(object):
                 self.jobs[volname] = []
             self.cv.notifyAll()
 
-    def cancel_purge_job(self, volname):
+    def _cancel_purge_job(self, volname):
         log.info("cancelling purge jobs for volume '{0}'".format(volname))
-        self.lock.acquire()
-        unlock = True
+        locked = True
         try:
             if not self.q.count(volname):
                 return
@@ -90,7 +89,7 @@ class PurgeQueueBase(object):
                 j[1].cancel_job()
             # wait for cancellation to complete
             with self.c_lock:
-                unlock = False
+                locked = False
                 self.waiting = True
                 self.lock.release()
                 while self.waiting:
@@ -98,8 +97,19 @@ class PurgeQueueBase(object):
                              "cancel".format(len(self.jobs[volname]), volname))
                     self.c_cv.wait()
         finally:
-            if unlock:
-                self.lock.release()
+            if not locked:
+                self.lock.acquire()
+
+    def cancel_purge_job(self, volname):
+        self.lock.acquire()
+        self._cancel_purge_job(volname)
+        self.lock.release()
+
+    def cancel_all_jobs(self):
+        self.lock.acquire()
+        for volname in list(self.q):
+            self._cancel_purge_job(volname)
+        self.lock.release()
 
     def register_job(self, volname, purge_dir):
         log.debug("registering purge job: {0}.{1}".format(volname, purge_dir))
src/pybind/mgr/volumes/fs/volume.py
index ec3a2199c4ab9f8898d2d67fb95bdd9a009c415d..733e7121b8964ae95a4688425cacdbdabbba7552 100644 (file)
@@ -2,7 +2,7 @@ import json
 import time
 import errno
 import logging
-from threading import Lock
+from threading import Lock, Condition, Event
 try:
     # py2
     from threading import _Timer as Timer
@@ -43,11 +43,16 @@ class ConnectionPool(object):
             self.ops_in_progress += 1
             return self.fs
 
-        def put_fs_handle(self):
+        def put_fs_handle(self, notify):
             assert self.ops_in_progress > 0
             self.ops_in_progress -= 1
+            if self.ops_in_progress == 0:
+                notify()
 
-        def del_fs_handle(self):
+        def del_fs_handle(self, waiter):
+            if waiter:
+                while self.ops_in_progress != 0:
+                    waiter()
             if self.is_connection_valid():
                 self.disconnect()
             else:
@@ -112,6 +117,7 @@ class ConnectionPool(object):
         self.mgr = mgr
         self.connections = {}
         self.lock = Lock()
+        self.cond = Condition(self.lock)
         self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL,
                                                 self.cleanup_connections)
         self.timer_task.start()
@@ -154,19 +160,31 @@ class ConnectionPool(object):
         with self.lock:
             conn = self.connections.get(fs_name, None)
             if conn:
-                conn.put_fs_handle()
+                conn.put_fs_handle(notify=lambda: self.cond.notifyAll())
 
-    def _del_fs_handle(self, fs_name):
+    def _del_fs_handle(self, fs_name, wait=False):
         conn = self.connections.pop(fs_name, None)
         if conn:
-            conn.del_fs_handle()
-    def del_fs_handle(self, fs_name):
+            conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())
+
+    def del_fs_handle(self, fs_name, wait=False):
+        with self.lock:
+            self._del_fs_handle(fs_name, wait)
+
+    def del_all_handles(self):
         with self.lock:
-            self._del_fs_handle(fs_name)
+            for fs_name in list(self.connections.keys()):
+                log.info("waiting for pending ops for '{}'".format(fs_name))
+                self._del_fs_handle(fs_name, wait=True)
+                log.info("pending ops completed for '{}'".format(fs_name))
+            # no new connections should have been initialized since its
+            # guarded on shutdown.
+            assert len(self.connections) == 0
 
 class VolumeClient(object):
     def __init__(self, mgr):
         self.mgr = mgr
+        self.stopping = Event()
         self.connection_pool = ConnectionPool(self.mgr)
         # TODO: make thread pool size configurable
         self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
@@ -179,6 +197,15 @@ class VolumeClient(object):
         for fs in fs_map['filesystems']:
             self.purge_queue.queue_purge_job(fs['mdsmap']['fs_name'])
 
+    def shutdown(self):
+        log.info("shutting down")
+        # first, note that we're shutting down
+        self.stopping.set()
+        # second, ask purge threads to quit
+        self.purge_queue.cancel_all_jobs()
+        # third, delete all libcephfs handles from connection pool
+        self.connection_pool.del_all_handles()
+
     def cluster_log(self, msg, lvl=None):
         """
         log to cluster log with default log level as WARN.
@@ -271,6 +298,9 @@ class VolumeClient(object):
         """
         create volume  (pool, filesystem and mds)
         """
+        if self.stopping.isSet():
+            return -errno.ESHUTDOWN, "", "shutdown in progress"
+
         metadata_pool, data_pool = self.gen_pool_names(volname)
         # create pools
         r, outs, outb = self.create_pool(metadata_pool, 16)
@@ -291,6 +321,9 @@ class VolumeClient(object):
         """
         delete the given module (tear down mds, remove filesystem)
         """
+        if self.stopping.isSet():
+            return -errno.ESHUTDOWN, "", "shutdown in progress"
+
         self.purge_queue.cancel_purge_job(volname)
         self.connection_pool.del_fs_handle(volname)
         # Tear down MDS daemons
@@ -323,6 +356,9 @@ class VolumeClient(object):
         return self.remove_pool(data_pool)
 
     def list_volumes(self):
+        if self.stopping.isSet():
+            return -errno.ESHUTDOWN, "", "shutdown in progress"
+
         result = []
         fs_map = self.mgr.get("fs_map")
         for f in fs_map['filesystems']:
@@ -354,6 +390,9 @@ class VolumeClient(object):
             # note that force arg is available for remove type commands
             force = kwargs.get('force', False)
 
+            if self.stopping.isSet():
+                return -errno.ESHUTDOWN, "", "shutdown in progress"
+
             # fetch the connection from the pool
             if not fs_handle:
                 try:
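
The ConnectionPool hunks above make handle teardown wait for in-flight operations: put_fs_handle now notifies a Condition once ops_in_progress drops to zero, and del_fs_handle(..., wait=True) loops on that Condition before disconnecting. A small sketch of the same notify/wait pairing under one shared Condition; DemoPool and DemoConn are hypothetical names, not the patched classes.

import threading

class DemoConn(object):
    def __init__(self):
        self.ops_in_progress = 0

class DemoPool(object):
    def __init__(self):
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.conn = DemoConn()

    def put_handle(self):
        # an operation has finished with the handle
        with self.lock:
            assert self.conn.ops_in_progress > 0
            self.conn.ops_in_progress -= 1
            if self.conn.ops_in_progress == 0:
                self.cond.notifyAll()

    def del_handle_wait(self):
        # block until every in-flight op has returned the handle;
        # only then is it safe to disconnect the libcephfs mount
        with self.lock:
            while self.conn.ops_in_progress != 0:
                self.cond.wait()

pool = DemoPool()
pool.conn.ops_in_progress = 1
threading.Timer(0.1, pool.put_handle).start()   # simulates a finishing op
pool.del_handle_wait()                           # returns after put_handle runs
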
src/pybind/mgr/volumes/module.py
index eb4c350db312dea48aec28fbc041e7788e54db8b..a4fdef5981b9d1995c36b789a44e9f3247a95830 100644 (file)
@@ -185,6 +185,12 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
         super(Module, self).__init__(*args, **kwargs)
         self.vc = VolumeClient(self)
 
+    def __del__(self):
+        self.vc.shutdown()
+
+    def shutdown(self):
+        self.vc.shutdown()
+
     def handle_command(self, inbuf, cmd):
         handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")
         try:
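
Finally, module.py gains a shutdown() hook (plus a defensive __del__) that delegates to VolumeClient.shutdown(); together with the stopping Event checked at the top of each command handler, commands racing a module stop get a clean ESHUTDOWN error instead of touching a half-torn-down pool. A hedged sketch of that guard, with DemoVolumeClient as an illustrative name.

import errno
import threading

class DemoVolumeClient(object):
    def __init__(self):
        self.stopping = threading.Event()

    def create_volume(self, volname):
        # every command entry point checks the Event first
        if self.stopping.isSet():
            return -errno.ESHUTDOWN, "", "shutdown in progress"
        return 0, "created {0}".format(volname), ""

    def shutdown(self):
        self.stopping.set()
        # ...cancel purge jobs, drop libcephfs handles...

vc = DemoVolumeClient()
print(vc.create_volume("a"))   # succeeds
vc.shutdown()
print(vc.create_volume("b"))   # (-errno.ESHUTDOWN, '', 'shutdown in progress')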