]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/notifications: start/stop endpoint managers in notification manager
authorYuval Lifshitz <ylifshit@ibm.com>
Thu, 18 Apr 2024 09:39:42 +0000 (09:39 +0000)
committerYuval Lifshitz <ylifshit@ibm.com>
Sun, 12 May 2024 10:39:08 +0000 (10:39 +0000)
Fixes: https://tracker.ceph.com/issues/65337
Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
12 files changed:
src/rgw/driver/rados/rgw_notify.cc
src/rgw/driver/rados/rgw_notify.h
src/rgw/driver/rados/rgw_pubsub_push.cc
src/rgw/driver/rados/rgw_pubsub_push.h
src/rgw/driver/rados/rgw_rados.cc
src/rgw/rgw_amqp.cc
src/rgw/rgw_appmain.cc
src/rgw/rgw_kafka.cc
src/rgw/rgw_lib.cc
src/rgw/rgw_main.cc
src/rgw/rgw_main.h
src/test/rgw/bucket_notification/test_bn.py

index 43481629bdf4960105662cac1e400ab2c5fe027b..c70693895bf83d61ba5d4de98c5720435fc0423a 100644 (file)
@@ -123,6 +123,7 @@ void publish_commit_completion(rados_completion_t completion, void *arg) {
 };
 
 class Manager : public DoutPrefixProvider {
+  bool shutdown = false;
   const size_t max_queue_size;
   const uint32_t queues_update_period_ms;
   const uint32_t queues_update_retry_ms;
@@ -311,7 +312,7 @@ private:
 
   // clean stale reservation from queue
   void cleanup_queue(const std::string& queue_name, spawn::yield_context yield) {
-    while (true) {
+    while (!shutdown) {
       ldpp_dout(this, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name << dendl;
       const auto now = ceph::coarse_real_time::clock::now();
       const auto stale_time = now - std::chrono::seconds(stale_reservations_period_s);
@@ -346,6 +347,27 @@ private:
       boost::system::error_code ec;
            timer.async_wait(yield[ec]);
     }
+    ldpp_dout(this, 5) << "INFO: manager stopped. done cleanup for queue: " << queue_name << dendl;
+  }
+
+  // unlock (lose ownership) queue
+  int unlock_queue(const std::string& queue_name, spawn::yield_context yield) {
+    librados::ObjectWriteOperation op;
+    op.assert_exists();
+    rados::cls::lock::unlock(&op, queue_name+"_lock", lock_cookie);
+    auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
+    const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+    if (ret == -ENOENT) {
+      ldpp_dout(this, 10) << "INFO: queue: " << queue_name
+        << ". was removed. nothing to unlock" << dendl;
+      return 0;
+    }
+    if (ret == -EBUSY) {
+      ldpp_dout(this, 10) << "INFO: queue: " << queue_name
+        << ". already owned by another RGW. no need to unlock" << dendl;
+      return 0;
+    }
+    return ret;
   }
 
   // processing of a specific queue
@@ -361,7 +383,7 @@ private:
 
     CountersManager queue_counters_container(queue_name, this->get_cct());
 
-    while (true) {
+    while (!shutdown) {
       // if queue was empty the last time, sleep for idle timeout
       if (is_idle) {
         Timer timer(io_context);
@@ -446,7 +468,7 @@ private:
             if (result == EntryProcessingResult::Successful || result == EntryProcessingResult::Expired
                 || result == EntryProcessingResult::Migrating) {
               ldpp_dout(this, 20) << "INFO: processing of entry: " << entry.marker
-                << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name
+                << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " "
                 << entryProcessingResultString[static_cast<unsigned int>(result)] << dendl;
               remove_entries = true;
               needs_migration_vector[entry_idx - 1] = (result == EntryProcessingResult::Migrating);
@@ -588,6 +610,7 @@ private:
         queue_counters_container.set(l_rgw_persistent_topic_size, entries_size);
       }
     }
+    ldpp_dout(this, 5) << "INFO: manager stopped. done processing for queue: " << queue_name << dendl;
   }
 
   // lits of owned queues
@@ -598,6 +621,7 @@ private:
   void process_queues(spawn::yield_context yield) {
     auto has_error = false;
     owned_queues_t owned_queues;
+    size_t processed_queue_count = 0;
 
     // add randomness to the duration between queue checking
     // to make sure that different daemons are not synced
@@ -609,7 +633,8 @@ private:
 
     std::vector<std::string> queue_gc;
     std::mutex queue_gc_lock;
-    while (true) {
+    auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
+    while (!shutdown) {
       Timer timer(io_context);
       const auto duration = (has_error ? 
         std::chrono::milliseconds(queues_update_retry_ms) : std::chrono::milliseconds(queues_update_period_ms)) + 
@@ -640,7 +665,7 @@ private:
               failover_time,
               LOCK_FLAG_MAY_RENEW);
 
-        ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), queue_name, &op, optional_yield(io_context, yield));
+        ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
         if (ret == -EBUSY) {
           // lock is already taken by another RGW
           ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " owned (locked) by another daemon" << dendl;
@@ -662,12 +687,21 @@ private:
         if (owned_queues.insert(queue_name).second) {
           ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " now owned (locked) by this daemon" << dendl;
           // start processing this queue
-          spawn::spawn(io_context, [this, &queue_gc, &queue_gc_lock, queue_name](spawn::yield_context yield) {
+          spawn::spawn(io_context, [this, &queue_gc, &queue_gc_lock, queue_name, &processed_queue_count](spawn::yield_context yield) {
+            ++processed_queue_count;
             process_queue(queue_name, yield);
             // if queue processing ended, it means that the queue was removed or not owned anymore
+            const auto ret = unlock_queue(queue_name, yield);
+            if (ret < 0) {
+              ldpp_dout(this, 5) << "WARNING: failed to unlock queue: " << queue_name << " with error: " <<
+                ret << " (ownership would still move if not renewed)" << dendl;
+            } else {
+              ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " not locked (ownership can move)" << dendl;
+            }
             // mark it for deletion
             std::lock_guard lock_guard(queue_gc_lock);
             queue_gc.push_back(queue_name);
+            --processed_queue_count;
             ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " marked for removal" << dendl;
           }, make_stack_allocator());
         } else {
@@ -680,21 +714,55 @@ private:
         std::for_each(queue_gc.begin(), queue_gc.end(), [this, &owned_queues](const std::string& queue_name) {
           topics_persistency_tracker.erase(queue_name);
           owned_queues.erase(queue_name);
-          ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " removed" << dendl;
+          ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " was removed" << dendl;
         });
         queue_gc.clear();
       }
     }
+    Timer timer(io_context);
+    while (processed_queue_count > 0) {
+      ldpp_dout(this, 5) << "INFO: manager stopped. " << processed_queue_count << " queues are still being processed" << dendl;
+      timer.expires_from_now(std::chrono::milliseconds(queues_update_retry_ms));
+      boost::system::error_code ec;
+      timer.async_wait(yield[ec]);
+    }
+    ldpp_dout(this, 5) << "INFO: manager stopped. done processing all queues" << dendl;
   }
 
 public:
 
   ~Manager() {
+  }
+
+  void stop() {
+    shutdown = true;
     work_guard.reset();
-    io_context.stop();
     std::for_each(workers.begin(), workers.end(), [] (auto& worker) { worker.join(); });
   }
 
+  void init() {
+    spawn::spawn(io_context, [this](spawn::yield_context yield) {
+          process_queues(yield);
+        }, make_stack_allocator());
+
+    // start the worker threads to do the actual queue processing
+    const std::string WORKER_THREAD_NAME = "notif-worker";
+    for (auto worker_id = 0U; worker_id < worker_count; ++worker_id) {
+      workers.emplace_back([this]() {
+        try {
+          io_context.run(); 
+        } catch (const std::exception& err) {
+          ldpp_dout(this, 1) << "ERROR: notification worker failed with error: " << err.what() << dendl;
+          throw err;
+        }
+      });
+      const auto rc = ceph_pthread_setname(workers.back().native_handle(), 
+        (WORKER_THREAD_NAME+std::to_string(worker_id)).c_str());
+      ceph_assert(rc == 0);
+    }
+    ldpp_dout(this, 10) << "INfO: started notification manager with: " << worker_count << " workers" << dendl;
+  }
+
   // ctor: start all threads
   Manager(CephContext* _cct, uint32_t _max_queue_size, uint32_t _queues_update_period_ms, 
           uint32_t _queues_update_retry_ms, uint32_t _queue_idle_sleep_us, u_int32_t failover_time_ms, 
@@ -714,28 +782,7 @@ public:
     reservations_cleanup_period_s(_reservations_cleanup_period_s),
     site(site),
     rados_store(*store)
-    {
-      spawn::spawn(io_context, [this](spawn::yield_context yield) {
-            process_queues(yield);
-          }, make_stack_allocator());
-
-      // start the worker threads to do the actual queue processing
-      const std::string WORKER_THREAD_NAME = "notif-worker";
-      for (auto worker_id = 0U; worker_id < worker_count; ++worker_id) {
-        workers.emplace_back([this]() {
-          try {
-            io_context.run(); 
-          } catch (const std::exception& err) {
-            ldpp_dout(this, 10) << "Notification worker failed with error: " << err.what() << dendl;
-            throw(err);
-          }
-        });
-        const auto rc = ceph_pthread_setname(workers.back().native_handle(), 
-          (WORKER_THREAD_NAME+std::to_string(worker_id)).c_str());
-        ceph_assert(rc == 0);
-      }
-      ldpp_dout(this, 10) << "Started notification manager with: " << worker_count << " workers" << dendl;
-    }
+    {}
 
   int add_persistent_topic(const std::string& topic_queue, optional_yield y) {
     if (topic_queue == Q_LIST_OBJECT_NAME) {
@@ -771,10 +818,7 @@ public:
   }
 };
 
-// singleton manager
-// note that the manager itself is not a singleton, and multiple instances may co-exist
-// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
-static Manager* s_manager = nullptr;
+std::unique_ptr<Manager> s_manager;
 
 constexpr size_t MAX_QUEUE_SIZE = 128*1000*1000; // 128MB
 constexpr uint32_t Q_LIST_UPDATE_MSEC = 1000*30;     // check queue list every 30seconds
@@ -785,24 +829,31 @@ constexpr uint32_t WORKER_COUNT = 1;                 // 1 worker thread
 constexpr uint32_t STALE_RESERVATIONS_PERIOD_S = 120;   // cleanup reservations that are more than 2 minutes old
 constexpr uint32_t RESERVATIONS_CLEANUP_PERIOD_S = 30; // reservation cleanup every 30 seconds
 
-bool init(CephContext* cct, rgw::sal::RadosStore* store,
-          const SiteConfig& site, const DoutPrefixProvider *dpp) {
+bool init(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* store,
+          const SiteConfig& site) {
   if (s_manager) {
+    ldpp_dout(dpp, 1) << "ERROR: failed to init notification manager: already exists" << dendl;
+    return false;
+  }
+  if (!RGWPubSubEndpoint::init_all(dpp->get_cct())) {
     return false;
   }
   // TODO: take conf from CephContext
-  s_manager = new Manager(cct, MAX_QUEUE_SIZE, 
+  s_manager = std::make_unique<Manager>(dpp->get_cct(), MAX_QUEUE_SIZE, 
       Q_LIST_UPDATE_MSEC, Q_LIST_RETRY_MSEC, 
       IDLE_TIMEOUT_USEC, FAILOVER_TIME_MSEC, 
       STALE_RESERVATIONS_PERIOD_S, RESERVATIONS_CLEANUP_PERIOD_S,
       WORKER_COUNT,
       store, site);
+  s_manager->init();
   return true;
 }
 
 void shutdown() {
-  delete s_manager;
-  s_manager = nullptr;
+  if (!s_manager) return;
+  RGWPubSubEndpoint::shutdown_all();
+  s_manager->stop();
+  s_manager.reset();
 }
 
 int add_persistent_topic(const std::string& topic_name, optional_yield y) {
@@ -842,7 +893,7 @@ int remove_persistent_topic(const std::string& topic_queue, optional_yield y) {
   if (!s_manager) {
     return -EAGAIN;
   }
-  return remove_persistent_topic(s_manager, s_manager->rados_store.getRados()->get_notif_pool_ctx(), topic_queue, y);
+  return remove_persistent_topic(s_manager.get(), s_manager->rados_store.getRados()->get_notif_pool_ctx(), topic_queue, y);
 }
 
 rgw::sal::Object* get_object_with_attributes(
index 7014cda3ca35011ce93d151503545d931896c44b..e1566d3f71d964ca90556763fab257fff04e1447 100644 (file)
@@ -26,8 +26,8 @@ namespace rgw::notify {
 // initialize the notification manager
 // notification manager is dequeuing the 2-phase-commit queues
 // and send the notifications to the endpoints
-bool init(CephContext* cct, rgw::sal::RadosStore* store,
-          const rgw::SiteConfig& site, const DoutPrefixProvider *dpp);
+bool init(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* store,
+          const rgw::SiteConfig& site);
 
 // shutdown the notification manager
 void shutdown();
index d2c0824c90cbb0262517eb69cd9565d14529574c..16937ef23ec26dbfec71ec655bbf4c597f58323a 100644 (file)
@@ -5,6 +5,7 @@
 #include <string>
 #include <sstream>
 #include <algorithm>
+#include <curl/curl.h>
 #include "common/Formatter.h"
 #include "common/iso_8601.h"
 #include "common/async/completion.h"
@@ -23,6 +24,8 @@
 #include <functional>
 #include "rgw_perf_counters.h"
 
+#define dout_subsys ceph_subsys_rgw_notification
+
 using namespace rgw;
 
 template<typename EventType>
@@ -52,6 +55,9 @@ bool get_bool(const RGWHTTPArgs& args, const std::string& name, bool default_val
   return value;
 }
 
+static std::unique_ptr<RGWHTTPManager> s_http_manager;
+static std::shared_mutex s_http_manager_mutex;
+
 class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
 private:
   CephContext* const cct;
@@ -83,6 +89,11 @@ public:
   }
 
   int send(const rgw_pubsub_s3_event& event, optional_yield y) override {
+    std::shared_lock lock(s_http_manager_mutex);
+    if (!s_http_manager) {
+      ldout(cct, 1) << "ERROR: send failed. http endpoint manager not running" << dendl;
+      return -ESRCH;
+    }
     bufferlist read_bl;
     RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
     const auto post_data = json_format_pubsub_event(event);
@@ -101,7 +112,10 @@ public:
     request.set_send_length(post_data.length());
     request.append_header("Content-Type", "application/json");
     if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
-    const auto rc = RGWHTTP::process(&request, y);
+    auto rc = s_http_manager->add_request(&request);
+    if (rc == 0) {
+      rc = request.wait(y);
+    }
     if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
     // TODO: use read_bl to process return code and handle according to ack level
     return rc;
@@ -398,3 +412,48 @@ RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint,
   return nullptr;
 }
 
+bool init_http_manager(CephContext* cct) {
+  std::unique_lock lock(s_http_manager_mutex);
+  if (s_http_manager) return false;
+  s_http_manager = std::make_unique<RGWHTTPManager>(cct);
+  return (s_http_manager->start() == 0);
+}
+
+void shutdown_http_manager() {
+  std::unique_lock lock(s_http_manager_mutex);
+  if (s_http_manager) {
+    s_http_manager->stop();
+    s_http_manager.reset();
+  }
+}
+
+bool RGWPubSubEndpoint::init_all(CephContext* cct) {
+#ifdef WITH_RADOSGW_AMQP_ENDPOINT
+  if (!amqp::init(cct)) {
+    ldout(cct, 1) << "ERROR: failed to init amqp endpoint manager" << dendl;
+    return false;
+  }
+#endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+  if (!kafka::init(cct)) {
+    ldout(cct, 1) << "ERROR: failed to init kafka endpoint manager" << dendl;
+    return false;
+  }
+#endif
+  if (!init_http_manager(cct)) {
+    ldout(cct, 1) << "ERROR: failed to init http endpoint manager" << dendl;
+    return false;
+  }
+  return true;
+}
+
+void RGWPubSubEndpoint::shutdown_all() {
+#ifdef WITH_RADOSGW_AMQP_ENDPOINT
+  amqp::shutdown();
+#endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+  kafka::shutdown();
+#endif
+  shutdown_http_manager();
+}
+
index 040be3dd428911b4913046dc80e800ae93823ea7..bacebfba44c12e01494bb80ef5555135c1f18163 100644 (file)
@@ -40,5 +40,11 @@ public:
     configuration_error(const std::string& what_arg) : 
       std::logic_error("pubsub endpoint configuration error: " + what_arg) {}
   };
+
+  // init all supported endpoints
+  static bool init_all(CephContext* cct);
+  // shutdown all supported endpoints
+  static void shutdown_all();
+
 };
 
index 95f05f149a062029901f6fe7c146830d329b5b32..45d50452e03c50a8d81f6c0bc8a112c18eac9ce0 100644 (file)
@@ -1356,10 +1356,8 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y)
   index_completion_manager = new RGWIndexCompletionManager(this);
 
   if (run_notification_thread) {
-    ret = rgw::notify::init(cct, driver, *svc.site, dpp);
-    if (ret < 0 ) {
-      ldpp_dout(dpp, 1) << "ERROR: failed to initialize notification manager" << dendl;
-      return ret;
+    if (!rgw::notify::init(dpp, driver, *svc.site)) {
+      ldpp_dout(dpp, 0) << "ERROR: failed to initialize notification manager" << dendl;
     }
 
     using namespace rgw;
index 46ab9c575bd67c9b5b7f98629fb1c63c25b767c7..ea824b8295aca169832936026d4929963b637ab4 100644 (file)
@@ -966,8 +966,8 @@ public:
 
 // singleton manager
 // note that the manager itself is not a singleton, and multiple instances may co-exist
-// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
 static Manager* s_manager = nullptr;
+static std::shared_mutex s_manager_mutex;
 
 static const size_t MAX_CONNECTIONS_DEFAULT = 256;
 static const size_t MAX_INFLIGHT_DEFAULT = 8192;
@@ -977,6 +977,7 @@ static const unsigned IDLE_TIME_MS = 100;
 static const unsigned RECONNECT_TIME_MS = 100;
 
 bool init(CephContext* cct) {
+  std::unique_lock lock(s_manager_mutex);
   if (s_manager) {
     return false;
   }
@@ -987,12 +988,14 @@ bool init(CephContext* cct) {
 }
 
 void shutdown() {
+  std::unique_lock lock(s_manager_mutex);
   delete s_manager;
   s_manager = nullptr;
 }
 
 bool connect(connection_id_t& conn_id, const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
         boost::optional<const std::string&> ca_location) {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return false;
   return s_manager->connect(conn_id, url, exchange, mandatory_delivery, verify_ssl, ca_location);
 }
@@ -1000,6 +1003,7 @@ bool connect(connection_id_t& conn_id, const std::string& url, const std::string
 int publish(const connection_id_t& conn_id,
     const std::string& topic,
     const std::string& message) {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
   return s_manager->publish(conn_id, topic, message);
 }
@@ -1008,41 +1012,49 @@ int publish_with_confirm(const connection_id_t& conn_id,
     const std::string& topic,
     const std::string& message,
     reply_callback_t cb) {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
   return s_manager->publish_with_confirm(conn_id, topic, message, cb);
 }
 
 size_t get_connection_count() {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return 0;
   return s_manager->get_connection_count();
 }
 
 size_t get_inflight() {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return 0;
   return s_manager->get_inflight();
 }
 
 size_t get_queued() {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return 0;
   return s_manager->get_queued();
 }
 
 size_t get_dequeued() {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return 0;
   return s_manager->get_dequeued();
 }
 
 size_t get_max_connections() {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return MAX_CONNECTIONS_DEFAULT;
   return s_manager->max_connections;
 }
 
 size_t get_max_inflight() {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return MAX_INFLIGHT_DEFAULT;
   return s_manager->max_inflight;
 }
 
 size_t get_max_queue() {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return MAX_QUEUE_DEFAULT;
   return s_manager->max_queue;
 }
index 8266985ece44ed0650ac5572d7b37b834d667323..9969811083eff00887469cf1901a02db06542ecd 100644 (file)
 #include "rgw_kmip_client_impl.h"
 #include "rgw_perf_counters.h"
 #include "rgw_signal.h"
-#ifdef WITH_RADOSGW_AMQP_ENDPOINT
-#include "rgw_amqp.h"
-#endif
-#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
-#include "rgw_kafka.h"
-#endif
 #ifdef WITH_ARROW_FLIGHT
 #include "rgw_flight_frontend.h"
 #endif
@@ -554,20 +548,6 @@ void rgw::AppMain::init_tracepoints()
   tracing::rgw::tracer.init(dpp->get_cct(), "rgw");
 } /* init_tracepoints() */
 
-void rgw::AppMain::init_notification_endpoints()
-{
-#ifdef WITH_RADOSGW_AMQP_ENDPOINT
-  if (!rgw::amqp::init(dpp->get_cct())) {
-    derr << "ERROR: failed to initialize AMQP manager" << dendl;
-  }
-#endif
-#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
-  if (!rgw::kafka::init(dpp->get_cct())) {
-    derr << "ERROR: failed to initialize Kafka manager" << dendl;
-  }
-#endif
-} /* init_notification_endpoints */
-
 void rgw::AppMain::init_lua()
 {
   rgw::sal::Driver* driver = env.driver;
@@ -644,12 +624,6 @@ void rgw::AppMain::shutdown(std::function<void(void)> finalize_async_signals)
   rgw::curl::cleanup_curl();
   g_conf().remove_observer(implicit_tenant_context.get());
   implicit_tenant_context.reset(); // deletes
-#ifdef WITH_RADOSGW_AMQP_ENDPOINT
-  rgw::amqp::shutdown();
-#endif
-#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
-  rgw::kafka::shutdown();
-#endif
   rgw_perf_stop(g_ceph_context);
   ratelimiter.reset(); // deletes--ensure this happens before we destruct
 } /* AppMain::shutdown */
index ce496e414a7d543bc230316ff58afee23e6dd4e5..ce1b273d8b78b42c87649187711814fe13ffd926 100644 (file)
@@ -688,14 +688,15 @@ public:
 
 // singleton manager
 // note that the manager itself is not a singleton, and multiple instances may co-exist
-// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
 static Manager* s_manager = nullptr;
+static std::shared_mutex s_manager_mutex;
 
 static const size_t MAX_CONNECTIONS_DEFAULT = 256;
 static const size_t MAX_INFLIGHT_DEFAULT = 8192; 
 static const size_t MAX_QUEUE_DEFAULT = 8192;
 
 bool init(CephContext* cct) {
+  std::unique_lock lock(s_manager_mutex);
   if (s_manager) {
     return false;
   }
@@ -705,6 +706,7 @@ bool init(CephContext* cct) {
 }
 
 void shutdown() {
+  std::unique_lock lock(s_manager_mutex);
   delete s_manager;
   s_manager = nullptr;
 }
@@ -714,6 +716,7 @@ bool connect(std::string& broker, const std::string& url, bool use_ssl, bool ver
         boost::optional<const std::string&> mechanism,
         boost::optional<const std::string&> user_name,
         boost::optional<const std::string&> password) {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return false;
   return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism, user_name, password);
 }
@@ -721,6 +724,7 @@ bool connect(std::string& broker, const std::string& url, bool use_ssl, bool ver
 int publish(const std::string& conn_name,
     const std::string& topic,
     const std::string& message) {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return STATUS_MANAGER_STOPPED;
   return s_manager->publish(conn_name, topic, message);
 }
@@ -729,41 +733,49 @@ int publish_with_confirm(const std::string& conn_name,
     const std::string& topic,
     const std::string& message,
     reply_callback_t cb) {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return STATUS_MANAGER_STOPPED;
   return s_manager->publish_with_confirm(conn_name, topic, message, cb);
 }
 
 size_t get_connection_count() {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return 0;
   return s_manager->get_connection_count();
 }
   
 size_t get_inflight() {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return 0;
   return s_manager->get_inflight();
 }
 
 size_t get_queued() {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return 0;
   return s_manager->get_queued();
 }
 
 size_t get_dequeued() {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return 0;
   return s_manager->get_dequeued();
 }
 
 size_t get_max_connections() {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return MAX_CONNECTIONS_DEFAULT;
   return s_manager->max_connections;
 }
 
 size_t get_max_inflight() {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return MAX_INFLIGHT_DEFAULT;
   return s_manager->max_inflight;
 }
 
 size_t get_max_queue() {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return MAX_QUEUE_DEFAULT;
   return s_manager->max_queue;
 }
index 665a03b62792192e2c11d1b233edd65602f0052e..59443c52b02737d29378a33e1fe78165fbe33b5d 100644 (file)
@@ -548,7 +548,6 @@ namespace rgw {
       return r;
     }
 
-    main.init_notification_endpoints();
     main.init_lua();
 
     return 0;
index 6d0dab8245af7b07facc850e0f3b073e19645aa8..5536e2810f6a81d660375258e84dbd9159e9b843 100644 (file)
@@ -168,7 +168,6 @@ int main(int argc, char *argv[])
     main.shutdown();
     return r;
   }
-  main.init_notification_endpoints();
 
 #if defined(HAVE_SYS_PRCTL_H)
   if (prctl(PR_SET_DUMPABLE, 1) == -1) {
index caa6a0822828c9353e90e46f0e881c49e6ff4f8b..d106485855a30be16ac17f75893de94229fbf971 100644 (file)
@@ -114,7 +114,6 @@ public:
   void init_opslog();
   int init_frontends2(RGWLib* rgwlib = nullptr);
   void init_tracepoints();
-  void init_notification_endpoints();
   void init_lua();
 
   bool have_http() {
index 72be3594d3bfb5d33dd129ba21c8fff57096d3cb..6363d9b84ec0074932db6f8aee3dfa11c870e885 100644 (file)
@@ -2957,8 +2957,9 @@ def wait_for_queue_to_drain(topic_name, tenant=None, account=None):
         parsed_result = json.loads(result[0])
         entries = parsed_result['Topic Stats']['Entries']
         retries += 1
+        time_diff = time.time() - start_time
+        log.info('queue %s has %d entries after %ds', topic_name, entries, time_diff)
         if retries > 30:
-            time_diff = time.time() - start_time
             log.warning('queue %s still has %d entries after %ds', topic_name, entries, time_diff)
             assert_equal(entries, 0)
         time.sleep(5)
@@ -4510,6 +4511,111 @@ def test_ps_s3_notification_push_kafka_security_sasl_scram():
     kafka_security('SASL_PLAINTEXT', mechanism='SCRAM-SHA-256')
 
 
+@attr('http_test')
+def test_persistent_ps_s3_reload():
+    """ do a realm reload while we send notifications """
+    conn = connection()
+    zonegroup = get_config_zonegroup()
+
+    # make sure there is nothing to migrate
+    print('delete all topics')
+    delete_all_topics(conn, '', get_config_cluster())
+
+    # create random port for the http server
+    host = get_ip()
+    http_port = random.randint(10000, 20000)
+    print('start http server')
+    http_server = HTTPServerWithEvents((host, http_port), delay=2)
+
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = conn.create_bucket(bucket_name)
+    topic_name1 = bucket_name + TOPIC_SUFFIX + '_1'
+
+    # create s3 topics
+    endpoint_address = 'http://'+host+':'+str(http_port)
+    endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
+    topic_conf1 = PSTopicS3(conn, topic_name1, zonegroup, endpoint_args=endpoint_args)
+    topic_arn1 = topic_conf1.set_config()
+    # 2nd topic is unused
+    topic_name2 = bucket_name + TOPIC_SUFFIX + '_2'
+    topic_conf2 = PSTopicS3(conn, topic_name2, zonegroup, endpoint_args=endpoint_args)
+    topic_arn2 = topic_conf2.set_config()
+
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1,
+                        'Events': []
+                        }]
+
+    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # topic stats
+    result = admin(['topic', 'stats', '--topic', topic_name1], get_config_cluster())
+    parsed_result = json.loads(result[0])
+    assert_equal(parsed_result['Topic Stats']['Entries'], 0)
+    assert_equal(result[1], 0)
+
+    # create objects in the bucket (async)
+    number_of_objects = 10
+    client_threads = []
+    start_time = time.time()
+    for i in range(number_of_objects):
+        key = bucket.new_key('key-'+str(i))
+        content = str(os.urandom(1024*1024))
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+    time_diff = time.time() - start_time
+    print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    wait_for_queue_to_drain(topic_name1)
+
+    client_threads = []
+    start_time = time.time()
+    for i in range(number_of_objects):
+        key = bucket.new_key('another-key-'+str(i))
+        content = str(os.urandom(1024*1024))
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+    time_diff = time.time() - start_time
+    print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    # do a reload
+    print('do reload')
+    result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster())
+    assert_equal(result[1], 0)
+    result = admin(['period', 'update'], get_config_cluster())
+    assert_equal(result[1], 0)
+    result = admin(['period', 'commit'], get_config_cluster())
+    assert_equal(result[1], 0)
+
+    wait_for_queue_to_drain(topic_name1)
+    # verify events
+    keys = list(bucket.list())
+    http_server.verify_s3_events(keys, exact_match=False)
+
+    # cleanup
+    s3_notification_conf.del_config()
+    topic_conf1.del_config()
+    topic_conf2.del_config()
+    # delete objects from the bucket
+    client_threads = []
+    for key in bucket.list():
+        thr = threading.Thread(target = key.delete, args=())
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+    # delete the bucket
+    conn.delete_bucket(bucket_name)
+    http_server.close()
+
+
 @attr('data_path_v2_test')
 def test_persistent_ps_s3_data_path_v2_migration():
     """ test data path v2 persistent migration """