]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/ssh: Add SimpleScheduler and streamline arg passing
authorJoshua Schmid <jschmid@suse.de>
Wed, 4 Dec 2019 12:42:30 +0000 (13:42 +0100)
committerJoshua Schmid <jschmid@suse.de>
Thu, 12 Dec 2019 09:06:03 +0000 (10:06 +0100)
* adding a mechanism to augment specs.placement.nodes
  even when no nodes are provided via dynamic determination
  from host_pools. (NodeAssignment)

* added SimpleScheduler
  implements most simple placment algorithm possible.
  Choose from a shuffled list of hosts.

* streamlined args passed to the public update_$service
  functions. They all take `spec` now.

Signed-off-by: Joshua Schmid <jschmid@suse.de>
src/pybind/mgr/cephadm/module.py

index f33a74169d57736659a2b04edb1b33223943b3a5..43dae0f7b6557efcaabdedd0b5eb87b67b8ad38d 100644 (file)
@@ -21,7 +21,7 @@ import subprocess
 from ceph.deployment import inventory
 from mgr_module import MgrModule
 import orchestrator
-from orchestrator import OrchestratorError
+from orchestrator import OrchestratorError, HostSpec
 
 from . import remotes
 
@@ -691,39 +691,29 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         """
         return [orchestrator.InventoryNode(host_name) for host_name in self.inventory_cache]
 
-    """
+    @async_completion
     def add_host_label(self, host, label):
         if host not in self.inventory:
             raise OrchestratorError('host %s does not exist' % host)
 
-        @log_exceptions
-        def run(host, label):
-            if 'labels' not in self.inventory[host]:
-                self.inventory[host]['labels'] = list()
-            if label not in self.inventory[host]['labels']:
-                self.inventory[host]['labels'].append(label)
-            self._save_inventory()
-            return 'Added label %s to host %s' % (label, host)
-
-        return SSHWriteCompletion(
-            self._worker_pool.apply_async(run, (host, label)))
+        if 'labels' not in self.inventory[host]:
+            self.inventory[host]['labels'] = list()
+        if label not in self.inventory[host]['labels']:
+            self.inventory[host]['labels'].append(label)
+        self._save_inventory()
+        return 'Added label %s to host %s' % (label, host)
 
+    @async_completion
     def remove_host_label(self, host, label):
         if host not in self.inventory:
             raise OrchestratorError('host %s does not exist' % host)
 
