From: Adam King Date: Fri, 22 Oct 2021 19:48:30 +0000 (-0400) Subject: cephadm: agent: gather ls and volumes asynchronously X-Git-Tag: v17.1.0~494^2~6 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8e9c025b531b540aab5938757aab0215ca3c2407;p=ceph.git cephadm: agent: gather ls and volumes asynchronously This allows for the agent to always report in a consistent amount of time while delivering new ls or volume output when it's ready. It removes the risk of the agent getting marked as down because the ls + volume time takes too long. Signed-off-by: Adam King --- diff --git a/src/cephadm/cephadm b/src/cephadm/cephadm index 3276047c9ae..1763d1e1b63 100755 --- a/src/cephadm/cephadm +++ b/src/cephadm/cephadm @@ -3484,7 +3484,8 @@ class MgrListener(Thread): logger.error(err_str) else: conn.send(b'ACK') - self.agent.wakeup() + self.agent.ls_gatherer.wakeup() + self.agent.volume_gatherer.wakeup() logger.debug(f'Got mgr message {data}') except Exception as e: logger.error(f'Mgr Listener encountered exception: {e}') @@ -3522,10 +3523,18 @@ class CephadmAgent(): self.ack = 1 self.event = Event() self.mgr_listener = MgrListener(self) + self.ls_gatherer = LsGatherer(self) + self.ls_event = Event() + self.volume_gatherer = VolumeGatherer(self) + self.volume_event = Event() self.device_enhanced_scan = False self.recent_iteration_run_times: List[float] = [0.0, 0.0, 0.0] self.recent_iteration_index: int = 0 self.cached_ls_values: Dict[str, Dict[str, str]] = {} + self.ls: Optional[List[Dict[str, str]]] = None + self.ls_ack = 0 + self.volume: str = '' + self.volume_ack = 0 def deploy_daemon_unit(self, config: Dict[str, str] = {}) -> None: if not config: @@ -3630,6 +3639,12 @@ WantedBy=ceph-{fsid}.target if not self.mgr_listener.is_alive(): self.mgr_listener.start() + if not self.ls_gatherer.is_alive(): + self.ls_gatherer.start() + + if not self.volume_gatherer.is_alive(): + self.volume_gatherer.start() + ssl_ctx = ssl.create_default_context() ssl_ctx.check_hostname = True ssl_ctx.verify_mode = ssl.CERT_REQUIRED @@ -3638,11 +3653,6 @@ WantedBy=ceph-{fsid}.target while not self.stop: start_time = time.monotonic() ack = self.ack - try: - volume = self._ceph_volume(self.device_enhanced_scan) - except Exception as e: - logger.error(f'Failed to get ceph-volume metadata: {e}') - volume = '' # part of the networks info is returned as a set which is not JSON # serializable. The set must be converted to a list @@ -3653,10 +3663,10 @@ WantedBy=ceph-{fsid}.target networks_list[key] = {k: list(v)} data = json.dumps({'host': self.host, - 'ls': self._get_ls(), + 'ls': self.ls if self.ack == self.ls_ack else [], 'networks': networks_list, 'facts': HostFacts(self.ctx).dump(), - 'volume': volume, + 'volume': self.volume if self.ack == self.volume_ack else '', 'ack': str(ack), 'keyring': self.keyring, 'port': self.listener_port}) @@ -3811,6 +3821,84 @@ WantedBy=ceph-{fsid}.target return ls +class LsGatherer(Thread): + def __init__(self, agent: 'CephadmAgent') -> None: + self.agent = agent + self.stop = False + self.recent_iteration_run_times: List[float] = [0.0, 0.0, 0.0] + self.recent_iteration_index: int = 0 + super(LsGatherer, self).__init__(target=self.run) + + def run(self) -> None: + while not self.stop: + try: + start_time = time.monotonic() + + ack = self.agent.ack + self.agent.ls = self.agent._get_ls() + if ack != self.agent.ls_ack: + self.agent.ls_ack = ack + self.agent.wakeup() + + end_time = time.monotonic() + run_time = datetime.timedelta(seconds=(end_time - start_time)) + self.recent_iteration_run_times[self.recent_iteration_index] = run_time.total_seconds() + self.recent_iteration_index = (self.recent_iteration_index + 1) % 3 + run_time_average = sum(self.recent_iteration_run_times, 0.0) / len([t for t in self.recent_iteration_run_times if t]) + + self.agent.ls_event.wait(max(self.agent.loop_interval - int(run_time_average), 0)) + self.agent.ls_event.clear() + except Exception as e: + logger.error(f'Ls Gatherer encountered exception: {e}') + + def shutdown(self) -> None: + self.stop = True + + def wakeup(self) -> None: + self.agent.ls_event.set() + + +class VolumeGatherer(Thread): + def __init__(self, agent: 'CephadmAgent') -> None: + self.agent = agent + self.stop = False + self.recent_iteration_run_times: List[float] = [0.0, 0.0, 0.0] + self.recent_iteration_index: int = 0 + super(VolumeGatherer, self).__init__(target=self.run) + + def run(self) -> None: + while not self.stop: + try: + start_time = time.monotonic() + + ack = self.agent.ack + try: + self.agent.volume = self.agent._ceph_volume(self.agent.device_enhanced_scan) + except Exception as e: + logger.error(f'Failed to get ceph-volume metadata: {e}') + self.agent.volume = '' + if ack != self.agent.volume_ack: + self.agent.volume_ack = ack + self.agent.wakeup() + + end_time = time.monotonic() + run_time = datetime.timedelta(seconds=(end_time - start_time)) + self.recent_iteration_run_times[self.recent_iteration_index] = run_time.total_seconds() + self.recent_iteration_index = (self.recent_iteration_index + 1) % 3 + run_time_average = sum(self.recent_iteration_run_times, 0.0) / len([t for t in self.recent_iteration_run_times if t]) + + self.agent.volume_event.wait(max(self.agent.loop_interval - int(run_time_average), 0)) + self.agent.volume_event.clear() + except Exception as e: + logger.error(f'Ls Gatherer encountered exception: {e}') + + def shutdown(self) -> None: + self.stop = True + + def wakeup(self) -> None: + self.agent.volume_event.set() + + def command_agent(ctx: CephadmContext) -> None: agent = CephadmAgent(ctx, ctx.fsid, ctx.daemon_id) diff --git a/src/pybind/mgr/cephadm/agent.py b/src/pybind/mgr/cephadm/agent.py index 0c40846a9bb..12bd0bf6f4a 100644 --- a/src/pybind/mgr/cephadm/agent.py +++ b/src/pybind/mgr/cephadm/agent.py @@ -204,7 +204,7 @@ class HostData: f'Change detected in daemons in error state from {host} agent metadata. Kicking serve loop') self.mgr._kick_serve_loop() - if up_to_date: + if up_to_date and ('ls' in data and data['ls']): was_out_of_date = not self.mgr.cache.all_host_metadata_up_to_date() self.mgr.cache.metadata_up_to_date[host] = True if was_out_of_date and self.mgr.cache.all_host_metadata_up_to_date():