crimson/net: introduce protocol-level abstraction
authorYingxin Cheng <yingxincheng@gmail.com>
Thu, 14 Feb 2019 07:49:34 +0000 (15:49 +0800)
committerKefu Chai <kchai@redhat.com>
Fri, 5 Apr 2019 02:48:50 +0000 (10:48 +0800)
Signed-off-by: Yingxin Cheng <yingxincheng@gmail.com>
src/crimson/CMakeLists.txt
src/crimson/net/Fwd.h
src/crimson/net/Protocol.cc [new file with mode: 0644]
src/crimson/net/Protocol.h [new file with mode: 0644]
src/crimson/net/ProtocolV1.cc [new file with mode: 0644]
src/crimson/net/ProtocolV1.h [new file with mode: 0644]
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h

index 20c545dad178408f7b9a587ddfad85aae435d59b..cb82ba81c8592622732b6e87707c3c3e4d309ec6 100644 (file)
@@ -122,7 +122,9 @@ set(crimson_net_srcs
   net/Messenger.cc
   net/SocketConnection.cc
   net/SocketMessenger.cc
-  net/Socket.cc)
+  net/Socket.cc
+  net/Protocol.cc
+  net/ProtocolV1.cc)
 set(crimson_thread_srcs
   thread/ThreadPool.cc
   thread/Throttle.cc)
