]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: skeleton code for ProtocolV2 logic
authorYingxin Cheng <yingxincheng@gmail.com>
Thu, 28 Feb 2019 08:40:24 +0000 (16:40 +0800)
committerKefu Chai <kchai@redhat.com>
Fri, 5 Apr 2019 03:21:18 +0000 (11:21 +0800)
crimson ProtocolV2 class is following a state-machine design style:
* states are defined in ProtocolV2::state_t;
* call `execute_<state_name>()` methods to trigger different states;
* V2 logics are implemented in each execute_<state_name>() methods, and
  with explicit transitions to other states at the end of the execute_*;
* each state is associated with a write state defined in Protocol.h:
  - none: not allowed to send;
  - delay: messages can be queued, but will be delayed to send;
  - open: dispatch queued message/keepalive/ack;
  - drop: not send any messages, drop them all.

crimson ProtocolV2 is alike async ProtocolV2, with some considerations:
* explicit and encapsulated client/server handshake workflow.
* futurized-exception-based fault handling, which can interrupt protocol
  workflow at any time in each state.
* introduced SERVER_WAIT state, meaning to wait for peer-client's socket
  to reset or be replaced, and expect no further reads.
* introduced an explicit REPLACING state, async-msgr would be at the
  NONE state during replacing.

Signed-off-by: Yingxin Cheng <yingxincheng@gmail.com>
src/crimson/CMakeLists.txt
src/crimson/net/ProtocolV2.cc [new file with mode: 0644]
src/crimson/net/ProtocolV2.h [new file with mode: 0644]
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h

index cb82ba81c8592622732b6e87707c3c3e4d309ec6..a895fe20e0180eb24376bc5bfdad3e618f0975a2 100644 (file)
@@ -124,7 +124,8 @@ set(crimson_net_srcs
   net/SocketMessenger.cc
   net/Socket.cc
   net/Protocol.cc
-  net/ProtocolV1.cc)
+  net/ProtocolV1.cc
+  net/ProtocolV2.cc)
 set(crimson_thread_srcs
   thread/ThreadPool.cc
   thread/Throttle.cc)
diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc
new file mode 100644 (file)
index 0000000..6e33929
--- /dev/null
@@ -0,0 +1,524 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "ProtocolV2.h"
+
+#include "include/msgr.h"
+
+#include "Dispatcher.h"
+#include "Errors.h"
+#include "Socket.h"
+#include "SocketConnection.h"
+#include "SocketMessenger.h"
+
+using namespace ceph::msgr::v2;
+
+namespace {
+
+seastar::logger& logger() {
+  return ceph::get_logger(ceph_subsys_ms);
+}
+
+seastar::future<> abort_in_fault() {
+  throw std::system_error(make_error_code(ceph::net::error::negotiation_failure));
+}
+
+seastar::future<> abort_in_close() {
+  throw std::system_error(make_error_code(ceph::net::error::connection_aborted));
+}
+
+inline void expect_tag(const Tag& expected,
+                       const Tag& actual,
+                       ceph::net::SocketConnection& conn,
+                       const char *where) {
+  if (actual != expected) {
+    logger().error("{} {} received wrong tag: {}, expected {}",
+                   conn, where,
+                   static_cast<uint32_t>(actual),
+                   static_cast<uint32_t>(expected));
+    abort_in_fault();
+  }
+}
+
+inline seastar::future<> unexpected_tag(const Tag& unexpected,
+                                        ceph::net::SocketConnection& conn,
+                                        const char *where) {
+  logger().error("{} {} received unexpected tag: {}",
+                 conn, where, static_cast<uint32_t>(unexpected));
+  return abort_in_fault();
+}
+
+} // namespace anonymous
+
+namespace ceph::net {
+
+ProtocolV2::ProtocolV2(Dispatcher& dispatcher,
+                       SocketConnection& conn,
+                       SocketMessenger& messenger)
+  : Protocol(2, dispatcher, conn, messenger) {}
+
+ProtocolV2::~ProtocolV2() {}
+
+void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
+                               const entity_type_t& _peer_type)
+{
+  ceph_assert(state == state_t::NONE);
+  ceph_assert(!socket);
+  conn.peer_addr = _peer_addr;
+  conn.peer_type = _peer_type;
+  // TODO: lossless policy
+  conn.policy = SocketPolicy::lossy_client(0);
+  messenger.register_conn(
+    seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+  execute_connecting();
+}
+
+void ProtocolV2::start_accept(SocketFRef&& sock,
+                              const entity_addr_t& _peer_addr)
+{
+  ceph_assert(state == state_t::NONE);
+  ceph_assert(!socket);
+  socket = std::move(sock);
+  messenger.accept_conn(
+    seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+  execute_accepting();
+}
+
+void ProtocolV2::trigger_state(state_t _state, write_state_t _write_state, bool reentrant)
+{
+  if (!reentrant && _state == state) {
+    logger().error("{} is not allowed to re-trigger state {}",
+                   conn, get_state_name(state));
+    ceph_assert(false);
+  }
+  logger().debug("{} trigger {}, was {}",
+                 conn, get_state_name(_state), get_state_name(state));
+  state = _state;
+  set_write_state(_write_state);
+}
+
+seastar::future<> ProtocolV2::fault()
+{
+  logger().warn("{} fault during {}",
+                conn, get_state_name(state));
+  // TODO: <fault logic here: e.g. backoff, policies, etc.>
+  // TODO: <conditions to call execute_standby()>
+  // TODO: <conditions to call execute_connecting()>
+  close();
+  return seastar::now();
+}
+
+seastar::future<> ProtocolV2::banner_exchange()
+{
+  // 1. <prepare and send banner>
+  // 2. then: <read banner>
+  // 3. then: <process banner and read banner_payload>
+  // 4. then: <process banner_payload and send HelloFrame>
+  // 5. then: <read peer HelloFrame>
+  // 6. then: <process peer HelloFrame>
+  return seastar::now();
+}
+
+// CONNECTING state
+
+seastar::future<> ProtocolV2::handle_auth_reply()
+{
+//return read_main_preamble()
+//.then([this] (Tag tag) {
+//  switch (tag) {
+//    case Tag::AUTH_BAD_METHOD:
+//      handle_auth_bad_method() logic
+//      return client_auth(bad_method.allowed_methods());
+//    case Tag::AUTH_REPLY_MORE:
+//      <prepare AuthReplyMoreFrame and send it>
+//      <then:>
+//        return handle_auth_reply()
+//    case Tag::AUTH_DONE:
+//      <handle AuthDoneFrame>
+//      <client auth is successful!>
+        return seastar::now();
+//    default: {
+//      return unexpected_tag(tag, conn, __func__);
+//    }
+//});
+}
+
+seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods)
+{
+  // send_auth_request() logic
+  // <prepare AuthRequestFrame and send>
+  // <then:>
+        return handle_auth_reply();
+}
+
+seastar::future<bool> ProtocolV2::process_wait()
+{
+//return read_frame_payload()
+//.then([this] (bufferlist payload) {
+//  handle_wait() logic
+//  return false;
+//});
+  return seastar::make_ready_future<bool>(false);
+}
+
+seastar::future<bool> ProtocolV2::client_connect()
+{
+  // send_client_ident() logic
+  // <prepare and send ClientIdentFrame>
+  // return read_main_preamble()
+  // .then([this] (Tag tag) {
+  //   switch (tag) {
+  //     case Tag::IDENT_MISSING_FEATURES:
+  //       abort_in_fault();
+  //     case Tag::WAIT:
+  //       return process_wait();
+  //     case Tag::SERVER_IDENT:
+  //       <handle ServerIdentFrame>
+           return seastar::make_ready_future<bool>(true);
+  //     default: {
+  //       unexpected_tag(tag, conn, "post_client_connect");
+  //     }
+  //   }
+  // });
+}
+
+seastar::future<bool> ProtocolV2::client_reconnect()
+{
+  // send_reconnect() logic
+  // <prepare ReconnectFrame and send>
+  // <then:>
+  //   return read_main_preamble()
+  //   .then([this] (Tag tag) {
+  //     switch (tag) {
+  //       case Tag::SESSION_RETRY_GLOBAL:
+  //         <handle RetryGlobalFrame>
+  //         return client_reconnect();
+  //       case Tag::SESSION_RETRY:
+  //         <handle RetryFrame>
+  //         return client_reconnect();
+  //       case Tag::SESSION_RESET:
+  //         <handle ResetFrame>
+  //         return client_connect();
+  //       case Tag::WAIT:
+  //         return process_wait();
+  //       case Tag::SESSION_RECONNECT_OK:
+  //         <handle ReconnectOkFrame>
+             return seastar::make_ready_future<bool>(true);
+  //       default: {
+  //         unexpected_tag(tag, conn, "post_client_reconnect");
+  //       }
+  //     }
+  //   });
+}
+
+void ProtocolV2::execute_connecting()
+{
+  trigger_state(state_t::CONNECTING, write_state_t::delay, true);
+  seastar::with_gate(pending_dispatch, [this] {
+      return Socket::connect(conn.peer_addr)
+        .then([this](SocketFRef sock) {
+          socket = std::move(sock);
+          if (state == state_t::CLOSING) {
+            return socket->close().then([this] {
+              logger().info("{} is closed during Socket::connect()", conn);
+              abort_in_fault();
+            });
+          }
+          return seastar::now();
+        }).then([this] {
+          return banner_exchange();
+        }).then([this] {
+          return client_auth();
+        }).then([this] {
+          if (1) { // TODO check connect or reconnect
+            return client_connect();
+          } else {
+            // TODO: lossless policy
+            ceph_assert(false);
+            return client_reconnect();
+          }
+        }).then([this] (bool proceed_or_wait) {
+          if (proceed_or_wait) {
+            execute_ready();
+          } else {
+            execute_wait();
+          }
+        }).handle_exception([this] (std::exception_ptr eptr) {
+          // TODO: handle fault in CONNECTING state
+          return fault();
+        });
+    });
+}
+
+// ACCEPTING state
+
+seastar::future<> ProtocolV2::_auth_bad_method(int r)
+{
+  // _auth_bad_method() logic
+  // <prepare and send AuthBadMethodFrame>
+  // <then:>
+       return server_auth();
+}
+
+seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more)
+{
+  // _handle_auth_request() logic
+  // <case done:>
+  //   <prepare and send AuthDoneFrame>
+  //   <then: server auth successful!>
+         return seastar::now();
+  // <case more:>
+  //   <prepare and send AuthReplyMoreFrame>
+  //   <then:>
+  //     return read_main_preamble()
+  //     .then([this] (Tag tag) {
+  //       expect_tag(Tag::AUTH_REQUEST_MORE, tag, conn, __func__);
+  //       <handle_auth_request_more() logic>
+  //       <process AuthRequestMoreFrame>
+  //       return _handle_auth_request(auth_more.auth_payload(), true);
+  //     });
+  // <case bad:>
+  //   return _auth_bad_method();
+}
+
+seastar::future<> ProtocolV2::server_auth()
+{
+  // return read_main_preamble()
+  // .then([this] (Tag tag) {
+  //   expect_tag(Tag::AUTH_REQUEST, tag, conn, __func__);
+  //   <handle_auth_request() logic>
+  //   <case bad request:>
+  //     return _auth_bad_method();
+  //   <...>
+       auto dummy_auth = bufferlist{};
+       return _handle_auth_request(/*request.auth_payload()*/dummy_auth, false);
+  // });
+}
+
+seastar::future<bool> ProtocolV2::send_wait()
+{
+  // <prepare and send WaitFrame>
+  // <then:>
+       return seastar::make_ready_future<bool>(false);
+}
+
+seastar::future<bool> ProtocolV2::handle_existing_connection(SocketConnectionRef existing)
+{
+  // TODO: lossless policy
+  ceph_assert(false);
+}
+
+seastar::future<bool> ProtocolV2::server_connect()
+{
+  // handle_client_ident() logic
+  // <process ClientIdentFrame>
+  // <case feature missing:>
+  //  <prepare and send IdentMissingFeaturesFrame>
+  //  <then: trigger SERVER_WAIT>
+  //    return seastar::make_ready_future<bool>(false);
+  // <case existing:>
+  //  return handle_existing_connection(existing);
+  // <case everything OK:>
+      return send_server_ident()
+      .then([this] {
+        // goto ready
+        return true;
+      });
+}
+
+seastar::future<bool> ProtocolV2::read_reconnect()
+{
+  // return read_main_preamble()
+  // .then([this] (Tag tag) {
+  //   expect_tag(Tag::SESSION_RECONNECT, tag, conn, "read_reconnect");
+       return server_reconnect();
+  // });
+}
+
+seastar::future<bool> ProtocolV2::send_retry(uint64_t connect_seq)
+{
+  // <prepare and send RetryFrame>
+  // <then:>
+       return read_reconnect();
+}
+
+seastar::future<bool> ProtocolV2::send_retry_global(uint64_t global_seq)
+{
+  // <prepare and send RetryGlobalFrame>
+  // <then:>
+       return read_reconnect();
+}
+
+seastar::future<bool> ProtocolV2::send_reset(bool full)
+{
+  // <prepare and send ResetFrame>
+  // <then:>
+  //   return read_main_preamble()
+  //   .then([this] (Tag tag) {
+  //     expect_tag(Tag::CLIENT_IDENT, tag, conn, "post_send_reset");
+         return server_connect();
+  //   });
+}
+
+seastar::future<bool> ProtocolV2::server_reconnect()
+{
+  // handle_reconnect() logic
+  // <process ReconnectFrame>
+  // <case no existing:>
+       return send_reset(0);
+  // <retry global cases:>
+  //   return send_retry_global();
+  // <case wait:>
+  //   return send_wait();
+  // <case retry:>
+  //   return send_retry();
+  // <other reset cases:>
+  //   return send_reset();
+  // TODO: lossless policy
+  //   return reuse_connection(existing, exproto);
+}
+
+void ProtocolV2::execute_accepting()
+{
+  trigger_state(state_t::ACCEPTING, write_state_t::none, false);
+  seastar::with_gate(pending_dispatch, [this] {
+      return banner_exchange()
+        .then([this] {
+          return server_auth();
+        }).then([this] {
+      //  return read_main_preamble()
+      //}).then([this] (Tag tag) {
+      //  switch (tag) {
+      //    case Tag::CLIENT_IDENT:
+              return server_connect();
+      //    case Tag::SESSION_RECONNECT:
+      //      return server_reconnect();
+      //    default: {
+      //      unexpected_tag(tag, conn, "post_server_auth");
+      //    }
+      //  }
+        }).then([this] (bool proceed_or_wait) {
+          if (proceed_or_wait) {
+            messenger.register_conn(
+              seastar::static_pointer_cast<SocketConnection>(
+                conn.shared_from_this()));
+            messenger.unaccept_conn(
+              seastar::static_pointer_cast<SocketConnection>(
+                conn.shared_from_this()));
+            execute_ready();
+          } else {
+            execute_server_wait();
+          }
+        }).handle_exception([this] (std::exception_ptr eptr) {
+          // TODO: handle fault in ACCEPTING state
+          return fault();
+        });
+    });
+}
+
+// ACCEPTING or REPLACING state
+
+seastar::future<> ProtocolV2::send_server_ident()
+{
+  // send_server_ident() logic
+  // <prepare and send ServerIdentFrame>
+
+  return seastar::now();
+}
+
+// REPLACING state
+
+seastar::future<> ProtocolV2::send_reconnect_ok()
+{
+  // send_reconnect_ok() logic
+  // <prepare and send ReconnectOKFrame>
+
+  return seastar::now();
+}
+
+// READY state
+
+seastar::future<> ProtocolV2::write_message(MessageRef msg)
+{
+  // TODO not implemented
+  // <scheduled by parent, to send out the message on the wire>
+  ceph_assert(false);
+}
+
+seastar::future<> ProtocolV2::do_keepalive()
+{
+  // TODO not implemented
+  // <scheduled by parent, to send out KeepAliveFrame on the wire>
+  ceph_assert(false);
+}
+
+seastar::future<> ProtocolV2::do_keepalive_ack()
+{
+  // TODO not implemented
+  // <scheduled by parent, to send out KeepAliveAckFrame on the wire>
+  ceph_assert(false);
+}
+
+void ProtocolV2::execute_ready()
+{
+  // <schedule sending messages, AckFrame, KeepAliveFrame, KeepAliveAckFrame,
+  //  i.e. trigger READY state with write_state_t::open>
+  //   trigger_state(state_t::READY, write_state_t::open, false);
+  // TODO: schedule reading messages, AckFrame, KeepAliveFrame, KeepAliveAckFrame
+  state = state_t::READY;
+  logger().info("{} reaches READY state successfully.", conn);
+  close();
+}
+
+// STANDBY state
+
+void ProtocolV2::execute_standby()
+{
+  // TODO not implemented
+  // trigger_state(state_t::STANDBY, write_state_t::delay, false);
+  ceph_assert(false);
+}
+
+// WAIT state
+
+void ProtocolV2::execute_wait()
+{
+  // TODO not implemented
+  // trigger_state(state_t::WAIT, write_state_t::delay, false);
+  ceph_assert(false);
+}
+
+// SERVER_WAIT state
+
+void ProtocolV2::execute_server_wait()
+{
+  // TODO not implemented
+  // trigger_state(state_t::SERVER_WAIT, write_state_t::delay, false);
+  ceph_assert(false);
+}
+
+// CLOSING state
+
+void ProtocolV2::trigger_close()
+{
+  if (state == state_t::ACCEPTING) {
+    messenger.unaccept_conn(
+      seastar::static_pointer_cast<SocketConnection>(
+        conn.shared_from_this()));
+  } else if (state >= state_t::CONNECTING && state < state_t::CLOSING) {
+    messenger.unregister_conn(
+      seastar::static_pointer_cast<SocketConnection>(
+        conn.shared_from_this()));
+  } else {
+    // cannot happen
+    ceph_assert(false);
+  }
+
+  if (!socket) {
+    ceph_assert(state == state_t::CONNECTING);
+  }
+
+  trigger_state(state_t::CLOSING, write_state_t::drop, false);
+}
+
+} // namespace ceph::net
diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h
new file mode 100644 (file)
index 0000000..5fbb02c
--- /dev/null
@@ -0,0 +1,120 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "Protocol.h"
+#include "msg/async/frames_v2.h"
+
+namespace ceph::net {
+
+class ProtocolV2 final : public Protocol {
+ public:
+  ProtocolV2(Dispatcher& dispatcher,
+             SocketConnection& conn,
+             SocketMessenger& messenger);
+  ~ProtocolV2() override;
+
+ private:
+  void start_connect(const entity_addr_t& peer_addr,
+                     const entity_type_t& peer_type) override;
+
+  void start_accept(SocketFRef&& socket,
+                    const entity_addr_t& peer_addr) override;
+
+  void trigger_close() override;
+
+  seastar::future<> write_message(MessageRef msg) override;
+
+  seastar::future<> do_keepalive() override;
+
+  seastar::future<> do_keepalive_ack() override;
+
+ private:
+  SocketMessenger &messenger;
+
+  enum class state_t {
+    NONE = 0,
+    ACCEPTING,
+    CONNECTING,
+    READY,
+    STANDBY,
+    WAIT,           // ? CLIENT_WAIT
+    SERVER_WAIT,    // ?
+    REPLACING,      // ?
+    CLOSING
+  };
+  state_t state = state_t::NONE;
+
+  static const char *get_state_name(state_t state) {
+    const char *const statenames[] = {"NONE",
+                                      "ACCEPTING",
+                                      "CONNECTING",
+                                      "READY",
+                                      "STANDBY",
+                                      "WAIT",           // ? CLIENT_WAIT
+                                      "SERVER_WAIT",    // ?
+                                      "REPLACING",      // ?
+                                      "CLOSING"};
+    return statenames[static_cast<int>(state)];
+  }
+
+  void trigger_state(state_t state, write_state_t write_state, bool reentrant);
+
+  // TODO: the rest of protocol data structures and variables.
+
+ private:
+  seastar::future<> fault();
+  seastar::future<> banner_exchange();
+
+  // CONNECTING (client)
+  seastar::future<> handle_auth_reply();
+  inline seastar::future<> client_auth() {
+    std::vector<uint32_t> empty;
+    return client_auth(empty);
+  }
+  seastar::future<> client_auth(std::vector<uint32_t> &allowed_methods);
+
+  seastar::future<bool> process_wait();
+  seastar::future<bool> client_connect();
+  seastar::future<bool> client_reconnect();
+  void execute_connecting();
+
+  // ACCEPTING (server)
+  seastar::future<> _auth_bad_method(int r);
+  seastar::future<> _handle_auth_request(bufferlist& auth_payload, bool more);
+  seastar::future<> server_auth();
+
+  seastar::future<bool> send_wait();
+
+  seastar::future<bool> handle_existing_connection(SocketConnectionRef existing);
+  seastar::future<bool> server_connect();
+
+  seastar::future<bool> read_reconnect();
+  seastar::future<bool> send_retry(uint64_t connect_seq);
+  seastar::future<bool> send_retry_global(uint64_t global_seq);
+  seastar::future<bool> send_reset(bool full);
+  seastar::future<bool> server_reconnect();
+
+  void execute_accepting();
+
+  // ACCEPTING/REPLACING (server)
+  seastar::future<> send_server_ident();
+
+  // REPLACING (server)
+  seastar::future<> send_reconnect_ok();
+
+  // READY
+  void execute_ready();
+
+  // STANDBY
+  void execute_standby();
+
+  // WAIT
+  void execute_wait();
+
+  // SERVER_WAIT
+  void execute_server_wait();
+};
+
+} // namespace ceph::net
index 6b0e0a5ce3053f42b63aa73a5741c87c89e23ac7..dc35c66a05e83244a8b43285a0e8fcf75a0bb1a1 100644 (file)
@@ -16,6 +16,7 @@
 
 #include "Config.h"
 #include "ProtocolV1.h"
+#include "ProtocolV2.h"
 #include "SocketMessenger.h"
 
 using namespace ceph::net;
@@ -33,8 +34,7 @@ SocketConnection::SocketConnection(SocketMessenger& messenger,
 {
   ceph_assert(&messenger.container().local() == &messenger);
   if (is_msgr2) {
-    // TODO: ProtocolV2
-    ceph_assert(false);
+    protocol = std::make_unique<ProtocolV2>(dispatcher, *this, messenger);
   } else {
     protocol = std::make_unique<ProtocolV1>(dispatcher, *this, messenger);
   }
index a4cf67a0aca02eb2c297ffcebbf3b867b807c8f9..22753500e4efb37f1ed40b951834479bc864dd6c 100644 (file)
@@ -117,6 +117,7 @@ class SocketConnection : public Connection {
 
   friend class Protocol;
   friend class ProtocolV1;
+  friend class ProtocolV2;
 };
 
 } // namespace ceph::net