]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/vol: show clone progress in "ceph status" output
authorRishabh Dave <ridave@redhat.com>
Sun, 4 Feb 2024 18:59:54 +0000 (00:29 +0530)
committerRishabh Dave <ridave@redhat.com>
Wed, 14 Aug 2024 09:35:46 +0000 (15:05 +0530)
Print a progress bar for ongoing clone jobs in the output of "ceph status".

When multiple clones are ongoing, show 1 progress bar in the output of
"ceph status" that shows the average of the progress made by each clone.

When the number of clone jobs is more than the number of clone threads,
print 2 progress bars in the output of the "ceph status" command; one for
ongoing clone jobs and the other for ongoing+pending clone jobs.

Fixes: https://tracker.ceph.com/issues/61904
Signed-off-by: Rishabh Dave <ridave@redhat.com>
src/pybind/mgr/volumes/fs/operations/clone_index.py
src/pybind/mgr/volumes/fs/stats_util.py
src/pybind/mgr/volumes/fs/volume.py

index f5a850638d8f51efc42b3e88d042263d50dd89e1..edcc01921c683a6790ab9a73997f5f9755d72fcd 100644 (file)
@@ -8,14 +8,16 @@ import cephfs
 
 from .index import Index
 from ..exception import IndexException, VolumeException
-from ..fs_util import list_one_entry_at_a_time
+from ..fs_util import list_one_entry_at_a_time, listdir
 
 log = logging.getLogger(__name__)
 
 
+PATH_MAX = 4096
+
+
 class CloneIndex(Index):
     SUB_GROUP_NAME = "clone"
-    PATH_MAX = 4096
 
     @property
     def path(self):
@@ -47,6 +49,26 @@ class CloneIndex(Index):
         except cephfs.Error as e:
             raise IndexException(-e.args[0], e.args[1])
 
+    def list_entries_by_ctime_order(self):
+        """
+        Return the names of all clone index entries, oldest ctime first.
+
+        Every entry found under ``self.path`` is lstat()ed and the names are
+        sorted by the ``st_ctime`` reported for them. An empty list is
+        returned when the clone index has no entries.
+        """
+        entry_names = listdir(self.fs, self.path, filter_files=False)
+        if not entry_names:
+            return []
+
+        # clone entries with ctime obtained by stat-ing them. basically,
+        # following is a list of tuples where each tuple has 2 members:
+        # (entry name, ctime).
+        ens_with_ctime = []
+        for en in entry_names:
+            d_path = os.path.join(self.path, en)
+            try:
+                stb = self.fs.lstat(d_path)
+            except cephfs.ObjectNotFound:
+                # a clone entry is deleted as soon as its clone job finishes,
+                # so it can vanish between listdir() and lstat(). such an
+                # entry is no longer part of the index, skip it.
+                continue
+
+            # add ctime next to clone entry
+            ens_with_ctime.append((en, stb.st_ctime))
+
+        # sort on the ctime member of each tuple. the key function must use
+        # its own argument; indexing the loop variable "en" here would give
+        # the same key for every element and leave the list unsorted.
+        ens_with_ctime.sort(key=lambda en_with_ctime: en_with_ctime[1])
+
+        # remove ctime and return list of clone entries sorted by ctime.
+        return [name for name, _ in ens_with_ctime]
+
     def get_oldest_clone_entry(self, exclude=[]):
         min_ctime_entry = None
         exclude_tracking_ids = [v[0] for v in exclude]
@@ -61,7 +83,7 @@ class CloneIndex(Index):
         if min_ctime_entry:
             try:
                 linklen = min_ctime_entry[1].st_size
-                sink_path = self.fs.readlink(os.path.join(self.path, min_ctime_entry[0]), CloneIndex.PATH_MAX)
+                sink_path = self.fs.readlink(os.path.join(self.path, min_ctime_entry[0]), PATH_MAX)
                 return (min_ctime_entry[0], sink_path[:linklen])
             except cephfs.Error as e:
                 raise IndexException(-e.args[0], e.args[1])
