]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
tasks/cephfs: refactor test case class
authorJohn Spray <john.spray@redhat.com>
Tue, 1 Nov 2016 11:21:41 +0000 (12:21 +0100)
committerJohn Spray <john.spray@redhat.com>
Tue, 1 Nov 2016 11:21:41 +0000 (12:21 +0100)
A more generic CephTestCase and CephCluster, for
writeing non-cephfs test cases.

This avoids overloading one class with the functionality
needed by lots of different subsystems.

Signed-off-by: John Spray <john.spray@redhat.com>
tasks/ceph_test_case.py [new file with mode: 0644]
tasks/cephfs/cephfs_test_case.py
tasks/cephfs/filesystem.py
tasks/cephfs/vstart_runner.py [deleted file]
tasks/vstart_runner.py [new file with mode: 0644]

diff --git a/tasks/ceph_test_case.py b/tasks/ceph_test_case.py
new file mode 100644 (file)
index 0000000..d9eefe4
--- /dev/null
@@ -0,0 +1,136 @@
+
+import unittest
+import time
+import logging
+
+from teuthology.orchestra.run import CommandFailedError
+
+log = logging.getLogger(__name__)
+
+
+class CephTestCase(unittest.TestCase):
+    """
+    For test tasks that want to define a structured set of
+    tests implemented in python.  Subclass this with appropriate
+    helpers for the subsystem you're testing.
+    """
+
+    # Environment references
+    mounts = None
+    fs = None
+    mds_cluster = None
+    mgr_cluster = None
+    ctx = None
+
+    mon_manager = None
+
+    def assert_cluster_log(self, expected_pattern, invert_match=False, timeout=10):
+        """
+        Context manager.  Assert that during execution, or up to 5 seconds later,
+        the Ceph cluster log emits a message matching the expected pattern.
+
+        :param expected_pattern: a string that you expect to see in the log output
+        """
+
+        ceph_manager = self.fs.mon_manager
+
+        class ContextManager(object):
+            def match(self):
+                found = expected_pattern in self.watcher_process.stdout.getvalue()
+                if invert_match:
+                    return not found
+
+                return found
+
+            def __enter__(self):
+                self.watcher_process = ceph_manager.run_ceph_w()
+
+            def __exit__(self, exc_type, exc_val, exc_tb):
+                if not self.watcher_process.finished:
+                    # Check if we got an early match, wait a bit if we didn't
+                    if self.match():
+                        return
+                    else:
+                        log.debug("No log hits yet, waiting...")
+                        # Default monc tick interval is 10s, so wait that long and
+                        # then some grace
+                        time.sleep(5 + timeout)
+
+                self.watcher_process.stdin.close()
+                try:
+                    self.watcher_process.wait()
+                except CommandFailedError:
+                    pass
+
+                if not self.match():
+                    log.error("Log output: \n{0}\n".format(self.watcher_process.stdout.getvalue()))
+                    raise AssertionError("Expected log message not found: '{0}'".format(expected_pattern))
+
+        return ContextManager()
+
+    def wait_for_health(self, pattern, timeout):
+        """
+        Wait until 'ceph health' contains messages matching the pattern
+        """
+        def seen_health_warning():
+            health = self.fs.mon_manager.get_mon_health()
+            summary_strings = [s['summary'] for s in health['summary']]
+            if len(summary_strings) == 0:
+                log.debug("Not expected number of summary strings ({0})".format(summary_strings))
+                return False
+            else:
+                for ss in summary_strings:
+                    if pattern in ss:
+                         return True
+
+            log.debug("Not found expected summary strings yet ({0})".format(summary_strings))
+            return False
+
+        self.wait_until_true(seen_health_warning, timeout)
+
+    def wait_for_health_clear(self, timeout):
+        """
+        Wait until `ceph health` returns no messages
+        """
+        def is_clear():
+            health = self.fs.mon_manager.get_mon_health()
+            return len(health['summary']) == 0
+
+        self.wait_until_true(is_clear, timeout)
+
+    def wait_until_equal(self, get_fn, expect_val, timeout, reject_fn=None):
+        period = 5
+        elapsed = 0
+        while True:
+            val = get_fn()
+            if val == expect_val:
+                return
+            elif reject_fn and reject_fn(val):
+                raise RuntimeError("wait_until_equal: forbidden value {0} seen".format(val))
+            else:
+                if elapsed >= timeout:
+                    raise RuntimeError("Timed out after {0} seconds waiting for {1} (currently {2})".format(
+                        elapsed, expect_val, val
+                    ))
+                else:
+                    log.debug("wait_until_equal: {0} != {1}, waiting...".format(val, expect_val))
+                time.sleep(period)
+                elapsed += period
+
+        log.debug("wait_until_equal: success")
+
+    def wait_until_true(self, condition, timeout):
+        period = 5
+        elapsed = 0
+        while True:
+            if condition():
+                return
+            else:
+                if elapsed >= timeout:
+                    raise RuntimeError("Timed out after {0} seconds".format(elapsed))
+                else:
+                    log.debug("wait_until_true: waiting...")
+                time.sleep(period)
+                elapsed += period
+
+        log.debug("wait_until_true: success")
index b9228e269ad4d9928a76655a8baa45296c9e4a22..ae9c0d636d8ee139000e4ba3f4bd069a07e711c3 100644 (file)
@@ -1,8 +1,7 @@
 import json
 import logging
-import unittest
 from unittest import case
-import time
+from tasks.ceph_test_case import CephTestCase
 import os
 import re
 from StringIO import StringIO
@@ -33,18 +32,13 @@ def needs_trimming(f):
     return f
 
 
-class CephFSTestCase(unittest.TestCase):
+class CephFSTestCase(CephTestCase):
     """
     Test case for Ceph FS, requires caller to populate Filesystem and Mounts,
     into the fs, mount_a, mount_b class attributes (setting mount_b is optional)
 
     Handles resetting the cluster under test between tests.
     """
-    # Environment references
-    mounts = None
-    fs = None
-    mds_cluster = None
-    ctx = None
 
     # FIXME weird explicit naming
     mount_a = None
@@ -234,43 +228,6 @@ class CephFSTestCase(unittest.TestCase):
     def _session_by_id(self, session_ls):
         return dict([(s['id'], s) for s in session_ls])
 
-    def wait_until_equal(self, get_fn, expect_val, timeout, reject_fn=None):
-        period = 5
-        elapsed = 0
-        while True:
-            val = get_fn()
-            if val == expect_val:
-                return
-            elif reject_fn and reject_fn(val):
-                raise RuntimeError("wait_until_equal: forbidden value {0} seen".format(val))
-            else:
-                if elapsed >= timeout:
-                    raise RuntimeError("Timed out after {0} seconds waiting for {1} (currently {2})".format(
-                        elapsed, expect_val, val
-                    ))
-                else:
-                    log.debug("wait_until_equal: {0} != {1}, waiting...".format(val, expect_val))
-                time.sleep(period)
-                elapsed += period
-
-        log.debug("wait_until_equal: success")
-
-    def wait_until_true(self, condition, timeout):
-        period = 5
-        elapsed = 0
-        while True:
-            if condition():
-                return
-            else:
-                if elapsed >= timeout:
-                    raise RuntimeError("Timed out after {0} seconds".format(elapsed))
-                else:
-                    log.debug("wait_until_true: waiting...")
-                time.sleep(period)
-                elapsed += period
-
-        log.debug("wait_until_true: success")
-
     def wait_for_daemon_start(self, daemon_ids=None):
         """
         Wait until all the daemons appear in the FSMap, either assigned
@@ -347,77 +304,3 @@ class CephFSTestCase(unittest.TestCase):
 
         else:
             raise AssertionError("MDS daemon '{0}' did not crash as expected".format(daemon_id))
-
-    def assert_cluster_log(self, expected_pattern, invert_match=False, timeout=10):
-        """
-        Context manager.  Assert that during execution, or up to 5 seconds later,
-        the Ceph cluster log emits a message matching the expected pattern.
-
-        :param expected_pattern: a string that you expect to see in the log output
-        """
-
-        ceph_manager = self.fs.mon_manager
-
-        class ContextManager(object):
-            def match(self):
-                found = expected_pattern in self.watcher_process.stdout.getvalue()
-                if invert_match:
-                    return not found
-
-                return found
-
-            def __enter__(self):
-                self.watcher_process = ceph_manager.run_ceph_w()
-
-            def __exit__(self, exc_type, exc_val, exc_tb):
-                if not self.watcher_process.finished:
-                    # Check if we got an early match, wait a bit if we didn't
-                    if self.match():
-                        return
-                    else:
-                        log.debug("No log hits yet, waiting...")
-                        # Default monc tick interval is 10s, so wait that long and
-                        # then some grace
-                        time.sleep(5 + timeout)
-
-                self.watcher_process.stdin.close()
-                try:
-                    self.watcher_process.wait()
-                except CommandFailedError:
-                    pass
-
-                if not self.match():
-                    log.error("Log output: \n{0}\n".format(self.watcher_process.stdout.getvalue()))
-                    raise AssertionError("Expected log message not found: '{0}'".format(expected_pattern))
-
-        return ContextManager()
-
-    def wait_for_health(self, pattern, timeout):
-        """
-        Wait until 'ceph health' contains messages matching the pattern
-        """
-        def seen_health_warning():
-            health = self.fs.mon_manager.get_mon_health()
-            summary_strings = [s['summary'] for s in health['summary']]
-            if len(summary_strings) == 0:
-                log.debug("Not expected number of summary strings ({0})".format(summary_strings))
-                return False
-            else:
-                for ss in summary_strings:
-                    if pattern in ss:
-                         return True
-
-            log.debug("Not found expected summary strings yet ({0})".format(summary_strings))
-            return False
-
-        self.wait_until_true(seen_health_warning, timeout)
-
-    def wait_for_health_clear(self, timeout):
-        """
-        Wait until `ceph health` returns no messages
-        """
-        def is_clear():
-            health = self.fs.mon_manager.get_mon_health()
-            return len(health['summary']) == 0
-
-        self.wait_until_true(is_clear, timeout)
index 709ed1a1c10ffea8387af890576eb55fba1e1f87..fa4507f02feddd6d008ea1d22b3bb89756e07ef1 100644 (file)
@@ -32,7 +32,49 @@ class ObjectNotFound(Exception):
         return "Object not found: '{0}'".format(self._object_name)
 
 
-class MDSCluster(object):
+class CephCluster(object):
+    @property
+    def admin_remote(self):
+        first_mon = misc.get_first_mon(self._ctx, None)
+        (result,) = self._ctx.cluster.only(first_mon).remotes.iterkeys()
+        return result
+
+    def __init__(self, ctx):
+        self._ctx = ctx
+        self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
+
+    def get_config(self, key, service_type=None):
+        """
+        Get config from mon by default, or a specific service if caller asks for it
+        """
+        if service_type is None:
+            service_type = 'mon'
+
+        service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
+        return self.json_asok(['config', 'get', key], service_type, service_id)[key]
+
+    def set_ceph_conf(self, subsys, key, value):
+        if subsys not in self._ctx.ceph['ceph'].conf:
+            self._ctx.ceph['ceph'].conf[subsys] = {}
+        self._ctx.ceph['ceph'].conf[subsys][key] = value
+        write_conf(self._ctx)  # XXX because we don't have the ceph task's config object, if they
+                               # used a different config path this won't work.
+
+    def clear_ceph_conf(self, subsys, key):
+        del self._ctx.ceph['ceph'].conf[subsys][key]
+        write_conf(self._ctx)
+
+    def json_asok(self, command, service_type, service_id):
+        proc = self.mon_manager.admin_socket(service_type, service_id, command)
+        response_data = proc.stdout.getvalue()
+        log.info("_json_asok output: {0}".format(response_data))
+        if response_data.strip():
+            return json.loads(response_data)
+        else:
+            return None
+
+
+class MDSCluster(CephCluster):
     """
     Collective operations on all the MDS daemons in the Ceph cluster.  These
     daemons may be in use by various Filesystems.
