From e994382a6378febfee94a639ea597c52bac1266c Mon Sep 17 00:00:00 2001
From: Venky Shankar
Date: Wed, 4 Dec 2019 00:23:48 -0500
Subject: [PATCH] mgr/volumes: asynchronous cloner module

Signed-off-by: Venky Shankar
(cherry picked from commit 4f09568b012cef24d2075733f9d81064790fe4e6)
---
 src/pybind/mgr/volumes/fs/async_cloner.py     | 222 ++++++++++++++++++
 src/pybind/mgr/volumes/fs/fs_util.py          |  32 +++
 .../mgr/volumes/fs/operations/resolver.py     |  17 ++
 .../mgr/volumes/fs/operations/template.py     |   9 +
 .../operations/versions/metadata_manager.py   |   3 +
 .../fs/operations/versions/subvolume_v1.py    |  25 +-
 src/pybind/mgr/volumes/fs/volume.py           |   4 +
 7 files changed, 306 insertions(+), 6 deletions(-)
 create mode 100644 src/pybind/mgr/volumes/fs/async_cloner.py
 create mode 100644 src/pybind/mgr/volumes/fs/operations/resolver.py

diff --git a/src/pybind/mgr/volumes/fs/async_cloner.py b/src/pybind/mgr/volumes/fs/async_cloner.py
new file mode 100644
index 0000000000000..c67dbde1abef3
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/async_cloner.py
@@ -0,0 +1,222 @@
+import os
+import stat
+import time
+import errno
+import logging
+from contextlib import contextmanager
+
+import cephfs
+
+from .async_job import AsyncJobs
+from .exception import IndexException, MetadataMgrException, OpSmException, VolumeException
+from .fs_util import copy_file
+from .operations.op_sm import OpSm
+from .operations.resolver import resolve
+from .operations.volume import open_volume, open_volume_lockless
+from .operations.group import open_group
+from .operations.subvolume import open_subvol
+from .operations.clone_index import open_clone_index
+
+log = logging.getLogger(__name__)
+
+# helper for fetching a clone entry for a given volume
+def get_next_clone_entry(volume_client, volname, running_jobs):
+    log.debug("fetching clone entry for volume '{0}'".format(volname))
+
+    try:
+        with open_volume(volume_client, volname) as fs_handle:
+            try:
+                with open_clone_index(fs_handle, volume_client.volspec) as clone_index:
+                    job = clone_index.get_oldest_clone_entry(running_jobs)
+                    return 0, job
+            except IndexException as ve:
+                if ve.errno == -errno.ENOENT:
+                    return 0, None
+                raise ve
+    except VolumeException as ve:
+        log.error("error fetching clone entry for volume '{0}' ({1})".format(volname, ve))
+        return ve.errno, None
+
+@contextmanager
+def open_at_volume(volume_client, volname, groupname, subvolname, need_complete=False, expected_types=[]):
+    with open_volume(volume_client, volname) as fs_handle:
+        with open_group(fs_handle, volume_client.volspec, groupname) as group:
+            with open_subvol(fs_handle, volume_client.volspec, group, subvolname,
+                             need_complete, expected_types) as subvolume:
+                yield subvolume
+
+@contextmanager
+def open_at_group(volume_client, fs_handle, groupname, subvolname, need_complete=False, expected_types=[]):
+    with open_group(fs_handle, volume_client.volspec, groupname) as group:
+        with open_subvol(fs_handle, volume_client.volspec, group, subvolname,
+                         need_complete, expected_types) as subvolume:
+            yield subvolume
+
+def get_clone_state(volume_client, volname, groupname, subvolname):
+    with open_at_volume(volume_client, volname, groupname, subvolname) as subvolume:
+        return subvolume.state
+
+def set_clone_state(volume_client, volname, groupname, subvolname, state):
+    with open_at_volume(volume_client, volname, groupname, subvolname) as subvolume:
+        subvolume.state = (state, True)
+
+def get_clone_source(clone_subvolume):
+    source = clone_subvolume._get_clone_source()
+    return (source['volume'], source.get('group', None), source['subvolume'], source['snapshot'])
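
The open_at_volume/open_at_group helpers above stack three context managers so a caller gets a fully opened subvolume from a single with-statement. A minimal self-contained sketch of that pattern follows; the open_resource stand-in is an illustrative assumption, not part of this patch:

```python
from contextlib import contextmanager

@contextmanager
def open_resource(name):
    # stand-in for open_volume/open_group/open_subvol
    print("open", name)
    try:
        yield name
    finally:
        print("close", name)

@contextmanager
def open_at_volume(volname, groupname, subvolname):
    # nesting releases resources in reverse order when the caller's
    # with-block exits, exactly like the helper in this patch
    with open_resource(volname):
        with open_resource(groupname):
            with open_resource(subvolname) as subvolume:
                yield subvolume

with open_at_volume("vol", "group0", "subvol0") as sv:
    print("using", sv)
```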
+def handle_clone_pending(volume_client, volname, index, groupname, subvolname, should_cancel):
+    try:
+        next_state = OpSm.get_next_state("clone", "pending", 0)
+    except OpSmException as oe:
+        raise VolumeException(oe.error, oe.error_str)
+    return (next_state, False)
+
+def bulk_copy(fs_handle, source_path, dst_path, should_cancel):
+    """
+    bulk copy data from source to destination -- only directories, symlinks
+    and regular files are synced. note that @should_cancel is not used right
+    now but would be required when implementing cancellation for in-progress
+    clone operations.
+    """
+    log.info("copying data from {0} to {1}".format(source_path, dst_path))
+    def cptree(src_root_path, dst_root_path):
+        log.debug("cptree: {0} -> {1}".format(src_root_path, dst_root_path))
+        try:
+            with fs_handle.opendir(src_root_path) as dir_handle:
+                d = fs_handle.readdir(dir_handle)
+                while d:
+                    if d.d_name not in (b".", b".."):
+                        log.debug("d={0}".format(d))
+                        d_full_src = os.path.join(src_root_path, d.d_name)
+                        d_full_dst = os.path.join(dst_root_path, d.d_name)
+                        st = fs_handle.lstat(d_full_src)
+                        mo = st.st_mode & ~stat.S_IFMT(st.st_mode)
+                        if stat.S_ISDIR(st.st_mode):
+                            log.debug("cptree: (DIR) {0}".format(d_full_src))
+                            try:
+                                fs_handle.mkdir(d_full_dst, mo)
+                                fs_handle.chown(d_full_dst, st.st_uid, st.st_gid)
+                            except cephfs.Error as e:
+                                if not e.args[0] == errno.EEXIST:
+                                    raise
+                            cptree(d_full_src, d_full_dst)
+                        elif stat.S_ISLNK(st.st_mode):
+                            log.debug("cptree: (SYMLINK) {0}".format(d_full_src))
+                            target = fs_handle.readlink(d_full_src, 4096)
+                            try:
+                                fs_handle.symlink(target[:st.st_size], d_full_dst)
+                            except cephfs.Error as e:
+                                if not e.args[0] == errno.EEXIST:
+                                    raise
+                        elif stat.S_ISREG(st.st_mode):
+                            log.debug("cptree: (REG) {0}".format(d_full_src))
+                            copy_file(fs_handle, d_full_src, d_full_dst, mo, st.st_uid, st.st_gid)
+                        else:
+                            log.warn("cptree: (IGNORE) {0}".format(d_full_src))
+                    d = fs_handle.readdir(dir_handle)
+        except cephfs.Error as e:
+            if not e.args[0] == errno.ENOENT:
+                raise VolumeException(-e.args[0], e.args[1])
+    cptree(source_path, dst_path)
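
bulk_copy()'s cptree walks the snapshot tree through libcephfs handles. For intuition, here is a rough local-filesystem analogue using only the standard library; os/shutil stand in for the fs_handle calls, so this is a sketch of the copy policy, not the module's actual code path:

```python
import os
import stat
import shutil

def cptree_local(src_root, dst_root):
    # mirrors cptree's policy: recurse into directories, recreate symlinks,
    # copy regular files, and skip every other file type
    os.makedirs(dst_root, exist_ok=True)
    for entry in os.scandir(src_root):
        st = os.lstat(entry.path)
        dst = os.path.join(dst_root, entry.name)
        if stat.S_ISDIR(st.st_mode):
            cptree_local(entry.path, dst)
        elif stat.S_ISLNK(st.st_mode):
            os.symlink(os.readlink(entry.path), dst)
        elif stat.S_ISREG(st.st_mode):
            shutil.copy2(entry.path, dst)  # copies data, mode and timestamps
```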
+def do_clone(volume_client, volname, groupname, subvolname, should_cancel):
+    with open_volume_lockless(volume_client, volname) as fs_handle:
+        with open_at_group(volume_client, fs_handle, groupname, subvolname) as clone_subvolume:
+            s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
+            with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname) as source_subvolume:
+                src_path = source_subvolume.snapshot_path(s_snapname)
+                dst_path = clone_subvolume.path
+                bulk_copy(fs_handle, src_path, dst_path, should_cancel)
+
+def handle_clone_in_progress(volume_client, volname, index, groupname, subvolname, should_cancel):
+    try:
+        do_clone(volume_client, volname, groupname, subvolname, should_cancel)
+        next_state = OpSm.get_next_state("clone", "in-progress", 0)
+    except VolumeException as ve:
+        # jump to failed state
+        next_state = OpSm.get_next_state("clone", "in-progress", -1)
+    except OpSmException as oe:
+        raise VolumeException(oe.error, oe.error_str)
+    return (next_state, False)
+
+def handle_clone_failed(volume_client, volname, index, groupname, subvolname, should_cancel):
+    try:
+        # detach source but leave the clone section intact for later inspection
+        with open_volume(volume_client, volname) as fs_handle:
+            with open_at_group(volume_client, fs_handle, groupname, subvolname) as clone_subvolume:
+                s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
+                with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname) as source_subvolume:
+                    source_subvolume.detach_snapshot(s_snapname, index)
+    except (MetadataMgrException, VolumeException) as e:
+        log.error("failed to detach clone from snapshot: {0}".format(e))
+    return (None, True)
+
+def handle_clone_complete(volume_client, volname, index, groupname, subvolname, should_cancel):
+    try:
+        with open_volume(volume_client, volname) as fs_handle:
+            with open_at_group(volume_client, fs_handle, groupname, subvolname) as clone_subvolume:
+                s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
+                with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname) as source_subvolume:
+                    source_subvolume.detach_snapshot(s_snapname, index)
+                    clone_subvolume.remove_clone_source(flush=True)
+    except (MetadataMgrException, VolumeException) as e:
+        log.error("failed to detach clone from snapshot: {0}".format(e))
+    return (None, True)
+
+def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel):
+    finished = False
+    current_state = None
+    try:
+        current_state = get_clone_state(volume_client, volname, groupname, subvolname)
+        log.debug("cloning ({0}, {1}, {2}) -- starting state \"{3}\"".format(volname, groupname, subvolname, current_state))
+        while not finished:
+            handler = state_table.get(current_state, None)
+            if not handler:
+                raise VolumeException(-errno.EINVAL, "invalid clone state: \"{0}\"".format(current_state))
+            (next_state, finished) = handler(volume_client, volname, index, groupname, subvolname, should_cancel)
+            if next_state:
+                log.debug("({0}, {1}, {2}) transition state [\"{3}\" => \"{4}\"]".format(volname, groupname, subvolname,
+                          current_state, next_state))
+                set_clone_state(volume_client, volname, groupname, subvolname, next_state)
+                current_state = next_state
+    except VolumeException as ve:
+        log.error("clone failed for ({0}, {1}, {2}) (current_state: {3}, reason: {4})".format(volname, groupname,
+                  subvolname, current_state, ve))
+
+def clone(volume_client, volname, index, clone_path, state_table, should_cancel):
+    log.info("cloning to subvolume path: {0}".format(clone_path))
+    resolved = resolve(volume_client.volspec, clone_path)
+
+    groupname = resolved[0]
+    subvolname = resolved[1]
+    log.debug("resolved to [group: {0}, subvolume: {1}]".format(groupname, subvolname))
+
+    try:
+        log.info("starting clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
+        start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel)
+        log.info("finished clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
+    except VolumeException as ve:
+        log.error("clone failed for ({0}, {1}, {2}), reason: {3}".format(volname, groupname, subvolname, ve))
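
start_clone_sm() is a table-driven loop: each handler returns (next_state, finished), and the loop persists the transition before dispatching again. Stripped to its essence it looks like the following; the handler names and the empty context dict are illustrative, not the module's API:

```python
def handle_pending(ctx):
    return ("in-progress", False)

def handle_in_progress(ctx):
    return ("complete", False)   # would return ("failed", False) on error

def handle_complete(ctx):
    return (None, True)          # terminal: no transition, loop ends

state_table = {
    "pending": handle_pending,
    "in-progress": handle_in_progress,
    "complete": handle_complete,
}

state, finished = "pending", False
while not finished:
    next_state, finished = state_table[state]({})
    if next_state:
        print("{0} => {1}".format(state, next_state))  # persisted in the real code
        state = next_state
```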
+class Cloner(AsyncJobs):
+    """
+    Asynchronous cloner: a pool of threads copying data from a snapshot to a
+    subvolume. This relies on a simple state machine (which mimics states from
+    the OpSm class) as the driver. Supported file types are directories,
+    symbolic links and regular files.
+    """
+    def __init__(self, volume_client, tp_size):
+        self.vc = volume_client
+        self.state_table = {
+            'pending'     : handle_clone_pending,
+            'in-progress' : handle_clone_in_progress,
+            'complete'    : handle_clone_complete,
+            'failed'      : handle_clone_failed
+        }
+        super(Cloner, self).__init__(volume_client, "cloner", tp_size)
+
+    def get_next_job(self, volname, running_jobs):
+        return get_next_clone_entry(self.vc, volname, running_jobs)
+
+    def execute_job(self, volname, job, should_cancel):
+        clone(self.vc, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel)
diff --git a/src/pybind/mgr/volumes/fs/fs_util.py b/src/pybind/mgr/volumes/fs/fs_util.py
index a556522195193..7dd116e5fb358 100644
--- a/src/pybind/mgr/volumes/fs/fs_util.py
+++ b/src/pybind/mgr/volumes/fs/fs_util.py
@@ -86,6 +86,38 @@ def list_one_entry_at_a_time(fs, dirpath):
     except cephfs.Error as e:
         raise VolumeException(-e.args[0], e.args[1])
 
+def copy_file(fs, src, dst, mode, uid, gid):
+    """
+    Copy a regular file from @src to @dst. @dst is overwritten if it exists.
+    """
+    src_fd = dst_fd = None
+    try:
+        src_fd = fs.open(src, os.O_RDONLY)
+        dst_fd = fs.open(dst, os.O_CREAT | os.O_TRUNC | os.O_WRONLY, mode)
+        fs.chown(dst, uid, gid)
+    except cephfs.Error as e:
+        if src_fd is not None:
+            fs.close(src_fd)
+        if dst_fd is not None:
+            fs.close(dst_fd)
+        raise VolumeException(-e.args[0], e.args[1])
+
+    IO_SIZE = 8 * 1024 * 1024
+    try:
+        while True:
+            data = fs.read(src_fd, -1, IO_SIZE)
+            if not len(data):
+                break
+            written = 0
+            while written < len(data):
+                written += fs.write(dst_fd, data[written:], -1)
+        fs.fsync(dst_fd, 0)
+    except cephfs.Error as e:
+        raise VolumeException(-e.args[0], e.args[1])
+    finally:
+        fs.close(src_fd)
+        fs.close(dst_fd)
+
 def get_ancestor_xattr(fs, path, attr):
     """
     Helper for reading layout information: if this xattr is missing
diff --git a/src/pybind/mgr/volumes/fs/operations/resolver.py b/src/pybind/mgr/volumes/fs/operations/resolver.py
new file mode 100644
index 0000000000000..bf982af953ef5
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/operations/resolver.py
@@ -0,0 +1,17 @@
+import os
+
+from .group import Group
+
+def splitall(path):
+    if path == "/":
+        return ["/"]
+    s = os.path.split(path)
+    return splitall(s[0]) + [s[1]]
+
+def resolve(vol_spec, path):
+    parts = splitall(path)
+    if len(parts) != 4 or os.path.join(parts[0], parts[1]) != vol_spec.subvolume_prefix:
+        return None
+    groupname = None if parts[2] == Group.NO_GROUP_NAME else parts[2]
+    subvolname = parts[3]
+    return (groupname, subvolname)
diff --git a/src/pybind/mgr/volumes/fs/operations/template.py b/src/pybind/mgr/volumes/fs/operations/template.py
index 7c3d60882b642..846ea51eab491 100644
--- a/src/pybind/mgr/volumes/fs/operations/template.py
+++ b/src/pybind/mgr/volumes/fs/operations/template.py
@@ -111,6 +111,15 @@ class SubvolumeTemplate(object):
         """
         raise VolumeException(-errno.ENOTSUP, "operation not supported.")
 
+    def snapshot_path(self, snapname):
+        """
+        return the snapshot path for a given snapshot name
+
+        :param: subvolume snapshot name
+        :return: snapshot path
+        """
+        raise VolumeException(-errno.ENOTSUP, "operation not supported.")
+
     def list_snapshots(self):
         """
         list all subvolume snapshots.
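
resolve() above accepts only paths of exactly four components: '/', the two-level subvolume prefix, a group name and the subvolume name. A quick illustration of how splitall() feeds that check, assuming the default '/volumes' prefix and the conventional '_nogroup' value for Group.NO_GROUP_NAME:

```python
import os

def splitall(path):
    # same recursion as the resolver: peel one component per call
    if path == "/":
        return ["/"]
    s = os.path.split(path)
    return splitall(s[0]) + [s[1]]

print(splitall("/volumes/group0/clone1"))
# ['/', 'volumes', 'group0', 'clone1'] -- os.path.join('/', 'volumes') is then
# compared against vol_spec.subvolume_prefix, and a '_nogroup' component
# resolves to groupname None
```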
diff --git a/src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py b/src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py
index c54da537e863b..7aa05c0f61775 100644
--- a/src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py
+++ b/src/pybind/mgr/volumes/fs/operations/versions/metadata_manager.py
@@ -121,6 +121,9 @@ class MetadataManager(object):
         for key,value in dct.items():
             self.config.set(section, key, str(value))
 
+    def update_global_section(self, key, value):
+        self.update_section(MetadataManager.GLOBAL_SECTION, key, str(value))
+
     def get_option(self, section, key):
         if not self.config.has_section(section):
             raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section))
diff --git a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py
index f1e83863ee8b5..d8e0e0137ae4f 100644
--- a/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py
+++ b/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py
@@ -168,6 +168,18 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
             subvolume_status["source"] = self._get_clone_source()
         return subvolume_status
 
+    @property
+    def state(self):
+        return self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE)
+
+    @state.setter
+    def state(self, val):
+        state = val[0]
+        flush = val[1]
+        self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, state)
+        if flush:
+            self.metadata_mgr.flush()
+
     def remove(self):
         self.trash_base_dir()
 
@@ -175,10 +187,13 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
         subvol_path = self.path
         return self._resize(subvol_path, newsize, noshrink)
 
+    def snapshot_path(self, snapname):
+        return os.path.join(self.path,
+                            self.vol_spec.snapshot_dir_prefix.encode('utf-8'),
+                            snapname.encode('utf-8'))
+
     def create_snapshot(self, snapname):
-        snappath = os.path.join(self.path,
-                                self.vol_spec.snapshot_dir_prefix.encode('utf-8'),
-                                snapname.encode('utf-8'))
+        snappath = self.snapshot_path(snapname)
         mksnap(self.fs, snappath)
 
     def is_snapshot_protected(self, snapname):
@@ -204,9 +219,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
     def remove_snapshot(self, snapname):
         if self.is_snapshot_protected(snapname):
             raise VolumeException(-errno.EINVAL, "snapshot '{0}' is protected".format(snapname))
-        snappath = os.path.join(self.path,
-                                self.vol_spec.snapshot_dir_prefix.encode('utf-8'),
-                                snapname.encode('utf-8'))
+        snappath = self.snapshot_path(snapname)
         rmsnap(self.fs, snappath)
 
     def list_snapshots(self):
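
The state setter takes a (value, flush) tuple because a Python property setter accepts a single argument; packing in the flush flag lets callers decide whether the metadata file is persisted immediately. A self-contained sketch of the idiom, with a print standing in for metadata_mgr.flush():

```python
class Meta(object):
    def __init__(self):
        self._state = "pending"

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, val):
        state, flush = val          # unpack the (value, flush) tuple
        self._state = state
        if flush:
            print("flushing metadata")  # stand-in for metadata_mgr.flush()

m = Meta()
m.state = ("in-progress", True)     # mirrors subvolume.state = (state, True)
print(m.state)
```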
diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py
index e384db4745a38..21b0dc848c3a2 100644
--- a/src/pybind/mgr/volumes/fs/volume.py
+++ b/src/pybind/mgr/volumes/fs/volume.py
@@ -15,6 +15,7 @@ from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
 
 from .vol_spec import VolSpec
 from .exception import VolumeException
+from .async_cloner import Cloner
 from .purge_queue import ThreadPoolPurgeQueueMixin
 
 log = logging.getLogger(__name__)
@@ -42,6 +43,7 @@ class VolumeClient(object):
         self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
         self.connection_pool = ConnectionPool(self.mgr)
         # TODO: make thread pool size configurable
+        self.cloner = Cloner(self, 4)
         self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
         # on startup, queue purge job for available volumes to kickstart
         # purge for leftover subvolume entries in trash. note that, if the
@@ -50,6 +52,7 @@ class VolumeClient(object):
         # job list.
         fs_map = self.mgr.get('fs_map')
         for fs in fs_map['filesystems']:
+            self.cloner.queue_job(fs['mdsmap']['fs_name'])
             self.purge_queue.queue_job(fs['mdsmap']['fs_name'])
 
     def is_stopping(self):
@@ -308,6 +311,7 @@ class VolumeClient(object):
             with open_subvol(fs_handle, self.volspec, target_group, target_subvolname,
                              need_complete=False) as target_subvolume:
                 try:
                     subvolume.attach_snapshot(snapname, target_subvolume)
+                    self.cloner.queue_job(volname)
                 except VolumeException as ve:
                     try:
                         target_subvolume.remove()
-- 
2.39.5
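
Cloner(self, 4) builds on the AsyncJobs base class (from async_job.py, which is not part of this patch): a fixed pool of worker threads pulling per-volume jobs. The sketch below approximates that pattern with the standard library; every name in it is an illustrative assumption, not the actual AsyncJobs API:

```python
import queue
import threading

def worker(q):
    # each thread loops: take a job, run it, repeat; None signals shutdown
    while True:
        job = q.get()
        if job is None:
            q.task_done()
            break
        volname, run = job
        run(volname)              # analogous to execute_job(volname, job, ...)
        q.task_done()

jobs = queue.Queue()
threads = [threading.Thread(target=worker, args=(jobs,)) for _ in range(4)]
for t in threads:
    t.start()
jobs.put(("vol1", lambda v: print("cloning a subvolume on", v)))
jobs.join()                       # wait for queued work to drain
for _ in threads:
    jobs.put(None)                # tell every worker to exit
for t in threads:
    t.join()
```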