mgr/volumes: implement filesystem volume module
author Venky Shankar <vshankar@redhat.com>
Wed, 20 Nov 2019 13:41:38 +0000 (08:41 -0500)
committer Ramana Raja <rraja@redhat.com>
Wed, 12 Feb 2020 10:11:59 +0000 (05:11 -0500)
Signed-off-by: Venky Shankar <vshankar@redhat.com>
(cherry picked from commit f9ae6e38600bada917e9d7c953e736c5c45e7064)

src/pybind/mgr/volumes/fs/operations/volume.py [new file with mode: 0644]

diff --git a/src/pybind/mgr/volumes/fs/operations/volume.py b/src/pybind/mgr/volumes/fs/operations/volume.py
new file mode 100644 (file)
index 0000000..0d5c90c
--- /dev/null
@@ -0,0 +1,333 @@
+import time
+import errno
+import logging
+from contextlib import contextmanager
+from threading import Lock, Condition
+
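+# threading.Timer is a factory function on py2; subclassing (see RTimer
+# below) needs the underlying class, threading._Timer.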
+try:
+    # py2
+    from threading import _Timer as Timer
+except ImportError:
+    # py3
+    from threading import Timer
+
+import cephfs
+import orchestrator
+
+from .lock import GlobalLock
+from ..exception import VolumeException
+from ..fs_util import create_pool, remove_pool, create_filesystem, \
+    remove_filesystem, create_mds, volume_exists
+
+log = logging.getLogger(__name__)
+
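+# ConnectionPool caches one libcephfs connection per volume. Handles are
+# handed out via get_fs_handle()/put_fs_handle(); a recurring timer task
+# reaps connections that have been idle for too long.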
+class ConnectionPool(object):
+    class Connection(object):
+        def __init__(self, mgr, fs_name):
+            self.fs = None
+            self.mgr = mgr
+            self.fs_name = fs_name
+            self.ops_in_progress = 0
+            self.last_used = time.time()
+            self.fs_id = self.get_fs_id()
+
+        def get_fs_id(self):
+            fs_map = self.mgr.get('fs_map')
+            for fs in fs_map['filesystems']:
+                if fs['mdsmap']['fs_name'] == self.fs_name:
+                    return fs['id']
+            raise VolumeException(
+                -errno.ENOENT, "Volume '{0}' not found".format(self.fs_name))
+
+        def get_fs_handle(self):
+            self.last_used = time.time()
+            self.ops_in_progress += 1
+            return self.fs
+
+        def put_fs_handle(self, notify):
+            assert self.ops_in_progress > 0
+            self.ops_in_progress -= 1
+            if self.ops_in_progress == 0:
+                notify()
+
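+        # NOTE: called with the pool lock held; the waiter callback
+        # (Condition.wait) releases it while in-progress ops drain.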
+        def del_fs_handle(self, waiter):
+            if waiter:
+                while self.ops_in_progress != 0:
+                    waiter()
+            if self.is_connection_valid():
+                self.disconnect()
+            else:
+                self.abort()
+
+        def is_connection_valid(self):
+            fs_id = None
+            try:
+                fs_id = self.get_fs_id()
+            except VolumeException:
+                # the filesystem does not exist now -- connection is not valid.
+                pass
+            log.debug("self.fs_id={0}, fs_id={1}".format(self.fs_id, fs_id))
+            return self.fs_id == fs_id
+
+        def is_connection_idle(self, timeout):
+            return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))
+
+        def connect(self):
+            assert self.ops_in_progress == 0
+            log.debug("Connecting to cephfs '{0}'".format(self.fs_name))
+            self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
+            log.debug("Setting user ID and group ID of CephFS mount as root...")
+            self.fs.conf_set("client_mount_uid", "0")
+            self.fs.conf_set("client_mount_gid", "0")
+            log.debug("CephFS initializing...")
+            self.fs.init()
+            log.debug("CephFS mounting...")
+            self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
+            log.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
+
+        def disconnect(self):
+            try:
+                assert self.ops_in_progress == 0
+                log.info("disconnecting from cephfs '{0}'".format(self.fs_name))
+                self.fs.shutdown()
+                self.fs = None
+            except Exception as e:
+                log.debug("disconnect: ({0})".format(e))
+                raise
+
+        def abort(self):
+            assert self.ops_in_progress == 0
+            log.info("aborting connection from cephfs '{0}'".format(self.fs_name))
+            self.fs.abort_conn()
+            log.info("abort done from cephfs '{0}'".format(self.fs_name))
+            self.fs = None
+
+    class RTimer(Timer):
+        """
+        recurring timer variant of Timer
+        """
+        def run(self):
+            try:
+                while not self.finished.is_set():
+                    self.finished.wait(self.interval)
+                    self.function(*self.args, **self.kwargs)
+                self.finished.set()
+            except Exception as e:
+                log.error("ConnectionPool.RTimer: %s", e)
+                raise
+
+    # TODO: make this configurable
+    TIMER_TASK_RUN_INTERVAL = 30.0  # seconds
+    CONNECTION_IDLE_INTERVAL = 60.0 # seconds
+
+    def __init__(self, mgr):
+        self.mgr = mgr
+        self.connections = {}
+        self.lock = Lock()
+        self.cond = Condition(self.lock)
+        self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL,
+                                                self.cleanup_connections)
+        self.timer_task.start()
+
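+    # runs in the timer thread every TIMER_TASK_RUN_INTERVAL seconds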
+    def cleanup_connections(self):
+        with self.lock:
+            log.info("scanning for idle connections..")
+            idle_fs = [fs_name for fs_name,conn in self.connections.items()
+                       if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)]
+            for fs_name in idle_fs:
+                log.info("cleaning up connection for '{}'".format(fs_name))
+                self._del_fs_handle(fs_name)
+
+    def get_fs_handle(self, fs_name):
+        with self.lock:
+            conn = None
+            try:
+                conn = self.connections.get(fs_name, None)
+                if conn:
+                    if conn.is_connection_valid():
+                        return conn.get_fs_handle()
+                    else:
+                        # filesystem id changed beneath us (or the filesystem does not exist).
+                        # this is possible if the filesystem got removed (and recreated with
+                        # same name) via "ceph fs rm/new" mon command.
+                        log.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
+                        self._del_fs_handle(fs_name)
+                conn = ConnectionPool.Connection(self.mgr, fs_name)
+                conn.connect()
+            except cephfs.Error as e:
+                # try to provide a better error string if possible
+                if e.args[0] == errno.ENOENT:
+                    raise VolumeException(
+                        -errno.ENOENT, "Volume '{0}' not found".format(fs_name))
+                raise VolumeException(-e.args[0], e.args[1])
+            self.connections[fs_name] = conn
+            return conn.get_fs_handle()
+
+    def put_fs_handle(self, fs_name):
+        with self.lock:
+            conn = self.connections.get(fs_name, None)
+            if conn:
+                conn.put_fs_handle(notify=lambda: self.cond.notifyAll())
+
+    def _del_fs_handle(self, fs_name, wait=False):
+        conn = self.connections.pop(fs_name, None)
+        if conn:
+            conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())
+
+    def del_fs_handle(self, fs_name, wait=False):
+        with self.lock:
+            self._del_fs_handle(fs_name, wait)
+
+    def del_all_handles(self):
+        with self.lock:
+            for fs_name in list(self.connections.keys()):
+                log.info("waiting for pending ops for '{}'".format(fs_name))
+                self._del_fs_handle(fs_name, wait=True)
+                log.info("pending ops completed for '{}'".format(fs_name))
+            # no new connections should have been initialized, since
+            # connection creation is guarded during shutdown.
+            assert len(self.connections) == 0
+
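+# Typical usage of the pool (a sketch; 'mgr' is the mgr module instance):
+#
+#   pool = ConnectionPool(mgr)
+#   fs = pool.get_fs_handle("vol1")    # connect (or reuse) and take a ref
+#   try:
+#       ...                            # libcephfs calls on 'fs'
+#   finally:
+#       pool.put_fs_handle("vol1")     # drop the ref
+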
+def gen_pool_names(volname):
+    """
+    return metadata and data pool names (derived from the filesystem/volume name) as a tuple
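+    e.g. "vol1" -> ("cephfs.vol1.meta", "cephfs.vol1.data")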
+    """
+    return "cephfs.{}.meta".format(volname), "cephfs.{}.data".format(volname)
+
+def create_volume(mgr, volname):
+    """
+    create volume (pools, filesystem and mds)
+    """
+    metadata_pool, data_pool = gen_pool_names(volname)
+    # create pools
+    r, outb, outs = create_pool(mgr, metadata_pool, 16)
+    if r != 0:
+        return r, outb, outs
+    r, outb, outs = create_pool(mgr, data_pool, 8)
+    if r != 0:
+        #cleanup
+        remove_pool(mgr, metadata_pool)
+        return r, outb, outs
+    # create filesystem
+    r, outb, outs = create_filesystem(mgr, volname, metadata_pool, data_pool)
+    if r != 0:
+        log.error("Filesystem creation error: {0} {1} {2}".format(r, outb, outs))
+        #cleanup
+        remove_pool(mgr, data_pool)
+        remove_pool(mgr, metadata_pool)
+        return r, outb, outs
+    # create mds
+    return create_mds(mgr, volname)
+
+def delete_volume(mgr, volname):
+    """
+    delete the given volume (tear down mds, remove filesystem and pools)
+    """
+    # Tear down MDS daemons
+    try:
+        completion = mgr.remove_stateless_service("mds", volname)
+        mgr._orchestrator_wait([completion])
+        orchestrator.raise_if_exception(completion)
+    except (ImportError, orchestrator.OrchestratorError):
+        log.warning("OrchestratorError, not tearing down MDS daemons")
+    except Exception as e:
+        # Don't let detailed orchestrator exceptions (python backtraces)
+        # bubble out to the user
+        log.exception("Failed to tear down MDS daemons")
+        return -errno.EINVAL, "", str(e)
+
+    # In case orchestrator didn't tear down MDS daemons cleanly, or
+    # there was no orchestrator, we force the daemons down.
+    if volume_exists(mgr, volname):
+        r, outb, outs = remove_filesystem(mgr, volname)
+        if r != 0:
+            return r, outb, outs
+    else:
+        err = "Filesystem not found for volume '{0}'".format(volname)
+        log.warning(err)
+        return -errno.ENOENT, "", err
+    metadata_pool, data_pool = gen_pool_names(volname)
+    r, outb, outs = remove_pool(mgr, metadata_pool)
+    if r != 0:
+        return r, outb, outs
+    return remove_pool(mgr, data_pool)
+
+def list_volumes(mgr):
+    """
+    list all filesystem volumes.
+
+    :param mgr: manager module instance
+    :return: list of volumes, each a dict containing the volume name
+    """
+    result = []
+    fs_map = mgr.get("fs_map")
+    for f in fs_map['filesystems']:
+        result.append({'name': f['mdsmap']['fs_name']})
+    return result
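+
+# e.g. with volumes "a" and "b", list_volumes() returns
+# [{'name': 'a'}, {'name': 'b'}]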
+
+@contextmanager
+def open_volume(vc, volname):
+    """
+    open a volume for exclusive access. This API is to be used as a context manager.
+
+    :param vc: volume client instance
+    :param volname: volume name
+    :return: yields a volume handle (ceph filesystem handle)
+    """
+    if vc.is_stopping():
+        raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress")
+
+    g_lock = GlobalLock()
+    fs_handle = vc.connection_pool.get_fs_handle(volname)
+    try:
+        with g_lock.lock_op():
+            yield fs_handle
+    finally:
+        vc.connection_pool.put_fs_handle(volname)
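+
+# Example (a sketch; 'vc' is the volume client owning the connection pool):
+#
+#   with open_volume(vc, "vol1") as fs_handle:
+#       fs_handle.stat("/")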
+
+@contextmanager
+def open_volume_lockless(vc, volname):
+    """
+    open a volume with shared access. This API is to be used as a context manager.
+
+    :param vc: volume client instance
+    :param volname: volume name
+    :return: yields a volume handle (ceph filesystem handle)
+    """
+    if vc.is_stopping():
+        raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress")
+
+    fs_handle = vc.connection_pool.get_fs_handle(volname)
+    try:
+        yield fs_handle
+    finally:
+        vc.connection_pool.put_fs_handle(volname)
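+
+# Unlike open_volume, the lockless variant does not take the global lock,
+# so operations using it can run concurrently with lock-holding operations.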