from ceph.deployment import inventory
from mgr_module import MgrModule
import orchestrator
-from orchestrator import OrchestratorError
+from orchestrator import OrchestratorError, HostSpec
from . import remotes
"""
return [orchestrator.InventoryNode(host_name) for host_name in self.inventory_cache]
- """
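+    # NOTE: @async_completion is expected to run the decorated method on the
+    # worker pool and hand its return value back through an orchestrator
+    # Completion, so the method body can simply return its result string.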
+ @async_completion
def add_host_label(self, host, label):
if host not in self.inventory:
raise OrchestratorError('host %s does not exist' % host)
- @log_exceptions
- def run(host, label):
- if 'labels' not in self.inventory[host]:
- self.inventory[host]['labels'] = list()
- if label not in self.inventory[host]['labels']:
- self.inventory[host]['labels'].append(label)
- self._save_inventory()
- return 'Added label %s to host %s' % (label, host)
-
- return SSHWriteCompletion(
- self._worker_pool.apply_async(run, (host, label)))
+ if 'labels' not in self.inventory[host]:
+ self.inventory[host]['labels'] = list()
+ if label not in self.inventory[host]['labels']:
+ self.inventory[host]['labels'].append(label)
+ self._save_inventory()
+ return 'Added label %s to host %s' % (label, host)
+ @async_completion
def remove_host_label(self, host, label):
if host not in self.inventory:
raise OrchestratorError('host %s does not exist' % host)
- @log_exceptions
- def run(host, label):
- if 'labels' not in self.inventory[host]:
- self.inventory[host]['labels'] = list()
- if label in self.inventory[host]['labels']:
- self.inventory[host]['labels'].remove(label)
- self._save_inventory()
- return 'Removed label %s to host %s' % (label, host)
-
- return SSHWriteCompletion(
- self._worker_pool.apply_async(run, (host, label)))
- """
+ if 'labels' not in self.inventory[host]:
+ self.inventory[host]['labels'] = list()
+ if label in self.inventory[host]['labels']:
+ self.inventory[host]['labels'].remove(label)
+ self._save_inventory()
+ return 'Removed label %s from host %s' % (label, host)
@async_map_completion
def _refresh_host_services(self, host):
return self._create_daemon('mon', name or host, host, keyring,
extra_args=extra_args)
- def update_mons(self, num, host_specs):
+ def update_mons(self, spec):
-        # type: (int, List[orchestrator.HostSpec]) -> orchestrator.Completion
+ """
+        Adjust the number of cluster monitors.
+ """
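+        # resolve the placement first (a declarative count/label spec gets its
+        # nodes assigned here), then hand the populated spec to _update_mons()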
+ spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts).load()
+ return self._update_mons(spec)
+
+ def _update_mons(self, spec):
"""
Adjust the number of cluster monitors.
"""
# current support limited to adding monitors.
mon_map = self.get("mon_map")
num_mons = len(mon_map["mons"])
- if num == num_mons:
+ if spec.count == num_mons:
return orchestrator.Completion(value="The requested number of monitors exist.")
- if num < num_mons:
+ if spec.count < num_mons:
raise NotImplementedError("Removing monitors is not supported.")
- self.log.debug("Trying to update monitors on: {}".format(host_specs))
+ self.log.debug("Trying to update monitors on: {}".format(spec.placement.nodes))
# check that all the hosts are registered
- [self._require_hosts(host.hostname) for host in host_specs]
+ [self._require_hosts(host.hostname) for host in spec.placement.nodes]
# current support requires a network to be specified
- for host, network, _ in host_specs:
+ for host, network, _ in spec.placement.nodes:
if not network:
raise RuntimeError("Host '{}' is missing a network spec".format(host))
def update_mons_with_daemons(daemons):
- for _, _, name in host_specs:
+ for _, _, name in spec.placement.nodes:
if name and len([d for d in daemons if d.service_instance == name]):
                raise RuntimeError('name %s already exists' % name)
# explicit placement: enough hosts provided?
- num_new_mons = num - num_mons
- if len(host_specs) < num_new_mons:
+ num_new_mons = spec.count - num_mons
+ if len(spec.placement.nodes) < num_new_mons:
raise RuntimeError("Error: {} hosts provided, expected {}".format(
- len(host_specs), num_new_mons))
-
+ len(spec.placement.nodes), num_new_mons))
self.log.info("creating {} monitors on hosts: '{}'".format(
- num_new_mons, ",".join(map(lambda h: ":".join(h), host_specs))))
-
+ num_new_mons, ",".join(map(lambda h: ":".join(h), spec.placement.nodes))))
# TODO: we may want to chain the creation of the monitors so they join
# the quorum one at a time.
- return self._create_mon(host_specs)
+ return self._create_mon(spec.placement.nodes)
return self._get_services('mon').then(update_mons_with_daemons)
@async_map_completion
return self._create_daemon('mgr', name, host, keyring)
- def update_mgrs(self, num, host_specs):
+ def update_mgrs(self, spec):
"""
Adjust the number of cluster managers.
"""
- return self._get_services('mgr').then(lambda daemons: self._update_mgrs(num, host_specs, daemons))
+ spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts).load()
+ return self._get_services('mgr').then(lambda daemons: self._update_mgrs(spec, daemons))
- def _update_mgrs(self, num, host_specs, daemons):
+ def _update_mgrs(self, spec, daemons):
num_mgrs = len(daemons)
- if num == num_mgrs:
+ if spec.count == num_mgrs:
return orchestrator.Completion(value="The requested number of managers exist.")
- self.log.debug("Trying to update managers on: {}".format(host_specs))
+ self.log.debug("Trying to update managers on: {}".format(spec.placement.nodes))
# check that all the hosts are registered
- [self._require_hosts(host.hostname) for host in host_specs]
+ [self._require_hosts(host.hostname) for host in spec.placement.nodes]
- results = []
- if num < num_mgrs:
- num_to_remove = num_mgrs - num
+ if spec.count < num_mgrs:
+ num_to_remove = num_mgrs - spec.count
# first try to remove unconnected mgr daemons that the
# cluster doesn't see
for d in daemons:
if d.service_instance not in connected:
to_remove_damons.append(('%s.%s' % (d.service_type, d.service_instance),
- d.nodename))
+ d.nodename))
num_to_remove -= 1
if num_to_remove == 0:
break
else:
            # we assume explicit placement: the number of hosts provided should
            # match the increase in the number of daemons.
- num_new_mgrs = num - num_mgrs
- if len(host_specs) < num_new_mgrs:
+ num_new_mgrs = spec.placement.count - num_mgrs
+ if len(spec.placement.nodes) < num_new_mgrs:
raise RuntimeError(
"Error: {} hosts provided, expected {}".format(
- len(host_specs), num_new_mgrs))
+ len(spec.placement.nodes), num_new_mgrs))
- for host_spec in host_specs:
+ for host_spec in spec.placement.nodes:
if host_spec.name and len([d for d in daemons if d.service_instance == host_spec.name]):
                    raise RuntimeError('name %s already exists' % host_spec.name)
self.log.info("creating {} managers on hosts: '{}'".format(
- num_new_mgrs, ",".join([spec.hostname for spec in host_specs])))
+ num_new_mgrs, ",".join([_spec.hostname for _spec in spec.placement.nodes])))
args = []
- for host_spec in host_specs:
+ for host_spec in spec.placement.nodes:
name = host_spec.name or self.get_unique_name(daemons)
host = host_spec.hostname
args.append((host, name))
return self._create_mgr(args)
def add_mds(self, spec):
- if not spec.placement.nodes or len(spec.placement.nodes) < spec.count:
- raise RuntimeError("must specify at least %d hosts" % spec.count)
+ if not spec.placement.nodes or len(spec.placement.nodes) < spec.placement.count:
+ raise RuntimeError("must specify at least %d hosts" % spec.placement.count)
return self._get_services('mds').then(lambda ds: self._add_mds(ds, spec))
def _add_mds(self, daemons, spec):
return self._create_mds(args)
def update_mds(self, spec):
+ spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts).load()
return self._update_service('mds', self.add_mds, spec)
@async_map_completion
def remove_mds(self, name):
self.log.debug("Attempting to remove volume: {}".format(name))
+
def _remove_mds(daemons):
args = []
for d in daemons:
if d.service_instance == name or d.service_instance.startswith(name + '.'):
args.append(('%s.%s' % (d.service_type, d.service_instance),
- d.nodename))
+ d.nodename))
if args:
return self._remove_daemon(args)
raise RuntimeError('Unable to find rgw.%s[-*] daemon(s)' % name)
return self._get_services('rgw').then(_remove_rgw)
def update_rgw(self, spec):
+ spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts).load()
return self._update_service('rgw', self.add_rgw, spec)
def add_rbd_mirror(self, spec):
return self._get_services('rbd-mirror').then(_remove_rbd_mirror)
def update_rbd_mirror(self, spec):
+ spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts).load()
return self._update_service('rbd-mirror', self.add_rbd_mirror, spec)
def _get_container_image_id(self, image_name):
'current_id': s.container_image_id,
}
return trivial_result(json.dumps(r, indent=4))
+
+
+class BaseScheduler(object):
+ """
+ Base Scheduler Interface
+
+ * requires a placement_spec
+
+    `place(host_pool)` needs to return a List[HostSpec]
+ """
+
+ def __init__(self, placement_spec):
+ # type: (orchestrator.PlacementSpec) -> None
+ self.placement_spec = placement_spec
+
+ def place(self):
+ raise NotImplementedError
+
+
+class SimpleScheduler(BaseScheduler):
+ """
+    The simplest way to pick/schedule a set of hosts:
+    1) Shuffle the provided host_pool
+    2) Select from the shuffled list up to :count hosts
+ """
+ def __init__(self, placement_spec):
+ super(SimpleScheduler, self).__init__(placement_spec)
+
+ def place(self, host_pool, count=None):
+        # type: (List[str], Optional[int]) -> List[HostSpec]
+ if not host_pool:
+ raise Exception('List of host candidates is empty')
+ host_pool = [HostSpec(x, '', '') for x in host_pool]
+ # shuffle for pseudo random selection
+ random.shuffle(host_pool)
+ return host_pool[:count]
+
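+# Illustrative sketch of how SimpleScheduler is meant to be used; the host
+# names are hypothetical and `pspec` stands for an existing PlacementSpec:
+#
+#   scheduler = SimpleScheduler(pspec)
+#   candidates = scheduler.place(['node1', 'node2', 'node3'], count=2)
+#   # -> two HostSpec entries picked pseudo-randomly from the shuffled pool;
+#   #    with count=None the whole shuffled pool is returned.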
+
+class NodeAssignment(object):
+ """
+    A class to detect whether nodes are being passed imperatively or declaratively.
+    If the spec is populated via the `nodes/hosts` field it will not load
+    any nodes into the list.
+    If the spec isn't populated, i.e. when only a count or a label is present
+    (declarative), it will use the provided `get_hosts_func` to load nodes from
+    the inventory.
+
+ Schedulers can be assigned to pick hosts from the pool.
+ """
+
+ def __init__(self,
+ spec=None, # type: orchestrator.PlacementSpec
+ scheduler=None, # type: BaseScheduler
+ get_hosts_func=None, # type: Callable
+ ):
+ self.spec = spec
+ self.scheduler = scheduler if scheduler else SimpleScheduler(self.spec.placement)
+ self.get_hosts_func = get_hosts_func
+
+ def load(self):
+ # type: () -> orchestrator.PlacementSpec
+ """
+ Load nodes into the spec.placement.nodes container.
+ """
+ self.load_labeled_nodes()
+ self.assign_nodes()
+ return self.spec
+
+ def load_labeled_nodes(self):
+ # type: () -> None
+ """
+ Assign nodes based on their label
+ """
+ # Querying for labeled nodes doesn't work currently.
+ # Leaving this open for the next iteration
+ # NOTE: This currently queries for all hosts without label restriction
+ if self.spec.placement.label:
+            logger.info("Found labels. Assigning nodes that match the label")
+ candidates = [HostSpec(x[0], '', '') for x in self.get_hosts_func()] # TODO: query for labels
+ logger.info('Assigning nodes to spec: {}'.format(candidates))
+ self.spec.placement.set_nodes(candidates)
+
+ def assign_nodes(self):
+ # type: () -> None
+ """
+ Use the assigned scheduler to load nodes into the spec.placement.nodes container
+ """
+ # If no imperative or declarative host assignments, use the scheduler to pick from the
+ # host pool (assuming `count` is set)
+ if not self.spec.placement.label and not self.spec.placement.nodes and self.spec.placement.count:
+ logger.info("Found only count spec. Using {} to assign nodes.".format(self.scheduler))
+ candidates = self.scheduler.place([x[0] for x in self.get_hosts_func()], count=self.spec.placement.count)
+ if len(candidates) < self.spec.placement.count:
+ raise Exception("Cannot place {} services on {} hosts.".
+ format(self.spec.placement.count, len(candidates)))
+ logger.info('Assigning nodes to spec: {}'.format(candidates))
+ self.spec.placement.set_nodes(candidates)
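+
+# Illustrative sketch of the declarative path (only a count is given, no
+# explicit nodes and no label), mirroring how update_mons()/update_mgrs()
+# call NodeAssignment above:
+#
+#   # inside an orchestrator method, with spec.placement.count == 3:
+#   spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts).load()
+#   # load() asks the scheduler (SimpleScheduler by default) for 3 hosts from
+#   # the inventory and stores them via spec.placement.set_nodes(), so the
+#   # existing code paths can keep reading spec.placement.nodes.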