from tests import mock
import logging
-from cephadm.tlsobject_types import Cert, PrivKey, TLSObjectException, TLSObjectProtocol
+from cephadm.tlsobject_types import Cert, PrivKey, TLSObjectException, TLSObjectProtocol, CertKeyPair
from cephadm.tlsobject_store import TLSOBJECT_STORE_PREFIX, TLSObjectStore, TLSObjectScope
from cephadm.module import CephadmOrchestrator
from cephadm.cert_mgr import CertInfo, CertMgr
rgw_frontend_rgw_foo_host2_cert = 'fake-rgw-cert'
nvmeof_client_cert = 'fake-nvmeof-client-cert'
- nvmeof_server_cert = 'fake-nvmeof-server-cert'
+ nvmeof_ssl_cert = 'fake-nvmeof-ssl-cert'
nvmeof_root_ca_cert = 'fake-nvmeof-root-ca-cert'
grafana_cert_host_1 = 'grafana-cert-host-1'
grafana_cert_host_2 = 'grafana-cert-host-2'
cephadm_module.cert_mgr.save_cert('rgw_ssl_cert', rgw_frontend_rgw_foo_host2_cert, service_name='rgw.foo', user_made=True)
- cephadm_module.cert_mgr.save_cert('nvmeof_server_cert', nvmeof_server_cert, service_name='nvmeof.foo', user_made=True)
+ cephadm_module.cert_mgr.save_cert('nvmeof_ssl_cert', nvmeof_ssl_cert, service_name='nvmeof.self-signed.foo', user_made=False)
cephadm_module.cert_mgr.save_cert('nvmeof_client_cert', nvmeof_client_cert, service_name='nvmeof.foo', user_made=True)
cephadm_module.cert_mgr.save_cert('nvmeof_root_ca_cert', nvmeof_root_ca_cert, service_name='nvmeof.foo', user_made=True)
cephadm_module.cert_mgr.save_cert('grafana_ssl_cert', grafana_cert_host_1, host='host-1', user_made=True)
expected_calls = [
mock.call(f'{TLSOBJECT_STORE_CERT_PREFIX}rgw_ssl_cert', json.dumps({'rgw.foo': Cert(rgw_frontend_rgw_foo_host2_cert, True).to_json()})),
- mock.call(f'{TLSOBJECT_STORE_CERT_PREFIX}nvmeof_server_cert', json.dumps({'nvmeof.foo': Cert(nvmeof_server_cert, True).to_json()})),
+ mock.call(f'{TLSOBJECT_STORE_CERT_PREFIX}nvmeof_ssl_cert', json.dumps({'nvmeof.self-signed.foo': Cert(nvmeof_ssl_cert, False).to_json()})),
mock.call(f'{TLSOBJECT_STORE_CERT_PREFIX}nvmeof_client_cert', json.dumps({'nvmeof.foo': Cert(nvmeof_client_cert, True).to_json()})),
mock.call(f'{TLSOBJECT_STORE_CERT_PREFIX}nvmeof_root_ca_cert', json.dumps({'nvmeof.foo': Cert(nvmeof_root_ca_cert, True).to_json()})),
mock.call(f'{TLSOBJECT_STORE_CERT_PREFIX}grafana_ssl_cert', json.dumps({'host-1': Cert(grafana_cert_host_1, True).to_json()})),
}
def compare_certls_dicts(expected_ls):
- actual_ls = cephadm_module.cert_mgr.cert_ls(include_datails=True)
+ actual_ls = cephadm_module.cert_mgr.cert_ls(include_details=True, include_cephadm_signed=True)
assert actual_ls.keys() == expected_ls.keys()
for svc_cert_name, value in expected_ls.items():
expected_certs_entry = value['certificates']
# nvmeof certificates
cephadm_module.cert_mgr.save_cert('nvmeof_client_cert', CEPHADM_SELF_GENERATED_CERT_1, service_name='nvmeof.foo', user_made=True)
- cephadm_module.cert_mgr.save_cert('nvmeof_server_cert', CEPHADM_SELF_GENERATED_CERT_1, service_name='nvmeof.foo', user_made=True)
cephadm_module.cert_mgr.save_cert('nvmeof_root_ca_cert', CEPHADM_SELF_GENERATED_CERT_2, service_name='nvmeof.foo', user_made=True)
+ cephadm_module.cert_mgr.save_cert('nvmeof_ssl_cert', CEPHADM_SELF_GENERATED_CERT_1, service_name='nvmeof.self-signed.foo', user_made=False)
expected_ls.update(
{
"nvmeof_client_cert": {
"nvmeof.foo": get_generated_cephadm_cert_info_1(),
},
},
- "nvmeof_server_cert": {
+ "nvmeof_root_ca_cert": {
"scope": "service",
"certificates": {
- "nvmeof.foo": get_generated_cephadm_cert_info_1(),
+ "nvmeof.foo": get_generated_cephadm_cert_info_2(),
},
},
- "nvmeof_root_ca_cert": {
+ "nvmeof_ssl_cert": {
"scope": "service",
"certificates": {
- "nvmeof.foo": get_generated_cephadm_cert_info_2(),
+ "nvmeof.self-signed.foo": get_generated_cephadm_cert_info_1(),
},
},
}
)
compare_certls_dicts(expected_ls)
+ def test_cephadm_signed_with_label_host_scope(self, cephadm_module):
+ """
+ Ensure cephadm-signed <service>__<label> names work end-to-end
+ (stored under HOST scope automatically, retrievable, and visible in cert_ls).
+ """
+ cm: CertMgr = cephadm_module.cert_mgr
+
+ svc = "mgmt-gateway"
+ cert_label = "internal"
+ host = "host-1"
+
+ # Register the self-signed pair for this (service,label)
+ cm.register_self_signed_cert_key_pair(svc, label=cert_label)
+
+ # Save (simulate cephadm-generated) cert/key at host target
+ cm.save_self_signed_cert_key_pair(
+ svc,
+ CertKeyPair(CEPHADM_SELF_GENERATED_CERT_1, CEPHADM_SELF_GENERATED_KEY_2048),
+ host=host,
+ label=cert_label,
+ )
+
+ cert_name = cm.self_signed_cert(svc, cert_label)
+ key_name = cm.self_signed_key(svc, cert_label)
+
+ # Stored under HOST scope and retrievable by (host)
+ assert cm.cert_exists(cert_name, host=host) is True
+ assert cm.get_cert(cert_name, host=host) == CEPHADM_SELF_GENERATED_CERT_1
+ assert cm.get_key(key_name, host=host) == CEPHADM_SELF_GENERATED_KEY_2048
+
+ # Scope detection for cephadm-signed objects should be HOST
+ assert cm.get_cert_scope(cert_name) == TLSObjectScope.HOST
+ assert cm.get_key_scope(key_name) == TLSObjectScope.HOST
+
+ # Parsing back the service name from the cert name should ignore the label
+ assert cm.service_name_from_cert(cert_name) == svc
+
+ # Verify listing includes the cephadm-signed entry when requested
+ ls = cm.cert_ls(include_details=True, include_cephadm_signed=True)
+ assert cert_name in ls
+ assert ls[cert_name]["scope"] == "host"
+ assert host in ls[cert_name]["certificates"]
+ # sanity on validity fields (already exercised elsewhere; just presence here)
+ assert "validity" in ls[cert_name]["certificates"][host]
+
@mock.patch("cephadm.module.CephadmOrchestrator.set_store")
def test_tlsobject_store_save_key(self, _set_store, cephadm_module: CephadmOrchestrator):
grafana_host1_key = 'fake-grafana-host1-key'
grafana_host2_key = 'fake-grafana-host2-key'
nvmeof_client_key = 'nvmeof-client-key'
- nvmeof_server_key = 'nvmeof-server-key'
- nvmeof_encryption_key = 'nvmeof-encryption-key'
+ nvmeof_ssl_key = 'nvmeof-ssl-key'
cephadm_module.cert_mgr.save_key('grafana_ssl_key', grafana_host1_key, host='host1')
cephadm_module.cert_mgr.save_key('grafana_ssl_key', grafana_host2_key, host='host2')
cephadm_module.cert_mgr.save_key('nvmeof_client_key', nvmeof_client_key, service_name='nvmeof.foo')
- cephadm_module.cert_mgr.save_key('nvmeof_server_key', nvmeof_server_key, service_name='nvmeof.foo')
- cephadm_module.cert_mgr.save_key('nvmeof_encryption_key', nvmeof_encryption_key, service_name='nvmeof.foo')
+ cephadm_module.cert_mgr.save_key('nvmeof_ssl_key', nvmeof_ssl_key, service_name='nvmeof.self-signed.foo')
expected_calls = [
mock.call(f'{TLSOBJECT_STORE_KEY_PREFIX}grafana_ssl_key', json.dumps({'host1': PrivKey(grafana_host1_key).to_json()})),
mock.call(f'{TLSOBJECT_STORE_KEY_PREFIX}grafana_ssl_key', json.dumps({'host1': PrivKey(grafana_host1_key).to_json(),
'host2': PrivKey(grafana_host2_key).to_json()})),
mock.call(f'{TLSOBJECT_STORE_KEY_PREFIX}nvmeof_client_key', json.dumps({'nvmeof.foo': PrivKey(nvmeof_client_key).to_json()})),
- mock.call(f'{TLSOBJECT_STORE_KEY_PREFIX}nvmeof_server_key', json.dumps({'nvmeof.foo': PrivKey(nvmeof_server_key).to_json()})),
- mock.call(f'{TLSOBJECT_STORE_KEY_PREFIX}nvmeof_encryption_key', json.dumps({'nvmeof.foo': PrivKey(nvmeof_encryption_key).to_json()})),
+ mock.call(f'{TLSOBJECT_STORE_KEY_PREFIX}nvmeof_ssl_key', json.dumps({'nvmeof.self-signed.foo': PrivKey(nvmeof_ssl_key).to_json()})),
]
_set_store.assert_has_calls(expected_calls)
@mock.patch("cephadm.module.CephadmOrchestrator.set_store")
def test_tlsobject_store_key_ls(self, _set_store, cephadm_module: CephadmOrchestrator):
expected_ls = {
- 'nvmeof_server_key': {
+ 'nvmeof_ssl_key': {
'scope': 'service',
'keys': {
- 'nvmeof.foo': {
+ 'nvmeof.self-signed.foo': {
'key_type': 'RSA',
'key_size': 4096
}
'key_size': 4096
}
}
- },
- 'nvmeof_encryption_key': {
- 'scope': 'service',
- 'keys': {
- 'nvmeof.foo': {
- 'key_type': 'RSA',
- 'key_size': 2048
- }
- }
}
}
cephadm_module.cert_mgr.save_key('nvmeof_client_key', CEPHADM_SELF_GENERATED_KEY_4096, service_name='nvmeof.foo')
- cephadm_module.cert_mgr.save_key('nvmeof_server_key', CEPHADM_SELF_GENERATED_KEY_4096, service_name='nvmeof.foo')
- cephadm_module.cert_mgr.save_key('nvmeof_encryption_key', CEPHADM_SELF_GENERATED_KEY_2048, service_name='nvmeof.foo')
+ cephadm_module.cert_mgr.save_key('nvmeof_ssl_key', CEPHADM_SELF_GENERATED_KEY_4096, service_name='nvmeof.self-signed.foo')
assert cephadm_module.cert_mgr.key_ls() == expected_ls
cephadm_module.cert_mgr.save_key('ingress_ssl_key', 'invalid_key', service_name='ingress.foo')
# Define certs and keys with their corresponding scopes
certs = {
'rgw_ssl_cert': ('rgw.foo', 'fake-rgw-cert', TLSObjectScope.SERVICE),
- 'nvmeof_server_cert': ('nvmeof.foo', 'nvmeof-server-cert', TLSObjectScope.SERVICE),
+ 'nvmeof_ssl_cert': ('nvmeof.self-signed.foo', 'nvmeof-ssl-cert', TLSObjectScope.SERVICE),
'nvmeof_client_cert': ('nvmeof.foo', 'nvmeof-client-cert', TLSObjectScope.SERVICE),
'nvmeof_root_ca_cert': ('nvmeof.foo', 'nvmeof-root-ca-cert', TLSObjectScope.SERVICE),
'ingress_ssl_cert': ('ingress', 'ingress-ssl-cert', TLSObjectScope.SERVICE),
keys = {
'grafana_ssl_key': ('host1', 'fake-grafana-host1-key', TLSObjectScope.HOST),
- 'nvmeof_server_key': ('nvmeof.foo', 'nvmeof-server-key', TLSObjectScope.SERVICE),
+ 'nvmeof_ssl_key': ('nvmeof.self-signed.foo', 'nvmeof-ssl-key', TLSObjectScope.SERVICE),
'nvmeof_client_key': ('nvmeof.foo', 'nvmeof-client-key', TLSObjectScope.SERVICE),
- 'nvmeof_encryption_key': ('nvmeof.foo', 'nvmeof-encryption-key', TLSObjectScope.SERVICE),
'mgmt_gateway_ssl_key': ('mgmt-gateway', 'mgmt-gw-key', TLSObjectScope.GLOBAL),
'oauth2_proxy_ssl_key': ('host1', 'oauth2-proxy', TLSObjectScope.HOST),
'ingress_ssl_key': ('ingress', 'ingress-ssl-key', TLSObjectScope.SERVICE),
rgw_frontend_rgw_foo_host2_cert = 'fake-rgw-cert'
nvmeof_client_cert = 'fake-nvmeof-client-cert'
- nvmeof_server_cert = 'fake-nvmeof-server-cert'
+ nvmeof_ssl_cert = 'fake-nvmeof-ssl-cert'
cephadm_module.cert_mgr.save_cert('rgw_ssl_cert', rgw_frontend_rgw_foo_host2_cert, service_name='rgw.foo', user_made=True)
- cephadm_module.cert_mgr.save_cert('nvmeof_server_cert', nvmeof_server_cert, service_name='nvmeof.foo', user_made=True)
+ cephadm_module.cert_mgr.save_cert('nvmeof_ssl_cert', nvmeof_ssl_cert, service_name='nvmeof.self-signed.foo', user_made=False)
cephadm_module.cert_mgr.save_cert('nvmeof_client_cert', nvmeof_client_cert, service_name='nvmeof.foo', user_made=True)
assert cephadm_module.cert_mgr.get_cert('rgw_ssl_cert', service_name='rgw.foo') == rgw_frontend_rgw_foo_host2_cert
- assert cephadm_module.cert_mgr.get_cert('nvmeof_server_cert', service_name='nvmeof.foo') == nvmeof_server_cert
+ assert cephadm_module.cert_mgr.get_cert('nvmeof_ssl_cert', service_name='nvmeof.self-signed.foo') == nvmeof_ssl_cert
assert cephadm_module.cert_mgr.get_cert('nvmeof_client_cert', service_name='nvmeof.foo') == nvmeof_client_cert
assert cephadm_module.cert_mgr.get_cert('grafana_ssl_cert', host='host1') is None
assert cephadm_module.cert_mgr.get_cert('iscsi_ssl_cert', service_name='iscsi.foo') is None
cephadm_module.cert_mgr.get_cert('rgw_ssl_cert', host='foo')
grafana_host1_key = 'fake-grafana-host1-cert'
- nvmeof_server_key = 'nvmeof-server-key'
- nvmeof_encryption_key = 'nvmeof-encryption-key'
- cephadm_module.cert_mgr.save_key('grafana_ssl_key', grafana_host1_key, host='host1')
+ nvmeof_client_key = 'nvmeof-client-key'
+ nvmeof_ssl_key = 'nvmeof-ssl-key'
cephadm_module.cert_mgr.save_key('grafana_ssl_key', grafana_host1_key, host='host1')
- cephadm_module.cert_mgr.save_key('nvmeof_server_key', nvmeof_server_key, service_name='nvmeof.foo')
- cephadm_module.cert_mgr.save_key('nvmeof_encryption_key', nvmeof_encryption_key, service_name='nvmeof.foo')
+ cephadm_module.cert_mgr.save_key('nvmeof_client_key', nvmeof_client_key, service_name='nvmeof.foo')
+ cephadm_module.cert_mgr.save_key('nvmeof_ssl_key', nvmeof_ssl_key, service_name='nvmeof.self-signed.foo')
assert cephadm_module.cert_mgr.get_key('grafana_ssl_key', host='host1') == grafana_host1_key
- assert cephadm_module.cert_mgr.get_key('nvmeof_server_key', service_name='nvmeof.foo') == nvmeof_server_key
- assert cephadm_module.cert_mgr.get_key('nvmeof_client_key', service_name='nvmeof.foo') is None
- assert cephadm_module.cert_mgr.get_key('nvmeof_encryption_key', service_name='nvmeof.foo') == nvmeof_encryption_key
+ assert cephadm_module.cert_mgr.get_key('nvmeof_client_key', service_name='nvmeof.foo') == nvmeof_client_key
+ assert cephadm_module.cert_mgr.get_key('nvmeof_ssl_key', service_name='nvmeof.self-signed.foo') == nvmeof_ssl_key
with pytest.raises(TLSObjectException, match='Attempted to access privkey for unknown TLS object name unknown_consumer'):
cephadm_module.cert_mgr.get_key('unknown_consumer')
# Save some certificates and ensure certificates are present
cephadm_module.cert_mgr.save_cert('rgw_ssl_cert', 'fake-rgw-cert', service_name='rgw.foo', user_made=True)
- cephadm_module.cert_mgr.save_cert('nvmeof_server_cert', 'fake-nvmeof-server-cert', service_name='nvmeof.foo', user_made=True)
+ cephadm_module.cert_mgr.save_cert('nvmeof_ssl_cert', 'fake-nvmeof-ssl-cert', service_name='nvmeof.self-signed.foo', user_made=False)
assert cephadm_module.cert_mgr.get_cert('rgw_ssl_cert', service_name='rgw.foo') == 'fake-rgw-cert'
- assert cephadm_module.cert_mgr.get_cert('nvmeof_server_cert', service_name='nvmeof.foo') == 'fake-nvmeof-server-cert'
+ assert cephadm_module.cert_mgr.get_cert('nvmeof_ssl_cert', service_name='nvmeof.self-signed.foo') == 'fake-nvmeof-ssl-cert'
# Remove certificates and ensure certificates are removed
- cephadm_module.cert_mgr.rm_cert('rgw_ssl_cert', service_name='rgw.foo')
- cephadm_module.cert_mgr.rm_cert('nvmeof_server_cert', service_name='nvmeof.foo')
+ assert cephadm_module.cert_mgr.rm_cert('rgw_ssl_cert', service_name='rgw.foo') is True
+ assert cephadm_module.cert_mgr.rm_cert('nvmeof_ssl_cert', service_name='nvmeof.self-signed.foo') is True
assert cephadm_module.cert_mgr.get_cert('rgw_ssl_cert', service_name='rgw.foo') is None
- assert cephadm_module.cert_mgr.get_cert('nvmeof_server_cert', service_name='nvmeof.foo') is None
+ assert cephadm_module.cert_mgr.get_cert('nvmeof_ssl_cert', service_name='nvmeof.self-signed.foo') is None
def test_tlsobject_store_rm_key(self, cephadm_module: CephadmOrchestrator):
# Save some keys and ensure keys are present
cephadm_module.cert_mgr.save_key('grafana_ssl_key', 'fake-grafana-host1-key', host='host1')
- cephadm_module.cert_mgr.save_key('nvmeof_server_key', 'fake-nvmeof-server-key', service_name='nvmeof.foo')
assert cephadm_module.cert_mgr.get_key('grafana_ssl_key', host='host1') == 'fake-grafana-host1-key'
- assert cephadm_module.cert_mgr.get_key('nvmeof_server_key', service_name='nvmeof.foo') == 'fake-nvmeof-server-key'
# Remove keys and ensure keys are removed
cephadm_module.cert_mgr.rm_key('grafana_ssl_key', host='host1')
- cephadm_module.cert_mgr.rm_key('nvmeof_server_key', service_name='nvmeof.foo')
assert cephadm_module.cert_mgr.get_key('grafana_ssl_key', host='host1') is None
- assert cephadm_module.cert_mgr.get_key('nvmeof_server_key', service_name='nvmeof.foo') is None
@mock.patch("cephadm.module.CephadmOrchestrator.set_store")
def test_expired_certificate_detection(self, _set_store, cephadm_module: CephadmOrchestrator):