From: Rishabh Dave
Date: Sun, 4 Feb 2024 18:59:54 +0000 (+0530)
Subject: mgr/vol: show clone progress in "ceph status" output
X-Git-Tag: v20.0.0~1215^2~7
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=65b789edfb507fe11db1052b67a703f7d85a4101;p=ceph.git

mgr/vol: show clone progress in "ceph status" output

Print a progress bar for ongoing clone job in output of "ceph status".

When multiple clones are ongoing, show 1 progress bar in output of "ceph
status" that shows the average of progress made by each clone.

When number of clone jobs is more than number of clone threads, print 2
progress bars in output of "ceph status" command; one for ongoing clone
jobs and other for ongoing+pending clone jobs.

Fixes: https://tracker.ceph.com/issues/61904
Signed-off-by: Rishabh Dave
---

diff --git a/src/pybind/mgr/volumes/fs/operations/clone_index.py b/src/pybind/mgr/volumes/fs/operations/clone_index.py
index f5a850638d8f..edcc01921c68 100644
--- a/src/pybind/mgr/volumes/fs/operations/clone_index.py
+++ b/src/pybind/mgr/volumes/fs/operations/clone_index.py
@@ -8,14 +8,16 @@ import cephfs
 
 from .index import Index
 from ..exception import IndexException, VolumeException
-from ..fs_util import list_one_entry_at_a_time
+from ..fs_util import list_one_entry_at_a_time, listdir
 
 log = logging.getLogger(__name__)
 
 
+PATH_MAX = 4096
+
+
 class CloneIndex(Index):
     SUB_GROUP_NAME = "clone"
-    PATH_MAX = 4096
 
     @property
     def path(self):
@@ -47,6 +49,29 @@ class CloneIndex(Index):
         except cephfs.Error as e:
             raise IndexException(-e.args[0], e.args[1])
 
+    def list_entries_by_ctime_order(self):
+        '''
+        Return names of all clone index entries sorted by ctime, oldest first.
+        '''
+        entry_names = listdir(self.fs, self.path, filter_files=False)
+        if not entry_names:
+            return []
+
+        # clone entries with ctime obtained by stat-ing them. basically,
+        # following is a list of tuples where each tuple has 2 members.
+        ens_with_ctime = []
+        for en in entry_names:
+            d_path = os.path.join(self.path, en)
+            stb = self.fs.lstat(d_path)
+
+            # add ctime next to clone entry
+            ens_with_ctime.append((en, stb.st_ctime))
+
+        ens_with_ctime.sort(key=lambda en_with_ctime: en_with_ctime[1])
+
+        # remove ctime and return list of clone entries sorted by ctime.
+        return [i[0] for i in ens_with_ctime]
+
     def get_oldest_clone_entry(self, exclude=[]):
         min_ctime_entry = None
         exclude_tracking_ids = [v[0] for v in exclude]
@@ -61,7 +86,7 @@ class CloneIndex(Index):
         if min_ctime_entry:
             try:
                 linklen = min_ctime_entry[1].st_size
-                sink_path = self.fs.readlink(os.path.join(self.path, min_ctime_entry[0]), CloneIndex.PATH_MAX)
+                sink_path = self.fs.readlink(os.path.join(self.path, min_ctime_entry[0]), PATH_MAX)
                 return (min_ctime_entry[0], sink_path[:linklen])
             except cephfs.Error as e:
                 raise IndexException(-e.args[0], e.args[1])
@@ -74,7 +99,7 @@ class CloneIndex(Index):
                 dpath = os.path.join(self.path, dname)
                 st = self.fs.lstat(dpath)
                 if stat.S_ISLNK(st.st_mode):
-                    target_path = self.fs.readlink(dpath, CloneIndex.PATH_MAX)
+                    target_path = self.fs.readlink(dpath, PATH_MAX)
                     if sink_path == target_path[:st.st_size]:
                         return dname
             return None
