]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
cephadm: make agent gatherer classes generic
authorAdam King <adking@redhat.com>
Tue, 26 Oct 2021 13:08:14 +0000 (09:08 -0400)
committerAdam King <adking@redhat.com>
Tue, 26 Oct 2021 14:14:39 +0000 (10:14 -0400)
Signed-off-by: Adam King <adking@redhat.com>
src/cephadm/cephadm

index 1763d1e1b6383e6245c91ad8e283e5404150c85a..8bd9bd50961b319286246aaca6de4bb4df986792 100755 (executable)
@@ -3523,18 +3523,12 @@ class CephadmAgent():
         self.ack = 1
         self.event = Event()
         self.mgr_listener = MgrListener(self)
-        self.ls_gatherer = LsGatherer(self)
-        self.ls_event = Event()
-        self.volume_gatherer = VolumeGatherer(self)
-        self.volume_event = Event()
+        self.ls_gatherer = AgentGatherer(self, self._get_ls, 'Ls')
+        self.volume_gatherer = AgentGatherer(self, self._ceph_volume, 'Volume')
         self.device_enhanced_scan = False
         self.recent_iteration_run_times: List[float] = [0.0, 0.0, 0.0]
         self.recent_iteration_index: int = 0
         self.cached_ls_values: Dict[str, Dict[str, str]] = {}
-        self.ls: Optional[List[Dict[str, str]]] = None
-        self.ls_ack = 0
-        self.volume: str = ''
-        self.volume_ack = 0
 
     def deploy_daemon_unit(self, config: Dict[str, str] = {}) -> None:
         if not config:
@@ -3663,10 +3657,12 @@ WantedBy=ceph-{fsid}.target
                     networks_list[key] = {k: list(v)}
 
             data = json.dumps({'host': self.host,
-                               'ls': self.ls if self.ack == self.ls_ack else [],
+                               'ls': (self.ls_gatherer.data if self.ack == self.ls_gatherer.ack
+                                      and self.ls_gatherer.data is not None else []),
                                'networks': networks_list,
                                'facts': HostFacts(self.ctx).dump(),
-                               'volume': self.volume if self.ack == self.volume_ack else '',
+                               'volume': (self.volume_gatherer.data if self.ack == self.volume_gatherer.ack
+                                          and self.volume_gatherer.data is not None else ''),
                                'ack': str(ack),
                                'keyring': self.keyring,
                                'port': self.listener_port})
@@ -3821,50 +3817,18 @@ WantedBy=ceph-{fsid}.target
         return ls
 
 
-class LsGatherer(Thread):
-    def __init__(self, agent: 'CephadmAgent') -> None:
-        self.agent = agent
-        self.stop = False
-        self.recent_iteration_run_times: List[float] = [0.0, 0.0, 0.0]
-        self.recent_iteration_index: int = 0
-        super(LsGatherer, self).__init__(target=self.run)
-
-    def run(self) -> None:
-        while not self.stop:
-            try:
-                start_time = time.monotonic()
-
-                ack = self.agent.ack
-                self.agent.ls = self.agent._get_ls()
-                if ack != self.agent.ls_ack:
-                    self.agent.ls_ack = ack
-                    self.agent.wakeup()
-
-                end_time = time.monotonic()
-                run_time = datetime.timedelta(seconds=(end_time - start_time))
-                self.recent_iteration_run_times[self.recent_iteration_index] = run_time.total_seconds()
-                self.recent_iteration_index = (self.recent_iteration_index + 1) % 3
-                run_time_average = sum(self.recent_iteration_run_times, 0.0) / len([t for t in self.recent_iteration_run_times if t])
-
-                self.agent.ls_event.wait(max(self.agent.loop_interval - int(run_time_average), 0))
-                self.agent.ls_event.clear()
-            except Exception as e:
-                logger.error(f'Ls Gatherer encountered exception: {e}')
-
-    def shutdown(self) -> None:
-        self.stop = True
-
-    def wakeup(self) -> None:
-        self.agent.ls_event.set()
-
-
-class VolumeGatherer(Thread):
-    def __init__(self, agent: 'CephadmAgent') -> None:
+class AgentGatherer(Thread):
+    def __init__(self, agent: 'CephadmAgent', func: Callable, gatherer_type: str = 'Unnamed', initial_ack: int = 0) -> None:
         self.agent = agent
+        self.func = func
+        self.gatherer_type = gatherer_type
+        self.ack = initial_ack
+        self.event = Event()
+        self.data: Any = None
         self.stop = False
         self.recent_iteration_run_times: List[float] = [0.0, 0.0, 0.0]
         self.recent_iteration_index: int = 0
-        super(VolumeGatherer, self).__init__(target=self.run)
+        super(AgentGatherer, self).__init__(target=self.run)
 
     def run(self) -> None:
         while not self.stop:
@@ -3873,12 +3837,12 @@ class VolumeGatherer(Thread):
 
                 ack = self.agent.ack
                 try:
-                    self.agent.volume = self.agent._ceph_volume(self.agent.device_enhanced_scan)
+                    self.data = self.func()
                 except Exception as e:
-                    logger.error(f'Failed to get ceph-volume metadata: {e}')
-                    self.agent.volume = ''
-                if ack != self.agent.volume_ack:
-                    self.agent.volume_ack = ack
+                    logger.error(f'{self.gatherer_type} Gatherer encountered exception gathering data: {e}')
+                    self.data = None
+                if ack != self.ack:
+                    self.ack = ack
                     self.agent.wakeup()
 
                 end_time = time.monotonic()
@@ -3887,16 +3851,16 @@ class VolumeGatherer(Thread):
                 self.recent_iteration_index = (self.recent_iteration_index + 1) % 3
                 run_time_average = sum(self.recent_iteration_run_times, 0.0) / len([t for t in self.recent_iteration_run_times if t])
 
-                self.agent.volume_event.wait(max(self.agent.loop_interval - int(run_time_average), 0))
-                self.agent.volume_event.clear()
+                self.event.wait(max(self.agent.loop_interval - int(run_time_average), 0))
+                self.event.clear()
             except Exception as e:
-                logger.error(f'Ls Gatherer encountered exception: {e}')
+                logger.error(f'{self.gatherer_type} Gatherer encountered exception: {e}')
 
     def shutdown(self) -> None:
         self.stop = True
 
     def wakeup(self) -> None:
-        self.agent.volume_event.set()
+        self.event.set()
 
 
 def command_agent(ctx: CephadmContext) -> None: