mgr/volumes: asynchronous cloner module
author    Venky Shankar <vshankar@redhat.com>
Wed, 4 Dec 2019 05:23:48 +0000 (00:23 -0500)
committer Ramana Raja <rraja@redhat.com>
Wed, 12 Feb 2020 10:12:00 +0000 (05:12 -0500)
Signed-off-by: Venky Shankar <vshankar@redhat.com>
(cherry picked from commit 4f09568b012cef24d2075733f9d81064790fe4e6)

src/pybind/mgr/volumes/fs/async_cloner.py [new file with mode: 0644]
src/pybind/mgr/volumes/fs/fs_util.py
src/pybind/mgr/volumes/fs/operations/resolver.py [new file with mode: 0644]
src/pybind/mgr/volumes/fs/operations/template.py
src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py
src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py
src/pybind/mgr/volumes/fs/volume.py
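
The diffs below add the cloner as a state-machine driver: each clone record
carries a persisted state, and the handlers in async_cloner.py advance it one
transition at a time:

    pending --> in-progress --> complete   (terminal)
                     |
                     +--> failed           (terminal, on copy error)

Both terminal handlers detach the clone from its source snapshot; "failed"
additionally leaves the clone section in the subvolume metadata intact for
later inspection.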

diff --git a/src/pybind/mgr/volumes/fs/async_cloner.py b/src/pybind/mgr/volumes/fs/async_cloner.py
new file mode 100644 (file)
index 0000000..c67dbde
--- /dev/null
@@ -0,0 +1,222 @@
+import os
+import stat
+import time
+import errno
+import logging
+from contextlib import contextmanager
+
+import cephfs
+
+from .async_job import AsyncJobs
+from .exception import IndexException, MetadataMgrException, OpSmException, VolumeException
+from .fs_util import copy_file
+from .operations.op_sm import OpSm
+from .operations.resolver import resolve
+from .operations.volume import open_volume, open_volume_lockless
+from .operations.group import open_group
+from .operations.subvolume import open_subvol
+from .operations.clone_index import open_clone_index
+
+log = logging.getLogger(__name__)
+
+# helper for fetching a clone entry for a given volume
+def get_next_clone_entry(volume_client, volname, running_jobs):
+    log.debug("fetching clone entry for volume '{0}'".format(volname))
+
+    try:
+        with open_volume(volume_client, volname) as fs_handle:
+            try:
+                with open_clone_index(fs_handle, volume_client.volspec) as clone_index:
+                    job = clone_index.get_oldest_clone_entry(running_jobs)
+                    return 0, job
+            except IndexException as ve:
+                if ve.errno == -errno.ENOENT:
+                    return 0, None
+                raise ve
+    except VolumeException as ve:
+        log.error("error fetching clone entry for volume '{0}' ({1})".format(volname), ve)
+        return ve.errno, None
+
+@contextmanager
+def open_at_volume(volume_client, volname, groupname, subvolname, need_complete=False, expected_types=[]):
+    with open_volume(volume_client, volname) as fs_handle:
+        with open_group(fs_handle, volume_client.volspec, groupname) as group:
+            with open_subvol(fs_handle, volume_client.volspec, group, subvolname,
+                             need_complete, expected_types) as subvolume:
+                yield subvolume
+
+@contextmanager
+def open_at_group(volume_client, fs_handle, groupname, subvolname, need_complete=False, expected_types=[]):
+    with open_group(fs_handle, volume_client.volspec, groupname) as group:
+        with open_subvol(fs_handle, volume_client.volspec, group, subvolname,
+                         need_complete, expected_types) as subvolume:
+            yield subvolume
+
+def get_clone_state(volume_client, volname, groupname, subvolname):
+    with open_at_volume(volume_client, volname, groupname, subvolname) as subvolume:
+        return subvolume.state
+
+def set_clone_state(volume_client, volname, groupname, subvolname, state):
+    with open_at_volume(volume_client, volname, groupname, subvolname) as subvolume:
+        subvolume.state = (state, True)
+
+def get_clone_source(clone_subvolume):
+    source = clone_subvolume._get_clone_source()
+    return (source['volume'], source.get('group', None), source['subvolume'], source['snapshot'])
+
+def handle_clone_pending(volume_client, volname, index, groupname, subvolname, should_cancel):
+    try:
+        next_state = OpSm.get_next_state("clone", "pending", 0)
+    except OpSmException as oe:
+        raise VolumeException(oe.errno, oe.error_str)
+    return (next_state, False)
+
+def bulk_copy(fs_handle, source_path, dst_path, should_cancel):
+    """
+    bulk copy data from source to destination -- only directories, symlinks
+    and regular files are synced. note that @should_cancel is not used right
+    now but would be required when implementing cancelation for in-progress
+    clone operations.
+    """
+    log.info("copying data from {0} to {1}".format(source_path, dst_path))
+    def cptree(src_root_path, dst_root_path):
+        log.debug("cptree: {0} -> {1}".format(src_root_path, dst_root_path))
+        try:
+            with fs_handle.opendir(src_root_path) as dir_handle:
+                d = fs_handle.readdir(dir_handle)
+                while d:
+                    if d.d_name not in (b".", b".."):
+                        log.debug("d={0}".format(d))
+                        d_full_src = os.path.join(src_root_path, d.d_name)
+                        d_full_dst = os.path.join(dst_root_path, d.d_name)
+                        st = fs_handle.lstat(d_full_src)
+                        mo = st.st_mode & ~stat.S_IFMT(st.st_mode)
+                        if stat.S_ISDIR(st.st_mode):
+                            log.debug("cptree: (DIR) {0}".format(d_full_src))
+                            try:
+                                fs_handle.mkdir(d_full_dst, mo)
+                                fs_handle.chown(d_full_dst, st.st_uid, st.st_gid)
+                            except cephfs.Error as e:
+                                if e.args[0] != errno.EEXIST:
+                                    raise
+                            cptree(d_full_src, d_full_dst)
+                        elif stat.S_ISLNK(st.st_mode):
+                            log.debug("cptree: (SYMLINK) {0}".format(d_full_src))
+                            target = fs_handle.readlink(d_full_src, 4096)
+                            try:
+                                fs_handle.symlink(target[:st.st_size], d_full_dst)
+                            except cephfs.Error as e:
+                                if e.args[0] != errno.EEXIST:
+                                    raise
+                        elif stat.S_ISREG(st.st_mode):
+                            log.debug("cptree: (REG) {0}".format(d_full_src))
+                            copy_file(fs_handle, d_full_src, d_full_dst, mo, st.st_uid, st.st_gid)
+                        else:
+                            log.warning("cptree: (IGNORE) {0}".format(d_full_src))
+                    d = fs_handle.readdir(dir_handle)
+        except cephfs.Error as e:
+            if e.args[0] != errno.ENOENT:
+                raise VolumeException(-e.args[0], e.args[1])
+    cptree(source_path, dst_path)
+
+def do_clone(volume_client, volname, groupname, subvolname, should_cancel):
+    with open_volume_lockless(volume_client, volname) as fs_handle:
+        with open_at_group(volume_client, fs_handle, groupname, subvolname) as clone_subvolume:
+            s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
+            with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname) as source_subvolume:
+                src_path = source_subvolume.snapshot_path(s_snapname)
+                dst_path = clone_subvolume.path
+                bulk_copy(fs_handle, src_path, dst_path, should_cancel)
+
+def handle_clone_in_progress(volume_client, volname, index, groupname, subvolname, should_cancel):
+    try:
+        do_clone(volume_client, volname, groupname, subvolname, should_cancel)
+        next_state = OpSm.get_next_state("clone", "in-progress", 0)
+    except VolumeException:
+        # jump to failed state
+        next_state = OpSm.get_next_state("clone", "in-progress", -1)
+    except OpSmException as oe:
+        raise VolumeException(oe.errno, oe.error_str)
+    return (next_state, False)
+
+def handle_clone_failed(volume_client, volname, index, groupname, subvolname, should_cancel):
+    try:
+        # detach source but leave the clone section intact for later inspection
+        with open_volume(volume_client, volname) as fs_handle:
+            with open_at_group(volume_client, fs_handle, groupname, subvolname) as clone_subvolume:
+                s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
+                with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname) as source_subvolume:
+                    source_subvolume.detach_snapshot(s_snapname, index)
+    except (MetadataMgrException, VolumeException) as e:
+        log.error("failed to detach clone from snapshot: {0}".format(e))
+    return (None, True)
+
+def handle_clone_complete(volume_client, volname, index, groupname, subvolname, should_cancel):
+    try:
+        with open_volume(volume_client, volname) as fs_handle:
+            with open_at_group(volume_client, fs_handle, groupname, subvolname) as clone_subvolume:
+                s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
+                with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname) as source_subvolume:
+                    source_subvolume.detach_snapshot(s_snapname, index)
+                    clone_subvolume.remove_clone_source(flush=True)
+    except (MetadataMgrException, VolumeException) as e:
+        log.error("failed to detach clone from snapshot: {0}".format(e))
+    return (None, True)
+
+def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel):
+    finished = False
+    current_state = None
+    try:
+        current_state = get_clone_state(volume_client, volname, groupname, subvolname)
+        log.debug("cloning ({0}, {1}, {2}) -- starting state \"{3}\"".format(volname, groupname, subvolname, current_state))
+        while not finished:
+            handler = state_table.get(current_state, None)
+            if not handler:
+                raise VolumeException(-errno.EINVAL, "invalid clone state: \"{0}\"".format(current_state))
+            (next_state, finished) = handler(volume_client, volname, index, groupname, subvolname, should_cancel)
+            if next_state:
+                log.debug("({0}, {1}, {2}) transition state [\"{3}\" => \"{4}\"]".format(volname, groupname, subvolname,\
+                                                                                         current_state, next_state))
+                set_clone_state(volume_client, volname, groupname, subvolname, next_state)
+                current_state = next_state
+    except VolumeException as ve:
+        log.error("clone failed for ({0}, {1}, {2}) (current_state: {3}, reason: {4})".format(volname, groupname,\
+                                                                                             subvolname, current_state, ve))
+
+def clone(volume_client, volname, index, clone_path, state_table, should_cancel):
+    log.info("cloning to subvolume path: {0}".format(clone_path))
+    resolved = resolve(volume_client.volspec, clone_path)
+    if resolved is None:
+        log.error("cannot resolve clone path: {0}".format(clone_path))
+        return
+    groupname, subvolname = resolved
+    log.debug("resolved to [group: {0}, subvolume: {1}]".format(groupname, subvolname))
+
+    try:
+        log.info("starting clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
+        start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel)
+        log.info("finished clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
+    except VolumeException as ve:
+        log.error("clone failed for ({0}, {1}, {2}), reason: {3}".format(volname, groupname, subvolname, ve))
+
+class Cloner(AsyncJobs):
+    """
+    Asynchronous cloner: pool of threads to copy data from a snapshot to a subvolume.
+    This relies on a simple state machine (which mimics states from the OpSm class) as
+    the driver. File types supported are directories, symbolic links and regular files.
+    """
+    def __init__(self, volume_client, tp_size):
+        self.vc = volume_client
+        self.state_table = {
+            'pending'     : handle_clone_pending,
+            'in-progress' : handle_clone_in_progress,
+            'complete'    : handle_clone_complete,
+            'failed'      : handle_clone_failed
+        }
+        super(Cloner, self).__init__(volume_client, "cloner", tp_size)
+
+    def get_next_job(self, volname, running_jobs):
+        return get_next_clone_entry(self.vc, volname, running_jobs)
+
+    def execute_job(self, volname, job, should_cancel):
+        clone(self.vc, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel)
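
Stripped of the CephFS plumbing, start_clone_sm() above is a fetch-state,
dispatch, persist-next-state loop. A minimal self-contained sketch of the same
control flow (the toy handlers and in-memory state store are hypothetical; the
real code persists state through the subvolume metadata and drives transitions
via OpSm):

    # toy in-memory stand-in for the persisted clone state
    STATE = {"clone0": "pending"}

    def handle_pending(job):
        return ("in-progress", False)    # nothing to copy yet; advance

    def handle_in_progress(job):
        return ("complete", False)       # the real handler bulk-copies data here

    def handle_complete(job):
        return (None, True)              # terminal: detach source and stop

    state_table = {
        "pending": handle_pending,
        "in-progress": handle_in_progress,
        "complete": handle_complete,
    }

    def run(job):
        finished = False
        current = STATE[job]
        while not finished:
            next_state, finished = state_table[current](job)
            if next_state:
                STATE[job] = next_state  # set_clone_state() in the real code
                current = next_state

    run("clone0")
    assert STATE["clone0"] == "complete"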
diff --git a/src/pybind/mgr/volumes/fs/fs_util.py b/src/pybind/mgr/volumes/fs/fs_util.py
index a556522195193dd5d5c718069a70099245b1415d..7dd116e5fb3581477a25cab67ba321748391ea69 100644 (file)
@@ -86,6 +86,38 @@ def list_one_entry_at_a_time(fs, dirpath):
     except cephfs.Error as e:
         raise VolumeException(-e.args[0], e.args[1])
 
+def copy_file(fs, src, dst, mode, uid, gid):
+    """
+    Copy a regular file from @src to @dst. @dst is overwritten if it exists.
+    """
+    src_fd = dst_fd = None
+    try:
+        src_fd = fs.open(src, os.O_RDONLY)
+        dst_fd = fs.open(dst, os.O_CREAT | os.O_TRUNC | os.O_WRONLY, mode)
+        fs.chown(dst, uid, gid)
+    except cephfs.Error as e:
+        if src_fd is not None:
+            fs.close(src_fd)
+        if dst_fd is not None:
+            fs.close(dst_fd)
+        raise VolumeException(-e.args[0], e.args[1])
+
+    IO_SIZE = 8 * 1024 * 1024
+    try:
+        while True:
+            data = fs.read(src_fd, -1, IO_SIZE)
+            if not data:
+                break
+            written = 0
+            while written < len(data):
+                written += fs.write(dst_fd, data[written:], -1)
+        fs.fsync(dst_fd, 0)
+    except cephfs.Error as e:
+        raise VolumeException(-e.args[0], e.args[1])
+    finally:
+        fs.close(src_fd)
+        fs.close(dst_fd)
+
 def get_ancestor_xattr(fs, path, attr):
     """
     Helper for reading layout information: if this xattr is missing
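
The chunked-copy pattern in copy_file() is worth calling out: read up to
IO_SIZE bytes at a time, and loop on the write because a single write may
consume less than the whole chunk. The same structure against the local
filesystem, as a sketch (plain os calls stand in for the libcephfs handle,
and the ownership transfer is omitted):

    import os

    IO_SIZE = 8 * 1024 * 1024

    def copy_file_local(src, dst, mode):
        src_fd = os.open(src, os.O_RDONLY)
        dst_fd = os.open(dst, os.O_CREAT | os.O_TRUNC | os.O_WRONLY, mode)
        try:
            while True:
                data = os.read(src_fd, IO_SIZE)
                if not data:
                    break
                written = 0
                while written < len(data):
                    # short writes are legal; keep going until the chunk is out
                    written += os.write(dst_fd, data[written:])
            os.fsync(dst_fd)
        finally:
            os.close(src_fd)
            os.close(dst_fd)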
diff --git a/src/pybind/mgr/volumes/fs/operations/resolver.py b/src/pybind/mgr/volumes/fs/operations/resolver.py
new file mode 100644 (file)
index 0000000..bf982af
--- /dev/null
@@ -0,0 +1,17 @@
+import os
+
+from .group import Group
+
+def splitall(path):
+    if path == "/":
+        return ["/"]
+    s = os.path.split(path)
+    return splitall(s[0]) + [s[1]]
+
+def resolve(vol_spec, path):
+    parts = splitall(path)
+    if len(parts) != 4 or os.path.join(parts[0], parts[1]) != vol_spec.subvolume_prefix:
+        return None
+    groupname = None if parts[2] == Group.NO_GROUP_NAME else parts[2]
+    subvolname = parts[3]
+    return (groupname, subvolname)
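
resolve() only accepts paths of the exact shape <subvolume_prefix>/<group>/<subvolume>.
A usage sketch, with splitall() and resolve() as defined above (the stub spec
is hypothetical, and it assumes the usual "/volumes" prefix and "_nogroup" as
Group.NO_GROUP_NAME):

    class StubVolSpec:
        subvolume_prefix = "/volumes"

    resolve(StubVolSpec(), "/volumes/group0/subvol0")    # ("group0", "subvol0")
    resolve(StubVolSpec(), "/volumes/_nogroup/subvol0")  # (None, "subvol0")
    resolve(StubVolSpec(), "/volumes/subvol0")           # None (wrong depth)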
diff --git a/src/pybind/mgr/volumes/fs/operations/template.py b/src/pybind/mgr/volumes/fs/operations/template.py
index 7c3d60882b64213d2831a39016668dafa98131e5..846ea51eab4913bc63d6251f0622770c8f66baab 100644 (file)
@@ -111,6 +111,15 @@ class SubvolumeTemplate(object):
         """
         raise VolumeException(-errno.ENOTSUP, "operation not supported.")
 
+    def snapshot_path(self, snapname):
+        """
+        return the snapshot path for a given snapshot name
+
+        :param: subvolume snapshot name
+        :return: snapshot path
+        """
+        raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
     def list_snapshots(self):
         """
         list all subvolume snapshots.
diff --git a/src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py b/src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py
index c54da537e863b1855dbd7481f4ac9767b06487cb..7aa05c0f617755b8cfe650387b474e72bb4ad864 100644 (file)
@@ -121,6 +121,9 @@ class MetadataManager(object):
         for key,value in dct.items():
             self.config.set(section, key, str(value))
 
+    def update_global_section(self, key, value):
+        self.update_section(MetadataManager.GLOBAL_SECTION, key, str(value))
+
     def get_option(self, section, key):
         if not self.config.has_section(section):
             raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section))
diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py
index f1e83863ee8b5823fc1e8ff39146f05f188f1cd8..d8e0e0137ae4fd4e1d9e84d904ca70ab16208241 100644 (file)
@@ -168,6 +168,18 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
             subvolume_status["source"] = self._get_clone_source()
         return subvolume_status
 
+    @property
+    def state(self):
+        return self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE)
+
+    @state.setter
+    def state(self, val):
+        state = val[0]
+        flush = val[1]
+        self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, state)
+        if flush:
+            self.metadata_mgr.flush()
+
     def remove(self):
         self.trash_base_dir()
 
@@ -175,10 +187,13 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
         subvol_path = self.path
         return self._resize(subvol_path, newsize, noshrink)
 
+    def snapshot_path(self, snapname):
+        return os.path.join(self.path,
+                            self.vol_spec.snapshot_dir_prefix.encode('utf-8'),
+                            snapname.encode('utf-8'))
+
     def create_snapshot(self, snapname):
-        snappath = os.path.join(self.path,
-                                self.vol_spec.snapshot_dir_prefix.encode('utf-8'),
-                                snapname.encode('utf-8'))
+        snappath = self.snapshot_path(snapname)
         mksnap(self.fs, snappath)
 
     def is_snapshot_protected(self, snapname):
@@ -204,9 +219,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
     def remove_snapshot(self, snapname):
         if self.is_snapshot_protected(snapname):
             raise VolumeException(-errno.EINVAL, "snapshot '{0}' is protected".format(snapname))
-        snappath = os.path.join(self.path,
-                                self.vol_spec.snapshot_dir_prefix.encode('utf-8'),
-                                snapname.encode('utf-8'))
+        snappath = self.snapshot_path(snapname)
         rmsnap(self.fs, snappath)
 
     def list_snapshots(self):
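
The state setter above takes a (value, flush) tuple so a caller can decide
whether the metadata write is persisted immediately; set_clone_state() in
async_cloner.py always passes flush=True. A toy illustration of that property
shape (the dict-backed manager below is hypothetical):

    class Subvol:
        def __init__(self):
            self._meta = {}          # stands in for MetadataManager

        @property
        def state(self):
            return self._meta.get("state")

        @state.setter
        def state(self, val):
            state, flush = val
            self._meta["state"] = state
            if flush:
                self.flush()

        def flush(self):
            pass                     # the real manager rewrites the metadata file

    sv = Subvol()
    sv.state = ("in-progress", True)
    assert sv.state == "in-progress"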
diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py
index e384db4745a38d8224135abd689ecc8c8e7d8ac8..21b0dc848c3a2b1e568feb1ff15b543bafe6273b 100644 (file)
@@ -15,6 +15,7 @@ from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
 
 from .vol_spec import VolSpec
 from .exception import VolumeException
+from .async_cloner import Cloner
 from .purge_queue import ThreadPoolPurgeQueueMixin
 
 log = logging.getLogger(__name__)
@@ -42,6 +43,7 @@ class VolumeClient(object):
         self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
         self.connection_pool = ConnectionPool(self.mgr)
         # TODO: make thread pool size configurable
+        self.cloner = Cloner(self, 4)
         self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
         # on startup, queue purge job for available volumes to kickstart
         # purge for leftover subvolume entries in trash. note that, if the
@@ -50,6 +52,7 @@ class VolumeClient(object):
         # job list.
         fs_map = self.mgr.get('fs_map')
         for fs in fs_map['filesystems']:
+            self.cloner.queue_job(fs['mdsmap']['fs_name'])
             self.purge_queue.queue_job(fs['mdsmap']['fs_name'])
 
     def is_stopping(self):
@@ -308,6 +311,7 @@ class VolumeClient(object):
         with open_subvol(fs_handle, self.volspec, target_group, target_subvolname, need_complete=False) as target_subvolume:
             try:
                 subvolume.attach_snapshot(snapname, target_subvolume)
+                self.cloner.queue_job(volname)
             except VolumeException as ve:
                 try:
                     target_subvolume.remove()
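
Cloner subclasses AsyncJobs (imported in async_cloner.py but untouched by this
commit), overriding just get_next_job() and execute_job(). The implied
contract is a pool of worker threads that ask for the oldest queued clone
entry per volume and run it. A minimal single-threaded sketch of that contract
(everything except the two overridden hooks is an assumption):

    import threading

    class MiniAsyncJobs:
        def __init__(self, name):
            self.queue = []              # volume names with pending work
            self.cv = threading.Condition()
            t = threading.Thread(target=self._run, name=name, daemon=True)
            t.start()

        def queue_job(self, volname):
            with self.cv:
                self.queue.append(volname)
                self.cv.notify()

        def get_next_job(self, volname, running_jobs):
            raise NotImplementedError    # Cloner returns the oldest clone entry

        def execute_job(self, volname, job, should_cancel):
            raise NotImplementedError    # Cloner runs the clone state machine

        def _run(self):
            while True:
                with self.cv:
                    while not self.queue:
                        self.cv.wait()
                    volname = self.queue.pop(0)
                ret, job = self.get_next_job(volname, running_jobs=[])
                if ret == 0 and job is not None:
                    self.execute_job(volname, job, should_cancel=lambda: False)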