]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
cephadm: agent: gather ls and volumes asynchronously
authorAdam King <adking@redhat.com>
Fri, 22 Oct 2021 19:48:30 +0000 (15:48 -0400)
committerAdam King <adking@redhat.com>
Fri, 22 Oct 2021 19:48:30 +0000 (15:48 -0400)
This allows for the agent to always report in a consistent
amount of time while delivering new ls or volume output when
it's ready. It removes the risk of the agent getting marked
as down because the ls + volume time takes too long.

Signed-off-by: Adam King <adking@redhat.com>
src/cephadm/cephadm
src/pybind/mgr/cephadm/agent.py

index 3276047c9ae841e6127fe4409f9758e5c4234cf4..1763d1e1b6383e6245c91ad8e283e5404150c85a 100755 (executable)
@@ -3484,7 +3484,8 @@ class MgrListener(Thread):
                         logger.error(err_str)
                     else:
                         conn.send(b'ACK')
-                        self.agent.wakeup()
+                        self.agent.ls_gatherer.wakeup()
+                        self.agent.volume_gatherer.wakeup()
                         logger.debug(f'Got mgr message {data}')
             except Exception as e:
                 logger.error(f'Mgr Listener encountered exception: {e}')
@@ -3522,10 +3523,18 @@ class CephadmAgent():
         self.ack = 1
         self.event = Event()
         self.mgr_listener = MgrListener(self)
+        self.ls_gatherer = LsGatherer(self)
+        self.ls_event = Event()
+        self.volume_gatherer = VolumeGatherer(self)
+        self.volume_event = Event()
         self.device_enhanced_scan = False
         self.recent_iteration_run_times: List[float] = [0.0, 0.0, 0.0]
         self.recent_iteration_index: int = 0
         self.cached_ls_values: Dict[str, Dict[str, str]] = {}
+        self.ls: Optional[List[Dict[str, str]]] = None
+        self.ls_ack = 0
+        self.volume: str = ''
+        self.volume_ack = 0
 
     def deploy_daemon_unit(self, config: Dict[str, str] = {}) -> None:
         if not config:
@@ -3630,6 +3639,12 @@ WantedBy=ceph-{fsid}.target
         if not self.mgr_listener.is_alive():
             self.mgr_listener.start()
 
+        if not self.ls_gatherer.is_alive():
+            self.ls_gatherer.start()
+
+        if not self.volume_gatherer.is_alive():
+            self.volume_gatherer.start()
+
         ssl_ctx = ssl.create_default_context()
         ssl_ctx.check_hostname = True
         ssl_ctx.verify_mode = ssl.CERT_REQUIRED
@@ -3638,11 +3653,6 @@ WantedBy=ceph-{fsid}.target
         while not self.stop:
             start_time = time.monotonic()
             ack = self.ack
-            try:
-                volume = self._ceph_volume(self.device_enhanced_scan)
-            except Exception as e:
-                logger.error(f'Failed to get ceph-volume metadata: {e}')
-                volume = ''
 
             # part of the networks info is returned as a set which is not JSON
             # serializable. The set must be converted to a list
@@ -3653,10 +3663,10 @@ WantedBy=ceph-{fsid}.target
                     networks_list[key] = {k: list(v)}
 
             data = json.dumps({'host': self.host,
-                              'ls': self._get_ls(),
+                               'ls': self.ls if self.ack == self.ls_ack else [],
                                'networks': networks_list,
                                'facts': HostFacts(self.ctx).dump(),
-                               'volume': volume,
+                               'volume': self.volume if self.ack == self.volume_ack else '',
                                'ack': str(ack),
                                'keyring': self.keyring,
                                'port': self.listener_port})
@@ -3811,6 +3821,84 @@ WantedBy=ceph-{fsid}.target
         return ls
 
 
