]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/pubsub: avoid static creation of amqp manager
authorYuval Lifshitz <yuvalif@yahoo.com>
Sun, 31 Mar 2019 19:52:27 +0000 (22:52 +0300)
committerYuval Lifshitz <yuvalif@yahoo.com>
Sun, 31 Mar 2019 19:52:27 +0000 (22:52 +0300)
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
src/rgw/rgw_amqp.cc
src/rgw/rgw_amqp.h
src/rgw/rgw_main.cc
src/rgw/rgw_pubsub_push.cc
src/test/rgw/test_rgw_amqp.cc

index f9acabb2c5f89f02569674113964287db58284d1..4143eaae977fcd4245e18eae2f7b587cb067101b 100644 (file)
@@ -2,6 +2,7 @@
 // vim: ts=8 sw=2 smarttab
 
 #include "rgw_amqp.h"
+#include <atomic>
 #include <amqp.h>
 #include <amqp_tcp_socket.h>
 #include <amqp_framing.h>
@@ -533,6 +534,7 @@ private:
   MessageQueue messages;
   std::atomic<size_t> queued;
   std::atomic<size_t> dequeued;
+  CephContext* const cct;
   mutable std::mutex connections_lock;
   std::thread runner;
 
@@ -634,7 +636,6 @@ private:
       for (;conn_it != end_it;) {
         
         auto& conn = conn_it->second;
-        ldout(conn->cct, 20) << "AMQP run: start processing connection" << dendl;
         // delete the connection if marked for deletion
         if (conn->marked_for_deletion) {
           ldout(conn->cct, 10) << "AMQP run: connection is deleted" << dendl;
@@ -668,7 +669,6 @@ private:
 
         if (rc == AMQP_STATUS_TIMEOUT) {
           // TODO mark connection as idle
-          ldout(conn->cct, 20) << "AMQP run: connection is idle" << dendl;
           INCREMENT_AND_CONTINUE(conn_it);
         }
        
@@ -777,7 +777,8 @@ public:
   Manager(size_t _max_connections,
       size_t _max_inflight,
       size_t _max_queue, 
-      long _usec_timeout) : 
+      long _usec_timeout,
+      CephContext* _cct) : 
     max_connections(_max_connections),
     max_inflight(_max_inflight),
     max_queue(_max_queue),
@@ -788,6 +789,7 @@ public:
     messages(max_queue),
     queued(0),
     dequeued(0),
+    cct(_cct),
     runner(&Manager::run, this) {
       // The hashmap has "max connections" as the initial number of buckets, 
       // and allows for 10 collisions per bucket before rehash.
@@ -818,7 +820,7 @@ public:
   }
 
   // connect to a broker, or reuse an existing connection if already connected
-  connection_ptr_t connect(const std::string& url, const std::string& exchange, CephContext* cct) {
+  connection_ptr_t connect(const std::string& url, const std::string& exchange) {
     if (stopped) {
       // TODO: increment counter
       ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl;
@@ -939,33 +941,36 @@ public:
 
 // singleton manager
 // note that the manager itself is not a singleton, and multiple instances may co-exist
+// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
 static Manager* s_manager = nullptr;
 
 static const size_t MAX_CONNECTIONS_DEFAULT = 256;
 static const size_t MAX_INFLIGHT_DEFAULT = 8192; 
 static const size_t MAX_QUEUE_DEFAULT = 8192;
 
-class SingletonManager {
-  std::mutex manager_creation_lock;
-  public:
-    SingletonManager(CephContext* cct=nullptr) {
-      // TODO get parameters from conf
-      std::lock_guard<std::mutex> l(manager_creation_lock);
-      if (s_manager == nullptr) {
-        s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, 100);
-      }
-    }
-};
+bool init(CephContext* cct) {
+  if (s_manager) {
+    return false;
+  }
+  // TODO: take conf from CephContext
+  s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, 100, cct);
+  return true;
+}
+
+void shutdown() {
+  delete s_manager;
+  s_manager = nullptr;
+}
 
-connection_ptr_t connect(const std::string& url, const std::string& exchange, CephContext* cct) {
-  static const SingletonManager singleton(cct);
-  return s_manager->connect(url, exchange, cct);
+connection_ptr_t connect(const std::string& url, const std::string& exchange) {
+  if (!s_manager) return nullptr;
+  return s_manager->connect(url, exchange);
 }
 
 int publish(connection_ptr_t& conn, 
     const std::string& topic,
     const std::string& message) {
-  if (s_manager == nullptr) return RGW_AMQP_STATUS_MANAGER_STOPPED;
+  if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
   return s_manager->publish(conn, topic, message);
 }
 
@@ -973,47 +978,47 @@ int publish_with_confirm(connection_ptr_t& conn,
     const std::string& topic,
     const std::string& message,
     reply_callback_t cb) {
-  if (s_manager == nullptr) return RGW_AMQP_STATUS_MANAGER_STOPPED;
+  if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
   return s_manager->publish_with_confirm(conn, topic, message, cb);
 }
 
 size_t get_connection_count() {
-  if (s_manager == nullptr) return 0;
+  if (!s_manager) return 0;
   return s_manager->get_connection_count();
 }
   
 size_t get_inflight() {
-  if (s_manager == nullptr) return 0;
+  if (!s_manager) return 0;
   return s_manager->get_inflight();
 }
 
 size_t get_queued() {
-  if (s_manager == nullptr) return 0;
+  if (!s_manager) return 0;
   return s_manager->get_queued();
 }
 
 size_t get_dequeued() {
-  if (s_manager == nullptr) return 0;
+  if (!s_manager) return 0;
   return s_manager->get_dequeued();
 }
 
 size_t get_max_connections() {
-  if (s_manager == nullptr) return MAX_CONNECTIONS_DEFAULT;
+  if (!s_manager) return MAX_CONNECTIONS_DEFAULT;
   return s_manager->max_connections;
 }
 
 size_t get_max_inflight() {
-  if (s_manager == nullptr) return MAX_INFLIGHT_DEFAULT;
+  if (!s_manager) return MAX_INFLIGHT_DEFAULT;
   return s_manager->max_inflight;
 }
 
 size_t get_max_queue() {
-  if (s_manager == nullptr) return MAX_QUEUE_DEFAULT;
+  if (!s_manager) return MAX_QUEUE_DEFAULT;
   return s_manager->max_queue;
 }
 
 bool disconnect(connection_ptr_t& conn) {
-  if (s_manager == nullptr) return false;
+  if (!s_manager) return false;
   return s_manager->disconnect(conn);
 }
 
index 2549116d28967691cbf5c743bef435b924fa735e..938bdade4958cf1d11678da92bd58d877714c475 100644 (file)
@@ -23,8 +23,14 @@ void intrusive_ptr_release(const connection_t* p);
 // indicating the result, and not to return anything
 typedef std::function<void(int)> reply_callback_t;
 
+// initialize the amqp manager
+bool init(CephContext* cct);
+
+// shutdown the amqp manager
+void shutdown();
+
 // connect to an amqp endpoint
-connection_ptr_t connect(const std::string& url, const std::string& exchange, CephContext* cct);
+connection_ptr_t connect(const std::string& url, const std::string& exchange);
 
 // publish a message over a connection that was already created
 int publish(connection_ptr_t& conn,
index ad21c3974030110cb45143e080db1b1b31c84618..63ef569bc11e25549d86fef9a42f6526943f1fda 100644 (file)
 #include "rgw_asio_frontend.h"
 #endif /* WITH_RADOSGW_BEAST_FRONTEND */
 
+#ifdef WITH_RADOSGW_AMQP_ENDPOINT
+#include "rgw_amqp.h"
+#endif /* WITH_RADOSGW_AMQP_ENDPOINT */
+
 #include "rgw_dmclock_scheduler_ctx.h"
 
 #include "services/svc_zone.h"
@@ -302,6 +306,10 @@ int main(int argc, const char **argv)
   FCGX_Init();
 #endif
 
+#ifdef WITH_RADOSGW_AMQP_ENDPOINT
+  rgw::amqp::init(cct.get());
+#endif
+
   RGWRados *store =
     RGWStoreManager::get_storage(g_ceph_context,
                                 g_conf()->rgw_enable_gc_threads,
@@ -594,6 +602,10 @@ int main(int argc, const char **argv)
   rgw_http_client_cleanup();
   rgw::curl::cleanup_curl();
 
+#ifdef WITH_RADOSGW_AMQP_ENDPOINT
+  rgw::amqp::shutdown();
+#endif
+
   rgw_perf_stop(g_ceph_context);
 
   dout(1) << "final shutdown" << dendl;
index 1244ee8ccb996b46e5fd04464280edc25e14a7b5..81c9aefa1a680de999e03fb377be4aa113876ebb 100644 (file)
@@ -258,7 +258,7 @@ public:
       CephContext* cct) : 
         endpoint(_endpoint), 
         topic(_topic), 
-        conn(amqp::connect(endpoint, get_exchange(args), cct)) {
+        conn(amqp::connect(endpoint, get_exchange(args))) {
     bool exists;
     // get ack level
     str_ack_level = args.get("amqp-ack-level", &exists);
index 41b395acca85cde22a17440fd93f1f92208f455e..89086d3ec8270e5937042221e9beb39f25f6d1b0 100644 (file)
@@ -13,74 +13,102 @@ using namespace rgw;
 
 const std::chrono::milliseconds wait_time(300);
 
+class CctCleaner {
+  CephContext* cct;
+public:
+  CctCleaner(CephContext* _cct) : cct(_cct) {}
+  ~CctCleaner() { 
+#ifdef WITH_SEASTAR
+    delete cct; 
+#else
+    cct->put(); 
+#endif
+  }
+};
+
 auto cct = new CephContext(CEPH_ENTITY_TYPE_CLIENT);
 
-TEST(AMQP_Connection, ConnectionOK)
+CctCleaner cleaner(cct);
+
+class TestAMQP : public ::testing::Test {
+protected:
+  void SetUp() override {
+    ASSERT_TRUE(amqp::init(cct));
+  }
+
+  void TearDown() override {
+    amqp::shutdown();
+  }
+};
+
+TEST_F(TestAMQP, ConnectionOK)
 {
   const auto connection_number = amqp::get_connection_count();
-  amqp::connection_ptr_t conn = amqp::connect("amqp://localhost", "ex1", cct);
+  amqp::connection_ptr_t conn = amqp::connect("amqp://localhost", "ex1");
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
   auto rc = amqp::publish(conn, "topic", "message");
   EXPECT_EQ(rc, 0);
 }
 
-TEST(AMQP_Connection, ConnectionReuse)
+TEST_F(TestAMQP, ConnectionReuse)
 {
+  amqp::connection_ptr_t conn1 = amqp::connect("amqp://localhost", "ex1");
+  EXPECT_TRUE(conn1);
   const auto connection_number = amqp::get_connection_count();
-  amqp::connection_ptr_t conn = amqp::connect("amqp://localhost", "ex1", cct);
-  EXPECT_TRUE(conn);
+  amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1");
+  EXPECT_TRUE(conn2);
   EXPECT_EQ(amqp::get_connection_count(), connection_number);
-  auto rc = amqp::publish(conn, "topic", "message");
+  auto rc = amqp::publish(conn1, "topic", "message");
   EXPECT_EQ(rc, 0);
 }
 
-TEST(AMQP_Connection, NameResolutionFail)
+TEST_F(TestAMQP, NameResolutionFail)
 {
   const auto connection_number = amqp::get_connection_count();
-  amqp::connection_ptr_t conn = amqp::connect("amqp://kaboom", "ex1", cct);
+  amqp::connection_ptr_t conn = amqp::connect("amqp://kaboom", "ex1");
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
   auto rc = amqp::publish(conn, "topic", "message");
   EXPECT_LT(rc, 0);
 }
 
-TEST(AMQP_Connection, InvalidPort)
+TEST_F(TestAMQP, InvalidPort)
 {
   const auto connection_number = amqp::get_connection_count();
-  amqp::connection_ptr_t conn = amqp::connect("amqp://localhost:1234", "ex1", cct);
+  amqp::connection_ptr_t conn = amqp::connect("amqp://localhost:1234", "ex1");
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
   auto rc = amqp::publish(conn, "topic", "message");
   EXPECT_LT(rc, 0);
 }
 
-TEST(AMQP_Connection, InvalidHost)
+TEST_F(TestAMQP, InvalidHost)
 {
   const auto connection_number = amqp::get_connection_count();
-  amqp::connection_ptr_t conn = amqp::connect("amqp://0.0.0.1", "ex1", cct);
+  amqp::connection_ptr_t conn = amqp::connect("amqp://0.0.0.1", "ex1");
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
   auto rc = amqp::publish(conn, "topic", "message");
   EXPECT_LT(rc, 0);
 }
 
-TEST(AMQP_Connection, InvalidVhost)
+TEST_F(TestAMQP, InvalidVhost)
 {
   const auto connection_number = amqp::get_connection_count();
-  amqp::connection_ptr_t conn = amqp::connect("amqp://localhost/kaboom", "ex1", cct);
+  amqp::connection_ptr_t conn = amqp::connect("amqp://localhost/kaboom", "ex1");
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
   auto rc = amqp::publish(conn, "topic", "message");
   EXPECT_LT(rc, 0);
 }
 
-TEST(AMQP_Connection, UserPassword)
+TEST_F(TestAMQP, UserPassword)
 {
   amqp_mock::set_valid_host("127.0.0.1");
   {
     const auto connection_number = amqp::get_connection_count();
-    amqp::connection_ptr_t conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1", cct);
+    amqp::connection_ptr_t conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1");
     EXPECT_TRUE(conn);
     EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
     auto rc = amqp::publish(conn, "topic", "message");
@@ -90,7 +118,7 @@ TEST(AMQP_Connection, UserPassword)
   amqp_mock::set_valid_host("127.0.0.2");
   {
     const auto connection_number = amqp::get_connection_count();
-    amqp::connection_ptr_t conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1", cct);
+    amqp::connection_ptr_t conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1");
     EXPECT_TRUE(conn);
     EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
     auto rc = amqp::publish(conn, "topic", "message");
@@ -99,27 +127,27 @@ TEST(AMQP_Connection, UserPassword)
   amqp_mock::set_valid_host("localhost");
 }
 
-TEST(AMQP_Connection, URLParseError)
+TEST_F(TestAMQP, URLParseError)
 {
   const auto connection_number = amqp::get_connection_count();
-  amqp::connection_ptr_t conn = amqp::connect("http://localhost", "ex1", cct);
+  amqp::connection_ptr_t conn = amqp::connect("http://localhost", "ex1");
   EXPECT_FALSE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number);
   auto rc = amqp::publish(conn, "topic", "message");
   EXPECT_LT(rc, 0);
 }
 
-TEST(AMQP_Connection, ExchangeMismatch)
+TEST_F(TestAMQP, ExchangeMismatch)
 {
   const auto connection_number = amqp::get_connection_count();
-  amqp::connection_ptr_t conn = amqp::connect("http://localhost", "ex2", cct);
+  amqp::connection_ptr_t conn = amqp::connect("http://localhost", "ex2");
   EXPECT_FALSE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number);
   auto rc = amqp::publish(conn, "topic", "message");
   EXPECT_LT(rc, 0);
 }
 
-TEST(AMQP_Connection, MaxConnections)
+TEST_F(TestAMQP, MaxConnections)
 {
   // fill up all connections
   std::vector<amqp::connection_ptr_t> connections;
@@ -127,7 +155,7 @@ TEST(AMQP_Connection, MaxConnections)
   while (remaining_connections > 0) {
     const auto host = "127.10.0." + std::to_string(remaining_connections);
     amqp_mock::set_valid_host(host);
-    amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", cct);
+    amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
     EXPECT_TRUE(conn);
     auto rc = amqp::publish(conn, "topic", "message");
     EXPECT_EQ(rc, 0);
@@ -139,7 +167,7 @@ TEST(AMQP_Connection, MaxConnections)
   {
     const std::string host = "toomany";
     amqp_mock::set_valid_host(host);
-    amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", cct);
+    amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
     EXPECT_FALSE(conn);
     auto rc = amqp::publish(conn, "topic", "message");
     EXPECT_LT(rc, 0);
@@ -170,12 +198,12 @@ void my_callback_expect_nack(int rc) {
   callback_invoked = true;
 }
 
-TEST(AMQP_PublishAndWait, ReceiveAck)
+TEST_F(TestAMQP, ReceiveAck)
 {
   callback_invoked = false;
   const std::string host("localhost1");
   amqp_mock::set_valid_host(host);
-  amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", cct);
+  amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
   EXPECT_TRUE(conn);
   auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_ack);
   EXPECT_EQ(rc, 0);
@@ -185,13 +213,13 @@ TEST(AMQP_PublishAndWait, ReceiveAck)
   amqp_mock::set_valid_host("localhost");
 }
 
-TEST(AMQP_PublishAndWait, ReceiveNack)
+TEST_F(TestAMQP, ReceiveNack)
 {
   callback_invoked = false;
   amqp_mock::REPLY_ACK = false;
   const std::string host("localhost2");
   amqp_mock::set_valid_host(host);
-  amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", cct);
+  amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
   EXPECT_TRUE(conn);
   auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
   EXPECT_EQ(rc, 0);
@@ -202,13 +230,13 @@ TEST(AMQP_PublishAndWait, ReceiveNack)
   amqp_mock::set_valid_host("localhost");
 }
 
-TEST(AMQP_PublishAndWait, FailWrite)
+TEST_F(TestAMQP, FailWrite)
 {
   callback_invoked = false;
   amqp_mock::FAIL_NEXT_WRITE = true;
   const std::string host("localhost2");
   amqp_mock::set_valid_host(host);
-  amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", cct);
+  amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
   EXPECT_TRUE(conn);
   auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
   EXPECT_EQ(rc, 0);
@@ -219,13 +247,13 @@ TEST(AMQP_PublishAndWait, FailWrite)
   amqp_mock::set_valid_host("localhost");
 }
 
-TEST(AMQP_PublishAndWait, ClosedConnection)
+TEST_F(TestAMQP, ClosedConnection)
 {
   callback_invoked = false;
   const auto current_connections = amqp::get_connection_count();
   const std::string host("localhost3");
   amqp_mock::set_valid_host(host);
-  amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", cct);
+  amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), current_connections + 1);
   EXPECT_TRUE(amqp::disconnect(conn));
@@ -240,11 +268,11 @@ TEST(AMQP_PublishAndWait, ClosedConnection)
   amqp_mock::set_valid_host("localhost");
 }
 
-TEST(AMQP_ConnectionRetry, InvalidHost)
+TEST_F(TestAMQP, RetryInvalidHost)
 {
   const std::string host = "192.168.0.1";
   const auto connection_number = amqp::get_connection_count();
-  amqp::connection_ptr_t conn = amqp::connect("amqp://"+host, "ex1", cct);
+  amqp::connection_ptr_t conn = amqp::connect("amqp://"+host, "ex1");
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
   auto rc = amqp::publish(conn, "topic", "message");
@@ -257,11 +285,11 @@ TEST(AMQP_ConnectionRetry, InvalidHost)
   amqp_mock::set_valid_host("localhost");
 }
 
-TEST(AMQP_ConnectionRetry, InvalidPort)
+TEST_F(TestAMQP, RetryInvalidPort)
 {
   const int port = 9999;
   const auto connection_number = amqp::get_connection_count();
-  amqp::connection_ptr_t conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1", cct);
+  amqp::connection_ptr_t conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1");
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
   auto rc = amqp::publish(conn, "topic", "message");
@@ -274,13 +302,13 @@ TEST(AMQP_ConnectionRetry, InvalidPort)
   amqp_mock::set_valid_port(5672);
 }
 
-TEST(AMQP_ConnectionRetry, FailWrite)
+TEST_F(TestAMQP, RetryFailWrite)
 {
   callback_invoked = false;
   amqp_mock::FAIL_NEXT_WRITE = true;
   const std::string host("localhost4");
   amqp_mock::set_valid_host(host);
-  amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", cct);
+  amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
   EXPECT_TRUE(conn);
   auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
   EXPECT_EQ(rc, 0);