From baa82975a32c30a66df1a568989ec64b266429a0 Mon Sep 17 00:00:00 2001
From: Venky Shankar <vshankar@redhat.com>
Date: Fri, 18 Jun 2021 03:13:01 -0400
Subject: [PATCH] mgr/volumes: use dedicated libcephfs handles for subvolume
 calls and async jobs

Fixes: http://tracker.ceph.com/issues/51271
Signed-off-by: Venky Shankar <vshankar@redhat.com>
(cherry picked from commit cb2883feac1a5c141a3d72120c2320f7a8ffdea8)
---
 src/pybind/mgr/volumes/fs/async_cloner.py | 92 +++++++++++------------
 src/pybind/mgr/volumes/fs/async_job.py    |  4 +-
 src/pybind/mgr/volumes/fs/purge_queue.py  | 28 +++----
 3 files changed, 63 insertions(+), 61 deletions(-)

diff --git a/src/pybind/mgr/volumes/fs/async_cloner.py b/src/pybind/mgr/volumes/fs/async_cloner.py
index 580da8e7de5cc..06b73dab6579d 100644
--- a/src/pybind/mgr/volumes/fs/async_cloner.py
+++ b/src/pybind/mgr/volumes/fs/async_cloner.py
@@ -23,13 +23,13 @@ from .operations.template import SubvolumeOpType
 log = logging.getLogger(__name__)
 
 # helper for fetching a clone entry for a given volume
-def get_next_clone_entry(volume_client, volname, running_jobs):
+def get_next_clone_entry(fs_client, volspec, volname, running_jobs):
     log.debug("fetching clone entry for volume '{0}'".format(volname))
     try:
-        with open_volume_lockless(volume_client, volname) as fs_handle:
+        with open_volume_lockless(fs_client, volname) as fs_handle:
             try:
-                with open_clone_index(fs_handle, volume_client.volspec) as clone_index:
+                with open_clone_index(fs_handle, volspec) as clone_index:
                     job = clone_index.get_oldest_clone_entry(running_jobs)
                     return 0, job
             except IndexException as ve:
                 if ve.errno == -errno.ENOENT:
@@ -41,46 +41,46 @@
         return ve.errno, None
 
 @contextmanager
-def open_at_volume(volume_client, volname, groupname, subvolname, op_type):
-    with open_volume(volume_client, volname) as fs_handle:
-        with open_group(fs_handle, volume_client.volspec, groupname) as group:
-            with open_subvol(volume_client.mgr, fs_handle, volume_client.volspec, group, subvolname, op_type) as subvolume:
+def open_at_volume(fs_client, volspec, volname, groupname, subvolname, op_type):
+    with open_volume(fs_client, volname) as fs_handle:
+        with open_group(fs_handle, volspec, groupname) as group:
+            with open_subvol(fs_client.mgr, fs_handle, volspec, group, subvolname, op_type) as subvolume:
                 yield subvolume
 
 @contextmanager
-def open_at_group(volume_client, fs_handle, groupname, subvolname, op_type):
-    with open_group(fs_handle, volume_client.volspec, groupname) as group:
-        with open_subvol(volume_client.mgr, fs_handle, volume_client.volspec, group, subvolname, op_type) as subvolume:
+def open_at_group(fs_client, fs_handle, volspec, groupname, subvolname, op_type):
+    with open_group(fs_handle, volspec, groupname) as group:
+        with open_subvol(fs_client.mgr, fs_handle, volspec, group, subvolname, op_type) as subvolume:
             yield subvolume
 
 @contextmanager
-def open_at_group_unique(volume_client, fs_handle, s_groupname, s_subvolname, c_subvolume, c_groupname, c_subvolname, op_type):
+def open_at_group_unique(fs_client, fs_handle, volspec, s_groupname, s_subvolname, c_subvolume, c_groupname, c_subvolname, op_type):
     # if a snapshot of a retained subvolume is being cloned to recreate the same subvolume, return
     # the clone subvolume as the source subvolume
     if s_groupname == c_groupname and s_subvolname == c_subvolname:
         yield c_subvolume
     else:
-        with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, op_type) as s_subvolume:
+        with open_at_group(fs_client, fs_handle, volspec, s_groupname, s_subvolname, op_type) as s_subvolume:
             yield s_subvolume
 
 @contextmanager
-def open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname):
-    with open_at_group(volume_client, fs_handle, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume:
+def open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname):
+    with open_at_group(fs_client, fs_handle, volspec, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume:
         s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
         if groupname == s_groupname and subvolname == s_subvolname:
             # use the same subvolume to avoid metadata overwrites
             yield (clone_subvolume, clone_subvolume, s_snapname)
         else:
-            with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume:
+            with open_at_group(fs_client, fs_handle, volspec, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume:
                 yield (clone_subvolume, source_subvolume, s_snapname)
 
-def get_clone_state(volume_client, volname, groupname, subvolname):
-    with open_at_volume(volume_client, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
+def get_clone_state(fs_client, volspec, volname, groupname, subvolname):
+    with open_at_volume(fs_client, volspec, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
         return subvolume.state
 
-def set_clone_state(volume_client, volname, groupname, subvolname, state):
-    with open_at_volume(volume_client, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
+def set_clone_state(fs_client, volspec, volname, groupname, subvolname, state):
+    with open_at_volume(fs_client, volspec, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
         subvolume.state = (state, True)
 
 def get_clone_source(clone_subvolume):
@@ -99,7 +99,7 @@ def get_next_state_on_error(errnum):
                                              SubvolumeActions.ACTION_FAILED)
     return next_state
 
-def handle_clone_pending(volume_client, volname, index, groupname, subvolname, should_cancel):
+def handle_clone_pending(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
     try:
         if should_cancel():
             next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
@@ -185,16 +185,16 @@
     if should_cancel():
         raise VolumeException(-errno.EINTR, "clone operation interrupted")
 
-def do_clone(volume_client, volname, groupname, subvolname, should_cancel):
-    with open_volume_lockless(volume_client, volname) as fs_handle:
-        with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes:
+def do_clone(fs_client, volspec, volname, groupname, subvolname, should_cancel):
+    with open_volume_lockless(fs_client, volname) as fs_handle:
+        with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes:
             src_path = clone_volumes[1].snapshot_data_path(clone_volumes[2])
             dst_path = clone_volumes[0].path
             bulk_copy(fs_handle, src_path, dst_path, should_cancel)
 
-def handle_clone_in_progress(volume_client, volname, index, groupname, subvolname, should_cancel):
+def handle_clone_in_progress(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
     try:
-        do_clone(volume_client, volname, groupname, subvolname, should_cancel)
+        do_clone(fs_client, volspec, volname, groupname, subvolname, should_cancel)
         next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                               SubvolumeStates.STATE_INPROGRESS,
                                               SubvolumeActions.ACTION_SUCCESS)
@@ -204,31 +204,31 @@ def handle_clone_in_progress(volume_client, volname, index, groupname, subvolnam
         raise VolumeException(oe.errno, oe.error_str)
     return (next_state, False)
 
-def handle_clone_failed(volume_client, volname, index, groupname, subvolname, should_cancel):
+def handle_clone_failed(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
     try:
-        with open_volume(volume_client, volname) as fs_handle:
+        with open_volume(fs_client, volname) as fs_handle:
             # detach source but leave the clone section intact for later inspection
-            with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes:
+            with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes:
                 clone_volumes[1].detach_snapshot(clone_volumes[2], index)
     except (MetadataMgrException, VolumeException) as e:
         log.error("failed to detach clone from snapshot: {0}".format(e))
     return (None, True)
 
-def handle_clone_complete(volume_client, volname, index, groupname, subvolname, should_cancel):
+def handle_clone_complete(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
     try:
-        with open_volume(volume_client, volname) as fs_handle:
-            with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes:
+        with open_volume(fs_client, volname) as fs_handle:
+            with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes:
                 clone_volumes[1].detach_snapshot(clone_volumes[2], index)
                 clone_volumes[0].remove_clone_source(flush=True)
     except (MetadataMgrException, VolumeException) as e:
         log.error("failed to detach clone from snapshot: {0}".format(e))
     return (None, True)
 
-def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay):
+def start_clone_sm(fs_client, volspec, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay):
     finished = False
     current_state = None
     try:
-        current_state = get_clone_state(volume_client, volname, groupname, subvolname)
+        current_state = get_clone_state(fs_client, volspec, volname, groupname, subvolname)
         log.debug("cloning ({0}, {1}, {2}) -- starting state \"{3}\"".format(volname, groupname, subvolname, current_state))
         if current_state == SubvolumeStates.STATE_PENDING:
             time.sleep(snapshot_clone_delay)
@@ -237,19 +237,19 @@ def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_t
             handler = state_table.get(current_state, None)
             if not handler:
                 raise VolumeException(-errno.EINVAL, "invalid clone state: \"{0}\"".format(current_state))
-            (next_state, finished) = handler(volume_client, volname, index, groupname, subvolname, should_cancel)
+            (next_state, finished) = handler(fs_client, volspec, volname, index, groupname, subvolname, should_cancel)
             if next_state:
                 log.debug("({0}, {1}, {2}) transition state [\"{3}\" => \"{4}\"]".format(volname, groupname, subvolname,\
                           current_state, next_state))
-                set_clone_state(volume_client, volname, groupname, subvolname, next_state)
+                set_clone_state(fs_client, volspec, volname, groupname, subvolname, next_state)
                 current_state = next_state
     except VolumeException as ve:
         log.error("clone failed for ({0}, {1}, {2}) (current_state: {3}, reason: {4})".format(volname, groupname,\
                   subvolname, current_state, ve))
 
-def clone(volume_client, volname, index, clone_path, state_table, should_cancel, snapshot_clone_delay):
+def clone(fs_client, volspec, volname, index, clone_path, state_table, should_cancel, snapshot_clone_delay):
     log.info("cloning to subvolume path: {0}".format(clone_path))
-    resolved = resolve(volume_client.volspec, clone_path)
+    resolved = resolve(volspec, clone_path)
 
     groupname = resolved[0]
     subvolname = resolved[1]
@@ -257,7 +257,7 @@
 
     try:
         log.info("starting clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
-        start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay)
+        start_clone_sm(fs_client, volspec, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay)
        log.info("finished clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
     except VolumeException as ve:
         log.error("clone failed for ({0}, {1}, {2}), reason: {3}".format(volname, groupname, subvolname, ve))
@@ -301,8 +301,8 @@ class Cloner(AsyncJobs):
         s_subvolname = status['source']['subvolume']
         s_snapname = status['source']['snapshot']
 
-        with open_at_group_unique(self.vc, fs_handle, s_groupname, s_subvolname, clone_subvolume, clone_groupname,
-                                  clone_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
+        with open_at_group_unique(self.fs_client, fs_handle, self.vc.volspec, s_groupname, s_subvolname, clone_subvolume,
+                                  clone_groupname, clone_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
             next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                                   clone_state,
                                                   SubvolumeActions.ACTION_CANCELLED)
@@ -318,9 +318,9 @@ class Cloner(AsyncJobs):
         track_idx = None
 
         try:
-            with open_volume(self.vc, volname) as fs_handle:
+            with open_volume(self.fs_client, volname) as fs_handle:
                 with open_group(fs_handle, self.vc.volspec, groupname) as group:
-                    with open_subvol(self.vc.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
+                    with open_subvol(self.fs_client.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
                         status = clone_subvolume.status
                         clone_state = SubvolumeStates.from_value(status['state'])
                         if not self.is_clone_cancelable(clone_state):
@@ -338,9 +338,9 @@
             # accessing the volume in exclusive mode here would lead to deadlock.
            assert track_idx is not None
            with lock_timeout_log(self.lock):
-                with open_volume_lockless(self.vc, volname) as fs_handle:
+                with open_volume_lockless(self.fs_client, volname) as fs_handle:
                     with open_group(fs_handle, self.vc.volspec, groupname) as group:
-                        with open_subvol(self.vc.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
+                        with open_subvol(self.fs_client.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
                             if not self._cancel_job(volname, (track_idx, clone_subvolume.base_path)):
                                 raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
         except (IndexException, MetadataMgrException) as e:
@@ -348,7 +348,7 @@
             raise VolumeException(-errno.EINVAL, "error canceling clone")
 
     def get_next_job(self, volname, running_jobs):
-        return get_next_clone_entry(self.vc, volname, running_jobs)
+        return get_next_clone_entry(self.fs_client, self.vc.volspec, volname, running_jobs)
 
     def execute_job(self, volname, job, should_cancel):
-        clone(self.vc, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel, self.snapshot_clone_delay)
+        clone(self.fs_client, self.vc.volspec, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel, self.snapshot_clone_delay)
diff --git a/src/pybind/mgr/volumes/fs/async_job.py b/src/pybind/mgr/volumes/fs/async_job.py
index 6f7741fc99d8d..b79a3b52544ea 100644
--- a/src/pybind/mgr/volumes/fs/async_job.py
+++ b/src/pybind/mgr/volumes/fs/async_job.py
@@ -4,7 +4,7 @@ import logging
 import threading
 import traceback
 from collections import deque
 
-from mgr_util import lock_timeout_log
+from mgr_util import lock_timeout_log, CephfsClient
 
 from .exception import NotImplementedException
@@ -119,6 +119,8 @@ class AsyncJobs(threading.Thread):
         self.cancel_cv = threading.Condition(self.lock)
         self.nr_concurrent_jobs = nr_concurrent_jobs
         self.name_pfx = name_pfx
+        # each async job group uses its own libcephfs connection (pool)
+        self.fs_client = CephfsClient(self.vc.mgr)
 
         self.threads = []
         for i in range(self.nr_concurrent_jobs):
diff --git a/src/pybind/mgr/volumes/fs/purge_queue.py b/src/pybind/mgr/volumes/fs/purge_queue.py
index 7c902572e7aec..d67ef9af373f6 100644
--- a/src/pybind/mgr/volumes/fs/purge_queue.py
+++ b/src/pybind/mgr/volumes/fs/purge_queue.py
@@ -17,13 +17,13 @@ from .operations.trash import open_trashcan
 log = logging.getLogger(__name__)
 
 # helper for fetching a trash entry for a given volume
-def get_trash_entry_for_volume(volume_client, volname, running_jobs):
+def get_trash_entry_for_volume(fs_client, volspec, volname, running_jobs):
     log.debug("fetching trash entry for volume '{0}'".format(volname))
     try:
-        with open_volume_lockless(volume_client, volname) as fs_handle:
+        with open_volume_lockless(fs_client, volname) as fs_handle:
             try:
-                with open_trashcan(fs_handle, volume_client.volspec) as trashcan:
+                with open_trashcan(fs_handle, volspec) as trashcan:
                     path = trashcan.get_trash_entry(running_jobs)
                     return 0, path
             except VolumeException as ve:
                 if ve.errno == -errno.ENOENT:
@@ -34,14 +34,14 @@
         log.error("error fetching trash entry for volume '{0}' ({1})".format(volname, ve))
         return ve.errno, None
 
-def subvolume_purge(volume_client, volname, trashcan, subvolume_trash_entry, should_cancel):
-    groupname, subvolname = resolve_trash(volume_client.volspec, subvolume_trash_entry.decode('utf-8'))
+def subvolume_purge(fs_client, volspec, volname, trashcan, subvolume_trash_entry, should_cancel):
+    groupname, subvolname = resolve_trash(volspec, subvolume_trash_entry.decode('utf-8'))
     log.debug("subvolume resolved to {0}/{1}".format(groupname, subvolname))
 
     try:
-        with open_volume(volume_client, volname) as fs_handle:
-            with open_group(fs_handle, volume_client.volspec, groupname) as group:
-                with open_subvol(volume_client.mgr, fs_handle, volume_client.volspec, group, subvolname, SubvolumeOpType.REMOVE) as subvolume:
+        with open_volume(fs_client, volname) as fs_handle:
+            with open_group(fs_handle, volspec, groupname) as group:
+                with open_subvol(fs_client.mgr, fs_handle, volspec, group, subvolname, SubvolumeOpType.REMOVE) as subvolume:
                     log.debug("subvolume.path={0}, purgeable={1}".format(subvolume.path, subvolume.purgeable))
                     if not subvolume.purgeable:
                         return
@@ -54,13 +54,13 @@
         raise
 
 # helper for starting a purge operation on a trash entry
-def purge_trash_entry_for_volume(volume_client, volname, purge_entry, should_cancel):
+def purge_trash_entry_for_volume(fs_client, volspec, volname, purge_entry, should_cancel):
    log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_entry, volname))
 
     ret = 0
     try:
-        with open_volume_lockless(volume_client, volname) as fs_handle:
-            with open_trashcan(fs_handle, volume_client.volspec) as trashcan:
+        with open_volume_lockless(fs_client, volname) as fs_handle:
+            with open_trashcan(fs_handle, volspec) as trashcan:
                 try:
                     pth = os.path.join(trashcan.path, purge_entry)
                     stx = fs_handle.statx(pth, cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_SIZE,
@@ -78,7 +78,7 @@ def purge_trash_entry_for_volume(volume_client, volname, purge_entry, should_can
                                return ve.errno
                        finally:
                            if delink:
-                                subvolume_purge(volume_client, volname, trashcan, tgt, should_cancel)
+                                subvolume_purge(fs_client, volspec, volname, trashcan, tgt, should_cancel)
                                log.debug("purging trash link: {0}".format(purge_entry))
                                trashcan.delink(purge_entry)
                    else:
@@ -103,7 +103,7 @@
         super(ThreadPoolPurgeQueueMixin, self).__init__(volume_client, "puregejob", tp_size)
 
     def get_next_job(self, volname, running_jobs):
-        return get_trash_entry_for_volume(self.vc, volname, running_jobs)
+        return get_trash_entry_for_volume(self.fs_client, self.vc.volspec, volname, running_jobs)
 
     def execute_job(self, volname, job, should_cancel):
-        purge_trash_entry_for_volume(self.vc, volname, job, should_cancel)
+        purge_trash_entry_for_volume(self.fs_client, self.vc.volspec, volname, job, should_cancel)
-- 
2.39.5
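
Note (commentary, not part of the patch): the gist of the change is that each
AsyncJobs group (the cloner and the purge queue) now constructs its own
CephfsClient, and therefore its own libcephfs connection pool, instead of
borrowing the volume client's handles; the helper functions consequently take
the volspec explicitly since they no longer receive the full volume client.
Below is a minimal, self-contained sketch of that dedicated-pool pattern;
ConnectionPool, SubvolumeCalls, and AsyncJobGroup are illustrative stand-ins,
not Ceph APIs (mgr_util's CephfsClient plays the ConnectionPool role in the
real code).

# Sketch of the dedicated-connection-pool pattern this patch applies.
# All class names here are hypothetical, for illustration only.
import threading

class ConnectionPool:
    """Toy stand-in for a per-client libcephfs connection pool."""
    def __init__(self):
        self._lock = threading.Lock()
        self._conns = {}  # volume name -> opaque filesystem handle

    def get_fs_handle(self, volname):
        # cache one handle per volume, creating it on first use
        with self._lock:
            return self._conns.setdefault(volname, object())

class SubvolumeCalls:
    """Synchronous subvolume operations; owns its own pool."""
    def __init__(self):
        self.pool = ConnectionPool()

class AsyncJobGroup:
    """An async job group (cloner, purge queue). Before the patch it
    shared SubvolumeCalls' pool; after, it constructs its own, so a
    long-running clone copy cannot tie up the handles a synchronous
    subvolume call needs."""
    def __init__(self):
        self.pool = ConnectionPool()

vc = SubvolumeCalls()
cloner = AsyncJobGroup()
purge_queue = AsyncJobGroup()

# After the patch: three independent pools instead of one shared pool.
assert vc.pool is not cloner.pool
assert vc.pool is not purge_queue.pool
assert cloner.pool is not purge_queue.pool

This isolation is also why the clone-cancel path above can keep using the
lockless volume open on the dedicated client: per the comment in cancel_job,
taking the volume in exclusive mode there on a shared handle set would risk
deadlock against the async cloner persisting the "canceled" state.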