From 4b8be8d4278dda4e5555a1ccefd09f3b0aea37af Mon Sep 17 00:00:00 2001
From: Paul Cuzner
Date: Mon, 21 Sep 2020 12:21:42 +1200
Subject: [PATCH] cephadm: Updates for tox / mypy compliance

Now using an object for the cephadm cache, with all locking and updates
handled within the cache instance. In addition, task names now match
the URL endpoints for consistency.

Signed-off-by: Paul Cuzner
---
 src/cephadm/cephadm | 134 ++++++++++++++++++++++++++++----------------
 1 file changed, 87 insertions(+), 47 deletions(-)

diff --git a/src/cephadm/cephadm b/src/cephadm/cephadm
index b9b08b0d8124b..4b8c447f55b86 100755
--- a/src/cephadm/cephadm
+++ b/src/cephadm/cephadm
@@ -1889,7 +1889,7 @@ def deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid,
                   osd_fsid=None,
                   reconfig=False,
                   ports=None):
-    # type: (str, str, Union[int, str], CephContainer, int, int, Optional[str], Optional[str], Optional[str], Optional[bool], Optional[List[int]]) -> None
+    # type: (str, str, Union[int, str], Optional[CephContainer], int, int, Optional[str], Optional[str], Optional[str], Optional[bool], Optional[List[int]]) -> None
 
     ports = ports or []
     if any([port_in_use(port) for port in ports]):
@@ -1942,12 +1942,15 @@ def deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid,
 
     if not reconfig:
         if daemon_type == CephadmDaemon.daemon_type:
-            port = next(iter(ports), None) # get first tcp port provided or None
+            port = next(iter(ports), None)  # get first tcp port provided or None
             cephadmd = CephadmDaemon(fsid, daemon_id, port)
             cephadmd.deploy_daemon_unit()
         else:
-            deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c,
-                                osd_fsid=osd_fsid)
+            if c:
+                deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c,
+                                    osd_fsid=osd_fsid)
+            else:
+                raise RuntimeError("attempting to deploy a daemon without a container image")
 
     if not os.path.exists(data_dir + '/unit.created'):
         with open(data_dir + '/unit.created', 'w') as f:
@@ -5257,7 +5260,59 @@ def command_gather_facts():
 
 ##################################
 
+class CephadmCache:
+    task_types = ['disks', 'daemons', 'host', 'http_server']
+
+    def __init__(self):
+        self.started_epoch_secs = time.time()
+        self.tasks = {
+            "daemons": "inactive",
+            "disks": "inactive",
+            "host": "inactive",
+            "http_server": "inactive",
+        }
+        self.errors = []
+        self.disks = {}
+        self.daemons = {}
+        self.host = {}
+        self.lock = RLock()
+
+    @property
+    def health(self):
+        return {
+            "started_epoch_secs": self.started_epoch_secs,
+            "tasks": self.tasks,
+            "errors": self.errors,
+        }
+
+    def to_json(self):
+        return {
+            "health": self.health,
+            "host": self.host,
+            "daemons": self.daemons,
+            "disks": self.disks,
+        }
+
+    def update_health(self, task_type, task_status, error_msg=None):
+        assert task_type in CephadmCache.task_types
+        with self.lock:
+            self.tasks[task_type] = task_status
+            if error_msg:
+                self.errors.append(error_msg)
+
+    def update_task(self, task_type, content):
+        assert task_type in CephadmCache.task_types
+        with self.lock:
+            setattr(self, task_type, content)
+
+
+class CephadmHTTPServer(ThreadingMixIn, HTTPServer):
+    allow_reuse_address = True
+    daemon_threads = True
+    cephadm_cache: CephadmCache
+
+
 class CephadmDaemonHandler(BaseHTTPRequestHandler):
