From 07630a8759b0836f56d4ee6938e2b711ffb4e169 Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Mon, 13 Jan 2020 12:48:24 +0200 Subject: [PATCH] rgw/notification: add opaque data opaque data may be set in topic configuration and later on sent inside nottifications triggered by that topic. Signed-off-by: Yuval Lifshitz --- doc/radosgw/notifications.rst | 7 + doc/radosgw/pubsub-module.rst | 6 +- doc/radosgw/s3-notification-compatibility.rst | 10 +- src/rgw/rgw_notify.cc | 4 +- src/rgw/rgw_pubsub.cc | 8 +- src/rgw/rgw_pubsub.h | 30 +++- src/rgw/rgw_rest_pubsub.cc | 4 +- src/rgw/rgw_rest_pubsub_common.cc | 2 +- src/rgw/rgw_rest_pubsub_common.h | 1 + src/rgw/rgw_sync_module_pubsub.cc | 20 ++- src/rgw/rgw_sync_module_pubsub_rest.cc | 1 + src/test/rgw/rgw_multi/tests_ps.py | 161 +++++++++++++++++- src/test/rgw/rgw_multi/zone_ps.py | 22 +-- 13 files changed, 236 insertions(+), 40 deletions(-) diff --git a/doc/radosgw/notifications.rst b/doc/radosgw/notifications.rst index ced68e0d935c..fa608a409cf8 100644 --- a/doc/radosgw/notifications.rst +++ b/doc/radosgw/notifications.rst @@ -72,10 +72,13 @@ To update a topic, use the same command used for topic creation, with the topic [&Attributes.entry.4.key=kafka-ack-level&Attributes.entry.4.value=none|broker] [&Attributes.entry.5.key=use-ssl&Attributes.entry.5.value=true|false] [&Attributes.entry.6.key=ca-location&Attributes.entry.6.value=] + [&Attributes.entry.7.key=OpaqueData&Attributes.entry.7.value=] Request parameters: - push-endpoint: URI of an endpoint to send push notification to +- OpaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the ropic + - HTTP endpoint - URI: ``http[s]://[: + @@ -219,6 +223,7 @@ Response will have the following format: + @@ -288,6 +293,7 @@ pushed or pulled using the pubsub sync module. } }, "eventId":"", + "opaqueData":"", } ]} @@ -311,6 +317,7 @@ pushed or pulled using the pubsub sync module. - s3.object.metadata: any metadata set on the object sent as: ``x-amz-meta-`` (an extension to the S3 notification API) - s3.object.tags: any tags set on the objcet (an extension to the S3 notification API) - s3.eventId: unique ID of the event, that could be used for acking (an extension to the S3 notification API) +- s3.opaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the ropic (an extension to the S3 notification API) .. _PubSub Module : ../pubsub-module .. _S3 Notification Compatibility: ../s3-notification-compatibility diff --git a/doc/radosgw/pubsub-module.rst b/doc/radosgw/pubsub-module.rst index eb4158bab52e..61cd4def207f 100644 --- a/doc/radosgw/pubsub-module.rst +++ b/doc/radosgw/pubsub-module.rst @@ -150,11 +150,12 @@ To update a topic, use the same command used for topic creation, with the topic :: - PUT /topics/[?push-endpoint=[&amqp-exchange=][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker][&use-ssl=true|false][&ca-location=]] + PUT /topics/[?OpaqueData=][&push-endpoint=[&amqp-exchange=][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker][&use-ssl=true|false][&ca-location=]] Request parameters: - push-endpoint: URI of an endpoint to send push notification to +- OpaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the ropic The endpoint URI may include parameters depending with the type of endpoint: @@ -221,6 +222,7 @@ Response will have the following format (JSON): "push_endpoint_topic":"" }, "arn":"" + "opaqueData":"" }, "subs":[] } @@ -502,6 +504,7 @@ the events will have an S3-compatible record format (JSON): } }, "eventId":"", + "opaqueData":"", } ]} @@ -524,6 +527,7 @@ the events will have an S3-compatible record format (JSON): - s3.object.metadata: not supported (an extension to the S3 notification API) - s3.object.tags: not supported (an extension to the S3 notification API) - s3.eventId: unique ID of the event, that could be used for acking (an extension to the S3 notification API) +- s3.opaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the ropic (an extension to the S3 notification API) In case that the subscription was not created via a non S3-compatible notification, the events will have the following event format (JSON): diff --git a/doc/radosgw/s3-notification-compatibility.rst b/doc/radosgw/s3-notification-compatibility.rst index ffbceca64d4e..b587fcf495f9 100644 --- a/doc/radosgw/s3-notification-compatibility.rst +++ b/doc/radosgw/s3-notification-compatibility.rst @@ -112,10 +112,14 @@ Note that most of the API is not applicable to Ceph, and only the following acti - ``DeleteTopic`` - ``ListTopics`` -We also extend it by: +We also have the following extensions to topic configuration: + + - In ``GetTopic`` we allow fetching a specific topic, instead of all user topics + - In ``CreateTopic`` + + - we allow setting endpoint attributes + - we allow setting opaque data thta will be sent to the endpoint in the notification - - ``GetTopic`` - allowing for fetching a specific topic, instead of all user topics - - In ``CreateTopic`` we allow setting endpoint attributes .. _AWS Simple Notification Service API: https://docs.aws.amazon.com/sns/latest/api/API_Operations.html .. _AWS S3 Bucket Notifications API: https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html diff --git a/src/rgw/rgw_notify.cc b/src/rgw/rgw_notify.cc index 87cea8f0c0af..79ac062df7a2 100644 --- a/src/rgw/rgw_notify.cc +++ b/src/rgw/rgw_notify.cc @@ -24,7 +24,7 @@ void populate_record_from_request(const req_state *s, record.userIdentity = s->user->get_id().id; // user that triggered the change record.x_amz_request_id = s->req_id; // request ID of the original change record.x_amz_id_2 = s->host_id; // RGW on which the change was made - // configurationId is filled from subscription configuration + // configurationId is filled from notification configuration record.bucket_name = s->bucket_name; record.bucket_ownerIdentity = s->bucket_owner.get_id().id; record.bucket_arn = to_string(rgw::ARN(s->bucket)); @@ -42,6 +42,7 @@ void populate_record_from_request(const req_state *s, record.x_meta_map = s->info.x_meta_map; // pass tags record.tags = s->tagset.get_tags(); + // opaque data will be filled from topic configuration } bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, EventType event) { @@ -86,6 +87,7 @@ int publish(const req_state* s, } event_should_be_handled = true; record.configurationId = topic_filter.s3_id; + record.opaque_data = topic_cfg.opaque_data; ldout(s->cct, 20) << "notification: '" << topic_filter.s3_id << "' on topic: '" << topic_cfg.dest.arn_topic << "' and bucket: '" << s->bucket.name << diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index 3236d525f491..947578b2831a 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -282,6 +282,7 @@ void rgw_pubsub_s3_record::dump(Formatter *f) const { } } encode_json("eventId", id, f); + encode_json("opaqueData", opaque_data, f); } void rgw_pubsub_event::dump(Formatter *f) const @@ -299,6 +300,7 @@ void rgw_pubsub_topic::dump(Formatter *f) const encode_json("name", name, f); encode_json("dest", dest, f); encode_json("arn", arn, f); + encode_json("opaqueData", opaque_data, f); } void rgw_pubsub_topic::dump_xml(Formatter *f) const @@ -307,6 +309,7 @@ void rgw_pubsub_topic::dump_xml(Formatter *f) const encode_xml("Name", name, f); encode_xml("EndPoint", dest, f); encode_xml("TopicArn", arn, f); + encode_xml("OpaqueData", opaque_data, f); } void encode_json(const char *name, const rgw::notify::EventTypeList& l, Formatter *f) @@ -562,10 +565,10 @@ int RGWUserPubSub::Bucket::remove_notification(const string& topic_name) } int RGWUserPubSub::create_topic(const string& name) { - return create_topic(name, rgw_pubsub_sub_dest(), ""); + return create_topic(name, rgw_pubsub_sub_dest(), "", ""); } -int RGWUserPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn) { +int RGWUserPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data) { RGWObjVersionTracker objv_tracker; rgw_pubsub_user_topics topics; @@ -581,6 +584,7 @@ int RGWUserPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& d new_topic.topic.name = name; new_topic.topic.dest = dest; new_topic.topic.arn = arn; + new_topic.topic.opaque_data = opaque_data; ret = write_user_topics(topics, &objv_tracker); if (ret < 0) { diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index 7c74701c90fd..256c5918585b 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -97,7 +97,7 @@ WRITE_CLASS_ENCODER(rgw_s3_filter) using OptionalFilter = std::optional; -class rgw_pubsub_topic_filter; +struct rgw_pubsub_topic_filter; /* S3 notification configuration * based on: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html @@ -254,9 +254,12 @@ struct rgw_pubsub_s3_record { KeyValueList x_meta_map; // tags KeyValueList tags; + // opaque data received from the topic + // could be used to identify the gateway + std::string opaque_data; void encode(bufferlist& bl) const { - ENCODE_START(3, 1, bl); + ENCODE_START(4, 1, bl); encode(eventVersion, bl); encode(eventSource, bl); encode(awsRegion, bl); @@ -280,11 +283,12 @@ struct rgw_pubsub_s3_record { encode(bucket_id, bl); encode(x_meta_map, bl); encode(tags, bl); + encode(opaque_data, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { - DECODE_START(3, bl); + DECODE_START(4, bl); decode(eventVersion, bl); decode(eventSource, bl); decode(awsRegion, bl); @@ -306,11 +310,14 @@ struct rgw_pubsub_s3_record { decode(object_sequencer, bl); decode(id, bl); if (struct_v >= 2) { - decode(bucket_id, bl); - decode(x_meta_map, bl); + decode(bucket_id, bl); + decode(x_meta_map, bl); } if (struct_v >= 3) { - decode(tags, bl); + decode(tags, bl); + } + if (struct_v >= 4) { + decode(opaque_data, bl); } DECODE_FINISH(bl); } @@ -433,24 +440,29 @@ struct rgw_pubsub_topic { std::string name; rgw_pubsub_sub_dest dest; std::string arn; + std::string opaque_data; void encode(bufferlist& bl) const { - ENCODE_START(2, 1, bl); + ENCODE_START(3, 1, bl); encode(user, bl); encode(name, bl); encode(dest, bl); encode(arn, bl); + encode(opaque_data, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { - DECODE_START(2, bl); + DECODE_START(3, bl); decode(user, bl); decode(name, bl); if (struct_v >= 2) { decode(dest, bl); decode(arn, bl); } + if (struct_v >= 3) { + decode(opaque_data, bl); + } DECODE_FINISH(bl); } @@ -739,7 +751,7 @@ public: // create a topic with push destination information and ARN // if the topic already exists the destination and ARN values may be updated (considered succsess) // return 0 on success, error code otherwise - int create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn); + int create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data); // remove a topic according to its name // if the topic does not exists it is a no-op (considered success) // return 0 on success, error code otherwise diff --git a/src/rgw/rgw_rest_pubsub.cc b/src/rgw/rgw_rest_pubsub.cc index 94f295ba5585..65f783a293ef 100644 --- a/src/rgw/rgw_rest_pubsub.cc +++ b/src/rgw/rgw_rest_pubsub.cc @@ -32,6 +32,8 @@ public: return -EINVAL; } + opaque_data = s->info.args.get("OpaqueData"); + dest.push_endpoint = s->info.args.get("push-endpoint"); if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) { @@ -489,7 +491,7 @@ void RGWPSCreateNotif_ObjStore_S3::execute() { // generate the internal topic. destination is stored here for the "push-only" case // when no subscription exists // ARN is cached to make the "GET" method faster - op_ret = ups->create_topic(unique_topic_name, topic_info.dest, topic_info.arn); + op_ret = ups->create_topic(unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data); if (op_ret < 0) { ldout(s->cct, 1) << "failed to auto-generate unique topic '" << unique_topic_name << "', ret=" << op_ret << dendl; diff --git a/src/rgw/rgw_rest_pubsub_common.cc b/src/rgw/rgw_rest_pubsub_common.cc index 30d058f7bdb1..ba09a3073dc0 100644 --- a/src/rgw/rgw_rest_pubsub_common.cc +++ b/src/rgw/rgw_rest_pubsub_common.cc @@ -52,7 +52,7 @@ void RGWPSCreateTopicOp::execute() { } ups.emplace(store, s->owner.get_id()); - op_ret = ups->create_topic(topic_name, dest, topic_arn); + op_ret = ups->create_topic(topic_name, dest, topic_arn, opaque_data); if (op_ret < 0) { ldout(s->cct, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl; return; diff --git a/src/rgw/rgw_rest_pubsub_common.h b/src/rgw/rgw_rest_pubsub_common.h index f11c75658f52..d472fa4076d9 100644 --- a/src/rgw/rgw_rest_pubsub_common.h +++ b/src/rgw/rgw_rest_pubsub_common.h @@ -18,6 +18,7 @@ protected: std::string topic_name; rgw_pubsub_sub_dest dest; std::string topic_arn; + std::string opaque_data; virtual int get_params() = 0; diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index f674149c3a49..abdf8b1f545a 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -148,10 +148,12 @@ using PSSubConfigRef = std::shared_ptr; struct PSTopicConfig { std::string name; std::set subs; + std::string opaque_data; void dump(Formatter *f) const { encode_json("name", name, f); encode_json("subs", subs, f); + encode_json("opaque", opaque_data, f); } }; @@ -195,10 +197,10 @@ using PSTopicConfigRef = std::shared_ptr; using TopicsRef = std::shared_ptr>; struct PSConfig { - string id{"pubsub"}; + const std::string id{"pubsub"}; rgw_user user; - string data_bucket_prefix; - string data_oid_prefix; + std::string data_bucket_prefix; + std::string data_oid_prefix; int events_retention_days{0}; @@ -234,7 +236,7 @@ struct PSConfig { } { Formatter::ObjectSection section(*f, "notifications"); - string last; + std::string last; for (auto& notif : notifications) { const string& n = notif.first; if (n != last) { @@ -288,7 +290,7 @@ struct PSConfig { } void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, TopicsRef *result) { - string path = bucket.name + "/" + key.name; + const std::string path = bucket.name + "/" + key.name; auto iter = notifications.upper_bound(path); if (iter == notifications.begin()) { @@ -1149,6 +1151,7 @@ public: std::shared_ptr tc = std::make_shared(); tc->name = info.name; tc->subs = user_topics.topics[info.name].subs; + tc->opaque_data = info.opaque_data; (*topics)->push_back(tc); } @@ -1173,7 +1176,7 @@ class RGWPSHandleObjEventCR : public RGWCoroutine { PSSubscriptionRef sub; std::array::const_iterator oiter; std::vector::const_iterator titer; - std::set::const_iterator siter; + std::set::const_iterator siter; int last_sub_conf_error; public: @@ -1254,6 +1257,7 @@ public: // subscription was made by S3 compatible API ldout(sync_env->cct, 20) << "storing record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl; record->configurationId = sub->sub_conf->s3_id; + record->opaque_data = (*titer)->opaque_data; yield call(PSSubscription::store_event_cr(sync_env, sub, record)); if (retcode < 0) { if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail); @@ -1328,7 +1332,7 @@ public: { std::vector > attrs; for (auto& attr : attrs) { - string k = attr.first; + std::string k = attr.first; if (boost::algorithm::starts_with(k, RGW_ATTR_PREFIX)) { k = k.substr(sizeof(RGW_ATTR_PREFIX) - 1); } @@ -1528,7 +1532,7 @@ public: RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(CephContext *cct, const JSONFormattable& config) { data_handler = std::unique_ptr(new RGWPSDataSyncModule(cct, config)); - string jconf = json_str("conf", *data_handler->get_conf()); + const std::string jconf = json_str("conf", *data_handler->get_conf()); JSONParser p; if (!p.parse(jconf.c_str(), jconf.size())) { ldout(cct, 1) << "ERROR: failed to parse sync module effective conf: " << jconf << dendl; diff --git a/src/rgw/rgw_sync_module_pubsub_rest.cc b/src/rgw/rgw_sync_module_pubsub_rest.cc index c9ac49766e81..9a77ea78627a 100644 --- a/src/rgw/rgw_sync_module_pubsub_rest.cc +++ b/src/rgw/rgw_sync_module_pubsub_rest.cc @@ -25,6 +25,7 @@ public: topic_name = s->object.name; + opaque_data = s->info.args.get("OpaqueData"); dest.push_endpoint = s->info.args.get("push-endpoint"); if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) { diff --git a/src/test/rgw/rgw_multi/tests_ps.py b/src/test/rgw/rgw_multi/tests_ps.py index 38c11a9e844f..4eb5bb891401 100644 --- a/src/test/rgw/rgw_multi/tests_ps.py +++ b/src/test/rgw/rgw_multi/tests_ps.py @@ -140,6 +140,13 @@ class StreamingHTTPServer: worker.reset_events() verify_events_by_elements(events, keys, exact_match=exact_match, deletions=deletions) + def get_and_reset_events(self): + events = [] + for worker in self.workers: + events += worker.get_events() + worker.reset_events() + return events + def close(self): """close all workers in the http server and wait for it to finish""" # make sure that the shared socket is closed @@ -1796,6 +1803,150 @@ def test_ps_s3_notification_push_http_on_master(): http_server.close() +def test_ps_s3_opaque_data(): + """ test that opaque id set in topic, is sent in notification """ + if skip_push_tests: + return SkipTest("PubSub push tests don't run in teuthology") + hostname = get_ip() + zones, ps_zones = init_env(require_ps=True) + realm = get_realm() + zonegroup = realm.master_zonegroup() + + # create random port for the http server + host = get_ip() + port = random.randint(10000, 20000) + # start an http server in a separate thread + number_of_objects = 10 + http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects) + + # create bucket + bucket_name = gen_bucket_name() + bucket = zones[0].create_bucket(bucket_name) + topic_name = bucket_name + TOPIC_SUFFIX + # wait for sync + zone_meta_checkpoint(ps_zones[0].zone) + + # create s3 topic + endpoint_address = 'http://'+host+':'+str(port) + opaque_data = 'http://1.2.3.4:8888' + endpoint_args = 'push-endpoint='+endpoint_address+'&OpaqueData='+opaque_data + topic_conf = PSTopic(ps_zones[0].conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args) + result, status = topic_conf.set_config() + assert_equal(status/100, 2) + parsed_result = json.loads(result) + topic_arn = parsed_result['arn'] + # create s3 notification + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_conf_list = [{'Id': notification_name, + 'TopicArn': topic_arn, + 'Events': [] + }] + s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) + response, status = s3_notification_conf.set_config() + assert_equal(status/100, 2) + + # create objects in the bucket + client_threads = [] + content = 'bar' + for i in range(number_of_objects): + key = bucket.new_key(str(i)) + thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] + + # wait for sync + zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) + + # check http receiver + keys = list(bucket.list()) + print('total number of objects: ' + str(len(keys))) + events = http_server.get_and_reset_events() + for event in events: + assert_equal(event['Records'][0]['opaqueData'], opaque_data) + + # cleanup + for key in keys: + key.delete() + [thr.join() for thr in client_threads] + topic_conf.del_config() + s3_notification_conf.del_config(notification=notification_name) + # delete the bucket + zones[0].delete_bucket(bucket_name) + http_server.close() + + +def test_ps_s3_opaque_data_on_master(): + """ test that opaque id set in topic, is sent in notification on master """ + if skip_push_tests: + return SkipTest("PubSub push tests don't run in teuthology") + hostname = get_ip() + zones, _ = init_env(require_ps=False) + realm = get_realm() + zonegroup = realm.master_zonegroup() + + # create random port for the http server + host = get_ip() + port = random.randint(10000, 20000) + # start an http server in a separate thread + number_of_objects = 10 + http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects) + + # create bucket + bucket_name = gen_bucket_name() + bucket = zones[0].create_bucket(bucket_name) + topic_name = bucket_name + TOPIC_SUFFIX + + # create s3 topic + endpoint_address = 'http://'+host+':'+str(port) + endpoint_args = 'push-endpoint='+endpoint_address + opaque_data = 'http://1.2.3.4:8888' + topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args, opaque_data=opaque_data) + topic_arn = topic_conf.set_config() + # create s3 notification + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_conf_list = [{'Id': notification_name, + 'TopicArn': topic_arn, + 'Events': [] + }] + s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list) + response, status = s3_notification_conf.set_config() + assert_equal(status/100, 2) + + # create objects in the bucket + client_threads = [] + start_time = time.time() + content = 'bar' + for i in range(number_of_objects): + key = bucket.new_key(str(i)) + thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] + + time_diff = time.time() - start_time + print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') + + print('wait for 5sec for the messages...') + time.sleep(5) + + # check http receiver + keys = list(bucket.list()) + print('total number of objects: ' + str(len(keys))) + events = http_server.get_and_reset_events() + for event in events: + assert_equal(event['Records'][0]['opaqueData'], opaque_data) + + # cleanup + for key in keys: + key.delete() + [thr.join() for thr in client_threads] + topic_conf.del_config() + s3_notification_conf.del_config(notification=notification_name) + # delete the bucket + zones[0].delete_bucket(bucket_name) + http_server.close() + def test_ps_topic(): """ test set/get/delete of topic """ _, ps_zones = init_env() @@ -2645,8 +2796,9 @@ def test_ps_s3_metadata_on_master(): # check amqp receiver event_count = 0 for event in receiver.get_and_reset_events(): - assert_equal(event['s3']['object']['metadata'][0]['key'], meta_prefix+meta_key) - assert_equal(event['s3']['object']['metadata'][0]['val'], meta_value) + s3_event = event['Records'][0]['s3'] + assert_equal(s3_event['object']['metadata'][0]['key'], meta_prefix+meta_key) + assert_equal(s3_event['object']['metadata'][0]['val'], meta_value) event_count +=1 # only PUT and POST has the metadata value @@ -2660,8 +2812,9 @@ def test_ps_s3_metadata_on_master(): # check amqp receiver event_count = 0 for event in receiver.get_and_reset_events(): - assert_equal(event['s3']['object']['metadata'][0]['key'], meta_prefix+meta_key) - assert_equal(event['s3']['object']['metadata'][0]['val'], meta_value) + s3_event = event['Records'][0]['s3'] + assert_equal(s3_event['object']['metadata'][0]['key'], meta_prefix+meta_key) + assert_equal(s3_event['object']['metadata'][0]['val'], meta_value) event_count +=1 # all 3 object has metadata when deleted diff --git a/src/test/rgw/rgw_multi/zone_ps.py b/src/test/rgw/rgw_multi/zone_ps.py index ddefde179bf8..090fbc789fc2 100644 --- a/src/test/rgw/rgw_multi/zone_ps.py +++ b/src/test/rgw/rgw_multi/zone_ps.py @@ -173,12 +173,12 @@ def delete_all_s3_topics(zone, region): class PSTopicS3: """class to set/list/get/delete a topic - POST ?Action=CreateTopic&Name=&push-endpoint=&[=...]] + POST ?Action=CreateTopic&Name=[&OpaqueData=[&push-endpoint=&[=...]]] POST ?Action=ListTopics POST ?Action=GetTopic&TopicArn= POST ?Action=DeleteTopic&TopicArn= """ - def __init__(self, conn, topic_name, region, endpoint_args=None): + def __init__(self, conn, topic_name, region, endpoint_args=None, opaque_data=None): self.conn = conn self.topic_name = topic_name.strip() assert self.topic_name @@ -186,14 +186,16 @@ class PSTopicS3: self.attributes = {} if endpoint_args is not None: self.attributes = {nvp[0] : nvp[1] for nvp in urlparse.parse_qsl(endpoint_args, keep_blank_values=True)} - protocol = 'https' if conn.is_secure else 'http' - self.client = boto3.client('sns', - endpoint_url=protocol+'://'+conn.host+':'+str(conn.port), - aws_access_key_id=conn.aws_access_key_id, - aws_secret_access_key=conn.aws_secret_access_key, - region_name=region, - verify='./cert.pem', - config=Config(signature_version='s3')) + if opaque_data is not None: + self.attributes['OpaqueData'] = opaque_data + protocol = 'https' if conn.is_secure else 'http' + self.client = boto3.client('sns', + endpoint_url=protocol+'://'+conn.host+':'+str(conn.port), + aws_access_key_id=conn.aws_access_key_id, + aws_secret_access_key=conn.aws_secret_access_key, + region_name=region, + verify='./cert.pem', + config=Config(signature_version='s3')) def get_config(self): -- 2.47.3