qa: add quiesce protocol unit tests
author    Patrick Donnelly <pdonnell@redhat.com>
          Wed, 24 Jan 2024 02:26:46 +0000 (21:26 -0500)
committer Patrick Donnelly <pdonnell@redhat.com>
          Wed, 20 Mar 2024 14:56:56 +0000 (10:56 -0400)
Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
Fixes: https://tracker.ceph.com/issues/63664
qa/tasks/cephfs/filesystem.py
qa/tasks/cephfs/test_quiesce.py [new file with mode: 0644]

index 4532aaf280c5699f43ca3fec3bb4bfe83544c59f..f952a2740a799303892dea6167a6f6dcf0d977b0 100644 (file)
@@ -1314,7 +1314,9 @@ class Filesystem(MDSCluster):
         info = self.get_rank(rank=rank, status=status)
         return self.json_asok(command, 'mds', info['name'], timeout=timeout)
 
-    def rank_tell(self, command, rank=0, status=None, timeout=120):
+    def rank_tell(self, command, rank=None, status=None, timeout=120):
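+        # a rank of None selects the default rank (0), so callers can pass rank through unchanged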
+        if rank is None:
+            rank = 0
         try:
             out = self.get_ceph_cmd_stdout("tell", f"mds.{self.id}:{rank}", *command, timeout=timeout)
             return json.loads(out)
@@ -1778,3 +1780,24 @@ class Filesystem(MDSCluster):
             return result
         else:
             return self.rank_tell(['damage', 'ls'], rank=rank)
+
+    def get_ops(self, locks=False, path=None, rank=None, status=None):
+        name = self.get_rank(rank=rank, status=status)['name']
+        cmd = ['ops']
+        if locks:
+            cmd.append('--flags=locks')
+        if path:
+            cmd.append(f'--path={path}')
+        J = self.rank_tell(cmd, rank=rank)
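+        # with --path, the MDS writes the (possibly large) dump to <path> on its host;
+        # fetch that file and parse it instead of the tell output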
+        if path:
+            mds_remote = self.mon_manager.find_remote('mds', name)
+            blob = misc.get_file(mds_remote, path, sudo=True).decode('utf-8')
+            log.debug(f"read {len(blob)}B of ops")
+            J = json.loads(blob)
+        return J
+
+    def get_op(self, reqid, rank=None):
+        return self.rank_tell(['op', 'get', reqid], rank=rank)
+
+    def kill_op(self, reqid, rank=None):
+        return self.rank_tell(['op', 'kill', reqid], rank=rank)
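
A minimal usage sketch of the new helpers (the `fs` handle and the reqid value
are illustrative; `get_op`/`kill_op` wrap the MDS `op get`/`op kill` tell
commands via rank_tell, as shown above):

    ops = fs.get_ops(locks=True, rank=0)      # dump in-flight ops with lock state
    for op in ops['ops']:
        print(op['type_data']['op_name'])
    op = fs.get_op("client.4242:1", rank=0)   # inspect a single op by reqid
    fs.kill_op("client.4242:1", rank=0)       # then abort it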
diff --git a/qa/tasks/cephfs/test_quiesce.py b/qa/tasks/cephfs/test_quiesce.py
new file mode 100644 (file)
index 0000000..12f4f79
--- /dev/null
@@ -0,0 +1,623 @@
+import json
+import logging
+import os
+import re
+import secrets
+import tempfile
+import unittest
+from io import StringIO
+import os.path
+from time import sleep
+
+from teuthology.contextutil import safe_while
+
+from tasks.cephfs.cephfs_test_case import CephFSTestCase
+
+log = logging.getLogger(__name__)
+
+INODE_RE = re.compile(r'\[inode 0x([0-9a-fA-F]+)')
+FP_RE = re.compile(r'fp=#0x([0-9a-fA-F]+)(\S*)')
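+# INODE_RE matches lock object strings like "[inode 0x10000000000 ...]"; FP_RE matches
+# quiesce_inode op descriptions like "fp=#0x10000000000" or "fp=#0x1/volumes/...", where
+# group(1) is the hex ino and group(2) is an optional path suffix.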
+
+# MDS uses linux defines:
+S_IFMT   = 0o0170000
+S_IFSOCK =  0o140000
+S_IFLNK  =  0o120000
+S_IFREG  =  0o100000
+S_IFBLK  =  0o060000
+S_IFDIR  =  0o040000
+S_IFCHR  =  0o020000
+S_IFIFO  =  0o010000
+S_ISUID  =  0o004000
+S_ISGID  =  0o002000
+S_ISVTX  =  0o001000
+
+class QuiesceTestCase(CephFSTestCase):
+    """
+    Test case for quiescing subvolumes.
+    """
+
+    CLIENTS_REQUIRED = 2
+    MDSS_REQUIRED = 1
+
+    QUIESCE_SUBVOLUME = "subvol_quiesce"
+
+    def setUp(self):
+        super().setUp()
+        self.run_ceph_cmd(f'fs subvolume create {self.fs.name} {self.QUIESCE_SUBVOLUME} --mode=777')
+        p = self.run_ceph_cmd(f'fs subvolume getpath {self.fs.name} {self.QUIESCE_SUBVOLUME}', stdout=StringIO())
+        self.mntpnt = p.stdout.getvalue().strip()
+        self.subvolume = self.mntpnt
+
+    def tearDown(self):
+        # restart the fs so quiesce ops are cleaned up, in case any commands were left unkillable
+        self.fs.fail()
+        self.fs.set_joinable()
+        self.fs.wait_for_daemons()
+        super().tearDown()
+
+    def _configure_subvolume(self):
+        for m in self.mounts:
+            m.umount_wait()
+        for m in self.mounts:
+            m.update_attrs(cephfs_mntpt = self.mntpnt)
+            m.mount()
+
+    CLIENT_WORKLOAD = """
+        set -ex
+        pushd `mktemp -d -p .`
+        cp -a /usr .
+        popd
+    """
+    def _client_background_workload(self):
+        for m in self.mounts:
+            p = m.run_shell_payload(self.CLIENT_WORKLOAD, wait=False, stderr=StringIO(), timeout=1)
+            m.background_procs.append(p)
+
+    def _wait_for_quiesce_complete(self, reqid, rank=0, path=None):
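+        # poll "op get <reqid>" on the rank until the quiesce_path op reaches a completion flag point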
+        op = None
+        try:
+            with safe_while(sleep=1, tries=120, action='wait for quiesce completion') as proceed:
+                while proceed():
+                    op = self.fs.get_op(reqid, rank=rank)
+                    log.debug(f"op:\n{op}")
+                    self.assertEqual(op['type_data']['op_name'], 'quiesce_path')
+                    if op['type_data']['flag_point'] in (self.FP_QUIESCE_COMPLETE, self.FP_QUIESCE_COMPLETE_NON_AUTH_TREE):
+                        return op
+        except:
+            log.info(f"op:\n{op}")
+            if path is not None:
+                cache = self.fs.read_cache(path, rank=rank)
+                (fd, cache_path) = tempfile.mkstemp()
+                with os.fdopen(fd, "wt") as f:
+                    f.write(json.dumps(cache, indent=2))
+                    log.error(f"cache written to {cache_path}")
+            (fd, ops_path) = tempfile.mkstemp()
+            with os.fdopen(fd, "wt") as f:
+                ops = self.fs.get_ops(locks=True, rank=rank)
+                f.write(json.dumps(ops, indent=2))
+                log.error(f"ops written to {ops_path}")
+            raise
+
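+    # op tracker flag point strings reported by the MDS for quiesce ops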
+    FP_QUIESCE_COMPLETE = 'quiesce complete'
+    FP_QUIESCE_BLOCKED = 'quiesce blocked'
+    FP_QUIESCE_COMPLETE_NON_AUTH = 'quiesce complete for non-auth inode'
+    FP_QUIESCE_COMPLETE_NON_AUTH_TREE = 'quiesce complete for non-auth tree'
+    def _verify_quiesce(self, rank=0, root=None, splitauth=False):
+        if root is None:
+            root = self.subvolume
+
+        name = self.fs.get_rank(rank=rank)['name']
+        root_ino = self.fs.read_cache(root, depth=0, rank=rank)[0]['ino']
+        ops = self.fs.get_ops(locks=True, rank=rank, path=f"/tmp/mds.{rank}-ops")
+        quiesce_inode_ops = {}
+        skipped_nonauth = False
+
+        count_q = 0
+        count_qb = 0
+        count_qna = 0
+
+        for op in ops['ops']:
+            try:
+                log.debug(f"op = {op}")
+                type_data = op['type_data']
+                flag_point = type_data['flag_point']
+                op_type = type_data['op_type']
+                if op_type == 'client_request' or op_type == 'peer_request':
+                    continue
+                op_name = type_data['op_name']
+                if op_name == "quiesce_path":
+                    self.assertIn(flag_point, (self.FP_QUIESCE_COMPLETE, self.FP_QUIESCE_COMPLETE_NON_AUTH_TREE))
+                    if flag_point == self.FP_QUIESCE_COMPLETE_NON_AUTH_TREE:
+                        skipped_nonauth = True
+                elif op_name == "quiesce_inode":
+                    # get the inode number
+                    op_description = op['description']
+                    m = FP_RE.search(op_description)
+                    self.assertIsNotNone(m)
+                    if len(m.group(2)) == 0:
+                        ino = int(m.group(1), 16)
+                    else:
+                        self.assertEqual(int(m.group(1), 16), 1) # fp is rooted at the root ino
+                        fp = m.group(2)
+                        dump = self.fs.read_cache(fp, depth=0, rank=rank)
+                        ino = dump[0]['ino']
+                    self.assertNotIn(ino, quiesce_inode_ops)
+
+                    self.assertIn(flag_point, (self.FP_QUIESCE_COMPLETE, self.FP_QUIESCE_BLOCKED, self.FP_QUIESCE_COMPLETE_NON_AUTH))
+
+                    locks = type_data['locks']
+                    if flag_point == self.FP_QUIESCE_BLOCKED:
+                        count_qb += 1
+                        self.assertEqual(locks, [])
+                    elif flag_point == self.FP_QUIESCE_COMPLETE_NON_AUTH:
+                        count_qna += 1
+                        #self.assertEqual(len(locks), 1)
+                        #lock = locks[0]
+                        #lock_type = lock['lock']['type']
+                        #self.assertEqual(lock_type, "iauth")
+                        #object_string = lock['object_string']
+                        #m = INODE_RE.match(object_string)
+                        #self.assertIsNotNone(m)
+                        #self.assertEqual(ino, int(m.group(1), 16))
+                    else:
+                        count_q += 1
+                        for lock in locks:
+                            lock_type = lock['lock']['type']
+                            if lock_type.startswith('i'):
+                                object_string = lock['object_string']
+                                m = INODE_RE.match(object_string)
+                                self.assertIsNotNone(m)
+                                self.assertEqual(ino, int(m.group(1), 16))
+                        self.assertIsNotNone(ino)
+                    quiesce_inode_ops[ino] = op
+            except:
+                log.error(f"op:\n{json.dumps(op, indent=2)}")
+                raise
+
+        log.info(f"q = {count_q}; qb = {count_qb}; qna = {count_qna}")
+
+        if skipped_nonauth:
+            return
+
+        for ino, op in quiesce_inode_ops.items():
+            log.debug(f"{ino}: {op['description']}")
+
+        # now verify all files in cache have an op
+        cache = self.fs.read_cache(root, rank=rank)
+        visited = set()
+        locks_expected = set([
+          "iquiesce",
+          "isnap",
+          "ipolicy",
+          "ifile",
+          "inest",
+          "idft",
+          "iauth",
+          "ilink",
+          "ixattr",
+        ])
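+        # every fully quiesced auth inode is expected to hold all of these locks (asserted below)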
+        for inode in cache:
+            ino = inode['ino']
+            visited.add(ino)
+            mode = inode['mode']
+            self.assertIn(ino, quiesce_inode_ops)
+            op = quiesce_inode_ops[ino]
+            type_data = op['type_data']
+            flag_point = type_data['flag_point']
+            try:
+                locks_seen = set()
+                lock_type = None
+                op_name = type_data['op_name']
+                for lock in op['type_data']['locks']:
+                    lock_type = lock['lock']['type']
+                    if lock_type == "iquiesce":
+                        if ino == root_ino:
+                            self.assertEqual(lock['flags'], 1)
+                            self.assertEqual(lock['lock']['state'], 'sync')
+                        else:
+                            self.assertEqual(lock['flags'], 4)
+                            self.assertEqual(lock['lock']['state'], 'xlock')
+                    elif lock_type == "isnap":
+                        self.assertEqual(lock['flags'], 1)
+                        self.assertEqual(lock['lock']['state'][:4], 'sync')
+                    elif lock_type == "ifile":
+                        self.assertEqual(lock['flags'], 1)
+                        self.assertEqual(lock['lock']['state'][:4], 'sync')
+                    elif lock_type in ("ipolicy", "inest", "idft", "iauth", "ilink", "ixattr"):
+                        self.assertEqual(lock['flags'], 1)
+                        self.assertEqual(lock['lock']['state'][:4], 'sync')
+                    else:
+                        # no iflock
+                        self.assertFalse(lock_type.startswith("i"))
+                    if flag_point == self.FP_QUIESCE_COMPLETE and lock_type.startswith("i"):
+                        #if op_name == "quiesce_inode":
+                        #    self.assertTrue(lock['object']['is_auth'])
+                        locks_seen.add(lock_type)
+                try:
+                    if flag_point == self.FP_QUIESCE_BLOCKED:
+                        self.assertTrue(inode['quiesce_block'])
+                        self.assertEqual(set(), locks_seen)
+                    elif flag_point == self.FP_QUIESCE_COMPLETE_NON_AUTH:
+                        self.assertFalse(inode['quiesce_block'])
+                        self.assertEqual(set(), locks_seen)
+                    else:
+                        self.assertFalse(inode['quiesce_block'])
+                        self.assertEqual(locks_expected, locks_seen)
+                except:
+                    log.error(f"{sorted(locks_expected)} != {sorted(locks_seen)}")
+                    raise
+            except:
+                log.error(f"inode:\n{json.dumps(inode, indent=2)}")
+                log.error(f"op:\n{json.dumps(op, indent=2)}")
+                log.error(f"lock_type: {lock_type}")
+                raise
+        try:
+            self.assertEqual(visited, quiesce_inode_ops.keys())
+        except:
+            log.error(f"cache:\n{json.dumps(cache, indent=2)}")
+            log.error(f"ops:\n{json.dumps(quiesce_inode_ops, indent=2)}")
+            raise
+
+        # TODO: check that request/cap counters have stopped increasing
+        # TODO: count inodes under /usr and compare against the number of subops
+
+    def reqid_tostr(self, reqid):
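+        # e.g. {'entity': {'type': 'client', 'num': 4242}, 'tid': 1} -> "client.4242:1"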
+        return f"{reqid['entity']['type']}.{reqid['entity']['num']}:{reqid['tid']}"
+
+class TestQuiesce(QuiesceTestCase):
+    """
+    Single rank functional tests.
+    """
+
+    def test_quiesce_path_workload(self):
+        """
+        That a quiesce op can be created and verified while a workload is running.
+        """
+
+        self._configure_subvolume()
+        self._client_background_workload()
+
+        sleep(secrets.randbelow(30)+10)
+
+        J = self.fs.rank_tell(["quiesce", "path", self.subvolume])
+        reqid = self.reqid_tostr(J['op']['reqid'])
+        self._wait_for_quiesce_complete(reqid)
+
+        self._verify_quiesce()
+
+    def test_quiesce_path_snap(self):
+        """
+        That a snapshot can be taken on a quiesced subvolume.
+        """
+
+        self._configure_subvolume()
+        self._client_background_workload()
+
+        sleep(secrets.randbelow(30)+10)
+
+        J = self.fs.rank_tell(["quiesce", "path", self.subvolume])
+        reqid = self.reqid_tostr(J['op']['reqid'])
+        self._wait_for_quiesce_complete(reqid)
+
+        #path = os.path.normpath(os.path.join(self.mntpnt, ".."))
+        #p = self.fs.run_client_payload(f"mkdir {path}/.snap/foo && ls {path}/.snap/", stdout=StringIO())
+        #p = self.mount_a.run_shell_payload(f"mkdir ../.snap/foo && ls ../.snap/", stdout=StringIO())
+        self.run_ceph_cmd(f'fs subvolume snapshot create {self.fs.name} {self.QUIESCE_SUBVOLUME} foo')
+        p = self.run_ceph_cmd(f'fs subvolume snapshot ls {self.fs.name} {self.QUIESCE_SUBVOLUME}', stdout=StringIO())
+        log.debug(f"{p.stdout.getvalue()}")
+
+    def test_quiesce_path_create(self):
+        """
+        That a quiesce op can be created and verified.
+        """
+
+        J = self.fs.rank_tell(["quiesce", "path", self.subvolume])
+        reqid = self.reqid_tostr(J['op']['reqid'])
+        self._wait_for_quiesce_complete(reqid)
+        self._verify_quiesce()
+
+    def test_quiesce_path_kill(self):
+        """
+        That killing a quiesce op also kills its subops
+        ("quiesce_inode").
+        """
+
+        J = self.fs.rank_tell(["quiesce", "path", self.subvolume])
+        reqid = self.reqid_tostr(J['op']['reqid'])
+        self._wait_for_quiesce_complete(reqid)
+        self._verify_quiesce()
+        ops = self.fs.get_ops()
+        quiesce_inode = 0
+        for op in ops['ops']:
+            op_name = op['type_data'].get('op_name', None)
+            if op_name == "quiesce_inode":
+                quiesce_inode += 1
+        log.debug(f"there are {quiesce_inode} quiesce_path_inode requests")
+        self.assertLess(0, quiesce_inode)
+        J = self.fs.kill_op(reqid)
+        log.debug(f"{J}")
+        ops = self.fs.get_ops()
+        for op in ops['ops']:
+            op_name = op['type_data'].get('op_name', None)
+            self.assertNotIn(op_name, ('quiesce_path', 'quiesce_inode'))
+
+    def test_quiesce_path_release(self):
+        """
+        That killing the quiesce op properly releases the subvolume so that
+        client IO proceeds.
+        """
+
+        self._configure_subvolume()
+        self._client_background_workload()
+
+        P = self.fs.rank_tell(["ops"])
+        log.debug(f"{P}")
+
+        J = self.fs.rank_tell(["quiesce", "path", self.subvolume])
+        reqid = self.reqid_tostr(J['op']['reqid'])
+        self._wait_for_quiesce_complete(reqid)
+
+        P = self.fs.rank_tell(["ops"])
+        log.debug(f"{P}")
+
+        self.fs.kill_op(reqid)
+
+        P = self.fs.rank_tell(["perf", "dump"])
+        log.debug(f"{P}")
+        requests = P['mds']['request']
+        replies = P['mds']['reply']
+        grants = P['mds']['ceph_cap_op_grant']
+
+        def resumed():
+            P = self.fs.rank_tell(["perf", "dump"])
+            log.debug(f"{P}")
+            try:
+                self.assertLess(requests, P['mds']['request'])
+                self.assertLess(replies, P['mds']['reply'])
+                self.assertLess(grants, P['mds']['ceph_cap_op_grant'])
+                return True
+            except AssertionError:
+                return False
+
+        self.wait_until_true(resumed, 60)
+
+        P = self.fs.rank_tell(["ops"])
+        log.debug(f"{P}")
+
+    def test_quiesce_path_link_terminal(self):
+        """
+        That quiesce on a path whose final component is a symlink fails with
+        ENOTDIR, even when the link points to a valid subvolume.
+        """
+
+        self._configure_subvolume()
+
+        self.mount_a.run_shell_payload("ln -s ../.. subvol_quiesce")
+        path = self.mount_a.cephfs_mntpt + "/subvol_quiesce"
+
+        J = self.fs.rank_tell(["quiesce", "path", path, '--wait'])
+        log.debug(f"{J}")
+        self.assertEqual(J['op']['result'], -20) # ENOTDIR: the link is not a directory
+
+    def test_quiesce_path_link_intermediate(self):
+        """
+        That quiesce on a path with an intermediate symlink fails with ENOTDIR.
+        """
+
+        self._configure_subvolume()
+
+        self.mount_a.run_shell_payload("ln -s ../../.. _nogroup")
+        path = self.mount_a.cephfs_mntpt + "/_nogroup/" + self.QUIESCE_SUBVOLUME
+
+        J = self.fs.rank_tell(["quiesce", "path", path, '--wait'])
+        log.debug(f"{J}")
+        self.assertEqual(J['op']['result'], -20) # ENOTDIR: path_traverse: the intermediate link is not a directory
+
+    def test_quiesce_path_notsubvol(self):
+        """
+        That quiesce succeeds on a regular directory that is not a subvolume root.
+        """
+
+        self._configure_subvolume()
+
+        self.mount_a.run_shell_payload("mkdir dir")
+        path = self.mount_a.cephfs_mntpt + "/dir"
+
+        J = self.fs.rank_tell(["quiesce", "path", path, '--wait'])
+        reqid = self.reqid_tostr(J['op']['reqid'])
+        self._wait_for_quiesce_complete(reqid)
+        self._verify_quiesce(root=path)
+
+    def test_quiesce_path_regfile(self):
+        """
+        That quiesce on a regular file fails with ENOTDIR.
+        """
+
+        self._configure_subvolume()
+
+        self.mount_a.run_shell_payload("touch file")
+        path = self.mount_a.cephfs_mntpt + "/file"
+
+        J = self.fs.rank_tell(["quiesce", "path", path, '--wait'])
+        log.debug(f"{J}")
+        self.assertEqual(J['op']['result'], -20) # ENOTDIR
+
+    def test_quiesce_path_dup(self):
+        """
+        That two identical quiesce ops will result in one failing with
+        EINPROGRESS.
+        """
+
+        self._configure_subvolume()
+
+        op1 = self.fs.rank_tell(["quiesce", "path", self.subvolume])['op']
+        op1_reqid = self.reqid_tostr(op1['reqid'])
+        op2 = self.fs.rank_tell(["quiesce", "path", self.subvolume, '--wait'])['op']
+        op1 = self.fs.get_op(op1_reqid)['type_data'] # for possible dup result
+        log.debug(f"op1 = {op1}")
+        log.debug(f"op2 = {op2}")
+        self.assertIn(op1['flag_point'], (self.FP_QUIESCE_COMPLETE, 'cleaned up request'))
+        self.assertIn(op2['flag_point'], (self.FP_QUIESCE_COMPLETE, 'cleaned up request'))
+        self.assertTrue(op1['result'] == -115 or op2['result'] == -115) # EINPROGRESS
+
+    def test_quiesce_blocked(self):
+        """
+        That a file with ceph.quiesce.block is not quiesced.
+        """
+
+        self._configure_subvolume()
+
+        self.mount_a.run_shell_payload("touch file")
+        self.mount_a.setfattr("file", "ceph.quiesce.block", "1")
+
+        J = self.fs.rank_tell(["quiesce", "path", self.subvolume, '--wait'])
+        log.debug(f"{J}")
+        self.assertEqual(J['op']['result'], 0)
+        self.assertEqual(J['state']['inodes_blocked'], 1)
+        self._verify_quiesce(root=self.subvolume)
+
+    def test_quiesce_slow(self):
+        """
+        That a subvolume can be quiesced when quiesce is artificially delayed.
+        """
+
+        self.config_set('mds', 'mds_cache_quiesce_delay', '2000')
+        self._configure_subvolume()
+        self._client_background_workload()
+
+        J = self.fs.rank_tell(["quiesce", "path", self.subvolume])
+        log.debug(f"{J}")
+        reqid = self.reqid_tostr(J['op']['reqid'])
+        self._wait_for_quiesce_complete(reqid)
+        self._verify_quiesce(root=self.subvolume)
+
+    # TODO test lookup leaf file/dir after quiesce
+    # TODO ditto path_traverse
+
+class TestQuiesceMultiRank(QuiesceTestCase):
+    """
+    Tests for quiescing subvolumes on multiple ranks.
+    """
+
+    MDSS_REQUIRED = 2
+
+    CLIENT_WORKLOAD = """
+        set -ex
+        for ((i = 0; i < 10; ++i)); do
+            (
+                pushd `mktemp -d -p .`
+                touch file
+                sleep 5 # for export
+                cp -a /usr .
+                popd
+            ) &
+        done
+        wait
+    """
+
+    def setUp(self):
+        super().setUp()
+        self.fs.set_max_mds(2)
+        status = self.fs.wait_for_daemons()
+        self.mds_map = self.fs.get_mds_map(status=status)
+        self.ranks = list(range(self.mds_map['max_mds']))
+
+    def test_quiesce_path_splitauth(self):
+        """
+        That quiesce fails (by default) if auth is split on a path.
+        """
+
+        self._configure_subvolume()
+        self.mount_a.setfattr(".", "ceph.dir.pin.distributed", "1")
+        self._client_background_workload()
+        self._wait_distributed_subtrees(2*2, rank="all", path=self.mntpnt)
+
+        op = self.fs.rank_tell(["quiesce", "path", self.subvolume, '--wait'], rank=0)['op']
+        self.assertEqual(op['result'], -1) # EPERM
+
+
+    def test_quiesce_path_multirank(self):
+        """
+        That quiesce may complete with two ranks and a basic workload.
+        """
+
+        self._configure_subvolume()
+        self.mount_a.setfattr(".", "ceph.dir.pin.distributed", "1")
+        self._client_background_workload()
+        self._wait_distributed_subtrees(2*2, rank="all", path=self.mntpnt)
+
+        sleep(secrets.randbelow(30)+10)
+
+        p = self.mount_a.run_shell_payload("ls", stdout=StringIO())
+        dirs = p.stdout.getvalue().strip().split()
+
+        ops = []
+        for d in dirs:
+            path = os.path.join(self.mntpnt, d)
+            for r in self.ranks:
+                op = self.fs.rank_tell(["quiesce", "path", path], rank=r)['op']
+                reqid = self.reqid_tostr(op['reqid'])
+                log.info(f"created {reqid}")
+                ops.append((r, op, path))
+        for rank, op, path in ops:
+            reqid = self.reqid_tostr(op['reqid'])
+            log.debug(f"waiting for ({rank}, {reqid})")
+            op = self._wait_for_quiesce_complete(reqid, rank=rank, path=path)
+        # FIXME _verify_quiesce needs adjustment for multiple quiesce
+        #for rank, op, path in ops:
+        #    self._verify_quiesce(root=path, rank=rank)
+
+    # TODO: test for quiesce_counter
+
+class TestQuiesceSplitAuth(QuiesceTestCase):
+    """
+    Tests for quiescing subvolumes on multiple ranks with split auth.
+    """
+
+    MDSS_REQUIRED = 2
+
+    CLIENT_WORKLOAD = """
+        set -ex
+        for ((i = 0; i < 10; ++i)); do
+            (
+                pushd `mktemp -d -p .`
+                touch file
+                sleep 5 # for export
+                cp -a /usr .
+                popd
+            ) &
+        done
+        wait
+    """
+
+    def setUp(self):
+        super().setUp()
+        self.config_set('mds', 'mds_export_ephemeral_random_max', '0.75')
+        self.config_set('mds', 'mds_cache_quiesce_splitauth', 'true')
+        self.fs.set_max_mds(2)
+        status = self.fs.wait_for_daemons()
+        self.mds_map = self.fs.get_mds_map(status=status)
+        self.ranks = list(range(self.mds_map['max_mds']))
+
+    @unittest.skip("splitauth is experimental")
+    def test_quiesce_path_multirank_exports(self):
+        """
+        That quiesce may complete with two ranks while subtrees are being exported between them.
+        """
+
+        self.config_set('mds', 'mds_cache_quiesce_delay', '4000')
+        self._configure_subvolume()
+        self.mount_a.setfattr(".", "ceph.dir.pin.random", "0.5")
+        self._client_background_workload()
+
+        sleep(2)
+
+        op0 = self.fs.rank_tell(["quiesce", "path", self.subvolume], rank=0)['op']
+        op1 = self.fs.rank_tell(["quiesce", "path", self.subvolume], rank=1)['op']
+        reqid0 = self.reqid_tostr(op0['reqid'])
+        reqid1 = self.reqid_tostr(op1['reqid'])
+        op0 = self._wait_for_quiesce_complete(reqid0, rank=0)
+        op1 = self._wait_for_quiesce_complete(reqid1, rank=1)
+        log.debug(f"op0 = {op0}")
+        log.debug(f"op1 = {op1}")
+        self._verify_quiesce(rank=0)
+        self._verify_quiesce(rank=1)