import orchestrator
from ceph.deployment import inventory
-from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, TunedProfileSpec, IngressSpec
+from ceph.deployment.service_spec import (
+ ServiceSpec,
+ PlacementSpec,
+ TunedProfileSpec,
+ IngressSpec,
+ RGWSpec,
+)
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent, service_to_daemon_types
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
if update_create:
self.spec_created[name] = datetime_now()
self._save(name)
+ self._save_certs_and_keys(spec)
def save_rank_map(self,
name: str,
OrchestratorEvent.INFO,
'service was created')
+ def _save_certs_and_keys(self, spec: ServiceSpec) -> None:
+ if spec.service_type == 'rgw':
+ rgw_spec = cast(RGWSpec, spec)
+ if rgw_spec.rgw_frontend_ssl_certificate:
+ rgw_cert: Union[str, List[str]] = rgw_spec.rgw_frontend_ssl_certificate
+ if isinstance(rgw_cert, list):
+ cert_str = '\n'.join(rgw_cert)
+ else:
+ cert_str = rgw_cert
+ assert isinstance(cert_str, str)
+ self.mgr.cert_key_store.save_cert(
+ 'rgw_frontend_ssl_cert',
+ cert_str,
+ service_name=rgw_spec.service_name(),
+ user_made=True)
+
def rm(self, service_name: str) -> bool:
if service_name not in self._specs:
return False
# type: (str) -> bool
found = service_name in self._specs
if found:
+ self._rm_certs_and_keys(self._specs[service_name])
del self._specs[service_name]
if service_name in self._rank_maps:
del self._rank_maps[service_name]
self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
return found
+ def _rm_certs_and_keys(self, spec: ServiceSpec) -> None:
+ if spec.service_type == 'rgw':
+ self.mgr.cert_key_store.rm_cert('rgw_frontend_ssl_cert', service_name=spec.service_name())
+
def get_created(self, spec: ServiceSpec) -> Optional[datetime.datetime]:
return self.spec_created.get(spec.service_name())