}
}
-template <typename T>
-void _assert_eq(ConnectionRef conn,
- unsigned index,
- const char* expr_actual, T actual,
- const char* expr_expected, T expected) {
- if (actual != expected) {
- throw std::runtime_error(fmt::format(
- "[{}] {} '{}' is actually {}, not the expected '{}' {}",
- index, *conn, expr_actual, actual, expr_expected, expected));
- }
-}
-#define ASSERT_EQUAL(conn, index, actual, expected) \
- _assert_eq(conn, index, #actual, actual, #expected, expected)
-
struct ConnResult {
ConnectionRef conn;
unsigned index;
ConnResult(Connection& conn, unsigned index)
: conn(conn.shared_from_this()), index(index) {}
+ template <typename T>
+ void _assert_eq(const char* expr_actual, T actual,
+ const char* expr_expected, T expected) const {
+ if (actual != expected) {
+ throw std::runtime_error(fmt::format(
+ "[{}] {} '{}' is actually {}, not the expected '{}' {}",
+ index, *conn, expr_actual, actual, expr_expected, expected));
+ }
+ }
+
+#define ASSERT_EQUAL(actual, expected) \
+ _assert_eq(#actual, actual, #expected, expected)
+
void assert_state_at(conn_state_t expected) const {
- ASSERT_EQUAL(conn, index, state, expected);
+ ASSERT_EQUAL(state, expected);
}
void assert_connect(unsigned attempts,
unsigned connects,
unsigned reconnects,
unsigned dispatched) const {
- ASSERT_EQUAL(conn, index, connect_attempts, attempts);
- ASSERT_EQUAL(conn, index, client_connect_attempts, connects);
- ASSERT_EQUAL(conn, index, client_reconnect_attempts, reconnects);
- ASSERT_EQUAL(conn, index, cnt_connect_dispatched, dispatched);
+ ASSERT_EQUAL(connect_attempts, attempts);
+ ASSERT_EQUAL(client_connect_attempts, connects);
+ ASSERT_EQUAL(client_reconnect_attempts, reconnects);
+ ASSERT_EQUAL(cnt_connect_dispatched, dispatched);
+ }
+
+ void assert_connect(unsigned attempts,
+ unsigned dispatched) const {
+ ASSERT_EQUAL(connect_attempts, attempts);
+ ASSERT_EQUAL(cnt_connect_dispatched, dispatched);
}
void assert_accept(unsigned attempts,
unsigned accepts,
unsigned reaccepts,
unsigned dispatched) const {
- ASSERT_EQUAL(conn, index, accept_attempts, attempts);
- ASSERT_EQUAL(conn, index, server_connect_attempts, accepts);
- ASSERT_EQUAL(conn, index, server_reconnect_attempts, reaccepts);
- ASSERT_EQUAL(conn, index, cnt_accept_dispatched, dispatched);
+ ASSERT_EQUAL(accept_attempts, attempts);
+ ASSERT_EQUAL(server_connect_attempts, accepts);
+ ASSERT_EQUAL(server_reconnect_attempts, reaccepts);
+ ASSERT_EQUAL(cnt_accept_dispatched, dispatched);
}
void assert_accept(unsigned attempts,
- unsigned total_accepts,
unsigned dispatched) const {
- ASSERT_EQUAL(conn, index, accept_attempts, attempts);
- ASSERT_EQUAL(conn, index, server_connect_attempts + server_reconnect_attempts, total_accepts);
- ASSERT_EQUAL(conn, index, cnt_accept_dispatched, dispatched);
+ ASSERT_EQUAL(accept_attempts, attempts);
+ ASSERT_EQUAL(cnt_accept_dispatched, dispatched);
}
void assert_reset(unsigned local, unsigned remote) const {
- ASSERT_EQUAL(conn, index, cnt_reset_dispatched, local);
- ASSERT_EQUAL(conn, index, cnt_remote_reset_dispatched, remote);
+ ASSERT_EQUAL(cnt_reset_dispatched, local);
+ ASSERT_EQUAL(cnt_remote_reset_dispatched, remote);
}
void dump() const {
if (bp == custom_bp_t::SOCKET_CONNECTING) {
++result->connect_attempts;
+ logger().info("[Test] connect_attempts={}", result->connect_attempts);
} else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::WRITE}) {
++result->client_connect_attempts;
+ logger().info("[Test] client_connect_attempts={}", result->client_connect_attempts);
} else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::WRITE}) {
++result->client_reconnect_attempts;
+ logger().info("[Test] client_reconnect_attempts={}", result->client_reconnect_attempts);
} else if (bp == custom_bp_t::SOCKET_ACCEPTED) {
++result->accept_attempts;
+ logger().info("[Test] accept_attempts={}", result->accept_attempts);
} else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::READ}) {
++result->server_connect_attempts;
+ logger().info("[Test] server_connect_attemps={}", result->server_connect_attempts);
} else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::READ}) {
++result->server_reconnect_attempts;
+ logger().info("[Test] server_reconnect_attempts={}", result->server_reconnect_attempts);
}
auto it_bp = breakpoints.find(bp);
if (pending_receive == 0) {
interceptor.notify();
}
- logger().info("[{}] {} got op, pending {} ops", result->index, *c, pending_receive);
+ logger().info("[Test] got op, left {} ops -- [{}] {}",
+ pending_receive, result->index, *c);
return seastar::now();
}
tracked_index = result->index;
tracked_conn = conn;
++result->cnt_accept_dispatched;
- logger().info("[{}] {} got accepted and tracked, start to send {} ops",
- result->index, *conn, pending_send);
+ logger().info("[Test] got accept (cnt_accept_dispatched={}), track [{}] {}",
+ result->cnt_accept_dispatched, result->index, *conn);
return flush_pending_send();
}
ceph_assert(result->index == tracked_index);
++result->cnt_connect_dispatched;
- logger().info("[{}] {} got connected", result->index, *conn);
+ logger().info("[Test] got connected (cnt_connect_dispatched={}) -- [{}] {}",
+ result->cnt_connect_dispatched, result->index, *conn);
return seastar::now();
}
tracked_index = 0;
tracked_conn = nullptr;
++result->cnt_reset_dispatched;
- logger().info("[{}] {} got reset and untracked", result->index, *conn);
+ logger().info("[Test] got reset (cnt_reset_dispatched={}), untrack [{}] {}",
+ result->cnt_reset_dispatched, result->index, *conn);
return seastar::now();
}
}
ceph_assert(result->index == tracked_index);
- logger().info("[{}] {} got remotely reset", result->index, *conn);
++result->cnt_remote_reset_dispatched;
+ logger().info("[Test] got remote reset (cnt_remote_reset_dispatched={}) -- [{}] {}",
+ result->cnt_remote_reset_dispatched, result->index, *conn);
return seastar::now();
}
}
seastar::future<> flush_pending_send() {
+ if (pending_send != 0) {
+ logger().info("[Test] flush sending {} ops", pending_send);
+ }
ceph_assert(tracked_conn);
return seastar::do_until(
[this] { return pending_send == 0; },
if (num_ready_conns > 0) {
if (interceptor.results.size() > num_ready_conns) {
throw std::runtime_error(fmt::format(
- "{} connections,more than expected: {}",
+ "{} connections, more than expected: {}",
interceptor.results.size(), num_ready_conns));
} else if (interceptor.results.size() < num_ready_conns || pending_conns > 0) {
- logger().info("Waiting for connections, currently {} out of {},"
- " pending {} ready ...",
+ logger().info("[Test] wait_ready(): wait for connections,"
+ " currently {} out of {}, pending {} ready ...",
interceptor.results.size(), num_ready_conns, pending_conns);
do_wait = true;
}
if (wait_received &&
(pending_send || pending_peer_receive || pending_receive)) {
if (pending_conns || pending_establish) {
- logger().info("Waiting for pending_send={}, pending_peer_receive={},"
- " pending_receive={}, with {} pending connections ...",
+ logger().info("[Test] wait_ready(): wait for pending_send={},"
+ " pending_peer_receive={}, pending_receive={},"
+ " pending {}/{} ready/establish connections ...",
pending_send, pending_peer_receive, pending_receive,
- pending_conns + pending_establish);
+ pending_conns, pending_establish);
do_wait = true;
}
}
replaced_conns, num_replaced));
}
if (replaced_conns < num_replaced) {
- logger().info("Waiting for {} replaced connections, currently {} ...",
+ logger().info("[Test] wait_ready(): wait for {} replaced connections,"
+ " currently {} ...",
num_replaced, replaced_conns);
do_wait = true;
}
return wait_ready(num_ready_conns, num_replaced, wait_received);
});
} else {
- logger().info("Wait done!");
+ logger().info("[Test] wait_ready(): wait done!");
return seastar::now();
}
}
void notify_peer_reply() {
ceph_assert(pending_peer_receive > 0);
--pending_peer_receive;
- logger().info("TestPeer received op, pending {} peer receive ops",
+ logger().info("[Test] TestPeer said got op, left {} ops",
pending_peer_receive);
if (pending_peer_receive == 0) {
interceptor.notify();
}
}
+ void dump_results() const {
+ for (auto& result : interceptor.results) {
+ result.dump();
+ }
+ }
+
static seastar::future<std::unique_ptr<FailoverSuite>>
create(entity_addr_t test_addr,
SocketPolicy test_policy,
// called by tests
public:
seastar::future<> connect_peer() {
+ logger().info("[Test] connect_peer({})", test_peer_addr);
ceph_assert(!tracked_conn);
return test_msgr.connect(test_peer_addr, entity_name_t::TYPE_OSD
).then([this] (auto xconn) {
seastar::future<> send_peer() {
if (tracked_conn) {
+ logger().info("[Test] send_peer()");
ceph_assert(!pending_send);
return send_op();
} else {
++pending_send;
+ logger().info("[Test] send_peer() (pending {})", pending_send);
return seastar::now();
}
}
seastar::future<> wait_blocked() {
- logger().info("Waiting for blocked...");
+ logger().info("[Test] wait_blocked() ...");
return interceptor.blocker.wait_blocked();
}
void unblock() {
- logger().info("Unblock breakpoint");
+ logger().info("[Test] unblock()");
return interceptor.blocker.unblock();
}
seastar::future<> wait_replaced(unsigned count) {
+ logger().info("[Test] wait_replaced({}) ...", count);
return wait_ready(0, count, false);
}
seastar::future<> wait_established() {
+ logger().info("[Test] wait_established() ...");
return wait_ready(0, 0, true);
}
seastar::future<std::reference_wrapper<ConnResults>>
wait_results(unsigned count) {
+ logger().info("[Test] wait_result({}) ...", count);
return wait_ready(count, 0, true).then([this] {
return std::reference_wrapper<ConnResults>(interceptor.results);
});
}).then([this] {
test_suite->post_check();
logger().info("\n[SUCCESS]");
- }).handle_exception([] (auto eptr) {
+ }).handle_exception([this] (auto eptr) {
logger().info("\n[FAIL: {}]", eptr);
+ test_suite->dump_results();
throw;
}).finally([this] {
return stop_peer();
}
seastar::future<> peer_connect_me() {
+ logger().info("[Test] peer_connect_me({})", test_addr);
return prepare_cmd(cmd_t::suite_connect_me,
[this] (auto m) {
m->cmd.emplace_back(fmt::format("{}", test_addr));
}
seastar::future<> peer_send_me() {
+ logger().info("[Test] peer_send_me()");
ceph_assert(test_suite);
test_suite->needs_receive();
return prepare_cmd(cmd_t::suite_send_me);
unsigned pending_send = 0;
seastar::future<> ms_dispatch(Connection* c, MessageRef m) override {
+ logger().info("[TestPeer] got op from Test");
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
ceph_assert(tracked_conn == c->shared_from_this());
return op_callback();
}
seastar::future<> flush_pending_send() {
+ if (pending_send != 0) {
+ logger().info("[TestPeer] flush sending {} ops", pending_send);
+ }
ceph_assert(tracked_conn);
return seastar::do_until(
[this] { return pending_send == 0; },
return peer_msgr.shutdown();
}
- seastar::future<> connect(entity_addr_t addr) {
+ seastar::future<> connect_peer(entity_addr_t addr) {
+ logger().info("[TestPeer] connect_peer({})", addr);
ceph_assert(!tracked_conn);
return peer_msgr.connect(addr, entity_name_t::TYPE_OSD
).then([this] (auto xconn) {
seastar::future<> send_peer() {
if (tracked_conn) {
+ logger().info("[TestPeer] send_peer()");
return send_op();
} else {
++pending_send;
+ logger().info("[TestPeer] send_peer() (pending {})", pending_send);
return seastar::now();
}
}
ceph_assert(test_suite);
entity_addr_t test_addr = entity_addr_t();
test_addr.parse(m_cmd->cmd[1].c_str(), nullptr);
- return test_suite->connect(test_addr);
+ return test_suite->connect_peer(test_addr);
}
case cmd_t::suite_send_me:
ceph_assert(test_suite);
results[0].assert_reset(0, 0);
results[1].assert_state_at(conn_state_t::replaced);
results[1].assert_connect(0, 0, 0, 0);
- results[1].assert_accept(1, 1, 0);
+ results[1].assert_accept(1, 0);
results[1].assert_reset(0, 0);
});
});
results[0].assert_reset(0, 0);
results[1].assert_state_at(conn_state_t::closed);
results[1].assert_connect(0, 0, 0, 0);
- ceph_assert(results[1].accept_attempts == 1);
- ceph_assert(results[1].cnt_accept_dispatched == 0);
+ results[1].assert_accept(1, 0);
results[1].assert_reset(0, 0);
});
});
return suite.wait_results(2);
}).then([] (ConnResults& results) {
results[0].assert_state_at(conn_state_t::established);
- ceph_assert(results[0].connect_attempts == 2);
- ceph_assert(results[0].cnt_connect_dispatched == 1);
+ results[0].assert_connect(2, 1);
results[0].assert_accept(0, 0, 0, 1);
results[0].assert_reset(0, 0);
results[1].assert_state_at(conn_state_t::replaced);
}).then([] (ConnResults& results) {
results[0].assert_state_at(conn_state_t::closed);
results[0].assert_connect(0, 0, 0, 0);
- ceph_assert(results[0].accept_attempts == 1);
- ceph_assert(results[0].cnt_accept_dispatched == 0);
+ results[0].assert_accept(1, 0);
results[0].assert_reset(0, 0);
results[1].assert_state_at(conn_state_t::established);
results[1].assert_connect(1, 1, 0, 1);
return suite.wait_results(2);
}).then([] (ConnResults& results) {
results[0].assert_state_at(conn_state_t::established);
- ceph_assert(results[0].connect_attempts == 1);
- ceph_assert(results[0].cnt_connect_dispatched == 0);
+ results[0].assert_connect(1, 0);
results[0].assert_accept(0, 0, 0, 1);
results[0].assert_reset(0, 0);
results[1].assert_state_at(conn_state_t::replaced);
}).then([&suite] {
return suite.wait_blocked();
}).then([&suite] {
- logger().info("Block the broken REPLACING for 210ms...");
+ logger().info("[Test] block the broken REPLACING for 210ms...");
return seastar::sleep(210ms);
}).then([&suite] {
suite.unblock();
results[1].assert_reset(0, 0);
results[2].assert_state_at(conn_state_t::replaced);
results[2].assert_connect(0, 0, 0, 0);
- ceph_assert(results[2].accept_attempts == 1);
- ceph_assert(results[2].server_connect_attempts == 0);
- ceph_assert(results[2].server_reconnect_attempts >= 1);
- ceph_assert(results[2].cnt_accept_dispatched == 0);
+ results[2].assert_accept(1, 0);
results[2].assert_reset(0, 0);
+ ceph_assert(results[2].server_reconnect_attempts >= 1);
});
});
}