From: Jan Fajerski
Date: Wed, 18 Dec 2019 10:35:40 +0000 (+0100)
Subject: mgr_util: add CephfsClient implementation
X-Git-Tag: v15.2.17~108^2~8
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=36a1ac6f60da9bbbd89c3b685820a9840afe2c5f;p=ceph.git

mgr_util: add CephfsClient implementation

This pulls parts of the VolumesClient implementation into mgr_util to
make the CephFS specific pieces available to other mgr modules.
To reduce code duplication the VolumeClient now extends the CephfsClient
class to add the volume specific methods.

Signed-off-by: Jan Fajerski
(cherry picked from commit a44de38b61d598fb0512ea48da0de4179d39b804)

	src/pybind/mgr/mgr_util.py
	src/pybind/mgr/tox.ini
	src/pybind/mgr/volumes/fs/operations/volume.py
	src/pybind/mgr/volumes/fs/volume.py
	Trivial conflicts because of the order of backports to octopus
---

diff --git a/src/pybind/mgr/mgr_util.py b/src/pybind/mgr/mgr_util.py
index df6486455d862..8f61c85b1b7c6 100644
--- a/src/pybind/mgr/mgr_util.py
+++ b/src/pybind/mgr/mgr_util.py
@@ -1,10 +1,19 @@
+import cephfs
 import contextlib
 import datetime
+import errno
 import os
 import socket
-import logging
 import time
+import logging
 from functools import wraps
+import sys
+from threading import Lock, Condition, Event
+from typing import no_type_check
+if sys.version_info >= (3, 3):
+    from threading import Timer
+else:
+    from threading import _Timer as Timer
 
 try:
     from typing import Tuple, Any, Callable
@@ -31,6 +40,228 @@ UNDERLINE_SEQ = "\033[4m"
 
 logger = logging.getLogger(__name__)
 
+class CephfsConnectionException(Exception):
+    def __init__(self, error_code, error_message):
+        self.errno = error_code
+        self.error_str = error_message
+
+    def to_tuple(self):
+        return self.errno, "", self.error_str
+
+    def __str__(self):
+        return "{0} ({1})".format(self.errno, self.error_str)
+
+
+class ConnectionPool(object):
+    class Connection(object):
+        def __init__(self, mgr, fs_name):
+            self.fs = None
+            self.mgr = mgr
+            self.fs_name = fs_name
+            self.ops_in_progress = 0
+            self.last_used = time.time()
+            self.fs_id = self.get_fs_id()
+
+        def get_fs_id(self):
+            fs_map = self.mgr.get('fs_map')
+            for fs in fs_map['filesystems']:
+                if fs['mdsmap']['fs_name'] == self.fs_name:
+                    return fs['id']
+            raise CephfsConnectionException(
+                -errno.ENOENT, "FS '{0}' not found".format(self.fs_name))
+
+        def get_fs_handle(self):
+            self.last_used = time.time()
+            self.ops_in_progress += 1
+            return self.fs
+
+        def put_fs_handle(self, notify):
+            assert self.ops_in_progress > 0
+            self.ops_in_progress -= 1
+            if self.ops_in_progress == 0:
+                notify()
+
+        def del_fs_handle(self, waiter):
+            if waiter:
+                while self.ops_in_progress != 0:
+                    waiter()
+            if self.is_connection_valid():
+                self.disconnect()
+            else:
+                self.abort()
+
+        def is_connection_valid(self):
+            fs_id = None
+            try:
+                fs_id = self.get_fs_id()
+            except:
+                # the filesystem does not exist now -- connection is not valid.
+ pass + logger.debug("self.fs_id={0}, fs_id={1}".format(self.fs_id, fs_id)) + return self.fs_id == fs_id + + def is_connection_idle(self, timeout): + return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout)) + + def connect(self): + assert self.ops_in_progress == 0 + logger.debug("Connecting to cephfs '{0}'".format(self.fs_name)) + self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados) + logger.debug("Setting user ID and group ID of CephFS mount as root...") + self.fs.conf_set("client_mount_uid", "0") + self.fs.conf_set("client_mount_gid", "0") + logger.debug("CephFS initializing...") + self.fs.init() + logger.debug("CephFS mounting...") + self.fs.mount(filesystem_name=self.fs_name.encode('utf-8')) + logger.debug("Connection to cephfs '{0}' complete".format(self.fs_name)) + self.mgr._ceph_register_client(self.fs.get_addrs()) + + def disconnect(self): + try: + assert self.fs + assert self.ops_in_progress == 0 + logger.info("disconnecting from cephfs '{0}'".format(self.fs_name)) + addrs = self.fs.get_addrs() + self.fs.shutdown() + self.mgr._ceph_unregister_client(addrs) + self.fs = None + except Exception as e: + logger.debug("disconnect: ({0})".format(e)) + raise + + def abort(self): + assert self.fs + assert self.ops_in_progress == 0 + logger.info("aborting connection from cephfs '{0}'".format(self.fs_name)) + self.fs.abort_conn() + logger.info("abort done from cephfs '{0}'".format(self.fs_name)) + self.fs = None + + class RTimer(Timer): + """ + recurring timer variant of Timer + """ + @no_type_check + def run(self): + try: + while not self.finished.is_set(): + self.finished.wait(self.interval) + self.function(*self.args, **self.kwargs) + self.finished.set() + except Exception as e: + logger.error("ConnectionPool.RTimer: %s", e) + raise + + # TODO: make this configurable + TIMER_TASK_RUN_INTERVAL = 30.0 # seconds + CONNECTION_IDLE_INTERVAL = 60.0 # seconds + + def __init__(self, mgr): + self.mgr = mgr + self.connections = {} + self.lock = Lock() + self.cond = Condition(self.lock) + self.timer_task = ConnectionPool.RTimer( + ConnectionPool.TIMER_TASK_RUN_INTERVAL, + self.cleanup_connections) + self.timer_task.start() + + def cleanup_connections(self): + with self.lock: + logger.info("scanning for idle connections..") + idle_fs = [fs_name for fs_name, conn in self.connections.items() + if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)] + for fs_name in idle_fs: + logger.info("cleaning up connection for '{}'".format(fs_name)) + self._del_fs_handle(fs_name) + + def get_fs_handle(self, fs_name): + with self.lock: + conn = None + try: + conn = self.connections.get(fs_name, None) + if conn: + if conn.is_connection_valid(): + return conn.get_fs_handle() + else: + # filesystem id changed beneath us (or the filesystem does not exist). + # this is possible if the filesystem got removed (and recreated with + # same name) via "ceph fs rm/new" mon command. 
+ logger.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name)) + self._del_fs_handle(fs_name) + conn = ConnectionPool.Connection(self.mgr, fs_name) + conn.connect() + except cephfs.Error as e: + # try to provide a better error string if possible + if e.args[0] == errno.ENOENT: + raise CephfsConnectionException( + -errno.ENOENT, "FS '{0}' not found".format(fs_name)) + raise CephfsConnectionException(-e.args[0], e.args[1]) + self.connections[fs_name] = conn + return conn.get_fs_handle() + + def put_fs_handle(self, fs_name): + with self.lock: + conn = self.connections.get(fs_name, None) + if conn: + conn.put_fs_handle(notify=lambda: self.cond.notifyAll()) + + def _del_fs_handle(self, fs_name, wait=False): + conn = self.connections.pop(fs_name, None) + if conn: + conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait()) + + def del_fs_handle(self, fs_name, wait=False): + with self.lock: + self._del_fs_handle(fs_name, wait) + + def del_all_handles(self): + with self.lock: + for fs_name in list(self.connections.keys()): + logger.info("waiting for pending ops for '{}'".format(fs_name)) + self._del_fs_handle(fs_name, wait=True) + logger.info("pending ops completed for '{}'".format(fs_name)) + # no new connections should have been initialized since its + # guarded on shutdown. + assert len(self.connections) == 0 + + +class CephfsClient(object): + def __init__(self, mgr): + self.mgr = mgr + self.stopping = Event() + self.connection_pool = ConnectionPool(self.mgr) + + def shutdown(self): + logger.info("shutting down") + # first, note that we're shutting down + self.stopping.set() + # second, delete all libcephfs handles from connection pool + self.connection_pool.del_all_handles() + + +@contextlib.contextmanager +def open_filesystem(fsc, fs_name): + """ + Open a volume with shared access. + This API is to be used as a context manager. 
+ + :param fsc: cephfs client instance + :param fs_name: fs name + :return: yields a fs handle (ceph filesystem handle) + """ + if fsc.is_stopping(): + raise CephfsConnectionException(-errno.ESHUTDOWN, + "shutdown in progress") + + fs_handle = fsc.connection_pool.get_fs_handle(fs_name) + try: + yield fs_handle + finally: + fsc.connection_pool.put_fs_handle(fs_name) + + def colorize(msg, color, dark=False): """ Decorate `msg` with escape sequences to give the requested color diff --git a/src/pybind/mgr/tox.ini b/src/pybind/mgr/tox.ini index 3e129ba64eb16..aae21ed6b9704 100644 --- a/src/pybind/mgr/tox.ini +++ b/src/pybind/mgr/tox.ini @@ -32,7 +32,7 @@ setenv = LD_LIBRARY_PATH = ../../../build/lib deps = cython - -rrequirements.txt + -r requirements.txt commands = pytest --cov --cov-append --cov-report= --doctest-modules {posargs: \ mgr_util.py \ @@ -46,7 +46,7 @@ commands = basepython = python3 deps = cython - -rrequirements.txt + -r requirements.txt mypy==0.770 commands = mypy --config-file=../../mypy.ini \ diff --git a/src/pybind/mgr/volumes/fs/operations/volume.py b/src/pybind/mgr/volumes/fs/operations/volume.py index d8eecba7fe8b6..b688617937b7a 100644 --- a/src/pybind/mgr/volumes/fs/operations/volume.py +++ b/src/pybind/mgr/volumes/fs/operations/volume.py @@ -1,4 +1,3 @@ -import time import errno import logging import sys @@ -6,196 +5,17 @@ import sys from typing import List from contextlib import contextmanager -from threading import Lock, Condition -from typing import no_type_check -if sys.version_info >= (3, 3): - from threading import Timer -else: - from threading import _Timer as Timer - -import cephfs import orchestrator from .lock import GlobalLock from ..exception import VolumeException from ..fs_util import create_pool, remove_pool, create_filesystem, \ remove_filesystem, create_mds, volume_exists +from mgr_util import open_filesystem log = logging.getLogger(__name__) -class ConnectionPool(object): - class Connection(object): - def __init__(self, mgr, fs_name): - self.fs = None - self.mgr = mgr - self.fs_name = fs_name - self.ops_in_progress = 0 - self.last_used = time.time() - self.fs_id = self.get_fs_id() - - def get_fs_id(self): - fs_map = self.mgr.get('fs_map') - for fs in fs_map['filesystems']: - if fs['mdsmap']['fs_name'] == self.fs_name: - return fs['id'] - raise VolumeException( - -errno.ENOENT, "Volume '{0}' not found".format(self.fs_name)) - - def get_fs_handle(self): - self.last_used = time.time() - self.ops_in_progress += 1 - return self.fs - - def put_fs_handle(self, notify): - assert self.ops_in_progress > 0 - self.ops_in_progress -= 1 - if self.ops_in_progress == 0: - notify() - - def del_fs_handle(self, waiter): - if waiter: - while self.ops_in_progress != 0: - waiter() - if self.is_connection_valid(): - self.disconnect() - else: - self.abort() - - def is_connection_valid(self): - fs_id = None - try: - fs_id = self.get_fs_id() - except: - # the filesystem does not exist now -- connection is not valid. 
- pass - log.debug("self.fs_id={0}, fs_id={1}".format(self.fs_id, fs_id)) - return self.fs_id == fs_id - - def is_connection_idle(self, timeout): - return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout)) - - def connect(self): - assert self.ops_in_progress == 0 - log.debug("Connecting to cephfs '{0}'".format(self.fs_name)) - self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados) - log.debug("Setting user ID and group ID of CephFS mount as root...") - self.fs.conf_set("client_mount_uid", "0") - self.fs.conf_set("client_mount_gid", "0") - log.debug("CephFS initializing...") - self.fs.init() - log.debug("CephFS mounting...") - self.fs.mount(filesystem_name=self.fs_name.encode('utf-8')) - log.debug("Connection to cephfs '{0}' complete".format(self.fs_name)) - self.mgr._ceph_register_client(self.fs.get_addrs()) - - def disconnect(self): - try: - assert self.fs - assert self.ops_in_progress == 0 - log.info("disconnecting from cephfs '{0}'".format(self.fs_name)) - addrs = self.fs.get_addrs() - self.fs.shutdown() - self.mgr._ceph_unregister_client(addrs) - self.fs = None - except Exception as e: - log.debug("disconnect: ({0})".format(e)) - raise - - def abort(self): - assert self.fs - assert self.ops_in_progress == 0 - log.info("aborting connection from cephfs '{0}'".format(self.fs_name)) - self.fs.abort_conn() - log.info("abort done from cephfs '{0}'".format(self.fs_name)) - self.fs = None - - class RTimer(Timer): - """ - recurring timer variant of Timer - """ - @no_type_check - def run(self): - try: - while not self.finished.is_set(): - self.finished.wait(self.interval) - self.function(*self.args, **self.kwargs) - self.finished.set() - except Exception as e: - log.error("ConnectionPool.RTimer: %s", e) - raise - - # TODO: make this configurable - TIMER_TASK_RUN_INTERVAL = 30.0 # seconds - CONNECTION_IDLE_INTERVAL = 60.0 # seconds - - def __init__(self, mgr): - self.mgr = mgr - self.connections = {} - self.lock = Lock() - self.cond = Condition(self.lock) - self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL, - self.cleanup_connections) - self.timer_task.start() - - def cleanup_connections(self): - with self.lock: - log.info("scanning for idle connections..") - idle_fs = [fs_name for fs_name,conn in self.connections.items() - if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)] - for fs_name in idle_fs: - log.info("cleaning up connection for '{}'".format(fs_name)) - self._del_fs_handle(fs_name) - - def get_fs_handle(self, fs_name): - with self.lock: - conn = None - try: - conn = self.connections.get(fs_name, None) - if conn: - if conn.is_connection_valid(): - return conn.get_fs_handle() - else: - # filesystem id changed beneath us (or the filesystem does not exist). - # this is possible if the filesystem got removed (and recreated with - # same name) via "ceph fs rm/new" mon command. 
- log.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name)) - self._del_fs_handle(fs_name) - conn = ConnectionPool.Connection(self.mgr, fs_name) - conn.connect() - except cephfs.Error as e: - # try to provide a better error string if possible - if e.args[0] == errno.ENOENT: - raise VolumeException( - -errno.ENOENT, "Volume '{0}' not found".format(fs_name)) - raise VolumeException(-e.args[0], e.args[1]) - self.connections[fs_name] = conn - return conn.get_fs_handle() - - def put_fs_handle(self, fs_name): - with self.lock: - conn = self.connections.get(fs_name, None) - if conn: - conn.put_fs_handle(notify=lambda: self.cond.notifyAll()) - - def _del_fs_handle(self, fs_name, wait=False): - conn = self.connections.pop(fs_name, None) - if conn: - conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait()) - - def del_fs_handle(self, fs_name, wait=False): - with self.lock: - self._del_fs_handle(fs_name, wait) - - def del_all_handles(self): - with self.lock: - for fs_name in list(self.connections.keys()): - log.info("waiting for pending ops for '{}'".format(fs_name)) - self._del_fs_handle(fs_name, wait=True) - log.info("pending ops completed for '{}'".format(fs_name)) - # no new connections should have been initialized since its - # guarded on shutdown. - assert len(self.connections) == 0 def gen_pool_names(volname): """ @@ -260,6 +80,7 @@ def create_volume(mgr, volname, placement): # create mds return create_mds(mgr, volname, placement) + def delete_volume(mgr, volname, metadata_pool, data_pools): """ delete the given module (tear down mds, remove filesystem, remove pools) @@ -298,6 +119,7 @@ def delete_volume(mgr, volname, metadata_pool, data_pools): result_str = "metadata pool: {0} data pool: {1} removed".format(metadata_pool, str(data_pools)) return r, result_str, "" + def list_volumes(mgr): """ list all filesystem volumes. @@ -311,40 +133,32 @@ def list_volumes(mgr): result.append({'name': f['mdsmap']['fs_name']}) return result + @contextmanager def open_volume(vc, volname): """ - open a volume for exclusive access. This API is to be used as a context manager. + open a volume for exclusive access. This API is to be used as a contextr + manager. :param vc: volume client instance :param volname: volume name :return: yields a volume handle (ceph filesystem handle) """ - if vc.is_stopping(): - raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress") - g_lock = GlobalLock() - fs_handle = vc.connection_pool.get_fs_handle(volname) - try: - with g_lock.lock_op(): + with g_lock.lock_op(): + with open_filesystem(vc, volname) as fs_handle: yield fs_handle - finally: - vc.connection_pool.put_fs_handle(volname) + @contextmanager def open_volume_lockless(vc, volname): """ - open a volume with shared access. This API is to be used as a context manager. + open a volume with shared access. This API is to be used as a context + manager. 
:param vc: volume client instance :param volname: volume name :return: yields a volume handle (ceph filesystem handle) """ - if vc.is_stopping(): - raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress") - - fs_handle = vc.connection_pool.get_fs_handle(volname) - try: + with open_filesystem(vc, volname) as fs_handle: yield fs_handle - finally: - vc.connection_pool.put_fs_handle(volname) diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py index 9182318477817..4128ecc163af1 100644 --- a/src/pybind/mgr/volumes/fs/volume.py +++ b/src/pybind/mgr/volumes/fs/volume.py @@ -1,14 +1,13 @@ import json import errno import logging -from threading import Event -import cephfs +from mgr_util import CephfsClient from .fs_util import listdir -from .operations.volume import ConnectionPool, open_volume, create_volume, \ - delete_volume, list_volumes, get_pool_names +from .operations.volume import create_volume, \ + delete_volume, list_volumes, open_volume, get_pool_names from .operations.group import open_group, create_group, remove_group, open_group_unique from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \ create_clone @@ -30,6 +29,7 @@ def octal_str_to_decimal_int(mode): except ValueError: raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode)) + def name_to_json(names): """ convert the list of names to json @@ -39,13 +39,13 @@ def name_to_json(names): namedict.append({'name': names[i].decode('utf-8')}) return json.dumps(namedict, indent=4, sort_keys=True) -class VolumeClient(object): + +class VolumeClient(CephfsClient): def __init__(self, mgr): - self.mgr = mgr - self.stopping = Event() + super().__init__(mgr) # volume specification self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir')) - self.connection_pool = ConnectionPool(self.mgr) + # TODO: make thread pool size configurable self.cloner = Cloner(self, self.mgr.max_concurrent_clones) self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4) # on startup, queue purge job for available volumes to kickstart @@ -62,6 +62,7 @@ class VolumeClient(object): return self.stopping.is_set() def shutdown(self): + # Overrides CephfsClient.shutdown() log.info("shutting down") # first, note that we're shutting down self.stopping.set()
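
Usage sketch (not part of the commit): the point of moving CephfsClient, ConnectionPool
and open_filesystem into mgr_util is that any mgr module, not just volumes, can borrow
pooled libcephfs connections. A minimal illustration follows; the ReportClient class,
its root_stat() method and the stat('/') call are hypothetical, only CephfsClient,
open_filesystem and CephfsConnectionException come from the patch above.

    import cephfs

    from mgr_util import CephfsClient, CephfsConnectionException, open_filesystem


    class ReportClient(CephfsClient):
        """Hypothetical consumer in the style of VolumeClient: it only adds
        module-specific methods on top of the shared connection handling."""

        def is_stopping(self):
            # open_filesystem() consults this before handing out a handle.
            return self.stopping.is_set()

        def root_stat(self, fs_name):
            """Return stat information for the root of the given filesystem."""
            try:
                with open_filesystem(self, fs_name) as fs_handle:
                    # fs_handle is a mounted cephfs.LibCephFS instance borrowed
                    # from the ConnectionPool; it is handed back to the pool
                    # when the context manager exits.
                    return 0, str(fs_handle.stat('/')), ''
            except CephfsConnectionException as ce:
                # e.g. -ENOENT when the named filesystem does not exist
                return ce.to_tuple()
            except cephfs.Error as e:
                return -e.args[0], '', e.args[1]

A module owning such a client would construct it with its MgrModule instance
(ReportClient(self)) and call the inherited shutdown() from its own shutdown handler,
much as VolumeClient.shutdown() does above, so that pooled handles are released.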