From: Vicente Cheng Date: Thu, 16 Dec 2021 03:20:05 +0000 (+0000) Subject: test/msgr: add unittest to simulate network block temporarily X-Git-Tag: v18.0.0~1400^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=62f0140d80a7564d3db377153a3cd87b3816d497;p=ceph.git test/msgr: add unittest to simulate network block temporarily Add new test case to verify the network block temporarily, that case would make outgoing_bl overflow so add the assert checking mechanism to claim_append Just use 2 connections because that we could not generate the large data set to verify it Simulate the EAGAIN situation looks like by skip calling cs.send() because EAGAIN would return size 0 and keep the outgoing_bl Signed-off-by: Vicente Cheng --- diff --git a/src/common/buffer.cc b/src/common/buffer.cc index 4b1bc08936cd..0efa576d76a9 100644 --- a/src/common/buffer.cc +++ b/src/common/buffer.cc @@ -1287,6 +1287,8 @@ static ceph::spinlock debug_lock; void buffer::list::claim_append(list& bl) { + // check overflow + assert(_len + bl._len >= _len); // steal the other guy's buffers _len += bl._len; _num += bl._num; diff --git a/src/common/options/global.yaml.in b/src/common/options/global.yaml.in index 3e9454db2167..e586eb4e0df9 100644 --- a/src/common/options/global.yaml.in +++ b/src/common/options/global.yaml.in @@ -1262,6 +1262,12 @@ options: desc: Inject various internal delays to induce races (seconds) default: 0 with_legacy: true +- name: ms_inject_network_congestion + type: uint + level: dev + desc: Inject a network congestions that stuck with N times operations + default: 0 + with_legacy: true - name: ms_blackhole_osd type: bool level: dev diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 5769c580e074..78f19b8e2f65 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -327,7 +327,12 @@ ssize_t AsyncConnection::_try_send(bool more) ceph_assert(center->in_thread()); ldout(async_msgr->cct, 25) << __func__ << " cs.send " << outgoing_bl.length() << " bytes" << dendl; - ssize_t r = cs.send(outgoing_bl, more); + // network block would make ::send return EAGAIN, that would make here looks + // like do not call cs.send() and r = 0 + ssize_t r = 0; + if (likely(!inject_network_congestion())) { + r = cs.send(outgoing_bl, more); + } if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl; return r; @@ -362,6 +367,11 @@ void AsyncConnection::inject_delay() { } } +bool AsyncConnection::inject_network_congestion() const { + return (async_msgr->cct->_conf->ms_inject_network_congestion > 0 && + rand() % async_msgr->cct->_conf->ms_inject_network_congestion != 0); +} + void AsyncConnection::process() { std::lock_guard l(lock); last_active = ceph::coarse_mono_clock::now(); diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index c7f0f9fe8603..82c29985b18d 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -64,6 +64,7 @@ class AsyncConnection : public Connection { void _stop(); void fault(); void inject_delay(); + bool inject_network_congestion() const; bool is_queued() const; void shutdown_socket(); diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc index 17f679b0904e..fd7b30fdc7f9 100644 --- a/src/test/msgr/test_msgr.cc +++ b/src/test/msgr/test_msgr.cc @@ -1775,16 +1775,20 @@ class SyntheticWorkload { DummyAuthClientServer dummy_auth; public: - static const unsigned max_in_flight = 64; - static const unsigned max_connections = 128; + const unsigned max_in_flight = 0; + const unsigned max_connections = 0; static const unsigned max_message_len = 1024 * 1024 * 4; SyntheticWorkload(int servers, int clients, string type, int random_num, - Messenger::Policy srv_policy, Messenger::Policy cli_policy) + Messenger::Policy srv_policy, Messenger::Policy cli_policy, + int _max_in_flight = 64, int _max_connections = 128) : client_policy(cli_policy), dispatcher(false, this), rng(time(NULL)), - dummy_auth(g_ceph_context) { + dummy_auth(g_ceph_context), + max_in_flight(_max_in_flight), + max_connections(_max_connections) { + dummy_auth.auth_registry.refresh_config(); Messenger *msgr; int base_port = 16800; @@ -1918,6 +1922,32 @@ class SyntheticWorkload { } } + void send_large_message(bool inject_network_congestion=false) { + std::lock_guard l{lock}; + ConnectionRef conn = _get_random_connection(); + uuid_d uuid; + uuid.generate_random(); + MCommand *m = new MCommand(uuid); + vector cmds; + cmds.push_back("command"); + // set the random data to make the large message + bufferlist bl; + string s("abcdefghijklmnopqrstuvwxyz"); + for (int i = 0; i < 1024*256; i++) + bl.append(s); + // bl is around 6M + m->set_data(bl); + m->cmd = cmds; + m->set_priority(200); + // setup after connection is ready + if (inject_network_congestion && conn->is_connected()) { + g_ceph_context->_conf.set_val("ms_inject_network_congestion", "100"); + } else { + g_ceph_context->_conf.set_val("ms_inject_network_congestion", "0"); + } + conn->send_message(m); + } + void drop_connection() { std::lock_guard l{lock}; if (available_connections.size() < 10) @@ -2196,6 +2226,31 @@ TEST_P(MessengerTest, SyntheticInjectTest4) { g_ceph_context->_conf.set_val("ms_inject_delay_max", "0"); } +// This is test for network block, means ::send return EAGAIN +TEST_P(MessengerTest, SyntheticInjectTest5) { + SyntheticWorkload test_msg(1, 8, GetParam(), 100, + Messenger::Policy::stateful_server(0), + Messenger::Policy::lossless_client(0), + 64, 2); + bool simulate_network_congestion = true; + for (int i = 0; i < 2; ++i) + test_msg.generate_connection(); + for (int i = 0; i < 5000; ++i) { + if (!(i % 10)) { + ldout(g_ceph_context, 0) << "Op " << i << ": " << dendl; + test_msg.print_internal_state(); + } + if (i < 1600) { + // means that we would stuck 1600 * 6M (9.6G) around with 2 connections + test_msg.send_large_message(simulate_network_congestion); + } else { + simulate_network_congestion = false; + test_msg.send_large_message(simulate_network_congestion); + } + } + test_msg.wait_for_done(); +} + class MarkdownDispatcher : public Dispatcher { ceph::mutex lock = ceph::make_mutex("MarkdownDispatcher::lock");