for key_attr in [
'server_key',
'client_key',
+ 'encryption_key',
]:
key = getattr(nvmeof_spec, key_attr, None)
if key:
self.mgr.cert_key_store.rm_cert('nvmeof_root_ca_cert', service_name=spec.service_name())
self.mgr.cert_key_store.rm_key('nvmeof_server_key', service_name=spec.service_name())
self.mgr.cert_key_store.rm_key('nvmeof_client_key', service_name=spec.service_name())
+ self.mgr.cert_key_store.rm_key('nvmeof_encryption_key', service_name=spec.service_name())
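Review bot: The loop at the top of this hunk silently skips an absent `encryption_key` (`getattr(..., None)` followed by `if key:`), so a spec with `enable_key_encryption` left at its default of `True` (visible in the spec hunk further down) but no `encryption_key` supplied stores nothing under `nvmeof_encryption_key`, and nothing here tells the operator that. If the gateway needs that key whenever encryption is enabled, which the names suggest but this hunk does not show, the missing-key case should be rejected in the spec's validation or logged at this point rather than passed over. The removal side with the new `rm_key('nvmeof_encryption_key', ...)` line mirrors the existing server/client key handling, which is fine. Both the save loop and the removal block start and end outside this hunk.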
def get_created(self, spec: ServiceSpec) -> Optional[datetime.datetime]:
return self.spec_created.get(spec.service_name())
'ingress_ssl_key',
'nvmeof_server_key',
'nvmeof_client_key',
+ 'nvmeof_encryption_key',
]
known_certs: Dict[str, Any] = {}
'ingress_ssl_key': {}, # service-name -> key
'nvmeof_server_key': {}, # service-name -> key
'nvmeof_client_key': {}, # service-name -> key
+ 'nvmeof_encryption_key': {}, # service-name -> key
}
def get_cert(self, entity: str, service_name: str = '', host: str = '') -> str:
grafana_host1_key = 'fake-grafana-host1-key'
nvmeof_client_key = 'nvmeof-client-key'
nvmeof_server_key = 'nvmeof-server-key'
+ nvmeof_encryption_key = 'nvmeof-encryption-key'
grafana_host1_key = 'fake-grafana-host1-cert'
cephadm_module.cert_key_store.save_key('grafana_key', grafana_host1_key, host='host1')
cephadm_module.cert_key_store.save_key('nvmeof_client_key', nvmeof_client_key, service_name='nvmeof.foo')
cephadm_module.cert_key_store.save_key('nvmeof_server_key', nvmeof_server_key, service_name='nvmeof.foo')
+ cephadm_module.cert_key_store.save_key('nvmeof_encryption_key', nvmeof_encryption_key, service_name='nvmeof.foo')
expected_calls = [
mock.call(f'{CERT_STORE_KEY_PREFIX}grafana_key', json.dumps({'host1': PrivKey(grafana_host1_key).to_json()})),
mock.call(f'{CERT_STORE_KEY_PREFIX}nvmeof_client_key', json.dumps({'nvmeof.foo': PrivKey(nvmeof_client_key).to_json()})),
mock.call(f'{CERT_STORE_KEY_PREFIX}nvmeof_server_key', json.dumps({'nvmeof.foo': PrivKey(nvmeof_server_key).to_json()})),
+ mock.call(f'{CERT_STORE_KEY_PREFIX}nvmeof_encryption_key', json.dumps({'nvmeof.foo': PrivKey(nvmeof_encryption_key).to_json()})),
]
_set_store.assert_has_calls(expected_calls)
'ingress_ssl_key': False,
'nvmeof_client_key': False,
'nvmeof_server_key': False,
+ 'nvmeof_encryption_key': False,
}
assert cephadm_module.cert_key_store.key_ls() == expected_ls
cephadm_module.cert_key_store.save_key('nvmeof_client_key', 'xxx', service_name='nvmeof.foo')
cephadm_module.cert_key_store.save_key('nvmeof_server_key', 'xxx', service_name='nvmeof.foo')
+ cephadm_module.cert_key_store.save_key('nvmeof_encryption_key', 'xxx', service_name='nvmeof.foo')
expected_ls['nvmeof_server_key'] = {}
expected_ls['nvmeof_server_key']['nvmeof.foo'] = True
expected_ls['nvmeof_client_key'] = {}
expected_ls['nvmeof_client_key']['nvmeof.foo'] = True
+ expected_ls['nvmeof_encryption_key'] = {}
+ expected_ls['nvmeof_encryption_key']['nvmeof.foo'] = True
assert cephadm_module.cert_key_store.key_ls() == expected_ls
@mock.patch("cephadm.module.CephadmOrchestrator.get_store_prefix")
nvmeof_root_ca_cert = 'nvmeof-root-ca-cert'
nvmeof_server_key = 'nvmeof-server-key'
nvmeof_client_key = 'nvmeof-client-key'
+ nvmeof_encryption_key = 'nvmeof-encryption-key'
def _fake_prefix_store(key):
if key == 'cert_store.cert.':
f'{CERT_STORE_KEY_PREFIX}grafana_key': json.dumps({'host1': PrivKey(grafana_host1_key).to_json()}),
f'{CERT_STORE_KEY_PREFIX}nvmeof_server_key': json.dumps({'nvmeof.foo': PrivKey(nvmeof_server_key).to_json()}),
f'{CERT_STORE_KEY_PREFIX}nvmeof_client_key': json.dumps({'nvmeof.foo': PrivKey(nvmeof_client_key).to_json()}),
+ f'{CERT_STORE_KEY_PREFIX}nvmeof_encryption_key': json.dumps({'nvmeof.foo': PrivKey(nvmeof_encryption_key).to_json()}),
}
else:
raise Exception(f'Get store with unexpected value {key}')
assert cephadm_module.cert_key_store.known_keys['grafana_key']['host1'] == PrivKey(grafana_host1_key)
assert cephadm_module.cert_key_store.known_keys['nvmeof_server_key']['nvmeof.foo'] == PrivKey(nvmeof_server_key)
assert cephadm_module.cert_key_store.known_keys['nvmeof_client_key']['nvmeof.foo'] == PrivKey(nvmeof_client_key)
+ assert cephadm_module.cert_key_store.known_keys['nvmeof_encryption_key']['nvmeof.foo'] == PrivKey(nvmeof_encryption_key)
def test_cert_store_get_cert_key(self, cephadm_module: CephadmOrchestrator):
cephadm_module.cert_key_store._init_known_cert_key_dicts()
grafana_host1_key = 'fake-grafana-host1-cert'
nvmeof_server_key = 'nvmeof-server-key'
+ nvmeof_encryption_key = 'nvmeof-encryption-key'
cephadm_module.cert_key_store.save_key('grafana_key', grafana_host1_key, host='host1')
cephadm_module.cert_key_store.save_key('grafana_key', grafana_host1_key, host='host1')
cephadm_module.cert_key_store.save_key('nvmeof_server_key', nvmeof_server_key, service_name='nvmeof.foo')
+ cephadm_module.cert_key_store.save_key('nvmeof_encryption_key', nvmeof_encryption_key, service_name='nvmeof.foo')
assert cephadm_module.cert_key_store.get_key('grafana_key', host='host1') == grafana_host1_key
assert cephadm_module.cert_key_store.get_key('nvmeof_server_key', service_name='nvmeof.foo') == nvmeof_server_key
assert cephadm_module.cert_key_store.get_key('nvmeof_client_key', service_name='nvmeof.foo') == ''
+ assert cephadm_module.cert_key_store.get_key('nvmeof_encryption_key', service_name='nvmeof.foo') == nvmeof_encryption_key
with pytest.raises(OrchestratorError, match='Attempted to access priv key for unknown entity'):
cephadm_module.cert_key_store.get_key('unknown_entity')
state_update_interval_sec: Optional[int] = 5,
enable_spdk_discovery_controller: Optional[bool] = False,
enable_key_encryption: Optional[bool] = True,
+ encryption_key: Optional[str] = None,
omap_file_lock_duration: Optional[int] = 20,
omap_file_lock_retries: Optional[int] = 30,
omap_file_lock_retry_sleep_interval: Optional[float] = 1.0,
self.enable_spdk_discovery_controller = enable_spdk_discovery_controller
#: ``enable_key_encryption`` encrypt DHCHAP and PSK keys before saving in OMAP
self.enable_key_encryption = enable_key_encryption
+ #: ``encryption_key`` gateway encryption key
+ self.encryption_key = encryption_key
#: ``enable_prometheus_exporter`` enables Prometheus exporter
self.enable_prometheus_exporter = enable_prometheus_exporter
#: ``verify_nqns`` enables verification of subsystem and host NQNs for validity
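Review bot: The attribute comment `#: ``encryption_key`` gateway encryption key` restates the name and gives a reader none of the context the neighbouring `enable_key_encryption` comment gives; since these `#:` lines feed the spec documentation, it should say what the key is for and where it lives. If, as the name and the adjacent flag suggest, this is the key material used when `enable_key_encryption` is set, something like `#: ``encryption_key`` key used to encrypt DHCHAP and PSK keys in OMAP when ``enable_key_encryption`` is set; held in the cephadm cert/key store` would be more useful, with the relationship to the flag confirmed against the gateway side. The `Optional[str] = None` parameter also joins `server_key` and `client_key` as secret material carried on the spec object, so it is worth confirming that whatever path prints or exports specs treats it the same way as those two. The `__init__` this belongs to starts and ends outside the hunk.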