]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/amqp: fix race condition in AMQP unit test 30889/head
authorYuval Lifshitz <yuvalif@yahoo.com>
Sun, 6 Oct 2019 08:31:35 +0000 (11:31 +0300)
committerYuval Lifshitz <yuvalif@yahoo.com>
Mon, 14 Oct 2019 09:02:15 +0000 (12:02 +0300)
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
(cherry picked from commit 5934ef5071a71c7ae8604d5f2a5e83ac4d01d263)
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
src/test/rgw/test_rgw_amqp.cc

index e7147ed1fc83eee557ac97df68de883c851edfa1..92b0d497849fb02a1d946212f17f419195ffa2a9 100644 (file)
@@ -11,7 +11,9 @@
 
 using namespace rgw;
 
-const std::chrono::milliseconds wait_time(300);
+const std::chrono::milliseconds wait_time(10);
+const std::chrono::milliseconds long_wait_time = wait_time*50;
+
 
 class CctCleaner {
   CephContext* cct;
@@ -32,6 +34,9 @@ CctCleaner cleaner(cct);
 
 class TestAMQP : public ::testing::Test {
 protected:
+  amqp::connection_ptr_t conn = nullptr;
+  unsigned current_dequeued = 0U;
+
   void SetUp() override {
     ASSERT_TRUE(amqp::init(cct));
   }
@@ -39,12 +44,24 @@ protected:
   void TearDown() override {
     amqp::shutdown();
   }
+
+  // wait for at least one new (since last drain) message to be dequeueud
+  // and then wait for all pending answers to be received
+  void wait_until_drained() {  
+    while (amqp::get_dequeued() == current_dequeued) {
+      std::this_thread::sleep_for(wait_time);
+    }
+    while (amqp::get_inflight() > 0) {
+      std::this_thread::sleep_for(wait_time);
+    }
+    current_dequeued = amqp::get_dequeued();
+  }
 };
 
 TEST_F(TestAMQP, ConnectionOK)
 {
   const auto connection_number = amqp::get_connection_count();
-  amqp::connection_ptr_t conn = amqp::connect("amqp://localhost", "ex1");
+  conn = amqp::connect("amqp://localhost", "ex1");
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
   auto rc = amqp::publish(conn, "topic", "message");
@@ -66,7 +83,7 @@ TEST_F(TestAMQP, ConnectionReuse)
 TEST_F(TestAMQP, NameResolutionFail)
 {
   const auto connection_number = amqp::get_connection_count();
-  amqp::connection_ptr_t conn = amqp::connect("amqp://kaboom", "ex1");
+  conn = amqp::connect("amqp://kaboom", "ex1");
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
   auto rc = amqp::publish(conn, "topic", "message");
@@ -76,7 +93,7 @@ TEST_F(TestAMQP, NameResolutionFail)
 TEST_F(TestAMQP, InvalidPort)
 {
   const auto connection_number = amqp::get_connection_count();
-  amqp::connection_ptr_t conn = amqp::connect("amqp://localhost:1234", "ex1");
+  conn = amqp::connect("amqp://localhost:1234", "ex1");
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
   auto rc = amqp::publish(conn, "topic", "message");
@@ -86,7 +103,7 @@ TEST_F(TestAMQP, InvalidPort)
 TEST_F(TestAMQP, InvalidHost)
 {
   const auto connection_number = amqp::get_connection_count();
-  amqp::connection_ptr_t conn = amqp::connect("amqp://0.0.0.1", "ex1");
+  conn = amqp::connect("amqp://0.0.0.1", "ex1");
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
   auto rc = amqp::publish(conn, "topic", "message");
@@ -96,7 +113,7 @@ TEST_F(TestAMQP, InvalidHost)
 TEST_F(TestAMQP, InvalidVhost)
 {
   const auto connection_number = amqp::get_connection_count();
-  amqp::connection_ptr_t conn = amqp::connect("amqp://localhost/kaboom", "ex1");
+  conn = amqp::connect("amqp://localhost/kaboom", "ex1");
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
   auto rc = amqp::publish(conn, "topic", "message");
@@ -108,7 +125,7 @@ TEST_F(TestAMQP, UserPassword)
   amqp_mock::set_valid_host("127.0.0.1");
   {
     const auto connection_number = amqp::get_connection_count();
-    amqp::connection_ptr_t conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1");
+    conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1");
     EXPECT_TRUE(conn);
     EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
     auto rc = amqp::publish(conn, "topic", "message");
@@ -118,7 +135,7 @@ TEST_F(TestAMQP, UserPassword)
   amqp_mock::set_valid_host("127.0.0.2");
   {
     const auto connection_number = amqp::get_connection_count();
-    amqp::connection_ptr_t conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1");
+    conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1");
     EXPECT_TRUE(conn);
     EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
     auto rc = amqp::publish(conn, "topic", "message");
@@ -130,7 +147,7 @@ TEST_F(TestAMQP, UserPassword)
 TEST_F(TestAMQP, URLParseError)
 {
   const auto connection_number = amqp::get_connection_count();
-  amqp::connection_ptr_t conn = amqp::connect("http://localhost", "ex1");
+  conn = amqp::connect("http://localhost", "ex1");
   EXPECT_FALSE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number);
   auto rc = amqp::publish(conn, "topic", "message");
@@ -140,7 +157,7 @@ TEST_F(TestAMQP, URLParseError)
 TEST_F(TestAMQP, ExchangeMismatch)
 {
   const auto connection_number = amqp::get_connection_count();
-  amqp::connection_ptr_t conn = amqp::connect("http://localhost", "ex2");
+  conn = amqp::connect("http://localhost", "ex2");
   EXPECT_FALSE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number);
   auto rc = amqp::publish(conn, "topic", "message");
@@ -179,7 +196,7 @@ TEST_F(TestAMQP, MaxConnections)
     EXPECT_TRUE(amqp::disconnect(conn));
   }
   // wait for them to be deleted
-  std::this_thread::sleep_for(wait_time);
+  std::this_thread::sleep_for(long_wait_time);
   EXPECT_LT(amqp::get_connection_count(), amqp::get_max_connections());
 }
 
@@ -218,19 +235,41 @@ public:
     }
 };
 
+void my_callback_expect_close_or_ack(int rc) {
+  // deleting the connection should trigger the callback with -4098
+  // but due to race conditions, some my get an ack
+  EXPECT_TRUE(-4098 == rc || 0 == rc);
+}
 
 TEST_F(TestAMQP, ReceiveAck)
 {
   callback_invoked = false;
   const std::string host("localhost1");
   amqp_mock::set_valid_host(host);
-  amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+  conn = amqp::connect("amqp://" + host, "ex1");
   EXPECT_TRUE(conn);
   auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_ack);
   EXPECT_EQ(rc, 0);
-  std::this_thread::sleep_for(wait_time);
+  wait_until_drained();
   EXPECT_TRUE(callback_invoked);
+  amqp_mock::set_valid_host("localhost");
+}
+
+TEST_F(TestAMQP, ImplicitConnectionClose)
+{
   callback_invoked = false;
+  const std::string host("localhost1");
+  amqp_mock::set_valid_host(host);
+  conn = amqp::connect("amqp://" + host, "ex1");
+  EXPECT_TRUE(conn);
+  const auto NUMBER_OF_CALLS = 2000;
+  for (auto i = 0; i < NUMBER_OF_CALLS; ++i) {
+    auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_close_or_ack);
+    EXPECT_EQ(rc, 0);
+  }
+  wait_until_drained();
+  // deleting the connection object should close the connection
+  conn.reset(nullptr);
   amqp_mock::set_valid_host("localhost");
 }
 
@@ -239,14 +278,14 @@ TEST_F(TestAMQP, ReceiveMultipleAck)
   callbacks_invoked = 0;
   const std::string host("localhost1");
   amqp_mock::set_valid_host(host);
-  amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+  conn = amqp::connect("amqp://" + host, "ex1");
   EXPECT_TRUE(conn);
   const auto NUMBER_OF_CALLS = 100;
   for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
     auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_multiple_acks);
     EXPECT_EQ(rc, 0);
   }
