From 433717a248093d5dbcc4ac309722b645f80a40cd Mon Sep 17 00:00:00 2001
From: Yuval Lifshitz
Date: Tue, 1 Jul 2025 15:00:22 +0000
Subject: [PATCH] rgw/notifications: allow for graceful shutdown of
 notification manager

* use short timers even for the longer timeouts
* allow graceful shutdown when stopping

Fixes: https://tracker.ceph.com/issues/71963
Signed-off-by: Yuval Lifshitz
---
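Notes:

The reworked Manager::stop() below bounds how long it waits for the worker
thread to exit before forcing the io_context to stop. A minimal standalone
sketch of that join-with-timeout pattern, standard library only; "EventLoop",
the 100ms poll, and the 2 second grace period are illustrative stand-ins for
the Manager, io_context::run() driving the short timers, and
queues_update_retry*2:

#include <atomic>
#include <chrono>
#include <future>
#include <iostream>
#include <thread>

class EventLoop {
  std::atomic<bool> shutdown{false};
  std::atomic<bool> force_stop{false};
  std::thread worker;

public:
  void start() {
    worker = std::thread([this] {
      // stand-in for io_context.run(): work wakes up on short timers,
      // so the shutdown flag is observed quickly
      while (!shutdown && !force_stop) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
      }
    });
  }

  void stop() {
    shutdown = true; // cooperative flag, like Manager::shutdown
    if (!worker.joinable()) {
      return;
    }
    // run the blocking join() on a helper thread so the wait can be bounded
    auto joined = std::async(std::launch::async, [this] { worker.join(); });
    if (joined.wait_for(std::chrono::seconds(2)) == std::future_status::timeout) {
      std::cerr << "graceful shutdown took too long, forcing stop" << std::endl;
      force_stop = true; // stand-in for io_context.stop()
    }
    joined.wait(); // the helper's join() completes here in both cases
  }
};

int main() {
  EventLoop loop;
  loop.start();
  loop.stop(); // returns within ~100ms via the graceful path
}

The sketch waits on the future a second time rather than calling
worker.join() again from the stopping thread, since a std::thread may only
be joined once.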
 src/rgw/driver/rados/rgw_notify.cc | 195 ++++++++++++++---------------
 1 file changed, 94 insertions(+), 101 deletions(-)

diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc
index 695e5af6c11..c7876ec6241 100644
--- a/src/rgw/driver/rados/rgw_notify.cc
+++ b/src/rgw/driver/rados/rgw_notify.cc
@@ -24,6 +24,7 @@
 #include
 #include "librados/AioCompletionImpl.h"
 #include "common/async/yield_waiter.h"
+#include <future>
 
 #include
@@ -81,24 +82,23 @@ void publish_commit_completion(rados_completion_t completion, void* arg) {
 };
 
 class Manager : public DoutPrefixProvider {
+  using Executor = boost::asio::io_context::executor_type;
   bool shutdown = false;
-  const uint32_t queues_update_period_ms;
-  const uint32_t queues_update_retry_ms;
-  const uint32_t queue_idle_sleep_us;
-  const utime_t failover_time;
+  static constexpr auto queues_update_period = std::chrono::milliseconds(30000); // 30s
+  static constexpr auto queues_update_retry = std::chrono::milliseconds(1000); // 1s
+  static constexpr auto queue_idle_sleep = std::chrono::milliseconds(100); // 100ms
+  const utime_t failover_time = utime_t(queues_update_period*3); // 90s
   CephContext* const cct;
   static constexpr auto COOKIE_LEN = 16;
   const std::string lock_cookie;
   boost::asio::io_context io_context;
-  boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard;
-  const uint32_t worker_count;
-  std::vector<std::thread> workers;
-  const uint32_t stale_reservations_period_s;
-  const uint32_t reservations_cleanup_period_s;
+  boost::asio::executor_work_guard<Executor> work_guard;
+  std::thread worker;
+  static constexpr auto stale_reservations_period = std::chrono::seconds(120); // 120s
+  static constexpr auto reservations_cleanup_period = std::chrono::seconds(30); // 30s
   queues_persistency_tracker topics_persistency_tracker;
   const SiteConfig& site;
-public:
-  rgw::sal::RadosStore& rados_store;
+  rgw::sal::RadosStore* const rados_store;
 
 private:
@@ -116,7 +116,7 @@
     librados::ObjectReadOperation op;
     queues_t queues_chunk;
     op.omap_get_keys2(start_after, max_chunk, &queues_chunk, &more, &rval);
-    const auto ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), Q_LIST_OBJECT_NAME, std::move(op), nullptr, y);
+    const auto ret = rgw_rados_operate(this, rados_store->getRados()->get_notif_pool_ctx(), Q_LIST_OBJECT_NAME, std::move(op), nullptr, y);
     if (ret == -ENOENT) {
       // queue list object was not created - nothing to do
       return 0;
@@ -267,26 +267,44 @@
     return EntryProcessingResult::Successful;
   }
 
   using Clock = ceph::coarse_mono_clock;
-  using Executor = boost::asio::io_context::executor_type;
   using Timer = boost::asio::basic_waitable_timer<Clock,
         boost::asio::wait_traits<Clock>, Executor>;
 
+  void async_sleep(boost::asio::yield_context yield, std::chrono::milliseconds duration) {
+    Timer timer(io_context);
+    timer.expires_after(duration);
+    boost::system::error_code ec;
+    timer.async_wait(yield[ec]);
+    if (ec) {
+      ldpp_dout(this, 1) << "ERROR: async_sleep failed with error: " << ec.message() << dendl;
+    }
+  }
 
   // clean stale reservation from queue
   void cleanup_queue(const std::string& queue_name, boost::asio::yield_context yield) {
+    auto next_check_time = ceph::coarse_real_clock::zero();
     while (!shutdown) {
-      ldpp_dout(this, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name << dendl;
-      const auto now = ceph::coarse_real_time::clock::now();
-      const auto stale_time = now - std::chrono::seconds(stale_reservations_period_s);
+      // check if it is time for queue cleanup
+      if (ceph::coarse_real_clock::now() > next_check_time) {
+        next_check_time = ceph::coarse_real_clock::now() + reservations_cleanup_period;
+      } else {
+        // short sleep duration to prevent busy wait when doing queue cleanup
+        async_sleep(yield, queues_update_retry);
+        continue;
+      }
+      const auto tp = ceph::coarse_real_time::clock::to_time_t(next_check_time);
+      ldpp_dout(this, 20) << "INFO: performing stale reservation cleanup for queue: " << queue_name <<
+        ". next cleanup will happen at: " << std::ctime(&tp) << dendl;
+      const auto stale_time = ceph::coarse_real_time::clock::now() - stale_reservations_period;
       librados::ObjectWriteOperation op;
       op.assert_exists();
-      rados::cls::lock::assert_locked(&op, queue_name+"_lock", 
+      rados::cls::lock::assert_locked(&op, queue_name+"_lock",
                                       ClsLockType::EXCLUSIVE,
-                                      lock_cookie, 
+                                      lock_cookie,
                                       "" /*no tag*/);
       cls_2pc_queue_expire_reservations(op, stale_time);
       // check ownership and do reservation cleanup in one batch
-      auto ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), queue_name, std::move(op), yield);
+      auto ret = rgw_rados_operate(this, rados_store->getRados()->get_notif_pool_ctx(), queue_name, std::move(op), yield);
       if (ret == -ENOENT) {
         // queue was deleted
         ldpp_dout(this, 10) << "INFO: queue: " << queue_name
@@ -304,10 +322,6 @@
         ldpp_dout(this, 5) << "WARNING: failed to cleanup stale reservation from queue and/or lock queue: "
           << queue_name << ". error: " << ret << dendl;
       }
-      Timer timer(io_context);
-      timer.expires_after(std::chrono::seconds(reservations_cleanup_period_s));
-      boost::system::error_code ec;
-      timer.async_wait(yield[ec]);
     }
     ldpp_dout(this, 5) << "INFO: manager stopped. done cleanup for queue: " << queue_name << dendl;
   }
@@ -317,7 +331,7 @@
     librados::ObjectWriteOperation op;
     op.assert_exists();
     rados::cls::lock::unlock(&op, queue_name+"_lock", lock_cookie);
-    auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
+    auto& rados_ioctx = rados_store->getRados()->get_notif_pool_ctx();
     const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, std::move(op), yield);
     if (ret == -ENOENT) {
       ldpp_dout(this, 10) << "INFO: queue: " << queue_name
@@ -340,7 +354,7 @@
     std::string queue_topic_name;
     parse_topic_metadata_key(queue_name, queue_topic_tenant, queue_topic_name);
     rgw_pubsub_topic topic_info;
-    RGWPubSub ps(&rados_store, queue_topic_tenant, site);
+    RGWPubSub ps(rados_store, queue_topic_tenant, site);
     int ret = ps.get_topic(this, queue_topic_name, topic_info, yield, nullptr);
     if (ret < 0) {
       ldpp_dout(this, 1) << "WARNING: failed to fetch topic: "
@@ -387,14 +401,11 @@
     while (!shutdown) {
       // if queue was empty the last time, sleep for idle timeout
       if (is_idle) {
-        Timer timer(io_context);
-        timer.expires_after(std::chrono::microseconds(queue_idle_sleep_us));
-        boost::system::error_code ec;
-        timer.async_wait(yield[ec]);
+        async_sleep(yield, queue_idle_sleep);
       }
 
       // get list of entries in the queue
-      auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
+      auto& rados_ioctx = rados_store->getRados()->get_notif_pool_ctx();
       is_idle = true;
       bool truncated = false;
       std::string end_marker;
@@ -579,7 +590,7 @@
     std::string tenant_name;
     // TODO: extract tenant name from queue_name once it is fixed
     uint64_t size_to_migrate = 0;
-    RGWPubSub ps(&rados_store, tenant_name, site);
+    RGWPubSub ps(rados_store, tenant_name, site);
 
     rgw_pubsub_topic topic;
     auto ret_of_get_topic = ps.get_topic(this, queue_name, topic,
@@ -661,27 +672,25 @@
     owned_queues_t owned_queues;
     size_t processed_queue_count = 0;
 
-    // add randomness to the duration between queue checking
-    // to make sure that different daemons are not synced
-    std::random_device seed;
-    std::mt19937 rnd_gen(seed());
-    const auto min_jitter = 100; // ms
-    const auto max_jitter = 500; // ms
-    std::uniform_int_distribution<> duration_jitter(min_jitter, max_jitter);
-
     std::vector<std::string> queue_gc;
     std::mutex queue_gc_lock;
-    auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
+    auto& rados_ioctx = rados_store->getRados()->get_notif_pool_ctx();
+    auto next_check_time = ceph::coarse_real_clock::zero();
     while (!shutdown) {
-      Timer timer(io_context);
-      const auto duration = (has_error ?
-        std::chrono::milliseconds(queues_update_retry_ms) : std::chrono::milliseconds(queues_update_period_ms)) +
-        std::chrono::milliseconds(duration_jitter(rnd_gen));
-      timer.expires_after(duration);
-      const auto tp = ceph::coarse_real_time::clock::to_time_t(ceph::coarse_real_time::clock::now() + duration);
-      ldpp_dout(this, 20) << "INFO: next queues processing will happen at: " << std::ctime(&tp) << dendl;
-      boost::system::error_code ec;
-      timer.async_wait(yield[ec]);
+      // check if queue list needs to be refreshed
+      if (ceph::coarse_real_clock::now() > next_check_time) {
+        next_check_time = ceph::coarse_real_clock::now() + queues_update_period;
+        const auto tp = ceph::coarse_real_time::clock::to_time_t(next_check_time);
+        ldpp_dout(this, 20) << "INFO: processing queue list. next queues processing will happen at: " << std::ctime(&tp) << dendl;
+      } else {
+        // short sleep duration to prevent busy wait when refreshing queue list
+        // or retrying after error
+        async_sleep(yield, queues_update_retry);
+        if (!has_error) {
+          // in case of error we will retry
+          continue;
+        }
+      }
 
       queues_t queues;
       auto ret = read_queue_list(queues, yield);
@@ -695,9 +704,9 @@
         // or if ownership needs to be taken
         librados::ObjectWriteOperation op;
         op.assert_exists();
-        rados::cls::lock::lock(&op, queue_name+"_lock", 
+        rados::cls::lock::lock(&op, queue_name+"_lock",
                                ClsLockType::EXCLUSIVE,
-                               lock_cookie, 
+                               lock_cookie,
                                "" /*no tag*/,
                                "" /*no description*/,
                                failover_time,
@@ -760,25 +769,34 @@
           queue_gc.clear();
         }
       }
-    Timer timer(io_context);
     while (processed_queue_count > 0) {
-      ldpp_dout(this, 5) << "INFO: manager stopped. " << processed_queue_count << " queues are still being processed" << dendl;
-      timer.expires_after(std::chrono::milliseconds(queues_update_retry_ms));
-      boost::system::error_code ec;
-      timer.async_wait(yield[ec]);
+      ldpp_dout(this, 20) << "INFO: manager stopped. " << processed_queue_count << " queues are still being processed" << dendl;
+      async_sleep(yield, queues_update_retry);
     }
     ldpp_dout(this, 5) << "INFO: manager stopped. done processing all queues" << dendl;
   }
 
 public:
 
-  ~Manager() {
-  }
+  ~Manager() = default;
 
   void stop() {
+    ldpp_dout(this, 5) << "INFO: manager received stop signal. shutting down..." << dendl;
     shutdown = true;
     work_guard.reset();
-    std::for_each(workers.begin(), workers.end(), [] (auto& worker) { worker.join(); });
+    if (worker.joinable()) {
+      // try graceful shutdown first
+      auto future = std::async(std::launch::async, [this]() {worker.join();});
+      if (future.wait_for(queues_update_retry*2) == std::future_status::timeout) {
+        // force stop if graceful shutdown takes too long
+        if (!io_context.stopped()) {
+          ldpp_dout(this, 5) << "INFO: force shutdown of manager" << dendl;
+          io_context.stop();
+        }
+        worker.join();
+      }
+    }
+    ldpp_dout(this, 5) << "INFO: manager shutdown ended" << dendl;
  }
 
   void init() {
@@ -790,52 +808,32 @@
     });
 
     // start the worker threads to do the actual queue processing
-    for (auto worker_id = 0U; worker_id < worker_count; ++worker_id) {
-      workers.emplace_back([this,worker_id]() {
-        const auto thread_name = fmt::format("notif-worker-{}", worker_id);
-        ceph_pthread_setname(thread_name.c_str());
-        try {
-          io_context.run();
-        } catch (const std::exception& err) {
-          ldpp_dout(this, 1) << "ERROR: notification worker failed with error: " << err.what() << dendl;
-          throw err;
-        }
-      });
-    }
-    ldpp_dout(this, 10) << "INfO: started notification manager with: " << worker_count << " workers" << dendl;
+    worker = std::thread([this]() {
+      ceph_pthread_setname("notif-worker");
+      try {
+        ldpp_dout(this, 10) << "INFO: notification worker started" << dendl;
+        io_context.run();
+        ldpp_dout(this, 10) << "INFO: notification worker ended" << dendl;
+      } catch (const std::exception& err) {
+        ldpp_dout(this, 1) << "ERROR: notification worker failed with error: " << err.what() << dendl;
+        throw err;
+      }
+    });
+    ldpp_dout(this, 10) << "INFO: started notification manager" << dendl;
   }
 
-  // ctor: start all threads
-  Manager(CephContext* _cct, uint32_t _queues_update_period_ms,
-          uint32_t _queues_update_retry_ms, uint32_t _queue_idle_sleep_us, u_int32_t failover_time_ms,
-          uint32_t _stale_reservations_period_s, uint32_t _reservations_cleanup_period_s,
-          uint32_t _worker_count, rgw::sal::RadosStore* store,
-          const SiteConfig& site) :
-    queues_update_period_ms(_queues_update_period_ms),
-    queues_update_retry_ms(_queues_update_retry_ms),
-    queue_idle_sleep_us(_queue_idle_sleep_us),
-    failover_time(std::chrono::milliseconds(failover_time_ms)),
+  Manager(CephContext* _cct, rgw::sal::RadosStore* store, const SiteConfig& site) :
    cct(_cct),
    lock_cookie(gen_rand_alphanumeric(cct, COOKIE_LEN)),
    work_guard(boost::asio::make_work_guard(io_context)),
-    worker_count(_worker_count),
-    stale_reservations_period_s(_stale_reservations_period_s),
-    reservations_cleanup_period_s(_reservations_cleanup_period_s),
    site(site),
-    rados_store(*store)
+    rados_store(store)
  {}
 };
 
 std::unique_ptr<Manager> s_manager;
 
 constexpr size_t MAX_QUEUE_SIZE = 128*1000*1000; // 128MB
-constexpr uint32_t Q_LIST_UPDATE_MSEC = 1000*30; // check queue list every 30seconds
-constexpr uint32_t Q_LIST_RETRY_MSEC = 1000; // retry every second if queue list update failed
-constexpr uint32_t IDLE_TIMEOUT_USEC = 100*1000; // idle sleep 100ms
-constexpr uint32_t FAILOVER_TIME_MSEC = 3*Q_LIST_UPDATE_MSEC; // FAILOVER TIME 3x renew time
-constexpr uint32_t WORKER_COUNT = 1; // 1 worker thread
-constexpr uint32_t STALE_RESERVATIONS_PERIOD_S = 120; // cleanup reservations that are more than 2 minutes old
-constexpr uint32_t RESERVATIONS_CLEANUP_PERIOD_S = 30; // reservation cleanup every 30 seconds
 
 bool init(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* store,
           const SiteConfig& site) {
@@ -847,12 +845,7 @@
     return false;
   }
   // TODO: take conf from CephContext
-  s_manager = std::make_unique<Manager>(dpp->get_cct(),
-      Q_LIST_UPDATE_MSEC, Q_LIST_RETRY_MSEC,
-      IDLE_TIMEOUT_USEC, FAILOVER_TIME_MSEC,
-      STALE_RESERVATIONS_PERIOD_S, RESERVATIONS_CLEANUP_PERIOD_S,
-      WORKER_COUNT,
-      store, site);
+  s_manager = std::make_unique<Manager>(dpp->get_cct(), store, site);
   s_manager->init();
   return true;
 }
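Note on the first bullet of the commit message: replacing each long timer
wait with a deadline plus short fixed sleeps is what lets the loops above
notice the shutdown flag quickly. A standalone sketch of that
deadline-polling pattern, standard library only; the names, the 5 second
period, and the 1 second slice are illustrative (the patch itself uses
queues_update_period = 30s, queues_update_retry = 1s, and an asio timer
awaited from a coroutine instead of thread sleeps):

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

int main() {
  using clock = std::chrono::steady_clock;
  constexpr auto update_period = std::chrono::seconds(5); // stands in for 30s
  constexpr auto retry_slice = std::chrono::seconds(1);   // short sleep slice

  std::atomic<bool> shutdown{false};
  std::thread stopper([&] {
    std::this_thread::sleep_for(std::chrono::seconds(7));
    shutdown = true; // request shutdown in the middle of a period
  });

  auto next_check_time = clock::time_point{}; // epoch: do work immediately
  while (!shutdown) {
    if (clock::now() > next_check_time) {
      next_check_time = clock::now() + update_period;
      std::cout << "doing periodic work" << std::endl;
    } else {
      // the short sleep keeps the loop responsive to 'shutdown'
      std::this_thread::sleep_for(retry_slice);
    }
  }
  stopper.join();
  std::cout << "exited within about one slice of the request" << std::endl;
}

With a single 30 second wait, a stop request issued just after the wait
starts is only observed up to 30 seconds later; with 1 second slices the
worst case drops to about one slice.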
-- 
2.39.5