]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/notifications: allow for graceful shutdown of notification manager 63986/head
authorYuval Lifshitz <ylifshit@ibm.com>
Tue, 1 Jul 2025 15:00:22 +0000 (15:00 +0000)
committerYuval Lifshitz <ylifshit@ibm.com>
Thu, 3 Jul 2025 15:47:14 +0000 (15:47 +0000)
* use short timers even for the longer timeouts
* allow graceful shutdown when stopping

Fixes: https://tracker.ceph.com/issues/71963
Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
src/rgw/driver/rados/rgw_notify.cc

index 695e5af6c119215e665fd484e5e6a4ff55edb9e4..c7876ec62413d683258fc15ffc3e2beb6e392607 100644 (file)
@@ -24,6 +24,7 @@
 #include <fmt/format.h>
 #include "librados/AioCompletionImpl.h"
 #include "common/async/yield_waiter.h"
+#include <future>
 
 #include <unordered_map>
 
@@ -81,24 +82,23 @@ void publish_commit_completion(rados_completion_t completion, void* arg) {
 };
 
 class Manager : public DoutPrefixProvider {
+  using Executor = boost::asio::io_context::executor_type;
   bool shutdown = false;
-  const uint32_t queues_update_period_ms;
-  const uint32_t queues_update_retry_ms;
-  const uint32_t queue_idle_sleep_us;
-  const utime_t failover_time;
+  static constexpr auto queues_update_period = std::chrono::milliseconds(30000); // 30s
+  static constexpr auto queues_update_retry = std::chrono::milliseconds(1000); // 1s
+  static constexpr auto queue_idle_sleep = std::chrono::milliseconds(100); // 100ms
+  const utime_t failover_time = utime_t(queues_update_period*3); // 90s
   CephContext* const cct;
   static constexpr auto COOKIE_LEN = 16;
   const std::string lock_cookie;
   boost::asio::io_context io_context;
-  boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard;
-  const uint32_t worker_count;
-  std::vector<std::thread> workers;
-  const uint32_t stale_reservations_period_s;
-  const uint32_t reservations_cleanup_period_s;
+  boost::asio::executor_work_guard<Executor> work_guard;
+  std::thread worker;
+  static constexpr auto stale_reservations_period = std::chrono::seconds(120); // 120s
+  static constexpr auto reservations_cleanup_period = std::chrono::seconds(30); // 30s
   queues_persistency_tracker topics_persistency_tracker;
   const SiteConfig& site;
-public:
-  rgw::sal::RadosStore& rados_store;
+  rgw::sal::RadosStore* const rados_store;
 
 private:
 
@@ -116,7 +116,7 @@ private:
       librados::ObjectReadOperation op;
       queues_t queues_chunk;
       op.omap_get_keys2(start_after, max_chunk, &queues_chunk, &more, &rval);
-      const auto ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), Q_LIST_OBJECT_NAME, std::move(op), nullptr, y);
+      const auto ret = rgw_rados_operate(this, rados_store->getRados()->get_notif_pool_ctx(), Q_LIST_OBJECT_NAME, std::move(op), nullptr, y);
       if (ret == -ENOENT) {
         // queue list object was not created - nothing to do
         return 0;
@@ -267,26 +267,44 @@ private:
     return EntryProcessingResult::Successful;
   }
 
