mgr/mgr_util: switch to using unshared cephfs connections whenever possible 41917/head
author    Venky Shankar <vshankar@redhat.com>
Thu, 17 Jun 2021 12:39:56 +0000 (08:39 -0400)
committer Venky Shankar <vshankar@redhat.com>
Wed, 23 Jun 2021 03:28:56 +0000 (23:28 -0400)
The connection pool used by mgr/volumes (and by mgr/snap_schedule and
mgr/mirroring) is basically a single cached libcephfs connection handle.
Since it is a singleton, lock contention may occur in the client (on
`client_lock`), especially when an MDS operation takes too long to complete.

Having a pool of connections to a cephfs file system ensures that concurrent
accesses to the file system via the connection pool use unshared libcephfs
handles whenever possible, thereby avoiding contention on `client_lock`.
However, there is a cap on the number of unshared connections per cephfs
file system, beyond which connections start to get shared between users.

Note that to benefit fully from this, it is desirable to have multiple active
MDSs for a ceph file system, to rule out contention on `mds_lock`.

Fixes: http://tracker.ceph.com/issues/51256
Signed-off-by: Venky Shankar <vshankar@redhat.com>
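
To make the new behaviour concrete, here is a small standalone model of the
selection policy this change introduces: prefer a connection no one else is
using, open a fresh one while under the per-volume cap, and only then fall
back to sharing the least-loaded connection. This is an illustrative sketch;
FakeConnection and SelectionModel are hypothetical stand-ins, and the real
logic lives in CephfsConnectionPool.get_fs_handle() in the diff below.

# selection_policy_sketch.py -- hypothetical, simplified model of the policy
# added by this commit (not the actual CephfsConnectionPool implementation).
from dataclasses import dataclass
from typing import Dict, List

MAX_CONCURRENT_CONNECTIONS = 5  # mirrors the new per-volume cap


@dataclass
class FakeConnection:
    name: str
    ops_in_progress: int = 0


class SelectionModel:
    def __init__(self) -> None:
        self.connections: Dict[str, List[FakeConnection]] = {}

    def pick(self, fs_name: str) -> FakeConnection:
        conns = self.connections.setdefault(fs_name, [])
        # 1. reuse a connection that is currently unused (unshared access)
        for conn in conns:
            if conn.ops_in_progress == 0:
                conn.ops_in_progress += 1
                return conn
        # 2. still room under the cap: spin up a fresh, unshared connection
        if len(conns) < MAX_CONCURRENT_CONNECTIONS:
            conn = FakeConnection(f"{fs_name}-{len(conns)}", ops_in_progress=1)
            conns.append(conn)
            return conn
        # 3. every connection is busy and the cap is hit: share the
        #    least-loaded one (smallest ops_in_progress)
        conn = min(conns, key=lambda c: c.ops_in_progress)
        conn.ops_in_progress += 1
        return conn


if __name__ == "__main__":
    model = SelectionModel()
    # six concurrent users of one volume: five unshared handles, the sixth
    # request ends up sharing one of them
    for handle in [model.pick("cephfs") for _ in range(6)]:
        print(handle.name, handle.ops_in_progress)

The real pool additionally validates each connection's fsid before reuse and
reaps idle connections from a timer task, which the model above leaves out.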
src/pybind/mgr/mgr_util.py
src/pybind/mgr/volumes/fs/volume.py

src/pybind/mgr/mgr_util.py
index 29a6f631477ee39c88b11aa879cd8cd8ce5dbb62..a810d519ae09f66ece19bc8a1e44393740d6de6b 100644
@@ -181,10 +181,11 @@ class CephfsConnectionPool(object):
     # TODO: make this configurable
     TIMER_TASK_RUN_INTERVAL = 30.0   # seconds
     CONNECTION_IDLE_INTERVAL = 60.0  # seconds
+    MAX_CONCURRENT_CONNECTIONS = 5   # max number of concurrent connections per volume
 
     def __init__(self, mgr: Module_T):
         self.mgr = mgr
-        self.connections: Dict[str, CephfsConnectionPool.Connection] = {}
+        self.connections: Dict[str, List[CephfsConnectionPool.Connection]] = {}
         self.lock = Lock()
         self.cond = Condition(self.lock)
         self.timer_task = RTimer(CephfsConnectionPool.TIMER_TASK_RUN_INTERVAL,
@@ -194,57 +195,87 @@ class CephfsConnectionPool(object):
     def cleanup_connections(self) -> None:
         with self.lock:
             logger.info("scanning for idle connections..")
-            idle_fs = [fs_name for fs_name, conn in self.connections.items()
-                       if conn.is_connection_idle(CephfsConnectionPool.CONNECTION_IDLE_INTERVAL)]
-            for fs_name in idle_fs:
-                logger.info("cleaning up connection for '{}'".format(fs_name))
-                self._del_fs_handle(fs_name)
+            idle_conns = []
+            for fs_name, connections in self.connections.items():
+                logger.debug(f'fs_name ({fs_name}) connections ({connections})')
+                for connection in connections:
+                    if connection.is_connection_idle(CephfsConnectionPool.CONNECTION_IDLE_INTERVAL):
+                        idle_conns.append((fs_name, connection))
+            logger.info(f'cleaning up connections: {idle_conns}')
+            for idle_conn in idle_conns:
+                self._del_connection(idle_conn[0], idle_conn[1])
 
     def get_fs_handle(self, fs_name: str) -> "cephfs.LibCephFS":
         with self.lock:
-            conn = None
             try:
-                conn = self.connections.get(fs_name, None)
-                if conn:
-                    if conn.is_connection_valid():
-                        return conn.get_fs_handle()
+                min_shared = 0
+                shared_connection = None
+                connections = self.connections.setdefault(fs_name, [])
+                logger.debug(f'[get] volume: ({fs_name}) connection: ({connections})')
+                if connections:
+                    min_shared = connections[0].ops_in_progress
+                    shared_connection = connections[0]
+                for connection in list(connections):
+                    logger.debug(f'[get] connection: {connection} usage: {connection.ops_in_progress}')
+                    if connection.ops_in_progress == 0:
+                        if connection.is_connection_valid():
+                            logger.debug(f'[get] connection ({connection}) can be reused')
+                            return connection.get_fs_handle()
+                        else:
+                            # filesystem id changed beneath us (or the filesystem does not exist).
+                            # this is possible if the filesystem got removed (and recreated with
+                            # same name) via "ceph fs rm/new" mon command.
+                            logger.warning(f'[get] filesystem id changed for volume ({fs_name}), disconnecting ({connection})')
+                            # note -- this will mutate @connections too
+                            self._del_connection(fs_name, connection)
                     else:
-                        # filesystem id changed beneath us (or the filesystem does not exist).
-                        # this is possible if the filesystem got removed (and recreated with
-                        # same name) via "ceph fs rm/new" mon command.
-                        logger.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
-                        self._del_fs_handle(fs_name)
-                conn = CephfsConnectionPool.Connection(self.mgr, fs_name)
-                conn.connect()
+                        if connection.ops_in_progress < min_shared:
+                            min_shared = connection.ops_in_progress
+                            shared_connection = connection
+                # when we end up here, there are no "free" connections. so either spin up a new
+                # one or share it.
+                if len(connections) < CephfsConnectionPool.MAX_CONCURRENT_CONNECTIONS:
+                    logger.debug('[get] spawning new connection since no connection is unused and we still have room for more')
+                    connection = CephfsConnectionPool.Connection(self.mgr, fs_name)
+                    connection.connect()
+                    self.connections[fs_name].append(connection)
+                    return connection.get_fs_handle()
+                else:
+                    assert shared_connection is not None
+                    logger.debug(f'[get] using shared connection ({shared_connection})')
+                    return shared_connection.get_fs_handle()
             except cephfs.Error as e:
                 # try to provide a better error string if possible
                 if e.args[0] == errno.ENOENT:
                     raise CephfsConnectionException(
                         -errno.ENOENT, "FS '{0}' not found".format(fs_name))
                 raise CephfsConnectionException(-e.args[0], e.args[1])
-            self.connections[fs_name] = conn
-            return conn.get_fs_handle()
 
-    def put_fs_handle(self, fs_name: str) -> None:
+    def put_fs_handle(self, fs_name: str, fs_handle: cephfs.LibCephFS) -> None:
         with self.lock:
-            conn = self.connections.get(fs_name, None)
-            if conn:
-                conn.put_fs_handle(notify=lambda: self.cond.notifyAll())
+            connections = self.connections.get(fs_name, [])
+            for connection in connections:
+                if connection.fs == fs_handle:
+                    logger.debug(f'[put] connection: {connection} usage: {connection.ops_in_progress}')
+                    connection.put_fs_handle(notify=lambda: self.cond.notifyAll())
 
-    def _del_fs_handle(self, fs_name: str, wait: bool = False) -> None:
-        conn = self.connections.pop(fs_name, None)
-        if conn:
-            conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())
+    def _del_connection(self, fs_name: str, connection: Connection, wait: bool = False) -> None:
+        self.connections[fs_name].remove(connection)
+        connection.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())
 
-    def del_fs_handle(self, fs_name: str, wait: bool = False) -> None:
+    def _del_connections(self, fs_name: str, wait: bool = False) -> None:
+        for connection in list(self.connections.get(fs_name, [])):
+            self._del_connection(fs_name, connection, wait)
+
+    def del_connections(self, fs_name: str, wait: bool = False) -> None:
         with self.lock:
-            self._del_fs_handle(fs_name, wait)
+            self._del_connections(fs_name, wait)
 
-    def del_all_handles(self) -> None:
+    def del_all_connections(self) -> None:
         with self.lock:
             for fs_name in list(self.connections.keys()):
                 logger.info("waiting for pending ops for '{}'".format(fs_name))
-                self._del_fs_handle(fs_name, wait=True)
+                self._del_connections(fs_name, wait=True)
                 logger.info("pending ops completed for '{}'".format(fs_name))
             # no new connections should have been initialized since its
             # guarded on shutdown.
@@ -265,7 +296,7 @@ class CephfsClient(Generic[Module_T]):
         # first, note that we're shutting down
         self.stopping.set()
         # second, delete all libcephfs handles from connection pool
-        self.connection_pool.del_all_handles()
+        self.connection_pool.del_all_connections()
 
     def get_fs(self, fs_name: str) -> Optional["cephfs.LibCephFS"]:
         fs_map = self.mgr.get('fs_map')
@@ -305,7 +336,7 @@ def open_filesystem(fsc: CephfsClient, fs_name: str) -> Generator["cephfs.LibCep
     try:
         yield fs_handle
     finally:
-        fsc.connection_pool.put_fs_handle(fs_name)
+        fsc.connection_pool.put_fs_handle(fs_name, fs_handle)
 
 
 def colorize(msg: str, color: int, dark: bool = False) -> str:
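
From a consumer's perspective only put_fs_handle() changes signature (it now
needs the handle being returned); the open_filesystem context manager above
hides that detail. A hedged usage sketch, assuming fsc is a CephfsClient that
was already constructed inside a mgr module and that a volume named "a"
exists (both assumptions, not part of this patch):

# usage_sketch.py -- assumes execution inside a ceph-mgr module where a
# CephfsClient instance (fsc) is available; "a" is a placeholder volume name.
from mgr_util import CephfsConnectionException, open_filesystem


def stat_volume_root(fsc) -> None:
    try:
        # open_filesystem() borrows a handle from the pool and hands that
        # same handle back on exit -- hence put_fs_handle(fs_name, fs_handle)
        with open_filesystem(fsc, "a") as fs_handle:
            print(fs_handle.stat("/"))
    except CephfsConnectionException as e:
        # e.g. the volume does not exist or the connect failed
        print(f"failed to open volume 'a': {e}")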
src/pybind/mgr/volumes/fs/volume.py
index e38ad71f1608519fe0f9630f218782262d838399..a55ca44f31bce4e2c40e1837d8662991e96f2223 100644
@@ -73,7 +73,7 @@ class VolumeClient(CephfsClient["Module"]):
         # stop purge threads
         self.purge_queue.shutdown()
         # last, delete all libcephfs handles from connection pool
-        self.connection_pool.del_all_handles()
+        self.connection_pool.del_all_connections()
 
     def cluster_log(self, msg, lvl=None):
         """
@@ -122,7 +122,7 @@ class VolumeClient(CephfsClient["Module"]):
         if not metadata_pool:
             return -errno.ENOENT, "", "volume {0} doesn't exist".format(volname)
         self.purge_queue.cancel_jobs(volname)
-        self.connection_pool.del_fs_handle(volname, wait=True)
+        self.connection_pool.del_connections(volname, wait=True)
         return delete_volume(self.mgr, volname, metadata_pool, data_pools)
 
     def list_fs_volumes(self):