import errno
import logging
import time
+import yaml
from threading import Event
from functools import wraps
import string
try:
- from typing import List, Dict, Optional, Callable, Tuple, TypeVar, Type, Any, NamedTuple
+ from typing import List, Dict, Optional, Callable, Tuple, TypeVar, Type, Any, NamedTuple, Iterator
from typing import TYPE_CHECKING
except ImportError:
TYPE_CHECKING = False # just for type checking
import shutil
import subprocess
import uuid
+from mgr_module import PersistentStoreDict
from ceph.deployment import inventory, translate
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.drive_selection import selector
from mgr_module import MgrModule
-import mgr_util
import orchestrator
from orchestrator import OrchestratorError, HostPlacementSpec, OrchestratorValidationError, HostSpec, \
- CLICommandMeta
+ CLICommandMeta, ServiceSpec
from . import remotes
from ._utils import RemoveUtil
self.osd_removal_report: dict = dict()
self.rm_util = RemoveUtil(self)
+ self.service_spec_store = PersistentStoreDict(self, 'service_spec')
+
# ensure the host lists are in sync
for h in self.inventory.keys():
if h not in self.cache.daemons:
while self.run:
self._check_hosts()
self._remove_osds_bg()
+ completions = self._apply_services()
+ for completion in completions:
+ if completion:
+ while not completion.has_result:
+ self.process([completion])
+ self.log.debug(f'Still processing {completion}')
+ if completion.needs_result:
+ time.sleep(1)
+ else:
+ break
+ if completion.exception is not None:
+ self.log.error(str(completion.exception))
# refresh daemons
self.log.debug('refreshing hosts')
# ugly sync path, FIXME someday perhaps?
for host, hi in self.inventory.items():
self._refresh_host_daemons(host)
+ # <service_map>
sm = {} # type: Dict[str, orchestrator.ServiceDescription]
for h, dm in self.cache.daemons.items():
for name, dd in dm.items():
if service_type and service_type != dd.daemon_type:
continue
- n = dd.service_name()
+ # <name> i.e. rgw.realm.zone
+ n: str = dd.service_name()
if service_name and service_name != n:
continue
+ service_store = PersistentStoreDict(self, f'service_spec/{dd.daemon_type}')
+ try:
+ _ = service_store[dd.service_name(without_service_type=True)]
+ spec_presence = "present"
+ except IndexError:
+ spec_presence = "absent"
+ if dd.daemon_type == 'osd':
+ spec_presence = "not applicable"
if n not in sm:
sm[n] = orchestrator.ServiceDescription(
service_name=n,
last_refresh=dd.last_refresh,
container_image_id=dd.container_image_id,
container_image_name=dd.container_image_name,
+ spec_presence=spec_presence,
)
sm[n].size += 1
if dd.status == 1:
for name in names:
if name in dm:
args.append((name, host, force))
+ # TODO: bail out for OSDs when https://github.com/ceph/ceph/pull/32983 is merged
+ if dm[name].daemon_type == 'osd':
+ continue
+ self._remove_key_from_store(dm[name].daemon_type,
+ dm[name].service_name(without_service_type=True))
if not args:
raise OrchestratorError('Unable to find daemon(s) %s' % (names))
self.log.info('Remove daemons %s' % [a[0] for a in args])
args.append(
(d.name(), d.hostname, force)
)
+ self._remove_key_from_store(d.daemon_type, d.service_name(without_service_type=True))
if not args:
raise OrchestratorError('Unable to find daemons in %s service' % (
service_name))
service_name, [a[0] for a in args]))
return self._remove_daemon(args)
+ def _remove_key_from_store(self, service_type: str, name: str) -> None:
+ self.log.debug(f"Removing {name} from the service_spec store")
+ try:
+ store = PersistentStoreDict(self, f'service_spec/{service_type}')
+ del store[name]
+ except IndexError:
+ self.log.debug(f"{service_type}/{name} not found in store. Can't remove.")
+
def get_inventory(self, host_filter=None, refresh=False):
"""
Return the storage inventory of hosts matching the given filter.
return self._add_daemon('mgr', spec, self._create_mgr)
def apply_mgr(self, spec):
+ self.save_spec(spec)
+ self._kick_serve_loop()
+ return trivial_result("scheduled MGR creation..")
+
+ def _apply_mgr(self, spec):
# type: (orchestrator.ServiceSpec) -> AsyncCompletion
return self._apply_service('mgr', spec, self._create_mgr)
# type: (orchestrator.ServiceSpec) -> AsyncCompletion
return self._add_daemon('mds', spec, self._create_mds, self._config_mds)
- def apply_mds(self, spec):
+ def apply_mds(self, spec: orchestrator.ServiceSpec) -> orchestrator.Completion:
+ self.save_spec(spec)
+ self._kick_serve_loop()
+ return trivial_result("scheduled MDS creation..")
+
+ def _apply_mds(self, spec):
# type: (orchestrator.ServiceSpec) -> AsyncCompletion
return self._apply_service('mds', spec, self._create_mds,
self._config_mds)
return self._create_daemon('rgw', rgw_id, host, keyring=keyring)
def apply_rgw(self, spec):
+ self.save_spec(spec)
+ self._kick_serve_loop()
+ return trivial_result("scheduled RGW creation..")
+
+ def _apply_rgw(self, spec):
# type: (orchestrator.ServiceSpec) -> AsyncCompletion
return self._apply_service('rgw', spec, self._create_rgw,
self._config_rgw)
keyring=keyring)
def apply_rbd_mirror(self, spec):
+ self.save_spec(spec)
+ self._kick_serve_loop()
+ return trivial_result("scheduled rbd-mirror creation..")
+
+ def _apply_rbd_mirror(self, spec):
# type: (orchestrator.ServiceSpec) -> AsyncCompletion
return self._apply_service('rbd-mirror', spec, self._create_rbd_mirror)
def _create_prometheus(self, daemon_id, host):
return self._create_daemon('prometheus', daemon_id, host)
- def apply_prometheus(self, spec):
+ def _apply_prometheus(self, spec):
# type: (orchestrator.ServiceSpec) -> AsyncCompletion
return self._apply_service('prometheus', spec, self._create_prometheus)
+ def apply_prometheus(self, spec):
+ self.save_spec(spec)
+ self._kick_serve_loop()
+ return trivial_result("scheduled prometheus creation..")
+
def add_node_exporter(self, spec):
# type: (orchestrator.ServiceSpec) -> AsyncCompletion
return self._add_daemon('node-exporter', spec,
self._create_node_exporter)
def apply_node_exporter(self, spec):
+ self.save_spec(spec)
+ self._kick_serve_loop()
+ return trivial_result("scheduled node-exporter creation..")
+
+ def _apply_node_exporter(self, spec):
# type: (orchestrator.ServiceSpec) -> AsyncCompletion
return self._apply_service('node-exporter', spec,
self._create_node_exporter)
"""
return trivial_result(self.osd_removal_report)
+ def save_spec(self, spec: ServiceSpec) -> None:
+ """
+ Attempts to save a ServiceSpec.
+
+ There are two ways to manipulate the service_specs stored in the mon_store
+ 1) Using a global definition (i.e. `ceph orch apply -i <specs_file.yaml>`)
+ This will usually contain multiple definitions of services and daemons.
+
+ 2) Using the CLI for specific service types (i.e. `ceph orch apply rgw 3 --realm foo --zone bar`)
+
+ Raises `OrchestratorError` if an entry with the same named identifier already exist to prevent overwrites from two different sources.
+ """
+ store = PersistentStoreDict(self, f'service_spec/{spec.service_type}')
+ name: str = spec.name or spec.service_type
+ try:
+ _ = store[name]
+ raise orchestrator.OrchestratorError(f"Specification for {name} already exists. "
+ "Please review your existing specs with "
+ "`ceph orch servicespecs ls` and try again.")
+ except IndexError:
+ self.log.info(f"New spec found. Saving <{name}> to the store.")
+ store[name] = spec.to_json()
+
+ def list_specs(self) -> orchestrator.Completion:
+ """
+ Loads all entries from the service_spec mon_store root.
+ """
+ specs = list()
+ for _, spec in self.service_spec_store.items():
+ specs.append('---')
+ specs.append(yaml.dump(spec))
+ return trivial_result(specs)
+
+ def clear_all_specs(self) -> orchestrator.Completion:
+ """
+ Clears the service_spec root in the mon_store (mgr/cephadm/service_specs/)
+ """
+ self.service_spec_store.clear()
+ return trivial_result(f"Mon store for {self.service_spec_store.prefix} cleared")
+
+ def apply_service_config(self, spec_document: str) -> orchestrator.Completion:
+ """
+ Parse a multi document yaml file (represented in a inbuf object)
+ and loads it with it's respective ServiceSpec to validate the
+ initial input.
+ If no errors are raised `save_spec` is called.
+ """
+ content: Iterator[Any] = yaml.load_all(spec_document)
+ # Load all specs from a multi document yaml file.
+ for spec in content:
+ # load ServiceSpec once to validate
+ spec_o = ServiceSpec.from_json(spec)
+ self.save_spec(spec_o)
+ self._kick_serve_loop()
+ return trivial_result("ServiceSpecs saved")
+
+ def trigger_deployment(self, service_name: str,
+ func: Callable[[ServiceSpec], orchestrator.Completion]) -> List[orchestrator.Completion]:
+ """
+ Triggers a corresponding deployment method `func` to `service_name`
+ Services can have multiple entries. (i.e. different RGW configurations)
+ """
+ self.log.debug(f"starting async {service_name} deployment")
+ specs = self.find_json_specs(service_name)
+ completions = list()
+ for spec in specs:
+ completions.append(func(spec))
+ if completions:
+ return completions
+ return [trivial_result("Nothing to do..")]
+
+ def find_json_specs(self, find_service_type: str) -> List[ServiceSpec]:
+ """
+ Inspects the mon_store and gathers entries for the `find_service_type`
+ (i.e. 'mgr', 'rgw') service.
+ Some services have individual ServiceSpecs (rgw->RGWSpec, nfs->NFSServiceSpec)
+ so we need to make the distinction.
+ """
+ specs = list()
+ self.log.debug(f"Checking for type {find_service_type}")
+ store = PersistentStoreDict(self, f'service_spec/{find_service_type}')
+ for service_type, json_specs in store.items():
+
+ if isinstance(json_specs, dict):
+ self.log.debug(f"Found dict in json_specs: No need to decode")
+ elif isinstance(json_specs, str):
+ self.log.debug(f"Found str in json_specs: Decoding from json")
+ json_specs = json.loads(json_specs)
+
+ self.log.debug(f"Found service_type: {service_type} in the k-v store. Adding..")
+ specs.append(ServiceSpec.from_json(json_specs))
+ self.log.debug(f"Found {specs} specs.")
+ return specs
+
+ def _apply_services(self) -> List[orchestrator.Completion]:
+ """
+ This is a method that is supposed to run continuously in the
+ server() thread.
+ It will initiate deployments based on the presence of a ServiceSpec
+ in the persistent mon_store.
+ There is a defined order in which the services should be deployed
+ Defined order:
+ # mon -> mgr -> osd -> monitoring -> mds -> rgw -> nfs -> iscsi -> rbd-mirror
+
+ Special cases:
+ * Mons scaling is currently not implemented.
+ * OSDs are daemons that are handled differently and may not fit in this paradigm
+
+ The serve() thread processes the completions serially, which ensures the adherence to
+ the defined order.
+ """
+
+ super_completions: List[orchestrator.Completion] = list()
+ super_completions.extend(self.trigger_deployment('mgr', self._apply_mgr))
+ super_completions.extend(self.trigger_deployment('prometheus', self._apply_prometheus))
+ super_completions.extend(self.trigger_deployment('node-exporter', self._apply_node_exporter))
+ super_completions.extend(self.trigger_deployment('mds', self._apply_mds))
+ super_completions.extend(self.trigger_deployment('rgw', self._apply_rgw))
+ super_completions.extend(self.trigger_deployment('rbd-mirror', self._apply_rbd_mirror))
+
+ # Not implemented
+
+ # super_completions.extend(trigger_deployment('mon', self._apply_mon))
+ # super_completions.extend(trigger_deployment('nfs', self._apply_nfs))
+ # super_completions.extend(trigger_deployment('grafana', self._apply_grafana))
+ # super_completions.extend(trigger_deployment('iscsi', self._apply_iscsi))
+
+ # Not implemented
+ return super_completions
+
class BaseScheduler(object):
"""
def test_mon_update(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
- c = cephadm_module.add_mon(ServiceSpec(placement=ps))
+ c = cephadm_module.add_mon(ServiceSpec(placement=ps, service_type='mon'))
assert wait(cephadm_module, c) == ["Deployed mon.a on host 'test'"]
with pytest.raises(OrchestratorError, match="is missing a network spec"):
ps = PlacementSpec(hosts=['test'], count=1)
- c = cephadm_module.add_mon(ServiceSpec(placement=ps))
+ c = cephadm_module.add_mon(ServiceSpec(placement=ps, service_type='mon'))
wait(cephadm_module, c)
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
def test_mgr_update(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
- c = cephadm_module.apply_mgr(ServiceSpec(placement=ps))
+ c = cephadm_module._apply_mgr(ServiceSpec(placement=ps, service_type='mgr'))
[out] = wait(cephadm_module, c)
match_glob(out, "Deployed mgr.* on host 'test'")
def test_mds(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test'], count=1)
- c = cephadm_module.add_mds(ServiceSpec('name', placement=ps))
+ c = cephadm_module.add_mds(ServiceSpec('name', placement=ps, service_type='mds'))
[out] = wait(cephadm_module, c)
match_glob(out, "Deployed mds.name.* on host 'test'")
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test'], count=1)
- c = cephadm_module.add_rgw(RGWSpec('realm', 'zone', placement=ps))
+ c = cephadm_module.add_rgw(RGWSpec('realm', 'zone', placement=ps, service_type='rgw'))
[out] = wait(cephadm_module, c)
match_glob(out, "Deployed rgw.realm.zone.* on host 'test'")
with self._with_host(cephadm_module, 'host1'):
with self._with_host(cephadm_module, 'host2'):
ps = PlacementSpec(hosts=['host1'], count=1)
- c = cephadm_module.add_rgw(RGWSpec('realm', 'zone1', placement=ps))
+ c = cephadm_module.add_rgw(RGWSpec('realm', 'zone1', placement=ps, service_type='rgw'))
[out] = wait(cephadm_module, c)
match_glob(out, "Deployed rgw.realm.zone1.host1.* on host 'host1'")
ps = PlacementSpec(hosts=['host1', 'host2'], count=2)
- c = cephadm_module.apply_rgw(RGWSpec('realm', 'zone1', placement=ps))
+ c = cephadm_module._apply_rgw(RGWSpec('realm', 'zone1', placement=ps, service_type='rgw'))
[out] = wait(cephadm_module, c)
match_glob(out, "Deployed rgw.realm.zone1.host2.* on host 'host2'")
with self._with_host(cephadm_module, 'host1'):
with self._with_host(cephadm_module, 'host2'):
ps = PlacementSpec(hosts=['host1'], count=1)
- c = cephadm_module.add_rgw(RGWSpec('realm', 'zone1', placement=ps))
+ c = cephadm_module.add_rgw(RGWSpec('realm', 'zone1', placement=ps, service_type='rgw'))
[out] = wait(cephadm_module, c)
match_glob(out, "Deployed rgw.realm.zone1.host1.* on host 'host1'")
ps = PlacementSpec(hosts=['host2'], count=1)
- c = cephadm_module.add_rgw(RGWSpec('realm', 'zone2', placement=ps))
+ c = cephadm_module.add_rgw(RGWSpec('realm', 'zone2', placement=ps, service_type='rgw'))
[out] = wait(cephadm_module, c)
match_glob(out, "Deployed rgw.realm.zone2.host2.* on host 'host2'")
with pytest.raises(OrchestratorError):
ps = PlacementSpec(hosts=['host1', 'host2'], count=3)
- c = cephadm_module.apply_rgw(RGWSpec('realm', 'zone1', placement=ps))
+ c = cephadm_module._apply_rgw(RGWSpec('realm', 'zone1', placement=ps, service_type='rgw'))
[out] = wait(cephadm_module, c)
))
@mock.patch("cephadm.module.HostCache.save_host")
@mock.patch("cephadm.module.HostCache.rm_host")
- def test_remove_daemon(self, _save_host, _rm_host, cephadm_module):
+ @mock.patch("cephadm.module.CephadmOrchestrator._remove_key_from_store")
+ def test_remove_daemon(self, _save_host, _rm_host, _save_spec, cephadm_module):
with self._with_host(cephadm_module, 'test'):
c = cephadm_module.list_daemons(refresh=True)
wait(cephadm_module, c)
))
@mock.patch("cephadm.module.HostCache.save_host")
@mock.patch("cephadm.module.HostCache.rm_host")
- def test_remove_service(self, _save_host, _rm_host, cephadm_module):
+ @mock.patch("cephadm.module.CephadmOrchestrator._remove_key_from_store")
+ def test_remove_service(self, _save_host, _rm_host, _save_spec, cephadm_module):
with self._with_host(cephadm_module, 'test'):
c = cephadm_module.list_daemons(refresh=True)
wait(cephadm_module, c)
# type: (mock.Mock, mock.Mock, mock.Mock, mock.Mock, CephadmOrchestrator) -> None
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test'], count=1)
- c = cephadm_module.add_rbd_mirror(ServiceSpec(name='name', placement=ps))
+ c = cephadm_module.add_rbd_mirror(ServiceSpec(name='name', placement=ps, service_type='rbd-mirror'))
[out] = wait(cephadm_module, c)
match_glob(out, "Deployed rbd-mirror.* on host 'test'")
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test'], count=1)
- c = cephadm_module.add_prometheus(ServiceSpec(placement=ps))
+ c = cephadm_module.add_prometheus(ServiceSpec(placement=ps, service_type='prometheus'))
[out] = wait(cephadm_module, c)
match_glob(out, "Deployed prometheus.* on host 'test'")
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test'], count=1)
- c = cephadm_module.add_node_exporter(ServiceSpec(placement=ps))
+ c = cephadm_module.add_node_exporter(ServiceSpec(placement=ps, service_type='node-exporter'))
[out] = wait(cephadm_module, c)
match_glob(out, "Deployed node-exporter.* on host 'test'")
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test'], count=1)
- c = cephadm_module.add_grafana(ServiceSpec(placement=ps))
+ c = cephadm_module.add_grafana(ServiceSpec(placement=ps, service_type='grafana'))
[out] = wait(cephadm_module, c)
match_glob(out, "Deployed grafana.* on host 'test'")
def __init__(self, mgr, prefix):
# type: (MgrModule, str) -> None
self.mgr = mgr
- self.prefix = prefix + '.'
+ self.prefix = prefix + '/'
def _mk_store_key(self, key):
return self.prefix + key
self.__missing__(key)
return json.loads(val)
except (KeyError, AttributeError, IndexError, ValueError, TypeError):
- logging.getLogger(__name__).exception('failed to deserialize')
+ logging.getLogger(__name__).debug('failed to deserialize item in store')
self.mgr.set_store(key, None)
raise
import functools
import logging
import pickle
+import json
import sys
import time
from collections import namedtuple
"""
raise NotImplementedError()
+ def apply_service_config(self, spec_document: str) -> Completion:
+ """
+ Saves Service Specs from a yaml|json file
+ """
+ raise NotImplementedError()
+
def remove_daemons(self, names, force):
# type: (List[str], bool) -> Completion
"""
"""
raise NotImplementedError()
+ def list_specs(self):
+ # type: () -> Completion
+ """
+ Lists saved service specs
+ """
+ raise NotImplementedError()
+
+ def clear_all_specs(self):
+ # type: () -> Completion
+ """
+ Lists saved service specs
+ """
+ raise NotImplementedError()
+
def remove_service(self, service_name):
# type: (str) -> Completion
"""
else:
self.hosts = [HostPlacementSpec.parse(x, require_network=False) for x in hosts if x]
-
self.count = count # type: Optional[int]
self.all_hosts = all_hosts # type: bool
return self.name().startswith(service_name + '.')
return False
- def service_name(self):
+ def service_name(self, without_service_type=False):
if self.daemon_type == 'rgw':
v = self.daemon_id.split('.')
- return 'rgw.%s' % ('.'.join(v[0:2]))
+ s_name = '.'.join(v[0:2])
+ if without_service_type:
+ return s_name
+ return 'rgw.%s' % s_name
if self.daemon_type in ['mds', 'nfs']:
- return 'mds.%s' % (self.daemon_id.split('.')[0])
+ _s_name = self.daemon_id.split('.')[0]
+ if without_service_type:
+ return _s_name
+ return 'mds.%s' % _s_name
return self.daemon_type
def __repr__(self):
service_url=None,
last_refresh=None,
size=0,
+ spec_presence='absent',
running=0):
# Not everyone runs in containers, but enough people do to
# justify having the container_image_id (image hash) and container_image
# datetime when this info was last refreshed
self.last_refresh = last_refresh # type: Optional[datetime.datetime]
+ # status string to indicate the presence of a persistent servicespec
+ # possible strings are "absent", "present" and "not applicable" while
+ # the "not applicable" is mostly for OSDs.
+ self.spec_presence = spec_presence
+
def service_type(self):
if self.service_name:
return self.service_name.split('.')[0]
"""
- def __init__(self, name=None, placement=None):
- # type: (Optional[str], Optional[PlacementSpec]) -> None
+ def __init__(self, name=None, placement=None, service_type=None, count=None):
+ # type: (Optional[str], Optional[PlacementSpec], Optional[str], Optional[int]) -> None
self.placement = PlacementSpec() if placement is None else placement # type: PlacementSpec
#: Give this set of stateless services a name: typically it would
#: within one ceph cluster. Note: Not all clusters have a name
self.name = name # type: Optional[str]
+ assert service_type
+ self.service_type = service_type
+
if self.placement is not None and self.placement.count is not None:
#: Count of service instances. Deprecated.
self.count = self.placement.count # type: int
else:
self.count = 1
+ @classmethod
+ def from_json(cls, json_spec: dict) -> "ServiceSpec":
+ """
+ Initialize 'ServiceSpec' object data from a json structure
+ :param json_spec: A valid dict with ServiceSpec
+ """
+ args: Dict[str, Dict[Any, Any]] = {}
+ service_type = json_spec.get('service_type', '')
+ assert service_type
+ if service_type == 'rgw':
+ _cls = RGWSpec # type: ignore
+ elif service_type == 'nfs':
+ _cls = NFSServiceSpec # type: ignore
+ else:
+ _cls = ServiceSpec # type: ignore
+ for k, v in json_spec.items():
+ if k == 'placement':
+ v = PlacementSpec.from_dict(v)
+ if k == 'spec':
+ args.update(v)
+ continue
+ args.update({k: v})
+ return _cls(**args) # type: ignore
+
+ def to_json(self):
+ return json.dumps(self, default=lambda o: o.__dict__,
+ sort_keys=True, indent=4)
+
+ def __repr__(self):
+ return "{}({!r})".format(self.__class__.__name__, self.__dict__)
+
def servicespec_validate_add(self: ServiceSpec):
# This must not be a method of ServiceSpec, otherwise you'll hunt
raise OrchestratorValidationError(m)
-
class NFSServiceSpec(ServiceSpec):
- def __init__(self, name, pool=None, namespace=None, placement=None):
- super(NFSServiceSpec, self).__init__(name, placement)
+ def __init__(self, name, pool=None, namespace=None, placement=None, service_type=None):
+ super(NFSServiceSpec, self).__init__(name=name, placement=placement, service_type=service_type)
#: RADOS pool where NFS client recovery data is stored.
self.pool = pool
rgw_realm, # type: str
rgw_zone, # type: str
placement=None,
+ service_type=None, # type: Optional[str]
+ name=None, # type: Optional[str]
hosts=None, # type: Optional[List[str]]
rgw_multisite=None, # type: Optional[bool]
rgw_zonemaster=None, # type: Optional[bool]
# in Rook itself. Thus we don't set any defaults here in this class.
super(RGWSpec, self).__init__(name=rgw_realm + '.' + rgw_zone,
- placement=placement)
+ placement=placement, service_type=service_type)
#: List of hosts where RGWs should run. Not for Rook.
if hosts:
@property
def rgw_multisite_endpoints_list(self):
return ",".join(["{}://{}:{}".format(self.rgw_multisite_proto,
- host,
- self.rgw_frontend_port) for host in self.placement.hosts])
+ host,
+ self.rgw_frontend_port) for host in self.placement.hosts])
def genkey(self, nchars):
""" Returns a random string of nchars
string.ascii_lowercase +
string.digits) for _ in range(nchars))
- @classmethod
- def from_json(cls, json_rgw_spec):
- # type: (dict) -> RGWSpec
- """
- Initialize 'RGWSpec' object data from a json structure
- :param json_rgw_spec: A valid dict with a the RGW settings
- """
- # TODO: also add PlacementSpec(**json_rgw_spec['placement'])
- args = {k:v for k, v in json_rgw_spec.items()}
- return RGWSpec(**args)
-
class InventoryFilter(object):
"""
else:
now = datetime.datetime.utcnow()
table = PrettyTable(
- ['NAME', 'RUNNING', 'REFRESHED', 'IMAGE NAME', 'IMAGE ID'],
+ ['NAME', 'RUNNING', 'REFRESHED', 'IMAGE NAME', 'IMAGE ID', 'SPEC'],
border=False)
table.align['NAME'] = 'l'
table.align['RUNNING'] = 'r'
'%d/%d' % (s.running, s.size),
age,
ukn(s.container_image_name),
- ukn(s.container_image_id)[0:12]))
+ ukn(s.container_image_id)[0:12],
+ ukn(s.spec_presence)
+ ))
return HandleCommandResult(stdout=table.get_string())
placement = PlacementSpec(label=label, count=num, hosts=hosts)
placement.validate()
- spec = ServiceSpec(placement=placement)
+ spec = ServiceSpec(placement=placement, service_type='mon')
completion = self.add_mon(spec)
self._orchestrator_wait([completion])
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
+ @_cli_write_command(
+ 'orch apply',
+ desc='Applies a Service Specification from a file. ceph orch apply -i $file')
+ def _apply_services(self, inbuf):
+ completion = self.apply_service_config(inbuf)
+ self._orchestrator_wait([completion])
+ raise_if_exception(completion)
+ return HandleCommandResult(stdout=completion.result_str())
+
@_cli_write_command(
'orch daemon add rbd-mirror',
"name=num,type=CephInt,req=false "
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
+ @_cli_write_command(
+ 'orch spec dump',
+ desc='List all Service specs')
+ def _get_service_specs(self):
+ completion = self.list_specs()
+ self._orchestrator_wait([completion])
+ raise_if_exception(completion)
+ specs = completion.result_str()
+ return HandleCommandResult(stdout=specs)
+
+ @_cli_write_command(
+ 'orch servicespecs clear',
+ desc='Clear all Service specs')
+ def _clear_service_specs(self):
+ completion = self.clear_all_specs()
+ self._orchestrator_wait([completion])
+ raise_if_exception(completion)
+ return HandleCommandResult(stdout=completion.result_str())
+
+
@_cli_write_command(
'orch apply mgr',
"name=num,type=CephInt,req=false "
label=label, count=num, hosts=hosts)
placement.validate()
- spec = ServiceSpec(placement=placement)
+ spec = ServiceSpec(placement=placement, service_type='mgr')
completion = self.apply_mgr(spec)
self._orchestrator_wait([completion])
def _apply_mds(self, fs_name, num=None, label=None, hosts=[]):
placement = PlacementSpec(label=label, count=num, hosts=hosts)
placement.validate()
-
spec = ServiceSpec(
- fs_name,
+ service_type='mds',
+ name=fs_name,
placement=placement)
-
completion = self.apply_mds(spec)
self._orchestrator_wait([completion])
raise_if_exception(completion)
'Update the number of rbd-mirror instances')
def _apply_rbd_mirror(self, num, label=None, hosts=[]):
spec = ServiceSpec(
- placement=PlacementSpec(hosts=hosts, count=num, label=label))
+ placement=PlacementSpec(hosts=hosts, count=num, label=label),
+ service_type='rbd-mirror')
completion = self.apply_rbd_mirror(spec)
self._orchestrator_wait([completion])
raise_if_exception(completion)
'Update the number of RGW instances for the given zone')
def _apply_rgw(self, zone_name, realm_name, num=None, label=None, hosts=[]):
spec = RGWSpec(
+ service_type='rgw',
rgw_realm=realm_name,
rgw_zone=zone_name,
placement=PlacementSpec(hosts=hosts, label=label, count=num))
# type: (Optional[int], Optional[str], List[str]) -> HandleCommandResult
spec = ServiceSpec(
placement=PlacementSpec(label=label, hosts=hosts, count=num),
+ service_type='prometheus'
)
completion = self.apply_prometheus(spec)
self._orchestrator_wait([completion])
# type: (Optional[int], Optional[str], List[str]) -> HandleCommandResult
spec = ServiceSpec(
placement=PlacementSpec(label=label, hosts=hosts, count=num),
+ service_type='node-exporter'
)
completion = self.apply_node_exporter(spec)
self._orchestrator_wait([completion])
"""
{
"rgw_zone": "zonename",
+ "service_type": "rgw",
"rgw_frontend_port": 8080,
"rgw_zonegroup": "group",
"rgw_zone_user": "user",