class TestVolumeClient(CephFSTestCase):
+ #
+ # TODO: Test that VolumeClient can recover from partial auth updates.
+ #
+
# One for looking at the global filesystem, one for being
# the VolumeClient, two for mounting the created shares
CLIENTS_REQUIRED = 4
def _configure_guest_auth(self, volumeclient_mount, guest_mount,
guest_entity, mount_path,
- namespace_prefix=None, readonly=False):
+ namespace_prefix=None, readonly=False,
+ tenant_id=None):
"""
Set up auth credentials for the guest client to mount a volume.
is used for the volume's layout.
:param readonly: defaults to False. If set to 'True' only read-only
mount access is granted to the guest.
+ :param tenant_id: (OpenStack) tenant ID of the guest client.
"""
head, volume_id = os.path.split(mount_path)
# Authorize the guest client's auth ID to mount the volume.
key = self._volume_client_python(volumeclient_mount, dedent("""
vp = VolumePath("{group_id}", "{volume_id}")
- auth_result = vc.authorize(vp, "{guest_entity}", readonly={readonly})
+ auth_result = vc.authorize(vp, "{guest_entity}", readonly={readonly},
+ tenant_id="{tenant_id}")
print auth_result['auth_key']
""".format(
group_id=group_id,
volume_id=volume_id,
guest_entity=guest_entity,
- readonly=readonly)), volume_prefix, namespace_prefix
+ readonly=readonly,
+ tenant_id=tenant_id)), volume_prefix, namespace_prefix
)
+ # CephFSVolumeClient's authorize() does not return the secret
+ # key to a caller who isn't multi-tenant aware. Explicitly
+ # query the key for such a client.
+ if not tenant_id:
+ key = self.fs.mon_manager.raw_cluster_cmd(
+ "auth", "get-key", "client.{name}".format(name=guest_entity),
+ )
+
# The guest auth ID should exist.
existing_ids = [a['entity'] for a in self.auth_list()]
self.assertIn("client.{0}".format(guest_entity), existing_ids)
# Cannot write into read-only volume.
with self.assertRaises(CommandFailedError):
guest_mount.write_n_mb("rogue.bin", 1)
+
+ def test_get_authorized_ids(self):
+ """
+ That for a volume, the authorized IDs and their access levels
+ can be obtained using CephFSVolumeClient's get_authorized_ids().
+ """
+ volumeclient_mount = self.mounts[1]
+ volumeclient_mount.umount_wait()
+
+ # Configure volumeclient_mount as the handle for driving volumeclient.
+ self._configure_vc_auth(volumeclient_mount, "manila")
+
+ group_id = "grpid"
+ volume_id = "volid"
+ guest_entity_1 = "guest1"
+ guest_entity_2 = "guest2"
+
+ log.info("print group ID: {0}".format(group_id))
+
+ # Create a volume.
+ auths = self._volume_client_python(volumeclient_mount, dedent("""
+ vp = VolumePath("{group_id}", "{volume_id}")
+ vc.create_volume(vp, 1024*1024*10)
+ auths = vc.get_authorized_ids(vp)
+ print auths
+ """.format(
+ group_id=group_id,
+ volume_id=volume_id,
+ )))
+ # Check the list of authorized IDs for the volume.
+ expected_result = None
+ self.assertEqual(str(expected_result), auths)
+
+ # Allow two auth IDs access to the volume.
+ auths = self._volume_client_python(volumeclient_mount, dedent("""
+ vp = VolumePath("{group_id}", "{volume_id}")
+ vc.authorize(vp, "{guest_entity_1}", readonly=False)
+ vc.authorize(vp, "{guest_entity_2}", readonly=True)
+ auths = vc.get_authorized_ids(vp)
+ print auths
+ """.format(
+ group_id=group_id,
+ volume_id=volume_id,
+ guest_entity_1=guest_entity_1,
+ guest_entity_2=guest_entity_2,
+ )))
+ # Check the list of authorized IDs and their access levels.
+ expected_result = [(u'guest1', u'rw'), (u'guest2', u'r')]
+ self.assertItemsEqual(str(expected_result), auths)
+
+ # Disallow both the auth IDs' access to the volume.
+ auths = self._volume_client_python(volumeclient_mount, dedent("""
+ vp = VolumePath("{group_id}", "{volume_id}")
+ vc.deauthorize(vp, "{guest_entity_1}")
+ vc.deauthorize(vp, "{guest_entity_2}")
+ auths = vc.get_authorized_ids(vp)
+ print auths
+ """.format(
+ group_id=group_id,
+ volume_id=volume_id,
+ guest_entity_1=guest_entity_1,
+ guest_entity_2=guest_entity_2,
+ )))
+ # Check the list of authorized IDs for the volume.
+ expected_result = None
+ self.assertItemsEqual(str(expected_result), auths)
+
+ def test_multitenant_volumes(self):
+ """
+ That volume access can be restricted to a tenant.
+
+ That metadata used to enforce tenant isolation of
+ volumes is stored as a two-way mapping between auth
+ IDs and volumes that they're authorized to access.
+ """
+ volumeclient_mount = self.mounts[1]
+ volumeclient_mount.umount_wait()
+
+ # Configure volumeclient_mount as the handle for driving volumeclient.
+ self._configure_vc_auth(volumeclient_mount, "manila")
+
+ group_id = "groupid"
+ volume_id = "volumeid"
+
+ # Guest clients belonging to different tenants, but using the same
+ # auth ID.
+ auth_id = "guest"
+ guestclient_1 = {
+ "auth_id": auth_id,
+ "tenant_id": "tenant1",
+ }
+ guestclient_2 = {
+ "auth_id": auth_id,
+ "tenant_id": "tenant2",
+ }
+
+ # Create a volume.
+ self._volume_client_python(volumeclient_mount, dedent("""
+ vp = VolumePath("{group_id}", "{volume_id}")
+ vc.create_volume(vp, 1024*1024*10)
+ """.format(
+ group_id=group_id,
+ volume_id=volume_id,
+ )))
+
+ # Check that volume metadata file is created on volume creation.
+ vol_metadata_filename = "_{0}:{1}.meta".format(group_id, volume_id)
+ self.assertIn(vol_metadata_filename, self.mounts[0].ls("volumes"))
+
+ # Authorize 'guestclient_1', using auth ID 'guest' and belonging to
+ # 'tenant1', with 'rw' access to the volume.
+ self._volume_client_python(volumeclient_mount, dedent("""
+ vp = VolumePath("{group_id}", "{volume_id}")
+ vc.authorize(vp, "{auth_id}", tenant_id="{tenant_id}")
+ """.format(
+ group_id=group_id,
+ volume_id=volume_id,
+ auth_id=guestclient_1["auth_id"],
+ tenant_id=guestclient_1["tenant_id"]
+ )))
+
+ # Check that auth metadata file for auth ID 'guest', is
+ # created on authorizing 'guest' access to the volume.
+ auth_metadata_filename = "${0}.meta".format(guestclient_1["auth_id"])
+ self.assertIn(auth_metadata_filename, self.mounts[0].ls("volumes"))
+
+ # Verify that the auth metadata file stores the tenant ID that the
+ # auth ID belongs to, the auth ID's authorized access levels
+ # for different volumes, versioning details, etc.
+ expected_auth_metadata = {
+ u"version": 1,
+ u"compat_version": 1,
+ u"dirty": False,
+ u"tenant_id": u"tenant1",
+ u"volumes": {
+ u"groupid/volumeid": {
+ u"dirty": False,
+ u"access_level": u"rw",
+ }
+ }
+ }
+
+ auth_metadata = self._volume_client_python(volumeclient_mount, dedent("""
+ vp = VolumePath("{group_id}", "{volume_id}")
+ auth_metadata = vc._auth_metadata_get("{auth_id}")
+ print auth_metadata
+ """.format(
+ group_id=group_id,
+ volume_id=volume_id,
+ auth_id=guestclient_1["auth_id"],
+ )))
+
+ self.assertItemsEqual(str(expected_auth_metadata), auth_metadata)
+
+ # Verify that the volume metadata file stores info about auth IDs
+ # and their access levels to the volume, versioning details, etc.
+ expected_vol_metadata = {
+ u"version": 1,
+ u"compat_version": 1,
+ u"auths": {
+ u"guest": {
+ u"dirty": False,
+ u"access_level": u"rw"
+ }
+ }
+ }
+
+ vol_metadata = self._volume_client_python(volumeclient_mount, dedent("""
+ vp = VolumePath("{group_id}", "{volume_id}")
+ volume_metadata = vc._volume_metadata_get(vp)
+ print volume_metadata
+ """.format(
+ group_id=group_id,
+ volume_id=volume_id,
+ )))
+ self.assertItemsEqual(str(expected_vol_metadata), vol_metadata)
+
+ # Cannot authorize 'guestclient_2' to access the volume.
+ # It uses auth ID 'guest', which has already been used by a
+ # 'guestclient_1' belonging to an another tenant for accessing
+ # the volume.
+ with self.assertRaises(CommandFailedError):
+ self._volume_client_python(volumeclient_mount, dedent("""
+ vp = VolumePath("{group_id}", "{volume_id}")
+ vc.authorize(vp, "{auth_id}", tenant_id="{tenant_id}")
+ """.format(
+ group_id=group_id,
+ volume_id=volume_id,
+ auth_id=guestclient_2["auth_id"],
+ tenant_id=guestclient_2["tenant_id"]
+ )))
+
+ # Check that auth metadata file is cleaned up on removing
+ # auth ID's only access to a volume.
+ self._volume_client_python(volumeclient_mount, dedent("""
+ vp = VolumePath("{group_id}", "{volume_id}")
+ vc.deauthorize(vp, "{guest_entity}")
+ """.format(
+ group_id=group_id,
+ volume_id=volume_id,
+ guest_entity=guestclient_1["auth_id"]
+ )))
+
+ self.assertNotIn(auth_metadata_filename, self.mounts[0].ls("volumes"))
+
+ # Check that volume metadata file is cleaned up on volume deletion.
+ self._volume_client_python(volumeclient_mount, dedent("""
+ vp = VolumePath("{group_id}", "{volume_id}")
+ vc.delete_volume(vp)
+ """.format(
+ group_id=group_id,
+ volume_id=volume_id,
+ )))
+ self.assertNotIn(vol_metadata_filename, self.mounts[0].ls("volumes"))