index 8a0a1c96f22c8fba57f64ec039ad5429f56260e3..8dbb3cabbc65497255dccbde56ae9908bba24835 100644 (file)
@@ -26,6 +26,7 @@ using auth_proto_t = int;
 namespace ceph::net {
 
 using msgr_tag_t = uint8_t;
+using stop_t = seastar::stop_iteration;
 
 class Connection;
 using ConnectionRef = seastar::shared_ptr<Connection>;
diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc
new file mode 100644 (file)
index 0000000..193918b
--- /dev/null
@@ -0,0 +1,167 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Protocol.h"
+
+#include "crimson/common/log.h"
+#include "Socket.h"
+#include "SocketConnection.h"
+
+namespace {
+  seastar::logger& logger() {
+    return ceph::get_logger(ceph_subsys_ms);
+  }
+}
+
+namespace ceph::net {
+
+Protocol::Protocol(int type,
+                   Dispatcher& dispatcher,
+                   SocketConnection& conn)
+  : proto_type(type),
+    dispatcher(dispatcher),
+    conn(conn) {}
+
+Protocol::~Protocol()
+{
+  ceph_assert(pending_dispatch.is_closed());
+}
+
+bool Protocol::is_connected() const
+{
+  return write_state == write_state_t::open;
+}
+
+seastar::future<> Protocol::close()
+{
+  if (closed) {
+    // already closing
+    assert(close_ready.valid());
+    return close_ready.get_future();
+  }
+
+  // unregister_conn() drops a reference, so hold another until completion
+  auto cleanup = [conn_ref = conn.shared_from_this(), this] {
+      logger().debug("{} closed!", conn);
+    };
+
+  trigger_close();
+
+  // close_ready become valid only after state is state_t::closing
+  assert(!close_ready.valid());
+
+  if (socket) {
+    close_ready = socket->close()
+      .then([this] {
+        return pending_dispatch.close();
+      }).finally(std::move(cleanup));
+  } else {
+    close_ready = pending_dispatch.close().finally(std::move(cleanup));
+  }
+
+  closed = true;
+  set_write_state(write_state_t::drop);
+
+  return close_ready.get_future();
+}
+
+seastar::future<> Protocol::send(MessageRef msg)
+{
+  if (write_state != write_state_t::drop) {
+    conn.out_q.push(std::move(msg));
+    write_event();
+  }
+  return seastar::now();
+}
+
+seastar::future<> Protocol::keepalive()
+{
+  if (!need_keepalive) {
+    need_keepalive = true;
+    write_event();
+  }
+  return seastar::now();
+}
+
+void Protocol::notify_keepalive_ack()
+{
+  if (!need_keepalive_ack) {
+    need_keepalive_ack = true;
+    write_event();
+  }
+}
+
+void Protocol::write_event()
+{
+  if (write_dispatching) {
+    // already dispatching
+    return;
+  }
+  write_dispatching = true;
+  switch (write_state) {
+   case write_state_t::open:
+     [[fallthrough]];
+   case write_state_t::delay:
+    seastar::with_gate(pending_dispatch, [this] {
+      return seastar::repeat([this] {
+        switch (write_state) {
+         case write_state_t::open:
+          return seastar::futurize_apply([this] {
+            if (need_keepalive) {
+              return do_keepalive()
+              .then([this] { need_keepalive = false; });
+            }
+            return seastar::now();
+          }).then([this] {
+            if (need_keepalive_ack) {
+              return do_keepalive_ack()
+              .then([this] { need_keepalive_ack = false; });
+            }
+            return seastar::now();
+          }).then([this] {
+            if (!conn.out_q.empty()){
+              MessageRef msg = conn.out_q.front();
+              return write_message(msg)
+              .then([this, msg] {
+                if (msg == conn.out_q.front()) {
+                  conn.out_q.pop();
+                }
+                return stop_t::no;
+              });
+            } else {
+              return socket->flush()
+              .then([this] {
+                if (!conn.out_q.empty()) {
+                  return stop_t::no;
+                } else {
+                  write_dispatching = false;
+                  return stop_t::yes;
+                }
+              });
+            }
+          }).handle_exception([this] (std::exception_ptr eptr) {
+            logger().warn("{} write_event fault: {}", conn, eptr);
+            close();
+            return stop_t::no;
+          });
+         case write_state_t::delay:
+          // delay dispatching writes until open
+          return state_changed.get_shared_future()
+          .then([] { return stop_t::no; });
+         case write_state_t::drop:
+          write_dispatching = false;
+          return seastar::make_ready_future<stop_t>(stop_t::yes);
+         default:
+          ceph_assert(false);
+        }
+      });
+    });
+    return;
+   case write_state_t::drop:
+    write_dispatching = false;
+   default:
+    ceph_assert(false);
+  }
+}
+
+} // namespace ceph::net
diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h
new file mode 100644 (file)
index 0000000..6a0f76e
--- /dev/null
@@ -0,0 +1,89 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/gate.hh>
+#include <seastar/core/shared_future.hh>
+
+#include "Fwd.h"
+#include "SocketConnection.h"
+
+namespace ceph::net {
+
+class Protocol {
+ public:
+  Protocol(Protocol&&) = delete;
+  virtual ~Protocol();
+
+  bool is_connected() const;
+
+  // Reentrant closing
+  seastar::future<> close();
+
+  seastar::future<> send(MessageRef msg);
+
+  seastar::future<> keepalive();
+
+  virtual void start_connect(const entity_addr_t& peer_addr,
+                             const entity_type_t& peer_type) = 0;
+
+  virtual void start_accept(seastar::foreign_ptr<std::unique_ptr<Socket>>&& socket,
+                            const entity_addr_t& peer_addr) = 0;
+
+ protected:
+  Protocol(int type,
+           Dispatcher& dispatcher,
+           SocketConnection& conn);
+
+  virtual void trigger_close() = 0;
+
+  // encode/write a message
+  virtual seastar::future<> write_message(MessageRef msg) = 0;
+
+  virtual seastar::future<> do_keepalive() = 0;
+
+  virtual seastar::future<> do_keepalive_ack() = 0;
+
+ public:
+  const int proto_type;
+
+ protected:
+  Dispatcher &dispatcher;
+  SocketConnection &conn;
+
+  seastar::foreign_ptr<std::unique_ptr<Socket>> socket;
+  seastar::gate pending_dispatch;
+
+  // write_state is changed with state atomically, indicating the write
+  // behavior of the according state.
+  enum class write_state_t {
+    none,
+    delay,
+    open,
+    drop
+  };
+  void set_write_state(const write_state_t& state) {
+    write_state = state;
+    state_changed.set_value();
+    state_changed = seastar::shared_promise<>();
+  }
+
+  void notify_keepalive_ack();
+
+ private:
+  write_state_t write_state = write_state_t::none;
+  // wait until current state changed
+  seastar::shared_promise<> state_changed;
+
+  bool closed = false;
+  // become valid only after closed == true
+  seastar::shared_future<> close_ready;
+
+  bool need_keepalive = false;
+  bool need_keepalive_ack = false;
+  bool write_dispatching = false;
+  void write_event();
+};
+
+} // namespace ceph::net
diff --git a/src/crimson/net/ProtocolV1.cc b/src/crimson/net/ProtocolV1.cc
new file mode 100644 (file)
index 0000000..cbfa4d2
--- /dev/null
@@ -0,0 +1,851 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "ProtocolV1.h"
+
+#include <seastar/core/shared_future.hh>
+#include <seastar/core/sleep.hh>
+#include <seastar/net/packet.hh>
+
+#include "include/msgr.h"
+#include "include/random.h"
+#include "auth/Auth.h"
+#include "auth/AuthSessionHandler.h"
+
+#include "crimson/common/log.h"
+#include "Config.h"
+#include "Dispatcher.h"
+#include "Errors.h"
+#include "Socket.h"
+#include "SocketConnection.h"
+#include "SocketMessenger.h"
+
+WRITE_RAW_ENCODER(ceph_msg_connect);
+WRITE_RAW_ENCODER(ceph_msg_connect_reply);
+
+std::ostream& operator<<(std::ostream& out, const ceph_msg_connect& c)
+{
+  return out << "connect{features=" << std::hex << c.features << std::dec
+      << " host_type=" << c.host_type
+      << " global_seq=" << c.global_seq
+      << " connect_seq=" << c.connect_seq
+      << " protocol_version=" << c.protocol_version
+      << " authorizer_protocol=" << c.authorizer_protocol
+      << " authorizer_len=" << c.authorizer_len
+      << " flags=" << std::hex << static_cast<uint16_t>(c.flags) << std::dec << '}';
+}
+
+std::ostream& operator<<(std::ostream& out, const ceph_msg_connect_reply& r)
+{
+  return out << "connect_reply{tag=" << static_cast<uint16_t>(r.tag)
+      << " features=" << std::hex << r.features << std::dec
+      << " global_seq=" << r.global_seq
+      << " connect_seq=" << r.connect_seq
+      << " protocol_version=" << r.protocol_version
+      << " authorizer_len=" << r.authorizer_len
+      << " flags=" << std::hex << static_cast<uint16_t>(r.flags) << std::dec << '}';
+}
+
+namespace {
+
+seastar::logger& logger() {
+  return ceph::get_logger(ceph_subsys_ms);
+}
+
+template <typename T>
+seastar::net::packet make_static_packet(const T& value) {
+    return { reinterpret_cast<const char*>(&value), sizeof(value) };
+}
+
+// store the banner in a non-const string for buffer::create_static()
+char banner[] = CEPH_BANNER;
+constexpr size_t banner_size = sizeof(CEPH_BANNER)-1;
+
+constexpr size_t client_header_size = banner_size + sizeof(ceph_entity_addr);
+constexpr size_t server_header_size = banner_size + 2 * sizeof(ceph_entity_addr);
+
+// check that the buffer starts with a valid banner without requiring it to
+// be contiguous in memory
+void validate_banner(bufferlist::const_iterator& p)
+{
+  auto b = std::cbegin(banner);
+  auto end = b + banner_size;
+  while (b != end) {
+    const char *buf{nullptr};
+    auto remaining = std::distance(b, end);
+    auto len = p.get_ptr_and_advance(remaining, &buf);
+    if (!std::equal(buf, buf + len, b)) {
+      throw std::system_error(
+          make_error_code(ceph::net::error::bad_connect_banner));
+    }
+    b += len;
+  }
+}
+
+// make sure that we agree with the peer about its address
+void validate_peer_addr(const entity_addr_t& addr,
+                        const entity_addr_t& expected)
+{
+  if (addr == expected) {
+    return;
+  }
+  // ok if server bound anonymously, as long as port/nonce match
+  if (addr.is_blank_ip() &&
+      addr.get_port() == expected.get_port() &&
+      addr.get_nonce() == expected.get_nonce()) {
+    return;
+  } else {
+    throw std::system_error(
+        make_error_code(ceph::net::error::bad_peer_address));
+  }
+}
+
+// return a static bufferptr to the given object
+template <typename T>
+bufferptr create_static(T& obj)
+{
+  return buffer::create_static(sizeof(obj), reinterpret_cast<char*>(&obj));
+}
+
+uint32_t get_proto_version(entity_type_t peer_type, bool connect)
+{
+  constexpr entity_type_t my_type = CEPH_ENTITY_TYPE_OSD;
+  // see also OSD.h, unlike other connection of simple/async messenger,
+  // crimson msgr is only used by osd
+  constexpr uint32_t CEPH_OSD_PROTOCOL = 10;
+  if (peer_type == my_type) {
+    // internal
+    return CEPH_OSD_PROTOCOL;
+  } else {
+    // public
+    switch (connect ? peer_type : my_type) {
+      case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL;
+      case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL;
+      case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL;
+      default: return 0;
+    }
+  }
+}
+
+void discard_up_to(std::queue<MessageRef>* queue,
+                   ceph::net::seq_num_t seq)
+{
+  while (!queue->empty() &&
+         queue->front()->get_seq() < seq) {
+    queue->pop();
+  }
+}
+
+} // namespace anonymous
+
+namespace ceph::net {
+
+ProtocolV1::ProtocolV1(Dispatcher& dispatcher,
+                       SocketConnection& conn,
+                       SocketMessenger& messenger)
+  : Protocol(1, dispatcher, conn), messenger{messenger} {}
+
+ProtocolV1::~ProtocolV1() {}
+
+// connecting state
+
+void ProtocolV1::reset_session()
+{
+  conn.out_q = {};
+  conn.sent = {};
+  conn.in_seq = 0;
+  h.connect_seq = 0;
+  if (HAVE_FEATURE(conn.features, MSG_AUTH)) {
+    // Set out_seq to a random value, so CRC won't be predictable.
+    // Constant to limit starting sequence number to 2^31.  Nothing special
+    // about it, just a big number.
+    constexpr uint64_t SEQ_MASK = 0x7fffffff;
+    conn.out_seq = ceph::util::generate_random_number<uint64_t>(0, SEQ_MASK);
+  } else {
+    // previously, seq #'s always started at 0.
+    conn.out_seq = 0;
+  }
+}
+
+seastar::future<stop_t>
+ProtocolV1::handle_connect_reply(msgr_tag_t tag)
+{
+  switch (tag) {
+  case CEPH_MSGR_TAG_FEATURES:
+    logger().error("{} connect protocol feature mispatch", __func__);
+    throw std::system_error(make_error_code(error::negotiation_failure));
+  case CEPH_MSGR_TAG_BADPROTOVER:
+    logger().error("{} connect protocol version mispatch", __func__);
+    throw std::system_error(make_error_code(error::negotiation_failure));
+  case CEPH_MSGR_TAG_BADAUTHORIZER:
+    logger().error("{} got bad authorizer", __func__);
+    throw std::system_error(make_error_code(error::negotiation_failure));
+  case CEPH_MSGR_TAG_RESETSESSION:
+    reset_session();
+    return seastar::make_ready_future<stop_t>(stop_t::no);
+  case CEPH_MSGR_TAG_RETRY_GLOBAL:
+    h.global_seq = messenger.get_global_seq(h.reply.global_seq);
+    return seastar::make_ready_future<stop_t>(stop_t::no);
+  case CEPH_MSGR_TAG_RETRY_SESSION:
+    ceph_assert(h.reply.connect_seq > h.connect_seq);
+    h.connect_seq = h.reply.connect_seq;
+    return seastar::make_ready_future<stop_t>(stop_t::no);
+  case CEPH_MSGR_TAG_WAIT:
+    // TODO: state wait
+    throw std::system_error(make_error_code(error::negotiation_failure));
+  case CEPH_MSGR_TAG_SEQ:
+  case CEPH_MSGR_TAG_READY:
+    if (auto missing = (conn.policy.features_required & ~(uint64_t)h.reply.features);
+        missing) {
+      logger().error("{} missing required features", __func__);
+      throw std::system_error(make_error_code(error::negotiation_failure));
+    }
+    return seastar::futurize_apply([this, tag] {
+        if (tag == CEPH_MSGR_TAG_SEQ) {
+          return socket->read_exactly(sizeof(seq_num_t))
+            .then([this] (auto buf) {
+              auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
+              discard_up_to(&conn.out_q, *acked_seq);
+              return socket->write_flush(make_static_packet(conn.in_seq));
+            });
+        }
+        // tag CEPH_MSGR_TAG_READY
+        return seastar::now();
+      }).then([this] {
+        // hooray!
+        h.peer_global_seq = h.reply.global_seq;
+        conn.policy.lossy = h.reply.flags & CEPH_MSG_CONNECT_LOSSY;
+        h.connect_seq++;
+        h.backoff = 0ms;
+        conn.set_features(h.reply.features & h.connect.features);
+        if (h.authorizer) {
+          session_security.reset(
+              get_auth_session_handler(nullptr,
+                                       h.authorizer->protocol,
+                                       h.authorizer->session_key,
+                                       conn.features));
+        }
+        h.authorizer = nullptr;
+        return seastar::make_ready_future<stop_t>(stop_t::yes);
+      });
+    break;
+  default:
+    // unknown tag
+    logger().error("{} got unknown tag", __func__, int(tag));
+    throw std::system_error(make_error_code(error::negotiation_failure));
+  }
+}
+
+seastar::future<stop_t>
+ProtocolV1::repeat_connect()
+{
+  // encode ceph_msg_connect
+  memset(&h.connect, 0, sizeof(h.connect));
+  h.connect.features = conn.policy.features_supported;
+  h.connect.host_type = messenger.get_myname().type();
+  h.connect.global_seq = h.global_seq;
+  h.connect.connect_seq = h.connect_seq;
+  h.connect.protocol_version = get_proto_version(conn.peer_type, true);
+  // this is fyi, actually, server decides!
+  h.connect.flags = conn.policy.lossy ? CEPH_MSG_CONNECT_LOSSY : 0;
+
+  h.authorizer = dispatcher.ms_get_authorizer(conn.peer_type);
+  bufferlist bl;
+  if (h.authorizer) {
+    h.connect.authorizer_protocol = h.authorizer->protocol;
+    h.connect.authorizer_len = h.authorizer->bl.length();
+    bl.append(create_static(h.connect));
+    bl.append(h.authorizer->bl);
+  } else {
+    h.connect.authorizer_protocol = 0;
+    h.connect.authorizer_len = 0;
+    bl.append(create_static(h.connect));
+  };
+  return socket->write_flush(std::move(bl))
+    .then([this] {
+      // read the reply
+      return socket->read(sizeof(h.reply));
+    }).then([this] (bufferlist bl) {
+      auto p = bl.cbegin();
+      ::decode(h.reply, p);
+      ceph_assert(p.end());
+      return socket->read(h.reply.authorizer_len);
+    }).then([this] (bufferlist bl) {
+      if (h.authorizer) {
+        auto reply = bl.cbegin();
+        if (!h.authorizer->verify_reply(reply, nullptr)) {
+          logger().error("{} authorizer failed to verify reply", __func__);
+          throw std::system_error(make_error_code(error::negotiation_failure));
+        }
+      }
+      return handle_connect_reply(h.reply.tag);
+    });
+}
+
+void ProtocolV1::start_connect(const entity_addr_t& _peer_addr,
+                               const entity_type_t& _peer_type)
+{
+  ceph_assert(state == state_t::none);
+  logger().debug("{} trigger connecting, was {}", conn, static_cast<int>(state));
+  state = state_t::connecting;
+  set_write_state(write_state_t::delay);
+
+  ceph_assert(!socket);
+  conn.peer_addr = _peer_addr;
+  conn.peer_type = _peer_type;
+  messenger.register_conn(
+    seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+  seastar::with_gate(pending_dispatch, [this] {
+      return seastar::connect(conn.peer_addr.in4_addr())
+        .then([this](seastar::connected_socket fd) {
+          if (state == state_t::closing) {
+            fd.shutdown_input();
+            fd.shutdown_output();
+            throw std::system_error(make_error_code(error::connection_aborted));
+          }
+          socket = seastar::make_foreign(std::make_unique<Socket>(std::move(fd)));
+          // read server's handshake header
+          return socket->read(server_header_size);
+        }).then([this] (bufferlist headerbl) {
+          auto p = headerbl.cbegin();
+          validate_banner(p);
+          entity_addr_t saddr, caddr;
+          ::decode(saddr, p);
+          ::decode(caddr, p);
+          ceph_assert(p.end());
+          validate_peer_addr(saddr, conn.peer_addr);
+
+          conn.side = SocketConnection::side_t::connector;
+          conn.socket_port = caddr.get_port();
+          return messenger.learned_addr(caddr);
+        }).then([this] {
+          // encode/send client's handshake header
+          bufferlist bl;
+          bl.append(buffer::create_static(banner_size, banner));
+          ::encode(messenger.get_myaddr(), bl, 0);
+          h.global_seq = messenger.get_global_seq();
+          return socket->write_flush(std::move(bl));
+        }).then([=] {
+          return seastar::repeat([this] {
+            return repeat_connect();
+          });
+        }).then([this] {
+          // notify the dispatcher and allow them to reject the connection
+          return dispatcher.ms_handle_connect(
+            seastar::static_pointer_cast<SocketConnection>(
+              conn.shared_from_this()));
+        }).then([this] {
+          execute_open();
+        }).handle_exception([this] (std::exception_ptr eptr) {
+          // TODO: handle fault in the connecting state
+          logger().warn("{} connecting fault: {}", conn, eptr);
+          close();
+        });
+    });
+}
+
+// accepting state
+
+seastar::future<stop_t> ProtocolV1::send_connect_reply(
+    msgr_tag_t tag, bufferlist&& authorizer_reply)
+{
+  h.reply.tag = tag;
+  h.reply.features = static_cast<uint64_t>((h.connect.features &
+                                            conn.policy.features_supported) |
+                                           conn.policy.features_required);
+  h.reply.authorizer_len = authorizer_reply.length();
+  return socket->write(make_static_packet(h.reply))
+    .then([this, reply=std::move(authorizer_reply)]() mutable {
+      return socket->write_flush(std::move(reply));
+    }).then([] {
+      return stop_t::no;
+    });
+}
+
+seastar::future<stop_t> ProtocolV1::send_connect_reply_ready(
+    msgr_tag_t tag, bufferlist&& authorizer_reply)
+{
+  h.global_seq = messenger.get_global_seq();
+  h.reply.tag = tag;
+  h.reply.features = conn.policy.features_supported;
+  h.reply.global_seq = h.global_seq;
+  h.reply.connect_seq = h.connect_seq;
+  h.reply.flags = 0;
+  if (conn.policy.lossy) {
+    h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY;
+  }
+  h.reply.authorizer_len = authorizer_reply.length();
+  return socket->write(make_static_packet(h.reply))
+    .then([this, reply=std::move(authorizer_reply)]() mutable {
+      if (reply.length()) {
+        return socket->write(std::move(reply));
+      } else {
+        return seastar::now();
+      }
+    }).then([this] {
+      if (h.reply.tag == CEPH_MSGR_TAG_SEQ) {
+        return socket->write_flush(make_static_packet(conn.in_seq))
+          .then([this] {
+            return socket->read_exactly(sizeof(seq_num_t));
+          }).then([this] (auto buf) {
+            auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
+            discard_up_to(&conn.out_q, *acked_seq);
+          });
+      } else {
+        return socket->flush();
+      }
+    }).then([this] {
+      return stop_t::yes;
+    });
+}
+
+seastar::future<stop_t> ProtocolV1::replace_existing(
+    SocketConnectionRef existing,
+    bufferlist&& authorizer_reply,
+    bool is_reset_from_peer)
+{
+  msgr_tag_t reply_tag;
+  if (HAVE_FEATURE(h.connect.features, RECONNECT_SEQ) &&
+      !is_reset_from_peer) {
+    reply_tag = CEPH_MSGR_TAG_SEQ;
+  } else {
+    reply_tag = CEPH_MSGR_TAG_READY;
+  }
+  messenger.unregister_conn(existing);
+  if (!existing->is_lossy()) {
+    // reset the in_seq if this is a hard reset from peer,
+    // otherwise we respect our original connection's value
+    conn.in_seq = is_reset_from_peer ? 0 : existing->rx_seq_num();
+    // steal outgoing queue and out_seq
+    existing->requeue_sent();
+    std::tie(conn.out_seq, conn.out_q) = existing->get_out_queue();
+  }
+  return send_connect_reply_ready(reply_tag, std::move(authorizer_reply));
+}
+
+seastar::future<stop_t> ProtocolV1::handle_connect_with_existing(
+    SocketConnectionRef existing, bufferlist&& authorizer_reply)
+{
+  ProtocolV1 *exproto = dynamic_cast<ProtocolV1*>(existing->protocol.get());
+
+  if (h.connect.global_seq < exproto->peer_global_seq()) {
+    h.reply.global_seq = exproto->peer_global_seq();
+    return send_connect_reply(CEPH_MSGR_TAG_RETRY_GLOBAL);
+  } else if (existing->is_lossy()) {
+    return replace_existing(existing, std::move(authorizer_reply));
+  } else if (h.connect.connect_seq == 0 && exproto->connect_seq() > 0) {
+    return replace_existing(existing, std::move(authorizer_reply), true);
+  } else if (h.connect.connect_seq < exproto->connect_seq()) {
+    // old attempt, or we sent READY but they didn't get it.
+    h.reply.connect_seq = exproto->connect_seq() + 1;
+    return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
+  } else if (h.connect.connect_seq == exproto->connect_seq()) {
+    // if the existing connection successfully opened, and/or
+    // subsequently went to standby, then the peer should bump
+    // their connect_seq and retry: this is not a connection race
+    // we need to resolve here.
+    if (exproto->get_state() == state_t::open ||
+        exproto->get_state() == state_t::standby) {
+      if (conn.policy.resetcheck && exproto->connect_seq() == 0) {
+        return replace_existing(existing, std::move(authorizer_reply));
+      } else {
+        h.reply.connect_seq = exproto->connect_seq() + 1;
+        return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
+      }
+    } else if (conn.peer_addr < messenger.get_myaddr() ||
+               existing->is_server_side()) {
+      // incoming wins
+      return replace_existing(existing, std::move(authorizer_reply));
+    } else {
+      return send_connect_reply(CEPH_MSGR_TAG_WAIT);
+    }
+  } else if (conn.policy.resetcheck &&
+             exproto->connect_seq() == 0) {
+    return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION);
+  } else {
+    return replace_existing(existing, std::move(authorizer_reply));
+  }
+}
+
+bool ProtocolV1::require_auth_feature() const
+{
+  if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) {
+    return false;
+  }
+  if (conf.cephx_require_signatures) {
+    return true;
+  }
+  if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD ||
+      h.connect.host_type == CEPH_ENTITY_TYPE_MDS) {
+    return conf.cephx_cluster_require_signatures;
+  } else {
+    return conf.cephx_service_require_signatures;
+  }
+}
+
+seastar::future<stop_t> ProtocolV1::repeat_handle_connect()
+{
+  return socket->read(sizeof(h.connect))
+    .then([this](bufferlist bl) {
+      auto p = bl.cbegin();
+      ::decode(h.connect, p);
+      conn.peer_type = h.connect.host_type;
+      return socket->read(h.connect.authorizer_len);
+    }).then([this] (bufferlist authorizer) {
+      if (h.connect.protocol_version != get_proto_version(h.connect.host_type, false)) {
+        return seastar::make_ready_future<msgr_tag_t, bufferlist>(
+            CEPH_MSGR_TAG_BADPROTOVER, bufferlist{});
+      }
+      if (require_auth_feature()) {
+        conn.policy.features_required |= CEPH_FEATURE_MSG_AUTH;
+      }
+      if (auto feat_missing = conn.policy.features_required & ~(uint64_t)h.connect.features;
+          feat_missing != 0) {
+        return seastar::make_ready_future<msgr_tag_t, bufferlist>(
+            CEPH_MSGR_TAG_FEATURES, bufferlist{});
+      }
+      return dispatcher.ms_verify_authorizer(conn.peer_type,
+                                             h.connect.authorizer_protocol,
+                                             authorizer);
+    }).then([this] (ceph::net::msgr_tag_t tag, bufferlist&& authorizer_reply) {
+      memset(&h.reply, 0, sizeof(h.reply));
+      if (tag) {
+        return send_connect_reply(tag, std::move(authorizer_reply));
+      }
+      if (auto existing = messenger.lookup_conn(conn.peer_addr); existing) {
+        if (existing->protocol->proto_type != 1) {
+          logger().warn("{} existing {} proto version is {} not 1, close existing",
+                        conn, *existing, existing->protocol->proto_type);
+          existing->close();
+        } else {
+          return handle_connect_with_existing(existing, std::move(authorizer_reply));
+        }
+      }
+      if (h.connect.connect_seq > 0) {
+        return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION,
+                                  std::move(authorizer_reply));
+      }
+      h.connect_seq = h.connect.connect_seq + 1;
+      h.peer_global_seq = h.connect.global_seq;
+      conn.set_features((uint64_t)conn.policy.features_supported & (uint64_t)h.connect.features);
+      // TODO: cct
+      return send_connect_reply_ready(CEPH_MSGR_TAG_READY, std::move(authorizer_reply));
+    });
+}
+
+void ProtocolV1::start_accept(seastar::foreign_ptr<std::unique_ptr<Socket>>&& sock,
+                              const entity_addr_t& _peer_addr)
+{
+  ceph_assert(state == state_t::none);
+  logger().debug("{} trigger accepting, was {}",
+                 conn, static_cast<int>(state));
+  state = state_t::accepting;
+  set_write_state(write_state_t::delay);
+
+  ceph_assert(!socket);
+  conn.peer_addr.u = _peer_addr.u;
+  conn.peer_addr.set_port(0);
+  conn.side = SocketConnection::side_t::acceptor;
+  conn.socket_port = _peer_addr.get_port();
+  socket = std::move(sock);
+  messenger.accept_conn(
+    seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+  seastar::with_gate(pending_dispatch, [this, _peer_addr] {
+      // encode/send server's handshake header
+      bufferlist bl;
+      bl.append(buffer::create_static(banner_size, banner));
+      ::encode(messenger.get_myaddr(), bl, 0);
+      ::encode(_peer_addr, bl, 0);
+      return socket->write_flush(std::move(bl))
+        .then([this] {
+          // read client's handshake header and connect request
+          return socket->read(client_header_size);
+        }).then([this] (bufferlist bl) {
+          auto p = bl.cbegin();
+          validate_banner(p);
+          entity_addr_t addr;
+          ::decode(addr, p);
+          ceph_assert(p.end());
+          conn.peer_addr.set_type(addr.get_type());
+          conn.peer_addr.set_port(addr.get_port());
+          conn.peer_addr.set_nonce(addr.get_nonce());
+          return seastar::repeat([this] {
+            return repeat_handle_connect();
+          });
+        }).then([this] {
+          // notify the dispatcher and allow them to reject the connection
+          return dispatcher.ms_handle_accept(
+            seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+        }).then([this] {
+          messenger.register_conn(
+            seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+          messenger.unaccept_conn(
+            seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+          execute_open();
+        }).handle_exception([this] (std::exception_ptr eptr) {
+          // TODO: handle fault in the accepting state
+          logger().warn("{} accepting fault: {}", conn, eptr);
+          close();
+        });
+    });
+}
+
+// open state
+
+seastar::future<> ProtocolV1::write_message(MessageRef msg)
+{
+  msg->set_seq(++conn.out_seq);
+  auto& header = msg->get_header();
+  header.src = messenger.get_myname();
+  msg->encode(conn.features, messenger.get_crc_flags());
+  bufferlist bl;
+  bl.append(CEPH_MSGR_TAG_MSG);
+  bl.append((const char*)&header, sizeof(header));
+  bl.append(msg->get_payload());
+  bl.append(msg->get_middle());
+  bl.append(msg->get_data());
+  auto& footer = msg->get_footer();
+  if (HAVE_FEATURE(conn.features, MSG_AUTH)) {
+    bl.append((const char*)&footer, sizeof(footer));
+  } else {
+    ceph_msg_footer_old old_footer;
+    if (messenger.get_crc_flags() & MSG_CRC_HEADER) {
+      old_footer.front_crc = footer.front_crc;
+      old_footer.middle_crc = footer.middle_crc;
+    } else {
+      old_footer.front_crc = old_footer.middle_crc = 0;
+    }
+    if (messenger.get_crc_flags() & MSG_CRC_DATA) {
+      old_footer.data_crc = footer.data_crc;
+    } else {
+      old_footer.data_crc = 0;
+    }
+    old_footer.flags = footer.flags;
+    bl.append((const char*)&old_footer, sizeof(old_footer));
+  }
+  // write as a seastar::net::packet
+  return socket->write(std::move(bl));
+  // TODO: lossless policy
+  //  .then([this, msg = std::move(msg)] {
+  //    if (!policy.lossy) {
+  //      sent.push(std::move(msg));
+  //    }
+  //  });
+}
+
+seastar::future<> ProtocolV1::do_keepalive()
+{
+  k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec(
+    ceph::coarse_real_clock::now());
+  logger().debug("{} write keepalive2 {}", conn, k.req.stamp.tv_sec);
+  return socket->write(make_static_packet(k.req));
+}
+
+seastar::future<> ProtocolV1::do_keepalive_ack()
+{
+  logger().debug("{} write keepalive2 ack {}", conn, k.ack.stamp.tv_sec);
+  return socket->write(make_static_packet(k.ack));
+}
+
+seastar::future<> ProtocolV1::handle_keepalive2_ack()
+{
+  return socket->read_exactly(sizeof(ceph_timespec))
+    .then([this] (auto buf) {
+      auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
+      k.ack_stamp = *t;
+      logger().debug("{} got keepalive2 ack {}", conn, t->tv_sec);
+    });
+}
+
+seastar::future<> ProtocolV1::handle_keepalive2()
+{
+  return socket->read_exactly(sizeof(ceph_timespec))
+    .then([this] (auto buf) {
+      k.ack.stamp = *reinterpret_cast<const ceph_timespec*>(buf.get());
+      logger().debug("{} got keepalive2 {}", conn, k.ack.stamp.tv_sec);
+      notify_keepalive_ack();
+    });
+}
+
+seastar::future<> ProtocolV1::handle_ack()
+{
+  return socket->read_exactly(sizeof(ceph_le64))
+    .then([this] (auto buf) {
+      auto seq = reinterpret_cast<const ceph_le64*>(buf.get());
+      discard_up_to(&conn.sent, *seq);
+    });
+}
+
+seastar::future<> ProtocolV1::maybe_throttle()
+{
+  if (!conn.policy.throttler_bytes) {
+    return seastar::now();
+  }
+  const auto to_read = (m.header.front_len +
+                        m.header.middle_len +
+                        m.header.data_len);
+  return conn.policy.throttler_bytes->get(to_read);
+}
+
+seastar::future<> ProtocolV1::read_message()
+{
+  return socket->read(sizeof(m.header))
+    .then([this] (bufferlist bl) {
+      // throttle the traffic, maybe
+      auto p = bl.cbegin();
+      ::decode(m.header, p);
+      return maybe_throttle();
+    }).then([this] {
+      // read front
+      return socket->read(m.header.front_len);
+    }).then([this] (bufferlist bl) {
+      m.front = std::move(bl);
+      // read middle
+      return socket->read(m.header.middle_len);
+    }).then([this] (bufferlist bl) {
+      m.middle = std::move(bl);
+      // read data
+      return socket->read(m.header.data_len);
+    }).then([this] (bufferlist bl) {
+      m.data = std::move(bl);
+      // read footer
+      return socket->read(sizeof(m.footer));
+    }).then([this] (bufferlist bl) {
+      auto p = bl.cbegin();
+      ::decode(m.footer, p);
+      auto msg = ::decode_message(nullptr, 0, m.header, m.footer,
+                                  m.front, m.middle, m.data, nullptr);
+      // TODO: set time stamps
+      msg->set_byte_throttler(conn.policy.throttler_bytes);
+
+      if (!conn.update_rx_seq(msg->get_seq())) {
+        // skip this message
+        return;
+      }
+
+      constexpr bool add_ref = false; // Message starts with 1 ref
+      // TODO: change MessageRef with foreign_ptr
+      auto msg_ref = MessageRef{msg, add_ref};
+      // start dispatch, ignoring exceptions from the application layer
+      seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] {
+          logger().debug("{} <= {}@{} === {}", messenger,
+                msg->get_source(), conn.peer_addr, *msg);
+          return dispatcher.ms_dispatch(
+              seastar::static_pointer_cast<SocketConnection>(
+                conn.shared_from_this()),
+              std::move(msg))
+            .handle_exception([this] (std::exception_ptr eptr) {
+              logger().error("{} ms_dispatch caught exception: {}", conn, eptr);
+              ceph_assert(false);
+            });
+        });
+    });
+}
+
+seastar::future<> ProtocolV1::handle_tags()
+{
+  return seastar::keep_doing([this] {
+      // read the next tag
+      return socket->read_exactly(1)
+        .then([this] (auto buf) {
+          switch (buf[0]) {
+          case CEPH_MSGR_TAG_MSG:
+            return read_message();
+          case CEPH_MSGR_TAG_ACK:
+            return handle_ack();
+          case CEPH_MSGR_TAG_KEEPALIVE:
+            return seastar::now();
+          case CEPH_MSGR_TAG_KEEPALIVE2:
+            return handle_keepalive2();
+          case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
+            return handle_keepalive2_ack();
+          case CEPH_MSGR_TAG_CLOSE:
+            logger().info("{} got tag close", conn);
+            throw std::system_error(make_error_code(error::connection_aborted));
+          default:
+            logger().error("{} got unknown msgr tag {}",
+                           conn, static_cast<int>(buf[0]));
+            throw std::system_error(make_error_code(error::read_eof));
+          }
+        });
+    });
+}
+
+void ProtocolV1::execute_open()
+{
+  logger().debug("{} trigger open, was {}", conn, static_cast<int>(state));
+  state = state_t::open;
+  set_write_state(write_state_t::open);
+
+  seastar::with_gate(pending_dispatch, [this] {
+      // start background processing of tags
+      return handle_tags()
+        .handle_exception_type([this] (const std::system_error& e) {
+          logger().warn("{} open fault: {}", conn, e);
+          if (e.code() == error::connection_aborted ||
+              e.code() == error::connection_reset) {
+            return dispatcher.ms_handle_reset(
+                seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()))
+              .then([this] {
+                close();
+              });
+          } else if (e.code() == error::read_eof) {
+            return dispatcher.ms_handle_remote_reset(
+                seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()))
+              .then([this] {
+                close();
+              });
+          } else {
+            throw e;
+          }
+        }).handle_exception([this] (std::exception_ptr eptr) {
+          // TODO: handle fault in the open state
+          logger().warn("{} open fault: {}", conn, eptr);
+          close();
+        });
+    });
+}
+
+// closing state
+
+void ProtocolV1::trigger_close()
+{
+  logger().debug("{} trigger closing, was {}",
+                 conn, static_cast<int>(state));
+
+  if (state == state_t::accepting) {
+    messenger.unaccept_conn(seastar::static_pointer_cast<SocketConnection>(
+      conn.shared_from_this()));
+  } else if (state >= state_t::connecting && state < state_t::closing) {
+    messenger.unregister_conn(seastar::static_pointer_cast<SocketConnection>(
+      conn.shared_from_this()));
+  } else {
+    // cannot happen
+    ceph_assert(false);
+  }
+
+  if (!socket) {
+    ceph_assert(state == state_t::connecting);
+  }
+
+  state = state_t::closing;
+}
+
+seastar::future<> ProtocolV1::fault()
+{
+  if (conn.policy.lossy) {
+    messenger.unregister_conn(seastar::static_pointer_cast<SocketConnection>(
+        conn.shared_from_this()));
+  }
+  if (h.backoff.count()) {
+    h.backoff += h.backoff;
+  } else {
+    h.backoff = conf.ms_initial_backoff;
+  }
+  if (h.backoff > conf.ms_max_backoff) {
+    h.backoff = conf.ms_max_backoff;
+  }
+  return seastar::sleep(h.backoff);
+}
+
+} // namespace ceph::net
diff --git a/src/crimson/net/ProtocolV1.h b/src/crimson/net/ProtocolV1.h
new file mode 100644 (file)
index 0000000..183c7e6
--- /dev/null
@@ -0,0 +1,125 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "Protocol.h"
+
+class AuthAuthorizer;
+class AuthSessionHandler;
+
+namespace ceph::net {
+
+class ProtocolV1 final : public Protocol {
+ public:
+  ProtocolV1(Dispatcher& dispatcher,
+             SocketConnection& conn,
+             SocketMessenger& messenger);
+  ~ProtocolV1() override;
+
+ private:
+  void start_connect(const entity_addr_t& peer_addr,
+                     const entity_type_t& peer_type) override;
+
+  void start_accept(seastar::foreign_ptr<std::unique_ptr<Socket>>&& socket,
+                    const entity_addr_t& peer_addr) override;
+
+  void trigger_close() override;
+
+  seastar::future<> write_message(MessageRef msg) override;
+
+  seastar::future<> do_keepalive() override;
+
+  seastar::future<> do_keepalive_ack() override;
+
+ private:
+  SocketMessenger &messenger;
+
+  enum class state_t {
+    none,
+    accepting,
+    connecting,
+    open,
+    standby,
+    wait,
+    closing
+  };
+  state_t state = state_t::none;
+
+  // state for handshake
+  struct Handshake {
+    ceph_msg_connect connect;
+    ceph_msg_connect_reply reply;
+    AuthAuthorizer* authorizer = nullptr;
+    std::chrono::milliseconds backoff;
+    uint32_t connect_seq = 0;
+    uint32_t peer_global_seq = 0;
+    uint32_t global_seq;
+  } h;
+
+  std::unique_ptr<AuthSessionHandler> session_security;
+
+  // state for an incoming message
+  struct MessageReader {
+    ceph_msg_header header;
+    ceph_msg_footer footer;
+    bufferlist front;
+    bufferlist middle;
+    bufferlist data;
+  } m;
+
+  struct Keepalive {
+    struct {
+      const char tag = CEPH_MSGR_TAG_KEEPALIVE2;
+      ceph_timespec stamp;
+    } __attribute__((packed)) req;
+    struct {
+      const char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK;
+      ceph_timespec stamp;
+    } __attribute__((packed)) ack;
+    ceph_timespec ack_stamp;
+  } k;
+
+ private:
+  // connecting
+  void reset_session();
+  seastar::future<stop_t> handle_connect_reply(ceph::net::msgr_tag_t tag);
+  seastar::future<stop_t> repeat_connect();
+
+  // accepting
+  seastar::future<stop_t> send_connect_reply(
+      msgr_tag_t tag, bufferlist&& authorizer_reply = {});
+  seastar::future<stop_t> send_connect_reply_ready(
+      msgr_tag_t tag, bufferlist&& authorizer_reply);
+  seastar::future<stop_t> replace_existing(
+      SocketConnectionRef existing,
+      bufferlist&& authorizer_reply,
+      bool is_reset_from_peer = false);
+  seastar::future<stop_t> handle_connect_with_existing(
+      SocketConnectionRef existing, bufferlist&& authorizer_reply);
+  bool require_auth_feature() const;
+  seastar::future<stop_t> repeat_handle_connect();
+
+  // open
+  seastar::future<> handle_keepalive2_ack();
+  seastar::future<> handle_keepalive2();
+  seastar::future<> handle_ack();
+  seastar::future<> maybe_throttle();
+  seastar::future<> read_message();
+  seastar::future<> handle_tags();
+  void execute_open();
+
+  // replacing
+  // the number of connections initiated in this session, increment when a
+  // new connection is established
+  uint32_t connect_seq() const { return h.connect_seq; }
+  // the client side should connect us with a gseq. it will be reset with
+  // the one of exsting connection if it's greater.
+  uint32_t peer_global_seq() const { return h.peer_global_seq; }
+  // current state of ProtocolV1
+  state_t get_state() const { return state; }
+
+  seastar::future<> fault();
+};
+
+} // namespace ceph::net
index ff0c9c07b378d434f230c292ce1ea025ca0bff1a..9a547960a33a9267a273c6031041966fcfe52747 100644 (file)
 
 #include "SocketConnection.h"
 
