]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg: start on SocketConnection negotiation
authorCasey Bodley <cbodley@redhat.com>
Sat, 21 Oct 2017 20:34:12 +0000 (16:34 -0400)
committerKefu Chai <kchai@redhat.com>
Wed, 13 Jun 2018 06:09:22 +0000 (14:09 +0800)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/crimson/net/Errors.cc
src/crimson/net/Errors.h
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h

index fe182377dfcf46287f2f496a4b6e2f6c3a6d4be5..e925b9acd353bfdeef37a61fae163dcee04438ba 100644 (file)
@@ -25,6 +25,12 @@ const std::error_category& net_category()
 
     std::string message(int ev) const override {
       switch (static_cast<error>(ev)) {
+        case error::bad_connect_banner:
+          return "bad connect banner";
+        case error::bad_peer_address:
+          return "bad peer address";
+        case error::negotiation_failure:
+          return "negotiation failure";
         case error::read_eof:
           return "read eof";
         case error::connection_aborted:
index af3720ae469a66b3c5e9aa65a3844f303740ea35..55921a9af5dd738abba05d8765d4782504e26778 100644 (file)
@@ -20,6 +20,9 @@ namespace ceph::net {
 
 /// net error codes
 enum class error {
+  bad_connect_banner,
+  bad_peer_address,
+  negotiation_failure,
   read_eof,
   connection_aborted,
   connection_refused,
index 8ecafa43bdb35a47c5057d003878fe47a6fd3841..26a3f1b16859315fb70e60ba513e733351f0562b 100644 (file)
@@ -12,6 +12,7 @@
  *
  */
 
+#include <algorithm>
 #include <core/shared_future.hh>
 
 #include "SocketConnection.h"
@@ -28,7 +29,7 @@ SocketConnection::SocketConnection(Messenger *messenger,
     socket(std::move(fd)),
     in(socket.input()),
     out(socket.output()),
-    send_ready(seastar::now())
+    send_ready(h.promise.get_future())
 {
 }
 
@@ -156,12 +157,175 @@ seastar::future<> SocketConnection::close()
   return seastar::when_all(in.close(), out.close()).discard_result();
 }
 
+// handshake
+
+/// store the banner in a non-const string for buffer::create_static()
+static char banner[] = CEPH_BANNER;
+constexpr size_t banner_size = sizeof(CEPH_BANNER)-1;
+
+constexpr size_t client_header_size = banner_size + sizeof(ceph_entity_addr);
+constexpr size_t server_header_size = banner_size + 2 * sizeof(ceph_entity_addr);
+
+WRITE_RAW_ENCODER(ceph_msg_connect);
+WRITE_RAW_ENCODER(ceph_msg_connect_reply);
+
+std::ostream& operator<<(std::ostream& out, const ceph_msg_connect& c)
+{
+  return out << "connect{features=" << std::hex << c.features << std::dec
+      << " host_type=" << c.host_type
+      << " global_seq=" << c.global_seq
+      << " connect_seq=" << c.connect_seq
+      << " protocol_version=" << c.protocol_version
+      << " authorizer_protocol=" << c.authorizer_protocol
+      << " authorizer_len=" << c.authorizer_len
+      << " flags=" << std::hex << static_cast<uint16_t>(c.flags) << std::dec << '}';
+}
+
+std::ostream& operator<<(std::ostream& out, const ceph_msg_connect_reply& r)
+{
+  return out << "connect_reply{tag=" << static_cast<uint16_t>(r.tag)
+      << " features=" << std::hex << r.features << std::dec
+      << " global_seq=" << r.global_seq
+      << " connect_seq=" << r.connect_seq
+      << " protocol_version=" << r.protocol_version
+      << " authorizer_len=" << r.authorizer_len
+      << " flags=" << std::hex << static_cast<uint16_t>(r.flags) << std::dec << '}';
+}
+
+// check that the buffer starts with a valid banner without requiring it to
+// be contiguous in memory
+static void validate_banner(bufferlist::const_iterator& p)
+{
+  auto b = std::cbegin(banner);
+  auto end = b + banner_size;
+  while (b != end) {
+    const char *buf{nullptr};
+    auto remaining = std::distance(b, end);
+    auto len = p.get_ptr_and_advance(remaining, &buf);
+    if (!std::equal(buf, buf + len, b)) {
+      throw std::system_error(make_error_code(error::bad_connect_banner));
+    }
+    b += len;
+  }
+}
+
+// make sure that we agree with the peer about its address
+static void validate_peer_addr(const entity_addr_t& addr,
+                               const entity_addr_t& expected)
+{
+  if (addr == expected) {
+    return;
+  }
+  // ok if server bound anonymously, as long as port/nonce match
+  if (addr.is_blank_ip() &&
+      addr.get_port() == expected.get_port() &&
+      addr.get_nonce() == expected.get_nonce()) {
+    return;
+  } else {
+    throw std::system_error(make_error_code(error::bad_peer_address));
+  }
+}
+
+/// return a static bufferptr to the given object
+template <typename T>
+bufferptr create_static(T& obj)
+{
+  return buffer::create_static(sizeof(obj), reinterpret_cast<char*>(&obj));
+}
+
+seastar::future<> SocketConnection::handle_connect()
+{
+  memset(&h.reply, 0, sizeof(h.reply));
+
+  h.reply.protocol_version = CEPH_OSDC_PROTOCOL;
+  h.reply.tag = CEPH_MSGR_TAG_READY;
+
+  bufferlist bl;
+  bl.append(create_static(h.reply));
+
+  return out.write(std::move(bl))
+    .then([this] { return out.flush(); });
+}
+
+seastar::future<> SocketConnection::handle_connect_reply()
+{
+  if (h.reply.tag != CEPH_MSGR_TAG_READY) {
+    throw std::system_error(make_error_code(error::negotiation_failure));
+  }
+  return seastar::now();
+}
+
 seastar::future<> SocketConnection::client_handshake()
 {
-  return seastar::now(); // TODO
+  // read server's handshake header
+  return read(server_header_size)
+    .then([this] (bufferlist headerbl) {
+      auto p = headerbl.cbegin();
+      validate_banner(p);
+      entity_addr_t saddr, caddr;
+      ::decode(saddr, p);
+      ::decode(caddr, p);
+      assert(p.end());
+      validate_peer_addr(saddr, peer_addr);
+
+      if (my_addr != caddr) {
+        // take peer's address for me, but preserve my port/nonce
+        caddr.set_port(my_addr.get_port());
+        caddr.nonce = my_addr.nonce;
+        my_addr = caddr;
+      }
+      // encode/send client's handshake header
+      bufferlist bl;
+      bl.append(buffer::create_static(banner_size, banner));
+      ::encode(my_addr, bl, 0);
+
+      // encode ceph_msg_connect
+      memset(&h.connect, 0, sizeof(h.connect));
+      h.connect.protocol_version = CEPH_OSDC_PROTOCOL;
+      bl.append(create_static(h.connect));
+
+      // TODO: append authorizer
+      return out.write(std::move(bl))
+        .then([this] { return out.flush(); });
+    }).then([this] {
+      // read the reply
+      return read(sizeof(h.reply));
+    }).then([this] (bufferlist bl) {
+      auto p = bl.begin();
+      ::decode(h.reply, p);
+      // TODO: read authorizer
+      assert(p.end());
+      return handle_connect_reply();
+    }).then_wrapped([this] (auto fut) {
+      // satisfy the handshake's promise
+      fut.forward_to(std::move(h.promise));
+    });
 }
 
 seastar::future<> SocketConnection::server_handshake()
 {
-  return seastar::now(); // TODO
+  // encode/send server's handshake header
+  bufferlist bl;
+  bl.append(buffer::create_static(banner_size, banner));
+  ::encode(my_addr, bl, 0);
+  ::encode(peer_addr, bl, 0);
+  return out.write(std::move(bl))
+    .then([this] { return out.flush(); })
+    .then([this] {
+      // read client's handshake header and connect request
+      return read(client_header_size + sizeof(h.connect));
+    }).then([this] (bufferlist bl) {
+      auto p = bl.cbegin();
+      validate_banner(p);
+      entity_addr_t addr;
+      ::decode(addr, p);
+      ::decode(h.connect, p);
+      assert(p.end());
+      // TODO: read authorizer
+
+      return handle_connect();
+    }).then_wrapped([this] (auto fut) {
+      // satisfy the handshake's promise
+      fut.forward_to(std::move(h.promise));
+    });
 }
index ff77f0dd3360d92c772d7390005404300fcac783..de0062b7c309320bb48b2343961d0c21ac4077a9 100644 (file)
@@ -35,6 +35,18 @@ class SocketConnection : public Connection {
   /// read the requested number of bytes into a bufferlist
   seastar::future<bufferlist> read(size_t bytes);
 
+  /// state for handshake
+  struct Handshake {
+    ceph_msg_connect connect;
+    ceph_msg_connect_reply reply;
+    seastar::promise<> promise;
+  } h;
+
+  /// server side of handshake negotiation
+  seastar::future<> handle_connect();
+  /// client side of handshake negotiation
+  seastar::future<> handle_connect_reply();
+
   /// state for an incoming message
   struct MessageReader {
     ceph_msg_header header;