From: Rishabh Dave Date: Tue, 25 Aug 2020 07:27:42 +0000 (+0530) Subject: vstart_runner: rearrange exec_test() X-Git-Tag: v16.1.0~933^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=66735318049572fd206f4100432b5f7daeaa7198;p=ceph.git vstart_runner: rearrange exec_test() Rearrange the code that triggers the test runner so that in the later commits the way tests are triggered can be changed. Signed-off-by: Rishabh Dave --- diff --git a/qa/tasks/vstart_runner.py b/qa/tasks/vstart_runner.py index 334ed172f363..f7a6cabcf1c1 100644 --- a/qa/tasks/vstart_runner.py +++ b/qa/tasks/vstart_runner.py @@ -1176,26 +1176,48 @@ class LocalFilesystem(Filesystem, LocalMDSCluster): raise NotImplementedError() -class InteractiveFailureResult(unittest.TextTestResult): - """ - Specialization that implements interactive-on-error style - behavior. - """ - def addFailure(self, test, err): - super(InteractiveFailureResult, self).addFailure(test, err) - log.error(self._exc_info_to_string(err, test)) - log.error("Failure in test '{0}', going interactive".format( - self.getDescription(test) - )) - interactive.task(ctx=None, config=None) +class LocalCluster(object): + def __init__(self, rolename="placeholder"): + self.remotes = { + LocalRemote(): [rolename] + } + + def only(self, requested): + return self.__class__(rolename=requested) + + +class LocalContext(object): + def __init__(self): + self.config = {} + self.teuthology_config = teuth_config + self.cluster = LocalCluster() + self.daemons = DaemonGroup() + + # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any + # tests that want to look these up via ctx can do so. + # Inspect ceph.conf to see what roles exist + for conf_line in open("ceph.conf").readlines(): + for svc_type in ["mon", "osd", "mds", "mgr"]: + prefixed_type = "ceph." + svc_type + if prefixed_type not in self.daemons.daemons: + self.daemons.daemons[prefixed_type] = {} + match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line) + if match: + svc_id = match.group(1) + self.daemons.daemons[prefixed_type][svc_id] = LocalDaemon(svc_type, svc_id) + + def __del__(self): + test_path = self.teuthology_config['test_path'] + # opt_create_cluster_only does not create the test path + if test_path: + shutil.rmtree(test_path) - def addError(self, test, err): - super(InteractiveFailureResult, self).addError(test, err) - log.error(self._exc_info_to_string(err, test)) - log.error("Error in test '{0}', going interactive".format( - self.getDescription(test) - )) - interactive.task(ctx=None, config=None) + +######################################### +# +# stuff necessary for launching tests... +# +######################################### def enumerate_methods(s): @@ -1246,47 +1268,12 @@ def scan_tests(modules): max_required_mgr, require_memstore -class LocalCluster(object): - def __init__(self, rolename="placeholder"): - self.remotes = { - LocalRemote(): [rolename] - } - - def only(self, requested): - return self.__class__(rolename=requested) - - -class LocalContext(object): - def __init__(self): - self.config = {} - self.teuthology_config = teuth_config - self.cluster = LocalCluster() - self.daemons = DaemonGroup() - - # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any - # tests that want to look these up via ctx can do so. - # Inspect ceph.conf to see what roles exist - for conf_line in open("ceph.conf").readlines(): - for svc_type in ["mon", "osd", "mds", "mgr"]: - prefixed_type = "ceph." + svc_type - if prefixed_type not in self.daemons.daemons: - self.daemons.daemons[prefixed_type] = {} - match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line) - if match: - svc_id = match.group(1) - self.daemons.daemons[prefixed_type][svc_id] = LocalDaemon(svc_type, svc_id) - - def __del__(self): - test_path = self.teuthology_config['test_path'] - # opt_create_cluster_only does not create the test path - if test_path: - shutil.rmtree(test_path) - def teardown_cluster(): log.info('\ntearing down the cluster...') remote.run(args=[os.path.join(SRC_PREFIX, "stop.sh")], timeout=60) remote.run(args=['rm', '-rf', './dev', './out']) + def clear_old_log(): from os import stat @@ -1303,8 +1290,113 @@ def clear_old_log(): init_log() log.info('logging in a fresh file now...') + +class LogStream(object): + def __init__(self): + self.buffer = "" + + def _del_result_lines(self): + """ + Don't let unittest.TextTestRunner print "Ran X tests in Ys", + vstart_runner.py will do it for itself since it runs tests in a + testsuite one by one. + """ + self.buffer = re.sub('\n\n'+'-'*70+'\nran [0-9]* test in [0-9.]*s\n\n', + '', self.buffer, flags=re.I) + + self.buffer = self.buffer.replace('OK\n', '') + + def write(self, data): + self.buffer += data + if self.buffer.count("\n") > 5: + self._write() + + def _write(self): + self._del_result_lines() + if self.buffer == '': + return + + lines = self.buffer.split("\n") + for line in lines: + # sys.stderr.write(line + "\n") + log.info(line) + self.buffer = '' + + def flush(self): + pass + + def __del__(self): + self._write() + + +class InteractiveFailureResult(unittest.TextTestResult): + """ + Specialization that implements interactive-on-error style + behavior. + """ + def addFailure(self, test, err): + super(InteractiveFailureResult, self).addFailure(test, err) + log.error(self._exc_info_to_string(err, test)) + log.error("Failure in test '{0}', going interactive".format( + self.getDescription(test) + )) + interactive.task(ctx=None, config=None) + + def addError(self, test, err): + super(InteractiveFailureResult, self).addError(test, err) + log.error(self._exc_info_to_string(err, test)) + log.error("Error in test '{0}', going interactive".format( + self.getDescription(test) + )) + interactive.task(ctx=None, config=None) + + +# XXX: class we require would be inherited from this one and one of +# InteractiveFailureResult and unittestunittest.TextTestResult. +class LoggingResultTemplate(object): + fail_on_skip = False + + def startTest(self, test): + log.info("Starting test: {0}".format(self.getDescription(test))) + test.started_at = datetime.datetime.utcnow() + return super(LoggingResultTemplate, self).startTest(test) + + def stopTest(self, test): + log.info("Stopped test: {0} in {1}s".format( + self.getDescription(test), + (datetime.datetime.utcnow() - test.started_at).total_seconds() + )) + + def addSkip(self, test, reason): + if LoggingResultTemplate.fail_on_skip: + # Don't just call addFailure because that requires a traceback + self.failures.append((test, reason)) + else: + super(LoggingResultTemplate, self).addSkip(test, reason) + + +def launch_tests(overall_suite): + return launch_entire_suite(overall_suite) + + +def get_logging_result_class(): + result_class = InteractiveFailureResult if opt_interactive_on_error else \ + unittest.TextTestResult + return type('', (LoggingResultTemplate, result_class), {}) + + +def launch_entire_suite(overall_suite): + LoggingResult = get_logging_result_class() + + testrunner = unittest.TextTestRunner(stream=LogStream(), + resultclass=LoggingResult, + verbosity=2, failfast=True) + return testrunner.run(overall_suite) + + def exec_test(): # Parse arguments + global opt_interactive_on_error opt_interactive_on_error = False opt_create_cluster = False opt_create_cluster_only = False @@ -1461,23 +1553,6 @@ def exec_test(): from tasks.cephfs_test_runner import DecoratingLoader - class LogStream(object): - def __init__(self): - self.buffer = "" - - def write(self, data): - self.buffer += data - if "\n" in self.buffer: - lines = self.buffer.split("\n") - for line in lines[:-1]: - pass - # sys.stderr.write(line + "\n") - log.info(line) - self.buffer = lines[-1] - - def flush(self): - pass - decorating_loader = DecoratingLoader({ "ctx": ctx, "mounts": mounts, @@ -1541,40 +1616,10 @@ def exec_test(): for s, method in victims: s._tests.remove(method) - if opt_interactive_on_error: - result_class = InteractiveFailureResult - else: - result_class = unittest.TextTestResult - fail_on_skip = False - - class LoggingResult(result_class): - def startTest(self, test): - log.info("Starting test: {0}".format(self.getDescription(test))) - test.started_at = datetime.datetime.utcnow() - return super(LoggingResult, self).startTest(test) - - def stopTest(self, test): - log.info("Stopped test: {0} in {1}s".format( - self.getDescription(test), - (datetime.datetime.utcnow() - test.started_at).total_seconds() - )) - - def addSkip(self, test, reason): - if fail_on_skip: - # Don't just call addFailure because that requires a traceback - self.failures.append((test, reason)) - else: - super(LoggingResult, self).addSkip(test, reason) - - # Execute! - result = unittest.TextTestRunner( - stream=LogStream(), - resultclass=LoggingResult, - verbosity=2, - failfast=True).run(overall_suite) + overall_suite = load_tests(modules, loader.TestLoader()) + result = launch_tests(overall_suite) CephFSMount.cleanup_stale_netnses_and_bridge(remote) - if opt_teardown_cluster: teardown_cluster()