]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
ceph-mgr: Implement new pecan-based rest api
authorBoris Ranto <branto@redhat.com>
Wed, 12 Apr 2017 21:54:36 +0000 (23:54 +0200)
committerBoris Ranto <branto@redhat.com>
Mon, 22 May 2017 17:18:58 +0000 (19:18 +0200)
The new rest API uses pecan for the restful functionality and simplifies
the code significantly. It should be mostly equivalent in functionality
to the django-based rest API. The api is self-documenting via /doc
endpoint.

Signed-off-by: Boris Ranto <branto@redhat.com>
ceph.spec.in
debian/control
src/common/config_opts.h
src/mon/MonCap.cc
src/pybind/mgr/restful/__init__.py [new file with mode: 0644]
src/pybind/mgr/restful/api.py [new file with mode: 0644]
src/pybind/mgr/restful/common.py [new file with mode: 0644]
src/pybind/mgr/restful/module.py [new file with mode: 0644]

index 30831c11c30f1106cb10e8a49a99de0da6e655a1..130a4bb579e7af13d9076f2f9fce69646fdef1cf 100644 (file)
@@ -312,12 +312,16 @@ Group:          System/Filesystems
 %endif
 Requires:       ceph-base = %{epoch}:%{version}-%{release}
 %if 0%{?fedora} || 0%{?rhel}
-Requires:      python-cherrypy
+Requires:       python-cherrypy
+Requires:       pyOpenSSL
+Requires:       python-werkzeug
 %endif
 %if 0%{?suse_version}
 Requires:      python-CherryPy
+Requires:       python-pyOpenSSL
+Requires:       python-Werkzeug
 %endif
-
+Requires:       python-pecan
 %description mgr
 ceph-mgr enables python modules that provide services (such as the REST
 module derived from Calamari) and expose CLI hooks.  ceph-mgr gathers
index ae6c6a73501d1c6812b5aacb95607c10289e1cbe..b34e778046aa0b287adb4c87d1e50bbe199a8d0b 100644 (file)
@@ -161,6 +161,9 @@ Description: debugging symbols for ceph-mds
 Package: ceph-mgr
 Architecture: linux-any
 Depends: ceph-base (= ${binary:Version}),
+         python-openssl
+         python-pecan
+         python-werkzeug
          ${misc:Depends},
          ${python:Depends},
         python-cherrypy3,
index 13e1a558441cee6456ea289ef4c6754bb581d5c4..e033962ca4314bb0234ce929c7b7cea5d894408a 100644 (file)
@@ -1673,7 +1673,7 @@ OPTION(rgw_shard_warning_threshold, OPT_DOUBLE, 90) // pct of safe max
 OPTION(rgw_swift_versioning_enabled, OPT_BOOL, false) // whether swift object versioning feature is enabled
 
 OPTION(mgr_module_path, OPT_STR, CEPH_PKGLIBDIR "/mgr") // where to load python modules from
-OPTION(mgr_modules, OPT_STR, "rest")  // Which modules to load
+OPTION(mgr_modules, OPT_STR, "restful")  // Which modules to load
 OPTION(mgr_data, OPT_STR, "/var/lib/ceph/mgr/$cluster-$id") // where to find keyring etc
 OPTION(mgr_beacon_period, OPT_INT, 5)  // How frequently to send beacon
 OPTION(mgr_stats_period, OPT_INT, 5) // How frequently to send stats
index 558e67d9cca2fa2330c0dfbb65ea671dd657ebce..6ae8ab3db76b0e9a09c0887787d71537f3293ab0 100644 (file)
@@ -173,7 +173,8 @@ void MonCapGrant::expand_profile_mon(const EntityName& name) const
     profile_grants.push_back(MonCapGrant("mon", MON_CAP_R));
     profile_grants.push_back(MonCapGrant("mds", MON_CAP_R));
     profile_grants.push_back(MonCapGrant("osd", MON_CAP_R | MON_CAP_W));
-    profile_grants.push_back(MonCapGrant("config-key", MON_CAP_R));
+    profile_grants.push_back(MonCapGrant("auth", MON_CAP_R | MON_CAP_X));
+    profile_grants.push_back(MonCapGrant("config-key", MON_CAP_R | MON_CAP_W));
     string prefix = string("daemon-private/mgr/");
     profile_grants.push_back(MonCapGrant("config-key get", "key",
                                         StringConstraint("", prefix)));
