]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
test/msgr: add unittest to simulate network block temporarily
authorVicente Cheng <vicente_cheng@bigtera.com>
Thu, 16 Dec 2021 03:20:05 +0000 (03:20 +0000)
committerVicente Cheng <freeze.bilsted@gmail.com>
Mon, 7 Feb 2022 02:09:38 +0000 (02:09 +0000)
   Add new test case to verify the network block temporarily,
   that case would make outgoing_bl overflow so add the assert
   checking mechanism to claim_append

   Just use 2 connections because that we could not generate the
   large data set to verify it

   Simulate the EAGAIN situation looks like by skip calling
   cs.send() because EAGAIN would return size 0 and keep the
   outgoing_bl

Signed-off-by: Vicente Cheng <vicente_cheng@bigtera.com>
src/common/buffer.cc
src/common/options/global.yaml.in
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/test/msgr/test_msgr.cc

index 4b1bc08936cda9f4dae3ebd759f676b81493450e..0efa576d76a956e731ad124b5ee4b998603f7138 100644 (file)
@@ -1287,6 +1287,8 @@ static ceph::spinlock debug_lock;
 
   void buffer::list::claim_append(list& bl)
   {
+    // check overflow
+    assert(_len + bl._len >= _len);
     // steal the other guy's buffers
     _len += bl._len;
     _num += bl._num;
index 3e9454db2167453a0e9ed3b004e137efcac3fab7..e586eb4e0df99783ba139f8b7ab661bf37f80d88 100644 (file)
@@ -1262,6 +1262,12 @@ options:
   desc: Inject various internal delays to induce races (seconds)
   default: 0
   with_legacy: true
+- name: ms_inject_network_congestion
+  type: uint
+  level: dev
+  desc: Inject a network congestions that stuck with N times operations
+  default: 0
+  with_legacy: true
 - name: ms_blackhole_osd
   type: bool
   level: dev
index 5769c580e0742f1a513877582a6aac1beb28ad34..78f19b8e2f6563f43f7c10b73c45127442b78396 100644 (file)
@@ -327,7 +327,12 @@ ssize_t AsyncConnection::_try_send(bool more)
   ceph_assert(center->in_thread());
   ldout(async_msgr->cct, 25) << __func__ << " cs.send " << outgoing_bl.length()
                              << " bytes" << dendl;
-  ssize_t r = cs.send(outgoing_bl, more);
+  // network block would make ::send return EAGAIN, that would make here looks
+  // like do not call cs.send() and r = 0
+  ssize_t r = 0;
+  if (likely(!inject_network_congestion())) {
+    r = cs.send(outgoing_bl, more);
+  }
   if (r < 0) {
     ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl;
     return r;
@@ -362,6 +367,11 @@ void AsyncConnection::inject_delay() {
   }
 }
 
+bool AsyncConnection::inject_network_congestion() const {
+  return (async_msgr->cct->_conf->ms_inject_network_congestion > 0 &&
+         rand() % async_msgr->cct->_conf->ms_inject_network_congestion != 0);
+}
+
 void AsyncConnection::process() {
   std::lock_guard<std::mutex> l(lock);
   last_active = ceph::coarse_mono_clock::now();
index c7f0f9fe86039a77eb4351077dead980a822a7a4..82c29985b18d9e7ef9cb56b8d7f71c9ea0d51843 100644 (file)
@@ -64,6 +64,7 @@ class AsyncConnection : public Connection {
   void _stop();
   void fault();
   void inject_delay();
+  bool inject_network_congestion() const;
 
   bool is_queued() const;
   void shutdown_socket();
index 17f679b0904e3f5196aafc0c85dde4348586375c..fd7b30fdc7f98b1e6145be630f7d603a602a4dba 100644 (file)
@@ -1775,16 +1775,20 @@ class SyntheticWorkload {
   DummyAuthClientServer dummy_auth;
 
  public:
-  static const unsigned max_in_flight = 64;
-  static const unsigned max_connections = 128;
+  const unsigned max_in_flight = 0;
+  const unsigned max_connections = 0;
   static const unsigned max_message_len = 1024 * 1024 * 4;
 
   SyntheticWorkload(int servers, int clients, string type, int random_num,
-                    Messenger::Policy srv_policy, Messenger::Policy cli_policy)
+                    Messenger::Policy srv_policy, Messenger::Policy cli_policy,
+                   int _max_in_flight = 64, int _max_connections = 128)
     : client_policy(cli_policy),
       dispatcher(false, this),
       rng(time(NULL)),
-      dummy_auth(g_ceph_context) {
+      dummy_auth(g_ceph_context),
+      max_in_flight(_max_in_flight),
+      max_connections(_max_connections) {
+
     dummy_auth.auth_registry.refresh_config();
     Messenger *msgr;
     int base_port = 16800;
@@ -1918,6 +1922,32 @@ class SyntheticWorkload {
     }
   }
 
+  void send_large_message(bool inject_network_congestion=false) {
+    std::lock_guard l{lock};
+    ConnectionRef conn = _get_random_connection();
+    uuid_d uuid;
+    uuid.generate_random();
+    MCommand *m = new MCommand(uuid);
+    vector<string> cmds;
+    cmds.push_back("command");
+    // set the random data to make the large message
+    bufferlist bl;
+    string s("abcdefghijklmnopqrstuvwxyz");
+    for (int i = 0; i < 1024*256; i++)
+      bl.append(s);
+    // bl is around 6M
+    m->set_data(bl);
+    m->cmd = cmds;
+    m->set_priority(200);
+    // setup after connection is ready
+    if (inject_network_congestion && conn->is_connected()) {
+      g_ceph_context->_conf.set_val("ms_inject_network_congestion", "100");
+    } else {
+      g_ceph_context->_conf.set_val("ms_inject_network_congestion", "0");
+    }
+    conn->send_message(m);
+  }
+
   void drop_connection() {
     std::lock_guard l{lock};
     if (available_connections.size() < 10)
@@ -2196,6 +2226,31 @@ TEST_P(MessengerTest, SyntheticInjectTest4) {
   g_ceph_context->_conf.set_val("ms_inject_delay_max", "0");
 }
 
+// This is test for network block, means ::send return EAGAIN
+TEST_P(MessengerTest, SyntheticInjectTest5) {
+  SyntheticWorkload test_msg(1, 8, GetParam(), 100,
+                             Messenger::Policy::stateful_server(0),
+                             Messenger::Policy::lossless_client(0),
+                            64, 2);
+  bool simulate_network_congestion = true;
+  for (int i = 0; i < 2; ++i)
+    test_msg.generate_connection();
+  for (int i = 0; i < 5000; ++i) {
+    if (!(i % 10)) {
+      ldout(g_ceph_context, 0) << "Op " << i << ": " << dendl;
+      test_msg.print_internal_state();
+    }
+    if (i < 1600) {
+      // means that we would stuck 1600 * 6M (9.6G) around with 2 connections
+      test_msg.send_large_message(simulate_network_congestion);
+    } else {
+      simulate_network_congestion = false;
+      test_msg.send_large_message(simulate_network_congestion);
+    }
+  }
+  test_msg.wait_for_done();
+}
+
 
 class MarkdownDispatcher : public Dispatcher {
   ceph::mutex lock = ceph::make_mutex("MarkdownDispatcher::lock");