]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/notification: add opaque data 32723/head
authorYuval Lifshitz <yuvalif@yahoo.com>
Mon, 13 Jan 2020 10:48:24 +0000 (12:48 +0200)
committerYuval Lifshitz <yuvalif@yahoo.com>
Sun, 19 Jan 2020 13:37:56 +0000 (15:37 +0200)
opaque data may be set in topic configuration and later on sent inside
nottifications triggered by that topic.

Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
13 files changed:
doc/radosgw/notifications.rst
doc/radosgw/pubsub-module.rst
doc/radosgw/s3-notification-compatibility.rst
src/rgw/rgw_notify.cc
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_rest_pubsub.cc
src/rgw/rgw_rest_pubsub_common.cc
src/rgw/rgw_rest_pubsub_common.h
src/rgw/rgw_sync_module_pubsub.cc
src/rgw/rgw_sync_module_pubsub_rest.cc
src/test/rgw/rgw_multi/tests_ps.py
src/test/rgw/rgw_multi/zone_ps.py

index ced68e0d935cca224cc50641d393f1a4436b1a57..fa608a409cf857dd56200ba91ac6a910f3f5a94a 100644 (file)
@@ -72,10 +72,13 @@ To update a topic, use the same command used for topic creation, with the topic
    [&Attributes.entry.4.key=kafka-ack-level&Attributes.entry.4.value=none|broker]
    [&Attributes.entry.5.key=use-ssl&Attributes.entry.5.value=true|false]
    [&Attributes.entry.6.key=ca-location&Attributes.entry.6.value=<file path>]
+   [&Attributes.entry.7.key=OpaqueData&Attributes.entry.7.value=<opaque data>]
 
 Request parameters:
 
 - push-endpoint: URI of an endpoint to send push notification to
+- OpaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the ropic
+
 - HTTP endpoint 
 
  - URI: ``http[s]://<fqdn>[:<port]``
@@ -158,6 +161,7 @@ Response will have the following format:
                     <EndpointTopic></EndpointTopic>
                 </EndPoint>
                 <TopicArn></TopicArn>
+                <OpaqueData></OpaqueData>
             </Topic>
         </GetTopicResult>
         <ResponseMetadata>
@@ -219,6 +223,7 @@ Response will have the following format:
                         <EndpointTopic></EndpointTopic>
                     </EndPoint>
                     <TopicArn></TopicArn>
+                    <OpaqueData></OpaqueData>
                 </member>
             </Topics>
         </ListTopicsResult>
@@ -288,6 +293,7 @@ pushed or pulled using the pubsub sync module.
                }
            },
            "eventId":"",
+           "opaqueData":"",
        }
    ]}
 
@@ -311,6 +317,7 @@ pushed or pulled using the pubsub sync module.
 - s3.object.metadata: any metadata set on the object sent as: ``x-amz-meta-`` (an extension to the S3 notification API) 
 - s3.object.tags: any tags set on the objcet (an extension to the S3 notification API)
 - s3.eventId: unique ID of the event, that could be used for acking (an extension to the S3 notification API)
+- s3.opaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the ropic (an extension to the S3 notification API)
 
 .. _PubSub Module : ../pubsub-module
 .. _S3 Notification Compatibility: ../s3-notification-compatibility
index eb4158bab52efed6c8ee4cf02be76a92137f5b52..61cd4def207ff93c54b66c97216a18c1f92dec08 100644 (file)
@@ -150,11 +150,12 @@ To update a topic, use the same command used for topic creation, with the topic
 
 ::
 
-   PUT /topics/<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker][&use-ssl=true|false][&ca-location=<file path>]]
+   PUT /topics/<topic-name>[?OpaqueData=<opaque data>][&push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker][&use-ssl=true|false][&ca-location=<file path>]]
 
 Request parameters:
 
 - push-endpoint: URI of an endpoint to send push notification to
+- OpaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the ropic
 
 The endpoint URI may include parameters depending with the type of endpoint:
 
@@ -221,6 +222,7 @@ Response will have the following format (JSON):
                "push_endpoint_topic":""
            },
            "arn":""
+           "opaqueData":""
        },
        "subs":[]
    }             
@@ -502,6 +504,7 @@ the events will have an S3-compatible record format (JSON):
                }
            },
            "eventId":"",
+           "opaqueData":"",
        }
    ]}
 
