]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/notify: decouple add_persistent_topic() from Manager
authorCasey Bodley <cbodley@redhat.com>
Thu, 2 May 2024 18:19:44 +0000 (14:19 -0400)
committerCasey Bodley <cbodley@redhat.com>
Fri, 12 Jul 2024 20:07:00 +0000 (16:07 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit d36b978bcc2ea2eef6a64a27fbe22015d2304f98)

src/rgw/driver/rados/rgw_notify.cc
src/rgw/driver/rados/rgw_notify.h

index 03c613ba97c824695dec0ba04108392d16f10b9d..cdbbd63de474d511d26a0794558dbf06ad152c32 100644 (file)
@@ -108,7 +108,6 @@ void publish_commit_completion(rados_completion_t completion, void *arg) {
 
 class Manager : public DoutPrefixProvider {
   bool shutdown = false;
-  const size_t max_queue_size;
   const uint32_t queues_update_period_ms;
   const uint32_t queues_update_retry_ms;
   const uint32_t queue_idle_sleep_us;
@@ -738,12 +737,11 @@ public:
   }
 
   // ctor: start all threads
-  Manager(CephContext* _cct, uint32_t _max_queue_size, uint32_t _queues_update_period_ms, 
+  Manager(CephContext* _cct, uint32_t _queues_update_period_ms,
           uint32_t _queues_update_retry_ms, uint32_t _queue_idle_sleep_us, u_int32_t failover_time_ms, 
           uint32_t _stale_reservations_period_s, uint32_t _reservations_cleanup_period_s,
           uint32_t _worker_count, rgw::sal::RadosStore* store,
           const SiteConfig& site) :
-    max_queue_size(_max_queue_size),
     queues_update_period_ms(_queues_update_period_ms),
     queues_update_retry_ms(_queues_update_retry_ms),
     queue_idle_sleep_us(_queue_idle_sleep_us),
@@ -757,39 +755,6 @@ public:
     site(site),
     rados_store(*store)
     {}
-
-  int add_persistent_topic(const std::string& topic_queue, optional_yield y) {
-    if (topic_queue == Q_LIST_OBJECT_NAME) {
-      ldpp_dout(this, 1) << "ERROR: topic name cannot be: " << Q_LIST_OBJECT_NAME << " (conflict with queue list object name)" << dendl;
-      return -EINVAL;
-    }
-    librados::ObjectWriteOperation op;
-    op.create(true);
-    cls_2pc_queue_init(op, topic_queue, max_queue_size);
-    auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
-    auto ret = rgw_rados_operate(this, rados_ioctx, topic_queue, &op, y);
-    if (ret == -EEXIST) {
-      // queue already exists - nothing to do
-      ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_queue << " already exists. nothing to do" << dendl;
-      return 0;
-    }
-    if (ret < 0) {
-      // failed to create queue
-      ldpp_dout(this, 1) << "ERROR: failed to create queue for topic: " << topic_queue << ". error: " << ret << dendl;
-      return ret;
-    }
-   
-    bufferlist empty_bl;
-    std::map<std::string, bufferlist> new_topic{{topic_queue, empty_bl}};
-    op.omap_set(new_topic);
-    ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
-    if (ret < 0) {
-      ldpp_dout(this, 1) << "ERROR: failed to add queue: " << topic_queue << " to queue list. error: " << ret << dendl;
-      return ret;
-    } 
-    ldpp_dout(this, 20) << "INFO: queue: " << topic_queue << " added to queue list"  << dendl;
-    return 0;
-  }
 };
 
 std::unique_ptr<Manager> s_manager;
@@ -813,7 +778,7 @@ bool init(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* store,
     return false;
   }
   // TODO: take conf from CephContext
-  s_manager = std::make_unique<Manager>(dpp->get_cct(), MAX_QUEUE_SIZE, 
+  s_manager = std::make_unique<Manager>(dpp->get_cct(),
       Q_LIST_UPDATE_MSEC, Q_LIST_RETRY_MSEC, 
       IDLE_TIMEOUT_USEC, FAILOVER_TIME_MSEC, 
       STALE_RESERVATIONS_PERIOD_S, RESERVATIONS_CLEANUP_PERIOD_S,
@@ -830,11 +795,45 @@ void shutdown() {
   s_manager.reset();
 }
 
-int add_persistent_topic(const std::string& topic_name, optional_yield y) {
+int add_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rados_ioctx,
+                         const std::string& topic_queue, optional_yield y)
+{
+  if (topic_queue == Q_LIST_OBJECT_NAME) {
+    ldpp_dout(dpp, 1) << "ERROR: topic name cannot be: " << Q_LIST_OBJECT_NAME << " (conflict with queue list object name)" << dendl;
+    return -EINVAL;
+  }
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  cls_2pc_queue_init(op, topic_queue, MAX_QUEUE_SIZE);
+  auto ret = rgw_rados_operate(dpp, rados_ioctx, topic_queue, &op, y);
+  if (ret == -EEXIST) {
+    // queue already exists - nothing to do
+    ldpp_dout(dpp, 20) << "INFO: queue for topic: " << topic_queue << " already exists. nothing to do" << dendl;
+    return 0;
+  }
+  if (ret < 0) {
+    // failed to create queue
+    ldpp_dout(dpp, 1) << "ERROR: failed to create queue for topic: " << topic_queue << ". error: " << ret << dendl;
+    return ret;
+  }
+
+  bufferlist empty_bl;
+  std::map<std::string, bufferlist> new_topic{{topic_queue, empty_bl}};
+  op.omap_set(new_topic);
+  ret = rgw_rados_operate(dpp, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
+  if (ret < 0) {
+    ldpp_dout(dpp, 1) << "ERROR: failed to add queue: " << topic_queue << " to queue list. error: " << ret << dendl;
+    return ret;
+  }
+  ldpp_dout(dpp, 20) << "INFO: queue: " << topic_queue << " added to queue list"  << dendl;
+  return 0;
+}
+
+int add_persistent_topic(const std::string& topic_queue, optional_yield y) {
   if (!s_manager) {
     return -EAGAIN;
   }
-  return s_manager->add_persistent_topic(topic_name, y);
+  return add_persistent_topic(s_manager.get(), s_manager->rados_store.getRados()->get_notif_pool_ctx(), topic_queue, y);
 }
 
 int remove_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rados_ioctx, const std::string& topic_queue, optional_yield y) {
index e1566d3f71d964ca90556763fab257fff04e1447..0321cbdff3bb16daee592d147e341238c603f82c 100644 (file)
@@ -36,11 +36,14 @@ void shutdown();
 // this operation also add a topic queue to the common (to all RGWs) list of all topics
 int add_persistent_topic(const std::string& topic_queue, optional_yield y);
 
+// same as the above, except you need to provide the IoCtx
+int add_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rados_ioctx, const std::string& topic_queue, optional_yield y);
+
 // remove persistent delivery queue for a topic (endpoint)
 // this operation also remove the topic queue from the common (to all RGWs) list of all topics
 int remove_persistent_topic(const std::string& topic_queue, optional_yield y);
 
-// same as the above, expect you need to provide the IoCtx, the above uses rgw::notify::Manager::rados_ioctx
+// same as the above, except you need to provide the IoCtx
 int remove_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rados_ioctx, const std::string& topic_queue, optional_yield y);
 
 // struct holding reservation information