-        @log_exceptions
-        def run(host, label):
-            if 'labels' not in self.inventory[host]:
-                self.inventory[host]['labels'] = list()
-            if label in self.inventory[host]['labels']:
-                self.inventory[host]['labels'].remove(label)
-            self._save_inventory()
-            return 'Removed label %s to host %s' % (label, host)
-
-        return SSHWriteCompletion(
-            self._worker_pool.apply_async(run, (host, label)))
-    """
+        if 'labels' not in self.inventory[host]:
+            self.inventory[host]['labels'] = list()
+        if label in self.inventory[host]['labels']:
+            self.inventory[host]['labels'].remove(label)
+        self._save_inventory()
+        return 'Removed label %s from host %s' % (label, host)
 
     @async_map_completion
     def _refresh_host_services(self, host):
@@ -1130,45 +1120,50 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         return self._create_daemon('mon', name or host, host, keyring,
                                    extra_args=extra_args)
 
-    def update_mons(self, num, host_specs):
+    def update_mons(self, spec):
         # type: (int, List[orchestrator.HostSpec]) -> orchestrator.Completion
+        """
+        Adjust the number of cluster managers.
+        """
+        spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts).load()
+        return self._update_mons(spec)
+
+    def _update_mons(self, spec):
         """
         Adjust the number of cluster monitors.
         """
         # current support limited to adding monitors.
         mon_map = self.get("mon_map")
         num_mons = len(mon_map["mons"])
-        if num == num_mons:
+        if spec.count == num_mons:
             return orchestrator.Completion(value="The requested number of monitors exist.")
-        if num < num_mons:
+        if spec.count < num_mons:
             raise NotImplementedError("Removing monitors is not supported.")
 
-        self.log.debug("Trying to update monitors on: {}".format(host_specs))
+        self.log.debug("Trying to update monitors on: {}".format(spec.placement.nodes))
         # check that all the hosts are registered
-        [self._require_hosts(host.hostname) for host in host_specs]
+        [self._require_hosts(host.hostname) for host in spec.placement.nodes]
 
         # current support requires a network to be specified
-        for host, network, _ in host_specs:
+        for host, network, _ in spec.placement.nodes:
             if not network:
                 raise RuntimeError("Host '{}' is missing a network spec".format(host))
 
         def update_mons_with_daemons(daemons):
-            for _, _, name in host_specs:
+            for _, _, name in spec.placement.nodes:
                 if name and len([d for d in daemons if d.service_instance == name]):
                     raise RuntimeError('name %s alrady exists', name)
 
             # explicit placement: enough hosts provided?
-            num_new_mons = num - num_mons
-            if len(host_specs) < num_new_mons:
+            num_new_mons = spec.count - num_mons
+            if len(spec.placement.nodes) < num_new_mons:
                 raise RuntimeError("Error: {} hosts provided, expected {}".format(
-                    len(host_specs), num_new_mons))
-
+                    len(spec.placement.nodes), num_new_mons))
             self.log.info("creating {} monitors on hosts: '{}'".format(
-                num_new_mons, ",".join(map(lambda h: ":".join(h), host_specs))))
-
+                num_new_mons, ",".join(map(lambda h: ":".join(h), spec.placement.nodes))))
             # TODO: we may want to chain the creation of the monitors so they join
             # the quorum one at a time.
-            return self._create_mon(host_specs)
+            return self._create_mon(spec.placement.nodes)
         return self._get_services('mon').then(update_mons_with_daemons)
 
     @async_map_completion
@@ -1189,24 +1184,24 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         return self._create_daemon('mgr', name, host, keyring)
 
-    def update_mgrs(self, num, host_specs):
+    def update_mgrs(self, spec):
         """
         Adjust the number of cluster managers.
         """
-        return self._get_services('mgr').then(lambda daemons: self._update_mgrs(num, host_specs, daemons))
+        spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts).load()
+        return self._get_services('mgr').then(lambda daemons: self._update_mgrs(spec, daemons))
 
-    def _update_mgrs(self, num, host_specs, daemons):
+    def _update_mgrs(self, spec, daemons):
         num_mgrs = len(daemons)
-        if num == num_mgrs:
+        if spec.count == num_mgrs:
             return orchestrator.Completion(value="The requested number of managers exist.")
 
-        self.log.debug("Trying to update managers on: {}".format(host_specs))
+        self.log.debug("Trying to update managers on: {}".format(spec.placement.nodes))
         # check that all the hosts are registered
-        [self._require_hosts(host.hostname) for host in host_specs]
+        [self._require_hosts(host.hostname) for host in spec.placement.nodes]
 
-        results = []
-        if num < num_mgrs:
-            num_to_remove = num_mgrs - num
+        if spec.count < num_mgrs:
+            num_to_remove = num_mgrs - spec.count
 
             # first try to remove unconnected mgr daemons that the
             # cluster doesn't see
@@ -1220,7 +1215,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
             for d in daemons:
                 if d.service_instance not in connected:
                     to_remove_damons.append(('%s.%s' % (d.service_type, d.service_instance),
-                         d.nodename))
+                                             d.nodename))
                     num_to_remove -= 1
                     if num_to_remove == 0:
                         break
@@ -1237,33 +1232,33 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         else:
             # we assume explicit placement by which there are the same number of
             # hosts specified as the size of increase in number of daemons.
-            num_new_mgrs = num - num_mgrs
-            if len(host_specs) < num_new_mgrs:
+            num_new_mgrs = spec.placement.count - num_mgrs
+            if len(spec.placement.nodes) < num_new_mgrs:
                 raise RuntimeError(
                     "Error: {} hosts provided, expected {}".format(
-                        len(host_specs), num_new_mgrs))
+                        len(spec.placement.nodes), num_new_mgrs))
 
-            for host_spec in host_specs:
+            for host_spec in spec.placement.nodes:
                 if host_spec.name and len([d for d in daemons if d.service_instance == host_spec.name]):
                     raise RuntimeError('name %s alrady exists', host_spec.name)
 
-            for host_spec in host_specs:
+            for host_spec in spec.placement.nodes:
                 if host_spec.name and len([d for d in daemons if d.service_instance == host_spec.name]):
                     raise RuntimeError('name %s alrady exists', host_spec.name)
 
             self.log.info("creating {} managers on hosts: '{}'".format(
-                num_new_mgrs, ",".join([spec.hostname for spec in host_specs])))
+                num_new_mgrs, ",".join([_spec.hostname for _spec in spec.placement.nodes])))
 
             args = []
-            for host_spec in host_specs:
+            for host_spec in spec.placement.nodes:
                 name = host_spec.name or self.get_unique_name(daemons)
                 host = host_spec.hostname
                 args.append((host, name))
         return self._create_mgr(args)
 
     def add_mds(self, spec):
-        if not spec.placement.nodes or len(spec.placement.nodes) < spec.count:
-            raise RuntimeError("must specify at least %d hosts" % spec.count)
+        if not spec.placement.nodes or len(spec.placement.nodes) < spec.placement.count:
+            raise RuntimeError("must specify at least %d hosts" % spec.placement.count)
         return self._get_services('mds').then(lambda ds: self._add_mds(ds, spec))
 
     def _add_mds(self, daemons, spec):
@@ -1285,6 +1280,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         return self._create_mds(args)
 
     def update_mds(self, spec):
+        spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts).load()
         return self._update_service('mds', self.add_mds, spec)
 
     @async_map_completion
@@ -1301,6 +1297,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
 
     def remove_mds(self, name):
         self.log.debug("Attempting to remove volume: {}".format(name))
+
         def _remove_mds(daemons):
             args = []
             for d in daemons:
@@ -1368,7 +1365,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
             for d in daemons:
                 if d.service_instance == name or d.service_instance.startswith(name + '.'):
                     args.append(('%s.%s' % (d.service_type, d.service_instance),
-                         d.nodename))
+                                d.nodename))
             if args:
                 return self._remove_daemon(args)
             raise RuntimeError('Unable to find rgw.%s[-*] daemon(s)' % name)
@@ -1376,6 +1373,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         return self._get_services('rgw').then(_remove_rgw)
 
     def update_rgw(self, spec):
+        spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts).load()
         return self._update_service('rgw', self.add_rgw, spec)
 
     def add_rbd_mirror(self, spec):
@@ -1431,6 +1429,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         return self._get_services('rbd-mirror').then(_remove_rbd_mirror)
 
     def update_rbd_mirror(self, spec):
+        spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts).load()
         return self._update_service('rbd-mirror', self.add_rbd_mirror, spec)
 
     def _get_container_image_id(self, image_name):
@@ -1476,3 +1475,100 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
                     'current_id': s.container_image_id,
                 }
         return trivial_result(json.dumps(r, indent=4))
+
+
+class BaseScheduler(object):
+    """
+    Base Scheduler Interface
+
+    * requires a placement_spec
+
+    `place(host_pool)` needs to return a List[HostSpec, ..]
+    """
+
+    def __init__(self, placement_spec):
+        # type: (orchestrator.PlacementSpec) -> None
+        self.placement_spec = placement_spec
+
+    def place(self):
+        raise NotImplementedError
+
+
+class SimpleScheduler(BaseScheduler):
+    """
+    The most simple way to pick/schedule a set of hosts.
+    1) Shuffle the provided host_pool
+    2) Select from list up to :count
+    """
+    def __init__(self, placement_spec):
+        super(SimpleScheduler, self).__init__(placement_spec)
+
+    def place(self, host_pool, count=None):
+        # type: (List, Optional(int)) -> List
+        if not host_pool:
+            raise Exception('List of host candidates is empty')
+        host_pool = [HostSpec(x, '', '') for x in host_pool]
+        # shuffle for pseudo random selection
+        random.shuffle(host_pool)
+        return host_pool[:count]
+
+
+class NodeAssignment(object):
+    """
+    A class to detect if nodes are being passed imperative or declarative
+    If the spec is populated via the `nodes/hosts` field it will not load
+    any nodes into the list.
+    If the spec isn't populated, i.e. when only num or label is present (declarative)
+    it will use the provided `get_host_func` to load it from the inventory.
+
+    Schedulers can be assigned to pick hosts from the pool.
+    """
+
+    def __init__(self,
+                 spec=None,  # type: orchestrator.PlacementSpec
+                 scheduler=None,  # type: BaseScheduler
+                 get_hosts_func=None,  # type: Callable
+                 ):
+        self.spec = spec
+        self.scheduler = scheduler if scheduler else SimpleScheduler(self.spec.placement)
+        self.get_hosts_func = get_hosts_func
+
+    def load(self):
+        # type: () -> orchestrator.PlacementSpec
+        """
+        Load nodes into the spec.placement.nodes container.
+        """
+        self.load_labeled_nodes()
+        self.assign_nodes()
+        return self.spec
+
+    def load_labeled_nodes(self):
+        # type: () -> None
+        """
+        Assign nodes based on their label
+        """
+        # Querying for labeled nodes doesn't work currently.
+        # Leaving this open for the next iteration
+        # NOTE: This currently queries for all hosts without label restriction
+        if self.spec.placement.label:
+            logger.info("Found labels. Assinging nodes that match the label")
+            candidates = [HostSpec(x[0], '', '') for x in self.get_hosts_func()]  # TODO: query for labels
+            logger.info('Assigning nodes to spec: {}'.format(candidates))
+            self.spec.placement.set_nodes(candidates)
+
+    def assign_nodes(self):
+        # type: () -> None
+        """
+        Use the assigned scheduler to load nodes into the spec.placement.nodes container
+        """
+        # If no imperative or declarative host assignments, use the scheduler to pick from the
+        # host pool (assuming `count` is set)
+        if not self.spec.placement.label and not self.spec.placement.nodes and self.spec.placement.count:
+            logger.info("Found only count spec. Using {} to assign nodes.".format(self.scheduler))
+            candidates = self.scheduler.place([x[0] for x in self.get_hosts_func()], count=self.spec.placement.count)
+            if len(candidates) < self.spec.placement.count:
+                raise Exception("Cannot place {} services on {} hosts.".
+                                format(self.spec.placement.count, len(candidates)))
+            logger.info('Assigning nodes to spec: {}'.format(candidates))
+            self.spec.placement.set_nodes(candidates)
+>>>>>>> 1a4ee9e96... mgr/ssh: Add SimpleScheduler and streamline arg passing