]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/pubsub: remove subscription data structures
authorYuval Lifshitz <ylifshit@redhat.com>
Thu, 24 Nov 2022 14:29:59 +0000 (16:29 +0200)
committerYuval Lifshitz <ylifshit@redhat.com>
Mon, 23 Jan 2023 16:48:38 +0000 (16:48 +0000)
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
src/rgw/driver/rados/rgw_pubsub.cc
src/rgw/driver/rados/rgw_pubsub.h
src/rgw/driver/rados/rgw_rest_pubsub.cc
src/rgw/rgw_admin.cc
src/test/rgw/bucket_notification/test_bn.py

index b9aa54bacd8c1f88cdfa60719f4b23824eed1860..ab03e30f58f1046b01b717fecab1ae51b51feedf 100644 (file)
@@ -349,12 +349,6 @@ void rgw_pubsub_topic_filter::dump(Formatter *f) const
   encode_json("events", events, f);
 }
 
-void rgw_pubsub_topic_subs::dump(Formatter *f) const
-{
-  encode_json("topic", topic, f);
-  encode_json("subs", subs, f);
-}
-
 void rgw_pubsub_bucket_topics::dump(Formatter *f) const
 {
   Formatter::ArraySection s(*f, "topics");
@@ -374,14 +368,12 @@ void rgw_pubsub_topics::dump(Formatter *f) const
 void rgw_pubsub_topics::dump_xml(Formatter *f) const
 {
   for (auto& t : topics) {
-    encode_xml("member", t.second.topic, f);
+    encode_xml("member", t.second, f);
   }
 }
 
-void rgw_pubsub_sub_dest::dump(Formatter *f) const
+void rgw_pubsub_dest::dump(Formatter *f) const
 {
-  encode_json("bucket_name", bucket_name, f);
-  encode_json("oid_prefix", oid_prefix, f);
   encode_json("push_endpoint", push_endpoint, f);
   encode_json("push_endpoint_args", push_endpoint_args, f);
   encode_json("push_endpoint_topic", arn_topic, f);
@@ -389,10 +381,8 @@ void rgw_pubsub_sub_dest::dump(Formatter *f) const
   encode_json("persistent", persistent, f);
 }
 
-void rgw_pubsub_sub_dest::dump_xml(Formatter *f) const
+void rgw_pubsub_dest::dump_xml(Formatter *f) const
 {
-  // first 2 members are omitted here since they
-  // dont apply to AWS compliant topics
   encode_xml("EndpointAddress", push_endpoint, f);
   encode_xml("EndpointArgs", push_endpoint_args, f);
   encode_xml("EndpointTopic", arn_topic, f);
@@ -400,10 +390,8 @@ void rgw_pubsub_sub_dest::dump_xml(Formatter *f) const
   encode_xml("Persistent", persistent, f);
 }
 
-std::string rgw_pubsub_sub_dest::to_json_str() const
+std::string rgw_pubsub_dest::to_json_str() const
 {
-  // first 2 members are omitted here since they
-  // dont apply to AWS compliant topics
   JSONFormatter f;
   f.open_object_section("");
   encode_json("EndpointAddress", push_endpoint, &f);
@@ -417,15 +405,6 @@ std::string rgw_pubsub_sub_dest::to_json_str() const
   return ss.str();
 }
 
-void rgw_pubsub_sub_config::dump(Formatter *f) const
-{
-  encode_json("user", user, f);
-  encode_json("name", name, f);
-  encode_json("topic", topic, f);
-  encode_json("dest", dest, f);
-  encode_json("s3_id", s3_id, f);
-}
-
 RGWPubSub::RGWPubSub(rgw::sal::RadosStore* _store, const std::string& _tenant)
   : store(_store), tenant(_tenant), svc_sysobj(store->svc()->sysobj)
 {
@@ -499,25 +478,6 @@ int RGWPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics *result)
   return read_topics(result, nullptr);
 }
 
-int RGWPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result)
-{
-  rgw_pubsub_topics topics;
-  int ret = get_topics(&topics);
-  if (ret < 0) {
-    ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
-    return ret;
-  }
-
-  auto iter = topics.topics.find(name);
-  if (iter == topics.topics.end()) {
-    ldout(store->ctx(), 1) << "ERROR: topic not found" << dendl;
-    return -ENOENT;
-  }
-
-  *result = iter->second;
-  return 0;
-}
-
 int RGWPubSub::get_topic(const string& name, rgw_pubsub_topic *result)
 {
   rgw_pubsub_topics topics;
@@ -533,7 +493,7 @@ int RGWPubSub::get_topic(const string& name, rgw_pubsub_topic *result)
     return -ENOENT;
   }
 
