raise NotImplementedError()
-class InteractiveFailureResult(unittest.TextTestResult):
- """
- Specialization that implements interactive-on-error style
- behavior.
- """
- def addFailure(self, test, err):
- super(InteractiveFailureResult, self).addFailure(test, err)
- log.error(self._exc_info_to_string(err, test))
- log.error("Failure in test '{0}', going interactive".format(
- self.getDescription(test)
- ))
- interactive.task(ctx=None, config=None)
+class LocalCluster(object):
+ def __init__(self, rolename="placeholder"):
+ self.remotes = {
+ LocalRemote(): [rolename]
+ }
+
+ def only(self, requested):
+ return self.__class__(rolename=requested)
+
+
+class LocalContext(object):
+ def __init__(self):
+ self.config = {}
+ self.teuthology_config = teuth_config
+ self.cluster = LocalCluster()
+ self.daemons = DaemonGroup()
+
+ # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
+ # tests that want to look these up via ctx can do so.
+ # Inspect ceph.conf to see what roles exist
+ for conf_line in open("ceph.conf").readlines():
+ for svc_type in ["mon", "osd", "mds", "mgr"]:
+ prefixed_type = "ceph." + svc_type
+ if prefixed_type not in self.daemons.daemons:
+ self.daemons.daemons[prefixed_type] = {}
+ match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line)
+ if match:
+ svc_id = match.group(1)
+ self.daemons.daemons[prefixed_type][svc_id] = LocalDaemon(svc_type, svc_id)
+
+ def __del__(self):
+ test_path = self.teuthology_config['test_path']
+ # opt_create_cluster_only does not create the test path
+ if test_path:
+ shutil.rmtree(test_path)
- def addError(self, test, err):
- super(InteractiveFailureResult, self).addError(test, err)
- log.error(self._exc_info_to_string(err, test))
- log.error("Error in test '{0}', going interactive".format(
- self.getDescription(test)
- ))
- interactive.task(ctx=None, config=None)
+
+#########################################
+#
+# stuff necessary for launching tests...
+#
+#########################################
def enumerate_methods(s):
max_required_mgr, require_memstore
-class LocalCluster(object):
- def __init__(self, rolename="placeholder"):
- self.remotes = {
- LocalRemote(): [rolename]
- }
-
- def only(self, requested):
- return self.__class__(rolename=requested)
-
-
-class LocalContext(object):
- def __init__(self):
- self.config = {}
- self.teuthology_config = teuth_config
- self.cluster = LocalCluster()
- self.daemons = DaemonGroup()
-
- # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
- # tests that want to look these up via ctx can do so.
- # Inspect ceph.conf to see what roles exist
- for conf_line in open("ceph.conf").readlines():
- for svc_type in ["mon", "osd", "mds", "mgr"]:
- prefixed_type = "ceph." + svc_type
- if prefixed_type not in self.daemons.daemons:
- self.daemons.daemons[prefixed_type] = {}
- match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line)
- if match:
- svc_id = match.group(1)
- self.daemons.daemons[prefixed_type][svc_id] = LocalDaemon(svc_type, svc_id)
-
- def __del__(self):
- test_path = self.teuthology_config['test_path']
- # opt_create_cluster_only does not create the test path
- if test_path:
- shutil.rmtree(test_path)
-
def teardown_cluster():
log.info('\ntearing down the cluster...')
remote.run(args=[os.path.join(SRC_PREFIX, "stop.sh")], timeout=60)
remote.run(args=['rm', '-rf', './dev', './out'])
+
def clear_old_log():
from os import stat
init_log()
log.info('logging in a fresh file now...')
+
+class LogStream(object):
+ def __init__(self):
+ self.buffer = ""
+
+ def _del_result_lines(self):
+ """
+ Don't let unittest.TextTestRunner print "Ran X tests in Ys",
+ vstart_runner.py will do it for itself since it runs tests in a
+ testsuite one by one.
+ """
+ self.buffer = re.sub('\n\n'+'-'*70+'\nran [0-9]* test in [0-9.]*s\n\n',
+ '', self.buffer, flags=re.I)
+
+ self.buffer = self.buffer.replace('OK\n', '')
+
+ def write(self, data):
+ self.buffer += data
+ if self.buffer.count("\n") > 5:
+ self._write()
+
+ def _write(self):
+ self._del_result_lines()
+ if self.buffer == '':
+ return
+
+ lines = self.buffer.split("\n")
+ for line in lines:
+ # sys.stderr.write(line + "\n")
+ log.info(line)
+ self.buffer = ''
+
+ def flush(self):
+ pass
+
+ def __del__(self):
+ self._write()
+
+
+class InteractiveFailureResult(unittest.TextTestResult):
+ """
+ Specialization that implements interactive-on-error style
+ behavior.
+ """
+ def addFailure(self, test, err):
+ super(InteractiveFailureResult, self).addFailure(test, err)
+ log.error(self._exc_info_to_string(err, test))
+ log.error("Failure in test '{0}', going interactive".format(
+ self.getDescription(test)
+ ))
+ interactive.task(ctx=None, config=None)
+
+ def addError(self, test, err):
+ super(InteractiveFailureResult, self).addError(test, err)
+ log.error(self._exc_info_to_string(err, test))
+ log.error("Error in test '{0}', going interactive".format(
+ self.getDescription(test)
+ ))
+ interactive.task(ctx=None, config=None)
+
+
+# XXX: class we require would be inherited from this one and one of
+# InteractiveFailureResult and unittestunittest.TextTestResult.
+class LoggingResultTemplate(object):
+ fail_on_skip = False
+
+ def startTest(self, test):
+ log.info("Starting test: {0}".format(self.getDescription(test)))
+ test.started_at = datetime.datetime.utcnow()
+ return super(LoggingResultTemplate, self).startTest(test)
+
+ def stopTest(self, test):
+ log.info("Stopped test: {0} in {1}s".format(
+ self.getDescription(test),
+ (datetime.datetime.utcnow() - test.started_at).total_seconds()
+ ))
+
+ def addSkip(self, test, reason):
+ if LoggingResultTemplate.fail_on_skip:
+ # Don't just call addFailure because that requires a traceback
+ self.failures.append((test, reason))
+ else:
+ super(LoggingResultTemplate, self).addSkip(test, reason)
+
+
+def launch_tests(overall_suite):
+ return launch_entire_suite(overall_suite)
+
+
+def get_logging_result_class():
+ result_class = InteractiveFailureResult if opt_interactive_on_error else \
+ unittest.TextTestResult
+ return type('', (LoggingResultTemplate, result_class), {})
+
+
+def launch_entire_suite(overall_suite):
+ LoggingResult = get_logging_result_class()
+
+ testrunner = unittest.TextTestRunner(stream=LogStream(),
+ resultclass=LoggingResult,
+ verbosity=2, failfast=True)
+ return testrunner.run(overall_suite)
+
+
def exec_test():
# Parse arguments
+ global opt_interactive_on_error
opt_interactive_on_error = False
opt_create_cluster = False
opt_create_cluster_only = False
from tasks.cephfs_test_runner import DecoratingLoader
- class LogStream(object):
- def __init__(self):
- self.buffer = ""
-
- def write(self, data):
- self.buffer += data
- if "\n" in self.buffer:
- lines = self.buffer.split("\n")
- for line in lines[:-1]:
- pass
- # sys.stderr.write(line + "\n")
- log.info(line)
- self.buffer = lines[-1]
-
- def flush(self):
- pass
-
decorating_loader = DecoratingLoader({
"ctx": ctx,
"mounts": mounts,
for s, method in victims:
s._tests.remove(method)
- if opt_interactive_on_error:
- result_class = InteractiveFailureResult
- else:
- result_class = unittest.TextTestResult
- fail_on_skip = False
-
- class LoggingResult(result_class):
- def startTest(self, test):
- log.info("Starting test: {0}".format(self.getDescription(test)))
- test.started_at = datetime.datetime.utcnow()
- return super(LoggingResult, self).startTest(test)
-
- def stopTest(self, test):
- log.info("Stopped test: {0} in {1}s".format(
- self.getDescription(test),
- (datetime.datetime.utcnow() - test.started_at).total_seconds()
- ))
-
- def addSkip(self, test, reason):
- if fail_on_skip:
- # Don't just call addFailure because that requires a traceback
- self.failures.append((test, reason))
- else:
- super(LoggingResult, self).addSkip(test, reason)
-
- # Execute!
- result = unittest.TextTestRunner(
- stream=LogStream(),
- resultclass=LoggingResult,
- verbosity=2,
- failfast=True).run(overall_suite)
+ overall_suite = load_tests(modules, loader.TestLoader())
+ result = launch_tests(overall_suite)
CephFSMount.cleanup_stale_netnses_and_bridge(remote)
-
if opt_teardown_cluster:
teardown_cluster()