mgr/volumes: use dedicated libcephfs handles for subvolume calls and async jobs
author     Venky Shankar <vshankar@redhat.com>
           Fri, 18 Jun 2021 07:13:01 +0000 (03:13 -0400)
committer  Kotresh HR <khiremat@redhat.com>
           Mon, 28 Feb 2022 11:55:55 +0000 (17:25 +0530)
Fixes: http://tracker.ceph.com/issues/51271
Signed-off-by: Venky Shankar <vshankar@redhat.com>
(cherry picked from commit cb2883feac1a5c141a3d72120c2320f7a8ffdea8)

Conflicts:
  src/pybind/mgr/volumes/fs/async_cloner.py: the commit cf2a1ad65120 is not backported
  src/pybind/mgr/volumes/fs/async_job.py: the commit cf2a1ad65120 is not backported
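
This backport threads an explicit (fs_client, volspec) pair through the
cloner and purge-queue helpers in place of the shared volume_client, so
async jobs issue their libcephfs calls over a dedicated connection pool
rather than the handles used by synchronous subvolume calls. A stand-alone
analogue of the new calling convention (names mirror the diff below; the
stub classes are illustrative, not Ceph code):

    class VolSpec(object):
        """Stand-in for the volume layout spec the volume client used to carry."""
        def __init__(self, subvolume_prefix):
            self.subvolume_prefix = subvolume_prefix

    class FSClient(object):
        """Stand-in for mgr_util.CephfsClient: owns a private connection pool."""
        def open(self, volname):
            return "handle-for-{0}".format(volname)

    # before: get_next_clone_entry(volume_client, volname, running_jobs)
    # after:  client and volspec are explicit, so callers choose the pool
    def get_next_clone_entry(fs_client, volspec, volname, running_jobs):
        fs_handle = fs_client.open(volname)
        return 0, (fs_handle, volspec.subvolume_prefix)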

src/pybind/mgr/volumes/fs/async_cloner.py
src/pybind/mgr/volumes/fs/async_job.py
src/pybind/mgr/volumes/fs/purge_queue.py

src/pybind/mgr/volumes/fs/async_cloner.py
index 30f0af6183647fa999631e1f6fec4f5d8a3f548f..443232b34619c5e6c1c2a2763929abde38c79c0f 100644
@@ -22,13 +22,13 @@ from .operations.template import SubvolumeOpType
 log = logging.getLogger(__name__)
 
 # helper for fetching a clone entry for a given volume
-def get_next_clone_entry(volume_client, volname, running_jobs):
+def get_next_clone_entry(fs_client, volspec, volname, running_jobs):
     log.debug("fetching clone entry for volume '{0}'".format(volname))
 
     try:
-        with open_volume_lockless(volume_client, volname) as fs_handle:
+        with open_volume_lockless(fs_client, volname) as fs_handle:
             try:
-                with open_clone_index(fs_handle, volume_client.volspec) as clone_index:
+                with open_clone_index(fs_handle, volspec) as clone_index:
                     job = clone_index.get_oldest_clone_entry(running_jobs)
                     return 0, job
             except IndexException as ve:
@@ -40,46 +40,46 @@ def get_next_clone_entry(volume_client, volname, running_jobs):
         return ve.errno, None
 
 @contextmanager
-def open_at_volume(volume_client, volname, groupname, subvolname, op_type):
-    with open_volume(volume_client, volname) as fs_handle:
-        with open_group(fs_handle, volume_client.volspec, groupname) as group:
-            with open_subvol(volume_client.mgr, fs_handle, volume_client.volspec, group, subvolname, op_type) as subvolume:
+def open_at_volume(fs_client, volspec, volname, groupname, subvolname, op_type):
+    with open_volume(fs_client, volname) as fs_handle:
+        with open_group(fs_handle, volspec, groupname) as group:
+            with open_subvol(fs_client.mgr, fs_handle, volspec, group, subvolname, op_type) as subvolume:
                 yield subvolume
 
 @contextmanager
-def open_at_group(volume_client, fs_handle, groupname, subvolname, op_type):
-    with open_group(fs_handle, volume_client.volspec, groupname) as group:
-        with open_subvol(volume_client.mgr, fs_handle, volume_client.volspec, group, subvolname, op_type) as subvolume:
+def open_at_group(fs_client, fs_handle, volspec, groupname, subvolname, op_type):
+    with open_group(fs_handle, volspec, groupname) as group:
+        with open_subvol(fs_client.mgr, fs_handle, volspec, group, subvolname, op_type) as subvolume:
             yield subvolume
 
 @contextmanager
-def open_at_group_unique(volume_client, fs_handle, s_groupname, s_subvolname, c_subvolume, c_groupname, c_subvolname, op_type):
+def open_at_group_unique(fs_client, fs_handle, volspec, s_groupname, s_subvolname, c_subvolume, c_groupname, c_subvolname, op_type):
     # if a snapshot of a retained subvolume is being cloned to recreate the same subvolume, return
     # the clone subvolume as the source subvolume
     if s_groupname == c_groupname and s_subvolname == c_subvolname:
         yield c_subvolume
     else:
-        with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, op_type) as s_subvolume:
+        with open_at_group(fs_client, fs_handle, volspec, s_groupname, s_subvolname, op_type) as s_subvolume:
             yield s_subvolume
 
 
 @contextmanager
-def open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname):
-    with open_at_group(volume_client, fs_handle, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume:
+def open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname):
+    with open_at_group(fs_client, fs_handle, volspec, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume:
         s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
         if groupname == s_groupname and subvolname == s_subvolname:
             # use the same subvolume to avoid metadata overwrites
             yield (clone_subvolume, clone_subvolume, s_snapname)
         else:
-            with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume:
+            with open_at_group(fs_client, fs_handle, volspec, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume:
                 yield (clone_subvolume, source_subvolume, s_snapname)
 
-def get_clone_state(volume_client, volname, groupname, subvolname):
-    with open_at_volume(volume_client, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
+def get_clone_state(fs_client, volspec, volname, groupname, subvolname):
+    with open_at_volume(fs_client, volspec, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
         return subvolume.state
 
-def set_clone_state(volume_client, volname, groupname, subvolname, state):
-    with open_at_volume(volume_client, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
+def set_clone_state(fs_client, volspec, volname, groupname, subvolname, state):
+    with open_at_volume(fs_client, volspec, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
         subvolume.state = (state, True)
 
 def get_clone_source(clone_subvolume):
@@ -98,7 +98,7 @@ def get_next_state_on_error(errnum):
                                               SubvolumeActions.ACTION_FAILED)
     return next_state
 
-def handle_clone_pending(volume_client, volname, index, groupname, subvolname, should_cancel):
+def handle_clone_pending(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
     try:
         if should_cancel():
             next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
@@ -184,16 +184,16 @@ def bulk_copy(fs_handle, source_path, dst_path, should_cancel):
     if should_cancel():
         raise VolumeException(-errno.EINTR, "clone operation interrupted")
 
-def do_clone(volume_client, volname, groupname, subvolname, should_cancel):
-    with open_volume_lockless(volume_client, volname) as fs_handle:
-        with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes:
+def do_clone(fs_client, volspec, volname, groupname, subvolname, should_cancel):
+    with open_volume_lockless(fs_client, volname) as fs_handle:
+        with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes:
             src_path = clone_volumes[1].snapshot_data_path(clone_volumes[2])
             dst_path = clone_volumes[0].path
             bulk_copy(fs_handle, src_path, dst_path, should_cancel)
 
-def handle_clone_in_progress(volume_client, volname, index, groupname, subvolname, should_cancel):
+def handle_clone_in_progress(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
     try:
-        do_clone(volume_client, volname, groupname, subvolname, should_cancel)
+        do_clone(fs_client, volspec, volname, groupname, subvolname, should_cancel)
         next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                               SubvolumeStates.STATE_INPROGRESS,
                                               SubvolumeActions.ACTION_SUCCESS)
@@ -203,31 +203,31 @@ def handle_clone_in_progress(volume_client, volname, index, groupname, subvolname, should_cancel):
         raise VolumeException(oe.errno, oe.error_str)
     return (next_state, False)
 
-def handle_clone_failed(volume_client, volname, index, groupname, subvolname, should_cancel):
+def handle_clone_failed(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
     try:
-        with open_volume(volume_client, volname) as fs_handle:
+        with open_volume(fs_client, volname) as fs_handle:
             # detach source but leave the clone section intact for later inspection
-            with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes:
+            with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes:
                 clone_volumes[1].detach_snapshot(clone_volumes[2], index)
     except (MetadataMgrException, VolumeException) as e:
         log.error("failed to detach clone from snapshot: {0}".format(e))
     return (None, True)
 
-def handle_clone_complete(volume_client, volname, index, groupname, subvolname, should_cancel):
+def handle_clone_complete(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
     try:
-        with open_volume(volume_client, volname) as fs_handle:
-            with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes:
+        with open_volume(fs_client, volname) as fs_handle:
+            with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes:
                 clone_volumes[1].detach_snapshot(clone_volumes[2], index)
                 clone_volumes[0].remove_clone_source(flush=True)
     except (MetadataMgrException, VolumeException) as e:
         log.error("failed to detach clone from snapshot: {0}".format(e))
     return (None, True)
 
-def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay):
+def start_clone_sm(fs_client, volspec, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay):
     finished = False
     current_state = None
     try:
-        current_state = get_clone_state(volume_client, volname, groupname, subvolname)
+        current_state = get_clone_state(fs_client, volspec, volname, groupname, subvolname)
         log.debug("cloning ({0}, {1}, {2}) -- starting state \"{3}\"".format(volname, groupname, subvolname, current_state))
         if current_state == SubvolumeStates.STATE_PENDING:
             time.sleep(snapshot_clone_delay)
@@ -236,19 +236,19 @@ def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay):
             handler = state_table.get(current_state, None)
             if not handler:
                 raise VolumeException(-errno.EINVAL, "invalid clone state: \"{0}\"".format(current_state))
-            (next_state, finished) = handler(volume_client, volname, index, groupname, subvolname, should_cancel)
+            (next_state, finished) = handler(fs_client, volspec, volname, index, groupname, subvolname, should_cancel)
             if next_state:
                 log.debug("({0}, {1}, {2}) transition state [\"{3}\" => \"{4}\"]".format(volname, groupname, subvolname,\
                                                                                          current_state, next_state))
-                set_clone_state(volume_client, volname, groupname, subvolname, next_state)
+                set_clone_state(fs_client, volspec, volname, groupname, subvolname, next_state)
                 current_state = next_state
     except VolumeException as ve:
         log.error("clone failed for ({0}, {1}, {2}) (current_state: {3}, reason: {4})".format(volname, groupname,\
                                                                                              subvolname, current_state, ve))
 
-def clone(volume_client, volname, index, clone_path, state_table, should_cancel, snapshot_clone_delay):
+def clone(fs_client, volspec, volname, index, clone_path, state_table, should_cancel, snapshot_clone_delay):
     log.info("cloning to subvolume path: {0}".format(clone_path))
-    resolved = resolve(volume_client.volspec, clone_path)
+    resolved = resolve(volspec, clone_path)
 
     groupname  = resolved[0]
     subvolname = resolved[1]
@@ -256,7 +256,7 @@ def clone(volume_client, volname, index, clone_path, state_table, should_cancel, snapshot_clone_delay):
 
     try:
         log.info("starting clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
-        start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay)
+        start_clone_sm(fs_client, volspec, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay)
         log.info("finished clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
     except VolumeException as ve:
         log.error("clone failed for ({0}, {1}, {2}), reason: {3}".format(volname, groupname, subvolname, ve))
@@ -300,8 +300,8 @@ class Cloner(AsyncJobs):
         s_subvolname = status['source']['subvolume']
         s_snapname = status['source']['snapshot']
 
-        with open_at_group_unique(self.vc, fs_handle, s_groupname, s_subvolname, clone_subvolume, clone_groupname,
-                                  clone_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
+        with open_at_group_unique(self.fs_client, fs_handle, self.vc.volspec, s_groupname, s_subvolname, clone_subvolume,
+                                  clone_groupname, clone_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
             next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                                   clone_state,
                                                   SubvolumeActions.ACTION_CANCELLED)
@@ -317,9 +317,9 @@ class Cloner(AsyncJobs):
         track_idx = None
 
         try:
-            with open_volume(self.vc, volname) as fs_handle:
+            with open_volume(self.fs_client, volname) as fs_handle:
                 with open_group(fs_handle, self.vc.volspec, groupname) as group:
-                    with open_subvol(self.vc.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
+                    with open_subvol(self.fs_client.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
                         status = clone_subvolume.status
                         clone_state = SubvolumeStates.from_value(status['state'])
                         if not self.is_clone_cancelable(clone_state):
@@ -337,9 +337,9 @@ class Cloner(AsyncJobs):
             # accessing the volume in exclusive mode here would lead to deadlock.
             assert track_idx is not None
             with self.lock:
-                with open_volume_lockless(self.vc, volname) as fs_handle:
+                with open_volume_lockless(self.fs_client, volname) as fs_handle:
                     with open_group(fs_handle, self.vc.volspec, groupname) as group:
-                        with open_subvol(self.vc.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
+                        with open_subvol(self.fs_client.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
                             if not self._cancel_job(volname, (track_idx, clone_subvolume.base_path)):
                                 raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
         except (IndexException, MetadataMgrException) as e:
@@ -347,7 +347,7 @@ class Cloner(AsyncJobs):
             raise VolumeException(-errno.EINVAL, "error canceling clone")
 
     def get_next_job(self, volname, running_jobs):
-        return get_next_clone_entry(self.vc, volname, running_jobs)
+        return get_next_clone_entry(self.fs_client, self.vc.volspec, volname, running_jobs)
 
     def execute_job(self, volname, job, should_cancel):
-        clone(self.vc, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel, self.snapshot_clone_delay)
+        clone(self.fs_client, self.vc.volspec, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel, self.snapshot_clone_delay)
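
With this, every clone-state handler in async_cloner.py shares the widened
signature and returns a (next_state, finished) pair, which is what lets
start_clone_sm dispatch them uniformly through state_table. A minimal sketch
of that contract (the state_table wiring here is assumed; only the signature
and return shape come from the diff above):

    # Stub showing the uniform handler contract; the real handlers are
    # handle_clone_{pending,in_progress,failed,complete} from the diff above.
    def handle_clone_pending(fs_client, volspec, volname, index,
                             groupname, subvolname, should_cancel):
        if should_cancel():
            return ("canceled", True)      # (next_state, finished)
        return ("in-progress", False)

    state_table = {"pending": handle_clone_pending}   # wiring assumed

    def step(current_state, *args):
        handler = state_table.get(current_state, None)
        if not handler:
            raise ValueError("invalid clone state: {0}".format(current_state))
        return handler(*args)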
src/pybind/mgr/volumes/fs/async_job.py
index fb7051f47c24282b07ec51f9149fde6f10243763..7a38ef53e0d2ef715fc5fe9d539f856d156e0695 100644
@@ -4,6 +4,7 @@ import logging
 import threading
 import traceback
 from collections import deque
+from mgr_util import CephfsClient
 
 from .exception import NotImplementedException
 
@@ -115,6 +116,8 @@ class AsyncJobs(object):
         self.waiting = False
         self.cancel_cv = threading.Condition(self.lock)
         self.nr_concurrent_jobs = nr_concurrent_jobs
+        # each async job group uses its own libcephfs connection (pool)
+        self.fs_client = CephfsClient(self.vc.mgr)
 
         self.threads = []
         for i in range(nr_concurrent_jobs):
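
This hunk is the heart of the change: each AsyncJobs instance (both the
Cloner and the purge queue subclass it) now constructs its own CephfsClient,
giving every job group a private libcephfs connection pool. Condensed from
the hunk above (class scaffolding elided and parameter names assumed; this
only runs inside the ceph-mgr runtime):

    from mgr_util import CephfsClient

    class AsyncJobs(object):
        def __init__(self, volume_client, name_pfx, nr_concurrent_jobs):
            self.vc = volume_client
            # one libcephfs connection pool per job group: cloner and purge
            # queue workers stop contending with synchronous subvolume calls
            self.fs_client = CephfsClient(self.vc.mgr)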
src/pybind/mgr/volumes/fs/purge_queue.py
index 7c902572e7aecbccbb97b9cd5dc006f82e080981..d67ef9af373f6fcaaf347c4ca5695ec42e78115b 100644
@@ -17,13 +17,13 @@ from .operations.trash import open_trashcan
 log = logging.getLogger(__name__)
 
 # helper for fetching a trash entry for a given volume
-def get_trash_entry_for_volume(volume_client, volname, running_jobs):
+def get_trash_entry_for_volume(fs_client, volspec, volname, running_jobs):
     log.debug("fetching trash entry for volume '{0}'".format(volname))
 
     try:
-        with open_volume_lockless(volume_client, volname) as fs_handle:
+        with open_volume_lockless(fs_client, volname) as fs_handle:
             try:
-                with open_trashcan(fs_handle, volume_client.volspec) as trashcan:
+                with open_trashcan(fs_handle, volspec) as trashcan:
                     path = trashcan.get_trash_entry(running_jobs)
                     return 0, path
             except VolumeException as ve:
@@ -34,14 +34,14 @@ def get_trash_entry_for_volume(volume_client, volname, running_jobs):
         log.error("error fetching trash entry for volume '{0}' ({1})".format(volname, ve))
         return ve.errno, None
 
-def subvolume_purge(volume_client, volname, trashcan, subvolume_trash_entry, should_cancel):
-    groupname, subvolname = resolve_trash(volume_client.volspec, subvolume_trash_entry.decode('utf-8'))
+def subvolume_purge(fs_client, volspec, volname, trashcan, subvolume_trash_entry, should_cancel):
+    groupname, subvolname = resolve_trash(volspec, subvolume_trash_entry.decode('utf-8'))
     log.debug("subvolume resolved to {0}/{1}".format(groupname, subvolname))
 
     try:
-        with open_volume(volume_client, volname) as fs_handle:
-            with open_group(fs_handle, volume_client.volspec, groupname) as group:
-                with open_subvol(volume_client.mgr, fs_handle, volume_client.volspec, group, subvolname, SubvolumeOpType.REMOVE) as subvolume:
+        with open_volume(fs_client, volname) as fs_handle:
+            with open_group(fs_handle, volspec, groupname) as group:
+                with open_subvol(fs_client.mgr, fs_handle, volspec, group, subvolname, SubvolumeOpType.REMOVE) as subvolume:
                     log.debug("subvolume.path={0}, purgeable={1}".format(subvolume.path, subvolume.purgeable))
                     if not subvolume.purgeable:
                         return
@@ -54,13 +54,13 @@ def subvolume_purge(volume_client, volname, trashcan, subvolume_trash_entry, should_cancel):
             raise
 
 # helper for starting a purge operation on a trash entry
-def purge_trash_entry_for_volume(volume_client, volname, purge_entry, should_cancel):
+def purge_trash_entry_for_volume(fs_client, volspec, volname, purge_entry, should_cancel):
     log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_entry, volname))
 
     ret = 0
     try:
-        with open_volume_lockless(volume_client, volname) as fs_handle:
-            with open_trashcan(fs_handle, volume_client.volspec) as trashcan:
+        with open_volume_lockless(fs_client, volname) as fs_handle:
+            with open_trashcan(fs_handle, volspec) as trashcan:
                 try:
                     pth = os.path.join(trashcan.path, purge_entry)
                     stx = fs_handle.statx(pth, cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_SIZE,
@@ -78,7 +78,7 @@ def purge_trash_entry_for_volume(volume_client, volname, purge_entry, should_cancel):
                                 return ve.errno
                         finally:
                             if delink:
-                                subvolume_purge(volume_client, volname, trashcan, tgt, should_cancel)
+                                subvolume_purge(fs_client, volspec, volname, trashcan, tgt, should_cancel)
                                 log.debug("purging trash link: {0}".format(purge_entry))
                                 trashcan.delink(purge_entry)
                     else:
@@ -103,7 +103,7 @@ class ThreadPoolPurgeQueueMixin(AsyncJobs):
         super(ThreadPoolPurgeQueueMixin, self).__init__(volume_client, "puregejob", tp_size)
 
     def get_next_job(self, volname, running_jobs):
-        return get_trash_entry_for_volume(self.vc, volname, running_jobs)
+        return get_trash_entry_for_volume(self.fs_client, self.vc.volspec, volname, running_jobs)
 
     def execute_job(self, volname, job, should_cancel):
-        purge_trash_entry_for_volume(self.vc, volname, job, should_cancel)
+        purge_trash_entry_for_volume(self.fs_client, self.vc.volspec, volname, job, should_cancel)
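
With purge_queue.py converted as well, the volume client survives in these
paths only as the source of volspec (and of mgr for open_subvol), while every
handle the workers open flows through the group's dedicated fs_client. The
purge path itself dispatches on the statx mode of each trash entry, as the
`if delink:` branch above shows; a stand-alone analogue of that dispatch:

    import os
    import stat

    # Analogue of the dispatch inside purge_trash_entry_for_volume; the real
    # code statx()'s the entry through the libcephfs handle.
    def classify_trash_entry(path):
        if stat.S_ISLNK(os.lstat(path).st_mode):
            # trash link to a retained subvolume: resolve the target,
            # subvolume_purge() it, then delink the entry
            return "subvolume-link"
        # anything else is purged in place out of the trashcan
        return "direct-purge"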