From a8a549d335246cd16d18b1094f9cf192726a54b0 Mon Sep 17 00:00:00 2001
From: Venky Shankar
Date: Thu, 17 Jun 2021 08:39:56 -0400
Subject: [PATCH] mgr/mgr_util: switch using unshared cephfs connections
 whenever possible

The connection pool used by mgr/volumes (and by mgr/snap_schedule and
mgr/mirroring) is basically a cached libcephfs connection handle. Since it
is a singleton, lock contention can occur in the client (on `client_lock`),
especially when an MDS operation takes too long to complete.

Keeping a pool of connections per cephfs file system ensures that concurrent
accesses through the pool use unshared libcephfs handles whenever possible,
thereby avoiding contention on `client_lock`. There is, however, a cap on the
number of unshared connections per cephfs file system, beyond which
connections start getting shared between users.

Note that to benefit fully from this, it is desirable to have multiple active
MDSs for a ceph file system, so as to rule out the possibility of contention
on `mds_lock`.

Fixes: http://tracker.ceph.com/issues/51256
Signed-off-by: Venky Shankar
(cherry picked from commit 2e7adc48d141dacf7fc90d4c4f8d61c287e77984)
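
The selection policy this patch implements can be summarized with the
following simplified sketch (illustrative only; `pick_connection`,
`make_connection`, `conns` and `max_conns` are placeholder names, not the
identifiers used in the patch below):

    # Illustrative sketch, not the patched code: reuse an idle handle if one
    # exists, open a new handle while under the per-volume cap, otherwise
    # share the least-loaded handle.
    def pick_connection(conns, make_connection, max_conns=5):
        for conn in conns:
            if conn.ops_in_progress == 0:
                # unshared and idle -- reuse it
                return conn
        if len(conns) < max_conns:
            # no free handle, but still room for a new unshared one
            conn = make_connection()
            conns.append(conn)
            return conn
        # at the cap -- fall back to sharing the least-loaded handle
        return min(conns, key=lambda c: c.ops_in_progress)
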
---
 src/pybind/mgr/mgr_util.py          | 99 +++++++++++++++++++----------
 src/pybind/mgr/volumes/fs/volume.py |  4 +-
 2 files changed, 67 insertions(+), 36 deletions(-)

diff --git a/src/pybind/mgr/mgr_util.py b/src/pybind/mgr/mgr_util.py
index 29a6f631477ee..a810d519ae09f 100644
--- a/src/pybind/mgr/mgr_util.py
+++ b/src/pybind/mgr/mgr_util.py
@@ -181,10 +181,11 @@ class CephfsConnectionPool(object):
     # TODO: make this configurable
     TIMER_TASK_RUN_INTERVAL = 30.0   # seconds
     CONNECTION_IDLE_INTERVAL = 60.0  # seconds
+    MAX_CONCURRENT_CONNECTIONS = 5   # max number of concurrent connections per volume
 
     def __init__(self, mgr: Module_T):
         self.mgr = mgr
-        self.connections: Dict[str, CephfsConnectionPool.Connection] = {}
+        self.connections: Dict[str, List[CephfsConnectionPool.Connection]] = {}
         self.lock = Lock()
         self.cond = Condition(self.lock)
         self.timer_task = RTimer(CephfsConnectionPool.TIMER_TASK_RUN_INTERVAL,
@@ -194,57 +195,87 @@ class CephfsConnectionPool(object):
     def cleanup_connections(self) -> None:
         with self.lock:
             logger.info("scanning for idle connections..")
-            idle_fs = [fs_name for fs_name, conn in self.connections.items()
-                       if conn.is_connection_idle(CephfsConnectionPool.CONNECTION_IDLE_INTERVAL)]
-            for fs_name in idle_fs:
-                logger.info("cleaning up connection for '{}'".format(fs_name))
-                self._del_fs_handle(fs_name)
+            idle_conns = []
+            for fs_name, connections in self.connections.items():
+                logger.debug(f'fs_name ({fs_name}) connections ({connections})')
+                for connection in connections:
+                    if connection.is_connection_idle(CephfsConnectionPool.CONNECTION_IDLE_INTERVAL):
+                        idle_conns.append((fs_name, connection))
+            logger.info(f'cleaning up connections: {idle_conns}')
+            for idle_conn in idle_conns:
+                self._del_connection(idle_conn[0], idle_conn[1])
 
     def get_fs_handle(self, fs_name: str) -> "cephfs.LibCephFS":
         with self.lock:
-            conn = None
             try:
-                conn = self.connections.get(fs_name, None)
-                if conn:
-                    if conn.is_connection_valid():
-                        return conn.get_fs_handle()
+                min_shared = 0
+                shared_connection = None
+                connections = self.connections.setdefault(fs_name, [])
+                logger.debug(f'[get] volume: ({fs_name}) connection: ({connections})')
+                if connections:
+                    min_shared = connections[0].ops_in_progress
+                    shared_connection = connections[0]
+                for connection in list(connections):
+                    logger.debug(f'[get] connection: {connection} usage: {connection.ops_in_progress}')
+                    if connection.ops_in_progress == 0:
+                        if connection.is_connection_valid():
+                            logger.debug(f'[get] connection ({connection}) can be reused')
+                            return connection.get_fs_handle()
+                        else:
+                            # filesystem id changed beneath us (or the filesystem does not exist).
+                            # this is possible if the filesystem got removed (and recreated with
+                            # same name) via "ceph fs rm/new" mon command.
+                            logger.warning(f'[get] filesystem id changed for volume ({fs_name}), disconnecting ({connection})')
+                            # note -- this will mutate @connections too
+                            self._del_connection(fs_name, connection)
                     else:
-                        # filesystem id changed beneath us (or the filesystem does not exist).
-                        # this is possible if the filesystem got removed (and recreated with
-                        # same name) via "ceph fs rm/new" mon command.
-                        logger.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
-                        self._del_fs_handle(fs_name)
-                conn = CephfsConnectionPool.Connection(self.mgr, fs_name)
-                conn.connect()
+                        if connection.ops_in_progress < min_shared:
+                            min_shared = connection.ops_in_progress
+                            shared_connection = connection
+                # when we end up here, there are no "free" connections. so either spin up a new
+                # one or share it.
+                if len(connections) < CephfsConnectionPool.MAX_CONCURRENT_CONNECTIONS:
+                    logger.debug('[get] spawning new connection since no connection is unused and we still have room for more')
+                    connection = CephfsConnectionPool.Connection(self.mgr, fs_name)
+                    connection.connect()
+                    self.connections[fs_name].append(connection)
+                    return connection.get_fs_handle()
+                else:
+                    assert shared_connection is not None
+                    logger.debug(f'[get] using shared connection ({shared_connection})')
+                    return shared_connection.get_fs_handle()
             except cephfs.Error as e:
                 # try to provide a better error string if possible
                 if e.args[0] == errno.ENOENT:
                     raise CephfsConnectionException(
                         -errno.ENOENT, "FS '{0}' not found".format(fs_name))
                 raise CephfsConnectionException(-e.args[0], e.args[1])
-            self.connections[fs_name] = conn
-            return conn.get_fs_handle()
 
-    def put_fs_handle(self, fs_name: str) -> None:
+    def put_fs_handle(self, fs_name: str, fs_handle: cephfs.LibCephFS) -> None:
         with self.lock:
-            conn = self.connections.get(fs_name, None)
-            if conn:
-                conn.put_fs_handle(notify=lambda: self.cond.notifyAll())
+            connections = self.connections.get(fs_name, [])
+            for connection in connections:
+                if connection.fs == fs_handle:
+                    logger.debug(f'[put] connection: {connection} usage: {connection.ops_in_progress}')
+                    connection.put_fs_handle(notify=lambda: self.cond.notifyAll())
 
-    def _del_fs_handle(self, fs_name: str, wait: bool = False) -> None:
-        conn = self.connections.pop(fs_name, None)
-        if conn:
-            conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())
+    def _del_connection(self, fs_name: str, connection: Connection, wait: bool = False) -> None:
+        self.connections[fs_name].remove(connection)
+        connection.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())
 
-    def del_fs_handle(self, fs_name: str, wait: bool = False) -> None:
+    def _del_connections(self, fs_name: str, wait: bool = False) -> None:
+        for connection in list(self.connections.get(fs_name, [])):
+            self._del_connection(fs_name, connection, wait)
+
+    def del_connections(self, fs_name: str, wait: bool = False) -> None:
         with self.lock:
-            self._del_fs_handle(fs_name, wait)
+            self._del_connections(fs_name, wait)
 
-    def del_all_handles(self) -> None:
+    def del_all_connections(self) -> None:
         with self.lock:
             for fs_name in list(self.connections.keys()):
                 logger.info("waiting for pending ops for '{}'".format(fs_name))
-                self._del_fs_handle(fs_name, wait=True)
+                self._del_connections(fs_name, wait=True)
                 logger.info("pending ops completed for '{}'".format(fs_name))
             # no new connections should have been initialized since its
             # guarded on shutdown.
@@ -265,7 +296,7 @@ class CephfsClient(Generic[Module_T]):
         # first, note that we're shutting down
         self.stopping.set()
         # second, delete all libcephfs handles from connection pool
-        self.connection_pool.del_all_handles()
+        self.connection_pool.del_all_connections()
 
     def get_fs(self, fs_name: str) -> Optional["cephfs.LibCephFS"]:
         fs_map = self.mgr.get('fs_map')
@@ -305,7 +336,7 @@ def open_filesystem(fsc: CephfsClient, fs_name: str) -> Generator["cephfs.LibCep
     try:
         yield fs_handle
     finally:
-        fsc.connection_pool.put_fs_handle(fs_name)
+        fsc.connection_pool.put_fs_handle(fs_name, fs_handle)
 
 
 def colorize(msg: str, color: int, dark: bool = False) -> str:
diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py
index ececd16e39ecb..717dde949d82b 100644
--- a/src/pybind/mgr/volumes/fs/volume.py
+++ b/src/pybind/mgr/volumes/fs/volume.py
@@ -73,7 +73,7 @@ class VolumeClient(CephfsClient["Module"]):
         # stop purge threads
         self.purge_queue.shutdown()
         # last, delete all libcephfs handles from connection pool
-        self.connection_pool.del_all_handles()
+        self.connection_pool.del_all_connections()
 
     def cluster_log(self, msg, lvl=None):
         """
@@ -122,7 +122,7 @@ class VolumeClient(CephfsClient["Module"]):
         if not metadata_pool:
             return -errno.ENOENT, "", "volume {0} doesn't exist".format(volname)
         self.purge_queue.cancel_jobs(volname)
-        self.connection_pool.del_fs_handle(volname, wait=True)
+        self.connection_pool.del_connections(volname, wait=True)
         return delete_volume(self.mgr, volname, metadata_pool, data_pools)
 
     def list_fs_volumes(self):
-- 
2.39.5
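
For context, callers are expected to keep going through the `open_filesystem`
context manager patched above, which now hands the checked-out handle back to
the pool via the two-argument `put_fs_handle()`. A minimal usage sketch
(illustrative only: the `fs_client` instance, the file system name 'a' and the
`stat()` call are placeholders, not part of this patch):

    # illustrative only: 'fs_client' is assumed to be an already-constructed
    # CephfsClient (or VolumeClient); file system name and path are made up.
    from mgr_util import open_filesystem

    with open_filesystem(fs_client, 'a') as fs_handle:
        # fs_handle is an unshared libcephfs handle while the per-volume cap
        # (MAX_CONCURRENT_CONNECTIONS) has not been reached, a shared one otherwise
        print(fs_handle.stat('/'))
    # leaving the block calls put_fs_handle(fs_name, fs_handle), releasing
    # exactly this handle back to the pool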