git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: adapt cephadm to osdspec preview caching
author    Joshua Schmid <jschmid@suse.de>
          Fri, 15 May 2020 09:33:31 +0000 (11:33 +0200)
committer Joshua Schmid <jschmid@suse.de>
          Fri, 15 May 2020 09:33:31 +0000 (11:33 +0200)
Signed-off-by: Joshua Schmid <jschmid@suse.de>
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/services/osd.py
src/pybind/mgr/cephadm/tests/test_cephadm.py

diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py
index ac63bb8ed4ff3d920378e0fd62587887e7a7c1d3..3ab5a0a83e65f9eb3e53370b6cde21304771956f 100644
--- a/src/pybind/mgr/cephadm/module.py
+++ b/src/pybind/mgr/cephadm/module.py
@@ -446,6 +446,12 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                     if r:
                         failures.append(r)
 
+                if self.cache.host_needs_osdspec_preview_refresh(host):
+                    self.log.debug(f"refreshing OSDSpec previews for {host}")
+                    r = self._refresh_host_osdspec_previews(host)
+                    if r:
+                        failures.append(r)
+
             health_changed = False
             if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
                 del self.health_checks['CEPHADM_HOST_CHECK_FAILED']
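
The serve loop above gates the per-host refresh on cache.host_needs_osdspec_preview_refresh(host). The HostCache internals are not part of this diff; as a minimal sketch, assuming the queue-backed design implied by osdspec_previews_refresh_queue later in this patch, the predicate could be little more than a membership test (HostCacheSketch is a hypothetical name):

    # Sketch only -- HostCache is not shown in this diff. Assumes the
    # per-host refresh queue that _trigger_preview_refresh() appends to.
    from typing import List

    class HostCacheSketch:
        def __init__(self) -> None:
            # hosts whose cached OSDSpec previews are stale
            self.osdspec_previews_refresh_queue: List[str] = []

        def host_needs_osdspec_preview_refresh(self, host: str) -> bool:
            # entries are consumed once the refresh has actually run
            return host in self.osdspec_previews_refresh_queue
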
@@ -472,8 +478,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             if health_changed:
                 self.set_health_checks(self.health_checks)
 
-
-
             self._check_for_strays()
 
             if self.paused:
@@ -1042,6 +1046,12 @@ you may want to run:
         self.log.info('Removed label %s from host %s' % (label, host))
         return 'Removed label %s from host %s' % (label, host)
 
+    def _refresh_host_osdspec_previews(self, host) -> bool:
+        self.cache.update_osdspec_previews(host)
+        self.cache.save_host(host)
+        self.log.debug(f'Refreshed OSDSpec previews for host <{host}>')
+        return True
+
     def _refresh_host_daemons(self, host):
         try:
             out, err, code = self._run_cephadm(
@@ -1123,6 +1133,7 @@ you may want to run:
             host, len(devices), len(networks)))
         devices = inventory.Devices.from_json(devices)
         self.cache.update_host_devices_networks(host, devices.devices, networks)
+        self.cache.update_osdspec_previews(host)
         self.cache.save_host(host)
         return None
 
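
Both _refresh_host_osdspec_previews() and the device-refresh path above delegate to cache.update_osdspec_previews(host), whose body is not shown in this diff. A plausible sketch, assuming it regenerates previews via OSDService.get_previews() (added in osd.py below) and maintains the loading_osdspec_preview flags that preview_osdspecs() consults; the cache.mgr wiring is an assumption:

    # Sketch only -- update_osdspec_previews() is not shown in this diff.
    # Attribute names are taken from their usages elsewhere in the patch.
    from typing import Any

    def update_osdspec_previews(cache: Any, host: str) -> None:
        # flag the host as 'loading' so preview_osdspecs() reports 'pending'
        cache.loading_osdspec_preview[host] = True
        try:
            # regenerate previews for every OSDSpec matching this host
            # (cache.mgr is assumed; the real wiring is not shown here)
            cache.osdspec_previews[host] = cache.mgr.osd_service.get_previews(host)
        finally:
            cache.loading_osdspec_preview[host] = False
        # the refresh request for this host is now satisfied
        if host in cache.osdspec_previews_refresh_queue:
            cache.osdspec_previews_refresh_queue.remove(host)
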
@@ -1315,6 +1326,7 @@ you may want to run:
     @trivial_completion
     def remove_service(self, service_name):
         self.log.info('Remove service %s' % service_name)
+        self._trigger_preview_refresh(service_name=service_name)
         found = self.spec_store.rm(service_name)
         if found:
             self._kick_serve_loop()
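
Note the ordering in remove_service(): the preview refresh is triggered while the spec is still in the store, because _trigger_preview_refresh() resolves the affected hosts through self.spec_store.find(service_name=...). Swapping the two calls would make that lookup come up empty and leave stale previews cached for the removed spec. Condensed restatement of the hunk above, not new behavior:

    def remove_service(self, service_name):
        # resolve hosts while the spec can still be found in the spec store
        self._trigger_preview_refresh(service_name=service_name)
        found = self.spec_store.rm(service_name)   # now drop the spec
        if found:
            self._kick_serve_loop()
        ...
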
@@ -1402,18 +1414,70 @@ you may want to run:
                 r[str(osd_id)] = o.get('uuid', '')
         return r
 
+    def resolve_hosts_for_osdspecs(self,
+                                   specs: Optional[List[DriveGroupSpec]] = None,
+                                   service_name: Optional[str] = None
+                                   ) -> List[str]:
+        osdspecs = []
+        if service_name:
+            self.log.debug(f"Looking for OSDSpec with service_name: {service_name}")
+            osdspecs = self.spec_store.find(service_name=service_name)
+            osdspecs = [cast(DriveGroupSpec, spec) for spec in osdspecs]
+            self.log.debug(f"Found OSDSpecs: {osdspecs}")
+        if specs:
+            osdspecs = [cast(DriveGroupSpec, spec) for spec in specs]
+        if not service_name and not specs:
+            # if neither parameter is provided, search for all available OSDSpecs
+            osdspecs = self.spec_store.find(service_name='osd')
+            self.log.debug(f"Found OSDSpecs: {osdspecs}")
+        if not osdspecs:
+            self.log.debug("No OSDSpecs found")
+            return []
+        return sum([spec.placement.pattern_matches_hosts(self.cache.get_hosts()) for spec in osdspecs], [])
+
+    def resolve_osdspecs_for_host(self, host):
+        matching_specs = []
+        self.log.debug(f"Finding OSDSpecs for host: <{host}>")
+        for spec in self.spec_store.find('osd'):
+            if host in spec.placement.pattern_matches_hosts(self.cache.get_hosts()):
+                self.log.debug(f"Found OSDSpecs for host: <{host}> -> <{spec}>")
+                matching_specs.append(spec)
+        return matching_specs
+
+    def _trigger_preview_refresh(self,
+                                 specs: Optional[List[DriveGroupSpec]] = None,
+                                 service_name: Optional[str] = None):
+        refresh_hosts = self.resolve_hosts_for_osdspecs(specs=specs, service_name=service_name)
+        for host in refresh_hosts:
+            self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
+            self.cache.osdspec_previews_refresh_queue.append(host)
+
     @trivial_completion
     def apply_drivegroups(self, specs: List[DriveGroupSpec]):
+        self._trigger_preview_refresh(specs=specs)
         return [self._apply(spec) for spec in specs]
 
     @trivial_completion
     def create_osds(self, drive_group: DriveGroupSpec):
         return self.osd_service.create_from_spec(drive_group)
 
-    # @trivial_completion
-    def preview_drivegroups(self, drive_group_name: Optional[str] = None,
-                            dg_specs: Optional[List[DriveGroupSpec]] = None) -> List[Dict[str, Dict[Any, Any]]]:
-        return self.osd_service.preview_drivegroups(drive_group_name, dg_specs)
+    def preview_osdspecs(self,
+                         osdspec_name: Optional[str] = None,
+                         osdspecs: Optional[List[DriveGroupSpec]] = None
+                         ) -> Dict[str, List[Dict[str, Any]]]:
+        matching_hosts = self.resolve_hosts_for_osdspecs(specs=osdspecs, service_name=osdspec_name)
+        if not matching_hosts:
+            return {'n/a': [{'error': True,
+                             'message': 'No OSDSpec or matching hosts found.'}]}
+        # Is any host still loading previews?
+        pending_flags = {f for (h, f) in self.cache.loading_osdspec_preview.items() if h in matching_hosts}
+        if any(pending_flags):
+            # Report 'pending' when any of the matching hosts is still loading previews (flag is True)
+            return {'n/a': [{'error': True,
+                             'message': 'Preview data is being generated... '
+                                        'Please try again in a bit.'}]}
+        # drop all keys that are not in matching_hosts and return preview struct
+        return {k: v for (k, v) in self.cache.osdspec_previews.items() if k in matching_hosts}
 
     def _calc_daemon_deps(self, daemon_type, daemon_id):
         need = {
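
A note on the host resolution above: resolve_hosts_for_osdspecs() flattens the per-spec match lists with sum(..., []), which simply concatenates them starting from an empty list. A toy illustration of the idiom, with hypothetical host names:

    from typing import List

    # Toy per-spec match lists, as pattern_matches_hosts() would return them.
    per_spec_matches: List[List[str]] = [['host1', 'host2'], ['host2', 'host3']]

    flat = sum(per_spec_matches, [])   # starts from [] and concatenates
    print(flat)  # ['host1', 'host2', 'host2', 'host3'] -- duplicates possible

A host matched by several overlapping specs therefore appears once per spec; callers that only do membership tests (if host in ...) are unaffected, though _trigger_preview_refresh() may enqueue such a host more than once.
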
diff --git a/src/pybind/mgr/cephadm/services/osd.py b/src/pybind/mgr/cephadm/services/osd.py
index 50283e99a970506081d418cde1a6ec7b2520202a..7f90e6edb16b6cac3003f2b10bf44b20b6360d29 100644
--- a/src/pybind/mgr/cephadm/services/osd.py
+++ b/src/pybind/mgr/cephadm/services/osd.py
@@ -93,8 +93,7 @@ class OSDService(CephadmService):
         else:
             return "Created no osd(s) on host %s; already created?" % host
 
-    def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[
-        Tuple[str, DriveSelection]]:
+    def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
         # 1) use fn_filter to determine matching_hosts
         matching_hosts = drive_group.placement.pattern_matches_hosts(
             [x for x in self.mgr.cache.get_hosts()])
@@ -130,40 +129,64 @@ class OSDService(CephadmService):
         logger.debug(f"Resulting ceph-volume cmd: {cmd}")
         return cmd
 
-    def preview_drivegroups(self, drive_group_name: Optional[str] = None,
-                            dg_specs: Optional[List[DriveGroupSpec]] = None) -> List[
-        Dict[str, Dict[Any, Any]]]:
-        # find drivegroups
-        if drive_group_name:
-            drive_groups = cast(List[DriveGroupSpec],
-                                self.mgr.spec_store.find(service_name=drive_group_name))
-        elif dg_specs:
-            drive_groups = dg_specs
-        else:
-            drive_groups = []
-        ret_all = []
-        for drive_group in drive_groups:
-            drive_group.osd_id_claims = self.find_destroyed_osds()
-            logger.info(
-                f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
+    def get_previews(self, host) -> List[Dict[str, Any]]:
+        # Find OSDSpecs that match host.
+        osdspecs = self.mgr.resolve_osdspecs_for_host(host)
+        return self.generate_previews(osdspecs)
+
+    def generate_previews(self, osdspecs: List[DriveGroupSpec]) -> List[Dict[str, Any]]:
+        """
+
+        The return should look like this:
+
+        [
+          {'data': {<metadata>},
+           'osdspec': <name of osdspec>,
+           'host': <name of host>
+           },
+
+           {'data': ...,
+            'osdspec': ..,
+            'host': ..
+           }
+        ]
+
+        Note: One host can have multiple previews based on its assigned OSDSpecs.
+        """
+        self.mgr.log.debug(f"Generating OSDSpec previews for {osdspecs}")
+        ret_all: List[Dict[str, Any]] = []
+        if not osdspecs:
+            return ret_all
+        for osdspec in osdspecs:
+
+            # populate osd_id_claims
+            osdspec.osd_id_claims = self.find_destroyed_osds()
+
             # prepare driveselection
-            for host, ds in self.prepare_drivegroup(drive_group):
-                cmd = self.driveselection_to_ceph_volume(drive_group, ds,
-                                                         drive_group.osd_id_claims.get(host,
-                                                                                       []),
+            for host, ds in self.prepare_drivegroup(osdspec):
+
+                # driveselection for host
+                cmd = self.driveselection_to_ceph_volume(osdspec,
+                                                         ds,
+                                                         osdspec.osd_id_claims.get(host, []),
                                                          preview=True)
                 if not cmd:
                     logger.debug("No data_devices, skipping DriveGroup: {}".format(
-                        drive_group.service_name()))
+                        osdspec.service_name()))
                     continue
+
+                # get preview data from ceph-volume
                 out, err, code = self._run_ceph_volume_command(host, cmd)
                 if out:
-                    concat_out = json.loads(" ".join(out))
-                    ret_all.append({'data': concat_out, 'drivegroup': drive_group.service_id,
+                    concat_out: Dict[str, Any] = json.loads(" ".join(out))
+                    ret_all.append({'data': concat_out,
+                                    'osdspec': osdspec.service_id,
                                     'host': host})
         return ret_all
 
-    def _run_ceph_volume_command(self, host: str, cmd: str, env_vars: Optional[List[str]] = None) -> Tuple[List[str], List[str], int]:
+    def _run_ceph_volume_command(self, host: str,
+                                 cmd: str, env_vars: Optional[List[str]] = None
+                                 ) -> Tuple[List[str], List[str], int]:
         self.mgr.inventory.assert_host(host)
 
         # get bootstrap key
@@ -219,6 +242,8 @@ class OSDService(CephadmService):
                 osd_host_map.update(
                     {node.get('name'): [str(_id) for _id in node.get('children', list())]}
                 )
+        self.mgr.log.info(
+            f"Found osd claims -> {osd_host_map}")
         return osd_host_map
 
 
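
For reference, the structure documented in the generate_previews() docstring, and how it relates to what preview_osdspecs() returns (a dict keyed per host). Spec names and the empty 'data' values below are toy placeholders:

    from typing import Any, Dict, List

    # One entry per (osdspec, host) pair, as generate_previews() builds them;
    # 'data' holds the ceph-volume report (empty placeholders here).
    previews: List[Dict[str, Any]] = [
        {'data': {}, 'osdspec': 'osd_spec_a', 'host': 'host1'},
        {'data': {}, 'osdspec': 'osd_spec_b', 'host': 'host1'},
    ]

    # HostCache.osdspec_previews keys previews per host; preview_osdspecs()
    # then filters that dict down to the hosts matching the requested specs.
    by_host: Dict[str, List[Dict[str, Any]]] = {'host1': previews}
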
diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py
index ad79a8f28263d880b9a98ccb85331cad595544ea..8287fbe6b793ebb7373bbfae459ad7191e5ff1db 100644
--- a/src/pybind/mgr/cephadm/tests/test_cephadm.py
+++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py
@@ -313,22 +313,6 @@ class TestCephadm(object):
             out = cephadm_module.osd_service.driveselection_to_ceph_volume(dg, ds, [], preview)
             assert out in exp_command
 
-    @mock.patch("cephadm.module.SpecStore.find")
-    @mock.patch("cephadm.services.osd.OSDService.prepare_drivegroup")
-    @mock.patch("cephadm.services.osd.OSDService.driveselection_to_ceph_volume")
-    @mock.patch("cephadm.services.osd.OSDService._run_ceph_volume_command")
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
-    def test_preview_drivegroups_str(self, _run_c_v_command, _ds_to_cv, _prepare_dg, _find_store, cephadm_module):
-        with self._with_host(cephadm_module, 'test'):
-            dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=['']))
-            _find_store.return_value = [dg]
-            _prepare_dg.return_value = [('host1', 'ds_dummy')]
-            _run_c_v_command.return_value = ("{}", '', 0)
-            cephadm_module.osd_service.preview_drivegroups(drive_group_name='foo')
-            _find_store.assert_called_once_with(service_name='foo')
-            _prepare_dg.assert_called_once_with(dg)
-            _run_c_v_command.assert_called_once()
-
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
         json.dumps([
             dict(