From: Yuval Lifshitz Date: Thu, 24 Nov 2022 14:29:59 +0000 (+0200) Subject: rgw/pubsub: remove subscription data structures X-Git-Tag: v18.1.0~470^2~2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=2f91d774da0cc52d681c738518aba4318a6864dd;p=ceph-ci.git rgw/pubsub: remove subscription data structures Signed-off-by: Yuval Lifshitz --- diff --git a/src/rgw/driver/rados/rgw_pubsub.cc b/src/rgw/driver/rados/rgw_pubsub.cc index b9aa54bacd8..ab03e30f58f 100644 --- a/src/rgw/driver/rados/rgw_pubsub.cc +++ b/src/rgw/driver/rados/rgw_pubsub.cc @@ -349,12 +349,6 @@ void rgw_pubsub_topic_filter::dump(Formatter *f) const encode_json("events", events, f); } -void rgw_pubsub_topic_subs::dump(Formatter *f) const -{ - encode_json("topic", topic, f); - encode_json("subs", subs, f); -} - void rgw_pubsub_bucket_topics::dump(Formatter *f) const { Formatter::ArraySection s(*f, "topics"); @@ -374,14 +368,12 @@ void rgw_pubsub_topics::dump(Formatter *f) const void rgw_pubsub_topics::dump_xml(Formatter *f) const { for (auto& t : topics) { - encode_xml("member", t.second.topic, f); + encode_xml("member", t.second, f); } } -void rgw_pubsub_sub_dest::dump(Formatter *f) const +void rgw_pubsub_dest::dump(Formatter *f) const { - encode_json("bucket_name", bucket_name, f); - encode_json("oid_prefix", oid_prefix, f); encode_json("push_endpoint", push_endpoint, f); encode_json("push_endpoint_args", push_endpoint_args, f); encode_json("push_endpoint_topic", arn_topic, f); @@ -389,10 +381,8 @@ void rgw_pubsub_sub_dest::dump(Formatter *f) const encode_json("persistent", persistent, f); } -void rgw_pubsub_sub_dest::dump_xml(Formatter *f) const +void rgw_pubsub_dest::dump_xml(Formatter *f) const { - // first 2 members are omitted here since they - // dont apply to AWS compliant topics encode_xml("EndpointAddress", push_endpoint, f); encode_xml("EndpointArgs", push_endpoint_args, f); encode_xml("EndpointTopic", arn_topic, f); @@ -400,10 +390,8 @@ void rgw_pubsub_sub_dest::dump_xml(Formatter *f) const encode_xml("Persistent", persistent, f); } -std::string rgw_pubsub_sub_dest::to_json_str() const +std::string rgw_pubsub_dest::to_json_str() const { - // first 2 members are omitted here since they - // dont apply to AWS compliant topics JSONFormatter f; f.open_object_section(""); encode_json("EndpointAddress", push_endpoint, &f); @@ -417,15 +405,6 @@ std::string rgw_pubsub_sub_dest::to_json_str() const return ss.str(); } -void rgw_pubsub_sub_config::dump(Formatter *f) const -{ - encode_json("user", user, f); - encode_json("name", name, f); - encode_json("topic", topic, f); - encode_json("dest", dest, f); - encode_json("s3_id", s3_id, f); -} - RGWPubSub::RGWPubSub(rgw::sal::RadosStore* _store, const std::string& _tenant) : store(_store), tenant(_tenant), svc_sysobj(store->svc()->sysobj) { @@ -499,25 +478,6 @@ int RGWPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics *result) return read_topics(result, nullptr); } -int RGWPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result) -{ - rgw_pubsub_topics topics; - int ret = get_topics(&topics); - if (ret < 0) { - ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; - return ret; - } - - auto iter = topics.topics.find(name); - if (iter == topics.topics.end()) { - ldout(store->ctx(), 1) << "ERROR: topic not found" << dendl; - return -ENOENT; - } - - *result = iter->second; - return 0; -} - int RGWPubSub::get_topic(const string& name, rgw_pubsub_topic *result) { rgw_pubsub_topics topics; @@ -533,7 +493,7 @@ int RGWPubSub::get_topic(const string& name, rgw_pubsub_topic *result) return -ENOENT; } - *result = iter->second.topic; + *result = iter->second; return 0; } @@ -542,7 +502,7 @@ int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const } int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const string& topic_name,const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) { - rgw_pubsub_topic_subs topic_info; + rgw_pubsub_topic topic_info; int ret = ps->get_topic(topic_name, &topic_info); if (ret < 0) { @@ -564,7 +524,7 @@ int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const bucket.name << "'" << dendl; auto& topic_filter = bucket_topics.topics[topic_name]; - topic_filter.topic = topic_info.topic; + topic_filter.topic = topic_info; topic_filter.events = events; topic_filter.s3_id = notif_name; if (s3_filter) { @@ -584,7 +544,7 @@ int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const int RGWPubSub::Bucket::remove_notification(const DoutPrefixProvider *dpp, const string& topic_name, optional_yield y) { - rgw_pubsub_topic_subs topic_info; + rgw_pubsub_topic topic_info; int ret = ps->get_topic(topic_name, &topic_info); if (ret < 0) { @@ -653,10 +613,10 @@ int RGWPubSub::Bucket::remove_notifications(const DoutPrefixProvider *dpp, optio } int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const string& name, optional_yield y) { - return create_topic(dpp, name, rgw_pubsub_sub_dest(), "", "", y); + return create_topic(dpp, name, rgw_pubsub_dest{}, "", "", y); } -int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data, optional_yield y) { +int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const string& name, const rgw_pubsub_dest& dest, const std::string& arn, const std::string& opaque_data, optional_yield y) { RGWObjVersionTracker objv_tracker; rgw_pubsub_topics topics; @@ -667,12 +627,12 @@ int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const string& name, c return ret; } - rgw_pubsub_topic_subs& new_topic = topics.topics[name]; - new_topic.topic.user = rgw_user("", tenant); - new_topic.topic.name = name; - new_topic.topic.dest = dest; - new_topic.topic.arn = arn; - new_topic.topic.opaque_data = opaque_data; + rgw_pubsub_topic& new_topic = topics.topics[name]; + new_topic.user = rgw_user("", tenant); + new_topic.name = name; + new_topic.dest = dest; + new_topic.arn = arn; + new_topic.opaque_data = opaque_data; ret = write_topics(dpp, topics, &objv_tracker, y); if (ret < 0) { @@ -717,7 +677,3 @@ void RGWPubSub::get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, bucket_meta_oid(bucket)); } -void RGWPubSub::get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const { - *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sub_meta_oid(name)); -} - diff --git a/src/rgw/driver/rados/rgw_pubsub.h b/src/rgw/driver/rados/rgw_pubsub.h index 08a329e4c02..3e6bebfceaa 100644 --- a/src/rgw/driver/rados/rgw_pubsub.h +++ b/src/rgw/driver/rados/rgw_pubsub.h @@ -334,9 +334,7 @@ WRITE_CLASS_ENCODER(rgw_pubsub_s3_event) // setting a unique ID for an event based on object hash and timestamp void set_event_id(std::string& id, const std::string& hash, const utime_t& ts); -struct rgw_pubsub_sub_dest { - std::string bucket_name; - std::string oid_prefix; +struct rgw_pubsub_dest { std::string push_endpoint; std::string push_endpoint_args; std::string arn_topic; @@ -345,8 +343,8 @@ struct rgw_pubsub_sub_dest { void encode(bufferlist& bl) const { ENCODE_START(5, 1, bl); - encode(bucket_name, bl); - encode(oid_prefix, bl); + encode("", bl); + encode("", bl); encode(push_endpoint, bl); encode(push_endpoint_args, bl); encode(arn_topic, bl); @@ -357,8 +355,9 @@ struct rgw_pubsub_sub_dest { void decode(bufferlist::const_iterator& bl) { DECODE_START(5, bl); - decode(bucket_name, bl); - decode(oid_prefix, bl); + std::string dummy; + decode(dummy, bl); + decode(dummy, bl); decode(push_endpoint, bl); if (struct_v >= 2) { decode(push_endpoint_args, bl); @@ -379,45 +378,12 @@ struct rgw_pubsub_sub_dest { void dump_xml(Formatter *f) const; std::string to_json_str() const; }; -WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest) - -struct rgw_pubsub_sub_config { - rgw_user user; - std::string name; - std::string topic; - rgw_pubsub_sub_dest dest; - std::string s3_id; - - void encode(bufferlist& bl) const { - ENCODE_START(2, 1, bl); - encode(user, bl); - encode(name, bl); - encode(topic, bl); - encode(dest, bl); - encode(s3_id, bl); - ENCODE_FINISH(bl); - } - - void decode(bufferlist::const_iterator& bl) { - DECODE_START(2, bl); - decode(user, bl); - decode(name, bl); - decode(topic, bl); - decode(dest, bl); - if (struct_v >= 2) { - decode(s3_id, bl); - } - DECODE_FINISH(bl); - } - - void dump(Formatter *f) const; -}; -WRITE_CLASS_ENCODER(rgw_pubsub_sub_config) +WRITE_CLASS_ENCODER(rgw_pubsub_dest) struct rgw_pubsub_topic { rgw_user user; std::string name; - rgw_pubsub_sub_dest dest; + rgw_pubsub_dest dest; std::string arn; std::string opaque_data; @@ -459,6 +425,7 @@ struct rgw_pubsub_topic { }; WRITE_CLASS_ENCODER(rgw_pubsub_topic) +// this struct deprecated and remain only for backward compatibility struct rgw_pubsub_topic_subs { rgw_pubsub_topic topic; std::set subs; @@ -540,17 +507,26 @@ struct rgw_pubsub_bucket_topics { WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics) struct rgw_pubsub_topics { - std::map topics; + std::map topics; void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); + ENCODE_START(2, 2, bl); encode(topics, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { - DECODE_START(1, bl); - decode(topics, bl); + DECODE_START(2, bl); + if (struct_v >= 2) { + decode(topics, bl); + } else { + std::map v1topics; + decode(v1topics, bl); + std::transform(v1topics.begin(), v1topics.end(), std::inserter(topics, topics.end()), + [](const auto& entry) { + return std::pair(entry.first, entry.second.topic); + }); + } DECODE_FINISH(bl); } @@ -579,10 +555,6 @@ class RGWPubSub return pubsub_oid_prefix + tenant + ".bucket." + bucket.name + "/" + bucket.marker; } - std::string sub_meta_oid(const std::string& name) const { - return pubsub_oid_prefix + tenant + ".sub." + name; - } - template int read(const rgw_raw_obj& obj, T* data, RGWObjVersionTracker* objv_tracker); @@ -650,15 +622,9 @@ public: void get_meta_obj(rgw_raw_obj *obj) const; void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const; - void get_sub_meta_obj(const std::string& name, rgw_raw_obj *obj) const; - // get all topics (per tenant, if used)) and populate them into "result" // return 0 on success or if no topics exist, error code otherwise int get_topics(rgw_pubsub_topics *result); - // get a topic with its subscriptions by its name and populate it into "result" - // return -ENOENT if the topic does not exists - // return 0 on success, error code otherwise - int get_topic(const std::string& name, rgw_pubsub_topic_subs *result); // get a topic with by its name and populate it into "result" // return -ENOENT if the topic does not exists // return 0 on success, error code otherwise @@ -670,7 +636,7 @@ public: // create a topic with push destination information and ARN // if the topic already exists the destination and ARN values may be updated (considered succsess) // return 0 on success, error code otherwise - int create_topic(const DoutPrefixProvider *dpp, const std::string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data, optional_yield y); + int create_topic(const DoutPrefixProvider *dpp, const std::string& name, const rgw_pubsub_dest& dest, const std::string& arn, const std::string& opaque_data, optional_yield y); // remove a topic according to its name // if the topic does not exists it is a no-op (considered success) // return 0 on success, error code otherwise diff --git a/src/rgw/driver/rados/rgw_rest_pubsub.cc b/src/rgw/driver/rados/rgw_rest_pubsub.cc index 23d56615ac9..28896bc1743 100644 --- a/src/rgw/driver/rados/rgw_rest_pubsub.cc +++ b/src/rgw/driver/rados/rgw_rest_pubsub.cc @@ -34,8 +34,8 @@ bool verify_transport_security(CephContext *cct, const RGWEnv& env) { // make sure that endpoint is a valid URL // make sure that if user/password are passed inside URL, it is over secure connection -// update rgw_pubsub_sub_dest to indicate that a password is stored in the URL -bool validate_and_update_endpoint_secret(rgw_pubsub_sub_dest& dest, CephContext *cct, const RGWEnv& env) { +// update rgw_pubsub_dest to indicate that a password is stored in the URL +bool validate_and_update_endpoint_secret(rgw_pubsub_dest& dest, CephContext *cct, const RGWEnv& env) { if (dest.push_endpoint.empty()) { return true; } @@ -57,8 +57,8 @@ bool validate_and_update_endpoint_secret(rgw_pubsub_sub_dest& dest, CephContext return true; } -bool topic_has_endpoint_secret(const rgw_pubsub_topic_subs& topic) { - return topic.topic.dest.stored_secret; +bool topic_has_endpoint_secret(const rgw_pubsub_topic& topic) { + return topic.dest.stored_secret; } bool topics_has_endpoint_secret(const rgw_pubsub_topics& topics) { @@ -75,7 +75,7 @@ class RGWPSCreateTopicOp : public RGWOp { private: std::optional ps; std::string topic_name; - rgw_pubsub_sub_dest dest; + rgw_pubsub_dest dest; std::string topic_arn; std::string opaque_data; @@ -245,7 +245,7 @@ class RGWPSGetTopicOp : public RGWOp { private: std::string topic_name; std::optional ps; - rgw_pubsub_topic_subs result; + rgw_pubsub_topic result; int get_params() { const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn"))); @@ -286,7 +286,7 @@ class RGWPSGetTopicOp : public RGWOp { const auto f = s->formatter; f->open_object_section("GetTopicResponse"); f->open_object_section("GetTopicResult"); - encode_xml("Topic", result.topic, f); + encode_xml("Topic", result, f); f->close_section(); f->open_object_section("ResponseMetadata"); encode_xml("RequestId", s->req_id, f); @@ -322,7 +322,7 @@ class RGWPSGetTopicAttributesOp : public RGWOp { private: std::string topic_name; std::optional ps; - rgw_pubsub_topic_subs result; + rgw_pubsub_topic result; int get_params() { const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn"))); @@ -363,7 +363,7 @@ class RGWPSGetTopicAttributesOp : public RGWOp { const auto f = s->formatter; f->open_object_section_in_ns("GetTopicAttributesResponse", AWS_SNS_NS); f->open_object_section("GetTopicAttributesResult"); - result.topic.dump_xml_as_attributes(f); + result.dump_xml_as_attributes(f); f->close_section(); // GetTopicAttributesResult f->open_object_section("ResponseMetadata"); encode_xml("RequestId", s->req_id, f); diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 7aad430d54c..143e5e88a4f 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -10492,7 +10492,7 @@ next: RGWPubSub ps(static_cast(driver), tenant); - rgw_pubsub_topic_subs topic; + rgw_pubsub_topic topic; ret = ps.get_topic(topic_name, &topic); if (ret < 0) { cerr << "ERROR: could not get topic: " << cpp_strerror(-ret) << std::endl; diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index 90f02ffccc7..d0f267ef2b7 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -654,7 +654,7 @@ def test_ps_s3_topic_admin_on_master(): # get topic 3 via commandline result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant]) parsed_result = json.loads(result[0]) - assert_equal(parsed_result['topic']['arn'], topic_arn3) + assert_equal(parsed_result['arn'], topic_arn3) # delete topic 3 _, result = admin(['topic', 'rm', '--topic', topic_name+'_3', '--tenant', tenant]) @@ -662,6 +662,7 @@ def test_ps_s3_topic_admin_on_master(): # try to get a deleted topic _, result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant]) + print('"topic not found" error is expected') assert_equal(result, 2) # get the remaining 2 topics @@ -1452,6 +1453,8 @@ def test_ps_s3_notification_push_kafka_on_master(): if topic_conf2 is not None: topic_conf2.del_config() # delete the bucket + for key in bucket.list(): + key.delete() conn.delete_bucket(bucket_name) if receiver is not None: stop_kafka_receiver(receiver, task)