]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/notification: add opaque data
authorYuval Lifshitz <yuvalif@yahoo.com>
Mon, 13 Jan 2020 10:48:24 +0000 (12:48 +0200)
committerYuval Lifshitz <ylifshit@redhat.com>
Sun, 26 Apr 2020 11:22:29 +0000 (14:22 +0300)
opaque data may be set in topic configuration and later on sent inside
notifications triggered by that topic.

Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
(cherry picked from commit 07630a8759b0836f56d4ee6938e2b711ffb4e169)
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
Conflicts:
src/test/rgw/rgw_multi/tests_ps.py

13 files changed:
doc/radosgw/notifications.rst
doc/radosgw/pubsub-module.rst
doc/radosgw/s3-notification-compatibility.rst
src/rgw/rgw_notify.cc
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_rest_pubsub.cc
src/rgw/rgw_rest_pubsub_common.cc
src/rgw/rgw_rest_pubsub_common.h
src/rgw/rgw_sync_module_pubsub.cc
src/rgw/rgw_sync_module_pubsub_rest.cc
src/test/rgw/rgw_multi/tests_ps.py
src/test/rgw/rgw_multi/zone_ps.py

index 9ec16ea83d10e7fd84d1f3cf2bb479a7795643df..3fd7f2d518bea5739b50ccffc8498b6367e841fe 100644 (file)
@@ -72,10 +72,13 @@ To update a topic, use the same command used for topic creation, with the topic
    [&Attributes.entry.4.key=kafka-ack-level&Attributes.entry.4.value=none|broker]
    [&Attributes.entry.5.key=use-ssl&Attributes.entry.5.value=true|false]
    [&Attributes.entry.6.key=ca-location&Attributes.entry.6.value=<file path>]
+   [&Attributes.entry.7.key=OpaqueData&Attributes.entry.7.value=<opaque data>]
 
 Request parameters:
 
 - push-endpoint: URI of an endpoint to send push notification to
+- OpaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the topic
+
 - HTTP endpoint 
 
  - URI: ``http[s]://<fqdn>[:<port]``
@@ -158,6 +161,7 @@ Response will have the following format:
                     <EndpointTopic></EndpointTopic>
                 </EndPoint>
                 <TopicArn></TopicArn>
+                <OpaqueData></OpaqueData>
             </Topic>
         </GetTopicResult>
         <ResponseMetadata>
@@ -219,6 +223,7 @@ Response will have the following format:
                         <EndpointTopic></EndpointTopic>
                     </EndPoint>
                     <TopicArn></TopicArn>
+                    <OpaqueData></OpaqueData>
                 </member>
             </Topics>
         </ListTopicsResult>
@@ -288,6 +293,7 @@ pushed or pulled using the pubsub sync module.
                }
            },
            "eventId":"",
+           "opaqueData":"",
        }
    ]}
 
@@ -311,6 +317,7 @@ pushed or pulled using the pubsub sync module.
 - s3.object.metadata: any metadata set on the object sent as: ``x-amz-meta-`` (an extension to the S3 notification API) 
 - s3.object.tags: any tags set on the objcet (an extension to the S3 notification API)
 - s3.eventId: unique ID of the event, that could be used for acking (an extension to the S3 notification API)
+- s3.opaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the topic (an extension to the S3 notification API)
 
 .. _PubSub Module : ../pubsub-module
 .. _S3 Notification Compatibility: ../s3-notification-compatibility
index cf3951fec9450f3972835f3fe0f190ff32b538b7..cc299cb5538152c2fb39b14554fb6c6b5fbd91ce 100644 (file)
@@ -149,11 +149,12 @@ To update a topic, use the same command used for topic creation, with the topic
 
 ::
 
-   PUT /topics/<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker][&use-ssl=true|false][&ca-location=<file path>]]
+   PUT /topics/<topic-name>[?OpaqueData=<opaque data>][&push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker][&use-ssl=true|false][&ca-location=<file path>]]
 
 Request parameters:
 
 - push-endpoint: URI of an endpoint to send push notification to
+- OpaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the topic
 
 The endpoint URI may include parameters depending with the type of endpoint:
 
@@ -220,6 +221,7 @@ Response will have the following format (JSON):
                "push_endpoint_topic":""
            },
            "arn":""
+           "opaqueData":""
        },
        "subs":[]
    }             
@@ -501,6 +503,7 @@ the events will have an S3-compatible record format (JSON):
                }
            },
            "eventId":"",