-#include <algorithm>
-#include <seastar/core/shared_future.hh>
-#include <seastar/core/sleep.hh>
-#include <seastar/net/packet.hh>
-
-#include "include/msgr.h"
-#include "include/random.h"
-#include "auth/Auth.h"
-#include "auth/AuthSessionHandler.h"
-
-#include "crimson/common/log.h"
 #include "Config.h"
-#include "Dispatcher.h"
-#include "Errors.h"
+#include "ProtocolV1.h"
 #include "SocketMessenger.h"
 
 using namespace ceph::net;
 
-template <typename T>
-seastar::net::packet make_static_packet(const T& value) {
-    return { reinterpret_cast<const char*>(&value), sizeof(value) };
-}
-
 namespace {
   seastar::logger& logger() {
     return ceph::get_logger(ceph_subsys_ms);
@@ -45,16 +28,13 @@ namespace {
 
 SocketConnection::SocketConnection(SocketMessenger& messenger,
                                    Dispatcher& dispatcher)
-  : messenger(messenger),
-    dispatcher(dispatcher)
+  : messenger(messenger)
 {
   ceph_assert(&messenger.container().local() == &messenger);
+  protocol = std::make_unique<ProtocolV1>(dispatcher, *this, messenger);
 }
 
-SocketConnection::~SocketConnection()
-{
-  ceph_assert(pending_dispatch.is_closed());
-}
+SocketConnection::~SocketConnection() {}
 
 ceph::net::Messenger*
 SocketConnection::get_messenger() const {
@@ -64,156 +44,32 @@ SocketConnection::get_messenger() const {
 seastar::future<bool> SocketConnection::is_connected()
 {
   return seastar::smp::submit_to(shard_id(), [this] {
-      return write_state == write_state_t::open;
-    });
-}
-
-void SocketConnection::write_event()
-{
-  if (write_dispatching) {
-    // already dispatching
-    return;
-  }
-  write_dispatching = true;
-  switch (write_state) {
-   case write_state_t::open:
-   case write_state_t::delay:
-    seastar::with_gate(pending_dispatch, [this] {
-      return seastar::repeat([this] {
-        switch (write_state) {
-         case write_state_t::open:
-          return seastar::futurize_apply([this] {
-            if (m_keepalive) {
-              return do_keepalive()
-              .then([this] { m_keepalive = false; });
-            }
-            return seastar::now();
-          }).then([this] {
-            if (m_keepalive_ack) {
-              return do_keepalive_ack()
-              .then([this] { m_keepalive_ack = false; });
-            }
-            return seastar::now();
-          }).then([this] {
-            if (!out_q.empty()){
-              MessageRef msg = out_q.front();
-              return write_message(msg)
-              .then([this, msg] {
-                if (msg == out_q.front()) {
-                  out_q.pop();
-                }
-                return stop_t::no;
-              });
-            } else {
-              return socket->flush()
-              .then([this] {
-                if (!out_q.empty()) {
-                  return stop_t::no;
-                } else {
-                  write_dispatching = false;
-                  return stop_t::yes;
-                }
-              });
-            }
-          }).handle_exception([this] (std::exception_ptr eptr) {
-            logger().warn("{} write_event fault: {}", *this, eptr);
-            close();
-            return stop_t::no;
-          });
-         case write_state_t::delay:
-          // delay dispatching writes until open
-          return state_changed.get_shared_future()
-          .then([] { return stop_t::no; });
-         case write_state_t::drop:
-          write_dispatching = false;
-          return seastar::make_ready_future<stop_t>(stop_t::yes);
-         default:
-          ceph_assert(false);
-        }
-      });
+      return protocol->is_connected();
     });
-    return;
-   case write_state_t::drop:
-    write_dispatching = false;
-   default:
-    ceph_assert(false);
-  }
 }
 
 seastar::future<> SocketConnection::send(MessageRef msg)
 {
   logger().debug("{} --> {} === {}", messenger, get_peer_addr(), *msg);
   return seastar::smp::submit_to(shard_id(), [this, msg=std::move(msg)] {
-    if (write_state != write_state_t::drop) {
-      out_q.push(std::move(msg));
-      write_event();
-    }
+    return protocol->send(std::move(msg));
   });
 }
 
 seastar::future<> SocketConnection::keepalive()
 {
   return seastar::smp::submit_to(shard_id(), [this] {
-    if (!m_keepalive) {
-      m_keepalive = true;
-      write_event();
-    }
+    return protocol->keepalive();
   });
 }
 
 seastar::future<> SocketConnection::close()
 {
   return seastar::smp::submit_to(shard_id(), [this] {
-      return do_close();
-    });
-}
-
-seastar::future<> SocketConnection::handle_tags()
-{
-  return seastar::keep_doing([this] {
-      // read the next tag
-      return socket->read_exactly(1)
-        .then([this] (auto buf) {
-          switch (buf[0]) {
-          case CEPH_MSGR_TAG_MSG:
-            return read_message();
-          case CEPH_MSGR_TAG_ACK:
-            return handle_ack();
-          case CEPH_MSGR_TAG_KEEPALIVE:
-            return seastar::now();
-          case CEPH_MSGR_TAG_KEEPALIVE2:
-            return handle_keepalive2();
-          case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
-            return handle_keepalive2_ack();
-          case CEPH_MSGR_TAG_CLOSE:
-            logger().info("{} got tag close", *this);
-            throw std::system_error(make_error_code(error::connection_aborted));
-          default:
-            logger().error("{} got unknown msgr tag {}", *this, static_cast<int>(buf[0]));
-            throw std::system_error(make_error_code(error::read_eof));
-          }
-        });
-    });
-}
-
-seastar::future<> SocketConnection::handle_ack()
-{
-  return socket->read_exactly(sizeof(ceph_le64))
-    .then([this] (auto buf) {
-      auto seq = reinterpret_cast<const ceph_le64*>(buf.get());
-      discard_up_to(&sent, *seq);
+      return protocol->close();
     });
 }
 
-void SocketConnection::discard_up_to(std::queue<MessageRef>* queue,
-                                     seq_num_t seq)
-{
-  while (!queue->empty() &&
-         queue->front()->get_seq() < seq) {
-    queue->pop();
-  }
-}
-
 void SocketConnection::requeue_sent()
 {
   out_seq -= sent.size();
@@ -224,71 +80,6 @@ void SocketConnection::requeue_sent()
   }
 }
 
-seastar::future<> SocketConnection::maybe_throttle()
-{
-  if (!policy.throttler_bytes) {
-    return seastar::now();
-  }
-  const auto to_read = (m.header.front_len +
-                        m.header.middle_len +
-                        m.header.data_len);
-  return policy.throttler_bytes->get(to_read);
-}
-
-seastar::future<> SocketConnection::read_message()
-{
-  return socket->read(sizeof(m.header))
-    .then([this] (bufferlist bl) {
-      // throttle the traffic, maybe
-      auto p = bl.cbegin();
-      ::decode(m.header, p);
-      return maybe_throttle();
-    }).then([this] {
-      // read front
-      return socket->read(m.header.front_len);
-    }).then([this] (bufferlist bl) {
-      m.front = std::move(bl);
-      // read middle
-      return socket->read(m.header.middle_len);
-    }).then([this] (bufferlist bl) {
-      m.middle = std::move(bl);
-      // read data
-      return socket->read(m.header.data_len);
-    }).then([this] (bufferlist bl) {
-      m.data = std::move(bl);
-      // read footer
-      return socket->read(sizeof(m.footer));
-    }).then([this] (bufferlist bl) {
-      auto p = bl.cbegin();
-      ::decode(m.footer, p);
-      auto msg = ::decode_message(nullptr, 0, m.header, m.footer,
-                                  m.front, m.middle, m.data, nullptr);
-      // TODO: set time stamps
-      msg->set_byte_throttler(policy.throttler_bytes);
-
-      if (!update_rx_seq(msg->get_seq())) {
-        // skip this message
-        return;
-      }
-
-      constexpr bool add_ref = false; // Message starts with 1 ref
-      // TODO: change MessageRef with foreign_ptr
-      auto msg_ref = MessageRef{msg, add_ref};
-      // start dispatch, ignoring exceptions from the application layer
-      seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] {
-         logger().debug("{} <= {}@{} === {}", messenger,
-                msg->get_source(), get_peer_addr(), *msg);
-          return dispatcher.ms_dispatch(
-              seastar::static_pointer_cast<SocketConnection>(shared_from_this()),
-              std::move(msg))
-            .handle_exception([this] (std::exception_ptr eptr) {
-              logger().error("{} ms_dispatch caught exception: {}", *this, eptr);
-              ceph_assert(false);
-            });
-        });
-    });
-}
-
 bool SocketConnection::update_rx_seq(seq_num_t seq)
 {
   if (seq <= in_seq) {
@@ -308,707 +99,18 @@ bool SocketConnection::update_rx_seq(seq_num_t seq)
   }
 }
 
-seastar::future<> SocketConnection::write_message(MessageRef msg)
-{
-  msg->set_seq(++out_seq);
-  auto& header = msg->get_header();
-  header.src = messenger.get_myname();
-  msg->encode(features, messenger.get_crc_flags());
-  bufferlist bl;
-  bl.append(CEPH_MSGR_TAG_MSG);
-  bl.append((const char*)&header, sizeof(header));
-  bl.append(msg->get_payload());
-  bl.append(msg->get_middle());
-  bl.append(msg->get_data());
-  auto& footer = msg->get_footer();
-  if (HAVE_FEATURE(features, MSG_AUTH)) {
-    bl.append((const char*)&footer, sizeof(footer));
-  } else {
-    ceph_msg_footer_old old_footer;
-    if (messenger.get_crc_flags() & MSG_CRC_HEADER) {
-      old_footer.front_crc = footer.front_crc;
-      old_footer.middle_crc = footer.middle_crc;
-    } else {
-      old_footer.front_crc = old_footer.middle_crc = 0;
-    }
-    if (messenger.get_crc_flags() & MSG_CRC_DATA) {
-      old_footer.data_crc = footer.data_crc;
-    } else {
-      old_footer.data_crc = 0;
-    }
-    old_footer.flags = footer.flags;
-    bl.append((const char*)&old_footer, sizeof(old_footer));
-  }
-  // write as a seastar::net::packet
-  return socket->write(std::move(bl));
-  // TODO: lossless policy
-  //  .then([this, msg = std::move(msg)] {
-  //    if (!policy.lossy) {
-  //      sent.push(std::move(msg));
-  //    }
-  //  });
-}
-
-seastar::future<> SocketConnection::do_keepalive()
-{
-  k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec(
-    ceph::coarse_real_clock::now());
-  logger().debug("{} write keepalive2 {}", *this, k.req.stamp.tv_sec);
-  return socket->write(make_static_packet(k.req));
-}
-
-seastar::future<> SocketConnection::do_keepalive_ack()
-{
-  logger().debug("{} write keepalive2 ack {}", *this, k.ack.stamp.tv_sec);
-  return socket->write(make_static_packet(k.ack));
-}
-
-seastar::future<> SocketConnection::do_close()
-{
-  if (state == state_t::closing) {
-    // already closing
-    assert(close_ready.valid());
-    return close_ready.get_future();
-  }
-
-  // unregister_conn() drops a reference, so hold another until completion
-  auto cleanup = [conn_ref = shared_from_this(), this] {
-      logger().debug("{} closed!", *this);
-    };
-
-  if (state == state_t::accepting) {
-    messenger.unaccept_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
-  } else if (state >= state_t::connecting && state < state_t::closing) {
-    messenger.unregister_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
-  } else {
-    // cannot happen
-    ceph_assert(false);
-  }
-
-  // close_ready become valid only after state is state_t::closing
-  assert(!close_ready.valid());
-
-  if (socket) {
-    close_ready = socket->close()
-      .then([this] {
-        return pending_dispatch.close();
-      }).finally(std::move(cleanup));
-  } else {
-    ceph_assert(state == state_t::connecting);
-    close_ready = pending_dispatch.close().finally(std::move(cleanup));
-  }
-
-  logger().debug("{} trigger closing, was {}", *this, static_cast<int>(state));
-  state = state_t::closing;
-  write_state = write_state_t::drop;
-  state_changed.set_value();
-  state_changed = seastar::shared_promise<>();
-
-  return close_ready.get_future();
-}
-
-// handshake
-
-/// store the banner in a non-const string for buffer::create_static()
-static char banner[] = CEPH_BANNER;
-constexpr size_t banner_size = sizeof(CEPH_BANNER)-1;
-
-constexpr size_t client_header_size = banner_size + sizeof(ceph_entity_addr);
-constexpr size_t server_header_size = banner_size + 2 * sizeof(ceph_entity_addr);
-
-WRITE_RAW_ENCODER(ceph_msg_connect);
-WRITE_RAW_ENCODER(ceph_msg_connect_reply);
-
-std::ostream& operator<<(std::ostream& out, const ceph_msg_connect& c)
-{
-  return out << "connect{features=" << std::hex << c.features << std::dec
-      << " host_type=" << c.host_type
-      << " global_seq=" << c.global_seq
-      << " connect_seq=" << c.connect_seq
-      << " protocol_version=" << c.protocol_version
-      << " authorizer_protocol=" << c.authorizer_protocol
-      << " authorizer_len=" << c.authorizer_len
-      << " flags=" << std::hex << static_cast<uint16_t>(c.flags) << std::dec << '}';
-}
-
-std::ostream& operator<<(std::ostream& out, const ceph_msg_connect_reply& r)
-{
-  return out << "connect_reply{tag=" << static_cast<uint16_t>(r.tag)
-      << " features=" << std::hex << r.features << std::dec
-      << " global_seq=" << r.global_seq
-      << " connect_seq=" << r.connect_seq
-      << " protocol_version=" << r.protocol_version
-      << " authorizer_len=" << r.authorizer_len
-      << " flags=" << std::hex << static_cast<uint16_t>(r.flags) << std::dec << '}';
-}
-
-// check that the buffer starts with a valid banner without requiring it to
-// be contiguous in memory
-static void validate_banner(bufferlist::const_iterator& p)
-{
-  auto b = std::cbegin(banner);
-  auto end = b + banner_size;
-  while (b != end) {
-    const char *buf{nullptr};
-    auto remaining = std::distance(b, end);
-    auto len = p.get_ptr_and_advance(remaining, &buf);
-    if (!std::equal(buf, buf + len, b)) {
-      throw std::system_error(make_error_code(error::bad_connect_banner));
-    }
-    b += len;
-  }
-}
-
-// make sure that we agree with the peer about its address
-static void validate_peer_addr(const entity_addr_t& addr,
-                               const entity_addr_t& expected)
-{
-  if (addr == expected) {
-    return;
-  }
-  // ok if server bound anonymously, as long as port/nonce match
-  if (addr.is_blank_ip() &&
-      addr.get_port() == expected.get_port() &&
-      addr.get_nonce() == expected.get_nonce()) {
-    return;
-  } else {
-    throw std::system_error(make_error_code(error::bad_peer_address));
-  }
-}
-
-/// return a static bufferptr to the given object
-template <typename T>
-bufferptr create_static(T& obj)
-{
-  return buffer::create_static(sizeof(obj), reinterpret_cast<char*>(&obj));
-}
-
-bool SocketConnection::require_auth_feature() const
-{
-  if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) {
-    return false;
-  }
-  if (conf.cephx_require_signatures) {
-    return true;
-  }
-  if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD ||
-      h.connect.host_type == CEPH_ENTITY_TYPE_MDS) {
-    return conf.cephx_cluster_require_signatures;
-  } else {
-    return conf.cephx_service_require_signatures;
-  }
-}
-
-uint32_t SocketConnection::get_proto_version(entity_type_t peer_type, bool connect) const
-{
-  constexpr entity_type_t my_type = CEPH_ENTITY_TYPE_OSD;
-  // see also OSD.h, unlike other connection of simple/async messenger,
-  // crimson msgr is only used by osd
-  constexpr uint32_t CEPH_OSD_PROTOCOL = 10;
-  if (peer_type == my_type) {
-    // internal
-    return CEPH_OSD_PROTOCOL;
-  } else {
-    // public
-    switch (connect ? peer_type : my_type) {
-      case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL;
-      case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL;
-      case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL;
-      default: return 0;
-    }
-  }
-}
-
-seastar::future<seastar::stop_iteration>
-SocketConnection::repeat_handle_connect()
-{
-  return socket->read(sizeof(h.connect))
-    .then([this](bufferlist bl) {
-      auto p = bl.cbegin();
-      ::decode(h.connect, p);
-      peer_type = h.connect.host_type;
-      return socket->read(h.connect.authorizer_len);
-    }).then([this] (bufferlist authorizer) {
-      if (h.connect.protocol_version != get_proto_version(h.connect.host_type, false)) {
-       return seastar::make_ready_future<msgr_tag_t, bufferlist>(
-            CEPH_MSGR_TAG_BADPROTOVER, bufferlist{});
-      }
-      if (require_auth_feature()) {
-       policy.features_required |= CEPH_FEATURE_MSG_AUTH;
-      }
-      if (auto feat_missing = policy.features_required & ~(uint64_t)h.connect.features;
-         feat_missing != 0) {
-        return seastar::make_ready_future<msgr_tag_t, bufferlist>(
-            CEPH_MSGR_TAG_FEATURES, bufferlist{});
-      }
-      return dispatcher.ms_verify_authorizer(peer_type,
-                                             h.connect.authorizer_protocol,
-                                             authorizer);
-    }).then([this] (ceph::net::msgr_tag_t tag, bufferlist&& authorizer_reply) {
-      memset(&h.reply, 0, sizeof(h.reply));
-      if (tag) {
-       return send_connect_reply(tag, std::move(authorizer_reply));
-      }
-      if (auto existing = messenger.lookup_conn(peer_addr); existing) {
-       return handle_connect_with_existing(existing, std::move(authorizer_reply));
-      } else if (h.connect.connect_seq > 0) {
-       return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION,
-                                 std::move(authorizer_reply));
-      }
-      h.connect_seq = h.connect.connect_seq + 1;
-      h.peer_global_seq = h.connect.global_seq;
-      set_features((uint64_t)policy.features_supported & (uint64_t)h.connect.features);
-      // TODO: cct
-      return send_connect_reply_ready(CEPH_MSGR_TAG_READY, std::move(authorizer_reply));
-    });
-}
-
-seastar::future<seastar::stop_iteration>
-SocketConnection::send_connect_reply(msgr_tag_t tag,
-                                     bufferlist&& authorizer_reply)
-{
-  h.reply.tag = tag;
-  h.reply.features = static_cast<uint64_t>((h.connect.features &
-                                           policy.features_supported) |
-                                          policy.features_required);
-  h.reply.authorizer_len = authorizer_reply.length();
-  return socket->write(make_static_packet(h.reply))
-    .then([this, reply=std::move(authorizer_reply)]() mutable {
-      return socket->write_flush(std::move(reply));
-    }).then([] {
-      return stop_t::no;
-    });
-}
-
-seastar::future<seastar::stop_iteration>
-SocketConnection::send_connect_reply_ready(msgr_tag_t tag,
-                                           bufferlist&& authorizer_reply)
-{
-  h.global_seq = messenger.get_global_seq();
-  h.reply.tag = tag;
-  h.reply.features = policy.features_supported;
-  h.reply.global_seq = h.global_seq;
-  h.reply.connect_seq = h.connect_seq;
-  h.reply.flags = 0;
-  if (policy.lossy) {
-    h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY;
-  }
-  h.reply.authorizer_len = authorizer_reply.length();
-  return socket->write(make_static_packet(h.reply))
-    .then([this, reply=std::move(authorizer_reply)]() mutable {
-      if (reply.length()) {
-        return socket->write(std::move(reply));
-      } else {
-        return seastar::now();
-      }
-    }).then([this] {
-      if (h.reply.tag == CEPH_MSGR_TAG_SEQ) {
-        return socket->write_flush(make_static_packet(in_seq))
-          .then([this] {
-            return socket->read_exactly(sizeof(seq_num_t));
-          }).then([this] (auto buf) {
-            auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
-            discard_up_to(&out_q, *acked_seq);
-          });
-      } else {
-        return socket->flush();
-      }
-    }).then([this] {
-      return stop_t::yes;
-    });
-}
-
-seastar::future<>
-SocketConnection::handle_keepalive2()
-{
-  return socket->read_exactly(sizeof(ceph_timespec))
-    .then([this] (auto buf) {
-      k.ack.stamp = *reinterpret_cast<const ceph_timespec*>(buf.get());
-      logger().debug("{} got keepalive2 {}", *this, k.ack.stamp.tv_sec);
-      if (!m_keepalive_ack) {
-        m_keepalive_ack = true;
-        write_event();
-      }
-    });
-}
-
-seastar::future<>
-SocketConnection::handle_keepalive2_ack()
-{
-  return socket->read_exactly(sizeof(ceph_timespec))
-    .then([this] (auto buf) {
-      auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
-      k.ack_stamp = *t;
-      logger().debug("{} got keepalive2 ack {}", *this, t->tv_sec);
-    });
-}
-
-seastar::future<seastar::stop_iteration>
-SocketConnection::handle_connect_with_existing(SocketConnectionRef existing, bufferlist&& authorizer_reply)
-{
-  if (h.connect.global_seq < existing->peer_global_seq()) {
-    h.reply.global_seq = existing->peer_global_seq();
-    return send_connect_reply(CEPH_MSGR_TAG_RETRY_GLOBAL);
-  } else if (existing->is_lossy()) {
-    return replace_existing(existing, std::move(authorizer_reply));
-  } else if (h.connect.connect_seq == 0 && existing->connect_seq() > 0) {
-    return replace_existing(existing, std::move(authorizer_reply), true);
-  } else if (h.connect.connect_seq < existing->connect_seq()) {
-    // old attempt, or we sent READY but they didn't get it.
-    h.reply.connect_seq = existing->connect_seq() + 1;
-    return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
-  } else if (h.connect.connect_seq == existing->connect_seq()) {
-    // if the existing connection successfully opened, and/or
-    // subsequently went to standby, then the peer should bump
-    // their connect_seq and retry: this is not a connection race
-    // we need to resolve here.
-    if (existing->get_state() == state_t::open ||
-       existing->get_state() == state_t::standby) {
-      if (policy.resetcheck && existing->connect_seq() == 0) {
-       return replace_existing(existing, std::move(authorizer_reply));
-      } else {
-       h.reply.connect_seq = existing->connect_seq() + 1;
-       return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
-      }
-    } else if (peer_addr < messenger.get_myaddr() ||
-              existing->is_server_side()) {
-      // incoming wins
-      return replace_existing(existing, std::move(authorizer_reply));
-    } else {
-      return send_connect_reply(CEPH_MSGR_TAG_WAIT);
-    }
-  } else if (policy.resetcheck &&
-            existing->connect_seq() == 0) {
-    return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION);
-  } else {
-    return replace_existing(existing, std::move(authorizer_reply));
-  }
-}
-
-seastar::future<seastar::stop_iteration>
-SocketConnection::replace_existing(SocketConnectionRef existing,
-                                   bufferlist&& authorizer_reply,
-                                   bool is_reset_from_peer)
-{
-  msgr_tag_t reply_tag;
-  if (HAVE_FEATURE(h.connect.features, RECONNECT_SEQ) &&
-      !is_reset_from_peer) {
-    reply_tag = CEPH_MSGR_TAG_SEQ;
-  } else {
-    reply_tag = CEPH_MSGR_TAG_READY;
-  }
-  messenger.unregister_conn(existing);
-  if (!existing->is_lossy()) {
-    // reset the in_seq if this is a hard reset from peer,
-    // otherwise we respect our original connection's value
-    in_seq = is_reset_from_peer ? 0 : existing->rx_seq_num();
-    // steal outgoing queue and out_seq
-    existing->requeue_sent();
-    std::tie(out_seq, out_q) = existing->get_out_queue();
-  }
-  return send_connect_reply_ready(reply_tag, std::move(authorizer_reply));
-}
-
-seastar::future<seastar::stop_iteration>
-SocketConnection::handle_connect_reply(msgr_tag_t tag)
-{
-  switch (tag) {
-  case CEPH_MSGR_TAG_FEATURES:
-    logger().error("{} connect protocol feature mispatch", __func__);
-    throw std::system_error(make_error_code(error::negotiation_failure));
-  case CEPH_MSGR_TAG_BADPROTOVER:
-    logger().error("{} connect protocol version mispatch", __func__);
-    throw std::system_error(make_error_code(error::negotiation_failure));
-  case CEPH_MSGR_TAG_BADAUTHORIZER:
-    logger().error("{} got bad authorizer", __func__);
-    throw std::system_error(make_error_code(error::negotiation_failure));
-  case CEPH_MSGR_TAG_RESETSESSION:
-    reset_session();
-    return seastar::make_ready_future<stop_t>(stop_t::no);
-  case CEPH_MSGR_TAG_RETRY_GLOBAL:
-    h.global_seq = messenger.get_global_seq(h.reply.global_seq);
-    return seastar::make_ready_future<stop_t>(stop_t::no);
-  case CEPH_MSGR_TAG_RETRY_SESSION:
-    ceph_assert(h.reply.connect_seq > h.connect_seq);
-    h.connect_seq = h.reply.connect_seq;
-    return seastar::make_ready_future<stop_t>(stop_t::no);
-  case CEPH_MSGR_TAG_WAIT:
-    // TODO: state wait
-    throw std::system_error(make_error_code(error::negotiation_failure));
-  case CEPH_MSGR_TAG_SEQ:
-  case CEPH_MSGR_TAG_READY:
-    if (auto missing = (policy.features_required & ~(uint64_t)h.reply.features);
-        missing) {
-      logger().error("{} missing required features", __func__);
-      throw std::system_error(make_error_code(error::negotiation_failure));
-    }
-    return seastar::futurize_apply([this, tag] {
-        if (tag == CEPH_MSGR_TAG_SEQ) {
-          return socket->read_exactly(sizeof(seq_num_t))
-            .then([this] (auto buf) {
-              auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
-              discard_up_to(&out_q, *acked_seq);
-              return socket->write_flush(make_static_packet(in_seq));
-            });
-        }
-        // tag CEPH_MSGR_TAG_READY
-        return seastar::now();
-      }).then([this] {
-        // hooray!
-        h.peer_global_seq = h.reply.global_seq;
-        policy.lossy = h.reply.flags & CEPH_MSG_CONNECT_LOSSY;
-        h.connect_seq++;
-        h.backoff = 0ms;
-        set_features(h.reply.features & h.connect.features);
-        if (h.authorizer) {
-          session_security.reset(
-              get_auth_session_handler(nullptr,
-                                       h.authorizer->protocol,
-                                       h.authorizer->session_key,
-                                       features));
-        }
-        h.authorizer = nullptr;
-        return seastar::make_ready_future<stop_t>(stop_t::yes);
-      });
-    break;
-  default:
-    // unknown tag
-    logger().error("{} got unknown tag", __func__, int(tag));
-    throw std::system_error(make_error_code(error::negotiation_failure));
-  }
-}
-
-void SocketConnection::reset_session()
-{
-  decltype(out_q){}.swap(out_q);
-  decltype(sent){}.swap(sent);
-  in_seq = 0;
-  h.connect_seq = 0;
-  if (HAVE_FEATURE(features, MSG_AUTH)) {
-    // Set out_seq to a random value, so CRC won't be predictable.
-    // Constant to limit starting sequence number to 2^31.  Nothing special
-    // about it, just a big number.
-    constexpr uint64_t SEQ_MASK = 0x7fffffff;
-    out_seq = ceph::util::generate_random_number<uint64_t>(0, SEQ_MASK);
-  } else {
-    // previously, seq #'s always started at 0.
-    out_seq = 0;
-  }
-}
-
-seastar::future<seastar::stop_iteration>
-SocketConnection::repeat_connect()
-{
-  // encode ceph_msg_connect
-  memset(&h.connect, 0, sizeof(h.connect));
-  h.connect.features = policy.features_supported;
-  h.connect.host_type = messenger.get_myname().type();
-  h.connect.global_seq = h.global_seq;
-  h.connect.connect_seq = h.connect_seq;
-  h.connect.protocol_version = get_proto_version(peer_type, true);
-  // this is fyi, actually, server decides!
-  h.connect.flags = policy.lossy ? CEPH_MSG_CONNECT_LOSSY : 0;
-
-  h.authorizer = dispatcher.ms_get_authorizer(peer_type);
-  bufferlist bl;
-  if (h.authorizer) {
-    h.connect.authorizer_protocol = h.authorizer->protocol;
-    h.connect.authorizer_len = h.authorizer->bl.length();
-    bl.append(create_static(h.connect));
-    bl.append(h.authorizer->bl);
-  } else {
-    h.connect.authorizer_protocol = 0;
-    h.connect.authorizer_len = 0;
-    bl.append(create_static(h.connect));
-  }
-  return socket->write_flush(std::move(bl))
-    .then([this] {
-      // read the reply
-      return socket->read(sizeof(h.reply));
-    }).then([this] (bufferlist bl) {
-      auto p = bl.cbegin();
-      ::decode(h.reply, p);
-      ceph_assert(p.end());
-      return socket->read(h.reply.authorizer_len);
-    }).then([this] (bufferlist bl) {
-      if (h.authorizer) {
-        auto reply = bl.cbegin();
-        if (!h.authorizer->verify_reply(reply, nullptr)) {
-          logger().error("{} authorizer failed to verify reply", __func__);
-          throw std::system_error(make_error_code(error::negotiation_failure));
-        }
-      }
-      return handle_connect_reply(h.reply.tag);
-    });
-}
-
 void
 SocketConnection::start_connect(const entity_addr_t& _peer_addr,
                                 const entity_type_t& _peer_type)
 {
-  ceph_assert(state == state_t::none);
-  logger().debug("{} trigger connecting, was {}", *this, static_cast<int>(state));
-  state = state_t::connecting;
-  write_state = write_state_t::delay;
-  state_changed.set_value();
-  state_changed = seastar::shared_promise<>();
-
-  ceph_assert(!socket);
-  peer_addr = _peer_addr;
-  peer_type = _peer_type;
-  messenger.register_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
-  seastar::with_gate(pending_dispatch, [this] {
-      return seastar::connect(peer_addr.in4_addr())
-        .then([this](seastar::connected_socket fd) {
-          if (state == state_t::closing) {
-            fd.shutdown_input();
-            fd.shutdown_output();
-            throw std::system_error(make_error_code(error::connection_aborted));
-          }
-          socket = seastar::make_foreign(std::make_unique<Socket>(std::move(fd)));
-          // read server's handshake header
-          return socket->read(server_header_size);
-        }).then([this] (bufferlist headerbl) {
-          auto p = headerbl.cbegin();
-          validate_banner(p);
-          entity_addr_t saddr, caddr;
-          ::decode(saddr, p);
-          ::decode(caddr, p);
-          ceph_assert(p.end());
-          validate_peer_addr(saddr, peer_addr);
-
-          side = side_t::connector;
-          socket_port = caddr.get_port();
-          return messenger.learned_addr(caddr);
-        }).then([this] {
-          // encode/send client's handshake header
-          bufferlist bl;
-          bl.append(buffer::create_static(banner_size, banner));
-          ::encode(messenger.get_myaddr(), bl, 0);
-          h.global_seq = messenger.get_global_seq();
-          return socket->write_flush(std::move(bl));
-        }).then([=] {
-          return seastar::repeat([this] {
-            return repeat_connect();
-          });
-        }).then([this] {
-          // notify the dispatcher and allow them to reject the connection
-          return dispatcher.ms_handle_connect(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
-        }).then([this] {
-          execute_open();
-        }).handle_exception([this] (std::exception_ptr eptr) {
-          // TODO: handle fault in the connecting state
-          logger().warn("{} connecting fault: {}", *this, eptr);
-          close();
-        });
-    });
+  protocol->start_connect(_peer_addr, _peer_type);
 }
 
 void
 SocketConnection::start_accept(seastar::foreign_ptr<std::unique_ptr<Socket>>&& sock,
                                const entity_addr_t& _peer_addr)
 {
-  ceph_assert(state == state_t::none);
-  logger().debug("{} trigger accepting, was {}", *this, static_cast<int>(state));
-  state = state_t::accepting;
-  write_state = write_state_t::delay;
-  state_changed.set_value();
-  state_changed = seastar::shared_promise<>();
-
-  ceph_assert(!socket);
-  peer_addr.u = _peer_addr.u;
-  peer_addr.set_port(0);
-  side = side_t::acceptor;
-  socket_port = _peer_addr.get_port();
-  socket = std::move(sock);
-  messenger.accept_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
-  seastar::with_gate(pending_dispatch, [this, _peer_addr] {
-      // encode/send server's handshake header
-      bufferlist bl;
-      bl.append(buffer::create_static(banner_size, banner));
-      ::encode(messenger.get_myaddr(), bl, 0);
-      ::encode(_peer_addr, bl, 0);
-      return socket->write_flush(std::move(bl))
-        .then([this] {
-          // read client's handshake header and connect request
-          return socket->read(client_header_size);
-        }).then([this] (bufferlist bl) {
-          auto p = bl.cbegin();
-          validate_banner(p);
-          entity_addr_t addr;
-          ::decode(addr, p);
-          ceph_assert(p.end());
-          peer_addr.set_type(addr.get_type());
-          peer_addr.set_port(addr.get_port());
-          peer_addr.set_nonce(addr.get_nonce());
-          return seastar::repeat([this] {
-            return repeat_handle_connect();
-          });
-        }).then([this] {
-          // notify the dispatcher and allow them to reject the connection
-          return dispatcher.ms_handle_accept(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
-        }).then([this] {
-          messenger.register_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
-          messenger.unaccept_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
-          execute_open();
-        }).handle_exception([this] (std::exception_ptr eptr) {
-          // TODO: handle fault in the accepting state
-          logger().warn("{} accepting fault: {}", *this, eptr);
-          close();
-        });
-    });
-}
-
-void
-SocketConnection::execute_open()
-{
-  logger().debug("{} trigger open, was {}", *this, static_cast<int>(state));
-  state = state_t::open;
-  write_state = write_state_t::open;
-  state_changed.set_value();
-  state_changed = seastar::shared_promise<>();
-
-  seastar::with_gate(pending_dispatch, [this] {
-      // start background processing of tags
-      return handle_tags()
-        .handle_exception_type([this] (const std::system_error& e) {
-          logger().warn("{} open fault: {}", *this, e);
-          if (e.code() == error::connection_aborted ||
-              e.code() == error::connection_reset) {
-            return dispatcher.ms_handle_reset(seastar::static_pointer_cast<SocketConnection>(shared_from_this()))
-              .then([this] {
-                close();
-              });
-          } else if (e.code() == error::read_eof) {
-            return dispatcher.ms_handle_remote_reset(seastar::static_pointer_cast<SocketConnection>(shared_from_this()))
-              .then([this] {
-                close();
-              });
-          } else {
-            throw e;
-          }
-        }).handle_exception([this] (std::exception_ptr eptr) {
-          // TODO: handle fault in the open state
-          logger().warn("{} open fault: {}", *this, eptr);
-          close();
-        });
-    });
-}
-
-seastar::future<> SocketConnection::fault()
-{
-  if (policy.lossy) {
-    messenger.unregister_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
-  }
-  if (h.backoff.count()) {
-    h.backoff += h.backoff;
-  } else {
-    h.backoff = conf.ms_initial_backoff;
-  }
-  if (h.backoff > conf.ms_max_backoff) {
-    h.backoff = conf.ms_max_backoff;
-  }
-  return seastar::sleep(h.backoff);
+  protocol->start_accept(std::move(sock), _peer_addr);
 }
 
 seastar::shard_id SocketConnection::shard_id() const {
index 93794310c0c81f3dd00492ef6c9b2d6a427d97ce..a403b6790d313960de39c1797ee0e70f06b17a4f 100644 (file)
 
 #pragma once
 
-#include <seastar/core/gate.hh>
-#include <seastar/core/reactor.hh>
-#include <seastar/core/shared_future.hh>
 #include <seastar/core/sharded.hh>
 
 #include "msg/Policy.h"
 #include "Connection.h"
-#include "Socket.h"
 #include "crimson/thread/Throttle.h"
 
-class AuthAuthorizer;
-class AuthSessionHandler;
-
 namespace ceph::net {
 
-using stop_t = seastar::stop_iteration;
-
+class Protocol;
+class Socket;
 class SocketMessenger;
 class SocketConnection;
 using SocketConnectionRef = seastar::shared_ptr<SocketConnection>;
 
 class SocketConnection : public Connection {
   SocketMessenger& messenger;
-  seastar::foreign_ptr<std::unique_ptr<Socket>> socket;
-  Dispatcher& dispatcher;
-  seastar::gate pending_dispatch;
+  std::unique_ptr<Protocol> protocol;
 
   // if acceptor side, socket_port is different from peer_addr.get_port();
   // if connector side, socket_port is different from my_addr.get_port().
@@ -51,84 +42,6 @@ class SocketConnection : public Connection {
   side_t side = side_t::none;
   uint16_t socket_port = 0;
 
-  enum class state_t {
-    none,
-    accepting,
-    connecting,
-    open,
-    standby,
-    wait,
-    closing
-  };
-  state_t state = state_t::none;
-  // wait until current state changed
-  seastar::shared_promise<> state_changed;
-
-  // write_state is changed with state atomically, indicating the write
-  // behavior of the according state.
-  enum class write_state_t {
-    none,
-    delay,
-    open,
-    drop
-  };
-  write_state_t write_state = write_state_t::none;
-
-  /// become valid only when state is state_t::closing
-  seastar::shared_future<> close_ready;
-
-  /// state for handshake
-  struct Handshake {
-    ceph_msg_connect connect;
-    ceph_msg_connect_reply reply;
-    AuthAuthorizer* authorizer = nullptr;
-    std::chrono::milliseconds backoff;
-    uint32_t connect_seq = 0;
-    uint32_t peer_global_seq = 0;
-    uint32_t global_seq;
-  } h;
-
-  /// server side of handshake negotiation
-  seastar::future<stop_t> repeat_handle_connect();
-  seastar::future<stop_t> handle_connect_with_existing(SocketConnectionRef existing,
-                                                        bufferlist&& authorizer_reply);
-  seastar::future<stop_t> replace_existing(SocketConnectionRef existing,
-                                            bufferlist&& authorizer_reply,
-                                            bool is_reset_from_peer = false);
-  seastar::future<stop_t> send_connect_reply(ceph::net::msgr_tag_t tag,
-                                              bufferlist&& authorizer_reply = {});
-  seastar::future<stop_t> send_connect_reply_ready(ceph::net::msgr_tag_t tag,
-                                                    bufferlist&& authorizer_reply);
-
-  seastar::future<> handle_keepalive2();
-  seastar::future<> handle_keepalive2_ack();
-
-  bool require_auth_feature() const;
-  uint32_t get_proto_version(entity_type_t peer_type, bool connec) const;
-  /// client side of handshake negotiation
-  seastar::future<stop_t> repeat_connect();
-  seastar::future<stop_t> handle_connect_reply(ceph::net::msgr_tag_t tag);
-  void reset_session();
-
-  /// state for an incoming message
-  struct MessageReader {
-    ceph_msg_header header;
-    ceph_msg_footer footer;
-    bufferlist front;
-    bufferlist middle;
-    bufferlist data;
-  } m;
-
-  seastar::future<> maybe_throttle();
-  seastar::future<> handle_tags();
-  seastar::future<> handle_ack();
-
-  bool write_dispatching = false;
-  void write_event();
-
-  /// encode/write a message
-  seastar::future<> write_message(MessageRef msg);
-
   ceph::net::Policy<ceph::thread::Throttle> policy;
   uint64_t features;
   void set_features(uint64_t new_features) {
@@ -144,42 +57,15 @@ class SocketConnection : public Connection {
   ///          false otherwise.
   bool update_rx_seq(seq_num_t seq);
 
-  seastar::future<> read_message();
-
-  std::unique_ptr<AuthSessionHandler> session_security;
-
   // messages to be resent after connection gets reset
   std::queue<MessageRef> out_q;
   // messages sent, but not yet acked by peer
   std::queue<MessageRef> sent;
-  static void discard_up_to(std::queue<MessageRef>*, seq_num_t);
-
-  struct Keepalive {
-    struct {
-      const char tag = CEPH_MSGR_TAG_KEEPALIVE2;
-      ceph_timespec stamp;
-    } __attribute__((packed)) req;
-    struct {
-      const char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK;
-      ceph_timespec stamp;
-    } __attribute__((packed)) ack;
-    ceph_timespec ack_stamp;
-  } k;
-  bool m_keepalive = false;
-  bool m_keepalive_ack = false;
-
-  seastar::future<> fault();
-
-  void execute_open();
-
-  seastar::future<> do_keepalive();
-  seastar::future<> do_keepalive_ack();
-  seastar::future<> do_close();
 
  public:
   SocketConnection(SocketMessenger& messenger,
                    Dispatcher& dispatcher);
-  ~SocketConnection();
+  ~SocketConnection() override;
 
   Messenger* get_messenger() const override;
 
@@ -209,28 +95,14 @@ class SocketConnection : public Connection {
   void start_accept(seastar::foreign_ptr<std::unique_ptr<Socket>>&& socket,
                     const entity_addr_t& peer_addr);
 
-  /// the number of connections initiated in this session, increment when a
-  /// new connection is established
-  uint32_t connect_seq() const {
-    return h.connect_seq;
-  }
-
-  /// the client side should connect us with a gseq. it will be reset with
-  /// the one of exsting connection if it's greater.
-  uint32_t peer_global_seq() const {
-    return h.peer_global_seq;
-  }
   seq_num_t rx_seq_num() const {
     return in_seq;
   }
 
-  /// current state of connection
-  state_t get_state() const {
-    return state;
-  }
   bool is_server_side() const {
     return policy.server;
   }
+
   bool is_lossy() const {
     return policy.lossy;
   }
@@ -241,6 +113,9 @@ class SocketConnection : public Connection {
   std::tuple<seq_num_t, std::queue<MessageRef>> get_out_queue() {
     return {out_seq, std::move(out_q)};
   }
+
+  friend class Protocol;
+  friend class ProtocolV1;
 };
 
 } // namespace ceph::net