-  using Clock = ceph::coarse_mono_clock;
-  using Executor = boost::asio::io_context::executor_type;
-  using Timer = boost::asio::basic_waitable_timer<Clock,
+  void async_sleep(boost::asio::yield_context yield, const std::chrono::milliseconds& duration) {
+    using Clock = ceph::coarse_mono_clock;
+    using Timer = boost::asio::basic_waitable_timer<Clock,
         boost::asio::wait_traits<Clock>, Executor>;
+    Timer timer(io_context);
+    timer.expires_after(duration);
+    boost::system::error_code ec;
+    timer.async_wait(yield[ec]);
+    if (ec) {
+      ldpp_dout(this, 1) << "ERROR: async_sleep failed with error: " << ec.message() << dendl;
+    }
+  }
 
   // clean stale reservation from queue
   void cleanup_queue(const std::string& queue_name, boost::asio::yield_context yield) {
+    auto next_check_time = ceph::coarse_real_clock::zero();
     while (!shutdown) {
-      ldpp_dout(this, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name << dendl;
-      const auto now = ceph::coarse_real_time::clock::now();
-      const auto stale_time = now - std::chrono::seconds(stale_reservations_period_s);
+      // check if it is time for queue cleanup
+      if (ceph::coarse_real_clock::now() > next_check_time) {
+        next_check_time = ceph::coarse_real_clock::now() + reservations_cleanup_period;
+      } else {
+        // short sleep duration to prevent busy wait when doing queue cleanup
+        async_sleep(yield, queues_update_retry);
+        continue;
+      }
+      const auto tp = ceph::coarse_real_time::clock::to_time_t(next_check_time);
+      ldpp_dout(this, 20) << "INFO: performing stale reservation cleanup for queue: " << queue_name <<
+        ". next cleanup will happen at: " << std::ctime(&tp) << dendl;
+      const auto stale_time = ceph::coarse_real_time::clock::now() - stale_reservations_period;
       librados::ObjectWriteOperation op;
       op.assert_exists();
-      rados::cls::lock::assert_locked(&op, queue_name+"_lock", 
+      rados::cls::lock::assert_locked(&op, queue_name+"_lock",
         ClsLockType::EXCLUSIVE,
-        lock_cookie, 
+        lock_cookie,
         "" /*no tag*/);
       cls_2pc_queue_expire_reservations(op, stale_time);
       // check ownership and do reservation cleanup in one batch
-      auto ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), queue_name, std::move(op), yield);
+      auto ret = rgw_rados_operate(this, rados_store->getRados()->get_notif_pool_ctx(), queue_name, std::move(op), yield);
       if (ret == -ENOENT) {
         // queue was deleted
         ldpp_dout(this, 10) << "INFO: queue: " << queue_name
@@ -304,10 +322,6 @@ private:
         ldpp_dout(this, 5) << "WARNING: failed to cleanup stale reservation from queue and/or lock queue: " << queue_name
           << ". error: " << ret << dendl;
       }
-      Timer timer(io_context);
-      timer.expires_after(std::chrono::seconds(reservations_cleanup_period_s));
-      boost::system::error_code ec;
-           timer.async_wait(yield[ec]);
     }
     ldpp_dout(this, 5) << "INFO: manager stopped. done cleanup for queue: " << queue_name << dendl;
   }
@@ -317,7 +331,7 @@ private:
     librados::ObjectWriteOperation op;
     op.assert_exists();
     rados::cls::lock::unlock(&op, queue_name+"_lock", lock_cookie);
