]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: add timeout for connections which are not yet ready
authorxie xingguo <xie.xingguo@zte.com.cn>
Sat, 2 Mar 2019 08:23:12 +0000 (16:23 +0800)
committerPrashant D <pdhange@redhat.com>
Wed, 5 Jun 2019 03:18:39 +0000 (23:18 -0400)
There could be various corner cases that may cause an async
connection stuck in the connecting stage (e.g., by manually
creating some loop back connections on the switches of our test cluster,
we can almost 100% reproduce http://tracker.ceph.com/issues/37499).

In 61b9432ef9a3847eceb96f8d5a854567c49bbf61 I try to employ the
existing keep_alive mechanism to get those stuck connections out of the
trap but it does not work if the corresponding connection
is not yet ready, since we always require the underlying connection to be
**ready** in order to send out a keep_alive message.

Fix by making a more general connecting timeout strategy.
If a connecting process can not be finished within a specific interval,
then we simply cut it off and retry.

Fixes: http://tracker.ceph.com/issues/37499
Fixes: http://tracker.ceph.com/issues/38493
Signed-off-by: xie xingguo <xie.xingguo@zte.com.cn>
(cherry picked from commit 7209cc6aa3263d1dfb5cb19d57dd1f6b56aa2804)

Conflicts:
src/common/legacy_config_opts.h
src/common/options.cc : Resolved for ms_connection_ready_timeout

src/common/legacy_config_opts.h
src/common/options.cc
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/msg/async/ProtocolV1.cc
src/msg/async/ProtocolV2.cc

index 7eb69d54ceb7cc4e9f518b7aa4a111001d74d893..b1efe6eac6600f4db4fbda3be87a7fdf304a6c17 100644 (file)
@@ -137,7 +137,7 @@ OPTION(ms_bind_retry_delay, OPT_INT) // Delay between attempts to bind
 OPTION(ms_bind_before_connect, OPT_BOOL)
 OPTION(ms_tcp_listen_backlog, OPT_INT)
 OPTION(ms_rwthread_stack_bytes, OPT_U64)
-OPTION(ms_tcp_read_timeout, OPT_U64)
+OPTION(ms_connection_ready_timeout, OPT_U64)
 OPTION(ms_connection_idle_timeout, OPT_U64)
 OPTION(ms_pq_max_tokens_per_priority, OPT_U64)
 OPTION(ms_pq_min_cost, OPT_U64)
index e86bd9d3448cb9911f07dc42b32732bef3ff2886..3525abfe9e57589d9f3f96f178256093bc432285 100644 (file)
@@ -1023,6 +1023,10 @@ std::vector<Option> get_global_options() {
     .set_default(1_M)
     .set_description("Size of stack for SimpleMessenger read/write threads"),
 
+    Option("ms_connection_ready_timeout", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+    .set_default(10)
+    .set_description("Time before we declare a not yet ready connection as dead (seconds)"),
+
     Option("ms_connection_idle_timeout", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
     .set_default(900)
     .set_description("Time before an idle connection is closed (seconds)"),
index 272589e4379d5794cb97c2c331864a2e1622f23b..9c0d68d98f80f314b4b5f8fda16f2896e7d1c2e4 100644 (file)
@@ -120,6 +120,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu
     recv_max_prefetch(std::max<int64_t>(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
     recv_start(0), recv_end(0),
     last_active(ceph::coarse_mono_clock::now()),
+    connect_timeout_us(cct->_conf->ms_connection_ready_timeout*1000*1000),
     inactive_timeout_us(cct->_conf->ms_connection_idle_timeout*1000*1000),
     msgr2(m2), state_offset(0),
     worker(w), center(&w->center),read_buffer(nullptr)
@@ -373,6 +374,12 @@ void AsyncConnection::process() {
     case STATE_CONNECTING: {
       ceph_assert(!policy.server);
 
+      // clear timer (if any) since we are connecting/re-connecting
+      if (last_tick_id) {
+        center->delete_time_event(last_tick_id);
+        last_tick_id = 0;
+      }
+
       if (cs) {
         center->delete_file_event(cs.fd(), EVENT_READABLE | EVENT_WRITABLE);
         cs.close();
@@ -418,6 +425,11 @@ void AsyncConnection::process() {
       ldout(async_msgr->cct, 10)
           << __func__ << " connect successfully, ready to send banner" << dendl;
       state = STATE_CONNECTION_ESTABLISHED;
+      ceph_assert(last_tick_id == 0);
+      // exclude TCP nonblock connect time
+      last_connect_started = ceph::coarse_mono_clock::now();
+      last_tick_id = center->create_time_event(
+        connect_timeout_us, tick_handler);
       break;
     }
 
@@ -731,13 +743,29 @@ void AsyncConnection::tick(uint64_t id)
                              << " last_active=" << last_active << dendl;
   std::lock_guard<std::mutex> l(lock);
   last_tick_id = 0;
-  auto idle_period = std::chrono::duration_cast<std::chrono::microseconds>(now - last_active).count();
-  if (inactive_timeout_us < (uint64_t)idle_period) {
-    ldout(async_msgr->cct, 1) << __func__ << " idle(" << idle_period << ") more than "
-                              << inactive_timeout_us
-                              << " us, mark self fault." << dendl;
-    protocol->fault();
-  } else if (is_connected()) {
-    last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
+  if (!is_connected()) {
+    if (connect_timeout_us <=
+        (uint64_t)std::chrono::duration_cast<std::chrono::microseconds>
+          (now - last_connect_started).count()) {
+      ldout(async_msgr->cct, 0) << __func__ << " see no progress in more than "
+                                << connect_timeout_us
+                                << " us during connecting, fault."
+                                << dendl;
+      protocol->fault();
+    } else {
+      last_tick_id = center->create_time_event(connect_timeout_us, tick_handler);
+    }
+  } else {
+    auto idle_period = std::chrono::duration_cast<std::chrono::microseconds>
+      (now - last_active).count();
+    if (inactive_timeout_us < (uint64_t)idle_period) {
+      ldout(async_msgr->cct, 1) << __func__ << " idle (" << idle_period
+                                << ") for more than " << inactive_timeout_us
+                                << " us, fault."
+                                << dendl;
+      protocol->fault();
+    } else {
+      last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
+    }
   }
 }
index 7b5f080bc326c457928cffa7b9acf4f518cd13e9..5b914cc5e639118151dfa9567da0166c24a0a0ef 100644 (file)
@@ -186,9 +186,11 @@ class AsyncConnection : public Connection {
   uint32_t recv_start;
   uint32_t recv_end;
   set<uint64_t> register_time_events; // need to delete it if stop
+  ceph::coarse_mono_clock::time_point last_connect_started;
   ceph::coarse_mono_clock::time_point last_active;
   ceph::mono_clock::time_point recv_start_time;
   uint64_t last_tick_id = 0;
+  const uint64_t connect_timeout_us;
   const uint64_t inactive_timeout_us;
 
   // Tis section are temp variables used by state transition
index a392529bb9d2db2df0c6c16ad60acd7e5a67a997..9ac49878c04d1a9ea2b23244e3fe5917fbf3ee26 100644 (file)
@@ -2295,6 +2295,12 @@ CtPtr ProtocolV1::replace(AsyncConnectionRef existing,
             if (exproto->state == CLOSED) return;
             ceph_assert(exproto->state == NONE);
 
+            // we have called shutdown_socket above
+            ceph_assert(existing->last_tick_id == 0);
+            // restart timer since we are going to re-build connection
+            existing->last_connect_started = ceph::coarse_mono_clock::now();
+            existing->last_tick_id = existing->center->create_time_event(
+              existing->connect_timeout_us, existing->tick_handler);
             existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED;
             exproto->state = ACCEPTING;
 
index a631d56dcd542243673322e6c706d48d5ef0a03e..110b970144cacb342f18e195900aa5a470954b67 100644 (file)
@@ -2712,6 +2712,12 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
           ceph_assert(exproto->state == NONE);
 
           exproto->state = SESSION_ACCEPTING;
+          // we have called shutdown_socket above
+          ceph_assert(existing->last_tick_id == 0);
+          // restart timer since we are going to re-build connection
+          existing->last_connect_started = ceph::coarse_mono_clock::now();
+          existing->last_tick_id = existing->center->create_time_event(
+            existing->connect_timeout_us, existing->tick_handler);
           existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED;
           existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE,
                                               existing->read_handler);