From: Venky Shankar Date: Mon, 17 Jun 2019 09:36:54 +0000 (-0400) Subject: mgr / volumes: maintain connection pool for fs volumes X-Git-Tag: v15.1.0~2189^2~10 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=5c41e949af9acabd612b0644de0603e374b4b42a;p=ceph.git mgr / volumes: maintain connection pool for fs volumes Right now every [sub]volume call does a connect/disconnect to the cephfs filesystem. This is unnecessary and can be optimized by caching the filesystem handle in a connection pool and (re)using the handle for subsequent [sub]volume operations. This would be useful for implementing features such as purge queue for asynchronous subvolume deletes. Signed-off-by: Venky Shankar --- diff --git a/src/pybind/mgr/volumes/fs/subvolume.py b/src/pybind/mgr/volumes/fs/subvolume.py index 016b00fa69d..ce5142d3784 100644 --- a/src/pybind/mgr/volumes/fs/subvolume.py +++ b/src/pybind/mgr/volumes/fs/subvolume.py @@ -33,11 +33,8 @@ class SubVolume(object): """ - def __init__(self, mgr, fs_name=None): - self.fs = None - self.fs_name = fs_name - self.connected = False - + def __init__(self, mgr, fs_handle): + self.fs = fs_handle self.rados = mgr.rados def _mkdir_p(self, path, mode=0o755): @@ -261,29 +258,8 @@ class SubVolume(object): ### context manager routines - def connect(self): - log.debug("Connecting to cephfs...") - self.fs = cephfs.LibCephFS(rados_inst=self.rados) - log.debug("CephFS initializing...") - self.fs.init() - log.debug("CephFS mounting...") - self.fs.mount(filesystem_name=self.fs_name.encode('utf-8')) - log.debug("Connection to cephfs complete") - - def disconnect(self): - log.info("disconnect") - if self.fs: - log.debug("Disconnecting cephfs...") - self.fs.shutdown() - self.fs = None - log.debug("Disconnecting cephfs complete") - def __enter__(self): - self.connect() return self def __exit__(self, exc_type, exc_val, exc_tb): - self.disconnect() - - def __del__(self): - self.disconnect() + pass diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py index 8cc7ef26fa0..c1ebe7e3251 100644 --- a/src/pybind/mgr/volumes/fs/volume.py +++ b/src/pybind/mgr/volumes/fs/volume.py @@ -1,6 +1,14 @@ import json +import time import errno import logging +from threading import Lock +try: + # py2 + from threading import _Timer as Timer +except ImportError: + #py3 + from threading import Timer import cephfs import orchestrator @@ -11,9 +19,147 @@ from .exception import VolumeException log = logging.getLogger(__name__) +class ConnectionPool(object): + class Connection(object): + def __init__(self, mgr, fs_name): + self.fs = None + self.mgr = mgr + self.fs_name = fs_name + self.ops_in_progress = 0 + self.last_used = time.time() + self.fs_id = self.get_fs_id() + + def get_fs_id(self): + fs_map = self.mgr.get('fs_map') + for fs in fs_map['filesystems']: + if fs['mdsmap']['fs_name'] == self.fs_name: + return fs['id'] + raise VolumeException( + -errno.ENOENT, "Volume '{0}' not found".format(self.fs_name)) + + def get_fs_handle(self): + self.last_used = time.time() + self.ops_in_progress += 1 + return self.fs + + def put_fs_handle(self): + assert self.ops_in_progress > 0 + self.ops_in_progress -= 1 + + def del_fs_handle(self): + if self.is_connection_valid(): + self.disconnect() + else: + self.abort() + + def is_connection_valid(self): + fs_id = None + try: + fs_id = self.get_fs_id() + except: + # the filesystem does not exist now -- connection is not valid. + pass + return self.fs_id == fs_id + + def is_connection_idle(self, timeout): + return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout)) + + def connect(self): + assert self.ops_in_progress == 0 + log.debug("Connecting to cephfs '{0}'".format(self.fs_name)) + self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados) + log.debug("CephFS initializing...") + self.fs.init() + log.debug("CephFS mounting...") + self.fs.mount(filesystem_name=self.fs_name.encode('utf-8')) + log.debug("Connection to cephfs '{0}' complete".format(self.fs_name)) + + def disconnect(self): + assert self.ops_in_progress == 0 + log.info("disconnecting from cephfs '{0}'".format(self.fs_name)) + self.fs.shutdown() + self.fs = None + + def abort(self): + assert self.ops_in_progress == 0 + log.info("aborting connection from cephfs '{0}'".format(self.fs_name)) + self.fs.abort_conn() + self.fs = None + + class RTimer(Timer): + """ + recurring timer variant of Timer + """ + def run(self): + while not self.finished.is_set(): + self.finished.wait(self.interval) + self.function(*self.args, **self.kwargs) + self.finished.set() + + # TODO: make this configurable + TIMER_TASK_RUN_INTERVAL = 30.0 # seconds + CONNECTION_IDLE_INTERVAL = 60.0 # seconds + + def __init__(self, mgr): + self.mgr = mgr + self.connections = {} + self.lock = Lock() + self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL, + self.cleanup_connections) + self.timer_task.start() + + def cleanup_connections(self): + with self.lock: + log.info("scanning for idle connections..") + idle_fs = [fs_name for fs_name,conn in self.connections.iteritems() + if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)] + for fs_name in idle_fs: + log.info("cleaning up connection for '{}'".format(fs_name)) + self._del_fs_handle(fs_name) + + def get_fs_handle(self, fs_name): + with self.lock: + conn = None + try: + conn = self.connections.get(fs_name, None) + if conn: + if conn.is_connection_valid(): + return conn.get_fs_handle() + else: + # filesystem id changed beneath us (or the filesystem does not exist). + # this is possible if the filesystem got removed (and recreated with + # same name) via "ceph fs rm/new" mon command. + log.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name)) + self._del_fs_handle(fs_name) + conn = ConnectionPool.Connection(self.mgr, fs_name) + conn.connect() + except cephfs.Error as e: + # try to provide a better error string if possible + if e.args[0] == errno.ENOENT: + raise VolumeException( + -errno.ENOENT, "Volume '{0}' not found".format(fs_name)) + raise VolumeException(-e.args[0], e.args[1]) + self.connections[fs_name] = conn + return conn.get_fs_handle() + + def put_fs_handle(self, fs_name): + with self.lock: + conn = self.connections.get(fs_name, None) + if conn: + conn.put_fs_handle() + + def _del_fs_handle(self, fs_name): + conn = self.connections.pop(fs_name, None) + if conn: + conn.del_fs_handle() + def del_fs_handle(self, fs_name): + with self.lock: + self._del_fs_handle(fs_name) + class VolumeClient(object): def __init__(self, mgr): self.mgr = mgr + self.connection_pool = ConnectionPool(self.mgr) def gen_pool_names(self, volname): """ @@ -127,6 +273,7 @@ class VolumeClient(object): """ delete the given module (tear down mds, remove filesystem) """ + self.connection_pool.del_fs_handle(volname) # Tear down MDS daemons try: completion = self.mgr.remove_stateless_service("mds", volname) @@ -176,16 +323,51 @@ class VolumeClient(object): except ValueError: raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode)) + def connection_pool_wrap(func): + """ + decorator that wraps subvolume calls by fetching filesystem handle + from the connection pool when fs_handle argument is empty, otherwise + just invoke func with the passed in filesystem handle. Also handles + call made to non-existent volumes (only when fs_handle is empty). + """ + def conn_wrapper(self, fs_handle, **kwargs): + fs_h = fs_handle + fs_name = kwargs['vol_name'] + # note that force arg is available for remove type commands + force = kwargs.get('force', False) + + # fetch the connection from the pool + if not fs_handle: + try: + fs_h = self.connection_pool.get_fs_handle(fs_name) + except VolumeException as ve: + if not force: + return self.volume_exception_to_retval(ve) + return 0, "", "" + + # invoke the actual routine w/ fs handle + result = func(self, fs_h, **kwargs) + + # hand over the connection back to the pool + if fs_h: + self.connection_pool.put_fs_handle(fs_name) + return result + return conn_wrapper + ### subvolume operations - def create_subvolume(self, volname, subvolname, groupname, size, mode='755', pool=None): - ret = 0, "", "" + @connection_pool_wrap + def create_subvolume(self, fs_handle, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] + size = kwargs['size'] + pool = kwargs['pool_layout'] + mode = kwargs['mode'] + try: - if not self.volume_exists(volname): - raise VolumeException( - -errno.ENOENT, "Volume '{0}' not found, create it with `ceph fs " \ - "volume create` before trying to create subvolumes".format(volname)) - with SubVolume(self.mgr, fs_name=volname) as sv: + with SubVolume(self.mgr, fs_handle) as sv: spec = SubvolumeSpec(subvolname, groupname) if not self.group_exists(sv, spec): raise VolumeException( @@ -196,36 +378,35 @@ class VolumeClient(object): ret = self.volume_exception_to_retval(ve) return ret - def remove_subvolume(self, volname, subvolname, groupname, force): - ret = 0, "", "" + @connection_pool_wrap + def remove_subvolume(self, fs_handle, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] + force = kwargs['force'] try: - fs = self.get_fs(volname) - if fs: - with SubVolume(self.mgr, fs_name=volname) as sv: - spec = SubvolumeSpec(subvolname, groupname) - if self.group_exists(sv, spec): - sv.remove_subvolume(spec, force) - sv.purge_subvolume(spec) - elif not force: - raise VolumeException( - -errno.ENOENT, "Subvolume group '{0}' not found, cannot remove " \ - "subvolume '{1}'".format(groupname, subvolname)) - elif not force: - raise VolumeException( - -errno.ENOENT, "Volume '{0}' not found, cannot remove subvolume " \ - "'{1}'".format(volname, subvolname)) + with SubVolume(self.mgr, fs_handle) as sv: + spec = SubvolumeSpec(subvolname, groupname) + if self.group_exists(sv, spec): + sv.remove_subvolume(spec, force) + sv.purge_subvolume(spec) + elif not force: + raise VolumeException( + -errno.ENOENT, "Subvolume group '{0}' not found, cannot remove " \ + "subvolume '{1}'".format(groupname, subvolname)) except VolumeException as ve: ret = self.volume_exception_to_retval(ve) return ret - def subvolume_getpath(self, volname, subvolname, groupname): - ret = None + @connection_pool_wrap + def subvolume_getpath(self, fs_handle, **kwargs): + ret = None + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] try: - if not self.volume_exists(volname): - raise VolumeException( - -errno.ENOENT, "Volume '{0}' not found".format(volname)) - - with SubVolume(self.mgr, fs_name=volname) as sv: + with SubVolume(self.mgr, fs_handle) as sv: spec = SubvolumeSpec(subvolname, groupname) if not self.group_exists(sv, spec): raise VolumeException( @@ -241,15 +422,16 @@ class VolumeClient(object): ### subvolume snapshot - def create_subvolume_snapshot(self, volname, subvolname, snapname, groupname): - ret = 0, "", "" - try: - if not self.volume_exists(volname): - raise VolumeException( - -errno.ENOENT, "Volume '{0}' not found, cannot create snapshot " \ - "'{1}'".format(volname, snapname)) + @connection_pool_wrap + def create_subvolume_snapshot(self, fs_handle, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + snapname = kwargs['snap_name'] + groupname = kwargs['group_name'] - with SubVolume(self.mgr, fs_name=volname) as sv: + try: + with SubVolume(self.mgr, fs_handle) as sv: spec = SubvolumeSpec(subvolname, groupname) if not self.group_exists(sv, spec): raise VolumeException( @@ -264,76 +446,76 @@ class VolumeClient(object): ret = self.volume_exception_to_retval(ve) return ret - def remove_subvolume_snapshot(self, volname, subvolname, snapname, groupname, force): - ret = 0, "", "" + @connection_pool_wrap + def remove_subvolume_snapshot(self, fs_handle, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + snapname = kwargs['snap_name'] + groupname = kwargs['group_name'] + force = kwargs['force'] try: - if self.volume_exists(volname): - with SubVolume(self.mgr, fs_name=volname) as sv: - spec = SubvolumeSpec(subvolname, groupname) - if self.group_exists(sv, spec): - if sv.get_subvolume_path(spec): - sv.remove_subvolume_snapshot(spec, snapname, force) - elif not force: - raise VolumeException( - -errno.ENOENT, "Subvolume '{0}' not found, cannot remove " \ - "subvolume snapshot '{1}'".format(subvolname, snapname)) + with SubVolume(self.mgr, fs_handle) as sv: + spec = SubvolumeSpec(subvolname, groupname) + if self.group_exists(sv, spec): + if sv.get_subvolume_path(spec): + sv.remove_subvolume_snapshot(spec, snapname, force) elif not force: raise VolumeException( - -errno.ENOENT, "Subvolume group '{0}' already removed, cannot " \ - "remove subvolume snapshot '{1}'".format(groupname, snapname)) - elif not force: - raise VolumeException( - -errno.ENOENT, "Volume '{0}' not found, cannot remove subvolumegroup " \ - "snapshot '{1}'".format(volname, snapname)) + -errno.ENOENT, "Subvolume '{0}' not found, cannot remove " \ + "subvolume snapshot '{1}'".format(subvolname, snapname)) + elif not force: + raise VolumeException( + -errno.ENOENT, "Subvolume group '{0}' already removed, cannot " \ + "remove subvolume snapshot '{1}'".format(groupname, snapname)) except VolumeException as ve: ret = self.volume_exception_to_retval(ve) return ret ### group operations - def create_subvolume_group(self, volname, groupname, mode='755', pool=None): - ret = 0, "", "" - try: - if not self.volume_exists(volname): - raise VolumeException( - -errno.ENOENT, "Volume '{0}' not found, create it with `ceph fs " \ - "volume create` before trying to create subvolume groups".format(volname)) + @connection_pool_wrap + def create_subvolume_group(self, fs_handle, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + groupname = kwargs['group_name'] + pool = kwargs['pool_layout'] + mode = kwargs['mode'] + try: # TODO: validate that subvol size fits in volume size - with SubVolume(self.mgr, fs_name=volname) as sv: + with SubVolume(self.mgr, fs_handle) as sv: spec = SubvolumeSpec("", groupname) sv.create_group(spec, pool=pool, mode=self.octal_str_to_decimal_int(mode)) except VolumeException as ve: ret = self.volume_exception_to_retval(ve) return ret - def remove_subvolume_group(self, volname, groupname, force): - ret = 0, "", "" + @connection_pool_wrap + def remove_subvolume_group(self, fs_handle, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + groupname = kwargs['group_name'] + force = kwargs['force'] try: - if self.volume_exists(volname): - with SubVolume(self.mgr, fs_name=volname) as sv: - # TODO: check whether there are no subvolumes in the group - spec = SubvolumeSpec("", groupname) - sv.remove_group(spec, force) - elif not force: - raise VolumeException( - -errno.ENOENT, "Volume '{0}' not found, cannot remove subvolume " \ - "group '{0}'".format(volname, groupname)) + with SubVolume(self.mgr, fs_handle) as sv: + # TODO: check whether there are no subvolumes in the group + spec = SubvolumeSpec("", groupname) + sv.remove_group(spec, force) except VolumeException as ve: ret = self.volume_exception_to_retval(ve) return ret ### group snapshot - def create_subvolume_group_snapshot(self, volname, groupname, snapname): - ret = 0, "", "" + @connection_pool_wrap + def create_subvolume_group_snapshot(self, fs_handle, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + groupname = kwargs['group_name'] + snapname = kwargs['snap_name'] try: - if not self.volume_exists(volname): - raise VolumeException( - -errno.ENOENT, "Volume '{0}' not found, cannot create snapshot " \ - "'{1}'".format(volname, snapname)) - - with SubVolume(self.mgr, fs_name=volname) as sv: + with SubVolume(self.mgr, fs_handle) as sv: spec = SubvolumeSpec("", groupname) if not self.group_exists(sv, spec): raise VolumeException( @@ -344,22 +526,22 @@ class VolumeClient(object): ret = self.volume_exception_to_retval(ve) return ret - def remove_subvolume_group_snapshot(self, volname, groupname, snapname, force): - ret = 0, "", "" + @connection_pool_wrap + def remove_subvolume_group_snapshot(self, fs_handle, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + groupname = kwargs['group_name'] + snapname = kwargs['snap_name'] + force = kwargs['force'] try: - if self.volume_exists(volname): - with SubVolume(self.mgr, fs_name=volname) as sv: - spec = SubvolumeSpec("", groupname) - if self.group_exists(sv, spec): - sv.remove_group_snapshot(spec, snapname, force) - elif not force: - raise VolumeException( - -errno.ENOENT, "Subvolume group '{0}' not found, cannot " \ - "remove it".format(groupname)) - elif not force: - raise VolumeException( - -errno.ENOENT, "Volume '{0}' not found, cannot remove subvolumegroup " \ - "snapshot '{1}'".format(volname, snapname)) + with SubVolume(self.mgr, fs_handle) as sv: + spec = SubvolumeSpec("", groupname) + if self.group_exists(sv, spec): + sv.remove_group_snapshot(spec, snapname, force) + elif not force: + raise VolumeException( + -errno.ENOENT, "Subvolume group '{0}' not found, cannot " \ + "remove it".format(groupname)) except VolumeException as ve: ret = self.volume_exception_to_retval(ve) return ret diff --git a/src/pybind/mgr/volumes/module.py b/src/pybind/mgr/volumes/module.py index 60d78a96495..167b7e0044b 100644 --- a/src/pybind/mgr/volumes/module.py +++ b/src/pybind/mgr/volumes/module.py @@ -199,82 +199,63 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule): """ :return: a 3-tuple of return code(int), empty string(str), error message (str) """ - vol_name = cmd['vol_name'] - group_name = cmd['group_name'] - pool_layout = cmd.get('pool_layout', None) - mode = cmd.get('mode', '755') - - return self.vc.create_subvolume_group(vol_name, group_name, mode=mode, pool=pool_layout) + return self.vc.create_subvolume_group( + None, vol_name=cmd['vol_name'], group_name=cmd['group_name'], + pool_layout=cmd.get('pool_layout', None), mode=cmd.get('mode', '755')) def _cmd_fs_subvolumegroup_rm(self, inbuf, cmd): """ :return: a 3-tuple of return code(int), empty string(str), error message (str) """ - vol_name = cmd['vol_name'] - group_name = cmd['group_name'] - force = cmd.get('force', False) - - return self.vc.remove_subvolume_group(vol_name, group_name, force) + return self.vc.remove_subvolume_group(None, vol_name=cmd['vol_name'], + group_name=cmd['group_name'], + force=cmd.get('force', False)) def _cmd_fs_subvolume_create(self, inbuf, cmd): """ :return: a 3-tuple of return code(int), empty string(str), error message (str) """ - vol_name = cmd['vol_name'] - sub_name = cmd['sub_name'] - size = cmd.get('size', None) - group_name = cmd.get('group_name', None) - pool_layout = cmd.get('pool_layout', None) - mode = cmd.get('mode', '755') - - return self.vc.create_subvolume(vol_name, sub_name, group_name, size, mode=mode, pool=pool_layout) + return self.vc.create_subvolume(None, vol_name=cmd['vol_name'], + sub_name=cmd['sub_name'], + group_name=cmd.get('group_name', None), + size=cmd.get('size', None), + pool_layout=cmd.get('pool_layout', None), + mode=cmd.get('mode', '755')) def _cmd_fs_subvolume_rm(self, inbuf, cmd): """ :return: a 3-tuple of return code(int), empty string(str), error message (str) """ - vol_name = cmd['vol_name'] - sub_name = cmd['sub_name'] - force = cmd.get('force', False) - group_name = cmd.get('group_name', None) - - return self.vc.remove_subvolume(vol_name, sub_name, group_name, force) + return self.vc.remove_subvolume(None, vol_name=cmd['vol_name'], + sub_name=cmd['sub_name'], + group_name=cmd.get('group_name', None), + force=cmd.get('force', False)) def _cmd_fs_subvolume_getpath(self, inbuf, cmd): - vol_name = cmd['vol_name'] - sub_name = cmd['sub_name'] - group_name = cmd.get('group_name', None) - - return self.vc.subvolume_getpath(vol_name, sub_name, group_name) + return self.vc.subvolume_getpath(None, vol_name=cmd['vol_name'], + sub_name=cmd['sub_name'], + group_name=cmd.get('group_name', None)) def _cmd_fs_subvolumegroup_snapshot_create(self, inbuf, cmd): - vol_name = cmd['vol_name'] - group_name = cmd['group_name'] - snap_name = cmd['snap_name'] - - return self.vc.create_subvolume_group_snapshot(vol_name, group_name, snap_name) + return self.vc.create_subvolume_group_snapshot(None, vol_name=cmd['vol_name'], + group_name=cmd['group_name'], + snap_name=cmd['snap_name']) def _cmd_fs_subvolumegroup_snapshot_rm(self, inbuf, cmd): - vol_name = cmd['vol_name'] - group_name = cmd['group_name'] - snap_name = cmd['snap_name'] - force = cmd.get('force', False) - - return self.vc.remove_subvolume_group_snapshot(vol_name, group_name, snap_name, force) + return self.vc.remove_subvolume_group_snapshot(None, vol_name=cmd['vol_name'], + group_name=cmd['group_name'], + snap_name=cmd['snap_name'], + force=cmd.get('force', False)) def _cmd_fs_subvolume_snapshot_create(self, inbuf, cmd): - vol_name = cmd['vol_name'] - sub_name = cmd['sub_name'] - snap_name = cmd['snap_name'] - group_name = cmd.get('group_name', None) - - return self.vc.create_subvolume_snapshot(vol_name, sub_name, snap_name, group_name) + return self.vc.create_subvolume_snapshot(None, vol_name=cmd['vol_name'], + sub_name=cmd['sub_name'], + snap_name=cmd['snap_name'], + group_name=cmd.get('group_name', None)) def _cmd_fs_subvolume_snapshot_rm(self, inbuf, cmd): - vol_name = cmd['vol_name'] - sub_name = cmd['sub_name'] - snap_name = cmd['snap_name'] - force = cmd.get('force', False) - group_name = cmd.get('group_name', None) - - return self.vc.remove_subvolume_snapshot(vol_name, sub_name, snap_name, group_name, force) + return self.vc.remove_subvolume_snapshot(None, vol_name=cmd['vol_name'], + sub_name=cmd['sub_name'], + snap_name=cmd['snap_name'], + group_name=cmd.get('group_name', None), + force=cmd.get('force', False))