import shutil
import subprocess
import uuid
-from mgr_module import PersistentStoreDict
from ceph.deployment import inventory, translate
from ceph.deployment.drive_group import DriveGroupSpec
DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
HOST_CACHE_PREFIX = "host."
+SPEC_STORE_PREFIX = "spec."
# for py2 compat
try:
raise OrchestratorError(e)
+class SpecStore():
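+    """In-memory cache of ServiceSpecs, persisted as JSON in the mon
+    store under keys prefixed with SPEC_STORE_PREFIX.
+    """
+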
+ def __init__(self, mgr):
+ # type: (CephadmOrchestrator) -> None
+ self.mgr = mgr
+ self.specs = {} # type: Dict[str, orchestrator.ServiceSpec]
+
+ def load(self):
+ # type: () -> None
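+        """Populate the in-memory spec cache from the mon store."""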
+ for k, v in six.iteritems(self.mgr.get_store_prefix(SPEC_STORE_PREFIX)):
+ service_name = k[len(SPEC_STORE_PREFIX):]
+ try:
+ spec = ServiceSpec.from_json(json.loads(v))
+ self.specs[service_name] = spec
+ self.mgr.log.debug('SpecStore: loaded spec for %s' % (
+ service_name))
+ except Exception as e:
+ self.mgr.log.warning('unable to load spec for %s: %s' % (
+ service_name, e))
+
+ def save(self, spec):
+ # type: (orchestrator.ServiceSpec) -> None
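+        """Cache the spec and persist it to the mon store as JSON."""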
+ self.specs[spec.service_name()] = spec
+        self.mgr.set_store(SPEC_STORE_PREFIX + spec.service_name(),
+                           json.dumps(spec.to_json()))
+
+ def rm(self, service_name):
+ # type: (str) -> None
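+        """Remove the spec from the cache and delete its mon store key."""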
+ if service_name in self.specs:
+ del self.specs[service_name]
+ self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
+
+ def find(self, service_name):
+ # type: (str) -> List[orchestrator.ServiceSpec]
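+        """Return all cached specs whose name equals service_name or
+        starts with 'service_name.'."""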
+ specs = []
+ for sn, spec in self.specs.items():
+ if sn == service_name or sn.startswith(service_name + '.'):
+ specs.append(spec)
+ return specs
+
class HostCache():
def __init__(self, mgr):
# type: (CephadmOrchestrator) -> None
self.osd_removal_report: dict = dict()
self.rm_util = RemoveUtil(self)
- self.service_spec_store = PersistentStoreDict(self, 'service_spec')
+ self.spec_store = SpecStore(self)
+ self.spec_store.load()
# ensure the host lists are in sync
for h in self.inventory.keys():
n: str = dd.service_name()
if service_name and service_name != n:
continue
- try:
- _ = self.service_spec_store[dd.service_name()]
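+            # note whether a stored spec exists for this daemon's service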
+ if dd.service_name() in self.spec_store.specs:
spec_presence = "present"
- except IndexError:
+ else:
spec_presence = "absent"
if dd.daemon_type == 'osd':
spec_presence = "not applicable"
args.append(
(d.name(), d.hostname, force)
)
- self._remove_key_from_store(d.service_name())
+ self.spec_store.rm(d.service_name())
if not args:
raise OrchestratorError('Unable to find daemons in %s service' % (
service_name))
service_name, [a[0] for a in args]))
return self._remove_daemon(args)
- def _remove_key_from_store(self, spec_name: str) -> None:
- self.log.debug(f"Removing {spec_name} from the service_spec store")
- del self.service_spec_store[spec_name]
-
def get_inventory(self, host_filter=None, refresh=False):
"""
Return the storage inventory of hosts matching the given filter.
return self._add_daemon('mgr', spec, self._create_mgr)
def apply_mgr(self, spec):
- self.save_spec(spec)
+ self.spec_store.save(spec)
self._kick_serve_loop()
return trivial_result("Scheduled MGR creation..")
return self._add_daemon('mds', spec, self._create_mds, self._config_mds)
def apply_mds(self, spec: orchestrator.ServiceSpec) -> orchestrator.Completion:
- self.save_spec(spec)
+ self.spec_store.save(spec)
self._kick_serve_loop()
return trivial_result("Scheduled MDS creation..")
return self._create_daemon('rgw', rgw_id, host, keyring=keyring)
def apply_rgw(self, spec):
- self.save_spec(spec)
+ self.spec_store.save(spec)
self._kick_serve_loop()
return trivial_result("Scheduled RGW creation..")
keyring=keyring)
def apply_rbd_mirror(self, spec):
- self.save_spec(spec)
+ self.spec_store.save(spec)
self._kick_serve_loop()
return trivial_result("Scheduled rbd-mirror creation..")
return self._apply_service('prometheus', spec, self._create_prometheus)
def apply_prometheus(self, spec):
- self.save_spec(spec)
+ self.spec_store.save(spec)
self._kick_serve_loop()
return trivial_result("Scheduled prometheus creation..")
self._create_node_exporter)
def apply_node_exporter(self, spec):
- self.save_spec(spec)
+ self.spec_store.save(spec)
self._kick_serve_loop()
return trivial_result("Scheduled node-exporter creation..")
"""
return trivial_result(self.osd_removal_report)
- def save_spec(self, spec: ServiceSpec) -> None:
- """
- Attempts to save a ServiceSpec.
-
- There are two ways to manipulate the service_specs stored in the mon_store
- 1) Using a global definition (i.e. `ceph orch apply -i <specs_file.yaml>`)
- This will usually contain multiple definitions of services and daemons.
-
- 2) Using the CLI for specific service types (i.e. `ceph orch apply rgw 3 --realm foo --zone bar`)
-
- Raises `OrchestratorError` if an entry with the same named identifier already exist to prevent overwrites from two different sources.
- """
- full_spec_name = f"{spec.service_type}.{spec.name}"
- try:
- _ = self.service_spec_store[full_spec_name]
- raise orchestrator.OrchestratorError(f"Specification for {full_spec_name} already exists. "
- "Please review your existing specs with "
- "`ceph orch spec dump` and try again.")
- except IndexError:
- self.log.info(f"New spec found. Saving <{full_spec_name}> to the store.")
- _ = self.service_spec_store[full_spec_name] = spec.to_json()
-
def list_specs(self) -> orchestrator.Completion:
"""
Returns all service specs known to the spec store, rendered as YAML.
"""
specs = list()
- for _, spec in self.service_spec_store.items():
+        for spec in self.spec_store.specs.values():
specs.append('---')
specs.append(yaml.dump(spec))
return trivial_result(specs)
Parse a multi-document yaml file (represented as an inbuf object)
and load each document into its respective ServiceSpec to validate the
initial input.
- If no errors are raised `save_spec` is called.
+        If no errors are raised, the specs are saved to the spec store.
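+        For example, a minimal rgw spec document might look like:
+
+            service_type: rgw
+            placement:
+              count: 1
+            spec:
+              rgw_realm: realm1
+              rgw_zone: zone1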
"""
content: Iterator[Any] = yaml.load_all(spec_document)
# Load all specs from a multi document yaml file.
spec_o = ServiceSpec.from_json(spec)
loaded_specs.append(spec_o)
for spec in loaded_specs:
- self.save_spec(spec)
+ self.spec_store.save(spec)
self._kick_serve_loop()
return trivial_result("ServiceSpecs saved")
- def trigger_deployment(self, service_name: str,
+ def trigger_deployment(self,
+ service_name: str,
func: Callable[[ServiceSpec], orchestrator.Completion]) -> List[orchestrator.Completion]:
"""
Triggers the corresponding deployment method `func` for `service_name`.
Services can have multiple entries (e.g. different RGW configurations).
"""
self.log.debug(f"starting async {service_name} deployment")
- specs = self.find_json_specs(service_name)
+ specs = self.spec_store.find(service_name)
completions = list()
for spec in specs:
completions.append(func(spec))
return completions
return [trivial_result("Nothing to do..")]
- def find_json_specs(self, find_service_type: str) -> List[ServiceSpec]:
- """
- Inspects the mon_store and gathers entries for the `find_service_type`
- (i.e. 'mgr', 'rgw') service.
- Some services have individual ServiceSpecs (rgw->RGWSpec, nfs->NFSServiceSpec)
- so we need to make the distinction.
- """
- specs = list()
- self.log.debug(f"Checking for type {find_service_type}")
- for spec_key, json_specs in self.service_spec_store.items():
- if not spec_key.split('.')[0].startswith(find_service_type):
- continue
- if isinstance(json_specs, dict):
- self.log.debug(f"Found dict in json_specs: No need to decode")
- elif isinstance(json_specs, str):
- self.log.debug(f"Found str in json_specs: Decoding from json")
- json_specs = json.loads(json_specs)
-
- self.log.debug(f"Found service_type: {spec_key} in the k-v store. Adding..")
- specs.append(ServiceSpec.from_json(json_specs))
- self.log.debug(f"Found {specs} specs.")
- return specs
-
def _apply_services(self) -> List[orchestrator.Completion]:
"""
This is a method that is supposed to run continuously in the
))
@mock.patch("cephadm.module.HostCache.save_host")
@mock.patch("cephadm.module.HostCache.rm_host")
- @mock.patch("cephadm.module.CephadmOrchestrator._remove_key_from_store")
- def test_remove_daemon(self, rm_key, _rm_host, _save_spec, cephadm_module):
+ def test_remove_daemon(self, _rm_host, _save_spec, cephadm_module):
with self._with_host(cephadm_module, 'test'):
c = cephadm_module.list_daemons(refresh=True)
wait(cephadm_module, c)
))
@mock.patch("cephadm.module.HostCache.save_host")
@mock.patch("cephadm.module.HostCache.rm_host")
- @mock.patch("cephadm.module.CephadmOrchestrator._remove_key_from_store")
- def test_remove_service(self, _rm_key, _rm_host, _save_spec, cephadm_module):
+ @mock.patch("cephadm.module.SpecStore.rm")
+ def test_remove_service(self, _rm_spec, _rm_host, _save_spec, cephadm_module):
with self._with_host(cephadm_module, 'test'):
c = cephadm_module.list_daemons(refresh=True)
wait(cephadm_module, c)
@mock.patch("cephadm.module.CephadmOrchestrator.send_command")
@mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
- @mock.patch("cephadm.module.CephadmOrchestrator.save_spec")
+ @mock.patch("cephadm.module.SpecStore.save")
@mock.patch("cephadm.module.HostCache.save_host")
@mock.patch("cephadm.module.HostCache.rm_host")
def test_apply_mgr_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module):
@mock.patch("cephadm.module.CephadmOrchestrator.send_command")
@mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
- @mock.patch("cephadm.module.CephadmOrchestrator.save_spec")
+ @mock.patch("cephadm.module.SpecStore.save")
@mock.patch("cephadm.module.HostCache.save_host")
@mock.patch("cephadm.module.HostCache.rm_host")
def test_apply_mds_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module):
@mock.patch("cephadm.module.CephadmOrchestrator.send_command")
@mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
- @mock.patch("cephadm.module.CephadmOrchestrator.save_spec")
+ @mock.patch("cephadm.module.SpecStore.save")
@mock.patch("cephadm.module.HostCache.save_host")
@mock.patch("cephadm.module.HostCache.rm_host")
def test_apply_rgw_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module):
@mock.patch("cephadm.module.CephadmOrchestrator.send_command")
@mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
- @mock.patch("cephadm.module.CephadmOrchestrator.save_spec")
+ @mock.patch("cephadm.module.SpecStore.save")
@mock.patch("cephadm.module.HostCache.save_host")
@mock.patch("cephadm.module.HostCache.rm_host")
def test_apply_rbd_mirror_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module):
@mock.patch("cephadm.module.CephadmOrchestrator.send_command")
@mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
- @mock.patch("cephadm.module.CephadmOrchestrator.save_spec")
+ @mock.patch("cephadm.module.SpecStore.save")
@mock.patch("cephadm.module.HostCache.save_host")
@mock.patch("cephadm.module.HostCache.rm_host")
def test_apply_prometheus_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module):
@mock.patch("cephadm.module.CephadmOrchestrator.send_command")
@mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
- @mock.patch("cephadm.module.CephadmOrchestrator.save_spec")
+ @mock.patch("cephadm.module.SpecStore.save")
@mock.patch("cephadm.module.HostCache.save_host")
@mock.patch("cephadm.module.HostCache.rm_host")
def test_apply_node_exporter_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module):
@mock.patch("cephadm.module.CephadmOrchestrator.send_command")
@mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
- @mock.patch("cephadm.module.CephadmOrchestrator.save_spec")
+ @mock.patch("cephadm.module.SpecStore.save")
@mock.patch("cephadm.module.HostCache.save_host")
@mock.patch("cephadm.module.HostCache.rm_host")
@mock.patch("cephadm.module.yaml.load_all", return_value=[{'service_type': 'rgw', 'placement': {'count': 1}, 'spec': {'rgw_realm': 'realm1', 'rgw_zone': 'zone1'}}])
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
@mock.patch("cephadm.module.HostCache.save_host")
@mock.patch("cephadm.module.HostCache.rm_host")
- @mock.patch("cephadm.module.CephadmOrchestrator.find_json_specs")
- def test_trigger_deployment_todo(self, _find_json_spec, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
+ @mock.patch("cephadm.module.SpecStore.find")
+ def test_trigger_deployment_todo(self, _find, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
with self._with_host(cephadm_module, 'test'):
- _find_json_spec.return_value = ['something']
+ _find.return_value = ['something']
c = cephadm_module.trigger_deployment('foo', lambda x: x)
- _find_json_spec.assert_called_with('foo')
+ _find.assert_called_with('foo')
assert c == ['something']
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
@mock.patch("cephadm.module.HostCache.save_host")
@mock.patch("cephadm.module.HostCache.rm_host")
- @mock.patch("cephadm.module.CephadmOrchestrator.find_json_specs")
- def test_trigger_deployment_no_todo(self, _find_json_spec, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
+ @mock.patch("cephadm.module.SpecStore.find")
+ def test_trigger_deployment_no_todo(self, _find, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
with self._with_host(cephadm_module, 'test'):
- _find_json_spec.return_value = []
+ _find.return_value = []
c = cephadm_module.trigger_deployment('foo', lambda x: x)
- _find_json_spec.assert_called_with('foo')
+ _find.assert_called_with('foo')
assert wait(cephadm_module, c[0]) == 'Nothing to do..'
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))