From 7a0630f9d80ea6db496a1b3dd4d21ed3af541ad9 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Thu, 2 May 2024 14:19:44 -0400 Subject: [PATCH] rgw/notify: decouple add_persistent_topic() from Manager Signed-off-by: Casey Bodley (cherry picked from commit d36b978bcc2ea2eef6a64a27fbe22015d2304f98) --- src/rgw/driver/rados/rgw_notify.cc | 77 +++++++++++++++--------------- src/rgw/driver/rados/rgw_notify.h | 5 +- 2 files changed, 42 insertions(+), 40 deletions(-) diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc index 03c613ba97c82..cdbbd63de474d 100644 --- a/src/rgw/driver/rados/rgw_notify.cc +++ b/src/rgw/driver/rados/rgw_notify.cc @@ -108,7 +108,6 @@ void publish_commit_completion(rados_completion_t completion, void *arg) { class Manager : public DoutPrefixProvider { bool shutdown = false; - const size_t max_queue_size; const uint32_t queues_update_period_ms; const uint32_t queues_update_retry_ms; const uint32_t queue_idle_sleep_us; @@ -738,12 +737,11 @@ public: } // ctor: start all threads - Manager(CephContext* _cct, uint32_t _max_queue_size, uint32_t _queues_update_period_ms, + Manager(CephContext* _cct, uint32_t _queues_update_period_ms, uint32_t _queues_update_retry_ms, uint32_t _queue_idle_sleep_us, u_int32_t failover_time_ms, uint32_t _stale_reservations_period_s, uint32_t _reservations_cleanup_period_s, uint32_t _worker_count, rgw::sal::RadosStore* store, const SiteConfig& site) : - max_queue_size(_max_queue_size), queues_update_period_ms(_queues_update_period_ms), queues_update_retry_ms(_queues_update_retry_ms), queue_idle_sleep_us(_queue_idle_sleep_us), @@ -757,39 +755,6 @@ public: site(site), rados_store(*store) {} - - int add_persistent_topic(const std::string& topic_queue, optional_yield y) { - if (topic_queue == Q_LIST_OBJECT_NAME) { - ldpp_dout(this, 1) << "ERROR: topic name cannot be: " << Q_LIST_OBJECT_NAME << " (conflict with queue list object name)" << dendl; - return -EINVAL; - } - librados::ObjectWriteOperation op; - op.create(true); - cls_2pc_queue_init(op, topic_queue, max_queue_size); - auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx(); - auto ret = rgw_rados_operate(this, rados_ioctx, topic_queue, &op, y); - if (ret == -EEXIST) { - // queue already exists - nothing to do - ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_queue << " already exists. nothing to do" << dendl; - return 0; - } - if (ret < 0) { - // failed to create queue - ldpp_dout(this, 1) << "ERROR: failed to create queue for topic: " << topic_queue << ". error: " << ret << dendl; - return ret; - } - - bufferlist empty_bl; - std::map new_topic{{topic_queue, empty_bl}}; - op.omap_set(new_topic); - ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y); - if (ret < 0) { - ldpp_dout(this, 1) << "ERROR: failed to add queue: " << topic_queue << " to queue list. error: " << ret << dendl; - return ret; - } - ldpp_dout(this, 20) << "INFO: queue: " << topic_queue << " added to queue list" << dendl; - return 0; - } }; std::unique_ptr s_manager; @@ -813,7 +778,7 @@ bool init(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* store, return false; } // TODO: take conf from CephContext - s_manager = std::make_unique(dpp->get_cct(), MAX_QUEUE_SIZE, + s_manager = std::make_unique(dpp->get_cct(), Q_LIST_UPDATE_MSEC, Q_LIST_RETRY_MSEC, IDLE_TIMEOUT_USEC, FAILOVER_TIME_MSEC, STALE_RESERVATIONS_PERIOD_S, RESERVATIONS_CLEANUP_PERIOD_S, @@ -830,11 +795,45 @@ void shutdown() { s_manager.reset(); } -int add_persistent_topic(const std::string& topic_name, optional_yield y) { +int add_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rados_ioctx, + const std::string& topic_queue, optional_yield y) +{ + if (topic_queue == Q_LIST_OBJECT_NAME) { + ldpp_dout(dpp, 1) << "ERROR: topic name cannot be: " << Q_LIST_OBJECT_NAME << " (conflict with queue list object name)" << dendl; + return -EINVAL; + } + librados::ObjectWriteOperation op; + op.create(true); + cls_2pc_queue_init(op, topic_queue, MAX_QUEUE_SIZE); + auto ret = rgw_rados_operate(dpp, rados_ioctx, topic_queue, &op, y); + if (ret == -EEXIST) { + // queue already exists - nothing to do + ldpp_dout(dpp, 20) << "INFO: queue for topic: " << topic_queue << " already exists. nothing to do" << dendl; + return 0; + } + if (ret < 0) { + // failed to create queue + ldpp_dout(dpp, 1) << "ERROR: failed to create queue for topic: " << topic_queue << ". error: " << ret << dendl; + return ret; + } + + bufferlist empty_bl; + std::map new_topic{{topic_queue, empty_bl}}; + op.omap_set(new_topic); + ret = rgw_rados_operate(dpp, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y); + if (ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: failed to add queue: " << topic_queue << " to queue list. error: " << ret << dendl; + return ret; + } + ldpp_dout(dpp, 20) << "INFO: queue: " << topic_queue << " added to queue list" << dendl; + return 0; +} + +int add_persistent_topic(const std::string& topic_queue, optional_yield y) { if (!s_manager) { return -EAGAIN; } - return s_manager->add_persistent_topic(topic_name, y); + return add_persistent_topic(s_manager.get(), s_manager->rados_store.getRados()->get_notif_pool_ctx(), topic_queue, y); } int remove_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rados_ioctx, const std::string& topic_queue, optional_yield y) { diff --git a/src/rgw/driver/rados/rgw_notify.h b/src/rgw/driver/rados/rgw_notify.h index e1566d3f71d96..0321cbdff3bb1 100644 --- a/src/rgw/driver/rados/rgw_notify.h +++ b/src/rgw/driver/rados/rgw_notify.h @@ -36,11 +36,14 @@ void shutdown(); // this operation also add a topic queue to the common (to all RGWs) list of all topics int add_persistent_topic(const std::string& topic_queue, optional_yield y); +// same as the above, except you need to provide the IoCtx +int add_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rados_ioctx, const std::string& topic_queue, optional_yield y); + // remove persistent delivery queue for a topic (endpoint) // this operation also remove the topic queue from the common (to all RGWs) list of all topics int remove_persistent_topic(const std::string& topic_queue, optional_yield y); -// same as the above, expect you need to provide the IoCtx, the above uses rgw::notify::Manager::rados_ioctx +// same as the above, except you need to provide the IoCtx int remove_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rados_ioctx, const std::string& topic_queue, optional_yield y); // struct holding reservation information -- 2.39.5