]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: WAIT state and backoff for client
authorYingxin Cheng <yingxin.cheng@intel.com>
Thu, 8 Aug 2019 08:56:30 +0000 (16:56 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Mon, 12 Aug 2019 09:22:45 +0000 (17:22 +0800)
Client goes to WAIT state when it is delayed to reconnect, or wants to
be replaced by a newly established socket.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Config.h
src/crimson/net/ProtocolV1.cc
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h

index 90929bdedb251890ce01f851b99f81f6bc186bff..6b040a1981272d18df8b3326d48187a34b9abe36 100644 (file)
@@ -18,8 +18,8 @@ constexpr struct simple_md_config_t {
   bool cephx_service_require_signatures = false;
   bool ms_die_on_old_message = true;
   bool ms_die_on_skipped_message = true;
-  std::chrono::milliseconds ms_initial_backoff = 200ms;
-  std::chrono::milliseconds ms_max_backoff = 15000ms;
+  double ms_initial_backoff = .2;
+  double ms_max_backoff = 15.0;
   std::chrono::milliseconds threadpool_empty_queue_max_wait = 100ms;
   size_t osd_client_message_size_cap = 500ULL << 20;
 } conf;
index 9e2ba07c9b5ecfb9fe64e50bda85c7bdcfe34645..7aca87fd0f36afb2511c66c7b7494013c477045e 100644 (file)
@@ -958,15 +958,13 @@ seastar::future<> ProtocolV1::fault()
     messenger.unregister_conn(seastar::static_pointer_cast<SocketConnection>(
         conn.shared_from_this()));
   }
-  if (h.backoff.count()) {
-    h.backoff += h.backoff;
-  } else {
-    h.backoff = conf.ms_initial_backoff;
-  }
-  if (h.backoff > conf.ms_max_backoff) {
-    h.backoff = conf.ms_max_backoff;
-  }
-  return seastar::sleep(h.backoff);
+  // XXX: we decided not to support lossless connection in v1. as the
+  // client's default policy is
+  // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is
+  // lossy. And by the time of crimson-osd's GA, the in-cluster communication
+  // will all be performed using v2 protocol.
+  ceph_abort("lossless policy not supported for v1");
+  return seastar::now();
 }
 
 } // namespace ceph::net
index a39df77489a1b1c44350e1803cb26f6b4165a12f..9474562e1387ca2361b6964be78d1888e04f8638 100644 (file)
@@ -127,10 +127,28 @@ inline ostream& operator<<(
 
 namespace ceph::net {
 
+seastar::future<> ProtocolV2::Timer::backoff(double seconds)
+{
+  logger().warn("{} waiting {} seconds ...", conn, seconds);
+  cancel();
+  last_dur_ = seconds;
+  as = seastar::abort_source();
+  auto dur = std::chrono::duration_cast<seastar::lowres_clock::duration>(
+      std::chrono::duration<double>(seconds));
+  return seastar::sleep_abortable(dur, *as
+  ).handle_exception_type([this] (const seastar::sleep_aborted& e) {
+    logger().warn("{} wait aborted", conn);
+    abort_protocol();
+  });
+}
+
 ProtocolV2::ProtocolV2(Dispatcher& dispatcher,
                        SocketConnection& conn,
                        SocketMessenger& messenger)
-  : Protocol(proto_t::v2, dispatcher, conn), messenger{messenger} {}
+  : Protocol(proto_t::v2, dispatcher, conn),
+    messenger{messenger},
+    protocol_timer{conn}
+{}
 
 ProtocolV2::~ProtocolV2() {}
 
@@ -741,7 +759,6 @@ ProtocolV2::client_connect()
             connect_seq = 0;
             server_cookie = 0;
           }
-          // TODO: backoff = utime_t();
 
           return dispatcher.ms_handle_connect(
               seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()))
@@ -817,8 +834,6 @@ ProtocolV2::client_reconnect()
           logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
                          conn, reconnect_ok.msg_seq());
           requeue_up_to(reconnect_ok.msg_seq());
-          // TODO
-          // backoff = utime_t();
           return dispatcher.ms_handle_connect(
               seastar::static_pointer_cast<SocketConnection>(
                 conn.shared_from_this()))
