]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: support the lossless connection and auth
authorKefu Chai <kchai@redhat.com>
Fri, 25 May 2018 06:40:54 +0000 (14:40 +0800)
committerKefu Chai <kchai@redhat.com>
Wed, 13 Jun 2018 16:10:10 +0000 (00:10 +0800)
Signed-off-by: Kefu Chai <kchai@redhat.com>
12 files changed:
src/crimson/net/CMakeLists.txt
src/crimson/net/Connection.h
src/crimson/net/Dispatcher.cc [new file with mode: 0644]
src/crimson/net/Dispatcher.h
src/crimson/net/Fwd.h
src/crimson/net/Messenger.h
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h
src/crimson/net/SocketMessenger.cc
src/crimson/net/SocketMessenger.h
src/test/crimson/CMakeLists.txt
src/test/crimson/test_messenger.cc

index c707adddf63dadf446c82ef06681978b0d2b50a0..99cd65b03ad23f9adc75999011742ba80457a34f 100644 (file)
@@ -1,4 +1,5 @@
 set(crimson_net_srcs
+  Dispatcher.cc
   Errors.cc
   SocketConnection.cc
   SocketMessenger.cc)
index 67ba97e6de3ecb03e74d2fe9dcf67f61287a0954..963ccc655c925ac1c3c8f01ac49a107ed25db6eb 100644 (file)
@@ -14,6 +14,7 @@
 
 #pragma once
 
+#include <queue>
 #include <boost/smart_ptr/intrusive_ref_counter.hpp>
 #include <core/future.hh>
 
