self._wait_for_trash_empty()
-# NOTE: these tests consumes considerable amount of CPU and RAM due generation
-# random of files and due to multiple cloning jobs that are run simultaneously.
-#
-# NOTE: mgr/vol code generates progress bars for cloning jobs and these tests
-# capture them through "ceph status --format json-pretty" and checks if they
-# are as expected. If cloning happens too fast, these tests will fail to
-# capture progress bars, at least in desired state. Thus, these tests are
-# slightly racy by their very nature.
-#
-# Two measure can be taken to avoid this (and thereby inconsistent results in
-# testing) -
-# 1. Slow down cloning. This was done by adding a sleep after every file is
-# copied. However, this method was rejected since a new config for this would
-# have to be added.
-# 2. Amount of data that will cloned is big enough so that cloning takes enough
-# time for test code to capture the progress bar in desired state and finish
-# running. This is method that has been currently employed. This consumes
-# significantly more time, CPU and RAM in comparison.
-class TestCloneProgressReporter(TestVolumesHelper):
- '''
- This class contains tests for features that show how much progress cloning
- jobs have made.
- '''
-
+class CloneProgressReporterHelper(TestVolumesHelper):
CLIENTS_REQUIRED = 1
def setUp(self):
- super(TestCloneProgressReporter, self).setUp()
+ super(CloneProgressReporterHelper, self).setUp()
# save this config value so that it can be set again at the end of test
# and therefore other tests that might depend on this won't be
v = self.volname
o = self.get_ceph_cmd_stdout('fs volume ls')
if self.volname not in o:
- super(TestCloneProgressReporter, self).tearDown()
+ super(CloneProgressReporterHelper, self).tearDown()
return
subvols = self.get_ceph_cmd_stdout(f'fs subvolume ls {v} --format '
# Ceph cluster.
#self.run_ceph_cmd(f'fs volume rm {self.volname} --yes-i-really-mean-it')
- super(self.__class__, self).tearDown()
+ super(CloneProgressReporterHelper, self).tearDown()
# XXX: it is important to wait for rbytes value to catch up to actual size of
# subvolume so that progress bar shows sensible amount of progress
f'size reported by rstat = {r_size}')
raise RsizeDoesntMatch(msg)
- def test_progress_is_printed_in_clone_status_output(self):
- '''
- Test that the command "ceph fs clone status" prints progress stats
- for the clone.
- '''
- v = self.volname
- sv = 'sv1'
- ss = 'ss1'
- # "clone" must be part of clone name for sake of tearDown()
- c = 'ss1clone1'
-
- self.run_ceph_cmd(f'fs subvolume create {v} {sv} --mode=777')
- size = self._do_subvolume_io(sv, None, None, 3, 1024)
-
- self.run_ceph_cmd(f'fs subvolume snapshot create {v} {sv} {ss}')
- self.wait_till_rbytes_is_right(v, sv, size)
-
- self.run_ceph_cmd(f'fs subvolume snapshot clone {v} {sv} {ss} {c}')
- self._wait_for_clone_to_be_in_progress(c)
-
- with safe_while(tries=120, sleep=1) as proceed:
- while proceed():
- o = self.get_ceph_cmd_stdout(f'fs clone status {v} {c}')
- o = json.loads(o)
-
- try:
- p = o['status']['progress_report']['percentage cloned']
- log.debug(f'percentage cloned = {p}')
- except KeyError:
- # if KeyError is caught, either progress_report is present
- # or clone is complete
- if 'progress_report' in ['status']:
- self.assertEqual(o['status']['state'], 'complete')
- break
-
- self._wait_for_clone_to_complete(c)
-
def filter_in_only_clone_pevs(self, progress_events):
'''
Progress events dictionary in output of "ceph status --format json"
return pevs
+ def wait_for_both_progress_bars_to_appear(self, sleep=1, iters=20):
+ pevs = []
+ msg = (f'Waited for {iters*sleep} seconds but couldn\'t 2 progress '
+ 'bars in output of "ceph status" command.')
+ with safe_while(tries=iters, sleep=sleep, action=msg) as proceed:
+ while proceed():
+ o = self.get_ceph_cmd_stdout('status --format json-pretty')
+ o = json.loads(o)
+ pevs = o['progress_events']
+ pevs = self.filter_in_only_clone_pevs(pevs)
+ if len(pevs) == 2:
+ v = tuple(pevs.values())
+ if 'ongoing+pending' in v[1]['message']:
+ self.assertIn('ongoing', v[0]['message'])
+ else:
+ self.assertIn('ongoing', v[1]['message'])
+ self.assertIn('ongoing+pending', v[0]['message'])
+ break
+
+ def get_onpen_count(self, pev):
+ '''
+ Return number of clones reported in the message of progress bar for
+ ongoing+pending clones.
+ '''
+ i = pev['message'].find('ongoing+pending')
+ if i == -1:
+ return
+ count = pev['message'][:i]
+ count = count[:-1] # remomve trailing space
+ count = int(count)
+ return count
+
+ def get_both_progress_fractions_and_onpen_count(self):
+ '''
+ Go through output of "ceph status --format json-pretty" and return
+ progress made by both clones (that is progress fractions) and return
+ number of clones in reported in message of ongoing+pending progress
+ bar.
+ '''
+ msg = 'Expected 2 progress bars but found ' # rest continued in loop
+ with safe_while(tries=20, sleep=1, action=msg) as proceed:
+ while proceed():
+ o = self.get_ceph_cmd_stdout('status --format json-pretty')
+ o = json.loads(o)
+ pevs = o['progress_events']
+ pevs = self.filter_in_only_clone_pevs(pevs)
+ if len(pevs.values()) == 2:
+ break
+ else:
+ msg += f'{len(pevs)} instead'
+
+ log.info(f'pevs -\n{pevs}')
+ # on_p - progress fraction for ongoing clone jobs
+ # onpen_p - progress fraction for ongoing+pending clone jobs
+ pev1, pev2 = tuple(pevs.values())
+ if 'ongoing+pending' in pev1['message']:
+ onpen_p = pev1['progress']
+ onpen_count = self.get_onpen_count(pev1)
+ on_p = pev2['progress']
+ else:
+ onpen_p = pev2['progress']
+ onpen_count = self.get_onpen_count(pev2)
+ on_p = pev1['progress']
+
+ on_p = float(on_p)
+ onpen_p = float(onpen_p)
+
+ return on_p, onpen_p, onpen_count
+
+ # "ceph fs clone cancel" command takes considerable time to finish running.
+ # test cases where more than 4 clones are being cancelled, this error is
+ # seen, and can be safely ignored since it only implies that cloning has
+ # been finished.
+ def cancel_clones_and_ignore_if_finished(self, clones):
+ if isinstance(clones, str):
+ clones = (clones, )
+
+ for c in clones:
+ cmdargs = f'fs clone cancel {self.volname} {c}'
+ proc = self.run_ceph_cmd(args=cmdargs, stderr=StringIO(),
+ check_status=False)
+
+ stderr = proc.stderr.getvalue().strip().lower()
+ if proc.exitstatus == 0:
+ continue
+ elif proc.exitstatus == 22 and 'clone finished' in stderr:
+ continue
+ else:
+ cmdargs = './bin/ceph ' + cmdargs
+ raise CommandFailedError(cmdargs, proc.exitstatus)
+
+ def cancel_clones(self, clones, check_status=True):
+ v = self.volname
+ if not isinstance(clones, (tuple, list)):
+ clones = (clones, )
+
+ for i in clones:
+ self.run_ceph_cmd(f'fs clone cancel {v} {i}',
+ check_status=check_status)
+ time.sleep(2)
+
+ # check status is False since this method is meant to cleanup clones at
+ # the end of a test case and some clones might already be complete.
+ def cancel_clones_and_confirm(self, clones, check_status=False):
+ if not isinstance(clones, (tuple, list)):
+ clones = (clones, )
+
+ self.cancel_clones(clones, check_status)
+
+ for i in clones:
+ self._wait_for_clone_to_be_canceled(i)
+
+ def cancel_clones_and_assert(self, clones):
+ v = self.volname
+ if not isinstance(clones, (tuple, list)):
+ clones = (clones, )
+
+ self.cancel_clones(clones, True)
+
+ for i in clones:
+ o = self.get_ceph_cmd_stdout(f'fs clone status {v} {i}')
+ try:
+ self.assertIn('canceled', o)
+ except AssertionError:
+ self.assertIn('complete', o)
+
+ def _wait_for_clone_progress_bars_to_be_removed(self):
+ with safe_while(tries=10, sleep=0.5) as proceed:
+ while proceed():
+ o = self.get_ceph_cmd_stdout('status --format json-pretty')
+ o = json.loads(o)
+
+ pevs = o['progress_events'] # pevs = progress events
+ pevs = self.filter_in_only_clone_pevs(pevs)
+ if not pevs:
+ break
+
+
+
+# NOTE: these tests consumes considerable amount of CPU and RAM due generation
+# random of files and due to multiple cloning jobs that are run simultaneously.
+#
+# NOTE: mgr/vol code generates progress bars for cloning jobs and these tests
+# capture them through "ceph status --format json-pretty" and checks if they
+# are as expected. If cloning happens too fast, these tests will fail to
+# capture progress bars, at least in desired state. Thus, these tests are
+# slightly racy by their very nature.
+#
+# Two measure can be taken to avoid this (and thereby inconsistent results in
+# testing) -
+# 1. Slow down cloning. This was done by adding a sleep after every file is
+# copied. However, this method was rejected since a new config for this would
+# have to be added.
+# 2. Amount of data that will cloned is big enough so that cloning takes enough
+# time for test code to capture the progress bar in desired state and finish
+# running. This is method that has been currently employed. This consumes
+# significantly more time, CPU and RAM in comparison.
+class TestCloneProgressReporter(CloneProgressReporterHelper):
+ '''
+ This class contains tests for features that show how much progress cloning
+ jobs have made.
+ '''
+
+
+ def test_progress_is_printed_in_clone_status_output(self):
+ '''
+ Test that the command "ceph fs clone status" prints progress stats
+ for the clone.
+ '''
+ v = self.volname
+ sv = 'sv1'
+ ss = 'ss1'
+ # "clone" must be part of clone name for sake of tearDown()
+ c = 'ss1clone1'
+
+ self.run_ceph_cmd(f'fs subvolume create {v} {sv} --mode=777')
+ size = self._do_subvolume_io(sv, None, None, 3, 1024)
+
+ self.run_ceph_cmd(f'fs subvolume snapshot create {v} {sv} {ss}')
+ self.wait_till_rbytes_is_right(v, sv, size)
+
+ self.run_ceph_cmd(f'fs subvolume snapshot clone {v} {sv} {ss} {c}')
+ self._wait_for_clone_to_be_in_progress(c)
+
+ with safe_while(tries=120, sleep=1) as proceed:
+ while proceed():
+ o = self.get_ceph_cmd_stdout(f'fs clone status {v} {c}')
+ o = json.loads(o)
+
+ try:
+ p = o['status']['progress_report']['percentage cloned']
+ log.debug(f'percentage cloned = {p}')
+ except KeyError:
+ # if KeyError is caught, either progress_report is present
+ # or clone is complete
+ if 'progress_report' in ['status']:
+ self.assertEqual(o['status']['state'], 'complete')
+ break
+
+ self._wait_for_clone_to_complete(c)
+
def test_clones_less_than_cloner_threads(self):
'''
Test that one progress bar is printed in output of "ceph status" output
# and not cancelling these clone doesnt affect this test case.
self.cancel_clones_and_ignore_if_finished(c)
- def wait_for_both_progress_bars_to_appear(self, sleep=1, iters=20):
- pevs = []
- msg = (f'Waited for {iters*sleep} seconds but couldn\'t 2 progress '
- 'bars in output of "ceph status" command.')
- with safe_while(tries=iters, sleep=sleep, action=msg) as proceed:
- while proceed():
- o = self.get_ceph_cmd_stdout('status --format json-pretty')
- o = json.loads(o)
- pevs = o['progress_events']
- pevs = self.filter_in_only_clone_pevs(pevs)
- if len(pevs) == 2:
- v = tuple(pevs.values())
- if 'ongoing+pending' in v[1]['message']:
- self.assertIn('ongoing', v[0]['message'])
- else:
- self.assertIn('ongoing', v[1]['message'])
- self.assertIn('ongoing+pending', v[0]['message'])
- break
-
def test_clones_more_than_cloner_threads(self):
'''
Test that 2 progress bars are printed in output of "ceph status"
# CPU and not cancelling these clone doesnt affect this test case.
self.cancel_clones_and_ignore_if_finished(c)
- def get_onpen_count(self, pev):
- '''
- Return number of clones reported in the message of progress bar for
- ongoing+pending clones.
- '''
- i = pev['message'].find('ongoing+pending')
- if i == -1:
- return
- count = pev['message'][:i]
- count = count[:-1] # remomve trailing space
- count = int(count)
- return count
-
- def get_both_progress_fractions_and_onpen_count(self):
- '''
- Go through output of "ceph status --format json-pretty" and return
- progress made by both clones (that is progress fractions) and return
- number of clones in reported in message of ongoing+pending progress
- bar.
- '''
- msg = 'Expected 2 progress bars but found ' # rest continued in loop
- with safe_while(tries=20, sleep=1, action=msg) as proceed:
- while proceed():
- o = self.get_ceph_cmd_stdout('status --format json-pretty')
- o = json.loads(o)
- pevs = o['progress_events']
- pevs = self.filter_in_only_clone_pevs(pevs)
- if len(pevs.values()) == 2:
- break
- else:
- msg += f'{len(pevs)} instead'
-
- log.info(f'pevs -\n{pevs}')
- # on_p - progress fraction for ongoing clone jobs
- # onpen_p - progress fraction for ongoing+pending clone jobs
- pev1, pev2 = tuple(pevs.values())
- if 'ongoing+pending' in pev1['message']:
- onpen_p = pev1['progress']
- onpen_count = self.get_onpen_count(pev1)
- on_p = pev2['progress']
- else:
- onpen_p = pev2['progress']
- onpen_count = self.get_onpen_count(pev2)
- on_p = pev1['progress']
-
- on_p = float(on_p)
- onpen_p = float(onpen_p)
-
- return on_p, onpen_p, onpen_count
-
- # "ceph fs clone cancel" command takes considerable time to finish running.
- # test cases where more than 4 clones are being cancelled, this error is
- # seen, and can be safely ignored since it only implies that cloning has
- # been finished.
- def cancel_clones_and_ignore_if_finished(self, clones):
- if isinstance(clones, str):
- clones = (clones, )
-
- for c in clones:
- cmdargs = f'fs clone cancel {self.volname} {c}'
- proc = self.run_ceph_cmd(args=cmdargs, stderr=StringIO(),
- check_status=False)
-
- stderr = proc.stderr.getvalue().strip().lower()
- if proc.exitstatus == 0:
- continue
- elif proc.exitstatus == 22 and 'clone finished' in stderr:
- continue
- else:
- cmdargs = './bin/ceph ' + cmdargs
- raise CommandFailedError(cmdargs, proc.exitstatus)
-
- def cancel_clones(self, clones, check_status=True):
- v = self.volname
- if not isinstance(clones, (tuple, list)):
- clones = (clones, )
-
- for i in clones:
- self.run_ceph_cmd(f'fs clone cancel {v} {i}',
- check_status=check_status)
- time.sleep(2)
-
- # check status is False since this method is meant to cleanup clones at
- # the end of a test case and some clones might already be complete.
- def cancel_clones_and_confirm(self, clones, check_status=False):
- if not isinstance(clones, (tuple, list)):
- clones = (clones, )
-
- self.cancel_clones(clones, check_status)
-
- for i in clones:
- self._wait_for_clone_to_be_canceled(i)
-
- def cancel_clones_and_assert(self, clones):
- v = self.volname
- if not isinstance(clones, (tuple, list)):
- clones = (clones, )
-
- self.cancel_clones(clones, True)
-
- for i in clones:
- o = self.get_ceph_cmd_stdout(f'fs clone status {v} {i}')
- try:
- self.assertIn('canceled', o)
- except AssertionError:
- self.assertIn('complete', o)
-
def test_progress_drops_when_new_jobs_are_added(self):
'''
Test that progress indicated by progress bar for ongoing+pending clones
# and not cancelling these clone doesnt affect this test case.
self.cancel_clones_and_ignore_if_finished(c)
- def _wait_for_clone_progress_bars_to_be_removed(self):
- with safe_while(tries=10, sleep=0.5) as proceed:
- while proceed():
- o = self.get_ceph_cmd_stdout('status --format json-pretty')
- o = json.loads(o)
-
- pevs = o['progress_events'] # pevs = progress events
- pevs = self.filter_in_only_clone_pevs(pevs)
- if not pevs:
- break
-
def test_when_clones_cancelled_are_less_than_cloner_threads(self):
'''
Test that the progress bar that is printed for 1 ongoing clone job is