mgr_util: add CephfsClient implementation
author Jan Fajerski <jfajerski@suse.com>
Wed, 18 Dec 2019 10:35:40 +0000 (11:35 +0100)
committer Kotresh HR <khiremat@redhat.com>
Mon, 28 Feb 2022 11:53:47 +0000 (17:23 +0530)
This pulls parts of the VolumeClient implementation into mgr_util to
make the CephFS-specific pieces available to other mgr modules. To
reduce code duplication, VolumeClient now extends the CephfsClient
class to add the volume-specific methods.

Signed-off-by: Jan Fajerski <jfajerski@suse.com>
(cherry picked from commit a44de38b61d598fb0512ea48da0de4179d39b804)

Conflicts:
src/pybind/mgr/mgr_util.py
src/pybind/mgr/tox.ini
src/pybind/mgr/volumes/fs/operations/volume.py
src/pybind/mgr/volumes/fs/volume.py
  Trivial conflicts because of the order of backports to octopus

src/pybind/mgr/mgr_util.py
src/pybind/mgr/tox.ini
src/pybind/mgr/volumes/fs/operations/volume.py
src/pybind/mgr/volumes/fs/volume.py
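
For a sense of how a consumer looks after this change, here is a minimal
sketch of a hypothetical mgr module command reusing the new helper. Note
that open_filesystem() calls is_stopping() on the client, which this
backport's CephfsClient does not define itself (VolumeClient supplies it),
so the sketch adds one; the command handler and its name are illustrative,
not part of this commit.

    from mgr_util import CephfsClient, CephfsConnectionException, open_filesystem

    class MyFsClient(CephfsClient):
        def is_stopping(self):
            # open_filesystem() consults this before handing out a handle;
            # mirrors VolumeClient.is_stopping() in volumes/fs/volume.py.
            return self.stopping.is_set()

    def stat_fs_root(mgr, fs_name):
        """Hypothetical command handler: stat the root of a filesystem."""
        client = MyFsClient(mgr)
        try:
            with open_filesystem(client, fs_name) as fs_handle:
                # fs_handle is a mounted cephfs.LibCephFS instance drawn
                # from (and returned to) the shared ConnectionPool.
                return 0, str(fs_handle.stat(b"/")), ""
        except CephfsConnectionException as e:
            return e.to_tuple()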

diff --git a/src/pybind/mgr/mgr_util.py b/src/pybind/mgr/mgr_util.py
index df6486455d86289081cbef4d0d4d7c916b6be9a7..8f61c85b1b7c6ec4e81209a2498ffc8e27b43701 100644
--- a/src/pybind/mgr/mgr_util.py
+++ b/src/pybind/mgr/mgr_util.py
@@ -1,10 +1,19 @@
+import cephfs
 import contextlib
 import datetime
+import errno
 import os
 import socket
-import logging
 import time
+import logging
 from functools import wraps
+import sys
+from threading import Lock, Condition, Event
+from typing import no_type_check
+if sys.version_info >= (3, 3):
+    from threading import Timer
+else:
+    from threading import _Timer as Timer
 
 try:
     from typing import Tuple, Any, Callable
@@ -31,6 +40,228 @@ UNDERLINE_SEQ = "\033[4m"
 logger = logging.getLogger(__name__)
 
 
+class CephfsConnectionException(Exception):
+    def __init__(self, error_code, error_message):
+        self.errno = error_code
+        self.error_str = error_message
+
+    def to_tuple(self):
+        return self.errno, "", self.error_str
+
+    def __str__(self):
+        return "{0} ({1})".format(self.errno, self.error_str)
+
+
+class ConnectionPool(object):
+    class Connection(object):
+        def __init__(self, mgr, fs_name):
+            self.fs = None
+            self.mgr = mgr
+            self.fs_name = fs_name
+            self.ops_in_progress = 0
+            self.last_used = time.time()
+            self.fs_id = self.get_fs_id()
+
+        def get_fs_id(self):
+            fs_map = self.mgr.get('fs_map')
+            for fs in fs_map['filesystems']:
+                if fs['mdsmap']['fs_name'] == self.fs_name:
+                    return fs['id']
+            raise CephfsConnectionException(
+                -errno.ENOENT, "FS '{0}' not found".format(self.fs_name))
+
+        def get_fs_handle(self):
+            self.last_used = time.time()
+            self.ops_in_progress += 1
+            return self.fs
+
+        def put_fs_handle(self, notify):
+            assert self.ops_in_progress > 0
+            self.ops_in_progress -= 1
+            if self.ops_in_progress == 0:
+                notify()
+
+        def del_fs_handle(self, waiter):
+            if waiter:
+                while self.ops_in_progress != 0:
+                    waiter()
+            if self.is_connection_valid():
+                self.disconnect()
+            else:
+                self.abort()
+
+        def is_connection_valid(self):
+            fs_id = None
+            try:
+                fs_id = self.get_fs_id()
+            except:
+                # the filesystem does not exist now -- connection is not valid.
+                pass
+            logger.debug("self.fs_id={0}, fs_id={1}".format(self.fs_id, fs_id))
+            return self.fs_id == fs_id
+
+        def is_connection_idle(self, timeout):
+            return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))
+
+        def connect(self):
+            assert self.ops_in_progress == 0
+            logger.debug("Connecting to cephfs '{0}'".format(self.fs_name))
+            self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
+            logger.debug("Setting user ID and group ID of CephFS mount as root...")
+            self.fs.conf_set("client_mount_uid", "0")
+            self.fs.conf_set("client_mount_gid", "0")
+            logger.debug("CephFS initializing...")
+            self.fs.init()
+            logger.debug("CephFS mounting...")
+            self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
+            logger.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
+            self.mgr._ceph_register_client(self.fs.get_addrs())
+
+        def disconnect(self):
+            try:
+                assert self.fs
+                assert self.ops_in_progress == 0
+                logger.info("disconnecting from cephfs '{0}'".format(self.fs_name))
+                addrs = self.fs.get_addrs()
+                self.fs.shutdown()
+                self.mgr._ceph_unregister_client(addrs)
+                self.fs = None
+            except Exception as e:
+                logger.debug("disconnect: ({0})".format(e))
+                raise
+
+        def abort(self):
+            assert self.fs
+            assert self.ops_in_progress == 0
+            logger.info("aborting connection from cephfs '{0}'".format(self.fs_name))
+            self.fs.abort_conn()
+            logger.info("abort done from cephfs '{0}'".format(self.fs_name))
+            self.fs = None
+
+    class RTimer(Timer):
+        """
+        recurring timer variant of Timer
+        """
+        @no_type_check
+        def run(self):
+            try:
+                while not self.finished.is_set():
+                    self.finished.wait(self.interval)
+                    self.function(*self.args, **self.kwargs)
+                self.finished.set()
+            except Exception as e:
+                logger.error("ConnectionPool.RTimer: %s", e)
+                raise
+
+    # TODO: make this configurable
+    TIMER_TASK_RUN_INTERVAL = 30.0   # seconds
+    CONNECTION_IDLE_INTERVAL = 60.0  # seconds
+
+    def __init__(self, mgr):
+        self.mgr = mgr
+        self.connections = {}
+        self.lock = Lock()
+        self.cond = Condition(self.lock)
+        self.timer_task = ConnectionPool.RTimer(
+            ConnectionPool.TIMER_TASK_RUN_INTERVAL,
+            self.cleanup_connections)
+        self.timer_task.start()
+
+    def cleanup_connections(self):
+        with self.lock:
+            logger.info("scanning for idle connections..")
+            idle_fs = [fs_name for fs_name, conn in self.connections.items()
+                       if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)]
+            for fs_name in idle_fs:
+                logger.info("cleaning up connection for '{}'".format(fs_name))
+                self._del_fs_handle(fs_name)
+
+    def get_fs_handle(self, fs_name):
+        with self.lock:
+            conn = None
+            try:
+                conn = self.connections.get(fs_name, None)
+                if conn:
+                    if conn.is_connection_valid():
+                        return conn.get_fs_handle()
+                    else:
+                        # filesystem id changed beneath us (or the filesystem does not exist).
+                        # this is possible if the filesystem got removed (and recreated with
+                        # same name) via "ceph fs rm/new" mon command.
+                        logger.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
+                        self._del_fs_handle(fs_name)
+                conn = ConnectionPool.Connection(self.mgr, fs_name)
+                conn.connect()
+            except cephfs.Error as e:
+                # try to provide a better error string if possible
+                if e.args[0] == errno.ENOENT:
+                    raise CephfsConnectionException(
+                        -errno.ENOENT, "FS '{0}' not found".format(fs_name))
+                raise CephfsConnectionException(-e.args[0], e.args[1])
+            self.connections[fs_name] = conn
+            return conn.get_fs_handle()
+
+    def put_fs_handle(self, fs_name):
+        with self.lock:
+            conn = self.connections.get(fs_name, None)
+            if conn:
+                conn.put_fs_handle(notify=lambda: self.cond.notifyAll())
+
+    def _del_fs_handle(self, fs_name, wait=False):
+        conn = self.connections.pop(fs_name, None)
+        if conn:
+            conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())
+
+    def del_fs_handle(self, fs_name, wait=False):
+        with self.lock:
+            self._del_fs_handle(fs_name, wait)
+
+    def del_all_handles(self):
+        with self.lock:
+            for fs_name in list(self.connections.keys()):
+                logger.info("waiting for pending ops for '{}'".format(fs_name))
+                self._del_fs_handle(fs_name, wait=True)
+                logger.info("pending ops completed for '{}'".format(fs_name))
+            # no new connections should have been initialized since it's
+            # guarded on shutdown.
+            assert len(self.connections) == 0
+
+
+class CephfsClient(object):
+    def __init__(self, mgr):
+        self.mgr = mgr
+        self.stopping = Event()
+        self.connection_pool = ConnectionPool(self.mgr)
+
+    def shutdown(self):
+        logger.info("shutting down")
+        # first, note that we're shutting down
+        self.stopping.set()
+        # second, delete all libcephfs handles from connection pool
+        self.connection_pool.del_all_handles()
+
+
+@contextlib.contextmanager
+def open_filesystem(fsc, fs_name):
+    """
+    Open a volume with shared access.
+    This API is to be used as a context manager.
+
+    :param fsc: cephfs client instance
+    :param fs_name: fs name
+    :return: yields a fs handle (ceph filesystem handle)
+    """
+    if fsc.is_stopping():
+        raise CephfsConnectionException(-errno.ESHUTDOWN,
+                                        "shutdown in progress")
+
+    fs_handle = fsc.connection_pool.get_fs_handle(fs_name)
+    try:
+        yield fs_handle
+    finally:
+        fsc.connection_pool.put_fs_handle(fs_name)
+
+
 def colorize(msg, color, dark=False):
     """
     Decorate `msg` with escape sequences to give the requested color
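
Aside on the pool's sweep mechanism: threading.Timer fires exactly once,
so the RTimer above re-waits on the inherited finished event to make it
recur; cleanup_connections() then runs every TIMER_TASK_RUN_INTERVAL
seconds. A standalone sketch of the same pattern (the interval and
callback here are illustrative):

    from threading import Timer

    class RecurringTimer(Timer):
        def run(self):
            # Re-arm after every interval instead of firing once; cancel()
            # sets self.finished, ending the loop after at most one more
            # callback invocation (same behaviour as RTimer above).
            while not self.finished.is_set():
                self.finished.wait(self.interval)
                self.function(*self.args, **self.kwargs)

    t = RecurringTimer(30.0, lambda: print("sweep idle connections"))
    t.daemon = True  # don't keep the interpreter alive just for the sweep
    t.start()
    # ... later, t.cancel() stops the recurrence.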
diff --git a/src/pybind/mgr/tox.ini b/src/pybind/mgr/tox.ini
index 3e129ba64eb16a422358678aa3e1fd42f52bdc69..aae21ed6b970419a60b871faf68531ab9f29043d 100644
--- a/src/pybind/mgr/tox.ini
+++ b/src/pybind/mgr/tox.ini
@@ -32,7 +32,7 @@ setenv =
     LD_LIBRARY_PATH = ../../../build/lib
 deps =
     cython
-    -rrequirements.txt
+    -r requirements.txt
 commands =
     pytest --cov --cov-append --cov-report= --doctest-modules {posargs: \
         mgr_util.py \
@@ -46,7 +46,7 @@ commands =
 basepython = python3
 deps =
     cython
-    -rrequirements.txt
+    -r requirements.txt
     mypy==0.770
 commands =
     mypy --config-file=../../mypy.ini \
diff --git a/src/pybind/mgr/volumes/fs/operations/volume.py b/src/pybind/mgr/volumes/fs/operations/volume.py
index d8eecba7fe8b68e51c936a1683fc44ce97dcf0e8..b688617937b7a05f300a1c0983c5fc985e918238 100644
--- a/src/pybind/mgr/volumes/fs/operations/volume.py
+++ b/src/pybind/mgr/volumes/fs/operations/volume.py
@@ -1,4 +1,3 @@
-import time
 import errno
 import logging
 import sys
@@ -6,196 +5,17 @@ import sys
 from typing import List
 
 from contextlib import contextmanager
-from threading import Lock, Condition
-from typing import no_type_check
 
-if sys.version_info >= (3, 3):
-    from threading import Timer
-else:
-    from threading import _Timer as Timer
-
-import cephfs
 import orchestrator
 
 from .lock import GlobalLock
 from ..exception import VolumeException
 from ..fs_util import create_pool, remove_pool, create_filesystem, \
     remove_filesystem, create_mds, volume_exists
+from mgr_util import open_filesystem
 
 log = logging.getLogger(__name__)
 
-class ConnectionPool(object):
-    class Connection(object):
-        def __init__(self, mgr, fs_name):
-            self.fs = None
-            self.mgr = mgr
-            self.fs_name = fs_name
-            self.ops_in_progress = 0
-            self.last_used = time.time()
-            self.fs_id = self.get_fs_id()
-
-        def get_fs_id(self):
-            fs_map = self.mgr.get('fs_map')
-            for fs in fs_map['filesystems']:
-                if fs['mdsmap']['fs_name'] == self.fs_name:
-                    return fs['id']
-            raise VolumeException(
-                -errno.ENOENT, "Volume '{0}' not found".format(self.fs_name))
-
-        def get_fs_handle(self):
-            self.last_used = time.time()
-            self.ops_in_progress += 1
-            return self.fs
-
-        def put_fs_handle(self, notify):
-            assert self.ops_in_progress > 0
-            self.ops_in_progress -= 1
-            if self.ops_in_progress == 0:
-                notify()
-
-        def del_fs_handle(self, waiter):
-            if waiter:
-                while self.ops_in_progress != 0:
-                    waiter()
-            if self.is_connection_valid():
-                self.disconnect()
-            else:
-                self.abort()
-
-        def is_connection_valid(self):
-            fs_id = None
-            try:
-                fs_id = self.get_fs_id()
-            except:
-                # the filesystem does not exist now -- connection is not valid.
-                pass
-            log.debug("self.fs_id={0}, fs_id={1}".format(self.fs_id, fs_id))
-            return self.fs_id == fs_id
-
-        def is_connection_idle(self, timeout):
-            return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))
-
-        def connect(self):
-            assert self.ops_in_progress == 0
-            log.debug("Connecting to cephfs '{0}'".format(self.fs_name))
-            self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
-            log.debug("Setting user ID and group ID of CephFS mount as root...")
-            self.fs.conf_set("client_mount_uid", "0")
-            self.fs.conf_set("client_mount_gid", "0")
-            log.debug("CephFS initializing...")
-            self.fs.init()
-            log.debug("CephFS mounting...")
-            self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
-            log.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
-            self.mgr._ceph_register_client(self.fs.get_addrs())
-
-        def disconnect(self):
-            try:
-                assert self.fs
-                assert self.ops_in_progress == 0
-                log.info("disconnecting from cephfs '{0}'".format(self.fs_name))
-                addrs = self.fs.get_addrs()
-                self.fs.shutdown()
-                self.mgr._ceph_unregister_client(addrs)
-                self.fs = None
-            except Exception as e:
-                log.debug("disconnect: ({0})".format(e))
-                raise
-
-        def abort(self):
-            assert self.fs
-            assert self.ops_in_progress == 0
-            log.info("aborting connection from cephfs '{0}'".format(self.fs_name))
-            self.fs.abort_conn()
-            log.info("abort done from cephfs '{0}'".format(self.fs_name))
-            self.fs = None
-
-    class RTimer(Timer):
-        """
-        recurring timer variant of Timer
-        """
-        @no_type_check
-        def run(self):
-            try:
-                while not self.finished.is_set():
-                    self.finished.wait(self.interval)
-                    self.function(*self.args, **self.kwargs)
-                self.finished.set()
-            except Exception as e:
-                log.error("ConnectionPool.RTimer: %s", e)
-                raise
-
-    # TODO: make this configurable
-    TIMER_TASK_RUN_INTERVAL = 30.0  # seconds
-    CONNECTION_IDLE_INTERVAL = 60.0 # seconds
-
-    def __init__(self, mgr):
-        self.mgr = mgr
-        self.connections = {}
-        self.lock = Lock()
-        self.cond = Condition(self.lock)
-        self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL,
-                                                self.cleanup_connections)
-        self.timer_task.start()
-
-    def cleanup_connections(self):
-        with self.lock:
-            log.info("scanning for idle connections..")
-            idle_fs = [fs_name for fs_name,conn in self.connections.items()
-                       if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)]
-            for fs_name in idle_fs:
-                log.info("cleaning up connection for '{}'".format(fs_name))
-                self._del_fs_handle(fs_name)
-
-    def get_fs_handle(self, fs_name):
-        with self.lock:
-            conn = None
-            try:
-                conn = self.connections.get(fs_name, None)
-                if conn:
-                    if conn.is_connection_valid():
-                        return conn.get_fs_handle()
-                    else:
-                        # filesystem id changed beneath us (or the filesystem does not exist).
-                        # this is possible if the filesystem got removed (and recreated with
-                        # same name) via "ceph fs rm/new" mon command.
-                        log.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
-                        self._del_fs_handle(fs_name)
-                conn = ConnectionPool.Connection(self.mgr, fs_name)
-                conn.connect()
-            except cephfs.Error as e:
-                # try to provide a better error string if possible
-                if e.args[0] == errno.ENOENT:
-                    raise VolumeException(
-                        -errno.ENOENT, "Volume '{0}' not found".format(fs_name))
-                raise VolumeException(-e.args[0], e.args[1])
-            self.connections[fs_name] = conn
-            return conn.get_fs_handle()
-
-    def put_fs_handle(self, fs_name):
-        with self.lock:
-            conn = self.connections.get(fs_name, None)
-            if conn:
-                conn.put_fs_handle(notify=lambda: self.cond.notifyAll())
-
-    def _del_fs_handle(self, fs_name, wait=False):
-        conn = self.connections.pop(fs_name, None)
-        if conn:
-            conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())
-
-    def del_fs_handle(self, fs_name, wait=False):
-        with self.lock:
-            self._del_fs_handle(fs_name, wait)
-
-    def del_all_handles(self):
-        with self.lock:
-            for fs_name in list(self.connections.keys()):
-                log.info("waiting for pending ops for '{}'".format(fs_name))
-                self._del_fs_handle(fs_name, wait=True)
-                log.info("pending ops completed for '{}'".format(fs_name))
-            # no new connections should have been initialized since its
-            # guarded on shutdown.
-            assert len(self.connections) == 0
 
 def gen_pool_names(volname):
     """
@@ -260,6 +80,7 @@ def create_volume(mgr, volname, placement):
     # create mds
     return create_mds(mgr, volname, placement)
 
+
 def delete_volume(mgr, volname, metadata_pool, data_pools):
     """
     delete the given module (tear down mds, remove filesystem, remove pools)
@@ -298,6 +119,7 @@ def delete_volume(mgr, volname, metadata_pool, data_pools):
     result_str = "metadata pool: {0} data pool: {1} removed".format(metadata_pool, str(data_pools))
     return r, result_str, ""
 
+
 def list_volumes(mgr):
     """
     list all filesystem volumes.
@@ -311,40 +133,32 @@ def list_volumes(mgr):
         result.append({'name': f['mdsmap']['fs_name']})
     return result
 
+
 @contextmanager
 def open_volume(vc, volname):
     """
-    open a volume for exclusive access. This API is to be used as a context manager.
+    open a volume for exclusive access. This API is to be used as a context
+    manager.
 
     :param vc: volume client instance
     :param volname: volume name
     :return: yields a volume handle (ceph filesystem handle)
     """
-    if vc.is_stopping():
-        raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress")
-
     g_lock = GlobalLock()
-    fs_handle = vc.connection_pool.get_fs_handle(volname)
-    try:
-        with g_lock.lock_op():
+    with g_lock.lock_op():
+        with open_filesystem(vc, volname) as fs_handle:
             yield fs_handle
-    finally:
-        vc.connection_pool.put_fs_handle(volname)
+
 
 @contextmanager
 def open_volume_lockless(vc, volname):
     """
-    open a volume with shared access. This API is to be used as a context manager.
+    open a volume with shared access. This API is to be used as a context
+    manager.
 
     :param vc: volume client instance
     :param volname: volume name
     :return: yields a volume handle (ceph filesystem handle)
     """
-    if vc.is_stopping():
-        raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress")
-
-    fs_handle = vc.connection_pool.get_fs_handle(volname)
-    try:
+    with open_filesystem(vc, volname) as fs_handle:
         yield fs_handle
-    finally:
-        vc.connection_pool.put_fs_handle(volname)
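
One behavioural consequence worth noting: with the pool living in
mgr_util, connection failures from these context managers now surface as
CephfsConnectionException rather than VolumeException, so callers above
this layer may want to catch both. A sketch of such a caller (the
statfs-style handler is illustrative; VolumeException.to_tuple() is
assumed to match CephfsConnectionException.to_tuple()):

    from mgr_util import CephfsConnectionException

    from ..exception import VolumeException

    def volume_statfs(vc, volname):
        """Hypothetical handler: report statfs of a volume's root."""
        try:
            with open_volume_lockless(vc, volname) as fs_handle:
                return 0, str(fs_handle.statfs(b"/")), ""
        except (VolumeException, CephfsConnectionException) as e:
            return e.to_tuple()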
diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py
index 9182318477817e8dde3a189ae9a821dba266ec55..4128ecc163af165408df47f4b249aae4a59df6e6 100644
--- a/src/pybind/mgr/volumes/fs/volume.py
+++ b/src/pybind/mgr/volumes/fs/volume.py
@@ -1,14 +1,13 @@
 import json
 import errno
 import logging
-from threading import Event
 
-import cephfs
+from mgr_util import CephfsClient
 
 from .fs_util import listdir
 
-from .operations.volume import ConnectionPool, open_volume, create_volume, \
-    delete_volume, list_volumes, get_pool_names
+from .operations.volume import create_volume, \
+    delete_volume, list_volumes, open_volume, get_pool_names
 from .operations.group import open_group, create_group, remove_group, open_group_unique
 from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
     create_clone
@@ -30,6 +29,7 @@ def octal_str_to_decimal_int(mode):
     except ValueError:
         raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode))
 
+
 def name_to_json(names):
     """
     convert the list of names to json
@@ -39,13 +39,13 @@ def name_to_json(names):
         namedict.append({'name': names[i].decode('utf-8')})
     return json.dumps(namedict, indent=4, sort_keys=True)
 
-class VolumeClient(object):
+
+class VolumeClient(CephfsClient):
     def __init__(self, mgr):
-        self.mgr = mgr
-        self.stopping = Event()
+        super().__init__(mgr)
         # volume specification
         self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
-        self.connection_pool = ConnectionPool(self.mgr)
+        # TODO: make thread pool size configurable
         self.cloner = Cloner(self, self.mgr.max_concurrent_clones)
         self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
         # on startup, queue purge job for available volumes to kickstart
@@ -62,6 +62,7 @@ class VolumeClient(object):
         return self.stopping.is_set()
 
     def shutdown(self):
+        # Overrides CephfsClient.shutdown()
         log.info("shutting down")
         # first, note that we're shutting down
         self.stopping.set()
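
The override keeps the base-class ordering: flag self.stopping first so
open_filesystem() starts refusing new handles, then tear down the rest
(the hunk is truncated before the pool teardown). A subclass with its own
background machinery could follow the same shape; the worker attribute
below is hypothetical:

    from mgr_util import CephfsClient

    class MyClient(CephfsClient):
        def __init__(self, mgr, worker):
            super().__init__(mgr)
            self.worker = worker  # hypothetical background thread owner

        def shutdown(self):
            # Set the flag first, as VolumeClient does, so no new handles
            # are issued while module machinery winds down...
            self.stopping.set()
            self.worker.stop()  # hypothetical
            # ...then let CephfsClient drain the ConnectionPool (it sets
            # the event again, which is harmless).
            super().shutdown()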