from ceph.deployment import inventory
from mgr_module import MgrModule
import orchestrator
-from orchestrator import OrchestratorError, HostSpec
+from orchestrator import OrchestratorError, HostSpec, OrchestratorValidationError
from . import remotes
return self._refresh_host_services(wait_for_args).then(
_get_services_result)
-
def describe_service(self, service_type=None, service_id=None,
node_name=None, refresh=False):
if service_type not in ("mds", "osd", "mgr", "mon", 'rgw', "nfs", None):
host = drive_group.hosts(all_hosts)[0]
self._require_hosts(host)
-
# get bootstrap key
ret, keyring, err = self.mon_command({
'prefix': 'auth get',
extra_args=extra_args)
def update_mons(self, spec):
- # type: (int, List[orchestrator.HostSpec]) -> orchestrator.Completion
+ # type: (orchestrator.StatefulServiceSpec) -> orchestrator.Completion
"""
Adjust the number of cluster monitors.
"""
- spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts).load()
+ spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='mon').load()
return self._update_mons(spec)
def update_mgrs(self, spec):
"""
Adjust the number of cluster managers.
"""
- spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts).load()
+ spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='mgr').load()
return self._get_services('mgr').then(lambda daemons: self._update_mgrs(spec, daemons))
def _update_mgrs(self, spec, daemons):
return self._create_mds(args)
def update_mds(self, spec):
- spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts).load()
+ spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='mds').load()
return self._update_service('mds', self.add_mds, spec)
@async_map_completion
return self._get_services('rgw').then(_remove_rgw)
def update_rgw(self, spec):
- spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts).load()
+ spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='rgw').load()
return self._update_service('rgw', self.add_rgw, spec)
def add_rbd_mirror(self, spec):
return self._get_services('rbd-mirror').then(_remove_rbd_mirror)
def update_rbd_mirror(self, spec):
- spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts).load()
+ spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='rbd-mirror').load()
return self._update_service('rbd-mirror', self.add_rbd_mirror, spec)
def _get_container_image_id(self, image_name):
spec=None, # type: orchestrator.PlacementSpec
scheduler=None, # type: BaseScheduler
get_hosts_func=None, # type: Callable
+ service_type=None, # type: str
):
+ assert spec and get_hosts_func and service_type
self.spec = spec
self.scheduler = scheduler if scheduler else SimpleScheduler(self.spec.placement)
self.get_hosts_func = get_hosts_func
+ self.service_type = service_type
def load(self):
# type: () -> orchestrator.PlacementSpec
# If no imperative or declarative host assignments, use the scheduler to pick from the
# host pool (assuming `count` is set)
if not self.spec.placement.label and not self.spec.placement.nodes and self.spec.placement.count:
- logger.info("Found only count spec. Using {} to assign nodes.".format(self.scheduler))
+ logger.info("Found num spec. Looking for labeled nodes.".format(self.scheduler))
+ # TODO: actually query for labels (self.service_type)
+ candidates = self.scheduler.place([x[0] for x in self.get_hosts_func()],
+ count=self.spec.placement.count)
+ # Not enough nodes to deploy on
+ if len(candidates) != self.spec.placement.count:
+ logger.warning("Did not find enough labeled nodes to \
+ scale to <{}> services. Falling back to unlabeled nodes.".
+ format(self.spec.placement.count))
+ else:
+ logger.info('Assigning nodes to spec: {}'.format(candidates))
+ self.spec.placement.set_nodes(candidates)
+ return self.spec
+
candidates = self.scheduler.place([x[0] for x in self.get_hosts_func()], count=self.spec.placement.count)
- if len(candidates) < self.spec.placement.count:
- raise Exception("Cannot place {} services on {} hosts.".
- format(self.spec.placement.count, len(candidates)))
+ # Not enough nodes to deploy on
+ if len(candidates) != self.spec.placement.count:
+ raise OrchestratorValidationError("Cannot place {} services on {} hosts.".
+ format(self.spec.placement.count, len(candidates)))
+
logger.info('Assigning nodes to spec: {}'.format(candidates))
self.spec.placement.set_nodes(candidates)
->>>>>>> 1a4ee9e96... mgr/ssh: Add SimpleScheduler and streamline arg passing
+ return self.spec
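For illustration only, a minimal sketch of how a count-only spec would flow through NodeAssignment; the host inventory, the fake_get_hosts helper, and the spec values below are hypothetical and only mimic what load() actually reads from get_hosts_func():

    def fake_get_hosts():
        # load() only consumes x[0] (the hostname) from each inventory entry
        return [('node1',), ('node2',), ('node3',)]

    spec = orchestrator.StatefulServiceSpec(
        name='mon',
        placement=orchestrator.PlacementSpec(count=2))  # no label, no explicit nodes
    spec = NodeAssignment(spec=spec,
                          get_hosts_func=fake_get_hosts,
                          service_type='mon').load()
    # spec.placement.nodes now holds the two hosts picked by SimpleScheduler;
    # with count=4 and only three hosts, OrchestratorValidationError is raised.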
HostSpec = namedtuple('HostSpec', ['hostname', 'network', 'name'])
+
def parse_host_specs(host, require_network=True):
"""
Split host into host, network, and (optional) daemon name parts. The network
"""
raise NotImplementedError()
- def update_mgrs(self, num, hosts):
- # type: (int, List[str]) -> Completion
+ def update_mgrs(self, spec):
+ # type: (StatefulServiceSpec) -> Completion
"""
Update the number of cluster managers.
"""
raise NotImplementedError()
- def update_mons(self, num, hosts):
- # type: (int, List[HostSpec]) -> Completion
+ def update_mons(self, spec):
+ # type: (StatefulServiceSpec) -> Completion
"""
Update the number of cluster monitors.
For APIs that need to specify a node subset
"""
def __init__(self, label=None, nodes=None, count=None):
+ # type: (Optional[str], Optional[List], Optional[int]) -> None
self.label = label
if nodes:
- self.nodes = [parse_host_specs(x, require_network=False) for x in nodes if x]
+ if all([isinstance(node, HostSpec) for node in nodes]):
+ self.nodes = nodes
+ else:
+ self.nodes = [parse_host_specs(x, require_network=False) for x in nodes if x]
else:
self.nodes = []
- self.count = count if count else 1
+ self.count = count # type: Optional[int]
def set_nodes(self, nodes):
# To backpopulate the .nodes attribute when using labels or count
self.nodes = nodes
@classmethod
- def from_yaml(cls, data):
- return cls(**data)
+ def from_dict(cls, data):
+ _cls = cls(**data)
+ _cls.validate()
+ return _cls
def validate(self):
if self.nodes and self.label:
# TODO: a less generic Exception
raise Exception('Node and label are mutually exclusive')
- if self.count <= 0:
+ if self.count is not None and self.count <= 0:
raise Exception("num/count must be > 1")
def __repr__(self):
return "<ServiceDescription>({n_name}:{s_type})".format(n_name=self.nodename,
- s_type=self.name())
+ s_type=self.name())
def to_json(self):
out = {
""" Such as mgrs/mons
"""
# TODO: create base class for Stateless/Stateful service specs and properly inherit
- def __init__(self,
- name=None,
- placement=None,
- count=None,
- scheduler=None):
+ def __init__(self, name=None, placement=None):
self.placement = PlacementSpec() if placement is None else placement
self.name = name
- self.count = self.placement.count if self.placement else 1 # for backwards-compatibility
+ self.count = self.placement.count if self.placement is not None else 1 # for backwards-compatibility
class StatelessServiceSpec(object):
# This structure is supposed to be enough information to
# start the services.
- def __init__(self, name,
- placement=None):
+ def __init__(self, name, placement=None):
self.placement = PlacementSpec() if placement is None else placement
#: Give this set of stateless services a name: typically it would
self.name = name
#: Count of service instances
- self.count = self.placement.count if self.placement else 1 # for backwards-compatibility
+ self.count = self.placement.count if self.placement is not None else 1 # for backwards-compatibility
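A quick hypothetical check of the backwards-compatible .count alias kept on both spec classes:

    s = StatelessServiceSpec('nfs', placement=PlacementSpec(count=3))
    assert s.count == 3     # mirrors s.placement.count for older callers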
def validate_add(self):
if not self.name:
"""
def __init__(self,
- rgw_realm, # type: str
+ rgw_realm, # type: str
rgw_zone, # type: str
placement=None,
hosts=None, # type: Optional[List[str]]
mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, o, meth, args, kwargs))
return mgr.remote(o, meth, *args, **kwargs)
-
def _orchestrator_wait(self, completions):
# type: (List[Completion]) -> None
"""
timestr = timestr.rstrip('Z')
return datetime.datetime.strptime(timestr, cls.DATEFMT)
-
@classmethod
def from_json(cls, data):
return cls(data['data'], cls.time_from_string(data['last_refresh']))
'name=host,type=CephString '
'name=label,type=CephString',
'Remove a host label')
- def _host_label_add(self, host, label):
+ def _host_label_rm(self, host, label):
completion = self.remove_host_label(host, label)
self._orchestrator_wait([completion])
orchestrator.raise_if_exception(completion)
'orchestrator rgw update',
'name=zone_name,type=CephString '
'name=realm_name,type=CephString '
- "name=num,type=CephInt,req=false "
- "name=label,type=CephString,req=false "
- "name=hosts,type=CephString,n=N,req=false",
+ 'name=num,type=CephInt,req=false '
+ 'name=label,type=CephString,req=false '
+ 'name=hosts,type=CephString,n=N,req=false',
'Update the number of RGW instances for the given zone')
def _rgw_update(self, zone_name, realm_name, num=None, label=None, hosts=[]):
spec = orchestrator.RGWSpec(
placement = orchestrator.PlacementSpec(label=label, count=num, nodes=hosts)
if not hosts and not label:
# Improve Error message. Point to parse_host_spec examples
- raise Exception("Mons need a host spec. (host, network, name(opt))")
+ raise orchestrator.OrchestratorValidationError("Mons need a host spec. (host, network, name(opt))")
# TODO: Scaling without a HostSpec doesn't work right now.
# we need network autodetection for that.
# placement = orchestrator.PlacementSpec(count=num)
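To make that constraint concrete, a hypothetical placement that satisfies the mon handler (host plus network, optional name; the 'host:network=name' string form is assumed):

    placement = orchestrator.PlacementSpec(
        nodes=['mon-node1:10.1.0.0/24', 'mon-node2:10.1.0.0/24=b'], count=2)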
assert isinstance(host, six.string_types)
@deferred_write("update_mgrs")
- def update_mgrs(self, num, hosts):
- assert not hosts or len(hosts) == num
- assert all([isinstance(h, str) for h in hosts])
+ def update_mgrs(self, spec):
+ assert not spec.placement.nodes or len(spec.placement.nodes) == spec.placement.count
+ assert all([isinstance(h, str) for h in spec.placement.nodes])
@deferred_write("update_mons")
- def update_mons(self, num, hosts):
- assert not hosts or len(hosts) == num
- assert all([isinstance(h[0], str) for h in hosts])
- assert all([isinstance(h[1], str) or h[1] is None for h in hosts])
+ def update_mons(self, spec):
+ assert not spec.placement.nodes or len(spec.placement.nodes) == spec.placement.count
+ assert all([isinstance(h[0], str) for h in spec.placement.nodes])
+ assert all([isinstance(h[1], str) or h[1] is None for h in spec.placement.nodes])
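A hedged sketch of how a caller (for instance a unit test) would exercise the new spec-based signature; the orchestrator instance and host strings below are placeholders:

    placement = orchestrator.PlacementSpec(
        nodes=['mon-node1:10.1.0.0/24', 'mon-node2:10.1.0.0/24'], count=2)
    spec = orchestrator.StatefulServiceSpec(name='mon', placement=placement)
    completion = test_orch.update_mons(spec)   # @deferred_write wraps the result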