pybind: ceph_volume_client authentication metadata
author     John Spray <john.spray@redhat.com>
           Wed, 2 Mar 2016 12:30:45 +0000 (12:30 +0000)
committer  Ramana Raja <rraja@redhat.com>
           Mon, 18 Jul 2016 10:06:40 +0000 (15:36 +0530)
Store a two-way mapping between auth IDs and volumes.

Enables us to record some metadata on auth IDs (e.g. which
OpenStack tenant created the ID) so that we can avoid
exposing keys to other tenants who try to use the same
Ceph auth ID.
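
The on-disk layout is one JSON file per auth ID
("$<auth_id>.meta") and one per volume
("_<group_id>:<volume_id>.meta") under the volume prefix,
roughly like this (illustrative names and values):

    $alice.meta:
        {"dirty": false,
         "tenant_id": "tenant-a",
         "volumes": [{"group_id": "grp", "volume_id": "vol",
                      "access_level": "rw"}]}

    _grp:vol.meta:
        {"dirty": false,
         "auths": [{"id": "alice", "access_level": "rw"}]}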

Enables us to expose the list of auth IDs that have access
to a volume, so that Manila's update_access() can be
implemented efficiently.
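
For example (hypothetical driver code; vc is a connected
CephFSVolumeClient), update_access() can read the full set
of grantees in one call:

    vc.get_authorized_ids(volume_path)
    # -> [{'id': 'alice', 'access_level': 'rw'}, ...]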

DNM: see TODOs inline.

Fixes: http://tracker.ceph.com/issues/15615
Signed-off-by: John Spray <john.spray@redhat.com>
src/pybind/ceph_volume_client.py

index b570800b911d08aee8ace91f10993139eb859732..c08c3c6ed9c971a92d8e9d7f84b7cefc5c79b977 100644 (file)
@@ -4,17 +4,24 @@ Copyright (C) 2015 Red Hat, Inc.
 LGPL2.  See file COPYING.
 """
 
+from contextlib import contextmanager
+import errno
+import fcntl
 import json
 import logging
 import os
+import re
+import struct
 import threading
-import errno
 import time
+import uuid
 
-import rados
-import cephfs
 from ceph_argparse import json_command
 
+import cephfs
+import rados
+
 
 class RadosError(Exception):
     """
@@ -217,6 +224,89 @@ class CephFSVolumeClient(object):
         self.volume_prefix = volume_prefix if volume_prefix else self.DEFAULT_VOL_PREFIX
         self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else self.DEFAULT_NS_PREFIX
 
+        # For flock'ing in cephfs, I want a unique ID to distinguish me
+        # from any other manila-share services that are loading this module.
+        # We could use the PID, but that's unnecessarily weak: generate
+        # a UUID instead.
+        self._id = struct.unpack(">Q", uuid.uuid1().get_bytes()[0:8])[0]
+
+        # TODO: prevent craftily-named volumes from colliding with
+        # ".meta" filenames
+        # TODO: remove .meta files on volume deletion
+        # TODO: remove .meta files on last rule for an auth ID deletion
+        # TODO: implement fsync in bindings so that we don't have to syncfs
+        # TODO: version the on-disk structures
+        # TODO: check dirty flag after locking something and call recover()
+        # if we are opening something dirty (racing with another instance
+        # of the driver restarting after failure) -- only required if
+        # someone is running multiple manila-share instances with Ceph loaded.
+
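+    # Note (inferred from the dirty-flag TODO above): recover() is meant
+    # to run when an instance starts up after an unclean shutdown.  It
+    # re-drives any updates recorded in dirty auth metadata so that the
+    # metadata files and Ceph converge to a consistent state.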
+    def recover(self):
+        # Scan all auth keys to see if they're dirty: if they are, they have
+        # state that might not have propagated to Ceph or to the related
+        # volumes yet.
+
+        # Important: we *always* acquire locks in the order auth->volume
+        # That means a volume can never be dirty without the auth key
+        # we're updating it with being dirty at the same time.
+
+        # First list the auth IDs that have potentially dirty on-disk metadata
+        dir_handle = self.fs.opendir(self.volume_prefix)
+        d = self.fs.readdir(dir_handle)
+        auth_ids = []
+        while d:
+            # Identify auth IDs from auth meta filenames. The auth meta
+            # files are named "$<auth_id>.meta".
+            match = re.search(r"^\$(.*)\.meta$", d.d_name)
+            if match:
+                auth_ids.append(match.group(1))
+
+            # Advance to the next dirent; readdir returns None at the end.
+            d = self.fs.readdir(dir_handle)
+
+        self.fs.closedir(dir_handle)
+
+        # Key points based on ordering:
+        # * Anything added in VMeta is already added in AMeta
+        # * Anything added in Ceph is already added in VMeta
+        # * Anything removed in VMeta is already removed in Ceph
+        # * Anything removed in AMeta is already removed in VMeta
+
+        # Deauthorization: because we only update metadata AFTER the
+        # update of the next level down, the same ordering holds:
+        # things which exist in the AMeta should also exist in the
+        # VMeta and in Ceph, so the same recovery procedure that gets
+        # us consistent after crashes during authorization also works
+        # during deauthorization.
+
+        # Now for each auth ID, check for dirty flag and apply updates
+        # if dirty flag is found
+        for auth_id in auth_ids:
+            with self._auth_lock(auth_id):
+                auth_meta = self._auth_metadata_get(auth_id)
+                if not auth_meta or not auth_meta['dirty']:
+                    continue
+
+                for volume in auth_meta['volumes']:
+                    volume_path = VolumePath(volume['volume_id'],
+                                             volume['group_id'])
+                    access_level = volume['access_level']
+
+                    with self._volume_lock(volume_path):
+                        volume_meta = self._volume_metadata_get(volume_path)
+
+                        auth = {
+                            'id': auth_id,
+                            'access_level': access_level
+                        }
+
+                        # volume_meta may be None if we crashed between
+                        # creating the lock file and writing the metadata.
+                        if (volume_meta is None or volume_meta['dirty'] or
+                                auth not in volume_meta['auths']):
+                            readonly = access_level == 'r'
+                            self._authorize_volume(volume_path, auth_id, readonly)
+
+                auth_meta['dirty'] = False
+                self._auth_metadata_set(auth_id, auth_meta)
+
     def evict(self, auth_id, timeout=30, volume_path=None):
         """
         Evict all clients based on the authorization ID and optionally based on
@@ -343,7 +433,8 @@ class CephFSVolumeClient(object):
         self.disconnect()
 
     def _get_pool_id(self, osd_map, pool_name):
-        # Maybe borrow the OSDMap wrapper class from calamari if more helpers like this aren't needed.
+        # Maybe borrow the OSDMap wrapper class from calamari if more helpers
+        # like this are needed.
         for pool in osd_map['pools']:
             if pool['pool_name'] == pool_name:
                 return pool['pool']
@@ -454,7 +545,7 @@ class CephFSVolumeClient(object):
         if size is not None:
             self.fs.setxattr(path, 'ceph.quota.max_bytes', size.__str__(), 0)
 
-        # data_isolated means create a seperate pool for this volume
+        # data_isolated means create a separate pool for this volume
         if data_isolated:
             pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
             log.info("create_volume: {0}, create pool {1} as data_isolated =True.".format(volume_path, pool_name))
@@ -576,19 +667,198 @@ class CephFSVolumeClient(object):
             else:
                 return self._get_ancestor_xattr(os.path.split(path)[0], attr)
 
-    def authorize(self, volume_path, auth_id, readonly=False):
+    def _metadata_get(self, path):
+        """
+        Return a deserialized JSON object, or None
+        """
+        fd = self.fs.open(path, "r")
+        # TODO iterate instead of assuming file < 4MB
+        read_bytes = self.fs.read(fd, 0, 4096 * 1024)
+        self.fs.close(fd)
+        if read_bytes:
+            return json.loads(read_bytes)
+        else:
+            return None
+
+    def _metadata_set(self, path, data):
+        serialized = json.dumps(data)
+        fd = self.fs.open(path, "w")
+        try:
+            self.fs.write(fd, serialized, 0)
+            self.fs.sync_fs()
+        finally:
+            self.fs.close(fd)
+
+    def _lock(self, path):
+        @contextmanager
+        def fn():
+            fd = self.fs.open(path, os.O_CREAT, 0755)
+            self.fs.flock(fd, fcntl.LOCK_EX, self._id)
+            try:
+                yield
+            finally:
+                self.fs.flock(fd, fcntl.LOCK_UN, self._id)
+                self.fs.close(fd)
+
+        return fn()
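+
+    # Lock ordering (see recover()): locks are always acquired in the
+    # order auth -> volume, never the reverse, i.e. callers follow this
+    # pattern:
+    #
+    #   with self._auth_lock(auth_id):
+    #       ...
+    #       with self._volume_lock(volume_path):
+    #           ...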
+
+    def _auth_metadata_path(self, auth_id):
+        return os.path.join(self.volume_prefix, "${auth_id}.meta".format(
+            auth_id=auth_id))
+
+    def _auth_lock(self, auth_id):
+        return self._lock(self._auth_metadata_path(auth_id))
+
+    def _auth_metadata_get(self, auth_id):
+        return self._metadata_get(self._auth_metadata_path(auth_id))
+
+    def _auth_metadata_set(self, auth_id, data):
+        return self._metadata_set(self._auth_metadata_path(auth_id), data)
+
+    def _volume_metadata_path(self, volume_path):
+        return os.path.join(self.volume_prefix, "_{0}:{1}.meta".format(
+            volume_path.group_id if volume_path.group_id else "",
+            volume_path.volume_id
+        ))
+
+    def _volume_lock(self, volume_path):
+        """
+        Return a ContextManager which locks the authorization metadata for
+        a particular volume, and persists a flag to the metadata indicating
+        that it is currently locked, so that we can detect dirty situations
+        during recovery.
+
+        This lock isn't just to make access to the metadata safe: it's also
+        designed to be used over the two-step process of checking the
+        metadata and then responding to an authorization request, to
+        ensure that at the point we respond the metadata hasn't changed
+        in the background.  It's key to how we avoid security holes
+        resulting from races in that two-step process.
+        """
+        return self._lock(self._volume_metadata_path(volume_path))
+
+    def _volume_metadata_get(self, volume_path):
+        """
+        Call me with the metadata locked!
+        """
+        return self._metadata_get(self._volume_metadata_path(volume_path))
+
+    def _volume_metadata_set(self, volume_path, data):
+        """
+        Call me with the metadata locked!
+        """
+        return self._metadata_set(self._volume_metadata_path(volume_path), data)
+
+    def authorize(self, volume_path, auth_id, readonly=False, tenant_id=None):
         """
         Get-or-create a Ceph auth identity for `auth_id` and grant them access
         to
         :param volume_path:
         :param auth_id:
         :param readonly:
+        :param tenant_id: Optionally provide a stringizable object to
+                          restrict any created cephx IDs to other callers
+                          passing the same tenant ID.
         :return:
         """
 
+        with self._auth_lock(auth_id):
+            # Existing meta, or None, to be updated
+            auth_meta = self._auth_metadata_get(auth_id)
+
+            # volume data to be inserted
+            volume = {
+                'group_id': volume_path.group_id,
+                'volume_id': volume_path.volume_id,
+                # The access level at which the auth_id is authorized to
+                # access the volume.
+                'access_level': 'r' if readonly else 'rw'
+            }
+            if auth_meta is None:
+                log.debug("Authorize: no existing meta; creating meta for "
+                          "ID {0} with tenant {1}".format(auth_id, tenant_id))
+                auth_meta = {
+                    'dirty': True,
+                    'tenant_id': tenant_id.__str__() if tenant_id else None,
+                    'volumes': [volume]
+                }
+
+                # Note: this is *not* guaranteeing that the key doesn't already
+                # exist in Ceph: we are allowing VolumeClient tenants to
+                # 'claim' existing Ceph keys.  In order to prevent VolumeClient
+                # tenants from reading e.g. client.admin keys, you need to
+                # have configured your VolumeClient user (e.g. Manila) to
+                # have mon auth caps that prevent it from accessing those keys
+                # (e.g. limit it to only access keys with a manila.* prefix)
+            else:
+                log.debug("Authorize: existing tenant {tenant}".format(
+                    tenant=auth_meta['tenant_id']
+                ))
+                auth_meta['dirty'] = True
+                if volume not in auth_meta['volumes']:
+                    auth_meta['volumes'].append(volume)
+
+            self._auth_metadata_set(auth_id, auth_meta)
+
+            with self._volume_lock(volume_path):
+                key = self._authorize_volume(volume_path, auth_id, readonly)
+
+            auth_meta['dirty'] = False
+            self._auth_metadata_set(auth_id, auth_meta)
+
+            if tenant_id:
+                if auth_meta['tenant_id'] == tenant_id.__str__():
+                    return {
+                        'auth_key': key
+                    }
+                else:
+                    return {
+                        'auth_key': None
+                    }
+            else:
+                # Caller wasn't multi-tenant aware: be safe and don't give
+                # them a key
+                return {
+                    'auth_key': None
+                }
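+
+    # Illustration of the tenant gating above (hypothetical calls; vc is
+    # a connected CephFSVolumeClient, vp a VolumePath):
+    #
+    #   vc.authorize(vp, "alice", tenant_id="tenant-a")
+    #     -> {'auth_key': '<key>'}  # matches the recorded tenant_id
+    #   vc.authorize(vp, "alice", tenant_id="tenant-b")
+    #     -> {'auth_key': None}     # auth ID owned by another tenant
+    #   vc.authorize(vp, "alice")
+    #     -> {'auth_key': None}     # caller isn't multi-tenant aware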
+
+    def _authorize_volume(self, volume_path, auth_id, readonly):
+        vol_meta = self._volume_metadata_get(volume_path)
+
+        auth = {
+            'id': auth_id,
+            'access_level': 'r' if readonly else 'rw'
+        }
+
+        if vol_meta is None:
+            vol_meta = {
+                'dirty': True,
+                'auths': [auth]
+            }
+        else:
+            vol_meta['dirty'] = True
+
+            if auth not in vol_meta['auths']:
+                vol_meta['auths'].append(auth)
+
+        key = self._authorize_ceph(volume_path, auth_id, readonly)
+
+        vol_meta['dirty'] = False
+        self._volume_metadata_set(volume_path, vol_meta)
+
+        return key
+
+    def _authorize_ceph(self, volume_path, auth_id, readonly):
+        path = self._get_path(volume_path)
+        log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(
+            auth_id, path
+        ))
+
         # First I need to work out what the data pool is for this share:
         # read the layout
-        path = self._get_path(volume_path)
         pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
         namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_namespace")
 
@@ -673,13 +943,62 @@ class CephFSVolumeClient(object):
         # ]
         assert len(caps) == 1
         assert caps[0]['entity'] == client_entity
-        key = caps[0]['key']
+        return caps[0]['key']
+
+    def deauthorize(self, volume_path, auth_id, readonly=False):
+        with self._auth_lock(auth_id):
+            # Existing meta, or None, to be updated
+            auth_meta = self._auth_metadata_get(auth_id)
+
+            if auth_meta is None:
+                # Non-existent auth metadata is a clean state that means
+                # nothing authorized under this name: we must have already
+                # deauthorized.  Be idempotent and return without an error.
+                log.warn("deauthorize called for already-removed auth "
+                         "ID '{auth_id}'".format(auth_id=auth_id))
+                return
+
+            volume = {
+                'group_id': volume_path.group_id,
+                'volume_id': volume_path.volume_id,
+                'access_level': 'r' if readonly else 'rw'
+            }
 
-        return {
-            'auth_key': key
-        }
+            auth_meta['dirty'] = True
+
+            self._auth_metadata_set(auth_id, auth_meta)
 
-    def deauthorize(self, volume_path, auth_id):
+            with self._volume_lock(volume_path):
+                vol_meta = self._volume_metadata_get(volume_path)
+                vol_meta['dirty'] = True
+                self._volume_metadata_set(volume_path, vol_meta)
+
+                auth = {
+                    'id': auth_id,
+                    'access_level': 'r' if readonly else 'rw'
+                }
+
+                self._deauthorize(volume_path, auth_id, readonly)
+
+                # Remove the auth_id from the metadata *after* removing it
+                # from ceph, so that if we crashed here, we would actually
+                # recreate the auth ID during recovery (i.e. end up with
+                # a consistent state).
+
+                # Filter out the auth we're removing
+                vol_meta['auths'] = [a for a in vol_meta['auths'] if a != auth]
+                vol_meta['dirty'] = False
+                self._volume_metadata_set(volume_path, vol_meta)
+
+            # Filter out the volume we're deauthorizing
+            auth_meta['volumes'] = [v for v in auth_meta['volumes'] if v != volume]
+            auth_meta['dirty'] = False
+            self._auth_metadata_set(auth_id, auth_meta)
+
+    def _deauthorize(self, volume_path, auth_id, readonly):
         """
         The volume must still exist.
         """
@@ -730,6 +1049,11 @@ class CephFSVolumeClient(object):
             # Already gone, great.
             return
 
+    def get_authorized_ids(self, volume_path):
+        with self._volume_lock(volume_path):
+            meta = self._volume_metadata_get(volume_path)
+            if meta is None:
+                # Never authorized: the lock file may exist (created by
+                # _volume_lock's O_CREAT) but contain no metadata yet.
+                return []
+            return meta['auths']
+
     def _rados_command(self, prefix, args=None, decode=True):
         """
         Safer wrapper for ceph_argparse.json_command, which raises