@@ -21,6 +22,8 @@
 
 namespace ceph::net {
 
+using seq_num_t = uint64_t;
+
 class Connection : public boost::intrusive_ref_counter<Connection> {
  protected:
   Messenger *const messenger;
@@ -42,7 +45,8 @@ class Connection : public boost::intrusive_ref_counter<Connection> {
   virtual bool is_connected() = 0;
 
   /// complete a handshake from the client's perspective
-  virtual seastar::future<> client_handshake() = 0;
+  virtual seastar::future<> client_handshake(entity_type_t peer_type,
+                                            entity_type_t host_type) = 0;
 
   /// complete a handshake from the server's perspective
   virtual seastar::future<> server_handshake() = 0;
@@ -55,6 +59,35 @@ class Connection : public boost::intrusive_ref_counter<Connection> {
 
   /// close the connection and cancel any pending futures from read/send
   virtual seastar::future<> close() = 0;
+
+  /// move all messages in the sent list back into the queue
+  virtual void requeue_sent() = 0;
+
+  /// get all messages in the out queue
+  virtual std::tuple<seq_num_t, std::queue<MessageRef>> get_out_queue() = 0;
+
+public:
+  enum class state_t {
+    none,
+    open,
+    standby,
+    closed,
+    wait
+  };
+  /// the number of connections initiated in this session, increment when a
+  /// new connection is established
+  virtual uint32_t connect_seq() const = 0;
+
+  /// the client side should connect to us with a gseq. it will be reset with
+  /// the one of the existing connection if that one is greater.
+  virtual uint32_t peer_global_seq() const = 0;
+
+  virtual seq_num_t rx_seq_num() const = 0;
+
+  /// current state of connection
+  virtual state_t get_state() const = 0;
+  virtual bool is_server_side() const = 0;
+  virtual bool is_lossy() const = 0;
 };
 
 } // namespace ceph::net
diff --git a/src/crimson/net/Dispatcher.cc b/src/crimson/net/Dispatcher.cc
new file mode 100644 (file)
index 0000000..e47290a
--- /dev/null
@@ -0,0 +1,11 @@
+#include "auth/Auth.h"
+#include "Dispatcher.h"
+
+namespace ceph::net
+{
+/// Default implementation: ignores both arguments and resolves immediately
+/// with a null authorizer.
+seastar::future<std::unique_ptr<AuthAuthorizer>>
+Dispatcher::ms_get_authorizer(peer_type_t, bool force_new)
+{
+  return seastar::make_ready_future<std::unique_ptr<AuthAuthorizer>>(nullptr);
+}
+}
index 646c70f3c9511f4bef864ed7f1c768baa11c87d6..494895946616de414beb62b07140ba270bff7889 100644 (file)
@@ -18,8 +18,9 @@
 
 #include "Fwd.h"
 
-namespace ceph {
-namespace net {
+class AuthAuthorizer;
+
+namespace ceph::net {
 
 class Dispatcher {
  public:
@@ -45,8 +46,14 @@ class Dispatcher {
     return seastar::make_ready_future<>();
   }
 
-  // TODO: authorizer
+  /// Default implementation: ignores all arguments and resolves immediately
+  /// with tag 0 and an empty reply buffer.
+  virtual seastar::future<msgr_tag_t, bufferlist>
+  ms_verify_authorizer(peer_type_t,
+                      auth_proto_t,
+                      bufferlist&) {
+    return seastar::make_ready_future<msgr_tag_t, bufferlist>(0, bufferlist{});
+  }
+  virtual seastar::future<std::unique_ptr<AuthAuthorizer>>
+  ms_get_authorizer(peer_type_t, bool force_new);
 };
 
-} // namespace net
-} // namespace ceph
+} // namespace ceph::net
index c928bdebaf0f96e3b601178a230201fa983056ce..d234048a35c32ef179c57b28aef094efd92ee3bc 100644 (file)
 
 #include "Errors.h"
 #include "msg/msg_types.h"
+#include "msg/Message.h"
+
+using peer_type_t = int;
+using auth_proto_t = int;
 
 class Message;
 using MessageRef = boost::intrusive_ptr<Message>;
 
 namespace ceph::net {
 
+using msgr_tag_t = uint8_t;
+
 class Connection;
 using ConnectionRef = boost::intrusive_ptr<Connection>;
 
index 5230b77c6d1527ca9a366bedd361d75b4442dde7..db01ac247a4bd1783145970741e342386824f89d 100644 (file)
@@ -25,6 +25,8 @@ namespace ceph::net {
 class Messenger {
   entity_name_t my_name;
   entity_addr_t my_addr;
+  uint32_t global_seq = 0;
+  uint32_t crc_flags = 0;
 
  public:
   Messenger(const entity_name_t& name)
@@ -45,11 +47,42 @@ class Messenger {
   virtual seastar::future<> start(Dispatcher *dispatcher) = 0;
 
   /// establish a client connection and complete a handshake
-  virtual seastar::future<ConnectionRef> connect(const entity_addr_t& addr) = 0;
+  virtual seastar::future<ConnectionRef> connect(const entity_addr_t& addr,
+                                                entity_type_t peer_type) = 0;
 
   /// stop listening and wait for all connections to close. safe to destruct
   /// after this future becomes available
   virtual seastar::future<> shutdown() = 0;
+
+  /// Raise the stored global_seq to @c old if @c old is larger, then
+  /// increment it and return the incremented value.
+  uint32_t get_global_seq(uint32_t old=0) {
+    if (old > global_seq) {
+      global_seq = old;
+    }
+    return ++global_seq;
+  }
+  /// Placeholder lookup of a connection by address: the argument is ignored
+  /// and nullptr is always returned for now.
+  ConnectionRef lookup_conn(const entity_addr_t&) {
+    // TODO: replace handling
+    return nullptr;
+  }
+
+  // @returns a pair of <tag, auth_reply>; tag is 0 if authorized
+  virtual seastar::future<msgr_tag_t,    /// tag for error, 0 if authorized
+                          bufferlist>    /// auth_reply
+  verify_authorizer(peer_type_t peer_type,
+                   auth_proto_t protocol,
+                   bufferlist& auth) = 0;
+  virtual seastar::future<std::unique_ptr<AuthAuthorizer>>
+  get_authorizer(peer_type_t peer_type,
+                bool force_new) = 0;
+  /// @returns the currently set MSG_CRC_* flag bits
+  uint32_t get_crc_flags() const {
+    return crc_flags;
+  }
+  /// set the MSG_CRC_DATA bit in crc_flags
+  void set_crc_data() {
+    crc_flags |= MSG_CRC_DATA;
+  }
+  /// set the MSG_CRC_HEADER bit in crc_flags
+  void set_crc_header() {
+    crc_flags |= MSG_CRC_HEADER;
+  }
 };
 
 } // namespace ceph::net
index 87905114b3fac7ee9fb21fa4fdbe6cd541c1f783..e820d058ba7697b8e421cd187c27c7db3125a405 100644 (file)
 
 #include <algorithm>
 #include <core/shared_future.hh>
+#include <core/sleep.hh>
 
+#include "Config.h"
+#include "Messenger.h"
 #include "SocketConnection.h"
 
 #include "include/msgr.h"
+#include "include/random.h"
+#include "auth/Auth.h"
+#include "auth/AuthSessionHandler.h"
 #include "msg/Message.h"
 
 using namespace ceph::net;
@@ -88,6 +94,9 @@ struct bufferlist_consumer {
 
 seastar::future<bufferlist> SocketConnection::read(size_t bytes)
 {
+  if (bytes == 0) {
+    return seastar::make_ready_future<bufferlist>();
+  }
   r.buffer.clear();
   r.remaining = bytes;
   return in.consume(bufferlist_consumer{r.buffer, r.remaining})
@@ -113,35 +122,16 @@ void SocketConnection::read_tags_until_next_message()
             // stop looping and notify read_header()
             return seastar::make_ready_future<seastar::stop_iteration>(
                 seastar::stop_iteration::yes);
-
           case CEPH_MSGR_TAG_ACK:
-            return in.read_exactly(sizeof(ceph_le64))
-              .then([] (auto buf) {
-                auto seq = reinterpret_cast<const ceph_le64*>(buf.get());
-                std::cout << "ack " << *seq << std::endl;
-                return seastar::stop_iteration::no;
-              });
-
+            return handle_ack();
           case CEPH_MSGR_TAG_KEEPALIVE:
             break;
-
           case CEPH_MSGR_TAG_KEEPALIVE2:
-            return in.read_exactly(sizeof(ceph_timespec))
-              .then([] (auto buf) {
-                auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
-                std::cout << "keepalive2 " << t->tv_sec << std::endl;
-                // TODO: schedule ack
-                return seastar::stop_iteration::no;
-              });
-
+            return handle_keepalive2()
+              .then([this] { return seastar::stop_iteration::no; });
           case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
-            return in.read_exactly(sizeof(ceph_timespec))
-              .then([] (auto buf) {
-                auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
-                std::cout << "keepalive2 ack " << t->tv_sec << std::endl;
-                return seastar::stop_iteration::no;
-              });
-
+            return handle_keepalive2_ack()
+              .then([this] { return seastar::stop_iteration::no; });
           case CEPH_MSGR_TAG_CLOSE:
             std::cout << "close" << std::endl;
             break;
@@ -156,6 +146,35 @@ void SocketConnection::read_tags_until_next_message()
     });
 }
 
+/// Read the 64-bit sequence number that follows an ACK tag and hand it to
+/// discard_up_to() for the sent queue. Always resolves to
+/// stop_iteration::no.
+seastar::future<seastar::stop_iteration> SocketConnection::handle_ack()
+{
+  return in.read_exactly(sizeof(ceph_le64))
+    .then([this] (auto buf) {
+      // NOTE(review): buf is dereferenced without a size check -- confirm
+      // read_exactly() cannot hand back a short buffer here (e.g. on EOF)
+      auto seq = reinterpret_cast<const ceph_le64*>(buf.get());
+      discard_up_to(&sent, *seq);
+      return seastar::stop_iteration::no;
+    });
+}
+
+/// Pop messages from the front of @c queue until the front message has a
+/// sequence number greater than @c seq (or the queue is empty).
+/// The bound is inclusive: @c seq is the last sequence number the peer has
+/// received, so the message carrying exactly @c seq is covered as well and
+/// must not be kept for a resend.
+void SocketConnection::discard_up_to(std::queue<MessageRef>* queue,
+                                     seq_num_t seq)
+{
+  while (!queue->empty() &&
+         queue->front()->get_seq() <= seq) {
+    queue->pop();
+  }
+}
+
+/// Move every message from the sent queue onto out_q and wind out_seq back
+/// by the number of messages moved.
+void SocketConnection::requeue_sent()
+{
+  out_seq -= sent.size();
+  // NOTE(review): the messages are appended behind whatever is already in
+  // out_q -- confirm this ordering is what the resend path expects
+  while (!sent.empty()) {
+    auto m = sent.front();
+    sent.pop();
+    out_q.push(std::move(m));
+  }
+}
+
 seastar::future<MessageRef> SocketConnection::read_message()
 {
   return on_message.get_future()
@@ -190,18 +209,75 @@ seastar::future<MessageRef> SocketConnection::read_message()
                                   m.front, m.middle, m.data, nullptr);
       constexpr bool add_ref = false; // Message starts with 1 ref
       return MessageRef{msg, add_ref};
+    }).then([this] (MessageRef msg) {
+      if (msg) {
+        // TODO: set time stamps
+        msg->set_byte_throttler(policy.throttler_bytes);
+        if (!update_rx_seq(msg->get_seq())) {
+          msg.reset();
+        }
+      }
+      return msg;
     });
 }
 
