self.ack = 1
self.event = Event()
self.mgr_listener = MgrListener(self)
- self.ls_gatherer = LsGatherer(self)
- self.ls_event = Event()
- self.volume_gatherer = VolumeGatherer(self)
- self.volume_event = Event()
+ self.ls_gatherer = AgentGatherer(self, self._get_ls, 'Ls')
+ self.volume_gatherer = AgentGatherer(self, self._ceph_volume, 'Volume')
self.device_enhanced_scan = False
self.recent_iteration_run_times: List[float] = [0.0, 0.0, 0.0]
self.recent_iteration_index: int = 0
self.cached_ls_values: Dict[str, Dict[str, str]] = {}
- self.ls: Optional[List[Dict[str, str]]] = None
- self.ls_ack = 0
- self.volume: str = ''
- self.volume_ack = 0
def deploy_daemon_unit(self, config: Dict[str, str] = {}) -> None:
if not config:
networks_list[key] = {k: list(v)}
data = json.dumps({'host': self.host,
- 'ls': self.ls if self.ack == self.ls_ack else [],
+ 'ls': (self.ls_gatherer.data if self.ack == self.ls_gatherer.ack
+ and self.ls_gatherer.data is not None else []),
'networks': networks_list,
'facts': HostFacts(self.ctx).dump(),
- 'volume': self.volume if self.ack == self.volume_ack else '',
+ 'volume': (self.volume_gatherer.data if self.ack == self.volume_gatherer.ack
+ and self.volume_gatherer.data is not None else ''),
'ack': str(ack),
'keyring': self.keyring,
'port': self.listener_port})
return ls
-class LsGatherer(Thread):
- def __init__(self, agent: 'CephadmAgent') -> None:
- self.agent = agent
- self.stop = False
- self.recent_iteration_run_times: List[float] = [0.0, 0.0, 0.0]
- self.recent_iteration_index: int = 0
- super(LsGatherer, self).__init__(target=self.run)
-
- def run(self) -> None:
- while not self.stop:
- try:
- start_time = time.monotonic()
-
- ack = self.agent.ack
- self.agent.ls = self.agent._get_ls()
- if ack != self.agent.ls_ack:
- self.agent.ls_ack = ack
- self.agent.wakeup()
-
- end_time = time.monotonic()
- run_time = datetime.timedelta(seconds=(end_time - start_time))
- self.recent_iteration_run_times[self.recent_iteration_index] = run_time.total_seconds()
- self.recent_iteration_index = (self.recent_iteration_index + 1) % 3
- run_time_average = sum(self.recent_iteration_run_times, 0.0) / len([t for t in self.recent_iteration_run_times if t])
-
- self.agent.ls_event.wait(max(self.agent.loop_interval - int(run_time_average), 0))
- self.agent.ls_event.clear()
- except Exception as e:
- logger.error(f'Ls Gatherer encountered exception: {e}')
-
- def shutdown(self) -> None:
- self.stop = True
-
- def wakeup(self) -> None:
- self.agent.ls_event.set()
-
-
-class VolumeGatherer(Thread):
- def __init__(self, agent: 'CephadmAgent') -> None:
+class AgentGatherer(Thread):
+ def __init__(self, agent: 'CephadmAgent', func: Callable, gatherer_type: str = 'Unnamed', initial_ack: int = 0) -> None:
self.agent = agent
+ self.func = func
+ self.gatherer_type = gatherer_type
+ self.ack = initial_ack
+ self.event = Event()
+ self.data: Any = None
self.stop = False
self.recent_iteration_run_times: List[float] = [0.0, 0.0, 0.0]
self.recent_iteration_index: int = 0
- super(VolumeGatherer, self).__init__(target=self.run)
+ super(AgentGatherer, self).__init__(target=self.run)
def run(self) -> None:
while not self.stop:
ack = self.agent.ack
try:
- self.agent.volume = self.agent._ceph_volume(self.agent.device_enhanced_scan)
+ self.data = self.func()
except Exception as e:
- logger.error(f'Failed to get ceph-volume metadata: {e}')
- self.agent.volume = ''
- if ack != self.agent.volume_ack:
- self.agent.volume_ack = ack
+ logger.error(f'{self.gatherer_type} Gatherer encountered exception gathering data: {e}')
+ self.data = None
+ if ack != self.ack:
+ self.ack = ack
self.agent.wakeup()
end_time = time.monotonic()
self.recent_iteration_index = (self.recent_iteration_index + 1) % 3
run_time_average = sum(self.recent_iteration_run_times, 0.0) / len([t for t in self.recent_iteration_run_times if t])
- self.agent.volume_event.wait(max(self.agent.loop_interval - int(run_time_average), 0))
- self.agent.volume_event.clear()
+ self.event.wait(max(self.agent.loop_interval - int(run_time_average), 0))
+ self.event.clear()
except Exception as e:
- logger.error(f'Ls Gatherer encountered exception: {e}')
+ logger.error(f'{self.gatherer_type} Gatherer encountered exception: {e}')
def shutdown(self) -> None:
self.stop = True
def wakeup(self) -> None:
- self.agent.volume_event.set()
+ self.event.set()
def command_agent(ctx: CephadmContext) -> None: