self._delete_lifecycle(bucket_name, daemon_name, uid)
return self._append_bid(result) if result else None
- def delete(self, bucket, purge_objects='true', daemon_name=None):
- return self.proxy(daemon_name, 'DELETE', 'bucket', {
- 'bucket': bucket,
- 'purge-objects': purge_objects
- }, json_response=False)
+ def delete(self, bucket, daemon_name=None):
+ try:
+ bucket_info = self.proxy(daemon_name, 'GET', 'bucket', {'bucket': bucket})
+ num_objects = bucket_info.get('usage', {}).get('rgw.main', {}).get('num_objects', 0)
+ if num_objects > 0:
+ raise DashboardException(msg='Unable to delete bucket"{}" - Bucket is not empty. '
+ 'Remove all objects before deletion.'.format(bucket))
+ return self.proxy(daemon_name, 'DELETE', 'bucket', {
+ 'bucket': bucket
+ }, json_response=False)
+ except (DashboardException, RequestException) as e: # pragma: no cover
+ raise DashboardException(e, component='rgw')
@RESTController.Collection(method='PUT', path='/setEncryptionConfig')
- @allow_empty_body
- def set_encryption_config(self, encryption_type=None, kms_provider=None, auth_method=None,
- secret_engine=None, secret_path='', namespace='', address=None,
- token=None, daemon_name=None, owner=None, ssl_cert=None,
- client_cert=None, client_key=None):
- return self._set_encryption_config(encryption_type, kms_provider, auth_method,
- secret_engine, secret_path, namespace,
- address, token, daemon_name, owner, ssl_cert,
- client_cert, client_key)
+ def set_encryption_config(self, encryption_type: Optional[str] = None,
+ kms_provider: Optional[str] = None,
+ config: Optional[Union[VaultConfig, KmipConfig]] = None,
+ daemon_name: Optional[str] = None):
+ if encryption_type is None or daemon_name is None:
+ raise ValueError("Both 'encryption_type' and 'daemon_name' must be provided.")
+
+ if kms_provider == KmsProviders.VAULT.value:
+ config = config if config else VaultConfig(
+ addr="", auth="", prefix="", secret_engine=""
+ )
+ elif kms_provider == KmsProviders.KMIP.value:
+ config = config if config else KmipConfig(
+ addr=""
+ )
+ else:
+ raise ValueError("Invalid KMS provider specified.")
+
+ return CephService.set_encryption_config(
+ encryption_type, kms_provider, config, daemon_name
+ )
@RESTController.Collection(method='GET', path='/getEncryption')
@allow_empty_body