rgw_client = RgwClient.instance(owner, daemon_name)
return rgw_client.delete_lifecycle(bucket_name)
+ def _get_notification(self, bucket_name: str = '',
+ daemon_name=None, owner=None):
+ rgw_client = RgwClient.instance(owner, daemon_name)
+ return rgw_client.get_notification(bucket_name)
+
+ def _set_notification(self, bucket_name: str,
+ notification: Optional[str] = None,
+ daemon_name=None, owner=None):
+ rgw_client = RgwClient.instance(owner, daemon_name)
+ return rgw_client.set_notification(bucket_name, notification)
+
+ def _delete_notification(self, bucket_name: str, notification_id: str,
+ daemon_name=None, owner=None):
+ rgw_client = RgwClient.instance(owner, daemon_name)
+ return rgw_client.delete_notification(bucket_name, notification_id)
+
def _get_acl(self, bucket_name, daemon_name, owner):
rgw_client = RgwClient.instance(owner, daemon_name)
return str(rgw_client.get_acl(bucket_name))
bucket_name = RgwBucket.get_s3_bucket_name(bucket_name, tenant)
return self._get_lifecycle(bucket_name, daemon_name, owner)
+ @RESTController.Collection(method='GET', path='/notification')
+ @EndpointDoc("Get the bucket notification")
+ def get_notification(self, bucket_name: str,
+ daemon_name=None, owner=None):
+ owner = _get_owner(owner)
+ bucket_name = RgwBucket.get_s3_bucket_name(bucket_name)
+ return self._get_notification(bucket_name, daemon_name, owner)
+
+ @RESTController.Collection(method='PUT', path='/notification')
+ @EndpointDoc("Create or update the bucket notification")
+ def set_notification(self, bucket_name: str, notification: str = '', daemon_name=None,
+ owner=None):
+ owner = _get_owner(owner)
+ bucket_name = RgwBucket.get_s3_bucket_name(bucket_name)
+ if notification == '{}':
+ return self._delete_notification(bucket_name, daemon_name, owner)
+ return self._set_notification(bucket_name, notification, daemon_name, owner)
+
+ @RESTController.Collection(method='DELETE', path='/notification')
+ @EndpointDoc("Delete the bucket notification")
+ def delete_notification(self, bucket_name: str, notification_id: str,
+ daemon_name=None, owner=None):
+ owner = _get_owner(owner)
+ s3_bucket_name = RgwBucket.get_s3_bucket_name(bucket_name)
+ return self._delete_notification(s3_bucket_name, notification_id, daemon_name, owner)
+
@Endpoint(method='GET', path='/ratelimit')
@EndpointDoc("Get the bucket global rate limit")
@ReadPermission
- jwt: []
tags:
- RgwBucket
+ /api/rgw/bucket/notification:
+ delete:
+ parameters:
+ - in: query
+ name: bucket_name
+ required: true
+ schema:
+ type: string
+ - in: query
+ name: notification_id
+ required: true
+ schema:
+ type: string
+ - allowEmptyValue: true
+ in: query
+ name: daemon_name
+ schema:
+ type: string
+ - allowEmptyValue: true
+ in: query
+ name: owner
+ schema:
+ type: string
+ responses:
+ '202':
+ content:
+ application/vnd.ceph.api.v1.0+json:
+ type: object
+ description: Operation is still executing. Please check the task queue.
+ '204':
+ content:
+ application/vnd.ceph.api.v1.0+json:
+ type: object
+ description: Resource deleted.
+ '400':
+ description: Operation exception. Please check the response body for details.
+ '401':
+ description: Unauthenticated access. Please login first.
+ '403':
+ description: Unauthorized access. Please check your permissions.
+ '500':
+ description: Unexpected error. Please check the response body for the stack
+ trace.
+ security:
+ - jwt: []
+ summary: Delete the bucket notification
+ tags:
+ - RgwBucket
+ get:
+ parameters:
+ - in: query
+ name: bucket_name
+ required: true
+ schema:
+ type: string
+ - allowEmptyValue: true
+ in: query
+ name: daemon_name
+ schema:
+ type: string
+ - allowEmptyValue: true
+ in: query
+ name: owner
+ schema:
+ type: string
+ responses:
+ '200':
+ content:
+ application/vnd.ceph.api.v1.0+json:
+ type: object
+ description: OK
+ '400':
+ description: Operation exception. Please check the response body for details.
+ '401':
+ description: Unauthenticated access. Please login first.
+ '403':
+ description: Unauthorized access. Please check your permissions.
+ '500':
+ description: Unexpected error. Please check the response body for the stack
+ trace.
+ security:
+ - jwt: []
+ summary: Get the bucket notification
+ tags:
+ - RgwBucket
+ put:
+ parameters: []
+ requestBody:
+ content:
+ application/json:
+ schema:
+ properties:
+ bucket_name:
+ type: string
+ daemon_name:
+ type: string
+ notification:
+ default: ''
+ type: string
+ owner:
+ type: string
+ required:
+ - bucket_name
+ type: object
+ responses:
+ '200':
+ content:
+ application/vnd.ceph.api.v1.0+json:
+ type: object
+ description: Resource updated.
+ '202':
+ content:
+ application/vnd.ceph.api.v1.0+json:
+ type: object
+ description: Operation is still executing. Please check the task queue.
+ '400':
+ description: Operation exception. Please check the response body for details.
+ '401':
+ description: Unauthenticated access. Please login first.
+ '403':
+ description: Unauthorized access. Please check your permissions.
+ '500':
+ description: Unexpected error. Please check the response body for the stack
+ trace.
+ security:
+ - jwt: []
+ summary: Create or update the bucket notification
+ tags:
+ - RgwBucket
/api/rgw/bucket/ratelimit:
get:
parameters: []
raise DashboardException(msg=str(e), component='rgw')
return result
+ @RestClient.api_put('/{bucket_name}?notification')
+ def set_notification(self, bucket_name, notification, request=None):
+ # pylint: disable=unused-argument
+
+ notification = notification.strip()
+
+ if notification.startswith('{'):
+ notification = self.dict_to_xml(notification)
+
+ if not notification.startswith('<NotificationConfiguration'):
+ notification = (
+ f'<NotificationConfiguration>{notification}</NotificationConfiguration>'
+ )
+
+ try:
+ result = request(data=notification) # type: ignore
+ except RequestException as e:
+ raise DashboardException(msg=str(e), component='rgw')
+
+ return result
+
+ @RestClient.api_get('/{bucket_name}?notification')
+ def get_notification(self, bucket_name, request=None):
+ # pylint: disable=unused-argument
+ try:
+ result = request(
+ raw_content=True,
+ headers={'Accept': 'text/xml'}
+ ).decode() # type: ignore
+ except RequestException as e:
+ raise DashboardException(msg=str(e), component='rgw')
+
+ notification_config_dict = xmltodict.parse(result)
+ notification_configuration = notification_config_dict.get(
+ 'NotificationConfiguration'
+ ) or {}
+ topic_configuration = notification_configuration.get('TopicConfiguration')
+ if not topic_configuration:
+ return []
+
+ if isinstance(topic_configuration, dict):
+ topic_configuration = [topic_configuration]
+
+ def normalize_filter_rules(filter_dict):
+ if not isinstance(filter_dict, dict):
+ return
+ for key in ['S3Key', 'S3Metadata', 'S3Tags']:
+ if key in filter_dict:
+ rules = filter_dict[key].get('FilterRule')
+ if rules and isinstance(rules, dict):
+ filter_dict[key]['FilterRule'] = [rules]
+
+ for topic in topic_configuration:
+ topic_filter = topic.get('Filter')
+ if topic_filter:
+ normalize_filter_rules(topic_filter)
+
+ return notification_configuration
+
+ @RestClient.api_delete('/{bucket_name}?notification={notification_id}')
+ def delete_notification(self, bucket_name, notification_id, request=None):
+ # pylint: disable=unused-argument
+ try:
+ result = request()
+ except RequestException as e:
+ raise DashboardException(msg=str(e), component='rgw')
+ return result
+
class SyncStatus(Enum):
enabled = 'enabled'