+class LsGatherer(Thread):
+    def __init__(self, agent: 'CephadmAgent') -> None:
+        self.agent = agent
+        self.stop = False
+        self.recent_iteration_run_times: List[float] = [0.0, 0.0, 0.0]
+        self.recent_iteration_index: int = 0
+        super(LsGatherer, self).__init__(target=self.run)
+
+    def run(self) -> None:
+        while not self.stop:
+            try:
+                start_time = time.monotonic()
+
+                ack = self.agent.ack
+                self.agent.ls = self.agent._get_ls()
+                if ack != self.agent.ls_ack:
+                    self.agent.ls_ack = ack
+                    self.agent.wakeup()
+
+                end_time = time.monotonic()
+                run_time = datetime.timedelta(seconds=(end_time - start_time))
+                self.recent_iteration_run_times[self.recent_iteration_index] = run_time.total_seconds()
+                self.recent_iteration_index = (self.recent_iteration_index + 1) % 3
+                run_time_average = sum(self.recent_iteration_run_times, 0.0) / len([t for t in self.recent_iteration_run_times if t])
+
+                self.agent.ls_event.wait(max(self.agent.loop_interval - int(run_time_average), 0))
+                self.agent.ls_event.clear()
+            except Exception as e:
+                logger.error(f'Ls Gatherer encountered exception: {e}')
+
+    def shutdown(self) -> None:
+        self.stop = True
+
+    def wakeup(self) -> None:
+        self.agent.ls_event.set()
+
+
+class VolumeGatherer(Thread):
+    def __init__(self, agent: 'CephadmAgent') -> None:
+        self.agent = agent
+        self.stop = False
+        self.recent_iteration_run_times: List[float] = [0.0, 0.0, 0.0]
+        self.recent_iteration_index: int = 0
+        super(VolumeGatherer, self).__init__(target=self.run)
+
+    def run(self) -> None:
+        while not self.stop:
+            try:
+                start_time = time.monotonic()
+
+                ack = self.agent.ack
+                try:
+                    self.agent.volume = self.agent._ceph_volume(self.agent.device_enhanced_scan)
+                except Exception as e:
+                    logger.error(f'Failed to get ceph-volume metadata: {e}')
+                    self.agent.volume = ''
+                if ack != self.agent.volume_ack:
+                    self.agent.volume_ack = ack
+                    self.agent.wakeup()
+
+                end_time = time.monotonic()
+                run_time = datetime.timedelta(seconds=(end_time - start_time))
+                self.recent_iteration_run_times[self.recent_iteration_index] = run_time.total_seconds()
+                self.recent_iteration_index = (self.recent_iteration_index + 1) % 3
+                run_time_average = sum(self.recent_iteration_run_times, 0.0) / len([t for t in self.recent_iteration_run_times if t])
+
+                self.agent.volume_event.wait(max(self.agent.loop_interval - int(run_time_average), 0))
+                self.agent.volume_event.clear()
+            except Exception as e:
+                logger.error(f'Ls Gatherer encountered exception: {e}')
+
+    def shutdown(self) -> None:
+        self.stop = True
+
+    def wakeup(self) -> None:
+        self.agent.volume_event.set()
+
+
 def command_agent(ctx: CephadmContext) -> None:
     agent = CephadmAgent(ctx, ctx.fsid, ctx.daemon_id)
 
index 0c40846a9bb3434bc4763567fc738f36bbcd7a4a..12bd0bf6f4aa380fb55eb7378a9f69beeccc6fce 100644 (file)
@@ -204,7 +204,7 @@ class HostData:
                     f'Change detected in daemons in error state from {host} agent metadata. Kicking serve loop')
                 self.mgr._kick_serve_loop()
 
-            if up_to_date:
+            if up_to_date and ('ls' in data and data['ls']):
                 was_out_of_date = not self.mgr.cache.all_host_metadata_up_to_date()
                 self.mgr.cache.metadata_up_to_date[host] = True
                 if was_out_of_date and self.mgr.cache.all_host_metadata_up_to_date():