]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/volumes: Persist auth and subvolume metadata
authorKotresh HR <khiremat@redhat.com>
Wed, 18 Nov 2020 10:13:25 +0000 (15:43 +0530)
committerKotresh HR <khiremat@redhat.com>
Fri, 5 Feb 2021 18:26:08 +0000 (23:56 +0530)
1. Subvolume create and delete operations create and delete subvolume
   metadata file respectively.
2. Subvolume authorize creates the auth meta file and persists the
   required metadata in the subvolume metadata file and the auth metadata
   file on disk. Subvolume deauthorize clears the required metadata in
   both metadata files.

Fixes: https://tracker.ceph.com/issues/44931
Signed-off-by: Kotresh HR <khiremat@redhat.com>
(cherry picked from commit 04d876ced756ca86580bdff4ac116333dbb102e5)

src/pybind/mgr/volumes/fs/operations/versions/auth_metadata.py [new file with mode: 0644]
src/pybind/mgr/volumes/fs/operations/versions/subvolume_base.py
src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py
src/pybind/mgr/volumes/fs/operations/versions/subvolume_v2.py

diff --git a/src/pybind/mgr/volumes/fs/operations/versions/auth_metadata.py b/src/pybind/mgr/volumes/fs/operations/versions/auth_metadata.py
new file mode 100644 (file)
index 0000000..ea4b032
--- /dev/null
@@ -0,0 +1,208 @@
+from contextlib import contextmanager
+import os
+import fcntl
+import json
+import logging
+import struct
+import uuid
+
+import cephfs
+
+from ..group import Group
+
+log = logging.getLogger(__name__)
+
+class AuthMetadataError(Exception):
+    """Raised when stored metadata requires a newer AuthMetadataManager."""
+
+class AuthMetadataManager(object):
+
+    # Current version
+    version = 5
+
+    # Filename extensions for meta files.
+    META_FILE_EXT = ".meta"
+    DEFAULT_VOL_PREFIX = "/volumes"
+
+    def __init__(self, fs):
+        """
+        Bind the manager to a filesystem handle.
+
+        :param fs: filesystem handle used for every open/read/write/flock
+                   call issued by this manager
+        """
+        self.volume_prefix = self.DEFAULT_VOL_PREFIX
+        self.fs = fs
+        # Owner id handed to flock(): the first 8 bytes of a freshly
+        # generated UUID1, read as a big-endian unsigned 64-bit integer.
+        self._id = int.from_bytes(uuid.uuid1().bytes[:8], "big")
+
+    def _to_bytes(self, param):
+        '''
+        Encode ``param`` as UTF-8 bytes.
+
+        ``None`` is passed through untouched, a ``str`` is encoded as is and
+        any other value is converted with ``str()`` before being encoded.
+        '''
+        if param is None:
+            return None
+        text = param if isinstance(param, str) else str(param)
+        return text.encode('utf-8')
+
+    def _subvolume_metadata_path(self, group_name, subvol_name):
+        """
+        Build the path of the metadata file of a subvolume.
+
+        The file lives directly under the volume prefix and is named
+        "_<group>:<subvolume><META_FILE_EXT>"; the group part is left empty
+        when ``group_name`` is Group.NO_GROUP_NAME.
+        """
+        if group_name != Group.NO_GROUP_NAME:
+            group_part = group_name
+        else:
+            group_part = ""
+        file_name = "_{0}:{1}{2}".format(group_part, subvol_name, self.META_FILE_EXT)
+        return os.path.join(self.volume_prefix, file_name)
+
+    def _check_compat_version(self, compat_version):
+        """
+        Reject metadata that requires a newer AuthMetadataManager.
+
+        :param compat_version: minimum manager version able to decode the
+                               metadata
+        :raises AuthMetadataError: if this manager's version is lower than
+                                   ``compat_version``
+        """
+        if not self.version < compat_version:
+            return
+
+        template = ("The current version of AuthMetadataManager, version {0} "
+                    "does not support the required feature. Need version {1} "
+                    "or greater")
+        msg = template.format(self.version, compat_version)
+        log.error(msg)
+        raise AuthMetadataError(msg)
+
+    def _metadata_get(self, path):
+        """
+        Read the file at ``path`` and decode its content as JSON.
+
+        :param path: path of the metadata file to read
+        :return: the deserialized JSON object, or None if nothing was read
+        """
+        fd = self.fs.open(path, "r")
+        try:
+            # TODO iterate instead of assuming file < 4MB
+            read_bytes = self.fs.read(fd, 0, 4096 * 1024)
+        finally:
+            # Close the descriptor even when the read fails, so that it is
+            # never leaked.
+            self.fs.close(fd)
+
+        if not read_bytes:
+            return None
+        return json.loads(read_bytes.decode())
+
+    def _metadata_set(self, path, data):
+        """
+        Serialize ``data`` as JSON and write it to the file at ``path``.
+
+        The payload is written at offset 0 and then fsync'ed; the file is
+        closed whether or not the write succeeds.
+        """
+        payload = self._to_bytes(json.dumps(data))
+        handle = self.fs.open(path, "w")
+        try:
+            self.fs.write(handle, payload, 0)
+            self.fs.fsync(handle, 0)
+        finally:
+            self.fs.close(handle)
+
+    def _lock(self, path):
+        """
+        Return a context manager holding an exclusive flock on ``path``.
+
+        The file is created if it does not exist. Nothing is opened or
+        locked until the returned context manager is entered; the lock is
+        released and the file closed when the context exits.
+
+        :param path: path of the file to lock
+        """
+        @contextmanager
+        def fn():
+            while True:
+                fd = self.fs.open(path, os.O_CREAT, 0o755)
+                try:
+                    self.fs.flock(fd, fcntl.LOCK_EX, self._id)
+
+                    # The locked file will be cleaned up sometime. It could
+                    # be unlinked by a consumer, e.g. another manila-share
+                    # service instance, before the lock was applied on it.
+                    # Only accept the lock if the path still refers to the
+                    # very file that was opened.
+                    try:
+                        statbuf = self.fs.stat(path)
+                    except cephfs.ObjectNotFound:
+                        statbuf = None
+
+                    if statbuf is not None and \
+                            statbuf.st_ino == self.fs.fstat(fd).st_ino:
+                        break
+                except Exception:
+                    # Do not leak the descriptor when locking or checking
+                    # the file fails.
+                    self.fs.close(fd)
+                    raise
+
+                # The path vanished or now points to a different inode:
+                # drop this stale descriptor before retrying.
+                # NOTE(review): assumes closing the descriptor also drops
+                # its flock, as the ObjectNotFound path always did.
+                self.fs.close(fd)
+
+            try:
+                yield
+            finally:
+                try:
+                    self.fs.flock(fd, fcntl.LOCK_UN, self._id)
+                finally:
+                    # Close even if the explicit unlock fails.
+                    self.fs.close(fd)
+
+        return fn()
+
+    def _auth_metadata_path(self, auth_id):
+        """
+        Build the path of the metadata file of an auth ID, i.e.
+        "$<auth_id><META_FILE_EXT>" directly under the volume prefix.
+        """
+        file_name = "${0}{1}".format(auth_id, self.META_FILE_EXT)
+        return os.path.join(self.volume_prefix, file_name)
+
+    def auth_lock(self, auth_id):
+        """
+        Return a context manager that locks the metadata file of ``auth_id``.
+        """
+        auth_meta_path = self._auth_metadata_path(auth_id)
+        return self._lock(auth_meta_path)
+
+    def auth_metadata_get(self, auth_id):
+        """
+        Call me with the metadata locked!
+
+        Load the metadata stored for ``auth_id`` and make sure that the
+        current version of AuthMetadataManager is able to decode it.
+
+        :return: the decoded auth metadata; a falsy result (None for an
+                 empty file) is returned without a version check
+        :raises AuthMetadataError: if the metadata needs a newer manager
+        """
+        meta_path = self._auth_metadata_path(auth_id)
+        auth_metadata = self._metadata_get(meta_path)
+
+        if not auth_metadata:
+            return auth_metadata
+
+        self._check_compat_version(auth_metadata['compat_version'])
+        return auth_metadata
+
+    def auth_metadata_set(self, auth_id, data):
+        """
+        Call me with the metadata locked!
+
+        Stamp ``data`` with two version attributes and write it, fsync'ed,
+        to the metadata file of ``auth_id``:
+
+        - 'compat_version': the minimum AuthMetadataManager version that
+          can decode the metadata;
+        - 'version': the AuthMetadataManager version that encoded it.
+
+        ``data`` is modified in place.
+        """
+        data.update(compat_version=1, version=self.version)
+        meta_path = self._auth_metadata_path(auth_id)
+        return self._metadata_set(meta_path, data)
+
+    def create_subvolume_metadata_file(self, group_name, subvol_name):
+        """
+        Make sure the metadata file of a subvolume exists.
+
+        The file, which stores data about the auth ids having access to the
+        subvolume, is opened with O_CREAT and closed right away.
+        """
+        meta_path = self._subvolume_metadata_path(group_name, subvol_name)
+        self.fs.close(self.fs.open(meta_path, os.O_CREAT, 0o755))
+
+    def delete_subvolume_metadata_file(self, group_name, subvol_name):
+        """
+        Remove the metadata file of a subvolume; a file that is already
+        gone is not an error.
+        """
+        try:
+            self.fs.unlink(self._subvolume_metadata_path(group_name, subvol_name))
+        except cephfs.ObjectNotFound:
+            # Nothing left to delete.
+            pass
+
+    def subvol_metadata_lock(self, group_name, subvol_name):
+        """
+        Return a context manager which locks the authorization metadata
+        file of a particular subvolume.
+
+        This lock isn't just to make access to the metadata safe: it's also
+        designed to be held over the two-step process of checking the
+        metadata and then responding to an authorization request, to
+        ensure that at the point we respond the metadata hasn't changed
+        in the background. It's key to how we avoid security holes
+        resulting from races during that process.
+        """
+        subvol_meta_path = self._subvolume_metadata_path(group_name, subvol_name)
+        return self._lock(subvol_meta_path)
+
+    def subvol_metadata_get(self, group_name, subvol_name):
+        """
+        Call me with the metadata locked!
+
+        Load the metadata stored for a subvolume and make sure that the
+        current version of AuthMetadataManager is able to decode it.
+
+        :return: the decoded subvolume metadata; a falsy result (None for
+                 an empty file) is returned without a version check
+        :raises AuthMetadataError: if the metadata needs a newer manager
+        """
+        meta_path = self._subvolume_metadata_path(group_name, subvol_name)
+        subvolume_metadata = self._metadata_get(meta_path)
+
+        if not subvolume_metadata:
+            return subvolume_metadata
+
+        self._check_compat_version(subvolume_metadata['compat_version'])
+        return subvolume_metadata
+
+    def subvol_metadata_set(self, group_name, subvol_name, data):
+        """
+        Call me with the metadata locked!
+
+        Stamp ``data`` with two version attributes and write it to the
+        metadata file of the subvolume:
+
+        - 'compat_version': the minimum AuthMetadataManager version that
+          can decode the metadata;
+        - 'version': the AuthMetadataManager version that encoded it.
+
+        ``data`` is modified in place.
+        """
+        data.update(compat_version=1, version=self.version)
+        meta_path = self._subvolume_metadata_path(group_name, subvol_name)
+        return self._metadata_set(meta_path, data)
index cdb8fcf8cd0c38351124d70ec9b8a531854ddd25..f193dabd4a85f9d13708e77f5e0b525e2a27d97e 100644 (file)
@@ -13,6 +13,7 @@ from ..trash import create_trashcan, open_trashcan
 from ...fs_util import get_ancestor_xattr
 from ...exception import MetadataMgrException, VolumeException
 from .op_sm import SubvolumeOpSm
+from .auth_metadata import AuthMetadataManager
 
 log = logging.getLogger(__name__)
 
@@ -22,6 +23,7 @@ class SubvolumeBase(object):
     def __init__(self, mgr, fs, vol_spec, group, subvolname, legacy=False):
         self.mgr = mgr
         self.fs = fs
+        self.auth_mdata_mgr = AuthMetadataManager(fs)
         self.cmode = None
         self.user_id = None
         self.group_id = None
index 199cf3d83603fa5ac36580850b54b46f0306d0b3..33ddcabbeffe8dbffb14bd71eb9f295860d8b689 100644 (file)
@@ -1,4 +1,5 @@
 import os
+import sys
 import stat
 import uuid
 import errno
@@ -18,6 +19,7 @@ from ..access import allow_access, deny_access
 from ...exception import IndexException, OpSmException, VolumeException, MetadataMgrException
 from ...fs_util import listsnaps, is_inherited_snap
 from ..template import SubvolumeOpType
+from ..group import Group
 
 from ..clone_index import open_clone_index, create_clone_index
 
@@ -231,7 +233,161 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
         except cephfs.Error as e:
             raise VolumeException(-e.args[0], e.args[1])
 
-    def authorize(self, auth_id, access_level):
+    def _recover_auth_meta(self, auth_id, auth_meta):
+        """
+        Call me after locking the auth meta file.
+
+        Bring a dirty auth metadata structure (AMeta) back in line with
+        the subvolume metadata files (SVMeta). Every dirty subvolume entry
+        is either dropped from AMeta, when SVMeta has no record of the
+        auth ID, or re-authorized and marked clean. The auth meta file is
+        unlinked if no subvolume entry is left, otherwise it is rewritten
+        with its top-level 'dirty' flag cleared.
+
+        :param auth_id: auth ID whose metadata is being recovered
+        :param auth_meta: decoded auth metadata; modified in place
+        """
+        # Entries to drop from AMeta once the iteration below is over.
+        remove_subvolumes = []
+
+        for subvol, subvol_data in auth_meta['volumes'].items():
+            if not subvol_data['dirty']:
+                continue
+
+            # Keys have the form "<group>/<subvolume>"; the group part is
+            # the literal string 'None' for subvolumes outside any group.
+            (group_name, subvol_name) = subvol.split('/')
+            group_name = group_name if group_name != 'None' else Group.NO_GROUP_NAME
+            access_level = subvol_data['access_level']
+
+            with self.auth_mdata_mgr.subvol_metadata_lock(group_name, subvol_name):
+                subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(group_name, subvol_name)
+
+                # No SVMeta update indicates that there was no auth update
+                # in Ceph either. So it's safe to remove the corresponding
+                # partial update in AMeta.
+                if not subvol_meta or auth_id not in subvol_meta['auths']:
+                    remove_subvolumes.append(subvol)
+                    continue
+
+                want_auth = {
+                    'access_level': access_level,
+                    'dirty': False,
+                }
+                # SVMeta update looks clean. Ceph auth update must have been
+                # clean.
+                # NOTE(review): this 'continue' also skips clearing the
+                # entry's own 'dirty' flag in AMeta below -- confirm that
+                # this is intended.
+                if subvol_meta['auths'][auth_id] == want_auth:
+                    continue
+
+                # NOTE(review): _authorize_subvolume() works on this
+                # object's own group and subvolume name, not on the
+                # (group_name, subvol_name) pair locked above -- confirm.
+                self._authorize_subvolume(auth_id, access_level)
+
+            # Recovered from partial auth updates for the auth ID's access
+            # to a subvolume.
+            auth_meta['volumes'][subvol]['dirty'] = False
+            self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+
+        # Deleting is deferred until here so that the dict is not resized
+        # while it is being iterated.
+        for subvol in remove_subvolumes:
+            del auth_meta['volumes'][subvol]
+
+        if not auth_meta['volumes']:
+            # Clean up auth meta file
+            self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id))
+            return
+
+        # Recovered from all partial auth updates for the auth ID.
+        auth_meta['dirty'] = False
+        self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+
+    def authorize(self, auth_id, access_level, tenant_id=None):
+        """
+        Get-or-create a Ceph auth identity for `auth_id` and grant them access
+        to this subvolume, recording the grant in the auth metadata file and
+        in the subvolume metadata file.
+
+        :param auth_id: auth ID to authorize
+        :param access_level: access level to grant; stored as is in the
+                             metadata and passed on to the authorization
+        :param tenant_id: Optionally provide a stringizable object to
+                          restrict any created cephx IDs to other callers
+                          passing the same tenant ID.
+        :return: the key of the auth ID if `tenant_id` is given, an empty
+                 string otherwise
+        :raises VolumeException: with -EPERM if the auth ID is already in
+                                 use with a different tenant ID
+        """
+
+        with self.auth_mdata_mgr.auth_lock(auth_id):
+            # Existing meta, or None, to be updated
+            auth_meta = self.auth_mdata_mgr.auth_metadata_get(auth_id)
+
+            # subvolume data to be inserted
+            # The key is "<group>/<subvolume>"; formatting None below yields
+            # the literal group part 'None' for the no-group case.
+            group_name = self.group.groupname if self.group.groupname != Group.NO_GROUP_NAME else None
+            group_subvol_id = "{0}/{1}".format(group_name, self.subvolname)
+            subvolume = {
+                group_subvol_id : {
+                    # The access level at which the auth_id is authorized to
+                    # access the volume.
+                    'access_level': access_level,
+                    'dirty': True,
+                }
+            }
+            if auth_meta is None:
+                # NOTE(review): this writes straight to stderr in addition
+                # to the debug log below -- confirm it is not a leftover.
+                sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format(
+                    auth_id, tenant_id
+                ))
+                log.debug("Authorize: no existing meta")
+                auth_meta = {
+                    'dirty': True,
+                    'tenant_id': str(tenant_id) if tenant_id else None,
+                    'volumes': subvolume
+                }
+
+                # Note: this is *not* guaranteeing that the key doesn't already
+                # exist in Ceph: we are allowing VolumeClient tenants to
+                # 'claim' existing Ceph keys.  In order to prevent VolumeClient
+                # tenants from reading e.g. client.admin keys, you need to
+                # have configured your VolumeClient user (e.g. Manila) to
+                # have mon auth caps that prevent it from accessing those keys
+                # (e.g. limit it to only access keys with a manila.* prefix)
+            else:
+                # Disallow tenants to share auth IDs
+                if str(auth_meta['tenant_id']) != str(tenant_id):
+                    msg = "auth ID: {0} is already in use".format(auth_id)
+                    log.error(msg)
+                    raise VolumeException(-errno.EPERM, msg)
+
+                # Finish or discard interrupted updates before adding a new
+                # one.
+                if auth_meta['dirty']:
+                    self._recover_auth_meta(auth_id, auth_meta)
+
+                log.debug("Authorize: existing tenant {tenant}".format(
+                    tenant=auth_meta['tenant_id']
+                ))
+                auth_meta['dirty'] = True
+                auth_meta['volumes'].update(subvolume)
+
+            # Persist the dirty markers before the subvolume is touched, so
+            # that an interrupted authorization can be detected later.
+            self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+
+            with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname):
+                key = self._authorize_subvolume(auth_id, access_level)
+
+            # The authorization went through: clear the dirty markers.
+            auth_meta['dirty'] = False
+            auth_meta['volumes'][group_subvol_id]['dirty'] = False
+            self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+
+            if tenant_id:
+                return key
+            else:
+                # Caller wasn't multi-tenant aware: be safe and don't give
+                # them a key
+                return ""
+
+    def _authorize_subvolume(self, auth_id, access_level):
+        """
+        Record `auth_id` in the metadata file of this subvolume and
+        authorize it.
+
+        The entry is first persisted with 'dirty' set, then the auth ID is
+        authorized, then the entry is persisted again with 'dirty' cleared.
+        An entry left dirty therefore marks an interrupted authorization.
+        The callers in this class hold a subvolume metadata lock while
+        calling this method.
+
+        :param auth_id: auth ID to authorize
+        :param access_level: access level to grant
+        :return: whatever `_authorize()` returns for the auth ID
+        """
+        subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname)
+
+        auth = {
+            auth_id: {
+                'access_level': access_level,
+                'dirty': True,
+            }
+        }
+
+        if subvol_meta is None:
+            subvol_meta = {
+                'auths': auth
+            }
+        else:
+            subvol_meta['auths'].update(auth)
+
+        # Persist the dirty entry before authorizing, also when the
+        # metadata file was empty so far: recovery treats a missing entry
+        # as "no auth update was attempted".
+        self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
+
+        key = self._authorize(auth_id, access_level)
+
+        subvol_meta['auths'][auth_id]['dirty'] = False
+        self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
+
+        return key
+
+    def _authorize(self, auth_id, access_level):
         subvol_path = self.path
         log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(auth_id, subvol_path))
 
