admin
from nose import SkipTest
-from nose.tools import assert_not_equal, assert_equal, assert_in, assert_true
+from nose.tools import assert_not_equal, assert_equal, assert_in, assert_not_in, assert_true
import boto.s3.tagging
# configure logging for the tests module
conn.delete_bucket(bucket_name)
receiver_1.close(task_1)
receiver_2.close(task_2)
+
+
+@attr("http_test")
+def test_topic_migration_to_an_account():
+ """test the topic migration procedure described at
+ https://docs.ceph.com/en/latest/radosgw/account/#migrate-an-existing-user-into-an-account
+ """
+ try:
+ # create an http server for notification delivery
+ host = get_ip()
+ port = random.randint(10000, 20000)
+ http_server = HTTPServerWithEvents((host, port))
+
+ # start with two non-account users
+ # create a new user for "user1" which is going to be migrated to an account
+ user1_conn, user1_arn = another_user()
+ user1_id = user1_arn.split("/")[1]
+ user2_conn = connection()
+ log.info(
+ f"two non-account users with acckeys user1 {user1_conn.access_key} and user2 {user2_conn.access_key}"
+ )
+
+ # create a bucket per user
+ user1_bucket_name = gen_bucket_name()
+ user2_bucket_name = gen_bucket_name()
+ user1_bucket = user1_conn.create_bucket(user1_bucket_name)
+ user2_bucket = user2_conn.create_bucket(user2_bucket_name)
+ log.info(
+ f"one bucket per user {user1_conn.access_key}: {user1_bucket_name} and {user2_conn.access_key}: {user2_bucket_name}"
+ )
+
+ # create an S3 topic owned by the first user
+ topic_name = user1_bucket_name + TOPIC_SUFFIX
+ zonegroup = get_config_zonegroup()
+ endpoint_address = "http://" + host + ":" + str(port)
+ endpoint_args = "push-endpoint=" + endpoint_address + "&persistent=true"
+ expected_topic_arn = "arn:aws:sns:" + zonegroup + "::" + topic_name
+ topic_conf = PSTopicS3(
+ user1_conn, topic_name, zonegroup, endpoint_args=endpoint_args
+ )
+ topic_arn = topic_conf.set_config()
+ assert_equal(topic_arn, expected_topic_arn)
+ log.info(
+ f"{user1_conn.access_key} created the topic {topic_arn} with args {endpoint_args}"
+ )
+
+ # both buckets subscribe to the same and only topic using the same notification id
+ notification_name = user1_bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [
+ {
+ "Id": notification_name,
+ "TopicArn": topic_arn,
+ "Events": ["s3:ObjectCreated:*"],
+ }
+ ]
+ s3_notification_conf1 = PSNotificationS3(
+ user1_conn, user1_bucket_name, topic_conf_list
+ )
+ s3_notification_conf2 = PSNotificationS3(
+ user2_conn, user2_bucket_name, topic_conf_list
+ )
+ _, status = s3_notification_conf1.set_config()
+ assert_equal(status / 100, 2)
+ _, status = s3_notification_conf2.set_config()
+ assert_equal(status / 100, 2)
+ # verify both buckets are subscribed to the topic
+ rgw_topic_entry = [
+ t for t in list_topics()["topics"] if t["name"] == topic_name
+ ]
+ assert_equal(len(rgw_topic_entry), 1)
+ subscribed_buckets = rgw_topic_entry[0]["subscribed_buckets"]
+ assert_equal(len(subscribed_buckets), 2)
+ assert_in(user1_bucket_name, subscribed_buckets)
+ assert_in(user2_bucket_name, subscribed_buckets)
+ log.info(
+ "buckets {user1_bucket_name} and {user2_bucket_name} are subscribed to {topic_arn}"
+ )
+
+ # move user1 to an account
+ account_id = "RGW98765432101234567"
+ cmd = ["account", "create", "--account-id", account_id]
+ _, rc = admin(cmd, get_config_cluster())
+ assert rc == 0, f"failed to create {account_id}: {rc}"
+ cmd = [
+ "user",
+ "modify",
+ "--uid",
+ user1_id,
+ "--account-id",
+ account_id,
+ "--account-root",
+ ]
+ _, rc = admin(cmd, get_config_cluster())
+ assert rc == 0, f"failed to modify {user1_id}: {rc}"
+ log.info(
+ f"{user1_conn.access_key}/{user1_id} is migrated to account {account_id} as root user"
+ )
+
+ # verify the topic is functional
+ user1_bucket.new_key("user1obj1").set_contents_from_string("object content")
+ user2_bucket.new_key("user2obj1").set_contents_from_string("object content")
+ wait_for_queue_to_drain(topic_name, http_port=port)
+ http_server.verify_s3_events(
+ list(user1_bucket.list()) + list(user2_bucket.list()), exact_match=True
+ )
+
+ # create a new account topic with the same name as the existing topic
+ # note that the expected arn now contains the account ID
+ expected_topic_arn = (
+ "arn:aws:sns:" + zonegroup + ":" + account_id + ":" + topic_name
+ )
+ topic_conf = PSTopicS3(
+ user1_conn, topic_name, zonegroup, endpoint_args=endpoint_args
+ )
+ account_topic_arn = topic_conf.set_config()
+ assert_equal(account_topic_arn, expected_topic_arn)
+ log.info(
+ f"{user1_conn.access_key} created the account topic {account_topic_arn} with args {endpoint_args}"
+ )
+
+ # verify that the old topic is still functional
+ user1_bucket.new_key("user1obj1").set_contents_from_string("object content")
+ user2_bucket.new_key("user2obj1").set_contents_from_string("object content")
+ wait_for_queue_to_drain(topic_name, http_port=port)
+ # wait_for_queue_to_drain(topic_name, tenant=account_id, http_port=port)
+ http_server.verify_s3_events(
+ list(user1_bucket.list()) + list(user2_bucket.list()), exact_match=True
+ )
+
+ # change user1 bucket's subscription to the account topic - using the same notification ID but with the new account_topic_arn
+ topic_conf_list = [
+ {
+ "Id": notification_name,
+ "TopicArn": account_topic_arn,
+ "Events": ["s3:ObjectCreated:*"],
+ }
+ ]
+ s3_notification_conf1 = PSNotificationS3(
+ user1_conn, user1_bucket_name, topic_conf_list
+ )
+ _, status = s3_notification_conf1.set_config()
+ assert_equal(status / 100, 2)
+ rgw_topic_entry = [
+ t
+ for t in list_topics(tenant=account_id)["topics"]
+ if t["name"] == topic_name
+ ]
+ assert_equal(len(rgw_topic_entry), 1)
+ subscribed_buckets = rgw_topic_entry[0]["subscribed_buckets"]
+ assert_equal(len(subscribed_buckets), 1)
+ assert_in(user1_bucket_name, subscribed_buckets)
+ assert_not_in(user2_bucket_name, subscribed_buckets)
+
+ # verify both topics are functional at the same time with no duplicate notifications
+ user1_bucket.new_key("user1obj1").set_contents_from_string("object content")
+ user2_bucket.new_key("user2obj1").set_contents_from_string("object content")
+ wait_for_queue_to_drain(topic_name, http_port=port)
+ wait_for_queue_to_drain(topic_name, tenant=account_id, http_port=port)
+ http_server.verify_s3_events(
+ list(user1_bucket.list()) + list(user2_bucket.list()), exact_match=True
+ )
+
+ # also change user2 bucket's subscription to the account topic
+ s3_notification_conf2 = PSNotificationS3(
+ user2_conn, user2_bucket_name, topic_conf_list
+ )
+ _, status = s3_notification_conf2.set_config()
+ assert_equal(status / 100, 2)
+ # remove old topic
+ # note that, although account topic has the same name, it has to be scoped by account/tenant id to be removed
+ # so below command will only remove the old topic
+ _, rc = admin(["topic", "rm", "--topic", topic_name], get_config_cluster())
+ assert_equal(rc, 0)
+
+ # now verify account topic serves both buckets
+ rgw_topic_entry = [
+ t
+ for t in list_topics(tenant=account_id)["topics"]
+ if t["name"] == topic_name
+ ]
+ assert_equal(len(rgw_topic_entry), 1)
+ subscribed_buckets = rgw_topic_entry[0]["subscribed_buckets"]
+ assert_equal(len(subscribed_buckets), 2)
+ assert_in(user1_bucket_name, subscribed_buckets)
+ assert_in(user2_bucket_name, subscribed_buckets)
+
+ # finally, make sure that notifications are going thru via the new account topic
+ user1_bucket.new_key("user1obj1").set_contents_from_string("object content")
+ user2_bucket.new_key("user2obj1").set_contents_from_string("object content")
+ wait_for_queue_to_drain(topic_name, tenant=account_id, http_port=port)
+ http_server.verify_s3_events(
+ list(user1_bucket.list()) + list(user2_bucket.list()), exact_match=True
+ )
+ log.info("topic migration test has completed successfully")
+ finally:
+ admin(["user", "rm", "--uid", user1_id, "--purge-data"], get_config_cluster())
+ admin(
+ ["bucket", "rm", "--bucket", user1_bucket_name, "--purge-data"],
+ get_config_cluster(),
+ )
+ admin(
+ ["bucket", "rm", "--bucket", user2_bucket_name, "--purge-data"],
+ get_config_cluster(),
+ )
+ admin(["account", "rm", "--account-id", account_id], get_config_cluster())