+++ /dev/null
-# vim: ts=8 et sw=4 sts=4
-"""
-ceph-mgr DeepSea orchestrator module
-"""
-
-# Orchestrator methods in this module should be 1:1 mappings to DeepSea
-# runners; we don't want to aggregate multiple salt invocations here, because
-# that would mean this module needs to know too much about how DeepSea works
-# internally. Better to expose new runners from DeepSea to match what the
-# orchestrator needs.
-
-import json
-import errno
-try:
- from typing import Dict
-except ImportError:
- pass # type checking
-
-import requests
-
-from threading import Event, Thread, Lock
-
-from ceph.deployment import inventory
-from mgr_module import MgrModule
-import orchestrator
-
-
-class RequestException(Exception):
- def __init__(self, message, status_code=None):
- super(RequestException, self).__init__(message)
- self.status_code = status_code
-
-
-class DeepSeaReadCompletion(orchestrator.Completion):
- def _process_result(self, data):
- self.finalize(data)
-
-
-class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator):
- MODULE_OPTIONS = [
- {
- 'name': 'salt_api_url',
- 'default': ''
- },
- {
- 'name': 'salt_api_eauth',
- 'default': 'sharedsecret'
- },
- {
- 'name': 'salt_api_username',
- 'default': ''
- },
- {
- 'name': 'salt_api_password',
- 'default': ''
- }
- ]
-
-
- COMMANDS = [
- {
- "cmd": "deepsea config-set name=key,type=CephString "
- "name=value,type=CephString",
- "desc": "Set a configuration value",
- "perm": "rw"
- },
- {
- "cmd": "deepsea config-show",
- "desc": "Show current configuration",
- "perm": "r"
- }
- ]
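-
-    # For illustration, configuring this module from the CLI might look like
-    # this (the URL and credentials below are hypothetical):
-    #
-    #   ceph deepsea config-set salt_api_url http://salt-master.example.com:8000
-    #   ceph deepsea config-set salt_api_username admin
-    #   ceph deepsea config-set salt_api_password s3cr3t
-    #   ceph deepsea config-show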
-
-
- @property
- def config_keys(self):
- return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)
-
-
- def get_module_option(self, key, default=None):
- """
- Overrides the default MgrModule get_module_option() method to pull in defaults
- specific to this module
- """
- return super(DeepSeaOrchestrator, self).get_module_option(key, default=self.config_keys[key])
-
-
- def _config_valid(self):
- for key in self.config_keys.keys():
- if not self.get_module_option(key, self.config_keys[key]):
- return False
- return True
-
-
- def __init__(self, *args, **kwargs):
- super(DeepSeaOrchestrator, self).__init__(*args, **kwargs)
- self._event = Event()
- self._token = None
- self._event_reader = None
- self._reading_events = False
- self._last_failure_msg = None
- self._all_completions = dict() # type: Dict[str, DeepSeaReadCompletion]
- self._completion_lock = Lock()
- self.inventory_cache = orchestrator.OutdatableDict()
- self.service_cache = orchestrator.OutdatableDict()
-
- def available(self):
- if not self._config_valid():
- return False, "Configuration invalid; try `ceph deepsea config-set [...]`"
-
- if not self._reading_events and self._last_failure_msg:
- return False, self._last_failure_msg
-
- return True, ""
-
- def get_inventory(self, node_filter=None, refresh=False):
- """
- Note that this will raise an exception (e.g. if the salt-api is down,
-        or the username/password is incorrect). The same applies to other methods.
- Callers should expect this and react appropriately. The orchestrator
- cli, for example, just prints the traceback in the console, so the
- user at least sees the error.
- """
- self.inventory_cache.remove_outdated()
- if not self.inventory_cache.any_outdated() and not refresh:
- if node_filter is None:
- return orchestrator.TrivialReadCompletion(
- orchestrator.InventoryNode.from_nested_items(self.inventory_cache.items()))
- elif node_filter.labels is None:
- try:
- return orchestrator.TrivialReadCompletion(
- orchestrator.InventoryNode.from_nested_items(
- self.inventory_cache.items_filtered(node_filter.nodes)))
- except KeyError:
- # items_filtered() will raise KeyError if passed a node name that doesn't exist
- return orchestrator.TrivialReadCompletion([])
-
- def process_result(event_data):
- result = []
- if event_data['success']:
- for node_name, node_devs in event_data["return"].items():
- if node_filter is None:
- # The cache will only be populated when this function is invoked
- # without a node filter, i.e. if you run it once for the whole
- # cluster, you can then call it for individual nodes and return
- # cached data. However, if you only *ever* call it for individual
- # nodes, the cache will never be populated, and you'll always have
- # the full round trip to DeepSea.
- self.inventory_cache[node_name] = orchestrator.OutdatableData(node_devs)
- devs = inventory.Devices.from_json(node_devs)
- result.append(orchestrator.InventoryNode(node_name, devs))
- else:
- self.log.error(event_data['return'])
- return result
-
- with self._completion_lock:
- c = DeepSeaReadCompletion(on_complete=process_result)
-
- nodes = []
- roles = []
- if node_filter:
- nodes = node_filter.nodes
- roles = node_filter.labels
-
- resp = self._do_request_with_login("POST", data = {
- "client": "runner_async",
- "fun": "mgr_orch.get_inventory",
- "nodes": nodes,
- "roles": roles
- })
-
-            # ['return'][0]['tag'] in the response JSON is what we need to match
- # on when looking for the result event (e.g.: "salt/run/20181018074024331230")
- self._all_completions["{}/ret".format(resp.json()['return'][0]['tag'])] = c
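-
-            # For reference, resp.json() is expected to look roughly like this
-            # (tag value illustrative, other fields omitted):
-            #   {"return": [{"tag": "salt/run/20181018074024331230", ...}]}
-            # which yields the completion key
-            #   "salt/run/20181018074024331230/ret"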
-
- return c
-
- def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False):
-
- # Note: describe_service() does *not* support OSDs. This is because
- # DeepSea doesn't really record what OSDs are deployed where; Ceph is
- # considered the canonical source of this information, so having this
- # function query OSD information from DeepSea doesn't make a lot of
- # sense (DeepSea would have to call back into Ceph).
-
- assert service_type in ("mon", "mgr", "mds", "rgw", "nfs", "iscsi", None), service_type + " unsupported"
-
- def _deepsea_to_ceph(service):
- if service == "ganesha":
- return "nfs"
- elif service == "igw":
- return "iscsi"
- else:
- return service
-
- # presently unused
- def _ceph_to_deepsea(service):
- if service == "nfs":
- return "ganesha"
- elif service == "iscsi":
- return "igw"
- else:
- return service
-
- self.service_cache.remove_outdated()
- if not self.service_cache.any_outdated() and not refresh:
-            # Assume the cached service list is complete; the cache is only
-            # ever populated from an unfiltered request (see below).
- try:
- node_filter = [node_name] if node_name else None
- services_by_node = [d[1].data for d in self.service_cache.items_filtered(node_filter)]
- services = [orchestrator.ServiceDescription.from_json(s) for services in services_by_node for s in services]
- services = [s for s in services if
- (True if service_type is None else s.service_type == service_type) and
- (True if service_id is None else s.service_instance == service_id)]
- return orchestrator.TrivialReadCompletion(services)
- except KeyError:
- # items_filtered() will raise KeyError if passed a node name that doesn't exist
- return orchestrator.TrivialReadCompletion([])
-
- def process_result(event_data):
- result = []
- if event_data['success']:
- for service_node, service_info in event_data["return"].items():
- node_service_cache = []
- for this_service_type, service_dict in service_info.items():
- if isinstance(service_dict, str):
- # map old form where deepsea only returned service IDs
-                            # to the new form where it returns a dict
- service_dict = { 'service_instance': service_dict }
- desc = orchestrator.ServiceDescription(nodename=service_node,
- service_instance=service_dict['service_instance'],
- service_type=_deepsea_to_ceph(this_service_type),
- # the following may or may not be present
- container_id=service_dict.get('container_id', None),
- service=service_dict.get('service', None),
- version=service_dict.get('version', None),
- rados_config_location=service_dict.get('rados_config_location', None),
-                                                    service_url=service_dict.get('service_url', None),
- status=service_dict.get('status', None),
- status_desc=service_dict.get('status_desc', None)
- )
- # Always add every service to the cache...
- node_service_cache.append(desc.to_json())
- # ...but only return the ones the caller asked for
- if ((service_type is None or desc.service_type == service_type) and
- (service_id is None or desc.service_instance == service_id) and
- (node_name is None or desc.nodename == node_name)):
- result.append(desc)
-
- self.service_cache[service_node] = orchestrator.OutdatableData(node_service_cache)
- else:
- self.log.error(event_data['return'])
- return result
-
- with self._completion_lock:
- c = DeepSeaReadCompletion(on_complete=process_result)
-
- # Always request all services, so we always have all services cached.
- resp = self._do_request_with_login("POST", data = {
- "client": "runner_async",
- "fun": "mgr_orch.describe_service"
- })
- self._all_completions["{}/ret".format(resp.json()['return'][0]['tag'])] = c
-
- return c
-
- def process(self, completions):
- """
- Does nothing, as completions are processed in another thread.
- """
-
- # If the job is still incomplete:
-        # TODO: the job's result is on the event bus and should reach us eventually
- # unless something has gone wrong (e.g. salt-api died, etc.),
- # in which case it's possible the job finished but we never
- # noticed the salt/run/$id/ret event. Need to add the job ID
- # (or possibly the full event tag) to the completion object.
- # That way, if we want to double check on a job that hasn't
- # been completed yet, we can make a synchronous request to
- # salt-api to invoke jobs.lookup_jid, and if it's complete we
- # should be able to pass its return value to _process_result()
- # Question: do we do this automatically after some timeout?
- # Or do we add a function so the admin can check and "unstick"
- # a stuck completion?
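-
-        # A synchronous check would presumably mirror the inflight handling at
-        # the top of _read_sse(), roughly like this (a sketch only; assumes the
-        # jid has been recorded on the completion as discussed above):
-        #
-        #   resp = self._do_request_with_login("POST", data={
-        #       "client": "runner",
-        #       "fun": "jobs.lookup_jid",
-        #       "jid": jid
-        #   })
-        #   return_dict = resp.json()['return'][0]
-        #   if return_dict:
-        #       completion._process_result(next(iter(return_dict.items()))[1])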
-
-
- def handle_command(self, inbuf, cmd):
- if cmd['prefix'] == 'deepsea config-show':
- return 0, json.dumps(dict([(key, self.get_module_option(key)) for key in self.config_keys.keys()])), ''
-
- elif cmd['prefix'] == 'deepsea config-set':
- if cmd['key'] not in self.config_keys.keys():
- return (-errno.EINVAL, '',
- "Unknown configuration option '{0}'".format(cmd['key']))
-
- self.set_module_option(cmd['key'], cmd['value'])
- self._event.set()
- return 0, "Configuration option '{0}' updated".format(cmd['key']), ''
-
- return (-errno.EINVAL, '',
- "Command not found '{0}'".format(cmd['prefix']))
-
-
- def serve(self):
- self.log.info('DeepSea module starting up')
- self.run = True
- while self.run:
- if not self._config_valid():
- # This will spin until the config is valid, spitting a warning
- # that the config is invalid every 60 seconds. The one oddity
- # is that while setting the various parameters, this log warning
- # will print once for each parameter set until the config is valid.
-                self.log.warning("Configuration invalid; try `ceph deepsea config-set [...]`")
- self._event.wait(60)
- self._event.clear()
- continue
-
- if self._event_reader and not self._reading_events:
- self._event_reader = None
-
- if not self._event_reader:
- self._last_failure_msg = None
- try:
- # This spawns a separate thread to read the salt event bus
-                    # stream. We can't do it in the serve thread, because reading
- # from the response blocks, which would prevent the serve
- # thread from handling anything else.
- #
- # TODO: figure out how to restart the _event_reader thread if
- # config changes, e.g.: a new username or password is set.
- # This will be difficult, because _read_sse() just blocks waiting
- # for response lines. The closest I got was setting a read timeout
- # on the request, but in the general case (where not much is
- # happening most of the time), this will result in continual
- # timeouts and reconnects. We really need an asynchronous read
- # to support this.
- self._event_response = self._do_request_with_login("GET", "events", stream=True)
- self._event_reader = Thread(target=self._read_sse)
- self._reading_events = True
- self._event_reader.start()
- except Exception as ex:
- self._set_last_failure_msg("Failure setting up event reader: " + str(ex))
- # gives an (arbitrary) 60 second retry if we can't attach to
- # the salt-api event bus for some reason (e.g.: invalid username,
- # or password, which will be logged as "Request failed with status
- # code 401"). Note that this 60 second retry will also happen if
- # salt-api dies.
- self._event.wait(60)
- self._event.clear()
- continue
-
- # Wait indefinitely for something interesting to happen (e.g.
- # config-set, or shutdown), or the event reader to fail, which
-            # will happen if the salt-api server dies or restarts.
- self._event.wait()
- self._event.clear()
-
-
- def shutdown(self):
- self.log.info('DeepSea module shutting down')
- self.run = False
- self._event.set()
-
-
- def _set_last_failure_msg(self, msg):
- self._last_failure_msg = msg
-        self.log.warning(msg)
-
-
- # Reader/parser of SSE events, see:
- # - https://docs.saltstack.com/en/latest/ref/netapi/all/salt.netapi.rest_cherrypy.html#events)
- # - https://www.w3.org/TR/2009/WD-eventsource-20090421/
- # Note: this is pretty braindead and doesn't implement the full eventsource
- # spec, but it *does* implement enough for us to listen to events from salt
- # and potentially do something with them.
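-    #
-    # For reference, the event stream consists of "field: value" lines, with a
-    # blank line terminating each event, roughly like this (tag/data values
-    # are illustrative):
-    #
-    #   retry: 400
-    #   tag: salt/run/20181018074024331230/ret
-    #   data: {"tag": "salt/run/20181018074024331230/ret", "data": {...}}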
- def _read_sse(self):
- event = {}
- try:
- # Just starting the event reader; if we've made it here, we know we're
- # talking to salt-api (_do_request would have raised an exception if the
- # response wasn't ok), so check if there's any completions inflight that
- # need to be dealt with. This handles the case where some command was
- # invoked, then salt-api died somehow, and we reconneced, but missed the
- # completion at the time it actually happened.
- for tag in list(self._all_completions):
- self.log.info("Found event {} inflight".format(tag))
- try:
- resp = self._do_request_with_login("POST", data = {
- "client": "runner",
- "fun": "jobs.lookup_jid",
- "jid": tag.split('/')[2]
- })
- # jobs.lookup_jid returns a dict keyed by hostname.
- return_dict = resp.json()['return'][0]
- if return_dict:
- # If the job is complete, there'll be one item in the dict.
- self.log.info("Event {} complete".format(tag))
- # The key is the salt master hostname, but we don't care
- # about that, so just grab the data.
- data = next(iter(return_dict.items()))[1]
- self._all_completions[tag]._process_result(data)
- # TODO: decide whether it's bad to delete the completion
- # here -- would we ever need to resurrect it?
- del self._all_completions[tag]
- else:
- # if the job is not complete, there'll be nothing in the dict
- self.log.info("Event {} still pending".format(tag))
- except Exception as ex:
-                # Log a warning if the request failed, so we can continue
-                # checking any other completions, then get on to reading events.
-                self.log.warning("Error looking up inflight event {}: {}".format(tag, str(ex)))
- self._all_completions[tag].fail(ex)
-
- for line in self._event_response.iter_lines():
- with self._completion_lock:
- if line:
- line = line.decode('utf-8')
- colon = line.find(':')
- if colon > 0:
- k = line[:colon]
- v = line[colon+2:]
- if k == "retry":
- # TODO: find out if we need to obey this reconnection time
-                            self.log.warning("Server requested retry {}, ignored".format(v))
- else:
- event[k] = v
- else:
- # Empty line, terminates an event. Note that event['tag']
- # is a salt-api extension to SSE to avoid having to decode
- # json data if you don't care about it. To get to the
- # interesting stuff, you want event['data'], which is json.
- # If you want to have some fun, try
- # `ceph daemon mgr.$(hostname) config set debug_mgr 20`
- # then `salt '*' test.ping` on the master
- self.log.debug("Got event '{}'".format(str(event)))
-
- # If we're actually interested in this event (i.e. it's
- # in our completion dict), fire off that completion's
- # _process_result() callback and remove it from our list.
- if event['tag'] in self._all_completions:
- self.log.info("Event {} complete".format(event['tag']))
- self._all_completions[event['tag']]._process_result(json.loads(event['data'])['data'])
- # TODO: decide whether it's bad to delete the completion
- # here -- would we ever need to resurrect it?
- del self._all_completions[event['tag']]
-
- event = {}
- self._set_last_failure_msg("SSE read terminated")
- except Exception as ex:
- self.log.exception(ex)
- self._set_last_failure_msg("SSE read failed: {}".format(str(ex)))
-
- self._reading_events = False
- self._event.set()
-
-
- # _do_request(), _login() and _do_request_with_login() are an extremely
- # minimalist form of the following, with notably terse error handling:
- # https://bitbucket.org/openattic/openattic/src/ce4543d4cbedadc21b484a098102a16efec234f9/backend/rest_client.py?at=master&fileviewer=file-view-default
- # https://bitbucket.org/openattic/openattic/src/ce4543d4cbedadc21b484a098102a16efec234f9/backend/deepsea.py?at=master&fileviewer=file-view-default
- # rationale:
- # - I needed slightly different behaviour than in openATTIC (I want the
- # caller to read the response, to allow streaming the salt-api event bus)
- # - I didn't want to pull in 400+ lines more code into this presently
- # experimental module, to save everyone having to review it ;-)
-
- def _do_request(self, method, path="", data=None, stream=False):
- """
- returns the response, which the caller then has to read
- """
- url = "{0}/{1}".format(self.get_module_option('salt_api_url'), path)
- try:
- if method.lower() == 'get':
- resp = requests.get(url, headers = { "X-Auth-Token": self._token },
- data=data, stream=stream)
- elif method.lower() == 'post':
- resp = requests.post(url, headers = { "X-Auth-Token": self._token },
- data=data)
-
- else:
- raise RequestException("Method '{}' not supported".format(method.upper()))
- if resp.ok:
- return resp
- else:
- msg = "Request failed with status code {}".format(resp.status_code)
- raise RequestException(msg, resp.status_code)
- except requests.exceptions.ConnectionError as ex:
- self.log.exception(str(ex))
- raise RequestException(str(ex))
- except requests.exceptions.InvalidURL as ex:
- self.log.exception(str(ex))
- raise RequestException(str(ex))
-
-
- def _login(self):
- resp = self._do_request('POST', 'login', data = {
- "eauth": self.get_module_option('salt_api_eauth'),
- "password": self.get_module_option('salt_api_password'),
- "username": self.get_module_option('salt_api_username')
- })
- self._token = resp.json()['return'][0]['token']
- self.log.info("Salt API login successful")
-
-
- def _do_request_with_login(self, method, path="", data=None, stream=False):
- retries = 2
- while True:
- try:
- if not self._token:
- self._login()
- return self._do_request(method, path, data, stream)
- except RequestException as ex:
- retries -= 1
- if ex.status_code not in [401, 403] or retries == 0:
-                    raise
- self._token = None