@@ -41,21 +83,14 @@ class MDSCluster(object):
     a parent of Filesystem.  The correct way to use MDSCluster going forward is
     as a separate instance outside of your (multiple) Filesystem instances.
     """
-
-    @property
-    def admin_remote(self):
-        first_mon = misc.get_first_mon(self._ctx, None)
-        (result,) = self._ctx.cluster.only(first_mon).remotes.iterkeys()
-        return result
-
     def __init__(self, ctx):
+        super(MDSCluster, self).__init__(ctx)
+
         self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))
-        self._ctx = ctx
 
         if len(self.mds_ids) == 0:
             raise RuntimeError("This task requires at least one MDS")
 
-        self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
         if hasattr(self._ctx, "daemons"):
             # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
             self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
@@ -157,36 +192,6 @@ class MDSCluster(object):
 
         return list(result)
 
-    def get_config(self, key, service_type=None):
-        """
-        Get config from mon by default, or a specific service if caller asks for it
-        """
-        if service_type is None:
-            service_type = 'mon'
-
-        service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
-        return self.json_asok(['config', 'get', key], service_type, service_id)[key]
-
-    def set_ceph_conf(self, subsys, key, value):
-        if subsys not in self._ctx.ceph['ceph'].conf:
-            self._ctx.ceph['ceph'].conf[subsys] = {}
-        self._ctx.ceph['ceph'].conf[subsys][key] = value
-        write_conf(self._ctx)  # XXX because we don't have the ceph task's config object, if they
-                               # used a different config path this won't work.
-
-    def clear_ceph_conf(self, subsys, key):
-        del self._ctx.ceph['ceph'].conf[subsys][key]
-        write_conf(self._ctx)
-
-    def json_asok(self, command, service_type, service_id):
-        proc = self.mon_manager.admin_socket(service_type, service_id, command)
-        response_data = proc.stdout.getvalue()
-        log.info("_json_asok output: {0}".format(response_data))
-        if response_data.strip():
-            return json.loads(response_data)
-        else:
-            return None
-
     def set_clients_block(self, blocked, mds_id=None):
         """
         Block (using iptables) client communications to this MDS.  Be careful: if