@@ -524,6 +527,7 @@ the events will have an S3-compatible record format (JSON):
 - s3.object.metadata: not supported (an extension to the S3 notification API)
 - s3.object.tags: not supported (an extension to the S3 notification API)
 - s3.eventId: unique ID of the event, that could be used for acking (an extension to the S3 notification API)
+- s3.opaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the ropic (an extension to the S3 notification API)
 
 In case that the subscription was not created via a non S3-compatible notification, 
 the events will have the following event format (JSON):
index ffbceca64d4e95e3c27ef50edd37ffb6cc663477..b587fcf495f9fb95f25f586604729785a52f23b8 100644 (file)
@@ -112,10 +112,14 @@ Note that most of the API is not applicable to Ceph, and only the following acti
  - ``DeleteTopic``
  - ``ListTopics``
 
-We also extend it by: 
+We also have the following extensions to topic configuration: 
+
+ - In ``GetTopic`` we allow fetching a specific topic, instead of all user topics
+ - In ``CreateTopic``
+
+  - we allow setting endpoint attributes
+  - we allow setting opaque data thta will be sent to the endpoint in the notification
 
- - ``GetTopic`` - allowing for fetching a specific topic, instead of all user topics
- - In ``CreateTopic`` we allow setting endpoint attributes
 
 .. _AWS Simple Notification Service API: https://docs.aws.amazon.com/sns/latest/api/API_Operations.html
 .. _AWS S3 Bucket Notifications API: https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html
index 87cea8f0c0afadabd9158009da60712c044abd6c..79ac062df7a2691bc22bccc46a237dec319c3a93 100644 (file)
@@ -24,7 +24,7 @@ void populate_record_from_request(const req_state *s,
   record.userIdentity = s->user->get_id().id;    // user that triggered the change
   record.x_amz_request_id = s->req_id;          // request ID of the original change
   record.x_amz_id_2 = s->host_id;               // RGW on which the change was made
-  // configurationId is filled from subscription configuration
+  // configurationId is filled from notification configuration
   record.bucket_name = s->bucket_name;
   record.bucket_ownerIdentity = s->bucket_owner.get_id().id;
   record.bucket_arn = to_string(rgw::ARN(s->bucket));
@@ -42,6 +42,7 @@ void populate_record_from_request(const req_state *s,
   record.x_meta_map = s->info.x_meta_map;
   // pass tags
   record.tags = s->tagset.get_tags();
+  // opaque data will be filled from topic configuration
 }
 
 bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, EventType event) {
@@ -86,6 +87,7 @@ int publish(const req_state* s,
         }
         event_should_be_handled = true;
         record.configurationId = topic_filter.s3_id;
+        record.opaque_data = topic_cfg.opaque_data;
         ldout(s->cct, 20) << "notification: '" << topic_filter.s3_id << 
             "' on topic: '" << topic_cfg.dest.arn_topic << 
             "' and bucket: '" << s->bucket.name << 
index 3236d525f4913c190bbe60a1945da6d6142a589f..947578b2831a1a75b2ed9273f2100235569bdabd 100644 (file)
@@ -282,6 +282,7 @@ void rgw_pubsub_s3_record::dump(Formatter *f) const {
     }
   }
   encode_json("eventId", id, f);
+  encode_json("opaqueData", opaque_data, f);
 }
 
 void rgw_pubsub_event::dump(Formatter *f) const
@@ -299,6 +300,7 @@ void rgw_pubsub_topic::dump(Formatter *f) const
   encode_json("name", name, f);
   encode_json("dest", dest, f);
   encode_json("arn", arn, f);
+  encode_json("opaqueData", opaque_data, f);
 }
 
 void rgw_pubsub_topic::dump_xml(Formatter *f) const
@@ -307,6 +309,7 @@ void rgw_pubsub_topic::dump_xml(Formatter *f) const
   encode_xml("Name", name, f);
   encode_xml("EndPoint", dest, f);
   encode_xml("TopicArn", arn, f);
+  encode_xml("OpaqueData", opaque_data, f);
 }
 
 void encode_json(const char *name, const rgw::notify::EventTypeList& l, Formatter *f)
@@ -562,10 +565,10 @@ int RGWUserPubSub::Bucket::remove_notification(const string& topic_name)
 }
 
 int RGWUserPubSub::create_topic(const string& name) {
-  return create_topic(name, rgw_pubsub_sub_dest(), "");
+  return create_topic(name, rgw_pubsub_sub_dest(), "", "");
 }
 
