amqp_mock::set_valid_host("localhost");
}
-int fail_after = -1;
-int recover_after = -1;
-bool expect_zero_rc = true;
-
-void my_callback_triggering_failure(int rc) {
- if (expect_zero_rc) {
- EXPECT_EQ(rc, 0);
- } else {
- EXPECT_NE(rc, 0);
- }
- ++callbacks_invoked;
- if (fail_after == callbacks_invoked) {
- amqp_mock::FAIL_NEXT_READ = true;
- expect_zero_rc = false;
-
- }
- if (recover_after == callbacks_invoked) {
- amqp_mock::FAIL_NEXT_READ = false;
- }
-}
-
-TEST_F(TestAMQP, AcksWithReconnect)
-{
- callbacks_invoked = 0;
- const std::string host("localhost1");
- amqp_mock::set_valid_host(host);
- conn = amqp::connect("amqp://" + host, "ex1");
- EXPECT_TRUE(conn);
- amqp_mock::set_multiple(59);
- // failure will take effect after: max(59, 70)
- fail_after = 70;
- // all callback are flushed during failure, so, recover will take effect after: max(90, 100)
- recover_after = 90;
- const auto NUMBER_OF_CALLS = 100;
- for (auto i = 0; i < NUMBER_OF_CALLS; ++i) {
- auto rc = publish_with_confirm(conn, "topic", "message", my_callback_triggering_failure);
- EXPECT_EQ(rc, 0);
- }
- // connection failes before multiple acks
- wait_until_drained();
- EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS);
- // publish more mesages
- expect_zero_rc = true;
- for (auto i = 0; i < NUMBER_OF_CALLS; ++i) {
- auto rc = publish_with_confirm(conn, "topic", "message", my_callback_triggering_failure);
- EXPECT_EQ(rc, 0);
- }
- wait_until_drained();
- EXPECT_EQ(callbacks_invoked, 2*NUMBER_OF_CALLS);
- callbacks_invoked = 0;
- amqp_mock::set_valid_host("localhost");
- fail_after = -1;
-}
-