-  *result = iter->second.topic;
+  *result = iter->second;
   return 0;
 }
 
@@ -542,7 +502,7 @@ int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const
 }
 
 int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const string& topic_name,const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) {
-  rgw_pubsub_topic_subs topic_info;
+  rgw_pubsub_topic topic_info;
 
   int ret = ps->get_topic(topic_name, &topic_info);
   if (ret < 0) {
@@ -564,7 +524,7 @@ int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const
     bucket.name << "'" << dendl;
 
   auto& topic_filter = bucket_topics.topics[topic_name];
-  topic_filter.topic = topic_info.topic;
+  topic_filter.topic = topic_info;
   topic_filter.events = events;
   topic_filter.s3_id = notif_name;
   if (s3_filter) {
@@ -584,7 +544,7 @@ int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const
 
 int RGWPubSub::Bucket::remove_notification(const DoutPrefixProvider *dpp, const string& topic_name, optional_yield y)
 {
-  rgw_pubsub_topic_subs topic_info;
+  rgw_pubsub_topic topic_info;
 
   int ret = ps->get_topic(topic_name, &topic_info);
   if (ret < 0) {
@@ -653,10 +613,10 @@ int RGWPubSub::Bucket::remove_notifications(const DoutPrefixProvider *dpp, optio
 }
 
 int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const string& name, optional_yield y) {
-  return create_topic(dpp, name, rgw_pubsub_sub_dest(), "", "", y);
+  return create_topic(dpp, name, rgw_pubsub_dest{}, "", "", y);
 }
 
-int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data, optional_yield y) {
+int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const string& name, const rgw_pubsub_dest& dest, const std::string& arn, const std::string& opaque_data, optional_yield y) {
   RGWObjVersionTracker objv_tracker;
   rgw_pubsub_topics topics;
 
@@ -667,12 +627,12 @@ int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const string& name, c
     return ret;
   }
  
-  rgw_pubsub_topic_subs& new_topic = topics.topics[name];
-  new_topic.topic.user = rgw_user("", tenant);
-  new_topic.topic.name = name;
-  new_topic.topic.dest = dest;
-  new_topic.topic.arn = arn;
-  new_topic.topic.opaque_data = opaque_data;
+  rgw_pubsub_topic& new_topic = topics.topics[name];
+  new_topic.user = rgw_user("", tenant);
+  new_topic.name = name;
+  new_topic.dest = dest;
+  new_topic.arn = arn;
+  new_topic.opaque_data = opaque_data;
 
   ret = write_topics(dpp, topics, &objv_tracker, y);
   if (ret < 0) {
@@ -717,7 +677,3 @@ void RGWPubSub::get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj)
   *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, bucket_meta_oid(bucket));
 }
 
-void RGWPubSub::get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const {
-  *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sub_meta_oid(name));
-}
-
index 08a329e4c02ae82b8ea9ac38b320965787af4a5a..3e6bebfceaa6bac48d6cf683bee88b1910d094b6 100644 (file)
@@ -334,9 +334,7 @@ WRITE_CLASS_ENCODER(rgw_pubsub_s3_event)
 // setting a unique ID for an event based on object hash and timestamp
 void set_event_id(std::string& id, const std::string& hash, const utime_t& ts);
 
