import json
import logging
+from tasks.cephfs.fuse_mount import FuseMount
+from teuthology.exceptions import CommandFailedError
from tasks.cephfs.cephfs_test_case import CephFSTestCase
log = logging.getLogger(__name__)
log.info("SessionMap: {0}".format(json.dumps(table_json, indent=2)))
self.assertEqual(table_json['0']['result'], 0)
self.assertEqual(len(table_json['0']['data']['Sessions']), 0)
+
+ def _sudo_write_file(self, remote, path, data):
+ """
+ Write data to a remote file as super user
+
+ :param remote: Remote site.
+ :param path: Path on the remote being written to.
+ :param data: Data to be written.
+
+ Both perms and owner are passed directly to chmod.
+ """
+ remote.run(
+ args=[
+ 'sudo',
+ 'python',
+ '-c',
+ 'import shutil, sys; shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))',
+ path,
+ ],
+ stdin=data,
+ )
+
+ def _configure_auth(self, mount, id_name, mds_caps, osd_caps=None, mon_caps=None):
+ """
+ Set up auth credentials for a client mount, and write out the keyring
+ for the client to use.
+ """
+
+ # This keyring stuff won't work for kclient
+ assert(isinstance(mount, FuseMount))
+
+ if osd_caps is None:
+ osd_caps = "allow rw"
+
+ if mon_caps is None:
+ mon_caps = "allow r"
+
+ out = self.fs.mon_manager.raw_cluster_cmd(
+ "auth", "get-or-create", "client.{name}".format(name=id_name),
+ "mds", mds_caps,
+ "osd", osd_caps,
+ "mon", mon_caps
+ )
+ mount.client_id = id_name
+ self._sudo_write_file(mount.client_remote, mount.get_keyring_path(), out)
+ self.set_conf("client.{name}".format(name=id_name), "keyring", mount.get_keyring_path())
+
+ def test_session_reject(self):
+ self.mount_a.run_shell(["mkdir", "foo"])
+ self.mount_a.run_shell(["mkdir", "foo/bar"])
+ self.mount_a.umount_wait()
+
+ # Mount B will be my rejected client
+ self.mount_b.umount_wait()
+
+ # Configure a client that is limited to /foo/bar
+ self._configure_auth(self.mount_b, "badguy", "allow rw path=/foo/bar")
+ # Check he can mount that dir and do IO
+ self.mount_b.mount(mount_path="/foo/bar")
+ self.mount_b.wait_until_mounted()
+ self.mount_b.create_destroy()
+ self.mount_b.umount_wait()
+
+ # Configure the client to claim that its mount point metadata is /baz
+ self.set_conf("client.badguy", "client_metadata", "root=/baz")
+ # Try to mount the client, see that it fails
+ with self.assert_cluster_log("client session with invalid root '/baz' denied"):
+ with self.assertRaises(CommandFailedError):
+ self.mount_b.mount(mount_path="/foo/bar")