import pytest
import json
from tests import mock
+import logging
from cephadm.tlsobject_types import Cert, PrivKey, TLSObjectException, TLSObjectProtocol
from cephadm.tlsobject_store import TLSOBJECT_STORE_PREFIX, TLSObjectStore, TLSObjectScope
@mock.patch("cephadm.module.CephadmOrchestrator.set_store")
def test_tlsobject_store_save_cert(self, _set_store, cephadm_module: CephadmOrchestrator):
+ cephadm_module._init_cert_mgr()
+
rgw_frontend_rgw_foo_host2_cert = 'fake-rgw-cert'
nvmeof_client_cert = 'fake-nvmeof-client-cert'
nvmeof_server_cert = 'fake-nvmeof-server-cert'
nvmeof_root_ca_cert = 'fake-nvmeof-root-ca-cert'
grafana_cert_host_1 = 'grafana-cert-host-1'
grafana_cert_host_2 = 'grafana-cert-host-2'
- cephadm_module.cert_mgr.save_cert('rgw_frontend_ssl_cert', rgw_frontend_rgw_foo_host2_cert, service_name='rgw.foo', user_made=True)
+ cephadm_module.cert_mgr.save_cert('rgw_ssl_cert', rgw_frontend_rgw_foo_host2_cert, service_name='rgw.foo', user_made=True)
cephadm_module.cert_mgr.save_cert('nvmeof_server_cert', nvmeof_server_cert, service_name='nvmeof.foo', user_made=True)
cephadm_module.cert_mgr.save_cert('nvmeof_client_cert', nvmeof_client_cert, service_name='nvmeof.foo', user_made=True)
cephadm_module.cert_mgr.save_cert('nvmeof_root_ca_cert', nvmeof_root_ca_cert, service_name='nvmeof.foo', user_made=True)
- cephadm_module.cert_mgr.save_cert('grafana_cert', grafana_cert_host_1, host='host-1', user_made=True)
- cephadm_module.cert_mgr.save_cert('grafana_cert', grafana_cert_host_2, host='host-2', user_made=True)
+ cephadm_module.cert_mgr.save_cert('grafana_ssl_cert', grafana_cert_host_1, host='host-1', user_made=True)
+ cephadm_module.cert_mgr.save_cert('grafana_ssl_cert', grafana_cert_host_2, host='host-2', user_made=True)
expected_calls = [
- mock.call(f'{TLSOBJECT_STORE_CERT_PREFIX}rgw_frontend_ssl_cert', json.dumps({'rgw.foo': Cert(rgw_frontend_rgw_foo_host2_cert, True).to_json()})),
+ mock.call(f'{TLSOBJECT_STORE_CERT_PREFIX}rgw_ssl_cert', json.dumps({'rgw.foo': Cert(rgw_frontend_rgw_foo_host2_cert, True).to_json()})),
mock.call(f'{TLSOBJECT_STORE_CERT_PREFIX}nvmeof_server_cert', json.dumps({'nvmeof.foo': Cert(nvmeof_server_cert, True).to_json()})),
mock.call(f'{TLSOBJECT_STORE_CERT_PREFIX}nvmeof_client_cert', json.dumps({'nvmeof.foo': Cert(nvmeof_client_cert, True).to_json()})),
mock.call(f'{TLSOBJECT_STORE_CERT_PREFIX}nvmeof_root_ca_cert', json.dumps({'nvmeof.foo': Cert(nvmeof_root_ca_cert, True).to_json()})),
- mock.call(f'{TLSOBJECT_STORE_CERT_PREFIX}grafana_cert', json.dumps({'host-1': Cert(grafana_cert_host_1, True).to_json()})),
- mock.call(f'{TLSOBJECT_STORE_CERT_PREFIX}grafana_cert', json.dumps({'host-1': Cert(grafana_cert_host_1, True).to_json(),
- 'host-2': Cert(grafana_cert_host_2, True).to_json()}))
+ mock.call(f'{TLSOBJECT_STORE_CERT_PREFIX}grafana_ssl_cert', json.dumps({'host-1': Cert(grafana_cert_host_1, True).to_json()})),
+ mock.call(f'{TLSOBJECT_STORE_CERT_PREFIX}grafana_ssl_cert', json.dumps({'host-1': Cert(grafana_cert_host_1, True).to_json(),
+ 'host-2': Cert(grafana_cert_host_2, True).to_json()}))
]
_set_store.assert_has_calls(expected_calls)
compare_certls_dicts(expected_ls)
# Services with sevice_name target/scope
- cephadm_module.cert_mgr.save_cert('rgw_frontend_ssl_cert', CEPHADM_SELF_GENERATED_CERT_1, service_name='rgw.foo', user_made=True)
- cephadm_module.cert_mgr.save_cert('rgw_frontend_ssl_cert', CEPHADM_SELF_GENERATED_CERT_2, service_name='rgw.bar', user_made=True)
- expected_ls["rgw_frontend_ssl_cert"] = {
+ cephadm_module.cert_mgr.save_cert('rgw_ssl_cert', CEPHADM_SELF_GENERATED_CERT_1, service_name='rgw.foo', user_made=True)
+ cephadm_module.cert_mgr.save_cert('rgw_ssl_cert', CEPHADM_SELF_GENERATED_CERT_2, service_name='rgw.bar', user_made=True)
+ expected_ls["rgw_ssl_cert"] = {
"scope": "service",
"certificates": {
"rgw.foo": get_generated_cephadm_cert_info_1(),
compare_certls_dicts(expected_ls)
# Services with host target/scope
- cephadm_module.cert_mgr.save_cert('grafana_cert', CEPHADM_SELF_GENERATED_CERT_1, host='host1', user_made=True)
- cephadm_module.cert_mgr.save_cert('grafana_cert', CEPHADM_SELF_GENERATED_CERT_2, host='host2', user_made=True)
- expected_ls['grafana_cert'] = {
+ cephadm_module.cert_mgr.save_cert('grafana_ssl_cert', CEPHADM_SELF_GENERATED_CERT_1, host='host1', user_made=True)
+ cephadm_module.cert_mgr.save_cert('grafana_ssl_cert', CEPHADM_SELF_GENERATED_CERT_2, host='host2', user_made=True)
+ cephadm_module.cert_mgr.save_cert('oauth2_proxy_ssl_cert', CEPHADM_SELF_GENERATED_CERT_1, host='host1', user_made=True)
+ expected_ls['grafana_ssl_cert'] = {
'scope': 'host',
'certificates': {
'host1': get_generated_cephadm_cert_info_1(),
'host2': get_generated_cephadm_cert_info_2(),
},
}
+ expected_ls['oauth2_proxy_ssl_cert'] = {
+ 'scope': 'host',
+ 'certificates': {
+ 'host1': get_generated_cephadm_cert_info_1(),
+ }
+ }
compare_certls_dicts(expected_ls)
# Services with global target/scope
- cephadm_module.cert_mgr.save_cert('mgmt_gw_cert', CEPHADM_SELF_GENERATED_CERT_1, user_made=True)
- cephadm_module.cert_mgr.save_cert('oauth2_proxy_cert', CEPHADM_SELF_GENERATED_CERT_2, user_made=True)
- expected_ls['mgmt_gw_cert'] = {'scope': 'global', 'certificates': get_generated_cephadm_cert_info_1()}
- expected_ls['oauth2_proxy_cert'] = {'scope': 'global', 'certificates': get_generated_cephadm_cert_info_2()}
+ cephadm_module.cert_mgr.save_cert('mgmt_gateway_ssl_cert', CEPHADM_SELF_GENERATED_CERT_1, user_made=True)
+ expected_ls['mgmt_gateway_ssl_cert'] = {'scope': 'global', 'certificates': get_generated_cephadm_cert_info_1()}
compare_certls_dicts(expected_ls)
# nvmeof certificates
nvmeof_client_key = 'nvmeof-client-key'
nvmeof_server_key = 'nvmeof-server-key'
nvmeof_encryption_key = 'nvmeof-encryption-key'
- cephadm_module.cert_mgr.save_key('grafana_key', grafana_host1_key, host='host1')
- cephadm_module.cert_mgr.save_key('grafana_key', grafana_host2_key, host='host2')
+ cephadm_module.cert_mgr.save_key('grafana_ssl_key', grafana_host1_key, host='host1')
+ cephadm_module.cert_mgr.save_key('grafana_ssl_key', grafana_host2_key, host='host2')
cephadm_module.cert_mgr.save_key('nvmeof_client_key', nvmeof_client_key, service_name='nvmeof.foo')
cephadm_module.cert_mgr.save_key('nvmeof_server_key', nvmeof_server_key, service_name='nvmeof.foo')
cephadm_module.cert_mgr.save_key('nvmeof_encryption_key', nvmeof_encryption_key, service_name='nvmeof.foo')
expected_calls = [
- mock.call(f'{TLSOBJECT_STORE_KEY_PREFIX}grafana_key', json.dumps({'host1': PrivKey(grafana_host1_key).to_json()})),
- mock.call(f'{TLSOBJECT_STORE_KEY_PREFIX}grafana_key', json.dumps({'host1': PrivKey(grafana_host1_key).to_json(),
- 'host2': PrivKey(grafana_host2_key).to_json()})),
+ mock.call(f'{TLSOBJECT_STORE_KEY_PREFIX}grafana_ssl_key', json.dumps({'host1': PrivKey(grafana_host1_key).to_json()})),
+ mock.call(f'{TLSOBJECT_STORE_KEY_PREFIX}grafana_ssl_key', json.dumps({'host1': PrivKey(grafana_host1_key).to_json(),
+ 'host2': PrivKey(grafana_host2_key).to_json()})),
mock.call(f'{TLSOBJECT_STORE_KEY_PREFIX}nvmeof_client_key', json.dumps({'nvmeof.foo': PrivKey(nvmeof_client_key).to_json()})),
mock.call(f'{TLSOBJECT_STORE_KEY_PREFIX}nvmeof_server_key', json.dumps({'nvmeof.foo': PrivKey(nvmeof_server_key).to_json()})),
mock.call(f'{TLSOBJECT_STORE_KEY_PREFIX}nvmeof_encryption_key', json.dumps({'nvmeof.foo': PrivKey(nvmeof_encryption_key).to_json()})),
# Define certs and keys with their corresponding scopes
certs = {
- 'rgw_frontend_ssl_cert': ('rgw.foo', 'fake-rgw-cert', TLSObjectScope.SERVICE),
+ 'rgw_ssl_cert': ('rgw.foo', 'fake-rgw-cert', TLSObjectScope.SERVICE),
'nvmeof_server_cert': ('nvmeof.foo', 'nvmeof-server-cert', TLSObjectScope.SERVICE),
'nvmeof_client_cert': ('nvmeof.foo', 'nvmeof-client-cert', TLSObjectScope.SERVICE),
'nvmeof_root_ca_cert': ('nvmeof.foo', 'nvmeof-root-ca-cert', TLSObjectScope.SERVICE),
'ingress_ssl_cert': ('ingress', 'ingress-ssl-cert', TLSObjectScope.SERVICE),
'iscsi_ssl_cert': ('iscsi', 'iscsi-ssl-cert', TLSObjectScope.SERVICE),
- 'grafana_cert': ('host1', 'grafana-cert', TLSObjectScope.HOST),
- 'mgmt_gw_cert': ('mgmt-gateway', 'mgmt-gw-cert', TLSObjectScope.GLOBAL),
- 'oauth2_proxy_cert': ('oauth2-proxy', 'oauth2-proxy-cert', TLSObjectScope.GLOBAL),
+ 'grafana_ssl_cert': ('host1', 'grafana-cert', TLSObjectScope.HOST),
+ 'oauth2_proxy_ssl_cert': ('host1', 'oauth2-proxy', TLSObjectScope.HOST),
+ 'mgmt_gateway_ssl_cert': ('mgmt-gateway', 'mgmt-gw-cert', TLSObjectScope.GLOBAL),
}
unknown_certs = {
'unknown_per_service_cert': ('unknown-svc.foo', 'unknown-cert', TLSObjectScope.SERVICE),
}
keys = {
- 'grafana_key': ('host1', 'fake-grafana-host1-key', TLSObjectScope.HOST),
+ 'grafana_ssl_key': ('host1', 'fake-grafana-host1-key', TLSObjectScope.HOST),
'nvmeof_server_key': ('nvmeof.foo', 'nvmeof-server-key', TLSObjectScope.SERVICE),
'nvmeof_client_key': ('nvmeof.foo', 'nvmeof-client-key', TLSObjectScope.SERVICE),
'nvmeof_encryption_key': ('nvmeof.foo', 'nvmeof-encryption-key', TLSObjectScope.SERVICE),
- 'mgmt_gw_key': ('mgmt-gateway', 'mgmt-gw-key', TLSObjectScope.GLOBAL),
- 'oauth2_proxy_key': ('oauth2-proxy', 'oauth2-proxy-key', TLSObjectScope.GLOBAL),
+ 'mgmt_gateway_ssl_key': ('mgmt-gateway', 'mgmt-gw-key', TLSObjectScope.GLOBAL),
+ 'oauth2_proxy_ssl_key': ('host1', 'oauth2-proxy', TLSObjectScope.HOST),
'ingress_ssl_key': ('ingress', 'ingress-ssl-key', TLSObjectScope.SERVICE),
'iscsi_ssl_key': ('iscsi', 'iscsi-ssl-key', TLSObjectScope.SERVICE),
}
# Validate certificates in cert_store
for cert_name, (target, cert_value, scope) in certs.items():
- assert cert_name in cephadm_module.cert_mgr.cert_store.known_entities
+ assert cert_name in cephadm_module.cert_mgr.cert_store.objects_by_name
if scope == TLSObjectScope.GLOBAL:
- assert cephadm_module.cert_mgr.cert_store.known_entities[cert_name] == Cert(cert_value, True)
+ assert cephadm_module.cert_mgr.cert_store.objects_by_name[cert_name] == Cert(cert_value, True)
else:
- assert cephadm_module.cert_mgr.cert_store.known_entities[cert_name][target] == Cert(cert_value, True)
+ assert cephadm_module.cert_mgr.cert_store.objects_by_name[cert_name][target] == Cert(cert_value, True)
# Validate keys in key_store
for key_name, (target, key_value, scope) in keys.items():
- assert key_name in cephadm_module.cert_mgr.key_store.known_entities
+ assert key_name in cephadm_module.cert_mgr.key_store.objects_by_name
if scope == TLSObjectScope.GLOBAL:
- assert cephadm_module.cert_mgr.key_store.known_entities[key_name] == PrivKey(key_value)
+ assert cephadm_module.cert_mgr.key_store.objects_by_name[key_name] == PrivKey(key_value)
else:
- assert cephadm_module.cert_mgr.key_store.known_entities[key_name][target] == PrivKey(key_value)
+ assert cephadm_module.cert_mgr.key_store.objects_by_name[key_name][target] == PrivKey(key_value)
# Check unknown certificates are not loaded
for unknown_cert in unknown_certs:
- assert unknown_cert not in cephadm_module.cert_mgr.cert_store.known_entities
+ assert unknown_cert not in cephadm_module.cert_mgr.cert_store.objects_by_name
# Check unknown keys are not loaded
for unknown_key in unknown_keys:
- assert unknown_key not in cephadm_module.cert_mgr.key_store.known_entities
+ assert unknown_key not in cephadm_module.cert_mgr.key_store.objects_by_name
+
+ @mock.patch("cephadm.module.CephadmOrchestrator.get_store_prefix")
+ def test_tlsobject_store_load_with_malformed_json_for_known_objects(
+ self, _get_store_prefix, cephadm_module: CephadmOrchestrator, caplog
+ ):
+ """
+ Use known TLS object names (so they pass the unknown-name guard) but provide
+ malformed JSON values to ensure the loader hits the json.JSONDecodeError path,
+ logs a warning, and does not add any targets for those objects.
+ """
+
+ # Known-good entries (SERVICE + GLOBAL)
+ good_certs = {
+ 'rgw_ssl_cert': ('rgw.foo', 'good-cert', TLSObjectScope.SERVICE),
+ 'mgmt_gateway_ssl_cert': ('mgmt-gateway', 'good-global-cert', TLSObjectScope.GLOBAL),
+ }
+ good_keys = {
+ 'rgw_ssl_key': ('rgw.foo', 'good-key', TLSObjectScope.SERVICE),
+ }
+
+ # Helpers to dump valid JSON structures
+ def _dump_cert(target, value, scope):
+ return json.dumps({target: Cert(value, True).to_json()} if scope != TLSObjectScope.GLOBAL
+ else Cert(value, True).to_json())
+
+ def _dump_key(target, value, scope):
+ return json.dumps({target: PrivKey(value).to_json()} if scope != TLSObjectScope.GLOBAL
+ else PrivKey(value).to_json())
+
+ def _fake_prefix_store(prefix):
+ store = {}
+ if prefix == 'cert_store.cert.':
+ # Good certs
+ for name, (target, val, scope) in good_certs.items():
+ store[f'{TLSOBJECT_STORE_CERT_PREFIX}{name}'] = _dump_cert(target, val, scope)
+ # Malformed for known names (must hit JSONDecodeError path)
+ store[f'{TLSOBJECT_STORE_CERT_PREFIX}ingress_ssl_cert'] = "" # empty
+ store[f'{TLSOBJECT_STORE_CERT_PREFIX}grafana_ssl_cert'] = "{not json" # broken
+ return store
+
+ if prefix == 'cert_store.key.':
+ # Good keys
+ for name, (target, val, scope) in good_keys.items():
+ store[f'{TLSOBJECT_STORE_KEY_PREFIX}{name}'] = _dump_key(target, val, scope)
+ # Malformed for known names
+ store[f'{TLSOBJECT_STORE_KEY_PREFIX}iscsi_ssl_key'] = " " # whitespace
+ store[f'{TLSOBJECT_STORE_KEY_PREFIX}mgmt_gateway_ssl_key'] = (
+ "-----BEGIN PRIVATE KEY-----\nSNIP\n-----END PRIVATE KEY-----" # raw PEM
+ )
+ return store
+
+ raise Exception(f'Unexpected key access in store: {prefix}')
+
+ _get_store_prefix.side_effect = _fake_prefix_store
+
+ with caplog.at_level(logging.WARNING):
+ cephadm_module._init_cert_mgr()
+
+ cert_store = cephadm_module.cert_mgr.cert_store.objects_by_name
+ key_store = cephadm_module.cert_mgr.key_store.objects_by_name
+
+ # Good entries loaded correctly
+ assert 'rgw_ssl_cert' in cert_store
+ assert cert_store['rgw_ssl_cert']['rgw.foo'] == Cert('good-cert', True)
+ assert 'mgmt_gateway_ssl_cert' in cert_store
+ assert cert_store['mgmt_gateway_ssl_cert'] == Cert('good-global-cert', True)
+ assert 'rgw_ssl_key' in key_store
+ assert key_store['rgw_ssl_key']['rgw.foo'] == PrivKey('good-key')
+
+ # Bad ones: object names exist (pre-registered), but **no targets** were added
+ # Service / Host scoped => dict should be empty
+ assert 'ingress_ssl_cert' in cert_store
+ assert isinstance(cert_store['ingress_ssl_cert'], dict) and len(cert_store['ingress_ssl_cert']) == 0
+ assert 'grafana_ssl_cert' in cert_store
+ assert isinstance(cert_store['grafana_ssl_cert'], dict) and len(cert_store['grafana_ssl_cert']) == 0
+ assert 'iscsi_ssl_key' in key_store
+ assert isinstance(key_store['iscsi_ssl_key'], dict) and len(key_store['iscsi_ssl_key']) == 0
+
+ # Global-scoped key with bad JSON should also not be instantiated (no PrivKey object)
+ # Depending on how you seed known names, it might be absent OR present but falsy.
+ # Accept either: absent OR not a PrivKey instance.
+ if 'mgmt_gateway_ssl_key' in key_store:
+ assert not isinstance(key_store['mgmt_gateway_ssl_key'], PrivKey)
+ else:
+ assert 'mgmt_gateway_ssl_key' not in key_store
+
+ messages = [r.getMessage() for r in caplog.records
+ if r.levelno >= logging.WARNING and r.name == "cephadm.tlsobject_store"]
+
+ # also ensure we didn't take the unknown-name shortcut
+ for name in ("ingress_ssl_cert", "grafana_ssl_cert", "iscsi_ssl_key", "mgmt_gateway_ssl_key"):
+ assert not any(f"Discarding unknown obj_name '{name}'" in m for m in messages)
+
+ # cert warnings
+ for name in ("ingress_ssl_cert", "grafana_ssl_cert"):
+ assert any(f"Cannot parse JSON for '{name}'" in m for m in messages)
+
+ # key warnings
+ for name in ("iscsi_ssl_key", "mgmt_gateway_ssl_key"):
+ assert any(f"Cannot parse JSON for '{name}'" in m for m in messages)
+
+ @mock.patch("cephadm.module.CephadmOrchestrator.get_store_prefix")
+ def test_tlsobject_store_discards_unknown_entries(
+ self, _get_store_prefix, cephadm_module: CephadmOrchestrator, caplog
+ ):
+ """
+ Unknown (not registered) TLS object names must be discarded before parsing,
+ and a warning must be logged for each. Known entries still load normally.
+ """
+
+ # One known-good entry for each store so init proceeds normally
+ known_cert = ('rgw_ssl_cert', 'rgw.foo', 'known-cert', TLSObjectScope.SERVICE)
+ known_key = ('rgw_ssl_key', 'rgw.foo', 'known-key', TLSObjectScope.SERVICE)
+
+ # Unknown names (not in objects_by_name; also not cephadm-signed)
+ unknown_cert_names = [
+ 'totally_unknown_cert_service',
+ 'totally_unknown_cert_host',
+ 'totally_unknown_cert_global',
+ ]
+ unknown_key_names = [
+ 'totally_unknown_key_service',
+ 'totally_unknown_key_host',
+ 'totally_unknown_key_global',
+ ]
+
+ def _dump_cert(target, value, scope):
+ return json.dumps(
+ {target: Cert(value, True).to_json()} if scope != TLSObjectScope.GLOBAL
+ else Cert(value, True).to_json()
+ )
+
+ def _dump_key(target, value, scope):
+ return json.dumps(
+ {target: PrivKey(value).to_json()} if scope != TLSObjectScope.GLOBAL
+ else PrivKey(value).to_json()
+ )
+
+ def _fake_prefix_store(prefix):
+ if prefix == 'cert_store.cert.':
+ store = {
+ f'{TLSOBJECT_STORE_CERT_PREFIX}{known_cert[0]}': _dump_cert(known_cert[1], known_cert[2], known_cert[3]),
+ # Unknowns use valid JSON so the test proves we short-circuited *before* json.loads
+ f'{TLSOBJECT_STORE_CERT_PREFIX}totally_unknown_cert_service': _dump_cert('svc.x', 'ignored-cert-1', TLSObjectScope.SERVICE),
+ f'{TLSOBJECT_STORE_CERT_PREFIX}totally_unknown_cert_host': _dump_cert('host.x', 'ignored-cert-2', TLSObjectScope.HOST),
+ f'{TLSOBJECT_STORE_CERT_PREFIX}totally_unknown_cert_global': _dump_cert(None, 'ignored-cert-3', TLSObjectScope.GLOBAL),
+ }
+ return store
+
+ if prefix == 'cert_store.key.':
+ store = {
+ f'{TLSOBJECT_STORE_KEY_PREFIX}{known_key[0]}': _dump_key(known_key[1], known_key[2], known_key[3]),
+ f'{TLSOBJECT_STORE_KEY_PREFIX}totally_unknown_key_service': _dump_key('svc.x', 'ignored-key-1', TLSObjectScope.SERVICE),
+ f'{TLSOBJECT_STORE_KEY_PREFIX}totally_unknown_key_host': _dump_key('host.x', 'ignored-key-2', TLSObjectScope.HOST),
+ f'{TLSOBJECT_STORE_KEY_PREFIX}totally_unknown_key_global': _dump_key(None, 'ignored-key-3', TLSObjectScope.GLOBAL),
+ }
+ return store
+
+ raise Exception(f'Unexpected key access in store: {prefix}')
+
+ _get_store_prefix.side_effect = _fake_prefix_store
+
+ with caplog.at_level(logging.WARNING, logger="cephadm.tlsobject_store"):
+ cephadm_module._init_cert_mgr()
+
+ cert_store = cephadm_module.cert_mgr.cert_store.objects_by_name
+ key_store = cephadm_module.cert_mgr.key_store.objects_by_name
+
+ # Known entries should be present and correctly parsed
+ assert known_cert[0] in cert_store
+ assert cert_store[known_cert[0]][known_cert[1]] == Cert(known_cert[2], True)
+
+ assert known_key[0] in key_store
+ assert key_store[known_key[0]][known_key[1]] == PrivKey(known_key[2])
+
+ # Unknown entries must be discarded (absent from objects_by_name)
+ for name in unknown_cert_names:
+ assert name not in cert_store
+ for name in unknown_key_names:
+ assert name not in key_store
+
+ # And we must see the early-guard warning for each unknown entry,
+ # proving we short-circuited before json.loads
+ msgs = [r.getMessage() for r in caplog.records if r.name == "cephadm.tlsobject_store"]
+ for name in (*unknown_cert_names, *unknown_key_names):
+ assert any(f"Discarding unknown obj_name '{name}'" in m for m in msgs)
+ # Ensure we did NOT try to parse these (no JSON parse warning for these names)
+ assert not any(f"Cannot parse JSON for '{name}'" in m for m in msgs)
def test_tlsobject_store_get_cert_key(self, cephadm_module: CephadmOrchestrator):
rgw_frontend_rgw_foo_host2_cert = 'fake-rgw-cert'
nvmeof_client_cert = 'fake-nvmeof-client-cert'
nvmeof_server_cert = 'fake-nvmeof-server-cert'
- cephadm_module.cert_mgr.save_cert('rgw_frontend_ssl_cert', rgw_frontend_rgw_foo_host2_cert, service_name='rgw.foo', user_made=True)
+ cephadm_module.cert_mgr.save_cert('rgw_ssl_cert', rgw_frontend_rgw_foo_host2_cert, service_name='rgw.foo', user_made=True)
cephadm_module.cert_mgr.save_cert('nvmeof_server_cert', nvmeof_server_cert, service_name='nvmeof.foo', user_made=True)
cephadm_module.cert_mgr.save_cert('nvmeof_client_cert', nvmeof_client_cert, service_name='nvmeof.foo', user_made=True)
- assert cephadm_module.cert_mgr.get_cert('rgw_frontend_ssl_cert', service_name='rgw.foo') == rgw_frontend_rgw_foo_host2_cert
+ assert cephadm_module.cert_mgr.get_cert('rgw_ssl_cert', service_name='rgw.foo') == rgw_frontend_rgw_foo_host2_cert
assert cephadm_module.cert_mgr.get_cert('nvmeof_server_cert', service_name='nvmeof.foo') == nvmeof_server_cert
assert cephadm_module.cert_mgr.get_cert('nvmeof_client_cert', service_name='nvmeof.foo') == nvmeof_client_cert
- assert cephadm_module.cert_mgr.get_cert('grafana_cert', host='host1') is None
+ assert cephadm_module.cert_mgr.get_cert('grafana_ssl_cert', host='host1') is None
assert cephadm_module.cert_mgr.get_cert('iscsi_ssl_cert', service_name='iscsi.foo') is None
assert cephadm_module.cert_mgr.get_cert('nvmeof_root_ca_cert', service_name='nvmeof.foo') is None
- with pytest.raises(TLSObjectException, match='Attempted to access cert for unknown entity'):
- cephadm_module.cert_mgr.get_cert('unknown_entity')
- with pytest.raises(TLSObjectException, match='Need host to access cert for entity'):
- cephadm_module.cert_mgr.get_cert('grafana_cert')
- with pytest.raises(TLSObjectException, match='Need service name to access cert for entity'):
- cephadm_module.cert_mgr.get_cert('rgw_frontend_ssl_cert', host='foo')
+ with pytest.raises(TLSObjectException, match='Attempted to access cert for unknown TLS object name unknown_consumer'):
+ cephadm_module.cert_mgr.get_cert('unknown_consumer')
+ with pytest.raises(TLSObjectException, match='Need host to access cert for TLS object grafana_ssl_cert'):
+ cephadm_module.cert_mgr.get_cert('grafana_ssl_cert')
+ with pytest.raises(TLSObjectException, match='Need service name to access cert for TLS object rgw_ssl_cert'):
+ cephadm_module.cert_mgr.get_cert('rgw_ssl_cert', host='foo')
grafana_host1_key = 'fake-grafana-host1-cert'
nvmeof_server_key = 'nvmeof-server-key'
nvmeof_encryption_key = 'nvmeof-encryption-key'
- cephadm_module.cert_mgr.save_key('grafana_key', grafana_host1_key, host='host1')
- cephadm_module.cert_mgr.save_key('grafana_key', grafana_host1_key, host='host1')
+ cephadm_module.cert_mgr.save_key('grafana_ssl_key', grafana_host1_key, host='host1')
+ cephadm_module.cert_mgr.save_key('grafana_ssl_key', grafana_host1_key, host='host1')
cephadm_module.cert_mgr.save_key('nvmeof_server_key', nvmeof_server_key, service_name='nvmeof.foo')
cephadm_module.cert_mgr.save_key('nvmeof_encryption_key', nvmeof_encryption_key, service_name='nvmeof.foo')
- assert cephadm_module.cert_mgr.get_key('grafana_key', host='host1') == grafana_host1_key
+ assert cephadm_module.cert_mgr.get_key('grafana_ssl_key', host='host1') == grafana_host1_key
assert cephadm_module.cert_mgr.get_key('nvmeof_server_key', service_name='nvmeof.foo') == nvmeof_server_key
assert cephadm_module.cert_mgr.get_key('nvmeof_client_key', service_name='nvmeof.foo') is None
assert cephadm_module.cert_mgr.get_key('nvmeof_encryption_key', service_name='nvmeof.foo') == nvmeof_encryption_key
- with pytest.raises(TLSObjectException, match='Attempted to access privkey for unknown entity'):
- cephadm_module.cert_mgr.get_key('unknown_entity')
- with pytest.raises(TLSObjectException, match='Need host to access privkey for entity'):
- cephadm_module.cert_mgr.get_key('grafana_key')
+ with pytest.raises(TLSObjectException, match='Attempted to access privkey for unknown TLS object name unknown_consumer'):
+ cephadm_module.cert_mgr.get_key('unknown_consumer')
+ with pytest.raises(TLSObjectException, match='Need host to access privkey for TLS object grafana_ssl_key'):
+ cephadm_module.cert_mgr.get_key('grafana_ssl_key')
def test_tlsobject_store_rm_cert(self, cephadm_module: CephadmOrchestrator):
# Save some certificates and ensure certificates are present
- cephadm_module.cert_mgr.save_cert('rgw_frontend_ssl_cert', 'fake-rgw-cert', service_name='rgw.foo', user_made=True)
+ cephadm_module.cert_mgr.save_cert('rgw_ssl_cert', 'fake-rgw-cert', service_name='rgw.foo', user_made=True)
cephadm_module.cert_mgr.save_cert('nvmeof_server_cert', 'fake-nvmeof-server-cert', service_name='nvmeof.foo', user_made=True)
- assert cephadm_module.cert_mgr.get_cert('rgw_frontend_ssl_cert', service_name='rgw.foo') == 'fake-rgw-cert'
+ assert cephadm_module.cert_mgr.get_cert('rgw_ssl_cert', service_name='rgw.foo') == 'fake-rgw-cert'
assert cephadm_module.cert_mgr.get_cert('nvmeof_server_cert', service_name='nvmeof.foo') == 'fake-nvmeof-server-cert'
# Remove certificates and ensure certificates are removed
- cephadm_module.cert_mgr.rm_cert('rgw_frontend_ssl_cert', service_name='rgw.foo')
+ cephadm_module.cert_mgr.rm_cert('rgw_ssl_cert', service_name='rgw.foo')
cephadm_module.cert_mgr.rm_cert('nvmeof_server_cert', service_name='nvmeof.foo')
- assert cephadm_module.cert_mgr.get_cert('rgw_frontend_ssl_cert', service_name='rgw.foo') is None
+ assert cephadm_module.cert_mgr.get_cert('rgw_ssl_cert', service_name='rgw.foo') is None
assert cephadm_module.cert_mgr.get_cert('nvmeof_server_cert', service_name='nvmeof.foo') is None
def test_tlsobject_store_rm_key(self, cephadm_module: CephadmOrchestrator):
# Save some keys and ensure keys are present
- cephadm_module.cert_mgr.save_key('grafana_key', 'fake-grafana-host1-key', host='host1')
+ cephadm_module.cert_mgr.save_key('grafana_ssl_key', 'fake-grafana-host1-key', host='host1')
cephadm_module.cert_mgr.save_key('nvmeof_server_key', 'fake-nvmeof-server-key', service_name='nvmeof.foo')
- assert cephadm_module.cert_mgr.get_key('grafana_key', host='host1') == 'fake-grafana-host1-key'
+ assert cephadm_module.cert_mgr.get_key('grafana_ssl_key', host='host1') == 'fake-grafana-host1-key'
assert cephadm_module.cert_mgr.get_key('nvmeof_server_key', service_name='nvmeof.foo') == 'fake-nvmeof-server-key'
# Remove keys and ensure keys are removed
- cephadm_module.cert_mgr.rm_key('grafana_key', host='host1')
+ cephadm_module.cert_mgr.rm_key('grafana_ssl_key', host='host1')
cephadm_module.cert_mgr.rm_key('nvmeof_server_key', service_name='nvmeof.foo')
- assert cephadm_module.cert_mgr.get_key('grafana_key', host='host1') is None
+ assert cephadm_module.cert_mgr.get_key('grafana_ssl_key', host='host1') is None
assert cephadm_module.cert_mgr.get_key('nvmeof_server_key', service_name='nvmeof.foo') is None
@mock.patch("cephadm.module.CephadmOrchestrator.set_store")
cert_mgr = cephadm_module.cert_mgr
# for services with host scope
- cert_mgr.save_cert('grafana_cert', EXPIRED_CERT, host="test_host", user_made=True)
- cert_info = CertInfo("grafana_cert", "test_host", is_valid=True, is_close_to_expiration=True, days_to_expiration=5)
+ cert_mgr.save_cert('grafana_ssl_cert', EXPIRED_CERT, host="test_host", user_made=True)
+ cert_info = CertInfo('grafana_ssl_cert', "test_host", is_valid=True, is_close_to_expiration=True, days_to_expiration=5)
cert_obj = Cert(EXPIRED_CERT, user_made=False)
with mock.patch.object(cert_mgr.ssl_certs, "renew_cert", return_value=("mock_new_cert", "mock_new_key")) as renew_mock:
cert_mgr._renew_self_signed_certificate(cert_info, cert_obj)
renew_mock.assert_called_once()
# for services with global scope
- cert_mgr.save_cert('mgmt_gw_cert', EXPIRED_CERT, user_made=True)
- cert_info = CertInfo('mgmt_gw_cert', "test_service", is_valid=True, is_close_to_expiration=True, days_to_expiration=5)
+ cert_mgr.save_cert('mgmt_gateway_ssl_cert', EXPIRED_CERT, user_made=True)
+ cert_info = CertInfo('mgmt_gateway_ssl_cert', "test_service", is_valid=True, is_close_to_expiration=True, days_to_expiration=5)
cert_obj = Cert(EXPIRED_CERT, user_made=False)
with mock.patch.object(cert_mgr.ssl_certs, "renew_cert", return_value=("mock_new_cert", "mock_new_key")) as renew_mock:
cert_mgr._renew_self_signed_certificate(cert_info, cert_obj)
class MockTLSObject(TLSObjectProtocol):
STORAGE_PREFIX = "mocktls"
- def __init__(self, data: str = "", user_made: bool = False):
+ def __init__(self, data: str = "", user_made: bool = False, editable: bool = False):
self.data = data
self.user_made = user_made
+ self.editable = editable
def __bool__(self):
return bool(self.data)
@staticmethod
def to_json(obj):
- return {"data": obj.data, "user_made": obj.user_made}
+ return {"data": obj.data, "user_made": obj.user_made, "editable": obj.editable}
@staticmethod
def from_json(json_data):
- return MockTLSObject(json_data["data"], json_data["user_made"])
+ return MockTLSObject(json_data["data"], json_data["user_made"], json_data["editable"])
class MockCephadmOrchestrator:
class TestTLSObjectStore(unittest.TestCase):
+
+ def is_cephadm_signed_entity(self, entity: str) -> bool:
+ return entity.startswith('cephadm-signed')
+
def setUp(self):
- known_entities = {
+ objects_by_name = {
TLSObjectScope.GLOBAL: ["global_cert_1", "global_cert_2"],
TLSObjectScope.SERVICE: ["per_service1", "per_service2"],
TLSObjectScope.HOST: ["per_host1", "per_host2"],
}
self.mgr = MockCephadmOrchestrator()
- self.store = TLSObjectStore(self.mgr, MockTLSObject, known_entities)
+ self.store = TLSObjectStore(self.mgr, MockTLSObject, objects_by_name, self.is_cephadm_signed_entity)
def test_save_and_get_tlsobject(self):
self.store.save_tlsobject("per_service1", "my_cert_data", service_name="my_service")
with self.assertRaises(TLSObjectException):
self.store.get_tlsobject("unknown_entity")
- def test_validate_tlsobject_entity(self):
+ def test_validate_tlsobject_name(self):
with self.assertRaises(TLSObjectException):
- self.store._validate_tlsobject_entity("unknown_entity")
+ self.store._validate_tlsobject_name("unknown_object")
with self.assertRaises(TLSObjectException):
- self.store._validate_tlsobject_entity("per_host1")
+ self.store._validate_tlsobject_name("per_host1")
with self.assertRaises(TLSObjectException):
- self.store._validate_tlsobject_entity("per_service1")
+ self.store._validate_tlsobject_name("per_service1")
OAuth2ProxySpec
)
from cephadm.tests.fixtures import with_host, with_service, _run_cephadm, async_side_effect, wait
+from cephadm.tlsobject_types import CertKeyPair
from ceph.utils import datetime_now
self.mon_command = MagicMock(side_effect=self._check_mon_command)
self.template = MagicMock()
self.log = MagicMock()
+ self.cert_mgr = MagicMock()
self.inventory = FakeInventory()
def _check_mon_command(self, cmd_dict, inbuf=None):
mgr.spec_store = MagicMock()
mgr.spec_store.all_specs.get.return_value = iscsi_spec
+ @patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
+ lambda instance, dspec, ips=None: CertKeyPair(ceph_generated_cert, ceph_generated_key))
def test_iscsi_client_caps(self):
iscsi_daemon_spec = CephadmDaemonDeploySpec(
assert expected_call3 in self.mgr.mon_command.mock_calls
@patch('cephadm.utils.resolve_ip')
+ @patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
+ lambda instance, dspec, ips=None: CertKeyPair(ceph_generated_cert, ceph_generated_key))
def test_iscsi_dashboard_config(self, mock_resolve_ip):
self.mgr.check_mon_command = MagicMock()
use_current_daemon_image=False,
)
+ @patch("cephadm.serve.CephadmServe._run_cephadm")
+ @patch("cephadm.module.CephadmOrchestrator.get_unique_name")
+ @patch("cephadm.services.iscsi.get_trusted_ips")
+ @patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
+ lambda instance, dspec, ips=None: CertKeyPair(ceph_generated_cert, ceph_generated_key))
+ def test_iscsi_config_with_security_enabled(self, _get_trusted_ips, _get_name, _run_cephadm, cephadm_module: CephadmOrchestrator):
+
+ iscsi_daemon_id = 'testpool.test.qwert'
+ trusted_ips = '1.1.1.1,2.2.2.2'
+ api_port = 3456
+ api_user = 'test-user'
+ api_password = 'test-password'
+ pool = 'testpool'
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+ _get_name.return_value = iscsi_daemon_id
+ _get_trusted_ips.return_value = trusted_ips
+
+ cephadm_module.check_mon_command = MagicMock()
+ cephadm_module.check_mon_command.return_value = (0, '', '')
+
+ iscsi_gateway_conf = f"""# This file is generated by cephadm.
+[config]
+cluster_client_name = client.iscsi.{iscsi_daemon_id}
+pool = {pool}
+trusted_ip_list = {trusted_ips}
+minimum_gateways = 1
+api_port = {api_port}
+api_user = {api_user}
+api_password = {api_password}
+api_secure = True
+log_to_stderr = True
+log_to_stderr_prefix = debug
+log_to_file = False"""
+
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, IscsiServiceSpec(service_id=pool,
+ api_port=api_port,
+ api_user=api_user,
+ api_password=api_password,
+ pool=pool,
+ api_secure=True,
+ ssl_cert=ceph_generated_cert,
+ ssl_key=ceph_generated_key,
+ trusted_ip_list=trusted_ips)):
+ _run_cephadm.assert_called_with(
+ 'test',
+ f'iscsi.{iscsi_daemon_id}',
+ ['_orch', 'deploy'],
+ [],
+ stdin=json.dumps({
+ "fsid": "fsid",
+ "name": f'iscsi.{iscsi_daemon_id}',
+ "image": '',
+ "deploy_arguments": [],
+ "params": {
+ 'tcp_ports': [api_port],
+ },
+ "meta": {
+ 'service_name': f'iscsi.{pool}',
+ 'ports': [api_port],
+ 'ip': None,
+ 'deployed_by': [],
+ 'rank': None,
+ 'rank_generation': None,
+ 'extra_container_args': None,
+ 'extra_entrypoint_args': None,
+ },
+ "config_blobs": {
+ "config": "",
+ "keyring": f"[client.iscsi.{iscsi_daemon_id}]\nkey = None\n",
+ "files": {
+ "iscsi-gateway.cfg": iscsi_gateway_conf,
+ },
+ }
+ }),
+ error_ok=True,
+ use_current_daemon_image=False,
+ )
+
+ expected_cert_call = call({
+ 'prefix': 'config-key set',
+ 'key': f'iscsi/client.iscsi.{iscsi_daemon_id}/iscsi-gateway.crt',
+ 'val': ceph_generated_cert,
+ })
+ expected_key_call = call({
+ 'prefix': 'config-key set',
+ 'key': f'iscsi/client.iscsi.{iscsi_daemon_id}/iscsi-gateway.key',
+ 'val': ceph_generated_key,
+ })
+
+ cephadm_module.check_mon_command.assert_has_calls(
+ [expected_cert_call, expected_key_call],
+ any_order=True
+ )
+
class TestNVMEOFService:
@patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '::1')
@patch("cephadm.services.monitoring.password_hash", lambda password: 'alertmanager_password_hash')
@patch('cephadm.cert_mgr.CertMgr.get_root_ca', lambda instance: 'cephadm_root_cert')
- @patch('cephadm.cert_mgr.CertMgr.generate_cert', lambda instance, fqdn, ip: ('mycert', 'mykey'))
+ @patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
+ lambda instance, dspec, ips=None, fqdns=None: CertKeyPair('mycert', 'mykey'))
def test_alertmanager_config_when_mgmt_gw_enabled(self, _get_fqdn, _run_cephadm, cephadm_module: CephadmOrchestrator):
_run_cephadm.side_effect = async_side_effect(('{}', '', 0))
@patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '::1')
@patch("cephadm.services.monitoring.password_hash", lambda password: 'alertmanager_password_hash')
@patch('cephadm.cert_mgr.CertMgr.get_root_ca', lambda instance: 'cephadm_root_cert')
- @patch('cephadm.cert_mgr.CertMgr.generate_cert', lambda instance, fqdn, ip: ('mycert', 'mykey'))
+ @patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
+ lambda instance, dspec, ips=None, fqdns=None: CertKeyPair('mycert', 'mykey'))
def test_alertmanager_config_security_enabled(self, _get_fqdn, _run_cephadm, cephadm_module: CephadmOrchestrator):
_run_cephadm.side_effect = async_side_effect(('{}', '', 0))
@patch("socket.getfqdn")
@patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '::1')
@patch('cephadm.cert_mgr.CertMgr.get_root_ca', lambda instance: 'cephadm_root_cert')
- @patch('cephadm.cert_mgr.CertMgr.generate_cert', lambda instance, fqdn, ip: ('mycert', 'mykey'))
+ @patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
+ lambda instance, dspec, ips=None, fqdns=None: CertKeyPair('mycert', 'mykey'))
@patch('cephadm.services.cephadmservice.CephExporterService.get_keyring_with_caps', Mock(return_value='[client.ceph-exporter.test]\nkey = fake-secret\n'))
def test_ceph_exporter_config_security_enabled(self, _get_fqdn, _run_cephadm, cephadm_module: CephadmOrchestrator):
_run_cephadm.side_effect = async_side_effect(('{}', '', 0))
use_current_daemon_image=False,
)
- @patch('cephadm.cert_mgr.CertMgr.generate_cert', lambda instance, fqdn, ip: (ceph_generated_cert, ceph_generated_key))
+ @patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
+ lambda instance, dspec, ips=None, fqdns=None: CertKeyPair(ceph_generated_cert, ceph_generated_key))
@patch('cephadm.cert_mgr.CertMgr.get_root_ca', lambda instance: cephadm_root_ca)
@patch("cephadm.serve.CephadmServe._run_cephadm")
@patch("socket.getfqdn")
@patch("cephadm.module.CephadmOrchestrator._get_mgr_ips", lambda _: ['::1'])
@patch("cephadm.services.monitoring.password_hash", lambda password: 'prometheus_password_hash')
@patch('cephadm.cert_mgr.CertMgr.get_root_ca', lambda instance: 'cephadm_root_cert')
- @patch('cephadm.cert_mgr.CertMgr.generate_cert', lambda instance, fqdn, ip: ('mycert', 'mykey'))
+ @patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
+ lambda instance, dspec, ips=None, fqdns=None: CertKeyPair('mycert', 'mykey'))
def test_prometheus_config_security_enabled(self, _run_cephadm, _get_uname, cephadm_module: CephadmOrchestrator):
_run_cephadm.side_effect = async_side_effect(('{}', '', 0))
_get_uname.return_value = 'test'
@patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '1::4')
@patch("cephadm.module.CephadmOrchestrator.get_fqdn", lambda a, b: 'host_fqdn')
@patch('cephadm.cert_mgr.CertMgr.get_root_ca', lambda instance: cephadm_root_ca)
+ @patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
+ lambda instance, dspec, ips=None, fqdns=None: CertKeyPair(ceph_generated_cert, ceph_generated_key))
def test_grafana_config_with_mgmt_gw_and_ouath2_proxy(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
_run_cephadm.side_effect = async_side_effect(("{}", "", 0))
ssl_key=ceph_generated_key)
with with_host(cephadm_module, "test"):
- cephadm_module.cert_mgr.save_cert('grafana_cert', ceph_generated_cert, host='test')
- cephadm_module.cert_mgr.save_key('grafana_key', ceph_generated_key, host='test')
+ cephadm_module.cert_mgr.save_cert('grafana_ssl_cert', ceph_generated_cert, host='test')
+ cephadm_module.cert_mgr.save_key('grafana_ssl_key', ceph_generated_key, host='test')
with with_service(cephadm_module, PrometheusSpec("prometheus")) as _, \
with_service(cephadm_module, MgmtGatewaySpec("mgmt-gateway")) as _, \
with_service(cephadm_module, oauth2_spec) as _, \
@patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '1::4')
@patch("cephadm.module.CephadmOrchestrator.get_fqdn", lambda a, b: 'host_fqdn')
@patch('cephadm.cert_mgr.CertMgr.get_root_ca', lambda instance: cephadm_root_ca)
+ @patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
+ lambda instance, dspec, ips=None, fqdns=None: CertKeyPair(ceph_generated_cert, ceph_generated_key))
def test_grafana_config_with_mgmt_gw(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
_run_cephadm.side_effect = async_side_effect(("{}", "", 0))
editable: false""").lstrip()
with with_host(cephadm_module, "test"):
- cephadm_module.cert_mgr.save_cert('grafana_cert', ceph_generated_cert, host='test')
- cephadm_module.cert_mgr.save_key('grafana_key', ceph_generated_key, host='test')
with with_service(
cephadm_module, PrometheusSpec("prometheus")
) as _, with_service(cephadm_module, MgmtGatewaySpec("mgmt-gateway")) as _, \
with_service(cephadm_module, ServiceSpec("mgr")) as _, with_service(
cephadm_module, GrafanaSpec("grafana")
) as _:
+ cephadm_module.cert_mgr.save_self_signed_cert_key_pair('grafana',
+ CertKeyPair(ceph_generated_cert, ceph_generated_key),
+ host='test')
files = {
'grafana.ini': dedent("""
# This file is generated by cephadm.
@patch("cephadm.serve.CephadmServe._run_cephadm")
@patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '1::4')
@patch("cephadm.module.CephadmOrchestrator.get_fqdn", lambda a, b: 'host_fqdn')
+ @patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
+ lambda instance, dspec, ips=None, fqdns=None: CertKeyPair(ceph_generated_cert, ceph_generated_key))
def test_grafana_config(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
_run_cephadm.side_effect = async_side_effect(("{}", "", 0))
with with_host(cephadm_module, "test"):
- cephadm_module.cert_mgr.save_cert('grafana_cert', ceph_generated_cert, host='test')
- cephadm_module.cert_mgr.save_key('grafana_key', ceph_generated_key, host='test')
with with_service(
cephadm_module, PrometheusSpec("prometheus")
) as _, with_service(cephadm_module, ServiceSpec("mgr")) as _, with_service(
cephadm_module, GrafanaSpec("grafana")
) as _:
+ cephadm_module.cert_mgr.save_self_signed_cert_key_pair('grafana',
+ CertKeyPair(ceph_generated_cert, ceph_generated_key),
+ host='test')
files = {
'grafana.ini': dedent("""
# This file is generated by cephadm.
# with anonymous_access set to False, expecting the [auth.anonymous] section
# to not be present in the grafana config. Note that we require an initial_admin_password
# to be provided when anonymous_access is False
+ cephadm_module._init_cert_mgr()
with with_host(cephadm_module, 'test'):
with with_service(cephadm_module, ServiceSpec('mgr')) as _, \
with_service(cephadm_module, GrafanaSpec(anonymous_access=False, initial_admin_password='secure')):
class TestAgent:
@patch('cephadm.cert_mgr.CertMgr.get_root_ca', lambda instance: cephadm_root_ca)
- @patch('cephadm.cert_mgr.CertMgr.generate_cert',
- lambda instance, test, ip: (ceph_generated_cert, ceph_generated_key))
+ @patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
+ lambda instance, dspec, ips=None, fqdns=None: CertKeyPair(ceph_generated_cert, ceph_generated_key))
@patch("cephadm.serve.CephadmServe._run_cephadm")
def test_deploy_cephadm_agent(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
_run_cephadm.side_effect = async_side_effect(('{}', '', 0))
@patch("cephadm.serve.CephadmServe._run_cephadm")
@patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_service_endpoints")
@patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_service_discovery_endpoints")
- @patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_external_certificates",
- lambda instance, svc_spec, dspec: (ceph_generated_cert, ceph_generated_key))
- @patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_internal_certificates",
- lambda instance, svc_spec, dspec: (ceph_generated_cert, ceph_generated_key))
+ @patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
+ lambda instance, dspec, ips=None: CertKeyPair(ceph_generated_cert, ceph_generated_key))
+ @patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_self_signed_certificates_with_label",
+ lambda instance, svc_spec, dspec, label: (ceph_generated_cert, ceph_generated_key))
@patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '::1')
@patch('cephadm.cert_mgr.CertMgr.get_root_ca', lambda instance: cephadm_root_ca)
@patch("cephadm.services.mgmt_gateway.get_dashboard_endpoints", lambda _: (["ceph-node-2:8443", "ceph-node-2:8443"], "https"))
@patch("cephadm.serve.CephadmServe._run_cephadm")
@patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_service_endpoints")
@patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_service_discovery_endpoints")
- @patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_external_certificates",
- lambda instance, svc_spec, dspec: (ceph_generated_cert, ceph_generated_key))
- @patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_internal_certificates",
- lambda instance, svc_spec, dspec: (ceph_generated_cert, ceph_generated_key))
+ @patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
+ lambda instance, dspec, ips=None: CertKeyPair(ceph_generated_cert, ceph_generated_key))
+ @patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_self_signed_certificates_with_label",
+ lambda instance, svc_spec, dspec, label: (ceph_generated_cert, ceph_generated_key))
@patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '::1')
@patch('cephadm.cert_mgr.CertMgr.get_root_ca', lambda instance: cephadm_root_ca)
@patch("cephadm.services.mgmt_gateway.get_dashboard_endpoints", lambda _: (["ceph-node-2:8443", "ceph-node-2:8443"], "https"))
@patch("cephadm.serve.CephadmServe._run_cephadm")
@patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_service_endpoints")
- @patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_external_certificates",
- lambda instance, svc_spec, dspec: (ceph_generated_cert, ceph_generated_key))
- @patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_internal_certificates",
- lambda instance, svc_spec, dspec: (ceph_generated_cert, ceph_generated_key))
+ @patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
+ lambda instance, dspec, ips=None: CertKeyPair(ceph_generated_cert, ceph_generated_key))
+ @patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_self_signed_certificates_with_label",
+ lambda instance, svc_spec, dspec, label: (ceph_generated_cert, ceph_generated_key))
@patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '::1')
@patch('cephadm.cert_mgr.CertMgr.get_root_ca', lambda instance: cephadm_root_ca)
@patch("cephadm.services.mgmt_gateway.get_dashboard_endpoints", lambda _: (["ceph-node-2:8443", "ceph-node-2:8443"], "https"))
@patch("cephadm.serve.CephadmServe._run_cephadm")
@patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_service_endpoints")
- @patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_external_certificates",
- lambda instance, svc_spec, dspec: (ceph_generated_cert, ceph_generated_key))
- @patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_internal_certificates",
- lambda instance, svc_spec, dspec: (ceph_generated_cert, ceph_generated_key))
+ @patch("cephadm.services.cephadmservice.CephadmService.get_certificates",
+ lambda instance, dspec, ips=None: CertKeyPair(ceph_generated_cert, ceph_generated_key))
+ @patch("cephadm.services.oauth2_proxy.OAuth2ProxyService.get_certificates",
+ lambda instance, dspec, ips=None: CertKeyPair(ceph_generated_cert, ceph_generated_key))
+ @patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_self_signed_certificates_with_label",
+ lambda instance, svc_spec, dspec, label: (ceph_generated_cert, ceph_generated_key))
@patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '::1')
@patch('cephadm.cert_mgr.CertMgr.get_root_ca', lambda instance: cephadm_root_ca)
@patch("cephadm.services.mgmt_gateway.get_dashboard_endpoints", lambda _: (["ceph-node-2:8443", "ceph-node-2:8443"], "https"))