self.cert_mgr.register_cert_key_pair('iscsi', 'iscsi_ssl_cert', 'iscsi_ssl_key', TLSObjectScope.SERVICE)
self.cert_mgr.register_cert_key_pair('nvmeof', 'nvmeof_server_cert', 'nvmeof_server_key', TLSObjectScope.SERVICE)
self.cert_mgr.register_cert_key_pair('nvmeof', 'nvmeof_client_cert', 'nvmeof_client_key', TLSObjectScope.SERVICE)
+
self.cert_mgr.register_cert('nvmeof', 'nvmeof_root_ca_cert', TLSObjectScope.SERVICE)
self.cert_mgr.register_cert('rgw', 'rgw_frontend_ssl_cert', TLSObjectScope.SERVICE)
self.cert_mgr.register_key('nvmeof', 'nvmeof_encryption_key', TLSObjectScope.SERVICE)
nvmeof_server_key = 'nvmeof-server-key'
nvmeof_client_key = 'nvmeof-client-key'
nvmeof_encryption_key = 'nvmeof-encryption-key'
+ unknown_cert_entity = 'unknown_per_service_cert'
+ unknown_cert_key = 'unknown_per_service_key'
def _fake_prefix_store(key):
if key == 'cert_store.cert.':
f'{TLSOBJECT_STORE_CERT_PREFIX}nvmeof_server_cert': json.dumps({'nvmeof.foo': Cert(nvmeof_server_cert, True).to_json()}),
f'{TLSOBJECT_STORE_CERT_PREFIX}nvmeof_client_cert': json.dumps({'nvmeof.foo': Cert(nvmeof_client_cert, True).to_json()}),
f'{TLSOBJECT_STORE_CERT_PREFIX}nvmeof_root_ca_cert': json.dumps({'nvmeof.foo': Cert(nvmeof_root_ca_cert, True).to_json()}),
+ f'{TLSOBJECT_STORE_CERT_PREFIX}{unknown_cert_entity}': json.dumps({'unkonwn.foo': Cert(rgw_frontend_rgw_foo_host2_cert, True).to_json()}),
}
elif key == 'cert_store.key.':
return {
f'{TLSOBJECT_STORE_KEY_PREFIX}nvmeof_server_key': json.dumps({'nvmeof.foo': PrivKey(nvmeof_server_key).to_json()}),
f'{TLSOBJECT_STORE_KEY_PREFIX}nvmeof_client_key': json.dumps({'nvmeof.foo': PrivKey(nvmeof_client_key).to_json()}),
f'{TLSOBJECT_STORE_KEY_PREFIX}nvmeof_encryption_key': json.dumps({'nvmeof.foo': PrivKey(nvmeof_encryption_key).to_json()}),
+ f'{TLSOBJECT_STORE_KEY_PREFIX}{unknown_cert_key}': json.dumps({'unkonwn.foo': PrivKey(nvmeof_encryption_key).to_json()}),
}
else:
raise Exception(f'Get store with unexpected value {key}')
_get_store_prefix.side_effect = _fake_prefix_store
- cephadm_module.cert_mgr.load()
+ cephadm_module._init_cert_mgr()
+
assert cephadm_module.cert_mgr.cert_store.known_entities['rgw_frontend_ssl_cert']['rgw.foo'] == Cert(rgw_frontend_rgw_foo_host2_cert, True)
assert cephadm_module.cert_mgr.cert_store.known_entities['nvmeof_server_cert']['nvmeof.foo'] == Cert(nvmeof_server_cert, True)
assert cephadm_module.cert_mgr.cert_store.known_entities['nvmeof_client_cert']['nvmeof.foo'] == Cert(nvmeof_client_cert, True)
assert cephadm_module.cert_mgr.cert_store.known_entities['nvmeof_root_ca_cert']['nvmeof.foo'] == Cert(nvmeof_root_ca_cert, True)
assert cephadm_module.cert_mgr.key_store.known_entities['grafana_key']['host1'] == PrivKey(grafana_host1_key)
+ assert unknown_cert_entity not in cephadm_module.cert_mgr.cert_store.known_entities
+
assert cephadm_module.cert_mgr.key_store.known_entities['nvmeof_server_key']['nvmeof.foo'] == PrivKey(nvmeof_server_key)
assert cephadm_module.cert_mgr.key_store.known_entities['nvmeof_client_key']['nvmeof.foo'] == PrivKey(nvmeof_client_key)
assert cephadm_module.cert_mgr.key_store.known_entities['nvmeof_encryption_key']['nvmeof.foo'] == PrivKey(nvmeof_encryption_key)
+ assert unknown_cert_key not in cephadm_module.cert_mgr.key_store.known_entities
def test_tlsobject_store_get_cert_key(self, cephadm_module: CephadmOrchestrator):
self.known_entities: Dict[str, Any] = {key: {} for key in all_known_entities}
self.per_service_name_tlsobjects = known_entities[TLSObjectScope.SERVICE]
self.per_host_tlsobjects = known_entities[TLSObjectScope.HOST]
+ self.global_tlsobjects = known_entities[TLSObjectScope.GLOBAL]
self.store_prefix = f'{TLSOBJECT_STORE_PREFIX}{tlsobject_class.STORAGE_PREFIX}.'
def determine_tlsobject_target(self, entity: str, target: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
return TLSObjectScope.SERVICE, service_name
elif entity in self.per_host_tlsobjects:
return TLSObjectScope.HOST, host
- else:
+ elif entity in self.global_tlsobjects:
return TLSObjectScope.GLOBAL, None
+ else:
+ return TLSObjectScope.UNKNOWN, None
def get_tlsobject(self, entity: str, service_name: Optional[str] = None, host: Optional[str] = None) -> Optional[TLSObjectProtocol]:
self._validate_tlsobject_entity(entity, service_name, host)
key: self.tlsobject_class.to_json(self.known_entities[entity][key])
for key in self.known_entities[entity]
}
- else:
+ self.mgr.set_store(self.store_prefix + entity, json.dumps(j))
+ elif scope == TLSObjectScope.GLOBAL:
self.known_entities[entity] = tlsobject
j = self.tlsobject_class.to_json(tlsobject)
-
- self.mgr.set_store(self.store_prefix + entity, json.dumps(j))
+ self.mgr.set_store(self.store_prefix + entity, json.dumps(j))
+ else:
+ logger.error(f'Trying to save entity {entity} with a not-supported/unknown TLSObjectScope scope {scope.value}')
def rm_tlsobject(self, entity: str, service_name: Optional[str] = None, host: Optional[str] = None) -> None:
"""Remove a tlsobjectificate for a specific entity, service, or host."""
def load(self) -> None:
for k, v in self.mgr.get_store_prefix(self.store_prefix).items():
entity = k[len(self.store_prefix):]
+ if entity not in self.known_entities:
+ logger.warning(f"TLSObjectStore: Discarding unkown entity '{entity}'")
+ continue
entity_targets = json.loads(v)
- self.known_entities[entity] = {}
if entity in self.per_service_name_tlsobjects or entity in self.per_host_tlsobjects:
+ self.known_entities[entity] = {}
for target in entity_targets:
tlsobject = self.tlsobject_class.from_json(entity_targets[target])
if tlsobject:
self.known_entities[entity][target] = tlsobject
- else:
+ elif entity in self.global_tlsobjects:
tlsobject = self.tlsobject_class.from_json(entity_targets)
if tlsobject:
self.known_entities[entity] = tlsobject
+ else:
+ logger.error(f"TLSObjectStore: Found a known entity {entity} with unknown scope!")