+/// Check the sequence number of a received message against in_seq.
+/// Only seq == in_seq + 1 is accepted: in_seq is advanced and true returned.
+/// An old (seq <= in_seq) or skipped (seq > in_seq + 1) number yields false,
+/// or trips an assert when the matching conf.ms_die_on_* option is set (for
+/// old messages the RECONNECT_SEQ feature must be negotiated as well).
+bool SocketConnection::update_rx_seq(seq_num_t seq)
+{
+  if (seq <= in_seq) {
+    if (has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
+        conf.ms_die_on_old_message) {
+      assert(0 == "old msgs despite reconnect_seq feature");
+    }
+    return false;
+  } else if (seq > in_seq + 1) {
+    if (conf.ms_die_on_skipped_message) {
+      assert(0 == "skipped incoming seq");
+    }
+    return false;
+  } else {
+    in_seq = seq;
+    return true;
+  }
+}
+
+/// Assign the next out_seq to @c msg, encode it, and write tag, header,
+/// payload, middle, data and footer to the output stream, followed by a
+/// flush. The old footer layout is written when CEPH_FEATURE_MSG_AUTH is not
+/// negotiated. On a non-lossy connection the message is then kept in the
+/// sent queue.
 seastar::future<> SocketConnection::write_message(MessageRef msg)
 {
+  msg->set_seq(++out_seq);
+  msg->encode(features, get_messenger()->get_crc_flags());
   bufferlist bl;
-  unsigned char tag = CEPH_MSGR_TAG_MSG;
-  encode(tag, bl);
-  encode_message(msg.get(), 0, bl);
+  bl.append(CEPH_MSGR_TAG_MSG);
+  auto& header = msg->get_header();
+  bl.append((const char*)&header, sizeof(header));
+  bl.append(msg->get_payload());
+  bl.append(msg->get_middle());
+  bl.append(msg->get_data());
+  auto& footer = msg->get_footer();
+  if (has_feature(CEPH_FEATURE_MSG_AUTH)) {
+    bl.append((const char*)&footer, sizeof(footer));
+  } else {
+    ceph_msg_footer_old old_footer;
+    if (get_messenger()->get_crc_flags() & MSG_CRC_HEADER) {
+      old_footer.front_crc = footer.front_crc;
+      old_footer.middle_crc = footer.middle_crc;
+    } else {
+      old_footer.front_crc = old_footer.middle_crc = 0;
+    }
+    if (get_messenger()->get_crc_flags() & MSG_CRC_DATA) {
+      old_footer.data_crc = footer.data_crc;
+    } else {
+      old_footer.data_crc = 0;
+    }
+    old_footer.flags = footer.flags;
+    bl.append((const char*)&old_footer, sizeof(old_footer));
+  }
   // write as a seastar::net::packet
   return out.write(std::move(bl))
-    .then([this] { return out.flush(); });
+    .then([this] { return out.flush(); })
+    .then([this, msg = std::move(msg)] {
+      if (!policy.lossy) {
+        sent.push(std::move(msg));
+      }
+    });
 }
 
 seastar::future<> SocketConnection::send(MessageRef msg)
@@ -299,29 +375,379 @@ bufferptr create_static(T& obj)
   return buffer::create_static(sizeof(obj), reinterpret_cast<char*>(&obj));
 }
 
+/// Decide from the received connect request whether message signatures are
+/// required. Always false unless the authorizer protocol is CEPH_AUTH_CEPHX;
+/// otherwise conf.cephx_require_signatures wins, and failing that the
+/// cluster setting applies to OSD/MDS hosts and the service setting to all
+/// other host types.
+bool SocketConnection::require_auth_feature() const
+{
+  if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) {
+    return false;
+  }
+  if (conf.cephx_require_signatures) {
+    return true;
+  }
+  if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD ||
+      h.connect.host_type == CEPH_ENTITY_TYPE_MDS) {
+    return conf.cephx_cluster_require_signatures;
+  } else {
+    return conf.cephx_service_require_signatures;
+  }
+}
+
+/// Pick the protocol version to use with a peer of type @c peer_type.
+/// The local type is hard-coded to OSD. OSD peers get the internal
+/// CEPH_OSD_PROTOCOL; for any other peer the version is chosen by the
+/// peer's type when @c connect is true and by our own type otherwise.
+/// @returns 0 if the deciding type is not OSD, MDS or MON
+uint32_t SocketConnection::get_proto_version(entity_type_t peer_type, bool connect) const
+{
+  constexpr entity_type_t my_type = CEPH_ENTITY_TYPE_OSD;
+  // see also OSD.h, unlike other connection of simple/async messenger,
+  // crimson msgr is only used by osd
+  constexpr uint32_t CEPH_OSD_PROTOCOL = 10;
+  if (peer_type == my_type) {
+    // internal
+    return CEPH_OSD_PROTOCOL;
+  } else {
+    // public
+    switch (connect ? peer_type : my_type) {
+      case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL;
+      case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL;
+      case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL;
+      default: return 0;
+    }
+  }
+}
+
+/// Server side: read one connect request and its authorizer, check protocol
+/// version and required features, have the messenger verify the authorizer,
+/// and send the matching reply. A non-zero tag from any check is sent back
+/// through send_connect_reply(); an existing connection to the same peer is
+/// handled by handle_connect_with_existing(); otherwise the session is
+/// accepted and the READY reply is sent.
 seastar::future<> SocketConnection::handle_connect()
 {
-  memset(&h.reply, 0, sizeof(h.reply));
+  return read(sizeof(h.connect))
+    .then([this](bufferlist bl) {
+      auto p = bl.cbegin();
+      ::decode(h.connect, p);
+      return read(h.connect.authorizer_len);
+    }).then([this] (bufferlist authorizer) {
+      if (h.connect.protocol_version != get_proto_version(h.connect.host_type, false)) {
+       return seastar::make_ready_future<msgr_tag_t, bufferlist>(
+            CEPH_MSGR_TAG_BADPROTOVER, bufferlist{});
+      }
+      if (require_auth_feature()) {
+       policy.features_required |= CEPH_FEATURE_MSG_AUTH;
+      }
+      if (auto feat_missing = policy.features_required & ~(uint64_t)h.connect.features;
+         feat_missing != 0) {
+        return seastar::make_ready_future<msgr_tag_t, bufferlist>(
+            CEPH_MSGR_TAG_FEATURES, bufferlist{});
+      }
+      return get_messenger()->verify_authorizer(get_peer_type(),
+                                               h.connect.authorizer_protocol,
+                                               authorizer);
+    }).then([this] (ceph::net::msgr_tag_t tag, bufferlist&& authorizer_reply) {
+      if (tag) {
+       return send_connect_reply(tag, std::move(authorizer_reply));
+      }
+      if (auto existing = get_messenger()->lookup_conn(peer_addr); existing) {
+       return handle_connect_with_existing(existing, std::move(authorizer_reply));
+      } else if (h.connect.connect_seq > 0) {
+       return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION,
+                                 std::move(authorizer_reply));
+      }
+      h.connect_seq = h.connect.connect_seq + 1;
+      h.peer_global_seq = h.connect.global_seq;
+      // h.reply.features has not been filled in yet at this point; use the
+      // value send_connect_reply_ready() is about to advertise instead
+      set_features((uint64_t)policy.features_supported & (uint64_t)h.connect.features);
+      // TODO: cct
+      return send_connect_reply_ready(CEPH_MSGR_TAG_READY, std::move(authorizer_reply));
+    });
+}
 
-  h.reply.protocol_version = CEPH_OSDC_PROTOCOL;
-  h.reply.tag = CEPH_MSGR_TAG_READY;
+/// Send a connect reply carrying @c tag: fill in tag, feature bits and
+/// authorizer length in h.reply, write the reply header followed by
+/// @c authorizer_reply, then flush. The connection state is not changed.
+seastar::future<>
+SocketConnection::send_connect_reply(msgr_tag_t tag,
+                                     bufferlist&& authorizer_reply)
+{
+  h.reply.tag = tag;
+  h.reply.features = static_cast<uint64_t>((h.connect.features &
+                                           policy.features_supported) |
+                                          policy.features_required);
+  h.reply.authorizer_len = authorizer_reply.length();
+  return out.write(reinterpret_cast<const char*>(&h.reply), sizeof(h.reply))
+    .then([this, reply=std::move(authorizer_reply)]() mutable {
+      // return the write's future so the flush below waits for it and a
+      // write failure propagates to the caller
+      return out.write(std::move(reply));
+    }).then([this] {
+      return out.flush();
+    });
+}
 
-  bufferlist bl;
-  bl.append(create_static(h.reply));
+/// Send the accepting connect reply (@c tag is READY or SEQ) and mark the
+/// connection open. The reply advertises policy.features_supported, a fresh
+/// global_seq from the messenger, h.connect_seq and the lossy flag; a
+/// non-empty @c authorizer_reply is written after the header. For a SEQ
+/// reply, in_seq is sent as well and the sequence number read back from the
+/// peer is used to trim out_q.
+seastar::future<>
+SocketConnection::send_connect_reply_ready(msgr_tag_t tag,
+                                           bufferlist&& authorizer_reply)
+{
+  h.reply.tag = tag;
+  h.reply.features = policy.features_supported;
+  h.reply.global_seq = get_messenger()->get_global_seq();
+  h.reply.connect_seq = h.connect_seq;
+  h.reply.flags = 0;
+  if (policy.lossy) {
+    h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY;
+  }
+  h.reply.authorizer_len = authorizer_reply.length();
+  return out.write(reinterpret_cast<const char*>(&h.reply), sizeof(h.reply))
+    .then([this, reply=std::move(authorizer_reply)]() mutable {
+      if (reply.length()) {
+        return out.write(std::move(reply));
+      } else {
+        return seastar::now();
+      }
+    }).then([this] {
+      if (h.reply.tag == CEPH_MSGR_TAG_SEQ) {
+       return out.write(reinterpret_cast<const char*>(&in_seq),
+                         sizeof(in_seq)).then([this] {
+           return out.flush();
+          }).then([this] {
+            return in.read_exactly(sizeof(seq_num_t));
+          }).then([this] (auto buf) {
+            auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
+            discard_up_to(&out_q, *acked_seq);
+          });
+      } else {
+        return out.flush();
+      }
+    }).then([this] {
+      state = state_t::open;
+    });
+}
 
