]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
qa/cephfs: refactor FuseMount.mount method
authorRishabh Dave <ridave@redhat.com>
Thu, 15 Oct 2020 06:47:47 +0000 (12:17 +0530)
committerRishabh Dave <ridave@redhat.com>
Fri, 5 Mar 2021 04:25:10 +0000 (09:55 +0530)
Make FuseMount.mount() such that it can LocalFuseMount can reuse it
instead of duplicating it. For this, move subtasks required for mounting
Ceph FS (like creating mountpoint directory) to a different method, so
that these methods can be overriden in LocalFuseMount as per
requirement.

Signed-off-by: Rishabh Dave <ridave@redhat.com>
qa/tasks/cephfs/fuse_mount.py

index 63af624880864d0ad29cfbd83d1a09222b4d736e..c58a427e837273b706e9ee947895ce5f867a6d91 100644 (file)
@@ -30,6 +30,16 @@ class FuseMount(CephFSMount):
         self.id = None
         self.inst = None
         self.addr = None
+        self.mount_timeout = int(self.client_config.get('mount_timeout', 30))
+
+        self._mount_bin = [
+            'ceph-fuse', "-f",
+            "--admin-socket", "/var/run/ceph/$cluster-$name.$pid.asok"]
+        self._mount_cmd_cwd = self.test_dir
+        if self.client_config.get('valgrind') is not None:
+            self.cwd = None # get_valgrind_args chdir for us
+        self._mount_cmd_logger = log.getChild('ceph-fuse.{id}'.format(id=self.client_id))
+        self._mount_cmd_stdin = run.PIPE
 
     def mount(self, mntopts=[], check_status=True, **kwargs):
         self.update_attrs(**kwargs)
@@ -52,107 +62,122 @@ class FuseMount(CephFSMount):
         log.info("Client client.%s config is %s" % (self.client_id,
                                                     self.client_config))
 
-        daemon_signal = 'kill'
-        if self.client_config.get('coverage') or \
-           self.client_config.get('valgrind') is not None:
-            daemon_signal = 'term'
+        self._create_mntpt()
+
+        retval = self._run_mount_cmd(mntopts, check_status)
+        if retval:
+            return retval
+
+        self.gather_mount_info()
+
+        self.mounted = True
 
+    def _create_mntpt(self):
+        stderr = StringIO()
         # Use 0000 mode to prevent undesired modifications to the mountpoint on
         # the local file system.
         script = f'mkdir -m 0000 -p -v {self.hostfs_mntpt}'.split()
-        stderr = StringIO()
         try:
             self.client_remote.run(args=script, timeout=(15*60),
-                stderr=StringIO())
+                                   stderr=stderr)
         except CommandFailedError:
             if 'file exists' not in stderr.getvalue().lower():
                 raise
 
-        run_cmd = [
-            'sudo',
-            'adjust-ulimits',
-            'ceph-coverage',
-            '{tdir}/archive/coverage'.format(tdir=self.test_dir),
-            'daemon-helper',
-            daemon_signal,
-        ]
+    def _run_mount_cmd(self, mntopts, check_status):
+        mount_cmd = self._get_mount_cmd(mntopts)
+        mountcmd_stdout, mountcmd_stderr = StringIO(), StringIO()
 
-        fuse_cmd = [
-            'ceph-fuse', "-f",
-            "--admin-socket", "/var/run/ceph/$cluster-$name.$pid.asok",
-        ]
-        if self.client_id is not None:
-            fuse_cmd += ['--id', self.client_id]
-        if self.client_keyring_path and self.client_id is not None:
-            fuse_cmd += ['-k', self.client_keyring_path]
-        if self.cephfs_mntpt is not None:
-            fuse_cmd += ["--client_mountpoint=" + self.cephfs_mntpt]
-        if self.cephfs_name is not None:
-            fuse_cmd += ["--client_fs=" + self.cephfs_name]
+        # Before starting ceph-fuse process, note the contents of
+        # /sys/fs/fuse/connections
+        pre_mount_conns = self._list_fuse_conns()
+        log.info("Pre-mount connections: {0}".format(pre_mount_conns))
+
+        self.fuse_daemon = self.client_remote.run(
+            args=mount_cmd,
+            cwd=self._mount_cmd_cwd,
+            logger=self._mount_cmd_logger,
+            stdin=self._mount_cmd_stdin,
+            stdout=mountcmd_stdout,
+            stderr=mountcmd_stderr,
+            wait=False
+        )
+
+        return self._wait_and_record_our_fuse_conn(
+            check_status, pre_mount_conns, mountcmd_stdout, mountcmd_stderr)
+
+    def _get_mount_cmd(self, mntopts):
+        daemon_signal = 'kill'
+        if self.client_config.get('coverage') or \
+           self.client_config.get('valgrind') is not None:
+            daemon_signal = 'term'
+
+        mount_cmd = ['sudo', 'adjust-ulimits', 'ceph-coverage',
+                     '{tdir}/archive/coverage'.format(tdir=self.test_dir),
+                     'daemon-helper', daemon_signal]
+
+        mount_cmd = self._add_valgrind_args(mount_cmd)
+        mount_cmd = ['sudo'] + self._nsenter_args + mount_cmd
+
+        mount_cmd += self._mount_bin + [self.hostfs_mntpt]
+        if self.client_id:
+            mount_cmd += ['--id', self.client_id]
+        if self.client_keyring_path and self.client_id:
+            mount_cmd += ['-k', self.client_keyring_path]
+        if self.cephfs_mntpt:
+            mount_cmd += ["--client_mountpoint=" + self.cephfs_mntpt]
+        if self.cephfs_name:
+            mount_cmd += ["--client_fs=" + self.cephfs_name]
         if mntopts:
