self.id = None
self.inst = None
self.addr = None
+ self.mount_timeout = int(self.client_config.get('mount_timeout', 30))
+
+ self._mount_bin = [
+ 'ceph-fuse', "-f",
+ "--admin-socket", "/var/run/ceph/$cluster-$name.$pid.asok"]
+ self._mount_cmd_cwd = self.test_dir
+ if self.client_config.get('valgrind') is not None:
+ self.cwd = None # get_valgrind_args chdir for us
+ self._mount_cmd_logger = log.getChild('ceph-fuse.{id}'.format(id=self.client_id))
+ self._mount_cmd_stdin = run.PIPE
def mount(self, mntopts=[], check_status=True, **kwargs):
self.update_attrs(**kwargs)
log.info("Client client.%s config is %s" % (self.client_id,
self.client_config))
- daemon_signal = 'kill'
- if self.client_config.get('coverage') or \
- self.client_config.get('valgrind') is not None:
- daemon_signal = 'term'
+ self._create_mntpt()
+
+ retval = self._run_mount_cmd(mntopts, check_status)
+ if retval:
+ return retval
+
+ self.gather_mount_info()
+
+ self.mounted = True
+ def _create_mntpt(self):
+ stderr = StringIO()
# Use 0000 mode to prevent undesired modifications to the mountpoint on
# the local file system.
script = f'mkdir -m 0000 -p -v {self.hostfs_mntpt}'.split()
- stderr = StringIO()
try:
self.client_remote.run(args=script, timeout=(15*60),
- stderr=StringIO())
+ stderr=stderr)
except CommandFailedError:
if 'file exists' not in stderr.getvalue().lower():
raise
- run_cmd = [
- 'sudo',
- 'adjust-ulimits',
- 'ceph-coverage',
- '{tdir}/archive/coverage'.format(tdir=self.test_dir),
- 'daemon-helper',
- daemon_signal,
- ]
+ def _run_mount_cmd(self, mntopts, check_status):
+ mount_cmd = self._get_mount_cmd(mntopts)
+ mountcmd_stdout, mountcmd_stderr = StringIO(), StringIO()
- fuse_cmd = [
- 'ceph-fuse', "-f",
- "--admin-socket", "/var/run/ceph/$cluster-$name.$pid.asok",
- ]
- if self.client_id is not None:
- fuse_cmd += ['--id', self.client_id]
- if self.client_keyring_path and self.client_id is not None:
- fuse_cmd += ['-k', self.client_keyring_path]
- if self.cephfs_mntpt is not None:
- fuse_cmd += ["--client_mountpoint=" + self.cephfs_mntpt]
- if self.cephfs_name is not None:
- fuse_cmd += ["--client_fs=" + self.cephfs_name]
+ # Before starting ceph-fuse process, note the contents of
+ # /sys/fs/fuse/connections
+ pre_mount_conns = self._list_fuse_conns()
+ log.info("Pre-mount connections: {0}".format(pre_mount_conns))
+
+ self.fuse_daemon = self.client_remote.run(
+ args=mount_cmd,
+ cwd=self._mount_cmd_cwd,
+ logger=self._mount_cmd_logger,
+ stdin=self._mount_cmd_stdin,
+ stdout=mountcmd_stdout,
+ stderr=mountcmd_stderr,
+ wait=False
+ )
+
+ return self._wait_and_record_our_fuse_conn(
+ check_status, pre_mount_conns, mountcmd_stdout, mountcmd_stderr)
+
+ def _get_mount_cmd(self, mntopts):
+ daemon_signal = 'kill'
+ if self.client_config.get('coverage') or \
+ self.client_config.get('valgrind') is not None:
+ daemon_signal = 'term'
+
+ mount_cmd = ['sudo', 'adjust-ulimits', 'ceph-coverage',
+ '{tdir}/archive/coverage'.format(tdir=self.test_dir),
+ 'daemon-helper', daemon_signal]
+
+ mount_cmd = self._add_valgrind_args(mount_cmd)
+ mount_cmd = ['sudo'] + self._nsenter_args + mount_cmd
+
+ mount_cmd += self._mount_bin + [self.hostfs_mntpt]
+ if self.client_id:
+ mount_cmd += ['--id', self.client_id]
+ if self.client_keyring_path and self.client_id:
+ mount_cmd += ['-k', self.client_keyring_path]
+ if self.cephfs_mntpt:
+ mount_cmd += ["--client_mountpoint=" + self.cephfs_mntpt]
+ if self.cephfs_name:
+ mount_cmd += ["--client_fs=" + self.cephfs_name]
if mntopts:
- fuse_cmd += mntopts
- fuse_cmd.append(self.hostfs_mntpt)
+ mount_cmd += mntopts
+ return mount_cmd
+
+ @property
+ def _nsenter_args(self):
+ return ['nsenter', f'--net=/var/run/netns/{self.netns_name}']
+
+ def _add_valgrind_args(self, mount_cmd):
if self.client_config.get('valgrind') is not None:
- run_cmd = get_valgrind_args(
+ mount_cmd = get_valgrind_args(
self.test_dir,
'client.{id}'.format(id=self.client_id),
- run_cmd,
+ mount_cmd,
self.client_config.get('valgrind'),
cd=False
)
- netns_prefix = ['sudo', 'nsenter',
- '--net=/var/run/netns/{0}'.format(self.netns_name)]
- run_cmd = netns_prefix + run_cmd
-
- run_cmd.extend(fuse_cmd)
+ return mount_cmd
- def list_connections():
- conn_dir = "/sys/fs/fuse/connections"
+ def _list_fuse_conns(self):
+ conn_dir = "/sys/fs/fuse/connections"
- self.client_remote.run(args=['sudo', 'modprobe', 'fuse'],
- check_status=False)
- self.client_remote.run(
- args=["sudo", "mount", "-t", "fusectl", conn_dir, conn_dir],
- check_status=False, timeout=(30))
+ self.client_remote.run(args=['sudo', 'modprobe', 'fuse'],
+ check_status=False)
+ self.client_remote.run(
+ args=["sudo", "mount", "-t", "fusectl", conn_dir, conn_dir],
+ check_status=False, timeout=(30))
- try:
- ls_str = self.client_remote.sh("ls " + conn_dir,
- stdout=StringIO(),
- timeout=(15*60)).strip()
- except CommandFailedError:
- return []
-
- if ls_str:
- return [int(n) for n in ls_str.split("\n")]
- else:
- return []
-
- # Before starting ceph-fuse process, note the contents of
- # /sys/fs/fuse/connections
- pre_mount_conns = list_connections()
- log.info("Pre-mount connections: {0}".format(pre_mount_conns))
+ try:
+ ls_str = self.client_remote.sh("ls " + conn_dir,
+ stdout=StringIO(),
+ timeout=(15*60)).strip()
+ except CommandFailedError:
+ return []
- mountcmd_stdout, mountcmd_stderr = StringIO(), StringIO()
- self.fuse_daemon = self.client_remote.run(
- args=run_cmd,
- logger=log.getChild('ceph-fuse.{id}'.format(id=self.client_id)),
- stdin=run.PIPE,
- stdout=mountcmd_stdout,
- stderr=mountcmd_stderr,
- wait=False
- )
+ if ls_str:
+ return [int(n) for n in ls_str.split("\n")]
+ else:
+ return []
- # Wait for the connection reference to appear in /sys
- mount_wait = self.client_config.get('mount_wait', 0)
- if mount_wait > 0:
- log.info("Fuse mount waits {0} seconds before checking /sys/".format(mount_wait))
- time.sleep(mount_wait)
- timeout = int(self.client_config.get('mount_timeout', 30))
+ def _wait_and_record_our_fuse_conn(self, check_status, pre_mount_conns,
+ mountcmd_stdout, mountcmd_stderr):
+ """
+ Wait for the connection reference to appear in /sys
+ """
waited = 0
- post_mount_conns = list_connections()
+ post_mount_conns = self._list_fuse_conns()
while len(post_mount_conns) <= len(pre_mount_conns):
if self.fuse_daemon.finished:
# Did mount fail? Raise the CommandFailedError instead of
mountcmd_stderr.getvalue())
time.sleep(1)
waited += 1
- if waited > timeout:
+ if waited > self._fuse_conn_check_timeout:
raise RuntimeError(
"Fuse mount failed to populate/sys/ after {} "
"seconds".format(waited))
else:
- post_mount_conns = list_connections()
+ post_mount_conns = self._list_fuse_conns()
log.info("Post-mount connections: {0}".format(post_mount_conns))
- # Record our fuse connection number so that we can use it when
- # forcing an unmount
+ self._record_our_fuse_conn(pre_mount_conns, post_mount_conns)
+
+ @property
+ def _fuse_conn_check_timeout(self):
+ mount_wait = self.client_config.get('mount_wait', 0)
+ if mount_wait > 0:
+ log.info("Fuse mount waits {0} seconds before checking /sys/".format(mount_wait))
+ time.sleep(mount_wait)
+ timeout = int(self.client_config.get('mount_timeout', 30))
+ return timeout
+
+ def _record_our_fuse_conn(self, pre_mount_conns, post_mount_conns):
+ """
+ Record our fuse connection number so that we can use it when forcing
+ an unmount.
+ """
new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
if len(new_conns) == 0:
raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
else:
self._fuse_conn = new_conns[0]
- self.gather_mount_info()
-
- self.mounted = True
-
def gather_mount_info(self):
status = self.admin_socket(['status'])
self.id = status['id']