+    server: CephadmHTTPServer
     api_version = 'v1'
     valid_routes = [
         f'/{api_version}/metadata',
@@ -5295,10 +5350,10 @@ td,th {{
 <tr><th>Endpoint</th><th>Methods</th><th>Response</th><th>Description</th></tr>
 <tr><td>{api_version}/metadata</td><td>GET</td><td>JSON</td><td>Return all metadata for the host</td></tr>
-<tr><td>{api_version}/metadata/daemons</td><td>GET</td><td>JSON</td><td>Return daemon and systemd states for ceph daemons</td></tr>
+<tr><td>{api_version}/metadata/daemons</td><td>GET</td><td>JSON</td><td>Return daemon and systemd states for ceph daemons (ls)</td></tr>
 <tr><td>{api_version}/metadata/disks</td><td>GET</td><td>JSON</td><td>show disk inventory (ceph-volume)</td></tr>
 <tr><td>{api_version}/metadata/health</td><td>GET</td><td>JSON</td><td>Show current health of the exporter sub-tasks</td></tr>
-<tr><td>{api_version}/metadata/host</td><td>GET</td><td>JSON</td><td>Show host metadata</td></tr>
+<tr><td>{api_version}/metadata/host</td><td>GET</td><td>JSON</td><td>Show host metadata (gather-facts)</td></tr>
 </table>
 </body>
 </html>
 """.format(api_version=CephadmDaemonHandler.api_version)
@@ -5324,15 +5379,15 @@ td,th {{
             u = self.path.split('/')[-1]
             data = json.dumps({})
             if u == 'metadata':
-                data = json.dumps(self.server.cephadm_cache)
+                data = json.dumps(self.server.cephadm_cache.to_json())
             elif u == 'daemons':
-                data = json.dumps(self.server.cephadm_cache.get('daemons', {}))
+                data = json.dumps(self.server.cephadm_cache.daemons)
             elif u == 'disks':
-                data = json.dumps(self.server.cephadm_cache.get('disks', {}))
+                data = json.dumps(self.server.cephadm_cache.disks)
             elif u == 'health':
-                data = json.dumps(self.server.cephadm_cache.get('health', {}))
+                data = json.dumps(self.server.cephadm_cache.health)
             elif u == 'host':
-                data = json.dumps(self.server.cephadm_cache.get('host', {}))
+                data = json.dumps(self.server.cephadm_cache.host)
 
             self.send_response(200)
             self.send_header('Content-type','application/json')
@@ -5351,12 +5406,6 @@ td,th {{
         logger.info(f"client:{self.address_string()} [{self.log_date_time_string()}] {rqst}")
 
 
-class CephadmHTTPServer(ThreadingMixIn, HTTPServer):
-    allow_reuse_address = True
-    daemon_threads = True
-    cephadm_cache = None
-
-
 class CephadmDaemon():
 
     daemon_type = "cephadm"
@@ -5373,19 +5422,9 @@ class CephadmDaemon():
         else:
             self.port = port
         self.workers = []
-        self.http_server = None
+        self.http_server: CephadmHTTPServer
        self.stop = False
-        self.cephadm_cache = {
-            "health": {
-                "started_epoch_secs": None,
-                "tasks": {},
-                "errors": [],
-            },
-            "host": {},
-            "daemons": {},
-            "disks": {},
-        }
-        self.cephadm_cache_lock = RLock()
+        self.cephadm_cache = CephadmCache()
 
     @property
     def port_active(self):
@@ -5439,13 +5478,15 @@ class CephadmDaemon():
                     errors.append("host-facts provided invalid JSON")
                     logger.warning(errors[-1])
                     data = {}
-            with self.cephadm_cache_lock:
-                self.cephadm_cache['host'] = {
+            self.cephadm_cache.update_task(
+                'host',
+                {
                     "scrape_timestamp": s_time,
                     "scrape_duration_secs": elapsed,
                     "scrape_errors": errors,
                     "data": data,
                 }
+            )
 
             logger.debug(f"completed host-facts scrape - {elapsed}s")
             time.sleep(CephadmDaemon.loop_delay)
@@ -5486,13 +5527,15 @@ class CephadmDaemon():
                 errors.append("ceph-volume didn't return any data")
                 logger.warning(errors[-1])
 
-            with self.cephadm_cache_lock:
-                self.cephadm_cache['disks'] = {
+            self.cephadm_cache.update_task(
+                'disks',
+                {
                     "scrape_timestamp": s_time,
                     "scrape_duration_secs": elapsed,
                     "scrape_errors": errors,
                     "data": data,
                 }
+            )
 
             logger.debug(f"completed ceph-volume scrape - {elapsed}s")
             time.sleep(CephadmDaemon.loop_delay)
@@ -5519,13 +5562,15 @@ class CephadmDaemon():
                 logger.warning(errors[-1])
                 data = []
             elapsed = time.time() - s_time
-            with self.cephadm_cache_lock:
-                self.cephadm_cache['daemons'] = {
+            self.cephadm_cache.update_task(
+                'daemons',
+                {
                     "scrape_timestamp": s_time,
                     "scrape_duration_secs": elapsed,
                     "scrape_errors": errors,
                     "data": data,
                 }
+            )
 
             logger.debug(f"completed list-daemons scrape - {elapsed}s")
             time.sleep(CephadmDaemon.loop_delay)
@@ -5539,8 +5584,7 @@ class CephadmDaemon():
         t = Thread(target=target)
         t.daemon = True
         t.name = name
-        with self.cephadm_cache_lock:
-            self.cephadm_cache['health']['tasks'][name] = "active"
"active") t.start() start_msg = f"Started {name} thread" @@ -5569,16 +5613,14 @@ class CephadmDaemon(): signal.signal(signal.SIGINT, self.shutdown) signal.signal(signal.SIGHUP, self.reload) logger.debug("Signal handlers attached") - with self.cephadm_cache_lock: - self.cephadm_cache['health']['started_epoch_secs'] = time.time() - host_facts = self._create_thread(self._scrape_host_facts, 'host_facts', 5) + host_facts = self._create_thread(self._scrape_host_facts, 'host', 5) self.workers.append(host_facts) - daemons = self._create_thread(self._scrape_list_daemons, 'list_daemons', 20) + daemons = self._create_thread(self._scrape_list_daemons, 'daemons', 20) self.workers.append(daemons) - disks = self._create_thread(self._scrape_ceph_volume, 'ceph_volume', 20) + disks = self._create_thread(self._scrape_ceph_volume, 'disks', 20) self.workers.append(disks) self.http_server = CephadmHTTPServer(('0.0.0.0', self.port), CephadmDaemonHandler) # IPv4 only @@ -5594,15 +5636,13 @@ class CephadmDaemon(): if ctr >= CephadmDaemon.thread_check_interval: ctr = 0 for worker in self.workers: - if self.cephadm_cache['health']['tasks'][worker.name] == 'inactive': + if self.cephadm_cache.tasks[worker.name] == 'inactive': continue if not worker.is_alive(): logger.warning(f"{worker.name} thread not running") stop_time = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S") - with self.cephadm_cache_lock: - # update health status in the cache - self.cephadm_cache['health']['tasks'][worker.name] = "inactive" - self.cephadm_cache['health']['errors'].append(f"{worker.name} stopped at {stop_time}") + error_msg = f"{worker.name} stopped at {stop_time}" + self.cephadm_cache.update_health(worker.name, "inactive", f"{worker.name} stopped at {stop_time}") time.sleep(CephadmDaemon.loop_delay) ctr += CephadmDaemon.loop_delay -- 2.39.5