$ ceph fs subvolume authorized_list <vol_name> <sub_name> [--group_name=<group_name>]
+Evict fs clients based on auth ID and the subvolume mounted, using::
+
+ $ ceph fs subvolume evict <vol_name> <sub_name> <auth_id> [--group_name=<group_name>]
+
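+For example, to evict clients that are using auth ID 'guest' and have mounted
+subvolume 'subvol0' in group 'group0' of volume 'cephfs' (illustrative names)::
+
+ $ ceph fs subvolume evict cephfs subvol0 guest --group_name=group0
+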
Fetch the absolute path of a subvolume using::
$ ceph fs subvolume getpath <vol_name> <subvol_name> [--group_name <subvol_group_name>]
self._fs_cmd("subvolume", "rm", self.volname, subvolume2, "--group_name", group)
self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+ def test_subvolume_evict_client(self):
+ """
+ That a subvolume client can be evicted based on the auth ID
+ """
+
+ subvolumes = self._generate_random_subvolume_name(2)
+ group = self._generate_random_group_name()
+
+ # create group
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+ # mounts[0] and mounts[1] will be used as guest clients to mount the subvolumes.
+ for i in range(0, 2):
+ self.mounts[i].umount_wait()
+ guest_mounts = (self.mounts[0], self.mounts[1])
+ auth_id = "guest"
+ guestclient_1 = {
+ "auth_id": auth_id,
+ "tenant_id": "tenant1",
+ }
+
+ # Create two subvolumes. Authorize the 'guest' auth ID to mount the two
+ # subvolumes. Mount the two subvolumes and write data to them.
+ for i in range(2):
+ # Create subvolume.
+ self._fs_cmd("subvolume", "create", self.volname, subvolumes[i], "--group_name", group)
+
+ # authorize guest authID read-write access to subvolume
+ key = self._fs_cmd("subvolume", "authorize", self.volname, subvolumes[i], guestclient_1["auth_id"],
+ "--group_name", group, "--tenant_id", guestclient_1["tenant_id"])
+
+ mount_path = self._fs_cmd("subvolume", "getpath", self.volname, subvolumes[i],
+ "--group_name", group).rstrip()
+ # configure credentials for guest client
+ self._configure_guest_auth(guest_mounts[i], auth_id, key)
+
+ # mount the subvolume, and write to it
+ guest_mounts[i].mount(cephfs_mntpt=mount_path)
+ guest_mounts[i].write_n_mb("data.bin", 1)
+
+ # Evict guest_mounts[0], which is using auth ID 'guest' and has mounted
+ # one of the two subvolumes.
+ self._fs_cmd("subvolume", "evict", self.volname, subvolumes[0], auth_id, "--group_name", group)
+
+ # The evicted guest client, guest_mounts[0], should not be able to do
+ # any more metadata ops. It should start failing all operations
+ # when it sees that its own address is in the blocklist.
+ try:
+ guest_mounts[0].write_n_mb("rogue.bin", 1)
+ except CommandFailedError:
+ pass
+ else:
+ raise RuntimeError("post-eviction write should have failed!")
+
+ # The blocklisted guest client should still be unmountable.
+ guest_mounts[0].umount_wait()
+
+ # Guest client guest_mounts[1], which uses the same auth ID 'guest' but
+ # has mounted the other subvolume, should be able to use its subvolume
+ # unaffected.
+ guest_mounts[1].write_n_mb("data.bin.1", 1)
+
+ # Cleanup.
+ guest_mounts[1].umount_wait()
+ for i in range(2):
+ self._fs_cmd("subvolume", "deauthorize", self.volname, subvolumes[i], auth_id, "--group_name", group)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolumes[i], "--group_name", group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
def test_subvolume_pin_random(self):
self.fs.set_max_mds(2)
self.fs.wait_for_daemons()
class NotImplementedException(Exception):
pass
+
+class ClusterTimeout(Exception):
+ """
+ Exception indicating that we timed out trying to talk to the Ceph cluster,
+ either to the mons, or to any individual daemon that the mons indicate ought
+ to be up but isn't responding to us.
+ """
+ pass
+
+class ClusterError(Exception):
+ """
+ Exception indicating that the cluster returned an error to a command that
+ we thought should be successful based on our last knowledge of the cluster
+ state.
+ """
+ def __init__(self, action, result_code, result_str):
+ self._action = action
+ self._result_code = result_code
+ self._result_str = result_str
+
+ def __str__(self):
+ return "Error {0} (\"{1}\") while {2}".format(
+ self._result_code, self._result_str, self._action)
+
+class EvictionError(Exception):
+ pass
--- /dev/null
+import errno
+import json
+import logging
+import threading
+import time
+
+from .volume import get_mds_map
+from ..exception import ClusterTimeout, ClusterError
+
+log = logging.getLogger(__name__)
+
+class RankEvicter(threading.Thread):
+ """
+ Thread for evicting client(s) from a particular MDS daemon instance.
+
+ This is more complex than simply sending a command, because we have to
+ handle cases where MDS daemons might not be fully up yet, and/or might
+ be transiently unresponsive to commands.
+ """
+ class GidGone(Exception):
+ pass
+
+ POLL_PERIOD = 5
+
+ def __init__(self, mgr, fs, client_spec, volname, rank, gid, mds_map, ready_timeout):
+ """
+ :param client_spec: list of strings, used as filter arguments to "session evict"
+ pass ["id=123"] to evict a single client with session id 123.
+ """
+ self.volname = volname
+ self.rank = rank
+ self.gid = gid
+ self._mds_map = mds_map
+ self._client_spec = client_spec
+ self._fs = fs
+ self._ready_timeout = ready_timeout
+ self._ready_waited = 0
+ self.mgr = mgr
+
+ self.success = False
+ self.exception = None
+
+ super(RankEvicter, self).__init__()
+
+ def _ready_to_evict(self):
+ if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid:
+ log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format(
+ self._client_spec, self.rank, self.gid
+ ))
+ raise RankEvicter.GidGone()
+
+ info = self._mds_map['info']["gid_{0}".format(self.gid)]
+ log.debug("_ready_to_evict: state={0}".format(info['state']))
+ return info['state'] in ["up:active", "up:clientreplay"]
+
+ def _wait_for_ready(self):
+ """
+ Wait for that MDS rank to reach an active or clientreplay state, and
+ not be laggy.
+ """
+ while not self._ready_to_evict():
+ if self._ready_waited > self._ready_timeout:
+ raise ClusterTimeout()
+
+ time.sleep(self.POLL_PERIOD)
+ self._ready_waited += self.POLL_PERIOD
+ self._mds_map = get_mds_map(self.mgr, self.volname)
+
+ def _evict(self):
+ """
+ Run the eviction procedure. Return True on success; raise ClusterTimeout
+ or ClusterError on failure.
+ """
+
+ # Wait until the MDS is believed by the mon to be available for commands
+ try:
+ self._wait_for_ready()
+ except self.GidGone:
+ return True
+
+ # Then send it an evict
+ ret = -errno.ETIMEDOUT
+ while ret == -errno.ETIMEDOUT:
+ log.debug("mds_command: {0}, {1}".format(
+ "%s" % self.gid, ["session", "evict"] + self._client_spec
+ ))
+ ret, outb, outs = self._fs.mds_command(
+ "%s" % self.gid,
+ json.dumps({
+ "prefix": "session evict",
+ "filters": self._client_spec
+ }), "")
+ log.debug("mds_command: complete {0} {1}".format(ret, outs))
+
+ # If we get a clean response, great, it's gone from that rank.
+ if ret == 0:
+ return True
+ elif ret == -errno.ETIMEDOUT:
+ # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error)
+ self._mds_map = get_mds_map(self.mgr, self.volname)
+ try:
+ self._wait_for_ready()
+ except self.GidGone:
+ return True
+ else:
+ raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs)
+
+ def run(self):
+ try:
+ self._evict()
+ except Exception as e:
+ self.success = False
+ self.exception = e
+ else:
+ self.success = True
CLONE_INTERNAL = 'clone_internal'
ALLOW_ACCESS = 'allow-access'
DENY_ACCESS = 'deny-access'
+ AUTH_LIST = 'auth-list'
+ EVICT = 'evict'
class SubvolumeTemplate(object):
VERSION = None # type: int
from ..template import SubvolumeTemplate
from ..snapshot_util import mksnap, rmsnap
from ..access import allow_access, deny_access
-from ...exception import IndexException, OpSmException, VolumeException, MetadataMgrException
+from ...exception import IndexException, OpSmException, VolumeException, MetadataMgrException, EvictionError
from ...fs_util import listsnaps, is_inherited_snap
from ..template import SubvolumeOpType
from ..group import Group
+from ..rankevicter import RankEvicter
+from ..volume import get_mds_map
from ..clone_index import open_clone_index, create_clone_index
return auths
+ def evict(self, volname, auth_id, timeout=30):
+ """
+ Evict all clients based on the authorization ID and the subvolume path mounted.
+ Assumes that the authorization key has been revoked prior to calling this function.
+
+ This operation can throw an exception if the mon cluster is unresponsive, or
+ any individual MDS daemon is unresponsive for longer than the timeout passed in.
+ """
+
+ client_spec = ["auth_name={0}".format(auth_id), ]
+ client_spec.append("client_metadata.root={0}".
+ format(self.path.decode('utf-8')))
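+ # client_spec is now a list of "session evict" filters, e.g.
+ # (illustrative values):
+ #   ["auth_name=guest", "client_metadata.root=/volumes/<group>/<subvol>/..."]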
+
+ log.info("evict clients with {0}".format(', '.join(client_spec)))
+
+ mds_map = get_mds_map(self.mgr, volname)
+ if not mds_map:
+ raise VolumeException(-errno.ENOENT, "mdsmap for volume {0} not found".format(volname))
+
+ up = {}
+ for name, gid in mds_map['up'].items():
+ # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
+ assert name.startswith("mds_")
+ up[int(name[4:])] = gid
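+ # e.g. up == {0: 4107, 1: 4215}, mapping each active rank to its gid
+ # (illustrative gids)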
+
+ # For all MDS ranks held by a daemon
+ # Do the parallelism in python instead of using "tell mds.*", because
+ # the latter doesn't give us per-mds output
+ threads = []
+ for rank, gid in up.items():
+ thread = RankEvicter(self.mgr, self.fs, client_spec, volname, rank, gid, mds_map, timeout)
+ thread.start()
+ threads.append(thread)
+
+ for t in threads:
+ t.join()
+
+ log.info("evict: joined all")
+
+ for t in threads:
+ if not t.success:
+ msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}".
+ format(', '.join(client_spec), t.rank, t.gid, t.exception)
+ )
+ log.error(msg)
+ raise EvictionError(msg)
+
def _get_clone_source(self):
try:
clone_source = {
"""
return "cephfs.{}.meta".format(volname), "cephfs.{}.data".format(volname)
+def get_mds_map(mgr, volname):
+ """
+ return mdsmap for a volname
+ """
+ mds_map = None
+ fs_map = mgr.get("fs_map")
+ for f in fs_map['filesystems']:
+ if volname == f['mdsmap']['fs_name']:
+ return f['mdsmap']
+ return mds_map
+
def get_pool_names(mgr, volname):
"""
return metadata and data pools (list) names of volume as a tuple
create_clone
from .vol_spec import VolSpec
-from .exception import VolumeException
+from .exception import VolumeException, ClusterError, ClusterTimeout, EvictionError
from .async_cloner import Cloner
from .purge_queue import ThreadPoolPurgeQueueMixin
from .operations.template import SubvolumeOpType
try:
with open_volume(self, volname) as fs_handle:
with open_group(fs_handle, self.volspec, groupname) as group:
- with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.ALLOW_ACCESS) as subvolume:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.AUTH_LIST) as subvolume:
auths = subvolume.authorized_list()
ret = 0, json.dumps(auths, indent=4, sort_keys=True), ""
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
return ret
+ def evict(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ authid = kwargs['auth_id']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.EVICT) as subvolume:
+ subvolume.evict(volname, authid)
+ ret = 0, "", ""
+ except (VolumeException, ClusterTimeout, ClusterError, EvictionError) as e:
+ if isinstance(e, VolumeException):
+ ret = self.volume_exception_to_retval(e)
+ elif isinstance(e, ClusterTimeout):
+ ret = -errno.ETIMEDOUT, "", "Timed out trying to talk to the Ceph cluster"
+ elif isinstance(e, ClusterError):
+ ret = e._result_code, "", e._result_str
+ elif isinstance(e, EvictionError):
+ ret = -errno.EINVAL, "", str(e)
+ return ret
+
def resize_subvolume(self, **kwargs):
ret = 0, "", ""
volname = kwargs['vol_name']
'desc': "List auth IDs that have access to a subvolume",
'perm': 'r'
},
+ {
+ 'cmd': 'fs subvolume evict '
+ 'name=vol_name,type=CephString '
+ 'name=sub_name,type=CephString '
+ 'name=auth_id,type=CephString '
+ 'name=group_name,type=CephString,req=false ',
+ 'desc': "Evict clients based on auth IDs and subvolume mounted",
+ 'perm': 'rw'
+ },
{
'cmd': 'fs subvolumegroup getpath '
'name=vol_name,type=CephString '
sub_name=cmd['sub_name'],
group_name=cmd.get('group_name', None))
+ @mgr_cmd_wrap
+ def _cmd_fs_subvolume_evict(self, inbuf, cmd):
+ """
+ :return: a 3-tuple of return code (int), empty string (str), error message (str)
+ """
+ return self.vc.evict(vol_name=cmd['vol_name'],
+ sub_name=cmd['sub_name'],
+ auth_id=cmd['auth_id'],
+ group_name=cmd.get('group_name', None))
+
@mgr_cmd_wrap
def _cmd_fs_subvolume_ls(self, inbuf, cmd):
return self.vc.list_subvolumes(vol_name=cmd['vol_name'],