remote.run(args=['sudo',
'install', '-d', '-m0777', '--', '/var/run/ceph', ], )
+ def get_service_task_status(self, service, status_key):
+ """
+ Return daemon task status for a given ceph service.
+
+ :param service: ceph service (mds, osd, etc...)
+ :param status_key: matching task status key
+ """
+ task_status = {}
+ status = self.raw_cluster_status()
+ try:
+ for k,v in status['servicemap']['services'][service]['daemons'].items():
+ ts = dict(v).get('task_status', None)
+ if ts:
+ task_status[k] = ts[status_key]
+ except KeyError: # catches missing service and status key
+ return {}
+ self.log(task_status)
+ return task_status
def utility_task(name):
"""
self.assertEqual(res['return_code'], expected)
def _get_scrub_status(self):
return self.fs.rank_tell(["scrub", "status"])
+ def _check_task_status(self, expected_status):
+ task_status = self.fs.get_task_status("scrub status")
+ self.assertTrue(task_status['0'].startswith(expected_status))
def test_scrub_abort(self):
test_dir = "scrub_control_test_path"
out_json = self._get_scrub_status()
self.assertTrue("no active" in out_json['status'])
+ # sleep enough to fetch updated task status
+ time.sleep(10)
+ self._check_task_status("idle")
+
def test_scrub_pause_and_resume(self):
test_dir = "scrub_control_test_path"
abs_test_path = "/{0}".format(test_dir)
out_json = self._get_scrub_status()
self.assertTrue("PAUSED" in out_json['status'])
+ # sleep enough to fetch updated task status
+ time.sleep(10)
+ self._check_task_status("paused")
+
# resume and verify
self._resume_scrub(0)
out_json = self._get_scrub_status()
out_json = self._get_scrub_status()
self.assertTrue("PAUSED" in out_json['status'])
+ # sleep enough to fetch updated task status
+ time.sleep(10)
+ self._check_task_status("paused")
+
# abort and verify
self._abort_scrub(0)
out_json = self._get_scrub_status()
self.assertTrue("PAUSED" in out_json['status'])
self.assertTrue("0 inodes" in out_json['status'])
+ # sleep enough to fetch updated task status
+ time.sleep(10)
+ self._check_task_status("paused")
+
# resume and verify
self._resume_scrub(0)
out_json = self._get_scrub_status()
self.assertTrue("no active" in out_json['status'])
+ # sleep enough to fetch updated task status
+ time.sleep(10)
+ self._check_task_status("idle")
+
class TestScrubChecks(CephFSTestCase):
"""
Run flush and scrub commands on the specified files in the filesystem. This