-  std::this_thread::sleep_for(wait_time);
+  wait_until_drained();
   EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS);
   callbacks_invoked = 0;
   amqp_mock::set_valid_host("localhost");
@@ -257,7 +296,7 @@ TEST_F(TestAMQP, ReceiveAckForMultiple)
   callbacks_invoked = 0;
   const std::string host("localhost1");
   amqp_mock::set_valid_host(host);
-  amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+  conn = amqp::connect("amqp://" + host, "ex1");
   EXPECT_TRUE(conn);
   amqp_mock::set_multiple(59);
   const auto NUMBER_OF_CALLS = 100;
@@ -265,7 +304,7 @@ TEST_F(TestAMQP, ReceiveAckForMultiple)
     auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_multiple_acks);
     EXPECT_EQ(rc, 0);
   }
-  std::this_thread::sleep_for(wait_time);
+  wait_until_drained();
   EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS);
   callbacks_invoked = 0;
   amqp_mock::set_valid_host("localhost");
@@ -276,7 +315,7 @@ TEST_F(TestAMQP, DynamicCallback)
   callbacks_invoked = 0;
   const std::string host("localhost1");
   amqp_mock::set_valid_host(host);
-  amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+  conn = amqp::connect("amqp://" + host, "ex1");
   EXPECT_TRUE(conn);
   amqp_mock::set_multiple(59);
   const auto NUMBER_OF_CALLS = 100;
