]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/admin/notifications: support admin operations on topics with tenants 59322/head
authorYuval Lifshitz <ylifshit@redhat.com>
Sun, 23 Oct 2022 18:56:18 +0000 (18:56 +0000)
committerKonstantin Shalygin <k0ste@k0ste.ru>
Mon, 19 Aug 2024 16:38:03 +0000 (23:38 +0700)
also add integration tests for topics with tenants
issue was a regression introduced in commit: 200f71a90c9e77c91452cec128c2c8be0d3d6f1f

Fixes: https://tracker.ceph.com/issues/57899
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
(cherry picked from commit c73f361e9c84dce8b48eb8cca7675e659f3a286c)

src/rgw/rgw_admin.cc
src/test/rgw/bucket_notification/api.py
src/test/rgw/bucket_notification/test_bn.py

index a8dfce96e4e612d4c674bc65370dad6751f79566..564576fd5d10ebb82d8b89cbd6de44435094a0aa 100644 (file)
@@ -4311,7 +4311,10 @@ int main(int argc, const char **argv)
                           && opt_cmd != OPT::ROLE_POLICY_DELETE
                           && opt_cmd != OPT::RESHARD_ADD
                           && opt_cmd != OPT::RESHARD_CANCEL
-                          && opt_cmd != OPT::RESHARD_STATUS) {
+                          && opt_cmd != OPT::RESHARD_STATUS
+                          && opt_cmd != OPT::PUBSUB_TOPICS_LIST
+                          && opt_cmd != OPT::PUBSUB_TOPIC_GET
+                          && opt_cmd != OPT::PUBSUB_TOPIC_RM) {
         cerr << "ERROR: --tenant is set, but there's no user ID" << std::endl;
         return EINVAL;
       }
