for t in threads:
t.join()
- #
- # threads = []
- # for rank, gid in up.items():
- # thread = RankEvicter(self, ["auth_name={0}".format(auth_id)], rank, gid, mds_map, timeout)
- # thread.run()
- # threads.append(thread)
log.info("evict: joined all")
log.info("create_volume: {0}".format(volume_path))
path = self._get_path(volume_path)
- # # Fast pre-check: if the auth key (last thing created) exists, then
- # # the rest of the volume already exists too.
- # try:
- # auth = self._rados_command("auth get", {"entity": client_entity})[0]
- # except rados.Error:
- # pass
- # else:
- # log.info("create_volume: {0} already exists, returning")
- # return {
- # 'volume_key': auth['key'],
- # 'mount_path': path
- # }
-
self._mkdir_p(path)
if size is not None:
def delete_volume(self, volume_path, data_isolated=False):
"""
- Remove all trace of a volume from the Ceph cluster. This function is
- idempotent.
+ Make a volume inaccessible to guests. This function is
+ idempotent. This is the fast part of tearing down a volume: you must
+ also later call purge_volume, which is the slow part.
:param volume_path: Same identifier used in create_volume
:return:
else:
self.fs.rename(path, trashed_volume)
+ def purge_volume(self, volume_path, data_isolated=False):
+ """
+ Finish clearing up a volume that was previously passed to delete_volume. This
+ function is idempotent.
+ """
+
+ trash = os.path.join(self.VOLUME_PREFIX, "_deleting")
+ trashed_volume = os.path.join(trash, volume_path.volume_id)
+
try:
self.fs.stat(trashed_volume)
except cephfs.ObjectNotFound:
"sure": "--yes-i-really-really-mean-it"
})
+ def _get_ancestor_xattr(self, path, attr):
+ """
+ Helper for reading layout information: if this xattr is missing
+ on the requested path, keep checking parents until we find it.
+ """
+ try:
+ result = self.fs.getxattr(path, attr)
+ if result == "":
+ # Annoying! cephfs gives us empty instead of an error when attr not found
+ raise cephfs.NoData()
+ else:
+ return result
+ except cephfs.NoData:
+ if path == "/":
+ raise
+ else:
+ return self._get_ancestor_xattr(os.path.split(path)[0], attr)
+
def authorize(self, volume_path, auth_id):
"""
Get-or-create a Ceph auth identity for `auth_id` and grant them access
:return:
"""
+ # First I need to work out what the data pool is for this share:
+ # read the layout
path = self._get_path(volume_path)
+ pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
+
+ # Now construct auth capabilities that give the guest just enough
+ # permissions to access the share
client_entity = "client.{0}".format(auth_id)
- caps = self._rados_command(
- 'auth get-or-create',
- {
- 'entity': client_entity,
- 'caps': [
- 'mds', 'allow rw path={0}'.format(path),
- 'osd', 'allow rw',
- 'mon', 'allow r']
- })
+ want_mds_cap = 'allow rw path={0}'.format(path)
+ want_osd_cap = 'allow rw pool={0}'.format(pool_name)
+ try:
+ existing = self._rados_command(
+ 'auth get',
+ {
+ 'entity': client_entity
+ }
+ )
+ # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
+ except rados.Error:
+ caps = self._rados_command(
+ 'auth get-or-create',
+ {
+ 'entity': client_entity,
+ 'caps': [
+ 'mds', want_mds_cap,
+ 'osd', want_osd_cap,
+ 'mon', 'allow r']
+ })
+ else:
+ # entity exists, extend it
+ cap = existing[0]
+
+ def cap_extend(orig, want):
+ cap_tokens = orig.split(",")
+ if want not in cap_tokens:
+ cap_tokens.append(want)
+
+ return ",".join(cap_tokens)
+
+ osd_cap_str = cap_extend(cap['caps'].get('osd', ""), want_osd_cap)
+ mds_cap_str = cap_extend(cap['caps'].get('mds', ""), want_mds_cap)
+
+ caps = self._rados_command(
+ 'auth caps',
+ {
+ 'entity': client_entity,
+ 'caps': [
+ 'mds', mds_cap_str,
+ 'osd', osd_cap_str,
+ 'mon', cap['caps'].get('mon')]
+ })
+ caps = self._rados_command(
+ 'auth get',
+ {
+ 'entity': client_entity
+ }
+ )
+
# Result expected like this:
# [
# {
# }
# }
# ]
-
assert len(caps) == 1
assert caps[0]['entity'] == client_entity
key = caps[0]['key']
- return key
+ return {
+ 'auth_key': key
+ }
def deauthorize(self, volume_path, auth_id):
+ """
+ The volume must still exist.
+ """
client_entity = "client.{0}".format(auth_id)
+ path = self._get_path(volume_path)
+ pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
+
+ want_mds_cap = 'allow rw path={0}'.format(path)
+ want_osd_cap = 'allow rw pool={0}'.format(pool_name)
- # Remove the auth key for this volume
- self._rados_command('auth del', {'entity': client_entity}, decode=False)
+ try:
+ existing = self._rados_command(
+ 'auth get',
+ {
+ 'entity': client_entity
+ }
+ )
+
+ def cap_remove(orig, want):
+ cap_tokens = orig.split(",")
+ if want in cap_tokens:
+ cap_tokens.remove(want)
+
+ return ",".join(cap_tokens)
+
+ cap = existing[0]
+ osd_cap_str = cap_remove(cap['caps'].get('osd', ""), want_osd_cap)
+ mds_cap_str = cap_remove(cap['caps'].get('mds', ""), want_mds_cap)
+ if (not osd_cap_str) and (not mds_cap_str):
+ self._rados_command('auth del', {'entity': client_entity}, decode=False)
+ else:
+ self._rados_command(
+ 'auth caps',
+ {
+ 'entity': client_entity,
+ 'caps': [
+ 'mds', mds_cap_str,
+ 'osd', osd_cap_str,
+ 'mon', cap['caps'].get('mon')]
+ })
+
+ # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
+ except rados.Error:
+ # Already gone, great.
+ return
def _rados_command(self, prefix, args=None, decode=True):
"""