]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
qa/cephfs: add tests for "fs authorize" subcommand 32581/head
authorRishabh Dave <ridave@redhat.com>
Mon, 24 Aug 2020 18:52:20 +0000 (00:22 +0530)
committerRishabh Dave <ridave@redhat.com>
Fri, 11 Sep 2020 12:34:23 +0000 (18:04 +0530)
Make caps FS-specific affects "fs authorize" subcommand. Let's add few
tests to verify its behaviour.

Signed-off-by: Rishabh Dave <ridave@redhat.com>
qa/tasks/cephfs/test_admin.py

index 7dc3951bc1fb287bb66b4822a02d28dfa7097b27..fc46952a5569a1445e2435d0ebda72ad9a692aa4 100644 (file)
@@ -1,12 +1,13 @@
 import json
 from io import StringIO
+from os.path import join as os_path_join
 
 from teuthology.orchestra.run import CommandFailedError
 
 from tasks.cephfs.cephfs_test_case import CephFSTestCase
-from tasks.cephfs.fuse_mount import FuseMount
-
 from tasks.cephfs.filesystem import FileLayout
+from tasks.cephfs.fuse_mount import FuseMount
+from tasks.cephfs.caps_helper import CapsHelper
 
 
 class TestAdminCommands(CephFSTestCase):
@@ -243,6 +244,7 @@ class TestConfigCommands(CephFSTestCase):
             self.assertTrue("NAME" in s)
             self.assertTrue("mon_host" in s)
 
+
     def test_client_config(self):
         """
         That I can successfully issue asok "config set" commands
@@ -292,6 +294,7 @@ class TestConfigCommands(CephFSTestCase):
         self.mount_a.umount_wait(require_clean=True)
         self.fs.mds_stop()
 
+
 class TestMirroringCommands(CephFSTestCase):
     CLIENTS_REQUIRED = 1
     MDSS_REQUIRED = 1
@@ -408,3 +411,109 @@ class TestMirroringCommands(CephFSTestCase):
         self.fs.mon_manager.raw_cluster_cmd("fs", "reset", self.fs.name, "--yes-i-really-mean-it")
         self.fs.wait_for_daemons()
         self._verify_mirroring(self.fs.name, "disabled")
+
+
+class TestSubCmdFsAuthorize(CapsHelper):
+    client_id = 'testuser'
+    client_name = 'client.' + client_id
+
+    def test_single_path_r(self):
+        perm = 'r'
+        filepaths, filedata, mounts, keyring = self.setup_test_env(perm)
+        moncap = self.get_mon_cap_from_keyring(self.client_name)
+
+        self.run_mon_cap_tests(moncap, keyring)
+        self.run_mds_cap_tests(filepaths, filedata, mounts, perm)
+
+    def test_single_path_rw(self):
+        perm = 'rw'
+        filepaths, filedata, mounts, keyring = self.setup_test_env(perm)
+        moncap = self.get_mon_cap_from_keyring(self.client_name)
+
+        self.run_mon_cap_tests(moncap, keyring)
+        self.run_mds_cap_tests(filepaths, filedata, mounts, perm)
+
+    def test_multiple_path_r(self):
+        perm, paths = 'r', ('/dir1', '/dir2/dir22')
+        filepaths, filedata, mounts, keyring = self.setup_test_env(perm, paths)
+        moncap = self.get_mon_cap_from_keyring(self.client_name)
+
+        keyring_path = self.create_keyring_file(self.mount_a.client_remote,
+                                                keyring)
+        for path in paths:
+            self.mount_a.remount(client_id=self.client_id,
+                                 client_keyring_path=keyring_path,
+                                 cephfs_mntpt=path)
+
+
+            # actual tests...
+            self.run_mon_cap_tests(moncap, keyring)
+            self.run_mds_cap_tests(filepaths, filedata, mounts, perm)
+
+    def test_multiple_path_rw(self):
+        perm, paths = 'rw', ('/dir1', '/dir2/dir22')
+        filepaths, filedata, mounts, keyring = self.setup_test_env(perm, paths)
+        moncap = self.get_mon_cap_from_keyring(self.client_name)
+
+        keyring_path = self.create_keyring_file(self.mount_a.client_remote,
+                                                keyring)
+        for path in paths:
+            self.mount_a.remount(client_id=self.client_id,
+                                 client_keyring_path=keyring_path,
+                                 cephfs_mntpt=path)
+
+
+            # actual tests...
+            self.run_mon_cap_tests(moncap, keyring)
+            self.run_mds_cap_tests(filepaths, filedata, mounts, perm)
+
+    def tearDown(self):
+        self.mount_a.umount_wait()
+        self.run_cluster_cmd(f'auth rm {self.client_name}')
+
+        super(type(self), self).tearDown()
+
+    def setup_for_single_path(self, perm):
+        filedata, filename = 'some data on fs 1', 'file_on_fs1'
+
+        filepath = os_path_join(self.mount_a.hostfs_mntpt, filename)
+        self.mount_a.write_file(filepath, filedata)
+
+        keyring = self.fs.authorize(self.client_id, ('/', perm))
+        keyring_path = self.create_keyring_file(self.mount_a.client_remote,
+                                                keyring)
+
+        self.mount_a.remount(client_id=self.client_id,
+                             client_keyring_path=keyring_path,
+                             cephfs_mntpt='/')
+
+        return filepath, filedata, keyring
+
+    def setup_for_multiple_paths(self, perm, paths):
+        filedata, filename = 'some data on fs 1', 'file_on_fs1'
+
+        self.mount_a.run_shell('mkdir -p dir1/dir12/dir13 dir2/dir22/dir23')
+
+        filepaths = []
+        for path in paths:
+            filepath = os_path_join(self.mount_a.hostfs_mntpt, path[1:], filename)
+            self.mount_a.write_file(filepath, filedata)
+            filepaths.append(filepath.replace(path, ''))
+        filepaths = tuple(filepaths)
+
+        keyring = self.fs.authorize(self.client_id, (path[0], perm, path[1],
+                                                     perm))
+
+        return filepaths, filedata, keyring
+
+    def setup_test_env(self, perm, paths=()):
+        filepaths, filedata, keyring = self.setup_for_multiple_paths(perm, paths) if paths \
+            else self.setup_for_single_path(perm)
+
+        if not isinstance(filepaths, tuple):
+            filepaths = (filepaths, )
+        if not isinstance(filedata, tuple):
+            filedata = (filedata, )
+        mounts = (self.mount_a, )
+
+        return filepaths, filedata, mounts, keyring