@@ -265,6 +421,74 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
                             unwanted_mds_cap, unwanted_osd_cap)
 
     def deauthorize(self, auth_id):
+        with self.auth_mdata_mgr.auth_lock(auth_id):
+            # Existing meta, or None, to be updated
+            auth_meta = self.auth_mdata_mgr.auth_metadata_get(auth_id)
+
+            group_name = self.group.groupname if self.group.groupname != Group.NO_GROUP_NAME else None
+            group_subvol_id = "{0}/{1}".format(group_name, self.subvolname)
+            if (auth_meta is None) or (not auth_meta['volumes']):
+                log.warning("deauthorized called for already-removed auth"
+                         "ID '{auth_id}' for subvolume '{subvolume}'".format(
+                    auth_id=auth_id, subvolume=self.subvolname
+                ))
+                # Clean up the auth meta file of an auth ID
+                self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id))
+                return
+
+            if group_subvol_id not in auth_meta['volumes']:
+                log.warning("deauthorized called for already-removed auth"
+                         "ID '{auth_id}' for subvolume '{subvolume}'".format(
+                    auth_id=auth_id, subvolume=self.subvolname
+                ))
+                return
+
+            if auth_meta['dirty']:
+                self._recover_auth_meta(auth_id, auth_meta)
+
+            auth_meta['dirty'] = True
+            auth_meta['volumes'][group_subvol_id]['dirty'] = True
+            self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+
+            self._deauthorize_subvolume(auth_id)
+
+            # Filter out the volume we're deauthorizing
+            del auth_meta['volumes'][group_subvol_id]
+
+            # Clean up auth meta file
+            if not auth_meta['volumes']:
+                self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id))
+                return
+
+            auth_meta['dirty'] = False
+            self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
+
+    def _deauthorize_subvolume(self, auth_id):
+        """
+        Deauthorize `auth_id` and remove it from the metadata file of this
+        subvolume, holding the subvolume metadata lock throughout.
+
+        Only a warning is logged if the subvolume metadata has no record of
+        the auth ID.
+
+        :param auth_id: auth ID to deauthorize
+        """
+        with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname):
+            subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname)
+
+            if (subvol_meta is None) or (auth_id not in subvol_meta['auths']):
+                log.warning("deauthorized called for already-removed auth "
+                         "ID '{auth_id}' for subvolume '{subvolume}'".format(
+                    auth_id=auth_id, subvolume=self.subvolname
+                ))
+                return
+
+            # Mark the entry dirty on disk before touching Ceph.
+            subvol_meta['auths'][auth_id]['dirty'] = True
+            self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
+
+            self._deauthorize(auth_id)
+
+            # Remove the auth_id from the metadata *after* removing it
+            # from ceph, so that if we crashed here, we would actually
+            # recreate the auth ID during recovery (i.e. end up with
+            # a consistent state).
+
+            # Filter out the auth we're removing
+            del subvol_meta['auths'][auth_id]
+            self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
+
+    def _deauthorize(self, auth_id):
         """
         The volume must still exist.
         """
