--- /dev/null
+
+import unittest
+import time
+import logging
+
+from teuthology.orchestra.run import CommandFailedError
+
+log = logging.getLogger(__name__)
+
+
+class CephTestCase(unittest.TestCase):
+ """
+    Base class for test tasks that define a structured set of tests
+    implemented in Python. Subclass this with appropriate helpers for the
+    subsystem you're testing.
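+
+    A minimal sketch of how a subclass might look (illustrative only; the
+    test class and the polled condition are assumptions, not part of this
+    change):
+
+        class TestExample(CephTestCase):
+            def test_condition_settles(self):
+                # wait_until_true polls the callable every 5 seconds until
+                # it returns True or `timeout` seconds have elapsed.
+                self.wait_until_true(lambda: True, timeout=30)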
+ """
+
+ # Environment references
+ mounts = None
+ fs = None
+ mds_cluster = None
+ mgr_cluster = None
+ ctx = None
+
+ mon_manager = None
+
+ def assert_cluster_log(self, expected_pattern, invert_match=False, timeout=10):
+ """
+        Context manager. Assert that during execution, or within `timeout`
+        seconds (plus a short grace period) afterwards, the Ceph cluster log
+        emits a message matching the expected pattern.
+
+ :param expected_pattern: a string that you expect to see in the log output
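+        :param invert_match: if True, assert that the pattern does NOT appear
+        :param timeout: roughly how long (in seconds) to keep watching the
+            cluster log after the with-block exits before giving up
+
+        Illustrative usage (a sketch only; the log pattern and the helper
+        called inside the block are assumptions, not taken from this change):
+
+            with self.assert_cluster_log("wrongly marked me down"):
+                do_something_that_should_log()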
+ """
+
+ ceph_manager = self.fs.mon_manager
+
+ class ContextManager(object):
+ def match(self):
+ found = expected_pattern in self.watcher_process.stdout.getvalue()
+ if invert_match:
+ return not found
+
+ return found
+
+ def __enter__(self):
+ self.watcher_process = ceph_manager.run_ceph_w()
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ if not self.watcher_process.finished:
+ # Check if we got an early match, wait a bit if we didn't
+ if self.match():
+ return
+ else:
+ log.debug("No log hits yet, waiting...")
+ # Default monc tick interval is 10s, so wait that long and
+ # then some grace
+ time.sleep(5 + timeout)
+
+ self.watcher_process.stdin.close()
+ try:
+ self.watcher_process.wait()
+ except CommandFailedError:
+ pass
+
+ if not self.match():
+ log.error("Log output: \n{0}\n".format(self.watcher_process.stdout.getvalue()))
+ raise AssertionError("Expected log message not found: '{0}'".format(expected_pattern))
+
+ return ContextManager()
+
+ def wait_for_health(self, pattern, timeout):
+ """
+ Wait until 'ceph health' contains messages matching the pattern
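+
+        The pattern is matched as a plain substring against each summary
+        string in the health report, not as a regular expression.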
+ """
+ def seen_health_warning():
+ health = self.fs.mon_manager.get_mon_health()
+ summary_strings = [s['summary'] for s in health['summary']]
+ if len(summary_strings) == 0:
+ log.debug("Not expected number of summary strings ({0})".format(summary_strings))
+ return False
+ else:
+ for ss in summary_strings:
+ if pattern in ss:
+ return True
+
+ log.debug("Not found expected summary strings yet ({0})".format(summary_strings))
+ return False
+
+ self.wait_until_true(seen_health_warning, timeout)
+
+ def wait_for_health_clear(self, timeout):
+ """
+ Wait until `ceph health` returns no messages
+ """
+ def is_clear():
+ health = self.fs.mon_manager.get_mon_health()
+ return len(health['summary']) == 0
+
+ self.wait_until_true(is_clear, timeout)
+
+ def wait_until_equal(self, get_fn, expect_val, timeout, reject_fn=None):
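+        """
+        Poll get_fn() every 5 seconds until it returns expect_val, raising
+        RuntimeError if `timeout` seconds elapse first.  If reject_fn is
+        given and returns True for an observed value, fail immediately.
+        """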
+ period = 5
+ elapsed = 0
+ while True:
+ val = get_fn()
+ if val == expect_val:
+ return
+ elif reject_fn and reject_fn(val):
+ raise RuntimeError("wait_until_equal: forbidden value {0} seen".format(val))
+ else:
+ if elapsed >= timeout:
+ raise RuntimeError("Timed out after {0} seconds waiting for {1} (currently {2})".format(
+ elapsed, expect_val, val
+ ))
+ else:
+ log.debug("wait_until_equal: {0} != {1}, waiting...".format(val, expect_val))
+ time.sleep(period)
+ elapsed += period
+
+ log.debug("wait_until_equal: success")
+
+ def wait_until_true(self, condition, timeout):
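+        """
+        Poll condition() every 5 seconds until it returns a truthy value,
+        raising RuntimeError if `timeout` seconds elapse first.
+        """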
+ period = 5
+ elapsed = 0
+ while True:
+ if condition():
+ return
+ else:
+ if elapsed >= timeout:
+ raise RuntimeError("Timed out after {0} seconds".format(elapsed))
+ else:
+ log.debug("wait_until_true: waiting...")
+ time.sleep(period)
+ elapsed += period
+
+ log.debug("wait_until_true: success")
import json
import logging
-import unittest
from unittest import case
-import time
+from tasks.ceph_test_case import CephTestCase
import os
import re
from StringIO import StringIO
return f
-class CephFSTestCase(unittest.TestCase):
+class CephFSTestCase(CephTestCase):
"""
Test case for Ceph FS, requires caller to populate Filesystem and Mounts,
into the fs, mount_a, mount_b class attributes (setting mount_b is optional)
Handles resetting the cluster under test between tests.
"""
- # Environment references
- mounts = None
- fs = None
- mds_cluster = None
- ctx = None
# FIXME weird explicit naming
mount_a = None
def _session_by_id(self, session_ls):
return dict([(s['id'], s) for s in session_ls])
- def wait_until_equal(self, get_fn, expect_val, timeout, reject_fn=None):
- period = 5
- elapsed = 0
- while True:
- val = get_fn()
- if val == expect_val:
- return
- elif reject_fn and reject_fn(val):
- raise RuntimeError("wait_until_equal: forbidden value {0} seen".format(val))
- else:
- if elapsed >= timeout:
- raise RuntimeError("Timed out after {0} seconds waiting for {1} (currently {2})".format(
- elapsed, expect_val, val
- ))
- else:
- log.debug("wait_until_equal: {0} != {1}, waiting...".format(val, expect_val))
- time.sleep(period)
- elapsed += period
-
- log.debug("wait_until_equal: success")
-
- def wait_until_true(self, condition, timeout):
- period = 5
- elapsed = 0
- while True:
- if condition():
- return
- else:
- if elapsed >= timeout:
- raise RuntimeError("Timed out after {0} seconds".format(elapsed))
- else:
- log.debug("wait_until_true: waiting...")
- time.sleep(period)
- elapsed += period
-
- log.debug("wait_until_true: success")
-
def wait_for_daemon_start(self, daemon_ids=None):
"""
Wait until all the daemons appear in the FSMap, either assigned
else:
raise AssertionError("MDS daemon '{0}' did not crash as expected".format(daemon_id))
-
- def assert_cluster_log(self, expected_pattern, invert_match=False, timeout=10):
- """
- Context manager. Assert that during execution, or up to 5 seconds later,
- the Ceph cluster log emits a message matching the expected pattern.
-
- :param expected_pattern: a string that you expect to see in the log output
- """
-
- ceph_manager = self.fs.mon_manager
-
- class ContextManager(object):
- def match(self):
- found = expected_pattern in self.watcher_process.stdout.getvalue()
- if invert_match:
- return not found
-
- return found
-
- def __enter__(self):
- self.watcher_process = ceph_manager.run_ceph_w()
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- if not self.watcher_process.finished:
- # Check if we got an early match, wait a bit if we didn't
- if self.match():
- return
- else:
- log.debug("No log hits yet, waiting...")
- # Default monc tick interval is 10s, so wait that long and
- # then some grace
- time.sleep(5 + timeout)
-
- self.watcher_process.stdin.close()
- try:
- self.watcher_process.wait()
- except CommandFailedError:
- pass
-
- if not self.match():
- log.error("Log output: \n{0}\n".format(self.watcher_process.stdout.getvalue()))
- raise AssertionError("Expected log message not found: '{0}'".format(expected_pattern))
-
- return ContextManager()
-
- def wait_for_health(self, pattern, timeout):
- """
- Wait until 'ceph health' contains messages matching the pattern
- """
- def seen_health_warning():
- health = self.fs.mon_manager.get_mon_health()
- summary_strings = [s['summary'] for s in health['summary']]
- if len(summary_strings) == 0:
- log.debug("Not expected number of summary strings ({0})".format(summary_strings))
- return False
- else:
- for ss in summary_strings:
- if pattern in ss:
- return True
-
- log.debug("Not found expected summary strings yet ({0})".format(summary_strings))
- return False
-
- self.wait_until_true(seen_health_warning, timeout)
-
- def wait_for_health_clear(self, timeout):
- """
- Wait until `ceph health` returns no messages
- """
- def is_clear():
- health = self.fs.mon_manager.get_mon_health()
- return len(health['summary']) == 0
-
- self.wait_until_true(is_clear, timeout)
return "Object not found: '{0}'".format(self._object_name)
-class MDSCluster(object):
+class CephCluster(object):
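+    """
+    Base helper for driving a whole Ceph cluster under test: exposes the
+    admin remote, a CephManager handle, config lookup via the admin socket,
+    and ceph.conf get/set helpers shared by its subclasses (e.g. MDSCluster).
+    """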
+ @property
+ def admin_remote(self):
+ first_mon = misc.get_first_mon(self._ctx, None)
+ (result,) = self._ctx.cluster.only(first_mon).remotes.iterkeys()
+ return result
+
+ def __init__(self, ctx):
+ self._ctx = ctx
+ self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
+
+ def get_config(self, key, service_type=None):
+ """
+ Get config from mon by default, or a specific service if caller asks for it
+ """
+ if service_type is None:
+ service_type = 'mon'
+
+ service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
+ return self.json_asok(['config', 'get', key], service_type, service_id)[key]
+
+ def set_ceph_conf(self, subsys, key, value):
+ if subsys not in self._ctx.ceph['ceph'].conf:
+ self._ctx.ceph['ceph'].conf[subsys] = {}
+ self._ctx.ceph['ceph'].conf[subsys][key] = value
+ write_conf(self._ctx) # XXX because we don't have the ceph task's config object, if they
+ # used a different config path this won't work.
+
+ def clear_ceph_conf(self, subsys, key):
+ del self._ctx.ceph['ceph'].conf[subsys][key]
+ write_conf(self._ctx)
+
+ def json_asok(self, command, service_type, service_id):
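+        """
+        Run an admin socket command against the given daemon and return the
+        decoded JSON response, or None if the command produced no output.
+        """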
+ proc = self.mon_manager.admin_socket(service_type, service_id, command)
+ response_data = proc.stdout.getvalue()
+ log.info("_json_asok output: {0}".format(response_data))
+ if response_data.strip():
+ return json.loads(response_data)
+ else:
+ return None
+
+
+class MDSCluster(CephCluster):
"""
Collective operations on all the MDS daemons in the Ceph cluster. These
daemons may be in use by various Filesystems.
a parent of Filesystem. The correct way to use MDSCluster going forward is
as a separate instance outside of your (multiple) Filesystem instances.
"""
-
- @property
- def admin_remote(self):
- first_mon = misc.get_first_mon(self._ctx, None)
- (result,) = self._ctx.cluster.only(first_mon).remotes.iterkeys()
- return result
-
def __init__(self, ctx):
+ super(MDSCluster, self).__init__(ctx)
+
self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))
- self._ctx = ctx
if len(self.mds_ids) == 0:
raise RuntimeError("This task requires at least one MDS")
- self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
if hasattr(self._ctx, "daemons"):
# Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
return list(result)
- def get_config(self, key, service_type=None):
- """
- Get config from mon by default, or a specific service if caller asks for it
- """
- if service_type is None:
- service_type = 'mon'
-
- service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
- return self.json_asok(['config', 'get', key], service_type, service_id)[key]
-
- def set_ceph_conf(self, subsys, key, value):
- if subsys not in self._ctx.ceph['ceph'].conf:
- self._ctx.ceph['ceph'].conf[subsys] = {}
- self._ctx.ceph['ceph'].conf[subsys][key] = value
- write_conf(self._ctx) # XXX because we don't have the ceph task's config object, if they
- # used a different config path this won't work.
-
- def clear_ceph_conf(self, subsys, key):
- del self._ctx.ceph['ceph'].conf[subsys][key]
- write_conf(self._ctx)
-
- def json_asok(self, command, service_type, service_id):
- proc = self.mon_manager.admin_socket(service_type, service_id, command)
- response_data = proc.stdout.getvalue()
- log.info("_json_asok output: {0}".format(response_data))
- if response_data.strip():
- return json.loads(response_data)
- else:
- return None
-
def set_clients_block(self, blocked, mds_id=None):
"""
Block (using iptables) client communications to this MDS. Be careful: if
+++ /dev/null
-"""
-vstart_runner: override Filesystem and Mount interfaces to run a CephFSTestCase against a vstart
-ceph instance instead of a packaged/installed cluster. Use this to turn around test cases
-quickly during development.
-
-Usage (assuming teuthology, ceph, ceph-qa-suite checked out in ~/git):
-
- # Activate the teuthology virtualenv
- source ~/git/teuthology/virtualenv/bin/activate
- # Go into your ceph build directory
- cd ~/git/ceph/build
- # Start a vstart cluster
- MDS=2 MON=1 OSD=3 ../src/vstart.sh -n
- # Invoke a test using this script, with PYTHONPATH set appropriately
- python ~/git/ceph-qa-suite/tasks/cephfs/vstart_runner.py
-
- # Alternatively, if you use different paths, specify them as follows:
- LD_LIBRARY_PATH=`pwd`/lib PYTHONPATH=~/git/teuthology:~/git/ceph-qa-suite:`pwd`/../src/pybind:`pwd`/lib/cython_modules/lib.2 python ~/git/ceph-qa-suite/tasks/cephfs/vstart_runner.py
-
- # If you wish to drop to a python shell on failures, use --interactive:
- python ~/git/ceph-qa-suite/tasks/cephfs/vstart_runner.py --interactive
-
- # If you wish to run a named test case, pass it as an argument:
- python ~/git/ceph-qa-suite/tasks/cephfs/vstart_runner.py tasks.cephfs.test_data_scan
-
-"""
-
-from StringIO import StringIO
-from collections import defaultdict
-import getpass
-import signal
-import tempfile
-import threading
-import datetime
-import shutil
-import re
-import os
-import time
-import json
-import sys
-import errno
-from unittest import suite
-import unittest
-import platform
-from teuthology.orchestra.run import Raw, quote
-from teuthology.orchestra.daemon import DaemonGroup
-from teuthology.config import config as teuth_config
-
-import logging
-
-log = logging.getLogger(__name__)
-
-handler = logging.FileHandler("./vstart_runner.log")
-formatter = logging.Formatter(
- fmt=u'%(asctime)s.%(msecs)03d %(levelname)s:%(name)s:%(message)s',
- datefmt='%Y-%m-%dT%H:%M:%S')
-handler.setFormatter(formatter)
-log.addHandler(handler)
-log.setLevel(logging.INFO)
-
-
-def respawn_in_path(lib_path, python_paths):
- execv_cmd = ['python']
- if platform.system() == "Darwin":
- lib_path_var = "DYLD_LIBRARY_PATH"
- else:
- lib_path_var = "LD_LIBRARY_PATH"
-
- py_binary = os.environ.get("PYTHON", "python")
-
- if lib_path_var in os.environ:
- if lib_path not in os.environ[lib_path_var]:
- os.environ[lib_path_var] += ':' + lib_path
- os.execvp(py_binary, execv_cmd + sys.argv)
- else:
- os.environ[lib_path_var] = lib_path
- os.execvp(py_binary, execv_cmd + sys.argv)
-
- for p in python_paths:
- sys.path.insert(0, p)
-
-
-# Let's use some sensible defaults
-if os.path.exists("./CMakeCache.txt") and os.path.exists("./bin"):
-
- # A list of candidate paths for each package we need
- guesses = [
- ["~/git/teuthology", "~/scm/teuthology", "~/teuthology"],
- ["~/git/ceph-qa-suite", "~/scm/ceph-qa-suite", "~/ceph-qa-suite"],
- ["lib/cython_modules/lib.2"],
- ["../src/pybind"],
- ]
-
- python_paths = []
- for package_guesses in guesses:
- for g in package_guesses:
- g_exp = os.path.abspath(os.path.expanduser(g))
- if os.path.exists(g_exp):
- python_paths.append(g_exp)
-
- ld_path = os.path.join(os.getcwd(), "lib/")
- print "Using guessed paths {0} {1}".format(ld_path, python_paths)
- respawn_in_path(ld_path, python_paths)
-
-
-try:
- from teuthology.exceptions import CommandFailedError
- from tasks.ceph_manager import CephManager
- from tasks.cephfs.fuse_mount import FuseMount
- from tasks.cephfs.filesystem import Filesystem, MDSCluster
- from teuthology.contextutil import MaxWhileTries
- from teuthology.task import interactive
-except ImportError:
- sys.stderr.write("***\nError importing packages, have you activated your teuthology virtualenv "
- "and set PYTHONPATH to point to teuthology and ceph-qa-suite?\n***\n\n")
- raise
-
-# Must import after teuthology because of gevent monkey patching
-import subprocess
-
-if os.path.exists("./CMakeCache.txt"):
- # Running in build dir of a cmake build
- BIN_PREFIX = "./bin/"
-else:
- # Running in src/ of an autotools build
- BIN_PREFIX = "./"
-
-
-class LocalRemoteProcess(object):
- def __init__(self, args, subproc, check_status, stdout, stderr):
- self.args = args
- self.subproc = subproc
- if stdout is None:
- self.stdout = StringIO()
- else:
- self.stdout = stdout
-
- if stderr is None:
- self.stderr = StringIO()
- else:
- self.stderr = stderr
-
- self.check_status = check_status
- self.exitstatus = self.returncode = None
-
- def wait(self):
- if self.finished:
- # Avoid calling communicate() on a dead process because it'll
- # give you stick about std* already being closed
- if self.exitstatus != 0:
- raise CommandFailedError(self.args, self.exitstatus)
- else:
- return
-
- out, err = self.subproc.communicate()
- self.stdout.write(out)
- self.stderr.write(err)
-
- self.exitstatus = self.returncode = self.subproc.returncode
-
- if self.exitstatus != 0:
- sys.stderr.write(out)
- sys.stderr.write(err)
-
- if self.check_status and self.exitstatus != 0:
- raise CommandFailedError(self.args, self.exitstatus)
-
- @property
- def finished(self):
- if self.exitstatus is not None:
- return True
-
- if self.subproc.poll() is not None:
- out, err = self.subproc.communicate()
- self.stdout.write(out)
- self.stderr.write(err)
- self.exitstatus = self.returncode = self.subproc.returncode
- return True
- else:
- return False
-
- def kill(self):
- log.info("kill ")
- if self.subproc.pid and not self.finished:
- log.info("kill: killing pid {0} ({1})".format(
- self.subproc.pid, self.args))
- safe_kill(self.subproc.pid)
- else:
- log.info("kill: already terminated ({0})".format(self.args))
-
- @property
- def stdin(self):
- class FakeStdIn(object):
- def __init__(self, mount_daemon):
- self.mount_daemon = mount_daemon
-
- def close(self):
- self.mount_daemon.kill()
-
- return FakeStdIn(self)
-
-
-class LocalRemote(object):
- """
- Amusingly named class to present the teuthology RemoteProcess interface when we are really
- running things locally for vstart
-
- Run this inside your src/ dir!
- """
-
- def __init__(self):
- self.name = "local"
- self.hostname = "localhost"
- self.user = getpass.getuser()
-
- def get_file(self, path, sudo, dest_dir):
- tmpfile = tempfile.NamedTemporaryFile(delete=False).name
- shutil.copy(path, tmpfile)
- return tmpfile
-
- def put_file(self, src, dst, sudo=False):
- shutil.copy(src, dst)
-
- def run(self, args, check_status=True, wait=True,
- stdout=None, stderr=None, cwd=None, stdin=None,
- logger=None, label=None):
- log.info("run args={0}".format(args))
-
- # We don't need no stinkin' sudo
- args = [a for a in args if a != "sudo"]
-
- # We have to use shell=True if any run.Raw was present, e.g. &&
- shell = any([a for a in args if isinstance(a, Raw)])
-
- if shell:
- filtered = []
- i = 0
- while i < len(args):
- if args[i] == 'adjust-ulimits':
- i += 1
- elif args[i] == 'ceph-coverage':
- i += 2
- elif args[i] == 'timeout':
- i += 2
- else:
- filtered.append(args[i])
- i += 1
-
- args = quote(filtered)
- log.info("Running {0}".format(args))
-
- subproc = subprocess.Popen(args,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- stdin=subprocess.PIPE,
- cwd=cwd,
- shell=True)
- else:
- log.info("Running {0}".format(args))
-
- for arg in args:
- if not isinstance(arg, basestring):
- raise RuntimeError("Oops, can't handle arg {0} type {1}".format(
- arg, arg.__class__
- ))
-
- subproc = subprocess.Popen(args,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- stdin=subprocess.PIPE,
- cwd=cwd)
-
- if stdin:
- if not isinstance(stdin, basestring):
- raise RuntimeError("Can't handle non-string stdins on a vstart cluster")
-
- # Hack: writing to stdin is not deadlock-safe, but it "always" works
- # as long as the input buffer is "small"
- subproc.stdin.write(stdin)
-
- proc = LocalRemoteProcess(
- args, subproc, check_status,
- stdout, stderr
- )
-
- if wait:
- proc.wait()
-
- return proc
-
-
-# FIXME: twiddling vstart daemons is likely to be unreliable, we should probably just let vstart
-# run RADOS and run the MDS daemons directly from the test runner
-class LocalDaemon(object):
- def __init__(self, daemon_type, daemon_id):
- self.daemon_type = daemon_type
- self.daemon_id = daemon_id
- self.controller = LocalRemote()
- self.proc = None
-
- @property
- def remote(self):
- return LocalRemote()
-
- def running(self):
- return self._get_pid() is not None
-
- def _get_pid(self):
- """
- Return PID as an integer or None if not found
- """
- ps_txt = self.controller.run(
- args=["ps", "-xwwu"+str(os.getuid())]
- ).stdout.getvalue().strip()
- lines = ps_txt.split("\n")[1:]
-
- for line in lines:
- if line.find("ceph-{0} -i {1}".format(self.daemon_type, self.daemon_id)) != -1:
- log.info("Found ps line for daemon: {0}".format(line))
- return int(line.split()[1])
- log.info("No match for {0} {1}: {2}".format(
- self.daemon_type, self.daemon_id, ps_txt
- ))
- return None
-
- def wait(self, timeout):
- waited = 0
- while self._get_pid() is not None:
- if waited > timeout:
- raise MaxWhileTries("Timed out waiting for daemon {0}.{1}".format(self.daemon_type, self.daemon_id))
- time.sleep(1)
- waited += 1
-
- def stop(self, timeout=300):
- if not self.running():
- log.error('tried to stop a non-running daemon')
- return
-
- pid = self._get_pid()
- log.info("Killing PID {0} for {1}.{2}".format(pid, self.daemon_type, self.daemon_id))
- os.kill(pid, signal.SIGKILL)
- self.wait(timeout=timeout)
-
- def restart(self):
- if self._get_pid() is not None:
- self.stop()
-
- self.proc = self.controller.run([os.path.join(BIN_PREFIX, "./ceph-{0}".format(self.daemon_type)), "-i", self.daemon_id])
-
-
-def safe_kill(pid):
- """
- os.kill annoyingly raises exception if process already dead. Ignore it.
- """
- try:
- return os.kill(pid, signal.SIGKILL)
- except OSError as e:
- if e.errno == errno.ESRCH:
- # Raced with process termination
- pass
- else:
- raise
-
-
-class LocalFuseMount(FuseMount):
- def __init__(self, test_dir, client_id):
- super(LocalFuseMount, self).__init__(None, test_dir, client_id, LocalRemote())
-
- @property
- def config_path(self):
- return "./ceph.conf"
-
- def get_keyring_path(self):
- # This is going to end up in a config file, so use an absolute path
- # to avoid assumptions about daemons' pwd
- return os.path.abspath("./client.{0}.keyring".format(self.client_id))
-
- def run_shell(self, args, wait=True):
- # FIXME maybe should add a pwd arg to teuthology.orchestra so that
- # the "cd foo && bar" shenanigans isn't needed to begin with and
- # then we wouldn't have to special case this
- return self.client_remote.run(
- args, wait=wait, cwd=self.mountpoint
- )
-
- @property
- def _prefix(self):
- return BIN_PREFIX
-
- def _asok_path(self):
- # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
- # run foreground. When running it daemonized however, the asok is named after
- # the PID of the launching process, not the long running ceph-fuse process. Therefore
- # we need to give an exact path here as the logic for checking /proc/ for which
- # asok is alive does not work.
- path = "./out/client.{0}.{1}.asok".format(self.client_id, self.fuse_daemon.subproc.pid)
- log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid))
- return path
-
- def umount(self):
- if self.is_mounted():
- super(LocalFuseMount, self).umount()
-
- def mount(self, mount_path=None):
- self.client_remote.run(
- args=[
- 'mkdir',
- '--',
- self.mountpoint,
- ],
- )
-
- def list_connections():
- self.client_remote.run(
- args=["mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
- check_status=False
- )
- p = self.client_remote.run(
- args=["ls", "/sys/fs/fuse/connections"],
- check_status=False
- )
- if p.exitstatus != 0:
- log.warn("ls conns failed with {0}, assuming none".format(p.exitstatus))
- return []
-
- ls_str = p.stdout.getvalue().strip()
- if ls_str:
- return [int(n) for n in ls_str.split("\n")]
- else:
- return []
-
- # Before starting ceph-fuse process, note the contents of
- # /sys/fs/fuse/connections
- pre_mount_conns = list_connections()
- log.info("Pre-mount connections: {0}".format(pre_mount_conns))
-
- prefix = [os.path.join(BIN_PREFIX, "ceph-fuse")]
- if os.getuid() != 0:
- prefix += ["--client-die-on-failed-remount=false"]
-
- if mount_path is not None:
- prefix += ["--client_mountpoint={0}".format(mount_path)]
-
- self.fuse_daemon = self.client_remote.run(args=
- prefix + [
- "-f",
- "--name",
- "client.{0}".format(self.client_id),
- self.mountpoint
- ], wait=False)
-
- log.info("Mounting client.{0} with pid {1}".format(self.client_id, self.fuse_daemon.subproc.pid))
-
- # Wait for the connection reference to appear in /sys
- waited = 0
- post_mount_conns = list_connections()
- while len(post_mount_conns) <= len(pre_mount_conns):
- if self.fuse_daemon.finished:
- # Did mount fail? Raise the CommandFailedError instead of
- # hitting the "failed to populate /sys/" timeout
- self.fuse_daemon.wait()
- time.sleep(1)
- waited += 1
- if waited > 30:
- raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
- waited
- ))
- post_mount_conns = list_connections()
-
- log.info("Post-mount connections: {0}".format(post_mount_conns))
-
- # Record our fuse connection number so that we can use it when
- # forcing an unmount
- new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
- if len(new_conns) == 0:
- raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
- elif len(new_conns) > 1:
- raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
- else:
- self._fuse_conn = new_conns[0]
-
- def _run_python(self, pyscript):
- """
- Override this to remove the daemon-helper prefix that is used otherwise
- to make the process killable.
- """
- return self.client_remote.run(args=[
- 'python', '-c', pyscript
- ], wait=False)
-
-
-class LocalCephManager(CephManager):
- def __init__(self):
- # Deliberately skip parent init, only inheriting from it to get
- # util methods like osd_dump that sit on top of raw_cluster_cmd
- self.controller = LocalRemote()
-
- # A minority of CephManager fns actually bother locking for when
- # certain teuthology tests want to run tasks in parallel
- self.lock = threading.RLock()
-
- def find_remote(self, daemon_type, daemon_id):
- """
- daemon_type like 'mds', 'osd'
- daemon_id like 'a', '0'
- """
- return LocalRemote()
-
- def run_ceph_w(self):
- proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph"), "-w"], wait=False, stdout=StringIO())
- return proc
-
- def raw_cluster_cmd(self, *args):
- """
- args like ["osd", "dump"}
- return stdout string
- """
- proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args))
- return proc.stdout.getvalue()
-
- def raw_cluster_cmd_result(self, *args):
- """
- like raw_cluster_cmd but don't check status, just return rc
- """
- proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args), check_status=False)
- return proc.exitstatus
-
- def admin_socket(self, daemon_type, daemon_id, command, check_status=True):
- return self.controller.run(
- args=[os.path.join(BIN_PREFIX, "ceph"), "daemon", "{0}.{1}".format(daemon_type, daemon_id)] + command, check_status=check_status
- )
-
- # FIXME: copypasta
- def get_mds_status(self, mds):
- """
- Run cluster commands for the mds in order to get mds information
- """
- out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
- j = json.loads(' '.join(out.splitlines()[1:]))
- # collate; for dup ids, larger gid wins.
- for info in j['info'].itervalues():
- if info['name'] == mds:
- return info
- return None
-
- # FIXME: copypasta
- def get_mds_status_by_rank(self, rank):
- """
- Run cluster commands for the mds in order to get mds information
- check rank.
- """
- j = self.get_mds_status_all()
- # collate; for dup ids, larger gid wins.
- for info in j['info'].itervalues():
- if info['rank'] == rank:
- return info
- return None
-
- def get_mds_status_all(self):
- """
- Run cluster command to extract all the mds status.
- """
- out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
- j = json.loads(' '.join(out.splitlines()[1:]))
- return j
-
-
-class LocalMDSCluster(MDSCluster):
- def __init__(self, ctx):
- # Deliberately skip calling parent constructor
- self._ctx = ctx
-
- self.mds_ids = ctx.daemons.daemons['mds'].keys()
- if not self.mds_ids:
- raise RuntimeError("No MDSs found in ceph.conf!")
-
- self.mon_manager = LocalCephManager()
- self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
-
- self._conf = defaultdict(dict)
-
- def get_config(self, key, service_type=None):
- if service_type is None:
- service_type = 'mon'
-
- # FIXME hardcoded vstart service IDs
- service_id = {
- 'mon': 'a',
- 'mds': 'a',
- 'osd': '0'
- }[service_type]
-
- return self.json_asok(['config', 'get', key], service_type, service_id)[key]
-
- def _write_conf(self):
- # In teuthology, we have the honour of writing the entire ceph.conf, but
- # in vstart land it has mostly already been written and we need to carefully
- # append to it.
- conf_path = "./ceph.conf"
- banner = "\n#LOCAL_TEST\n"
- existing_str = open(conf_path).read()
-
- if banner in existing_str:
- existing_str = existing_str[0:existing_str.find(banner)]
-
- existing_str += banner
-
- for subsys, kvs in self._conf.items():
- existing_str += "\n[{0}]\n".format(subsys)
- for key, val in kvs.items():
- # Comment out existing instance if it exists
- log.info("Searching for existing instance {0}/{1}".format(
- key, subsys
- ))
- existing_section = re.search("^\[{0}\]$([\n]|[^\[])+".format(
- subsys
- ), existing_str, re.MULTILINE)
-
- if existing_section:
- section_str = existing_str[existing_section.start():existing_section.end()]
- existing_val = re.search("^\s*[^#]({0}) =".format(key), section_str, re.MULTILINE)
- if existing_val:
- start = existing_section.start() + existing_val.start(1)
- log.info("Found string to replace at {0}".format(
- start
- ))
- existing_str = existing_str[0:start] + "#" + existing_str[start:]
-
- existing_str += "{0} = {1}\n".format(key, val)
-
- open(conf_path, "w").write(existing_str)
-
- def set_ceph_conf(self, subsys, key, value):
- self._conf[subsys][key] = value
- self._write_conf()
-
- def clear_ceph_conf(self, subsys, key):
- del self._conf[subsys][key]
- self._write_conf()
-
- def clear_firewall(self):
- # FIXME: unimplemented
- pass
-
- def get_filesystem(self, name):
- return LocalFilesystem(self._ctx, name)
-
-
-class LocalFilesystem(Filesystem, LocalMDSCluster):
- @property
- def admin_remote(self):
- return LocalRemote()
-
- def __init__(self, ctx, name=None):
- # Deliberately skip calling parent constructor
- self._ctx = ctx
-
- if name is None:
- name = "cephfs"
-
- self.name = name
- self.metadata_pool_name = "{0}_metadata".format(name)
- self.data_pool_name = "{0}_data".format(name)
-
- # Hack: cheeky inspection of ceph.conf to see what MDSs exist
- self.mds_ids = set()
- for line in open("ceph.conf").readlines():
- match = re.match("^\[mds\.(.+)\]$", line)
- if match:
- self.mds_ids.add(match.group(1))
-
- if not self.mds_ids:
- raise RuntimeError("No MDSs found in ceph.conf!")
-
- self.mds_ids = list(self.mds_ids)
-
- log.info("Discovered MDS IDs: {0}".format(self.mds_ids))
-
- self.mon_manager = LocalCephManager()
-
- self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
-
- self.client_remote = LocalRemote()
-
- self._conf = defaultdict(dict)
-
- @property
- def _prefix(self):
- return BIN_PREFIX
-
- def set_clients_block(self, blocked, mds_id=None):
- raise NotImplementedError()
-
- def get_pgs_per_fs_pool(self):
- # FIXME: assuming there are 3 OSDs
- return 3 * int(self.get_config('mon_pg_warn_min_per_osd'))
-
-
-class InteractiveFailureResult(unittest.TextTestResult):
- """
- Specialization that implements interactive-on-error style
- behavior.
- """
- def addFailure(self, test, err):
- super(InteractiveFailureResult, self).addFailure(test, err)
- log.error(self._exc_info_to_string(err, test))
- log.error("Failure in test '{0}', going interactive".format(
- self.getDescription(test)
- ))
- interactive.task(ctx=None, config=None)
-
- def addError(self, test, err):
- super(InteractiveFailureResult, self).addError(test, err)
- log.error(self._exc_info_to_string(err, test))
- log.error("Error in test '{0}', going interactive".format(
- self.getDescription(test)
- ))
- interactive.task(ctx=None, config=None)
-
-
-def exec_test():
- # Help developers by stopping up-front if their tree isn't built enough for all the
- # tools that the tests might want to use (add more here if needed)
- require_binaries = ["ceph-dencoder", "cephfs-journal-tool", "cephfs-data-scan",
- "cephfs-table-tool", "ceph-fuse", "rados"]
- missing_binaries = [b for b in require_binaries if not os.path.exists(os.path.join(BIN_PREFIX, b))]
- if missing_binaries:
- log.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries)))
- sys.exit(-1)
-
- test_dir = tempfile.mkdtemp()
-
- # Create as many of these as the biggest test requires
- clients = ["0", "1", "2", "3"]
-
- remote = LocalRemote()
-
- # Tolerate no MDSs or clients running at start
- ps_txt = remote.run(
- args=["ps", "-u"+str(os.getuid())]
- ).stdout.getvalue().strip()
- lines = ps_txt.split("\n")[1:]
-
- for line in lines:
- if 'ceph-fuse' in line or 'ceph-mds' in line:
- pid = int(line.split()[0])
- log.warn("Killing stray process {0}".format(line))
- os.kill(pid, signal.SIGKILL)
-
- class LocalCluster(object):
- def __init__(self, rolename="placeholder"):
- self.remotes = {
- remote: [rolename]
- }
-
- def only(self, requested):
- return self.__class__(rolename=requested)
-
- teuth_config['test_path'] = test_dir
-
- class LocalContext(object):
- def __init__(self):
- self.config = {}
- self.teuthology_config = teuth_config
- self.cluster = LocalCluster()
- self.daemons = DaemonGroup()
-
- # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
- # tests that want to look these up via ctx can do so.
- # Inspect ceph.conf to see what roles exist
- for conf_line in open("ceph.conf").readlines():
- for svc_type in ["mon", "osd", "mds"]:
- if svc_type not in self.daemons.daemons:
- self.daemons.daemons[svc_type] = {}
- match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line)
- if match:
- svc_id = match.group(1)
- self.daemons.daemons[svc_type][svc_id] = LocalDaemon(svc_type, svc_id)
-
- def __del__(self):
- shutil.rmtree(self.teuthology_config['test_path'])
-
- ctx = LocalContext()
-
- mounts = []
- for client_id in clients:
- # Populate client keyring (it sucks to use client.admin for test clients
- # because it's awkward to find the logs later)
- client_name = "client.{0}".format(client_id)
-
- if client_name not in open("./keyring").read():
- p = remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "auth", "get-or-create", client_name,
- "osd", "allow rw",
- "mds", "allow",
- "mon", "allow r"])
-
- open("./keyring", "a").write(p.stdout.getvalue())
-
- mount = LocalFuseMount(test_dir, client_id)
- mounts.append(mount)
- if mount.is_mounted():
- log.warn("unmounting {0}".format(mount.mountpoint))
- mount.umount_wait()
- else:
- if os.path.exists(mount.mountpoint):
- os.rmdir(mount.mountpoint)
- filesystem = LocalFilesystem(ctx)
- mds_cluster = LocalMDSCluster(ctx)
-
- from tasks.cephfs_test_runner import DecoratingLoader
-
- class LogStream(object):
- def __init__(self):
- self.buffer = ""
-
- def write(self, data):
- self.buffer += data
- if "\n" in self.buffer:
- lines = self.buffer.split("\n")
- for line in lines[:-1]:
- pass
- # sys.stderr.write(line + "\n")
- log.info(line)
- self.buffer = lines[-1]
-
- def flush(self):
- pass
-
- decorating_loader = DecoratingLoader({
- "ctx": ctx,
- "mounts": mounts,
- "fs": filesystem,
- "mds_cluster": mds_cluster
- })
-
- # For the benefit of polling tests like test_full -- in teuthology land we set this
- # in a .yaml, here it's just a hardcoded thing for the developer's pleasure.
- remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "tell", "osd.*", "injectargs", "--osd-mon-report-interval-max", "5"])
- filesystem.set_ceph_conf("osd", "osd_mon_report_interval_max", "5")
-
- # Vstart defaults to two segments, which very easily gets a "behind on trimming" health warning
- # from normal IO latency. Increase it for running teests.
- filesystem.set_ceph_conf("mds", "mds log max segments", "10")
-
- # Make sure the filesystem created in tests has uid/gid that will let us talk to
- # it after mounting it (without having to go root). Set in 'global' not just 'mds'
- # so that cephfs-data-scan will pick it up too.
- filesystem.set_ceph_conf("global", "mds root ino uid", "%s" % os.getuid())
- filesystem.set_ceph_conf("global", "mds root ino gid", "%s" % os.getgid())
-
- # Monkeypatch get_package_version to avoid having to work out what kind of distro we're on
- def _get_package_version(remote, pkg_name):
- # Used in cephfs tests to find fuse version. Your development workstation *does* have >=2.9, right?
- return "2.9"
-
- import teuthology.packaging
- teuthology.packaging.get_package_version = _get_package_version
-
- def enumerate_methods(s):
- for t in s._tests:
- if isinstance(t, suite.BaseTestSuite):
- for sub in enumerate_methods(t):
- yield sub
- else:
- yield s, t
-
- interactive_on_error = False
-
- args = sys.argv[1:]
- flags = [a for a in args if a.startswith("-")]
- modules = [a for a in args if not a.startswith("-")]
- for f in flags:
- if f == "--interactive":
- interactive_on_error = True
- else:
- log.error("Unknown option '{0}'".format(f))
- sys.exit(-1)
-
- if modules:
- log.info("Executing modules: {0}".format(modules))
- module_suites = []
- for mod_name in modules:
- # Test names like cephfs.test_auto_repair
- module_suites.append(decorating_loader.loadTestsFromName(mod_name))
- log.info("Loaded: {0}".format(list(module_suites)))
- overall_suite = suite.TestSuite(module_suites)
- else:
- log.info("Excuting all tests")
- overall_suite = decorating_loader.discover(
- os.path.dirname(os.path.abspath(__file__))
- )
-
- # Filter out tests that don't lend themselves to interactive running,
- victims = []
- for case, method in enumerate_methods(overall_suite):
- fn = getattr(method, method._testMethodName)
-
- drop_test = False
-
- if hasattr(fn, 'is_for_teuthology') and getattr(fn, 'is_for_teuthology') is True:
- drop_test = True
- log.warn("Dropping test because long running: ".format(method.id()))
-
- if getattr(fn, "needs_trimming", False) is True:
- drop_test = (os.getuid() != 0)
- log.warn("Dropping test because client trim unavailable: ".format(method.id()))
-
- if drop_test:
- # Don't drop the test if it was explicitly requested in arguments
- is_named = False
- for named in modules:
- if named.endswith(method.id()):
- is_named = True
- break
-
- if not is_named:
- victims.append((case, method))
-
- log.info("Disabling {0} tests because of is_for_teuthology or needs_trimming".format(len(victims)))
- for s, method in victims:
- s._tests.remove(method)
-
- if interactive_on_error:
- result_class = InteractiveFailureResult
- else:
- result_class = unittest.TextTestResult
- fail_on_skip = False
-
- class LoggingResult(result_class):
- def startTest(self, test):
- log.info("Starting test: {0}".format(self.getDescription(test)))
- test.started_at = datetime.datetime.utcnow()
- return super(LoggingResult, self).startTest(test)
-
- def stopTest(self, test):
- log.info("Stopped test: {0} in {1}s".format(
- self.getDescription(test),
- (datetime.datetime.utcnow() - test.started_at).total_seconds()
- ))
-
- def addSkip(self, test, reason):
- if fail_on_skip:
- # Don't just call addFailure because that requires a traceback
- self.failures.append((test, reason))
- else:
- super(LoggingResult, self).addSkip(test, reason)
-
- # Execute!
- result = unittest.TextTestRunner(
- stream=LogStream(),
- resultclass=LoggingResult,
- verbosity=2,
- failfast=True).run(overall_suite)
-
- if not result.wasSuccessful():
- result.printErrors() # duplicate output at end for convenience
-
- bad_tests = []
- for test, error in result.errors:
- bad_tests.append(str(test))
- for test, failure in result.failures:
- bad_tests.append(str(test))
-
- sys.exit(-1)
- else:
- sys.exit(0)
-
-
-if __name__ == "__main__":
- exec_test()
--- /dev/null
+"""
+vstart_runner: override Filesystem and Mount interfaces to run a CephFSTestCase against a vstart
+ceph instance instead of a packaged/installed cluster. Use this to turn around test cases
+quickly during development.
+
+Usage (assuming teuthology, ceph, ceph-qa-suite checked out in ~/git):
+
+ # Activate the teuthology virtualenv
+ source ~/git/teuthology/virtualenv/bin/activate
+ # Go into your ceph build directory
+ cd ~/git/ceph/build
+ # Start a vstart cluster
+ MDS=2 MON=1 OSD=3 ../src/vstart.sh -n
+ # Invoke a test using this script, with PYTHONPATH set appropriately
+ python ~/git/ceph-qa-suite/tasks/cephfs/vstart_runner.py
+
+ # Alternatively, if you use different paths, specify them as follows:
+ LD_LIBRARY_PATH=`pwd`/lib PYTHONPATH=~/git/teuthology:~/git/ceph-qa-suite:`pwd`/../src/pybind:`pwd`/lib/cython_modules/lib.2 python ~/git/ceph-qa-suite/tasks/cephfs/vstart_runner.py
+
+ # If you wish to drop to a python shell on failures, use --interactive:
+ python ~/git/ceph-qa-suite/tasks/cephfs/vstart_runner.py --interactive
+
+ # If you wish to run a named test case, pass it as an argument:
+ python ~/git/ceph-qa-suite/tasks/cephfs/vstart_runner.py tasks.cephfs.test_data_scan
+
+"""
+
+from StringIO import StringIO
+from collections import defaultdict
+import getpass
+import signal
+import tempfile
+import threading
+import datetime
+import shutil
+import re
+import os
+import time
+import json
+import sys
+import errno
+from unittest import suite
+import unittest
+import platform
+from teuthology.orchestra.run import Raw, quote
+from teuthology.orchestra.daemon import DaemonGroup
+from teuthology.config import config as teuth_config
+
+import logging
+
+log = logging.getLogger(__name__)
+
+handler = logging.FileHandler("./vstart_runner.log")
+formatter = logging.Formatter(
+ fmt=u'%(asctime)s.%(msecs)03d %(levelname)s:%(name)s:%(message)s',
+ datefmt='%Y-%m-%dT%H:%M:%S')
+handler.setFormatter(formatter)
+log.addHandler(handler)
+log.setLevel(logging.INFO)
+
+
+def respawn_in_path(lib_path, python_paths):
+ execv_cmd = ['python']
+ if platform.system() == "Darwin":
+ lib_path_var = "DYLD_LIBRARY_PATH"
+ else:
+ lib_path_var = "LD_LIBRARY_PATH"
+
+ py_binary = os.environ.get("PYTHON", "python")
+
+ if lib_path_var in os.environ:
+ if lib_path not in os.environ[lib_path_var]:
+ os.environ[lib_path_var] += ':' + lib_path
+ os.execvp(py_binary, execv_cmd + sys.argv)
+ else:
+ os.environ[lib_path_var] = lib_path
+ os.execvp(py_binary, execv_cmd + sys.argv)
+
+ for p in python_paths:
+ sys.path.insert(0, p)
+
+
+# Let's use some sensible defaults
+if os.path.exists("./CMakeCache.txt") and os.path.exists("./bin"):
+
+ # A list of candidate paths for each package we need
+ guesses = [
+ ["~/git/teuthology", "~/scm/teuthology", "~/teuthology"],
+ ["~/git/ceph-qa-suite", "~/scm/ceph-qa-suite", "~/ceph-qa-suite"],
+ ["lib/cython_modules/lib.2"],
+ ["../src/pybind"],
+ ]
+
+ python_paths = []
+ for package_guesses in guesses:
+ for g in package_guesses:
+ g_exp = os.path.abspath(os.path.expanduser(g))
+ if os.path.exists(g_exp):
+ python_paths.append(g_exp)
+
+ ld_path = os.path.join(os.getcwd(), "lib/")
+ print "Using guessed paths {0} {1}".format(ld_path, python_paths)
+ respawn_in_path(ld_path, python_paths)
+
+
+try:
+ from teuthology.exceptions import CommandFailedError
+ from tasks.ceph_manager import CephManager
+ from tasks.cephfs.fuse_mount import FuseMount
+ from tasks.cephfs.filesystem import Filesystem, MDSCluster, CephCluster
+    from tasks.mgr.mgr_test_case import MgrCluster
+ from teuthology.contextutil import MaxWhileTries
+ from teuthology.task import interactive
+except ImportError:
+ sys.stderr.write("***\nError importing packages, have you activated your teuthology virtualenv "
+ "and set PYTHONPATH to point to teuthology and ceph-qa-suite?\n***\n\n")
+ raise
+
+# Must import after teuthology because of gevent monkey patching
+import subprocess
+
+if os.path.exists("./CMakeCache.txt"):
+ # Running in build dir of a cmake build
+ BIN_PREFIX = "./bin/"
+else:
+ # Running in src/ of an autotools build
+ BIN_PREFIX = "./"
+
+
+class LocalRemoteProcess(object):
+ def __init__(self, args, subproc, check_status, stdout, stderr):
+ self.args = args
+ self.subproc = subproc
+ if stdout is None:
+ self.stdout = StringIO()
+ else:
+ self.stdout = stdout
+
+ if stderr is None:
+ self.stderr = StringIO()
+ else:
+ self.stderr = stderr
+
+ self.check_status = check_status
+ self.exitstatus = self.returncode = None
+
+ def wait(self):
+ if self.finished:
+ # Avoid calling communicate() on a dead process because it'll
+ # give you stick about std* already being closed
+ if self.exitstatus != 0:
+ raise CommandFailedError(self.args, self.exitstatus)
+ else:
+ return
+
+ out, err = self.subproc.communicate()
+ self.stdout.write(out)
+ self.stderr.write(err)
+
+ self.exitstatus = self.returncode = self.subproc.returncode
+
+ if self.exitstatus != 0:
+ sys.stderr.write(out)
+ sys.stderr.write(err)
+
+ if self.check_status and self.exitstatus != 0:
+ raise CommandFailedError(self.args, self.exitstatus)
+
+ @property
+ def finished(self):
+ if self.exitstatus is not None:
+ return True
+
+ if self.subproc.poll() is not None:
+ out, err = self.subproc.communicate()
+ self.stdout.write(out)
+ self.stderr.write(err)
+ self.exitstatus = self.returncode = self.subproc.returncode
+ return True
+ else:
+ return False
+
+ def kill(self):
+ log.info("kill ")
+ if self.subproc.pid and not self.finished:
+ log.info("kill: killing pid {0} ({1})".format(
+ self.subproc.pid, self.args))
+ safe_kill(self.subproc.pid)
+ else:
+ log.info("kill: already terminated ({0})".format(self.args))
+
+ @property
+ def stdin(self):
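+        """
+        Minimal stand-in for the RemoteProcess stdin attribute: callers such
+        as CephTestCase.assert_cluster_log() close stdin to stop the
+        `ceph -w` watcher, so close() here simply kills the local subprocess.
+        """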
+ class FakeStdIn(object):
+ def __init__(self, mount_daemon):
+ self.mount_daemon = mount_daemon
+
+ def close(self):
+ self.mount_daemon.kill()
+
+ return FakeStdIn(self)
+
+
+class LocalRemote(object):
+ """
+ Amusingly named class to present the teuthology RemoteProcess interface when we are really
+ running things locally for vstart
+
+ Run this inside your src/ dir!
+ """
+
+ def __init__(self):
+ self.name = "local"
+ self.hostname = "localhost"
+ self.user = getpass.getuser()
+
+ def get_file(self, path, sudo, dest_dir):
+ tmpfile = tempfile.NamedTemporaryFile(delete=False).name
+ shutil.copy(path, tmpfile)
+ return tmpfile
+
+ def put_file(self, src, dst, sudo=False):
+ shutil.copy(src, dst)
+
+ def run(self, args, check_status=True, wait=True,
+ stdout=None, stderr=None, cwd=None, stdin=None,
+ logger=None, label=None):
+ log.info("run args={0}".format(args))
+
+ # We don't need no stinkin' sudo
+ args = [a for a in args if a != "sudo"]
+
+ # We have to use shell=True if any run.Raw was present, e.g. &&
+ shell = any([a for a in args if isinstance(a, Raw)])
+
+ if shell:
+ filtered = []
+ i = 0
+ while i < len(args):
+ if args[i] == 'adjust-ulimits':
+ i += 1
+ elif args[i] == 'ceph-coverage':
+ i += 2
+ elif args[i] == 'timeout':
+ i += 2
+ else:
+ filtered.append(args[i])
+ i += 1
+
+ args = quote(filtered)
+ log.info("Running {0}".format(args))
+
+ subproc = subprocess.Popen(args,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ stdin=subprocess.PIPE,
+ cwd=cwd,
+ shell=True)
+ else:
+ log.info("Running {0}".format(args))
+
+ for arg in args:
+ if not isinstance(arg, basestring):
+ raise RuntimeError("Oops, can't handle arg {0} type {1}".format(
+ arg, arg.__class__
+ ))
+
+ subproc = subprocess.Popen(args,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ stdin=subprocess.PIPE,
+ cwd=cwd)
+
+ if stdin:
+ if not isinstance(stdin, basestring):
+ raise RuntimeError("Can't handle non-string stdins on a vstart cluster")
+
+ # Hack: writing to stdin is not deadlock-safe, but it "always" works
+ # as long as the input buffer is "small"
+ subproc.stdin.write(stdin)
+
+ proc = LocalRemoteProcess(
+ args, subproc, check_status,
+ stdout, stderr
+ )
+
+ if wait:
+ proc.wait()
+
+ return proc
+
+
+class LocalDaemon(object):
+ def __init__(self, daemon_type, daemon_id):
+ self.daemon_type = daemon_type
+ self.daemon_id = daemon_id
+ self.controller = LocalRemote()
+ self.proc = None
+
+ @property
+ def remote(self):
+ return LocalRemote()
+
+ def running(self):
+ return self._get_pid() is not None
+
+ def _get_pid(self):
+ """
+ Return PID as an integer or None if not found
+ """
+ ps_txt = self.controller.run(
+ args=["ps", "-xwwu"+str(os.getuid())]
+ ).stdout.getvalue().strip()
+ lines = ps_txt.split("\n")[1:]
+
+ for line in lines:
+ if line.find("ceph-{0} -i {1}".format(self.daemon_type, self.daemon_id)) != -1:
+ log.info("Found ps line for daemon: {0}".format(line))
+ return int(line.split()[1])
+ log.info("No match for {0} {1}: {2}".format(
+ self.daemon_type, self.daemon_id, ps_txt
+ ))
+ return None
+
+ def wait(self, timeout):
+ waited = 0
+ while self._get_pid() is not None:
+ if waited > timeout:
+ raise MaxWhileTries("Timed out waiting for daemon {0}.{1}".format(self.daemon_type, self.daemon_id))
+ time.sleep(1)
+ waited += 1
+
+ def stop(self, timeout=300):
+ if not self.running():
+ log.error('tried to stop a non-running daemon')
+ return
+
+ pid = self._get_pid()
+ log.info("Killing PID {0} for {1}.{2}".format(pid, self.daemon_type, self.daemon_id))
+ os.kill(pid, signal.SIGKILL)
+ self.wait(timeout=timeout)
+
+ def restart(self):
+ if self._get_pid() is not None:
+ self.stop()
+
+ self.proc = self.controller.run([os.path.join(BIN_PREFIX, "./ceph-{0}".format(self.daemon_type)), "-i", self.daemon_id])
+
+
+def safe_kill(pid):
+ """
+ os.kill annoyingly raises exception if process already dead. Ignore it.
+ """
+ try:
+ return os.kill(pid, signal.SIGKILL)
+ except OSError as e:
+ if e.errno == errno.ESRCH:
+ # Raced with process termination
+ pass
+ else:
+ raise
+
+
+class LocalFuseMount(FuseMount):
+ def __init__(self, test_dir, client_id):
+ super(LocalFuseMount, self).__init__(None, test_dir, client_id, LocalRemote())
+
+ @property
+ def config_path(self):
+ return "./ceph.conf"
+
+ def get_keyring_path(self):
+ # This is going to end up in a config file, so use an absolute path
+ # to avoid assumptions about daemons' pwd
+ return os.path.abspath("./client.{0}.keyring".format(self.client_id))
+
+ def run_shell(self, args, wait=True):
+ # FIXME maybe should add a pwd arg to teuthology.orchestra so that
+ # the "cd foo && bar" shenanigans isn't needed to begin with and
+ # then we wouldn't have to special case this
+ return self.client_remote.run(
+ args, wait=wait, cwd=self.mountpoint
+ )
+
+ @property
+ def _prefix(self):
+ return BIN_PREFIX
+
+ def _asok_path(self):
+ # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
+ # run foreground. When running it daemonized however, the asok is named after
+ # the PID of the launching process, not the long running ceph-fuse process. Therefore
+ # we need to give an exact path here as the logic for checking /proc/ for which
+ # asok is alive does not work.
+ path = "./out/client.{0}.{1}.asok".format(self.client_id, self.fuse_daemon.subproc.pid)
+ log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid))
+ return path
+
+ def umount(self):
+ if self.is_mounted():
+ super(LocalFuseMount, self).umount()
+
+ def mount(self, mount_path=None):
+ self.client_remote.run(
+ args=[
+ 'mkdir',
+ '--',
+ self.mountpoint,
+ ],
+ )
+
+ def list_connections():
+ self.client_remote.run(
+ args=["mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
+ check_status=False
+ )
+ p = self.client_remote.run(
+ args=["ls", "/sys/fs/fuse/connections"],
+ check_status=False
+ )
+ if p.exitstatus != 0:
+ log.warn("ls conns failed with {0}, assuming none".format(p.exitstatus))
+ return []
+
+ ls_str = p.stdout.getvalue().strip()
+ if ls_str:
+ return [int(n) for n in ls_str.split("\n")]
+ else:
+ return []
+
+ # Before starting ceph-fuse process, note the contents of
+ # /sys/fs/fuse/connections
+ pre_mount_conns = list_connections()
+ log.info("Pre-mount connections: {0}".format(pre_mount_conns))
+
+ prefix = [os.path.join(BIN_PREFIX, "ceph-fuse")]
+ if os.getuid() != 0:
+ prefix += ["--client-die-on-failed-remount=false"]
+
+ if mount_path is not None:
+ prefix += ["--client_mountpoint={0}".format(mount_path)]
+
+ self.fuse_daemon = self.client_remote.run(args=
+ prefix + [
+ "-f",
+ "--name",
+ "client.{0}".format(self.client_id),
+ self.mountpoint
+ ], wait=False)
+
+ log.info("Mounting client.{0} with pid {1}".format(self.client_id, self.fuse_daemon.subproc.pid))
+
+ # Wait for the connection reference to appear in /sys
+ waited = 0
+ post_mount_conns = list_connections()
+ while len(post_mount_conns) <= len(pre_mount_conns):
+ if self.fuse_daemon.finished:
+ # Did mount fail? Raise the CommandFailedError instead of
+ # hitting the "failed to populate /sys/" timeout
+ self.fuse_daemon.wait()
+ time.sleep(1)
+ waited += 1
+ if waited > 30:
+ raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
+ waited
+ ))
+ post_mount_conns = list_connections()
+
+ log.info("Post-mount connections: {0}".format(post_mount_conns))
+
+ # Record our fuse connection number so that we can use it when
+ # forcing an unmount
+ new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
+ if len(new_conns) == 0:
+ raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
+ elif len(new_conns) > 1:
+ raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
+ else:
+ self._fuse_conn = new_conns[0]
+
+ def _run_python(self, pyscript):
+ """
+ Override this to remove the daemon-helper prefix that is used otherwise
+ to make the process killable.
+ """
+ return self.client_remote.run(args=[
+ 'python', '-c', pyscript
+ ], wait=False)
+
+
+class LocalCephManager(CephManager):
+ def __init__(self):
+ # Deliberately skip parent init, only inheriting from it to get
+ # util methods like osd_dump that sit on top of raw_cluster_cmd
+ self.controller = LocalRemote()
+
+ # A minority of CephManager fns actually bother locking for when
+ # certain teuthology tests want to run tasks in parallel
+ self.lock = threading.RLock()
+
+ def find_remote(self, daemon_type, daemon_id):
+ """
+ daemon_type like 'mds', 'osd'
+ daemon_id like 'a', '0'
+ """
+ return LocalRemote()
+
+ def run_ceph_w(self):
+ proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph"), "-w"], wait=False, stdout=StringIO())
+ return proc
+
+ def raw_cluster_cmd(self, *args):
+ """
+ args like ["osd", "dump"}
+ return stdout string
+ """
+ proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args))
+ return proc.stdout.getvalue()
+
+ def raw_cluster_cmd_result(self, *args):
+ """
+ like raw_cluster_cmd but don't check status, just return rc
+ """
+ proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args), check_status=False)
+ return proc.exitstatus
+
+ def admin_socket(self, daemon_type, daemon_id, command, check_status=True):
+ return self.controller.run(
+ args=[os.path.join(BIN_PREFIX, "ceph"), "daemon", "{0}.{1}".format(daemon_type, daemon_id)] + command, check_status=check_status
+ )
+
+ # FIXME: copypasta
+ def get_mds_status(self, mds):
+ """
+ Run cluster commands for the mds in order to get mds information
+ """
+ out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
+ j = json.loads(' '.join(out.splitlines()[1:]))
+ # collate; for dup ids, larger gid wins.
+ for info in j['info'].itervalues():
+ if info['name'] == mds:
+ return info
+ return None
+
+ # FIXME: copypasta
+ def get_mds_status_by_rank(self, rank):
+ """
+ Run cluster commands for the mds in order to get mds information
+ check rank.
+ """
+ j = self.get_mds_status_all()
+ # collate; for dup ids, larger gid wins.
+ for info in j['info'].itervalues():
+ if info['rank'] == rank:
+ return info
+ return None
+
+ def get_mds_status_all(self):
+ """
+ Run cluster command to extract all the mds status.
+ """
+ out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
+ j = json.loads(' '.join(out.splitlines()[1:]))
+ return j
+
+
+class LocalCephCluster(CephCluster):
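+    """
+    CephCluster specialised for a local vstart cluster: talks to daemons via
+    LocalCephManager and maintains configuration by appending to the existing
+    ./ceph.conf rather than rewriting it wholesale.
+    """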
+ def __init__(self, ctx):
+ # Deliberately skip calling parent constructor
+ self._ctx = ctx
+ self.mon_manager = LocalCephManager()
+ self._conf = defaultdict(dict)
+
+ def get_config(self, key, service_type=None):
+ if service_type is None:
+ service_type = 'mon'
+
+ # FIXME hardcoded vstart service IDs
+ service_id = {
+ 'mon': 'a',
+ 'mds': 'a',
+ 'osd': '0'
+ }[service_type]
+
+ return self.json_asok(['config', 'get', key], service_type, service_id)[key]
+
+ def _write_conf(self):
+ # In teuthology, we have the honour of writing the entire ceph.conf, but
+ # in vstart land it has mostly already been written and we need to carefully
+ # append to it.
+ conf_path = "./ceph.conf"
+ banner = "\n#LOCAL_TEST\n"
+ existing_str = open(conf_path).read()
+
+ if banner in existing_str:
+ existing_str = existing_str[0:existing_str.find(banner)]
+
+ existing_str += banner
+
+ for subsys, kvs in self._conf.items():
+ existing_str += "\n[{0}]\n".format(subsys)
+ for key, val in kvs.items():
+ # Comment out existing instance if it exists
+ log.info("Searching for existing instance {0}/{1}".format(
+ key, subsys
+ ))
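+ # Match the existing [subsys] section: its header line plus everything up to
+ # the next '[' that starts another section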
+ existing_section = re.search(r"^\[{0}\]$([\n]|[^\[])+".format(
+ subsys
+ ), existing_str, re.MULTILINE)
+
+ if existing_section:
+ section_str = existing_str[existing_section.start():existing_section.end()]
+ existing_val = re.search(r"^\s*[^#]({0}) =".format(key), section_str, re.MULTILINE)
+ if existing_val:
+ start = existing_section.start() + existing_val.start(1)
+ log.info("Found string to replace at {0}".format(
+ start
+ ))
+ existing_str = existing_str[0:start] + "#" + existing_str[start:]
+
+ existing_str += "{0} = {1}\n".format(key, val)
+
+ open(conf_path, "w").write(existing_str)
+
+ def set_ceph_conf(self, subsys, key, value):
+ self._conf[subsys][key] = value
+ self._write_conf()
+
+ def clear_ceph_conf(self, subsys, key):
+ del self._conf[subsys][key]
+ self._write_conf()
+
+
+class LocalMDSCluster(LocalCephCluster, MDSCluster):
+ def __init__(self, ctx):
+ super(LocalMDSCluster, self).__init__(ctx)
+
+ self.mds_ids = ctx.daemons.daemons['mds'].keys()
+ if not self.mds_ids:
+ raise RuntimeError("No MDSs found in ceph.conf!")
+
+ self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
+
+ def clear_firewall(self):
+ # FIXME: unimplemented
+ pass
+
+ def get_filesystem(self, name):
+ return LocalFilesystem(self._ctx, name)
+
+
+class LocalMgrCluster(LocalCephCluster, MgrCluster):
+ def __init__(self, ctx):
+ super(LocalMgrCluster, self).__init__(ctx)
+
+ self.mgr_ids = ctx.daemons.daemons['mgr'].keys()
+ if not self.mgr_ids:
+ raise RuntimeError("No manager daemonss found in ceph.conf!")
+
+ self.mgr_daemons = dict([(id_, LocalDaemon("mgr", id_)) for id_ in self.mgr_ids])
+
+
+class LocalFilesystem(Filesystem, LocalMDSCluster):
+ @property
+ def admin_remote(self):
+ return LocalRemote()
+
+ def __init__(self, ctx, name=None):
+ # Deliberately skip calling parent constructor
+ self._ctx = ctx
+
+ if name is None:
+ name = "cephfs"
+
+ self.name = name
+ self.metadata_pool_name = "{0}_metadata".format(name)
+ self.data_pool_name = "{0}_data".format(name)
+
+ # Hack: cheeky inspection of ceph.conf to see what MDSs exist
+ self.mds_ids = set()
+ for line in open("ceph.conf").readlines():
+ match = re.match(r"^\[mds\.(.+)\]$", line)
+ if match:
+ self.mds_ids.add(match.group(1))
+
+ if not self.mds_ids:
+ raise RuntimeError("No MDSs found in ceph.conf!")
+
+ self.mds_ids = list(self.mds_ids)
+
+ log.info("Discovered MDS IDs: {0}".format(self.mds_ids))
+
+ self.mon_manager = LocalCephManager()
+
+ self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
+
+ self.client_remote = LocalRemote()
+
+ self._conf = defaultdict(dict)
+
+ @property
+ def _prefix(self):
+ return BIN_PREFIX
+
+ def set_clients_block(self, blocked, mds_id=None):
+ raise NotImplementedError()
+
+ def get_pgs_per_fs_pool(self):
+ # FIXME: assuming there are 3 OSDs
+ return 3 * int(self.get_config('mon_pg_warn_min_per_osd'))
+
+
+class InteractiveFailureResult(unittest.TextTestResult):
+ """
+ Specialization that implements interactive-on-error style
+ behavior.
+ """
+ def addFailure(self, test, err):
+ super(InteractiveFailureResult, self).addFailure(test, err)
+ log.error(self._exc_info_to_string(err, test))
+ log.error("Failure in test '{0}', going interactive".format(
+ self.getDescription(test)
+ ))
+ interactive.task(ctx=None, config=None)
+
+ def addError(self, test, err):
+ super(InteractiveFailureResult, self).addError(test, err)
+ log.error(self._exc_info_to_string(err, test))
+ log.error("Error in test '{0}', going interactive".format(
+ self.getDescription(test)
+ ))
+ interactive.task(ctx=None, config=None)
+
+
+def exec_test():
+ # Help developers by stopping up-front if their tree isn't built enough for all the
+ # tools that the tests might want to use (add more here if needed)
+ require_binaries = ["ceph-dencoder", "cephfs-journal-tool", "cephfs-data-scan",
+ "cephfs-table-tool", "ceph-fuse", "rados"]
+ missing_binaries = [b for b in require_binaries if not os.path.exists(os.path.join(BIN_PREFIX, b))]
+ if missing_binaries:
+ log.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries)))
+ sys.exit(-1)
+
+ test_dir = tempfile.mkdtemp()
+
+ # Create as many of these as the biggest test requires
+ clients = ["0", "1", "2", "3"]
+
+ remote = LocalRemote()
+
+ # Don't tolerate stray ceph-fuse or ceph-mds processes from a previous run: find and kill them
+ ps_txt = remote.run(
+ args=["ps", "-u"+str(os.getuid())]
+ ).stdout.getvalue().strip()
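+ # Skip the ps header row; each remaining line describes one process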
+ lines = ps_txt.split("\n")[1:]
+
+ for line in lines:
+ if 'ceph-fuse' in line or 'ceph-mds' in line:
+ pid = int(line.split()[0])
+ log.warn("Killing stray process {0}".format(line))
+ os.kill(pid, signal.SIGKILL)
+
+ class LocalCluster(object):
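+ # Minimal stand-in for a teuthology Cluster: a single LocalRemote plays every role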
+ def __init__(self, rolename="placeholder"):
+ self.remotes = {
+ remote: [rolename]
+ }
+
+ def only(self, requested):
+ return self.__class__(rolename=requested)
+
+ teuth_config['test_path'] = test_dir
+
+ class LocalContext(object):
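+ # Minimal stand-in for the teuthology task context ("ctx") that the tests expect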
+ def __init__(self):
+ self.config = {}
+ self.teuthology_config = teuth_config
+ self.cluster = LocalCluster()
+ self.daemons = DaemonGroup()
+
+ # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
+ # tests that want to look these up via ctx can do so.
+ # Inspect ceph.conf to see what roles exist
+ for conf_line in open("ceph.conf").readlines():
+ for svc_type in ["mon", "osd", "mds", "mgr"]:
+ if svc_type not in self.daemons.daemons:
+ self.daemons.daemons[svc_type] = {}
+ match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line)
+ if match:
+ svc_id = match.group(1)
+ self.daemons.daemons[svc_type][svc_id] = LocalDaemon(svc_type, svc_id)
+
+ def __del__(self):
+ shutil.rmtree(self.teuthology_config['test_path'])
+
+ ctx = LocalContext()
+
+ mounts = []
+ for client_id in clients:
+ # Populate client keyring (avoid client.admin for test clients because
+ # it makes their logs awkward to find later)
+ client_name = "client.{0}".format(client_id)
+
+ if client_name not in open("./keyring").read():
+ p = remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "auth", "get-or-create", client_name,
+ "osd", "allow rw",
+ "mds", "allow",
+ "mon", "allow r"])
+
+ open("./keyring", "a").write(p.stdout.getvalue())
+
+ mount = LocalFuseMount(test_dir, client_id)
+ mounts.append(mount)
+ if mount.is_mounted():
+ log.warn("unmounting {0}".format(mount.mountpoint))
+ mount.umount_wait()
+ else:
+ if os.path.exists(mount.mountpoint):
+ os.rmdir(mount.mountpoint)
+ filesystem = LocalFilesystem(ctx)
+ mds_cluster = LocalMDSCluster(ctx)
+ mgr_cluster = LocalMgrCluster(ctx)
+
+ from tasks.cephfs_test_runner import DecoratingLoader
+
+ class LogStream(object):
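+ # File-like object that forwards complete lines written by the test runner to the logger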
+ def __init__(self):
+ self.buffer = ""
+
+ def write(self, data):
+ self.buffer += data
+ if "\n" in self.buffer:
+ lines = self.buffer.split("\n")
+ for line in lines[:-1]:
+ # sys.stderr.write(line + "\n")
+ log.info(line)
+ self.buffer = lines[-1]
+
+ def flush(self):
+ pass
+
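+ # The DecoratingLoader tags these attributes onto test cases as it loads them, so
+ # subclasses of CephTestCase can find their environment (ctx, mounts, fs, ...)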
+ decorating_loader = DecoratingLoader({
+ "ctx": ctx,
+ "mounts": mounts,
+ "fs": filesystem,
+ "mds_cluster": mds_cluster,
+ "mgr_cluster": mgr_cluster,
+ })
+
+ # For the benefit of polling tests like test_full -- in teuthology land we set this
+ # in a .yaml, here it's just a hardcoded thing for the developer's pleasure.
+ remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "tell", "osd.*", "injectargs", "--osd-mon-report-interval-max", "5"])
+ filesystem.set_ceph_conf("osd", "osd_mon_report_interval_max", "5")
+
+ # Vstart defaults to two segments, which very easily gets a "behind on trimming" health warning
+ # from normal IO latency. Increase it for running tests.
+ filesystem.set_ceph_conf("mds", "mds log max segments", "10")
+
+ # Make sure the filesystem created in tests has uid/gid that will let us talk to
+ # it after mounting it (without having to go root). Set in 'global' not just 'mds'
+ # so that cephfs-data-scan will pick it up too.
+ filesystem.set_ceph_conf("global", "mds root ino uid", "%s" % os.getuid())
+ filesystem.set_ceph_conf("global", "mds root ino gid", "%s" % os.getgid())
+
+ # Monkeypatch get_package_version to avoid having to work out what kind of distro we're on
+ def _get_package_version(remote, pkg_name):
+ # Used in cephfs tests to find fuse version. Your development workstation *does* have >=2.9, right?
+ return "2.9"
+
+ import teuthology.packaging
+ teuthology.packaging.get_package_version = _get_package_version
+
+ def enumerate_methods(s):
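+ # Recursively yield (suite, test) pairs from a possibly-nested unittest suite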
+ for t in s._tests:
+ if isinstance(t, suite.BaseTestSuite):
+ for sub in enumerate_methods(t):
+ yield sub
+ else:
+ yield s, t
+
+ interactive_on_error = False
+
+ args = sys.argv[1:]
+ flags = [a for a in args if a.startswith("-")]
+ modules = [a for a in args if not a.startswith("-")]
+ for f in flags:
+ if f == "--interactive":
+ interactive_on_error = True
+ else:
+ log.error("Unknown option '{0}'".format(f))
+ sys.exit(-1)
+
+ if modules:
+ log.info("Executing modules: {0}".format(modules))
+ module_suites = []
+ for mod_name in modules:
+ # Test names like cephfs.test_auto_repair
+ module_suites.append(decorating_loader.loadTestsFromName(mod_name))
+ log.info("Loaded: {0}".format(list(module_suites)))
+ overall_suite = suite.TestSuite(module_suites)
+ else:
+ log.info("Excuting all tests")
+ overall_suite = decorating_loader.discover(
+ os.path.dirname(os.path.abspath(__file__))
+ )
+
+ # Filter out tests that don't lend themselves to this interactive local runner.
+ victims = []
+ for case, method in enumerate_methods(overall_suite):
+ fn = getattr(method, method._testMethodName)
+
+ drop_test = False
+
+ if hasattr(fn, 'is_for_teuthology') and getattr(fn, 'is_for_teuthology') is True:
+ drop_test = True
+ log.warn("Dropping test because long running: ".format(method.id()))
+
+ if getattr(fn, "needs_trimming", False) is True:
+ drop_test = (os.getuid() != 0)
+ log.warn("Dropping test because client trim unavailable: ".format(method.id()))
+
+ if drop_test:
+ # Don't drop the test if it was explicitly requested in arguments
+ is_named = False
+ for named in modules:
+ if named.endswith(method.id()):
+ is_named = True
+ break
+
+ if not is_named:
+ victims.append((case, method))
+
+ log.info("Disabling {0} tests because of is_for_teuthology or needs_trimming".format(len(victims)))
+ for s, method in victims:
+ s._tests.remove(method)
+
+ if interactive_on_error:
+ result_class = InteractiveFailureResult
+ else:
+ result_class = unittest.TextTestResult
+ fail_on_skip = False
+
+ class LoggingResult(result_class):
+ def startTest(self, test):
+ log.info("Starting test: {0}".format(self.getDescription(test)))
+ test.started_at = datetime.datetime.utcnow()
+ return super(LoggingResult, self).startTest(test)
+
+ def stopTest(self, test):
+ log.info("Stopped test: {0} in {1}s".format(
+ self.getDescription(test),
+ (datetime.datetime.utcnow() - test.started_at).total_seconds()
+ ))
+
+ def addSkip(self, test, reason):
+ if fail_on_skip:
+ # Don't just call addFailure because that requires a traceback
+ self.failures.append((test, reason))
+ else:
+ super(LoggingResult, self).addSkip(test, reason)
+
+ # Execute!
+ result = unittest.TextTestRunner(
+ stream=LogStream(),
+ resultclass=LoggingResult,
+ verbosity=2,
+ failfast=True).run(overall_suite)
+
+ if not result.wasSuccessful():
+ result.printErrors() # duplicate output at end for convenience
+
+ bad_tests = []
+ for test, error in result.errors:
+ bad_tests.append(str(test))
+ for test, failure in result.failures:
+ bad_tests.append(str(test))
+
+ log.error("Tests with errors or failures: {0}".format(", ".join(bad_tests)))
+ sys.exit(-1)
+ else:
+ sys.exit(0)
+
+
+if __name__ == "__main__":
+ exec_test()