--- /dev/null
+from contextlib import contextmanager
+import os
+import fcntl
+import json
+import logging
+import struct
+import uuid
+
+import cephfs
+
+from ..group import Group
+
+log = logging.getLogger(__name__)
+
+class AuthMetadataError(Exception):
+ pass
+
+class AuthMetadataManager(object):
+
+ # Current version
+ version = 5
+
+ # Filename extensions for meta files.
+ META_FILE_EXT = ".meta"
+ DEFAULT_VOL_PREFIX = "/volumes"
+
+ def __init__(self, fs):
+ self.fs = fs
+ self._id = struct.unpack(">Q", uuid.uuid1().bytes[0:8])[0]
+ self.volume_prefix = self.DEFAULT_VOL_PREFIX
+
+ def _to_bytes(self, param):
+ '''
+ Helper method that returns byte representation of the given parameter.
+ '''
+ if isinstance(param, str):
+ return param.encode('utf-8')
+ elif param is None:
+ return param
+ else:
+ return str(param).encode('utf-8')
+
+ def _subvolume_metadata_path(self, group_name, subvol_name):
+ return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format(
+ group_name if group_name != Group.NO_GROUP_NAME else "",
+ subvol_name,
+ self.META_FILE_EXT))
+
+ def _check_compat_version(self, compat_version):
+ if self.version < compat_version:
+ msg = ("The current version of AuthMetadataManager, version {0} "
+ "does not support the required feature. Need version {1} "
+ "or greater".format(self.version, compat_version)
+ )
+ log.error(msg)
+ raise AuthMetadataError(msg)
+
+ def _metadata_get(self, path):
+ """
+ Return a deserialized JSON object, or None
+ """
+ fd = self.fs.open(path, "r")
+ # TODO iterate instead of assuming file < 4MB
+ read_bytes = self.fs.read(fd, 0, 4096 * 1024)
+ self.fs.close(fd)
+ if read_bytes:
+ return json.loads(read_bytes.decode())
+ else:
+ return None
+
+ def _metadata_set(self, path, data):
+ serialized = json.dumps(data)
+ fd = self.fs.open(path, "w")
+ try:
+ self.fs.write(fd, self._to_bytes(serialized), 0)
+ self.fs.fsync(fd, 0)
+ finally:
+ self.fs.close(fd)
+
+ def _lock(self, path):
+ @contextmanager
+ def fn():
+ while(1):
+ fd = self.fs.open(path, os.O_CREAT, 0o755)
+ self.fs.flock(fd, fcntl.LOCK_EX, self._id)
+
+ # The locked file will be cleaned up sometime. It could be
+ # unlinked by consumer e.g., an another manila-share service
+ # instance, before lock was applied on it. Perform checks to
+ # ensure that this does not happen.
+ try:
+ statbuf = self.fs.stat(path)
+ except cephfs.ObjectNotFound:
+ self.fs.close(fd)
+ continue
+
+ fstatbuf = self.fs.fstat(fd)
+ if statbuf.st_ino == fstatbuf.st_ino:
+ break
+
+ try:
+ yield
+ finally:
+ self.fs.flock(fd, fcntl.LOCK_UN, self._id)
+ self.fs.close(fd)
+
+ return fn()
+
+ def _auth_metadata_path(self, auth_id):
+ return os.path.join(self.volume_prefix, "${0}{1}".format(
+ auth_id, self.META_FILE_EXT))
+
+ def auth_lock(self, auth_id):
+ return self._lock(self._auth_metadata_path(auth_id))
+
+ def auth_metadata_get(self, auth_id):
+ """
+ Call me with the metadata locked!
+
+ Check whether a auth metadata structure can be decoded by the current
+ version of AuthMetadataManager.
+
+ Return auth metadata that the current version of AuthMetadataManager
+ can decode.
+ """
+ auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id))
+
+ if auth_metadata:
+ self._check_compat_version(auth_metadata['compat_version'])
+
+ return auth_metadata
+
+ def auth_metadata_set(self, auth_id, data):
+ """
+ Call me with the metadata locked!
+
+ Fsync the auth metadata.
+
+ Add two version attributes to the auth metadata,
+ 'compat_version', the minimum AuthMetadataManager version that can
+ decode the metadata, and 'version', the AuthMetadataManager version
+ that encoded the metadata.
+ """
+ data['compat_version'] = 1
+ data['version'] = self.version
+ return self._metadata_set(self._auth_metadata_path(auth_id), data)
+
+ def create_subvolume_metadata_file(self, group_name, subvol_name):
+ """
+ Create a subvolume metadata file, if it does not already exist, to store
+ data about auth ids having access to the subvolume
+ """
+ fd = self.fs.open(self._subvolume_metadata_path(group_name, subvol_name),
+ os.O_CREAT, 0o755)
+ self.fs.close(fd)
+
+ def delete_subvolume_metadata_file(self, group_name, subvol_name):
+ vol_meta_path = self._subvolume_metadata_path(group_name, subvol_name)
+ try:
+ self.fs.unlink(vol_meta_path)
+ except cephfs.ObjectNotFound:
+ pass
+
+ def subvol_metadata_lock(self, group_name, subvol_name):
+ """
+ Return a ContextManager which locks the authorization metadata for
+ a particular subvolume, and persists a flag to the metadata indicating
+ that it is currently locked, so that we can detect dirty situations
+ during recovery.
+
+ This lock isn't just to make access to the metadata safe: it's also
+ designed to be used over the two-step process of checking the
+ metadata and then responding to an authorization request, to
+ ensure that at the point we respond the metadata hasn't changed
+ in the background. It's key to how we avoid security holes
+ resulting from races during that problem ,
+ """
+ return self._lock(self._subvolume_metadata_path(group_name, subvol_name))
+
+ def subvol_metadata_get(self, group_name, subvol_name):
+ """
+ Call me with the metadata locked!
+
+ Check whether a subvolume metadata structure can be decoded by the current
+ version of AuthMetadataManager.
+
+ Return a subvolume_metadata structure that the current version of
+ AuthMetadataManager can decode.
+ """
+ subvolume_metadata = self._metadata_get(self._subvolume_metadata_path(group_name, subvol_name))
+
+ if subvolume_metadata:
+ self._check_compat_version(subvolume_metadata['compat_version'])
+
+ return subvolume_metadata
+
+ def subvol_metadata_set(self, group_name, subvol_name, data):
+ """
+ Call me with the metadata locked!
+
+ Add two version attributes to the subvolume metadata,
+ 'compat_version', the minimum AuthMetadataManager version that can
+ decode the metadata and 'version', the AuthMetadataManager version
+ that encoded the metadata.
+ """
+ data['compat_version'] = 1
+ data['version'] = self.version
+ return self._metadata_set(self._subvolume_metadata_path(group_name, subvol_name), data)
import os
+import sys
import stat
import uuid
import errno
from ...exception import IndexException, OpSmException, VolumeException, MetadataMgrException
from ...fs_util import listsnaps, is_inherited_snap
from ..template import SubvolumeOpType
+from ..group import Group
from ..clone_index import open_clone_index, create_clone_index
except cephfs.Error as e:
raise VolumeException(-e.args[0], e.args[1])
- def authorize(self, auth_id, access_level):
+ def _recover_auth_meta(self, auth_id, auth_meta):
+ """
+ Call me after locking the auth meta file.
+ """
+ remove_subvolumes = []
+
+ for subvol, subvol_data in auth_meta['volumes'].items():
+ if not subvol_data['dirty']:
+ continue
+
+ (group_name, subvol_name) = subvol.split('/')
+ group_name = group_name if group_name != 'None' else Group.NO_GROUP_NAME
+ access_level = subvol_data['access_level']
+
+ with self.auth_mdata_mgr.subvol_metadata_lock(group_name, subvol_name):
+ subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(group_name, subvol_name)
+
+ # No SVMeta update indicates that there was no auth update
+ # in Ceph either. So it's safe to remove corresponding
+ # partial update in AMeta.
+ if not subvol_meta or auth_id not in subvol_meta['auths']:
+ remove_subvolumes.append(subvol)
+ continue
+
+ want_auth = {
+ 'access_level': access_level,
+ 'dirty': False,
+ }
+ # SVMeta update looks clean. Ceph auth update must have been
+ # clean.
+ if subvol_meta['auths'][auth_id] == want_auth:
+ continue
+
+ self._authorize_subvolume(auth_id, access_level)
+
+ # Recovered from partial auth updates for the auth ID's access
+ # to a subvolume.
+ auth_meta['volumes'][subvol]['dirty'] = False
+ self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+
+ for subvol in remove_subvolumes:
+ del auth_meta['volumes'][subvol]
+
+ if not auth_meta['volumes']:
+ # Clean up auth meta file
+ self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id))
+ return
+
+ # Recovered from all partial auth updates for the auth ID.
+ auth_meta['dirty'] = False
+ self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+
+ def authorize(self, auth_id, access_level, tenant_id=None):
+ """
+ Get-or-create a Ceph auth identity for `auth_id` and grant them access
+ to
+ :param auth_id:
+ :param access_level:
+ :param tenant_id: Optionally provide a stringizable object to
+ restrict any created cephx IDs to other callers
+ passing the same tenant ID.
+ :return:
+ """
+
+ with self.auth_mdata_mgr.auth_lock(auth_id):
+ # Existing meta, or None, to be updated
+ auth_meta = self.auth_mdata_mgr.auth_metadata_get(auth_id)
+
+ # subvolume data to be inserted
+ group_name = self.group.groupname if self.group.groupname != Group.NO_GROUP_NAME else None
+ group_subvol_id = "{0}/{1}".format(group_name, self.subvolname)
+ subvolume = {
+ group_subvol_id : {
+ # The access level at which the auth_id is authorized to
+ # access the volume.
+ 'access_level': access_level,
+ 'dirty': True,
+ }
+ }
+ if auth_meta is None:
+ sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format(
+ auth_id, tenant_id
+ ))
+ log.debug("Authorize: no existing meta")
+ auth_meta = {
+ 'dirty': True,
+ 'tenant_id': str(tenant_id) if tenant_id else None,
+ 'volumes': subvolume
+ }
+
+ # Note: this is *not* guaranteeing that the key doesn't already
+ # exist in Ceph: we are allowing VolumeClient tenants to
+ # 'claim' existing Ceph keys. In order to prevent VolumeClient
+ # tenants from reading e.g. client.admin keys, you need to
+ # have configured your VolumeClient user (e.g. Manila) to
+ # have mon auth caps that prevent it from accessing those keys
+ # (e.g. limit it to only access keys with a manila.* prefix)
+ else:
+ # Disallow tenants to share auth IDs
+ if str(auth_meta['tenant_id']) != str(tenant_id):
+ msg = "auth ID: {0} is already in use".format(auth_id)
+ log.error(msg)
+ raise VolumeException(-errno.EPERM, msg)
+
+ if auth_meta['dirty']:
+ self._recover_auth_meta(auth_id, auth_meta)
+
+ log.debug("Authorize: existing tenant {tenant}".format(
+ tenant=auth_meta['tenant_id']
+ ))
+ auth_meta['dirty'] = True
+ auth_meta['volumes'].update(subvolume)
+
+ self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+
+ with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname):
+ key = self._authorize_subvolume(auth_id, access_level)
+
+ auth_meta['dirty'] = False
+ auth_meta['volumes'][group_subvol_id]['dirty'] = False
+ self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+
+ if tenant_id:
+ return key
+ else:
+ # Caller wasn't multi-tenant aware: be safe and don't give
+ # them a key
+ return ""
+
+ def _authorize_subvolume(self, auth_id, access_level):
+ subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname)
+
+ auth = {
+ auth_id: {
+ 'access_level': access_level,
+ 'dirty': True,
+ }
+ }
+
+ if subvol_meta is None:
+ subvol_meta = {
+ 'auths': auth
+ }
+ else:
+ subvol_meta['auths'].update(auth)
+ self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
+
+ key = self._authorize(auth_id, access_level)
+
+ subvol_meta['auths'][auth_id]['dirty'] = False
+ self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
+
+ return key
+
+ def _authorize(self, auth_id, access_level):
subvol_path = self.path
log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(auth_id, subvol_path))
unwanted_mds_cap, unwanted_osd_cap)
def deauthorize(self, auth_id):
+ with self.auth_mdata_mgr.auth_lock(auth_id):
+ # Existing meta, or None, to be updated
+ auth_meta = self.auth_mdata_mgr.auth_metadata_get(auth_id)
+
+ group_name = self.group.groupname if self.group.groupname != Group.NO_GROUP_NAME else None
+ group_subvol_id = "{0}/{1}".format(group_name, self.subvolname)
+ if (auth_meta is None) or (not auth_meta['volumes']):
+ log.warning("deauthorized called for already-removed auth"
+ "ID '{auth_id}' for subvolume '{subvolume}'".format(
+ auth_id=auth_id, subvolume=self.subvolname
+ ))
+ # Clean up the auth meta file of an auth ID
+ self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id))
+ return
+
+ if group_subvol_id not in auth_meta['volumes']:
+ log.warning("deauthorized called for already-removed auth"
+ "ID '{auth_id}' for subvolume '{subvolume}'".format(
+ auth_id=auth_id, subvolume=self.subvolname
+ ))
+ return
+
+ if auth_meta['dirty']:
+ self._recover_auth_meta(auth_id, auth_meta)
+
+ auth_meta['dirty'] = True
+ auth_meta['volumes'][group_subvol_id]['dirty'] = True
+ self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+
+ self._deauthorize_subvolume(auth_id)
+
+ # Filter out the volume we're deauthorizing
+ del auth_meta['volumes'][group_subvol_id]
+
+ # Clean up auth meta file
+ if not auth_meta['volumes']:
+ self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id))
+ return
+
+ auth_meta['dirty'] = False
+ self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+
+ def _deauthorize_subvolume(self, auth_id):
+ with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname):
+ subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname)
+
+ if (subvol_meta is None) or (auth_id not in subvol_meta['auths']):
+ log.warning("deauthorized called for already-removed auth"
+ "ID '{auth_id}' for subvolume '{subvolume}'".format(
+ auth_id=auth_id, subvolume=self.subvolname
+ ))
+ return
+
+ subvol_meta['auths'][auth_id]['dirty'] = True
+ self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
+
+ self._deauthorize(auth_id)
+
+ # Remove the auth_id from the metadata *after* removing it
+ # from ceph, so that if we crashed here, we would actually
+ # recreate the auth ID during recovery (i.e. end up with
+ # a consistent state).
+
+ # Filter out the auth we're removing
+ del subvol_meta['auths'][auth_id]
+ self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
+
+ def _deauthorize(self, auth_id):
"""
The volume must still exist.
"""