@@ -285,7 +324,7 @@ TEST_F(TestAMQP, DynamicCallback)
             std::bind(&dynamic_callback_wrapper::callback, dynamic_callback_wrapper::create(), std::placeholders::_1));
     EXPECT_EQ(rc, 0);
   }
-  std::this_thread::sleep_for(wait_time);
+  wait_until_drained();
   EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS);
   callbacks_invoked = 0;
   amqp_mock::set_valid_host("localhost");
@@ -297,11 +336,11 @@ TEST_F(TestAMQP, ReceiveNack)
   amqp_mock::REPLY_ACK = false;
   const std::string host("localhost2");
   amqp_mock::set_valid_host(host);
-  amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+  conn = amqp::connect("amqp://" + host, "ex1");
   EXPECT_TRUE(conn);
   auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
   EXPECT_EQ(rc, 0);
-  std::this_thread::sleep_for(wait_time);
+  wait_until_drained();
   EXPECT_TRUE(callback_invoked);
   amqp_mock::REPLY_ACK = true;
   callback_invoked = false;
@@ -314,11 +353,11 @@ TEST_F(TestAMQP, FailWrite)
   amqp_mock::FAIL_NEXT_WRITE = true;
   const std::string host("localhost2");
   amqp_mock::set_valid_host(host);
-  amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+  conn = amqp::connect("amqp://" + host, "ex1");
   EXPECT_TRUE(conn);
   auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
   EXPECT_EQ(rc, 0);
-  std::this_thread::sleep_for(wait_time);
+  wait_until_drained();
   EXPECT_TRUE(callback_invoked);
   amqp_mock::FAIL_NEXT_WRITE = false;
   callback_invoked = false;
@@ -331,16 +370,16 @@ TEST_F(TestAMQP, ClosedConnection)
   const auto current_connections = amqp::get_connection_count();
   const std::string host("localhost3");
   amqp_mock::set_valid_host(host);
-  amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+  conn = amqp::connect("amqp://" + host, "ex1");
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), current_connections + 1);
   EXPECT_TRUE(amqp::disconnect(conn));
-  std::this_thread::sleep_for(wait_time);
+  std::this_thread::sleep_for(long_wait_time);
   // make sure number of connections decreased back
   EXPECT_EQ(amqp::get_connection_count(), current_connections);
   auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
   EXPECT_LT(rc, 0);
-  std::this_thread::sleep_for(wait_time);
+  std::this_thread::sleep_for(long_wait_time);
   EXPECT_FALSE(callback_invoked);
   callback_invoked = false;
   amqp_mock::set_valid_host("localhost");