diff --git a/tasks/cephfs/vstart_runner.py b/tasks/cephfs/vstart_runner.py
deleted file mode 100644 (file)
index 81dca6c..0000000
+++ /dev/null
@@ -1,970 +0,0 @@
-"""
-vstart_runner: override Filesystem and Mount interfaces to run a CephFSTestCase against a vstart
-ceph instance instead of a packaged/installed cluster.  Use this to turn around test cases
-quickly during development.
-
-Usage (assuming teuthology, ceph, ceph-qa-suite checked out in ~/git):
-
-    # Activate the teuthology virtualenv
-    source ~/git/teuthology/virtualenv/bin/activate
-    # Go into your ceph build directory
-    cd ~/git/ceph/build
-    # Start a vstart cluster
-    MDS=2 MON=1 OSD=3 ../src/vstart.sh -n
-    # Invoke a test using this script, with PYTHONPATH set appropriately
-    python ~/git/ceph-qa-suite/tasks/cephfs/vstart_runner.py
-
-    # Alternatively, if you use different paths, specify them as follows:
-    LD_LIBRARY_PATH=`pwd`/lib PYTHONPATH=~/git/teuthology:~/git/ceph-qa-suite:`pwd`/../src/pybind:`pwd`/lib/cython_modules/lib.2 python ~/git/ceph-qa-suite/tasks/cephfs/vstart_runner.py
-
-    # If you wish to drop to a python shell on failures, use --interactive:
-    python ~/git/ceph-qa-suite/tasks/cephfs/vstart_runner.py --interactive
-
-    # If you wish to run a named test case, pass it as an argument:
-    python ~/git/ceph-qa-suite/tasks/cephfs/vstart_runner.py tasks.cephfs.test_data_scan
-
-"""
-
-from StringIO import StringIO
-from collections import defaultdict
-import getpass
-import signal
-import tempfile
-import threading
-import datetime
-import shutil
-import re
-import os
-import time
-import json
-import sys
-import errno
-from unittest import suite
-import unittest
-import platform
-from teuthology.orchestra.run import Raw, quote
-from teuthology.orchestra.daemon import DaemonGroup
-from teuthology.config import config as teuth_config
-
-import logging
-
-log = logging.getLogger(__name__)
-
-handler = logging.FileHandler("./vstart_runner.log")
-formatter = logging.Formatter(
-    fmt=u'%(asctime)s.%(msecs)03d %(levelname)s:%(name)s:%(message)s',
-    datefmt='%Y-%m-%dT%H:%M:%S')
-handler.setFormatter(formatter)
-log.addHandler(handler)
-log.setLevel(logging.INFO)
-
-
-def respawn_in_path(lib_path, python_paths):
-    execv_cmd = ['python']
-    if platform.system() == "Darwin":
-        lib_path_var = "DYLD_LIBRARY_PATH"
-    else:
-        lib_path_var = "LD_LIBRARY_PATH"
-
-    py_binary = os.environ.get("PYTHON", "python")
-
-    if lib_path_var in os.environ:
-        if lib_path not in os.environ[lib_path_var]:
-            os.environ[lib_path_var] += ':' + lib_path
-            os.execvp(py_binary, execv_cmd + sys.argv)
-    else:
-        os.environ[lib_path_var] = lib_path
-        os.execvp(py_binary, execv_cmd + sys.argv)
-
-    for p in python_paths:
-        sys.path.insert(0, p)
-
-
-# Let's use some sensible defaults
-if os.path.exists("./CMakeCache.txt") and os.path.exists("./bin"):
-
-    # A list of candidate paths for each package we need
-    guesses = [
-        ["~/git/teuthology", "~/scm/teuthology", "~/teuthology"],
-        ["~/git/ceph-qa-suite", "~/scm/ceph-qa-suite", "~/ceph-qa-suite"],
-        ["lib/cython_modules/lib.2"],
-        ["../src/pybind"],
-    ]
-
-    python_paths = []
-    for package_guesses in guesses:
-        for g in package_guesses:
-            g_exp = os.path.abspath(os.path.expanduser(g))
-            if os.path.exists(g_exp):
-                python_paths.append(g_exp)
-
-    ld_path = os.path.join(os.getcwd(), "lib/")
-    print "Using guessed paths {0} {1}".format(ld_path, python_paths)
-    respawn_in_path(ld_path, python_paths)
-
-
-try:
-    from teuthology.exceptions import CommandFailedError
-    from tasks.ceph_manager import CephManager
-    from tasks.cephfs.fuse_mount import FuseMount
-    from tasks.cephfs.filesystem import Filesystem, MDSCluster
-    from teuthology.contextutil import MaxWhileTries
-    from teuthology.task import interactive
-except ImportError:
-    sys.stderr.write("***\nError importing packages, have you activated your teuthology virtualenv "
-                     "and set PYTHONPATH to point to teuthology and ceph-qa-suite?\n***\n\n")
-    raise
-
-# Must import after teuthology because of gevent monkey patching
-import subprocess
-
-if os.path.exists("./CMakeCache.txt"):
-    # Running in build dir of a cmake build
-    BIN_PREFIX = "./bin/"
-else:
-    # Running in src/ of an autotools build
-    BIN_PREFIX = "./"
-
-
-class LocalRemoteProcess(object):
-    def __init__(self, args, subproc, check_status, stdout, stderr):
-        self.args = args
-        self.subproc = subproc
-        if stdout is None:
-            self.stdout = StringIO()
-        else:
-            self.stdout = stdout
-
-        if stderr is None:
-            self.stderr = StringIO()
-        else:
-            self.stderr = stderr
-
-        self.check_status = check_status
-        self.exitstatus = self.returncode = None
-
-    def wait(self):
-        if self.finished:
-            # Avoid calling communicate() on a dead process because it'll
-            # give you stick about std* already being closed
-            if self.exitstatus != 0:
-                raise CommandFailedError(self.args, self.exitstatus)
-            else:
-                return
-
-        out, err = self.subproc.communicate()
-        self.stdout.write(out)
-        self.stderr.write(err)
-
-        self.exitstatus = self.returncode = self.subproc.returncode
-
-        if self.exitstatus != 0:
-            sys.stderr.write(out)
-            sys.stderr.write(err)
-
-        if self.check_status and self.exitstatus != 0:
-            raise CommandFailedError(self.args, self.exitstatus)
-
-    @property
-    def finished(self):
-        if self.exitstatus is not None:
-            return True
-
-        if self.subproc.poll() is not None:
-            out, err = self.subproc.communicate()
-            self.stdout.write(out)
-            self.stderr.write(err)
-            self.exitstatus = self.returncode = self.subproc.returncode
-            return True
-        else:
-            return False
-
-    def kill(self):
-        log.info("kill ")
-        if self.subproc.pid and not self.finished:
-            log.info("kill: killing pid {0} ({1})".format(
-                self.subproc.pid, self.args))
-            safe_kill(self.subproc.pid)
-        else:
-            log.info("kill: already terminated ({0})".format(self.args))
-
-    @property
-    def stdin(self):
-        class FakeStdIn(object):
-            def __init__(self, mount_daemon):
-                self.mount_daemon = mount_daemon
-
-            def close(self):
-                self.mount_daemon.kill()
-
-        return FakeStdIn(self)
-
-
-class LocalRemote(object):
-    """
-    Amusingly named class to present the teuthology RemoteProcess interface when we are really
-    running things locally for vstart
-
-    Run this inside your src/ dir!
-    """
-
-    def __init__(self):
-        self.name = "local"
-        self.hostname = "localhost"
-        self.user = getpass.getuser()
-
-    def get_file(self, path, sudo, dest_dir):
-        tmpfile = tempfile.NamedTemporaryFile(delete=False).name
-        shutil.copy(path, tmpfile)
-        return tmpfile
-
-    def put_file(self, src, dst, sudo=False):
-        shutil.copy(src, dst)
-
-    def run(self, args, check_status=True, wait=True,
-            stdout=None, stderr=None, cwd=None, stdin=None,
-            logger=None, label=None):
-        log.info("run args={0}".format(args))
-
-        # We don't need no stinkin' sudo
-        args = [a for a in args if a != "sudo"]
-
-        # We have to use shell=True if any run.Raw was present, e.g. &&
-        shell = any([a for a in args if isinstance(a, Raw)])
-
-        if shell:
-            filtered = []
-            i = 0
-            while i < len(args):
-                if args[i] == 'adjust-ulimits':
-                    i += 1
-                elif args[i] == 'ceph-coverage':
-                    i += 2
-                elif args[i] == 'timeout':
-                    i += 2
-                else:
-                    filtered.append(args[i])
-                    i += 1
-
-            args = quote(filtered)
-            log.info("Running {0}".format(args))
-
-            subproc = subprocess.Popen(args,
-                                       stdout=subprocess.PIPE,
-                                       stderr=subprocess.PIPE,
-                                       stdin=subprocess.PIPE,
-                                       cwd=cwd,
-                                       shell=True)
-        else:
-            log.info("Running {0}".format(args))
-
-            for arg in args:
-                if not isinstance(arg, basestring):
-                    raise RuntimeError("Oops, can't handle arg {0} type {1}".format(
-                        arg, arg.__class__
-                    ))
-
-            subproc = subprocess.Popen(args,
-                                       stdout=subprocess.PIPE,
-                                       stderr=subprocess.PIPE,
-                                       stdin=subprocess.PIPE,
-                                       cwd=cwd)
-
-        if stdin:
-            if not isinstance(stdin, basestring):
-                raise RuntimeError("Can't handle non-string stdins on a vstart cluster")
-
-            # Hack: writing to stdin is not deadlock-safe, but it "always" works
-            # as long as the input buffer is "small"
-            subproc.stdin.write(stdin)
-
-        proc = LocalRemoteProcess(
-            args, subproc, check_status,
-            stdout, stderr
-        )
-
-        if wait:
-            proc.wait()
-
-        return proc
-
-
-# FIXME: twiddling vstart daemons is likely to be unreliable, we should probably just let vstart
-# run RADOS and run the MDS daemons directly from the test runner
-class LocalDaemon(object):
-    def __init__(self, daemon_type, daemon_id):
-        self.daemon_type = daemon_type
-        self.daemon_id = daemon_id
-        self.controller = LocalRemote()
-        self.proc = None
-
-    @property
-    def remote(self):
-        return LocalRemote()
-
-    def running(self):
-        return self._get_pid() is not None
-
-    def _get_pid(self):
-        """
-        Return PID as an integer or None if not found
-        """
-        ps_txt = self.controller.run(
-            args=["ps", "-xwwu"+str(os.getuid())]
-        ).stdout.getvalue().strip()
-        lines = ps_txt.split("\n")[1:]
-
-        for line in lines:
-            if line.find("ceph-{0} -i {1}".format(self.daemon_type, self.daemon_id)) != -1:
-                log.info("Found ps line for daemon: {0}".format(line))
-                return int(line.split()[1])
-        log.info("No match for {0} {1}: {2}".format(
-            self.daemon_type, self.daemon_id, ps_txt
-            ))
-        return None
-
-    def wait(self, timeout):
-        waited = 0
-        while self._get_pid() is not None:
-            if waited > timeout:
-                raise MaxWhileTries("Timed out waiting for daemon {0}.{1}".format(self.daemon_type, self.daemon_id))
-            time.sleep(1)
-            waited += 1
-
-    def stop(self, timeout=300):
-        if not self.running():
-            log.error('tried to stop a non-running daemon')
-            return
-
-        pid = self._get_pid()
-        log.info("Killing PID {0} for {1}.{2}".format(pid, self.daemon_type, self.daemon_id))
-        os.kill(pid, signal.SIGKILL)
-        self.wait(timeout=timeout)
-
-    def restart(self):
-        if self._get_pid() is not None:
-            self.stop()
-
-        self.proc = self.controller.run([os.path.join(BIN_PREFIX, "./ceph-{0}".format(self.daemon_type)), "-i", self.daemon_id])
-
-
-def safe_kill(pid):
-    """
-    os.kill annoyingly raises exception if process already dead.  Ignore it.
-    """
-    try:
-        return os.kill(pid, signal.SIGKILL)
-    except OSError as e:
-        if e.errno == errno.ESRCH:
-            # Raced with process termination
-            pass
-        else:
-            raise
-
-
-class LocalFuseMount(FuseMount):
-    def __init__(self, test_dir, client_id):
-        super(LocalFuseMount, self).__init__(None, test_dir, client_id, LocalRemote())
-
-    @property
-    def config_path(self):
-        return "./ceph.conf"
-
-    def get_keyring_path(self):
-        # This is going to end up in a config file, so use an absolute path
-        # to avoid assumptions about daemons' pwd
-        return os.path.abspath("./client.{0}.keyring".format(self.client_id))
-
-    def run_shell(self, args, wait=True):
-        # FIXME maybe should add a pwd arg to teuthology.orchestra so that
-        # the "cd foo && bar" shenanigans isn't needed to begin with and
-        # then we wouldn't have to special case this
-        return self.client_remote.run(
-            args, wait=wait, cwd=self.mountpoint
-        )
-
-    @property
-    def _prefix(self):
-        return BIN_PREFIX
-
-    def _asok_path(self):
-        # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
-        # run foreground.  When running it daemonized however, the asok is named after
-        # the PID of the launching process, not the long running ceph-fuse process.  Therefore
-        # we need to give an exact path here as the logic for checking /proc/ for which
-        # asok is alive does not work.
-        path = "./out/client.{0}.{1}.asok".format(self.client_id, self.fuse_daemon.subproc.pid)
-        log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid))
-        return path
-
-    def umount(self):
-        if self.is_mounted():
-            super(LocalFuseMount, self).umount()
-
-    def mount(self, mount_path=None):
-        self.client_remote.run(
-            args=[
-                'mkdir',
-                '--',
-                self.mountpoint,
-            ],
-        )
-
-        def list_connections():
-            self.client_remote.run(
-                args=["mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
-                check_status=False
-            )
-            p = self.client_remote.run(
-                args=["ls", "/sys/fs/fuse/connections"],
-                check_status=False
-            )
-            if p.exitstatus != 0:
-                log.warn("ls conns failed with {0}, assuming none".format(p.exitstatus))
-                return []
-
-            ls_str = p.stdout.getvalue().strip()
-            if ls_str:
-                return [int(n) for n in ls_str.split("\n")]
-            else:
-                return []
-
-        # Before starting ceph-fuse process, note the contents of
-        # /sys/fs/fuse/connections
-        pre_mount_conns = list_connections()
-        log.info("Pre-mount connections: {0}".format(pre_mount_conns))
-
-        prefix = [os.path.join(BIN_PREFIX, "ceph-fuse")]
-        if os.getuid() != 0:
-            prefix += ["--client-die-on-failed-remount=false"]
-
-        if mount_path is not None:
-            prefix += ["--client_mountpoint={0}".format(mount_path)]
-
-        self.fuse_daemon = self.client_remote.run(args=
-                                            prefix + [
-                                                "-f",
-                                                "--name",
-                                                "client.{0}".format(self.client_id),
-                                                self.mountpoint
-                                            ], wait=False)
-
-        log.info("Mounting client.{0} with pid {1}".format(self.client_id, self.fuse_daemon.subproc.pid))
-
-        # Wait for the connection reference to appear in /sys
-        waited = 0
-        post_mount_conns = list_connections()
-        while len(post_mount_conns) <= len(pre_mount_conns):
-            if self.fuse_daemon.finished:
-                # Did mount fail?  Raise the CommandFailedError instead of
-                # hitting the "failed to populate /sys/" timeout
-                self.fuse_daemon.wait()
-            time.sleep(1)
-            waited += 1
-            if waited > 30:
-                raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
-                    waited
-                ))
-            post_mount_conns = list_connections()
-
-        log.info("Post-mount connections: {0}".format(post_mount_conns))
-
-        # Record our fuse connection number so that we can use it when
-        # forcing an unmount
-        new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
-        if len(new_conns) == 0:
-            raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
-        elif len(new_conns) > 1:
-            raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
-        else:
-            self._fuse_conn = new_conns[0]
-
-    def _run_python(self, pyscript):
-        """
-        Override this to remove the daemon-helper prefix that is used otherwise
-        to make the process killable.
-        """
-        return self.client_remote.run(args=[
-            'python', '-c', pyscript
-        ], wait=False)
-
-
-class LocalCephManager(CephManager):
-    def __init__(self):
-        # Deliberately skip parent init, only inheriting from it to get
-        # util methods like osd_dump that sit on top of raw_cluster_cmd
-        self.controller = LocalRemote()
-
-        # A minority of CephManager fns actually bother locking for when
-        # certain teuthology tests want to run tasks in parallel
-        self.lock = threading.RLock()
-
-    def find_remote(self, daemon_type, daemon_id):
-        """
-        daemon_type like 'mds', 'osd'
-        daemon_id like 'a', '0'
-        """
-        return LocalRemote()
-
-    def run_ceph_w(self):
-        proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph"), "-w"], wait=False, stdout=StringIO())
-        return proc
-
-    def raw_cluster_cmd(self, *args):
-        """
-        args like ["osd", "dump"}
-        return stdout string
-        """
-        proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args))
-        return proc.stdout.getvalue()
-
-    def raw_cluster_cmd_result(self, *args):
-        """
-        like raw_cluster_cmd but don't check status, just return rc
-        """
-        proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args), check_status=False)
-        return proc.exitstatus
-
-    def admin_socket(self, daemon_type, daemon_id, command, check_status=True):
-        return self.controller.run(
-            args=[os.path.join(BIN_PREFIX, "ceph"), "daemon", "{0}.{1}".format(daemon_type, daemon_id)] + command, check_status=check_status
-        )
-
-    # FIXME: copypasta
-    def get_mds_status(self, mds):
-        """
-        Run cluster commands for the mds in order to get mds information
-        """
-        out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
-        j = json.loads(' '.join(out.splitlines()[1:]))
-        # collate; for dup ids, larger gid wins.
-        for info in j['info'].itervalues():
-            if info['name'] == mds:
-                return info
-        return None
-
-    # FIXME: copypasta
-    def get_mds_status_by_rank(self, rank):
-        """
-        Run cluster commands for the mds in order to get mds information
-        check rank.
-        """
-        j = self.get_mds_status_all()
-        # collate; for dup ids, larger gid wins.
-        for info in j['info'].itervalues():
-            if info['rank'] == rank:
-                return info
-        return None
-
-    def get_mds_status_all(self):
-        """
-        Run cluster command to extract all the mds status.
-        """
-        out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
-        j = json.loads(' '.join(out.splitlines()[1:]))
-        return j
-
-
-class LocalMDSCluster(MDSCluster):
-    def __init__(self, ctx):
-        # Deliberately skip calling parent constructor
-        self._ctx = ctx
-
-        self.mds_ids = ctx.daemons.daemons['mds'].keys()
-        if not self.mds_ids:
-            raise RuntimeError("No MDSs found in ceph.conf!")
-
-        self.mon_manager = LocalCephManager()
-        self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
-
-        self._conf = defaultdict(dict)
-
-    def get_config(self, key, service_type=None):
-        if service_type is None:
-            service_type = 'mon'
-
-        # FIXME hardcoded vstart service IDs
-        service_id = {
-            'mon': 'a',
-            'mds': 'a',
-            'osd': '0'
-        }[service_type]
-
-        return self.json_asok(['config', 'get', key], service_type, service_id)[key]
-
-    def _write_conf(self):
-        # In teuthology, we have the honour of writing the entire ceph.conf, but
-        # in vstart land it has mostly already been written and we need to carefully
-        # append to it.
-        conf_path = "./ceph.conf"
-        banner = "\n#LOCAL_TEST\n"
-        existing_str = open(conf_path).read()
-
-        if banner in existing_str:
-            existing_str = existing_str[0:existing_str.find(banner)]
-
-        existing_str += banner
-
-        for subsys, kvs in self._conf.items():
-            existing_str += "\n[{0}]\n".format(subsys)
-            for key, val in kvs.items():
-                # Comment out existing instance if it exists
-                log.info("Searching for existing instance {0}/{1}".format(
-                    key, subsys
-                ))
-                existing_section = re.search("^\[{0}\]$([\n]|[^\[])+".format(
-                    subsys
-                ), existing_str, re.MULTILINE)
-
-                if existing_section:
-                    section_str = existing_str[existing_section.start():existing_section.end()]
-                    existing_val = re.search("^\s*[^#]({0}) =".format(key), section_str, re.MULTILINE)
-                    if existing_val:
-                        start = existing_section.start() + existing_val.start(1)
-                        log.info("Found string to replace at {0}".format(
-                            start
-                        ))
-                        existing_str = existing_str[0:start] + "#" + existing_str[start:]
-
-                existing_str += "{0} = {1}\n".format(key, val)
-
-        open(conf_path, "w").write(existing_str)
-
-    def set_ceph_conf(self, subsys, key, value):
-        self._conf[subsys][key] = value
-        self._write_conf()
-
-    def clear_ceph_conf(self, subsys, key):
-        del self._conf[subsys][key]
-        self._write_conf()
-
-    def clear_firewall(self):
-        # FIXME: unimplemented
-        pass
-
-    def get_filesystem(self, name):
-        return LocalFilesystem(self._ctx, name)
-
-
-class LocalFilesystem(Filesystem, LocalMDSCluster):
-    @property
-    def admin_remote(self):
-        return LocalRemote()
-
-    def __init__(self, ctx, name=None):
-        # Deliberately skip calling parent constructor
-        self._ctx = ctx
-
-        if name is None:
-            name = "cephfs"
-
-        self.name = name
-        self.metadata_pool_name = "{0}_metadata".format(name)
-        self.data_pool_name = "{0}_data".format(name)
-
-        # Hack: cheeky inspection of ceph.conf to see what MDSs exist
-        self.mds_ids = set()
-        for line in open("ceph.conf").readlines():
-            match = re.match("^\[mds\.(.+)\]$", line)
-            if match:
-                self.mds_ids.add(match.group(1))
-
-        if not self.mds_ids:
-            raise RuntimeError("No MDSs found in ceph.conf!")
-
-        self.mds_ids = list(self.mds_ids)
-
-        log.info("Discovered MDS IDs: {0}".format(self.mds_ids))
-
-        self.mon_manager = LocalCephManager()
-
-        self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
-
-        self.client_remote = LocalRemote()
-
-        self._conf = defaultdict(dict)
-
-    @property
-    def _prefix(self):
-        return BIN_PREFIX
-
-    def set_clients_block(self, blocked, mds_id=None):
-        raise NotImplementedError()
-
-    def get_pgs_per_fs_pool(self):
-        # FIXME: assuming there are 3 OSDs
-        return 3 * int(self.get_config('mon_pg_warn_min_per_osd'))
-
-
-class InteractiveFailureResult(unittest.TextTestResult):
-    """
-    Specialization that implements interactive-on-error style
-    behavior.
-    """
-    def addFailure(self, test, err):
-        super(InteractiveFailureResult, self).addFailure(test, err)
-        log.error(self._exc_info_to_string(err, test))
-        log.error("Failure in test '{0}', going interactive".format(
-            self.getDescription(test)
-        ))
-        interactive.task(ctx=None, config=None)
-
-    def addError(self, test, err):
-        super(InteractiveFailureResult, self).addError(test, err)
-        log.error(self._exc_info_to_string(err, test))
-        log.error("Error in test '{0}', going interactive".format(
-            self.getDescription(test)
-        ))
-        interactive.task(ctx=None, config=None)
-
-
-def exec_test():
-    # Help developers by stopping up-front if their tree isn't built enough for all the
-    # tools that the tests might want to use (add more here if needed)
-    require_binaries = ["ceph-dencoder", "cephfs-journal-tool", "cephfs-data-scan",
-                        "cephfs-table-tool", "ceph-fuse", "rados"]
-    missing_binaries = [b for b in require_binaries if not os.path.exists(os.path.join(BIN_PREFIX, b))]
-    if missing_binaries:
-        log.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries)))
-        sys.exit(-1)
-
-    test_dir = tempfile.mkdtemp()
-
-    # Create as many of these as the biggest test requires
-    clients = ["0", "1", "2", "3"]
-
-    remote = LocalRemote()
-
-    # Tolerate no MDSs or clients running at start
-    ps_txt = remote.run(
-        args=["ps", "-u"+str(os.getuid())]
-    ).stdout.getvalue().strip()
-    lines = ps_txt.split("\n")[1:]
-
-    for line in lines:
-        if 'ceph-fuse' in line or 'ceph-mds' in line:
-            pid = int(line.split()[0])
-            log.warn("Killing stray process {0}".format(line))
-            os.kill(pid, signal.SIGKILL)
-
-    class LocalCluster(object):
-        def __init__(self, rolename="placeholder"):
-            self.remotes = {
-                remote: [rolename]
-            }
-
-        def only(self, requested):
-            return self.__class__(rolename=requested)
-
-    teuth_config['test_path'] = test_dir
-
-    class LocalContext(object):
-        def __init__(self):
-            self.config = {}
-            self.teuthology_config = teuth_config
-            self.cluster = LocalCluster()
-            self.daemons = DaemonGroup()
-
-            # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
-            # tests that want to look these up via ctx can do so.
-            # Inspect ceph.conf to see what roles exist
-            for conf_line in open("ceph.conf").readlines():
-                for svc_type in ["mon", "osd", "mds"]:
-                    if svc_type not in self.daemons.daemons:
-                        self.daemons.daemons[svc_type] = {}
-                    match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line)
-                    if match:
-                        svc_id = match.group(1)
-                        self.daemons.daemons[svc_type][svc_id] = LocalDaemon(svc_type, svc_id)
-
-        def __del__(self):
-            shutil.rmtree(self.teuthology_config['test_path'])
-
-    ctx = LocalContext()
-
-    mounts = []
-    for client_id in clients:
-        # Populate client keyring (it sucks to use client.admin for test clients
-        # because it's awkward to find the logs later)
-        client_name = "client.{0}".format(client_id)
-
-        if client_name not in open("./keyring").read():
-            p = remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "auth", "get-or-create", client_name,
-                                 "osd", "allow rw",
-                                 "mds", "allow",
-                                 "mon", "allow r"])
-
-            open("./keyring", "a").write(p.stdout.getvalue())
-
-        mount = LocalFuseMount(test_dir, client_id)
-        mounts.append(mount)
-        if mount.is_mounted():
-            log.warn("unmounting {0}".format(mount.mountpoint))
-            mount.umount_wait()
-        else:
-            if os.path.exists(mount.mountpoint):
-                os.rmdir(mount.mountpoint)
-    filesystem = LocalFilesystem(ctx)
-    mds_cluster = LocalMDSCluster(ctx)
-
-    from tasks.cephfs_test_runner import DecoratingLoader
-
-    class LogStream(object):
-        def __init__(self):
-            self.buffer = ""
-
-        def write(self, data):
-            self.buffer += data
-            if "\n" in self.buffer:
-                lines = self.buffer.split("\n")
-                for line in lines[:-1]:
-                    pass
-                    # sys.stderr.write(line + "\n")
-                    log.info(line)
-                self.buffer = lines[-1]
-
-        def flush(self):
-            pass
-
-    decorating_loader = DecoratingLoader({
-        "ctx": ctx,
-        "mounts": mounts,
-        "fs": filesystem,
-        "mds_cluster": mds_cluster
-    })
-
-    # For the benefit of polling tests like test_full -- in teuthology land we set this
-    # in a .yaml, here it's just a hardcoded thing for the developer's pleasure.
-    remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "tell", "osd.*", "injectargs", "--osd-mon-report-interval-max", "5"])
-    filesystem.set_ceph_conf("osd", "osd_mon_report_interval_max", "5")
-
-    # Vstart defaults to two segments, which very easily gets a "behind on trimming" health warning
-    # from normal IO latency.  Increase it for running teests.
-    filesystem.set_ceph_conf("mds", "mds log max segments", "10")
-
-    # Make sure the filesystem created in tests has uid/gid that will let us talk to
-    # it after mounting it (without having to  go root).  Set in 'global' not just 'mds'
-    # so that cephfs-data-scan will pick it up too.
-    filesystem.set_ceph_conf("global", "mds root ino uid", "%s" % os.getuid())
-    filesystem.set_ceph_conf("global", "mds root ino gid", "%s" % os.getgid())
-
-    # Monkeypatch get_package_version to avoid having to work out what kind of distro we're on
-    def _get_package_version(remote, pkg_name):
-        # Used in cephfs tests to find fuse version.  Your development workstation *does* have >=2.9, right?
-        return "2.9"
-
-    import teuthology.packaging
-    teuthology.packaging.get_package_version = _get_package_version
-
-    def enumerate_methods(s):
-        for t in s._tests:
-            if isinstance(t, suite.BaseTestSuite):
-                for sub in enumerate_methods(t):
-                    yield sub
-            else:
-                yield s, t
-
-    interactive_on_error = False
-
-    args = sys.argv[1:]
-    flags = [a for a in args if a.startswith("-")]
-    modules = [a for a in args if not a.startswith("-")]
-    for f in flags:
-        if f == "--interactive":
-            interactive_on_error = True
-        else:
-            log.error("Unknown option '{0}'".format(f))
-            sys.exit(-1)
-
-    if modules:
-        log.info("Executing modules: {0}".format(modules))
-        module_suites = []
-        for mod_name in modules:
-            # Test names like cephfs.test_auto_repair
-            module_suites.append(decorating_loader.loadTestsFromName(mod_name))
-        log.info("Loaded: {0}".format(list(module_suites)))
-        overall_suite = suite.TestSuite(module_suites)
-    else:
-        log.info("Excuting all tests")
-        overall_suite = decorating_loader.discover(
-            os.path.dirname(os.path.abspath(__file__))
-        )
-
-    # Filter out tests that don't lend themselves to interactive running,
-    victims = []
-    for case, method in enumerate_methods(overall_suite):
-        fn = getattr(method, method._testMethodName)
-
-        drop_test = False
-
-        if hasattr(fn, 'is_for_teuthology') and getattr(fn, 'is_for_teuthology') is True:
-            drop_test = True
-            log.warn("Dropping test because long running: ".format(method.id()))
-
-        if getattr(fn, "needs_trimming", False) is True:
-            drop_test = (os.getuid() != 0)
-            log.warn("Dropping test because client trim unavailable: ".format(method.id()))
-
-        if drop_test:
-            # Don't drop the test if it was explicitly requested in arguments
-            is_named = False
-            for named in modules:
-                if named.endswith(method.id()):
-                    is_named = True
-                    break
-
-            if not is_named:
-                victims.append((case, method))
-
-    log.info("Disabling {0} tests because of is_for_teuthology or needs_trimming".format(len(victims)))
-    for s, method in victims:
-        s._tests.remove(method)
-
-    if interactive_on_error:
-        result_class = InteractiveFailureResult
-    else:
-        result_class = unittest.TextTestResult
-    fail_on_skip = False
-
-    class LoggingResult(result_class):
-        def startTest(self, test):
-            log.info("Starting test: {0}".format(self.getDescription(test)))
-            test.started_at = datetime.datetime.utcnow()
-            return super(LoggingResult, self).startTest(test)
-
-        def stopTest(self, test):
-            log.info("Stopped test: {0} in {1}s".format(
-                self.getDescription(test),
-                (datetime.datetime.utcnow() - test.started_at).total_seconds()
-            ))
-
-        def addSkip(self, test, reason):
-            if fail_on_skip:
-                # Don't just call addFailure because that requires a traceback
-                self.failures.append((test, reason))
-            else:
-                super(LoggingResult, self).addSkip(test, reason)
-
-    # Execute!
-    result = unittest.TextTestRunner(
-        stream=LogStream(),
-        resultclass=LoggingResult,
-        verbosity=2,
-        failfast=True).run(overall_suite)
-
-    if not result.wasSuccessful():
-        result.printErrors()  # duplicate output at end for convenience
-
-        bad_tests = []
-        for test, error in result.errors:
-            bad_tests.append(str(test))
-        for test, failure in result.failures:
-            bad_tests.append(str(test))
-
-        sys.exit(-1)
-    else:
-        sys.exit(0)
-
-
-if __name__ == "__main__":
-    exec_test()
diff --git a/tasks/vstart_runner.py b/tasks/vstart_runner.py
new file mode 100644 (file)
index 0000000..012ec76
--- /dev/null
@@ -0,0 +1,986 @@
+"""
+vstart_runner: override Filesystem and Mount interfaces to run a CephFSTestCase against a vstart
+ceph instance instead of a packaged/installed cluster.  Use this to turn around test cases
+quickly during development.
+
+Usage (assuming teuthology, ceph, ceph-qa-suite checked out in ~/git):
+
+    # Activate the teuthology virtualenv
+    source ~/git/teuthology/virtualenv/bin/activate
+    # Go into your ceph build directory
+    cd ~/git/ceph/build
+    # Start a vstart cluster
+    MDS=2 MON=1 OSD=3 ../src/vstart.sh -n
+    # Invoke a test using this script, with PYTHONPATH set appropriately
+    python ~/git/ceph-qa-suite/tasks/cephfs/vstart_runner.py
+
+    # Alternatively, if you use different paths, specify them as follows:
+    LD_LIBRARY_PATH=`pwd`/lib PYTHONPATH=~/git/teuthology:~/git/ceph-qa-suite:`pwd`/../src/pybind:`pwd`/lib/cython_modules/lib.2 python ~/git/ceph-qa-suite/tasks/cephfs/vstart_runner.py
+
+    # If you wish to drop to a python shell on failures, use --interactive:
+    python ~/git/ceph-qa-suite/tasks/cephfs/vstart_runner.py --interactive
+
+    # If you wish to run a named test case, pass it as an argument:
+    python ~/git/ceph-qa-suite/tasks/cephfs/vstart_runner.py tasks.cephfs.test_data_scan
+
+"""
+
+from StringIO import StringIO
+from collections import defaultdict
+import getpass
+import signal
+import tempfile
+import threading
+import datetime
+import shutil
+import re
+import os
+import time
+import json
+import sys
+import errno
+from unittest import suite
+import unittest
+import platform
+from teuthology.orchestra.run import Raw, quote
+from teuthology.orchestra.daemon import DaemonGroup
+from teuthology.config import config as teuth_config
+
+import logging
+
+log = logging.getLogger(__name__)
+
+handler = logging.FileHandler("./vstart_runner.log")
+formatter = logging.Formatter(
+    fmt=u'%(asctime)s.%(msecs)03d %(levelname)s:%(name)s:%(message)s',
+    datefmt='%Y-%m-%dT%H:%M:%S')
+handler.setFormatter(formatter)
+log.addHandler(handler)
+log.setLevel(logging.INFO)
+
+
+def respawn_in_path(lib_path, python_paths):
+    execv_cmd = ['python']
+    if platform.system() == "Darwin":
+        lib_path_var = "DYLD_LIBRARY_PATH"
+    else:
+        lib_path_var = "LD_LIBRARY_PATH"
+
+    py_binary = os.environ.get("PYTHON", "python")
+
+    if lib_path_var in os.environ:
+        if lib_path not in os.environ[lib_path_var]:
+            os.environ[lib_path_var] += ':' + lib_path
+            os.execvp(py_binary, execv_cmd + sys.argv)
+    else:
+        os.environ[lib_path_var] = lib_path
+        os.execvp(py_binary, execv_cmd + sys.argv)
+
+    for p in python_paths:
+        sys.path.insert(0, p)
+
+
+# Let's use some sensible defaults
+if os.path.exists("./CMakeCache.txt") and os.path.exists("./bin"):
+
+    # A list of candidate paths for each package we need
+    guesses = [
+        ["~/git/teuthology", "~/scm/teuthology", "~/teuthology"],
+        ["~/git/ceph-qa-suite", "~/scm/ceph-qa-suite", "~/ceph-qa-suite"],
+        ["lib/cython_modules/lib.2"],
+        ["../src/pybind"],
+    ]
+
+    python_paths = []
+    for package_guesses in guesses:
+        for g in package_guesses:
+            g_exp = os.path.abspath(os.path.expanduser(g))
+            if os.path.exists(g_exp):
+                python_paths.append(g_exp)
+
+    ld_path = os.path.join(os.getcwd(), "lib/")
+    print "Using guessed paths {0} {1}".format(ld_path, python_paths)
+    respawn_in_path(ld_path, python_paths)
+
+
+try:
+    from teuthology.exceptions import CommandFailedError
+    from tasks.ceph_manager import CephManager
+    from tasks.cephfs.fuse_mount import FuseMount
+    from tasks.cephfs.filesystem import Filesystem, MDSCluster, CephCluster
+    from mgr.mgr_test_case import MgrCluster
+    from teuthology.contextutil import MaxWhileTries
+    from teuthology.task import interactive
+except ImportError:
+    sys.stderr.write("***\nError importing packages, have you activated your teuthology virtualenv "
+                     "and set PYTHONPATH to point to teuthology and ceph-qa-suite?\n***\n\n")
+    raise
+
+# Must import after teuthology because of gevent monkey patching
+import subprocess
+
+if os.path.exists("./CMakeCache.txt"):
+    # Running in build dir of a cmake build
+    BIN_PREFIX = "./bin/"
+else:
+    # Running in src/ of an autotools build
+    BIN_PREFIX = "./"
+
+
+class LocalRemoteProcess(object):
+    def __init__(self, args, subproc, check_status, stdout, stderr):
+        self.args = args
+        self.subproc = subproc
+        if stdout is None:
+            self.stdout = StringIO()
+        else:
+            self.stdout = stdout
+
+        if stderr is None:
+            self.stderr = StringIO()
+        else:
+            self.stderr = stderr
+
+        self.check_status = check_status
+        self.exitstatus = self.returncode = None
+
+    def wait(self):
+        if self.finished:
+            # Avoid calling communicate() on a dead process because it'll
+            # give you stick about std* already being closed
+            if self.exitstatus != 0:
+                raise CommandFailedError(self.args, self.exitstatus)
+            else:
+                return
+
+        out, err = self.subproc.communicate()
+        self.stdout.write(out)
+        self.stderr.write(err)
+
+        self.exitstatus = self.returncode = self.subproc.returncode
+
+        if self.exitstatus != 0:
+            sys.stderr.write(out)
+            sys.stderr.write(err)
+
+        if self.check_status and self.exitstatus != 0:
+            raise CommandFailedError(self.args, self.exitstatus)
+
+    @property
+    def finished(self):
+        if self.exitstatus is not None:
+            return True
+
+        if self.subproc.poll() is not None:
+            out, err = self.subproc.communicate()
+            self.stdout.write(out)
+            self.stderr.write(err)
+            self.exitstatus = self.returncode = self.subproc.returncode
+            return True
+        else:
+            return False
+
+    def kill(self):
+        log.info("kill ")
+        if self.subproc.pid and not self.finished:
+            log.info("kill: killing pid {0} ({1})".format(
+                self.subproc.pid, self.args))
+            safe_kill(self.subproc.pid)
+        else:
+            log.info("kill: already terminated ({0})".format(self.args))
+
+    @property
+    def stdin(self):
+        class FakeStdIn(object):
+            def __init__(self, mount_daemon):
+                self.mount_daemon = mount_daemon
+
+            def close(self):
+                self.mount_daemon.kill()
+
+        return FakeStdIn(self)
+
+
+class LocalRemote(object):
+    """
+    Amusingly named class to present the teuthology RemoteProcess interface when we are really
+    running things locally for vstart
+
+    Run this inside your src/ dir!
+    """
+
+    def __init__(self):
+        self.name = "local"
+        self.hostname = "localhost"
+        self.user = getpass.getuser()
+
+    def get_file(self, path, sudo, dest_dir):
+        tmpfile = tempfile.NamedTemporaryFile(delete=False).name
+        shutil.copy(path, tmpfile)
+        return tmpfile
+
+    def put_file(self, src, dst, sudo=False):
+        shutil.copy(src, dst)
+
+    def run(self, args, check_status=True, wait=True,
+            stdout=None, stderr=None, cwd=None, stdin=None,
+            logger=None, label=None):
+        log.info("run args={0}".format(args))
+
+        # We don't need no stinkin' sudo
+        args = [a for a in args if a != "sudo"]
+
+        # We have to use shell=True if any run.Raw was present, e.g. &&
+        shell = any([a for a in args if isinstance(a, Raw)])
+
+        if shell:
+            filtered = []
+            i = 0
+            while i < len(args):
+                if args[i] == 'adjust-ulimits':
+                    i += 1
+                elif args[i] == 'ceph-coverage':
+                    i += 2
+                elif args[i] == 'timeout':
+                    i += 2
+                else:
+                    filtered.append(args[i])
+                    i += 1
+
+            args = quote(filtered)
+            log.info("Running {0}".format(args))
+
+            subproc = subprocess.Popen(args,
+                                       stdout=subprocess.PIPE,
+                                       stderr=subprocess.PIPE,
+                                       stdin=subprocess.PIPE,
+                                       cwd=cwd,
+                                       shell=True)
+        else:
+            log.info("Running {0}".format(args))
+
+            for arg in args:
+                if not isinstance(arg, basestring):
+                    raise RuntimeError("Oops, can't handle arg {0} type {1}".format(
+                        arg, arg.__class__
+                    ))
+
+            subproc = subprocess.Popen(args,
+                                       stdout=subprocess.PIPE,
+                                       stderr=subprocess.PIPE,
+                                       stdin=subprocess.PIPE,
+                                       cwd=cwd)
+
+        if stdin:
+            if not isinstance(stdin, basestring):
+                raise RuntimeError("Can't handle non-string stdins on a vstart cluster")
+
+            # Hack: writing to stdin is not deadlock-safe, but it "always" works
+            # as long as the input buffer is "small"
+            subproc.stdin.write(stdin)
+
+        proc = LocalRemoteProcess(
+            args, subproc, check_status,
+            stdout, stderr
+        )
+
+        if wait:
+            proc.wait()
+
+        return proc
+
+
+class LocalDaemon(object):
+    def __init__(self, daemon_type, daemon_id):
+        self.daemon_type = daemon_type
+        self.daemon_id = daemon_id
+        self.controller = LocalRemote()
+        self.proc = None
+
+    @property
+    def remote(self):
+        return LocalRemote()
+
+    def running(self):
+        return self._get_pid() is not None
+
+    def _get_pid(self):
+        """
+        Return PID as an integer or None if not found
+        """
+        ps_txt = self.controller.run(
+            args=["ps", "-xwwu"+str(os.getuid())]
+        ).stdout.getvalue().strip()
+        lines = ps_txt.split("\n")[1:]
+
+        for line in lines:
+            if line.find("ceph-{0} -i {1}".format(self.daemon_type, self.daemon_id)) != -1:
+                log.info("Found ps line for daemon: {0}".format(line))
+                return int(line.split()[1])
+        log.info("No match for {0} {1}: {2}".format(
+            self.daemon_type, self.daemon_id, ps_txt
+            ))
+        return None
+
+    def wait(self, timeout):
+        waited = 0
+        while self._get_pid() is not None:
+            if waited > timeout:
+                raise MaxWhileTries("Timed out waiting for daemon {0}.{1}".format(self.daemon_type, self.daemon_id))
+            time.sleep(1)
+            waited += 1
+
+    def stop(self, timeout=300):
+        if not self.running():
+            log.error('tried to stop a non-running daemon')
+            return
+
+        pid = self._get_pid()
+        log.info("Killing PID {0} for {1}.{2}".format(pid, self.daemon_type, self.daemon_id))
+        os.kill(pid, signal.SIGKILL)
+        self.wait(timeout=timeout)
+
+    def restart(self):
+        if self._get_pid() is not None:
+            self.stop()
+
+        self.proc = self.controller.run([os.path.join(BIN_PREFIX, "./ceph-{0}".format(self.daemon_type)), "-i", self.daemon_id])
+
+
+def safe_kill(pid):
+    """
+    os.kill annoyingly raises exception if process already dead.  Ignore it.
+    """
+    try:
+        return os.kill(pid, signal.SIGKILL)
+    except OSError as e:
+        if e.errno == errno.ESRCH:
+            # Raced with process termination
+            pass
+        else:
+            raise
+
+
+class LocalFuseMount(FuseMount):
+    def __init__(self, test_dir, client_id):
+        super(LocalFuseMount, self).__init__(None, test_dir, client_id, LocalRemote())
+
+    @property
+    def config_path(self):
+        return "./ceph.conf"
+
+    def get_keyring_path(self):
+        # This is going to end up in a config file, so use an absolute path
+        # to avoid assumptions about daemons' pwd
+        return os.path.abspath("./client.{0}.keyring".format(self.client_id))
+
+    def run_shell(self, args, wait=True):
+        # FIXME maybe should add a pwd arg to teuthology.orchestra so that
+        # the "cd foo && bar" shenanigans isn't needed to begin with and
+        # then we wouldn't have to special case this
+        return self.client_remote.run(
+            args, wait=wait, cwd=self.mountpoint
+        )
+
+    @property
+    def _prefix(self):
+        return BIN_PREFIX
+
+    def _asok_path(self):
+        # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
+        # run foreground.  When running it daemonized however, the asok is named after
+        # the PID of the launching process, not the long running ceph-fuse process.  Therefore
+        # we need to give an exact path here as the logic for checking /proc/ for which
+        # asok is alive does not work.
+        path = "./out/client.{0}.{1}.asok".format(self.client_id, self.fuse_daemon.subproc.pid)
+        log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid))
+        return path
+
+    def umount(self):
+        if self.is_mounted():
+            super(LocalFuseMount, self).umount()
+
+    def mount(self, mount_path=None):
+        self.client_remote.run(
+            args=[
+                'mkdir',
+                '--',
+                self.mountpoint,
+            ],
+        )
+
+        def list_connections():
+            self.client_remote.run(
+                args=["mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
+                check_status=False
+            )
+            p = self.client_remote.run(
+                args=["ls", "/sys/fs/fuse/connections"],
+                check_status=False
+            )
+            if p.exitstatus != 0:
+                log.warn("ls conns failed with {0}, assuming none".format(p.exitstatus))
+                return []
+
+            ls_str = p.stdout.getvalue().strip()
+            if ls_str:
+                return [int(n) for n in ls_str.split("\n")]
+            else:
+                return []
+
+        # Before starting ceph-fuse process, note the contents of
+        # /sys/fs/fuse/connections
+        pre_mount_conns = list_connections()
+        log.info("Pre-mount connections: {0}".format(pre_mount_conns))
+
+        prefix = [os.path.join(BIN_PREFIX, "ceph-fuse")]
+        if os.getuid() != 0:
+            prefix += ["--client-die-on-failed-remount=false"]
+
+        if mount_path is not None:
+            prefix += ["--client_mountpoint={0}".format(mount_path)]
+
+        self.fuse_daemon = self.client_remote.run(args=
+                                            prefix + [
+                                                "-f",
+                                                "--name",
+                                                "client.{0}".format(self.client_id),
+                                                self.mountpoint
+                                            ], wait=False)
+
+        log.info("Mounting client.{0} with pid {1}".format(self.client_id, self.fuse_daemon.subproc.pid))
+
+        # Wait for the connection reference to appear in /sys
+        waited = 0
+        post_mount_conns = list_connections()
+        while len(post_mount_conns) <= len(pre_mount_conns):
+            if self.fuse_daemon.finished:
+                # Did mount fail?  Raise the CommandFailedError instead of
+                # hitting the "failed to populate /sys/" timeout
+                self.fuse_daemon.wait()
+            time.sleep(1)
+            waited += 1
+            if waited > 30:
+                raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
+                    waited
+                ))
+            post_mount_conns = list_connections()
+
+        log.info("Post-mount connections: {0}".format(post_mount_conns))
+
+        # Record our fuse connection number so that we can use it when
+        # forcing an unmount
+        new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
+        if len(new_conns) == 0:
+            raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
+        elif len(new_conns) > 1:
+            raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
+        else:
+            self._fuse_conn = new_conns[0]
+
+    def _run_python(self, pyscript):
+        """
+        Override this to remove the daemon-helper prefix that is used otherwise
+        to make the process killable.
+        """
+        return self.client_remote.run(args=[
+            'python', '-c', pyscript
+        ], wait=False)
+
+
+class LocalCephManager(CephManager):
+    def __init__(self):
+        # Deliberately skip parent init, only inheriting from it to get
+        # util methods like osd_dump that sit on top of raw_cluster_cmd
+        self.controller = LocalRemote()
+
+        # A minority of CephManager fns actually bother locking for when
+        # certain teuthology tests want to run tasks in parallel
+        self.lock = threading.RLock()
+
+    def find_remote(self, daemon_type, daemon_id):
+        """
+        daemon_type like 'mds', 'osd'
+        daemon_id like 'a', '0'
+        """
+        return LocalRemote()
+
+    def run_ceph_w(self):
+        proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph"), "-w"], wait=False, stdout=StringIO())
+        return proc
+
+    def raw_cluster_cmd(self, *args):
+        """
+        args like ["osd", "dump"}
+        return stdout string
+        """
+        proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args))
+        return proc.stdout.getvalue()
+
+    def raw_cluster_cmd_result(self, *args):
+        """
+        like raw_cluster_cmd but don't check status, just return rc
+        """
+        proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args), check_status=False)
+        return proc.exitstatus
+
+    def admin_socket(self, daemon_type, daemon_id, command, check_status=True):
+        return self.controller.run(
+            args=[os.path.join(BIN_PREFIX, "ceph"), "daemon", "{0}.{1}".format(daemon_type, daemon_id)] + command, check_status=check_status
+        )
+
+    # FIXME: copypasta
+    def get_mds_status(self, mds):
+        """
+        Run cluster commands for the mds in order to get mds information
+        """
+        out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
+        j = json.loads(' '.join(out.splitlines()[1:]))
+        # collate; for dup ids, larger gid wins.
+        for info in j['info'].itervalues():
+            if info['name'] == mds:
+                return info
+        return None
+
+    # FIXME: copypasta
+    def get_mds_status_by_rank(self, rank):
+        """
+        Run cluster commands for the mds in order to get mds information
+        check rank.
+        """
+        j = self.get_mds_status_all()
+        # collate; for dup ids, larger gid wins.
+        for info in j['info'].itervalues():
+            if info['rank'] == rank:
+                return info
+        return None
+
+    def get_mds_status_all(self):
+        """
+        Run cluster command to extract all the mds status.
+        """
+        out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
+        j = json.loads(' '.join(out.splitlines()[1:]))
+        return j
+
+
+class LocalCephCluster(CephCluster):
+    def __init__(self, ctx):
+        # Deliberately skip calling parent constructor
+        self._ctx = ctx
+        self.mon_manager = LocalCephManager()
+        self._conf = defaultdict(dict)
+
+    def get_config(self, key, service_type=None):
+        if service_type is None:
+            service_type = 'mon'
+
+        # FIXME hardcoded vstart service IDs
+        service_id = {
+            'mon': 'a',
+            'mds': 'a',
+            'osd': '0'
+        }[service_type]
+
+        return self.json_asok(['config', 'get', key], service_type, service_id)[key]
+
+    def _write_conf(self):
+        # In teuthology, we have the honour of writing the entire ceph.conf, but
+        # in vstart land it has mostly already been written and we need to carefully
+        # append to it.
+        conf_path = "./ceph.conf"
+        banner = "\n#LOCAL_TEST\n"
+        existing_str = open(conf_path).read()
+
+        if banner in existing_str:
+            existing_str = existing_str[0:existing_str.find(banner)]
+
+        existing_str += banner
+
+        for subsys, kvs in self._conf.items():
+            existing_str += "\n[{0}]\n".format(subsys)
+            for key, val in kvs.items():
+                # Comment out existing instance if it exists
+                log.info("Searching for existing instance {0}/{1}".format(
+                    key, subsys
+                ))
+                existing_section = re.search("^\[{0}\]$([\n]|[^\[])+".format(
+                    subsys
+                ), existing_str, re.MULTILINE)
+
+                if existing_section:
+                    section_str = existing_str[existing_section.start():existing_section.end()]
+                    existing_val = re.search("^\s*[^#]({0}) =".format(key), section_str, re.MULTILINE)
+                    if existing_val:
+                        start = existing_section.start() + existing_val.start(1)
+                        log.info("Found string to replace at {0}".format(
+                            start
+                        ))
+                        existing_str = existing_str[0:start] + "#" + existing_str[start:]
+
+                existing_str += "{0} = {1}\n".format(key, val)
+
+        open(conf_path, "w").write(existing_str)
+
+    def set_ceph_conf(self, subsys, key, value):
+        self._conf[subsys][key] = value
+        self._write_conf()
+
+    def clear_ceph_conf(self, subsys, key):
+        del self._conf[subsys][key]
+        self._write_conf()
+
+
+class LocalMDSCluster(LocalCephCluster, MDSCluster):
+    def __init__(self, ctx):
+        super(LocalMDSCluster, self).__init__(ctx)
+
+        self.mds_ids = ctx.daemons.daemons['mds'].keys()
+        if not self.mds_ids:
+            raise RuntimeError("No MDSs found in ceph.conf!")
+
+        self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
+
+    def clear_firewall(self):
+        # FIXME: unimplemented
+        pass
+
+    def get_filesystem(self, name):
+        return LocalFilesystem(self._ctx, name)
+
+
+class LocalMgrCluster(LocalCephCluster, MgrCluster):
+    def __init__(self, ctx):
+        super(LocalMgrCluster, self).__init__(ctx)
+
+        self.mgr_ids = ctx.daemons.daemons['mgr'].keys()
+        if not self.mgr_ids:
+            raise RuntimeError("No manager daemonss found in ceph.conf!")
+
+        self.mgr_daemons = dict([(id_, LocalDaemon("mgr", id_)) for id_ in self.mgr_ids])
+
+
+class LocalFilesystem(Filesystem, LocalMDSCluster):
+    @property
+    def admin_remote(self):
+        return LocalRemote()
+
+    def __init__(self, ctx, name=None):
+        # Deliberately skip calling parent constructor
+        self._ctx = ctx
+
+        if name is None:
+            name = "cephfs"
+
+        self.name = name
+        self.metadata_pool_name = "{0}_metadata".format(name)
+        self.data_pool_name = "{0}_data".format(name)
+
+        # Hack: cheeky inspection of ceph.conf to see what MDSs exist
+        self.mds_ids = set()
+        for line in open("ceph.conf").readlines():
+            match = re.match("^\[mds\.(.+)\]$", line)
+            if match:
+                self.mds_ids.add(match.group(1))
+
+        if not self.mds_ids:
+            raise RuntimeError("No MDSs found in ceph.conf!")
+
+        self.mds_ids = list(self.mds_ids)
+
+        log.info("Discovered MDS IDs: {0}".format(self.mds_ids))
+
+        self.mon_manager = LocalCephManager()
+
+        self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
+
+        self.client_remote = LocalRemote()
+
+        self._conf = defaultdict(dict)
+
+    @property
+    def _prefix(self):
+        return BIN_PREFIX
+
+    def set_clients_block(self, blocked, mds_id=None):
+        raise NotImplementedError()
+
+    def get_pgs_per_fs_pool(self):
+        # FIXME: assuming there are 3 OSDs
+        return 3 * int(self.get_config('mon_pg_warn_min_per_osd'))
+
+
+class InteractiveFailureResult(unittest.TextTestResult):
+    """
+    Specialization that implements interactive-on-error style
+    behavior.
+    """
+    def addFailure(self, test, err):
+        super(InteractiveFailureResult, self).addFailure(test, err)
+        log.error(self._exc_info_to_string(err, test))
+        log.error("Failure in test '{0}', going interactive".format(
+            self.getDescription(test)
+        ))
+        interactive.task(ctx=None, config=None)
+
+    def addError(self, test, err):
+        super(InteractiveFailureResult, self).addError(test, err)
+        log.error(self._exc_info_to_string(err, test))
+        log.error("Error in test '{0}', going interactive".format(
+            self.getDescription(test)
+        ))
+        interactive.task(ctx=None, config=None)
+
+
+def exec_test():
+    # Help developers by stopping up-front if their tree isn't built enough for all the
+    # tools that the tests might want to use (add more here if needed)
+    require_binaries = ["ceph-dencoder", "cephfs-journal-tool", "cephfs-data-scan",
+                        "cephfs-table-tool", "ceph-fuse", "rados"]
+    missing_binaries = [b for b in require_binaries if not os.path.exists(os.path.join(BIN_PREFIX, b))]
+    if missing_binaries:
+        log.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries)))
+        sys.exit(-1)
+
+    test_dir = tempfile.mkdtemp()
+
+    # Create as many of these as the biggest test requires
+    clients = ["0", "1", "2", "3"]
+
+    remote = LocalRemote()
+
+    # Tolerate no MDSs or clients running at start
+    ps_txt = remote.run(
+        args=["ps", "-u"+str(os.getuid())]
+    ).stdout.getvalue().strip()
+    lines = ps_txt.split("\n")[1:]
+
+    for line in lines:
+        if 'ceph-fuse' in line or 'ceph-mds' in line:
+            pid = int(line.split()[0])
+            log.warn("Killing stray process {0}".format(line))
+            os.kill(pid, signal.SIGKILL)
+
+    class LocalCluster(object):
+        def __init__(self, rolename="placeholder"):
+            self.remotes = {
+                remote: [rolename]
+            }
+
+        def only(self, requested):
+            return self.__class__(rolename=requested)
+
+    teuth_config['test_path'] = test_dir
+
+    class LocalContext(object):
+        def __init__(self):
+            self.config = {}
+            self.teuthology_config = teuth_config
+            self.cluster = LocalCluster()
+            self.daemons = DaemonGroup()
+
+            # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
+            # tests that want to look these up via ctx can do so.
+            # Inspect ceph.conf to see what roles exist
+            for conf_line in open("ceph.conf").readlines():
+                for svc_type in ["mon", "osd", "mds", "mgr"]:
+                    if svc_type not in self.daemons.daemons:
+                        self.daemons.daemons[svc_type] = {}
+                    match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line)
+                    if match:
+                        svc_id = match.group(1)
+                        self.daemons.daemons[svc_type][svc_id] = LocalDaemon(svc_type, svc_id)
+
+        def __del__(self):
+            shutil.rmtree(self.teuthology_config['test_path'])
+
+    ctx = LocalContext()
+
+    mounts = []
+    for client_id in clients:
+        # Populate client keyring (it sucks to use client.admin for test clients
+        # because it's awkward to find the logs later)
+        client_name = "client.{0}".format(client_id)
+
+        if client_name not in open("./keyring").read():
+            p = remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "auth", "get-or-create", client_name,
+                                 "osd", "allow rw",
+                                 "mds", "allow",
+                                 "mon", "allow r"])
+
+            open("./keyring", "a").write(p.stdout.getvalue())
+
+        mount = LocalFuseMount(test_dir, client_id)
+        mounts.append(mount)
+        if mount.is_mounted():
+            log.warn("unmounting {0}".format(mount.mountpoint))
+            mount.umount_wait()
+        else:
+            if os.path.exists(mount.mountpoint):
+                os.rmdir(mount.mountpoint)
+    filesystem = LocalFilesystem(ctx)
+    mds_cluster = LocalMDSCluster(ctx)
+    mgr_cluster = LocalMgrCluster(ctx)
+
+    from tasks.cephfs_test_runner import DecoratingLoader
+
+    class LogStream(object):
+        def __init__(self):
+            self.buffer = ""
+
+        def write(self, data):
+            self.buffer += data
+            if "\n" in self.buffer:
+                lines = self.buffer.split("\n")
+                for line in lines[:-1]:
+                    pass
+                    # sys.stderr.write(line + "\n")
+                    log.info(line)
+                self.buffer = lines[-1]
+
+        def flush(self):
+            pass
+
+    decorating_loader = DecoratingLoader({
+        "ctx": ctx,
+        "mounts": mounts,
+        "fs": filesystem,
+        "mds_cluster": mds_cluster,
+        "mgr_cluster": mgr_cluster,
+    })
+
+    # For the benefit of polling tests like test_full -- in teuthology land we set this
+    # in a .yaml, here it's just a hardcoded thing for the developer's pleasure.
+    remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "tell", "osd.*", "injectargs", "--osd-mon-report-interval-max", "5"])
+    filesystem.set_ceph_conf("osd", "osd_mon_report_interval_max", "5")
+
+    # Vstart defaults to two segments, which very easily gets a "behind on trimming" health warning
+    # from normal IO latency.  Increase it for running teests.
+    filesystem.set_ceph_conf("mds", "mds log max segments", "10")
+
+    # Make sure the filesystem created in tests has uid/gid that will let us talk to
+    # it after mounting it (without having to  go root).  Set in 'global' not just 'mds'
+    # so that cephfs-data-scan will pick it up too.
+    filesystem.set_ceph_conf("global", "mds root ino uid", "%s" % os.getuid())
+    filesystem.set_ceph_conf("global", "mds root ino gid", "%s" % os.getgid())
+
+    # Monkeypatch get_package_version to avoid having to work out what kind of distro we're on
+    def _get_package_version(remote, pkg_name):
+        # Used in cephfs tests to find fuse version.  Your development workstation *does* have >=2.9, right?
+        return "2.9"
+
+    import teuthology.packaging
+    teuthology.packaging.get_package_version = _get_package_version
+
+    def enumerate_methods(s):
+        for t in s._tests:
+            if isinstance(t, suite.BaseTestSuite):
+                for sub in enumerate_methods(t):
+                    yield sub
+            else:
+                yield s, t
+
+    interactive_on_error = False
+
+    args = sys.argv[1:]
+    flags = [a for a in args if a.startswith("-")]
+    modules = [a for a in args if not a.startswith("-")]
+    for f in flags:
+        if f == "--interactive":
+            interactive_on_error = True
+        else:
+            log.error("Unknown option '{0}'".format(f))
+            sys.exit(-1)
+
+    if modules:
+        log.info("Executing modules: {0}".format(modules))
+        module_suites = []
+        for mod_name in modules:
+            # Test names like cephfs.test_auto_repair
+            module_suites.append(decorating_loader.loadTestsFromName(mod_name))
+        log.info("Loaded: {0}".format(list(module_suites)))
+        overall_suite = suite.TestSuite(module_suites)
+    else:
+        log.info("Excuting all tests")
+        overall_suite = decorating_loader.discover(
+            os.path.dirname(os.path.abspath(__file__))
+        )
+
+    # Filter out tests that don't lend themselves to interactive running,
+    victims = []
+    for case, method in enumerate_methods(overall_suite):
+        fn = getattr(method, method._testMethodName)
+
+        drop_test = False
+
+        if hasattr(fn, 'is_for_teuthology') and getattr(fn, 'is_for_teuthology') is True:
+            drop_test = True
+            log.warn("Dropping test because long running: ".format(method.id()))
+
+        if getattr(fn, "needs_trimming", False) is True:
+            drop_test = (os.getuid() != 0)
+            log.warn("Dropping test because client trim unavailable: ".format(method.id()))
+
+        if drop_test:
+            # Don't drop the test if it was explicitly requested in arguments
+            is_named = False
+            for named in modules:
+                if named.endswith(method.id()):
+                    is_named = True
+                    break
+
+            if not is_named:
+                victims.append((case, method))
+
+    log.info("Disabling {0} tests because of is_for_teuthology or needs_trimming".format(len(victims)))
+    for s, method in victims:
+        s._tests.remove(method)
+
+    if interactive_on_error:
+        result_class = InteractiveFailureResult
+    else:
+        result_class = unittest.TextTestResult
+    fail_on_skip = False
+
+    class LoggingResult(result_class):
+        def startTest(self, test):
+            log.info("Starting test: {0}".format(self.getDescription(test)))
+            test.started_at = datetime.datetime.utcnow()
+            return super(LoggingResult, self).startTest(test)
+
+        def stopTest(self, test):
+            log.info("Stopped test: {0} in {1}s".format(
+                self.getDescription(test),
+                (datetime.datetime.utcnow() - test.started_at).total_seconds()
+            ))
+
+        def addSkip(self, test, reason):
+            if fail_on_skip:
+                # Don't just call addFailure because that requires a traceback
+                self.failures.append((test, reason))
+            else:
+                super(LoggingResult, self).addSkip(test, reason)
+
+    # Execute!
+    result = unittest.TextTestRunner(
+        stream=LogStream(),
+        resultclass=LoggingResult,
+        verbosity=2,
+        failfast=True).run(overall_suite)
+
+    if not result.wasSuccessful():
+        result.printErrors()  # duplicate output at end for convenience
+
+        bad_tests = []
+        for test, error in result.errors:
+            bad_tests.append(str(test))
+        for test, failure in result.failures:
+            bad_tests.append(str(test))
+
+        sys.exit(-1)
+    else:
+        sys.exit(0)
+
+
+if __name__ == "__main__":
+    exec_test()