@@ -932,7 +947,7 @@ void ProtocolV2::execute_connecting()
             break;
            }
            case next_step_t::wait: {
-            execute_wait();
+            execute_wait(true);
             break;
            }
            default: {
@@ -1684,6 +1699,7 @@ void ProtocolV2::execute_ready()
 {
   trigger_state(state_t::READY, write_state_t::open, false);
   execution_done = seastar::with_gate(pending_dispatch, [this] {
+    protocol_timer.cancel();
     return seastar::keep_doing([this] {
       return read_main_preamble()
       .then([this] (Tag tag) {
@@ -1774,11 +1790,31 @@ void ProtocolV2::notify_write()
 
 // WAIT state
 
-void ProtocolV2::execute_wait()
+void ProtocolV2::execute_wait(bool max_backoff)
 {
-  // TODO not implemented
-  // trigger_state(state_t::WAIT, write_state_t::delay, false);
-  ceph_assert(false);
+  trigger_state(state_t::WAIT, write_state_t::delay, true);
+  if (socket) {
+    socket->shutdown();
+  }
+  execution_done = seastar::with_gate(pending_dispatch,
+                                      [this, max_backoff] {
+    double backoff = protocol_timer.last_dur();
+    if (max_backoff) {
+      backoff = conf.ms_max_backoff;
+    } else if (backoff > 0) {
+      backoff = std::min(conf.ms_max_backoff, 2 * backoff);
+    } else {
+      backoff = conf.ms_initial_backoff;
+    }
+    return protocol_timer.backoff(backoff).then([this] {
+      execute_connecting();
+    }).handle_exception([this] (std::exception_ptr eptr) {
+      logger().debug("{} execute_wait(): got exception {} at state {}, abort",
+                     conn, eptr, get_state_name(state));
+      assert(state == state_t::REPLACING ||
+             state == state_t::CLOSING);
+    });
+  });
 }
 
 // SERVER_WAIT state
@@ -1815,6 +1851,8 @@ void ProtocolV2::trigger_close()
     ceph_assert(false);
   }
 
+  protocol_timer.cancel();
+
   if (!socket) {
     ceph_assert(state == state_t::CONNECTING);
   }
index 509b18c77de86f4231dc16d96cd4dbbe83afac17..179299f7298d9ca8cbbc88eef3869e471369c8a5 100644 (file)
@@ -3,6 +3,8 @@
 
 #pragma once
 
+#include <seastar/core/sleep.hh>
+
 #include "Protocol.h"
 #include "msg/async/frames_v2.h"
 #include "msg/async/crypto_onwire.h"
@@ -44,7 +46,7 @@ class ProtocolV2 final : public Protocol {
     CONNECTING,
     READY,
     STANDBY,
-    WAIT,           // ? CLIENT_WAIT
+    WAIT,
     REPLACING,      // ?
     CLOSING
   };
@@ -57,7 +59,7 @@ class ProtocolV2 final : public Protocol {
                                       "CONNECTING",
                                       "READY",
                                       "STANDBY",
-                                      "WAIT",           // ? CLIENT_WAIT
+                                      "WAIT",
                                       "REPLACING",      // ?
                                       "CLOSING"};
     return statenames[static_cast<int>(state)];
@@ -76,6 +78,24 @@ class ProtocolV2 final : public Protocol {
 
   seastar::future<> execution_done = seastar::now();
 
+  class Timer {
+    double last_dur_ = 0.0;
+    const SocketConnection& conn;
+    std::optional<seastar::abort_source> as;
+   public:
+    Timer(SocketConnection& conn) : conn(conn) {}
+    double last_dur() const { return last_dur_; }
+    seastar::future<> backoff(double seconds);
+    void cancel() {
+      last_dur_ = 0.0;
+      if (as) {
+        as->request_abort();
+        as = std::nullopt;
+      }
+    }
+  };
+  Timer protocol_timer;
+
  // TODO: Frame related implementations, probably to a separate class.
  private:
   bool record_io = false;
@@ -160,7 +180,7 @@ class ProtocolV2 final : public Protocol {
   void execute_standby();
 
   // WAIT
-  void execute_wait();
+  void execute_wait(bool max_backoff);
 
   // SERVER_WAIT
   void execute_server_wait();