+           "opaqueData":"",
        }
    ]}
 
@@ -523,6 +526,7 @@ the events will have an S3-compatible record format (JSON):
 - s3.object.metadata: not supported (an extension to the S3 notification API)
 - s3.object.tags: not supported (an extension to the S3 notification API)
 - s3.eventId: unique ID of the event, that could be used for acking (an extension to the S3 notification API)
+- s3.opaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the topic (an extension to the S3 notification API)
 
 In case that the subscription was not created via a non S3-compatible notification, 
 the events will have the following event format (JSON):
index ffbceca64d4e95e3c27ef50edd37ffb6cc663477..b587fcf495f9fb95f25f586604729785a52f23b8 100644 (file)
@@ -112,10 +112,14 @@ Note that most of the API is not applicable to Ceph, and only the following acti
  - ``DeleteTopic``
  - ``ListTopics``
 
-We also extend it by: 
+We also have the following extensions to topic configuration: 
+
+ - In ``GetTopic`` we allow fetching a specific topic, instead of all user topics
+ - In ``CreateTopic``
+
+  - we allow setting endpoint attributes
+  - we allow setting opaque data that will be sent to the endpoint in the notification
 
- - ``GetTopic`` - allowing for fetching a specific topic, instead of all user topics
- - In ``CreateTopic`` we allow setting endpoint attributes
 
 .. _AWS Simple Notification Service API: https://docs.aws.amazon.com/sns/latest/api/API_Operations.html
 .. _AWS S3 Bucket Notifications API: https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html
index 75a5eeb0ca948db73a45b381b5a94f38507bba09..55da57799ae92ac506ec36f0212d7d7c7195db29 100644 (file)
@@ -24,7 +24,7 @@ void populate_record_from_request(const req_state *s,
   record.userIdentity = s->user->user_id.id;    // user that triggered the change
   record.x_amz_request_id = s->req_id;          // request ID of the original change
   record.x_amz_id_2 = s->host_id;               // RGW on which the change was made
-  // configurationId is filled from subscription configuration
+  // configurationId is filled from notification configuration
   record.bucket_name = s->bucket_name;
   record.bucket_ownerIdentity = s->bucket_owner.get_id().id;
   record.bucket_arn = to_string(rgw::ARN(s->bucket));
@@ -42,6 +42,7 @@ void populate_record_from_request(const req_state *s,
   record.x_meta_map = s->info.x_meta_map;
   // pass tags
   record.tags = s->tagset.get_tags();
+  // opaque data will be filled from topic configuration
 }
 
 bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, EventType event) {
@@ -86,6 +87,7 @@ int publish(const req_state* s,
         }
         event_should_be_handled = true;
         record.configurationId = topic_filter.s3_id;
+        record.opaque_data = topic_cfg.opaque_data;
         ldout(s->cct, 20) << "notification: '" << topic_filter.s3_id << 
             "' on topic: '" << topic_cfg.dest.arn_topic << 
             "' and bucket: '" << s->bucket.name << 
index 8f945e82296c24c57b2090d00b3476126833c5cd..f3ff342ff4c551490aa5d1c54e37f41bf4256fbb 100644 (file)
@@ -282,6 +282,7 @@ void rgw_pubsub_s3_record::dump(Formatter *f) const {
     }
   }
   encode_json("eventId", id, f);
+  encode_json("opaqueData", opaque_data, f);
 }
 
 void rgw_pubsub_event::dump(Formatter *f) const
@@ -299,6 +300,7 @@ void rgw_pubsub_topic::dump(Formatter *f) const
   encode_json("name", name, f);
   encode_json("dest", dest, f);
   encode_json("arn", arn, f);
+  encode_json("opaqueData", opaque_data, f);
 }
 
 void rgw_pubsub_topic::dump_xml(Formatter *f) const
@@ -307,6 +309,7 @@ void rgw_pubsub_topic::dump_xml(Formatter *f) const
   encode_xml("Name", name, f);
   encode_xml("EndPoint", dest, f);
   encode_xml("TopicArn", arn, f);
+  encode_xml("OpaqueData", opaque_data, f);
 }
 
 void encode_json(const char *name, const rgw::notify::EventTypeList& l, Formatter *f)
@@ -556,10 +559,10 @@ int RGWUserPubSub::Bucket::remove_notification(const string& topic_name)
 }
 
 int RGWUserPubSub::create_topic(const string& name) {
-  return create_topic(name, rgw_pubsub_sub_dest(), "");
+  return create_topic(name, rgw_pubsub_sub_dest(), "", "");
 }
 
