TEST_FILE_NAME_PREFIX="subvolume_file"
# for filling subvolume with data
- CLIENTS_REQUIRED = 1
+ CLIENTS_REQUIRED = 2
MDSS_REQUIRED = 2
# io defaults
else:
self.mount_a.run_shell(['rmdir', trashpath])
+ def _configure_guest_auth(self, guest_mount, authid, key):
+ """
+ Set up auth credentials for a guest client.
+ """
+ # Create keyring file for the guest client.
+ keyring_txt = dedent("""
+ [client.{authid}]
+ key = {key}
+
+ """.format(authid=authid,key=key))
+
+ guest_mount.client_id = authid
+ guest_mount.client_remote.write_file(guest_mount.get_keyring_path(),
+ keyring_txt, sudo=True)
+ # Add a guest client section to the ceph config file.
+ self.config_set("client.{0}".format(authid), "debug client", 20)
+ self.config_set("client.{0}".format(authid), "debug objecter", 20)
+ self.set_conf("client.{0}".format(authid),
+ "keyring", guest_mount.get_keyring_path())
+
def setUp(self):
super(TestVolumes, self).setUp()
self.volname = None
def test_connection_expiration(self):
# unmount any cephfs mounts
- self.mount_a.umount_wait()
+ for i in range(0, self.CLIENTS_REQUIRED):
+ self.mounts[i].umount_wait()
sessions = self._session_list()
self.assertLessEqual(len(sessions), 1) # maybe mgr is already mounted
# verify trash dir is clean
self._wait_for_trash_empty()
+ ### authorize operations
+
+ def test_authorize_deauthorize_legacy_subvolume(self):
+ subvolume = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
+ authid = "alice"
+
+ guest_mount = self.mount_b
+ guest_mount.umount_wait()
+
+ # emulate a old-fashioned subvolume in a custom group
+ createpath = os.path.join(".", "volumes", group, subvolume)
+ self.mount_a.run_shell(['mkdir', '-p', createpath])
+
+ # add required xattrs to subvolume
+ default_pool = self.mount_a.getfattr(".", "ceph.dir.layout.pool")
+ self.mount_a.setfattr(createpath, 'ceph.dir.layout.pool', default_pool)
+
+ mount_path = os.path.join("/", "volumes", group, subvolume)
+
+ # authorize guest authID read-write access to subvolume
+ key = self._fs_cmd("subvolume", "authorize", self.volname, subvolume, authid,
+ "--group_name", group)
+
+ # guest authID should exist
+ existing_ids = [a['entity'] for a in self.auth_list()]
+ self.assertIn("client.{0}".format(authid), existing_ids)
+
+ # configure credentials for guest client
+ self._configure_guest_auth(guest_mount, authid, key)
+
+ # mount the subvolume, and write to it
+ guest_mount.mount(mount_path=mount_path)
+ guest_mount.write_n_mb("data.bin", 1)
+
+ # authorize guest authID read access to subvolume
+ key = self._fs_cmd("subvolume", "authorize", self.volname, subvolume, authid,
+ "--group_name", group, "--access_level", "r")
+
+ # guest client sees the change in access level to read only after a
+ # remount of the subvolume.
+ guest_mount.umount_wait()
+ guest_mount.mount(mount_path=mount_path)
+
+ # read existing content of the subvolume
+ self.assertListEqual(guest_mount.ls(guest_mount.mountpoint), ["data.bin"])
+ # cannot write into read-only subvolume
+ with self.assertRaises(CommandFailedError):
+ guest_mount.write_n_mb("rogue.bin", 1)
+
+ # cleanup
+ guest_mount.umount_wait()
+ self._fs_cmd("subvolume", "deauthorize", self.volname, subvolume, authid,
+ "--group_name", group)
+ # guest authID should no longer exist
+ existing_ids = [a['entity'] for a in self.auth_list()]
+ self.assertNotIn("client.{0}".format(authid), existing_ids)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--group_name", group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+ def test_authorize_deauthorize_subvolume(self):
+ subvolume = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
+ authid = "alice"
+
+ guest_mount = self.mount_b
+ guest_mount.umount_wait()
+
+ # create group
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+ # create subvolume in group
+ self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
+ mount_path = self._fs_cmd("subvolume", "getpath", self.volname, subvolume,
+ "--group_name", group).rstrip()
+
+ # authorize guest authID read-write access to subvolume
+ key = self._fs_cmd("subvolume", "authorize", self.volname, subvolume, authid,
+ "--group_name", group)
+
+ # guest authID should exist
+ existing_ids = [a['entity'] for a in self.auth_list()]
+ self.assertIn("client.{0}".format(authid), existing_ids)
+
+ # configure credentials for guest client
+ self._configure_guest_auth(guest_mount, authid, key)
+
+ # mount the subvolume, and write to it
+ guest_mount.mount(mount_path=mount_path)
+ guest_mount.write_n_mb("data.bin", 1)
+
+ # authorize guest authID read access to subvolume
+ key = self._fs_cmd("subvolume", "authorize", self.volname, subvolume, authid,
+ "--group_name", group, "--access_level", "r")
+
+ # guest client sees the change in access level to read only after a
+ # remount of the subvolume.
+ guest_mount.umount_wait()
+ guest_mount.mount(mount_path=mount_path)
+
+ # read existing content of the subvolume
+ self.assertListEqual(guest_mount.ls(guest_mount.mountpoint), ["data.bin"])
+ # cannot write into read-only subvolume
+ with self.assertRaises(CommandFailedError):
+ guest_mount.write_n_mb("rogue.bin", 1)
+
+ # cleanup
+ guest_mount.umount_wait()
+ self._fs_cmd("subvolume", "deauthorize", self.volname, subvolume, authid,
+ "--group_name", group)
+ # guest authID should no longer exist
+ existing_ids = [a['entity'] for a in self.auth_list()]
+ self.assertNotIn("client.{0}".format(authid), existing_ids)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--group_name", group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
def test_subvolume_resize_infinite_size_future_writes(self):
"""
That a subvolume can be resized to an infinite size and the future writes succeed.
def test_mgr_eviction(self):
# unmount any cephfs mounts
- self.mount_a.umount_wait()
+ for i in range(0, self.CLIENTS_REQUIRED):
+ self.mounts[i].umount_wait()
sessions = self._session_list()
self.assertLessEqual(len(sessions), 1) # maybe mgr is already mounted