CephService.set_encryption_config(encryption_type, kms_provider, auth_method,
secret_engine, secret_path, namespace, address,
- token, ssl_cert, client_cert, client_key)
+ token, daemon_name, ssl_cert, client_cert, client_key)
def _get_encryption(self, bucket_name, daemon_name, owner):
rgw_client = RgwClient.instance(owner, daemon_name)
@RESTController.Collection(method='GET', path='/getEncryptionConfig')
@allow_empty_body
- def get_encryption_config(self):
- return CephService.get_encryption_config()
+ def get_encryption_config(self, daemon_name=None, owner=None):
+ return CephService.get_encryption_config(daemon_name)
@APIRouter('/rgw/user', Scope.RGW)
return None
@classmethod
- def get_encryption_config(cls):
+ def get_encryption_config(cls, daemon_name):
kms_vault_configured = False
s3_vault_configured = False
kms_backend: str = ''
vault_stats = []
kms_backend = CephService.send_command('mon', 'config get',
- who=name_to_config_section('rgw'),
+ who=name_to_config_section(daemon_name),
key='rgw_crypt_s3_kms_backend')
sse_s3_backend = CephService.send_command('mon', 'config get',
- who=name_to_config_section('rgw'),
+ who=name_to_config_section(daemon_name),
key='rgw_crypt_sse_s3_backend')
if kms_backend.strip() == 'vault':
kms_vault_auth: str = CephService.send_command('mon', 'config get',
- who=name_to_config_section('rgw'),
+ who=name_to_config_section(daemon_name),
key='rgw_crypt_vault_auth')
kms_vault_engine: str = CephService.send_command('mon', 'config get',
- who=name_to_config_section('rgw'),
+ who=name_to_config_section(daemon_name), # noqa E501 #pylint: disable=line-too-long
key='rgw_crypt_vault_secret_engine')
kms_vault_address: str = CephService.send_command('mon', 'config get',
- who=name_to_config_section('rgw'),
+ who=name_to_config_section(daemon_name), # noqa E501 #pylint: disable=line-too-long
key='rgw_crypt_vault_addr')
kms_vault_token: str = CephService.send_command('mon', 'config get',
- who=name_to_config_section('rgw'),
+ who=name_to_config_section(daemon_name),
key='rgw_crypt_vault_token_file')
if (
kms_vault_auth.strip() != ""
if sse_s3_backend.strip() == 'vault':
s3_vault_auth: str = CephService.send_command('mon', 'config get',
- who=name_to_config_section('rgw'),
+ who=name_to_config_section(daemon_name),
key='rgw_crypt_sse_s3_vault_auth')
s3_vault_engine: str = CephService.send_command('mon',
'config get',
- who=name_to_config_section('rgw'),
+ who=name_to_config_section(daemon_name),
key='rgw_crypt_sse_s3_vault_secret_engine') # noqa E501 #pylint: disable=line-too-long
s3_vault_address: str = CephService.send_command('mon', 'config get',
- who=name_to_config_section('rgw'),
+ who=name_to_config_section(daemon_name), # noqa E501 #pylint: disable=line-too-long
key='rgw_crypt_sse_s3_vault_addr')
s3_vault_token: str = CephService.send_command('mon', 'config get',
- who=name_to_config_section('rgw'),
+ who=name_to_config_section(daemon_name),
key='rgw_crypt_sse_s3_vault_token_file')
if (
s3_vault_auth.strip() != ""
@classmethod
def set_encryption_config(cls, encryption_type, kms_provider, auth_method,
secret_engine, secret_path, namespace, address,
- token, ssl_cert, client_cert, client_key):
+ token, daemon_name, ssl_cert, client_cert, client_key):
if encryption_type == 'aws:kms':
]
for (key, value) in KMS_CONFIG:
- CephService.send_command('mon', 'config set', who=name_to_config_section('rgw'),
+ if value == 'null':
+ continue
+ CephService.send_command('mon', 'config set',
+ who=name_to_config_section(daemon_name),
name=key, value=value)
if encryption_type == 'AES256':
]
for (key, value) in SSE_S3_CONFIG:
- CephService.send_command('mon', 'config set', who=name_to_config_section('rgw'),
+ if value == 'null':
+ continue
+ CephService.send_command('mon', 'config set',
+ who=name_to_config_section(daemon_name),
name=key, value=value)
return {}