]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
vstart_runner: rearrange exec_test()
authorRishabh Dave <ridave@redhat.com>
Tue, 25 Aug 2020 07:27:42 +0000 (12:57 +0530)
committerRishabh Dave <ridave@redhat.com>
Tue, 29 Sep 2020 09:19:51 +0000 (14:49 +0530)
Rearrange the code that triggers the test runner so that in the later
commits the way tests are triggered can be changed.

Signed-off-by: Rishabh Dave <ridave@redhat.com>
qa/tasks/vstart_runner.py

index 334ed172f36318051527e2370139c1eb3105a648..f7a6cabcf1c1e1e97d277b2d66ac6477b66eccb2 100644 (file)
@@ -1176,26 +1176,48 @@ class LocalFilesystem(Filesystem, LocalMDSCluster):
         raise NotImplementedError()
 
 
-class InteractiveFailureResult(unittest.TextTestResult):
-    """
-    Specialization that implements interactive-on-error style
-    behavior.
-    """
-    def addFailure(self, test, err):
-        super(InteractiveFailureResult, self).addFailure(test, err)
-        log.error(self._exc_info_to_string(err, test))
-        log.error("Failure in test '{0}', going interactive".format(
-            self.getDescription(test)
-        ))
-        interactive.task(ctx=None, config=None)
+class LocalCluster(object):
+    def __init__(self, rolename="placeholder"):
+        self.remotes = {
+            LocalRemote(): [rolename]
+        }
+
+    def only(self, requested):
+        return self.__class__(rolename=requested)
+
+
+class LocalContext(object):
+    def __init__(self):
+        self.config = {}
+        self.teuthology_config = teuth_config
+        self.cluster = LocalCluster()
+        self.daemons = DaemonGroup()
+
+        # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
+        # tests that want to look these up via ctx can do so.
+        # Inspect ceph.conf to see what roles exist
+        for conf_line in open("ceph.conf").readlines():
+            for svc_type in ["mon", "osd", "mds", "mgr"]:
+                prefixed_type = "ceph." + svc_type
+                if prefixed_type not in self.daemons.daemons:
+                    self.daemons.daemons[prefixed_type] = {}
+                match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line)
+                if match:
+                    svc_id = match.group(1)
+                    self.daemons.daemons[prefixed_type][svc_id] = LocalDaemon(svc_type, svc_id)
+
+    def __del__(self):
+        test_path = self.teuthology_config['test_path']
+        # opt_create_cluster_only does not create the test path
+        if test_path:
+            shutil.rmtree(test_path)
 
-    def addError(self, test, err):
-        super(InteractiveFailureResult, self).addError(test, err)
-        log.error(self._exc_info_to_string(err, test))
-        log.error("Error in test '{0}', going interactive".format(
-            self.getDescription(test)
-        ))
-        interactive.task(ctx=None, config=None)
+
+#########################################
+#
+# stuff necessary for launching tests...
+#
+#########################################
 
 
 def enumerate_methods(s):
@@ -1246,47 +1268,12 @@ def scan_tests(modules):
             max_required_mgr, require_memstore
 
 
-class LocalCluster(object):
-    def __init__(self, rolename="placeholder"):
-        self.remotes = {
-            LocalRemote(): [rolename]
-        }
-
-    def only(self, requested):
-        return self.__class__(rolename=requested)
-
-
-class LocalContext(object):
-    def __init__(self):
-        self.config = {}
-        self.teuthology_config = teuth_config
-        self.cluster = LocalCluster()
-        self.daemons = DaemonGroup()
-
-        # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
-        # tests that want to look these up via ctx can do so.
-        # Inspect ceph.conf to see what roles exist
-        for conf_line in open("ceph.conf").readlines():
-            for svc_type in ["mon", "osd", "mds", "mgr"]:
-                prefixed_type = "ceph." + svc_type
-                if prefixed_type not in self.daemons.daemons:
-                    self.daemons.daemons[prefixed_type] = {}
-                match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line)
-                if match:
-                    svc_id = match.group(1)
-                    self.daemons.daemons[prefixed_type][svc_id] = LocalDaemon(svc_type, svc_id)
-
-    def __del__(self):
-        test_path = self.teuthology_config['test_path']
-        # opt_create_cluster_only does not create the test path
-        if test_path:
-            shutil.rmtree(test_path)
-
 def teardown_cluster():
     log.info('\ntearing down the cluster...')
     remote.run(args=[os.path.join(SRC_PREFIX, "stop.sh")], timeout=60)
     remote.run(args=['rm', '-rf', './dev', './out'])
 
+
 def clear_old_log():
     from os import stat
 
@@ -1303,8 +1290,113 @@ def clear_old_log():
         init_log()
         log.info('logging in a fresh file now...')
 
