From: Joshua Schmid
Date: Wed, 4 Dec 2019 12:42:30 +0000 (+0100)
Subject: mgr/ssh: Add SimpleScheduler and streamline arg passing
X-Git-Tag: v15.1.0~524^2~3
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=42beab66a839d1ed017bf10868f6243234a35146;p=ceph.git

mgr/ssh: Add SimpleScheduler and streamline arg passing

* added a mechanism (NodeAssignment) to augment specs.placement.nodes,
  even when no nodes are provided, via dynamic determination from host_pools.
* added SimpleScheduler, which implements the simplest placement algorithm
  possible: choose from a shuffled list of hosts.
* streamlined the args passed to the public update_$service functions;
  they all take `spec` now.

Signed-off-by: Joshua Schmid
---

diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py
index f33a74169d577..43dae0f7b6557 100644
--- a/src/pybind/mgr/cephadm/module.py
+++ b/src/pybind/mgr/cephadm/module.py
@@ -21,7 +21,7 @@ import subprocess
 from ceph.deployment import inventory
 from mgr_module import MgrModule
 import orchestrator
-from orchestrator import OrchestratorError
+from orchestrator import OrchestratorError, HostSpec
 
 from . import remotes
 
@@ -691,39 +691,29 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         """
         return [orchestrator.InventoryNode(host_name) for host_name in self.inventory_cache]
 
-    """
+    @async_completion
     def add_host_label(self, host, label):
         if host not in self.inventory:
             raise OrchestratorError('host %s does not exist' % host)
 
-        @log_exceptions
-        def run(host, label):
-            if 'labels' not in self.inventory[host]:
-                self.inventory[host]['labels'] = list()
-            if label not in self.inventory[host]['labels']:
-                self.inventory[host]['labels'].append(label)
-            self._save_inventory()
-            return 'Added label %s to host %s' % (label, host)
-
-        return SSHWriteCompletion(
-            self._worker_pool.apply_async(run, (host, label)))
+        if 'labels' not in self.inventory[host]:
+            self.inventory[host]['labels'] = list()
+        if label not in self.inventory[host]['labels']:
+            self.inventory[host]['labels'].append(label)
+        self._save_inventory()
+        return 'Added label %s to host %s' % (label, host)
 
+    @async_completion
     def remove_host_label(self, host, label):
         if host not in self.inventory:
             raise OrchestratorError('host %s does not exist' % host)
 
-        @log_exceptions
-        def run(host, label):
-            if 'labels' not in self.inventory[host]:
-                self.inventory[host]['labels'] = list()
-            if label in self.inventory[host]['labels']:
-                self.inventory[host]['labels'].remove(label)
-            self._save_inventory()
-            return 'Removed label %s to host %s' % (label, host)
-
-        return SSHWriteCompletion(
-            self._worker_pool.apply_async(run, (host, label)))
-    """
+        if 'labels' not in self.inventory[host]:
+            self.inventory[host]['labels'] = list()
+        if label in self.inventory[host]['labels']:
+            self.inventory[host]['labels'].remove(label)
+        self._save_inventory()
+        return 'Removed label %s from host %s' % (label, host)
 
     @async_map_completion
     def _refresh_host_services(self, host):
@@ -1130,45 +1120,50 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         return self._create_daemon('mon', name or host, host, keyring,
                                    extra_args=extra_args)
 
-    def update_mons(self, num, host_specs):
+    def update_mons(self, spec):
         # type: (int, List[orchestrator.HostSpec]) -> orchestrator.Completion
+        """
+        Adjust the number of cluster monitors.
+        """
+        spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts).load()
+        return self._update_mons(spec)
+
+    def _update_mons(self, spec):
         """
         Adjust the number of cluster monitors.
         """
         # current support limited to adding monitors.
         mon_map = self.get("mon_map")
         num_mons = len(mon_map["mons"])
-        if num == num_mons:
+        if spec.count == num_mons:
             return orchestrator.Completion(value="The requested number of monitors exist.")
-        if num < num_mons:
+        if spec.count < num_mons:
             raise NotImplementedError("Removing monitors is not supported.")
 
-        self.log.debug("Trying to update monitors on: {}".format(host_specs))
+        self.log.debug("Trying to update monitors on: {}".format(spec.placement.nodes))
         # check that all the hosts are registered
-        [self._require_hosts(host.hostname) for host in host_specs]
+        [self._require_hosts(host.hostname) for host in spec.placement.nodes]
 
         # current support requires a network to be specified
-        for host, network, _ in host_specs:
+        for host, network, _ in spec.placement.nodes:
             if not network:
                 raise RuntimeError("Host '{}' is missing a network spec".format(host))
 
         def update_mons_with_daemons(daemons):
-            for _, _, name in host_specs:
+            for _, _, name in spec.placement.nodes:
                 if name and len([d for d in daemons if d.service_instance == name]):
                     raise RuntimeError('name %s alrady exists', name)
 
             # explicit placement: enough hosts provided?
-            num_new_mons = num - num_mons
-            if len(host_specs) < num_new_mons:
+            num_new_mons = spec.count - num_mons
+            if len(spec.placement.nodes) < num_new_mons:
                 raise RuntimeError("Error: {} hosts provided, expected {}".format(
-                    len(host_specs), num_new_mons))
-
+                    len(spec.placement.nodes), num_new_mons))
             self.log.info("creating {} monitors on hosts: '{}'".format(
-                num_new_mons, ",".join(map(lambda h: ":".join(h), host_specs))))
-
+                num_new_mons, ",".join(map(lambda h: ":".join(h), spec.placement.nodes))))
             # TODO: we may want to chain the creation of the monitors so they join
             # the quorum one at a time.
-            return self._create_mon(host_specs)
+            return self._create_mon(spec.placement.nodes)
 
         return self._get_services('mon').then(update_mons_with_daemons)
 
     @async_map_completion
@@ -1189,24 +1184,24 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         return self._create_daemon('mgr', name, host, keyring)
 
-    def update_mgrs(self, num, host_specs):
+    def update_mgrs(self, spec):
         """
         Adjust the number of cluster managers.
         """
-        return self._get_services('mgr').then(lambda daemons: self._update_mgrs(num, host_specs, daemons))
+        spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts).load()
+        return self._get_services('mgr').then(lambda daemons: self._update_mgrs(spec, daemons))
 
-    def _update_mgrs(self, num, host_specs, daemons):
+    def _update_mgrs(self, spec, daemons):
         num_mgrs = len(daemons)
-        if num == num_mgrs:
+        if spec.count == num_mgrs:
             return orchestrator.Completion(value="The requested number of managers exist.")
 
-        self.log.debug("Trying to update managers on: {}".format(host_specs))
+        self.log.debug("Trying to update managers on: {}".format(spec.placement.nodes))
         # check that all the hosts are registered
-        [self._require_hosts(host.hostname) for host in host_specs]
+        [self._require_hosts(host.hostname) for host in spec.placement.nodes]
 
-        results = []
-        if num < num_mgrs:
-            num_to_remove = num_mgrs - num
+        if spec.count < num_mgrs:
+            num_to_remove = num_mgrs - spec.count
 
             # first try to remove unconnected mgr daemons that the
             # cluster doesn't see
@@ -1220,7 +1215,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
             for d in daemons:
                 if d.service_instance not in connected:
                     to_remove_damons.append(('%s.%s' % (d.service_type, d.service_instance),
-                                            d.nodename))
+                                             d.nodename))
                     num_to_remove -= 1
                     if num_to_remove == 0:
                         break
@@ -1237,33 +1232,33 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         else:
             # we assume explicit placement by which there are the same number of
             # hosts specified as the size of increase in number of daemons.
-            num_new_mgrs = num - num_mgrs
-            if len(host_specs) < num_new_mgrs:
+            num_new_mgrs = spec.placement.count - num_mgrs
+            if len(spec.placement.nodes) < num_new_mgrs:
                 raise RuntimeError(
                     "Error: {} hosts provided, expected {}".format(
-                        len(host_specs), num_new_mgrs))
+                        len(spec.placement.nodes), num_new_mgrs))
 
-            for host_spec in host_specs:
+            for host_spec in spec.placement.nodes:
                 if host_spec.name and len([d for d in daemons if d.service_instance == host_spec.name]):
                     raise RuntimeError('name %s alrady exists', host_spec.name)
 
-            for host_spec in host_specs:
+            for host_spec in spec.placement.nodes:
                 if host_spec.name and len([d for d in daemons if d.service_instance == host_spec.name]):
                     raise RuntimeError('name %s alrady exists', host_spec.name)
 
             self.log.info("creating {} managers on hosts: '{}'".format(
-                num_new_mgrs, ",".join([spec.hostname for spec in host_specs])))
+                num_new_mgrs, ",".join([_spec.hostname for _spec in spec.placement.nodes])))
 
             args = []
-            for host_spec in host_specs:
+            for host_spec in spec.placement.nodes:
                 name = host_spec.name or self.get_unique_name(daemons)
                 host = host_spec.hostname
                 args.append((host, name))
         return self._create_mgr(args)
 
     def add_mds(self, spec):
-        if not spec.placement.nodes or len(spec.placement.nodes) < spec.count:
-            raise RuntimeError("must specify at least %d hosts" % spec.count)
+        if not spec.placement.nodes or len(spec.placement.nodes) < spec.placement.count:
+            raise RuntimeError("must specify at least %d hosts" % spec.placement.count)
         return self._get_services('mds').then(lambda ds: self._add_mds(ds, spec))
 
     def _add_mds(self, daemons, spec):
@@ -1285,6 +1280,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         return self._create_mds(args)
 
     def update_mds(self, spec):
+        spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts).load()
         return self._update_service('mds', self.add_mds, spec)
 
     @async_map_completion
@@ -1301,6 +1297,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
     def remove_mds(self, name):
         self.log.debug("Attempting to remove volume: {}".format(name))
+
         def _remove_mds(daemons):
             args = []
             for d in daemons:
@@ -1368,7 +1365,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
             for d in daemons:
                 if d.service_instance == name or d.service_instance.startswith(name + '.'):
                     args.append(('%s.%s' % (d.service_type, d.service_instance),
-                                d.nodename))
+                                 d.nodename))
             if args:
                 return self._remove_daemon(args)
             raise RuntimeError('Unable to find rgw.%s[-*] daemon(s)' % name)
@@ -1376,6 +1373,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         return self._get_services('rgw').then(_remove_rgw)
 
     def update_rgw(self, spec):
+        spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts).load()
         return self._update_service('rgw', self.add_rgw, spec)
 
     def add_rbd_mirror(self, spec):
@@ -1431,6 +1429,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         return self._get_services('rbd-mirror').then(_remove_rbd_mirror)
 
     def update_rbd_mirror(self, spec):
+        spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts).load()
         return self._update_service('rbd-mirror', self.add_rbd_mirror, spec)
 
     def _get_container_image_id(self, image_name):
@@ -1476,3 +1475,100 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
                 'current_id': s.container_image_id,
             }
         return trivial_result(json.dumps(r, indent=4))
+
+
+class BaseScheduler(object):
+    """
+    Base Scheduler Interface
+
+    * requires a placement_spec
+
+    `place(host_pool)` needs to return a List[HostSpec, ..]
+    """
+
+    def __init__(self, placement_spec):
+        # type: (orchestrator.PlacementSpec) -> None
+        self.placement_spec = placement_spec
+
+    def place(self):
+        raise NotImplementedError
+
+
+class SimpleScheduler(BaseScheduler):
+    """
+    The most simple way to pick/schedule a set of hosts.
+    1) Shuffle the provided host_pool
+    2) Select from list up to :count
+    """
+    def __init__(self, placement_spec):
+        super(SimpleScheduler, self).__init__(placement_spec)
+
+    def place(self, host_pool, count=None):
+        # type: (List, Optional[int]) -> List
+        if not host_pool:
+            raise Exception('List of host candidates is empty')
+        host_pool = [HostSpec(x, '', '') for x in host_pool]
+        # shuffle for pseudo random selection
+        random.shuffle(host_pool)
+        return host_pool[:count]
+
+
+class NodeAssignment(object):
+    """
+    A class to detect if nodes are being passed imperatively or declaratively.
+    If the spec is populated via the `nodes/hosts` field it will not load
+    any nodes into the list.
+    If the spec isn't populated, i.e. when only num or label is present (declarative),
+    it will use the provided `get_hosts_func` to load it from the inventory.
+
+    Schedulers can be assigned to pick hosts from the pool.
+    """
+
+    def __init__(self,
+                 spec=None,  # type: orchestrator.PlacementSpec
+                 scheduler=None,  # type: BaseScheduler
+                 get_hosts_func=None,  # type: Callable
+                 ):
+        self.spec = spec
+        self.scheduler = scheduler if scheduler else SimpleScheduler(self.spec.placement)
+        self.get_hosts_func = get_hosts_func
+
+    def load(self):
+        # type: () -> orchestrator.PlacementSpec
+        """
+        Load nodes into the spec.placement.nodes container.
+        """
+        self.load_labeled_nodes()
+        self.assign_nodes()
+        return self.spec
+
+    def load_labeled_nodes(self):
+        # type: () -> None
+        """
+        Assign nodes based on their label
+        """
+        # Querying for labeled nodes doesn't work currently.
+        # Leaving this open for the next iteration
+        # NOTE: This currently queries for all hosts without label restriction
+        if self.spec.placement.label:
+            logger.info("Found labels. Assigning nodes that match the label")
+            candidates = [HostSpec(x[0], '', '') for x in self.get_hosts_func()]  # TODO: query for labels
+            logger.info('Assigning nodes to spec: {}'.format(candidates))
+            self.spec.placement.set_nodes(candidates)
+
+    def assign_nodes(self):
+        # type: () -> None
+        """
+        Use the assigned scheduler to load nodes into the spec.placement.nodes container
+        """
+        # If no imperative or declarative host assignments, use the scheduler to pick from the
+        # host pool (assuming `count` is set)
+        if not self.spec.placement.label and not self.spec.placement.nodes and self.spec.placement.count:
+            logger.info("Found only count spec. Using {} to assign nodes.".format(self.scheduler))
+            candidates = self.scheduler.place([x[0] for x in self.get_hosts_func()], count=self.spec.placement.count)
+            if len(candidates) < self.spec.placement.count:
+                raise Exception("Cannot place {} services on {} hosts.".
+                                format(self.spec.placement.count, len(candidates)))
+            logger.info('Assigning nodes to spec: {}'.format(candidates))
+            self.spec.placement.set_nodes(candidates)
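
Note (not part of the patch): the snippet below is a minimal, self-contained sketch of how the pieces above are meant to fit together, namely that a declarative spec carrying only a count gets its placement.nodes filled by NodeAssignment through SimpleScheduler. The HostSpec, PlacementSpec and ServiceSpec classes here, and the get_hosts_func returning bare hostnames, are simplified stand-ins for the real orchestrator types, not the actual API.

import logging
import random
from collections import namedtuple

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Simplified stand-in for orchestrator.HostSpec: (hostname, network, name).
HostSpec = namedtuple('HostSpec', ['hostname', 'network', 'name'])


class PlacementSpec(object):
    # Stand-in for orchestrator.PlacementSpec: a label, a count, and explicit nodes.
    def __init__(self, label=None, count=None, nodes=None):
        self.label = label
        self.count = count
        self.nodes = nodes or []

    def set_nodes(self, nodes):
        self.nodes = nodes


class SimpleScheduler(object):
    # Same idea as the scheduler added above: shuffle the pool, take up to `count`.
    def __init__(self, placement_spec):
        self.placement_spec = placement_spec

    def place(self, host_pool, count=None):
        if not host_pool:
            raise Exception('List of host candidates is empty')
        host_pool = [HostSpec(x, '', '') for x in host_pool]
        random.shuffle(host_pool)  # pseudo-random spread across the pool
        return host_pool[:count]


class NodeAssignment(object):
    # Fill spec.placement.nodes only when the spec is declarative (count only).
    def __init__(self, spec, get_hosts_func, scheduler=None):
        self.spec = spec
        self.scheduler = scheduler or SimpleScheduler(spec.placement)
        self.get_hosts_func = get_hosts_func

    def load(self):
        p = self.spec.placement
        if not p.label and not p.nodes and p.count:
            candidates = self.scheduler.place(self.get_hosts_func(), count=p.count)
            if len(candidates) < p.count:
                raise Exception("Cannot place {} services on {} hosts.".format(
                    p.count, len(candidates)))
            logger.info('Assigning nodes to spec: {}'.format(candidates))
            p.set_nodes(candidates)
        return self.spec


class ServiceSpec(object):
    # Stand-in for a service spec that just carries a placement.
    def __init__(self, placement):
        self.placement = placement


if __name__ == '__main__':
    # Declarative request: "3 daemons, anywhere".
    spec = ServiceSpec(PlacementSpec(count=3))
    inventory = ['host1', 'host2', 'host3', 'host4', 'host5']
    # get_hosts_func here returns plain hostnames; the real module passes
    # inventory entries and hands x[0] to the scheduler.
    spec = NodeAssignment(spec=spec, get_hosts_func=lambda: inventory).load()
    print([h.hostname for h in spec.placement.nodes])

As in the patch, an explicit nodes list or a label bypasses the scheduler, and since label-based querying is still a TODO, labeled specs currently fall back to the full host list.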