-int RGWUserPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn) {
+int RGWUserPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data) {
   RGWObjVersionTracker objv_tracker;
   rgw_pubsub_user_topics topics;
 
@@ -575,6 +578,7 @@ int RGWUserPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& d
   new_topic.topic.name = name;
   new_topic.topic.dest = dest;
   new_topic.topic.arn = arn;
+  new_topic.topic.opaque_data = opaque_data;
 
   ret = write_user_topics(topics, &objv_tracker);
   if (ret < 0) {
index fbe6e15dbc80984fd0ba6b42d61b228ff0adb6dd..d7b1758ad446043b629c4cee1cbb42ca36b2bdb0 100644 (file)
@@ -97,7 +97,7 @@ WRITE_CLASS_ENCODER(rgw_s3_filter)
 
 using OptionalFilter = std::optional<rgw_s3_filter>;
 
-class rgw_pubsub_topic_filter;
+struct rgw_pubsub_topic_filter;
 /* S3 notification configuration
  * based on: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html
 <NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
@@ -254,9 +254,12 @@ struct rgw_pubsub_s3_record {
   KeyValueList x_meta_map;
   // tags
   KeyValueList tags;
+  // opaque data received from the topic
+  // could be used to identify the gateway
+  std::string opaque_data;
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(3, 1, bl);
+    ENCODE_START(4, 1, bl);
     encode(eventVersion, bl);
     encode(eventSource, bl);
     encode(awsRegion, bl);
@@ -280,11 +283,12 @@ struct rgw_pubsub_s3_record {
     encode(bucket_id, bl);
     encode(x_meta_map, bl);
     encode(tags, bl);
+    encode(opaque_data, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(3, bl);
+    DECODE_START(4, bl);
     decode(eventVersion, bl);
     decode(eventSource, bl);
     decode(awsRegion, bl);
@@ -306,11 +310,14 @@ struct rgw_pubsub_s3_record {
     decode(object_sequencer, bl);
     decode(id, bl);
     if (struct_v >= 2) {
-        decode(bucket_id, bl);
-        decode(x_meta_map, bl);
+      decode(bucket_id, bl);
+      decode(x_meta_map, bl);
     }
     if (struct_v >= 3) {
-        decode(tags, bl);
+      decode(tags, bl);
+    }
+    if (struct_v >= 4) {
+      decode(opaque_data, bl);
     }
     DECODE_FINISH(bl);
   }
@@ -433,24 +440,29 @@ struct rgw_pubsub_topic {
   std::string name;
   rgw_pubsub_sub_dest dest;
   std::string arn;
+  std::string opaque_data;
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(2, 1, bl);
+    ENCODE_START(3, 1, bl);
     encode(user, bl);
     encode(name, bl);
     encode(dest, bl);
     encode(arn, bl);
+    encode(opaque_data, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(2, bl);
+    DECODE_START(3, bl);
     decode(user, bl);
     decode(name, bl);
     if (struct_v >= 2) {
       decode(dest, bl);
       decode(arn, bl);
     }
+    if (struct_v >= 3) {
+      decode(opaque_data, bl);
+    }
     DECODE_FINISH(bl);
   }
 
@@ -750,7 +762,7 @@ public:
   // create a topic with push destination information and ARN
   // if the topic already exists the destination and ARN values may be updated (considered succsess)
   // return 0 on success, error code otherwise
-  int create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn);
+  int create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data);
   // remove a topic according to its name
   // if the topic does not exists it is a no-op (considered success)
   // return 0 on success, error code otherwise
index 675830db9e3dd24be454fb47276fffaf3fdc7dc4..de4babd4d2c2ea5355677025b810299b171ff34e 100644 (file)
@@ -32,6 +32,8 @@ public:
       return -EINVAL;
     }
 
+    opaque_data = s->info.args.get("OpaqueData");
+
     dest.push_endpoint = s->info.args.get("push-endpoint");
 
     if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
@@ -489,7 +491,7 @@ void RGWPSCreateNotif_ObjStore_S3::execute() {
     // generate the internal topic. destination is stored here for the "push-only" case
     // when no subscription exists
     // ARN is cached to make the "GET" method faster
-    op_ret = ups->create_topic(unique_topic_name, topic_info.dest, topic_info.arn);
+    op_ret = ups->create_topic(unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data);
     if (op_ret < 0) {
       ldout(s->cct, 1) << "failed to auto-generate unique topic '" << unique_topic_name << 
         "', ret=" << op_ret << dendl;
index 1b969ceb947f4d366c031dfcd17d789882fd160c..3b5de53fa4d3a7c159544d2d4586c6f15eafee54 100644 (file)
@@ -52,7 +52,7 @@ void RGWPSCreateTopicOp::execute() {
   }
 
   ups.emplace(store, s->owner.get_id());
-  op_ret = ups->create_topic(topic_name, dest, topic_arn);
+  op_ret = ups->create_topic(topic_name, dest, topic_arn, opaque_data);
   if (op_ret < 0) {
     ldout(s->cct, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl;
     return;
index f11c75658f52395d93443c70b5f14837fd341a1f..d472fa4076d9818c886a3ab43c71b35d21bbece8 100644 (file)
@@ -18,6 +18,7 @@ protected:
   std::string topic_name;
   rgw_pubsub_sub_dest dest;
   std::string topic_arn;
+  std::string opaque_data;
   
   virtual int get_params() = 0;
 
index 1cb76c7bb9c86d56b8594f8d8e4d29fdb2b138a8..fd514b81addad466193ce8ed050990cad6800db9 100644 (file)
@@ -144,10 +144,12 @@ using  PSSubConfigRef = std::shared_ptr<PSSubConfig>;
 struct PSTopicConfig {
   std::string name;
   std::set<std::string> subs;
+  std::string opaque_data;
 
   void dump(Formatter *f) const {
     encode_json("name", name, f);
     encode_json("subs", subs, f);
+    encode_json("opaque", opaque_data, f);
   }
 };
 
@@ -191,10 +193,10 @@ using PSTopicConfigRef = std::shared_ptr<PSTopicConfig>;
 using TopicsRef = std::shared_ptr<std::vector<PSTopicConfigRef>>;
 
 struct PSConfig {
-  string id{"pubsub"};
+  const std::string id{"pubsub"};
   rgw_user user;
-  string data_bucket_prefix;
-  string data_oid_prefix;
+  std::string data_bucket_prefix;
+  std::string data_oid_prefix;
 
   int events_retention_days{0};
 
@@ -230,7 +232,7 @@ struct PSConfig {
     }
     {
       Formatter::ObjectSection section(*f, "notifications");
-      string last;
+      std::string last;
       for (auto& notif : notifications) {
         const string& n = notif.first;
         if (n != last) {
@@ -284,7 +286,7 @@ struct PSConfig {
   }
 
   void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, TopicsRef *result) {
-    string path = bucket.name + "/" + key.name;
+    const std::string path = bucket.name + "/" + key.name;
 
     auto iter = notifications.upper_bound(path);
     if (iter == notifications.begin()) {
@@ -1142,6 +1144,7 @@ public:
         std::shared_ptr<PSTopicConfig> tc = std::make_shared<PSTopicConfig>();
         tc->name = info.name;
         tc->subs = user_topics.topics[info.name].subs;
+        tc->opaque_data = info.opaque_data;
         (*topics)->push_back(tc);
       }
 
@@ -1166,7 +1169,7 @@ class RGWPSHandleObjEventCR : public RGWCoroutine {
   PSSubscriptionRef sub;
   std::array<rgw_user, 2>::const_iterator oiter;
   std::vector<PSTopicConfigRef>::const_iterator titer;
-  std::set<string>::const_iterator siter;
+  std::set<std::string>::const_iterator siter;
   int last_sub_conf_error;
 
 public:
@@ -1247,6 +1250,7 @@ public:
               // subscription was made by S3 compatible API
               ldout(sync_env->cct, 20) << "storing record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
               record->configurationId = sub->sub_conf->s3_id;
+              record->opaque_data = (*titer)->opaque_data;
               yield call(PSSubscription::store_event_cr(sync_env, sub, record));
               if (retcode < 0) {
                 if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
@@ -1321,7 +1325,7 @@ public:
       {
         std::vector<std::pair<std::string, std::string> > attrs;
         for (auto& attr : attrs) {
-          string k = attr.first;
+          std::string k = attr.first;
           if (boost::algorithm::starts_with(k, RGW_ATTR_PREFIX)) {
             k = k.substr(sizeof(RGW_ATTR_PREFIX) - 1);
           }
@@ -1521,7 +1525,7 @@ public:
 RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(CephContext *cct, const JSONFormattable& config)
 {
   data_handler = std::unique_ptr<RGWPSDataSyncModule>(new RGWPSDataSyncModule(cct, config));
-  string jconf = json_str("conf", *data_handler->get_conf());
+  const std::string jconf = json_str("conf", *data_handler->get_conf());
   JSONParser p;
   if (!p.parse(jconf.c_str(), jconf.size())) {
     ldout(cct, 1) << "ERROR: failed to parse sync module effective conf: " << jconf << dendl;
index 069072ab35922166cbff03175e754d284e953655..aec5a346a0e31103a92ade276d1ec395ced43e34 100644 (file)
@@ -24,6 +24,7 @@ public:
     
     topic_name = s->object.name;
 
+    opaque_data = s->info.args.get("OpaqueData");
     dest.push_endpoint = s->info.args.get("push-endpoint");
     
     if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
index 376f815313e03984a0c15332202b80321783c077..b5df8817cd6b40382e90868f994751404bfcb432 100644 (file)
@@ -140,6 +140,13 @@ class StreamingHTTPServer:
             worker.reset_events()
         verify_events_by_elements(events, keys, exact_match=exact_match, deletions=deletions)
 
+    def get_and_reset_events(self):
+        events = []
+        for worker in self.workers:
+            events += worker.get_events()
+            worker.reset_events()
+        return events
+
     def close(self):
         """close all workers in the http server and wait for it to finish"""
         # make sure that the shared socket is closed
@@ -1796,6 +1803,150 @@ def test_ps_s3_notification_push_http_on_master():
     http_server.close()
 
 
+def test_ps_s3_opaque_data():
+    """ test that opaque id set in topic, is sent in notification """
+    if skip_push_tests:
+        return SkipTest("PubSub push tests don't run in teuthology")
+    hostname = get_ip()
+    zones, ps_zones  = init_env(require_ps=True)
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+    
+    # create random port for the http server
+    host = get_ip()
+    port = random.randint(10000, 20000)
+    # start an http server in a separate thread
+    number_of_objects = 10
+    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+    
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = zones[0].create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+    # wait for sync
+    zone_meta_checkpoint(ps_zones[0].zone)
+
+    # create s3 topic
+    endpoint_address = 'http://'+host+':'+str(port)
+    opaque_data = 'http://1.2.3.4:8888'
+    endpoint_args = 'push-endpoint='+endpoint_address+'&OpaqueData='+opaque_data
+    topic_conf = PSTopic(ps_zones[0].conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args)
+    result, status = topic_conf.set_config()
+    assert_equal(status/100, 2)
+    parsed_result = json.loads(result)
+    topic_arn = parsed_result['arn']
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': topic_arn,
+                        'Events': []
+                       }]
+    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket
+    client_threads = []
+    content = 'bar'
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i))
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads] 
+
+    # wait for sync
+    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+
+    # check http receiver
+    keys = list(bucket.list())
+    print('total number of objects: ' + str(len(keys)))
+    events = http_server.get_and_reset_events()
+    for event in events:
+        assert_equal(event['Records'][0]['opaqueData'], opaque_data)
+    
+    # cleanup
+    for key in keys:
+        key.delete()
+    [thr.join() for thr in client_threads] 
+    topic_conf.del_config()
+    s3_notification_conf.del_config(notification=notification_name)
+    # delete the bucket
+    zones[0].delete_bucket(bucket_name)
+    http_server.close()
+
+
+def test_ps_s3_opaque_data_on_master():
+    """ test that opaque id set in topic, is sent in notification on master """
+    if skip_push_tests:
+        return SkipTest("PubSub push tests don't run in teuthology")
+    hostname = get_ip()
+    zones, _  = init_env(require_ps=False)
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+    
+    # create random port for the http server
+    host = get_ip()
+    port = random.randint(10000, 20000)
+    # start an http server in a separate thread
+    number_of_objects = 10
+    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+    
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = zones[0].create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # create s3 topic
+    endpoint_address = 'http://'+host+':'+str(port)
+    endpoint_args = 'push-endpoint='+endpoint_address
+    opaque_data = 'http://1.2.3.4:8888'
+    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args, opaque_data=opaque_data)
+    topic_arn = topic_conf.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': topic_arn,
+                        'Events': []
+                       }]
+    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket
+    client_threads = []
+    start_time = time.time()
+    content = 'bar'
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i))
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads] 
+
+    time_diff = time.time() - start_time
+    print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+    
+    # check http receiver
+    keys = list(bucket.list())
+    print('total number of objects: ' + str(len(keys)))
+    events = http_server.get_and_reset_events()
+    for event in events:
+        assert_equal(event['Records'][0]['opaqueData'], opaque_data)
+    
+    # cleanup
+    for key in keys:
+        key.delete()
+    [thr.join() for thr in client_threads] 
+    topic_conf.del_config()
+    s3_notification_conf.del_config(notification=notification_name)
+    # delete the bucket
+    zones[0].delete_bucket(bucket_name)
+    http_server.close()
+
 def test_ps_topic():
     """ test set/get/delete of topic """
     _, ps_zones = init_env()
@@ -2645,8 +2796,9 @@ def test_ps_s3_metadata_on_master():
     # check amqp receiver
     event_count = 0
     for event in receiver.get_and_reset_events():
-        assert_equal(event['Records'][0]['s3']['object']['metadata'][0]['key'], meta_prefix+meta_key)
-        assert_equal(event['Records'][0]['s3']['object']['metadata'][0]['val'], meta_value)
+        s3_event = event['Records'][0]['s3']
+        assert_equal(s3_event['object']['metadata'][0]['key'], meta_prefix+meta_key)
+        assert_equal(s3_event['object']['metadata'][0]['val'], meta_value)
         event_count +=1
 
     # only PUT and POST has the metadata value
@@ -2660,8 +2812,9 @@ def test_ps_s3_metadata_on_master():
     # check amqp receiver
     event_count = 0
     for event in receiver.get_and_reset_events():
-        assert_equal(event['Records'][0]['s3']['object']['metadata'][0]['key'], meta_prefix+meta_key)
-        assert_equal(event['Records'][0]['s3']['object']['metadata'][0]['val'], meta_value)
+        s3_event = event['Records'][0]['s3']
+        assert_equal(s3_event['object']['metadata'][0]['key'], meta_prefix+meta_key)
+        assert_equal(s3_event['object']['metadata'][0]['val'], meta_value)
         event_count +=1
 
     # all 3 object has metadata when deleted
index 61d32679bc75e272db31e8dafb6998709a25b64c..4ccff3eb0ad4e5e335cba9b04cf85c75797dfe92 100644 (file)
@@ -173,12 +173,12 @@ def delete_all_s3_topics(zone, region):
 
 class PSTopicS3:
     """class to set/list/get/delete a topic
-    POST ?Action=CreateTopic&Name=<topic name>&push-endpoint=<endpoint>&[<arg1>=<value1>...]]
+    POST ?Action=CreateTopic&Name=<topic name>[&OpaqueData=<data>[&push-endpoint=<endpoint>&[<arg1>=<value1>...]]]
     POST ?Action=ListTopics
     POST ?Action=GetTopic&TopicArn=<topic-arn>
     POST ?Action=DeleteTopic&TopicArn=<topic-arn>
     """
-    def __init__(self, conn, topic_name, region, endpoint_args=None):
+    def __init__(self, conn, topic_name, region, endpoint_args=None, opaque_data=None):
         self.conn = conn
         self.topic_name = topic_name.strip()
         assert self.topic_name
@@ -186,14 +186,16 @@ class PSTopicS3:
         self.attributes = {}
         if endpoint_args is not None:
             self.attributes = {nvp[0] : nvp[1] for nvp in urlparse.parse_qsl(endpoint_args, keep_blank_values=True)}
-            protocol = 'https' if conn.is_secure else 'http'
-            self.client = boto3.client('sns',
-                               endpoint_url=protocol+'://'+conn.host+':'+str(conn.port),
-                               aws_access_key_id=conn.aws_access_key_id,
-                               aws_secret_access_key=conn.aws_secret_access_key,
-                               region_name=region,
-                               verify='./cert.pem',
-                               config=Config(signature_version='s3'))
+        if opaque_data is not None:
+            self.attributes['OpaqueData'] = opaque_data
+        protocol = 'https' if conn.is_secure else 'http'
+        self.client = boto3.client('sns',
+                           endpoint_url=protocol+'://'+conn.host+':'+str(conn.port),
+                           aws_access_key_id=conn.aws_access_key_id,
+                           aws_secret_access_key=conn.aws_secret_access_key,
+                           region_name=region,
+                           verify='./cert.pem',
+                           config=Config(signature_version='s3'))
 
 
     def get_config(self):