@@ -350,14 +389,14 @@ TEST_F(TestAMQP, RetryInvalidHost)
 {
   const std::string host = "192.168.0.1";
   const auto connection_number = amqp::get_connection_count();
-  amqp::connection_ptr_t conn = amqp::connect("amqp://"+host, "ex1");
+  conn = amqp::connect("amqp://"+host, "ex1");
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
   auto rc = amqp::publish(conn, "topic", "message");
   EXPECT_LT(rc, 0);
   // now next retry should be ok
   amqp_mock::set_valid_host(host);
-  std::this_thread::sleep_for(wait_time);
+  std::this_thread::sleep_for(long_wait_time);
   rc = amqp::publish(conn, "topic", "message");
   EXPECT_EQ(rc, 0);
   amqp_mock::set_valid_host("localhost");
@@ -367,14 +406,14 @@ TEST_F(TestAMQP, RetryInvalidPort)
 {
   const int port = 9999;
   const auto connection_number = amqp::get_connection_count();
-  amqp::connection_ptr_t conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1");
+  conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1");
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
   auto rc = amqp::publish(conn, "topic", "message");
   EXPECT_LT(rc, 0);
   // now next retry should be ok
   amqp_mock::set_valid_port(port);
-  std::this_thread::sleep_for(wait_time);
+  std::this_thread::sleep_for(long_wait_time);
   rc = amqp::publish(conn, "topic", "message");
   EXPECT_EQ(rc, 0);
   amqp_mock::set_valid_port(5672);
@@ -386,30 +425,30 @@ TEST_F(TestAMQP, RetryFailWrite)
   amqp_mock::FAIL_NEXT_WRITE = true;
   const std::string host("localhost4");
   amqp_mock::set_valid_host(host);
-  amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+  conn = amqp::connect("amqp://" + host, "ex1");
   EXPECT_TRUE(conn);
   auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
   EXPECT_EQ(rc, 0);
   // set port to a different one, so that reconnect would fail
   amqp_mock::set_valid_port(9999);
-  std::this_thread::sleep_for(wait_time);
+  wait_until_drained();
   EXPECT_TRUE(callback_invoked);
   callback_invoked = false;
   rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
   EXPECT_LT(rc, 0);
   // expect immediate failure, no callback called after sleep
-  std::this_thread::sleep_for(wait_time);
+  std::this_thread::sleep_for(long_wait_time);
   EXPECT_FALSE(callback_invoked);
   // set port to the right one so that reconnect would succeed
   amqp_mock::set_valid_port(5672);
   callback_invoked = false;
   amqp_mock::FAIL_NEXT_WRITE = false;
   // give time to reconnect
-  std::this_thread::sleep_for(wait_time);
+  std::this_thread::sleep_for(long_wait_time);
   // retry to publish should succeed now
   rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_ack);
   EXPECT_EQ(rc, 0);
-  std::this_thread::sleep_for(wait_time);
+  wait_until_drained();
   EXPECT_TRUE(callback_invoked);
   callback_invoked = false;
   amqp_mock::set_valid_host("localhost");
@@ -441,7 +480,7 @@ TEST_F(TestAMQP, AcksWithReconnect)
   callbacks_invoked = 0;
   const std::string host("localhost1");
   amqp_mock::set_valid_host(host);
-  amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+  conn = amqp::connect("amqp://" + host, "ex1");
   EXPECT_TRUE(conn);
   amqp_mock::set_multiple(59);
   // failure will take effect after: max(59, 70)
@@ -454,7 +493,7 @@ TEST_F(TestAMQP, AcksWithReconnect)
     EXPECT_EQ(rc, 0);
   }
   // connection failes before multiple acks
-  std::this_thread::sleep_for(wait_time);
+  wait_until_drained();
   EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS);
   // publish more mesages
   expect_zero_rc = true;
@@ -462,7 +501,7 @@ TEST_F(TestAMQP, AcksWithReconnect)
     auto rc = publish_with_confirm(conn, "topic", "message", my_callback_triggering_failure);
     EXPECT_EQ(rc, 0);
   }
-  std::this_thread::sleep_for(wait_time);
+  wait_until_drained();
   EXPECT_EQ(callbacks_invoked, 2*NUMBER_OF_CALLS);
   callbacks_invoked = 0;
   amqp_mock::set_valid_host("localhost");