]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/pubsub: rgw_pubsub_dest stores persistent queue oid
authorCasey Bodley <cbodley@redhat.com>
Wed, 20 Mar 2024 17:57:21 +0000 (13:57 -0400)
committerCasey Bodley <cbodley@redhat.com>
Wed, 10 Apr 2024 17:09:17 +0000 (13:09 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_rest_pubsub.cc

index 33cfd21e1ab54e7d81d976026d3e0fc3ba362fc2..ce84b38724a3e3bdb277fbb0b2f855b7bcd74752 100644 (file)
@@ -478,6 +478,7 @@ void rgw_pubsub_dest::dump(Formatter *f) const
   encode_json("push_endpoint_topic", arn_topic, f);
   encode_json("stored_secret", stored_secret, f);
   encode_json("persistent", persistent, f);
+  encode_json("persistent_queue", persistent_queue, f);
   encode_json("time_to_live", time_to_live!=DEFAULT_GLOBAL_VALUE? std::to_string(time_to_live): DEFAULT_CONFIG, f);
   encode_json("max_retries", max_retries!=DEFAULT_GLOBAL_VALUE? std::to_string(max_retries): DEFAULT_CONFIG, f);
   encode_json("retry_sleep_duration", retry_sleep_duration!=DEFAULT_GLOBAL_VALUE? std::to_string(retry_sleep_duration): DEFAULT_CONFIG, f);
@@ -525,6 +526,7 @@ void rgw_pubsub_dest::decode_json(JSONObj* f) {
   JSONDecoder::decode_json("push_endpoint_topic", arn_topic, f);
   JSONDecoder::decode_json("stored_secret", stored_secret, f);
   JSONDecoder::decode_json("persistent", persistent, f);
+  JSONDecoder::decode_json("persistent_queue", persistent_queue, f);
   std::string ttl;
   JSONDecoder::decode_json("time_to_live", ttl, f);
   time_to_live = ttl == DEFAULT_CONFIG ? DEFAULT_GLOBAL_VALUE : std::stoul(ttl);
index 1ae88bdbe267198f841f96662378f1a2c560b8f9..3835407eb4567d49b93027cf784b226ca06b8d15 100644 (file)
@@ -342,12 +342,14 @@ struct rgw_pubsub_dest {
   std::string arn_topic;
   bool stored_secret = false;
   bool persistent = false;
+  // rados object name of the persistent queue in the 'notif' pool
+  std::string persistent_queue;
   uint32_t time_to_live;
   uint32_t max_retries;
   uint32_t retry_sleep_duration;
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(6, 1, bl);
+    ENCODE_START(7, 1, bl);
     encode("", bl);
     encode("", bl);
     encode(push_endpoint, bl);
@@ -358,6 +360,7 @@ struct rgw_pubsub_dest {
     encode(time_to_live, bl);
     encode(max_retries, bl);
     encode(retry_sleep_duration, bl);
+    encode(persistent_queue, bl);
     ENCODE_FINISH(bl);
   }
 
@@ -384,6 +387,13 @@ struct rgw_pubsub_dest {
       decode(max_retries, bl);
       decode(retry_sleep_duration, bl);
     }
+    if (struct_v >= 7) {
+      decode(persistent_queue, bl);
+    } else if (persistent) {
+      // persistent topics created before v7 did not support tenant namespacing.
+      // continue to use 'arn_topic' alone as the queue's rados object name
+      persistent_queue = arn_topic;
+    }
     DECODE_FINISH(bl);
   }
 
index a64c55ad10beb2694c493ebc796684a19693dd92..7e1b0491a19397579bfc843778df70be03357709 100644 (file)
@@ -306,6 +306,12 @@ class RGWPSCreateTopicOp : public RGWOp {
     ret = ps.get_topic(this, topic_name, result, y, nullptr);
     if (ret == -ENOENT) {
       // topic not present
+
+      // initialize the persistent queue's location. this cannot change for
+      // existing topics. use ':' as the namespace delimiter because its
+      // inclusion in a TopicName would break ARNs
+      dest.persistent_queue = string_cat_reserve(
+          get_account_or_tenant(s->owner.id), ":", topic_name);
     } else if (ret < 0) {
       ldpp_dout(this, 1) << "failed to read topic '" << topic_name
           << "', with error:" << ret << dendl;