using namespace rgw;
-const std::chrono::milliseconds wait_time(300);
+const std::chrono::milliseconds wait_time(10);
+const std::chrono::milliseconds long_wait_time = wait_time*50;
+
class CctCleaner {
CephContext* cct;
class TestAMQP : public ::testing::Test {
protected:
+ amqp::connection_ptr_t conn = nullptr;
+ unsigned current_dequeued = 0U;
+
void SetUp() override {
ASSERT_TRUE(amqp::init(cct));
}
void TearDown() override {
amqp::shutdown();
}
+
+ // wait for at least one new (since last drain) message to be dequeueud
+ // and then wait for all pending answers to be received
+ void wait_until_drained() {
+ while (amqp::get_dequeued() == current_dequeued) {
+ std::this_thread::sleep_for(wait_time);
+ }
+ while (amqp::get_inflight() > 0) {
+ std::this_thread::sleep_for(wait_time);
+ }
+ current_dequeued = amqp::get_dequeued();
+ }
};
TEST_F(TestAMQP, ConnectionOK)
{
const auto connection_number = amqp::get_connection_count();
- amqp::connection_ptr_t conn = amqp::connect("amqp://localhost", "ex1");
+ conn = amqp::connect("amqp://localhost", "ex1");
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
TEST_F(TestAMQP, NameResolutionFail)
{
const auto connection_number = amqp::get_connection_count();
- amqp::connection_ptr_t conn = amqp::connect("amqp://kaboom", "ex1");
+ conn = amqp::connect("amqp://kaboom", "ex1");
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
TEST_F(TestAMQP, InvalidPort)
{
const auto connection_number = amqp::get_connection_count();
- amqp::connection_ptr_t conn = amqp::connect("amqp://localhost:1234", "ex1");
+ conn = amqp::connect("amqp://localhost:1234", "ex1");
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
TEST_F(TestAMQP, InvalidHost)
{
const auto connection_number = amqp::get_connection_count();
- amqp::connection_ptr_t conn = amqp::connect("amqp://0.0.0.1", "ex1");
+ conn = amqp::connect("amqp://0.0.0.1", "ex1");
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
TEST_F(TestAMQP, InvalidVhost)
{
const auto connection_number = amqp::get_connection_count();
- amqp::connection_ptr_t conn = amqp::connect("amqp://localhost/kaboom", "ex1");
+ conn = amqp::connect("amqp://localhost/kaboom", "ex1");
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
amqp_mock::set_valid_host("127.0.0.1");
{
const auto connection_number = amqp::get_connection_count();
- amqp::connection_ptr_t conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1");
+ conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1");
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
amqp_mock::set_valid_host("127.0.0.2");
{
const auto connection_number = amqp::get_connection_count();
- amqp::connection_ptr_t conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1");
+ conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1");
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
TEST_F(TestAMQP, URLParseError)
{
const auto connection_number = amqp::get_connection_count();
- amqp::connection_ptr_t conn = amqp::connect("http://localhost", "ex1");
+ conn = amqp::connect("http://localhost", "ex1");
EXPECT_FALSE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number);
auto rc = amqp::publish(conn, "topic", "message");
TEST_F(TestAMQP, ExchangeMismatch)
{
const auto connection_number = amqp::get_connection_count();
- amqp::connection_ptr_t conn = amqp::connect("http://localhost", "ex2");
+ conn = amqp::connect("http://localhost", "ex2");
EXPECT_FALSE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number);
auto rc = amqp::publish(conn, "topic", "message");
EXPECT_TRUE(amqp::disconnect(conn));
}
// wait for them to be deleted
- std::this_thread::sleep_for(wait_time);
+ std::this_thread::sleep_for(long_wait_time);
EXPECT_LT(amqp::get_connection_count(), amqp::get_max_connections());
}
}
};
+void my_callback_expect_close_or_ack(int rc) {
+ // deleting the connection should trigger the callback with -4098
+ // but due to race conditions, some my get an ack
+ EXPECT_TRUE(-4098 == rc || 0 == rc);
+}
TEST_F(TestAMQP, ReceiveAck)
{
callback_invoked = false;
const std::string host("localhost1");
amqp_mock::set_valid_host(host);
- amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+ conn = amqp::connect("amqp://" + host, "ex1");
EXPECT_TRUE(conn);
auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_ack);
EXPECT_EQ(rc, 0);
- std::this_thread::sleep_for(wait_time);
+ wait_until_drained();
EXPECT_TRUE(callback_invoked);
+ amqp_mock::set_valid_host("localhost");
+}
+
+TEST_F(TestAMQP, ImplicitConnectionClose)
+{
callback_invoked = false;
+ const std::string host("localhost1");
+ amqp_mock::set_valid_host(host);
+ conn = amqp::connect("amqp://" + host, "ex1");
+ EXPECT_TRUE(conn);
+ const auto NUMBER_OF_CALLS = 2000;
+ for (auto i = 0; i < NUMBER_OF_CALLS; ++i) {
+ auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_close_or_ack);
+ EXPECT_EQ(rc, 0);
+ }
+ wait_until_drained();
+ // deleting the connection object should close the connection
+ conn.reset(nullptr);
amqp_mock::set_valid_host("localhost");
}
callbacks_invoked = 0;
const std::string host("localhost1");
amqp_mock::set_valid_host(host);
- amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+ conn = amqp::connect("amqp://" + host, "ex1");
EXPECT_TRUE(conn);
const auto NUMBER_OF_CALLS = 100;
for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_multiple_acks);
EXPECT_EQ(rc, 0);
}
- std::this_thread::sleep_for(wait_time);
+ wait_until_drained();
EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS);
callbacks_invoked = 0;
amqp_mock::set_valid_host("localhost");
callbacks_invoked = 0;
const std::string host("localhost1");
amqp_mock::set_valid_host(host);
- amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+ conn = amqp::connect("amqp://" + host, "ex1");
EXPECT_TRUE(conn);
amqp_mock::set_multiple(59);
const auto NUMBER_OF_CALLS = 100;
auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_multiple_acks);
EXPECT_EQ(rc, 0);
}
- std::this_thread::sleep_for(wait_time);
+ wait_until_drained();
EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS);
callbacks_invoked = 0;
amqp_mock::set_valid_host("localhost");
callbacks_invoked = 0;
const std::string host("localhost1");
amqp_mock::set_valid_host(host);
- amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+ conn = amqp::connect("amqp://" + host, "ex1");
EXPECT_TRUE(conn);
amqp_mock::set_multiple(59);
const auto NUMBER_OF_CALLS = 100;
std::bind(&dynamic_callback_wrapper::callback, dynamic_callback_wrapper::create(), std::placeholders::_1));
EXPECT_EQ(rc, 0);
}
- std::this_thread::sleep_for(wait_time);
+ wait_until_drained();
EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS);
callbacks_invoked = 0;
amqp_mock::set_valid_host("localhost");
amqp_mock::REPLY_ACK = false;
const std::string host("localhost2");
amqp_mock::set_valid_host(host);
- amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+ conn = amqp::connect("amqp://" + host, "ex1");
EXPECT_TRUE(conn);
auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
EXPECT_EQ(rc, 0);
- std::this_thread::sleep_for(wait_time);
+ wait_until_drained();
EXPECT_TRUE(callback_invoked);
amqp_mock::REPLY_ACK = true;
callback_invoked = false;
amqp_mock::FAIL_NEXT_WRITE = true;
const std::string host("localhost2");
amqp_mock::set_valid_host(host);
- amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+ conn = amqp::connect("amqp://" + host, "ex1");
EXPECT_TRUE(conn);
auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
EXPECT_EQ(rc, 0);
- std::this_thread::sleep_for(wait_time);
+ wait_until_drained();
EXPECT_TRUE(callback_invoked);
amqp_mock::FAIL_NEXT_WRITE = false;
callback_invoked = false;
const auto current_connections = amqp::get_connection_count();
const std::string host("localhost3");
amqp_mock::set_valid_host(host);
- amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+ conn = amqp::connect("amqp://" + host, "ex1");
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), current_connections + 1);
EXPECT_TRUE(amqp::disconnect(conn));
- std::this_thread::sleep_for(wait_time);
+ std::this_thread::sleep_for(long_wait_time);
// make sure number of connections decreased back
EXPECT_EQ(amqp::get_connection_count(), current_connections);
auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
EXPECT_LT(rc, 0);
- std::this_thread::sleep_for(wait_time);
+ std::this_thread::sleep_for(long_wait_time);
EXPECT_FALSE(callback_invoked);
callback_invoked = false;
amqp_mock::set_valid_host("localhost");
{
const std::string host = "192.168.0.1";
const auto connection_number = amqp::get_connection_count();
- amqp::connection_ptr_t conn = amqp::connect("amqp://"+host, "ex1");
+ conn = amqp::connect("amqp://"+host, "ex1");
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
EXPECT_LT(rc, 0);
// now next retry should be ok
amqp_mock::set_valid_host(host);
- std::this_thread::sleep_for(wait_time);
+ std::this_thread::sleep_for(long_wait_time);
rc = amqp::publish(conn, "topic", "message");
EXPECT_EQ(rc, 0);
amqp_mock::set_valid_host("localhost");
{
const int port = 9999;
const auto connection_number = amqp::get_connection_count();
- amqp::connection_ptr_t conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1");
+ conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1");
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
EXPECT_LT(rc, 0);
// now next retry should be ok
amqp_mock::set_valid_port(port);
- std::this_thread::sleep_for(wait_time);
+ std::this_thread::sleep_for(long_wait_time);
rc = amqp::publish(conn, "topic", "message");
EXPECT_EQ(rc, 0);
amqp_mock::set_valid_port(5672);
amqp_mock::FAIL_NEXT_WRITE = true;
const std::string host("localhost4");
amqp_mock::set_valid_host(host);
- amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+ conn = amqp::connect("amqp://" + host, "ex1");
EXPECT_TRUE(conn);
auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
EXPECT_EQ(rc, 0);
// set port to a different one, so that reconnect would fail
amqp_mock::set_valid_port(9999);
- std::this_thread::sleep_for(wait_time);
+ wait_until_drained();
EXPECT_TRUE(callback_invoked);
callback_invoked = false;
rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
EXPECT_LT(rc, 0);
// expect immediate failure, no callback called after sleep
- std::this_thread::sleep_for(wait_time);
+ std::this_thread::sleep_for(long_wait_time);
EXPECT_FALSE(callback_invoked);
// set port to the right one so that reconnect would succeed
amqp_mock::set_valid_port(5672);
callback_invoked = false;
amqp_mock::FAIL_NEXT_WRITE = false;
// give time to reconnect
- std::this_thread::sleep_for(wait_time);
+ std::this_thread::sleep_for(long_wait_time);
// retry to publish should succeed now
rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_ack);
EXPECT_EQ(rc, 0);
- std::this_thread::sleep_for(wait_time);
+ wait_until_drained();
EXPECT_TRUE(callback_invoked);
callback_invoked = false;
amqp_mock::set_valid_host("localhost");
callbacks_invoked = 0;
const std::string host("localhost1");
amqp_mock::set_valid_host(host);
- amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+ conn = amqp::connect("amqp://" + host, "ex1");
EXPECT_TRUE(conn);
amqp_mock::set_multiple(59);
// failure will take effect after: max(59, 70)
EXPECT_EQ(rc, 0);
}
// connection failes before multiple acks
- std::this_thread::sleep_for(wait_time);
+ wait_until_drained();
EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS);
// publish more mesages
expect_zero_rc = true;
auto rc = publish_with_confirm(conn, "topic", "message", my_callback_triggering_failure);
EXPECT_EQ(rc, 0);
}
- std::this_thread::sleep_for(wait_time);
+ wait_until_drained();
EXPECT_EQ(callbacks_invoked, 2*NUMBER_OF_CALLS);
callbacks_invoked = 0;
amqp_mock::set_valid_host("localhost");