From 21cbda91f1c005b76b5fceac6401813d1fda9af1 Mon Sep 17 00:00:00 2001 From: Rishabh Dave Date: Thu, 15 Oct 2020 12:17:47 +0530 Subject: [PATCH] qa/cephfs: refactor FuseMount.mount method Make FuseMount.mount() such that it can LocalFuseMount can reuse it instead of duplicating it. For this, move subtasks required for mounting Ceph FS (like creating mountpoint directory) to a different method, so that these methods can be overriden in LocalFuseMount as per requirement. Signed-off-by: Rishabh Dave --- qa/tasks/cephfs/fuse_mount.py | 199 ++++++++++++++++++++-------------- 1 file changed, 117 insertions(+), 82 deletions(-) diff --git a/qa/tasks/cephfs/fuse_mount.py b/qa/tasks/cephfs/fuse_mount.py index 63af6248808..c58a427e837 100644 --- a/qa/tasks/cephfs/fuse_mount.py +++ b/qa/tasks/cephfs/fuse_mount.py @@ -30,6 +30,16 @@ class FuseMount(CephFSMount): self.id = None self.inst = None self.addr = None + self.mount_timeout = int(self.client_config.get('mount_timeout', 30)) + + self._mount_bin = [ + 'ceph-fuse', "-f", + "--admin-socket", "/var/run/ceph/$cluster-$name.$pid.asok"] + self._mount_cmd_cwd = self.test_dir + if self.client_config.get('valgrind') is not None: + self.cwd = None # get_valgrind_args chdir for us + self._mount_cmd_logger = log.getChild('ceph-fuse.{id}'.format(id=self.client_id)) + self._mount_cmd_stdin = run.PIPE def mount(self, mntopts=[], check_status=True, **kwargs): self.update_attrs(**kwargs) @@ -52,107 +62,122 @@ class FuseMount(CephFSMount): log.info("Client client.%s config is %s" % (self.client_id, self.client_config)) - daemon_signal = 'kill' - if self.client_config.get('coverage') or \ - self.client_config.get('valgrind') is not None: - daemon_signal = 'term' + self._create_mntpt() + + retval = self._run_mount_cmd(mntopts, check_status) + if retval: + return retval + + self.gather_mount_info() + + self.mounted = True + def _create_mntpt(self): + stderr = StringIO() # Use 0000 mode to prevent undesired modifications to the mountpoint on # the local file system. script = f'mkdir -m 0000 -p -v {self.hostfs_mntpt}'.split() - stderr = StringIO() try: self.client_remote.run(args=script, timeout=(15*60), - stderr=StringIO()) + stderr=stderr) except CommandFailedError: if 'file exists' not in stderr.getvalue().lower(): raise - run_cmd = [ - 'sudo', - 'adjust-ulimits', - 'ceph-coverage', - '{tdir}/archive/coverage'.format(tdir=self.test_dir), - 'daemon-helper', - daemon_signal, - ] + def _run_mount_cmd(self, mntopts, check_status): + mount_cmd = self._get_mount_cmd(mntopts) + mountcmd_stdout, mountcmd_stderr = StringIO(), StringIO() - fuse_cmd = [ - 'ceph-fuse', "-f", - "--admin-socket", "/var/run/ceph/$cluster-$name.$pid.asok", - ] - if self.client_id is not None: - fuse_cmd += ['--id', self.client_id] - if self.client_keyring_path and self.client_id is not None: - fuse_cmd += ['-k', self.client_keyring_path] - if self.cephfs_mntpt is not None: - fuse_cmd += ["--client_mountpoint=" + self.cephfs_mntpt] - if self.cephfs_name is not None: - fuse_cmd += ["--client_fs=" + self.cephfs_name] + # Before starting ceph-fuse process, note the contents of + # /sys/fs/fuse/connections + pre_mount_conns = self._list_fuse_conns() + log.info("Pre-mount connections: {0}".format(pre_mount_conns)) + + self.fuse_daemon = self.client_remote.run( + args=mount_cmd, + cwd=self._mount_cmd_cwd, + logger=self._mount_cmd_logger, + stdin=self._mount_cmd_stdin, + stdout=mountcmd_stdout, + stderr=mountcmd_stderr, + wait=False + ) + + return self._wait_and_record_our_fuse_conn( + check_status, pre_mount_conns, mountcmd_stdout, mountcmd_stderr) + + def _get_mount_cmd(self, mntopts): + daemon_signal = 'kill' + if self.client_config.get('coverage') or \ + self.client_config.get('valgrind') is not None: + daemon_signal = 'term' + + mount_cmd = ['sudo', 'adjust-ulimits', 'ceph-coverage', + '{tdir}/archive/coverage'.format(tdir=self.test_dir), + 'daemon-helper', daemon_signal] + + mount_cmd = self._add_valgrind_args(mount_cmd) + mount_cmd = ['sudo'] + self._nsenter_args + mount_cmd + + mount_cmd += self._mount_bin + [self.hostfs_mntpt] + if self.client_id: + mount_cmd += ['--id', self.client_id] + if self.client_keyring_path and self.client_id: + mount_cmd += ['-k', self.client_keyring_path] + if self.cephfs_mntpt: + mount_cmd += ["--client_mountpoint=" + self.cephfs_mntpt] + if self.cephfs_name: + mount_cmd += ["--client_fs=" + self.cephfs_name] if mntopts: - fuse_cmd += mntopts - fuse_cmd.append(self.hostfs_mntpt) + mount_cmd += mntopts + return mount_cmd + + @property + def _nsenter_args(self): + return ['nsenter', f'--net=/var/run/netns/{self.netns_name}'] + + def _add_valgrind_args(self, mount_cmd): if self.client_config.get('valgrind') is not None: - run_cmd = get_valgrind_args( + mount_cmd = get_valgrind_args( self.test_dir, 'client.{id}'.format(id=self.client_id), - run_cmd, + mount_cmd, self.client_config.get('valgrind'), cd=False ) - netns_prefix = ['sudo', 'nsenter', - '--net=/var/run/netns/{0}'.format(self.netns_name)] - run_cmd = netns_prefix + run_cmd - - run_cmd.extend(fuse_cmd) + return mount_cmd - def list_connections(): - conn_dir = "/sys/fs/fuse/connections" + def _list_fuse_conns(self): + conn_dir = "/sys/fs/fuse/connections" - self.client_remote.run(args=['sudo', 'modprobe', 'fuse'], - check_status=False) - self.client_remote.run( - args=["sudo", "mount", "-t", "fusectl", conn_dir, conn_dir], - check_status=False, timeout=(30)) + self.client_remote.run(args=['sudo', 'modprobe', 'fuse'], + check_status=False) + self.client_remote.run( + args=["sudo", "mount", "-t", "fusectl", conn_dir, conn_dir], + check_status=False, timeout=(30)) - try: - ls_str = self.client_remote.sh("ls " + conn_dir, - stdout=StringIO(), - timeout=(15*60)).strip() - except CommandFailedError: - return [] - - if ls_str: - return [int(n) for n in ls_str.split("\n")] - else: - return [] - - # Before starting ceph-fuse process, note the contents of - # /sys/fs/fuse/connections - pre_mount_conns = list_connections() - log.info("Pre-mount connections: {0}".format(pre_mount_conns)) + try: + ls_str = self.client_remote.sh("ls " + conn_dir, + stdout=StringIO(), + timeout=(15*60)).strip() + except CommandFailedError: + return [] - mountcmd_stdout, mountcmd_stderr = StringIO(), StringIO() - self.fuse_daemon = self.client_remote.run( - args=run_cmd, - logger=log.getChild('ceph-fuse.{id}'.format(id=self.client_id)), - stdin=run.PIPE, - stdout=mountcmd_stdout, - stderr=mountcmd_stderr, - wait=False - ) + if ls_str: + return [int(n) for n in ls_str.split("\n")] + else: + return [] - # Wait for the connection reference to appear in /sys - mount_wait = self.client_config.get('mount_wait', 0) - if mount_wait > 0: - log.info("Fuse mount waits {0} seconds before checking /sys/".format(mount_wait)) - time.sleep(mount_wait) - timeout = int(self.client_config.get('mount_timeout', 30)) + def _wait_and_record_our_fuse_conn(self, check_status, pre_mount_conns, + mountcmd_stdout, mountcmd_stderr): + """ + Wait for the connection reference to appear in /sys + """ waited = 0 - post_mount_conns = list_connections() + post_mount_conns = self._list_fuse_conns() while len(post_mount_conns) <= len(pre_mount_conns): if self.fuse_daemon.finished: # Did mount fail? Raise the CommandFailedError instead of @@ -168,17 +193,31 @@ class FuseMount(CephFSMount): mountcmd_stderr.getvalue()) time.sleep(1) waited += 1 - if waited > timeout: + if waited > self._fuse_conn_check_timeout: raise RuntimeError( "Fuse mount failed to populate/sys/ after {} " "seconds".format(waited)) else: - post_mount_conns = list_connections() + post_mount_conns = self._list_fuse_conns() log.info("Post-mount connections: {0}".format(post_mount_conns)) - # Record our fuse connection number so that we can use it when - # forcing an unmount + self._record_our_fuse_conn(pre_mount_conns, post_mount_conns) + + @property + def _fuse_conn_check_timeout(self): + mount_wait = self.client_config.get('mount_wait', 0) + if mount_wait > 0: + log.info("Fuse mount waits {0} seconds before checking /sys/".format(mount_wait)) + time.sleep(mount_wait) + timeout = int(self.client_config.get('mount_timeout', 30)) + return timeout + + def _record_our_fuse_conn(self, pre_mount_conns, post_mount_conns): + """ + Record our fuse connection number so that we can use it when forcing + an unmount. + """ new_conns = list(set(post_mount_conns) - set(pre_mount_conns)) if len(new_conns) == 0: raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns)) @@ -187,10 +226,6 @@ class FuseMount(CephFSMount): else: self._fuse_conn = new_conns[0] - self.gather_mount_info() - - self.mounted = True - def gather_mount_info(self): status = self.admin_socket(['status']) self.id = status['id'] -- 2.39.5