spec.service_name(), spec, e))
return r
+ def _check_pool_exists(self, pool, service_name):
+ logger.info(f'Checking pool "{pool}" exists for service {service_name}')
+ if not self.rados.pool_exists(pool):
+ raise OrchestratorError(f'Cannot find pool "{pool}" for '
+ f'service {service_name}')
+
def _check_daemons(self):
# get monmap mtime so we can refresh configs when mons change
monmap = self.get('mon_map')
return self._add_daemon('iscsi', spec, self._create_iscsi, self._config_iscsi)
def _config_iscsi(self, spec):
+ self._check_pool_exists(spec.pool, spec.service_name())
+
logger.info('Saving service %s spec with placement %s' % (
spec.service_name(), spec.placement.pretty_str()))
self.spec_store.save(spec)
return self._add_daemon('nfs', spec, self._create_nfs, self._config_nfs)
def _config_nfs(self, spec):
+ self._check_pool_exists(spec.pool, spec.service_name())
+
logger.info('Saving service %s spec with placement %s' % (
spec.service_name(), spec.placement.pretty_str()))
self.spec_store.save(spec)
match_glob(out, "Deployed nfs.name.* on host 'test'")
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock())
def test_iscsi(self, cephadm_module):
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test'], count=1)