From: John Spray
Date: Wed, 2 Jul 2014 18:25:14 +0000 (+0100)
Subject: task: add mds_client_recovery
X-Git-Tag: 1.1.0~1315^2~3
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8bb77ed9e18d134e2e9964e950cbcbfc65e1b97d;p=teuthology.git

task: add mds_client_recovery

This task exercises the CephFS session recovery behavior.

Signed-off-by: John Spray
---

diff --git a/teuthology/task/cephfs/filesystem.py b/teuthology/task/cephfs/filesystem.py
index 6295ac04d..901d2ecb1 100644
--- a/teuthology/task/cephfs/filesystem.py
+++ b/teuthology/task/cephfs/filesystem.py
@@ -2,6 +2,7 @@
 from StringIO import StringIO
 import json
 import logging
+import time
 
 from teuthology import misc
 from teuthology.task import ceph_manager
@@ -41,9 +42,20 @@ class Filesystem(object):
         self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
 
     def mds_stop(self):
+        """
+        Stop the MDS daemon process.  If it held a rank, that rank
+        will eventually go laggy.
+        """
         mds = self._ctx.daemons.get_daemon('mds', self.mds_id)
         mds.stop()
 
+    def mds_fail(self):
+        """
+        Inform MDSMonitor that the daemon process is dead.  If it held
+        a rank, that rank will be relinquished.
+        """
+        self.mds_manager.raw_cluster_cmd("mds", "fail", "0")
+
     def mds_restart(self):
         mds = self._ctx.daemons.get_daemon('mds', self.mds_id)
         mds.restart()
@@ -96,4 +108,44 @@ class Filesystem(object):
         version = journal_header_dump['journal_header']['stream_format']
         log.info("Read journal version {0}".format(version))
 
-        return version
\ No newline at end of file
+        return version
+
+    def mds_asok(self, command):
+        proc = self.mds_manager.admin_socket('mds', self.mds_id, command)
+        response_data = proc.stdout.getvalue()
+        log.info("mds_asok output: {0}".format(response_data))
+        if response_data.strip():
+            return json.loads(response_data)
+        else:
+            return None
+
+    def wait_for_state(self, goal_state, reject=None, timeout=None):
+        """
+        Block until the MDS reaches a particular state, or a failure condition
+        is met.
+
+        :param goal_state: Return once the MDS is in this state
+        :param reject: Fail if the MDS enters this state before the goal state
+        :param timeout: Fail if this many seconds pass before reaching goal
+        :return: number of seconds waited, rounded down to integer
+        """
+
+        elapsed = 0
+        while True:
+            # mds_info is None if no daemon currently claims this rank
+            mds_info = self.mds_manager.get_mds_status(self.mds_id)
+            current_state = mds_info['state'] if mds_info else None
+
+            if current_state == goal_state:
+                log.info("reached state '{0}' in {1}s".format(current_state, elapsed))
+                return elapsed
+            elif reject is not None and current_state == reject:
+                raise RuntimeError("MDS in reject state {0}".format(current_state))
+            elif timeout is not None and elapsed > timeout:
+                raise RuntimeError(
+                    "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
+                        elapsed, goal_state, current_state
+                    ))
+            else:
+                time.sleep(1)
+                elapsed += 1
\ No newline at end of file
diff --git a/teuthology/task/cephfs/fuse_mount.py b/teuthology/task/cephfs/fuse_mount.py
index 67078c6b7..98980573b 100644
--- a/teuthology/task/cephfs/fuse_mount.py
+++ b/teuthology/task/cephfs/fuse_mount.py
@@ -1,11 +1,13 @@
 from StringIO import StringIO
+import json
 import time
 import os
 import logging
 
 from teuthology import misc
 from ...orchestra import run
+from teuthology.orchestra.run import CommandFailedError
 from teuthology.task.cephfs.mount import CephFSMount
 
 log = logging.getLogger(__name__)
 
@@ -15,7 +17,7 @@ class FuseMount(CephFSMount):
     def __init__(self, client_config, test_dir, client_id, client_remote):
         super(FuseMount, self).__init__(test_dir, client_id, client_remote)
 
-        self.client_config = client_config
+        self.client_config = client_config if client_config else {}
         self.fuse_daemon = None
 
     def mount(self):
@@ -109,17 +111,18 @@ class FuseMount(CephFSMount):
             args=['sudo', 'chmod', '1777', '{tdir}/mnt.{id}'.format(tdir=self.test_dir, id=self.client_id)],
         )
 
     def umount(self):
-        mnt = os.path.join(self.test_dir, 'mnt.{id}'.format(id=self.client_id))
         try:
             self.client_remote.run(
                 args=[
                     'sudo',
                     'fusermount', '-u',
-                    mnt,
+                    self.mountpoint,
                 ],
             )
         except run.CommandFailedError:
+            # FIXME: this will clobber all FUSE mounts, not just this one
             log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=self.client_remote.name))
+
             # abort the fuse mount, killing all hung processes
             self.client_remote.run(
@@ -140,21 +143,111 @@ class FuseMount(CephFSMount):
                 'umount',
                 '-l',
                 '-f',
-                mnt,
+                self.mountpoint,
             ],
         )
 
+    def umount_wait(self, force=False):
+        """
+        :param force: Complete even if the MDS is offline
+        """
+        self.umount()
+        if force:
+            self.fuse_daemon.stdin.close()
+            try:
+                self.fuse_daemon.wait()
+            except CommandFailedError:
+                pass
+        self.cleanup()
+
     def cleanup(self):
         """
         Remove the mount point.
 
         Prerequisite: the client is not mounted.
         """
-        mnt = os.path.join(self.test_dir, 'mnt.{id}'.format(id=self.client_id))
         self.client_remote.run(
             args=[
                 'rmdir',
                 '--',
-                mnt,
+                self.mountpoint,
             ],
         )
+
+    def kill(self):
+        """
+        Terminate the client without removing the mount point.
+        """
+        self.fuse_daemon.stdin.close()
+        try:
+            self.fuse_daemon.wait()
+        except CommandFailedError:
+            pass
+
+    def kill_cleanup(self):
+        """
+        Follow up ``kill`` to get to a clean unmounted state.
+        """
+        self.umount()
+        self.cleanup()
+
+    def teardown(self):
+        """
+        Whatever the state of the mount, get it gone.
+        """
+        super(FuseMount, self).teardown()
+
+        self.umount()
+        if not self.fuse_daemon.finished:
+            self.fuse_daemon.stdin.close()
+            try:
+                self.fuse_daemon.wait()
+            except CommandFailedError:
+                pass
+
+        # Indiscriminate, unlike the touchier cleanup()
+        self.client_remote.run(
+            args=[
+                'rm',
+                '-rf',
+                self.mountpoint,
+            ],
+        )
+
+    # FIXME: bad naming scheme to call this client_id and also have the
+    # 'client_id' attr which is something completely different.  This
+    # is what a MonSession calls global_id.
+    def get_client_id(self):
+        """
+        Look up the CephFS client ID for this mount
+        """
+
+        pyscript = """
+import glob
+import re
+import os
+import subprocess
+
+def find_socket(client_name):
+    files = glob.glob("/var/run/ceph/ceph-{{client_name}}.*.asok".format(client_name=client_name))
+    for f in files:
+        pid = re.match(".*\.(\d+)\.asok$", f).group(1)
+        if os.path.exists("/proc/{{0}}".format(pid)):
+            return f
+    raise RuntimeError("Client socket {{0}} not found".format(client_name))
+
+print find_socket("{client_name}")
+""".format(client_name="client.{0}".format(self.client_id))
+
+        # Find the admin socket
+        p = self.client_remote.run(args=[
+            'python', '-c', pyscript
+        ], stdout=StringIO())
+        asok_path = p.stdout.getvalue().strip()
+        log.info("Found client admin socket at {0}".format(asok_path))
+
+        # Query client ID from admin socket
+        p = self.client_remote.run(
+            args=['sudo', 'ceph', '--admin-daemon', asok_path, 'mds_sessions'],
+            stdout=StringIO())
+        return json.loads(p.stdout.getvalue())['id']
diff --git a/teuthology/task/cephfs/mount.py b/teuthology/task/cephfs/mount.py
index 337b45110..6a02a0ab2 100644
--- a/teuthology/task/cephfs/mount.py
+++ b/teuthology/task/cephfs/mount.py
@@ -1,6 +1,9 @@
 import logging
+import datetime
 import os
 
+from teuthology.orchestra import run
+from teuthology.orchestra.run import CommandFailedError
 
 log = logging.getLogger(__name__)
 
@@ -20,25 +23,119 @@ class CephFSMount(object):
         self.mountpoint = os.path.join(self.test_dir, 'mnt.{id}'.format(id=self.client_id))
         self.test_files = ['a', 'b', 'c']
 
-    @property
-    def _mount_path(self):
-        return os.path.join(self.test_dir, 'mnt.{0}'.format(self.client_id))
+        self.background_procs = []
+
+    def is_mounted(self):
+        raise NotImplementedError()
+
+    def mount(self):
+        raise NotImplementedError()
+
+    def umount(self):
+        raise NotImplementedError()
+
+    def umount_wait(self):
+        raise NotImplementedError()
+
+    def kill_cleanup(self):
+        raise NotImplementedError()
+
+    def kill(self):
+        raise NotImplementedError()
+
+    def cleanup(self):
+        raise NotImplementedError()
 
     def create_files(self):
+        assert(self.is_mounted())
+
         for suffix in self.test_files:
             log.info("Creating file {0}".format(suffix))
             self.client_remote.run(args=[
-                'sudo', 'touch', os.path.join(self._mount_path, suffix)
+                'sudo', 'touch', os.path.join(self.mountpoint, suffix)
             ])
 
     def check_files(self):
-        """
-        This will raise a CommandFailedException if expected files are not present
-        """
+        assert(self.is_mounted())
+
         for suffix in self.test_files:
             log.info("Checking file {0}".format(suffix))
             r = self.client_remote.run(args=[
-                'sudo', 'ls', os.path.join(self._mount_path, suffix)
+                'sudo', 'ls', os.path.join(self.mountpoint, suffix)
             ], check_status=False)
             if r.exitstatus != 0:
                 raise RuntimeError("Expected file {0} not found".format(suffix))
+
+    def create_destroy(self):
+        assert(self.is_mounted())
+
+        filename = "{0} {1}".format(datetime.datetime.now(), self.client_id)
+        log.debug("Creating test file {0}".format(filename))
+        self.client_remote.run(args=[
+            'sudo', 'touch', os.path.join(self.mountpoint, filename)
+        ])
+        log.debug("Deleting test file {0}".format(filename))
+        self.client_remote.run(args=[
+            'sudo', 'rm', '-f', os.path.join(self.mountpoint, filename)
+        ])
+
+    def _run_python(self, pyscript):
+        return self.client_remote.run(args=[
+            'sudo', 'daemon-helper', 'kill', 'python', '-c', pyscript
+        ], wait=False, stdin=run.PIPE)
+
+    def open_background(self, basename="background_file"):
+        """
+        Open a file for writing, then block such that the client
+        will hold a capability
+        """
+        assert(self.is_mounted())
+
+        path = os.path.join(self.mountpoint, basename)
+
+        pyscript = """
+import time
+
+f = open("{path}", 'w')
+f.write('content')
+f.flush()
+f.write('content2')
+while True:
+    time.sleep(1)
+""".format(path=path)
+
+        rproc = self._run_python(pyscript)
+        self.background_procs.append(rproc)
+        return rproc
+
+    def write_background(self, basename="background_file"):
+        """
+        Open a file for writing, complete as soon as you can
+        :param basename:
+        :return:
+        """
+        assert(self.is_mounted())
+
+        path = os.path.join(self.mountpoint, basename)
+
+        pyscript = """
+import time
+
+f = open("{path}", 'w')
+f.write('content')
+f.close()
+""".format(path=path)
+
+        rproc = self._run_python(pyscript)
+        self.background_procs.append(rproc)
+        return rproc
+
+    def teardown(self):
+        for p in self.background_procs:
+            log.info("Terminating background process")
+            if p.stdin:
+                p.stdin.close()
+                try:
+                    p.wait()
+                except CommandFailedError:
+                    pass
diff --git a/teuthology/task/mds_client_recovery.py b/teuthology/task/mds_client_recovery.py
new file mode 100644
index 000000000..903a70a5b
--- /dev/null
+++ b/teuthology/task/mds_client_recovery.py
@@ -0,0 +1,352 @@
+
+"""
+Teuthology task for exercising CephFS client recovery
+"""
+
+import contextlib
+import logging
+import time
+import unittest
+
+from teuthology import misc
+from teuthology.orchestra.run import CommandFailedError
+from teuthology.task import interactive
+from teuthology.task.cephfs.filesystem import Filesystem
+from teuthology.task.ceph_fuse import get_client_configs, FuseMount
+
+
+log = logging.getLogger(__name__)
+
+
+# Arbitrary timeouts for operations involving restarting
+# an MDS or waiting for it to come up
+MDS_RESTART_GRACE = 60
+
+
+class TestClientRecovery(unittest.TestCase):
+    # Environment references
+    fs = None
+    mount_a = None
+    mount_b = None
+    mds_session_timeout = None
+    mds_reconnect_timeout = None
+
+    def setUp(self):
+        self.fs.mds_restart()
+        self.mount_a.mount()
+        self.mount_b.mount()
+        self.mount_a.wait_until_mounted()
+        self.mount_b.wait_until_mounted()
+
+    def tearDown(self):
+        self.mount_a.teardown()
+        self.mount_b.teardown()
+        # mount_a.umount()
+        # mount_b.umount()
+        # run.wait([mount_a.fuse_daemon, mount_b.fuse_daemon], timeout=600)
+        # mount_a.cleanup()
+        # mount_b.cleanup()
+
+    def test_basic(self):
+        # Check that two clients come up healthy and see each others' files
+        # =====================================================
+        self.mount_a.create_files()
+        self.mount_a.check_files()
+        self.mount_a.umount_wait()
+
+        self.mount_b.check_files()
+
+        self.mount_a.mount()
+        self.mount_a.wait_until_mounted()
+
+        # Check that the admin socket interface is correctly reporting
+        # two sessions
+        # =====================================================
+        ls_data = self._session_list()
+        self.assert_session_count(2, ls_data)
+
+        self.assertSetEqual(
+            set([l['id'] for l in ls_data]),
+            {self.mount_a.get_client_id(), self.mount_b.get_client_id()}
+        )
+
+    def test_restart(self):
+        # Check that after an MDS restart both clients reconnect and continue
+        # to handle I/O
+        # =====================================================
+        self.fs.mds_stop()
+        self.fs.mds_fail()
+        self.fs.mds_restart()
+        self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)
+
+        self.mount_a.create_destroy()
+        self.mount_b.create_destroy()
+
+    def assert_session_count(self, expected, ls_data=None):
+        if ls_data is None:
+            ls_data = self.fs.mds_asok(['session', 'ls'])
+
+        self.assertEqual(expected, len(ls_data), "Expected {0} sessions, found {1}".format(
+            expected, len(ls_data)
+        ))
+
+    def _session_list(self):
+        ls_data = self.fs.mds_asok(['session', 'ls'])
+        ls_data = [s for s in ls_data if s['state'] not in ['stale', 'closed']]
+        return ls_data
+
+    def _session_by_id(self, session_ls):
+        return dict([(s['id'], s) for s in session_ls])
+
+    def test_reconnect_timeout(self):
+        # Reconnect timeout
+        # =================
+        # Check that if I stop an MDS and a client goes away, the MDS waits
+        # for the reconnect period
+        self.fs.mds_stop()
+        self.fs.mds_fail()
+
+        mount_a_client_id = self.mount_a.get_client_id()
+        self.mount_a.umount_wait(force=True)
+
+        self.fs.mds_restart()
+
+        self.fs.wait_for_state('up:reconnect', reject='up:active', timeout=MDS_RESTART_GRACE)
+
+        ls_data = self._session_list()
+        self.assert_session_count(2, ls_data)
+
+        # The session for the dead client should have the 'reconnect' flag set
+        self.assertTrue(self._session_by_id(ls_data)[mount_a_client_id]['reconnecting'])
+
+        # Wait for the reconnect state to clear, this should take the
+        # reconnect timeout period.
+        in_reconnect_for = self.fs.wait_for_state('up:active', timeout=self.mds_reconnect_timeout * 2)
+        # Check that the period we waited to enter active is within a factor
+        # of two of the reconnect timeout.
+        self.assertGreater(in_reconnect_for, self.mds_reconnect_timeout / 2,
+                           "Should have been in reconnect phase for {0} but only took {1}".format(
+                               self.mds_reconnect_timeout, in_reconnect_for
+                           ))
+
+        self.assert_session_count(1)
+
+        # Check that the client that timed out during reconnect can
+        # mount again and do I/O
+        self.mount_a.mount()
+        self.mount_a.wait_until_mounted()
+        self.mount_a.create_destroy()
+
+        self.assert_session_count(2)
+
+    def test_reconnect_eviction(self):
+        # Eviction during reconnect
+        # =========================
+        self.fs.mds_stop()
+        self.fs.mds_fail()
+
+        mount_a_client_id = self.mount_a.get_client_id()
+        self.mount_a.umount_wait(force=True)
+
+        self.fs.mds_restart()
+
+        # Enter reconnect phase
+        self.fs.wait_for_state('up:reconnect', reject='up:active', timeout=MDS_RESTART_GRACE)
+        self.assert_session_count(2)
+
+        # Evict the stuck client
+        self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])
+        self.assert_session_count(1)
+
+        # Observe that we proceed to active phase without waiting full reconnect timeout
+        evict_til_active = self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)
+        # Once we evict the troublemaker, the reconnect phase should complete
+        # in well under the reconnect timeout.
+        self.assertLess(evict_til_active, self.mds_reconnect_timeout * 0.5,
+                        "reconnect did not complete soon enough after eviction, took {0}".format(
+                            evict_til_active
+                        ))
+
+        # Bring the client back
+        self.mount_a.mount()
+        self.mount_a.create_destroy()
+
+    def test_stale_caps(self):
+        # Capability release from stale session
+        # =====================================
+        cap_holder = self.mount_a.open_background()
+        self.mount_a.kill()
+
+        # Now, after mds_session_timeout seconds, the waiter should
+        # complete their operation when the MDS marks the holder's
+        # session stale.
+        cap_waiter = self.mount_b.write_background()
+        a = time.time()
+        cap_waiter.wait()
+        b = time.time()
+        cap_waited = b - a
+        log.info("cap_waiter waited {0}s".format(cap_waited))
+        self.assertTrue(self.mds_session_timeout / 2.0 <= cap_waited <= self.mds_session_timeout * 2.0,
+                        "Capability handover took {0}, expected approx {1}".format(
+                            cap_waited, self.mds_session_timeout
+                        ))
+
+        cap_holder.stdin.close()
+        try:
+            cap_holder.wait()
+        except CommandFailedError:
+            # We killed it, so it raises an error
+            pass
+
+        self.mount_a.kill_cleanup()
+
+        self.mount_a.mount()
+        self.mount_a.wait_until_mounted()
+
+    def test_evicted_caps(self):
+        # Eviction while holding a capability
+        # ===================================
+
+        # Take out a write capability on a file on client A,
+        # and then immediately kill it.
+        cap_holder = self.mount_a.open_background()
+        mount_a_client_id = self.mount_a.get_client_id()
+        self.mount_a.kill()
+
+        # The waiter should get stuck waiting for the capability
+        # held on the MDS by the now-dead client A
+        cap_waiter = self.mount_b.write_background()
+        time.sleep(5)
+        self.assertFalse(cap_waiter.finished)
+
+        self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])
+        # Now, because I evicted the old holder of the capability, it should
+        # immediately get handed over to the waiter
+        a = time.time()
+        cap_waiter.wait()
+        b = time.time()
+        cap_waited = b - a
+        log.info("cap_waiter waited {0}s".format(cap_waited))
+        # This is the check that it happened 'now' rather than waiting
+        # for the session timeout
+        self.assertLess(cap_waited, self.mds_session_timeout / 2.0,
+                        "Capability handover took {0}, expected less than {1}".format(
+                            cap_waited, self.mds_session_timeout / 2.0
+                        ))
+
+        cap_holder.stdin.close()
+        try:
+            cap_holder.wait()
+        except CommandFailedError:
+            # We killed it, so it raises an error
+            pass
+
+        self.mount_a.kill_cleanup()
+
+        self.mount_a.mount()
+        self.mount_a.wait_until_mounted()
+
+
+class LogStream(object):
+    def __init__(self):
+        self.buffer = ""
+
+    def write(self, data):
+        self.buffer += data
+        if "\n" in self.buffer:
+            lines = self.buffer.split("\n")
+            for line in lines[:-1]:
+                log.info(line)
+            self.buffer = lines[-1]
+
+    def flush(self):
+        pass
+
+
+class InteractiveFailureResult(unittest.TextTestResult):
+    """
+    Specialization that implements interactive-on-error style
+    behavior.
+    """
+    ctx = None
+
+    def addFailure(self, test, err):
+        log.error(self._exc_info_to_string(err, test))
+        log.error("Failure in test '{0}', going interactive".format(
+            self.getDescription(test)
+        ))
+        interactive.task(ctx=self.ctx, config=None)
+
+    def addError(self, test, err):
+        log.error(self._exc_info_to_string(err, test))
+        log.error("Error in test '{0}', going interactive".format(
+            self.getDescription(test)
+        ))
+        interactive.task(ctx=self.ctx, config=None)
+
+
+@contextlib.contextmanager
+def task(ctx, config):
+    fs = Filesystem(ctx, config)
+
+    # Pick out the clients we will use from the configuration
+    # =======================================================
+    client_list = list(misc.all_roles_of_type(ctx.cluster, 'client'))
+    if len(client_list) < 2:
+        raise RuntimeError("Need at least two clients")
+
+    client_a_id = client_list[0]
+    client_a_role = "client.{0}".format(client_a_id)
+    client_a_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(client_a_id)]))[0][1]
+
+    client_b_id = client_list[1]
+    client_b_role = "client.{0}".format(client_b_id)
+    client_b_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(client_b_id)]))[0][1]
+
+    test_dir = misc.get_testdir(ctx)
+
+    # TODO: enable switching FUSE to kclient here
+    # or perhaps just use external client tasks and consume ctx.mounts here?
+    client_configs = get_client_configs(ctx, config)
+    mount_a = FuseMount(client_configs.get(client_a_role, {}), test_dir, client_a_id, client_a_remote)
+    mount_b = FuseMount(client_configs.get(client_b_role, {}), test_dir, client_b_id, client_b_remote)
+
+    # Attach environment references to test case
+    # ==========================================
+    TestClientRecovery.mds_reconnect_timeout = int(fs.mds_asok(
+        ['config', 'get', 'mds_reconnect_timeout']
+    )['mds_reconnect_timeout'])
+    TestClientRecovery.mds_session_timeout = int(fs.mds_asok(
+        ['config', 'get', 'mds_session_timeout']
+    )['mds_session_timeout'])
+    TestClientRecovery.fs = fs
+    TestClientRecovery.mount_a = mount_a
+    TestClientRecovery.mount_b = mount_b
+
+    # Stash references on ctx so that we can easily debug in interactive mode
+    # =======================================================================
+    ctx.filesystem = fs
+    ctx.mount_a = mount_a
+    ctx.mount_b = mount_b
+
+    # Execute test suite
+    # ==================
+    suite = unittest.TestLoader().loadTestsFromTestCase(TestClientRecovery)
+    if ctx.config.get("interactive-on-error", False):
+        InteractiveFailureResult.ctx = ctx
+        result_class = InteractiveFailureResult
+    else:
+        result_class = unittest.TextTestResult
+    result = unittest.TextTestRunner(
+        stream=LogStream(),
+        resultclass=result_class,
+        verbosity=2,
+        failfast=True).run(suite)
+
+    if not result.wasSuccessful():
+        result.printErrors()  # duplicate output at end for convenience
+        raise RuntimeError("Test failure.")
+
+    # Continue to any downstream tasks
+    # ================================
+    yield
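
Usage note (not part of the patch): the new module is picked up like any other
teuthology task, so a job references it by module name under "tasks". The
fragment below is a minimal sketch under assumptions: the role layout and the
preceding install/ceph tasks are illustrative, and the only hard requirement
visible in the task code is at least two client roles. The optional top-level
"interactive-on-error" flag matches the ctx.config lookup in task().

    roles:
    - [mon.a, mds.a, osd.0, osd.1, client.0, client.1]   # illustrative role layout
    tasks:
    - install:
    - ceph:
    - mds_client_recovery:      # runs the TestClientRecovery suite
    interactive-on-error: true  # optional: drop to an interactive shell on failure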