from StringIO import StringIO
import json
import logging
+import time
from teuthology import misc
from teuthology.task import ceph_manager
self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
def mds_stop(self):
+ """
+ Stop the MDS daemon process. If it held a rank, that rank
+ will eventually go laggy.
+ """
mds = self._ctx.daemons.get_daemon('mds', self.mds_id)
mds.stop()
+ def mds_fail(self):
+ """
+ Inform MDSMonitor that the daemon process is dead. If it held
+ a rank, that rank will be relinquished.
+ """
+ self.mds_manager.raw_cluster_cmd("mds", "fail", "0")
+
def mds_restart(self):
mds = self._ctx.daemons.get_daemon('mds', self.mds_id)
mds.restart()
version = journal_header_dump['journal_header']['stream_format']
log.info("Read journal version {0}".format(version))
- return version
\ No newline at end of file
+ return version
+
+ def mds_asok(self, command):
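+ """
+ Run a command against this MDS's admin socket and return the
+ parsed JSON output, or None if the command produced no output.
+
+ For example: mds_asok(['session', 'ls'])
+ """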
+ proc = self.mds_manager.admin_socket('mds', self.mds_id, command)
+ response_data = proc.stdout.getvalue()
+ log.info("mds_asok output: {0}".format(response_data))
+ if response_data.strip():
+ return json.loads(response_data)
+ else:
+ return None
+
+ def wait_for_state(self, goal_state, reject=None, timeout=None):
+ """
+ Block until the MDS reaches a particular state, or a failure condition
+ is met.
+
+ :param goal_state: Return once the MDS is in this state
+ :param reject: Fail if the MDS enters this state before the goal state
+ :param timeout: Fail if this many seconds pass before reaching goal
+ :return: number of seconds waited, rounded down to integer
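+
+ For example, wait_for_state('up:active', timeout=60) blocks until the
+ MDS reports up:active, or raises if 60 seconds pass first.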
+ """
+
+ elapsed = 0
+ while True:
+ # mds_info is None if no daemon currently claims this rank
+ mds_info = self.mds_manager.get_mds_status(self.mds_id)
+ current_state = mds_info['state'] if mds_info else None
+
+ if current_state == goal_state:
+ log.info("reached state '{0}' in {1}s".format(current_state, elapsed))
+ return elapsed
+ elif reject is not None and current_state == reject:
+ raise RuntimeError("MDS in reject state {0}".format(current_state))
+ elif timeout is not None and elapsed > timeout:
+ raise RuntimeError(
+ "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
+ elapsed, goal_state, current_state
+ ))
+ else:
+ time.sleep(1)
+ elapsed += 1
\ No newline at end of file
from StringIO import StringIO
+import json
import time
import os
import logging
from teuthology import misc
from ...orchestra import run
+from teuthology.orchestra.run import CommandFailedError
from teuthology.task.cephfs.mount import CephFSMount
log = logging.getLogger(__name__)
def __init__(self, client_config, test_dir, client_id, client_remote):
super(FuseMount, self).__init__(test_dir, client_id, client_remote)
- self.client_config = client_config
+ self.client_config = client_config if client_config else {}
self.fuse_daemon = None
def mount(self):
args=['sudo', 'chmod', '1777', '{tdir}/mnt.{id}'.format(tdir=self.test_dir, id=self.client_id)], )
def umount(self):
- mnt = os.path.join(self.test_dir, 'mnt.{id}'.format(id=self.client_id))
try:
self.client_remote.run(
args=[
'sudo',
'fusermount',
'-u',
- mnt,
+ self.mountpoint,
],
)
except run.CommandFailedError:
+ # FIXME: this will clobber all FUSE mounts, not just this one
+
log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=self.client_remote.name))
# abort the fuse mount, killing all hung processes
self.client_remote.run(
'umount',
'-l',
'-f',
- mnt,
+ self.mountpoint,
],
)
+ def umount_wait(self, force=False):
+ """
+ :param force: Complete even if the MDS is offline
+ """
+ self.umount()
+ if force:
+ self.fuse_daemon.stdin.close()
+ try:
+ self.fuse_daemon.wait()
+ except CommandFailedError:
+ pass
+ self.cleanup()
+
def cleanup(self):
"""
Remove the mount point.
Prerequisite: the client is not mounted.
"""
- mnt = os.path.join(self.test_dir, 'mnt.{id}'.format(id=self.client_id))
self.client_remote.run(
args=[
'rmdir',
'--',
- mnt,
+ self.mountpoint,
],
)
+
+ def kill(self):
+ """
+ Terminate the client without removing the mount point.
+ """
+ self.fuse_daemon.stdin.close()
+ try:
+ self.fuse_daemon.wait()
+ except CommandFailedError:
+ pass
+
+ def kill_cleanup(self):
+ """
+ Follow up ``kill`` to get to a clean unmounted state.
+ """
+ self.umount()
+ self.cleanup()
+
+ def teardown(self):
+ """
+ Whatever the state of the mount, get it gone.
+ """
+ super(FuseMount, self).teardown()
+
+ self.umount()
+ if not self.fuse_daemon.finished:
+ self.fuse_daemon.stdin.close()
+ try:
+ self.fuse_daemon.wait()
+ except CommandFailedError:
+ pass
+
+ # Indiscriminate, unlike the touchier cleanup()
+ self.client_remote.run(
+ args=[
+ 'rm',
+ '-rf',
+ self.mountpoint,
+ ],
+ )
+
+ # FIXME: bad naming scheme to call this client_id and also have the
+ # 'client_id' attr which is something completely different. This
+ # is what a MonSession calls global_id.
+ def get_client_id(self):
+ """
+ Look up the CephFS client ID for this mount
+ """
+
+ pyscript = """
+import glob
+import re
+import os
+import subprocess
+
+def find_socket(client_name):
+ files = glob.glob("/var/run/ceph/ceph-{{client_name}}.*.asok".format(client_name=client_name))
+ for f in files:
+ pid = re.match(".*\.(\d+)\.asok$", f).group(1)
+ if os.path.exists("/proc/{{0}}".format(pid)):
+ return f
+ raise RuntimeError("Client socket {{0}} not found".format(client_name))
+
+print find_socket("{client_name}")
+""".format(client_name="client.{0}".format(self.client_id))
+
+ # Find the admin socket
+ p = self.client_remote.run(args=[
+ 'python', '-c', pyscript
+ ], stdout=StringIO())
+ asok_path = p.stdout.getvalue().strip()
+ log.info("Found client admin socket at {0}".format(asok_path))
+
+ # Query client ID from admin socket
+ p = self.client_remote.run(
+ args=['sudo', 'ceph', '--admin-daemon', asok_path, 'mds_sessions'],
+ stdout=StringIO())
+ return json.loads(p.stdout.getvalue())['id']
import logging
+import datetime
import os
+from teuthology.orchestra import run
+from teuthology.orchestra.run import CommandFailedError
log = logging.getLogger(__name__)
self.mountpoint = os.path.join(self.test_dir, 'mnt.{id}'.format(id=self.client_id))
self.test_files = ['a', 'b', 'c']
- @property
- def _mount_path(self):
- return os.path.join(self.test_dir, 'mnt.{0}'.format(self.client_id))
+ self.background_procs = []
+
+ def is_mounted(self):
+ raise NotImplementedError()
+
+ def mount(self):
+ raise NotImplementedError()
+
+ def umount(self):
+ raise NotImplementedError()
+
+ def umount_wait(self, force=False):
+ raise NotImplementedError()
+
+ def kill_cleanup(self):
+ raise NotImplementedError()
+
+ def kill(self):
+ raise NotImplementedError()
+
+ def cleanup(self):
+ raise NotImplementedError()
def create_files(self):
+ assert(self.is_mounted())
+
for suffix in self.test_files:
log.info("Creating file {0}".format(suffix))
self.client_remote.run(args=[
- 'sudo', 'touch', os.path.join(self._mount_path, suffix)
+ 'sudo', 'touch', os.path.join(self.mountpoint, suffix)
])
def check_files(self):
- """
- This will raise a CommandFailedException if expected files are not present
- """
+ assert(self.is_mounted())
+
for suffix in self.test_files:
log.info("Checking file {0}".format(suffix))
r = self.client_remote.run(args=[
- 'sudo', 'ls', os.path.join(self._mount_path, suffix)
+ 'sudo', 'ls', os.path.join(self.mountpoint, suffix)
], check_status=False)
if r.exitstatus != 0:
raise RuntimeError("Expected file {0} not found".format(suffix))
+
+ def create_destroy(self):
+ assert(self.is_mounted())
+
+ filename = "{0} {1}".format(datetime.datetime.now(), self.client_id)
+ log.debug("Creating test file {0}".format(filename))
+ self.client_remote.run(args=[
+ 'sudo', 'touch', os.path.join(self.mountpoint, filename)
+ ])
+ log.debug("Deleting test file {0}".format(filename))
+ self.client_remote.run(args=[
+ 'sudo', 'rm', '-f', os.path.join(self.mountpoint, filename)
+ ])
+
+ def _run_python(self, pyscript):
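+ # daemon-helper runs the script and kills it when this process's stdin
+ # is closed, which is how teardown() below terminates background jobs.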
+ return self.client_remote.run(args=[
+ 'sudo', 'daemon-helper', 'kill', 'python', '-c', pyscript
+ ], wait=False, stdin=run.PIPE)
+
+ def open_background(self, basename="background_file"):
+ """
+ Open a file for writing, then block such that the client
+ will hold a capability
+ """
+ assert(self.is_mounted())
+
+ path = os.path.join(self.mountpoint, basename)
+
+ pyscript = """
+import time
+
+f = open("{path}", 'w')
+f.write('content')
+f.flush()
+f.write('content2')
+while True:
+ time.sleep(1)
+""".format(path=path)
+
+ rproc = self._run_python(pyscript)
+ self.background_procs.append(rproc)
+ return rproc
+
+ def write_background(self, basename="background_file"):
+ """
+ Open a file for writing, write a small amount of data, and close it
+ immediately (unlike open_background, which keeps the file open).
+
+ :param basename: name of the file to create under the mountpoint
+ :return: the remote process handle
+ """
+ assert(self.is_mounted())
+
+ path = os.path.join(self.mountpoint, basename)
+
+ pyscript = """
+import time
+
+f = open("{path}", 'w')
+f.write('content')
+f.close()
+""".format(path=path)
+
+ rproc = self._run_python(pyscript)
+ self.background_procs.append(rproc)
+ return rproc
+
+ def teardown(self):
+ for p in self.background_procs:
+ log.info("Terminating background process")
+ if p.stdin:
+ p.stdin.close()
+ try:
+ p.wait()
+ except CommandFailedError:
+ pass
--- /dev/null
+
+"""
+Teuthology task for exercising CephFS client recovery
+"""
+
+import contextlib
+import logging
+import time
+import unittest
+
+from teuthology import misc
+from teuthology.orchestra.run import CommandFailedError
+from teuthology.task import interactive
+from teuthology.task.cephfs.filesystem import Filesystem
+from teuthology.task.ceph_fuse import get_client_configs, FuseMount
+
+
+log = logging.getLogger(__name__)
+
+
+# Arbitrary timeouts for operations involving restarting
+# an MDS or waiting for it to come up
+MDS_RESTART_GRACE = 60
+
+
+class TestClientRecovery(unittest.TestCase):
+ # Environment references
+ fs = None
+ mount_a = None
+ mount_b = None
+ mds_session_timeout = None
+ mds_reconnect_timeout = None
+
+ def setUp(self):
+ self.fs.mds_restart()
+ self.mount_a.mount()
+ self.mount_b.mount()
+ self.mount_a.wait_until_mounted()
+ self.mount_b.wait_until_mounted()
+
+ def tearDown(self):
+ self.mount_a.teardown()
+ self.mount_b.teardown()
+ # mount_a.umount()
+ # mount_b.umount()
+ # run.wait([mount_a.fuse_daemon, mount_b.fuse_daemon], timeout=600)
+ # mount_a.cleanup()
+ # mount_b.cleanup()
+
+ def test_basic(self):
+ # Check that two clients come up healthy and see each others' files
+ # =====================================================
+ self.mount_a.create_files()
+ self.mount_a.check_files()
+ self.mount_a.umount_wait()
+
+ self.mount_b.check_files()
+
+ self.mount_a.mount()
+ self.mount_a.wait_until_mounted()
+
+ # Check that the admin socket interface is correctly reporting
+ # two sessions
+ # =====================================================
+ ls_data = self._session_list()
+ self.assert_session_count(2, ls_data)
+
+ self.assertSetEqual(
+ set([l['id'] for l in ls_data]),
+ {self.mount_a.get_client_id(), self.mount_b.get_client_id()}
+ )
+
+ def test_restart(self):
+ # Check that after an MDS restart both clients reconnect and continue
+ # to handle I/O
+ # =====================================================
+ self.fs.mds_stop()
+ self.fs.mds_fail()
+ self.fs.mds_restart()
+ self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)
+
+ self.mount_a.create_destroy()
+ self.mount_b.create_destroy()
+
+ def assert_session_count(self, expected, ls_data=None):
+ if ls_data is None:
+ ls_data = self.fs.mds_asok(['session', 'ls'])
+
+ self.assertEqual(expected, len(ls_data), "Expected {0} sessions, found {1}".format(
+ expected, len(ls_data)
+ ))
+
+ def _session_list(self):
+ ls_data = self.fs.mds_asok(['session', 'ls'])
+ ls_data = [s for s in ls_data if s['state'] not in ['stale', 'closed']]
+ return ls_data
+
+ def _session_by_id(self, session_ls):
+ return dict([(s['id'], s) for s in session_ls])
+
+ def test_reconnect_timeout(self):
+ # Reconnect timeout
+ # =================
+ # Check that if I stop an MDS and a client goes away, the MDS waits
+ # for the reconnect period
+ self.fs.mds_stop()
+ self.fs.mds_fail()
+
+ mount_a_client_id = self.mount_a.get_client_id()
+ self.mount_a.umount_wait(force=True)
+
+ self.fs.mds_restart()
+
+ self.fs.wait_for_state('up:reconnect', reject='up:active', timeout=MDS_RESTART_GRACE)
+
+ ls_data = self._session_list()
+ self.assert_session_count(2, ls_data)
+
+ # The session for the dead client should have the 'reconnect' flag set
+ self.assertTrue(self._session_by_id(ls_data)[mount_a_client_id]['reconnecting'])
+
+ # Wait for the reconnect state to clear, this should take the
+ # reconnect timeout period.
+ in_reconnect_for = self.fs.wait_for_state('up:active', timeout=self.mds_reconnect_timeout * 2)
+ # Check that the period we waited to enter active is within a factor
+ # of two of the reconnect timeout.
+ self.assertGreater(in_reconnect_for, self.mds_reconnect_timeout / 2,
+ "Should have been in reconnect phase for {0} but only took {1}".format(
+ self.mds_reconnect_timeout, in_reconnect_for
+ ))
+
+ self.assert_session_count(1)
+
+ # Check that the client that timed out during reconnect can
+ # mount again and do I/O
+ self.mount_a.mount()
+ self.mount_a.wait_until_mounted()
+ self.mount_a.create_destroy()
+
+ self.assert_session_count(2)
+
+ def test_reconnect_eviction(self):
+ # Eviction during reconnect
+ # =========================
+ self.fs.mds_stop()
+ self.fs.mds_fail()
+
+ mount_a_client_id = self.mount_a.get_client_id()
+ self.mount_a.umount_wait(force=True)
+
+ self.fs.mds_restart()
+
+ # Enter reconnect phase
+ self.fs.wait_for_state('up:reconnect', reject='up:active', timeout=MDS_RESTART_GRACE)
+ self.assert_session_count(2)
+
+ # Evict the stuck client
+ self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])
+ self.assert_session_count(1)
+
+ # Observe that we proceed to the active phase without waiting the full reconnect timeout
+ evict_til_active = self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)
+ # Once we evict the troublemaker, the reconnect phase should complete
+ # in well under the reconnect timeout.
+ self.assertLess(evict_til_active, self.mds_reconnect_timeout * 0.5,
+ "reconnect did not complete soon enough after eviction, took {0}".format(
+ evict_til_active
+ ))
+
+ # Bring the client back
+ self.mount_a.mount()
+ self.mount_a.create_destroy()
+
+ def test_stale_caps(self):
+ # Capability release from stale session
+ # =====================================
+ cap_holder = self.mount_a.open_background()
+ self.mount_a.kill()
+
+ # Now, after mds_session_timeout seconds, the waiter should
+ # complete their operation when the MDS marks the holder's
+ # session stale.
+ cap_waiter = self.mount_b.write_background()
+ a = time.time()
+ cap_waiter.wait()
+ b = time.time()
+ cap_waited = b - a
+ log.info("cap_waiter waited {0}s".format(cap_waited))
+ self.assertTrue(self.mds_session_timeout / 2.0 <= cap_waited <= self.mds_session_timeout * 2.0,
+ "Capability handover took {0}, expected approx {1}".format(
+ cap_waited, self.mds_session_timeout
+ ))
+
+ cap_holder.stdin.close()
+ try:
+ cap_holder.wait()
+ except CommandFailedError:
+ # We killed it, so it raises an error
+ pass
+
+ self.mount_a.kill_cleanup()
+
+ self.mount_a.mount()
+ self.mount_a.wait_until_mounted()
+
+ def test_evicted_caps(self):
+ # Eviction while holding a capability
+ # ===================================
+
+ # Take out a write capability on a file on client A,
+ # and then immediately kill it.
+ cap_holder = self.mount_a.open_background()
+ mount_a_client_id = self.mount_a.get_client_id()
+ self.mount_a.kill()
+
+ # The waiter should get stuck waiting for the capability
+ # held on the MDS by the now-dead client A
+ cap_waiter = self.mount_b.write_background()
+ time.sleep(5)
+ self.assertFalse(cap_waiter.finished)
+
+ self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])
+ # Now, because I evicted the old holder of the capability, it should
+ # immediately get handed over to the waiter
+ a = time.time()
+ cap_waiter.wait()
+ b = time.time()
+ cap_waited = b - a
+ log.info("cap_waiter waited {0}s".format(cap_waited))
+ # This is the check that it happened 'now' rather than waiting
+ # for the session timeout
+ self.assertLess(cap_waited, self.mds_session_timeout / 2.0,
+ "Capability handover took {0}, expected less than {1}".format(
+ cap_waited, self.mds_session_timeout / 2.0
+ ))
+
+ cap_holder.stdin.close()
+ try:
+ cap_holder.wait()
+ except CommandFailedError:
+ # We killed it, so it raises an error
+ pass
+
+ self.mount_a.kill_cleanup()
+
+ self.mount_a.mount()
+ self.mount_a.wait_until_mounted()
+
+
+class LogStream(object):
+ def __init__(self):
+ self.buffer = ""
+
+ def write(self, data):
+ self.buffer += data
+ if "\n" in self.buffer:
+ lines = self.buffer.split("\n")
+ for line in lines[:-1]:
+ log.info(line)
+ self.buffer = lines[-1]
+
+ def flush(self):
+ pass
+
+
+class InteractiveFailureResult(unittest.TextTestResult):
+ """
+ Specialization that implements interactive-on-error style
+ behavior.
+ """
+ ctx = None
+
+ def addFailure(self, test, err):
+ log.error(self._exc_info_to_string(err, test))
+ log.error("Failure in test '{0}', going interactive".format(
+ self.getDescription(test)
+ ))
+ interactive.task(ctx=self.ctx, config=None)
+
+ def addError(self, test, err):
+ log.error(self._exc_info_to_string(err, test))
+ log.error("Error in test '{0}', going interactive".format(
+ self.getDescription(test)
+ ))
+ interactive.task(ctx=self.ctx, config=None)
+
+
+@contextlib.contextmanager
+def task(ctx, config):
+ fs = Filesystem(ctx, config)
+
+ # Pick out the clients we will use from the configuration
+ # =======================================================
+ client_list = list(misc.all_roles_of_type(ctx.cluster, 'client'))
+ if len(client_list) < 2:
+ raise RuntimeError("Need at least two clients")
+
+ client_a_id = client_list[0]
+ client_a_role = "client.{0}".format(client_a_id)
+ client_a_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(client_a_id)]))[0][1]
+
+ client_b_id = client_list[1]
+ client_b_role = "client.{0}".format(client_b_id)
+ client_b_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(client_b_id)]))[0][1]
+
+ test_dir = misc.get_testdir(ctx)
+
+ # TODO: enable switching FUSE to kclient here
+ # or perhaps just use external client tasks and consume ctx.mounts here?
+ client_configs = get_client_configs(ctx, config)
+ mount_a = FuseMount(client_configs.get(client_a_role, {}), test_dir, client_a_id, client_a_remote)
+ mount_b = FuseMount(client_configs.get(client_b_role, {}), test_dir, client_b_id, client_b_remote)
+
+ # Attach environment references to test case
+ # ==========================================
+ TestClientRecovery.mds_reconnect_timeout = int(fs.mds_asok(
+ ['config', 'get', 'mds_reconnect_timeout']
+ )['mds_reconnect_timeout'])
+ TestClientRecovery.mds_session_timeout = int(fs.mds_asok(
+ ['config', 'get', 'mds_session_timeout']
+ )['mds_session_timeout'])
+ TestClientRecovery.fs = fs
+ TestClientRecovery.mount_a = mount_a
+ TestClientRecovery.mount_b = mount_b
+
+ # Stash references on ctx so that we can easily debug in interactive mode
+ # =======================================================================
+ ctx.filesystem = fs
+ ctx.mount_a = mount_a
+ ctx.mount_b = mount_b
+
+ # Execute test suite
+ # ==================
+ suite = unittest.TestLoader().loadTestsFromTestCase(TestClientRecovery)
+ if ctx.config.get("interactive-on-error", False):
+ InteractiveFailureResult.ctx = ctx
+ result_class = InteractiveFailureResult
+ else:
+ result_class = unittest.TextTestResult
+ result = unittest.TextTestRunner(
+ stream=LogStream(),
+ resultclass=result_class,
+ verbosity=2,
+ failfast=True).run(suite)
+
+ if not result.wasSuccessful():
+ result.printErrors() # duplicate output at end for convenience
+ raise RuntimeError("Test failure.")
+
+ # Continue to any downstream tasks
+ # ================================
+ yield