-  return out.write(std::move(bl))
-    .then([this] { return out.flush(); });
+/// Handle a KEEPALIVE2 tag: read the peer's timestamp, remember it in
+/// k.reply_stamp, and echo it back behind a KEEPALIVE2_ACK tag, followed by
+/// a flush.
+seastar::future<>
+SocketConnection::handle_keepalive2()
+{
+  return in.read_exactly(sizeof(ceph_timespec))
+    .then([this] (auto buf) {
+      auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
+      k.reply_stamp = *t;
+      std::cout << "keepalive2 " << t->tv_sec << std::endl;
+      char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK;
+      return out.write(reinterpret_cast<const char*>(&tag), sizeof(tag));
+    }).then([this] {
+      // return the write's future so the flush below waits for it and a
+      // write failure propagates to the caller
+      return out.write(reinterpret_cast<const char*>(&k.reply_stamp),
+                       sizeof(k.reply_stamp));
+    }).then([this] {
+      return out.flush();
+    });
+}
+
+/// Handle a KEEPALIVE2_ACK tag: read the timestamp that follows it and store
+/// it in k.ack_stamp. Nothing is written back.
+seastar::future<>
+SocketConnection::handle_keepalive2_ack()
+{
+  return in.read_exactly(sizeof(ceph_timespec))
+    .then([this] (auto buf) {
+      auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
+      k.ack_stamp = *t;
+      std::cout << "keepalive2 ack " << t->tv_sec << std::endl;
+    });
+}
+
+/// Server side: resolve a connect request against an @c existing connection.
+/// The request's global_seq and connect_seq are compared with those of
+/// @c existing, and the result is either a retry/wait/reset reply sent via
+/// send_connect_reply() or a takeover via replace_existing().
+seastar::future<>
+SocketConnection::handle_connect_with_existing(ConnectionRef existing, bufferlist&& authorizer_reply)
+{
+  if (h.connect.global_seq < existing->peer_global_seq()) {
+    h.reply.global_seq = existing->peer_global_seq();
+    return send_connect_reply(CEPH_MSGR_TAG_RETRY_GLOBAL);
+  } else if (existing->is_lossy()) {
+    return replace_existing(existing, std::move(authorizer_reply));
+  } else if (h.connect.connect_seq == 0 && existing->connect_seq() > 0) {
+    // third argument marks this as a reset initiated by the peer
+    return replace_existing(existing, std::move(authorizer_reply), true);
+  } else if (h.connect.connect_seq < existing->connect_seq()) {
+    // old attempt, or we sent READY but they didn't get it.
+    h.reply.connect_seq = existing->connect_seq() + 1;
+    return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
+  } else if (h.connect.connect_seq == existing->connect_seq()) {
+    // if the existing connection successfully opened, and/or
+    // subsequently went to standby, then the peer should bump
+    // their connect_seq and retry: this is not a connection race
+    // we need to resolve here.
+    if (existing->get_state() == state_t::open ||
+       existing->get_state() == state_t::standby) {
+      if (policy.resetcheck && existing->connect_seq() == 0) {
+       return replace_existing(existing, std::move(authorizer_reply));
+      } else {
+       h.reply.connect_seq = existing->connect_seq() + 1;
+       return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
+      }
+    } else if (get_peer_addr() < get_my_addr() ||
+              existing->is_server_side()) {
+      // incoming wins
+      return replace_existing(existing, std::move(authorizer_reply));
+    } else {
+      return send_connect_reply(CEPH_MSGR_TAG_WAIT);
+    }
+  } else if (policy.resetcheck &&
+            existing->connect_seq() == 0) {
+    return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION);
+  } else {
+    return replace_existing(existing, std::move(authorizer_reply));
+  }
 }
 
