]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/pubsub: fix topic arn; add tenant support to multisite tests 27671/head
authorYuval Lifshitz <yuvalif@yahoo.com>
Thu, 18 Apr 2019 08:21:26 +0000 (11:21 +0300)
committerYuval Lifshitz <yuvalif@yahoo.com>
Thu, 18 Apr 2019 08:21:26 +0000 (11:21 +0300)
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
doc/radosgw/pubsub-module.rst
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_pubsub_push.cc
src/rgw/rgw_pubsub_push.h
src/rgw/rgw_sync_module_pubsub_rest.cc
src/test/rgw/rgw_multi/tests.py
src/test/rgw/rgw_multi/tests_ps.py
src/test/rgw/test_multi.py

index 26a0fb98f8d6927a2b02e6b097136e205739483e..5185ef06edf63a1d1e3768f3295f4be3417e63dc 100644 (file)
@@ -157,13 +157,11 @@ Request parameters:
  - "none" - message is considered "delivered" if sent to broker
  - "broker" message is considered "delivered" if acked by broker
 
-Response:
-The ARN will have one of the following format (depending with whether a push-endpoint was defined):
+The topic ARN in the response will have the following format:
 
 ::
 
    arn:aws:sns:<zone-group>:<tenant>:<topic>
-   arn:aws:sns:<zone-group>:<tenant>:<webhook|amqp>:<push-endpoint-url>:<topic>
 
 Get Topic Information
 `````````````````````
@@ -522,7 +520,7 @@ the events will have an S3-compatible record format (JSON):
 - s3.object.version: object version in case of versioned bucket
 - s3.object.sequencer: monotonically increasing identifier of the change per object (hexadecimal format)
 
-In case that the subscription was not created via an S3-compatible notification, 
+In case that the subscription was created via a non-S3-compatible notification, 
 the events will have the following event format (JSON):
 
 ::
@@ -549,7 +547,7 @@ the events will have the following event format (JSON):
        }
    ]}
 
-- id: unique ID of the event, that could be used for acking (an extension to the S3 notification API)
+- id: unique ID of the event, that could be used for acking
 - event: either ``OBJECT_CREATE``, or ``OBJECT_DELETE``
 - timestamp: timestamp indicating when the event was sent
 - info.attrs.mtime: timestamp indicating when the event was triggered
index 0d276df616490ce4b5992342933925e3b9f8fa49..d797efa9e8277304416307db61dd1fdd20d76c70 100644 (file)
@@ -160,40 +160,6 @@ void rgw_pubsub_sub_config::dump(Formatter *f) const
   encode_json("s3_id", s3_id, f);
 }
 
-std::string dest_to_topic_arn(const rgw_pubsub_sub_dest& dest, 
-    const std::string& topic_name, 
-    const std::string& zonegroup_name,
-    const std::string& user_name) {
-  rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns, zonegroup_name, user_name, "");
-  rgw::ARNResource arn_resource;
-  const auto endpoint_type = RGWPubSubEndpoint::get_schema(dest.push_endpoint);
-  if (endpoint_type.empty()) {
-    arn_resource.resource = topic_name;
-  } else {
-    // add endpoint info as resource
-    arn_resource.resource_type = endpoint_type;
-    arn_resource.resource = dest.push_endpoint;
-    arn_resource.qualifier = topic_name;
-  }
-  arn.resource = arn_resource.to_string();
-  return arn.to_string();
-}
-
-std::string topic_name_from_arn(const std::string& topic_arn) {
-  const auto arn = rgw::ARN::parse(topic_arn);
-  if (!arn || arn->resource.empty()) {
-    return "";
-  }
-  const auto arn_resource = rgw::ARNResource::parse(arn->resource);
-  if (!arn_resource) {
-    return "";
-  }
-  if (arn_resource->resource_type.empty()) {
-    return arn_resource->resource;
-  }
-  return arn_resource->qualifier;
-}
-
 int RGWUserPubSub::remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker)
 {
   int ret = rgw_delete_system_obj(store, obj.pool, obj.oid, objv_tracker);
index 56c737ce63de0fbdd6d6a628d0bf15926b7f6665..4dbaf5023a5dc8f428c1d01c1221d22698609bfe 100644 (file)
@@ -23,7 +23,7 @@ class XMLObj;
       </S3Key>
     </Filter>
     <Id>notification1</Id>
-    <Topic>arn:aws:sns:<region>:<account>:[<endpoint-type>:<endpoint-name>]:<topic></Topic>
+    <Topic>arn:aws:sns:<region>:<account>:<topic></Topic>
     <Event>s3:ObjectCreated:*</Event>
     <Event>s3:ObjectRemoved:*</Event>
   </TopicConfiguration>
@@ -411,17 +411,6 @@ WRITE_CLASS_ENCODER(rgw_pubsub_user_topics)
 
 static std::string pubsub_user_oid_prefix = "pubsub.user.";
 
-// generate a topic ARN string
-// no endpoint (pull mode only): arn:s3:sns:<region>:<account>:<topic>
-// with endpoint               : arn:s3:sns:<region>:<account>:<endpoint-type>:<endpoint>:<topic>
-std::string dest_to_topic_arn(const rgw_pubsub_sub_dest& dest, 
-    const std::string& topic_name, 
-    const std::string& zonegroup_name,
-    const std::string& user_name);
-
-// extract the name of the topic from the string representation of the topic ARN
-std::string topic_name_from_arn(const std::string& topic_arn);
-
 class RGWUserPubSub
 {
   friend class Bucket;
index 81c9aefa1a680de999e03fb377be4aa113876ebb..2435f9f0e7501fc820bf30f2751df9cc6cdba590 100644 (file)
@@ -312,7 +312,7 @@ static const std::string WEBHOOK_SCHEMA("webhook");
 static const std::string UNKNOWN_SCHEMA("unknown");
 static const std::string NO_SCHEMA("");
 
-const std::string& RGWPubSubEndpoint::get_schema(const std::string& endpoint) {
+const std::string& get_schema(const std::string& endpoint) {
   if (endpoint.empty()) {
     return NO_SCHEMA; 
   }
index 96e3499aa60b2b5c0e74e3d79822260bd574dc89..3a0dc9f54b962793492711891a93119be7e8a7a1 100644 (file)
@@ -25,8 +25,6 @@ public:
 
   typedef std::unique_ptr<RGWPubSubEndpoint> Ptr;
 
-  static const std::string& get_schema(const std::string& endpoint);
-
   // factory method for the actual notification endpoint
   // derived class specific arguments are passed in http args format
   // may throw a configuration_error if creation fails
index 1c52d159136fbf70fbec5cf2224878408c8973b1..f80c05e0b1c423eade23c70d838c09942c8d6eaa 100644 (file)
@@ -66,8 +66,11 @@ public:
     // bucket to store events/records will be set only when subscription is created
     dest.bucket_name = "";
     dest.oid_prefix = "";
-    // the ARN will be sent in the reply
-    topic_arn = dest_to_topic_arn(dest, topic_name, s->zonegroup_name, s->account_name);
+    // the topic ARN will be sent in the reply
+    const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns, 
+        store->svc.zone->get_zonegroup().get_name(),
+        s->user->user_id.tenant, topic_name);
+    topic_arn = arn.to_string();
     return 0;
   }
 
@@ -252,12 +255,15 @@ protected:
   int init_permissions(RGWOp* op) override {
     return 0;
   }
+
   int read_permissions(RGWOp* op) override {
     return 0;
   }
+
   bool supports_quota() override {
     return false;
   }
+
   RGWOp *op_get() override {
     if (s->init_state.url_bucket.empty()) {
       return nullptr;
@@ -687,11 +693,12 @@ public:
     ret = store->get_bucket_info(*s->sysobj_ctx, id.tenant, bucket_name,
                                  bucket_info, nullptr, nullptr);
     if (ret < 0) {
+      ldout(s->cct, 1) << "failed to get bucket info, cannot verify ownership" << dendl;
       return ret;
     }
 
     if (bucket_info.owner != id) {
-      ldout(s->cct, 1) << "user doesn't own bucket, cannot create notification" << dendl;
+      ldout(s->cct, 1) << "user doesn't own bucket, not allowed to create notification" << dendl;
       return -EPERM;
     }
     return 0;
@@ -869,13 +876,15 @@ void RGWPSCreateNotif_ObjStore_S3::execute() {
       return;
     }
 
-    const auto topic_name = topic_name_from_arn(c.topic_arn);
-    if (topic_name.empty()) {
+    const auto arn = rgw::ARN::parse(c.topic_arn);
+    if (!arn || arn->resource.empty()) {
       ldout(s->cct, 1) << "topic ARN has invalid format:" << c.topic_arn << dendl;
       op_ret = -EINVAL;
       return;
     }
 
+    const auto topic_name = arn->resource;
+
     // get topic information. destination information is stored in the topic
     rgw_pubsub_topic_subs topic_info;  
     op_ret = ups->get_topic(topic_name, &topic_info);
index cb77985bc4b932ec297a9ec00279b7e4463c73a5..ab52be266945c4eaf18099512a06fd9f632fc08c 100644 (file)
@@ -35,6 +35,7 @@ class Config:
         self.checkpoint_delay = kwargs.get('checkpoint_delay', 5)
         # allow some time for realm reconfiguration after changing master zone
         self.reconfigure_delay = kwargs.get('reconfigure_delay', 5)
+        self.tenant = kwargs.get('tenant', '')
 
 # rgw multisite tests, written against the interfaces provided in rgw_multi.
 # these tests must be initialized and run by another module that provides
@@ -51,6 +52,12 @@ def init_multi(_realm, _user, _config=None):
     config = _config or Config()
     realm_meta_checkpoint(realm)
 
+def get_user():
+    return user.id if user is not None else ''
+
+def get_tenant():
+    return config.tenant if config is not None and config.tenant is not None else ''
+
 def get_realm():
     return realm
 
@@ -108,6 +115,7 @@ def datalog_autotrim(zone):
 
 def bilog_list(zone, bucket, args = None):
     cmd = ['bilog', 'list', '--bucket', bucket] + (args or [])
+    cmd += ['--tenant', config.tenant, '--uid', user.name] if config.tenant else []
     bilog, _ = zone.cluster.admin(cmd, read_only=True)
     bilog = bilog.decode('utf-8')
     return json.loads(bilog)
@@ -265,6 +273,7 @@ def bucket_sync_status(target_zone, source_zone, bucket_name):
     cmd = ['bucket', 'sync', 'markers'] + target_zone.zone_args()
     cmd += ['--source-zone', source_zone.name]
     cmd += ['--bucket', bucket_name]
+    cmd += ['--tenant', config.tenant, '--uid', user.name] if config.tenant else []
     while True:
         bucket_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
         if retcode == 0:
@@ -300,6 +309,7 @@ def data_source_log_status(source_zone):
 def bucket_source_log_status(source_zone, bucket_name):
     cmd = ['bilog', 'status'] + source_zone.zone_args()
     cmd += ['--bucket', bucket_name]
+    cmd += ['--tenant', config.tenant, '--uid', user.name] if config.tenant else []
     source_cluster = source_zone.cluster
     bilog_status_json, retcode = source_cluster.admin(cmd, read_only=True)
     bilog_status = json.loads(bilog_status_json.decode('utf-8'))
index 8f95523d3a44e510ce69a3d59d451be607a44f0a..8846f5a8008807a07382c86b5e3a207589fd5dc9 100644 (file)
@@ -1,15 +1,18 @@
 import logging
 import json
 import tempfile
-from rgw_multi.tests import get_realm, \
+from .tests import get_realm, \
     ZonegroupConns, \
     zonegroup_meta_checkpoint, \
     zone_meta_checkpoint, \
     zone_bucket_checkpoint, \
     zone_data_checkpoint, \
     check_bucket_eq, \
-    gen_bucket_name
-from rgw_multi.zone_ps import PSTopic, PSNotification, PSSubscription, PSNotificationS3, print_connection_info
+    gen_bucket_name, \
+    get_user, \
+    get_tenant
+from .zone_ps import PSTopic, PSNotification, PSSubscription, PSNotificationS3, print_connection_info
+from multisite import User
 from nose import SkipTest
 from nose.tools import assert_not_equal, assert_equal
 
@@ -126,6 +129,7 @@ NOTIFICATION_SUFFIX = "_notif"
 # pubsub tests
 ##############
 
+
 def test_ps_info():
     """ log information for manual testing """
     return SkipTest("only used in manual testing")
@@ -141,29 +145,33 @@ def test_ps_info():
         key = bucket.new_key(str(i))
         key.set_contents_from_string('bar')
     print('Zonegroup: ' + zonegroup.name)
+    print('user: ' + get_user())
+    print('tenant: ' + get_tenant())
     print('Master Zone')
     print_connection_info(zones[0].conn)
     print('PubSub Zone')
     print_connection_info(ps_zones[0].conn)
     print('Bucket: ' + bucket_name)
 
+
 def test_ps_s3_notification_low_level():
     """ test low level implementation of s3 notifications """
     zones, ps_zones = init_env()
     bucket_name = gen_bucket_name()
     # create bucket on the first of the rados zones
-    bucket = zones[0].create_bucket(bucket_name)
+    zones[0].create_bucket(bucket_name)
     # wait for sync
     zone_meta_checkpoint(ps_zones[0].zone)
     # create topic
     topic_name = bucket_name + TOPIC_SUFFIX
     topic_conf = PSTopic(ps_zones[0].conn, topic_name)
-    _, status = topic_conf.set_config()
+    result, status = topic_conf.set_config()
     assert_equal(status/100, 2)
+    parsed_result = json.loads(result)
+    topic_arn = parsed_result['arn']
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
     generated_topic_name = notification_name+'_'+topic_name
-    topic_arn = 'arn:aws:sns:::' + topic_name
     s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name,
                                             notification_name, topic_arn, ['s3:ObjectCreated:*'])
     response, status = s3_notification_conf.set_config()
@@ -227,11 +235,12 @@ def test_ps_s3_notification_records():
     # create topic
     topic_name = bucket_name + TOPIC_SUFFIX
     topic_conf = PSTopic(ps_zones[0].conn, topic_name)
-    _, status = topic_conf.set_config()
+    result, status = topic_conf.set_config()
     assert_equal(status/100, 2)
+    parsed_result = json.loads(result)
+    topic_arn = parsed_result['arn']
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_arn = 'arn:aws:sns:::' + topic_name
     s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, 
                                             notification_name, topic_arn, ['s3:ObjectCreated:*'])
     response, status = s3_notification_conf.set_config()
@@ -273,18 +282,17 @@ def test_ps_s3_notification():
     zones, ps_zones = init_env()
     bucket_name = gen_bucket_name()
     # create bucket on the first of the rados zones
-    bucket = zones[0].create_bucket(bucket_name)
+    zones[0].create_bucket(bucket_name)
     # wait for sync
     zone_meta_checkpoint(ps_zones[0].zone)
     topic_name = bucket_name + TOPIC_SUFFIX
     # create topic
     topic_name = bucket_name + TOPIC_SUFFIX
-    topic_arn = 'arn:aws:sns:::' + topic_name
     topic_conf = PSTopic(ps_zones[0].conn, topic_name)
     response, status = topic_conf.set_config()
     assert_equal(status/100, 2)
     parsed_result = json.loads(response)
-    assert_equal(parsed_result['arn'], topic_arn)
+    topic_arn = parsed_result['arn']
     # create one s3 notification
     notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1'
     s3_notification_conf1 = PSNotificationS3(ps_zones[0].conn, bucket_name,
@@ -332,6 +340,8 @@ def test_ps_s3_notification():
 def test_ps_topic():
     """ test set/get/delete of topic """
     _, ps_zones = init_env()
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
 
@@ -345,6 +355,8 @@ def test_ps_topic():
     parsed_result = json.loads(result)
     assert_equal(parsed_result['topic']['name'], topic_name)
     assert_equal(len(parsed_result['subs']), 0)
+    assert_equal(parsed_result['topic']['arn'],
+                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name)
     # delete topic
     _, status = topic_conf.del_config()
     assert_equal(status/100, 2)
@@ -354,6 +366,30 @@ def test_ps_topic():
     assert_equal(parsed_result['Code'], 'NoSuchKey')
 
 
+def test_ps_topic_with_endpoint():
+    """ test set topic with endpoint"""
+    _, ps_zones = init_env()
+    bucket_name = gen_bucket_name()
+    topic_name = bucket_name+TOPIC_SUFFIX
+
+    # create topic
+    dest_endpoint = 'amqp://localhost:7001'
+    dest_args = 'amqp-exchange=amqp.direct&amqp-ack-level=none'
+    topic_conf = PSTopic(ps_zones[0].conn, topic_name, 
+                         endpoint=dest_endpoint,
+                         endpoint_args=dest_args)
+    _, status = topic_conf.set_config()
+    assert_equal(status/100, 2)
+    # get topic
+    result, _ = topic_conf.get_config()
+    # verify topic content
+    parsed_result = json.loads(result)
+    assert_equal(parsed_result['topic']['name'], topic_name)
+    assert_equal(parsed_result['topic']['dest']['push_endpoint'], dest_endpoint)
+    # cleanup
+    topic_conf.del_config()
+
+
 def test_ps_notification():
     """ test set/get/delete of notification """
     zones, ps_zones = init_env()
@@ -930,15 +966,16 @@ def test_ps_s3_push_http():
     # create topic
     topic_conf = PSTopic(ps_zones[0].conn, topic_name,
                          endpoint='http://localhost:9001')
-    _, status = topic_conf.set_config()
+    result, status = topic_conf.set_config()
     assert_equal(status/100, 2)
+    parsed_result = json.loads(result)
+    topic_arn = parsed_result['arn']
     # create bucket on the first of the rados zones
     bucket = zones[0].create_bucket(bucket_name)
     # wait for sync
     zone_meta_checkpoint(ps_zones[0].zone)
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_arn = 'arn:aws:sns:::' + topic_name
     s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name,
                                             notification_name, topic_arn, ['s3:ObjectCreated:*'])
     _, status = s3_notification_conf.set_config()
@@ -1027,15 +1064,15 @@ def test_ps_s3_push_amqp():
     topic_conf = PSTopic(ps_zones[0].conn, topic_name,
                          endpoint='amqp://localhost',
                          endpoint_args='amqp-exchange=ex1&amqp-ack-level=none')
-    _, status = topic_conf.set_config()
+    result, status = topic_conf.set_config()
     assert_equal(status/100, 2)
+    topic_arn = 'arn:aws:sns:::' + topic_name
     # create bucket on the first of the rados zones
     bucket = zones[0].create_bucket(bucket_name)
     # wait for sync
     zone_meta_checkpoint(ps_zones[0].zone)
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_arn = 'arn:aws:sns:::' + topic_name
     s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name,
                                             notification_name, topic_arn, ['s3:ObjectCreated:*'])
     _, status = s3_notification_conf.set_config()
@@ -1074,12 +1111,11 @@ def test_ps_delete_bucket():
     topic_name = bucket_name + TOPIC_SUFFIX
     # create topic
     topic_name = bucket_name + TOPIC_SUFFIX
-    topic_arn = 'arn:aws:sns:::' + topic_name
     topic_conf = PSTopic(ps_zones[0].conn, topic_name)
     response, status = topic_conf.set_config()
     assert_equal(status/100, 2)
     parsed_result = json.loads(response)
-    assert_equal(parsed_result['arn'], topic_arn)
+    topic_arn = parsed_result['arn']
     # create one s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
     s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name,
index 6b12a6c0e0e3be120b1961f956f0cc019256bf38..ebfc5b505a1e5f926c24f270e3ed12b382baab26 100644 (file)
@@ -347,13 +347,16 @@ def init(parse_args):
                     arg = ['--display-name', '"Test User"']
                     arg += user_creds.credential_args()
                     if args.tenant:
-                        cmd += ['--tenant', args.tenant]
+                        arg += ['--tenant', args.tenant]
                     user.create(zone, arg)
                 else:
                     # read users and update keys
                     admin_user.info(zone)
                     admin_creds = admin_user.credentials[0]
-                    user.info(zone)
+                    arg = []
+                    if args.tenant:
+                        arg += ['--tenant', args.tenant]
+                    user.info(zone, arg)
                     user_creds = user.credentials[0]
 
     if not bootstrap:
@@ -361,7 +364,8 @@ def init(parse_args):
 
     config = Config(checkpoint_retries=args.checkpoint_retries,
                     checkpoint_delay=args.checkpoint_delay,
-                    reconfigure_delay=args.reconfigure_delay)
+                    reconfigure_delay=args.reconfigure_delay,
+                    tenant=args.tenant)
     init_multi(realm, user, config)
 
 def setup_module():