-            fuse_cmd += mntopts
-        fuse_cmd.append(self.hostfs_mntpt)
+            mount_cmd += mntopts
 
+        return mount_cmd
+
+    @property
+    def _nsenter_args(self):
+        return ['nsenter', f'--net=/var/run/netns/{self.netns_name}']
+
+    def _add_valgrind_args(self, mount_cmd):
         if self.client_config.get('valgrind') is not None:
-            run_cmd = get_valgrind_args(
+            mount_cmd = get_valgrind_args(
                 self.test_dir,
                 'client.{id}'.format(id=self.client_id),
-                run_cmd,
+                mount_cmd,
                 self.client_config.get('valgrind'),
                 cd=False
             )
 
-        netns_prefix = ['sudo', 'nsenter',
-                        '--net=/var/run/netns/{0}'.format(self.netns_name)]
-        run_cmd = netns_prefix + run_cmd
-
-        run_cmd.extend(fuse_cmd)
+        return mount_cmd
 
-        def list_connections():
-            conn_dir = "/sys/fs/fuse/connections"
+    def _list_fuse_conns(self):
+        conn_dir = "/sys/fs/fuse/connections"
 
-            self.client_remote.run(args=['sudo', 'modprobe', 'fuse'],
-                                   check_status=False)
-            self.client_remote.run(
-                args=["sudo", "mount", "-t", "fusectl", conn_dir, conn_dir],
-                check_status=False, timeout=(30))
+        self.client_remote.run(args=['sudo', 'modprobe', 'fuse'],
+                               check_status=False)
+        self.client_remote.run(
+            args=["sudo", "mount", "-t", "fusectl", conn_dir, conn_dir],
+            check_status=False, timeout=(30))
 
-            try:
-                ls_str = self.client_remote.sh("ls " + conn_dir,
-                                               stdout=StringIO(),
-                                               timeout=(15*60)).strip()
-            except CommandFailedError:
-                return []
-
-            if ls_str:
-                return [int(n) for n in ls_str.split("\n")]
-            else:
-                return []
-
-        # Before starting ceph-fuse process, note the contents of
-        # /sys/fs/fuse/connections
-        pre_mount_conns = list_connections()
-        log.info("Pre-mount connections: {0}".format(pre_mount_conns))
+        try:
+            ls_str = self.client_remote.sh("ls " + conn_dir,
+                                           stdout=StringIO(),
+                                           timeout=(15*60)).strip()
+        except CommandFailedError:
+            return []
 
-        mountcmd_stdout, mountcmd_stderr = StringIO(), StringIO()
-        self.fuse_daemon = self.client_remote.run(
-            args=run_cmd,
-            logger=log.getChild('ceph-fuse.{id}'.format(id=self.client_id)),
-            stdin=run.PIPE,
-            stdout=mountcmd_stdout,
-            stderr=mountcmd_stderr,
-            wait=False
-        )
+        if ls_str:
+            return [int(n) for n in ls_str.split("\n")]
+        else:
+            return []
 
-        # Wait for the connection reference to appear in /sys
-        mount_wait = self.client_config.get('mount_wait', 0)
-        if mount_wait > 0:
-            log.info("Fuse mount waits {0} seconds before checking /sys/".format(mount_wait))
-            time.sleep(mount_wait)            
-        timeout = int(self.client_config.get('mount_timeout', 30))
+    def _wait_and_record_our_fuse_conn(self, check_status, pre_mount_conns,
+                                       mountcmd_stdout, mountcmd_stderr):
+        """
+        Wait for the connection reference to appear in /sys
+        """
         waited = 0
 
-        post_mount_conns = list_connections()
+        post_mount_conns = self._list_fuse_conns()
         while len(post_mount_conns) <= len(pre_mount_conns):
             if self.fuse_daemon.finished:
                 # Did mount fail?  Raise the CommandFailedError instead of
@@ -168,17 +193,31 @@ class FuseMount(CephFSMount):
                                 mountcmd_stderr.getvalue())
             time.sleep(1)
             waited += 1
-            if waited > timeout:
+            if waited > self._fuse_conn_check_timeout:
                 raise RuntimeError(
                     "Fuse mount failed to populate/sys/ after {} "
                     "seconds".format(waited))
             else:
-                post_mount_conns = list_connections()
+                post_mount_conns = self._list_fuse_conns()
 
         log.info("Post-mount connections: {0}".format(post_mount_conns))
 
-        # Record our fuse connection number so that we can use it when
-        # forcing an unmount
+        self._record_our_fuse_conn(pre_mount_conns, post_mount_conns)
+
+    @property
+    def _fuse_conn_check_timeout(self):
+        mount_wait = self.client_config.get('mount_wait', 0)
+        if mount_wait > 0:
+            log.info("Fuse mount waits {0} seconds before checking /sys/".format(mount_wait))
+            time.sleep(mount_wait)
+        timeout = int(self.client_config.get('mount_timeout', 30))
+        return timeout
+
+    def _record_our_fuse_conn(self, pre_mount_conns, post_mount_conns):
+        """
+        Record our fuse connection number so that we can use it when forcing
+        an unmount.
+        """
         new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
         if len(new_conns) == 0:
             raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
@@ -187,10 +226,6 @@ class FuseMount(CephFSMount):
         else:
             self._fuse_conn = new_conns[0]
 
-        self.gather_mount_info()
-
-        self.mounted = True
-
     def gather_mount_info(self):
         status = self.admin_socket(['status'])
         self.id = status['id']