.. note::
``pubsub_event_triggered`` and ``pubsub_event_lost`` are incremented per event, while:
- ``pubsub_push_ok``, ``pubsub_push_fail``, are incremented per push action on each notification.
+ ``pubsub_push_ok``, ``pubsub_push_fail``, are incremented per push action on each notification
Bucket Notification REST API
----------------------------
Action=DeleteTopic
&TopicArn=<topic-arn>
-Delete the specified topic. Note that deleting a deleted topic should result with no-op and not a failure.
+Delete the specified topic.
+
+.. note::
+
+ - Deleting an unknown notification (e.g. double delete) is not considered an error
+ - Deleting a topic does not automatically delete all notifications associated with it
The response will have the following format:
- "Abort Multipart Upload" request does not emit a notification
- Both "Initiate Multipart Upload" and "POST Object" requests will emit an ``s3:ObjectCreated:Post`` notification
-
Events
~~~~~~
return 0;
}
+int RGWPubSub::Bucket::remove_notifications(optional_yield y)
+{
+ // get all topics on a bucket
+ rgw_pubsub_bucket_topics bucket_topics;
+ auto ret = get_topics(&bucket_topics);
+ if (ret < 0 && ret != -ENOENT) {
+ ldout(ps->store->ctx(), 1) << "ERROR: failed to get list of topics from bucket '" << bucket.name << "', ret=" << ret << dendl;
+ return ret ;
+ }
+
+ // remove all auto-genrated topics
+ for (const auto& topic : bucket_topics.topics) {
+ const auto& topic_name = topic.first;
+ ret = ps->remove_topic(topic_name, y);
+ if (ret < 0 && ret != -ENOENT) {
+ ldout(ps->store->ctx(), 5) << "WARNING: failed to remove auto-generated topic '" << topic_name << "', ret=" << ret << dendl;
+ }
+ }
+
+ // delete all notification of on a bucket
+ ret = ps->remove(bucket_meta_obj, nullptr, y);
+ if (ret < 0 && ret != -ENOENT) {
+ ldout(ps->store->ctx(), 1) << "ERROR: failed to remove bucket topics: ret=" << ret << dendl;
+ return ret;
+ }
+
+ return 0;
+}
+
int RGWPubSub::create_topic(const string& name, optional_yield y) {
return create_topic(name, rgw_pubsub_sub_dest(), "", "", y);
}
#include "services/svc_tier_rados.h"
#include "cls/rgw/cls_rgw_client.h"
+#include "rgw_pubsub.h"
+
#define dout_subsys ceph_subsys_rgw
namespace rgw::sal {
return ret;
}
+ // if bucket has notification definitions associated with it
+ // they should be removed (note that any pending notifications on the bucket are still going to be sent)
+ RGWPubSub ps(store, info.owner.tenant);
+ RGWPubSub::Bucket ps_bucket(&ps, info.bucket);
+ const auto ps_ret = ps_bucket.remove_notifications(y);
+ if (ps_ret < 0 && ps_ret != -ENOENT) {
+ lderr(store->ctx()) << "ERROR: unable to remove notifications from bucket. ret=" << ps_ret << dendl;
+ }
+
ret = store->ctl()->bucket->unlink_bucket(info.owner, info.bucket, y, false);
if (ret < 0) {
lderr(store->ctx()) << "ERROR: unable to remove user bucket information" << dendl;
# create s3 topic
endpoint_address = 'amqp://127.0.0.1:7001'
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
- topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
- topic_arn = topic_conf.set_config()
+ topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
+ topic_arn1 = topic_conf1.set_config()
+ topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
+ topic_arn2 = topic_conf2.set_config()
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name+'_1',
- 'TopicArn': topic_arn,
+ 'TopicArn': topic_arn1,
'Events': ['s3:ObjectCreated:*']
},
{'Id': notification_name+'_2',
- 'TopicArn': topic_arn,
+ 'TopicArn': topic_arn1,
'Events': ['s3:ObjectRemoved:*']
},
{'Id': notification_name+'_3',
- 'TopicArn': topic_arn,
+ 'TopicArn': topic_arn1,
'Events': []
}]
s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
# get notifications on a bucket
response, status = s3_notification_conf.get_config(notification=notification_name+'_1')
assert_equal(status/100, 2)
- assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
+ assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn1)
# delete specific notifications
_, status = s3_notification_conf.del_config(notification=notification_name+'_1')
response, status = s3_notification_conf.get_config()
assert_equal(status/100, 2)
assert_equal(len(response['TopicConfigurations']), 2)
- assert_equal(response['TopicConfigurations'][0]['TopicArn'], topic_arn)
- assert_equal(response['TopicConfigurations'][1]['TopicArn'], topic_arn)
+ assert_equal(response['TopicConfigurations'][0]['TopicArn'], topic_arn1)
+ assert_equal(response['TopicConfigurations'][1]['TopicArn'], topic_arn1)
# delete remaining notifications
_, status = s3_notification_conf.del_config()
assert_equal(status/100, 2)
# make sure that the notifications are now deleted
- _, status = s3_notification_conf.get_config()
+ response, status = s3_notification_conf.get_config()
+ try:
+ dummy = response['TopicConfigurations']
+ except:
+ print('"TopicConfigurations" is not in response')
+ else:
+ assert False, '"TopicConfigurations" should not be in response'
- # cleanup
- topic_conf.del_config()
+ # create another s3 notification
+ topic_conf_list = [{'Id': notification_name+'_1',
+ 'TopicArn': topic_arn1,
+ 'Events': ['s3:ObjectCreated:*']
+ }]
+ _, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # make sure the notification and auto-genrated topic are deleted
+ response, status = topic_conf1.get_list()
+ topics = response['ListTopicsResponse']['ListTopicsResult']['Topics']['member']
+ before_delete = len(topics)
# delete the bucket
master_zone.delete_bucket(bucket_name)
+ response, status = topic_conf2.get_list()
+ topics = response['ListTopicsResponse']['ListTopicsResult']['Topics']['member']
+ after_delete = len(topics)
+ assert_equal(before_delete - after_delete, 3)
+
+ # cleanup
+ topic_conf1.del_config()
+ topic_conf2.del_config()
def ps_s3_notification_filter(on_master):