LGPL2. See file COPYING.
"""
+from contextlib import contextmanager
+import errno
+import fcntl
import json
import logging
import os
+import re
+import struct
+import sys
import threading
-import errno
import time
+import uuid
-import rados
-import cephfs
from ceph_argparse import json_command
+import cephfs
+import rados
+
class RadosError(Exception):
"""
self.volume_prefix = volume_prefix if volume_prefix else self.DEFAULT_VOL_PREFIX
self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else self.DEFAULT_NS_PREFIX
+ # For flock'ing in cephfs, I want a unique ID to distinguish me
+ # from any other manila-share services that are loading this module.
+    # We could use pid, but that's unnecessarily weak: generate a
+    # UUID instead.
+ self._id = struct.unpack(">Q", uuid.uuid1().get_bytes()[0:8])[0]
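+        # (The first 8 bytes of the UUID, unpacked big-endian, give a
+        # 64-bit integer that is passed as the lock owner token to
+        # self.fs.flock() in _lock() below.)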
+
+ # TODO: prevent craftily-named volumes from colliding with
+ # ".meta" filenames
+ # TODO: remove .meta files on volume deletion
+ # TODO: remove .meta files on last rule for an auth ID deletion
+ # TODO: implement fsync in bindings so that we don't have to syncfs
+ # TODO: version the on-disk structures
+ # TODO: check dirty flag after locking something and call recover()
+ # if we are opening something dirty (racing with another instance
+ # of the driver restarting after failure) -- only required if someone
+    # is running multiple manila-share instances with Ceph loaded.
+
+ def recover(self):
+ # Scan all auth keys to see if they're dirty: if they are, they have
+ # state that might not have propagated to Ceph or to the related
+ # volumes yet.
+
+        # Important: we *always* acquire locks in the order auth->volume.
+ # That means a volume can never be dirty without the auth key
+ # we're updating it with being dirty at the same time.
+
+ # First list the auth IDs that have potentially dirty on-disk metadata
+ dir_handle = self.fs.opendir(self.volume_prefix)
+        d = self.fs.readdir(dir_handle)
+        auth_ids = []
+        while d:
+            # Identify auth IDs from auth meta filenames. The auth meta
+            # files are named "$<auth_id>.meta".
+            match = re.search(r"^\$(.*)\.meta$", d.d_name)
+            if match:
+                auth_ids.append(match.group(1))
+
+            # Read the next entry; readdir returns None at end of dir.
+            d = self.fs.readdir(dir_handle)
+
+ self.fs.closedir(dir_handle)
+
+ # Key points based on ordering:
+ # * Anything added in VMeta is already added in AMeta
+ # * Anything added in Ceph is already added in VMeta
+ # * Anything removed in VMeta is already removed in Ceph
+ # * Anything removed in AMeta is already removed in VMeta
+
+        # Deauthorization: because I only update metadata AFTER the
+        # update of the next level down, I get the same ordering:
+        # things which exist in the AMeta should also exist in the
+        # VMeta and in Ceph, and the same recovery procedure that gets
+        # me consistent after crashes during authorization will also
+        # work during deauthorization.
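+
+        # For example: if we crashed after writing a dirty AMeta and
+        # granting the Ceph caps, but before clearing the dirty flag,
+        # the loop below sees dirty=True and re-runs _authorize_volume(),
+        # which is idempotent, bringing all three levels back in sync.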
+
+ # Now for each auth ID, check for dirty flag and apply updates
+ # if dirty flag is found
+ for auth_id in auth_ids:
+ with self._auth_lock(auth_id):
+ auth_meta = self._auth_metadata_get(auth_id)
+ if not auth_meta or not auth_meta['dirty']:
+ continue
+
+ for volume in auth_meta['volumes']:
+                    volume_path = VolumePath(volume['group_id'],
+                                             volume['volume_id'])
+ access_level = volume['access_level']
+
+ with self._volume_lock(volume_path):
+ volume_meta = self._volume_metadata_get(volume_path)
+
+ auth = {
+ 'id': auth_id,
+ 'access_level': access_level
+ }
+
+ if (auth not in volume_meta['auths'] or
+ volume_meta['dirty']):
+                        readonly = access_level == 'r'
+ self._authorize_volume(volume_path, auth_id, readonly)
+
+ auth_meta['dirty'] = False
+ self._auth_metadata_set(auth_id, auth_meta)
+
def evict(self, auth_id, timeout=30, volume_path=None):
"""
Evict all clients based on the authorization ID and optionally based on
self.disconnect()
def _get_pool_id(self, osd_map, pool_name):
- # Maybe borrow the OSDMap wrapper class from calamari if more helpers like this aren't needed.
+ # Maybe borrow the OSDMap wrapper class from calamari if more helpers
+ # like this are needed.
for pool in osd_map['pools']:
if pool['pool_name'] == pool_name:
return pool['pool']
if size is not None:
self.fs.setxattr(path, 'ceph.quota.max_bytes', size.__str__(), 0)
- # data_isolated means create a seperate pool for this volume
+ # data_isolated means create a separate pool for this volume
if data_isolated:
pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
log.info("create_volume: {0}, create pool {1} as data_isolated=True.".format(volume_path, pool_name))
else:
return self._get_ancestor_xattr(os.path.split(path)[0], attr)
- def authorize(self, volume_path, auth_id, readonly=False):
+ def _metadata_get(self, path):
+ """
+ Return a deserialized JSON object, or None
+ """
+ fd = self.fs.open(path, "r")
+ # TODO iterate instead of assuming file < 4MB
+ read_bytes = self.fs.read(fd, 0, 4096 * 1024)
+ self.fs.close(fd)
+ if read_bytes:
+ return json.loads(read_bytes)
+ else:
+ return None
+
+ def _metadata_set(self, path, data):
+ serialized = json.dumps(data)
+ fd = self.fs.open(path, "w")
+ try:
+ self.fs.write(fd, serialized, 0)
+ self.fs.sync_fs()
+ finally:
+ self.fs.close(fd)
+
+ def _lock(self, path):
+ @contextmanager
+ def fn():
+ fd = self.fs.open(path, os.O_CREAT, 0755)
+ self.fs.flock(fd, fcntl.LOCK_EX, self._id)
+ try:
+ yield
+ finally:
+ self.fs.flock(fd, fcntl.LOCK_UN, self._id)
+ self.fs.close(fd)
+
+ return fn()
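+
+    # Example use of _lock (a sketch): the returned context manager
+    # serializes read-modify-write cycles across driver instances:
+    #
+    #     with self._lock(some_meta_path):
+    #         data = self._metadata_get(some_meta_path)
+    #         # ... modify data ...
+    #         self._metadata_set(some_meta_path, data)
+    #
+    # where `some_meta_path` is illustrative; real callers use the
+    # _auth_lock/_volume_lock helpers below.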
+
+ def _auth_metadata_path(self, auth_id):
+ return os.path.join(self.volume_prefix, "${auth_id}.meta".format(
+ auth_id=auth_id))
+
+ def _auth_lock(self, auth_id):
+ return self._lock(self._auth_metadata_path(auth_id))
+
+ def _auth_metadata_get(self, auth_id):
+ return self._metadata_get(self._auth_metadata_path(auth_id))
+
+ def _auth_metadata_set(self, auth_id, data):
+ return self._metadata_set(self._auth_metadata_path(auth_id), data)
+
+ def _volume_metadata_path(self, volume_path):
+        return os.path.join(self.volume_prefix, "_{0}:{1}.meta".format(
+            volume_path.group_id if volume_path.group_id else "",
+            volume_path.volume_id
+        ))
+
+ def _volume_lock(self, volume_path):
+ """
+ Return a ContextManager which locks the authorization metadata for
+ a particular volume, and persists a flag to the metadata indicating
+ that it is currently locked, so that we can detect dirty situations
+ during recovery.
+
+ This lock isn't just to make access to the metadata safe: it's also
+ designed to be used over the two-step process of checking the
+ metadata and then responding to an authorization request, to
+ ensure that at the point we respond the metadata hasn't changed
+ in the background. It's key to how we avoid security holes
+        resulting from races during that process.
+ """
+ return self._lock(self._volume_metadata_path(volume_path))
+
+ def _volume_metadata_get(self, volume_path):
+ """
+ Call me with the metadata locked!
+ """
+ return self._metadata_get(self._volume_metadata_path(volume_path))
+
+ def _volume_metadata_set(self, volume_path, data):
+ """
+ Call me with the metadata locked!
+ """
+ return self._metadata_set(self._volume_metadata_path(volume_path), data)
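+
+    # On-disk format written by _volume_metadata_set (a sketch, based on
+    # the structure built in _authorize_volume() below):
+    #
+    #     {"dirty": false,
+    #      "auths": [{"id": "<auth_id>", "access_level": "rw"}]}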
+
+ def authorize(self, volume_path, auth_id, readonly=False, tenant_id=None):
"""
Get-or-create a Ceph auth identity for `auth_id` and grant them access
to
:param volume_path:
:param auth_id:
:param readonly:
+        :param tenant_id: Optionally provide a stringizable object to
+                          restrict use of any created cephx IDs to callers
+                          that pass the same tenant ID.
:return:
"""
+ with self._auth_lock(auth_id):
+ # Existing meta, or None, to be updated
+ auth_meta = self._auth_metadata_get(auth_id)
+
+ # volume data to be inserted
+ volume = {
+ 'group_id': volume_path.group_id,
+ 'volume_id': volume_path.volume_id,
+ # The access level at which the auth_id is authorized to
+ # access the volume.
+ 'access_level': 'r' if readonly else 'rw'
+ }
+ if auth_meta is None:
+                log.debug("Authorize: no existing meta for ID {0}; creating "
+                          "with tenant {1}".format(auth_id, tenant_id))
+ auth_meta = {
+ 'dirty': True,
+                    'tenant_id': str(tenant_id) if tenant_id else None,
+ 'volumes': [volume]
+ }
+
+ # Note: this is *not* guaranteeing that the key doesn't already
+ # exist in Ceph: we are allowing VolumeClient tenants to
+ # 'claim' existing Ceph keys. In order to prevent VolumeClient
+ # tenants from reading e.g. client.admin keys, you need to
+ # have configured your VolumeClient user (e.g. Manila) to
+ # have mon auth caps that prevent it from accessing those keys
+ # (e.g. limit it to only access keys with a manila.* prefix)
+ else:
+ log.debug("Authorize: existing tenant {tenant}".format(
+ tenant=auth_meta['tenant_id']
+ ))
+ auth_meta['dirty'] = True
+ if volume not in auth_meta['volumes']:
+ auth_meta['volumes'].append(volume)
+
+ self._auth_metadata_set(auth_id, auth_meta)
+
+ with self._volume_lock(volume_path):
+ key = self._authorize_volume(volume_path, auth_id, readonly)
+
+ auth_meta['dirty'] = False
+ self._auth_metadata_set(auth_id, auth_meta)
+
+ if tenant_id:
+            if auth_meta['tenant_id'] == str(tenant_id):
+ return {
+ 'auth_key': key
+ }
+ else:
+ return {
+ 'auth_key': None
+ }
+ else:
+ # Caller wasn't multi-tenant aware: be safe and don't give
+ # them a key
+ return {
+ 'auth_key': None
+ }
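+
+    # Example caller flow (a sketch; the IDs are illustrative, and
+    # VolumePath is assumed to take (group_id, volume_id)):
+    #
+    #     result = client.authorize(VolumePath("grp", "vol1"), "alice",
+    #                               readonly=False, tenant_id="tenant1")
+    #     # result['auth_key'] is the cephx key if "tenant1" owns the
+    #     # auth ID "alice", otherwise None.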
+
+ def _authorize_volume(self, volume_path, auth_id, readonly):
+ vol_meta = self._volume_metadata_get(volume_path)
+
+ auth = {
+ 'id': auth_id,
+ 'access_level': 'r' if readonly else 'rw'
+ }
+
+ if vol_meta is None:
+ vol_meta = {
+ 'dirty': True,
+ 'auths': [auth]
+ }
+ else:
+ vol_meta['dirty'] = True
+
+ if auth not in vol_meta['auths']:
+ vol_meta['auths'].append(auth)
+
+ key = self._authorize_ceph(volume_path, auth_id, readonly)
+
+ vol_meta['dirty'] = False
+ self._volume_metadata_set(volume_path, vol_meta)
+
+ return key
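+
+    # Note the write protocol above: set the dirty flag, apply the change
+    # to Ceph, then clear the dirty flag. recover() relies on this
+    # ordering to replay interrupted updates safely.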
+
+ def _authorize_ceph(self, volume_path, auth_id, readonly):
+ path = self._get_path(volume_path)
+ log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(
+ auth_id, path
+ ))
+
# First I need to work out what the data pool is for this share:
# read the layout
- path = self._get_path(volume_path)
pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_namespace")
# ]
assert len(caps) == 1
assert caps[0]['entity'] == client_entity
- key = caps[0]['key']
+ return caps[0]['key']
+
+ def deauthorize(self, volume_path, auth_id, readonly=False):
+ with self._auth_lock(auth_id):
+ # Existing meta, or None, to be updated
+ auth_meta = self._auth_metadata_get(auth_id)
+
+ if auth_meta is None:
+ # Non-existent auth metadata is a clean state that means
+ # nothing authorized under this name: we must have already
+ # deauthorized. Be idempotent and return without an error.
+                log.warn("deauthorize called for already-removed auth "
+                         "ID '{auth_id}'".format(auth_id=auth_id))
+ return
+
+ volume = {
+ 'group_id': volume_path.group_id,
+ 'volume_id': volume_path.volume_id,
+ 'access_level': 'r' if readonly else 'rw'
+ }
- return {
- 'auth_key': key
- }
+ auth_meta['dirty'] = True
+
+ self._auth_metadata_set(auth_id, auth_meta)
- def deauthorize(self, volume_path, auth_id):
+ with self._volume_lock(volume_path):
+ vol_meta = self._volume_metadata_get(volume_path)
+ vol_meta['dirty'] = True
+ self._volume_metadata_set(volume_path, vol_meta)
+
+ auth = {
+ 'id': auth_id,
+ 'access_level': 'r' if readonly else 'rw'
+ }
+
+ self._deauthorize(volume_path, auth_id, readonly)
+
+ # Remove the auth_id from the metadata *after* removing it
+ # from ceph, so that if we crashed here, we would actually
+ # recreate the auth ID during recovery (i.e. end up with
+ # a consistent state).
+
+ # Filter out the auth we're removing
+                vol_meta['auths'] = [a for a in vol_meta['auths']
+                                     if a != auth]
+ vol_meta['dirty'] = False
+ self._volume_metadata_set(volume_path, vol_meta)
+
+ # Filter out the volume we're deauthorizing
+ auth_meta['volumes'] = [v for v in auth_meta['volumes'] if v != volume]
+ auth_meta['dirty'] = False
+ self._auth_metadata_set(auth_id, auth_meta)
+
+ def _deauthorize(self, volume_path, auth_id, readonly):
"""
The volume must still exist.
"""
# Already gone, great.
return
+ def get_authorized_ids(self, volume_path):
+ with self._volume_lock(volume_path):
+ meta = self._volume_metadata_get(volume_path)
+ return meta['auths']
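+
+    # Example return value (a sketch, given the 'auths' entries built in
+    # _authorize_volume()):
+    #
+    #     [{'id': 'alice', 'access_level': 'rw'}]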
+
def _rados_command(self, prefix, args=None, decode=True):
"""
Safer wrapper for ceph_argparse.json_command, which raises