-    auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
+    auto& rados_ioctx = rados_store->getRados()->get_notif_pool_ctx();
     const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, std::move(op), yield);
     if (ret == -ENOENT) {
       ldpp_dout(this, 10) << "INFO: queue: " << queue_name
@@ -340,7 +354,7 @@ private:
     std::string queue_topic_name;
     parse_topic_metadata_key(queue_name, queue_topic_tenant, queue_topic_name);
     rgw_pubsub_topic topic_info;
-    RGWPubSub ps(&rados_store, queue_topic_tenant, site);
+    RGWPubSub ps(rados_store, queue_topic_tenant, site);
     int ret = ps.get_topic(this, queue_topic_name, topic_info, yield, nullptr);
     if (ret < 0) {
       ldpp_dout(this, 1) << "WARNING: failed to fetch topic: "
@@ -387,14 +401,11 @@ private:
     while (!shutdown) {
       // if queue was empty the last time, sleep for idle timeout
       if (is_idle) {
-        Timer timer(io_context);
-        timer.expires_after(std::chrono::microseconds(queue_idle_sleep_us));
-        boost::system::error_code ec;
-             timer.async_wait(yield[ec]);
+        async_sleep(yield, queue_idle_sleep);
       }
 
       // get list of entries in the queue
-      auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
+      auto& rados_ioctx = rados_store->getRados()->get_notif_pool_ctx();
       is_idle = true;
       bool truncated = false;
       std::string end_marker;
@@ -579,7 +590,7 @@ private:
           std::string tenant_name;
           // TODO: extract tenant name from queue_name once it is fixed
           uint64_t size_to_migrate = 0;
-          RGWPubSub ps(&rados_store, tenant_name, site);
+          RGWPubSub ps(rados_store, tenant_name, site);
 
           rgw_pubsub_topic topic;
           auto ret_of_get_topic = ps.get_topic(this, queue_name, topic,
@@ -661,27 +672,25 @@ private:
     owned_queues_t owned_queues;
     size_t processed_queue_count = 0;
 
-    // add randomness to the duration between queue checking
-    // to make sure that different daemons are not synced
-    std::random_device seed;
-    std::mt19937 rnd_gen(seed());
-    const auto min_jitter = 100; // ms
-    const auto max_jitter = 500; // ms
-    std::uniform_int_distribution<> duration_jitter(min_jitter, max_jitter);
-
     std::vector<std::string> queue_gc;
     std::mutex queue_gc_lock;
-    auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
+    auto& rados_ioctx = rados_store->getRados()->get_notif_pool_ctx();
+    auto next_check_time = ceph::coarse_real_clock::zero();
     while (!shutdown) {
-      Timer timer(io_context);
-      const auto duration = (has_error ? 
-        std::chrono::milliseconds(queues_update_retry_ms) : std::chrono::milliseconds(queues_update_period_ms)) + 
-        std::chrono::milliseconds(duration_jitter(rnd_gen));
-      timer.expires_after(duration);
-      const auto tp = ceph::coarse_real_time::clock::to_time_t(ceph::coarse_real_time::clock::now() + duration);
-      ldpp_dout(this, 20) << "INFO: next queues processing will happen at: " << std::ctime(&tp)  << dendl;
-      boost::system::error_code ec;
-      timer.async_wait(yield[ec]);
+      // check if queue list needs to be refreshed
+      if (ceph::coarse_real_clock::now() > next_check_time) {
+        next_check_time = ceph::coarse_real_clock::now() + queues_update_period;
+        const auto tp = ceph::coarse_real_time::clock::to_time_t(next_check_time);
+        ldpp_dout(this, 20) << "INFO: processing queue list. next queues processing will happen at: " << std::ctime(&tp) << dendl;
+      } else {
+        // short sleep duration to prevent busy wait when refreshing queue list
+        // or retrying after error
+        async_sleep(yield, queues_update_retry);
+        if (!has_error) {
+          // in case of error we will retry
+          continue;
+        }
+      }
 
       queues_t queues;
       auto ret = read_queue_list(queues, yield);
@@ -695,9 +704,9 @@ private:
         // or if ownership needs to be taken
         librados::ObjectWriteOperation op;
         op.assert_exists();
-        rados::cls::lock::lock(&op, queue_name+"_lock", 
+        rados::cls::lock::lock(&op, queue_name+"_lock",
               ClsLockType::EXCLUSIVE,
-              lock_cookie, 
+              lock_cookie,
               "" /*no tag*/,
               "" /*no description*/,
               failover_time,
@@ -760,25 +769,34 @@ private:
         queue_gc.clear();
       }
     }
-    Timer timer(io_context);
     while (processed_queue_count > 0) {
-      ldpp_dout(this, 5) << "INFO: manager stopped. " << processed_queue_count << " queues are still being processed" << dendl;
-      timer.expires_after(std::chrono::milliseconds(queues_update_retry_ms));
-      boost::system::error_code ec;
-      timer.async_wait(yield[ec]);
+      ldpp_dout(this, 20) << "INFO: manager stopped. " << processed_queue_count << " queues are still being processed" << dendl;
+      async_sleep(yield, queues_update_retry);
     }
     ldpp_dout(this, 5) << "INFO: manager stopped. done processing all queues" << dendl;
   }
 
 public:
 
-  ~Manager() {
-  }
+  ~Manager() = default;
 
   void stop() {
+    ldpp_dout(this, 5) << "INFO: manager received stop signal. shutting down..." << dendl;
     shutdown = true;
     work_guard.reset();
-    std::for_each(workers.begin(), workers.end(), [] (auto& worker) { worker.join(); });
+    if (worker.joinable()) {
+      // try graceful shutdown first
+      auto future = std::async(std::launch::async, [this]() {worker.join();});
+      if (future.wait_for(queues_update_retry*2) == std::future_status::timeout) {
+        // force stop if graceful shutdown takes too long
+        if (!io_context.stopped()) {
+          ldpp_dout(this, 5) << "INFO: force shutdown of manager" << dendl;
+          io_context.stop();
+        }
+        worker.join();
+      }
+    }
+    ldpp_dout(this, 5) << "INFO: manager shutdown ended" << dendl;
   }
 
   void init() {
@@ -790,52 +808,32 @@ public:
         });
 
     // start the worker threads to do the actual queue processing
-    for (auto worker_id = 0U; worker_id < worker_count; ++worker_id) {
-      workers.emplace_back([this,worker_id]() {
-        const auto thread_name = fmt::format("notif-worker-{}", worker_id);
-        ceph_pthread_setname(thread_name.c_str());
-        try {
-          io_context.run(); 
-        } catch (const std::exception& err) {
-          ldpp_dout(this, 1) << "ERROR: notification worker failed with error: " << err.what() << dendl;
-          throw err;
-        }
-      });
-    }
-    ldpp_dout(this, 10) << "INfO: started notification manager with: " << worker_count << " workers" << dendl;
+    worker = std::thread([this]() {
+      ceph_pthread_setname("notif-worker");
+      try {
+        ldpp_dout(this, 10) << "INFO: notification worker started" << dendl;
+        io_context.run();
+        ldpp_dout(this, 10) << "INFO: notification worker ended" << dendl;
+      } catch (const std::exception& err) {
+        ldpp_dout(this, 1) << "ERROR: notification worker failed with error: " << err.what() << dendl;
+        throw err;
+      }
+    });
+    ldpp_dout(this, 10) << "INfO: started notification manager" << dendl;
   }
 
-  // ctor: start all threads
-  Manager(CephContext* _cct, uint32_t _queues_update_period_ms,
-          uint32_t _queues_update_retry_ms, uint32_t _queue_idle_sleep_us, u_int32_t failover_time_ms, 
-          uint32_t _stale_reservations_period_s, uint32_t _reservations_cleanup_period_s,
-          uint32_t _worker_count, rgw::sal::RadosStore* store,
-          const SiteConfig& site) :
-    queues_update_period_ms(_queues_update_period_ms),
-    queues_update_retry_ms(_queues_update_retry_ms),
-    queue_idle_sleep_us(_queue_idle_sleep_us),
-    failover_time(std::chrono::milliseconds(failover_time_ms)),
+  Manager(CephContext* _cct, rgw::sal::RadosStore* store, const SiteConfig& site) :
     cct(_cct),
     lock_cookie(gen_rand_alphanumeric(cct, COOKIE_LEN)),
     work_guard(boost::asio::make_work_guard(io_context)),
-    worker_count(_worker_count),
-    stale_reservations_period_s(_stale_reservations_period_s),
-    reservations_cleanup_period_s(_reservations_cleanup_period_s),
     site(site),
-    rados_store(*store)
+    rados_store(store)
     {}
 };
 
 std::unique_ptr<Manager> s_manager;
 
 constexpr size_t MAX_QUEUE_SIZE = 128*1000*1000; // 128MB
-constexpr uint32_t Q_LIST_UPDATE_MSEC = 1000*30;     // check queue list every 30seconds
-constexpr uint32_t Q_LIST_RETRY_MSEC = 1000;         // retry every second if queue list update failed
-constexpr uint32_t IDLE_TIMEOUT_USEC = 100*1000;     // idle sleep 100ms
-constexpr uint32_t FAILOVER_TIME_MSEC = 3*Q_LIST_UPDATE_MSEC; // FAILOVER TIME 3x renew time
-constexpr uint32_t WORKER_COUNT = 1;                 // 1 worker thread
-constexpr uint32_t STALE_RESERVATIONS_PERIOD_S = 120;   // cleanup reservations that are more than 2 minutes old
-constexpr uint32_t RESERVATIONS_CLEANUP_PERIOD_S = 30; // reservation cleanup every 30 seconds
 
 bool init(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* store,
           const SiteConfig& site) {
@@ -847,12 +845,7 @@ bool init(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* store,
     return false;
   }
   // TODO: take conf from CephContext
-  s_manager = std::make_unique<Manager>(dpp->get_cct(),
-      Q_LIST_UPDATE_MSEC, Q_LIST_RETRY_MSEC, 
-      IDLE_TIMEOUT_USEC, FAILOVER_TIME_MSEC, 
-      STALE_RESERVATIONS_PERIOD_S, RESERVATIONS_CLEANUP_PERIOD_S,
-      WORKER_COUNT,
-      store, site);
+  s_manager = std::make_unique<Manager>(dpp->get_cct(), store, site);
   s_manager->init();
   return true;
 }