index 170c5dcab1618a8bd61119a453fa8cb0f3bc71a6..1dd6f3fe3aa82e6eb0d02ff45fd5c46d7845d368 100644 (file)
@@ -185,6 +185,9 @@ class SubvolumeV2(SubvolumeV1):
                 self.metadata_mgr.flush()
             else:
                 self.init_config(SubvolumeV2.VERSION, subvolume_type, qpath, initial_state)
+
+            # Create the subvolume metadata file which manages auth-ids if it doesn't exist
+            self.auth_mdata_mgr.create_subvolume_metadata_file(self.group.groupname, self.subvolname)
         except (VolumeException, MetadataMgrException, cephfs.Error) as e:
             try:
                 self._remove_on_failure(subvol_path, retained)
@@ -341,12 +344,16 @@ class SubvolumeV2(SubvolumeV1):
         else:
             if not self.has_pending_purges:
                 self.trash_base_dir()
+                # Delete the volume meta file, if it's not already deleted
+                self.auth_mdata_mgr.delete_subvolume_metadata_file(self.group.groupname, self.subvolname)
                 return
         if self.state != SubvolumeStates.STATE_RETAINED:
             self.trash_incarnation_dir()
             self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_PATH, "")
             self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, SubvolumeStates.STATE_RETAINED.value)
             self.metadata_mgr.flush()
+            # Delete the volume meta file, if it's not already deleted
+            self.auth_mdata_mgr.delete_subvolume_metadata_file(self.group.groupname, self.subvolname)
 
     def info(self):
         if self.state != SubvolumeStates.STATE_RETAINED: