# TODO: make this configurable
TIMER_TASK_RUN_INTERVAL = 30.0 # seconds
CONNECTION_IDLE_INTERVAL = 60.0 # seconds
+ MAX_CONCURRENT_CONNECTIONS = 5 # max number of concurrent connections per volume
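+ # once a volume hits this cap, get_fs_handle() shares the least-loaded
+ # existing connection instead of opening a new one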
def __init__(self, mgr: Module_T):
self.mgr = mgr
- self.connections: Dict[str, CephfsConnectionPool.Connection] = {}
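+ # volume name -> list of pooled connections for that volume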
+ self.connections: Dict[str, List[CephfsConnectionPool.Connection]] = {}
self.lock = Lock()
self.cond = Condition(self.lock)
self.timer_task = RTimer(CephfsConnectionPool.TIMER_TASK_RUN_INTERVAL,
self.cleanup_connections)
def cleanup_connections(self) -> None:
with self.lock:
logger.info("scanning for idle connections..")
- idle_fs = [fs_name for fs_name, conn in self.connections.items()
- if conn.is_connection_idle(CephfsConnectionPool.CONNECTION_IDLE_INTERVAL)]
- for fs_name in idle_fs:
- logger.info("cleaning up connection for '{}'".format(fs_name))
- self._del_fs_handle(fs_name)
+ idle_conns = []
+ for fs_name, connections in self.connections.items():
+ logger.debug(f'fs_name ({fs_name}) connections ({connections})')
+ for connection in connections:
+ if connection.is_connection_idle(CephfsConnectionPool.CONNECTION_IDLE_INTERVAL):
+ idle_conns.append((fs_name, connection))
+ logger.info(f'cleaning up connections: {idle_conns}')
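+ # delete outside the scan loop: _del_connection() mutates the
+ # per-volume connection lists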
+ for fs_name, connection in idle_conns:
+ self._del_connection(fs_name, connection)
def get_fs_handle(self, fs_name: str) -> "cephfs.LibCephFS":
with self.lock:
- conn = None
try:
- conn = self.connections.get(fs_name, None)
- if conn:
- if conn.is_connection_valid():
- return conn.get_fs_handle()
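+ # track the least-loaded connection as a fallback in case no
+ # idle connection is found below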
+ min_shared = 0
+ shared_connection = None
+ connections = self.connections.setdefault(fs_name, [])
+ logger.debug(f'[get] volume: ({fs_name}) connections: ({connections})')
+ if connections:
+ min_shared = connections[0].ops_in_progress
+ shared_connection = connections[0]
+ for connection in list(connections):
+ logger.debug(f'[get] connection: {connection} usage: {connection.ops_in_progress}')
+ if connection.ops_in_progress == 0:
+ if connection.is_connection_valid():
+ logger.debug(f'[get] connection ({connection}) can be reused')
+ return connection.get_fs_handle()
+ else:
+ # filesystem id changed beneath us (or the filesystem does not exist).
+ # this is possible if the filesystem got removed (and recreated with
+ # same name) via "ceph fs rm/new" mon command.
+ logger.warning(f'[get] filesystem id changed for volume ({fs_name}), disconnecting ({connection})')
+ # note -- this will mutate @connections too
+ self._del_connection(fs_name, connection)
else:
- # filesystem id changed beneath us (or the filesystem does not exist).
- # this is possible if the filesystem got removed (and recreated with
- # same name) via "ceph fs rm/new" mon command.
- logger.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
- self._del_fs_handle(fs_name)
- conn = CephfsConnectionPool.Connection(self.mgr, fs_name)
- conn.connect()
+ if connection.ops_in_progress < min_shared:
+ min_shared = connection.ops_in_progress
+ shared_connection = connection
+ # when we end up here, there are no "free" connections. so either spin up a new
+ # one or share it.
+ if len(connections) < CephfsConnectionPool.MAX_CONCURRENT_CONNECTIONS:
+ logger.debug('[get] spawning new connection since no connection is unused and we still have room for more')
+ connection = CephfsConnectionPool.Connection(self.mgr, fs_name)
+ connection.connect()
+ self.connections[fs_name].append(connection)
+ return connection.get_fs_handle()
+ else:
+ assert shared_connection is not None
+ logger.debug(f'[get] using shared connection ({shared_connection})')
+ return shared_connection.get_fs_handle()
except cephfs.Error as e:
# try to provide a better error string if possible
if e.args[0] == errno.ENOENT:
raise CephfsConnectionException(
-errno.ENOENT, "FS '{0}' not found".format(fs_name))
raise CephfsConnectionException(-e.args[0], e.args[1])
- self.connections[fs_name] = conn
- return conn.get_fs_handle()
- def put_fs_handle(self, fs_name: str) -> None:
+ def put_fs_handle(self, fs_name: str, fs_handle: cephfs.LibCephFS) -> None:
with self.lock:
- conn = self.connections.get(fs_name, None)
- if conn:
- conn.put_fs_handle(notify=lambda: self.cond.notifyAll())
+ connections = self.connections.get(fs_name, [])
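+ # return the handle to the connection that owns it and wake
+ # up any waiters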
+ for connection in connections:
+ if connection.fs == fs_handle:
+ logger.debug(f'[put] connection: {connection} usage: {connection.ops_in_progress}')
+ connection.put_fs_handle(notify=lambda: self.cond.notify_all())
- def _del_fs_handle(self, fs_name: str, wait: bool = False) -> None:
- conn = self.connections.pop(fs_name, None)
- if conn:
- conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())
+ def _del_connection(self, fs_name: str, connection: Connection, wait: bool = False) -> None:
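+ # note: callers are expected to hold self.lock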
+ self.connections[fs_name].remove(connection)
+ connection.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())
- def del_fs_handle(self, fs_name: str, wait: bool = False) -> None:
+ def _del_connections(self, fs_name: str, wait: bool = False) -> None:
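+ # iterate over a copy since _del_connection() removes entries
+ # from the underlying list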
+ for connection in list(self.connections.get(fs_name, [])):
+ self._del_connection(fs_name, connection, wait)
+
+ def del_connections(self, fs_name: str, wait: bool = False) -> None:
with self.lock:
- self._del_fs_handle(fs_name, wait)
+ self._del_connections(fs_name, wait)
- def del_all_handles(self) -> None:
+ def del_all_connections(self) -> None:
with self.lock:
for fs_name in list(self.connections.keys()):
logger.info("waiting for pending ops for '{}'".format(fs_name))
- self._del_fs_handle(fs_name, wait=True)
+ self._del_connections(fs_name, wait=True)
logger.info("pending ops completed for '{}'".format(fs_name))
# no new connections should have been initialized since it's
# guarded on shutdown.
# first, note that we're shutting down
self.stopping.set()
# second, delete all libcephfs handles from connection pool
- self.connection_pool.del_all_handles()
+ self.connection_pool.del_all_connections()
def get_fs(self, fs_name: str) -> Optional["cephfs.LibCephFS"]:
fs_map = self.mgr.get('fs_map')
try:
yield fs_handle
finally:
- fsc.connection_pool.put_fs_handle(fs_name)
+ fsc.connection_pool.put_fs_handle(fs_name, fs_handle)
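+ # a minimal usage sketch (assuming this generator is the module's
+ # open_filesystem() context manager and fsc is the surrounding
+ # CephfsClient -- names outside this diff are assumptions):
+ #
+ #     with open_filesystem(fsc, fs_name) as fs_handle:
+ #         fs_handle.stat('/')
+ #
+ # passing the same fs_handle object back is what lets the pool
+ # locate the owning connection in put_fs_handle()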
def colorize(msg: str, color: int, dark: bool = False) -> str: