logger.error(err_str)
else:
conn.send(b'ACK')
- self.agent.wakeup()
+ self.agent.ls_gatherer.wakeup()
+ self.agent.volume_gatherer.wakeup()
logger.debug(f'Got mgr message {data}')
except Exception as e:
logger.error(f'Mgr Listener encountered exception: {e}')
self.ack = 1
self.event = Event()
self.mgr_listener = MgrListener(self)
+ self.ls_gatherer = LsGatherer(self)
+ self.ls_event = Event()
+ self.volume_gatherer = VolumeGatherer(self)
+ self.volume_event = Event()
self.device_enhanced_scan = False
self.recent_iteration_run_times: List[float] = [0.0, 0.0, 0.0]
self.recent_iteration_index: int = 0
self.cached_ls_values: Dict[str, Dict[str, str]] = {}
+ self.ls: Optional[List[Dict[str, str]]] = None
+ self.ls_ack = 0
+ self.volume: str = ''
+ self.volume_ack = 0
def deploy_daemon_unit(self, config: Dict[str, str] = {}) -> None:
if not config:
if not self.mgr_listener.is_alive():
self.mgr_listener.start()
+ if not self.ls_gatherer.is_alive():
+ self.ls_gatherer.start()
+
+ if not self.volume_gatherer.is_alive():
+ self.volume_gatherer.start()
+
ssl_ctx = ssl.create_default_context()
ssl_ctx.check_hostname = True
ssl_ctx.verify_mode = ssl.CERT_REQUIRED
while not self.stop:
start_time = time.monotonic()
ack = self.ack
- try:
- volume = self._ceph_volume(self.device_enhanced_scan)
- except Exception as e:
- logger.error(f'Failed to get ceph-volume metadata: {e}')
- volume = ''
# part of the networks info is returned as a set which is not JSON
# serializable. The set must be converted to a list
networks_list[key] = {k: list(v)}
data = json.dumps({'host': self.host,
- 'ls': self._get_ls(),
+ 'ls': self.ls if self.ack == self.ls_ack else [],
'networks': networks_list,
'facts': HostFacts(self.ctx).dump(),
- 'volume': volume,
+ 'volume': self.volume if self.ack == self.volume_ack else '',
'ack': str(ack),
'keyring': self.keyring,
'port': self.listener_port})
return ls
+class LsGatherer(Thread):
+ def __init__(self, agent: 'CephadmAgent') -> None:
+ self.agent = agent
+ self.stop = False
+ self.recent_iteration_run_times: List[float] = [0.0, 0.0, 0.0]
+ self.recent_iteration_index: int = 0
+ super(LsGatherer, self).__init__(target=self.run)
+
+ def run(self) -> None:
+ while not self.stop:
+ try:
+ start_time = time.monotonic()
+
+ ack = self.agent.ack
+ self.agent.ls = self.agent._get_ls()
+ if ack != self.agent.ls_ack:
+ self.agent.ls_ack = ack
+ self.agent.wakeup()
+
+ end_time = time.monotonic()
+ run_time = datetime.timedelta(seconds=(end_time - start_time))
+ self.recent_iteration_run_times[self.recent_iteration_index] = run_time.total_seconds()
+ self.recent_iteration_index = (self.recent_iteration_index + 1) % 3
+ run_time_average = sum(self.recent_iteration_run_times, 0.0) / len([t for t in self.recent_iteration_run_times if t])
+
+ self.agent.ls_event.wait(max(self.agent.loop_interval - int(run_time_average), 0))
+ self.agent.ls_event.clear()
+ except Exception as e:
+ logger.error(f'Ls Gatherer encountered exception: {e}')
+
+ def shutdown(self) -> None:
+ self.stop = True
+
+ def wakeup(self) -> None:
+ self.agent.ls_event.set()
+
+
+class VolumeGatherer(Thread):
+ def __init__(self, agent: 'CephadmAgent') -> None:
+ self.agent = agent
+ self.stop = False
+ self.recent_iteration_run_times: List[float] = [0.0, 0.0, 0.0]
+ self.recent_iteration_index: int = 0
+ super(VolumeGatherer, self).__init__(target=self.run)
+
+ def run(self) -> None:
+ while not self.stop:
+ try:
+ start_time = time.monotonic()
+
+ ack = self.agent.ack
+ try:
+ self.agent.volume = self.agent._ceph_volume(self.agent.device_enhanced_scan)
+ except Exception as e:
+ logger.error(f'Failed to get ceph-volume metadata: {e}')
+ self.agent.volume = ''
+ if ack != self.agent.volume_ack:
+ self.agent.volume_ack = ack
+ self.agent.wakeup()
+
+ end_time = time.monotonic()
+ run_time = datetime.timedelta(seconds=(end_time - start_time))
+ self.recent_iteration_run_times[self.recent_iteration_index] = run_time.total_seconds()
+ self.recent_iteration_index = (self.recent_iteration_index + 1) % 3
+ run_time_average = sum(self.recent_iteration_run_times, 0.0) / len([t for t in self.recent_iteration_run_times if t])
+
+ self.agent.volume_event.wait(max(self.agent.loop_interval - int(run_time_average), 0))
+ self.agent.volume_event.clear()
+ except Exception as e:
+ logger.error(f'Ls Gatherer encountered exception: {e}')
+
+ def shutdown(self) -> None:
+ self.stop = True
+
+ def wakeup(self) -> None:
+ self.agent.volume_event.set()
+
+
def command_agent(ctx: CephadmContext) -> None:
agent = CephadmAgent(ctx, ctx.fsid, ctx.daemon_id)