from .. import mgr
from ..exceptions import DashboardException
+from ..model.rgw import KmipConfig, KmsProviders, VaultConfig
from ..rest_client import RequestException
from ..security import Permission, Scope
from ..services.auth import AuthManager, JwtManager
mfa_token_serial, mfa_token_pin)
def _set_encryption(self, bid, encryption_type, key_id, daemon_name, owner):
-
rgw_client = RgwClient.instance(owner, daemon_name)
rgw_client.set_bucket_encryption(bid, key_id, encryption_type)
- # pylint: disable=W0613
- def _set_encryption_config(self, encryption_type, kms_provider, auth_method, secret_engine,
- secret_path, namespace, address, token, daemon_name, owner,
- ssl_cert, client_cert, client_key):
-
- CephService.set_encryption_config(encryption_type, kms_provider, auth_method,
- secret_engine, secret_path, namespace, address,
- token, daemon_name, ssl_cert, client_cert, client_key)
-
def _get_encryption(self, bucket_name, daemon_name, owner):
rgw_client = RgwClient.instance(owner, daemon_name)
return rgw_client.get_bucket_encryption(bucket_name)
}, json_response=False)
@RESTController.Collection(method='PUT', path='/setEncryptionConfig')
- @allow_empty_body
- def set_encryption_config(self, encryption_type=None, kms_provider=None, auth_method=None,
- secret_engine=None, secret_path='', namespace='', address=None,
- token=None, daemon_name=None, owner=None, ssl_cert=None,
- client_cert=None, client_key=None):
- return self._set_encryption_config(encryption_type, kms_provider, auth_method,
- secret_engine, secret_path, namespace,
- address, token, daemon_name, owner, ssl_cert,
- client_cert, client_key)
+ def set_encryption_config(self, encryption_type: Optional[str] = None,
+ kms_provider: Optional[str] = None,
+ config: Optional[Union[VaultConfig, KmipConfig]] = None,
+ daemon_name: Optional[str] = None):
+ if encryption_type is None or daemon_name is None:
+ raise ValueError("Both 'encryption_type' and 'daemon_name' must be provided.")
+
+ if kms_provider == KmsProviders.VAULT.value:
+ config = config if config else VaultConfig(
+ addr="", auth="", prefix="", secret_engine=""
+ )
+ elif kms_provider == KmsProviders.KMIP.value:
+ config = config if config else KmipConfig(
+ addr=""
+ )
+ else:
+ raise ValueError("Invalid KMS provider specified.")
+
+ return CephService.set_encryption_config(
+ encryption_type, kms_provider, config, daemon_name
+ )
@RESTController.Collection(method='GET', path='/getEncryption')
@allow_empty_body
@RESTController.Collection(method='GET', path='/getEncryptionConfig')
@allow_empty_body
+ # pylint: disable=W0613
def get_encryption_config(self, daemon_name=None, owner=None):
return CephService.get_encryption_config(daemon_name)
-enum KmsProviders {
- Vault = 'vault'
+export enum KMS_PROVIDER {
+ VAULT = 'vault',
+ KMIP = 'kmip'
}
enum AuthMethods {
Transit = 'transit'
}
-enum sseS3 {
- SSE_S3 = 'AES256'
-}
-
-enum sseKms {
- SSE_KMS = 'aws:kms'
+export enum ENCRYPTION_TYPE {
+ SSE_S3 = 's3',
+ SSE_KMS = 'kms'
}
interface RgwBucketEncryptionModel {
- kmsProviders: KmsProviders[];
+ kmsProviders: KMS_PROVIDER[];
authMethods: AuthMethods[];
secretEngines: SecretEngines[];
- SSE_S3: sseS3;
- SSE_KMS: sseKms;
}
export const rgwBucketEncryptionModel: RgwBucketEncryptionModel = {
- kmsProviders: [KmsProviders.Vault],
+ kmsProviders: [KMS_PROVIDER.VAULT, KMS_PROVIDER.KMIP],
authMethods: [AuthMethods.Token, AuthMethods.Agent],
- secretEngines: [SecretEngines.KV, SecretEngines.Transit],
- SSE_S3: sseS3.SSE_S3,
- SSE_KMS: sseKms.SSE_KMS
+ secretEngines: [SecretEngines.KV, SecretEngines.Transit]
};
formControlName="encryptionType"
id="s3Enabled"
type="radio"
- name="encryptionType"
(change)="checkKmsProviders()"
- [attr.disabled]="editing && configForm.getValue('encryptionType') !== 'AES256' ? true : null"
- value="AES256">
+ [attr.disabled]="editing && configForm.getValue('encryptionType') !== ENCRYPTION_TYPE.SSE_S3 ? true : null"
+ [value]="ENCRYPTION_TYPE.SSE_S3">
<label class="custom-check-label"
- [ngClass]="{'text-muted': editing && configForm.getValue('encryptionType') !== 'AES256'}"
+ [ngClass]="{'text-muted': editing && configForm.getValue('encryptionType') !== ENCRYPTION_TYPE.SSE_S3}"
for="s3Enabled"
i18n>SSE-S3</label>
</div>
<input class="form-check-input"
formControlName="encryptionType"
id="kmsEnabled"
- name="encryptionType"
(change)="checkKmsProviders()"
- value="aws:kms"
- [attr.disabled]="editing && configForm.getValue('encryptionType') !== 'aws:kms' ? true : null"
+ [value]="ENCRYPTION_TYPE.SSE_KMS"
+ [attr.disabled]="editing && configForm.getValue('encryptionType') !== ENCRYPTION_TYPE.SSE_KMS ? true : null"
type="radio">
<label class="custom-check-label"
- [ngClass]="{'text-muted': editing && configForm.getValue('encryptionType') !== 'aws:kms'}"
+ [ngClass]="{'text-muted': editing && configForm.getValue('encryptionType') !== ENCRYPTION_TYPE.SSE_KMS}"
for="kmsEnabled"
i18n>SSE-KMS</label>
</div>
</div>
- <div *ngIf="configForm.getValue('encryptionType') === 'aws:kms' || configForm.getValue('encryptionType') === 'AES256'">
+ <div *ngIf="configForm.getValue('encryptionType') === ENCRYPTION_TYPE.SSE_KMS || configForm.getValue('encryptionType') === ENCRYPTION_TYPE.SSE_S3">
<div class="form-group row">
<label class="cd-col-form-label required"
for="kms_provider"
i18n>Key management service provider</label>
<div class="cd-col-form-input">
<select id="kms_provider"
- name="kms_provider"
class="form-select"
+ (change)="setKmsProvider()"
formControlName="kms_provider">
<option *ngIf="kmsProviders !== null && kmsProviders.length === 0"
ngValue="null"
<option *ngFor="let provider of kmsProviders"
[value]="provider">{{ provider }}</option>
</select>
+ <cd-help-text>
+ <span i18n>Where the encryption keys are stored.
+ </span>
+ </cd-help-text>
<span class="invalid-feedback"
*ngIf="configForm.showError('kms_provider', frm, 'required')"
i18n>This field is required.</span>
</div>
<div *ngIf="kmsProviders.length !== 0 && configForm.getValue('kms_provider') !== ''">
- <div *ngIf="configForm.getValue('encryptionType') === 'aws:kms' || configForm.getValue('encryptionType') === 'AES256'">
+ <div *ngIf="(configForm.getValue('encryptionType') === ENCRYPTION_TYPE.SSE_KMS && configForm.getValue('kms_provider') === KMS_PROVIDER.VAULT) || configForm.getValue('encryptionType') === ENCRYPTION_TYPE.SSE_S3">
<div class="form-group row">
<label class="cd-col-form-label required"
- for="auth_method"
+ for="auth"
i18n>Authentication Method</label>
<div class="cd-col-form-input">
- <select id="auth_method"
- name="auth_method"
- class="form-select"
- formControlName="auth_method">
- <option *ngFor="let auth_method of authMethods"
- [value]="auth_method">{{ auth_method }}</option>
+ <select class="form-select"
+ id="auth"
+ formControlName="auth">
+ <option *ngFor="let auth of authMethods"
+ [value]="auth">{{ auth }}</option>
</select>
+ <cd-help-text>
+ <span i18n>Type of authentication method to be used with Vault
+ </span>
+ </cd-help-text>
<span class="invalid-feedback"
- *ngIf="configForm.showError('auth_method', frm, 'required')"
+ *ngIf="configForm.showError('auth', frm, 'required')"
i18n>This field is required.</span>
</div>
</div>
</div>
- <div *ngIf="configForm.getValue('encryptionType') === 'aws:kms' || configForm.getValue('encryptionType') === 'AES256'">
+ <div *ngIf="(configForm.getValue('encryptionType') === ENCRYPTION_TYPE.SSE_KMS && configForm.getValue('kms_provider') === KMS_PROVIDER.VAULT) || configForm.getValue('encryptionType') === ENCRYPTION_TYPE.SSE_S3">
<div class="form-group row">
<label class="cd-col-form-label required"
for="secret_engine"
i18n>Secret Engine</label>
<div class="cd-col-form-input">
- <select id="secret_engine"
- name="secret_engine"
- class="form-select"
+ <select class="form-select"
+ id="secret_engine"
formControlName="secret_engine">
<option *ngFor="let secret_engine of secretEngines"
[value]="secret_engine">{{ secret_engine }}</option>
</select>
+ <cd-help-text>
+ <span i18n>Vault Secret Engine to be used to retrieve encryption keys.
+ </span>
+ </cd-help-text>
<span class="invalid-feedback"
*ngIf="configForm.showError('secret_engine', frm, 'required')"
i18n>This field is required.</span>
</div>
</div>
- <div *ngIf="configForm.getValue('encryptionType') === 'aws:kms' || configForm.getValue('encryptionType') === 'AES256'">
+ <div *ngIf="(configForm.getValue('encryptionType') === ENCRYPTION_TYPE.SSE_KMS && configForm.getValue('kms_provider') === KMS_PROVIDER.VAULT) || configForm.getValue('encryptionType') === ENCRYPTION_TYPE.SSE_S3">
<div class="form-group row">
<label class="cd-col-form-label"
for="secret_path"
i18n>Secret Path
</label>
<div class="cd-col-form-input">
- <input id="secret_path"
- name="secret_path"
- class="form-control"
- type="text"
- formControlName="secret_path">
+ <input class="form-control"
+ type="text"
+ id="secret_path"
+ formControlName="secret_path"
+ placeholder="/v1/secret/data">
+ <cd-help-text>
+ <span i18n>Vault secret URL prefix, which can be used to restrict access to a particular subset of the Vault secret space.
+ </span>
+ </cd-help-text>
<span class="invalid-feedback"
*ngIf="configForm.showError('secret_path', frm, 'required')"
i18n>This field is required.</span>
</div>
</div>
- <div *ngIf="configForm.getValue('encryptionType') === 'aws:kms' || configForm.getValue('encryptionType') === 'AES256'">
+ <div *ngIf="(configForm.getValue('encryptionType') === ENCRYPTION_TYPE.SSE_KMS && configForm.getValue('kms_provider') == KMS_PROVIDER.VAULT) || configForm.getValue('encryptionType') === ENCRYPTION_TYPE.SSE_S3">
<div class="form-group row">
<label class="cd-col-form-label"
for="namespace"
i18n>Namespace
</label>
<div class="cd-col-form-input">
- <input id="namespace"
- name="namespace"
- class="form-control"
- type="text"
- formControlName="namespace">
+ <input class="form-control"
+ id="namespace"
+ type="text"
+ placeholder="tenant1"
+ formControlName="namespace">
+ <cd-help-text>
+ <span i18n>Vault Namespace to be used to select your tenant.
+ </span>
+ </cd-help-text>
</div>
</div>
</div>
- <div *ngIf="configForm.getValue('encryptionType') === 'aws:kms' || configForm.getValue('encryptionType') === 'AES256'">
+ <div *ngIf="configForm.getValue('encryptionType') === ENCRYPTION_TYPE.SSE_KMS || configForm.getValue('encryptionType') === ENCRYPTION_TYPE.SSE_S3">
<div class="form-group row">
- <label class="cd-col-form-label required"
- for="address"
- i18n>Vault Address
+ <label class="cd-col-form-label required"
+ for="addr">
+ <span i18n
+ *ngIf="configForm.getValue('kms_provider') === KMS_PROVIDER.VAULT">Vault Address
+ </span>
+ <span i18n
+ *ngIf="configForm.getValue('kms_provider') === KMS_PROVIDER.KMIP">KMIP Address
+ </span>
</label>
<div class="cd-col-form-input">
- <input id="address"
- name="address"
- class="form-control"
- formControlName="address"
- placeholder="http://127.0.0.1:8000">
+ <input class="form-control"
+ id="addr"
+ formControlName="addr"
+ placeholder="http://127.0.0.1:8000"
+ i18n>
+ <cd-help-text >
+ <span *ngIf="configForm.getValue('kms_provider') === KMS_PROVIDER.VAULT"
+ i18n>Vault server base address.
+ </span>
+ <span *ngIf="configForm.getValue('kms_provider') === KMS_PROVIDER.KMIP"
+ i18n>Kmip server base address.
+ </span>
+ </cd-help-text>
<span class="invalid-feedback"
- *ngIf="configForm.showError('address', frm, 'required')"
+ *ngIf="configForm.showError('addr', frm, 'required')"
i18n>This field is required.</span>
</div>
</div>
</div>
- <div *ngIf="configForm.getValue('auth_method') === 'token'"
- class="form-group row">
- <label class="cd-col-form-label required"
- for="token">
+ <div *ngIf="(configForm.getValue('encryptionType') === ENCRYPTION_TYPE.SSE_KMS && configForm.getValue('kms_provider') === KMS_PROVIDER.VAULT)|| configForm.getValue('encryptionType') === ENCRYPTION_TYPE.SSE_S3">
+ <div *ngIf="configForm.getValue('auth') === 'token'"
+ class="form-group row">
+ <label class="cd-col-form-label required"
+ for="token">
<span i18n>Token</span>
<cd-helper i18n>
The token authentication method expects a Vault token to be present in a plaintext file.
</cd-helper>
</label>
<div class="cd-col-form-input">
- <input type="file"
- formControlName="token"
- (change)="fileUpload($event.target.files, 'token')">
+ <input type="file"
+ id="token"
+ formControlName="token"
+ (change)="fileUpload($event.target.files, 'token')">
+ <cd-help-text>
+ <span i18n>If authentication method is 'token', provide a path to the token file.
+ </span>
+ </cd-help-text>
<span class="invalid-feedback"
*ngIf="configForm.showError('token', frm, 'required')"
i18n>This field is required.</span>
</div>
+ </div>
+ </div>
+
+ <div *ngIf="configForm.getValue('encryptionType') === ENCRYPTION_TYPE.SSE_KMS && configForm.getValue('kms_provider') === KMS_PROVIDER.KMIP">
+ <div class="form-group row">
+ <label class="cd-col-form-label"
+ for="kms_key_template"
+ i18n>KMS Key Template
+ </label>
+ <div class="cd-col-form-input">
+ <input class="form-control"
+ id="kms_key_template"
+ placeholder="$keyid"
+ formControlName="kms_key_template">
+ <cd-help-text>
+ <span i18n>sse-kms; kmip key names
+ </span>
+ </cd-help-text>
+ </div>
+ </div>
+ <div class="form-group row">
+ <label class="cd-col-form-label"
+ for="s3_key_template"
+ i18n>S3 Key Template
+ </label>
+ <div class="cd-col-form-input">
+ <input class="form-control"
+ id="s3_key_template"
+ placeholder="$keyid"
+ formControlName="s3_key_template">
+ <cd-help-text>
+ <span i18n>sse-s3; kmip key template
+ </span>
+ </cd-help-text>
+ </div>
+ </div>
+
+ <div class="form-group row">
+ <label class="cd-col-form-label"
+ for="username"
+ i18n>Username
+ </label>
+ <div class="cd-col-form-input">
+ <input id="username"
+ class="form-control"
+ [attr.disabled]="configForm.getValue('kms_provider') !== KMS_PROVIDER.KMIP ? true : null"
+ formControlName="username">
+ <cd-help-text>
+ <span i18n>When authenticating via username
+ </span>
+ </cd-help-text>
+ </div>
+ </div>
+ <div class="form-group row">
+ <label class="cd-col-form-label"
+ for="password"
+ i18n>Password
+ </label>
+ <div class="cd-col-form-input">
+ <input id="password"
+ class="form-control"
+ type="password"
+ [attr.disabled]="configForm.getValue('kms_provider') != KMS_PROVIDER.KMIP"
+ formControlName="password">
+ <cd-help-text>
+ <span i18n>optional w/ username
+ </span>
+ </cd-help-text>
+ </div>
</div>
+ </div>
- <div *ngIf="configForm.getValue('encryptionType') === 'aws:kms' || configForm.getValue('encryptionType') === 'AES256'">
+ <div *ngIf="configForm.getValue('encryptionType') === ENCRYPTION_TYPE.SSE_KMS || configForm.getValue('encryptionType') === ENCRYPTION_TYPE.SSE_S3">
<div class="form-group row">
<label class="cd-col-form-label"
for="ssl_cert">
- <span i18n>CA Certificate</span>
- <cd-helper i18n>The SSL certificate in PEM format.</cd-helper>
+ <span i18n>CA Certificate Path</span>
+ <cd-helper i18n>The SSL certificate in PEM format. Please provide file path to the RGW host.</cd-helper>
</label>
<div class="cd-col-form-input">
- <input type="file"
- formControlName="ssl_cert"
- (change)="fileUpload($event.target.files, 'ssl_cert')">
- <span class="invalid-feedback"
- *ngIf="configForm.showError('ssl_cert', frm, 'required')"
- i18n>This field is required.</span>
+ <input type="text"
+ id="ssl_cert"
+ class="form-control"
+ formControlName="ssl_cert"
+ placeholder="/path/to/ca_cert.pem">
+ <cd-help-text>
+ <span i18n>
+ Path for custom ca certificate for accessing server
+ </span>
+ </cd-help-text>
</div>
</div>
</div>
- <div *ngIf="configForm.getValue('encryptionType') === 'aws:kms' || configForm.getValue('encryptionType') === 'AES256'">
+ <div *ngIf="configForm.getValue('encryptionType') === ENCRYPTION_TYPE.SSE_KMS || configForm.getValue('encryptionType') === ENCRYPTION_TYPE.SSE_S3">
<div class="form-group row">
<label class="cd-col-form-label"
for="client_cert">
- <span i18n>Client Certificate</span>
- <cd-helper i18n>The Client certificate in PEM format.</cd-helper>
+ <span i18n>Client Certificate Path</span>
+ <cd-helper i18n>The Client certificate in PEM format. Please provide file path to the RGW host.</cd-helper>
</label>
<div class="cd-col-form-input">
- <input type="file"
- formControlName="client_cert"
- (change)="fileUpload($event.target.files, 'client_cert')">
- <span class="invalid-feedback"
- *ngIf="configForm.showError('client_cert', frm, 'required')"
- i18n>This field is required.</span>
+ <input type="text"
+ id="client_cert"
+ class="form-control"
+ formControlName="client_cert"
+ placeholder="/path/to/client_cert.pem">
+ <cd-help-text>
+ <span i18n>
+ Path for custom client certificate for accessing server
+ </span>
+ </cd-help-text>
</div>
</div>
</div>
- <div *ngIf="configForm.getValue('encryptionType') === 'aws:kms' || configForm.getValue('encryptionType') === 'AES256'">
+ <div *ngIf="configForm.getValue('encryptionType') === ENCRYPTION_TYPE.SSE_KMS || configForm.getValue('encryptionType') === ENCRYPTION_TYPE.SSE_S3">
<div class="form-group row">
<label class="cd-col-form-label"
for="client_key">
- <span i18n>Client Private Key</span>
- <cd-helper i18n>The Client Private Key in PEM format.</cd-helper>
+ <span i18n>Client Private Key Path</span>
+ <cd-helper i18n>The Client Private Key in PEM format. Please provide file path to the RGW host.</cd-helper>
</label>
<div class="cd-col-form-input">
- <input type="file"
- (change)="fileUpload($event.target.files, 'client_key')">
- <span class="invalid-feedback"
- *ngIf="configForm.showError('client_key', frm, 'required')"
- i18n>This field is required.</span>
+ <input type="text"
+ id="client_key"
+ formControlName="client_key"
+ placeholder="/path/to/client_key.pem"
+ class="form-control">
+ <cd-help-text>
+ <span i18n>
+ Path for private key required for client cert
+ </span>
+ </cd-help-text>
</div>
</div>
</div>
import { CdFormGroup } from '~/app/shared/forms/cd-form-group';
import { CdValidators } from '~/app/shared/forms/cd-validators';
import { NotificationService } from '~/app/shared/services/notification.service';
-import { rgwBucketEncryptionModel } from '../models/rgw-bucket-encryption';
+import {
+ rgwBucketEncryptionModel,
+ KMS_PROVIDER,
+ ENCRYPTION_TYPE
+} from '../models/rgw-bucket-encryption';
import { TableComponent } from '~/app/shared/datatable/table/table.component';
+import { KmipConfig, VaultConfig } from '~/app/shared/models/rgw-encryption-config-keys';
@Component({
selector: 'cd-rgw-config-modal',
editing = false;
action: string;
table: TableComponent;
-
+ ENCRYPTION_TYPE = ENCRYPTION_TYPE;
+ KMS_PROVIDER = KMS_PROVIDER;
constructor(
private formBuilder: CdFormBuilder,
public activeModal: NgbActiveModal,
this.secretEngines = rgwBucketEncryptionModel.secretEngines;
if (this.editing && this.selectedEncryptionConfigValues) {
const patchValues = {
- address: this.selectedEncryptionConfigValues['addr'],
- encryptionType:
- rgwBucketEncryptionModel[this.selectedEncryptionConfigValues['encryption_type']],
+ addr: this.selectedEncryptionConfigValues['addr'],
+ encryptionType: this.selectedEncryptionConfigValues['encryption_type'],
kms_provider: this.selectedEncryptionConfigValues['backend'],
- auth_method: this.selectedEncryptionConfigValues['auth'],
+ auth: this.selectedEncryptionConfigValues['auth'],
secret_engine: this.selectedEncryptionConfigValues['secret_engine'],
secret_path: this.selectedEncryptionConfigValues['prefix'],
- namespace: this.selectedEncryptionConfigValues['namespace']
+ namespace: this.selectedEncryptionConfigValues['namespace'],
+ kms_key_template: this.selectedEncryptionConfigValues['kms_key_template'],
+ s3_key_template: this.selectedEncryptionConfigValues['s3_key_template'],
+ username: this.selectedEncryptionConfigValues['username'],
+ password: this.selectedEncryptionConfigValues['password'],
+ ssl_cert:
+ this.selectedEncryptionConfigValues['backend'] === KMS_PROVIDER.VAULT
+ ? this.selectedEncryptionConfigValues['ssl_cacert']
+ : this.selectedEncryptionConfigValues['ca_path'],
+ client_cert:
+ this.selectedEncryptionConfigValues['backend'] === KMS_PROVIDER.VAULT
+ ? this.selectedEncryptionConfigValues['ssl_clientcert']
+ : this.selectedEncryptionConfigValues['client_cert'],
+ client_key:
+ this.selectedEncryptionConfigValues['backend'] === KMS_PROVIDER.VAULT
+ ? this.selectedEncryptionConfigValues['ssl_clientkey']
+ : this.selectedEncryptionConfigValues['client_key']
};
this.configForm.patchValue(patchValues);
this.configForm.get('kms_provider').disable();
this.checkKmsProviders();
}
+ setKmsProvider() {
+ const selectedEncryptionType = this.configForm.get('encryptionType').value;
+ this.kmsProviders =
+ selectedEncryptionType === ENCRYPTION_TYPE.SSE_KMS
+ ? [KMS_PROVIDER.VAULT, KMS_PROVIDER.KMIP]
+ : [KMS_PROVIDER.VAULT];
+ }
checkKmsProviders() {
- this.kmsProviders = rgwBucketEncryptionModel.kmsProviders;
+ if (!this.editing) {
+ this.setKmsProvider();
+ }
+
if (
this.allEncryptionConfigValues &&
- this.allEncryptionConfigValues.hasOwnProperty('SSE_KMS') &&
+ this.allEncryptionConfigValues.hasOwnProperty(ENCRYPTION_TYPE.SSE_KMS) &&
!this.editing
) {
- const sseKmsBackends = this.allEncryptionConfigValues['SSE_KMS'].map(
- (config: any) => config.backend
- );
- if (this.configForm.get('encryptionType').value === rgwBucketEncryptionModel.SSE_KMS) {
- this.kmsProviders = this.kmsProviders.filter(
- (provider) => !sseKmsBackends.includes(provider)
- );
+ const kmsBackends = Object.values(
+ this.allEncryptionConfigValues[ENCRYPTION_TYPE.SSE_KMS]
+ ).map((config: any) => config.backend);
+ if (this.configForm.get('encryptionType').value === ENCRYPTION_TYPE.SSE_KMS) {
+ this.kmsProviders = this.kmsProviders.filter((provider) => !kmsBackends.includes(provider));
}
}
if (
this.allEncryptionConfigValues &&
- this.allEncryptionConfigValues.hasOwnProperty('SSE_S3') &&
+ this.allEncryptionConfigValues.hasOwnProperty('s3') &&
!this.editing
) {
- const sseS3Backends = this.allEncryptionConfigValues['SSE_S3'].map(
+ const s3Backends = Object.values(this.allEncryptionConfigValues[ENCRYPTION_TYPE.SSE_S3]).map(
(config: any) => config.backend
);
- if (this.configForm.get('encryptionType').value === rgwBucketEncryptionModel.SSE_S3) {
- this.kmsProviders = this.kmsProviders.filter(
- (provider) => !sseS3Backends.includes(provider)
- );
+ if (this.configForm.get('encryptionType').value === ENCRYPTION_TYPE.SSE_S3) {
+ this.kmsProviders = this.kmsProviders.filter((provider) => !s3Backends.includes(provider));
}
}
- if (this.kmsProviders.length > 0 && !this.kmsProviders.includes('vault')) {
- this.configForm.get('kms_provider').setValue('');
+ if (!this.editing) {
+ if (this.kmsProviders.length > 0) {
+ this.configForm.get('kms_provider').setValue(this.kmsProviders[0]);
+ }
}
}
createForm() {
this.configForm = this.formBuilder.group({
- address: [
+ addr: [
null,
[
Validators.required,
]
],
kms_provider: ['vault', Validators.required],
- encryptionType: ['aws:kms', Validators.required],
- auth_method: ['token', Validators.required],
- secret_engine: ['kv', Validators.required],
+ encryptionType: ['kms', Validators.required],
+ auth: [
+ 'token',
+ CdValidators.requiredIf({
+ kms_provider: 'vault'
+ })
+ ],
+ secret_engine: [
+ 'kv',
+ CdValidators.requiredIf({
+ kms_provider: 'vault'
+ })
+ ],
secret_path: ['/'],
namespace: [null],
token: [
null,
[
CdValidators.requiredIf({
- auth_method: 'token'
+ auth: 'token',
+ kms_provider: 'vault'
})
]
],
- ssl_cert: [null, CdValidators.sslCert()],
- client_cert: [null, CdValidators.pemCert()],
- client_key: [null, CdValidators.sslPrivKey()],
+ ssl_cert: [''],
+ client_cert: [''],
+ client_key: [''],
kmsEnabled: [{ value: false }],
- s3Enabled: [{ value: false }]
+ s3Enabled: [{ value: false }],
+ kms_key_template: [null],
+ s3_key_template: [null],
+ username: [''],
+ password: ['']
});
}
onSubmit() {
const values = this.configForm.getRawValue();
- this.rgwBucketService
- .setEncryptionConfig(
- values['encryptionType'],
- values['kms_provider'],
- values['auth_method'],
- values['secret_engine'],
- values['secret_path'],
- values['namespace'],
- values['address'],
- values['token'],
- values['owner'],
- values['ssl_cert'],
- values['client_cert'],
- values['client_key']
- )
- .subscribe({
- next: () => {
- this.notificationService.show(
- NotificationType.success,
- $localize`Updated RGW Encryption Configuration values`
- );
- },
- error: (error: any) => {
- this.notificationService.show(NotificationType.error, error);
- this.configForm.setErrors({ cdSubmitButton: true });
- },
- complete: () => {
- this.activeModal.close();
- this.table?.refreshBtn();
+ let encryptionData: VaultConfig | KmipConfig;
+ if (values['kms_provider'] === KMS_PROVIDER.VAULT) {
+ encryptionData = {
+ kms_provider: values['kms_provider'],
+ encryption_type: values['encryptionType'],
+ config: {
+ addr: values['addr'],
+ auth: values['auth'],
+ prefix: values['secret_path'],
+ secret_engine: values['secret_engine'],
+ namespace: values['namespace'],
+ token_file: values['token'],
+ ssl_cacert: values['ssl_cert'],
+ ssl_clientcert: values['client_cert'],
+ ssl_clientkey: values['client_key']
}
- });
+ };
+ } else if (values['kms_provider'] === KMS_PROVIDER.KMIP) {
+ encryptionData = {
+ kms_provider: values['kms_provider'],
+ encryption_type: values['encryptionType'],
+ config: {
+ addr: values['addr'],
+ username: values['username'],
+ password: values['password'],
+ kms_key_template: values['kms_key_template'],
+ s3_key_template: values['s3_key_template'],
+ client_key: values['client_key'],
+ ca_path: values['ssl_cert'],
+ client_cert: values['client_cert']
+ }
+ };
+ }
+ this.rgwBucketService.setEncryptionConfig(encryptionData).subscribe({
+ next: () => {
+ this.notificationService.show(
+ NotificationType.success,
+ $localize`Updated RGW Encryption Configuration values`
+ );
+ },
+ error: (error: Error) => {
+ this.notificationService.show(NotificationType.error, error.message);
+ this.configForm.setErrors({ cdSubmitButton: true });
+ },
+ complete: () => {
+ this.activeModal.close();
+ this.table?.refreshBtn();
+ }
+ });
}
}
import { Icons } from '~/app/shared/enum/icons.enum';
import { ModalService } from '~/app/shared/services/modal.service';
import { RgwConfigModalComponent } from '../rgw-config-modal/rgw-config-modal.component';
-import { rgwBucketEncryptionModel } from '../models/rgw-bucket-encryption';
import { TableComponent } from '~/app/shared/datatable/table/table.component';
+import { ENCRYPTION_TYPE } from '../models/rgw-bucket-encryption';
+import {
+ KmipConfig,
+ VaultConfig,
+ encryptionDatafromAPI
+} from '~/app/shared/models/rgw-encryption-config-keys';
@Component({
selector: 'cd-rgw-configuration-page',
bsModalRef: NgbModalRef;
filteredEncryptionConfigValues: {};
excludeProps: any[] = [];
- disableCreate = false;
+ disableCreate = true;
allEncryptionValues: any;
constructor(
this.excludeProps.push('unique_id');
}
- getBackend(encryptionData: { [x: string]: any[] }, encryptionType: string) {
- return new Set(encryptionData[encryptionType].map((item) => item.backend));
- }
-
- areAllAllowedBackendsPresent(allowedBackends: any[], backendsSet: Set<any>) {
- return allowedBackends.every((backend) => backendsSet.has(backend));
+ getBackend(encryptionData: encryptionDatafromAPI, encryptionType: ENCRYPTION_TYPE) {
+ const backendSet = new Set(Object.keys(encryptionData[encryptionType]));
+ const result =
+ encryptionType === ENCRYPTION_TYPE.SSE_KMS
+ ? backendSet.has('kmip') && backendSet.has('vault')
+ : backendSet.has('vault');
+ return result;
}
openRgwConfigModal(edit: boolean) {
this.selection = selection;
}
- setExpandedRow(expandedRow: any) {
+ setExpandedRow(expandedRow: VaultConfig | KmipConfig) {
super.setExpandedRow(expandedRow);
}
+ flattenData(data: encryptionDatafromAPI) {
+ const combinedArray = [];
+ for (const kmsData of Object.values(data[ENCRYPTION_TYPE.SSE_KMS])) {
+ combinedArray.push(kmsData);
+ }
+ for (const s3Data of Object.values(data[ENCRYPTION_TYPE.SSE_S3])) {
+ combinedArray.push(s3Data);
+ }
+ return combinedArray;
+ }
+
fetchData() {
- this.rgwBucketService.getEncryptionConfig().subscribe((data: any) => {
+ this.rgwBucketService.getEncryptionConfig().subscribe((data: encryptionDatafromAPI) => {
this.allEncryptionValues = data;
- const allowedBackends = rgwBucketEncryptionModel.kmsProviders;
-
- const kmsBackends = this.getBackend(data, 'SSE_KMS');
- const s3Backends = this.getBackend(data, 'SSE_S3');
-
- const allKmsBackendsPresent = this.areAllAllowedBackendsPresent(allowedBackends, kmsBackends);
- const allS3BackendsPresent = this.areAllAllowedBackendsPresent(allowedBackends, s3Backends);
-
- this.disableCreate = allKmsBackendsPresent && allS3BackendsPresent;
- this.encryptionConfigValues = Object.values(data).flat();
+ const kmsBackends = this.getBackend(data, ENCRYPTION_TYPE.SSE_KMS);
+ const s3Backends = this.getBackend(data, ENCRYPTION_TYPE.SSE_S3);
+ this.disableCreate = kmsBackends && s3Backends;
+ this.encryptionConfigValues = this.flattenData(data);
});
}
}
import { ApiClient } from '~/app/shared/api/api-client';
import { RgwDaemonService } from '~/app/shared/api/rgw-daemon.service';
import { cdEncode } from '~/app/shared/decorators/cd-encode';
+import { KmipConfig, VaultConfig } from '../models/rgw-encryption-config-keys';
@cdEncode
@Injectable({
return bucketData['lock_retention_period_days'] || 0;
}
- setEncryptionConfig(
- encryption_type: string,
- kms_provider: string,
- auth_method: string,
- secret_engine: string,
- secret_path: string,
- namespace: string,
- address: string,
- token: string,
- owner: string,
- ssl_cert: string,
- client_cert: string,
- client_key: string
- ) {
+ setEncryptionConfig(config: VaultConfig | KmipConfig) {
+ const reqBody = {
+ ...config
+ };
return this.rgwDaemonService.request((params: HttpParams) => {
- params = params.appendAll({
- encryption_type: encryption_type,
- kms_provider: kms_provider,
- auth_method: auth_method,
- secret_engine: secret_engine,
- secret_path: secret_path,
- namespace: namespace,
- address: address,
- token: token,
- owner: owner,
- ssl_cert: ssl_cert,
- client_cert: client_cert,
- client_key: client_key
- });
- return this.http.put(`${this.url}/setEncryptionConfig`, null, { params: params });
+ return this.http.put(`${this.url}/setEncryptionConfig`, reqBody, { params: params });
});
}
+import { ENCRYPTION_TYPE } from '~/app/ceph/rgw/models/rgw-bucket-encryption';
+
export enum rgwEncryptionConfigKeys {
auth = 'Authentication Method',
encryption_type = 'Encryption Type',
s3_key_template = 'S3 Key Template',
username = 'Username'
}
+
+export interface VaultConfig {
+ config: {
+ addr: string;
+ auth: string;
+ prefix: string;
+ secret_engine: string;
+ namespace: string;
+ token_file: string;
+ ssl_cacert: string;
+ ssl_clientcert: string;
+ ssl_clientkey: string;
+ };
+ kms_provider: string;
+ encryption_type: ENCRYPTION_TYPE;
+}
+
+export interface KmipConfig {
+ config: {
+ addr: string;
+ username: string;
+ password: string;
+ s3_key_template: string;
+ client_cert: string;
+ client_key: string;
+ ca_path: string;
+ kms_key_template: string;
+ };
+ encryption_type: ENCRYPTION_TYPE;
+ kms_provider: string;
+}
+
+export interface encryptionDatafromAPI {
+ kms: {
+ vault: VaultConfig;
+ kmip: KmipConfig;
+ };
+ s3: {
+ vault: VaultConfig;
+ };
+}
--- /dev/null
+from enum import Enum
+from typing import List, NamedTuple, Optional
+
+
+class VaultConfig(NamedTuple):
+ addr: str
+ auth: str
+ prefix: str
+ secret_engine: str
+ namespace: Optional[str] = None
+ token_file: Optional[str] = None
+ ssl_cacert: Optional[str] = None
+ ssl_clientcert: Optional[str] = None
+ ssl_clientkey: Optional[str] = None
+ verify_ssl: Optional[bool] = False
+ backend: Optional[str] = None
+ encryption_type: Optional[str] = None
+ unique_id: Optional[str] = None
+
+ @classmethod
+ def required_fields(cls):
+ return [field for field in cls._fields if field not in cls._field_defaults]
+
+ @classmethod
+ def ceph_config_fields(cls):
+ return [field for field in cls._fields if
+ field not in ['backend', 'encryption_type', 'unique_id']]
+
+
+class KmipConfig(NamedTuple):
+ addr: str
+ username: Optional[str] = None
+ password: Optional[str] = None
+ client_cert: Optional[str] = None
+ client_key: Optional[str] = None
+ ca_path: Optional[str] = None
+ kms_key_template: Optional[str] = None
+ s3_key_template: Optional[str] = None
+ backend: Optional[str] = None
+ encryption_type: Optional[str] = None
+ unique_id: Optional[str] = None
+
+ @classmethod
+ def required_fields(cls):
+ return [field for field in cls._fields if field not in cls._field_defaults]
+
+ @classmethod
+ def ceph_config_fields(cls):
+ return [field for field in cls._fields if
+ field not in ['backend', 'encryption_type', 'unique_id']]
+
+
+class KmsProviders(Enum):
+ VAULT = 'vault'
+ KMIP = 'kmip'
+
+
+class EncryptionTypes(Enum):
+ KMS = 'kms'
+ S3 = 's3'
+
+
+class KmsConfig(NamedTuple):
+ vault: Optional[VaultConfig] = None
+ kmip: Optional[KmipConfig] = None
+
+
+class S3Config(NamedTuple):
+ vault: VaultConfig
+
+
+class EncryptionConfig(NamedTuple):
+ kms: Optional[List[KmsConfig]] = None
+ s3: Optional[List[S3Config]] = None
+
+ def to_dict(self):
+ """
+ Converts the EncryptionConfig class to a dictionary, ensuring that
+ 'kms' and 's3' entries are stored as dictionaries rather than lists.
+ """
+
+ def convert_namedtuple(obj):
+ if isinstance(obj, tuple) and hasattr(obj, '_fields'):
+ return {field: convert_namedtuple(getattr(obj, field))
+ for field in obj._fields if getattr(obj, field) is not None}
+ elif isinstance(obj, list):
+ if all(isinstance(item, tuple) and hasattr(item, '_fields') for item in obj):
+ return {key: convert_namedtuple(value) for entry in obj
+ for key, value in convert_namedtuple(entry).items()}
+ return [convert_namedtuple(item) for item in obj]
+ elif isinstance(obj, dict):
+ return {key: convert_namedtuple(value)
+ for key, value in obj.items() if value is not None}
+ else:
+ return obj
+
+ return {
+ "kms": convert_namedtuple(self.kms) if self.kms else [],
+ "s3": convert_namedtuple(self.s3) if self.s3 else []
+ }
application/json:
schema:
properties:
- address:
- type: string
- auth_method:
- type: string
- client_cert:
- type: string
- client_key:
+ config:
type: string
daemon_name:
type: string
type: string
kms_provider:
type: string
- namespace:
- default: ''
- type: string
- owner:
- type: string
- secret_engine:
- type: string
- secret_path:
- default: ''
- type: string
- ssl_cert:
- type: string
- token:
- type: string
type: object
responses:
'200':
import json
import logging
-from abc import ABC, abstractmethod
import rados
from mgr_module import CommandResult
from mgr_util import get_most_recent_rate, get_time_series_rates, name_to_config_section
from .. import mgr
+from ..model.rgw import EncryptionConfig, EncryptionTypes, KmipConfig, \
+ KmsConfig, KmsProviders, S3Config, VaultConfig
try:
from typing import Any, Dict, List, Optional, Union
super(SendCommandError, self).__init__(err, errno)
-class BackendConfig(ABC):
- @abstractmethod
- def get_config_keys(self) -> List[str]:
- pass
-
- @abstractmethod
- def get_required_keys(self) -> List[str]:
- pass
-
- @abstractmethod
- def get_key_pattern(self, enc_type: str) -> str:
- pass
-
-
-class VaultConfig(BackendConfig):
- def get_config_keys(self) -> List[str]:
- return ['addr', 'auth', 'namespace', 'prefix', 'secret_engine',
- 'token_file', 'ssl_cacert', 'ssl_clientcert', 'ssl_clientkey',
- 'verify_ssl']
-
- def get_required_keys(self) -> List[str]:
- return ['auth', 'prefix', 'secret_engine', 'addr']
-
- def get_key_pattern(self, enc_type: str) -> str:
- return 'rgw_crypt_{backend}_{key}' if enc_type == 'SSE_KMS' else 'rgw_crypt_sse_s3_{backend}_{key}' # noqa E501 #pylint: disable=line-too-long
-
-
-class KmipConfig(BackendConfig):
- def get_config_keys(self) -> List[str]:
- return ['addr', 'ca_path', 'client_cert', 'client_key', 'kms_key_template',
- 'password', 's3_key_template', 'username']
-
- def get_required_keys(self) -> List[str]:
- return ['addr', 'username', 'password']
-
- def get_key_pattern(self, enc_type: str) -> str:
- return 'rgw_crypt_{backend}_{key}' if enc_type == 'SSE_KMS' else 'rgw_crypt_sse_s3_{backend}_{key}' # noqa E501 #pylint: disable=line-too-long
-
-
-# pylint: disable=too-many-public-methods
+# pylint: disable=R0904
class CephService(object):
OSD_FLAG_NO_SCRUB = 'noscrub'
return None
@classmethod
- def get_encryption_config(cls, daemon_name: str) -> Dict[str, List[Dict[str, Any]]]:
- # Define backends with their respective configuration classes
- backends: Dict[str, Dict[str, BackendConfig]] = {
- 'SSE_KMS': {
- 'vault': VaultConfig(),
- 'kmip': KmipConfig()
- },
- 'SSE_S3': {
- 'vault': VaultConfig()
- }
- }
-
- # Final configuration values
- config_values: Dict[str, List[Dict[str, Any]]] = {
- 'SSE_KMS': [],
- 'SSE_S3': []
- }
+ def get_encryption_config(cls, daemon_name: str) -> Dict[str, Any]:
+ full_daemon_name = f'rgw.{daemon_name}'
+
+ encryption_config = EncryptionConfig(kms=None, s3=None)
+
+ # Fetch configuration for KMS
+ if EncryptionTypes.KMS.value:
+ vault_config_data: Optional[Dict[str, Any]] = _get_conf_keys(
+ list(VaultConfig._fields), VaultConfig.required_fields(),
+ EncryptionTypes.KMS.value, KmsProviders.VAULT.value, full_daemon_name
+ )
+
+ kmip_config_data: Optional[Dict[str, Any]] = _get_conf_keys(
+ list(KmipConfig._fields), KmipConfig.required_fields(),
+ EncryptionTypes.KMS.value, KmsProviders.KMIP.value, full_daemon_name
+ )
+
+ kms_config: List[KmsConfig] = []
+
+ if vault_config_data:
+ vault_config_data = _set_defaults_in_encryption_configs(
+ vault_config_data, EncryptionTypes.KMS.value, KmsProviders.VAULT.value
+ )
+ kms_config.append(KmsConfig(vault=VaultConfig(**vault_config_data)))
+
+ if kmip_config_data:
+ kmip_config_data = _set_defaults_in_encryption_configs(
+ kmip_config_data, EncryptionTypes.KMS.value, KmsProviders.KMIP.value
+ )
+ kms_config.append(KmsConfig(kmip=KmipConfig(**kmip_config_data)))
+
+ if kms_config:
+ encryption_config = encryption_config._replace(kms=kms_config)
+
+ # Fetch configuration for S3
+ if EncryptionTypes.S3.value:
+ s3_config_data: Optional[Dict[str, Any]] = _get_conf_keys(
+ list(VaultConfig._fields), VaultConfig.required_fields(),
+ EncryptionTypes.S3.value, KmsProviders.VAULT.value, full_daemon_name
+ )
+
+ s3_config: List[S3Config] = []
+ if s3_config_data:
+ s3_config_data = _set_defaults_in_encryption_configs(
+ s3_config_data, EncryptionTypes.S3.value, KmsProviders.VAULT.value
+ )
+ s3_config.append(S3Config(vault=VaultConfig(**s3_config_data)))
+
+ encryption_config = encryption_config._replace(s3=s3_config)
+
+ return encryption_config.to_dict()
+ @classmethod
+ def set_encryption_config(cls, encryption_type: str, kms_provider: str,
+ config: Union[VaultConfig, KmipConfig],
+ daemon_name: str) -> None:
full_daemon_name = 'rgw.' + daemon_name
- for enc_type, backend_list in backends.items():
- for backend_name, backend in backend_list.items():
- config_keys = backend.get_config_keys()
- required_keys = backend.get_required_keys()
- key_pattern = backend.get_key_pattern(enc_type)
-
- # Check if all required configurations are present and not empty
- all_required_configs_present = True
- for key in required_keys:
- config_key = key_pattern.format(backend=backend_name, key=key)
- value = CephService.send_command('mon', 'config get',
- who=name_to_config_section(full_daemon_name),
- key=config_key)
- if not (isinstance(value, str) and value.strip()):
- all_required_configs_present = False
- break
-
- # If all required configurations are present, gather all config values
- if all_required_configs_present:
- config_dict = {}
- for key in config_keys:
- config_key = key_pattern.format(backend=backend_name, key=key)
- value = CephService.send_command('mon', 'config get',
- who=name_to_config_section(full_daemon_name), # noqa E501 #pylint: disable=line-too-long
- key=config_key)
- if value:
- config_dict[key] = value.strip() if isinstance(value, str) else value
- config_dict['backend'] = backend_name
- config_dict['encryption_type'] = enc_type
- config_dict['unique_id'] = enc_type + '-' + backend_name
- config_values[enc_type].append(config_dict)
-
- return config_values
+ config_dict = config._asdict() if isinstance(config, (VaultConfig, KmipConfig)) else config
- @classmethod
- def set_encryption_config(cls, encryption_type, kms_provider, auth_method,
- secret_engine, secret_path, namespace, address,
- token, daemon_name, ssl_cert, client_cert, client_key):
- full_daemon_name = 'rgw.' + daemon_name
- if encryption_type == 'aws:kms':
-
- KMS_CONFIG = [
- ['rgw_crypt_s3_kms_backend', kms_provider],
- ['rgw_crypt_vault_auth', auth_method],
- ['rgw_crypt_vault_prefix', secret_path],
- ['rgw_crypt_vault_namespace', namespace],
- ['rgw_crypt_vault_secret_engine', secret_engine],
- ['rgw_crypt_vault_addr', address],
- ['rgw_crypt_vault_token_file', token],
- ['rgw_crypt_vault_ssl_cacert', ssl_cert],
- ['rgw_crypt_vault_ssl_clientcert', client_cert],
- ['rgw_crypt_vault_ssl_clientkey', client_key]
- ]
+ if isinstance(config_dict, dict):
+ if kms_provider == KmsProviders.VAULT.value:
+ config = VaultConfig(**config_dict)
+ elif kms_provider == KmsProviders.KMIP.value:
+ config = KmipConfig(**config_dict)
- for (key, value) in KMS_CONFIG:
- if value == 'null':
- continue
- CephService.send_command('mon', 'config set',
- who=name_to_config_section(full_daemon_name),
- name=key, value=value)
-
- if encryption_type == 'AES256':
-
- SSE_S3_CONFIG = [
- ['rgw_crypt_sse_s3_backend', kms_provider],
- ['rgw_crypt_sse_s3_vault_auth', auth_method],
- ['rgw_crypt_sse_s3_vault_prefix', secret_path],
- ['rgw_crypt_sse_s3_vault_namespace', namespace],
- ['rgw_crypt_sse_s3_vault_secret_engine', secret_engine],
- ['rgw_crypt_sse_s3_vault_addr', address],
- ['rgw_crypt_sse_s3_vault_token_file', token],
- ['rgw_crypt_sse_s3_vault_ssl_cacert', ssl_cert],
- ['rgw_crypt_sse_s3_vault_ssl_clientcert', client_cert],
- ['rgw_crypt_sse_s3_vault_ssl_clientkey', client_key]
- ]
+ for field in config._fields:
+ value = getattr(config, field)
+ if value is None:
+ continue
- for (key, value) in SSE_S3_CONFIG:
- if value == 'null':
- continue
- CephService.send_command('mon', 'config set',
- who=name_to_config_section(full_daemon_name),
- name=key, value=value)
+ if isinstance(value, bool):
+ value = str(value).lower()
- return {}
+ key = _generate_key(encryption_type, kms_provider, field)
+
+ CephService.send_command('mon', 'config set',
+ who=name_to_config_section(full_daemon_name),
+ name=key, value=value)
@classmethod
def set_multisite_config(cls, realm_name, zonegroup_name, zone_name, daemon_name):
'statuses': pg_summary['all'],
'pgs_per_osd': pgs_per_osd,
}
+
+
+def _generate_key(encryption_type: str, backend: str, config_name: str):
+ if encryption_type == EncryptionTypes.KMS.value:
+ return f'rgw_crypt_{backend}_{config_name}'
+
+ if encryption_type == EncryptionTypes.S3.value:
+ return f'rgw_crypt_sse_s3_{backend}_{config_name}'
+ return None
+
+
+def _get_conf_keys(fields: List[str], req_fields: List[str], encryption_type: str,
+ backend: str, daemon_name: str) -> Dict[str, Any]:
+ config: Dict[str, Any] = {}
+
+ for field in fields:
+ key = _generate_key(encryption_type, backend, field)
+ try:
+ value = CephService.send_command('mon', 'config get',
+ who=name_to_config_section(daemon_name), key=key)
+ if field in req_fields and not (isinstance(value, str) and value.strip()):
+ config = {}
+ break
+
+ config[field] = value.strip() if isinstance(value, str) else value
+ except Exception as e: # pylint: disable=broad-except
+ logger.exception('Error %s while fetching configuration for %s', e, key)
+ return config
+
+
+def _set_defaults_in_encryption_configs(config: Dict[str, Any],
+ encryption_type: str,
+ kms_provider: str) -> Dict[str, Any]:
+ config['backend'] = kms_provider
+ config['encryption_type'] = encryption_type
+ config['unique_id'] = f'{encryption_type}-{kms_provider}'
+ return config