From a544dfe45679495ca493ce60c7ae9e9700089101 Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Sun, 6 Oct 2019 11:31:35 +0300 Subject: [PATCH] rgw/amqp: fix race condition in AMQP unit test Signed-off-by: Yuval Lifshitz (cherry picked from commit 5934ef5071a71c7ae8604d5f2a5e83ac4d01d263) Signed-off-by: Yuval Lifshitz --- src/test/rgw/test_rgw_amqp.cc | 115 +++++++++++++++++++++++----------- 1 file changed, 77 insertions(+), 38 deletions(-) diff --git a/src/test/rgw/test_rgw_amqp.cc b/src/test/rgw/test_rgw_amqp.cc index e7147ed1fc83e..92b0d497849fb 100644 --- a/src/test/rgw/test_rgw_amqp.cc +++ b/src/test/rgw/test_rgw_amqp.cc @@ -11,7 +11,9 @@ using namespace rgw; -const std::chrono::milliseconds wait_time(300); +const std::chrono::milliseconds wait_time(10); +const std::chrono::milliseconds long_wait_time = wait_time*50; + class CctCleaner { CephContext* cct; @@ -32,6 +34,9 @@ CctCleaner cleaner(cct); class TestAMQP : public ::testing::Test { protected: + amqp::connection_ptr_t conn = nullptr; + unsigned current_dequeued = 0U; + void SetUp() override { ASSERT_TRUE(amqp::init(cct)); } @@ -39,12 +44,24 @@ protected: void TearDown() override { amqp::shutdown(); } + + // wait for at least one new (since last drain) message to be dequeueud + // and then wait for all pending answers to be received + void wait_until_drained() { + while (amqp::get_dequeued() == current_dequeued) { + std::this_thread::sleep_for(wait_time); + } + while (amqp::get_inflight() > 0) { + std::this_thread::sleep_for(wait_time); + } + current_dequeued = amqp::get_dequeued(); + } }; TEST_F(TestAMQP, ConnectionOK) { const auto connection_number = amqp::get_connection_count(); - amqp::connection_ptr_t conn = amqp::connect("amqp://localhost", "ex1"); + conn = amqp::connect("amqp://localhost", "ex1"); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); @@ -66,7 +83,7 @@ TEST_F(TestAMQP, ConnectionReuse) TEST_F(TestAMQP, NameResolutionFail) { const auto connection_number = amqp::get_connection_count(); - amqp::connection_ptr_t conn = amqp::connect("amqp://kaboom", "ex1"); + conn = amqp::connect("amqp://kaboom", "ex1"); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); @@ -76,7 +93,7 @@ TEST_F(TestAMQP, NameResolutionFail) TEST_F(TestAMQP, InvalidPort) { const auto connection_number = amqp::get_connection_count(); - amqp::connection_ptr_t conn = amqp::connect("amqp://localhost:1234", "ex1"); + conn = amqp::connect("amqp://localhost:1234", "ex1"); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); @@ -86,7 +103,7 @@ TEST_F(TestAMQP, InvalidPort) TEST_F(TestAMQP, InvalidHost) { const auto connection_number = amqp::get_connection_count(); - amqp::connection_ptr_t conn = amqp::connect("amqp://0.0.0.1", "ex1"); + conn = amqp::connect("amqp://0.0.0.1", "ex1"); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); @@ -96,7 +113,7 @@ TEST_F(TestAMQP, InvalidHost) TEST_F(TestAMQP, InvalidVhost) { const auto connection_number = amqp::get_connection_count(); - amqp::connection_ptr_t conn = amqp::connect("amqp://localhost/kaboom", "ex1"); + conn = amqp::connect("amqp://localhost/kaboom", "ex1"); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); @@ -108,7 +125,7 @@ TEST_F(TestAMQP, UserPassword) amqp_mock::set_valid_host("127.0.0.1"); { const auto connection_number = amqp::get_connection_count(); - amqp::connection_ptr_t conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1"); + conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1"); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); @@ -118,7 +135,7 @@ TEST_F(TestAMQP, UserPassword) amqp_mock::set_valid_host("127.0.0.2"); { const auto connection_number = amqp::get_connection_count(); - amqp::connection_ptr_t conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1"); + conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1"); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); @@ -130,7 +147,7 @@ TEST_F(TestAMQP, UserPassword) TEST_F(TestAMQP, URLParseError) { const auto connection_number = amqp::get_connection_count(); - amqp::connection_ptr_t conn = amqp::connect("http://localhost", "ex1"); + conn = amqp::connect("http://localhost", "ex1"); EXPECT_FALSE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number); auto rc = amqp::publish(conn, "topic", "message"); @@ -140,7 +157,7 @@ TEST_F(TestAMQP, URLParseError) TEST_F(TestAMQP, ExchangeMismatch) { const auto connection_number = amqp::get_connection_count(); - amqp::connection_ptr_t conn = amqp::connect("http://localhost", "ex2"); + conn = amqp::connect("http://localhost", "ex2"); EXPECT_FALSE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number); auto rc = amqp::publish(conn, "topic", "message"); @@ -179,7 +196,7 @@ TEST_F(TestAMQP, MaxConnections) EXPECT_TRUE(amqp::disconnect(conn)); } // wait for them to be deleted - std::this_thread::sleep_for(wait_time); + std::this_thread::sleep_for(long_wait_time); EXPECT_LT(amqp::get_connection_count(), amqp::get_max_connections()); } @@ -218,19 +235,41 @@ public: } }; +void my_callback_expect_close_or_ack(int rc) { + // deleting the connection should trigger the callback with -4098 + // but due to race conditions, some my get an ack + EXPECT_TRUE(-4098 == rc || 0 == rc); +} TEST_F(TestAMQP, ReceiveAck) { callback_invoked = false; const std::string host("localhost1"); amqp_mock::set_valid_host(host); - amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1"); + conn = amqp::connect("amqp://" + host, "ex1"); EXPECT_TRUE(conn); auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_ack); EXPECT_EQ(rc, 0); - std::this_thread::sleep_for(wait_time); + wait_until_drained(); EXPECT_TRUE(callback_invoked); + amqp_mock::set_valid_host("localhost"); +} + +TEST_F(TestAMQP, ImplicitConnectionClose) +{ callback_invoked = false; + const std::string host("localhost1"); + amqp_mock::set_valid_host(host); + conn = amqp::connect("amqp://" + host, "ex1"); + EXPECT_TRUE(conn); + const auto NUMBER_OF_CALLS = 2000; + for (auto i = 0; i < NUMBER_OF_CALLS; ++i) { + auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_close_or_ack); + EXPECT_EQ(rc, 0); + } + wait_until_drained(); + // deleting the connection object should close the connection + conn.reset(nullptr); amqp_mock::set_valid_host("localhost"); } @@ -239,14 +278,14 @@ TEST_F(TestAMQP, ReceiveMultipleAck) callbacks_invoked = 0; const std::string host("localhost1"); amqp_mock::set_valid_host(host); - amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1"); + conn = amqp::connect("amqp://" + host, "ex1"); EXPECT_TRUE(conn); const auto NUMBER_OF_CALLS = 100; for (auto i=0; i < NUMBER_OF_CALLS; ++i) { auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_multiple_acks); EXPECT_EQ(rc, 0); } - std::this_thread::sleep_for(wait_time); + wait_until_drained(); EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS); callbacks_invoked = 0; amqp_mock::set_valid_host("localhost"); @@ -257,7 +296,7 @@ TEST_F(TestAMQP, ReceiveAckForMultiple) callbacks_invoked = 0; const std::string host("localhost1"); amqp_mock::set_valid_host(host); - amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1"); + conn = amqp::connect("amqp://" + host, "ex1"); EXPECT_TRUE(conn); amqp_mock::set_multiple(59); const auto NUMBER_OF_CALLS = 100; @@ -265,7 +304,7 @@ TEST_F(TestAMQP, ReceiveAckForMultiple) auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_multiple_acks); EXPECT_EQ(rc, 0); } - std::this_thread::sleep_for(wait_time); + wait_until_drained(); EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS); callbacks_invoked = 0; amqp_mock::set_valid_host("localhost"); @@ -276,7 +315,7 @@ TEST_F(TestAMQP, DynamicCallback) callbacks_invoked = 0; const std::string host("localhost1"); amqp_mock::set_valid_host(host); - amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1"); + conn = amqp::connect("amqp://" + host, "ex1"); EXPECT_TRUE(conn); amqp_mock::set_multiple(59); const auto NUMBER_OF_CALLS = 100; @@ -285,7 +324,7 @@ TEST_F(TestAMQP, DynamicCallback) std::bind(&dynamic_callback_wrapper::callback, dynamic_callback_wrapper::create(), std::placeholders::_1)); EXPECT_EQ(rc, 0); } - std::this_thread::sleep_for(wait_time); + wait_until_drained(); EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS); callbacks_invoked = 0; amqp_mock::set_valid_host("localhost"); @@ -297,11 +336,11 @@ TEST_F(TestAMQP, ReceiveNack) amqp_mock::REPLY_ACK = false; const std::string host("localhost2"); amqp_mock::set_valid_host(host); - amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1"); + conn = amqp::connect("amqp://" + host, "ex1"); EXPECT_TRUE(conn); auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack); EXPECT_EQ(rc, 0); - std::this_thread::sleep_for(wait_time); + wait_until_drained(); EXPECT_TRUE(callback_invoked); amqp_mock::REPLY_ACK = true; callback_invoked = false; @@ -314,11 +353,11 @@ TEST_F(TestAMQP, FailWrite) amqp_mock::FAIL_NEXT_WRITE = true; const std::string host("localhost2"); amqp_mock::set_valid_host(host); - amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1"); + conn = amqp::connect("amqp://" + host, "ex1"); EXPECT_TRUE(conn); auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack); EXPECT_EQ(rc, 0); - std::this_thread::sleep_for(wait_time); + wait_until_drained(); EXPECT_TRUE(callback_invoked); amqp_mock::FAIL_NEXT_WRITE = false; callback_invoked = false; @@ -331,16 +370,16 @@ TEST_F(TestAMQP, ClosedConnection) const auto current_connections = amqp::get_connection_count(); const std::string host("localhost3"); amqp_mock::set_valid_host(host); - amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1"); + conn = amqp::connect("amqp://" + host, "ex1"); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), current_connections + 1); EXPECT_TRUE(amqp::disconnect(conn)); - std::this_thread::sleep_for(wait_time); + std::this_thread::sleep_for(long_wait_time); // make sure number of connections decreased back EXPECT_EQ(amqp::get_connection_count(), current_connections); auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack); EXPECT_LT(rc, 0); - std::this_thread::sleep_for(wait_time); + std::this_thread::sleep_for(long_wait_time); EXPECT_FALSE(callback_invoked); callback_invoked = false; amqp_mock::set_valid_host("localhost"); @@ -350,14 +389,14 @@ TEST_F(TestAMQP, RetryInvalidHost) { const std::string host = "192.168.0.1"; const auto connection_number = amqp::get_connection_count(); - amqp::connection_ptr_t conn = amqp::connect("amqp://"+host, "ex1"); + conn = amqp::connect("amqp://"+host, "ex1"); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); EXPECT_LT(rc, 0); // now next retry should be ok amqp_mock::set_valid_host(host); - std::this_thread::sleep_for(wait_time); + std::this_thread::sleep_for(long_wait_time); rc = amqp::publish(conn, "topic", "message"); EXPECT_EQ(rc, 0); amqp_mock::set_valid_host("localhost"); @@ -367,14 +406,14 @@ TEST_F(TestAMQP, RetryInvalidPort) { const int port = 9999; const auto connection_number = amqp::get_connection_count(); - amqp::connection_ptr_t conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1"); + conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1"); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); EXPECT_LT(rc, 0); // now next retry should be ok amqp_mock::set_valid_port(port); - std::this_thread::sleep_for(wait_time); + std::this_thread::sleep_for(long_wait_time); rc = amqp::publish(conn, "topic", "message"); EXPECT_EQ(rc, 0); amqp_mock::set_valid_port(5672); @@ -386,30 +425,30 @@ TEST_F(TestAMQP, RetryFailWrite) amqp_mock::FAIL_NEXT_WRITE = true; const std::string host("localhost4"); amqp_mock::set_valid_host(host); - amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1"); + conn = amqp::connect("amqp://" + host, "ex1"); EXPECT_TRUE(conn); auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack); EXPECT_EQ(rc, 0); // set port to a different one, so that reconnect would fail amqp_mock::set_valid_port(9999); - std::this_thread::sleep_for(wait_time); + wait_until_drained(); EXPECT_TRUE(callback_invoked); callback_invoked = false; rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack); EXPECT_LT(rc, 0); // expect immediate failure, no callback called after sleep - std::this_thread::sleep_for(wait_time); + std::this_thread::sleep_for(long_wait_time); EXPECT_FALSE(callback_invoked); // set port to the right one so that reconnect would succeed amqp_mock::set_valid_port(5672); callback_invoked = false; amqp_mock::FAIL_NEXT_WRITE = false; // give time to reconnect - std::this_thread::sleep_for(wait_time); + std::this_thread::sleep_for(long_wait_time); // retry to publish should succeed now rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_ack); EXPECT_EQ(rc, 0); - std::this_thread::sleep_for(wait_time); + wait_until_drained(); EXPECT_TRUE(callback_invoked); callback_invoked = false; amqp_mock::set_valid_host("localhost"); @@ -441,7 +480,7 @@ TEST_F(TestAMQP, AcksWithReconnect) callbacks_invoked = 0; const std::string host("localhost1"); amqp_mock::set_valid_host(host); - amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1"); + conn = amqp::connect("amqp://" + host, "ex1"); EXPECT_TRUE(conn); amqp_mock::set_multiple(59); // failure will take effect after: max(59, 70) @@ -454,7 +493,7 @@ TEST_F(TestAMQP, AcksWithReconnect) EXPECT_EQ(rc, 0); } // connection failes before multiple acks - std::this_thread::sleep_for(wait_time); + wait_until_drained(); EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS); // publish more mesages expect_zero_rc = true; @@ -462,7 +501,7 @@ TEST_F(TestAMQP, AcksWithReconnect) auto rc = publish_with_confirm(conn, "topic", "message", my_callback_triggering_failure); EXPECT_EQ(rc, 0); } - std::this_thread::sleep_for(wait_time); + wait_until_drained(); EXPECT_EQ(callbacks_invoked, 2*NUMBER_OF_CALLS); callbacks_invoked = 0; amqp_mock::set_valid_host("localhost"); -- 2.39.5