From: John Spray Date: Wed, 3 Sep 2014 13:14:28 +0000 (+0100) Subject: tasks: generalise CephFSTestCase X-Git-Tag: v0.94.10~27^2^2~282^2~6^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d777d7123bb22c593fb2529e0a777556efad9db9;p=ceph.git tasks: generalise CephFSTestCase Some of this stuff could be even more general for embedding unittest-style suites, but for the moment let's keep the cephfs stuff in a walled garden. Signed-off-by: John Spray --- diff --git a/tasks/cephfs/cephfs_test_case.py b/tasks/cephfs/cephfs_test_case.py new file mode 100644 index 00000000000..0d4851788f7 --- /dev/null +++ b/tasks/cephfs/cephfs_test_case.py @@ -0,0 +1,108 @@ +import logging +import unittest +from teuthology.task import interactive + + +log = logging.getLogger(__name__) + + +class CephFSTestCase(unittest.TestCase): + fs = None + + def assert_session_count(self, expected, ls_data=None): + if ls_data is None: + ls_data = self.fs.mds_asok(['session', 'ls']) + + self.assertEqual(expected, len(ls_data), "Expected {0} sessions, found {1}".format( + expected, len(ls_data) + )) + + def assert_session_state(self, client_id, expected_state): + self.assertEqual( + self._session_by_id( + self.fs.mds_asok(['session', 'ls'])).get(client_id, {'state': None})['state'], + expected_state) + + def get_session_data(self, client_id): + return self._session_by_id(client_id) + + def _session_list(self): + ls_data = self.fs.mds_asok(['session', 'ls']) + ls_data = [s for s in ls_data if s['state'] not in ['stale', 'closed']] + return ls_data + + def get_session(self, client_id, session_ls=None): + if session_ls is None: + session_ls = self.fs.mds_asok(['session', 'ls']) + + return self._session_by_id(session_ls)[client_id] + + def _session_by_id(self, session_ls): + return dict([(s['id'], s) for s in session_ls]) + + +class LogStream(object): + def __init__(self): + self.buffer = "" + + def write(self, data): + self.buffer += data + if "\n" in self.buffer: + lines = self.buffer.split("\n") + for line in lines[:-1]: + log.info(line) + self.buffer = lines[-1] + + def flush(self): + pass + + +class InteractiveFailureResult(unittest.TextTestResult): + """ + Specialization that implements interactive-on-error style + behavior. + """ + ctx = None + + def addFailure(self, test, err): + log.error(self._exc_info_to_string(err, test)) + log.error("Failure in test '{0}', going interactive".format( + self.getDescription(test) + )) + interactive.task(ctx=self.ctx, config=None) + + def addError(self, test, err): + log.error(self._exc_info_to_string(err, test)) + log.error("Error in test '{0}', going interactive".format( + self.getDescription(test) + )) + interactive.task(ctx=self.ctx, config=None) + + +def run_tests(ctx, config, test_klass, params): + for k, v in params.items(): + setattr(test_klass, k, v) + + # Execute test suite + # ================== + if config and 'test_name' in config: + # Test names like TestCase.this_test + suite = unittest.TestLoader().loadTestsFromName( + "{0}.{1}".format(test_klass.__module__, config['test_name'])) + else: + suite = unittest.TestLoader().loadTestsFromTestCase(test_klass) + + if ctx.config.get("interactive-on-error", False): + InteractiveFailureResult.ctx = ctx + result_class = InteractiveFailureResult + else: + result_class = unittest.TextTestResult + result = unittest.TextTestRunner( + stream=LogStream(), + resultclass=result_class, + verbosity=2, + failfast=True).run(suite) + + if not result.wasSuccessful(): + result.printErrors() # duplicate output at end for convenience + raise RuntimeError("Test failure.") diff --git a/tasks/mds_client_recovery.py b/tasks/mds_client_recovery.py index 71e6dd3c8fe..7e3d28c99d5 100644 --- a/tasks/mds_client_recovery.py +++ b/tasks/mds_client_recovery.py @@ -11,8 +11,10 @@ from teuthology.orchestra import run from teuthology.orchestra.run import CommandFailedError from teuthology.task import interactive -from cephfs.filesystem import Filesystem -from tasks.ceph_fuse import FuseMount + +from tasks.cephfs.cephfs_test_case import CephFSTestCase, run_tests +from tasks.cephfs.filesystem import Filesystem +from tasks.cephfs.fuse_mount import FuseMount log = logging.getLogger(__name__) @@ -23,9 +25,8 @@ log = logging.getLogger(__name__) MDS_RESTART_GRACE = 60 -class TestClientRecovery(unittest.TestCase): +class TestClientRecovery(CephFSTestCase): # Environment references - fs = None mount_a = None mount_b = None mds_session_timeout = None @@ -82,28 +83,6 @@ class TestClientRecovery(unittest.TestCase): self.mount_a.create_destroy() self.mount_b.create_destroy() - def assert_session_count(self, expected, ls_data=None): - if ls_data is None: - ls_data = self.fs.mds_asok(['session', 'ls']) - - self.assertEqual(expected, len(ls_data), "Expected {0} sessions, found {1}".format( - expected, len(ls_data) - )) - - def assert_session_state(self, client_id, expected_state): - self.assertEqual( - self._session_by_id( - self.fs.mds_asok(['session', 'ls'])).get(client_id, {'state': None})['state'], - expected_state) - - def _session_list(self): - ls_data = self.fs.mds_asok(['session', 'ls']) - ls_data = [s for s in ls_data if s['state'] not in ['stale', 'closed']] - return ls_data - - def _session_by_id(self, session_ls): - return dict([(s['id'], s) for s in session_ls]) - def test_reconnect_timeout(self): # Reconnect timeout # ================= @@ -123,7 +102,7 @@ class TestClientRecovery(unittest.TestCase): self.assert_session_count(2, ls_data) # The session for the dead client should have the 'reconnect' flag set - self.assertTrue(self._session_by_id(ls_data)[mount_a_client_id]['reconnecting']) + self.assertTrue(self.get_session(mount_a_client_id)['reconnecting']) # Wait for the reconnect state to clear, this should take the # reconnect timeout period. @@ -384,49 +363,26 @@ def task(ctx, config): if mount_a.client_remote.hostname in fs.get_mds_hostnames(): raise RuntimeError("Require first client to on separate server from MDSs") - # Attach environment references to test case - # ========================================== - TestClientRecovery.mds_reconnect_timeout = int(fs.mds_asok( - ['config', 'get', 'mds_reconnect_timeout'] - )['mds_reconnect_timeout']) - TestClientRecovery.mds_session_timeout = int(fs.mds_asok( - ['config', 'get', 'mds_session_timeout'] - )['mds_session_timeout']) - TestClientRecovery.ms_max_backoff = int(fs.mds_asok( - ['config', 'get', 'ms_max_backoff'] - )['ms_max_backoff']) - TestClientRecovery.fs = fs - TestClientRecovery.mount_a = mount_a - TestClientRecovery.mount_b = mount_b - # Stash references on ctx so that we can easily debug in interactive mode # ======================================================================= ctx.filesystem = fs ctx.mount_a = mount_a ctx.mount_b = mount_b - # Execute test suite - # ================== - if config and 'test_name' in config: - suite = unittest.TestLoader().loadTestsFromName( - "teuthology.task.mds_client_recovery.{0}".format(config['test_name'])) - else: - suite = unittest.TestLoader().loadTestsFromTestCase(TestClientRecovery) - - if ctx.config.get("interactive-on-error", False): - InteractiveFailureResult.ctx = ctx - result_class = InteractiveFailureResult - else: - result_class = unittest.TextTestResult - result = unittest.TextTestRunner( - stream=LogStream(), - resultclass=result_class, - verbosity=2, - failfast=True).run(suite) - - if not result.wasSuccessful(): - result.printErrors() # duplicate output at end for convenience - raise RuntimeError("Test failure.") + run_tests(ctx, config, TestClientRecovery, { + "mds_reconnect_timeout": int(fs.mds_asok( + ['config', 'get', 'mds_reconnect_timeout'] + )['mds_reconnect_timeout']), + "mds_session_timeout": int(fs.mds_asok( + ['config', 'get', 'mds_session_timeout'] + )['mds_session_timeout']), + "ms_max_backoff": int(fs.mds_asok( + ['config', 'get', 'ms_max_backoff'] + )['ms_max_backoff']), + "fs": fs, + "mount_a": mount_a, + "mount_b": mount_b + }) # Continue to any downstream tasks # ================================