-struct rgw_pubsub_sub_dest {
-  std::string bucket_name;
-  std::string oid_prefix;
+struct rgw_pubsub_dest {
   std::string push_endpoint;
   std::string push_endpoint_args;
   std::string arn_topic;
@@ -345,8 +343,8 @@ struct rgw_pubsub_sub_dest {
 
   void encode(bufferlist& bl) const {
     ENCODE_START(5, 1, bl);
-    encode(bucket_name, bl);
-    encode(oid_prefix, bl);
+    encode("", bl);
+    encode("", bl);
     encode(push_endpoint, bl);
     encode(push_endpoint_args, bl);
     encode(arn_topic, bl);
@@ -357,8 +355,9 @@ struct rgw_pubsub_sub_dest {
 
   void decode(bufferlist::const_iterator& bl) {
     DECODE_START(5, bl);
-    decode(bucket_name, bl);
-    decode(oid_prefix, bl);
+    std::string dummy;
+    decode(dummy, bl);
+    decode(dummy, bl);
     decode(push_endpoint, bl);
     if (struct_v >= 2) {
         decode(push_endpoint_args, bl);
@@ -379,45 +378,12 @@ struct rgw_pubsub_sub_dest {
   void dump_xml(Formatter *f) const;
   std::string to_json_str() const;
 };
-WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest)
-
-struct rgw_pubsub_sub_config {
-  rgw_user user;
-  std::string name;
-  std::string topic;
-  rgw_pubsub_sub_dest dest;
-  std::string s3_id;
-
-  void encode(bufferlist& bl) const {
-    ENCODE_START(2, 1, bl);
-    encode(user, bl);
-    encode(name, bl);
-    encode(topic, bl);
-    encode(dest, bl);
-    encode(s3_id, bl);
-    ENCODE_FINISH(bl);
-  }
-
-  void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(2, bl);
-    decode(user, bl);
-    decode(name, bl);
-    decode(topic, bl);
-    decode(dest, bl);
-    if (struct_v >= 2) {
-      decode(s3_id, bl);
-    }
-    DECODE_FINISH(bl);
-  }
-
-  void dump(Formatter *f) const;
-};
-WRITE_CLASS_ENCODER(rgw_pubsub_sub_config)
+WRITE_CLASS_ENCODER(rgw_pubsub_dest)
 
 struct rgw_pubsub_topic {
   rgw_user user;
   std::string name;
-  rgw_pubsub_sub_dest dest;
+  rgw_pubsub_dest dest;
   std::string arn;
   std::string opaque_data;
 
@@ -459,6 +425,7 @@ struct rgw_pubsub_topic {
 };
 WRITE_CLASS_ENCODER(rgw_pubsub_topic)
 
+// this struct deprecated and remain only for backward compatibility
 struct rgw_pubsub_topic_subs {
   rgw_pubsub_topic topic;
   std::set<std::string> subs;
@@ -540,17 +507,26 @@ struct rgw_pubsub_bucket_topics {
 WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics)
 
 struct rgw_pubsub_topics {
-  std::map<std::string, rgw_pubsub_topic_subs> topics;
+  std::map<std::string, rgw_pubsub_topic> topics;
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
+    ENCODE_START(2, 2, bl);
     encode(topics, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(1, bl);
-    decode(topics, bl);
+    DECODE_START(2, bl);
+    if (struct_v >= 2) {
+      decode(topics, bl);
+    } else {
+      std::map<std::string, rgw_pubsub_topic_subs> v1topics;
+      decode(v1topics, bl);
+      std::transform(v1topics.begin(), v1topics.end(), std::inserter(topics, topics.end()),
+          [](const auto& entry) {
+            return std::pair<std::string, rgw_pubsub_topic>(entry.first, entry.second.topic); 
+          });
+    }
     DECODE_FINISH(bl);
   }
 
@@ -579,10 +555,6 @@ class RGWPubSub
     return pubsub_oid_prefix + tenant + ".bucket." + bucket.name + "/" + bucket.marker;
   }
 
-  std::string sub_meta_oid(const std::string& name) const {
-    return pubsub_oid_prefix + tenant + ".sub." + name;
-  }
-
   template <class T>
   int read(const rgw_raw_obj& obj, T* data, RGWObjVersionTracker* objv_tracker);
 
@@ -650,15 +622,9 @@ public:
   void get_meta_obj(rgw_raw_obj *obj) const;
   void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const;
 
-  void get_sub_meta_obj(const std::string& name, rgw_raw_obj *obj) const;
-
   // get all topics (per tenant, if used)) and populate them into "result"
   // return 0 on success or if no topics exist, error code otherwise
   int get_topics(rgw_pubsub_topics *result);
-  // get a topic with its subscriptions by its name and populate it into "result"
-  // return -ENOENT if the topic does not exists 
-  // return 0 on success, error code otherwise
-  int get_topic(const std::string& name, rgw_pubsub_topic_subs *result);
   // get a topic with by its name and populate it into "result"
   // return -ENOENT if the topic does not exists 
   // return 0 on success, error code otherwise
@@ -670,7 +636,7 @@ public:
   // create a topic with push destination information and ARN
   // if the topic already exists the destination and ARN values may be updated (considered succsess)
   // return 0 on success, error code otherwise
-  int create_topic(const DoutPrefixProvider *dpp, const std::string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data, optional_yield y);
+  int create_topic(const DoutPrefixProvider *dpp, const std::string& name, const rgw_pubsub_dest& dest, const std::string& arn, const std::string& opaque_data, optional_yield y);
   // remove a topic according to its name
   // if the topic does not exists it is a no-op (considered success)
   // return 0 on success, error code otherwise
index 23d56615ac953297a61b12898ea526d7b56830a8..28896bc1743a4e810db838e151300e5085864e53 100644 (file)
@@ -34,8 +34,8 @@ bool verify_transport_security(CephContext *cct, const RGWEnv& env) {
 
 // make sure that endpoint is a valid URL
 // make sure that if user/password are passed inside URL, it is over secure connection
-// update rgw_pubsub_sub_dest to indicate that a password is stored in the URL
-bool validate_and_update_endpoint_secret(rgw_pubsub_sub_dest& dest, CephContext *cct, const RGWEnv& env) {
+// update rgw_pubsub_dest to indicate that a password is stored in the URL
+bool validate_and_update_endpoint_secret(rgw_pubsub_dest& dest, CephContext *cct, const RGWEnv& env) {
   if (dest.push_endpoint.empty()) {
       return true;
   }
@@ -57,8 +57,8 @@ bool validate_and_update_endpoint_secret(rgw_pubsub_sub_dest& dest, CephContext
   return true;
 }
 
-bool topic_has_endpoint_secret(const rgw_pubsub_topic_subs& topic) {
-    return topic.topic.dest.stored_secret;
+bool topic_has_endpoint_secret(const rgw_pubsub_topic& topic) {
+    return topic.dest.stored_secret;
 }
 
 bool topics_has_endpoint_secret(const rgw_pubsub_topics& topics) {
@@ -75,7 +75,7 @@ class RGWPSCreateTopicOp : public RGWOp {
   private:
   std::optional<RGWPubSub> ps;
   std::string topic_name;
-  rgw_pubsub_sub_dest dest;
+  rgw_pubsub_dest dest;
   std::string topic_arn;
   std::string opaque_data;
   
@@ -245,7 +245,7 @@ class RGWPSGetTopicOp : public RGWOp {
   private:
   std::string topic_name;
   std::optional<RGWPubSub> ps;
-  rgw_pubsub_topic_subs result;
+  rgw_pubsub_topic result;
   
   int get_params() {
     const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
@@ -286,7 +286,7 @@ class RGWPSGetTopicOp : public RGWOp {
     const auto f = s->formatter;
     f->open_object_section("GetTopicResponse");
     f->open_object_section("GetTopicResult");
-    encode_xml("Topic", result.topic, f); 
+    encode_xml("Topic", result, f); 
     f->close_section();
     f->open_object_section("ResponseMetadata");
     encode_xml("RequestId", s->req_id, f); 
@@ -322,7 +322,7 @@ class RGWPSGetTopicAttributesOp : public RGWOp {
   private:
   std::string topic_name;
   std::optional<RGWPubSub> ps;
-  rgw_pubsub_topic_subs result;
+  rgw_pubsub_topic result;
   
   int get_params() {
     const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
@@ -363,7 +363,7 @@ class RGWPSGetTopicAttributesOp : public RGWOp {
     const auto f = s->formatter;
     f->open_object_section_in_ns("GetTopicAttributesResponse", AWS_SNS_NS);
     f->open_object_section("GetTopicAttributesResult");
-    result.topic.dump_xml_as_attributes(f);
+    result.dump_xml_as_attributes(f);
     f->close_section(); // GetTopicAttributesResult
     f->open_object_section("ResponseMetadata");
     encode_xml("RequestId", s->req_id, f); 
index 7aad430d54cf5a748b907db7b150dbb274bda5fa..143e5e88a4ff1bfb30bdcf5b9dc6ad01565a66ad 100644 (file)
@@ -10492,7 +10492,7 @@ next:
 
     RGWPubSub ps(static_cast<rgw::sal::RadosStore*>(driver), tenant);
 
-    rgw_pubsub_topic_subs topic;
+    rgw_pubsub_topic topic;
     ret = ps.get_topic(topic_name, &topic);
     if (ret < 0) {
       cerr << "ERROR: could not get topic: " << cpp_strerror(-ret) << std::endl;
index 90f02ffccc76557fa44f3bd23d24805c8e2903cd..d0f267ef2b799615247bce38f80133b00c4d09be 100644 (file)
@@ -654,7 +654,7 @@ def test_ps_s3_topic_admin_on_master():
     # get topic 3 via commandline
     result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant])  
     parsed_result = json.loads(result[0])
-    assert_equal(parsed_result['topic']['arn'], topic_arn3)
+    assert_equal(parsed_result['arn'], topic_arn3)
 
     # delete topic 3
     _, result = admin(['topic', 'rm', '--topic', topic_name+'_3', '--tenant', tenant])  
@@ -662,6 +662,7 @@ def test_ps_s3_topic_admin_on_master():
 
     # try to get a deleted topic
     _, result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant])  
+    print('"topic not found" error is expected')
     assert_equal(result, 2)
 
     # get the remaining 2 topics
@@ -1452,6 +1453,8 @@ def test_ps_s3_notification_push_kafka_on_master():
         if topic_conf2 is not None:
             topic_conf2.del_config()
         # delete the bucket
+        for key in bucket.list():
+            key.delete()
         conn.delete_bucket(bucket_name)
         if receiver is not None:
             stop_kafka_receiver(receiver, task)