+
+class LogStream(object):
+    def __init__(self):
+        self.buffer = ""
+
+    def _del_result_lines(self):
+        """
+        Don't let unittest.TextTestRunner print "Ran X tests in Ys",
+        vstart_runner.py will do it for itself since it runs tests in a
+        testsuite one by one.
+        """
+        self.buffer = re.sub('\n\n'+'-'*70+'\nran [0-9]* test in [0-9.]*s\n\n',
+                             '', self.buffer, flags=re.I)
+
+        self.buffer = self.buffer.replace('OK\n', '')
+
+    def write(self, data):
+        self.buffer += data
+        if self.buffer.count("\n") > 5:
+            self._write()
+
+    def _write(self):
+        self._del_result_lines()
+        if self.buffer == '':
+            return
+
+        lines = self.buffer.split("\n")
+        for line in lines:
+            # sys.stderr.write(line + "\n")
+            log.info(line)
+        self.buffer = ''
+
+    def flush(self):
+        pass
+
+    def __del__(self):
+        self._write()
+
+
+class InteractiveFailureResult(unittest.TextTestResult):
+    """
+    Specialization that implements interactive-on-error style
+    behavior.
+    """
+    def addFailure(self, test, err):
+        super(InteractiveFailureResult, self).addFailure(test, err)
+        log.error(self._exc_info_to_string(err, test))
+        log.error("Failure in test '{0}', going interactive".format(
+            self.getDescription(test)
+        ))
+        interactive.task(ctx=None, config=None)
+
+    def addError(self, test, err):
+        super(InteractiveFailureResult, self).addError(test, err)
+        log.error(self._exc_info_to_string(err, test))
+        log.error("Error in test '{0}', going interactive".format(
+            self.getDescription(test)
+        ))
+        interactive.task(ctx=None, config=None)
+
+
+# XXX: class we require would be inherited from this one and one of
+# InteractiveFailureResult and unittestunittest.TextTestResult.
+class LoggingResultTemplate(object):
+    fail_on_skip = False
+
+    def startTest(self, test):
+        log.info("Starting test: {0}".format(self.getDescription(test)))
+        test.started_at = datetime.datetime.utcnow()
+        return super(LoggingResultTemplate, self).startTest(test)
+
+    def stopTest(self, test):
+        log.info("Stopped test: {0} in {1}s".format(
+            self.getDescription(test),
+            (datetime.datetime.utcnow() - test.started_at).total_seconds()
+        ))
+
+    def addSkip(self, test, reason):
+        if LoggingResultTemplate.fail_on_skip:
+            # Don't just call addFailure because that requires a traceback
+            self.failures.append((test, reason))
+        else:
+            super(LoggingResultTemplate, self).addSkip(test, reason)
+
+
+def launch_tests(overall_suite):
+    return launch_entire_suite(overall_suite)
+
+
+def get_logging_result_class():
+    result_class = InteractiveFailureResult if opt_interactive_on_error else \
+        unittest.TextTestResult
+    return type('', (LoggingResultTemplate, result_class), {})
+
+
+def launch_entire_suite(overall_suite):
+    LoggingResult = get_logging_result_class()
+
+    testrunner = unittest.TextTestRunner(stream=LogStream(),
+                                         resultclass=LoggingResult,
+                                         verbosity=2, failfast=True)
+    return testrunner.run(overall_suite)
+
+
 def exec_test():
     # Parse arguments
+    global opt_interactive_on_error
     opt_interactive_on_error = False
     opt_create_cluster = False
     opt_create_cluster_only = False
@@ -1461,23 +1553,6 @@ def exec_test():
 
     from tasks.cephfs_test_runner import DecoratingLoader
 
-    class LogStream(object):
-        def __init__(self):
-            self.buffer = ""
-
-        def write(self, data):
-            self.buffer += data
-            if "\n" in self.buffer:
-                lines = self.buffer.split("\n")
-                for line in lines[:-1]:
-                    pass
-                    # sys.stderr.write(line + "\n")
-                    log.info(line)
-                self.buffer = lines[-1]
-
-        def flush(self):
-            pass
-
     decorating_loader = DecoratingLoader({
         "ctx": ctx,
         "mounts": mounts,
@@ -1541,40 +1616,10 @@ def exec_test():
     for s, method in victims:
         s._tests.remove(method)
 
-    if opt_interactive_on_error:
-        result_class = InteractiveFailureResult
-    else:
-        result_class = unittest.TextTestResult
-    fail_on_skip = False
-
-    class LoggingResult(result_class):
-        def startTest(self, test):
-            log.info("Starting test: {0}".format(self.getDescription(test)))
-            test.started_at = datetime.datetime.utcnow()
-            return super(LoggingResult, self).startTest(test)
-
-        def stopTest(self, test):
-            log.info("Stopped test: {0} in {1}s".format(
-                self.getDescription(test),
-                (datetime.datetime.utcnow() - test.started_at).total_seconds()
-            ))
-
-        def addSkip(self, test, reason):
-            if fail_on_skip:
-                # Don't just call addFailure because that requires a traceback
-                self.failures.append((test, reason))
-            else:
-                super(LoggingResult, self).addSkip(test, reason)
-
-    # Execute!
-    result = unittest.TextTestRunner(
-        stream=LogStream(),
-        resultclass=LoggingResult,
-        verbosity=2,
-        failfast=True).run(overall_suite)
+    overall_suite = load_tests(modules, loader.TestLoader())
+    result = launch_tests(overall_suite)
 
     CephFSMount.cleanup_stale_netnses_and_bridge(remote)
-
     if opt_teardown_cluster:
         teardown_cluster()