From: John Spray
Date: Wed, 20 Aug 2014 11:36:02 +0000 (+0100)
Subject: tasks/mds_client_recovery: network freeze test
X-Git-Tag: v0.94.10~27^2^2~332^2~4
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=bb52a9733acf57b9a2c4022507c13c02c75e3a12;p=ceph.git

tasks/mds_client_recovery: network freeze test

This is about testing the CephFS client's handling of losing
connectivity to the MDS.

Fixes: #7810

Signed-off-by: John Spray
---

diff --git a/tasks/ceph_manager.py b/tasks/ceph_manager.py
index 10e3bde4a6db..65d2b816dcd5 100644
--- a/tasks/ceph_manager.py
+++ b/tasks/ceph_manager.py
@@ -567,17 +567,27 @@ class CephManager:
     def osd_admin_socket(self, osd_id, command, check_status=True):
         return self.admin_socket('osd', osd_id, command, check_status)
 
-    def admin_socket(self, service_type, service_id, command, check_status=True):
+    def find_remote(self, service_type, service_id):
         """
-        Remotely start up ceph specifying the admin socket
+        Get the Remote for the host where a particular service runs.
+
+        :param service_type: 'mds', 'osd', 'client'
+        :param service_id: The second part of a role, e.g. '0' for the role 'client.0'
+        :return: a Remote instance for the host where the requested role is placed
         """
-        testdir = teuthology.get_testdir(self.ctx)
-        remote = None
         for _remote, roles_for_host in self.ctx.cluster.remotes.iteritems():
             for id_ in teuthology.roles_of_type(roles_for_host, service_type):
                 if id_ == str(service_id):
-                    remote = _remote
-        assert remote is not None
+                    return _remote
+
+        raise KeyError("Service {0}.{1} not found".format(service_type, service_id))
+
+    def admin_socket(self, service_type, service_id, command, check_status=True):
+        """
+        Remotely start up ceph specifying the admin socket
+        """
+        testdir = teuthology.get_testdir(self.ctx)
+        remote = self.find_remote(service_type, service_id)
         args = [
             'sudo',
             'adjust-ulimits',
diff --git a/tasks/cephfs/filesystem.py b/tasks/cephfs/filesystem.py
index e78680146460..3cd6206dbc63 100644
--- a/tasks/cephfs/filesystem.py
+++ b/tasks/cephfs/filesystem.py
@@ -5,6 +5,7 @@ import logging
 import time
 
 from teuthology import misc
+from teuthology.nuke import clear_firewall
 from teuthology.parallel import parallel
 from tasks import ceph_manager
 
@@ -40,6 +41,14 @@ class Filesystem(object):
         self.client_id = client_list[0]
         self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
 
+    def get_mds_hostnames(self):
+        result = set()
+        for mds_id in self.mds_ids:
+            mds_remote = self.mon_manager.find_remote('mds', mds_id)
+            result.add(mds_remote.hostname)
+
+        return list(result)
+
     def are_daemons_healthy(self):
         """
         Return true if all daemons are in one of active, standby, standby-replay
@@ -186,6 +195,27 @@ class Filesystem(object):
         else:
             return None
 
+    def set_clients_block(self, blocked, mds_id=None):
+        """
+        Block (using iptables) client communications to this MDS. Be careful: if
+        other services are running on this MDS, or other MDSs try to talk to this
+        MDS, their communications may also be blocked as collateral damage.
+
+        :param mds_id: Optional ID of MDS to block, default to all
+        :return:
+        """
+        da_flag = "-A" if blocked else "-D"
+
+        def set_block(_mds_id):
+            remote = self.mon_manager.find_remote('mds', _mds_id)
+            remote.run(args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", "6800:6900", "-j", "REJECT", "-m", "comment", "--comment", "teuthology"])
+            remote.run(args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", "6800:6900", "-j", "REJECT", "-m", "comment", "--comment", "teuthology"])
+
+        self._one_or_all(mds_id, set_block)
+
+    def clear_firewall(self):
+        clear_firewall(self._ctx)
+
     def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None):
         """
         Block until the MDS reaches a particular state, or a failure condition
diff --git a/tasks/cephfs/fuse_mount.py b/tasks/cephfs/fuse_mount.py
index 9adba1c47295..51ceccc496cf 100644
--- a/tasks/cephfs/fuse_mount.py
+++ b/tasks/cephfs/fuse_mount.py
@@ -74,16 +74,21 @@ class FuseMount(CephFSMount):
         self.fuse_daemon = proc
 
     def is_mounted(self):
-        proc = self.client_remote.run(
-            args=[
-                'stat',
-                '--file-system',
-                '--printf=%T\n',
-                '--',
-                self.mountpoint,
-            ],
-            stdout=StringIO(),
-        )
+        try:
+            proc = self.client_remote.run(
+                args=[
+                    'stat',
+                    '--file-system',
+                    '--printf=%T\n',
+                    '--',
+                    self.mountpoint,
+                ],
+                stdout=StringIO(),
+            )
+        except CommandFailedError:
+            # This happens if the mount directory doesn't exist
+            return False
+
         fstype = proc.stdout.getvalue().rstrip('\n')
         if fstype == 'fuseblk':
             log.info('ceph-fuse is mounted on %s', self.mountpoint)
@@ -110,6 +115,9 @@ class FuseMount(CephFSMount):
         self.client_remote.run(
             args=['sudo', 'chmod', '1777', '{tdir}/mnt.{id}'.format(tdir=self.test_dir, id=self.client_id)],
         )
+    def _mountpoint_exists(self):
+        return self.client_remote.run(args=["ls", "-d", self.mountpoint], check_status=False).exitstatus == 0
+
     def umount(self):
         try:
             self.client_remote.run(
                 args=[
                     'fusermount',
                     '-u',
                     self.mountpoint,
                 ],
             )
         except run.CommandFailedError:
-            # FIXME: this will clobber all FUSE mounts, not just this one
-
             log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=self.client_remote.name))
             # abort the fuse mount, killing all hung processes
             self.client_remote.run(
                 args=[
                 ],
             )
             # make sure its unmounted
-            self.client_remote.run(
-                args=[
-                    'sudo',
-                    'umount',
-                    '-l',
-                    '-f',
-                    self.mountpoint,
-                ],
-            )
+            if self._mountpoint_exists():
+                self.client_remote.run(
+                    args=[
+                        'sudo',
+                        'umount',
+                        '-l',
+                        '-f',
+                        self.mountpoint,
+                    ],
+                )
 
     def umount_wait(self, force=False):
         """
@@ -197,7 +204,8 @@ class FuseMount(CephFSMount):
         """
         super(FuseMount, self).teardown()
 
-        self.umount()
+        if self.is_mounted():
+            self.umount()
         if not self.fuse_daemon.finished:
             self.fuse_daemon.stdin.close()
             try:
diff --git a/tasks/mds_client_recovery.py b/tasks/mds_client_recovery.py
index 1032616977a8..b44fcd8bc07d 100644
--- a/tasks/mds_client_recovery.py
+++ b/tasks/mds_client_recovery.py
@@ -30,8 +30,10 @@ class TestClientRecovery(unittest.TestCase):
     mount_b = None
     mds_session_timeout = None
     mds_reconnect_timeout = None
+    ms_max_backoff = None
 
     def setUp(self):
+        self.fs.clear_firewall()
         self.fs.mds_restart()
         self.mount_a.mount()
         self.mount_b.mount()
@@ -39,13 +41,9 @@ class TestClientRecovery(unittest.TestCase):
         self.mount_a.wait_until_mounted()
 
     def tearDown(self):
+        self.fs.clear_firewall()
         self.mount_a.teardown()
         self.mount_b.teardown()
-        # mount_a.umount()
-        # mount_b.umount()
-        # run.wait([mount_a.fuse_daemon, mount_b.fuse_daemon], timeout=600)
-        # mount_a.cleanup()
-        # mount_b.cleanup()
 
     def test_basic(self):
         # Check that two clients come up healthy and see each others' files
@@ -90,6 +88,12 @@ class TestClientRecovery(unittest.TestCase):
                 expected, len(ls_data)
             ))
 
+    def assert_session_state(self, client_id, expected_state):
+        self.assertEqual(
+            self._session_by_id(
+                self.fs.mds_asok(['session', 'ls'])).get(client_id, {'state': None})['state'],
+            expected_state)
+
     def _session_list(self):
         ls_data = self.fs.mds_asok(['session', 'ls'])
         ls_data = [s for s in ls_data if s['state'] not in ['stale', 'closed']]
@@ -246,6 +250,52 @@ class TestClientRecovery(unittest.TestCase):
         self.mount_a.mount()
         self.mount_a.wait_until_mounted()
 
+    def test_network_death(self):
+        """
+        Simulate software freeze or temporary network failure.
+
+        Check that the client blocks I/O during failure, and completes
+        I/O after failure.
+        """
+
+        # We only need one client
+        self.mount_b.umount_wait()
+
+        # Initially our one client session should be visible
+        client_id = self.mount_a.get_client_id()
+        ls_data = self._session_list()
+        self.assert_session_count(1, ls_data)
+        self.assertEqual(ls_data[0]['id'], client_id)
+        self.assert_session_state(client_id, "open")
+
+        # ...and capable of doing I/O without blocking
+        self.mount_a.create_files()
+
+        # ...but if we turn off the network
+        self.fs.set_clients_block(True)
+
+        # ...and try and start an I/O
+        write_blocked = self.mount_a.write_background()
+
+        # ...then it should block
+        self.assertFalse(write_blocked.finished)
+        self.assert_session_state(client_id, "open")
+        time.sleep(self.mds_session_timeout * 1.5)  # Long enough for MDS to consider session stale
+        self.assertFalse(write_blocked.finished)
+        self.assert_session_state(client_id, "stale")
+
+        # ...until we re-enable I/O
+        self.fs.set_clients_block(False)
+
+        # ...when it should complete promptly
+        a = time.time()
+        write_blocked.wait()
+        b = time.time()
+        recovery_time = b - a
+        log.info("recovery time: {0}".format(recovery_time))
+        self.assertLess(recovery_time, self.ms_max_backoff * 2)
+        self.assert_session_state(client_id, "open")
+
 
 class LogStream(object):
     def __init__(self):
@@ -301,7 +351,12 @@ def task(ctx, config):
     client_b_id = client_list[1]
     client_b_role = "client.{0}".format(client_b_id)
-    client_b_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(client_a_id)]))[0][1]
+    client_b_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(client_b_id)]))[0][1]
+
+    # Check we have at least one remote client for use with network-dependent tests
+    # =============================================================================
+    if client_a_remote.hostname in fs.get_mds_hostnames():
+        raise RuntimeError("Require first client to be on a separate server from MDSs")
 
     test_dir = misc.get_testdir(ctx)
 
@@ -319,6 +374,9 @@ def task(ctx, config):
     TestClientRecovery.mds_session_timeout = int(fs.mds_asok(
         ['config', 'get', 'mds_session_timeout']
     )['mds_session_timeout'])
+    TestClientRecovery.ms_max_backoff = int(fs.mds_asok(
+        ['config', 'get', 'ms_max_backoff']
+    )['ms_max_backoff'])
     TestClientRecovery.fs = fs
     TestClientRecovery.mount_a = mount_a
     TestClientRecovery.mount_b = mount_b
@@ -331,7 +389,12 @@ def task(ctx, config):
 
     # Execute test suite
     # ==================
-    suite = unittest.TestLoader().loadTestsFromTestCase(TestClientRecovery)
+    if 'test_name' in config:
+        suite = unittest.TestLoader().loadTestsFromName(
+            "teuthology.task.mds_client_recovery.{0}".format(config['test_name']))
+    else:
+        suite = unittest.TestLoader().loadTestsFromTestCase(TestClientRecovery)
+
     if ctx.config.get("interactive-on-error", False):
         InteractiveFailureResult.ctx = ctx
         result_class = InteractiveFailureResult
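
Note (not part of the commit itself): the "network freeze" is simulated purely with iptables REJECT rules on the MDS port range, added and removed by Filesystem.set_clients_block above. The following is a minimal standalone sketch of that mechanism for readers who want to reproduce it outside teuthology; it assumes it is run with sudo rights on an MDS host, and the port range, REJECT target and "teuthology" comment are taken from the diff, while the helper name and sleep duration are illustrative only.

    # Sketch: reproduce what Filesystem.set_clients_block does through a teuthology
    # Remote, but locally on the MDS host via subprocess.
    import subprocess
    import time

    MDS_PORTS = "6800:6900"  # port range blocked by the test above

    def set_clients_block(blocked):
        """Add (True) or delete (False) REJECT rules for traffic to/from the MDS ports."""
        flag = "-A" if blocked else "-D"
        for chain, port_opt in (("OUTPUT", "--sport"), ("INPUT", "--dport")):
            subprocess.check_call([
                "sudo", "iptables", flag, chain, "-p", "tcp", port_opt, MDS_PORTS,
                "-j", "REJECT", "-m", "comment", "--comment", "teuthology",
            ])

    if __name__ == "__main__":
        set_clients_block(True)    # clients now see the MDS as unreachable; their I/O blocks
        time.sleep(90)             # longer than mds_session_timeout, so the session goes stale
        set_clients_block(False)   # rules removed; blocked I/O should complete shortly after

Deleting exactly the rules that were added (-D rather than flushing the chains) mirrors set_clients_block(False) and leaves unrelated firewall state on the host untouched; the clear_firewall() calls added to setUp and tearDown in the diff act as the catch-all cleanup within the test itself.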