From: Adam King Date: Tue, 26 Oct 2021 13:08:14 +0000 (-0400) Subject: cephadm: make agent gatherer classes generic X-Git-Tag: v17.1.0~494^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=7e070cee3ecf0dc8d8c2f5e090b8bc927e02ddad;p=ceph.git cephadm: make agent gatherer classes generic Signed-off-by: Adam King --- diff --git a/src/cephadm/cephadm b/src/cephadm/cephadm index 1763d1e1b63..8bd9bd50961 100755 --- a/src/cephadm/cephadm +++ b/src/cephadm/cephadm @@ -3523,18 +3523,12 @@ class CephadmAgent(): self.ack = 1 self.event = Event() self.mgr_listener = MgrListener(self) - self.ls_gatherer = LsGatherer(self) - self.ls_event = Event() - self.volume_gatherer = VolumeGatherer(self) - self.volume_event = Event() + self.ls_gatherer = AgentGatherer(self, self._get_ls, 'Ls') + self.volume_gatherer = AgentGatherer(self, self._ceph_volume, 'Volume') self.device_enhanced_scan = False self.recent_iteration_run_times: List[float] = [0.0, 0.0, 0.0] self.recent_iteration_index: int = 0 self.cached_ls_values: Dict[str, Dict[str, str]] = {} - self.ls: Optional[List[Dict[str, str]]] = None - self.ls_ack = 0 - self.volume: str = '' - self.volume_ack = 0 def deploy_daemon_unit(self, config: Dict[str, str] = {}) -> None: if not config: @@ -3663,10 +3657,12 @@ WantedBy=ceph-{fsid}.target networks_list[key] = {k: list(v)} data = json.dumps({'host': self.host, - 'ls': self.ls if self.ack == self.ls_ack else [], + 'ls': (self.ls_gatherer.data if self.ack == self.ls_gatherer.ack + and self.ls_gatherer.data is not None else []), 'networks': networks_list, 'facts': HostFacts(self.ctx).dump(), - 'volume': self.volume if self.ack == self.volume_ack else '', + 'volume': (self.volume_gatherer.data if self.ack == self.volume_gatherer.ack + and self.volume_gatherer.data is not None else ''), 'ack': str(ack), 'keyring': self.keyring, 'port': self.listener_port}) @@ -3821,50 +3817,18 @@ WantedBy=ceph-{fsid}.target return ls -class LsGatherer(Thread): - def __init__(self, agent: 'CephadmAgent') -> None: - self.agent = agent - self.stop = False - self.recent_iteration_run_times: List[float] = [0.0, 0.0, 0.0] - self.recent_iteration_index: int = 0 - super(LsGatherer, self).__init__(target=self.run) - - def run(self) -> None: - while not self.stop: - try: - start_time = time.monotonic() - - ack = self.agent.ack - self.agent.ls = self.agent._get_ls() - if ack != self.agent.ls_ack: - self.agent.ls_ack = ack - self.agent.wakeup() - - end_time = time.monotonic() - run_time = datetime.timedelta(seconds=(end_time - start_time)) - self.recent_iteration_run_times[self.recent_iteration_index] = run_time.total_seconds() - self.recent_iteration_index = (self.recent_iteration_index + 1) % 3 - run_time_average = sum(self.recent_iteration_run_times, 0.0) / len([t for t in self.recent_iteration_run_times if t]) - - self.agent.ls_event.wait(max(self.agent.loop_interval - int(run_time_average), 0)) - self.agent.ls_event.clear() - except Exception as e: - logger.error(f'Ls Gatherer encountered exception: {e}') - - def shutdown(self) -> None: - self.stop = True - - def wakeup(self) -> None: - self.agent.ls_event.set() - - -class VolumeGatherer(Thread): - def __init__(self, agent: 'CephadmAgent') -> None: +class AgentGatherer(Thread): + def __init__(self, agent: 'CephadmAgent', func: Callable, gatherer_type: str = 'Unnamed', initial_ack: int = 0) -> None: self.agent = agent + self.func = func + self.gatherer_type = gatherer_type + self.ack = initial_ack + self.event = Event() + self.data: Any = None self.stop = False self.recent_iteration_run_times: List[float] = [0.0, 0.0, 0.0] self.recent_iteration_index: int = 0 - super(VolumeGatherer, self).__init__(target=self.run) + super(AgentGatherer, self).__init__(target=self.run) def run(self) -> None: while not self.stop: @@ -3873,12 +3837,12 @@ class VolumeGatherer(Thread): ack = self.agent.ack try: - self.agent.volume = self.agent._ceph_volume(self.agent.device_enhanced_scan) + self.data = self.func() except Exception as e: - logger.error(f'Failed to get ceph-volume metadata: {e}') - self.agent.volume = '' - if ack != self.agent.volume_ack: - self.agent.volume_ack = ack + logger.error(f'{self.gatherer_type} Gatherer encountered exception gathering data: {e}') + self.data = None + if ack != self.ack: + self.ack = ack self.agent.wakeup() end_time = time.monotonic() @@ -3887,16 +3851,16 @@ class VolumeGatherer(Thread): self.recent_iteration_index = (self.recent_iteration_index + 1) % 3 run_time_average = sum(self.recent_iteration_run_times, 0.0) / len([t for t in self.recent_iteration_run_times if t]) - self.agent.volume_event.wait(max(self.agent.loop_interval - int(run_time_average), 0)) - self.agent.volume_event.clear() + self.event.wait(max(self.agent.loop_interval - int(run_time_average), 0)) + self.event.clear() except Exception as e: - logger.error(f'Ls Gatherer encountered exception: {e}') + logger.error(f'{self.gatherer_type} Gatherer encountered exception: {e}') def shutdown(self) -> None: self.stop = True def wakeup(self) -> None: - self.agent.volume_event.set() + self.event.set() def command_agent(ctx: CephadmContext) -> None: