Allow List[HostSpec] in PlacementSpec
author Joshua Schmid <jschmid@suse.de>
Thu, 5 Dec 2019 11:17:04 +0000 (12:17 +0100)
committer Joshua Schmid <jschmid@suse.de>
Thu, 12 Dec 2019 09:23:14 +0000 (10:23 +0100)
Signed-off-by: Joshua Schmid <jschmid@suse.de>
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/orchestrator.py
src/pybind/mgr/orchestrator_cli/module.py
src/pybind/mgr/test_orchestrator/module.py

diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py
index 43dae0f7b6557efcaabdedd0b5eb87b67b8ad38d..40a09d95a4cd424c79e4a43e65529f061c29606c 100644
@@ -21,7 +21,7 @@ import subprocess
 from ceph.deployment import inventory
 from mgr_module import MgrModule
 import orchestrator
-from orchestrator import OrchestratorError, HostSpec
+from orchestrator import OrchestratorError, HostSpec, OrchestratorValidationError
 
 from . import remotes
 
@@ -788,7 +788,6 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         return self._refresh_host_services(wait_for_args).then(
             _get_services_result)
 
-
     def describe_service(self, service_type=None, service_id=None,
                          node_name=None, refresh=False):
         if service_type not in ("mds", "osd", "mgr", "mon", 'rgw', "nfs", None):
@@ -932,7 +931,6 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         host = drive_group.hosts(all_hosts)[0]
         self._require_hosts(host)
 
-
         # get bootstrap key
         ret, keyring, err = self.mon_command({
             'prefix': 'auth get',
@@ -1121,11 +1119,11 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
                                    extra_args=extra_args)
 
     def update_mons(self, spec):
-        # type: (int, List[orchestrator.HostSpec]) -> orchestrator.Completion
+        # type: (List[orchestrator.HostSpec]) -> orchestrator.Completion
         """
         Adjust the number of cluster managers.
         """
-        spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts).load()
+        spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='mon').load()
         return self._update_mons(spec)
 
     def _update_mons(self, spec):
@@ -1188,7 +1186,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         """
         Adjust the number of cluster managers.
         """
-        spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts).load()
+        spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='mgr').load()
         return self._get_services('mgr').then(lambda daemons: self._update_mgrs(spec, daemons))
 
     def _update_mgrs(self, spec, daemons):
@@ -1280,7 +1278,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         return self._create_mds(args)
 
     def update_mds(self, spec):
-        spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts).load()
+        spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='mds').load()
         return self._update_service('mds', self.add_mds, spec)
 
     @async_map_completion
@@ -1373,7 +1371,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         return self._get_services('rgw').then(_remove_rgw)
 
     def update_rgw(self, spec):
-        spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts).load()
+        spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='rgw').load()
         return self._update_service('rgw', self.add_rgw, spec)
 
     def add_rbd_mirror(self, spec):
@@ -1429,7 +1427,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         return self._get_services('rbd-mirror').then(_remove_rbd_mirror)
 
     def update_rbd_mirror(self, spec):
-        spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts).load()
+        spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='rbd-mirror').load()
         return self._update_service('rbd-mirror', self.add_rbd_mirror, spec)
 
     def _get_container_image_id(self, image_name):
@@ -1528,10 +1526,13 @@ class NodeAssignment(object):
                  spec=None,  # type: orchestrator.PlacementSpec
                  scheduler=None,  # type: BaseScheduler
                  get_hosts_func=None,  # type: Callable
+                 service_type=None,  # type: str
                  ):
+        assert spec and get_hosts_func and service_type
         self.spec = spec
         self.scheduler = scheduler if scheduler else SimpleScheduler(self.spec.placement)
         self.get_hosts_func = get_hosts_func
+        self.service_type = service_type
 
     def load(self):
         # type: () -> orchestrator.PlacementSpec
@@ -1564,11 +1565,26 @@ class NodeAssignment(object):
         # If no imperative or declarative host assignments, use the scheduler to pick from the
         # host pool (assuming `count` is set)
         if not self.spec.placement.label and not self.spec.placement.nodes and self.spec.placement.count:
-            logger.info("Found only count spec. Using {} to assign nodes.".format(self.scheduler))
+            logger.info("Found num spec. Looking for labeled nodes.".format(self.scheduler))
+            # TODO: actually query for labels (self.service_type)
+            candidates = self.scheduler.place([x[0] for x in self.get_hosts_func()],
+                                              count=self.spec.placement.count)
+            # Not enough nodes to deploy on
+            if len(candidates) != self.spec.placement.count:
+                logger.warning("Did not find enough labeled nodes to \
+                               scale to <{}> services. Falling back to unlabeled nodes.".
+                               format(self.spec.placement.count))
+            else:
+                logger.info('Assigning nodes to spec: {}'.format(candidates))
+                self.spec.placement.set_nodes(candidates)
+                return None
+
             candidates = self.scheduler.place([x[0] for x in self.get_hosts_func()], count=self.spec.placement.count)
-            if len(candidates) < self.spec.placement.count:
-                raise Exception("Cannot place {} services on {} hosts.".
-                                format(self.spec.placement.count, len(candidates)))
+            # Not enough nodes to deploy on
+            if len(candidates) != self.spec.placement.count:
+                raise OrchestratorValidationError("Cannot place {} services on {} hosts.".
+                                                  format(self.spec.placement.count, len(candidates)))
+
             logger.info('Assigning nodes to spec: {}'.format(candidates))
             self.spec.placement.set_nodes(candidates)
->>>>>>> 1a4ee9e96... mgr/ssh: Add SimpleScheduler and streamline arg passing
+            return None
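The NodeAssignment changes above add a mandatory service_type and replace the bare Exception with OrchestratorValidationError when the scheduler cannot satisfy a count-only placement. The following is a minimal, self-contained sketch of that count-only path; the SimpleScheduler here is a hypothetical stand-in (random pick), not the cephadm implementation, and the real class also handles label- and host-based placement.

    import random


    class OrchestratorValidationError(Exception):
        pass


    class PlacementSpec:
        def __init__(self, label=None, nodes=None, count=None):
            self.label = label
            self.nodes = list(nodes or [])
            self.count = count

        def set_nodes(self, nodes):
            self.nodes = nodes


    class SimpleScheduler:
        # Hypothetical stand-in: shuffle the host pool and take `count` entries.
        def place(self, host_pool, count=None):
            pool = list(host_pool)
            random.shuffle(pool)
            return pool if count is None else pool[:count]


    def assign_by_count(placement, get_hosts_func, service_type):
        # Mirrors the count-only branch of NodeAssignment.load(): no label,
        # no explicit nodes, but a count -- let the scheduler pick hosts.
        if placement.label or placement.nodes or not placement.count:
            return placement
        candidates = SimpleScheduler().place(get_hosts_func(), count=placement.count)
        if len(candidates) != placement.count:
            raise OrchestratorValidationError(
                "Cannot place {} services on {} hosts.".format(
                    placement.count, len(candidates)))
        placement.set_nodes(candidates)
        return placement


    spec = PlacementSpec(count=2)
    assign_by_count(spec, lambda: ['node1', 'node2', 'node3'], service_type='mon')
    print(spec.nodes)  # e.g. ['node3', 'node1']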
diff --git a/src/pybind/mgr/orchestrator.py b/src/pybind/mgr/orchestrator.py
index 653d0ec216cfcc8e375a17d1cac41c8311cd2174..30ca32f8f8d83896ef33d962593ab0ee7d009334 100644
@@ -36,6 +36,7 @@ logger = logging.getLogger(__name__)
 
 HostSpec = namedtuple('HostSpec', ['hostname', 'network', 'name'])
 
+
 def parse_host_specs(host, require_network=True):
     """
     Split host into host, network, and (optional) daemon name parts.  The network
@@ -899,8 +900,8 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def update_mgrs(self, num, hosts):
-        # type: (int, List[str]) -> Completion
+    def update_mgrs(self, spec):
+        # type: (StatefulServiceSpec) -> Completion
         """
         Update the number of cluster managers.
 
@@ -909,8 +910,8 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def update_mons(self, num, hosts):
-        # type: (int, List[HostSpec]) -> Completion
+    def update_mons(self, spec):
+        # type: (StatefulServiceSpec) -> Completion
         """
         Update the number of cluster monitors.
 
@@ -1042,12 +1043,16 @@ class PlacementSpec(object):
     For APIs that need to specify a node subset
     """
     def __init__(self, label=None, nodes=None, count=None):
+        # type: (Optional[str], Optional[List], Optional[int]) -> None
         self.label = label
         if nodes:
-            self.nodes = [parse_host_specs(x, require_network=False) for x in nodes if x]
+            if all([isinstance(node, HostSpec) for node in nodes]):
+                self.nodes = nodes
+            else:
+                self.nodes = [parse_host_specs(x, require_network=False) for x in nodes if x]
         else:
             self.nodes = []
-        self.count = count if count else 1
+        self.count = count  # type: Optional[int]
 
     def set_nodes(self, nodes):
         # To backpopulate the .nodes attribute when using labels or count
@@ -1055,14 +1060,16 @@ class PlacementSpec(object):
         self.nodes = nodes
 
     @classmethod
-    def from_yaml(cls, data):
-        return cls(**data)
+    def from_dict(cls, data):
+        _cls = cls(**data)
+        _cls.validate()
+        return _cls
 
     def validate(self):
         if self.nodes and self.label:
             # TODO: a less generic Exception
             raise Exception('Node and label are mutually exclusive')
-        if self.count <= 0:
+        if self.count is not None and self.count <= 0:
             raise Exception("num/count must be > 1")
 
 
@@ -1150,7 +1157,7 @@ class ServiceDescription(object):
 
     def __repr__(self):
         return "<ServiceDescription>({n_name}:{s_type})".format(n_name=self.nodename,
-                                                                  s_type=self.name())
+                                                                s_type=self.name())
 
     def to_json(self):
         out = {
@@ -1177,14 +1184,10 @@ class StatefulServiceSpec(object):
     """ Such as mgrs/mons
     """
     # TODO: create base class for Stateless/Stateful service specs and propertly inherit
-    def __init__(self,
-                 name=None,
-                 placement=None,
-                 count=None,
-                 scheduler=None):
+    def __init__(self, name=None, placement=None):
         self.placement = PlacementSpec() if placement is None else placement
         self.name = name
-        self.count = self.placement.count if self.placement else 1  # for backwards-compatibility
+        self.count = self.placement.count if self.placement is not None else 1  # for backwards-compatibility
 
 
 class StatelessServiceSpec(object):
@@ -1199,8 +1202,7 @@ class StatelessServiceSpec(object):
     # This structure is supposed to be enough information to
     # start the services.
 
-    def __init__(self, name,
-                 placement=None):
+    def __init__(self, name, placement=None):
         self.placement = PlacementSpec() if placement is None else placement
 
         #: Give this set of statelss services a name: typically it would
@@ -1209,7 +1211,7 @@ class StatelessServiceSpec(object):
         self.name = name
 
         #: Count of service instances
-        self.count = self.placement.count if self.placement else 1  # for backwards-compatibility
+        self.count = self.placement.count if self.placement is not None else 1  # for backwards-compatibility
 
     def validate_add(self):
         if not self.name:
@@ -1239,7 +1241,7 @@ class RGWSpec(StatelessServiceSpec):
 
     """
     def __init__(self,
-                 rgw_realm, # type: str
+                 rgw_realm,  # type: str
                  rgw_zone,  # type: str
                  placement=None,
                  hosts=None,  # type: Optional[List[str]]
@@ -1476,7 +1478,6 @@ class OrchestratorClientMixin(Orchestrator):
         mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, o, meth, args, kwargs))
         return mgr.remote(o, meth, *args, **kwargs)
 
-
     def _orchestrator_wait(self, completions):
         # type: (List[Completion]) -> None
         """
@@ -1537,7 +1538,6 @@ class OutdatableData(object):
         timestr = timestr.rstrip('Z')
         return datetime.datetime.strptime(timestr, cls.DATEFMT)
 
-
     @classmethod
     def from_json(cls, data):
         return cls(data['data'], cls.time_from_string(data['last_refresh']))
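Per the commit subject, PlacementSpec.__init__ now accepts either raw host strings (which still go through parse_host_specs) or a ready-made List[HostSpec], and count becomes Optional instead of silently defaulting to 1. A small sketch of the two construction paths; the parse helper below is a simplified hypothetical stand-in for orchestrator.parse_host_specs.

    from collections import namedtuple

    HostSpec = namedtuple('HostSpec', ['hostname', 'network', 'name'])


    def parse_host_specs(host, require_network=True):
        # Hypothetical stand-in: the real helper also splits an optional
        # network and daemon name out of the host string.
        return HostSpec(hostname=host, network='', name='')


    class PlacementSpec:
        def __init__(self, label=None, nodes=None, count=None):
            self.label = label
            if nodes:
                if all(isinstance(node, HostSpec) for node in nodes):
                    # New: accept pre-built HostSpec tuples unchanged.
                    self.nodes = nodes
                else:
                    # Old: parse raw host strings.
                    self.nodes = [parse_host_specs(x, require_network=False)
                                  for x in nodes if x]
            else:
                self.nodes = []
            self.count = count  # Optional[int]; no implicit default of 1


    from_strings = PlacementSpec(nodes=['node1', 'node2'])
    from_specs = PlacementSpec(nodes=[HostSpec('node1', '10.0.0.0/24', 'a')])
    print(from_strings.nodes)
    print(from_specs.nodes)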
diff --git a/src/pybind/mgr/orchestrator_cli/module.py b/src/pybind/mgr/orchestrator_cli/module.py
index 0293c5181dac671808c2800d98a31d0fd39b3b5e..9dbc2797de003d678d50ffee0b0c3a704163a763 100644
@@ -213,7 +213,7 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule):
         'name=host,type=CephString '
         'name=label,type=CephString',
         'Add a host label')
-    def _host_label_add(self, host, label):
+    def _host_label_rm(self, host, label):
         completion = self.remove_host_label(host, label)
         self._orchestrator_wait([completion])
         orchestrator.raise_if_exception(completion)
@@ -494,9 +494,9 @@ Usage:
         'orchestrator rgw update',
         'name=zone_name,type=CephString '
         'name=realm_name,type=CephString '
-        "name=num,type=CephInt,req=false "
-        "name=label,type=CephString,req=false "
-        "name=hosts,type=CephString,n=N,req=false",
+        'name=num,type=CephInt,req=false '
+        'name=label,type=CephString,req=false '
+        'name=hosts,type=CephString,n=N,req=false',
         'Update the number of RGW instances for the given zone')
     def _rgw_update(self, zone_name, realm_name, num=None, label=None, hosts=[]):
         spec = orchestrator.RGWSpec(
@@ -608,7 +608,7 @@ Usage:
         placement = orchestrator.PlacementSpec(label=label, count=num, nodes=hosts)
         if not hosts and not label:
             # Improve Error message. Point to parse_host_spec examples
-            raise Exception("Mons need a host spec. (host, network, name(opt))")
+            raise orchestrator.OrchestratorValidationError("Mons need a host spec. (host, network, name(opt))")
             # TODO: Scaling without a HostSpec doesn't work right now.
             # we need network autodetection for that.
             # placement = orchestrator.PlacementSpec(count=num)
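The last hunk in this file swaps the bare Exception for OrchestratorValidationError when `orchestrator mon update` is given neither hosts nor a label. A minimal sketch of that guard, using stand-in classes rather than the real mgr module:

    class OrchestratorValidationError(Exception):
        # User-facing input error, so the CLI can report it without a traceback.
        pass


    class PlacementSpec:
        def __init__(self, label=None, nodes=None, count=None):
            self.label, self.nodes, self.count = label, nodes or [], count


    def build_mon_placement(num=None, label=None, hosts=None):
        hosts = hosts or []
        placement = PlacementSpec(label=label, count=num, nodes=hosts)
        if not hosts and not label:
            # Scaling mons purely by count is not supported yet: without
            # network autodetection they need an explicit host/network spec.
            raise OrchestratorValidationError(
                "Mons need a host spec. (host, network, name(opt))")
        return placement


    try:
        build_mon_placement(num=3)
    except OrchestratorValidationError as e:
        print("rejected:", e)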
diff --git a/src/pybind/mgr/test_orchestrator/module.py b/src/pybind/mgr/test_orchestrator/module.py
index 474bb31aad6357169d2f48fd2ea18ba4098468a8..7f392f812794489b0924a32b06d447952052b78b 100644
@@ -260,12 +260,12 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
         assert isinstance(host, six.string_types)
 
     @deferred_write("update_mgrs")
-    def update_mgrs(self, num, hosts):
-        assert not hosts or len(hosts) == num
-        assert all([isinstance(h, str) for h in hosts])
+    def update_mgrs(self, spec):
+        assert not spec.placement.nodes or len(spec.placement.nodes) == spec.placement.count
+        assert all([isinstance(h, str) for h in spec.placement.nodes])
 
     @deferred_write("update_mons")
-    def update_mons(self, num, hosts):
-        assert not hosts or len(hosts) == num
-        assert all([isinstance(h[0], str) for h in hosts])
-        assert all([isinstance(h[1], str) or h[1] is None for h in hosts])
+    def update_mons(self, spec):
+        assert not spec.placement.nodes or len(spec.placement.nodes) == spec.placement.count
+        assert all([isinstance(h[0], str) for h in spec.placement.nodes])
+        assert all([isinstance(h[1], str) or h[1] is None for h in spec.placement.nodes])
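With the spec-based signatures above, the test orchestrator inspects the placement carried by the spec instead of separate num/hosts arguments. A hedged sketch of a spec object that the update_mons checks boil down to, assuming the PlacementSpec/HostSpec/StatefulServiceSpec shapes from the orchestrator.py diff:

    from collections import namedtuple

    HostSpec = namedtuple('HostSpec', ['hostname', 'network', 'name'])


    class PlacementSpec:
        def __init__(self, nodes=None, count=None):
            self.nodes = nodes or []
            self.count = count


    class StatefulServiceSpec:
        def __init__(self, name=None, placement=None):
            self.placement = placement if placement is not None else PlacementSpec()
            self.name = name
            self.count = self.placement.count if self.placement is not None else 1


    spec = StatefulServiceSpec(
        name='mon',
        placement=PlacementSpec(
            nodes=[HostSpec('mon1', '10.0.0.0/24', None),
                   HostSpec('mon2', '10.0.0.0/24', None)],
            count=2))

    # The update_mons assertions reduce to checks of this shape:
    assert not spec.placement.nodes or len(spec.placement.nodes) == spec.placement.count
    assert all(isinstance(h[0], str) for h in spec.placement.nodes)
    assert all(isinstance(h[1], str) or h[1] is None for h in spec.placement.nodes)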