From 5c0afff778d277a03110d4eb3d9f04c258ac1224 Mon Sep 17 00:00:00 2001 From: Paul Cuzner Date: Thu, 17 Sep 2020 15:56:22 +1200 Subject: [PATCH] cephadm: Misc updates to cephadm exporter mode The http endpoint is now versioned and provides multiple paths/routes. The health data is now extended to provide the daemon's start time, and now exposes a list of errors describing when the collectors failed, to help with debugging. Signed-off-by: Paul Cuzner --- src/cephadm/cephadm | 134 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 107 insertions(+), 27 deletions(-) diff --git a/src/cephadm/cephadm b/src/cephadm/cephadm index c32de65190e45..a329fee19d659 100755 --- a/src/cephadm/cephadm +++ b/src/cephadm/cephadm @@ -1425,7 +1425,9 @@ def find_program(filename): def get_unit_name(fsid, daemon_type, daemon_id=None): # type: (str, str, Optional[Union[int, str]]) -> str # accept either name or type + id - if daemon_id is not None: + if daemon_type == CephadmDaemon.daemon_type and daemon_id is not None: + return 'ceph-%s-%s.%s' % (fsid, daemon_type, daemon_id) + elif daemon_id is not None: return 'ceph-%s@%s.%s' % (fsid, daemon_type, daemon_id) else: return 'ceph-%s@%s' % (fsid, daemon_type) @@ -1940,11 +1942,7 @@ def deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid, if not reconfig: if daemon_type == CephadmDaemon.daemon_type: - if ports: - port = ports[0] - else: - port = CephadmDaemon.default_port - ports = [port] + port = next(iter(ports), None) # get first tcp port provided or None cephadmd = CephadmDaemon(fsid, daemon_id, port) cephadmd.deploy_daemon_unit() else: @@ -3397,7 +3395,7 @@ def command_ceph_volume(): privileged=True, volume_mounts=mounts, ) - out, err, code = call_throws(c.run_cmd(), verbose=not args.no_log_output) + out, err, code = call_throws(c.run_cmd(), verbose=args.log_output) if not code: print(out) @@ -4080,11 +4078,7 @@ def command_rm_daemon(): l = FileLock(args.fsid) l.acquire() (daemon_type, daemon_id) = args.name.split('.', 1) - if 
daemon_type == CephadmDaemon.daemon_type: - unit_name = f'ceph-{args.fsid}-cephadm.{daemon_id}.service' - else: - unit_name = get_unit_name_by_daemon_name(args.fsid, args.name) - + unit_name = get_unit_name_by_daemon_name(args.fsid, args.name) if daemon_type in ['mon', 'osd'] and not args.force: raise Error('must pass --force to proceed: ' @@ -5264,6 +5258,57 @@ def command_gather_facts(): class CephadmDaemonHandler(BaseHTTPRequestHandler): + api_version = 'v1' + valid_routes = [ + f'/{api_version}/metadata', + f'/{api_version}/metadata/health', + f'/{api_version}/metadata/disks', + f'/{api_version}/metadata/daemons', + f'/{api_version}/metadata/host', + ] + + def _help_page(self): + return """ + +cephadm metadata exporter + + +

cephadm metadata exporter {api_version}

+ + + + + + + + + +
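<table>
<tr><th>Endpoint</th><th>Methods</th><th>Response</th><th>Description</th></tr>
<tr><td>{api_version}/metadata</td><td>GET</td><td>JSON</td><td>Return all metadata for the host</td></tr>
<tr><td>{api_version}/metadata/daemons</td><td>GET</td><td>JSON</td><td>Return daemon and systemd states for ceph daemons</td></tr>
<tr><td>{api_version}/metadata/disks</td><td>GET</td><td>JSON</td><td>Show disk inventory (ceph-volume)</td></tr>
<tr><td>{api_version}/metadata/health</td><td>GET</td><td>JSON</td><td>Show current health of the exporter sub-tasks</td></tr>
<tr><td>{api_version}/metadata/host</td><td>GET</td><td>JSON</td><td>Show host metadata</td></tr>
</table>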
+ +""".format(api_version=CephadmDaemonHandler.api_version) + + def _fetch_root(self): + self.send_response(200) + self.send_header('Content-type', 'text/html; charset=utf-8') + self.end_headers() + self.wfile.write(self._help_page().encode('utf-8')) + def do_GET(self): """Handle *all* GET requests""" @@ -5271,13 +5316,38 @@ class CephadmDaemonHandler(BaseHTTPRequestHandler): # data is read only and we offer no means of remotely invoking a cephadm # command - so auth is not required (we're basically in the same category # as the prometheus node-exporter) - self.send_response(200) - self.send_header('Content-type','application/json') - self.end_headers() - self.wfile.write(bytes(json.dumps(self.server.cephadm_cache), 'ascii')) + if self.path == '/': + # provide a html response if someone hits the root url, to document the + # available api endpoints + return self._fetch_root() + elif self.path in CephadmDaemonHandler.valid_routes: + u = self.path.split('/')[-1] + data = json.dumps({}) + if u == 'metadata': + data = json.dumps(self.server.cephadm_cache) + elif u == 'daemons': + data = json.dumps(self.server.cephadm_cache.get('daemons', {})) + elif u == 'disks': + data = json.dumps(self.server.cephadm_cache.get('disks', {})) + elif u == 'health': + data = json.dumps(self.server.cephadm_cache.get('health', {})) + elif u == 'host': + data = json.dumps(self.server.cephadm_cache.get('host', {})) + + self.send_response(200) + self.send_header('Content-type','application/json') + self.end_headers() + self.wfile.write(data.encode('utf-8')) + else: + # Invalid GET URL + bad_request_msg = "Valid URLs are: {}".format(', '.join(CephadmDaemonHandler.valid_routes)) + self.send_response(404, message=bad_request_msg) # reason + self.send_header('Content-type','application/json') + self.end_headers() + self.wfile.write(json.dumps({"message": bad_request_msg}).encode('utf-8')) def log_message(self, format, *args): - rqst = " ".join(args) + rqst = " ".join(str(a) for a in args) 
logger.info(f"client:{self.address_string()} [{self.log_date_time_string()}] {rqst}") @@ -5295,15 +5365,22 @@ class CephadmDaemon(): loop_delay = 1 thread_check_interval = 5 - def __init__(self, fsid, daemon_id, port=5003): + def __init__(self, fsid, daemon_id, port=None): self.fsid = fsid self.daemon_id = daemon_id - self.port = port + if not port: + self.port = CephadmDaemon.default_port + else: + self.port = port self.workers = [] self.http_server = None self.stop = False self.cephadm_cache = { - "health": {}, + "health": { + "started_epoch_secs": None, + "tasks": {}, + "errors": [], + }, "host": {}, "daemons": {}, "disks": {}, @@ -5320,12 +5397,10 @@ class CephadmDaemon(): @staticmethod def _unit_name(fsid, daemon_id): - return f"ceph-{fsid}-cephadm.{daemon_id}.service" + return "{}.service".format(get_unit_name(fsid, CephadmDaemon.daemon_type, daemon_id)) @property def unit_name(self): - # Our name here will cause a problem with the the old list_daemons code, - # since that only expects daemons to be containers and cephadmd is not! 
return CephadmDaemon._unit_name(self.fsid, self.daemon_id) @property @@ -5380,7 +5455,7 @@ class CephadmDaemon(): def _scrape_ceph_volume(self, refresh_interval=15): args.command = "inventory --format=json".split() args.fsid = self.fsid - args.no_log_output = True + args.log_output = False ctr = 0 while True: if self.stop: @@ -5459,7 +5534,7 @@ class CephadmDaemon(): t.daemon = True t.name = name with self.cephadm_cache_lock: - self.cephadm_cache['health'][name] = "active" + self.cephadm_cache['health']['tasks'][name] = "active" t.start() start_msg = f"Started {name} thread" @@ -5488,6 +5563,8 @@ class CephadmDaemon(): signal.signal(signal.SIGINT, self.shutdown) signal.signal(signal.SIGHUP, self.reload) logger.debug("Signal handlers attached") + with self.cephadm_cache_lock: + self.cephadm_cache['health']['started_epoch_secs'] = time.time() host_facts = self._create_thread(self._scrape_host_facts, 'host_facts', 5) self.workers.append(host_facts) @@ -5513,9 +5590,11 @@ class CephadmDaemon(): for worker in self.workers: if not worker.is_alive(): logger.warning(f"{worker.name} thread not running") + stop_time = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S") with self.cephadm_cache_lock: # update health in the cache - self.cephadm_cache['health'][worker.name] = "inactive" + self.cephadm_cache['health']['tasks'][worker.name] = "inactive" + self.cephadm_cache['health']['errors'].append(f"{worker.name} stopped at {stop_time}") time.sleep(CephadmDaemon.loop_delay) ctr += CephadmDaemon.loop_delay @@ -5850,8 +5929,9 @@ def _get_parser(): '--keyring', '-k', help='ceph.keyring to pass through to the container') parser_ceph_volume.add_argument( - '--no-log-output', + '--log-output', action='store_true', + default=True, help='suppress ceph volume output from the log') parser_ceph_volume.add_argument( 'command', nargs=argparse.REMAINDER, -- 2.39.5