%endif
Requires: ceph-base = %{epoch}:%{version}-%{release}
%if 0%{?fedora} || 0%{?rhel}
-Requires: python-cherrypy
+Requires: python-cherrypy
+Requires: pyOpenSSL
+Requires: python-werkzeug
%endif
%if 0%{?suse_version}
Requires: python-CherryPy
+Requires: python-pyOpenSSL
+Requires: python-Werkzeug
%endif
-
+Requires: python-pecan
%description mgr
ceph-mgr enables python modules that provide services (such as the REST
module derived from Calamari) and expose CLI hooks. ceph-mgr gathers
Package: ceph-mgr
Architecture: linux-any
Depends: ceph-base (= ${binary:Version}),
+ python-openssl
+ python-pecan
+ python-werkzeug
${misc:Depends},
${python:Depends},
python-cherrypy3,
OPTION(rgw_swift_versioning_enabled, OPT_BOOL, false) // whether swift object versioning feature is enabled
OPTION(mgr_module_path, OPT_STR, CEPH_PKGLIBDIR "/mgr") // where to load python modules from
-OPTION(mgr_modules, OPT_STR, "rest") // Which modules to load
+OPTION(mgr_modules, OPT_STR, "restful") // Which modules to load
OPTION(mgr_data, OPT_STR, "/var/lib/ceph/mgr/$cluster-$id") // where to find keyring etc
OPTION(mgr_beacon_period, OPT_INT, 5) // How frequently to send beacon
OPTION(mgr_stats_period, OPT_INT, 5) // How frequently to send stats
profile_grants.push_back(MonCapGrant("mon", MON_CAP_R));
profile_grants.push_back(MonCapGrant("mds", MON_CAP_R));
profile_grants.push_back(MonCapGrant("osd", MON_CAP_R | MON_CAP_W));
- profile_grants.push_back(MonCapGrant("config-key", MON_CAP_R));
+ profile_grants.push_back(MonCapGrant("auth", MON_CAP_R | MON_CAP_X));
+ profile_grants.push_back(MonCapGrant("config-key", MON_CAP_R | MON_CAP_W));
string prefix = string("daemon-private/mgr/");
profile_grants.push_back(MonCapGrant("config-key get", "key",
StringConstraint("", prefix)));
--- /dev/null
+from module import * # NOQA
--- /dev/null
+from pecan import expose, request, response
+from pecan.rest import RestController
+
+import common
+import traceback
+
+from base64 import b64decode
+from functools import wraps
+from collections import defaultdict
+
+## We need this to access the instance of the module
+#
+# We can't use 'from module import instance' because
+# the instance is not ready, yet (would be None)
+import module
+
+
+# Helper function to catch and log the exceptions
+def catch(f):
+    """Decorator: convert any handler exception into a logged HTTP 500 reply."""
+    @wraps(f)
+    def catcher(*args, **kwargs):
+        try:
+            return f(*args, **kwargs)
+        # Deliberately broad: any failure in a handler is logged and
+        # reported to the client as a 500 with the traceback lines
+        except:
+            module.instance.log.error(str(traceback.format_exc()))
+            response.status = 500
+            return {'message': str(traceback.format_exc()).split('\n')}
+    return catcher
+
+
+# Handle authorization
+def auth(f):
+ @wraps(f)
+ def decorated(*args, **kwargs):
+ if not request.authorization:
+ response.status = 401
+ response.headers['WWW-Authenticate'] = 'Basic realm="Login Required"'
+ return {'message': 'auth: No HTTP username/password'}
+
+ username, password = b64decode(request.authorization[1]).split(':')
+
+ # Lookup the password-less tokens first
+ if username not in module.instance.tokens.values():
+ # Check the ceph auth db
+ msg = module.instance.verify_user(username, password)
+ if msg:
+ response.status = 401
+ response.headers['WWW-Authenticate'] = 'Basic realm="Login Required"'
+ return {'message': 'auth: No HTTP username/password'}
+
+ return f(*args, **kwargs)
+ return decorated
+
+
+# Helper function to lock the function
+def lock(f):
+    """Decorator: serialize the handler under the module-wide requests lock."""
+    @wraps(f)
+    def locker(*args, **kwargs):
+        with module.instance.requests_lock:
+            return f(*args, **kwargs)
+    return locker
+
+
+
+class ServerFqdn(RestController):
+    """Endpoint /server/<fqdn>: information about a single server."""
+
+    def __init__(self, fqdn):
+        self.fqdn = fqdn
+
+
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show the information for the server fqdn
+        """
+        return module.instance.get_server(self.fqdn)
+
+
+
+class Server(RestController):
+    """Endpoint /server: information about all servers known to the mgr."""
+
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show the information for all the servers
+        """
+        return module.instance.list_servers()
+
+
+    @expose()
+    def _lookup(self, fqdn, *remainder):
+        # Route /server/<fqdn> to a per-server controller
+        return ServerFqdn(fqdn), remainder
+
+
+
+class RequestId(RestController):
+    """Endpoint /request/<id>: inspect or cancel a single submitted request."""
+
+    def __init__(self, request_id):
+        self.request_id = request_id
+
+
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show the information for the request id
+        """
+        # NOTE: filter() returning a list implies this targets Python 2
+        request = filter(
+            lambda x: x.id == self.request_id,
+            module.instance.requests
+        )
+
+        if len(request) != 1:
+            response.status = 500
+            return {'message': 'Unknown request id "%s"' % str(self.request_id)}
+
+        request = request[0]
+        return request.humanify()
+
+
+    @expose('json')
+    @catch
+    @auth
+    @lock
+    def delete(self):
+        """
+        Remove the request id from the database
+        """
+        for index in range(len(module.instance.requests)):
+            if module.instance.requests[index].id == self.request_id:
+                return module.instance.requests.pop(index).humanify()
+
+        # Failed to find the job to cancel
+        response.status = 500
+        return {'message': 'No such request id'}
+
+
+
+class Request(RestController):
+    """Endpoint /request: list all requests and prune the finished ones."""
+
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        List all the available requests and their state
+        """
+        states = {}
+        for _request in module.instance.requests:
+            states[_request.id] = _request.get_state()
+
+        return states
+
+
+    @expose('json')
+    @catch
+    @auth
+    @lock
+    def delete(self):
+        """
+        Remove all the finished requests
+        """
+        num_requests = len(module.instance.requests)
+
+        module.instance.requests = filter(
+            lambda x: not x.is_finished(),
+            module.instance.requests
+        )
+
+        # Return the job statistics
+        return {
+            'cleaned': num_requests - len(module.instance.requests),
+            'remaining': len(module.instance.requests),
+        }
+
+
+    @expose()
+    def _lookup(self, request_id, *remainder):
+        # Route /request/<id> to a per-request controller
+        return RequestId(request_id), remainder
+
+
+
+class PoolId(RestController):
+    """Endpoint /pool/<id>: show, modify or delete a single pool."""
+
+    def __init__(self, pool_id):
+        self.pool_id = pool_id
+
+
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show the information for the pool id
+        """
+        pool = module.instance.get_pool_by_id(self.pool_id)
+
+        if not pool:
+            response.status = 500
+            return {'message': 'Failed to identify the pool id "%d"' % self.pool_id}
+
+        # pgp_num is called pg_placement_num, deal with that
+        if 'pg_placement_num' in pool:
+            pool['pgp_num'] = pool.pop('pg_placement_num')
+        return pool
+
+
+    @expose('json')
+    @catch
+    @auth
+    def patch(self):
+        """
+        Modify the information for the pool id
+        """
+        args = request.json
+
+        # Get the pool info for its name
+        pool = module.instance.get_pool_by_id(self.pool_id)
+        if not pool:
+            response.status = 500
+            return {'message': 'Failed to identify the pool id "%d"' % self.pool_id}
+
+        # Check for invalid pool args
+        invalid = common.invalid_pool_args(args)
+        if invalid:
+            response.status = 500
+            return {'message': 'Invalid arguments found: "%s"' % str(invalid)}
+
+        # Schedule the update request
+        return module.instance.submit_request(common.pool_update_commands(pool['pool_name'], args))
+
+
+    @expose('json')
+    @catch
+    @auth
+    def delete(self):
+        """
+        Remove the pool data for the pool id
+        """
+        pool = module.instance.get_pool_by_id(self.pool_id)
+
+        if not pool:
+            response.status = 500
+            return {'message': 'Failed to identify the pool id "%d"' % self.pool_id}
+
+        # 'osd pool delete' requires the name twice plus the safety flag
+        return module.instance.submit_request([[{
+            'prefix': 'osd pool delete',
+            'pool': pool['pool_name'],
+            'pool2': pool['pool_name'],
+            'sure': '--yes-i-really-really-mean-it'
+        }]])
+
+
+
+class Pool(RestController):
+    """Endpoint /pool: list all pools and create new ones."""
+
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show the information for all the pools
+        """
+        pools = module.instance.get('osd_map')['pools']
+
+        # pgp_num is called pg_placement_num, deal with that
+        for pool in pools:
+            if 'pg_placement_num' in pool:
+                pool['pgp_num'] = pool.pop('pg_placement_num')
+
+        return pools
+
+
+    @expose('json')
+    @catch
+    @auth
+    def post(self):
+        """
+        Create a new pool
+        Requires name and pg_num dict arguments
+        """
+        args = request.json
+
+        # Check for the required arguments
+        pool_name = args.pop('name', None)
+        if pool_name is None:
+            response.status = 500
+            return {'message': 'You need to specify the pool "name" argument'}
+
+        pg_num = args.pop('pg_num', None)
+        if pg_num is None:
+            response.status = 500
+            return {'message': 'You need to specify the "pg_num" argument'}
+
+        # Run the pool create command first
+        create_command = {
+            'prefix': 'osd pool create',
+            'pool': pool_name,
+            'pg_num': pg_num
+        }
+
+        # Check for invalid pool args
+        invalid = common.invalid_pool_args(args)
+        if invalid:
+            response.status = 500
+            return {'message': 'Invalid arguments found: "%s"' % str(invalid)}
+
+        # Schedule the creation and update requests
+        return module.instance.submit_request(
+            [[create_command]] +
+            common.pool_update_commands(pool_name, args)
+        )
+
+
+    @expose()
+    def _lookup(self, pool_id, *remainder):
+        # Route /pool/<id> to a per-pool controller
+        return PoolId(int(pool_id)), remainder
+
+
+
+class OsdIdCommand(RestController):
+    """Endpoint /osd/<id>/command: list and run per-OSD commands."""
+
+    def __init__(self, osd_id):
+        self.osd_id = osd_id
+
+
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show implemented commands for the OSD id
+        """
+        osd = module.instance.get_osd_by_id(self.osd_id)
+
+        if not osd:
+            response.status = 500
+            return {'message': 'Failed to identify the OSD id "%d"' % self.osd_id}
+
+        # Commands are only meaningful on an OSD that is up
+        if osd['up']:
+            return common.OSD_IMPLEMENTED_COMMANDS
+        else:
+            return []
+
+
+    @expose('json')
+    @catch
+    @auth
+    def post(self):
+        """
+        Run the implemented command for the OSD id
+        """
+        command = request.json.get('command', None)
+
+        osd = module.instance.get_osd_by_id(self.osd_id)
+
+        if not osd:
+            response.status = 500
+            return {'message': 'Failed to identify the OSD id "%d"' % self.osd_id}
+
+        if not osd['up'] or command not in common.OSD_IMPLEMENTED_COMMANDS:
+            response.status = 500
+            return {'message': 'Command "%s" not available' % command}
+
+        return module.instance.submit_request([[{
+            'prefix': 'osd ' + command,
+            'who': str(self.osd_id)
+        }]])
+
+
+
+class OsdId(RestController):
+    """Endpoint /osd/<id>: show or modify the state of a single OSD."""
+
+    def __init__(self, osd_id):
+        self.osd_id = osd_id
+        self.command = OsdIdCommand(osd_id)
+
+
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show the information for the OSD id
+        """
+        osd = module.instance.get_osds([str(self.osd_id)])
+        if len(osd) != 1:
+            response.status = 500
+            return {'message': 'Failed to identify the OSD id "%d"' % self.osd_id}
+
+        return osd[0]
+
+
+    @expose('json')
+    @catch
+    @auth
+    def patch(self):
+        """
+        Modify the state (up, in) of the OSD id or reweight it
+        """
+        args = request.json
+
+        commands = []
+
+        if 'in' in args:
+            if args['in']:
+                commands.append({
+                    'prefix': 'osd in',
+                    'ids': [str(self.osd_id)]
+                })
+            else:
+                commands.append({
+                    'prefix': 'osd out',
+                    'ids': [str(self.osd_id)]
+                })
+
+        if 'up' in args:
+            if args['up']:
+                # An OSD comes up on its own; it cannot be forced up
+                response.status = 500
+                return {'message': "It is not valid to set a down OSD to be up"}
+            else:
+                commands.append({
+                    'prefix': 'osd down',
+                    'ids': [str(self.osd_id)]
+                })
+
+        if 'reweight' in args:
+            commands.append({
+                'prefix': 'osd reweight',
+                'id': self.osd_id,
+                'weight': args['reweight']
+            })
+
+        return module.instance.submit_request([commands])
+
+
+
+class Osd(RestController):
+    """Endpoint /osd: list OSDs, optionally filtered by id or pool."""
+
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show the information for all the OSDs
+        """
+        # Parse request args
+        ids = request.GET.getall('id[]')
+        pool_id = request.GET.get('pool', None)
+
+        return module.instance.get_osds(ids, pool_id)
+
+
+    @expose()
+    def _lookup(self, osd_id, *remainder):
+        # Route /osd/<id> to a per-OSD controller
+        return OsdId(int(osd_id)), remainder
+
+
+
+class MonName(RestController):
+    """Endpoint /mon/<name>: information about a single monitor."""
+
+    def __init__(self, name):
+        self.name = name
+
+
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show the information for the monitor name
+        """
+        mon = filter(
+            lambda x: x['name'] == self.name,
+            module.instance.get_mons()
+        )
+
+        if len(mon) != 1:
+            response.status = 500
+            return {'message': 'Failed to identify the monitor node "%s"' % self.name}
+
+        return mon[0]
+
+
+
+class Mon(RestController):
+    """Endpoint /mon: information about all monitors."""
+
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show the information for all the monitors
+        """
+        return module.instance.get_mons()
+
+
+    @expose()
+    def _lookup(self, name, *remainder):
+        # Route /mon/<name> to a per-monitor controller
+        return MonName(name), remainder
+
+
+
+class Doc(RestController):
+    """Endpoint /doc: self-describing API documentation.
+
+    Intentionally unauthenticated (no @auth) so the docs are browsable.
+    """
+
+    @expose('json')
+    @catch
+    def get(self):
+        """
+        Show documentation information
+        """
+        return module.instance.get_doc_api(Root)
+
+
+
+class CrushRuleset(RestController):
+    """Endpoint /crush/ruleset: rules grouped by their ruleset id."""
+
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show crush rulesets
+        """
+        rules = module.instance.get('osd_map_crush')['rules']
+        nodes = module.instance.get('osd_map_tree')['nodes']
+
+        ruleset = defaultdict(list)
+        for rule in rules:
+            # Annotate each rule with the number of OSDs it selects
+            rule['osd_count'] = len(common.crush_rule_osds(nodes, rule))
+            ruleset[rule['ruleset']].append(rule)
+
+        return ruleset
+
+
+
+class CrushRule(RestController):
+    """Endpoint /crush/rule: flat list of crush rules."""
+
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show crush rules
+        """
+        rules = module.instance.get('osd_map_crush')['rules']
+        nodes = module.instance.get('osd_map_tree')['nodes']
+
+        # Annotate each rule with the number of OSDs it selects
+        for rule in rules:
+            rule['osd_count'] = len(common.crush_rule_osds(nodes, rule))
+
+        return rules
+
+
+
+class Crush(RestController):
+    """Endpoint /crush: groups the rule and ruleset sub-endpoints."""
+
+    rule = CrushRule()
+    ruleset = CrushRuleset()
+
+
+
+class ConfigOsd(RestController):
+    """Endpoint /config/osd: show and toggle the osd map flags."""
+
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show OSD configuration options
+        """
+        flags = module.instance.get("osd_map")['flags']
+
+        # pause is a valid osd config command that sets pauserd,pausewr
+        flags = flags.replace('pauserd,pausewr', 'pause')
+
+        return flags.split(',')
+
+
+    @expose('json')
+    @catch
+    @auth
+    def patch(self):
+        """
+        Modify OSD configuration options
+        """
+
+        args = request.json
+
+        commands = []
+
+        # Silently drop unknown flags (but log them) so a partially valid
+        # request still applies the valid flags
+        valid_flags = set(args.keys()) & set(common.OSD_FLAGS)
+        invalid_flags = list(set(args.keys()) - valid_flags)
+        if invalid_flags:
+            module.instance.log.warn("%s not valid to set/unset" % invalid_flags)
+
+        for flag in list(valid_flags):
+            if args[flag]:
+                mode = 'set'
+            else:
+                mode = 'unset'
+
+            commands.append({
+                'prefix': 'osd ' + mode,
+                'key': flag,
+            })
+
+        return module.instance.submit_request([commands])
+
+
+
+class ConfigClusterKey(RestController):
+    """Endpoint /config/cluster/<key>: a single cluster config option."""
+
+    def __init__(self, key):
+        self.key = key
+
+
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show specific configuration option
+        """
+        return module.instance.get("config").get(self.key, None)
+
+
+
+class ConfigCluster(RestController):
+    """Endpoint /config/cluster: all cluster configuration options."""
+
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show all cluster configuration options
+        """
+        return module.instance.get("config")
+
+
+    @expose()
+    def _lookup(self, key, *remainder):
+        # Route /config/cluster/<key> to a per-option controller
+        return ConfigClusterKey(key), remainder
+
+
+
+class Config(RestController):
+    """Endpoint /config: groups the cluster and osd sub-endpoints."""
+
+    cluster = ConfigCluster()
+    osd = ConfigOsd()
+
+
+
+class Auth(RestController):
+ @expose('json')
+ @catch
+ def get(self):
+ """
+ Generate a brand new password-less login token for the user
+ Uses HTTP Basic Auth for authentication
+ """
+ if not request.authorization:
+ return (
+ {'message': 'auth: No HTTP username/password'},
+ 401,
+ {'WWW-Authenticate': 'Basic realm="Login Required"'}
+ )
+
+ username, password = b64decode(request.authorization[1]).split(':')
+ # Do not create a new token for a username that is already a token
+ if username in module.instance.tokens.values():
+ return {
+ 'token': username
+ }
+
+ # Check the ceph auth db
+ msg = module.instance.verify_user(username, password)
+ if msg:
+ return (
+ {'message': 'auth: ' + msg},
+ 401,
+ {'WWW-Authenticate': 'Basic realm="Login Required"'}
+ )
+
+ # Create a password-less login token for the user
+ # This overwrites any old user tokens
+ return {
+ 'token': module.instance.set_token(username)
+ }
+
+
+ @expose('json')
+ @catch
+ @auth
+ def delete(self):
+ """
+ Delete the password-less login token for the user
+ """
+
+ username, password = b64decode(request.authorization[1]).split(':')
+
+ if module.instance.unset_token(username):
+ return {'success': 'auth: Token removed'}
+
+ response.status = 500
+ return {'message': 'auth: No token for the user'}
+
+
+
+class Root(RestController):
+    """Root of the REST API: wires every sub-endpoint under /."""
+
+    auth = Auth()
+    config = Config()
+    crush = Crush()
+    doc = Doc()
+    mon = Mon()
+    osd = Osd()
+    pool = Pool()
+    request = Request()
+    server = Server()
+
+    @expose('json')
+    @catch
+    def get(self):
+        """
+        Show the basic information for the REST API
+        This includes values like api version or auth method
+        """
+        return {
+            'api_version': 1,
+            'auth':
+            'Use ceph auth key pair as HTTP Basic user/password '
+            '(requires caps mon allow * to function properly)',
+            'doc': 'See /doc endpoint',
+            'info': "Ceph Manager RESTful API server",
+        }
--- /dev/null
+# List of valid osd flags
+OSD_FLAGS = [
+    'pause', 'noup', 'nodown', 'noout', 'noin', 'nobackfill',
+    'norecover', 'noscrub', 'nodeep-scrub',
+]
+
+# Implemented osd commands
+OSD_IMPLEMENTED_COMMANDS = [
+    'scrub', 'deep_scrub', 'repair'
+]
+
+# Valid values for the 'var' argument to 'ceph osd pool set'
+POOL_PROPERTIES_1 = [
+    'size', 'min_size', 'crash_replay_interval', 'pg_num',
+    'crush_ruleset', 'hashpspool',
+]
+
+# Properties applied in a second wave, after POOL_PROPERTIES_1 -- see
+# pool_update_commands (pgp_num may depend on pg_num being set first)
+POOL_PROPERTIES_2 = [
+    'pgp_num'
+]
+
+POOL_PROPERTIES = POOL_PROPERTIES_1 + POOL_PROPERTIES_2
+
+# Valid values for the 'ceph osd pool set-quota' command
+POOL_QUOTA_PROPERTIES = [
+    ('quota_max_bytes', 'max_bytes'),
+    ('quota_max_objects', 'max_objects'),
+]
+
+# All argument names accepted by the pool endpoints
+POOL_ARGS = POOL_PROPERTIES + map(
+    lambda x: x[0],
+    POOL_QUOTA_PROPERTIES
+)
+
+
+# Transform command to a human readable form
+def humanify_command(command):
+    """Render a mon command dict as a human readable one-line string."""
+    out = [command['prefix']]
+
+    # iteritems() implies this targets Python 2
+    for arg, val in command.iteritems():
+        if arg != 'prefix':
+            out.append("%s=%s" % (str(arg), str(val)))
+
+    return " ".join(out)
+
+
+def invalid_pool_args(args):
+    """Return the list of keys in *args* that are not valid pool arguments."""
+    invalid = []
+    for arg in args:
+        if arg not in POOL_ARGS:
+            invalid.append(arg)
+
+    return invalid
+
+
+def pool_update_commands(pool_name, args):
+ commands = [[], []]
+
+ # We should increase pgp_num when we are re-setting pg_num
+ if 'pg_num' in args and 'pgp_num' not in args:
+ args['pgp_num'] = args['pg_num']
+
+ # Run the first pool set and quota properties in parallel
+ for var in POOL_PROPERTIES_1:
+ if var in args:
+ commands[0].append({
+ 'prefix': 'osd pool set',
+ 'pool': pool_name,
+ 'var': var,
+ 'val': args[var],
+ })
+
+ for (var, field) in POOL_QUOTA_PROPERTIES:
+ if var in args:
+ commands[0].append({
+ 'prefix': 'osd pool set-quota',
+ 'pool': pool_name,
+ 'field': field,
+ 'val': str(args[var]),
+ })
+
+ # The second pool set properties need to be run after the first wave
+ for var in POOL_PROPERTIES_2:
+ if var in args:
+ commands[1].append({
+ 'prefix': 'osd pool set',
+ 'pool': pool_name,
+ 'var': var,
+ })
+
+ return commands
+
+
+def crush_rule_osds(nodes, rule):
+ nodes_by_id = dict((n['id'], n) for n in nodes)
+
+ def _gather_leaf_ids(node):
+ if node['id'] >= 0:
+ return set([node['id']])
+
+ result = set()
+ for child_id in node['children']:
+ if child_id >= 0:
+ result.add(child_id)
+ else:
+ result |= _gather_leaf_ids(nodes_by_id[child_id])
+
+ return result
+
+ def _gather_descendent_ids(node, typ):
+ result = set()
+ for child_id in node['children']:
+ child_node = nodes_by_id[child_id]
+ if child_node['type'] == typ:
+ result.add(child_node['id'])
+ elif 'children' in child_node:
+ result |= _gather_descendent_ids(child_node, typ)
+
+ return result
+
+ def _gather_osds(root, steps):
+ if root['id'] >= 0:
+ return set([root['id']])
+
+ osds = set()
+ step = steps[0]
+ if step['op'] == 'choose_firstn':
+ # Choose all descendents of the current node of type 'type'
+ d = _gather_descendent_ids(root, step['type'])
+ for desc_node in [nodes_by_id[i] for i in d]:
+ osds |= _gather_osds(desc_node, steps[1:])
+ elif step['op'] == 'chooseleaf_firstn':
+ # Choose all descendents of the current node of type 'type',
+ # and select all leaves beneath those
+ for desc_node in [nodes_by_id[i] for i in _gather_descendent_ids(root, step['type'])]:
+ # Short circuit another iteration to find the emit
+ # and assume anything we've done a chooseleaf on
+ # is going to be part of the selected set of osds
+ osds |= _gather_leaf_ids(desc_node)
+ elif step['op'] == 'emit':
+ if root['id'] >= 0:
+ osds |= root['id']
+
+ return osds
+
+ osds = set()
+ for i, step in enumerate(rule['steps']):
+ if step['op'] == 'take':
+ osds |= _gather_osds(nodes_by_id[step['item']], rule['steps'][i + 1:])
+ return osds
--- /dev/null
+"""
+A RESTful API for Ceph
+"""
+
+import json
+import inspect
+import StringIO
+import threading
+import traceback
+import ConfigParser
+
+import common
+
+from uuid import uuid4
+from pecan import jsonify, make_app
+from OpenSSL import SSL, crypto
+from pecan.rest import RestController
+from werkzeug.serving import make_server
+
+
+from mgr_module import MgrModule, CommandResult
+
+# Global instance to share
+instance = None
+
+
+
+class CommandsRequest(object):
+    """
+    This class handles parallel as well as sequential execution of
+    commands. The class accept a list of iterables that should be
+    executed sequentially. Each iterable can contain several commands
+    that can be executed in parallel.
+
+    Example:
+    [[c1,c2],[c3,c4]]
+    - run c1 and c2 in parallel
+    - wait for them to finish
+    - run c3 and c4 in parallel
+    - wait for them to finish
+    """
+
+
+    def __init__(self, commands_arrays):
+        # NOTE: id(self) is only unique while this object is alive,
+        # and only within the current process
+        self.id = str(id(self))
+
+        # Filter out empty sub-requests
+        commands_arrays = filter(
+            lambda x: len(x) != 0,
+            commands_arrays,
+        )
+
+        self.running = []
+        self.waiting = commands_arrays[1:]
+        self.finished = []
+        self.failed = []
+
+        self.lock = threading.RLock()
+        if not len(commands_arrays):
+            # Nothing to run
+            return
+
+        # Process first iteration of commands_arrays in parallel
+        results = self.run(commands_arrays[0])
+
+        self.running.extend(results)
+
+
+    def run(self, commands):
+        """
+        Execute the given list of commands in parallel and return the
+        list of (in-flight) command results.
+        """
+
+        # Gather the results (in parallel)
+        results = []
+        for index in range(len(commands)):
+            tag = '%s:%d' % (str(self.id), index)
+
+            # Store the result
+            result = CommandResult(tag)
+            result.command = common.humanify_command(commands[index])
+            results.append(result)
+
+            # Run the command
+            instance.send_command(result, json.dumps(commands[index]), tag)
+
+        return results
+
+
+    def next(self):
+        """Launch the next waiting batch of commands, if any."""
+        with self.lock:
+            if not self.waiting:
+                # Nothing to run
+                return
+
+            # Run a next iteration of commands
+            commands = self.waiting[0]
+            self.waiting = self.waiting[1:]
+
+            self.running.extend(self.run(commands))
+
+
+    def finish(self, tag):
+        """Move the command with the given tag from running to finished/failed."""
+        with self.lock:
+            for index in range(len(self.running)):
+                if self.running[index].tag == tag:
+                    if self.running[index].r == 0:
+                        self.finished.append(self.running.pop(index))
+                    else:
+                        self.failed.append(self.running.pop(index))
+                    return True
+
+            # No such tag found
+            return False
+
+
+    def is_running(self, tag):
+        for result in self.running:
+            if result.tag == tag:
+                return True
+        return False
+
+
+    def is_ready(self):
+        # NOTE: returns the (truthy) waiting list, not a strict bool
+        with self.lock:
+            return not self.running and self.waiting
+
+
+    def is_waiting(self):
+        return bool(self.waiting)
+
+
+    def is_finished(self):
+        with self.lock:
+            return not self.running and not self.waiting
+
+
+    def has_failed(self):
+        return bool(self.failed)
+
+
+    def get_state(self):
+        """Return 'pending', 'failed' or 'success' for this request."""
+        with self.lock:
+            if not self.is_finished():
+                return "pending"
+
+            if self.has_failed():
+                return "failed"
+
+            return "success"
+
+
+    def humanify(self):
+        """Return a JSON-serializable summary of this request's state."""
+        return {
+            'id': self.id,
+            'running': map(
+                lambda x: (x.command, x.outs, x.outb),
+                self.running
+            ),
+            'finished': map(
+                lambda x: (x.command, x.outs, x.outb),
+                self.finished
+            ),
+            'waiting': map(
+                lambda x: (x.command, x.outs, x.outb),
+                self.waiting
+            ),
+            'failed': map(
+                lambda x: (x.command, x.outs, x.outb),
+                self.failed
+            ),
+            'is_waiting': self.is_waiting(),
+            'is_finished': self.is_finished(),
+            'has_failed': self.has_failed(),
+            'state': self.get_state(),
+        }
+
+
+
+class Module(MgrModule):
+    """The 'restful' mgr module: serves the pecan app over HTTPS (werkzeug)."""
+
+    COMMANDS = []
+
+    def __init__(self, *args, **kwargs):
+        super(Module, self).__init__(*args, **kwargs)
+        # Publish ourselves via the module-level 'instance' global so the
+        # api controllers (which import this module) can reach us
+        global instance
+        instance = self
+
+        self.requests = []
+        self.requests_lock = threading.RLock()
+
+        self.tokens = {}
+        self.disable_auth = False
+
+        self.shutdown_key = str(uuid4())
+
+        self.server = None
+
+
+    def serve(self):
+        # Deliberately broad: log any serving failure rather than letting
+        # the exception disappear with the serving thread
+        try:
+            self._serve()
+        except:
+            self.log.error(str(traceback.format_exc()))
+
+
+    def _serve(self):
+        # Load stored authentication tokens
+        self.tokens = self.get_config_json("tokens") or {}
+
+        # Pretty-print all JSON responses
+        jsonify._instance = jsonify.GenericJSON(
+            sort_keys=True,
+            indent=4,
+            separators=(',', ': '),
+        )
+
+        self.cert = self.get_config_json('cert')
+        self.pkey = self.get_config_json('pkey')
+
+        # Create a new shared cert if it does not exist, yet
+        if not self.cert or not self.pkey:
+            (self.cert, self.pkey) = self.create_cert()
+            self.set_config_json('cert', self.cert)
+            self.set_config_json('pkey', self.pkey)
+
+        # use SSL context for https
+        # NOTE(review): TLSv1_METHOD pins TLS 1.0 -- consider a newer method
+        context = SSL.Context(SSL.TLSv1_METHOD)
+        context.use_certificate(
+            crypto.load_certificate(crypto.FILETYPE_PEM, self.cert)
+        )
+        context.use_privatekey(
+            crypto.load_privatekey(crypto.FILETYPE_PEM, self.pkey)
+        )
+
+        # Create the HTTPS werkzeug server serving pecan app
+        self.server = make_server(
+            host='0.0.0.0',
+            port=8002,
+            app=make_app('restful.api.Root'),
+            ssl_context=context
+        )
+
+        self.server.serve_forever()
+
+
+    def shutdown(self):
+        # Best effort: self.server may still be None if _serve failed early
+        try:
+            self.server.shutdown()
+        except:
+            self.log.error(str(traceback.format_exc()))
+
+
+    def notify(self, notify_type, tag):
+        # Deliberately broad: a notification handler must never raise
+        try:
+            self._notify(notify_type, tag)
+        except:
+            self.log.error(str(traceback.format_exc()))
+
+
+    def _notify(self, notify_type, tag):
+        if notify_type == "command":
+            # we can safely skip all the sequential commands
+            if tag == 'seq':
+                return
+
+            request = filter(
+                lambda x: x.is_running(tag),
+                self.requests)
+
+            if len(request) != 1:
+                self.log.warn("Unknown request '%s'" % str(tag))
+                return
+
+            # Mark the command finished; launch the next batch if this
+            # was the last running command of the request
+            request = request[0]
+            request.finish(tag)
+            if request.is_ready():
+                request.next()
+        else:
+            self.log.debug("Unhandled notification type '%s'" % notify_type)
+
+
+    def create_cert(self):
+        """Generate a self-signed certificate and key, both PEM-encoded."""
+        # create a key pair
+        pkey = crypto.PKey()
+        pkey.generate_key(crypto.TYPE_RSA, 2048)
+
+        # create a self-signed cert
+        cert = crypto.X509()
+        cert.get_subject().C = "US"
+        cert.get_subject().ST = "Oregon"
+        cert.get_subject().L = "Portland"
+        cert.get_subject().O = "IT"
+        cert.get_subject().CN = "ceph-restful"
+        cert.set_serial_number(int(uuid4()))
+        cert.gmtime_adj_notBefore(0)
+        cert.gmtime_adj_notAfter(10*365*24*60*60)
+        cert.set_issuer(cert.get_subject())
+        cert.set_pubkey(pkey)
+        cert.sign(pkey, 'sha512')
+
+        return (
+            crypto.dump_certificate(crypto.FILETYPE_PEM, cert),
+            crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)
+        )
+
+
+    def get_doc_api(self, root, prefix=''):
+        """Recursively collect {url: {METHOD: docstring lines}} for *root*."""
+        doc = {}
+        for _obj in dir(root):
+            obj = getattr(root, _obj)
+
+            if isinstance(obj, RestController):
+                doc.update(self.get_doc_api(obj, prefix + '/' + _obj))
+
+        # Probe _lookup with a dummy '0' argument to document dynamic routes
+        if getattr(root, '_lookup', None) and isinstance(root._lookup('0')[0], RestController):
+            doc.update(self.get_doc_api(root._lookup('0')[0], prefix + '/<arg>'))
+
+        prefix = prefix or '/'
+
+        doc[prefix] = {}
+        for method in 'get', 'post', 'patch', 'delete':
+            if getattr(root, method, None):
+                doc[prefix][method.upper()] = inspect.getdoc(getattr(root, method)).split('\n')
+
+        if len(doc[prefix]) == 0:
+            del doc[prefix]
+
+        return doc
+
+
+    def get_mons(self):
+        """Return the mon map entries annotated with quorum/server info."""
+        mon_map_mons = self.get('mon_map')['mons']
+        mon_status = json.loads(self.get('mon_status')['json'])
+
+        # Add more information
+        for mon in mon_map_mons:
+            mon['in_quorum'] = mon['rank'] in mon_status['quorum']
+            mon['server'] = self.get_metadata("mon", mon['name'])['hostname']
+            mon['leader'] = mon['rank'] == mon_status['quorum'][0]
+
+        return mon_map_mons
+
+
+    def get_osd_pools(self):
+        """Return a mapping of osd id -> list of pool ids using that osd."""
+        osds = dict(map(lambda x: (x['osd'], []), self.get('osd_map')['osds']))
+        pools = dict(map(lambda x: (x['pool'], x), self.get('osd_map')['pools']))
+        crush_rules = self.get('osd_map_crush')['rules']
+
+        osds_by_pool = {}
+        for pool_id, pool in pools.items():
+            pool_osds = None
+            for rule in [r for r in crush_rules if r['ruleset'] == pool['crush_ruleset']]:
+                if rule['min_size'] <= pool['size'] <= rule['max_size']:
+                    pool_osds = common.crush_rule_osds(self.get('osd_map_tree')['nodes'], rule)
+
+            osds_by_pool[pool_id] = pool_osds
+
+        for pool_id in pools.keys():
+            for in_pool_id in osds_by_pool[pool_id]:
+                osds[in_pool_id].append(pool_id)
+
+        return osds
+
+
+    # NOTE: ids=[] default is only read, never mutated, so it is safe here
+    def get_osds(self, ids=[], pool_id=None):
+        """Return osd map entries, optionally filtered by ids and/or pool."""
+        # Get data
+        osd_map = self.get('osd_map')
+        osd_metadata = self.get('osd_metadata')
+
+        # Update the data with the additional info from the osd map
+        osds = osd_map['osds']
+
+        # Filter by osd ids
+        if ids:
+            osds = filter(
+                lambda x: str(x['osd']) in ids,
+                osds
+            )
+
+        # Get list of pools per osd node
+        pools_map = self.get_osd_pools()
+
+        # map osd IDs to reweight
+        reweight_map = dict([
+            (x.get('id'), x.get('reweight', None))
+            for x in self.get('osd_map_tree')['nodes']
+        ])
+
+        # Build OSD data objects
+        for osd in osds:
+            osd['pools'] = pools_map[osd['osd']]
+            osd['server'] = osd_metadata.get(str(osd['osd']), {}).get('hostname', None)
+
+            osd['reweight'] = reweight_map.get(osd['osd'], 0.0)
+
+            if osd['up']:
+                osd['valid_commands'] = common.OSD_IMPLEMENTED_COMMANDS
+            else:
+                osd['valid_commands'] = []
+
+        # Filter by pool
+        if pool_id:
+            pool_id = int(pool_id)
+            osds = filter(
+                lambda x: pool_id in x['pools'],
+                osds
+            )
+
+        return osds
+
+
+    def get_osd_by_id(self, osd_id):
+        """Return the osd map entry for *osd_id*, or None if not found."""
+        osd = filter(
+            lambda x: x['osd'] == osd_id,
+            self.get('osd_map')['osds']
+        )
+
+        if len(osd) != 1:
+            return None
+
+        return osd[0]
+
+
+    def get_pool_by_id(self, pool_id):
+        """Return the osd map pool entry for *pool_id*, or None if not found."""
+        pool = filter(
+            lambda x: x['pool'] == pool_id,
+            self.get('osd_map')['pools'],
+        )
+
+        if len(pool) != 1:
+            return None
+
+        return pool[0]
+
+
+    def submit_request(self, _request):
+        """Queue a CommandsRequest and return its humanified summary."""
+        request = CommandsRequest(_request)
+        self.requests.append(request)
+        return request.humanify()
+
+
+    def run_command(self, command):
+        """Run a single mon command synchronously; return (r, outb, outs)."""
+        # tag with 'seq' so that we can ignore these in notify function
+        result = CommandResult('seq')
+
+        self.send_command(result, json.dumps(command), 'seq')
+        return result.wait()
+
+
+    def verify_user(self, username, password):
+        """Check *username*/*password* against 'ceph auth get'.
+
+        Returns '' on success, or a human readable failure reason.
+        """
+        r, outb, outs = self.run_command({
+            'prefix': 'auth get',
+            'entity': username,
+        })
+
+        if r != 0:
+            return 'No such user/key (%s, %s)' % (outb, outs)
+
+        ## check the capabilities, we are looking for mon allow *
+        # The keyring output is INI-like, so parse it with ConfigParser
+        conf = ConfigParser.ConfigParser()
+
+        # ConfigParser can't handle tabs, remove them
+        conf.readfp(StringIO.StringIO(outb.replace('\t', '')))
+
+        if not conf.has_section(username):
+            return 'Failed to parse the auth details'
+
+        key = conf.get(username, 'key')
+
+        if password != key:
+            return 'Invalid key'
+
+        if not conf.has_option(username, 'caps mon'):
+            return 'No mon caps set'
+
+        caps = conf.get(username, 'caps mon')
+
+        if caps not in ['"allow *"', '"allow profile mgr"']:
+            return 'Insufficient mon caps set'
+
+        # Returning '' means 'no objections'
+        return ''
+
+
+    def set_token(self, user):
+        """Create (or replace) and persist a password-less token for *user*."""
+        self.tokens[user] = str(uuid4())
+
+        self.set_config_json('tokens', self.tokens)
+
+        return self.tokens[user]
+
+
+    def unset_token(self, user):
+        """Remove and persist the token for *user*; False if none existed."""
+        if user not in self.tokens:
+            return False
+
+        del self.tokens[user]
+        self.set_config_json('tokens', self.tokens)
+
+        return True