-int RGWUserPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn) {
+int RGWUserPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data) {
   RGWObjVersionTracker objv_tracker;
   rgw_pubsub_user_topics topics;
 
@@ -581,6 +584,7 @@ int RGWUserPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& d
   new_topic.topic.name = name;
   new_topic.topic.dest = dest;
   new_topic.topic.arn = arn;
+  new_topic.topic.opaque_data = opaque_data;
 
   ret = write_user_topics(topics, &objv_tracker);
   if (ret < 0) {
index 7c74701c90fdeaaa73d7612faf2ffc5d674802da..256c5918585b7e6e22ebf7b548f26cf78fad0763 100644 (file)
@@ -97,7 +97,7 @@ WRITE_CLASS_ENCODER(rgw_s3_filter)
 
 using OptionalFilter = std::optional<rgw_s3_filter>;
 
-class rgw_pubsub_topic_filter;
+struct rgw_pubsub_topic_filter;
 /* S3 notification configuration
  * based on: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html
 <NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
@@ -254,9 +254,12 @@ struct rgw_pubsub_s3_record {
   KeyValueList x_meta_map;
   // tags
   KeyValueList tags;
+  // opaque data received from the topic
+  // could be used to identify the gateway
+  std::string opaque_data;
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(3, 1, bl);
+    ENCODE_START(4, 1, bl);
     encode(eventVersion, bl);
     encode(eventSource, bl);
     encode(awsRegion, bl);
@@ -280,11 +283,12 @@ struct rgw_pubsub_s3_record {
     encode(bucket_id, bl);
     encode(x_meta_map, bl);
     encode(tags, bl);
+    encode(opaque_data, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(3, bl);
+    DECODE_START(4, bl);
     decode(eventVersion, bl);
     decode(eventSource, bl);
     decode(awsRegion, bl);
@@ -306,11 +310,14 @@ struct rgw_pubsub_s3_record {
     decode(object_sequencer, bl);
     decode(id, bl);
     if (struct_v >= 2) {
-        decode(bucket_id, bl);
-        decode(x_meta_map, bl);
+      decode(bucket_id, bl);
+      decode(x_meta_map, bl);
     }
     if (struct_v >= 3) {
-        decode(tags, bl);
+      decode(tags, bl);
+    }
+    if (struct_v >= 4) {
+      decode(opaque_data, bl);
     }
     DECODE_FINISH(bl);
   }
@@ -433,24 +440,29 @@ struct rgw_pubsub_topic {
   std::string name;
   rgw_pubsub_sub_dest dest;
   std::string arn;
+  std::string opaque_data;
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(2, 1, bl);
+    ENCODE_START(3, 1, bl);
     encode(user, bl);
     encode(name, bl);
     encode(dest, bl);
     encode(arn, bl);
+    encode(opaque_data, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(2, bl);
+    DECODE_START(3, bl);
     decode(user, bl);
     decode(name, bl);
     if (struct_v >= 2) {
       decode(dest, bl);
       decode(arn, bl);
     }
+    if (struct_v >= 3) {
+      decode(opaque_data, bl);
+    }
     DECODE_FINISH(bl);
   }
 
@@ -739,7 +751,7 @@ public:
   // create a topic with push destination information and ARN
   // if the topic already exists the destination and ARN values may be updated (considered succsess)
   // return 0 on success, error code otherwise
-  int create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn);
+  int create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data);
   // remove a topic according to its name
   // if the topic does not exists it is a no-op (considered success)
   // return 0 on success, error code otherwise
index 94f295ba5585320ac1a9296563862c2d121cad3c..65f783a293ef4b5b9f3a8373484e6124b8654fa2 100644 (file)
@@ -32,6 +32,8 @@ public:
       return -EINVAL;
     }
 
+    opaque_data = s->info.args.get("OpaqueData");
+
     dest.push_endpoint = s->info.args.get("push-endpoint");
 
     if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
@@ -489,7 +491,7 @@ void RGWPSCreateNotif_ObjStore_S3::execute() {
     // generate the internal topic. destination is stored here for the "push-only" case
     // when no subscription exists
     // ARN is cached to make the "GET" method faster
-    op_ret = ups->create_topic(unique_topic_name, topic_info.dest, topic_info.arn);
+    op_ret = ups->create_topic(unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data);
     if (op_ret < 0) {
       ldout(s->cct, 1) << "failed to auto-generate unique topic '" << unique_topic_name << 
         "', ret=" << op_ret << dendl;
index 30d058f7bdb13c0ca8aad821a4bc2447333e119e..ba09a3073dc0d845ba86def68acdf1b70f9e989a 100644 (file)
@@ -52,7 +52,7 @@ void RGWPSCreateTopicOp::execute() {
   }
 
   ups.emplace(store, s->owner.get_id());
-  op_ret = ups->create_topic(topic_name, dest, topic_arn);
+  op_ret = ups->create_topic(topic_name, dest, topic_arn, opaque_data);
   if (op_ret < 0) {
     ldout(s->cct, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl;
     return;
index f11c75658f52395d93443c70b5f14837fd341a1f..d472fa4076d9818c886a3ab43c71b35d21bbece8 100644 (file)
@@ -18,6 +18,7 @@ protected:
   std::string topic_name;
   rgw_pubsub_sub_dest dest;
   std::string topic_arn;
+  std::string opaque_data;
   
   virtual int get_params() = 0;
 
index f674149c3a49372f00ca71d174572396d7fef60a..abdf8b1f545a7a6262d7d97eb92b6bf39996e939 100644 (file)
@@ -148,10 +148,12 @@ using  PSSubConfigRef = std::shared_ptr<PSSubConfig>;
 struct PSTopicConfig {
   std::string name;
   std::set<std::string> subs;
+  std::string opaque_data;
 
   void dump(Formatter *f) const {
     encode_json("name", name, f);
     encode_json("subs", subs, f);
+    encode_json("opaque", opaque_data, f);
   }
 };
 
@@ -195,10 +197,10 @@ using PSTopicConfigRef = std::shared_ptr<PSTopicConfig>;
 using TopicsRef = std::shared_ptr<std::vector<PSTopicConfigRef>>;
 
 struct PSConfig {
-  string id{"pubsub"};
+  const std::string id{"pubsub"};
   rgw_user user;
-  string data_bucket_prefix;
-  string data_oid_prefix;
+  std::string data_bucket_prefix;
+  std::string data_oid_prefix;
 
   int events_retention_days{0};
 
@@ -234,7 +236,7 @@ struct PSConfig {
     }
     {
       Formatter::ObjectSection section(*f, "notifications");
-      string last;
+      std::string last;
       for (auto& notif : notifications) {
         const string& n = notif.first;
         if (n != last) {
@@ -288,7 +290,7 @@ struct PSConfig {
   }
 
   void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, TopicsRef *result) {
-    string path = bucket.name + "/" + key.name;
+    const std::string path = bucket.name + "/" + key.name;
 
     auto iter = notifications.upper_bound(path);
     if (iter == notifications.begin()) {
@@ -1149,6 +1151,7 @@ public:
         std::shared_ptr<PSTopicConfig> tc = std::make_shared<PSTopicConfig>();
         tc->name = info.name;
         tc->subs = user_topics.topics[info.name].subs;
+        tc->opaque_data = info.opaque_data;
         (*topics)->push_back(tc);
       }
 
@@ -1173,7 +1176,7 @@ class RGWPSHandleObjEventCR : public RGWCoroutine {
   PSSubscriptionRef sub;
   std::array<rgw_user, 2>::const_iterator oiter;
   std::vector<PSTopicConfigRef>::const_iterator titer;
-  std::set<string>::const_iterator siter;
+  std::set<std::string>::const_iterator siter;
   int last_sub_conf_error;
 
 public:
@@ -1254,6 +1257,7 @@ public:
               // subscription was made by S3 compatible API
               ldout(sync_env->cct, 20) << "storing record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
               record->configurationId = sub->sub_conf->s3_id;
+              record->opaque_data = (*titer)->opaque_data;
               yield call(PSSubscription::store_event_cr(sync_env, sub, record));
               if (retcode < 0) {
                 if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
@@ -1328,7 +1332,7 @@ public:
       {
         std::vector<std::pair<std::string, std::string> > attrs;
         for (auto& attr : attrs) {
-          string k = attr.first;
+          std::string k = attr.first;
           if (boost::algorithm::starts_with(k, RGW_ATTR_PREFIX)) {
             k = k.substr(sizeof(RGW_ATTR_PREFIX) - 1);
           }
@@ -1528,7 +1532,7 @@ public:
 RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(CephContext *cct, const JSONFormattable& config)
 {
   data_handler = std::unique_ptr<RGWPSDataSyncModule>(new RGWPSDataSyncModule(cct, config));
-  string jconf = json_str("conf", *data_handler->get_conf());
+  const std::string jconf = json_str("conf", *data_handler->get_conf());
   JSONParser p;
   if (!p.parse(jconf.c_str(), jconf.size())) {
     ldout(cct, 1) << "ERROR: failed to parse sync module effective conf: " << jconf << dendl;
index c9ac49766e8111967f0b56a3bdd4acf5fa5bdf86..9a77ea78627a847b299f9342692b0b4773d44e93 100644 (file)
@@ -25,6 +25,7 @@ public:
     
     topic_name = s->object.name;
 
+    opaque_data = s->info.args.get("OpaqueData");
     dest.push_endpoint = s->info.args.get("push-endpoint");
     
     if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
index 38c11a9e844f17ab4bf8812047001b98871a3cc0..4eb5bb8914018eb89b0d8ddd294595dc01c6c42f 100644 (file)
@@ -140,6 +140,13 @@ class StreamingHTTPServer:
             worker.reset_events()
         verify_events_by_elements(events, keys, exact_match=exact_match, deletions=deletions)
 
+    def get_and_reset_events(self):
+        events = []
+        for worker in self.workers:
+            events += worker.get_events()
+            worker.reset_events()
+        return events
+
     def close(self):
         """close all workers in the http server and wait for it to finish"""
         # make sure that the shared socket is closed
@@ -1796,6 +1803,150 @@ def test_ps_s3_notification_push_http_on_master():
     http_server.close()
 
 
+def test_ps_s3_opaque_data():
+    """ test that opaque id set in topic, is sent in notification """
+    if skip_push_tests:
+        return SkipTest("PubSub push tests don't run in teuthology")
+    hostname = get_ip()
+    zones, ps_zones  = init_env(require_ps=True)
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+    
+    # create random port for the http server
+    host = get_ip()
+    port = random.randint(10000, 20000)
+    # start an http server in a separate thread
+    number_of_objects = 10
+    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+    
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = zones[0].create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+    # wait for sync
+    zone_meta_checkpoint(ps_zones[0].zone)
+
+    # create s3 topic
+    endpoint_address = 'http://'+host+':'+str(port)
+    opaque_data = 'http://1.2.3.4:8888'
+    endpoint_args = 'push-endpoint='+endpoint_address+'&OpaqueData='+opaque_data
+    topic_conf = PSTopic(ps_zones[0].conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args)
+    result, status = topic_conf.set_config()
+    assert_equal(status/100, 2)
+    parsed_result = json.loads(result)
+    topic_arn = parsed_result['arn']
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': topic_arn,
+                        'Events': []
+                       }]
+    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket
+    client_threads = []
+    content = 'bar'
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i))
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads] 
+
+    # wait for sync
+    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+
+    # check http receiver
+    keys = list(bucket.list())
+    print('total number of objects: ' + str(len(keys)))
+    events = http_server.get_and_reset_events()
+    for event in events:
+        assert_equal(event['Records'][0]['opaqueData'], opaque_data)
+    
+    # cleanup
+    for key in keys:
+        key.delete()
+    [thr.join() for thr in client_threads] 
+    topic_conf.del_config()
+    s3_notification_conf.del_config(notification=notification_name)
+    # delete the bucket
+    zones[0].delete_bucket(bucket_name)
+    http_server.close()
+
+
+def test_ps_s3_opaque_data_on_master():
+    """ test that opaque id set in topic, is sent in notification on master """
+    if skip_push_tests:
+        return SkipTest("PubSub push tests don't run in teuthology")
+    hostname = get_ip()
+    zones, _  = init_env(require_ps=False)
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+    
+    # create random port for the http server
+    host = get_ip()
+    port = random.randint(10000, 20000)
+    # start an http server in a separate thread
+    number_of_objects = 10
+    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+    
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = zones[0].create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # create s3 topic
+    endpoint_address = 'http://'+host+':'+str(port)
+    endpoint_args = 'push-endpoint='+endpoint_address
+    opaque_data = 'http://1.2.3.4:8888'
+    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args, opaque_data=opaque_data)
+    topic_arn = topic_conf.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': topic_arn,
+                        'Events': []
+                       }]
+    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket
+    client_threads = []
+    start_time = time.time()
+    content = 'bar'
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i))
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads] 
+
+    time_diff = time.time() - start_time
+    print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+    
+    # check http receiver
+    keys = list(bucket.list())
+    print('total number of objects: ' + str(len(keys)))
+    events = http_server.get_and_reset_events()
+    for event in events:
+        assert_equal(event['Records'][0]['opaqueData'], opaque_data)
+    
+    # cleanup
+    for key in keys:
+        key.delete()
+    [thr.join() for thr in client_threads] 
+    topic_conf.del_config()
+    s3_notification_conf.del_config(notification=notification_name)
+    # delete the bucket
+    zones[0].delete_bucket(bucket_name)
+    http_server.close()
+
 def test_ps_topic():
     """ test set/get/delete of topic """
     _, ps_zones = init_env()
@@ -2645,8 +2796,9 @@ def test_ps_s3_metadata_on_master():
     # check amqp receiver
     event_count = 0
     for event in receiver.get_and_reset_events():
-        assert_equal(event['s3']['object']['metadata'][0]['key'], meta_prefix+meta_key)
-        assert_equal(event['s3']['object']['metadata'][0]['val'], meta_value)
+        s3_event = event['Records'][0]['s3']
+        assert_equal(s3_event['object']['metadata'][0]['key'], meta_prefix+meta_key)
+        assert_equal(s3_event['object']['metadata'][0]['val'], meta_value)
         event_count +=1
 
     # only PUT and POST has the metadata value
@@ -2660,8 +2812,9 @@ def test_ps_s3_metadata_on_master():
     # check amqp receiver
     event_count = 0
     for event in receiver.get_and_reset_events():
-        assert_equal(event['s3']['object']['metadata'][0]['key'], meta_prefix+meta_key)
-        assert_equal(event['s3']['object']['metadata'][0]['val'], meta_value)
+        s3_event = event['Records'][0]['s3']
+        assert_equal(s3_event['object']['metadata'][0]['key'], meta_prefix+meta_key)
+        assert_equal(s3_event['object']['metadata'][0]['val'], meta_value)
         event_count +=1
 
     # all 3 object has metadata when deleted
index ddefde179bf858ffcb080e5be563eb1e5cddf249..090fbc789fc2bab29e5997cbe3b9de03c70673fe 100644 (file)
@@ -173,12 +173,12 @@ def delete_all_s3_topics(zone, region):
 
 class PSTopicS3:
     """class to set/list/get/delete a topic
-    POST ?Action=CreateTopic&Name=<topic name>&push-endpoint=<endpoint>&[<arg1>=<value1>...]]
+    POST ?Action=CreateTopic&Name=<topic name>[&OpaqueData=<data>[&push-endpoint=<endpoint>&[<arg1>=<value1>...]]]
     POST ?Action=ListTopics
     POST ?Action=GetTopic&TopicArn=<topic-arn>
     POST ?Action=DeleteTopic&TopicArn=<topic-arn>
     """
-    def __init__(self, conn, topic_name, region, endpoint_args=None):
+    def __init__(self, conn, topic_name, region, endpoint_args=None, opaque_data=None):
         self.conn = conn
         self.topic_name = topic_name.strip()
         assert self.topic_name
@@ -186,14 +186,16 @@ class PSTopicS3:
         self.attributes = {}
         if endpoint_args is not None:
             self.attributes = {nvp[0] : nvp[1] for nvp in urlparse.parse_qsl(endpoint_args, keep_blank_values=True)}
-            protocol = 'https' if conn.is_secure else 'http'
-            self.client = boto3.client('sns',
-                               endpoint_url=protocol+'://'+conn.host+':'+str(conn.port),
-                               aws_access_key_id=conn.aws_access_key_id,
-                               aws_secret_access_key=conn.aws_secret_access_key,
-                               region_name=region,
-                               verify='./cert.pem',
-                               config=Config(signature_version='s3'))
+        if opaque_data is not None:
+            self.attributes['OpaqueData'] = opaque_data
+        protocol = 'https' if conn.is_secure else 'http'
+        self.client = boto3.client('sns',
+                           endpoint_url=protocol+'://'+conn.host+':'+str(conn.port),
+                           aws_access_key_id=conn.aws_access_key_id,
+                           aws_secret_access_key=conn.aws_secret_access_key,
+                           region_name=region,
+                           verify='./cert.pem',
+                           config=Config(signature_version='s3'))
 
 
     def get_config(self):