index 2e0fc9ca1df68dda1013d07db99e6004a2407757..fe38576fb3512e7ef1ead5ff4669debcf844a368 100644 (file)
@@ -56,23 +56,6 @@ def make_request(conn, method, resource, parameters=None, sign_parameters=False,
     http_conn.close()
     return data.decode('utf-8'), status
 
-def delete_all_s3_topics(zone, region):
-    try:
-        conn = zone.secure_conn if zone.secure_conn is not None else zone.conn
-        protocol = 'https' if conn.is_secure else 'http'
-        client = boto3.client('sns',
-                endpoint_url=protocol+'://'+conn.host+':'+str(conn.port),
-                aws_access_key_id=conn.aws_access_key_id,
-                aws_secret_access_key=conn.aws_secret_access_key,
-                region_name=region,
-                verify='./cert.pem')
-
-        topics = client.list_topics()['Topics']
-        for topic in topics:
-            print('topic cleanup, deleting: ' + topic['TopicArn'])
-            assert client.delete_topic(TopicArn=topic['TopicArn'])['ResponseMetadata']['HTTPStatusCode'] == 200
-    except Exception as err:
-        print('failed to do topic cleanup: ' + str(err))
 
 def delete_all_objects(conn, bucket_name):
     client = boto3.client('s3',
@@ -149,9 +132,9 @@ class PSTopicS3:
         self.topic_arn = result['TopicArn']
         return self.topic_arn
 
-    def del_config(self):
+    def del_config(self, topic_arn=None):
         """delete topic"""
-        result = self.client.delete_topic(TopicArn=self.topic_arn)
+        result = self.client.delete_topic(TopicArn=(topic_arn if topic_arn is not None else self.topic_arn))
         return result['ResponseMetadata']['HTTPStatusCode']
 
     def get_list(self):
index 80bcc21a697a17cee0caaac98963eba0f6108af8..8b6bb9a71d04bf367605884fb3b1a342fa8d8e90 100644 (file)
@@ -29,7 +29,6 @@ from . import(
 
 from .api import PSTopicS3, \
     PSNotificationS3, \
-    delete_all_s3_topics, \
     delete_all_objects, \
     put_object_tagging, \
     admin
@@ -481,6 +480,7 @@ def stop_kafka_receiver(receiver, task):
 def get_ip():
     return 'localhost'
 
+
 def get_ip_http():
     s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
     try:
@@ -491,6 +491,7 @@ def get_ip_http():
         s.close()
     return ip
 
+
 def connection():
     hostname = get_config_host()
     port_no = get_config_port()
@@ -504,14 +505,16 @@ def connection():
 
     return conn
 
+
 def connection2():
-    vstart_access_key = '0555b35654ad1656d804'
-    vstart_secret_key = 'h7GhxuBLTrlhVUyxSPUKUV8r/2EI4ngqJxD7iBdBYLhwluN30JaT3Q=='
-    hostname = get_ip()
+    hostname = get_config_host()
+    port_no = 8001
+    vstart_access_key = get_access_key()
+    vstart_secret_key = get_secret_key()
 
     conn = S3Connection(aws_access_key_id=vstart_access_key,
-                      aws_secret_access_key=vstart_secret_key,
-                      is_secure=False, port=8001, host=hostname,
+                  aws_secret_access_key=vstart_secret_key,
+                      is_secure=False, port=port_no, host=hostname, 
                       calling_format='boto.s3.connection.OrdinaryCallingFormat')
 
     return conn
@@ -522,39 +525,55 @@ def connection2():
 ##############
 
 
-@attr('modification_required')
+@attr('basic_test')
 def test_ps_s3_topic_on_master():
     """ test s3 topics set/get/delete on master """
-    return SkipTest('Get tenant function required.')
-
+    
+    access_key = str(time.time())
+    secret_key = str(time.time())
+    uid = 'superman' + str(time.time())
+    tenant = 'kaboom'
+    _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])  
+    assert_equal(result, 0)
+    conn = S3Connection(aws_access_key_id=access_key,
+                  aws_secret_access_key=secret_key,
+                      is_secure=False, port=get_config_port(), host=get_config_host(), 
+                      calling_format='boto.s3.connection.OrdinaryCallingFormat')
     zonegroup = 'default' 
     bucket_name = gen_bucket_name()
-    conn = connection()
     topic_name = bucket_name + TOPIC_SUFFIX
 
-    # clean all topics
-    delete_all_s3_topics(conn, zonegroup)
-
     # create s3 topics
     endpoint_address = 'amqp://127.0.0.1:7001/vhost_1'
     endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
     topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
+    # clean all topics
+    try:
+        result = topic_conf1.get_list()[0]['ListTopicsResponse']['ListTopicsResult']['Topics']
+        topics = []
+        if result is not None:
+            topics = result['member']
+        for topic in topics:
+            topic_conf1.del_config(topic_arn=topic['TopicArn'])
+    except Exception as err:
+        print('failed to do topic cleanup: ' + str(err))
+
     topic_arn = topic_conf1.set_config()
     assert_equal(topic_arn,
-                 'arn:aws:sns:' + zonegroup + ':' + get_tenant() + ':' + topic_name + '_1')
+                 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_1')
 
     endpoint_address = 'http://127.0.0.1:9001'
     endpoint_args = 'push-endpoint='+endpoint_address
     topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args)
     topic_arn = topic_conf2.set_config()
     assert_equal(topic_arn,
-                 'arn:aws:sns:' + zonegroup + ':' + get_tenant() + ':' + topic_name + '_2')
+                 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_2')
     endpoint_address = 'http://127.0.0.1:9002'
     endpoint_args = 'push-endpoint='+endpoint_address
     topic_conf3 = PSTopicS3(conn, topic_name+'_3', zonegroup, endpoint_args=endpoint_args)
     topic_arn = topic_conf3.set_config()
     assert_equal(topic_arn,
-                 'arn:aws:sns:' + zonegroup + ':' + get_tenant() + ':' + topic_name + '_3')
+                 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3')
 
     # get topic 3
     result, status = topic_conf3.get_config()
