From: Yuval Lifshitz Date: Mon, 13 Jan 2020 10:48:24 +0000 (+0200) Subject: rgw/notification: add opaque data X-Git-Tag: v14.2.10~89^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=33e5784022899bb5e94220f4bcd489686dae03a3;p=ceph.git rgw/notification: add opaque data opaque data may be set in topic configuration and later on sent inside notifications triggered by that topic. Signed-off-by: Yuval Lifshitz (cherry picked from commit 07630a8759b0836f56d4ee6938e2b711ffb4e169) Signed-off-by: Yuval Lifshitz Conflicts: src/test/rgw/rgw_multi/tests_ps.py --- diff --git a/doc/radosgw/notifications.rst b/doc/radosgw/notifications.rst index 9ec16ea83d10..3fd7f2d518be 100644 --- a/doc/radosgw/notifications.rst +++ b/doc/radosgw/notifications.rst @@ -72,10 +72,13 @@ To update a topic, use the same command used for topic creation, with the topic [&Attributes.entry.4.key=kafka-ack-level&Attributes.entry.4.value=none|broker] [&Attributes.entry.5.key=use-ssl&Attributes.entry.5.value=true|false] [&Attributes.entry.6.key=ca-location&Attributes.entry.6.value=] + [&Attributes.entry.7.key=OpaqueData&Attributes.entry.7.value=] Request parameters: - push-endpoint: URI of an endpoint to send push notification to +- OpaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the topic + - HTTP endpoint - URI: ``http[s]://[: + @@ -219,6 +223,7 @@ Response will have the following format: + @@ -288,6 +293,7 @@ pushed or pulled using the pubsub sync module. } }, "eventId":"", + "opaqueData":"", } ]} @@ -311,6 +317,7 @@ pushed or pulled using the pubsub sync module. 
- s3.object.metadata: any metadata set on the object sent as: ``x-amz-meta-`` (an extension to the S3 notification API) - s3.object.tags: any tags set on the object (an extension to the S3 notification API) - s3.eventId: unique ID of the event, that could be used for acking (an extension to the S3 notification API) +- s3.opaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the topic (an extension to the S3 notification API) .. _PubSub Module : ../pubsub-module .. _S3 Notification Compatibility: ../s3-notification-compatibility diff --git a/doc/radosgw/pubsub-module.rst b/doc/radosgw/pubsub-module.rst index cf3951fec945..cc299cb55381 100644 --- a/doc/radosgw/pubsub-module.rst +++ b/doc/radosgw/pubsub-module.rst @@ -149,11 +149,12 @@ To update a topic, use the same command used for topic creation, with the topic :: - PUT /topics/[?push-endpoint=[&amqp-exchange=][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker][&use-ssl=true|false][&ca-location=]] + PUT /topics/[?OpaqueData=][&push-endpoint=[&amqp-exchange=][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker][&use-ssl=true|false][&ca-location=]] Request parameters: - push-endpoint: URI of an endpoint to send push notification to +- OpaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the topic The endpoint URI may include parameters depending on the type of endpoint: @@ -220,6 +221,7 @@ Response will have the following format (JSON): "push_endpoint_topic":"" }, "arn":"" + "opaqueData":"" }, "subs":[] } @@ -501,6 +503,7 @@ the events will have an S3-compatible record format (JSON): } }, "eventId":"", + "opaqueData":"", } ]} @@ -523,6 +526,7 @@ the events will have an S3-compatible record format (JSON): - s3.object.metadata: not supported (an extension to the S3 notification API) - s3.object.tags: not supported (an extension to the S3 notification API) - 
s3.eventId: unique ID of the event, that could be used for acking (an extension to the S3 notification API) +- s3.opaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the topic (an extension to the S3 notification API) In case that the subscription was not created via a non S3-compatible notification, the events will have the following event format (JSON): diff --git a/doc/radosgw/s3-notification-compatibility.rst b/doc/radosgw/s3-notification-compatibility.rst index ffbceca64d4e..b587fcf495f9 100644 --- a/doc/radosgw/s3-notification-compatibility.rst +++ b/doc/radosgw/s3-notification-compatibility.rst @@ -112,10 +112,14 @@ Note that most of the API is not applicable to Ceph, and only the following acti - ``DeleteTopic`` - ``ListTopics`` -We also extend it by: +We also have the following extensions to topic configuration: + + - In ``GetTopic`` we allow fetching a specific topic, instead of all user topics + - In ``CreateTopic`` + + - we allow setting endpoint attributes + - we allow setting opaque data that will be sent to the endpoint in the notification - - ``GetTopic`` - allowing for fetching a specific topic, instead of all user topics - - In ``CreateTopic`` we allow setting endpoint attributes .. _AWS Simple Notification Service API: https://docs.aws.amazon.com/sns/latest/api/API_Operations.html .. 
_AWS S3 Bucket Notifications API: https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html diff --git a/src/rgw/rgw_notify.cc b/src/rgw/rgw_notify.cc index 75a5eeb0ca94..55da57799ae9 100644 --- a/src/rgw/rgw_notify.cc +++ b/src/rgw/rgw_notify.cc @@ -24,7 +24,7 @@ void populate_record_from_request(const req_state *s, record.userIdentity = s->user->user_id.id; // user that triggered the change record.x_amz_request_id = s->req_id; // request ID of the original change record.x_amz_id_2 = s->host_id; // RGW on which the change was made - // configurationId is filled from subscription configuration + // configurationId is filled from notification configuration record.bucket_name = s->bucket_name; record.bucket_ownerIdentity = s->bucket_owner.get_id().id; record.bucket_arn = to_string(rgw::ARN(s->bucket)); @@ -42,6 +42,7 @@ void populate_record_from_request(const req_state *s, record.x_meta_map = s->info.x_meta_map; // pass tags record.tags = s->tagset.get_tags(); + // opaque data will be filled from topic configuration } bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, EventType event) { @@ -86,6 +87,7 @@ int publish(const req_state* s, } event_should_be_handled = true; record.configurationId = topic_filter.s3_id; + record.opaque_data = topic_cfg.opaque_data; ldout(s->cct, 20) << "notification: '" << topic_filter.s3_id << "' on topic: '" << topic_cfg.dest.arn_topic << "' and bucket: '" << s->bucket.name << diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index 8f945e82296c..f3ff342ff4c5 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -282,6 +282,7 @@ void rgw_pubsub_s3_record::dump(Formatter *f) const { } } encode_json("eventId", id, f); + encode_json("opaqueData", opaque_data, f); } void rgw_pubsub_event::dump(Formatter *f) const @@ -299,6 +300,7 @@ void rgw_pubsub_topic::dump(Formatter *f) const encode_json("name", name, f); encode_json("dest", dest, f); encode_json("arn", arn, f); + 
encode_json("opaqueData", opaque_data, f); } void rgw_pubsub_topic::dump_xml(Formatter *f) const @@ -307,6 +309,7 @@ void rgw_pubsub_topic::dump_xml(Formatter *f) const encode_xml("Name", name, f); encode_xml("EndPoint", dest, f); encode_xml("TopicArn", arn, f); + encode_xml("OpaqueData", opaque_data, f); } void encode_json(const char *name, const rgw::notify::EventTypeList& l, Formatter *f) @@ -556,10 +559,10 @@ int RGWUserPubSub::Bucket::remove_notification(const string& topic_name) } int RGWUserPubSub::create_topic(const string& name) { - return create_topic(name, rgw_pubsub_sub_dest(), ""); + return create_topic(name, rgw_pubsub_sub_dest(), "", ""); } -int RGWUserPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn) { +int RGWUserPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data) { RGWObjVersionTracker objv_tracker; rgw_pubsub_user_topics topics; @@ -575,6 +578,7 @@ int RGWUserPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& d new_topic.topic.name = name; new_topic.topic.dest = dest; new_topic.topic.arn = arn; + new_topic.topic.opaque_data = opaque_data; ret = write_user_topics(topics, &objv_tracker); if (ret < 0) { diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index fbe6e15dbc80..d7b1758ad446 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -97,7 +97,7 @@ WRITE_CLASS_ENCODER(rgw_s3_filter) using OptionalFilter = std::optional; -class rgw_pubsub_topic_filter; +struct rgw_pubsub_topic_filter; /* S3 notification configuration * based on: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html @@ -254,9 +254,12 @@ struct rgw_pubsub_s3_record { KeyValueList x_meta_map; // tags KeyValueList tags; + // opaque data received from the topic + // could be used to identify the gateway + std::string opaque_data; void encode(bufferlist& bl) const { - ENCODE_START(3, 1, bl); + 
ENCODE_START(4, 1, bl); encode(eventVersion, bl); encode(eventSource, bl); encode(awsRegion, bl); @@ -280,11 +283,12 @@ struct rgw_pubsub_s3_record { encode(bucket_id, bl); encode(x_meta_map, bl); encode(tags, bl); + encode(opaque_data, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { - DECODE_START(3, bl); + DECODE_START(4, bl); decode(eventVersion, bl); decode(eventSource, bl); decode(awsRegion, bl); @@ -306,11 +310,14 @@ struct rgw_pubsub_s3_record { decode(object_sequencer, bl); decode(id, bl); if (struct_v >= 2) { - decode(bucket_id, bl); - decode(x_meta_map, bl); + decode(bucket_id, bl); + decode(x_meta_map, bl); } if (struct_v >= 3) { - decode(tags, bl); + decode(tags, bl); + } + if (struct_v >= 4) { + decode(opaque_data, bl); } DECODE_FINISH(bl); } @@ -433,24 +440,29 @@ struct rgw_pubsub_topic { std::string name; rgw_pubsub_sub_dest dest; std::string arn; + std::string opaque_data; void encode(bufferlist& bl) const { - ENCODE_START(2, 1, bl); + ENCODE_START(3, 1, bl); encode(user, bl); encode(name, bl); encode(dest, bl); encode(arn, bl); + encode(opaque_data, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { - DECODE_START(2, bl); + DECODE_START(3, bl); decode(user, bl); decode(name, bl); if (struct_v >= 2) { decode(dest, bl); decode(arn, bl); } + if (struct_v >= 3) { + decode(opaque_data, bl); + } DECODE_FINISH(bl); } @@ -750,7 +762,7 @@ public: // create a topic with push destination information and ARN // if the topic already exists the destination and ARN values may be updated (considered succsess) // return 0 on success, error code otherwise - int create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn); + int create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data); // remove a topic according to its name // if the topic does not exists it is a no-op (considered success) // return 0 on success, error code 
otherwise diff --git a/src/rgw/rgw_rest_pubsub.cc b/src/rgw/rgw_rest_pubsub.cc index 675830db9e3d..de4babd4d2c2 100644 --- a/src/rgw/rgw_rest_pubsub.cc +++ b/src/rgw/rgw_rest_pubsub.cc @@ -32,6 +32,8 @@ public: return -EINVAL; } + opaque_data = s->info.args.get("OpaqueData"); + dest.push_endpoint = s->info.args.get("push-endpoint"); if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) { @@ -489,7 +491,7 @@ void RGWPSCreateNotif_ObjStore_S3::execute() { // generate the internal topic. destination is stored here for the "push-only" case // when no subscription exists // ARN is cached to make the "GET" method faster - op_ret = ups->create_topic(unique_topic_name, topic_info.dest, topic_info.arn); + op_ret = ups->create_topic(unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data); if (op_ret < 0) { ldout(s->cct, 1) << "failed to auto-generate unique topic '" << unique_topic_name << "', ret=" << op_ret << dendl; diff --git a/src/rgw/rgw_rest_pubsub_common.cc b/src/rgw/rgw_rest_pubsub_common.cc index 1b969ceb947f..3b5de53fa4d3 100644 --- a/src/rgw/rgw_rest_pubsub_common.cc +++ b/src/rgw/rgw_rest_pubsub_common.cc @@ -52,7 +52,7 @@ void RGWPSCreateTopicOp::execute() { } ups.emplace(store, s->owner.get_id()); - op_ret = ups->create_topic(topic_name, dest, topic_arn); + op_ret = ups->create_topic(topic_name, dest, topic_arn, opaque_data); if (op_ret < 0) { ldout(s->cct, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl; return; diff --git a/src/rgw/rgw_rest_pubsub_common.h b/src/rgw/rgw_rest_pubsub_common.h index f11c75658f52..d472fa4076d9 100644 --- a/src/rgw/rgw_rest_pubsub_common.h +++ b/src/rgw/rgw_rest_pubsub_common.h @@ -18,6 +18,7 @@ protected: std::string topic_name; rgw_pubsub_sub_dest dest; std::string topic_arn; + std::string opaque_data; virtual int get_params() = 0; diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index 1cb76c7bb9c8..fd514b81adda 100644 --- 
a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -144,10 +144,12 @@ using PSSubConfigRef = std::shared_ptr; struct PSTopicConfig { std::string name; std::set subs; + std::string opaque_data; void dump(Formatter *f) const { encode_json("name", name, f); encode_json("subs", subs, f); + encode_json("opaque", opaque_data, f); } }; @@ -191,10 +193,10 @@ using PSTopicConfigRef = std::shared_ptr; using TopicsRef = std::shared_ptr>; struct PSConfig { - string id{"pubsub"}; + const std::string id{"pubsub"}; rgw_user user; - string data_bucket_prefix; - string data_oid_prefix; + std::string data_bucket_prefix; + std::string data_oid_prefix; int events_retention_days{0}; @@ -230,7 +232,7 @@ struct PSConfig { } { Formatter::ObjectSection section(*f, "notifications"); - string last; + std::string last; for (auto& notif : notifications) { const string& n = notif.first; if (n != last) { @@ -284,7 +286,7 @@ struct PSConfig { } void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, TopicsRef *result) { - string path = bucket.name + "/" + key.name; + const std::string path = bucket.name + "/" + key.name; auto iter = notifications.upper_bound(path); if (iter == notifications.begin()) { @@ -1142,6 +1144,7 @@ public: std::shared_ptr tc = std::make_shared(); tc->name = info.name; tc->subs = user_topics.topics[info.name].subs; + tc->opaque_data = info.opaque_data; (*topics)->push_back(tc); } @@ -1166,7 +1169,7 @@ class RGWPSHandleObjEventCR : public RGWCoroutine { PSSubscriptionRef sub; std::array::const_iterator oiter; std::vector::const_iterator titer; - std::set::const_iterator siter; + std::set::const_iterator siter; int last_sub_conf_error; public: @@ -1247,6 +1250,7 @@ public: // subscription was made by S3 compatible API ldout(sync_env->cct, 20) << "storing record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl; record->configurationId = sub->sub_conf->s3_id; + record->opaque_data = 
(*titer)->opaque_data; yield call(PSSubscription::store_event_cr(sync_env, sub, record)); if (retcode < 0) { if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail); @@ -1321,7 +1325,7 @@ public: { std::vector > attrs; for (auto& attr : attrs) { - string k = attr.first; + std::string k = attr.first; if (boost::algorithm::starts_with(k, RGW_ATTR_PREFIX)) { k = k.substr(sizeof(RGW_ATTR_PREFIX) - 1); } @@ -1521,7 +1525,7 @@ public: RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(CephContext *cct, const JSONFormattable& config) { data_handler = std::unique_ptr(new RGWPSDataSyncModule(cct, config)); - string jconf = json_str("conf", *data_handler->get_conf()); + const std::string jconf = json_str("conf", *data_handler->get_conf()); JSONParser p; if (!p.parse(jconf.c_str(), jconf.size())) { ldout(cct, 1) << "ERROR: failed to parse sync module effective conf: " << jconf << dendl; diff --git a/src/rgw/rgw_sync_module_pubsub_rest.cc b/src/rgw/rgw_sync_module_pubsub_rest.cc index 069072ab3592..aec5a346a0e3 100644 --- a/src/rgw/rgw_sync_module_pubsub_rest.cc +++ b/src/rgw/rgw_sync_module_pubsub_rest.cc @@ -24,6 +24,7 @@ public: topic_name = s->object.name; + opaque_data = s->info.args.get("OpaqueData"); dest.push_endpoint = s->info.args.get("push-endpoint"); if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) { diff --git a/src/test/rgw/rgw_multi/tests_ps.py b/src/test/rgw/rgw_multi/tests_ps.py index 376f815313e0..b5df8817cd6b 100644 --- a/src/test/rgw/rgw_multi/tests_ps.py +++ b/src/test/rgw/rgw_multi/tests_ps.py @@ -140,6 +140,13 @@ class StreamingHTTPServer: worker.reset_events() verify_events_by_elements(events, keys, exact_match=exact_match, deletions=deletions) + def get_and_reset_events(self): + events = [] + for worker in self.workers: + events += worker.get_events() + worker.reset_events() + return events + def close(self): """close all workers in the http server and wait for it to finish""" # make sure that the shared socket is closed @@ 
-1796,6 +1803,150 @@ def test_ps_s3_notification_push_http_on_master(): http_server.close() +def test_ps_s3_opaque_data(): + """ test that opaque id set in topic, is sent in notification """ + if skip_push_tests: + return SkipTest("PubSub push tests don't run in teuthology") + hostname = get_ip() + zones, ps_zones = init_env(require_ps=True) + realm = get_realm() + zonegroup = realm.master_zonegroup() + + # create random port for the http server + host = get_ip() + port = random.randint(10000, 20000) + # start an http server in a separate thread + number_of_objects = 10 + http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects) + + # create bucket + bucket_name = gen_bucket_name() + bucket = zones[0].create_bucket(bucket_name) + topic_name = bucket_name + TOPIC_SUFFIX + # wait for sync + zone_meta_checkpoint(ps_zones[0].zone) + + # create s3 topic + endpoint_address = 'http://'+host+':'+str(port) + opaque_data = 'http://1.2.3.4:8888' + endpoint_args = 'push-endpoint='+endpoint_address+'&OpaqueData='+opaque_data + topic_conf = PSTopic(ps_zones[0].conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args) + result, status = topic_conf.set_config() + assert_equal(status/100, 2) + parsed_result = json.loads(result) + topic_arn = parsed_result['arn'] + # create s3 notification + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_conf_list = [{'Id': notification_name, + 'TopicArn': topic_arn, + 'Events': [] + }] + s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) + response, status = s3_notification_conf.set_config() + assert_equal(status/100, 2) + + # create objects in the bucket + client_threads = [] + content = 'bar' + for i in range(number_of_objects): + key = bucket.new_key(str(i)) + thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] + + # wait for sync + 
zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) + + # check http receiver + keys = list(bucket.list()) + print('total number of objects: ' + str(len(keys))) + events = http_server.get_and_reset_events() + for event in events: + assert_equal(event['Records'][0]['opaqueData'], opaque_data) + + # cleanup + for key in keys: + key.delete() + [thr.join() for thr in client_threads] + topic_conf.del_config() + s3_notification_conf.del_config(notification=notification_name) + # delete the bucket + zones[0].delete_bucket(bucket_name) + http_server.close() + + +def test_ps_s3_opaque_data_on_master(): + """ test that opaque id set in topic, is sent in notification on master """ + if skip_push_tests: + return SkipTest("PubSub push tests don't run in teuthology") + hostname = get_ip() + zones, _ = init_env(require_ps=False) + realm = get_realm() + zonegroup = realm.master_zonegroup() + + # create random port for the http server + host = get_ip() + port = random.randint(10000, 20000) + # start an http server in a separate thread + number_of_objects = 10 + http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects) + + # create bucket + bucket_name = gen_bucket_name() + bucket = zones[0].create_bucket(bucket_name) + topic_name = bucket_name + TOPIC_SUFFIX + + # create s3 topic + endpoint_address = 'http://'+host+':'+str(port) + endpoint_args = 'push-endpoint='+endpoint_address + opaque_data = 'http://1.2.3.4:8888' + topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args, opaque_data=opaque_data) + topic_arn = topic_conf.set_config() + # create s3 notification + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_conf_list = [{'Id': notification_name, + 'TopicArn': topic_arn, + 'Events': [] + }] + s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list) + response, status = s3_notification_conf.set_config() + assert_equal(status/100, 2) + + # create objects in the 
bucket + client_threads = [] + start_time = time.time() + content = 'bar' + for i in range(number_of_objects): + key = bucket.new_key(str(i)) + thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] + + time_diff = time.time() - start_time + print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') + + print('wait for 5sec for the messages...') + time.sleep(5) + + # check http receiver + keys = list(bucket.list()) + print('total number of objects: ' + str(len(keys))) + events = http_server.get_and_reset_events() + for event in events: + assert_equal(event['Records'][0]['opaqueData'], opaque_data) + + # cleanup + for key in keys: + key.delete() + [thr.join() for thr in client_threads] + topic_conf.del_config() + s3_notification_conf.del_config(notification=notification_name) + # delete the bucket + zones[0].delete_bucket(bucket_name) + http_server.close() + def test_ps_topic(): """ test set/get/delete of topic """ _, ps_zones = init_env() @@ -2645,8 +2796,9 @@ def test_ps_s3_metadata_on_master(): # check amqp receiver event_count = 0 for event in receiver.get_and_reset_events(): - assert_equal(event['Records'][0]['s3']['object']['metadata'][0]['key'], meta_prefix+meta_key) - assert_equal(event['Records'][0]['s3']['object']['metadata'][0]['val'], meta_value) + s3_event = event['Records'][0]['s3'] + assert_equal(s3_event['object']['metadata'][0]['key'], meta_prefix+meta_key) + assert_equal(s3_event['object']['metadata'][0]['val'], meta_value) event_count +=1 # only PUT and POST has the metadata value @@ -2660,8 +2812,9 @@ def test_ps_s3_metadata_on_master(): # check amqp receiver event_count = 0 for event in receiver.get_and_reset_events(): - assert_equal(event['Records'][0]['s3']['object']['metadata'][0]['key'], meta_prefix+meta_key) - 
assert_equal(event['Records'][0]['s3']['object']['metadata'][0]['val'], meta_value) + s3_event = event['Records'][0]['s3'] + assert_equal(s3_event['object']['metadata'][0]['key'], meta_prefix+meta_key) + assert_equal(s3_event['object']['metadata'][0]['val'], meta_value) event_count +=1 # all 3 object has metadata when deleted diff --git a/src/test/rgw/rgw_multi/zone_ps.py b/src/test/rgw/rgw_multi/zone_ps.py index 61d32679bc75..4ccff3eb0ad4 100644 --- a/src/test/rgw/rgw_multi/zone_ps.py +++ b/src/test/rgw/rgw_multi/zone_ps.py @@ -173,12 +173,12 @@ def delete_all_s3_topics(zone, region): class PSTopicS3: """class to set/list/get/delete a topic - POST ?Action=CreateTopic&Name=&push-endpoint=&[=...]] + POST ?Action=CreateTopic&Name=[&OpaqueData=[&push-endpoint=&[=...]]] POST ?Action=ListTopics POST ?Action=GetTopic&TopicArn= POST ?Action=DeleteTopic&TopicArn= """ - def __init__(self, conn, topic_name, region, endpoint_args=None): + def __init__(self, conn, topic_name, region, endpoint_args=None, opaque_data=None): self.conn = conn self.topic_name = topic_name.strip() assert self.topic_name @@ -186,14 +186,16 @@ class PSTopicS3: self.attributes = {} if endpoint_args is not None: self.attributes = {nvp[0] : nvp[1] for nvp in urlparse.parse_qsl(endpoint_args, keep_blank_values=True)} - protocol = 'https' if conn.is_secure else 'http' - self.client = boto3.client('sns', - endpoint_url=protocol+'://'+conn.host+':'+str(conn.port), - aws_access_key_id=conn.aws_access_key_id, - aws_secret_access_key=conn.aws_secret_access_key, - region_name=region, - verify='./cert.pem', - config=Config(signature_version='s3')) + if opaque_data is not None: + self.attributes['OpaqueData'] = opaque_data + protocol = 'https' if conn.is_secure else 'http' + self.client = boto3.client('sns', + endpoint_url=protocol+'://'+conn.host+':'+str(conn.port), + aws_access_key_id=conn.aws_access_key_id, + aws_secret_access_key=conn.aws_secret_access_key, + region_name=region, + verify='./cert.pem', + 
config=Config(signature_version='s3')) def get_config(self):