diff --git a/src/pybind/mgr/restful/__init__.py b/src/pybind/mgr/restful/__init__.py
new file mode 100644 (file)
index 0000000..0440e07
--- /dev/null
@@ -0,0 +1 @@
+from module import *  # NOQA
diff --git a/src/pybind/mgr/restful/api.py b/src/pybind/mgr/restful/api.py
new file mode 100644 (file)
index 0000000..9cceccb
--- /dev/null
@@ -0,0 +1,721 @@
+from pecan import expose, request, response
+from pecan.rest import RestController
+
+import common
+import traceback
+
+from base64 import b64decode
+from functools import wraps
+from collections import defaultdict
+
+## We need this to access the instance of the module
+#
+# We can't use 'from module import instance' because
+# the instance is not ready, yet (would be None)
+import module
+
+
+# Helper function to catch and log the exceptions
+def catch(f):
+    @wraps(f)
+    def catcher(*args, **kwargs):
+        try:
+            return f(*args, **kwargs)
+        except:
+            module.instance.log.error(str(traceback.format_exc()))
+            response.status = 500
+            return {'message': str(traceback.format_exc()).split('\n')}
+    return catcher
+
+
+# Handle authorization
+def auth(f):
+    @wraps(f)
+    def decorated(*args, **kwargs):
+        if not request.authorization:
+            response.status = 401
+            response.headers['WWW-Authenticate'] = 'Basic realm="Login Required"'
+            return {'message': 'auth: No HTTP username/password'}
+
+        username, password = b64decode(request.authorization[1]).split(':')
+
+        # Lookup the password-less tokens first
+        if username not in module.instance.tokens.values():
+            # Check the ceph auth db
+            msg = module.instance.verify_user(username, password)
+            if msg:
+                response.status = 401
+                response.headers['WWW-Authenticate'] = 'Basic realm="Login Required"'
+                return {'message': 'auth: No HTTP username/password'}
+
+        return f(*args, **kwargs)
+    return decorated
+
+
+# Helper function to lock the function
+def lock(f):
+    @wraps(f)
+    def locker(*args, **kwargs):
+        with module.instance.requests_lock:
+            return f(*args, **kwargs)
+    return locker
+
+
+
+class ServerFqdn(RestController):
+    def __init__(self, fqdn):
+        self.fqdn = fqdn
+
+
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show the information for the server fqdn
+        """
+        return module.instance.get_server(self.fqdn)
+
+
+
+class Server(RestController):
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show the information for all the servers
+        """
+        return module.instance.list_servers()
+
+
+    @expose()
+    def _lookup(self, fqdn, *remainder):
+        return ServerFqdn(fqdn), remainder
+
+
+
+class RequestId(RestController):
+    def __init__(self, request_id):
+        self.request_id = request_id
+
+
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show the information for the request id
+        """
+        request = filter(
+            lambda x: x.id == self.request_id,
+            module.instance.requests
+        )
+
+        if len(request) != 1:
+            response.status = 500
+            return {'message': 'Unknown request id "%s"' % str(self.request_id)}
+
+        request = request[0]
+        return request.humanify()
+
+
+    @expose('json')
+    @catch
+    @auth
+    @lock
+    def delete(self):
+        """
+        Remove the request id from the database
+        """
+        for index in range(len(module.instance.requests)):
+            if module.instance.requests[index].id == self.request_id:
+                return module.instance.requests.pop(index).humanify()
+
+        # Failed to find the job to cancel
+        response.status = 500
+        return {'message': 'No such request id'}
+
+
+
+class Request(RestController):
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        List all the available requests and their state
+        """
+        states = {}
+        for _request in module.instance.requests:
+            states[_request.id] = _request.get_state()
+
+        return states
+
+
+    @expose('json')
+    @catch
+    @auth
+    @lock
+    def delete(self):
+        """
+        Remove all the finished requests
+        """
+        num_requests = len(module.instance.requests)
+
+        module.instance.requests = filter(
+            lambda x: not x.is_finished(),
+            module.instance.requests
+        )
+
+        # Return the job statistics
+        return {
+            'cleaned': num_requests - len(module.instance.requests),
+            'remaining': len(module.instance.requests),
+        }
+
+
+    @expose()
+    def _lookup(self, request_id, *remainder):
+        return RequestId(request_id), remainder
+
+
+
+class PoolId(RestController):
+    def __init__(self, pool_id):
+        self.pool_id = pool_id
+
+
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show the information for the pool id
+        """
+        pool = module.instance.get_pool_by_id(self.pool_id)
+
+        if not pool:
+            response.status = 500
+            return {'message': 'Failed to identify the pool id "%d"' % self.pool_id}
+
+        # pgp_num is called pg_placement_num, deal with that
+        if 'pg_placement_num' in pool:
+            pool['pgp_num'] = pool.pop('pg_placement_num')
+        return pool
+
+
+    @expose('json')
+    @catch
+    @auth
+    def patch(self):
+        """
+        Modify the information for the pool id
+        """
+        args = request.json
+
+        # Get the pool info for its name
+        pool = module.instance.get_pool_by_id(self.pool_id)
+        if not pool:
+            response.status = 500
+            return {'message': 'Failed to identify the pool id "%d"' % self.pool_id}
+
+        # Check for invalid pool args
+        invalid = common.invalid_pool_args(args)
+        if invalid:
+            response.status = 500
+            return {'message': 'Invalid arguments found: "%s"' % str(invalid)}
+
+        # Schedule the update request
+        return module.instance.submit_request(common.pool_update_commands(pool['pool_name'], args))
+
+
+    @expose('json')
+    @catch
+    @auth
+    def delete(self):
+        """
+        Remove the pool data for the pool id
+        """
+        pool = module.instance.get_pool_by_id(self.pool_id)
+
+        if not pool:
+            response.status = 500
+            return {'message': 'Failed to identify the pool id "%d"' % self.pool_id}
+
+        return module.instance.submit_request([[{
+            'prefix': 'osd pool delete',
+            'pool': pool['pool_name'],
+            'pool2': pool['pool_name'],
+            'sure': '--yes-i-really-really-mean-it'
+        }]])
+
+
+
+class Pool(RestController):
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show the information for all the pools
+        """
+        pools = module.instance.get('osd_map')['pools']
+
+        # pgp_num is called pg_placement_num, deal with that
+        for pool in pools:
+            if 'pg_placement_num' in pool:
+                pool['pgp_num'] = pool.pop('pg_placement_num')
+
+        return pools
+
+
+    @expose('json')
+    @catch
+    @auth
+    def post(self):
+        """
+        Create a new pool
+        Requires name and pg_num dict arguments
+        """
+        args = request.json
+
+        # Check for the required arguments
+        pool_name = args.pop('name', None)
+        if pool_name is None:
+            response.status = 500
+            return {'message': 'You need to specify the pool "name" argument'}
+
+        pg_num = args.pop('pg_num', None)
+        if pg_num is None:
+            response.status = 500
+            return {'message': 'You need to specify the "pg_num" argument'}
+
+        # Run the pool create command first
+        create_command = {
+            'prefix': 'osd pool create',
+            'pool': pool_name,
+            'pg_num': pg_num
+        }
+
+        # Check for invalid pool args
+        invalid = common.invalid_pool_args(args)
+        if invalid:
+            response.status = 500
+            return {'message': 'Invalid arguments found: "%s"' % str(invalid)}
+
+        # Schedule the creation and update requests
+        return module.instance.submit_request(
+            [[create_command]] +
+            common.pool_update_commands(pool_name, args)
+        )
+
+
+    @expose()
+    def _lookup(self, pool_id, *remainder):
+        return PoolId(int(pool_id)), remainder
+
+
+
+class OsdIdCommand(RestController):
+    def __init__(self, osd_id):
+        self.osd_id = osd_id
+
+
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show implemented commands for the OSD id
+        """
+        osd = module.instance.get_osd_by_id(self.osd_id)
+
+        if not osd:
+            response.status = 500
+            return {'message': 'Failed to identify the OSD id "%d"' % self.osd_id}
+
+        if osd['up']:
+            return common.OSD_IMPLEMENTED_COMMANDS
+        else:
+            return []
+
+
+    @expose('json')
+    @catch
+    @auth
+    def post(self):
+        """
+        Run the implemented command for the OSD id
+        """
+        command = request.json.get('command', None)
+
+        osd = module.instance.get_osd_by_id(self.osd_id)
+
+        if not osd:
+            response.status = 500
+            return {'message': 'Failed to identify the OSD id "%d"' % self.osd_id}
+
+        if not osd['up'] or command not in common.OSD_IMPLEMENTED_COMMANDS:
+            response.status = 500
+            return {'message': 'Command "%s" not available' % command}
+
+        return module.instance.submit_request([[{
+            'prefix': 'osd ' + command,
+            'who': str(self.osd_id)
+        }]])
+
+
+
+class OsdId(RestController):
+    def __init__(self, osd_id):
+        self.osd_id = osd_id
+        self.command = OsdIdCommand(osd_id)
+
+
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show the information for the OSD id
+        """
+        osd = module.instance.get_osds([str(self.osd_id)])
+        if len(osd) != 1:
+            response.status = 500
+            return {'message': 'Failed to identify the OSD id "%d"' % self.osd_id}
+
+        return osd[0]
+
+
+    @expose('json')
+    @catch
+    @auth
+    def patch(self):
+        """
+        Modify the state (up, in) of the OSD id or reweight it
+        """
+        args = request.json
+
+        commands = []
+
+        if 'in' in args:
+            if args['in']:
+                commands.append({
+                    'prefix': 'osd in',
+                    'ids': [str(self.osd_id)]
+                })
+            else:
+                commands.append({
+                    'prefix': 'osd out',
+                    'ids': [str(self.osd_id)]
+                })
+
+        if 'up' in args:
+            if args['up']:
+                response.status = 500
+                return {'message': "It is not valid to set a down OSD to be up"}
+            else:
+                commands.append({
+                    'prefix': 'osd down',
+                    'ids': [str(self.osd_id)]
+                })
+
+        if 'reweight' in args:
+            commands.append({
+                'prefix': 'osd reweight',
+                'id': self.osd_id,
+                'weight': args['reweight']
+            })
+
+        return module.instance.submit_request([commands])
+
+
+
+class Osd(RestController):
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show the information for all the OSDs
+        """
+        # Parse request args
+        ids = request.GET.getall('id[]')
+        pool_id = request.GET.get('pool', None)
+
+        return module.instance.get_osds(ids, pool_id)
+
+
+    @expose()
+    def _lookup(self, osd_id, *remainder):
+        return OsdId(int(osd_id)), remainder
+
+
+
+class MonName(RestController):
+    def __init__(self, name):
+        self.name = name
+
+
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show the information for the monitor name
+        """
+        mon = filter(
+            lambda x: x['name'] == self.name,
+            module.instance.get_mons()
+        )
+
+        if len(mon) != 1:
+            response.status = 500
+            return {'message': 'Failed to identify the monitor node "%s"' % self.name}
+
+        return mon[0]
+
+
+
+class Mon(RestController):
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show the information for all the monitors
+        """
+        return module.instance.get_mons()
+
+
+    @expose()
+    def _lookup(self, name, *remainder):
+        return MonName(name), remainder
+
+
+
+class Doc(RestController):
+    @expose('json')
+    @catch
+    def get(self):
+        """
+        Show documentation information
+        """
+        return module.instance.get_doc_api(Root)
+
+
+
+class CrushRuleset(RestController):
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show crush rulesets
+        """
+        rules = module.instance.get('osd_map_crush')['rules']
+        nodes = module.instance.get('osd_map_tree')['nodes']
+
+        ruleset = defaultdict(list)
+        for rule in rules:
+            rule['osd_count'] = len(common.crush_rule_osds(nodes, rule))
+            ruleset[rule['ruleset']].append(rule)
+
+        return ruleset
+
+
+
+class CrushRule(RestController):
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show crush rules
+        """
+        rules = module.instance.get('osd_map_crush')['rules']
+        nodes = module.instance.get('osd_map_tree')['nodes']
+
+        for rule in rules:
+            rule['osd_count'] = len(common.crush_rule_osds(nodes, rule))
+
+        return rules
+
+
+
+class Crush(RestController):
+    rule = CrushRule()
+    ruleset = CrushRuleset()
+
+
+
+class ConfigOsd(RestController):
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show OSD configuration options
+        """
+        flags = module.instance.get("osd_map")['flags']
+
+        # pause is a valid osd config command that sets pauserd,pausewr
+        flags = flags.replace('pauserd,pausewr', 'pause')
+
+        return flags.split(',')
+
+
+    @expose('json')
+    @catch
+    @auth
+    def patch(self):
+        """
+        Modify OSD configration options
+        """
+
+        args = request.json
+
+        commands = []
+
+        valid_flags = set(args.keys()) & set(common.OSD_FLAGS)
+        invalid_flags = list(set(args.keys()) - valid_flags)
+        if invalid_flags:
+            module.instance.log.warn("%s not valid to set/unset" % invalid_flags)
+
+        for flag in list(valid_flags):
+            if args[flag]:
+                mode = 'set'
+            else:
+                mode = 'unset'
+
+            commands.append({
+                'prefix': 'osd ' + mode,
+                'key': flag,
+            })
+
+        return module.instance.submit_request([commands])
+
+
+
+class ConfigClusterKey(RestController):
+    def __init__(self, key):
+        self.key = key
+
+
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show specific configuration option
+        """
+        return module.instance.get("config").get(self.key, None)
+
+
+
+class ConfigCluster(RestController):
+    @expose('json')
+    @catch
+    @auth
+    def get(self):
+        """
+        Show all cluster configuration options
+        """
+        return module.instance.get("config")
+
+
+    @expose()
+    def _lookup(self, key, *remainder):
+        return ConfigClusterKey(key), remainder
+
+
+
+class Config(RestController):
+    cluster = ConfigCluster()
+    osd = ConfigOsd()
+
+
+
+class Auth(RestController):
+    @expose('json')
+    @catch
+    def get(self):
+        """
+        Generate a brand new password-less login token for the user
+        Uses HTTP Basic Auth for authentication
+        """
+        if not request.authorization:
+            return (
+                {'message': 'auth: No HTTP username/password'},
+                401,
+                {'WWW-Authenticate': 'Basic realm="Login Required"'}
+            )
+
+        username, password = b64decode(request.authorization[1]).split(':')
+        # Do not create a new token for a username that is already a token
+        if username in module.instance.tokens.values():
+            return {
+                'token': username
+            }
+
+        # Check the ceph auth db
+        msg = module.instance.verify_user(username, password)
+        if msg:
+            return (
+                {'message': 'auth: ' + msg},
+                401,
+                {'WWW-Authenticate': 'Basic realm="Login Required"'}
+            )
+
+        # Create a password-less login token for the user
+        # This overwrites any old user tokens
+        return {
+            'token': module.instance.set_token(username)
+        }
+
+
+    @expose('json')
+    @catch
+    @auth
+    def delete(self):
+        """
+        Delete the password-less login token for the user
+        """
+
+        username, password = b64decode(request.authorization[1]).split(':')
+
+        if module.instance.unset_token(username):
+            return {'success': 'auth: Token removed'}
+
+        response.status = 500
+        return {'message': 'auth: No token for the user'}
+
+
+
+class Root(RestController):
+    auth = Auth()
+    config = Config()
+    crush = Crush()
+    doc = Doc()
+    mon = Mon()
+    osd = Osd()
+    pool = Pool()
+    request = Request()
+    server = Server()
+
+    @expose('json')
+    @catch
+    def get(self):
+        """
+        Show the basic information for the REST API
+        This includes values like api version or auth method
+        """
+        return {
+            'api_version': 1,
+            'auth':
+                'Use ceph auth key pair as HTTP Basic user/password '
+                '(requires caps mon allow * to function properly)',
+            'doc': 'See /doc endpoint',
+            'info': "Ceph Manager RESTful API server",
+        }
diff --git a/src/pybind/mgr/restful/common.py b/src/pybind/mgr/restful/common.py
new file mode 100644 (file)
index 0000000..15a14ed
--- /dev/null
@@ -0,0 +1,150 @@
+# List of valid osd flags
+OSD_FLAGS = [
+    'pause', 'noup', 'nodown', 'noout', 'noin', 'nobackfill',
+    'norecover', 'noscrub', 'nodeep-scrub',
+]
+
+# Implemented osd commands
+OSD_IMPLEMENTED_COMMANDS = [
+    'scrub', 'deep_scrub', 'repair'
+]
+
+# Valid values for the 'var' argument to 'ceph osd pool set'
+POOL_PROPERTIES_1 = [
+    'size', 'min_size', 'crash_replay_interval', 'pg_num',
+    'crush_ruleset', 'hashpspool',
+]
+
+POOL_PROPERTIES_2 = [
+    'pgp_num'
+]
+
+POOL_PROPERTIES = POOL_PROPERTIES_1 + POOL_PROPERTIES_2
+
+# Valid values for the 'ceph osd pool set-quota' command
+POOL_QUOTA_PROPERTIES = [
+    ('quota_max_bytes', 'max_bytes'),
+    ('quota_max_objects', 'max_objects'),
+]
+
+POOL_ARGS = POOL_PROPERTIES + map(
+    lambda x: x[0],
+    POOL_QUOTA_PROPERTIES
+)
+
+
+# Transform command to a human readable form
+def humanify_command(command):
+    out = [command['prefix']]
+
+    for arg, val in command.iteritems():
+        if arg != 'prefix':
+            out.append("%s=%s" % (str(arg), str(val)))
+
+    return " ".join(out)
+
+
+def invalid_pool_args(args):
+    invalid = []
+    for arg in args:
+        if arg not in POOL_ARGS:
+            invalid.append(arg)
+
+    return invalid
+
+
+def pool_update_commands(pool_name, args):
+    commands = [[], []]
+
+    # We should increase pgp_num when we are re-setting pg_num
+    if 'pg_num' in args and 'pgp_num' not in args:
+        args['pgp_num'] = args['pg_num']
+
+    # Run the first pool set and quota properties in parallel
+    for var in POOL_PROPERTIES_1:
+        if var in args:
+            commands[0].append({
+                'prefix': 'osd pool set',
+                'pool': pool_name,
+                'var': var,
+                'val': args[var],
+            })
+
+    for (var, field) in POOL_QUOTA_PROPERTIES:
+        if var in args:
+            commands[0].append({
+                'prefix': 'osd pool set-quota',
+                'pool': pool_name,
+                'field': field,
+                'val': str(args[var]),
+            })
+
+    # The second pool set properties need to be run after the first wave
+    for var in POOL_PROPERTIES_2:
+        if var in args:
+            commands[1].append({
+                'prefix': 'osd pool set',
+                'pool': pool_name,
+                'var': var,
+            })
+
+    return commands
+
+
+def crush_rule_osds(nodes, rule):
+    nodes_by_id = dict((n['id'], n) for n in nodes)
+
+    def _gather_leaf_ids(node):
+        if node['id'] >= 0:
+            return set([node['id']])
+
+        result = set()
+        for child_id in node['children']:
+            if child_id >= 0:
+                result.add(child_id)
+            else:
+                result |= _gather_leaf_ids(nodes_by_id[child_id])
+
+        return result
+
+    def _gather_descendent_ids(node, typ):
+        result = set()
+        for child_id in node['children']:
+            child_node = nodes_by_id[child_id]
+            if child_node['type'] == typ:
+                result.add(child_node['id'])
+            elif 'children' in child_node:
+                result |= _gather_descendent_ids(child_node, typ)
+
+        return result
+
+    def _gather_osds(root, steps):
+        if root['id'] >= 0:
+            return set([root['id']])
+
+        osds = set()
+        step = steps[0]
+        if step['op'] == 'choose_firstn':
+            # Choose all descendents of the current node of type 'type'
+            d = _gather_descendent_ids(root, step['type'])
+            for desc_node in [nodes_by_id[i] for i in d]:
+                osds |= _gather_osds(desc_node, steps[1:])
+        elif step['op'] == 'chooseleaf_firstn':
+            # Choose all descendents of the current node of type 'type',
+            # and select all leaves beneath those
+            for desc_node in [nodes_by_id[i] for i in _gather_descendent_ids(root, step['type'])]:
+                # Short circuit another iteration to find the emit
+                # and assume anything we've done a chooseleaf on
+                # is going to be part of the selected set of osds
+                osds |= _gather_leaf_ids(desc_node)
+        elif step['op'] == 'emit':
+            if root['id'] >= 0:
+                osds |= root['id']
+
+        return osds
+
+    osds = set()
+    for i, step in enumerate(rule['steps']):
+        if step['op'] == 'take':
+            osds |= _gather_osds(nodes_by_id[step['item']], rule['steps'][i + 1:])
+    return osds
diff --git a/src/pybind/mgr/restful/module.py b/src/pybind/mgr/restful/module.py
new file mode 100644 (file)
index 0000000..ddfff97
--- /dev/null
@@ -0,0 +1,500 @@
+"""
+A RESTful API for Ceph
+"""
+
+import json
+import inspect
+import StringIO
+import threading
+import traceback
+import ConfigParser
+
+import common
+
+from uuid import uuid4
+from pecan import jsonify, make_app
+from OpenSSL import SSL, crypto
+from pecan.rest import RestController
+from werkzeug.serving import make_server
+
+
+from mgr_module import MgrModule, CommandResult
+
+# Global instance to share
+instance = None
+
+
+
+class CommandsRequest(object):
+    """
+    This class handles parallel as well as sequential execution of
+    commands. The class accept a list of iterables that should be
+    executed sequentially. Each iterable can contain several commands
+    that can be executed in parallel.
+
+    Example:
+    [[c1,c2],[c3,c4]]
+     - run c1 and c2 in parallel
+     - wait for them to finish
+     - run c3 and c4 in parallel
+     - wait for them to finish
+    """
+
+
+    def __init__(self, commands_arrays):
+        self.id = str(id(self))
+
+        # Filter out empty sub-requests
+        commands_arrays = filter(
+            lambda x: len(x) != 0,
+            commands_arrays,
+        )
+
+        self.running = []
+        self.waiting = commands_arrays[1:]
+        self.finished = []
+        self.failed = []
+
+        self.lock = threading.RLock()
+        if not len(commands_arrays):
+            # Nothing to run
+            return
+
+        # Process first iteration of commands_arrays in parallel
+        results = self.run(commands_arrays[0])
+
+        self.running.extend(results)
+
+
+    def run(self, commands):
+        """
+        A static method that will execute the given list of commands in
+        parallel and will return the list of command results.
+        """
+
+        # Gather the results (in parallel)
+        results = []
+        for index in range(len(commands)):
+            tag = '%s:%d' % (str(self.id), index)
+
+            # Store the result
+            result = CommandResult(tag)
+            result.command = common.humanify_command(commands[index])
+            results.append(result)
+
+            # Run the command
+            instance.send_command(result, json.dumps(commands[index]), tag)
+
+        return results
+
+
+    def next(self):
+        with self.lock:
+            if not self.waiting:
+                # Nothing to run
+                return
+
+            # Run a next iteration of commands
+            commands = self.waiting[0]
+            self.waiting = self.waiting[1:]
+
+            self.running.extend(self.run(commands))
+
+
+    def finish(self, tag):
+        with self.lock:
+            for index in range(len(self.running)):
+                if self.running[index].tag == tag:
+                    if self.running[index].r == 0:
+                        self.finished.append(self.running.pop(index))
+                    else:
+                        self.failed.append(self.running.pop(index))
+                    return True
+
+            # No such tag found
+            return False
+
+
+    def is_running(self, tag):
+        for result in self.running:
+            if result.tag == tag:
+                return True
+        return False
+
+
+    def is_ready(self):
+        with self.lock:
+            return not self.running and self.waiting
+
+
+    def is_waiting(self):
+        return bool(self.waiting)
+
+
+    def is_finished(self):
+        with self.lock:
+            return not self.running and not self.waiting
+
+
+    def has_failed(self):
+        return bool(self.failed)
+
+
+    def get_state(self):
+        with self.lock:
+            if not self.is_finished():
+                return "pending"
+
+            if self.has_failed():
+                return "failed"
+
+            return "success"
+
+
+    def humanify(self):
+        return {
+            'id': self.id,
+            'running': map(
+                lambda x: (x.command, x.outs, x.outb),
+                self.running
+            ),
+            'finished': map(
+                lambda x: (x.command, x.outs, x.outb),
+                self.finished
+            ),
+            'waiting': map(
+                lambda x: (x.command, x.outs, x.outb),
+                self.waiting
+            ),
+            'failed': map(
+                lambda x: (x.command, x.outs, x.outb),
+                self.failed
+            ),
+            'is_waiting': self.is_waiting(),
+            'is_finished': self.is_finished(),
+            'has_failed': self.has_failed(),
+            'state': self.get_state(),
+        }
+
+
+
+class Module(MgrModule):
+    COMMANDS = []
+
+    def __init__(self, *args, **kwargs):
+        super(Module, self).__init__(*args, **kwargs)
+        global instance
+        instance = self
+
+        self.requests = []
+        self.requests_lock = threading.RLock()
+
+        self.tokens = {}
+        self.disable_auth = False
+
+        self.shutdown_key = str(uuid4())
+
+        self.server = None
+
+
+    def serve(self):
+        try:
+            self._serve()
+        except:
+            self.log.error(str(traceback.format_exc()))
+
+
+    def _serve(self):
+        # Load stored authentication tokens
+        self.tokens = self.get_config_json("tokens") or {}
+
+        jsonify._instance = jsonify.GenericJSON(
+            sort_keys=True,
+            indent=4,
+            separators=(',', ': '),
+        )
+
+        self.cert = self.get_config_json('cert')
+        self.pkey = self.get_config_json('pkey')
+
+        # Create a new shared cert if it does not exist, yet
+        if not self.cert or not self.pkey:
+            (self.cert, self.pkey) = self.create_cert()
+            self.set_config_json('cert', self.cert)
+            self.set_config_json('pkey', self.pkey)
+
+        # use SSL context for https
+        context = SSL.Context(SSL.TLSv1_METHOD)
+        context.use_certificate(
+            crypto.load_certificate(crypto.FILETYPE_PEM, self.cert)
+        )
+        context.use_privatekey(
+            crypto.load_privatekey(crypto.FILETYPE_PEM, self.pkey)
+        )
+
+        # Create the HTTPS werkzeug server serving pecan app
+        self.server = make_server(
+            host='0.0.0.0',
+            port=8002,
+            app=make_app('restful.api.Root'),
+            ssl_context=context
+        )
+
+        self.server.serve_forever()
+
+
+    def shutdown(self):
+        try:
+            self.server.shutdown()
+        except:
+            self.log.error(str(traceback.format_exc()))
+
+
+    def notify(self, notify_type, tag):
+        try:
+            self._notify(notify_type, tag)
+        except:
+            self.log.error(str(traceback.format_exc()))
+
+
+    def _notify(self, notify_type, tag):
+        if notify_type == "command":
+            # we can safely skip all the sequential commands
+            if tag == 'seq':
+                return
+
+            request = filter(
+                lambda x: x.is_running(tag),
+                self.requests)
+
+            if len(request) != 1:
+                self.log.warn("Unknown request '%s'" % str(tag))
+                return
+
+            request = request[0]
+            request.finish(tag)
+            if request.is_ready():
+                request.next()
+        else:
+            self.log.debug("Unhandled notification type '%s'" % notify_type)
+
+
+    def create_cert(self):
+        # create a key pair
+        pkey = crypto.PKey()
+        pkey.generate_key(crypto.TYPE_RSA, 2048)
+
+        # create a self-signed cert
+        cert = crypto.X509()
+        cert.get_subject().C = "US"
+        cert.get_subject().ST = "Oregon"
+        cert.get_subject().L = "Portland"
+        cert.get_subject().O = "IT"
+        cert.get_subject().CN = "ceph-restful"
+        cert.set_serial_number(int(uuid4()))
+        cert.gmtime_adj_notBefore(0)
+        cert.gmtime_adj_notAfter(10*365*24*60*60)
+        cert.set_issuer(cert.get_subject())
+        cert.set_pubkey(pkey)
+        cert.sign(pkey, 'sha512')
+
+        return (
+            crypto.dump_certificate(crypto.FILETYPE_PEM, cert),
+            crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)
+        )
+
+
+    def get_doc_api(self, root, prefix=''):
+        doc = {}
+        for _obj in dir(root):
+            obj = getattr(root, _obj)
+
+            if isinstance(obj, RestController):
+                doc.update(self.get_doc_api(obj, prefix + '/' + _obj))
+
+        if getattr(root, '_lookup', None) and isinstance(root._lookup('0')[0], RestController):
+            doc.update(self.get_doc_api(root._lookup('0')[0], prefix + '/<arg>'))
+
+        prefix = prefix or '/'
+
+        doc[prefix] = {}
+        for method in 'get', 'post', 'patch', 'delete':
+            if getattr(root, method, None):
+                doc[prefix][method.upper()] = inspect.getdoc(getattr(root, method)).split('\n')
+
+        if len(doc[prefix]) == 0:
+            del doc[prefix]
+
+        return doc
+
+
+    def get_mons(self):
+        mon_map_mons = self.get('mon_map')['mons']
+        mon_status = json.loads(self.get('mon_status')['json'])
+
+        # Add more information
+        for mon in mon_map_mons:
+            mon['in_quorum'] = mon['rank'] in mon_status['quorum']
+            mon['server'] = self.get_metadata("mon", mon['name'])['hostname']
+            mon['leader'] = mon['rank'] == mon_status['quorum'][0]
+
+        return mon_map_mons
+
+
+    def get_osd_pools(self):
+        osds = dict(map(lambda x: (x['osd'], []), self.get('osd_map')['osds']))
+        pools = dict(map(lambda x: (x['pool'], x), self.get('osd_map')['pools']))
+        crush_rules = self.get('osd_map_crush')['rules']
+
+        osds_by_pool = {}
+        for pool_id, pool in pools.items():
+            pool_osds = None
+            for rule in [r for r in crush_rules if r['ruleset'] == pool['crush_ruleset']]:
+                if rule['min_size'] <= pool['size'] <= rule['max_size']:
+                    pool_osds = common.crush_rule_osds(self.get('osd_map_tree')['nodes'], rule)
+
+            osds_by_pool[pool_id] = pool_osds
+
+        for pool_id in pools.keys():
+            for in_pool_id in osds_by_pool[pool_id]:
+                osds[in_pool_id].append(pool_id)
+
+        return osds
+
+
+    def get_osds(self, ids=[], pool_id=None):
+        # Get data
+        osd_map = self.get('osd_map')
+        osd_metadata = self.get('osd_metadata')
+
+        # Update the data with the additional info from the osd map
+        osds = osd_map['osds']
+
+        # Filter by osd ids
+        if ids:
+            osds = filter(
+                lambda x: str(x['osd']) in ids,
+                osds
+            )
+
+        # Get list of pools per osd node
+        pools_map = self.get_osd_pools()
+
+        # map osd IDs to reweight
+        reweight_map = dict([
+            (x.get('id'), x.get('reweight', None))
+            for x in self.get('osd_map_tree')['nodes']
+        ])
+
+        # Build OSD data objects
+        for osd in osds:
+            osd['pools'] = pools_map[osd['osd']]
+            osd['server'] = osd_metadata.get(str(osd['osd']), {}).get('hostname', None)
+
+            osd['reweight'] = reweight_map.get(osd['osd'], 0.0)
+
+            if osd['up']:
+                osd['valid_commands'] = common.OSD_IMPLEMENTED_COMMANDS
+            else:
+                osd['valid_commands'] = []
+
+        # Filter by pool
+        if pool_id:
+            pool_id = int(pool_id)
+            osds = filter(
+                lambda x: pool_id in x['pools'],
+                osds
+            )
+
+        return osds
+
+
+    def get_osd_by_id(self, osd_id):
+        osd = filter(
+            lambda x: x['osd'] == osd_id,
+            self.get('osd_map')['osds']
+        )
+
+        if len(osd) != 1:
+            return None
+
+        return osd[0]
+
+
+    def get_pool_by_id(self, pool_id):
+        pool = filter(
+            lambda x: x['pool'] == pool_id,
+            self.get('osd_map')['pools'],
+        )
+
+        if len(pool) != 1:
+            return None
+
+        return pool[0]
+
+
+    def submit_request(self, _request):
+        request = CommandsRequest(_request)
+        self.requests.append(request)
+        return request.humanify()
+
+
+    def run_command(self, command):
+        # tag with 'seq' so that we can ingore these in notify function
+        result = CommandResult('seq')
+
+        self.send_command(result, json.dumps(command), 'seq')
+        return result.wait()
+
+
+    def verify_user(self, username, password):
+        r, outb, outs = self.run_command({
+            'prefix': 'auth get',
+            'entity': username,
+        })
+
+        if r != 0:
+            return 'No such user/key (%s, %s)' % (outb, outs)
+
+        ## check the capabilities, we are looking for mon allow *
+        conf = ConfigParser.ConfigParser()
+
+        # ConfigParser can't handle tabs, remove them
+        conf.readfp(StringIO.StringIO(outb.replace('\t', '')))
+
+        if not conf.has_section(username):
+            return 'Failed to parse the auth details'
+
+        key = conf.get(username, 'key')
+
+        if password != key:
+            return 'Invalid key'
+
+        if not conf.has_option(username, 'caps mon'):
+            return 'No mon caps set'
+
+        caps = conf.get(username, 'caps mon')
+
+        if caps not in ['"allow *"', '"allow profile mgr"']:
+            return 'Insufficient mon caps set'
+
+        # Returning '' means 'no objections'
+        return ''
+
+
+    def set_token(self, user):
+        self.tokens[user] = str(uuid4())
+
+        self.set_config_json('tokens', self.tokens)
+
+        return self.tokens[user]
+
+
+    def unset_token(self, user):
+        if user not in self.tokens:
+            return False
+
+        del self.tokens[user]
+        self.set_config_json('tokens', self.tokens)
+
+        return True