From: Yuval Lifshitz Date: Sun, 31 Mar 2019 19:52:27 +0000 (+0300) Subject: rgw/pubsub: avoid static creation of amqp manager X-Git-Tag: v15.1.0~3002^2~6 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=376a3df0d12b544a0378a1467fcf4b2c32fd8afa;p=ceph.git rgw/pubsub: avoid static creation of amqp manager Signed-off-by: Yuval Lifshitz --- diff --git a/src/rgw/rgw_amqp.cc b/src/rgw/rgw_amqp.cc index f9acabb2c5f..4143eaae977 100644 --- a/src/rgw/rgw_amqp.cc +++ b/src/rgw/rgw_amqp.cc @@ -2,6 +2,7 @@ // vim: ts=8 sw=2 smarttab #include "rgw_amqp.h" +#include #include #include #include @@ -533,6 +534,7 @@ private: MessageQueue messages; std::atomic queued; std::atomic dequeued; + CephContext* const cct; mutable std::mutex connections_lock; std::thread runner; @@ -634,7 +636,6 @@ private: for (;conn_it != end_it;) { auto& conn = conn_it->second; - ldout(conn->cct, 20) << "AMQP run: start processing connection" << dendl; // delete the connection if marked for deletion if (conn->marked_for_deletion) { ldout(conn->cct, 10) << "AMQP run: connection is deleted" << dendl; @@ -668,7 +669,6 @@ private: if (rc == AMQP_STATUS_TIMEOUT) { // TODO mark connection as idle - ldout(conn->cct, 20) << "AMQP run: connection is idle" << dendl; INCREMENT_AND_CONTINUE(conn_it); } @@ -777,7 +777,8 @@ public: Manager(size_t _max_connections, size_t _max_inflight, size_t _max_queue, - long _usec_timeout) : + long _usec_timeout, + CephContext* _cct) : max_connections(_max_connections), max_inflight(_max_inflight), max_queue(_max_queue), @@ -788,6 +789,7 @@ public: messages(max_queue), queued(0), dequeued(0), + cct(_cct), runner(&Manager::run, this) { // The hashmap has "max connections" as the initial number of buckets, // and allows for 10 collisions per bucket before rehash. @@ -818,7 +820,7 @@ public: } // connect to a broker, or reuse an existing connection if already connected - connection_ptr_t connect(const std::string& url, const std::string& exchange, CephContext* cct) { + connection_ptr_t connect(const std::string& url, const std::string& exchange) { if (stopped) { // TODO: increment counter ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl; @@ -939,33 +941,36 @@ public: // singleton manager // note that the manager itself is not a singleton, and multiple instances may co-exist +// TODO make the pointer atomic in allocation and deallocation to avoid race conditions static Manager* s_manager = nullptr; static const size_t MAX_CONNECTIONS_DEFAULT = 256; static const size_t MAX_INFLIGHT_DEFAULT = 8192; static const size_t MAX_QUEUE_DEFAULT = 8192; -class SingletonManager { - std::mutex manager_creation_lock; - public: - SingletonManager(CephContext* cct=nullptr) { - // TODO get parameters from conf - std::lock_guard l(manager_creation_lock); - if (s_manager == nullptr) { - s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, 100); - } - } -}; +bool init(CephContext* cct) { + if (s_manager) { + return false; + } + // TODO: take conf from CephContext + s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, 100, cct); + return true; +} + +void shutdown() { + delete s_manager; + s_manager = nullptr; +} -connection_ptr_t connect(const std::string& url, const std::string& exchange, CephContext* cct) { - static const SingletonManager singleton(cct); - return s_manager->connect(url, exchange, cct); +connection_ptr_t connect(const std::string& url, const std::string& exchange) { + if (!s_manager) return nullptr; + return s_manager->connect(url, exchange); } int publish(connection_ptr_t& conn, const std::string& topic, const std::string& message) { - if (s_manager == nullptr) return RGW_AMQP_STATUS_MANAGER_STOPPED; + if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED; return s_manager->publish(conn, topic, message); } @@ -973,47 +978,47 @@ int publish_with_confirm(connection_ptr_t& conn, const std::string& topic, const std::string& message, reply_callback_t cb) { - if (s_manager == nullptr) return RGW_AMQP_STATUS_MANAGER_STOPPED; + if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED; return s_manager->publish_with_confirm(conn, topic, message, cb); } size_t get_connection_count() { - if (s_manager == nullptr) return 0; + if (!s_manager) return 0; return s_manager->get_connection_count(); } size_t get_inflight() { - if (s_manager == nullptr) return 0; + if (!s_manager) return 0; return s_manager->get_inflight(); } size_t get_queued() { - if (s_manager == nullptr) return 0; + if (!s_manager) return 0; return s_manager->get_queued(); } size_t get_dequeued() { - if (s_manager == nullptr) return 0; + if (!s_manager) return 0; return s_manager->get_dequeued(); } size_t get_max_connections() { - if (s_manager == nullptr) return MAX_CONNECTIONS_DEFAULT; + if (!s_manager) return MAX_CONNECTIONS_DEFAULT; return s_manager->max_connections; } size_t get_max_inflight() { - if (s_manager == nullptr) return MAX_INFLIGHT_DEFAULT; + if (!s_manager) return MAX_INFLIGHT_DEFAULT; return s_manager->max_inflight; } size_t get_max_queue() { - if (s_manager == nullptr) return MAX_QUEUE_DEFAULT; + if (!s_manager) return MAX_QUEUE_DEFAULT; return s_manager->max_queue; } bool disconnect(connection_ptr_t& conn) { - if (s_manager == nullptr) return false; + if (!s_manager) return false; return s_manager->disconnect(conn); } diff --git a/src/rgw/rgw_amqp.h b/src/rgw/rgw_amqp.h index 2549116d289..938bdade495 100644 --- a/src/rgw/rgw_amqp.h +++ b/src/rgw/rgw_amqp.h @@ -23,8 +23,14 @@ void intrusive_ptr_release(const connection_t* p); // indicating the result, and not to return anything typedef std::function reply_callback_t; +// initialize the amqp manager +bool init(CephContext* cct); + +// shutdown the amqp manager +void shutdown(); + // connect to an amqp endpoint -connection_ptr_t connect(const std::string& url, const std::string& exchange, CephContext* cct); +connection_ptr_t connect(const std::string& url, const std::string& exchange); // publish a message over a connection that was already created int publish(connection_ptr_t& conn, diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc index ad21c397403..63ef569bc11 100644 --- a/src/rgw/rgw_main.cc +++ b/src/rgw/rgw_main.cc @@ -42,6 +42,10 @@ #include "rgw_asio_frontend.h" #endif /* WITH_RADOSGW_BEAST_FRONTEND */ +#ifdef WITH_RADOSGW_AMQP_ENDPOINT +#include "rgw_amqp.h" +#endif /* WITH_RADOSGW_AMQP_ENDPOINT */ + #include "rgw_dmclock_scheduler_ctx.h" #include "services/svc_zone.h" @@ -302,6 +306,10 @@ int main(int argc, const char **argv) FCGX_Init(); #endif +#ifdef WITH_RADOSGW_AMQP_ENDPOINT + rgw::amqp::init(cct.get()); +#endif + RGWRados *store = RGWStoreManager::get_storage(g_ceph_context, g_conf()->rgw_enable_gc_threads, @@ -594,6 +602,10 @@ int main(int argc, const char **argv) rgw_http_client_cleanup(); rgw::curl::cleanup_curl(); +#ifdef WITH_RADOSGW_AMQP_ENDPOINT + rgw::amqp::shutdown(); +#endif + rgw_perf_stop(g_ceph_context); dout(1) << "final shutdown" << dendl; diff --git a/src/rgw/rgw_pubsub_push.cc b/src/rgw/rgw_pubsub_push.cc index 1244ee8ccb9..81c9aefa1a6 100644 --- a/src/rgw/rgw_pubsub_push.cc +++ b/src/rgw/rgw_pubsub_push.cc @@ -258,7 +258,7 @@ public: CephContext* cct) : endpoint(_endpoint), topic(_topic), - conn(amqp::connect(endpoint, get_exchange(args), cct)) { + conn(amqp::connect(endpoint, get_exchange(args))) { bool exists; // get ack level str_ack_level = args.get("amqp-ack-level", &exists); diff --git a/src/test/rgw/test_rgw_amqp.cc b/src/test/rgw/test_rgw_amqp.cc index 41b395acca8..89086d3ec82 100644 --- a/src/test/rgw/test_rgw_amqp.cc +++ b/src/test/rgw/test_rgw_amqp.cc @@ -13,74 +13,102 @@ using namespace rgw; const std::chrono::milliseconds wait_time(300); +class CctCleaner { + CephContext* cct; +public: + CctCleaner(CephContext* _cct) : cct(_cct) {} + ~CctCleaner() { +#ifdef WITH_SEASTAR + delete cct; +#else + cct->put(); +#endif + } +}; + auto cct = new CephContext(CEPH_ENTITY_TYPE_CLIENT); -TEST(AMQP_Connection, ConnectionOK) +CctCleaner cleaner(cct); + +class TestAMQP : public ::testing::Test { +protected: + void SetUp() override { + ASSERT_TRUE(amqp::init(cct)); + } + + void TearDown() override { + amqp::shutdown(); + } +}; + +TEST_F(TestAMQP, ConnectionOK) { const auto connection_number = amqp::get_connection_count(); - amqp::connection_ptr_t conn = amqp::connect("amqp://localhost", "ex1", cct); + amqp::connection_ptr_t conn = amqp::connect("amqp://localhost", "ex1"); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); EXPECT_EQ(rc, 0); } -TEST(AMQP_Connection, ConnectionReuse) +TEST_F(TestAMQP, ConnectionReuse) { + amqp::connection_ptr_t conn1 = amqp::connect("amqp://localhost", "ex1"); + EXPECT_TRUE(conn1); const auto connection_number = amqp::get_connection_count(); - amqp::connection_ptr_t conn = amqp::connect("amqp://localhost", "ex1", cct); - EXPECT_TRUE(conn); + amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1"); + EXPECT_TRUE(conn2); EXPECT_EQ(amqp::get_connection_count(), connection_number); - auto rc = amqp::publish(conn, "topic", "message"); + auto rc = amqp::publish(conn1, "topic", "message"); EXPECT_EQ(rc, 0); } -TEST(AMQP_Connection, NameResolutionFail) +TEST_F(TestAMQP, NameResolutionFail) { const auto connection_number = amqp::get_connection_count(); - amqp::connection_ptr_t conn = amqp::connect("amqp://kaboom", "ex1", cct); + amqp::connection_ptr_t conn = amqp::connect("amqp://kaboom", "ex1"); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); EXPECT_LT(rc, 0); } -TEST(AMQP_Connection, InvalidPort) +TEST_F(TestAMQP, InvalidPort) { const auto connection_number = amqp::get_connection_count(); - amqp::connection_ptr_t conn = amqp::connect("amqp://localhost:1234", "ex1", cct); + amqp::connection_ptr_t conn = amqp::connect("amqp://localhost:1234", "ex1"); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); EXPECT_LT(rc, 0); } -TEST(AMQP_Connection, InvalidHost) +TEST_F(TestAMQP, InvalidHost) { const auto connection_number = amqp::get_connection_count(); - amqp::connection_ptr_t conn = amqp::connect("amqp://0.0.0.1", "ex1", cct); + amqp::connection_ptr_t conn = amqp::connect("amqp://0.0.0.1", "ex1"); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); EXPECT_LT(rc, 0); } -TEST(AMQP_Connection, InvalidVhost) +TEST_F(TestAMQP, InvalidVhost) { const auto connection_number = amqp::get_connection_count(); - amqp::connection_ptr_t conn = amqp::connect("amqp://localhost/kaboom", "ex1", cct); + amqp::connection_ptr_t conn = amqp::connect("amqp://localhost/kaboom", "ex1"); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); EXPECT_LT(rc, 0); } -TEST(AMQP_Connection, UserPassword) +TEST_F(TestAMQP, UserPassword) { amqp_mock::set_valid_host("127.0.0.1"); { const auto connection_number = amqp::get_connection_count(); - amqp::connection_ptr_t conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1", cct); + amqp::connection_ptr_t conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1"); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); @@ -90,7 +118,7 @@ TEST(AMQP_Connection, UserPassword) amqp_mock::set_valid_host("127.0.0.2"); { const auto connection_number = amqp::get_connection_count(); - amqp::connection_ptr_t conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1", cct); + amqp::connection_ptr_t conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1"); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); @@ -99,27 +127,27 @@ TEST(AMQP_Connection, UserPassword) amqp_mock::set_valid_host("localhost"); } -TEST(AMQP_Connection, URLParseError) +TEST_F(TestAMQP, URLParseError) { const auto connection_number = amqp::get_connection_count(); - amqp::connection_ptr_t conn = amqp::connect("http://localhost", "ex1", cct); + amqp::connection_ptr_t conn = amqp::connect("http://localhost", "ex1"); EXPECT_FALSE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number); auto rc = amqp::publish(conn, "topic", "message"); EXPECT_LT(rc, 0); } -TEST(AMQP_Connection, ExchangeMismatch) +TEST_F(TestAMQP, ExchangeMismatch) { const auto connection_number = amqp::get_connection_count(); - amqp::connection_ptr_t conn = amqp::connect("http://localhost", "ex2", cct); + amqp::connection_ptr_t conn = amqp::connect("http://localhost", "ex2"); EXPECT_FALSE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number); auto rc = amqp::publish(conn, "topic", "message"); EXPECT_LT(rc, 0); } -TEST(AMQP_Connection, MaxConnections) +TEST_F(TestAMQP, MaxConnections) { // fill up all connections std::vector connections; @@ -127,7 +155,7 @@ TEST(AMQP_Connection, MaxConnections) while (remaining_connections > 0) { const auto host = "127.10.0." + std::to_string(remaining_connections); amqp_mock::set_valid_host(host); - amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", cct); + amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1"); EXPECT_TRUE(conn); auto rc = amqp::publish(conn, "topic", "message"); EXPECT_EQ(rc, 0); @@ -139,7 +167,7 @@ TEST(AMQP_Connection, MaxConnections) { const std::string host = "toomany"; amqp_mock::set_valid_host(host); - amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", cct); + amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1"); EXPECT_FALSE(conn); auto rc = amqp::publish(conn, "topic", "message"); EXPECT_LT(rc, 0); @@ -170,12 +198,12 @@ void my_callback_expect_nack(int rc) { callback_invoked = true; } -TEST(AMQP_PublishAndWait, ReceiveAck) +TEST_F(TestAMQP, ReceiveAck) { callback_invoked = false; const std::string host("localhost1"); amqp_mock::set_valid_host(host); - amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", cct); + amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1"); EXPECT_TRUE(conn); auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_ack); EXPECT_EQ(rc, 0); @@ -185,13 +213,13 @@ TEST(AMQP_PublishAndWait, ReceiveAck) amqp_mock::set_valid_host("localhost"); } -TEST(AMQP_PublishAndWait, ReceiveNack) +TEST_F(TestAMQP, ReceiveNack) { callback_invoked = false; amqp_mock::REPLY_ACK = false; const std::string host("localhost2"); amqp_mock::set_valid_host(host); - amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", cct); + amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1"); EXPECT_TRUE(conn); auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack); EXPECT_EQ(rc, 0); @@ -202,13 +230,13 @@ TEST(AMQP_PublishAndWait, ReceiveNack) amqp_mock::set_valid_host("localhost"); } -TEST(AMQP_PublishAndWait, FailWrite) +TEST_F(TestAMQP, FailWrite) { callback_invoked = false; amqp_mock::FAIL_NEXT_WRITE = true; const std::string host("localhost2"); amqp_mock::set_valid_host(host); - amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", cct); + amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1"); EXPECT_TRUE(conn); auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack); EXPECT_EQ(rc, 0); @@ -219,13 +247,13 @@ TEST(AMQP_PublishAndWait, FailWrite) amqp_mock::set_valid_host("localhost"); } -TEST(AMQP_PublishAndWait, ClosedConnection) +TEST_F(TestAMQP, ClosedConnection) { callback_invoked = false; const auto current_connections = amqp::get_connection_count(); const std::string host("localhost3"); amqp_mock::set_valid_host(host); - amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", cct); + amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1"); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), current_connections + 1); EXPECT_TRUE(amqp::disconnect(conn)); @@ -240,11 +268,11 @@ TEST(AMQP_PublishAndWait, ClosedConnection) amqp_mock::set_valid_host("localhost"); } -TEST(AMQP_ConnectionRetry, InvalidHost) +TEST_F(TestAMQP, RetryInvalidHost) { const std::string host = "192.168.0.1"; const auto connection_number = amqp::get_connection_count(); - amqp::connection_ptr_t conn = amqp::connect("amqp://"+host, "ex1", cct); + amqp::connection_ptr_t conn = amqp::connect("amqp://"+host, "ex1"); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); @@ -257,11 +285,11 @@ TEST(AMQP_ConnectionRetry, InvalidHost) amqp_mock::set_valid_host("localhost"); } -TEST(AMQP_ConnectionRetry, InvalidPort) +TEST_F(TestAMQP, RetryInvalidPort) { const int port = 9999; const auto connection_number = amqp::get_connection_count(); - amqp::connection_ptr_t conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1", cct); + amqp::connection_ptr_t conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1"); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); @@ -274,13 +302,13 @@ TEST(AMQP_ConnectionRetry, InvalidPort) amqp_mock::set_valid_port(5672); } -TEST(AMQP_ConnectionRetry, FailWrite) +TEST_F(TestAMQP, RetryFailWrite) { callback_invoked = false; amqp_mock::FAIL_NEXT_WRITE = true; const std::string host("localhost4"); amqp_mock::set_valid_host(host); - amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", cct); + amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1"); EXPECT_TRUE(conn); auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack); EXPECT_EQ(rc, 0);