diff --git a/src/pybind/mgr/volumes/fs/stats_util.py b/src/pybind/mgr/volumes/fs/stats_util.py
index 74deb103fb8d..cec33eaa8873 100644
--- a/src/pybind/mgr/volumes/fs/stats_util.py
+++ b/src/pybind/mgr/volumes/fs/stats_util.py
@@ -5,9 +5,22 @@ and destination directory for the copy operation that is performed for snapshot
 cloning) and pass, print, log and convert them to human readable format
 conveniently.
 '''
+from os.path import join as os_path_join
 from typing import Optional
+from logging import getLogger
 
-from mgr_util import format_bytes, format_dimless
+from .operations.volume import open_volume_lockless, list_volumes
+from .operations.subvolume import open_clone_subvol_pair_in_vol, open_subvol_in_vol
+from .operations.template import SubvolumeOpType
+from .operations.clone_index import open_clone_index, PATH_MAX
+from .operations.resolver import resolve_group_and_subvolume_name
+from .exception import VolumeException
+
+from mgr_util import RTimer, format_bytes, format_dimless
+from cephfs import ObjectNotFound
+
+
+log = getLogger(__name__)
 
 
 def get_size_ratio_str(size1, size2):
@@ -42,6 +55,14 @@ def get_amount_copied(src_path, dst_path, fs_handle):
     return size_t, size_c, percent
 
 
+def get_percent_copied(src_path, dst_path, fs_handle):
+    '''
+    Return only the percentage computed by get_amount_copied().
+    '''
+    _, _, percent = get_amount_copied(src_path, dst_path, fs_handle)
+    return percent
+
+
 def get_stats(src_path, dst_path, fs_handle):
     rentries = 'ceph.dir.rentries'
     rentries_t = int(fs_handle.getxattr(src_path, rentries))
@@ -54,3 +75,261 @@ def get_stats(src_path, dst_path, fs_handle):
         'amount cloned': get_size_ratio_str(size_c, size_t),
         'files cloned': get_num_ratio_str(rentries_c, rentries_t),
     }
+
+
+class CloneInfo:
+    '''
+    Names and paths of the source and destination of a single clone job.
+    '''
+
+    def __init__(self, volname):
+        self.volname = volname
+
+        self.src_group_name = None
+        self.src_subvol_name = None
+        self.src_path = None
+
+        self.dst_group_name = None
+        self.dst_subvol_name = None
+        self.dst_path = None
+
+
+class CloneProgressReporter:
+    '''
+    Report average progress of clone jobs through progress module events.
+    '''
+
+    def __init__(self, volclient, vol_spec):
+        self.vol_spec = vol_spec
+
+        # instance of VolumeClient is needed here so that call to
+        # LibCephFS.getxattr() can be made.
+        self.volclient = volclient
+
+        # need to figure out how many progress bars should be printed. print 1
+        # progress bar if number of ongoing clones is less than this value,
+        # else print 2.
+        self.max_concurrent_clones = self.volclient.mgr.max_concurrent_clones
+
+        # Creating an RTimer instance in advance so that we can check if clone
+        # reporting has already been initiated by calling RTimer.is_alive().
+        self.update_task = RTimer(1, self._update_progress_bars)
+
+    def initiate_reporting(self):
+        '''
+        Start the RTimer that updates progress bars, unless already alive.
+        '''
+        if self.update_task.is_alive():
+            log.info('progress reporting thread is already alive, not '
+                     'initiating it again')
+            return
+
+        log.info('initiating progress reporting for clones...')
+        # progress event ID for ongoing clone jobs
+        self.on_pev_id: Optional[str] = 'mgr-vol-ongoing-clones'
+        # progress event ID for ongoing+pending clone jobs
+        self.onpen_pev_id: Optional[str] = 'mgr-vol-total-clones'
+
+        self.update_task = RTimer(1, self._update_progress_bars)
+        self.update_task.start()
+        log.info('progress reporting for clones has been initiated')
+
+    def _get_clone_dst_info(self, fs_handle, ci, clone_entry,
+                            clone_index_path):
+        '''
+        Fill destination group, subvolume and path in "ci" for clone_entry.
+        '''
+        log.debug('collecting info for cloning destination')
+
+        ce_path = os_path_join(clone_index_path, clone_entry)
+        # XXX: This may raise ObjectNotFound exception. As soon as cloning is
+        # finished, clone entry is deleted by cloner thread. This exception is
+        # handled in _get_info_for_all_clones().
+        dst_subvol_base_path = fs_handle.readlink(ce_path, PATH_MAX).\
+            decode('utf-8')
+
+        ci.dst_group_name, ci.dst_subvol_name = \
+            resolve_group_and_subvolume_name(self.vol_spec, dst_subvol_base_path)
+        with open_subvol_in_vol(self.volclient, self.vol_spec, ci.volname,
+                                ci.dst_group_name, ci.dst_subvol_name,
+                                SubvolumeOpType.CLONE_INTERNAL) \
+                as (_, _, dst_subvol):
+            ci.dst_path = dst_subvol.path
+            log.debug(f'destination subvolume path for clone - {ci.dst_path}')
+
+        log.debug('finished collecting info for cloning destination')
+
+    def _get_clone_src_info(self, fs_handle, ci):
+        '''
+        Fill source group, subvolume and snapshot data path in "ci".
+        '''
+        log.debug('collecting info for cloning source')
+
+        with open_clone_subvol_pair_in_vol(self.volclient, self.vol_spec,
+                                           ci.volname, ci.dst_group_name,
+                                           ci.dst_subvol_name) as \
+                (dst_subvol, src_subvol, snap_name):
+            ci.src_group_name = src_subvol.group_name
+            ci.src_subvol_name = src_subvol.subvolname
+            ci.src_path = src_subvol.snapshot_data_path(snap_name)
+            log.debug(f'source subvolume path for clone - {ci.src_path}')
+
+        log.debug('finished collecting info for cloning source')
+
+    def _get_info_for_all_clones(self):
+        '''
+        Return list of CloneInfo objects built from clone index entries.
+        '''
+        clones: list[CloneInfo] = []
+
+        log.debug('collecting all entries in clone index...')
+        volnames = list_volumes(self.volclient.mgr)
+        for volname in volnames:
+            with open_volume_lockless(self.volclient, volname) as fs_handle:
+                with open_clone_index(fs_handle, self.vol_spec) as clone_index:
+                    clone_index_path = clone_index.path
+                    # get clone in order in which they were launched, this
+                    # should be same as the ctime on clone entry.
+                    clone_index_entries = clone_index.list_entries_by_ctime_order()
+                    log.debug('finished collecting all clone index entries, '
+                              f'found {len(clone_index_entries)} clone index entries')
+
+                log.debug('collecting info for clones found through clone index '
+                          'entries...')
+                for ce in clone_index_entries:
+                    ci = CloneInfo(volname)
+
+                    try:
+                        self._get_clone_dst_info(fs_handle, ci, ce,
+                                                 clone_index_path)
+                        self._get_clone_src_info(fs_handle, ci)
+                    except ObjectNotFound as e:
+                        log.info('Exception ObjectNotFound was raised. Apparently '
+                                 'entry in clone index was removed because one of '
+                                 'the clone job(s) has completed/cancelled, '
+                                 'therefore ignoring and proceeding. '
+                                 f'Printing the exception: {e}')
+                        continue
+                    except VolumeException as e:
+                        if e.error_str != 'error fetching subvolume metadata':
+                            raise
+                        log.info('Exception VolumeException was raised. Apparently '
+                                 'an entry from the metadata file of clone source '
+                                 'was removed because one of the clone job(s) has '
+                                 'completed/cancelled. Therefore ignoring and '
+                                 f'proceeding Printing the exception: {e}')
+                        continue
+
+                    if not ci.src_path or not ci.dst_path:
+                        continue
+
+                    clones.append(ci)
+
+        log.debug('finished collecting info on all clones, found '
+                  f'{len(clones)} clones')
+        return clones
+
+    def _update_progress_bar_event(self, ev_id, ev_msg, ev_progress_fraction):
+        '''
+        Pass event ID, message and fraction to update() of progress module.
+        '''
+        log.debug(f'ev_id = {ev_id} ev_progress_fraction = {ev_progress_fraction}')
+        log.debug(f'ev_msg = {ev_msg}')
+        log.debug('calling update() from mgr/progress module')
+
+        self.volclient.mgr.remote('progress', 'update', ev_id=ev_id,
+                                  ev_msg=ev_msg,
+                                  ev_progress=ev_progress_fraction,
+                                  refs=['mds', 'clone'], add_to_ceph_s=True)
+
+        log.debug('call to update() from mgr/progress module was successful')
+
+    def _update_progress_bars(self):
+        '''
+        Look for amount of progress made by all cloning operations and print
+        progress bars, in "ceph -s" output, for average progress made
+        accordingly.
+
+        This method is supposed to be run only by instance of class RTimer
+        present in this class.
+        '''
+        clones = self._get_info_for_all_clones()
+        if not clones:
+            self.finish()
+            return
+
+        # onpen bar (that is progress bar for clone jobs in ongoing and pending
+        # state) is printed when clones are in pending state. it is kept in
+        # printing until all clone jobs finish.
+        show_onpen_bar = True if len(clones) > self.max_concurrent_clones \
+            else False
+
+        percent = 0.0
+
+        assert self.on_pev_id is not None
+        sum_percent_ongoing = 0.0
+        avg_percent_ongoing = 0.0
+        total_ongoing_clones = min(len(clones), self.max_concurrent_clones)
+
+        if show_onpen_bar:
+            assert self.onpen_pev_id is not None
+            sum_percent_onpen = 0.0
+            avg_percent_onpen = 0.0
+            total_onpen_clones = len(clones)
+
+        for idx, clone in enumerate(clones):
+            with open_volume_lockless(self.volclient, clone.volname) as \
+                    fs_handle:
+                percent = get_percent_copied(clone.src_path, clone.dst_path,
+                                             fs_handle)
+                if idx < total_ongoing_clones:
+                    sum_percent_ongoing += percent
+                if show_onpen_bar:
+                    sum_percent_onpen += percent
+
+        avg_percent_ongoing = round(sum_percent_ongoing / total_ongoing_clones, 3)
+        # progress module takes progress as a fraction between 0.0 to 1.0.
+        avg_progress_fraction = avg_percent_ongoing / 100
+        msg = (f'{total_ongoing_clones} ongoing clones - average progress is '
+               f'{avg_percent_ongoing}%')
+        self._update_progress_bar_event(ev_id=self.on_pev_id, ev_msg=msg,
+                                        ev_progress_fraction=avg_progress_fraction)
+        log.debug('finished updating progress bar for ongoing clones with '
+                  f'following message - {msg}')
+
+        if show_onpen_bar:
+            avg_percent_onpen = round(sum_percent_onpen / total_onpen_clones, 3)
+            # progress module takes progress as a fraction between 0.0 to 1.0.
+            avg_progress_fraction = avg_percent_onpen / 100
+            msg = (f'Total {total_onpen_clones} clones - average progress is '
+                   f'{avg_percent_onpen}%')
+            self._update_progress_bar_event(ev_id=self.onpen_pev_id, ev_msg=msg,
+                                            ev_progress_fraction=avg_progress_fraction)
+            log.debug('finished updating progress bar for ongoing+pending '
+                      f'clones with following message - {msg}')
+
+    def _finish_progress_events(self):
+        '''
+        Remove progress bars from "ceph status" output.
+        '''
+        log.info('removing progress bars from "ceph status" output')
+
+        assert self.on_pev_id is not None
+        assert self.onpen_pev_id is not None
+
+        self.volclient.mgr.remote('progress', 'complete', self.on_pev_id)
+        self.on_pev_id = None
+
+        self.volclient.mgr.remote('progress', 'complete', self.onpen_pev_id)
+        self.onpen_pev_id = None
+
+        log.info('finished removing progress bars from "ceph status" output')
+
+    def finish(self):
+        '''
+        All cloning jobs have been completed. Terminate this RTimer thread.
+        '''
+        self._finish_progress_events()
+
+        log.info(f'marking this RTimer thread as finished; thread object ID - {self}')
+        self.update_task.finished.set()
diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py
index 778817a80138..3f3b16020494 100644
--- a/src/pybind/mgr/volumes/fs/volume.py
+++ b/src/pybind/mgr/volumes/fs/volume.py
@@ -28,6 +28,7 @@ from .exception import VolumeException, ClusterError, ClusterTimeout, \
 from .async_cloner import Cloner
 from .purge_queue import ThreadPoolPurgeQueueMixin
 from .operations.template import SubvolumeOpType
+from .stats_util import CloneProgressReporter
 
 if TYPE_CHECKING:
     from volumes import Module
@@ -61,6 +62,8 @@ class VolumeClient(CephfsClient["Module"]):
         self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
         self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay,
                              self.mgr.snapshot_clone_no_wait)
+        self.clone_progress_reporter = CloneProgressReporter(self,
+                                                             self.volspec)
         self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
         # on startup, queue purge job for available volumes to kickstart
         # purge for leftover subvolume entries in trash. note that, if the
@@ -800,6 +803,7 @@ class VolumeClient(CephfsClient["Module"]):
                 else:
                     s_subvolume.attach_snapshot(s_snapname, t_subvolume)
                 self.cloner.queue_job(volname)
+                self.clone_progress_reporter.initiate_reporting()
             except VolumeException as ve:
                 try:
                     t_subvolume.remove()