@@ -578,17 +597,95 @@ def test_ps_s3_topic_on_master():
 
     # delete topics
     result = topic_conf2.del_config()
-    # TODO: should be 200OK
-    # assert_equal(status, 200)
+    assert_equal(status, 200)
     result = topic_conf3.del_config()
-    # TODO: should be 200OK
-    # assert_equal(status, 200)
+    assert_equal(status, 200)
 
     # get topic list, make sure it is empty
     result, status = topic_conf1.get_list()
     assert_equal(result['ListTopicsResponse']['ListTopicsResult']['Topics'], None)
 
 
+@attr('basic_test')
+def test_ps_s3_topic_admin_on_master():
+    """ test s3 topics set/get/delete on master """
+    
+    access_key = str(time.time())
+    secret_key = str(time.time())
+    uid = 'superman' + str(time.time())
+    tenant = 'kaboom'
+    _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])  
+    assert_equal(result, 0)
+    conn = S3Connection(aws_access_key_id=access_key,
+                  aws_secret_access_key=secret_key,
+                      is_secure=False, port=get_config_port(), host=get_config_host(), 
+                      calling_format='boto.s3.connection.OrdinaryCallingFormat')
+    zonegroup = 'default' 
+    bucket_name = gen_bucket_name()
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # create s3 topics
+    endpoint_address = 'amqp://127.0.0.1:7001/vhost_1'
+    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
+    topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
+    # clean all topics
+    try:
+        result = topic_conf1.get_list()[0]['ListTopicsResponse']['ListTopicsResult']['Topics']
+        topics = []
+        if result is not None:
+            topics = result['member']
+        for topic in topics:
+            topic_conf1.del_config(topic_arn=topic['TopicArn'])
+    except Exception as err:
+        print('failed to do topic cleanup: ' + str(err))
+
+    topic_arn1 = topic_conf1.set_config()
+    assert_equal(topic_arn1,
+                 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_1')
+
+    endpoint_address = 'http://127.0.0.1:9001'
+    endpoint_args = 'push-endpoint='+endpoint_address
+    topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args)
+    topic_arn2 = topic_conf2.set_config()
+    assert_equal(topic_arn2,
+                 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_2')
+    endpoint_address = 'http://127.0.0.1:9002'
+    endpoint_args = 'push-endpoint='+endpoint_address
+    topic_conf3 = PSTopicS3(conn, topic_name+'_3', zonegroup, endpoint_args=endpoint_args)
+    topic_arn3 = topic_conf3.set_config()
+    assert_equal(topic_arn3,
+                 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3')
+
+    # get topic 3 via commandline
+    result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant])  
+    parsed_result = json.loads(result[0])
+    assert_equal(parsed_result['topic']['arn'], topic_arn3)
+
+    # delete topic 3
+    _, result = admin(['topic', 'rm', '--topic', topic_name+'_3', '--tenant', tenant])  
+    assert_equal(result, 0)
+
+    # try to get a deleted topic
+    _, result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant])  
+    assert_equal(result, 2)
+
+    # get the remaining 2 topics
+    result = admin(['topic', 'list', '--tenant', tenant])  
+    parsed_result = json.loads(result[0])
+    assert_equal(len(parsed_result['topics']), 2)
+
+    # delete topics
+    _, result = admin(['topic', 'rm', '--topic', topic_name+'_1', '--tenant', tenant])  
+    assert_equal(result, 0)
+    _, result = admin(['topic', 'rm', '--topic', topic_name+'_2', '--tenant', tenant])  
+    assert_equal(result, 0)
+
+    # get topic list, make sure it is empty
+    result = admin(['topic', 'list', '--tenant', tenant])  
+    parsed_result = json.loads(result[0])
+    assert_equal(len(parsed_result['topics']), 0)
+
+
 @attr('modification_required')
 def test_ps_s3_topic_with_secret_on_master():
     """ test s3 topics with secret set/get/delete on master """