# load inventory
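+ # The stored blob is a JSON dict of hostname -> HostSpec.to_json();
+ # entries written before this change lack the 'hostname' and 'status' keys.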
i = self.get_store('inventory')
if i:
- self.inventory = json.loads(i)
+ self.inventory: Dict[str, dict] = json.loads(i)
else:
self.inventory = dict()
self.log.debug('Loaded inventory %s' % self.inventory)
def _get_hosts(self):
+ # type: () -> List[str]
- return [a for a in self.inventory.keys()]
+ return list(self.inventory.keys())
@async_completion
raise OrchestratorError('New host %s (%s) failed check: %s' % (
spec.hostname, spec.addr, err))
- self.inventory[spec.hostname] = {
- 'addr': spec.addr,
- 'labels': spec.labels,
- }
+ self.inventory[spec.hostname] = spec.to_json()
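+ # Persist the full spec (hostname, addr, labels, status); the on-disk
+ # format is asserted in test_host(), so change it with care.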
self._save_inventory()
self.cache.prime_empty_host(spec.hostname)
self.event.set() # refresh stray health check
@trivial_completion
def get_hosts(self):
+ # type: () -> List[orchestrator.HostSpec]
"""
Return a list of hosts managed by the orchestrator.
Notes:
- skip async: manager reads from cache.
-
- TODO:
- - InventoryNode probably needs to be able to report labels
"""
r = []
for hostname, info in self.inventory.items():
self.log.debug('host %s info %s' % (hostname, info))
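+ # The .get() defaults keep inventory entries written by older versions
+ # readable, e.g. pre-HostSpec entries that have no 'status' key.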
- r.append(orchestrator.InventoryNode(
+ r.append(orchestrator.HostSpec(
hostname,
addr=info.get('addr', hostname),
labels=info.get('labels', []),
+ status=info.get('status', ''),
))
return r
return self.get_hosts().then(lambda hosts: self.call_inventory(hosts, drive_groups))
def _prepare_deployment(self,
- all_hosts, # type: List[orchestrator.InventoryNode]
+ all_hosts, # type: List[orchestrator.HostSpec]
drive_groups, # type: List[DriveGroupSpec]
inventory_list # type: List[orchestrator.InventoryNode]
):
for drive_group in drive_groups:
self.log.info("Processing DriveGroup {}".format(drive_group))
# 1) use fn_filter to determine matching_hosts
- matching_hosts = drive_group.hosts([x.name for x in all_hosts])
+ matching_hosts = drive_group.hosts([x.hostname for x in all_hosts])
# 2) Map the inventory to the InventoryNode object
# FIXME: lazy-load the inventory from a InventoryNode object;
# this would save one call to the inventory(at least externally)
@mock.patch("cephadm.module.HostCache.save_host")
@mock.patch("cephadm.module.HostCache.rm_host")
def test_host(self, _get_connection, _save_host, _rm_host, cephadm_module):
+ assert wait(cephadm_module, cephadm_module.get_hosts()) == []
with self._with_host(cephadm_module, 'test'):
- assert wait(cephadm_module, cephadm_module.get_hosts()) == [InventoryNode('test')]
- c = cephadm_module.get_hosts()
- assert wait(cephadm_module, c) == []
+ assert wait(cephadm_module, cephadm_module.get_hosts()) == [HostSpec('test', 'test')]
+
+ # Be careful with backward compatibility when changing things here:
+ assert json.loads(cephadm_module._store['inventory']) == \
+ {"test": {"hostname": "test", "addr": "test", "labels": [], "status": ""}}
+
+ with self._with_host(cephadm_module, 'second'):
+ assert wait(cephadm_module, cephadm_module.get_hosts()) == [
+ HostSpec('test', 'test'),
+ HostSpec('second', 'second')
+ ]
+
+ assert wait(cephadm_module, cephadm_module.get_hosts()) == [HostSpec('test', 'test')]
+ assert wait(cephadm_module, cephadm_module.get_hosts()) == []
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
@mock.patch("cephadm.module.HostCache.save_host")
from __future__ import absolute_import
import copy
+try:
+ from typing import List
+except ImportError:
+ pass
+
from mgr_util import merge_dicts
+from orchestrator import HostSpec
from . import ApiController, RESTController, Task
from .orchestrator import raise_if_no_orchestrator
from .. import mgr
def merge_hosts_by_hostname(ceph_hosts, orch_hosts):
+ # type: (List[dict], List[HostSpec]) -> List[dict]
"""Merge Ceph hosts with orchestrator hosts by hostnames.
- :param mgr_hosts: hosts returned from mgr
- :type mgr_hosts: list of dict
+ :param ceph_hosts: hosts returned from mgr
+ :type ceph_hosts: list of dict
- :param orch_hosts: hosts returned from ochestrator
+ :param orch_hosts: hosts returned from orchestrator
- :type orch_hosts: list of InventoryNode
+ :type orch_hosts: list of HostSpec
:return list of dict
"""
_ceph_hosts = copy.deepcopy(ceph_hosts)
- orch_hostnames = {host.name for host in orch_hosts}
+ orch_hostnames = {host.hostname for host in orch_hosts}
# hosts in both Ceph and Orchestrator
for ceph_host in _ceph_hosts:
except ImportError:
from unittest import mock
-from orchestrator import InventoryNode
+from orchestrator import HostSpec
from . import ControllerTestCase
from ..controllers.host import get_hosts, Host
fake_client = mock.Mock()
fake_client.available.return_value = True
fake_client.hosts.list.return_value = [
- InventoryNode('node1'), InventoryNode('node2')]
+ HostSpec('node1'), HostSpec('node2')]
instance.return_value = fake_client
hosts = get_hosts()
"""
raise NotImplementedError()
- def add_host(self, HostSpec):
+ def add_host(self, host_spec):
# type: (HostSpec) -> Completion
"""
Add a host to the orchestrator inventory.
"""
Report the hosts in the cluster.
- The default implementation is extra slow.
-
- :return: list of InventoryNodes
+ :return: list of HostSpec
"""
- return self.get_inventory()
+ raise NotImplementedError()
def add_host_label(self, host, label):
# type: (str, str) -> Completion
"""
raise NotImplementedError()
+
class HostSpec(object):
- def __init__(self, hostname, addr=None, labels=None):
- # type: (str, Optional[str], Optional[List[str]]) -> None
- self.hostname = hostname # the hostname on the host
- self.addr = addr or hostname # DNS name or IP address to reach it
- self.labels = labels or [] # initial label(s), if any
+ """
+ Information about hosts, similar to ``kubectl get nodes``.
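+
+ A usage sketch (illustrative; ``addr`` defaults to ``hostname`` and
+ ``__eq__`` deliberately ignores ``status``)::
+
+     h = HostSpec('node1', labels=['mon'])
+     assert h.addr == 'node1'
+     assert h == HostSpec('node1', 'node1', ['mon'], 'offline')
+     assert h.to_json() == {'hostname': 'node1', 'addr': 'node1',
+                            'labels': ['mon'], 'status': ''}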
+ """
+ def __init__(self,
+ hostname, # type: str
+ addr=None, # type: Optional[str]
+ labels=None, # type: Optional[List[str]]
+ status=None, # type: Optional[str]
+ ):
+ #: the bare hostname on the host. Not the FQDN.
+ self.hostname = hostname # type: str
+
+ #: DNS name or IP address to reach it
+ self.addr = addr or hostname # type: str
+
+ #: label(s), if any
+ self.labels = labels or [] # type: List[str]
+
+ #: human readable status
+ self.status = status or '' # type: str
+
+ def to_json(self):
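+ # to_json() feeds both the cephadm inventory store (add_host stores
+ # spec.to_json()) and `ceph orch host ls --format json`. There is no
+ # from_json() in this change; readers rebuild HostSpec from the dict.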
+ return {
+ 'hostname': self.hostname,
+ 'addr': self.addr,
+ 'labels': self.labels,
+ 'status': self.status,
+ }
+
+ def __repr__(self):
+ args = [self.hostname] # type: List[Any]
+ if self.addr is not None:
+ args.append(self.addr)
+ if self.labels:
+ args.append(self.labels)
+ if self.status:
+ args.append(self.status)
+
+ return "<HostSpec>({})".format(', '.join(map(repr, args)))
+
+ def __eq__(self, other):
+ # Let's omit `status` for the moment, as it is still the very same host.
+ return self.hostname == other.hostname and \
+ self.addr == other.addr and \
+ self.labels == other.labels
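+ # NOTE: defining __eq__ without __hash__ makes HostSpec unhashable in
+ # Python 3; add a __hash__ if specs ever need to live in sets or dicts.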
+
class UpgradeStatusSpec(object):
# Orchestrator's report on what's going on with any ongoing upgrade
'name=addr,type=CephString,req=false '
'name=labels,type=CephString,n=N,req=false',
'Add a host')
- def _add_host(self, host, addr=None, labels=None):
+ def _add_host(self, host: str, addr: Optional[str] = None, labels: Optional[List[str]] = None):
s = HostSpec(hostname=host, addr=addr, labels=labels)
completion = self.add_host(s)
self._orchestrator_wait([completion])
self._orchestrator_wait([completion])
raise_if_exception(completion)
if format == 'json':
- hosts = [dict(host=node.name, labels=node.labels)
+ hosts = [node.to_json()
for node in completion.result]
output = json.dumps(hosts, sort_keys=True)
else:
table = PrettyTable(
- ['HOST', 'ADDR', 'LABELS'],
+ ['HOST', 'ADDR', 'LABELS', 'STATUS'],
border=False)
table.align = 'l'
table.left_padding_width = 0
table.right_padding_width = 1
- for node in sorted(completion.result, key=lambda h: h.name):
+ for node in sorted(completion.result, key=lambda h: h.hostname):
- table.add_row((node.name, node.addr, ' '.join(node.labels)))
+ table.add_row((node.hostname, node.addr, ' '.join(node.labels), node.status))
output = table.get_string()
return HandleCommandResult(stdout=output)
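+ # Sketch of the resulting `ceph orch host ls` table with the new STATUS
+ # column (hostnames and addresses here are hypothetical):
+ #
+ #   HOST   ADDR      LABELS  STATUS
+ #   node1  10.0.0.1  mon
+ #   node2  10.0.0.2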
@deferred_read
def get_hosts(self):
- # type: () -> List[orchestrator.InventoryNode]
- return [orchestrator.InventoryNode(n, inventory.Devices([])) for n in self.rook_cluster.get_node_names()]
+ # type: () -> List[orchestrator.HostSpec]
+ return [orchestrator.HostSpec(n) for n in self.rook_cluster.get_node_names()]
@deferred_read
def list_daemons(self, daemon_type=None, daemon_id=None, node_name=None, refresh=False):
targets += drive_group.data_directories
def execute(all_hosts_):
- all_hosts = orchestrator.InventoryNode.get_host_names(all_hosts_)
+ # type: (List[orchestrator.HostSpec]) -> orchestrator.Completion
+ all_hosts = [h.hostname for h in all_hosts_]
assert len(drive_group.hosts(all_hosts)) == 1
drive_group = drive_groups[0]
def run(all_hosts):
- drive_group.validate(orchestrator.InventoryNode.get_host_names(all_hosts))
+ # type: (List[orchestrator.HostSpec]) -> None
+ drive_group.validate([h.hostname for h in all_hosts])
return self.get_hosts().then(run).then(
on_complete=orchestrator.ProgressReference(
message='create_osds',
@deferred_read
def get_hosts(self):
if self._inventory:
- return self._inventory
- return [orchestrator.InventoryNode('localhost', inventory.Devices([]))]
+ return [orchestrator.HostSpec(i.name, i.addr, i.labels) for i in self._inventory]
+ return [orchestrator.HostSpec('localhost')]
@deferred_write("add_host")
def add_host(self, spec):