@@ -74,7 +96,7 @@ class CloneIndex(Index):
                 dpath = os.path.join(self.path, dname)
                 st = self.fs.lstat(dpath)
                 if stat.S_ISLNK(st.st_mode):
-                    target_path = self.fs.readlink(dpath, CloneIndex.PATH_MAX)
+                    target_path = self.fs.readlink(dpath, PATH_MAX)
                     if sink_path == target_path[:st.st_size]:
                         return dname
             return None
index 74deb103fb8d7b9d088eb0285332f3f2e858f9d1..cec33eaa8873dc96ea3a8c07603c24c4dbd7871b 100644 (file)
@@ -5,9 +5,22 @@ and destination directory for the copy operation that is performed for snapshot
 cloning) and pass, print, log and convert them to human readable format
 conveniently.
 '''
+from os.path import join as os_path_join
 from typing import Optional
+from logging import getLogger
 
-from mgr_util import format_bytes, format_dimless
+from .operations.volume import open_volume_lockless, list_volumes
+from .operations.subvolume import open_clone_subvol_pair_in_vol, open_subvol_in_vol
+from .operations.template import SubvolumeOpType
+from .operations.clone_index import open_clone_index, PATH_MAX
+from .operations.resolver import resolve_group_and_subvolume_name
+from .exception import VolumeException
+
+from mgr_util import RTimer, format_bytes, format_dimless
+from cephfs import ObjectNotFound
+
+
+log = getLogger(__name__)
 
 
 def get_size_ratio_str(size1, size2):
@@ -42,6 +55,11 @@ def get_amount_copied(src_path, dst_path, fs_handle):
     return size_t, size_c, percent
 
 
+def get_percent_copied(src_path, dst_path, fs_handle):
+    '''
+    Return only the percentage member of the 3-tuple produced by
+    get_amount_copied() for the given source and destination paths.
+    '''
+    amount_copied = get_amount_copied(src_path, dst_path, fs_handle)
+    return amount_copied[2]
+
+
 def get_stats(src_path, dst_path, fs_handle):
     rentries = 'ceph.dir.rentries'
     rentries_t = int(fs_handle.getxattr(src_path, rentries))
@@ -54,3 +72,240 @@ def get_stats(src_path, dst_path, fs_handle):
         'amount cloned': get_size_ratio_str(size_c, size_t),
         'files cloned': get_num_ratio_str(rentries_c, rentries_t),
     }
+
+
+class CloneInfo:
+    '''
+    Plain container for the details of a single clone job: the volume it
+    belongs to plus group name, subvolume name and path of both the clone
+    source and the clone destination.
+
+    Only the volume name is known at construction time; every other
+    attribute starts out as None and is filled in later.
+    '''
+
+    def __init__(self, volname):
+        self.volname = volname
+
+        # source side of the clone, unknown until looked up.
+        self.src_group_name = self.src_subvol_name = self.src_path = None
+
+        # destination side of the clone, unknown until looked up.
+        self.dst_group_name = self.dst_subvol_name = self.dst_path = None
+
+
+class CloneProgressReporter:
+    '''
+    Report progress made by clone jobs as progress bars in "ceph status"
+    output.
+
+    One progress event shows the average progress of the ongoing clone jobs.
+    A second progress event, showing the average progress of ongoing+pending
+    clone jobs, is additionally updated when the number of clones found is
+    greater than max_concurrent_clones.
+
+    The progress events are refreshed by an RTimer instance that repeatedly
+    runs _update_progress_bars() until no clones are left.
+    '''
+
+    def __init__(self, volclient, vol_spec):
+        self.vol_spec = vol_spec
+
+        # instance of VolumeClient is needed here so that call to
+        # LibCephFS.getxattr() can be made.
+        self.volclient = volclient
+
+        # need to figure out how many progress bars should be printed. print 1
+        # progress bar if number of ongoing clones is less than this value,
+        # else print 2.
+        self.max_concurrent_clones = self.volclient.mgr.max_concurrent_clones
+
+        # Creating an RTimer instance in advance so that we can check if clone
+        # reporting has already been initiated by calling RTimer.is_alive().
+        self.update_task = RTimer(1, self._update_progress_bars)
+
+    def initiate_reporting(self):
+        '''
+        Start the RTimer thread that updates the progress bars, unless it is
+        already running. The progress event IDs are (re)assigned here since
+        they are reset to None when reporting finishes.
+        '''
+        if self.update_task.is_alive():
+            log.info('progress reporting thread is already alive, not '
+                     'initiating it again')
+            return
+
+        log.info('initiating progress reporting for clones...')
+        # progress event ID for ongoing clone jobs
+        self.on_pev_id: Optional[str] = 'mgr-vol-ongoing-clones'
+        # progress event ID for ongoing+pending clone jobs
+        self.onpen_pev_id: Optional[str] = 'mgr-vol-total-clones'
+
+        # a fresh RTimer is created for every run of the reporting; the one
+        # created in __init__() (or by a previous run) is not started again.
+        self.update_task = RTimer(1, self._update_progress_bars)
+        self.update_task.start()
+        log.info('progress reporting for clones has been initiated')
+
+    def _get_clone_dst_info(self, fs_handle, ci, clone_entry,
+                            clone_index_path):
+        '''
+        Fill destination group name, subvolume name and path in "ci" (an
+        instance of CloneInfo) by reading the symlink "clone_entry" located
+        in "clone_index_path".
+        '''
+        log.debug('collecting info for cloning destination')
+
+        ce_path = os_path_join(clone_index_path, clone_entry)
+        # XXX: This may raise ObjectNotFound exception. As soon as cloning is
+        # finished, clone entry is deleted by cloner thread. This exception is
+        # handled in _get_info_for_all_clones().
+        dst_subvol_base_path = fs_handle.readlink(ce_path, PATH_MAX).\
+            decode('utf-8')
+
+        ci.dst_group_name, ci.dst_subvol_name = \
+            resolve_group_and_subvolume_name(self.vol_spec, dst_subvol_base_path)
+        with open_subvol_in_vol(self.volclient, self.vol_spec, ci.volname,
+                                ci.dst_group_name, ci.dst_subvol_name,
+                                SubvolumeOpType.CLONE_INTERNAL) \
+                                as (_, _, dst_subvol):
+            ci.dst_path = dst_subvol.path
+            log.debug(f'destination subvolume path for clone - {ci.dst_path}')
+
+        log.debug('finished collecting info for cloning destination')
+
+    def _get_clone_src_info(self, fs_handle, ci):
+        '''
+        Fill source group name, subvolume name and snapshot data path in
+        "ci" (an instance of CloneInfo). Destination group and subvolume
+        names must already be set in "ci" since they are used to open the
+        clone source.
+        '''
+        log.debug('collecting info for cloning source')
+
+        with open_clone_subvol_pair_in_vol(self.volclient, self.vol_spec,
+                                           ci.volname, ci.dst_group_name,
+                                           ci.dst_subvol_name) as \
+                                           (dst_subvol, src_subvol, snap_name):
+            ci.src_group_name = src_subvol.group_name
+            ci.src_subvol_name = src_subvol.subvolname
+            ci.src_path = src_subvol.snapshot_data_path(snap_name)
+            log.debug(f'source subvolume path for clone - {ci.src_path}')
+
+        log.debug('finished collecting info for cloning source')
+
+    def _get_info_for_all_clones(self):
+        '''
+        Go through the clone index of every volume and return a list of
+        CloneInfo instances, one for every clone entry for which both the
+        source path and the destination path could be found.
+
+        Clone entries that disappear while being inspected (because the
+        clone job completed or was cancelled) are skipped.
+        '''
+        clones: list[CloneInfo] = []
+
+        log.debug('collecting all entries in clone index...')
+        volnames = list_volumes(self.volclient.mgr)
+        for volname in volnames:
+            with open_volume_lockless(self.volclient, volname) as fs_handle:
+                with open_clone_index(fs_handle, self.vol_spec) as clone_index:
+                    clone_index_path = clone_index.path
+                    # get clone in order in which they were launched, this
+                    # should be same as the ctime on clone entry.
+                    clone_index_entries = clone_index.list_entries_by_ctime_order()
+                    # report number of entries just read from the index, not
+                    # the number of clones collected so far.
+                    log.debug('finished collecting all clone index entries, '
+                              f'found {len(clone_index_entries)} clone index '
+                              'entries')
+
+                log.debug('collecting info for clones found through clone index '
+                         'entries...')
+                for ce in clone_index_entries:
+                    ci = CloneInfo(volname)
+
+                    try:
+                        self._get_clone_dst_info(fs_handle, ci, ce,
+                                                 clone_index_path)
+                        self._get_clone_src_info(fs_handle, ci)
+                    except ObjectNotFound as e:
+                        log.info('Exception ObjectNotFound was raised. Apparently '
+                                 'entry in clone index was removed because one of '
+                                 'the clone job(s) has completed/cancelled, '
+                                 'therefore ignoring and proceeding. '
+                                 f'Printing the exception: {e}')
+                        continue
+                    except VolumeException as e:
+                        if e.error_str != 'error fetching subvolume metadata':
+                            raise
+                        log.info('Exception VolumeException was raised. Apparently '
+                                 'an entry from the metadata file of clone source '
+                                 'was removed because one of the clone job(s) has '
+                                 'completed/cancelled. Therefore ignoring and '
+                                 f'proceeding Printing the exception: {e}')
+                        continue
+
+                    if not ci.src_path or not ci.dst_path:
+                        continue
+
+                    clones.append(ci)
+
+        log.debug('finished collecting info on all clones, found '
+                  f'{len(clones)} clones')
+        return clones
+
+    def _update_progress_bar_event(self, ev_id, ev_msg, ev_progress_fraction):
+        '''
+        Call update() of the mgr "progress" module for the progress event
+        "ev_id" with message "ev_msg" and progress "ev_progress_fraction".
+        '''
+        log.debug(f'ev_id = {ev_id} ev_progress_fraction = {ev_progress_fraction}')
+        log.debug(f'ev_msg = {ev_msg}')
+        log.debug('calling update() from mgr/progress module')
+
+        self.volclient.mgr.remote('progress', 'update', ev_id=ev_id,
+                                  ev_msg=ev_msg,
+                                  ev_progress=ev_progress_fraction,
+                                  refs=['mds', 'clone'], add_to_ceph_s=True)
+
+        log.debug('call to update() from mgr/progress module was successful')
+
+    def _update_progress_bars(self):
+        '''
+        Look for amount of progress made by all cloning operations and prints
+        progress bars, in "ceph -s" output, for average progress made
+        accordingly.
+
+        This method is supposed to be run only by instance of class RTimer
+        present in this class.
+        '''
+        clones = self._get_info_for_all_clones()
+        if not clones:
+            self.finish()
+            return
+
+        # onpen bar (that is progress bar for clone jobs in ongoing and pending
+        # state) is printed when clones are in pending state. it is kept in
+        # printing until all clone jobs finish.
+        show_onpen_bar = len(clones) > self.max_concurrent_clones
+
+        assert self.on_pev_id is not None
+        sum_percent_ongoing = 0.0
+        total_ongoing_clones = min(len(clones), self.max_concurrent_clones)
+
+        if show_onpen_bar:
+            assert self.onpen_pev_id is not None
+        sum_percent_onpen = 0.0
+        total_onpen_clones = len(clones)
+
+        for idx, clone in enumerate(clones):
+            with open_volume_lockless(self.volclient, clone.volname) as \
+                    fs_handle:
+                percent = get_percent_copied(clone.src_path, clone.dst_path,
+                                             fs_handle)
+                # clones are ordered by launch time, so the first
+                # total_ongoing_clones of them are treated as ongoing ones.
+                if idx < total_ongoing_clones:
+                    sum_percent_ongoing += percent
+                if show_onpen_bar:
+                    sum_percent_onpen += percent
+
+        avg_percent_ongoing = round(sum_percent_ongoing / total_ongoing_clones, 3)
+        # progress module takes progress as a fraction between 0.0 to 1.0.
+        avg_progress_fraction = avg_percent_ongoing / 100
+        msg = (f'{total_ongoing_clones} ongoing clones - average progress is '
+               f'{avg_percent_ongoing}%')
+        self._update_progress_bar_event(ev_id=self.on_pev_id, ev_msg=msg,
+            ev_progress_fraction=avg_progress_fraction)
+        log.debug('finished updating progress bar for ongoing clones with '
+                  f'following message - {msg}')
+
+        if show_onpen_bar:
+            avg_percent_onpen = round(sum_percent_onpen / total_onpen_clones, 3)
+            # progress module takes progress as a fraction between 0.0 to 1.0.
+            avg_progress_fraction = avg_percent_onpen / 100
+            msg = (f'Total {total_onpen_clones} clones - average progress is '
+                   f'{avg_percent_onpen}%')
+            self._update_progress_bar_event(ev_id=self.onpen_pev_id, ev_msg=msg,
+                                        ev_progress_fraction=avg_progress_fraction)
+            log.debug('finished updating progress bar for ongoing+pending '
+                      f'clones with following message - {msg}')
+
+    def _finish_progress_events(self):
+        '''
+        Remove progress bars from "ceph status" output.
+        '''
+        log.info('removing progress bars from "ceph status" output')
+
+        assert self.on_pev_id is not None
+        assert self.onpen_pev_id is not None
+
+        self.volclient.mgr.remote('progress', 'complete', self.on_pev_id)
+        self.on_pev_id = None
+
+        self.volclient.mgr.remote('progress', 'complete', self.onpen_pev_id)
+        self.onpen_pev_id = None
+
+        log.info('finished removing progress bars from "ceph status" output')
+
+    def finish(self):
+        '''
+        All cloning jobs have been completed. Terminate this RTimer thread.
+        '''
+        self._finish_progress_events()
+
+        log.info(f'marking this RTimer thread as finished; thread object ID - {self}')
+        self.update_task.finished.set()
index 778817a8013853941e9778a3f8bcd714f880eeb2..3f3b16020494d0e006b59806f650d83566bc1d21 100644 (file)
@@ -28,6 +28,7 @@ from .exception import VolumeException, ClusterError, ClusterTimeout, \
 from .async_cloner import Cloner
 from .purge_queue import ThreadPoolPurgeQueueMixin
 from .operations.template import SubvolumeOpType
+from .stats_util import CloneProgressReporter
 
 if TYPE_CHECKING:
     from volumes import Module
@@ -61,6 +62,8 @@ class VolumeClient(CephfsClient["Module"]):
         self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
         self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay,
                              self.mgr.snapshot_clone_no_wait)
+        self.clone_progress_reporter = CloneProgressReporter(self,
+                                                             self.volspec)
         self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
         # on startup, queue purge job for available volumes to kickstart
         # purge for leftover subvolume entries in trash. note that, if the
@@ -800,6 +803,7 @@ class VolumeClient(CephfsClient["Module"]):
                 else:
                     s_subvolume.attach_snapshot(s_snapname, t_subvolume)
                 self.cloner.queue_job(volname)
+                self.clone_progress_reporter.initiate_reporting()
             except VolumeException as ve:
                 try:
                     t_subvolume.remove()