"""
Ceph FUSE client task
"""
+
import contextlib
import logging
-import os
from teuthology import misc as teuthology
from ..orchestra import run
+from teuthology.task.cephfs.fuse_mount import FuseMount
log = logging.getLogger(__name__)
+
+def get_client_configs(ctx, config):
+ """
+ Get a map of the configuration for each FUSE client in the configuration
+ by combining the configuration of the current task with any global overrides.
+
+ :param ctx: Context instance
+ :param config: configuration for this task
+ :return: dict of client name to config or to None
+ """
+ if config is None:
+ config = dict(('client.{id}'.format(id=id_), None)
+ for id_ in teuthology.all_roles_of_type(ctx.cluster, 'client'))
+ elif isinstance(config, list):
+ config = dict((name, None) for name in config)
+
+ overrides = ctx.config.get('overrides', {})
+ teuthology.deep_merge(config, overrides.get('ceph-fuse', {}))
+
+ return config
+
+
@contextlib.contextmanager
def task(ctx, config):
"""
:param config: Configuration
"""
log.info('Mounting ceph-fuse clients...')
- fuse_daemons = {}
testdir = teuthology.get_testdir(ctx)
-
- if config is None:
- config = dict(('client.{id}'.format(id=id_), None)
- for id_ in teuthology.all_roles_of_type(ctx.cluster, 'client'))
- elif isinstance(config, list):
- config = dict((name, None) for name in config)
-
- overrides = ctx.config.get('overrides', {})
- teuthology.deep_merge(config, overrides.get('ceph-fuse', {}))
+ config = get_client_configs(ctx, config)
clients = list(teuthology.get_clients(ctx=ctx, roles=config.keys()))
+ fuse_mounts = []
for id_, remote in clients:
client_config = config.get("client.%s" % id_)
if client_config is None:
client_config = {}
- log.info("Client client.%s config is %s" % (id_, client_config))
-
- daemon_signal = 'kill'
- if client_config.get('coverage') or client_config.get('valgrind') is not None:
- daemon_signal = 'term'
-
- mnt = os.path.join(testdir, 'mnt.{id}'.format(id=id_))
- log.info('Mounting ceph-fuse client.{id} at {remote} {mnt}...'.format(
- id=id_, remote=remote,mnt=mnt))
-
- remote.run(
- args=[
- 'mkdir',
- '--',
- mnt,
- ],
- )
-
- run_cmd=[
- 'sudo',
- 'adjust-ulimits',
- 'ceph-coverage',
- '{tdir}/archive/coverage'.format(tdir=testdir),
- 'daemon-helper',
- daemon_signal,
- ]
- run_cmd_tail=[
- 'ceph-fuse',
- '-f',
- '--name', 'client.{id}'.format(id=id_),
- # TODO ceph-fuse doesn't understand dash dash '--',
- mnt,
- ]
-
- if client_config.get('valgrind') is not None:
- run_cmd = teuthology.get_valgrind_args(
- testdir,
- 'client.{id}'.format(id=id_),
- run_cmd,
- client_config.get('valgrind'),
- )
-
- run_cmd.extend(run_cmd_tail)
-
- proc = remote.run(
- args=run_cmd,
- logger=log.getChild('ceph-fuse.{id}'.format(id=id_)),
- stdin=run.PIPE,
- wait=False,
- )
- fuse_daemons[id_] = proc
- for id_, remote in clients:
- mnt = os.path.join(testdir, 'mnt.{id}'.format(id=id_))
- teuthology.wait_until_fuse_mounted(
- remote=remote,
- fuse=fuse_daemons[id_],
- mountpoint=mnt,
- )
- remote.run(args=['sudo', 'chmod', '1777', '{tdir}/mnt.{id}'.format(tdir=testdir, id=id_)],)
+ fuse_mount = FuseMount(client_config, testdir, id_, remote)
+ fuse_mounts.append(fuse_mount)
+
+ fuse_mount.mount()
+
+ for mount in fuse_mounts:
+ mount.wait_until_mounted()
try:
yield
finally:
log.info('Unmounting ceph-fuse clients...')
- for id_, remote in clients:
- mnt = os.path.join(testdir, 'mnt.{id}'.format(id=id_))
- try:
- remote.run(
- args=[
- 'sudo',
- 'fusermount',
- '-u',
- mnt,
- ],
- )
- except run.CommandFailedError:
- log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=remote.name))
- # abort the fuse mount, killing all hung processes
- remote.run(
- args=[
- 'if', 'test', '-e', '/sys/fs/fuse/connections/*/abort',
- run.Raw(';'), 'then',
- 'echo',
- '1',
- run.Raw('>'),
- run.Raw('/sys/fs/fuse/connections/*/abort'),
- run.Raw(';'), 'fi',
- ],
- )
- # make sure its unmounted
- remote.run(
- args=[
- 'sudo',
- 'umount',
- '-l',
- '-f',
- mnt,
- ],
- )
-
- run.wait(fuse_daemons.itervalues(), timeout=600)
-
- for id_, remote in clients:
- mnt = os.path.join(testdir, 'mnt.{id}'.format(id=id_))
- remote.run(
- args=[
- 'rmdir',
- '--',
- mnt,
- ],
- )
+ for mount in fuse_mounts:
+ mount.umount()
+
+ run.wait([m.fuse_daemon for m in fuse_mounts.values()], timeout=600)
+
+ for mount in fuse_mounts:
+ mount.cleanup()
--- /dev/null
+
+from StringIO import StringIO
+import time
+import os
+import logging
+
+from teuthology import misc
+from ...orchestra import run
+
+log = logging.getLogger(__name__)
+
+
+class FuseMount(object):
+ def __init__(self, client_config, test_dir, client_id, client_remote):
+ """
+ :param client_config: Configuration dictionary for this particular client
+ :param test_dir: Global teuthology test dir
+ :param client_id: Client ID, the 'foo' in client.foo
+ :param client_remote: Remote instance for the host where client will run
+ """
+
+ self.client_config = client_config
+ self.test_dir = test_dir
+ self.client_id = client_id
+ self.client_remote = client_remote
+ self.fuse_daemon = None
+
+ self.mountpoint = os.path.join(self.test_dir, 'mnt.{id}'.format(id=self.client_id))
+
+
+ def mount(self):
+ log.info("Client client.%s config is %s" % (self.client_id, self.client_config))
+
+ daemon_signal = 'kill'
+ if self.client_config.get('coverage') or self.client_config.get('valgrind') is not None:
+ daemon_signal = 'term'
+
+ mnt = os.path.join(self.test_dir, 'mnt.{id}'.format(id=self.client_id))
+ log.info('Mounting ceph-fuse client.{id} at {remote} {mnt}...'.format(
+ id=self.client_id, remote=self.client_remote, mnt=mnt))
+
+ self.client_remote.run(
+ args=[
+ 'mkdir',
+ '--',
+ mnt,
+ ],
+ )
+
+ run_cmd = [
+ 'sudo',
+ 'adjust-ulimits',
+ 'ceph-coverage',
+ '{tdir}/archive/coverage'.format(tdir=self.test_dir),
+ 'daemon-helper',
+ daemon_signal,
+ ]
+ run_cmd_tail = [
+ 'ceph-fuse',
+ '-f',
+ '--name', 'client.{id}'.format(id=self.client_id),
+ # TODO ceph-fuse doesn't understand dash dash '--',
+ mnt,
+ ]
+
+ if self.client_config.get('valgrind') is not None:
+ run_cmd = misc.get_valgrind_args(
+ self.test_dir,
+ 'client.{id}'.format(id=self.client_id),
+ run_cmd,
+ self.client_config.get('valgrind'),
+ )
+
+ run_cmd.extend(run_cmd_tail)
+
+ proc = self.client_remote.run(
+ args=run_cmd,
+ logger=log.getChild('ceph-fuse.{id}'.format(id=self.client_id)),
+ stdin=run.PIPE,
+ wait=False,
+ )
+ self.fuse_daemon = proc
+
+ def is_mounted(self):
+ proc = self.client_remote.run(
+ args=[
+ 'stat',
+ '--file-system',
+ '--printf=%T\n',
+ '--',
+ self.mountpoint,
+ ],
+ stdout=StringIO(),
+ )
+ fstype = proc.stdout.getvalue().rstrip('\n')
+ if fstype == 'fuseblk':
+ log.info('ceph-fuse is mounted on %s', self.mountpoint)
+ return True
+ else:
+ log.debug('ceph-fuse not mounted, got fs type {fstype!r}'.format(
+ fstype=fstype))
+
+ def wait_until_mounted(self):
+ """
+ Check to make sure that fuse is mounted on mountpoint. If not,
+ sleep for 5 seconds and check again.
+ """
+
+ while not self.is_mounted():
+ # Even if it's not mounted, it should at least
+ # be running: catch simple failures where it has terminated.
+ assert not self.fuse_daemon.poll()
+
+ time.sleep(5)
+
+ # Now that we're mounted, set permissions so that the rest of the test will have
+ # unrestricted access to the filesystem mount.
+ self.client_remote.run(
+ args=['sudo', 'chmod', '1777', '{tdir}/mnt.{id}'.format(tdir=self.test_dir, id=self.client_id)], )
+
+ def umount(self):
+ mnt = os.path.join(self.test_dir, 'mnt.{id}'.format(id=self.client_id))
+ try:
+ self.client_remote.run(
+ args=[
+ 'sudo',
+ 'fusermount',
+ '-u',
+ mnt,
+ ],
+ )
+ except run.CommandFailedError:
+ log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=self.client_remote.name))
+ # abort the fuse mount, killing all hung processes
+ self.client_remote.run(
+ args=[
+ 'if', 'test', '-e', '/sys/fs/fuse/connections/*/abort',
+ run.Raw(';'), 'then',
+ 'echo',
+ '1',
+ run.Raw('>'),
+ run.Raw('/sys/fs/fuse/connections/*/abort'),
+ run.Raw(';'), 'fi',
+ ],
+ )
+ # make sure its unmounted
+ self.client_remote.run(
+ args=[
+ 'sudo',
+ 'umount',
+ '-l',
+ '-f',
+ mnt,
+ ],
+ )
+
+ def cleanup(self):
+ """
+ Remove the mount point.
+
+ Prerequisite: the client is not mounted.
+ """
+ mnt = os.path.join(self.test_dir, 'mnt.{id}'.format(id=self.client_id))
+ self.client_remote.run(
+ args=[
+ 'rmdir',
+ '--',
+ mnt,
+ ],
+ )