-seastar::future<> SocketConnection::handle_connect_reply()
+/// Take over from @c existing: unregister it from the messenger and, unless
+/// it is lossy, inherit its receive sequence number (0 on a reset from the
+/// peer) together with its outgoing queue and out_seq. The reply tag is SEQ
+/// when the peer offers CEPH_FEATURE_RECONNECT_SEQ and this is not a reset
+/// from the peer, READY otherwise.
+seastar::future<> SocketConnection::replace_existing(ConnectionRef existing,
+                                                     bufferlist&& authorizer_reply,
+                                                    bool is_reset_from_peer)
 {
-  if (h.reply.tag != CEPH_MSGR_TAG_READY) {
+  msgr_tag_t reply_tag;
+  if ((h.connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer) {
+    reply_tag = CEPH_MSGR_TAG_SEQ;
+  } else {
+    reply_tag = CEPH_MSGR_TAG_READY;
+  }
+  get_messenger()->unregister_conn(existing);
+  if (!existing->is_lossy()) {
+    // reset the in_seq if this is a hard reset from peer,
+    // otherwise we respect our original connection's value
+    in_seq = is_reset_from_peer ? 0 : existing->rx_seq_num();
+    // steal outgoing queue and out_seq
+    existing->requeue_sent();
+    std::tie(out_seq, out_q) = existing->get_out_queue();
+  }
+  return send_connect_reply_ready(reply_tag, std::move(authorizer_reply));
+}
+
+/// Client side: act on the tag of a connect reply.
+/// FEATURES, BADPROTOVER and WAIT back off via fault(); BADAUTHORIZER
+/// fetches a fresh authorizer once and throws negotiation_failure the second
+/// time; RESETSESSION, RETRY_GLOBAL and RETRY_SESSION adjust the session
+/// counters. In all of these cases the state stays as it is.
+/// SEQ exchanges sequence numbers and then continues as READY; READY opens
+/// the connection.
+/// @param tag the tag to act on; the reply's own tag on the first call, and
+///            CEPH_MSGR_TAG_READY when re-entered after the SEQ exchange
+/// @throws std::system_error(negotiation_failure) on an unknown tag
+seastar::future<> SocketConnection::handle_connect_reply(msgr_tag_t tag)
+{
+  switch (tag) {
+  case CEPH_MSGR_TAG_FEATURES:
+    return fault();
+  case CEPH_MSGR_TAG_BADPROTOVER:
+    return fault();
+  case CEPH_MSGR_TAG_BADAUTHORIZER:
+    if (h.got_bad_auth) {
+      throw std::system_error(make_error_code(error::negotiation_failure));
+    }
+    h.got_bad_auth = true;
+    // try harder
+    return get_messenger()->get_authorizer(h.peer_type, true)
+      .then([this](auto&& auth) {
+        h.authorizer = std::move(auth);
+       return seastar::now();
+      });
+  case CEPH_MSGR_TAG_RESETSESSION:
+    reset_session();
+    return seastar::now();
+  case CEPH_MSGR_TAG_RETRY_GLOBAL:
+    h.global_seq = get_messenger()->get_global_seq(h.reply.global_seq);
+    return seastar::now();
+  case CEPH_MSGR_TAG_RETRY_SESSION:
+    assert(h.reply.connect_seq > h.connect_seq);
+    h.connect_seq = h.reply.connect_seq;
+    return seastar::now();
+  case CEPH_MSGR_TAG_WAIT:
+    return fault();
+  case CEPH_MSGR_TAG_SEQ:
+    break;
+  case CEPH_MSGR_TAG_READY:
+    break;
+  }
+  if (auto missing = (policy.features_required & ~(uint64_t)h.reply.features);
+      missing) {
+    return fault();
+  }
+  // dispatch on the tag argument, not on h.reply.tag: the re-entry below
+  // passes READY while h.reply.tag still holds SEQ, and testing h.reply.tag
+  // would run the SEQ exchange again without end
+  if (tag == CEPH_MSGR_TAG_SEQ) {
+    return in.read_exactly(sizeof(seq_num_t))
+      .then([this] (auto buf) {
+        auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
+        discard_up_to(&out_q, *acked_seq);
+      }).then([this] {
+        return out.write(reinterpret_cast<const char*>(&in_seq), sizeof(in_seq));
+      }).then([this] {
+        return out.flush();
+      }).then([this] {
+        return handle_connect_reply(CEPH_MSGR_TAG_READY);
+      });
+  }
+  if (tag == CEPH_MSGR_TAG_READY) {
+    // hooray!
+    h.peer_global_seq = h.reply.global_seq;
+    policy.lossy = h.reply.flags & CEPH_MSG_CONNECT_LOSSY;
+    state = state_t::open;
+    h.connect_seq++;
+    h.backoff = 0ms;
+    set_features(h.reply.features & h.connect.features);
+    if (h.authorizer) {
+      session_security.reset(
+          get_auth_session_handler(nullptr,
+                                   h.authorizer->protocol,
+                                   h.authorizer->session_key,
+                                   features));
+    }
+    h.authorizer.reset();
+    return seastar::now();
+  } else {
+    // unknown tag
     throw std::system_error(make_error_code(error::negotiation_failure));
   }
-  return seastar::now();
 }
 
-seastar::future<> SocketConnection::client_handshake()
+/// Forget the current session: empty out_q and sent, zero in_seq and
+/// h.connect_seq, and restart out_seq (random when CEPH_FEATURE_MSG_AUTH is
+/// negotiated, 0 otherwise).
+void SocketConnection::reset_session()
+{
+  // swap with empty temporaries to clear the queues
+  decltype(out_q){}.swap(out_q);
+  decltype(sent){}.swap(sent);
+  in_seq = 0;
+  h.connect_seq = 0;
+  if (has_feature(CEPH_FEATURE_MSG_AUTH)) {
+    // Set out_seq to a random value, so CRC won't be predictable.
+    // Constant to limit starting sequence number to 2^31.  Nothing special
+    // about it, just a big number.
+    constexpr uint64_t SEQ_MASK = 0x7fffffff;
+    out_seq = ceph::util::generate_random_number<uint64_t>(0, SEQ_MASK);
+  } else {
+    // previously, seq #'s always started at 0.
+    out_seq = 0;
+  }
+}
+
+/// Client side: run one connect attempt. Builds the connect request from the
+/// policy and the handshake counters, appends the authorizer obtained from
+/// the messenger (if any), sends it, then reads the reply and its authorizer
+/// payload and passes the reply tag to handle_connect_reply().
+/// @throws std::system_error(negotiation_failure) if the authorizer rejects
+///         the reply payload
+seastar::future<> SocketConnection::connect(entity_type_t peer_type,
+                                            entity_type_t host_type)
+{
+  // encode ceph_msg_connect
+  h.peer_type = peer_type;
+  memset(&h.connect, 0, sizeof(h.connect));
+  h.connect.features = policy.features_supported;
+  h.connect.host_type = host_type;
+  h.connect.global_seq = h.global_seq;
+  h.connect.connect_seq = h.connect_seq;
+  h.connect.protocol_version = get_proto_version(peer_type, true);
+  // this is fyi, actually, server decides!
+  h.connect.flags = policy.lossy ? CEPH_MSG_CONNECT_LOSSY : 0;
+
+  return get_messenger()->get_authorizer(peer_type, false)
+    .then([this](auto&& auth) {
+      h.authorizer = std::move(auth);
+      bufferlist bl;
+      if (h.authorizer) {
+        h.connect.authorizer_protocol = h.authorizer->protocol;
+        h.connect.authorizer_len = h.authorizer->bl.length();
+        bl.append(create_static(h.connect));
+        bl.append(h.authorizer->bl);
+      } else {
+        // no authorizer available: send the request without one
+        h.connect.authorizer_protocol = 0;
+        h.connect.authorizer_len = 0;
+        bl.append(create_static(h.connect));
+      };
+      return bl;
+    }).then([this](bufferlist&& bl) {
+      return out.write(std::move(bl));
+    }).then([this] {
+      return out.flush();
+    }).then([this] {
+     // read the reply
+      return read(sizeof(h.reply));
+    }).then([this] (bufferlist bl) {
+      auto p = bl.cbegin();
+      ::decode(h.reply, p);
+      assert(p.end());
+      return read(h.reply.authorizer_len);
+    }).then([this] (bufferlist bl) {
+      if (h.authorizer) {
+        auto reply = bl.cbegin();
+        if (!h.authorizer->verify_reply(reply)) {
+          throw std::system_error(make_error_code(error::negotiation_failure));
+        }
+      }
+      return handle_connect_reply(h.reply.tag);
+    });
+}
+
+seastar::future<> SocketConnection::client_handshake(entity_type_t peer_type,
+                                                     entity_type_t host_type)
 {
   // read server's handshake header
   return read(server_header_size)
@@ -344,24 +770,11 @@ seastar::future<> SocketConnection::client_handshake()
       bufferlist bl;
       bl.append(buffer::create_static(banner_size, banner));
       ::encode(my_addr, bl, 0);
-
-      // encode ceph_msg_connect
-      memset(&h.connect, 0, sizeof(h.connect));
-      h.connect.protocol_version = CEPH_OSDC_PROTOCOL;
-      bl.append(create_static(h.connect));
-
-      // TODO: append authorizer
-      return out.write(std::move(bl))
-        .then([this] { return out.flush(); });
-    }).then([this] {
-      // read the reply
-      return read(sizeof(h.reply));
-    }).then([this] (bufferlist bl) {
-      auto p = bl.begin();
-      ::decode(h.reply, p);
-      // TODO: read authorizer
-      assert(p.end());
-      return handle_connect_reply();
+      return out.write(std::move(bl)).then([this] { return out.flush(); });
+    }).then([=] {
+      }).then([=] {
+        return seastar::do_until([=] { return state == state_t::open; },
+                                 [=] { return connect(peer_type, host_type); });
     }).then([this] {
       // start background processing of tags
       read_tags_until_next_message();
@@ -382,17 +795,19 @@ seastar::future<> SocketConnection::server_handshake()
     .then([this] { return out.flush(); })
     .then([this] {
       // read client's handshake header and connect request
-      return read(client_header_size + sizeof(h.connect));
+      return read(client_header_size);
     }).then([this] (bufferlist bl) {
       auto p = bl.cbegin();
       validate_banner(p);
       entity_addr_t addr;
       ::decode(addr, p);
-      ::decode(h.connect, p);
       assert(p.end());
-      // TODO: read authorizer
-
-      return handle_connect();
+      if (!addr.is_blank_ip()) {
+        peer_addr = addr;
+      }
+    }).then([this] {
+      return seastar::do_until([this] { return state == state_t::open; },
+                               [this] { return handle_connect(); });
     }).then([this] {
       // start background processing of tags
       read_tags_until_next_message();
@@ -401,3 +816,16 @@ seastar::future<> SocketConnection::server_handshake()
       fut.forward_to(std::move(h.promise));
     });
 }
+
+/// Back off before the next attempt: the first call uses
+/// conf.ms_initial_backoff, each later call doubles the previous delay, and
+/// the result is capped at conf.ms_max_backoff.
+/// @returns a future that resolves once the delay has elapsed
+seastar::future<> SocketConnection::fault()
+{
+  if (h.backoff.count()) {
+    h.backoff += h.backoff;
+  } else {
+    h.backoff = conf.ms_initial_backoff;
+  }
+  if (h.backoff > conf.ms_max_backoff) {
+    h.backoff = conf.ms_max_backoff;
+  }
+  return seastar::sleep(h.backoff);
+}
index bf86803fe185f753ecf884ee659936f275403199..30a54dc5810ebcabf626d14e08a2557671068171 100644 (file)
 
 #include <core/reactor.hh>
 
+#include "msg/Policy.h"
 #include "Connection.h"
 
-namespace ceph {
-namespace net {
+class AuthSessionHandler;
+
+namespace ceph::net {
 
 class SocketConnection : public Connection {
   seastar::connected_socket socket;
   seastar::input_stream<char> in;
   seastar::output_stream<char> out;
 
+  state_t state = state_t::none;
+
   /// buffer state for read()
   struct Reader {
     bufferlist buffer;
@@ -39,13 +43,40 @@ class SocketConnection : public Connection {
   struct Handshake {
     ceph_msg_connect connect;
     ceph_msg_connect_reply reply;
+    bool got_bad_auth = false;
+    std::unique_ptr<AuthAuthorizer> authorizer;
+    peer_type_t peer_type;
+    std::chrono::milliseconds backoff;
+    uint32_t connect_seq = 0;
+    uint32_t peer_global_seq = 0;
+    uint32_t global_seq;
     seastar::promise<> promise;
   } h;
 
   /// server side of handshake negotiation
   seastar::future<> handle_connect();
+  seastar::future<> handle_connect_with_existing(ConnectionRef existing,
+                                                bufferlist&& authorizer_reply);
+  seastar::future<> replace_existing(ConnectionRef existing,
+                                    bufferlist&& authorizer_reply,
+                                    bool is_reset_from_peer = false);
+  seastar::future<> send_connect_reply(ceph::net::msgr_tag_t tag,
+                                      bufferlist&& authorizer_reply = {});
+  seastar::future<> send_connect_reply_ready(ceph::net::msgr_tag_t tag,
+                                            bufferlist&& authorizer_reply);
+
+  seastar::future<> handle_keepalive2();
+  seastar::future<> handle_keepalive2_ack();
+
+  bool require_auth_feature() const;
+  /// @returns the host_type field of the connect request held in h.connect
+  int get_peer_type() const {
+    return h.connect.host_type;
+  }
+  uint32_t get_proto_version(entity_type_t peer_type, bool connec) const;
   /// client side of handshake negotiation
-  seastar::future<> handle_connect_reply();
+  seastar::future<> connect(entity_type_t peer_type, entity_type_t host_type);
+  seastar::future<> handle_connect_reply(ceph::net::msgr_tag_t tag);
+  void reset_session();
 
   /// state for an incoming message
   struct MessageReader {
@@ -61,6 +92,7 @@ class SocketConnection : public Connection {
   seastar::promise<> on_message;
 
   void read_tags_until_next_message();
+  seastar::future<seastar::stop_iteration> handle_ack();
 
   /// becomes available when handshake completes, and when all previous messages
   /// have been sent to the output stream. send() chains new messages as
@@ -70,6 +102,39 @@ class SocketConnection : public Connection {
   /// encode/write a message
   seastar::future<> write_message(MessageRef msg);
 
+  ceph::net::Policy policy;
+  uint64_t features = 0;  // feature bit mask; no bits set by default
+  void set_features(uint64_t new_features) {  // overwrites the whole mask
+    features = new_features;
+  }
+  bool has_feature(uint64_t feature) const {  // true if any bit of @c feature is set
+    return features & feature;
+  }
+
+  /// the seq num of the last transmitted message
+  seq_num_t out_seq = 0;
+  /// the seq num of the last received message
+  seq_num_t in_seq = 0;
+  /// update the seq num of last received message
+  /// @returns true if the @c seq is valid, and @c in_seq is updated,
+  ///          false otherwise.
+  bool update_rx_seq(seq_num_t seq);
+
+  std::unique_ptr<AuthSessionHandler> session_security;
+
+  // messages to be resent after connection gets reset
+  std::queue<MessageRef> out_q;
+  // messages sent, but not yet acked by peer
+  std::queue<MessageRef> sent;
+  static void discard_up_to(std::queue<MessageRef>*, seq_num_t);
+
+  struct Keepalive {
+    ceph_timespec reply_stamp;
+    ceph_timespec ack_stamp;
+  } k;
+
+  seastar::future<> fault();
+
  public:
   SocketConnection(Messenger *messenger,
                    const entity_addr_t& my_addr,
@@ -79,7 +144,8 @@ class SocketConnection : public Connection {
 
   bool is_connected() override;
 
-  seastar::future<> client_handshake() override;
+  seastar::future<> client_handshake(entity_type_t peer_type,
+                                    entity_type_t host_type) override;
 
   seastar::future<> server_handshake() override;
 
@@ -88,7 +154,32 @@ class SocketConnection : public Connection {
   seastar::future<> send(MessageRef msg) override;
 
   seastar::future<> close() override;
+
+  uint32_t connect_seq() const override {  // handshake's connect_seq counter
+    return h.connect_seq;
+  }
+  uint32_t peer_global_seq() const override {  // handshake's peer_global_seq counter
+    return h.peer_global_seq;
+  }
+  seq_num_t rx_seq_num() const {  // seq num of the last received message; NOTE(review): no override here, unlike its neighbours - confirm
+    return in_seq;
+  }
+  state_t get_state() const override {  // current value of the state member
+    return state;
+  }
+  bool is_server_side() const override {  // mirrors policy.server
+    return policy.server;
+  }
+  bool is_lossy() const override {  // mirrors policy.lossy
+    return policy.lossy;
+  }
+
+private:
+  void requeue_sent() override;
+  std::tuple<seq_num_t, std::queue<MessageRef>> get_out_queue() override {
+    return std::make_tuple(out_seq, std::move(out_q));  // copies out_seq, moves the queue out of out_q
+  }
+
 };
 
-} // namespace net
-} // namespace ceph
+} // namespace ceph::net
index 62e0cb3cdd7f9a11cd393c327b143cf46c2742e0..ebfc715191f915b1cf8b3b81a518078c0caab950 100644 (file)
@@ -12,6 +12,7 @@
  *
  */
 
+#include "auth/Auth.h"
 #include "SocketMessenger.h"
 #include "SocketConnection.h"
 #include "Dispatcher.h"
@@ -113,8 +114,9 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp)
   return seastar::now();
 }
 
-seastar::future<ceph::net::ConnectionRef> SocketMessenger::connect(const entity_addr_t& addr,
-                                                        const entity_addr_t& myaddr)
+seastar::future<ceph::net::ConnectionRef>
+SocketMessenger::connect(const entity_addr_t& addr, entity_type_t peer_type,
+                        const entity_addr_t& myaddr, entity_type_t host_type)
 {
   if (auto found = std::find_if(connections.begin(),
                                connections.end(),
@@ -129,11 +131,12 @@ seastar::future<ceph::net::ConnectionRef> SocketMessenger::connect(const entity_
       ConnectionRef conn = new SocketConnection(this, get_myaddr(), addr,
                                                 std::move(socket));
       // complete the handshake before returning to the caller
-      return conn->client_handshake()
+      return conn->client_handshake(peer_type, host_type)
         .handle_exception([conn] (std::exception_ptr eptr) {
           // close the connection before returning errors
           return seastar::make_exception_future<>(eptr)
             .finally([conn] { return conn->close(); });
+         // TODO: retry on fault
         }).then([=] {
           dispatcher->ms_handle_connect(conn);
           // dispatch replies on this connection
@@ -154,3 +157,27 @@ seastar::future<> SocketMessenger::shutdown()
       return conn->close();
     }).finally([this] { connections.clear(); });
 }
+
+seastar::future<msgr_tag_t, bufferlist>
+SocketMessenger::verify_authorizer(peer_type_t peer_type,
+                                  auth_proto_t protocol,
+                                  bufferlist& auth)
+{
+  // no dispatcher registered: answer BADAUTHORIZER with an empty reply
+  if (!dispatcher) {
+    return seastar::make_ready_future<msgr_tag_t, bufferlist>(
+        CEPH_MSGR_TAG_BADAUTHORIZER,
+        bufferlist{});
+  }
+  return dispatcher->ms_verify_authorizer(peer_type, protocol, auth);
+}
+
+seastar::future<std::unique_ptr<AuthAuthorizer>>
+SocketMessenger::get_authorizer(peer_type_t peer_type, bool force_new)
+{
+  // no dispatcher registered: resolve to a null authorizer
+  if (!dispatcher) {
+    return seastar::make_ready_future<std::unique_ptr<AuthAuthorizer>>(nullptr);
+  }
+  return dispatcher->ms_get_authorizer(peer_type, force_new);
+}
index 5cca096a65908f30a552f5064ea8fd31f538aa40..34bf375f66220d51bfe7fdd52d6ce4276ba83d2e 100644 (file)
 
 namespace ceph::net {
 
-class SocketMessenger : public Messenger {
+class SocketMessenger final : public Messenger {
   boost::optional<seastar::server_socket> listener;
   Dispatcher *dispatcher = nullptr;
+  uint32_t global_seq = 0;
+
   std::list<ConnectionRef> connections;
 
   seastar::future<> dispatch(ConnectionRef conn);
@@ -40,9 +42,18 @@ class SocketMessenger : public Messenger {
   seastar::future<> start(Dispatcher *dispatcher) override;
 
   seastar::future<ConnectionRef> connect(const entity_addr_t& addr,
-                                         const entity_addr_t& myaddr) override;
+                                        entity_type_t peer_type,
+                                         const entity_addr_t& myaddr,
+                                        entity_type_t host_type) override;
 
   seastar::future<> shutdown() override;
+  seastar::future<msgr_tag_t, bufferlist>
+  verify_authorizer(peer_type_t peer_type,
+                   auth_proto_t protocol,
+                   bufferlist& auth) override;
+  seastar::future<std::unique_ptr<AuthAuthorizer>>
+  get_authorizer(peer_type_t peer_type,
+                bool force_new) override;
 };
 
 } // namespace ceph::net
index 24ce135a9dfd809517a345adc76f794d842a0c06..691db771fec7b6c589113aed7d5cf53efdfd071a 100644 (file)
@@ -15,7 +15,8 @@ target_link_libraries(unittest_seastar_denc ceph-common global Seastar::seastar)
 set(test_messenger_srcs
   test_messenger.cc
   $<TARGET_OBJECTS:seastar_buffer_obj>
-  $<TARGET_OBJECTS:crimson_net_objs>)
+  $<TARGET_OBJECTS:crimson_net_objs>
+  $<TARGET_OBJECTS:crimson_thread_objs>)
 add_executable(unittest_seastar_messenger ${test_messenger_srcs})
 add_ceph_unittest(unittest_seastar_messenger)
 target_link_libraries(unittest_seastar_messenger ceph-common Seastar::seastar)
index 70b6b2922011077dd76fc5b438cadcd95316bae1..13e2e1ffdb55f83f281e9a46449e9b81009483af 100644 (file)
@@ -46,7 +46,7 @@ static seastar::future<> test_echo()
         .then([&] {
           return t.client.messenger.start(&t.client.dispatcher)
             .then([&] {
-              return t.client.messenger.connect(t.addr);
+              return t.client.messenger.connect(t.addr, entity_name_t::TYPE_OSD);
             }).then([] (ceph::net::ConnectionRef conn) {
               std::cout << "client connected" << std::endl;
               return conn->send(MessageRef{new MPing(), false});