From: Boris Ranto Date: Wed, 12 Apr 2017 21:54:36 +0000 (+0200) Subject: ceph-mgr: Implement new pecan-based rest api X-Git-Tag: ses5-milestone6~9^2~47^2~13 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=5ed2631b094125eacf8f5e4ceae30a3ab2023ecd;p=ceph.git ceph-mgr: Implement new pecan-based rest api The new rest API uses pecan for the restful functionality and simplifies the code significantly. It should be mostly equivalent in functionality to the django-based rest API. The api is self-documenting via /doc endpoint. Signed-off-by: Boris Ranto --- diff --git a/ceph.spec.in b/ceph.spec.in index 30831c11c30..130a4bb579e 100644 --- a/ceph.spec.in +++ b/ceph.spec.in @@ -312,12 +312,16 @@ Group: System/Filesystems %endif Requires: ceph-base = %{epoch}:%{version}-%{release} %if 0%{?fedora} || 0%{?rhel} -Requires: python-cherrypy +Requires: python-cherrypy +Requires: pyOpenSSL +Requires: python-werkzeug %endif %if 0%{?suse_version} Requires: python-CherryPy +Requires: python-pyOpenSSL +Requires: python-Werkzeug %endif - +Requires: python-pecan %description mgr ceph-mgr enables python modules that provide services (such as the REST module derived from Calamari) and expose CLI hooks. ceph-mgr gathers diff --git a/debian/control b/debian/control index ae6c6a73501..b34e778046a 100644 --- a/debian/control +++ b/debian/control @@ -161,6 +161,9 @@ Description: debugging symbols for ceph-mds Package: ceph-mgr Architecture: linux-any Depends: ceph-base (= ${binary:Version}), + python-openssl + python-pecan + python-werkzeug ${misc:Depends}, ${python:Depends}, python-cherrypy3, diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 13e1a558441..e033962ca43 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -1673,7 +1673,7 @@ OPTION(rgw_shard_warning_threshold, OPT_DOUBLE, 90) // pct of safe max OPTION(rgw_swift_versioning_enabled, OPT_BOOL, false) // whether swift object versioning feature is enabled OPTION(mgr_module_path, OPT_STR, CEPH_PKGLIBDIR "/mgr") // where to load python modules from -OPTION(mgr_modules, OPT_STR, "rest") // Which modules to load +OPTION(mgr_modules, OPT_STR, "restful") // Which modules to load OPTION(mgr_data, OPT_STR, "/var/lib/ceph/mgr/$cluster-$id") // where to find keyring etc OPTION(mgr_beacon_period, OPT_INT, 5) // How frequently to send beacon OPTION(mgr_stats_period, OPT_INT, 5) // How frequently to send stats diff --git a/src/mon/MonCap.cc b/src/mon/MonCap.cc index 558e67d9cca..6ae8ab3db76 100644 --- a/src/mon/MonCap.cc +++ b/src/mon/MonCap.cc @@ -173,7 +173,8 @@ void MonCapGrant::expand_profile_mon(const EntityName& name) const profile_grants.push_back(MonCapGrant("mon", MON_CAP_R)); profile_grants.push_back(MonCapGrant("mds", MON_CAP_R)); profile_grants.push_back(MonCapGrant("osd", MON_CAP_R | MON_CAP_W)); - profile_grants.push_back(MonCapGrant("config-key", MON_CAP_R)); + profile_grants.push_back(MonCapGrant("auth", MON_CAP_R | MON_CAP_X)); + profile_grants.push_back(MonCapGrant("config-key", MON_CAP_R | MON_CAP_W)); string prefix = string("daemon-private/mgr/"); profile_grants.push_back(MonCapGrant("config-key get", "key", StringConstraint("", prefix))); diff --git a/src/pybind/mgr/restful/__init__.py b/src/pybind/mgr/restful/__init__.py new file mode 100644 index 00000000000..0440e0705fb --- /dev/null +++ b/src/pybind/mgr/restful/__init__.py @@ -0,0 +1 @@ +from module import * # NOQA diff --git a/src/pybind/mgr/restful/api.py b/src/pybind/mgr/restful/api.py new file mode 100644 index 00000000000..9cceccb24c2 --- /dev/null +++ b/src/pybind/mgr/restful/api.py @@ -0,0 +1,721 @@ +from pecan import expose, request, response +from pecan.rest import RestController + +import common +import traceback + +from base64 import b64decode +from functools import wraps +from collections import defaultdict + +## We need this to access the instance of the module +# +# We can't use 'from module import instance' because +# the instance is not ready, yet (would be None) +import module + + +# Helper function to catch and log the exceptions +def catch(f): + @wraps(f) + def catcher(*args, **kwargs): + try: + return f(*args, **kwargs) + except: + module.instance.log.error(str(traceback.format_exc())) + response.status = 500 + return {'message': str(traceback.format_exc()).split('\n')} + return catcher + + +# Handle authorization +def auth(f): + @wraps(f) + def decorated(*args, **kwargs): + if not request.authorization: + response.status = 401 + response.headers['WWW-Authenticate'] = 'Basic realm="Login Required"' + return {'message': 'auth: No HTTP username/password'} + + username, password = b64decode(request.authorization[1]).split(':') + + # Lookup the password-less tokens first + if username not in module.instance.tokens.values(): + # Check the ceph auth db + msg = module.instance.verify_user(username, password) + if msg: + response.status = 401 + response.headers['WWW-Authenticate'] = 'Basic realm="Login Required"' + return {'message': 'auth: No HTTP username/password'} + + return f(*args, **kwargs) + return decorated + + +# Helper function to lock the function +def lock(f): + @wraps(f) + def locker(*args, **kwargs): + with module.instance.requests_lock: + return f(*args, **kwargs) + return locker + + + +class ServerFqdn(RestController): + def __init__(self, fqdn): + self.fqdn = fqdn + + + @expose('json') + @catch + @auth + def get(self): + """ + Show the information for the server fqdn + """ + return module.instance.get_server(self.fqdn) + + + +class Server(RestController): + @expose('json') + @catch + @auth + def get(self): + """ + Show the information for all the servers + """ + return module.instance.list_servers() + + + @expose() + def _lookup(self, fqdn, *remainder): + return ServerFqdn(fqdn), remainder + + + +class RequestId(RestController): + def __init__(self, request_id): + self.request_id = request_id + + + @expose('json') + @catch + @auth + def get(self): + """ + Show the information for the request id + """ + request = filter( + lambda x: x.id == self.request_id, + module.instance.requests + ) + + if len(request) != 1: + response.status = 500 + return {'message': 'Unknown request id "%s"' % str(self.request_id)} + + request = request[0] + return request.humanify() + + + @expose('json') + @catch + @auth + @lock + def delete(self): + """ + Remove the request id from the database + """ + for index in range(len(module.instance.requests)): + if module.instance.requests[index].id == self.request_id: + return module.instance.requests.pop(index).humanify() + + # Failed to find the job to cancel + response.status = 500 + return {'message': 'No such request id'} + + + +class Request(RestController): + @expose('json') + @catch + @auth + def get(self): + """ + List all the available requests and their state + """ + states = {} + for _request in module.instance.requests: + states[_request.id] = _request.get_state() + + return states + + + @expose('json') + @catch + @auth + @lock + def delete(self): + """ + Remove all the finished requests + """ + num_requests = len(module.instance.requests) + + module.instance.requests = filter( + lambda x: not x.is_finished(), + module.instance.requests + ) + + # Return the job statistics + return { + 'cleaned': num_requests - len(module.instance.requests), + 'remaining': len(module.instance.requests), + } + + + @expose() + def _lookup(self, request_id, *remainder): + return RequestId(request_id), remainder + + + +class PoolId(RestController): + def __init__(self, pool_id): + self.pool_id = pool_id + + + @expose('json') + @catch + @auth + def get(self): + """ + Show the information for the pool id + """ + pool = module.instance.get_pool_by_id(self.pool_id) + + if not pool: + response.status = 500 + return {'message': 'Failed to identify the pool id "%d"' % self.pool_id} + + # pgp_num is called pg_placement_num, deal with that + if 'pg_placement_num' in pool: + pool['pgp_num'] = pool.pop('pg_placement_num') + return pool + + + @expose('json') + @catch + @auth + def patch(self): + """ + Modify the information for the pool id + """ + args = request.json + + # Get the pool info for its name + pool = module.instance.get_pool_by_id(self.pool_id) + if not pool: + response.status = 500 + return {'message': 'Failed to identify the pool id "%d"' % self.pool_id} + + # Check for invalid pool args + invalid = common.invalid_pool_args(args) + if invalid: + response.status = 500 + return {'message': 'Invalid arguments found: "%s"' % str(invalid)} + + # Schedule the update request + return module.instance.submit_request(common.pool_update_commands(pool['pool_name'], args)) + + + @expose('json') + @catch + @auth + def delete(self): + """ + Remove the pool data for the pool id + """ + pool = module.instance.get_pool_by_id(self.pool_id) + + if not pool: + response.status = 500 + return {'message': 'Failed to identify the pool id "%d"' % self.pool_id} + + return module.instance.submit_request([[{ + 'prefix': 'osd pool delete', + 'pool': pool['pool_name'], + 'pool2': pool['pool_name'], + 'sure': '--yes-i-really-really-mean-it' + }]]) + + + +class Pool(RestController): + @expose('json') + @catch + @auth + def get(self): + """ + Show the information for all the pools + """ + pools = module.instance.get('osd_map')['pools'] + + # pgp_num is called pg_placement_num, deal with that + for pool in pools: + if 'pg_placement_num' in pool: + pool['pgp_num'] = pool.pop('pg_placement_num') + + return pools + + + @expose('json') + @catch + @auth + def post(self): + """ + Create a new pool + Requires name and pg_num dict arguments + """ + args = request.json + + # Check for the required arguments + pool_name = args.pop('name', None) + if pool_name is None: + response.status = 500 + return {'message': 'You need to specify the pool "name" argument'} + + pg_num = args.pop('pg_num', None) + if pg_num is None: + response.status = 500 + return {'message': 'You need to specify the "pg_num" argument'} + + # Run the pool create command first + create_command = { + 'prefix': 'osd pool create', + 'pool': pool_name, + 'pg_num': pg_num + } + + # Check for invalid pool args + invalid = common.invalid_pool_args(args) + if invalid: + response.status = 500 + return {'message': 'Invalid arguments found: "%s"' % str(invalid)} + + # Schedule the creation and update requests + return module.instance.submit_request( + [[create_command]] + + common.pool_update_commands(pool_name, args) + ) + + + @expose() + def _lookup(self, pool_id, *remainder): + return PoolId(int(pool_id)), remainder + + + +class OsdIdCommand(RestController): + def __init__(self, osd_id): + self.osd_id = osd_id + + + @expose('json') + @catch + @auth + def get(self): + """ + Show implemented commands for the OSD id + """ + osd = module.instance.get_osd_by_id(self.osd_id) + + if not osd: + response.status = 500 + return {'message': 'Failed to identify the OSD id "%d"' % self.osd_id} + + if osd['up']: + return common.OSD_IMPLEMENTED_COMMANDS + else: + return [] + + + @expose('json') + @catch + @auth + def post(self): + """ + Run the implemented command for the OSD id + """ + command = request.json.get('command', None) + + osd = module.instance.get_osd_by_id(self.osd_id) + + if not osd: + response.status = 500 + return {'message': 'Failed to identify the OSD id "%d"' % self.osd_id} + + if not osd['up'] or command not in common.OSD_IMPLEMENTED_COMMANDS: + response.status = 500 + return {'message': 'Command "%s" not available' % command} + + return module.instance.submit_request([[{ + 'prefix': 'osd ' + command, + 'who': str(self.osd_id) + }]]) + + + +class OsdId(RestController): + def __init__(self, osd_id): + self.osd_id = osd_id + self.command = OsdIdCommand(osd_id) + + + @expose('json') + @catch + @auth + def get(self): + """ + Show the information for the OSD id + """ + osd = module.instance.get_osds([str(self.osd_id)]) + if len(osd) != 1: + response.status = 500 + return {'message': 'Failed to identify the OSD id "%d"' % self.osd_id} + + return osd[0] + + + @expose('json') + @catch + @auth + def patch(self): + """ + Modify the state (up, in) of the OSD id or reweight it + """ + args = request.json + + commands = [] + + if 'in' in args: + if args['in']: + commands.append({ + 'prefix': 'osd in', + 'ids': [str(self.osd_id)] + }) + else: + commands.append({ + 'prefix': 'osd out', + 'ids': [str(self.osd_id)] + }) + + if 'up' in args: + if args['up']: + response.status = 500 + return {'message': "It is not valid to set a down OSD to be up"} + else: + commands.append({ + 'prefix': 'osd down', + 'ids': [str(self.osd_id)] + }) + + if 'reweight' in args: + commands.append({ + 'prefix': 'osd reweight', + 'id': self.osd_id, + 'weight': args['reweight'] + }) + + return module.instance.submit_request([commands]) + + + +class Osd(RestController): + @expose('json') + @catch + @auth + def get(self): + """ + Show the information for all the OSDs + """ + # Parse request args + ids = request.GET.getall('id[]') + pool_id = request.GET.get('pool', None) + + return module.instance.get_osds(ids, pool_id) + + + @expose() + def _lookup(self, osd_id, *remainder): + return OsdId(int(osd_id)), remainder + + + +class MonName(RestController): + def __init__(self, name): + self.name = name + + + @expose('json') + @catch + @auth + def get(self): + """ + Show the information for the monitor name + """ + mon = filter( + lambda x: x['name'] == self.name, + module.instance.get_mons() + ) + + if len(mon) != 1: + response.status = 500 + return {'message': 'Failed to identify the monitor node "%s"' % self.name} + + return mon[0] + + + +class Mon(RestController): + @expose('json') + @catch + @auth + def get(self): + """ + Show the information for all the monitors + """ + return module.instance.get_mons() + + + @expose() + def _lookup(self, name, *remainder): + return MonName(name), remainder + + + +class Doc(RestController): + @expose('json') + @catch + def get(self): + """ + Show documentation information + """ + return module.instance.get_doc_api(Root) + + + +class CrushRuleset(RestController): + @expose('json') + @catch + @auth + def get(self): + """ + Show crush rulesets + """ + rules = module.instance.get('osd_map_crush')['rules'] + nodes = module.instance.get('osd_map_tree')['nodes'] + + ruleset = defaultdict(list) + for rule in rules: + rule['osd_count'] = len(common.crush_rule_osds(nodes, rule)) + ruleset[rule['ruleset']].append(rule) + + return ruleset + + + +class CrushRule(RestController): + @expose('json') + @catch + @auth + def get(self): + """ + Show crush rules + """ + rules = module.instance.get('osd_map_crush')['rules'] + nodes = module.instance.get('osd_map_tree')['nodes'] + + for rule in rules: + rule['osd_count'] = len(common.crush_rule_osds(nodes, rule)) + + return rules + + + +class Crush(RestController): + rule = CrushRule() + ruleset = CrushRuleset() + + + +class ConfigOsd(RestController): + @expose('json') + @catch + @auth + def get(self): + """ + Show OSD configuration options + """ + flags = module.instance.get("osd_map")['flags'] + + # pause is a valid osd config command that sets pauserd,pausewr + flags = flags.replace('pauserd,pausewr', 'pause') + + return flags.split(',') + + + @expose('json') + @catch + @auth + def patch(self): + """ + Modify OSD configration options + """ + + args = request.json + + commands = [] + + valid_flags = set(args.keys()) & set(common.OSD_FLAGS) + invalid_flags = list(set(args.keys()) - valid_flags) + if invalid_flags: + module.instance.log.warn("%s not valid to set/unset" % invalid_flags) + + for flag in list(valid_flags): + if args[flag]: + mode = 'set' + else: + mode = 'unset' + + commands.append({ + 'prefix': 'osd ' + mode, + 'key': flag, + }) + + return module.instance.submit_request([commands]) + + + +class ConfigClusterKey(RestController): + def __init__(self, key): + self.key = key + + + @expose('json') + @catch + @auth + def get(self): + """ + Show specific configuration option + """ + return module.instance.get("config").get(self.key, None) + + + +class ConfigCluster(RestController): + @expose('json') + @catch + @auth + def get(self): + """ + Show all cluster configuration options + """ + return module.instance.get("config") + + + @expose() + def _lookup(self, key, *remainder): + return ConfigClusterKey(key), remainder + + + +class Config(RestController): + cluster = ConfigCluster() + osd = ConfigOsd() + + + +class Auth(RestController): + @expose('json') + @catch + def get(self): + """ + Generate a brand new password-less login token for the user + Uses HTTP Basic Auth for authentication + """ + if not request.authorization: + return ( + {'message': 'auth: No HTTP username/password'}, + 401, + {'WWW-Authenticate': 'Basic realm="Login Required"'} + ) + + username, password = b64decode(request.authorization[1]).split(':') + # Do not create a new token for a username that is already a token + if username in module.instance.tokens.values(): + return { + 'token': username + } + + # Check the ceph auth db + msg = module.instance.verify_user(username, password) + if msg: + return ( + {'message': 'auth: ' + msg}, + 401, + {'WWW-Authenticate': 'Basic realm="Login Required"'} + ) + + # Create a password-less login token for the user + # This overwrites any old user tokens + return { + 'token': module.instance.set_token(username) + } + + + @expose('json') + @catch + @auth + def delete(self): + """ + Delete the password-less login token for the user + """ + + username, password = b64decode(request.authorization[1]).split(':') + + if module.instance.unset_token(username): + return {'success': 'auth: Token removed'} + + response.status = 500 + return {'message': 'auth: No token for the user'} + + + +class Root(RestController): + auth = Auth() + config = Config() + crush = Crush() + doc = Doc() + mon = Mon() + osd = Osd() + pool = Pool() + request = Request() + server = Server() + + @expose('json') + @catch + def get(self): + """ + Show the basic information for the REST API + This includes values like api version or auth method + """ + return { + 'api_version': 1, + 'auth': + 'Use ceph auth key pair as HTTP Basic user/password ' + '(requires caps mon allow * to function properly)', + 'doc': 'See /doc endpoint', + 'info': "Ceph Manager RESTful API server", + } diff --git a/src/pybind/mgr/restful/common.py b/src/pybind/mgr/restful/common.py new file mode 100644 index 00000000000..15a14ed9f47 --- /dev/null +++ b/src/pybind/mgr/restful/common.py @@ -0,0 +1,150 @@ +# List of valid osd flags +OSD_FLAGS = [ + 'pause', 'noup', 'nodown', 'noout', 'noin', 'nobackfill', + 'norecover', 'noscrub', 'nodeep-scrub', +] + +# Implemented osd commands +OSD_IMPLEMENTED_COMMANDS = [ + 'scrub', 'deep_scrub', 'repair' +] + +# Valid values for the 'var' argument to 'ceph osd pool set' +POOL_PROPERTIES_1 = [ + 'size', 'min_size', 'crash_replay_interval', 'pg_num', + 'crush_ruleset', 'hashpspool', +] + +POOL_PROPERTIES_2 = [ + 'pgp_num' +] + +POOL_PROPERTIES = POOL_PROPERTIES_1 + POOL_PROPERTIES_2 + +# Valid values for the 'ceph osd pool set-quota' command +POOL_QUOTA_PROPERTIES = [ + ('quota_max_bytes', 'max_bytes'), + ('quota_max_objects', 'max_objects'), +] + +POOL_ARGS = POOL_PROPERTIES + map( + lambda x: x[0], + POOL_QUOTA_PROPERTIES +) + + +# Transform command to a human readable form +def humanify_command(command): + out = [command['prefix']] + + for arg, val in command.iteritems(): + if arg != 'prefix': + out.append("%s=%s" % (str(arg), str(val))) + + return " ".join(out) + + +def invalid_pool_args(args): + invalid = [] + for arg in args: + if arg not in POOL_ARGS: + invalid.append(arg) + + return invalid + + +def pool_update_commands(pool_name, args): + commands = [[], []] + + # We should increase pgp_num when we are re-setting pg_num + if 'pg_num' in args and 'pgp_num' not in args: + args['pgp_num'] = args['pg_num'] + + # Run the first pool set and quota properties in parallel + for var in POOL_PROPERTIES_1: + if var in args: + commands[0].append({ + 'prefix': 'osd pool set', + 'pool': pool_name, + 'var': var, + 'val': args[var], + }) + + for (var, field) in POOL_QUOTA_PROPERTIES: + if var in args: + commands[0].append({ + 'prefix': 'osd pool set-quota', + 'pool': pool_name, + 'field': field, + 'val': str(args[var]), + }) + + # The second pool set properties need to be run after the first wave + for var in POOL_PROPERTIES_2: + if var in args: + commands[1].append({ + 'prefix': 'osd pool set', + 'pool': pool_name, + 'var': var, + }) + + return commands + + +def crush_rule_osds(nodes, rule): + nodes_by_id = dict((n['id'], n) for n in nodes) + + def _gather_leaf_ids(node): + if node['id'] >= 0: + return set([node['id']]) + + result = set() + for child_id in node['children']: + if child_id >= 0: + result.add(child_id) + else: + result |= _gather_leaf_ids(nodes_by_id[child_id]) + + return result + + def _gather_descendent_ids(node, typ): + result = set() + for child_id in node['children']: + child_node = nodes_by_id[child_id] + if child_node['type'] == typ: + result.add(child_node['id']) + elif 'children' in child_node: + result |= _gather_descendent_ids(child_node, typ) + + return result + + def _gather_osds(root, steps): + if root['id'] >= 0: + return set([root['id']]) + + osds = set() + step = steps[0] + if step['op'] == 'choose_firstn': + # Choose all descendents of the current node of type 'type' + d = _gather_descendent_ids(root, step['type']) + for desc_node in [nodes_by_id[i] for i in d]: + osds |= _gather_osds(desc_node, steps[1:]) + elif step['op'] == 'chooseleaf_firstn': + # Choose all descendents of the current node of type 'type', + # and select all leaves beneath those + for desc_node in [nodes_by_id[i] for i in _gather_descendent_ids(root, step['type'])]: + # Short circuit another iteration to find the emit + # and assume anything we've done a chooseleaf on + # is going to be part of the selected set of osds + osds |= _gather_leaf_ids(desc_node) + elif step['op'] == 'emit': + if root['id'] >= 0: + osds |= root['id'] + + return osds + + osds = set() + for i, step in enumerate(rule['steps']): + if step['op'] == 'take': + osds |= _gather_osds(nodes_by_id[step['item']], rule['steps'][i + 1:]) + return osds diff --git a/src/pybind/mgr/restful/module.py b/src/pybind/mgr/restful/module.py new file mode 100644 index 00000000000..ddfff9715f4 --- /dev/null +++ b/src/pybind/mgr/restful/module.py @@ -0,0 +1,500 @@ +""" +A RESTful API for Ceph +""" + +import json +import inspect +import StringIO +import threading +import traceback +import ConfigParser + +import common + +from uuid import uuid4 +from pecan import jsonify, make_app +from OpenSSL import SSL, crypto +from pecan.rest import RestController +from werkzeug.serving import make_server + + +from mgr_module import MgrModule, CommandResult + +# Global instance to share +instance = None + + + +class CommandsRequest(object): + """ + This class handles parallel as well as sequential execution of + commands. The class accept a list of iterables that should be + executed sequentially. Each iterable can contain several commands + that can be executed in parallel. + + Example: + [[c1,c2],[c3,c4]] + - run c1 and c2 in parallel + - wait for them to finish + - run c3 and c4 in parallel + - wait for them to finish + """ + + + def __init__(self, commands_arrays): + self.id = str(id(self)) + + # Filter out empty sub-requests + commands_arrays = filter( + lambda x: len(x) != 0, + commands_arrays, + ) + + self.running = [] + self.waiting = commands_arrays[1:] + self.finished = [] + self.failed = [] + + self.lock = threading.RLock() + if not len(commands_arrays): + # Nothing to run + return + + # Process first iteration of commands_arrays in parallel + results = self.run(commands_arrays[0]) + + self.running.extend(results) + + + def run(self, commands): + """ + A static method that will execute the given list of commands in + parallel and will return the list of command results. + """ + + # Gather the results (in parallel) + results = [] + for index in range(len(commands)): + tag = '%s:%d' % (str(self.id), index) + + # Store the result + result = CommandResult(tag) + result.command = common.humanify_command(commands[index]) + results.append(result) + + # Run the command + instance.send_command(result, json.dumps(commands[index]), tag) + + return results + + + def next(self): + with self.lock: + if not self.waiting: + # Nothing to run + return + + # Run a next iteration of commands + commands = self.waiting[0] + self.waiting = self.waiting[1:] + + self.running.extend(self.run(commands)) + + + def finish(self, tag): + with self.lock: + for index in range(len(self.running)): + if self.running[index].tag == tag: + if self.running[index].r == 0: + self.finished.append(self.running.pop(index)) + else: + self.failed.append(self.running.pop(index)) + return True + + # No such tag found + return False + + + def is_running(self, tag): + for result in self.running: + if result.tag == tag: + return True + return False + + + def is_ready(self): + with self.lock: + return not self.running and self.waiting + + + def is_waiting(self): + return bool(self.waiting) + + + def is_finished(self): + with self.lock: + return not self.running and not self.waiting + + + def has_failed(self): + return bool(self.failed) + + + def get_state(self): + with self.lock: + if not self.is_finished(): + return "pending" + + if self.has_failed(): + return "failed" + + return "success" + + + def humanify(self): + return { + 'id': self.id, + 'running': map( + lambda x: (x.command, x.outs, x.outb), + self.running + ), + 'finished': map( + lambda x: (x.command, x.outs, x.outb), + self.finished + ), + 'waiting': map( + lambda x: (x.command, x.outs, x.outb), + self.waiting + ), + 'failed': map( + lambda x: (x.command, x.outs, x.outb), + self.failed + ), + 'is_waiting': self.is_waiting(), + 'is_finished': self.is_finished(), + 'has_failed': self.has_failed(), + 'state': self.get_state(), + } + + + +class Module(MgrModule): + COMMANDS = [] + + def __init__(self, *args, **kwargs): + super(Module, self).__init__(*args, **kwargs) + global instance + instance = self + + self.requests = [] + self.requests_lock = threading.RLock() + + self.tokens = {} + self.disable_auth = False + + self.shutdown_key = str(uuid4()) + + self.server = None + + + def serve(self): + try: + self._serve() + except: + self.log.error(str(traceback.format_exc())) + + + def _serve(self): + # Load stored authentication tokens + self.tokens = self.get_config_json("tokens") or {} + + jsonify._instance = jsonify.GenericJSON( + sort_keys=True, + indent=4, + separators=(',', ': '), + ) + + self.cert = self.get_config_json('cert') + self.pkey = self.get_config_json('pkey') + + # Create a new shared cert if it does not exist, yet + if not self.cert or not self.pkey: + (self.cert, self.pkey) = self.create_cert() + self.set_config_json('cert', self.cert) + self.set_config_json('pkey', self.pkey) + + # use SSL context for https + context = SSL.Context(SSL.TLSv1_METHOD) + context.use_certificate( + crypto.load_certificate(crypto.FILETYPE_PEM, self.cert) + ) + context.use_privatekey( + crypto.load_privatekey(crypto.FILETYPE_PEM, self.pkey) + ) + + # Create the HTTPS werkzeug server serving pecan app + self.server = make_server( + host='0.0.0.0', + port=8002, + app=make_app('restful.api.Root'), + ssl_context=context + ) + + self.server.serve_forever() + + + def shutdown(self): + try: + self.server.shutdown() + except: + self.log.error(str(traceback.format_exc())) + + + def notify(self, notify_type, tag): + try: + self._notify(notify_type, tag) + except: + self.log.error(str(traceback.format_exc())) + + + def _notify(self, notify_type, tag): + if notify_type == "command": + # we can safely skip all the sequential commands + if tag == 'seq': + return + + request = filter( + lambda x: x.is_running(tag), + self.requests) + + if len(request) != 1: + self.log.warn("Unknown request '%s'" % str(tag)) + return + + request = request[0] + request.finish(tag) + if request.is_ready(): + request.next() + else: + self.log.debug("Unhandled notification type '%s'" % notify_type) + + + def create_cert(self): + # create a key pair + pkey = crypto.PKey() + pkey.generate_key(crypto.TYPE_RSA, 2048) + + # create a self-signed cert + cert = crypto.X509() + cert.get_subject().C = "US" + cert.get_subject().ST = "Oregon" + cert.get_subject().L = "Portland" + cert.get_subject().O = "IT" + cert.get_subject().CN = "ceph-restful" + cert.set_serial_number(int(uuid4())) + cert.gmtime_adj_notBefore(0) + cert.gmtime_adj_notAfter(10*365*24*60*60) + cert.set_issuer(cert.get_subject()) + cert.set_pubkey(pkey) + cert.sign(pkey, 'sha512') + + return ( + crypto.dump_certificate(crypto.FILETYPE_PEM, cert), + crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey) + ) + + + def get_doc_api(self, root, prefix=''): + doc = {} + for _obj in dir(root): + obj = getattr(root, _obj) + + if isinstance(obj, RestController): + doc.update(self.get_doc_api(obj, prefix + '/' + _obj)) + + if getattr(root, '_lookup', None) and isinstance(root._lookup('0')[0], RestController): + doc.update(self.get_doc_api(root._lookup('0')[0], prefix + '/')) + + prefix = prefix or '/' + + doc[prefix] = {} + for method in 'get', 'post', 'patch', 'delete': + if getattr(root, method, None): + doc[prefix][method.upper()] = inspect.getdoc(getattr(root, method)).split('\n') + + if len(doc[prefix]) == 0: + del doc[prefix] + + return doc + + + def get_mons(self): + mon_map_mons = self.get('mon_map')['mons'] + mon_status = json.loads(self.get('mon_status')['json']) + + # Add more information + for mon in mon_map_mons: + mon['in_quorum'] = mon['rank'] in mon_status['quorum'] + mon['server'] = self.get_metadata("mon", mon['name'])['hostname'] + mon['leader'] = mon['rank'] == mon_status['quorum'][0] + + return mon_map_mons + + + def get_osd_pools(self): + osds = dict(map(lambda x: (x['osd'], []), self.get('osd_map')['osds'])) + pools = dict(map(lambda x: (x['pool'], x), self.get('osd_map')['pools'])) + crush_rules = self.get('osd_map_crush')['rules'] + + osds_by_pool = {} + for pool_id, pool in pools.items(): + pool_osds = None + for rule in [r for r in crush_rules if r['ruleset'] == pool['crush_ruleset']]: + if rule['min_size'] <= pool['size'] <= rule['max_size']: + pool_osds = common.crush_rule_osds(self.get('osd_map_tree')['nodes'], rule) + + osds_by_pool[pool_id] = pool_osds + + for pool_id in pools.keys(): + for in_pool_id in osds_by_pool[pool_id]: + osds[in_pool_id].append(pool_id) + + return osds + + + def get_osds(self, ids=[], pool_id=None): + # Get data + osd_map = self.get('osd_map') + osd_metadata = self.get('osd_metadata') + + # Update the data with the additional info from the osd map + osds = osd_map['osds'] + + # Filter by osd ids + if ids: + osds = filter( + lambda x: str(x['osd']) in ids, + osds + ) + + # Get list of pools per osd node + pools_map = self.get_osd_pools() + + # map osd IDs to reweight + reweight_map = dict([ + (x.get('id'), x.get('reweight', None)) + for x in self.get('osd_map_tree')['nodes'] + ]) + + # Build OSD data objects + for osd in osds: + osd['pools'] = pools_map[osd['osd']] + osd['server'] = osd_metadata.get(str(osd['osd']), {}).get('hostname', None) + + osd['reweight'] = reweight_map.get(osd['osd'], 0.0) + + if osd['up']: + osd['valid_commands'] = common.OSD_IMPLEMENTED_COMMANDS + else: + osd['valid_commands'] = [] + + # Filter by pool + if pool_id: + pool_id = int(pool_id) + osds = filter( + lambda x: pool_id in x['pools'], + osds + ) + + return osds + + + def get_osd_by_id(self, osd_id): + osd = filter( + lambda x: x['osd'] == osd_id, + self.get('osd_map')['osds'] + ) + + if len(osd) != 1: + return None + + return osd[0] + + + def get_pool_by_id(self, pool_id): + pool = filter( + lambda x: x['pool'] == pool_id, + self.get('osd_map')['pools'], + ) + + if len(pool) != 1: + return None + + return pool[0] + + + def submit_request(self, _request): + request = CommandsRequest(_request) + self.requests.append(request) + return request.humanify() + + + def run_command(self, command): + # tag with 'seq' so that we can ingore these in notify function + result = CommandResult('seq') + + self.send_command(result, json.dumps(command), 'seq') + return result.wait() + + + def verify_user(self, username, password): + r, outb, outs = self.run_command({ + 'prefix': 'auth get', + 'entity': username, + }) + + if r != 0: + return 'No such user/key (%s, %s)' % (outb, outs) + + ## check the capabilities, we are looking for mon allow * + conf = ConfigParser.ConfigParser() + + # ConfigParser can't handle tabs, remove them + conf.readfp(StringIO.StringIO(outb.replace('\t', ''))) + + if not conf.has_section(username): + return 'Failed to parse the auth details' + + key = conf.get(username, 'key') + + if password != key: + return 'Invalid key' + + if not conf.has_option(username, 'caps mon'): + return 'No mon caps set' + + caps = conf.get(username, 'caps mon') + + if caps not in ['"allow *"', '"allow profile mgr"']: + return 'Insufficient mon caps set' + + # Returning '' means 'no objections' + return '' + + + def set_token(self, user): + self.tokens[user] = str(uuid4()) + + self.set_config_json('tokens', self.tokens) + + return self.tokens[user] + + + def unset_token(self, user): + if user not in self.tokens: + return False + + del self.tokens[user] + self.set_config_json('tokens', self.tokens) + + return True