def osd_admin_socket(self, osd_id, command, check_status=True):
return self.admin_socket('osd', osd_id, command, check_status)
- def admin_socket(self, service_type, service_id, command, check_status=True):
+ def find_remote(self, service_type, service_id):
"""
- Remotely start up ceph specifying the admin socket
+ Get the Remote for the host where a particular service runs.
+
+ :param service_type: 'mds', 'osd', 'client'
+ :param service_id: The second part of a role, e.g. '0' for the role 'client.0'
+ :return: a Remote instance for the host where the requested role is placed
"""
- testdir = teuthology.get_testdir(self.ctx)
- remote = None
for _remote, roles_for_host in self.ctx.cluster.remotes.iteritems():
for id_ in teuthology.roles_of_type(roles_for_host, service_type):
if id_ == str(service_id):
- remote = _remote
- assert remote is not None
+ return _remote
+
+ raise KeyError("Service {0}.{1} not found".format(service_type, service_id))
+
+ def admin_socket(self, service_type, service_id, command, check_status=True):
+ """
+ Run an admin socket command remotely on the host where the given service runs
+ """
+ testdir = teuthology.get_testdir(self.ctx)
+ remote = self.find_remote(service_type, service_id)
args = [
'sudo',
'adjust-ulimits',
import time
from teuthology import misc
+from teuthology.nuke import clear_firewall
from teuthology.parallel import parallel
from tasks import ceph_manager
self.client_id = client_list[0]
self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
+ def get_mds_hostnames(self):
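+ """
+ Return the distinct hostnames of the hosts where the MDS daemons run.
+ """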
+ result = set()
+ for mds_id in self.mds_ids:
+ mds_remote = self.mon_manager.find_remote('mds', mds_id)
+ result.add(mds_remote.hostname)
+
+ return list(result)
+
def are_daemons_healthy(self):
"""
Return true if all daemons are in one of active, standby, standby-replay
else:
return None
+ def set_clients_block(self, blocked, mds_id=None):
+ """
+ Block (using iptables) client communications to this MDS. Be careful: if
+ other services are running on this MDS, or other MDSs try to talk to this
+ MDS, their communications may also be blocked as collateral damage.
+
+ :param blocked: True to block traffic to/from clients, False to unblock it
+ :param mds_id: Optional ID of MDS to block, default to all
+ """
+ da_flag = "-A" if blocked else "-D"
+
+ def set_block(_mds_id):
+ remote = self.mon_manager.find_remote('mds', _mds_id)
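+ # Reject inbound and outbound TCP traffic on ports 6800-6900, tagging the rules
+ # with a 'teuthology' comment so that they can be identified and cleared later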
+ remote.run(args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", "6800:6900", "-j", "REJECT", "-m", "comment", "--comment", "teuthology"])
+ remote.run(args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", "6800:6900", "-j", "REJECT", "-m", "comment", "--comment", "teuthology"])
+
+ self._one_or_all(mds_id, set_block)
+
+ def clear_firewall(self):
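+ """
+ Remove the iptables rules added by these tests (delegates to
+ teuthology.nuke.clear_firewall).
+ """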
+ clear_firewall(self._ctx)
+
def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None):
"""
Block until the MDS reaches a particular state, or a failure condition
self.fuse_daemon = proc
def is_mounted(self):
- proc = self.client_remote.run(
- args=[
- 'stat',
- '--file-system',
- '--printf=%T\n',
- '--',
- self.mountpoint,
- ],
- stdout=StringIO(),
- )
+ try:
+ proc = self.client_remote.run(
+ args=[
+ 'stat',
+ '--file-system',
+ '--printf=%T\n',
+ '--',
+ self.mountpoint,
+ ],
+ stdout=StringIO(),
+ )
+ except CommandFailedError:
+ # This happens if the mount directory doesn't exist
+ return False
+
fstype = proc.stdout.getvalue().rstrip('\n')
if fstype == 'fuseblk':
log.info('ceph-fuse is mounted on %s', self.mountpoint)
self.client_remote.run(
args=['sudo', 'chmod', '1777', '{tdir}/mnt.{id}'.format(tdir=self.test_dir, id=self.client_id)], )
+ def _mountpoint_exists(self):
+ return self.client_remote.run(args=["ls", "-d", self.mountpoint], check_status=False).exitstatus == 0
+
def umount(self):
try:
self.client_remote.run(
],
)
except run.CommandFailedError:
- # FIXME: this will clobber all FUSE mounts, not just this one
-
log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=self.client_remote.name))
# abort the fuse mount, killing all hung processes
self.client_remote.run(
],
)
# make sure its unmounted
- self.client_remote.run(
- args=[
- 'sudo',
- 'umount',
- '-l',
- '-f',
- self.mountpoint,
- ],
- )
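+ # The mountpoint directory may already be gone; only force the unmount if it still exists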
+ if self._mountpoint_exists():
+ self.client_remote.run(
+ args=[
+ 'sudo',
+ 'umount',
+ '-l',
+ '-f',
+ self.mountpoint,
+ ],
+ )
def umount_wait(self, force=False):
"""
"""
super(FuseMount, self).teardown()
- self.umount()
+ if self.is_mounted():
+ self.umount()
if not self.fuse_daemon.finished:
self.fuse_daemon.stdin.close()
try:
mount_b = None
mds_session_timeout = None
mds_reconnect_timeout = None
+ ms_max_backoff = None
def setUp(self):
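+ # In case a previous test left client traffic blocked, clear the firewall first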
+ self.fs.clear_firewall()
self.fs.mds_restart()
self.mount_a.mount()
self.mount_b.mount()
self.mount_a.wait_until_mounted()
def tearDown(self):
+ self.fs.clear_firewall()
self.mount_a.teardown()
self.mount_b.teardown()
- # mount_a.umount()
- # mount_b.umount()
- # run.wait([mount_a.fuse_daemon, mount_b.fuse_daemon], timeout=600)
- # mount_a.cleanup()
- # mount_b.cleanup()
def test_basic(self):
# Check that two clients come up healthy and see each others' files
expected, len(ls_data)
))
+ def assert_session_state(self, client_id, expected_state):
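+ """
+ Assert that the MDS reports the session for the given client in the
+ expected state (according to the 'session ls' admin socket output).
+ """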
+ session_ls = self.fs.mds_asok(['session', 'ls'])
+ actual_state = self._session_by_id(session_ls).get(client_id, {'state': None})['state']
+ self.assertEqual(actual_state, expected_state)
+
def _session_list(self):
ls_data = self.fs.mds_asok(['session', 'ls'])
ls_data = [s for s in ls_data if s['state'] not in ['stale', 'closed']]
self.mount_a.mount()
self.mount_a.wait_until_mounted()
+ def test_network_death(self):
+ """
+ Simulate a software freeze or temporary network failure.
+
+ Check that client I/O blocks while the failure lasts, and completes
+ promptly once connectivity is restored.
+ """
+
+ # We only need one client
+ self.mount_b.umount_wait()
+
+ # Initially our one client session should be visible
+ client_id = self.mount_a.get_client_id()
+ ls_data = self._session_list()
+ self.assert_session_count(1, ls_data)
+ self.assertEqual(ls_data[0]['id'], client_id)
+ self.assert_session_state(client_id, "open")
+
+ # ...and capable of doing I/O without blocking
+ self.mount_a.create_files()
+
+ # ...but if we turn off the network
+ self.fs.set_clients_block(True)
+
+ # ...and try to start an I/O
+ write_blocked = self.mount_a.write_background()
+
+ # ...then it should block
+ self.assertFalse(write_blocked.finished)
+ self.assert_session_state(client_id, "open")
+ time.sleep(self.mds_session_timeout * 1.5) # Long enough for MDS to consider session stale
+ self.assertFalse(write_blocked.finished)
+ self.assert_session_state(client_id, "stale")
+
+ # ...until we unblock the network
+ self.fs.set_clients_block(False)
+
+ # ...when it should complete promptly
+ a = time.time()
+ write_blocked.wait()
+ b = time.time()
+ recovery_time = b - a
+ log.info("recovery time: {0}".format(recovery_time))
+ self.assertLess(recovery_time, self.ms_max_backoff * 2)
+ self.assert_session_state(client_id, "open")
+
class LogStream(object):
def __init__(self):
client_b_id = client_list[1]
client_b_role = "client.{0}".format(client_b_id)
- client_b_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(client_a_id)]))[0][1]
+ client_b_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(client_b_id)]))[0][1]
+
+ # Check we have at least one remote client for use with network-dependent tests
+ # =============================================================================
+ if client_a_remote.hostname in fs.get_mds_hostnames():
+ raise RuntimeError("Require first client to on separate server from MDSs")
test_dir = misc.get_testdir(ctx)
TestClientRecovery.mds_session_timeout = int(fs.mds_asok(
['config', 'get', 'mds_session_timeout']
)['mds_session_timeout'])
+ TestClientRecovery.ms_max_backoff = int(fs.mds_asok(
+ ['config', 'get', 'ms_max_backoff']
+ )['ms_max_backoff'])
TestClientRecovery.fs = fs
TestClientRecovery.mount_a = mount_a
TestClientRecovery.mount_b = mount_b
# Execute test suite
# ==================
- suite = unittest.TestLoader().loadTestsFromTestCase(TestClientRecovery)
+ if 'test_name' in config:
+ suite = unittest.TestLoader().loadTestsFromName(
+ "teuthology.task.mds_client_recovery.{0}".format(config['test_name']))
+ else:
+ suite = unittest.TestLoader().loadTestsFromTestCase(TestClientRecovery)
+
if ctx.config.get("interactive-on-error", False):
InteractiveFailureResult.ctx = ctx
result_class = InteractiveFailureResult