)
_, status = s3_notification_conf1.set_config()
assert_equal(status / 100, 2)
+ # verify subscriptions to the account topic
rgw_topic_entry = [
t
for t in list_topics(tenant=account_id)["topics"]
assert_equal(len(subscribed_buckets), 1)
assert_in(user1_bucket_name, subscribed_buckets)
assert_not_in(user2_bucket_name, subscribed_buckets)
+ # verify bucket notifications while 2 test buckets are in the mixed mode
+ notification_list = list_notifications(user1_bucket_name, assert_len=1)
+ assert_equal(notification_list["notifications"][0]["TopicArn"], account_topic_arn)
+ notification_list = list_notifications(user2_bucket_name, assert_len=1)
+ assert_equal(notification_list["notifications"][0]["TopicArn"], topic_arn)
# verify both topics are functional at the same time with no duplicate notifications
user1_bucket.new_key("user1obj1").set_contents_from_string("object content")
assert_equal(len(subscribed_buckets), 2)
assert_in(user1_bucket_name, subscribed_buckets)
assert_in(user2_bucket_name, subscribed_buckets)
+ # verify bucket notifications after 2 test buckets are updated to use the account topic
+ notification_list = list_notifications(user1_bucket_name, assert_len=1)
+ assert_equal(notification_list["notifications"][0]["TopicArn"], account_topic_arn)
+ notification_list = list_notifications(user2_bucket_name, assert_len=1)
+ assert_equal(notification_list["notifications"][0]["TopicArn"], account_topic_arn)
# finally, make sure that notifications are going thru via the new account topic
user1_bucket.new_key("user1obj1").set_contents_from_string("object content")