git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: implement accepting/connecting states 24608/head
author Yingxin <yingxin.cheng@intel.com>
Mon, 15 Oct 2018 20:54:23 +0000 (04:54 +0800)
committer Yingxin <yingxin.cheng@intel.com>
Wed, 17 Oct 2018 18:15:26 +0000 (02:15 +0800)
- introduce the new accepting/connecting connection states.
- return a ConnectionRef immediately from connect().
- manage the ownership of the accepting connections.
- manage the ownership of the registered connections.
- encapsulate the socket in a Socket class, because the socket is no longer
  created when a SocketConnection is constructed, and so that it can be
  replaced in the future.
- refactor related interfaces.

Signed-off-by: Yingxin <yingxin.cheng@intel.com>
src/crimson/CMakeLists.txt
src/crimson/mon/MonClient.cc
src/crimson/net/Connection.h
src/crimson/net/Messenger.h
src/crimson/net/Socket.cc [new file with mode: 0644]
src/crimson/net/Socket.h [new file with mode: 0644]
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h
src/crimson/net/SocketMessenger.cc
src/crimson/net/SocketMessenger.h

index 519c1629840c391307483aa9ceec3586a5702f8b..cc17277c7b07686fd8e24e3f53aad88398f8429d 100644 (file)
@@ -118,7 +118,8 @@ set(crimson_net_srcs
   net/Dispatcher.cc
   net/Errors.cc
   net/SocketConnection.cc
-  net/SocketMessenger.cc)
+  net/SocketMessenger.cc
+  net/Socket.cc)
 set(crimson_thread_srcs
   thread/ThreadPool.cc
   thread/Throttle.cc)
index ab8e53ce8544e02ceaab4a28c69392484dc60fa4..9f1994ba9f5b31e6d5fce006f90b4a73afd9ad3b 100644 (file)
@@ -500,14 +500,13 @@ seastar::future<> Client::reopen_session(int rank)
   return seastar::parallel_for_each(mons, [this](auto rank) {
     auto peer = monmap.get_addr(rank);
     logger().info("connecting to mon.{}", rank);
-    return msgr.connect(peer, CEPH_ENTITY_TYPE_MON).then([this](auto conn) {
-      auto& mc = pending_conns.emplace_back(conn, &keyring);
-      return mc.authenticate(
-        monmap.get_epoch(), entity_name,
-        auth_methods, want_keys).handle_exception([conn](auto ep) {
-        return conn->close().then([ep = std::move(ep)] {
-          std::rethrow_exception(ep);
-        });
+    auto conn = msgr.connect(peer, CEPH_ENTITY_TYPE_MON);
+    auto& mc = pending_conns.emplace_back(conn, &keyring);
+    return mc.authenticate(
+      monmap.get_epoch(), entity_name,
+      auth_methods, want_keys).handle_exception([conn](auto ep) {
+      return conn->close().then([ep = std::move(ep)] {
+        std::rethrow_exception(ep);
       });
     }).then([peer, this] {
       if (!is_hunting()) {
index 6aa600d3b5e2e82b3f8310d76d10a7f131630ac2..cc2f4eabf85771a826e0df13c8a22d2955760a87 100644 (file)
@@ -29,11 +29,11 @@ class Connection : public boost::intrusive_ref_counter<Connection,
  protected:
   entity_addr_t my_addr;
   entity_addr_t peer_addr;
+  peer_type_t peer_type = -1;
 
  public:
-  Connection(const entity_addr_t& my_addr,
-             const entity_addr_t& peer_addr)
-    : my_addr(my_addr), peer_addr(peer_addr) {}
+  Connection(const entity_addr_t& my_addr)
+    : my_addr(my_addr) {}
   virtual ~Connection() {}
 
   virtual Messenger* get_messenger() const = 0;
index 149be598975fbe16dcb1e7bac540dccdf221ffa5..fcdd5ad32a6e0a073532ddbea8f67b8ef5e71385 100644 (file)
@@ -46,9 +46,10 @@ class Messenger {
   /// start the messenger
   virtual seastar::future<> start(Dispatcher *dispatcher) = 0;
 
-  /// establish a client connection and complete a handshake
-  virtual seastar::future<ConnectionRef> connect(const entity_addr_t& addr,
-                                                entity_type_t peer_type) = 0;
+  /// either return an existing connection to the peer,
+  /// or a new pending connection
+  virtual ConnectionRef connect(const entity_addr_t& peer_addr,
+                                const entity_type_t& peer_type) = 0;
 
   /// stop listening and wait for all connections to close. safe to destruct
   /// after this future becomes available
diff --git a/src/crimson/net/Socket.cc b/src/crimson/net/Socket.cc
new file mode 100644 (file)
index 0000000..3c12c61
--- /dev/null
@@ -0,0 +1,70 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Socket.h"
+
+#include "Errors.h"
+
+namespace ceph::net {
+
+namespace {
+
+// an input_stream consumer that reads buffer segments into a bufferlist up to
+// the given number of remaining bytes
+struct bufferlist_consumer {
+  bufferlist& bl;
+  size_t& remaining;
+
+  bufferlist_consumer(bufferlist& bl, size_t& remaining)
+    : bl(bl), remaining(remaining) {}
+
+  using tmp_buf = seastar::temporary_buffer<char>;
+  using consumption_result_type = typename seastar::input_stream<char>::consumption_result_type;
+
+  // consume some or all of a buffer segment
+  seastar::future<consumption_result_type> operator()(tmp_buf&& data) {
+    if (remaining >= data.size()) {
+      // consume the whole buffer
+      remaining -= data.size();
+      bl.append(buffer::create_foreign(std::move(data)));
+      if (remaining > 0) {
+        // return none to request more segments
+        return seastar::make_ready_future<consumption_result_type>(
+            seastar::continue_consuming{});
+      } else {
+        // return an empty buffer to signal that we're done
+        return seastar::make_ready_future<consumption_result_type>(
+            consumption_result_type::stop_consuming_type({}));
+      }
+    }
+    if (remaining > 0) {
+      // consume the front
+      bl.append(buffer::create_foreign(data.share(0, remaining)));
+      data.trim_front(remaining);
+      remaining = 0;
+    }
+    // give the rest back to signal that we're done
+    return seastar::make_ready_future<consumption_result_type>(
+        consumption_result_type::stop_consuming_type{std::move(data)});
+  };
+};
+
+} // anonymous namespace
+
+seastar::future<bufferlist> Socket::read(size_t bytes)
+{
+  if (bytes == 0) {
+    return seastar::make_ready_future<bufferlist>();
+  }
+  r.buffer.clear();
+  r.remaining = bytes;
+  return in.consume(bufferlist_consumer{r.buffer, r.remaining})
+    .then([this] {
+      if (r.remaining) { // throw on short reads
+        throw std::system_error(make_error_code(error::read_eof));
+      }
+      return seastar::make_ready_future<bufferlist>(std::move(r.buffer));
+    });
+}
+
+} // namespace ceph::net
diff --git a/src/crimson/net/Socket.h b/src/crimson/net/Socket.h
new file mode 100644 (file)
index 0000000..07ab189
--- /dev/null
@@ -0,0 +1,56 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/reactor.hh>
+#include <seastar/net/packet.hh>
+
+#include "include/buffer.h"
+
+namespace ceph::net {
+
+class Socket
+{
+  seastar::connected_socket socket;
+  seastar::input_stream<char> in;
+  seastar::output_stream<char> out;
+
+  /// buffer state for read()
+  struct {
+    bufferlist buffer;
+    size_t remaining;
+  } r;
+
+ public:
+  explicit Socket(seastar::connected_socket&& _socket)
+    : socket(std::move(_socket)),
+      in(socket.input()),
+      out(socket.output()) {}
+  Socket(Socket&& o) = default;
+
+  /// read the requested number of bytes into a bufferlist
+  seastar::future<bufferlist> read(size_t bytes);
+  using tmp_buf = seastar::temporary_buffer<char>;
+  using packet = seastar::net::packet;
+  seastar::future<tmp_buf> read_exactly(size_t bytes) {
+    return in.read_exactly(bytes);
+  }
+
+  seastar::future<> write(packet&& buf) {
+    return out.write(std::move(buf));
+  }
+  seastar::future<> flush() {
+    return out.flush();
+  }
+  seastar::future<> write_flush(packet&& buf) {
+    return out.write(std::move(buf)).then([this] { return out.flush(); });
+  }
+
+  /// Socket can only be closed once.
+  seastar::future<> close() {
+    return seastar::when_all(in.close(), out.close()).discard_result();
+  }
+};
+
+} // namespace ceph::net
index 1f0dd74de144ed2a8a3d2e8669384edf548eadbf..26bd1e491f3076f7f5eb29655fbe4b9fb761d6f2 100644 (file)
@@ -43,14 +43,9 @@ namespace {
 }
 
 SocketConnection::SocketConnection(SocketMessenger& messenger,
-                                   const entity_addr_t& my_addr,
-                                   const entity_addr_t& peer_addr,
-                                   seastar::connected_socket&& fd)
-  : Connection(my_addr, peer_addr),
+                                   const entity_addr_t& my_addr)
+  : Connection(my_addr),
     messenger(messenger),
-    socket(std::move(fd)),
-    in(socket.input()),
-    out(socket.output()),
     send_ready(h.promise.get_future())
 {
 }
@@ -72,67 +67,11 @@ bool SocketConnection::is_connected()
   return !send_ready.failed();
 }
 
-// an input_stream consumer that reads buffer segments into a bufferlist up to
-// the given number of remaining bytes
-struct bufferlist_consumer {
-  bufferlist& bl;
-  size_t& remaining;
-
-  bufferlist_consumer(bufferlist& bl, size_t& remaining)
-    : bl(bl), remaining(remaining) {}
-
-  using tmp_buf = seastar::temporary_buffer<char>;
-  using consumption_result_type = typename seastar::input_stream<char>::consumption_result_type;
-
-  // consume some or all of a buffer segment
-  seastar::future<consumption_result_type> operator()(tmp_buf&& data) {
-    if (remaining >= data.size()) {
-      // consume the whole buffer
-      remaining -= data.size();
-      bl.append(buffer::create_foreign(std::move(data)));
-      if (remaining > 0) {
-        // return none to request more segments
-        return seastar::make_ready_future<consumption_result_type>(
-            seastar::continue_consuming{});
-      } else {
-        // return an empty buffer to singal that we're done
-        return seastar::make_ready_future<consumption_result_type>(
-            consumption_result_type::stop_consuming_type({}));
-      }
-    }
-    if (remaining > 0) {
-      // consume the front
-      bl.append(buffer::create_foreign(data.share(0, remaining)));
-      data.trim_front(remaining);
-      remaining = 0;
-    }
-    // give the rest back to signal that we're done
-    return seastar::make_ready_future<consumption_result_type>(
-        consumption_result_type::stop_consuming_type{std::move(data)});
-  };
-};
-
-seastar::future<bufferlist> SocketConnection::read(size_t bytes)
-{
-  if (bytes == 0) {
-    return seastar::make_ready_future<bufferlist>();
-  }
-  r.buffer.clear();
-  r.remaining = bytes;
-  return in.consume(bufferlist_consumer{r.buffer, r.remaining})
-    .then([this] {
-      if (r.remaining) { // throw on short reads
-        throw std::system_error(make_error_code(error::read_eof));
-      }
-      return seastar::make_ready_future<bufferlist>(std::move(r.buffer));
-    });
-}
-
 void SocketConnection::read_tags_until_next_message()
 {
   seastar::repeat([this] {
       // read the next tag
-      return in.read_exactly(1)
+      return socket->read_exactly(1)
         .then([this] (auto buf) {
           if (buf.empty()) {
             throw std::system_error(make_error_code(error::read_eof));
@@ -172,7 +111,7 @@ void SocketConnection::read_tags_until_next_message()
 
 seastar::future<seastar::stop_iteration> SocketConnection::handle_ack()
 {
-  return in.read_exactly(sizeof(ceph_le64))
+  return socket->read_exactly(sizeof(ceph_le64))
     .then([this] (auto buf) {
       auto seq = reinterpret_cast<const ceph_le64*>(buf.get());
       discard_up_to(&sent, *seq);
@@ -216,7 +155,7 @@ seastar::future<MessageRef> SocketConnection::do_read_message()
     .then([this] {
       on_message = seastar::promise<>{};
       // read header
-      return read(sizeof(m.header));
+      return socket->read(sizeof(m.header));
     }).then([this] (bufferlist bl) {
       // throttle the traffic, maybe
       auto p = bl.cbegin();
@@ -224,19 +163,19 @@ seastar::future<MessageRef> SocketConnection::do_read_message()
       return maybe_throttle();
     }).then([this] {
       // read front
-      return read(m.header.front_len);
+      return socket->read(m.header.front_len);
     }).then([this] (bufferlist bl) {
       m.front = std::move(bl);
       // read middle
-      return read(m.header.middle_len);
+      return socket->read(m.header.middle_len);
     }).then([this] (bufferlist bl) {
       m.middle = std::move(bl);
       // read data
-      return read(m.header.data_len);
+      return socket->read(m.header.data_len);
     }).then([this] (bufferlist bl) {
       m.data = std::move(bl);
       // read footer
-      return read(sizeof(m.footer));
+      return socket->read(sizeof(m.footer));
     }).then([this] (bufferlist bl) {
       // resume background processing of tags
       read_tags_until_next_message();
@@ -316,8 +255,7 @@ seastar::future<> SocketConnection::write_message(MessageRef msg)
     bl.append((const char*)&old_footer, sizeof(old_footer));
   }
   // write as a seastar::net::packet
-  return out.write(std::move(bl))
-    .then([this] { return out.flush(); })
+  return socket->write_flush(std::move(bl))
     .then([this, msg = std::move(msg)] {
       if (!policy.lossy) {
         sent.push(std::move(msg));
@@ -344,9 +282,7 @@ seastar::future<> SocketConnection::keepalive()
   seastar::shared_future<> f = send_ready.then([this] {
       k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec(
         ceph::coarse_real_clock::now());
-      return out.write(make_static_packet(k.req));
-    }).then([this] {
-      return out.flush();
+      return socket->write_flush(make_static_packet(k.req));
     });
   send_ready = f.get_future();
   return f.get_future();
@@ -354,24 +290,28 @@ seastar::future<> SocketConnection::keepalive()
 
 seastar::future<> SocketConnection::close()
 {
-  if (state == state_t::closed) {
+  if (state == state_t::closing) {
     // already closing
     assert(close_ready.valid());
     return close_ready.get_future();
   }
 
-  state = state_t::closed;
-
   // unregister_conn() drops a reference, so hold another until completion
   auto cleanup = [conn = SocketConnectionRef(this)] {};
 
-  messenger.unregister_conn(this);
+  if (state == state_t::accepting) {
+    messenger.unaccept_conn(this);
+  } else if (state >= state_t::connecting && state < state_t::closing) {
+    messenger.unregister_conn(this);
+  } else {
+    // cannot happen
+    ceph_assert(false);
+  }
+  state = state_t::closing;
 
-  // close_ready become valid only after state is state_t::closed
+  // close_ready becomes valid only after state is state_t::closing
   assert(!close_ready.valid());
-  close_ready = seastar::when_all(in.close(), out.close())
-    .discard_result()
-    .finally(std::move(cleanup));
+  close_ready = socket->close().finally(std::move(cleanup));
   return close_ready.get_future();
 }
 
@@ -487,13 +427,15 @@ uint32_t SocketConnection::get_proto_version(entity_type_t peer_type, bool conne
   }
 }
 
-seastar::future<> SocketConnection::handle_connect()
+seastar::future<>
+SocketConnection::repeat_handle_connect()
 {
-  return read(sizeof(h.connect))
+  return socket->read(sizeof(h.connect))
     .then([this](bufferlist bl) {
       auto p = bl.cbegin();
       ::decode(h.connect, p);
-      return read(h.connect.authorizer_len);
+      peer_type = h.connect.host_type;
+      return socket->read(h.connect.authorizer_len);
     }).then([this] (bufferlist authorizer) {
       if (h.connect.protocol_version != get_proto_version(h.connect.host_type, false)) {
        return seastar::make_ready_future<msgr_tag_t, bufferlist>(
@@ -507,9 +449,9 @@ seastar::future<> SocketConnection::handle_connect()
         return seastar::make_ready_future<msgr_tag_t, bufferlist>(
             CEPH_MSGR_TAG_FEATURES, bufferlist{});
       }
-      return messenger.verify_authorizer(get_peer_type(),
-                                          h.connect.authorizer_protocol,
-                                          authorizer);
+      return messenger.verify_authorizer(peer_type,
+                                         h.connect.authorizer_protocol,
+                                         authorizer);
     }).then([this] (ceph::net::msgr_tag_t tag, bufferlist&& authorizer_reply) {
       memset(&h.reply, 0, sizeof(h.reply));
       if (tag) {
@@ -538,11 +480,9 @@ SocketConnection::send_connect_reply(msgr_tag_t tag,
                                            policy.features_supported) |
                                           policy.features_required);
   h.reply.authorizer_len = authorizer_reply.length();
-  return out.write(make_static_packet(h.reply))
+  return socket->write(make_static_packet(h.reply))
     .then([this, reply=std::move(authorizer_reply)]() mutable {
-      return out.write(std::move(reply));
-    }).then([this] {
-      return out.flush();
+      return socket->write_flush(std::move(reply));
     });
 }
 
@@ -560,28 +500,28 @@ SocketConnection::send_connect_reply_ready(msgr_tag_t tag,
     h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY;
   }
   h.reply.authorizer_len = authorizer_reply.length();
-  return out.write(make_static_packet(h.reply))
+  return socket->write(make_static_packet(h.reply))
     .then([this, reply=std::move(authorizer_reply)]() mutable {
       if (reply.length()) {
-        return out.write(std::move(reply));
+        return socket->write(std::move(reply));
       } else {
         return seastar::now();
       }
     }).then([this] {
       if (h.reply.tag == CEPH_MSGR_TAG_SEQ) {
-        return out.write(make_static_packet(in_seq))
+        return socket->write_flush(make_static_packet(in_seq))
           .then([this] {
-            return out.flush();
-          }).then([this] {
-            return in.read_exactly(sizeof(seq_num_t));
+            return socket->read_exactly(sizeof(seq_num_t));
           }).then([this] (auto buf) {
             auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
             discard_up_to(&out_q, *acked_seq);
           });
       } else {
-        return out.flush();
+        return socket->flush();
       }
     }).then([this] {
+      messenger.register_conn(this);
+      messenger.unaccept_conn(this);
       state = state_t::open;
     });
 }
@@ -589,20 +529,18 @@ SocketConnection::send_connect_reply_ready(msgr_tag_t tag,
 seastar::future<>
 SocketConnection::handle_keepalive2()
 {
-  return in.read_exactly(sizeof(ceph_timespec))
+  return socket->read_exactly(sizeof(ceph_timespec))
     .then([this] (auto buf) {
       k.ack.stamp = *reinterpret_cast<const ceph_timespec*>(buf.get());
       std::cout << "keepalive2 " << k.ack.stamp.tv_sec << std::endl;
-      return out.write(make_static_packet(k.ack));
-    }).then([this] {
-      return out.flush();
+      return socket->write_flush(make_static_packet(k.ack));
     });
 }
 
 seastar::future<>
 SocketConnection::handle_keepalive2_ack()
 {
-  return in.read_exactly(sizeof(ceph_timespec))
+  return socket->read_exactly(sizeof(ceph_timespec))
     .then([this] (auto buf) {
       auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
       k.ack_stamp = *t;
@@ -689,7 +627,7 @@ seastar::future<> SocketConnection::handle_connect_reply(msgr_tag_t tag)
     }
     h.got_bad_auth = true;
     // try harder
-    return messenger.get_authorizer(h.peer_type, true)
+    return messenger.get_authorizer(peer_type, true)
       .then([this](auto&& auth) {
         h.authorizer = std::move(auth);
        return seastar::now();
@@ -716,13 +654,11 @@ seastar::future<> SocketConnection::handle_connect_reply(msgr_tag_t tag)
     return fault();
   }
   if (tag == CEPH_MSGR_TAG_SEQ) {
-    return in.read_exactly(sizeof(seq_num_t))
+    return socket->read_exactly(sizeof(seq_num_t))
       .then([this] (auto buf) {
         auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
         discard_up_to(&out_q, *acked_seq);
-        return out.write(make_static_packet(in_seq));
-      }).then([this] {
-        return out.flush();
+        return socket->write_flush(make_static_packet(in_seq));
       }).then([this] {
         return handle_connect_reply(CEPH_MSGR_TAG_READY);
       });
@@ -769,14 +705,12 @@ void SocketConnection::reset_session()
   }
 }
 
-seastar::future<> SocketConnection::connect(entity_type_t peer_type,
-                                            entity_type_t host_type)
+seastar::future<> SocketConnection::repeat_connect()
 {
   // encode ceph_msg_connect
-  h.peer_type = peer_type;
   memset(&h.connect, 0, sizeof(h.connect));
   h.connect.features = policy.features_supported;
-  h.connect.host_type = host_type;
+  h.connect.host_type = messenger.get_myname().type();
   h.connect.global_seq = h.global_seq;
   h.connect.connect_seq = h.connect_seq;
   h.connect.protocol_version = get_proto_version(peer_type, true);
@@ -797,17 +731,15 @@ seastar::future<> SocketConnection::connect(entity_type_t peer_type,
         h.connect.authorizer_len = 0;
         bl.append(create_static(h.connect));
       };
-      return out.write(std::move(bl));
-    }).then([this] {
-      return out.flush();
+      return socket->write_flush(std::move(bl));
     }).then([this] {
      // read the reply
-      return read(sizeof(h.reply));
+      return socket->read(sizeof(h.reply));
     }).then([this] (bufferlist bl) {
       auto p = bl.cbegin();
       ::decode(h.reply, p);
       ceph_assert(p.end());
-      return read(h.reply.authorizer_len);
+      return socket->read(h.reply.authorizer_len);
     }).then([this] (bufferlist bl) {
       if (h.authorizer) {
         auto reply = bl.cbegin();
@@ -820,12 +752,22 @@ seastar::future<> SocketConnection::connect(entity_type_t peer_type,
     });
 }
 
-seastar::future<> SocketConnection::client_handshake(entity_type_t peer_type,
-                                                     entity_type_t host_type)
-{
-  // read server's handshake header
-  return read(server_header_size)
-    .then([this] (bufferlist headerbl) {
+seastar::future<>
+SocketConnection::start_connect(const entity_addr_t& _peer_addr,
+                                const entity_type_t& _peer_type)
+{
+  ceph_assert(state == state_t::none);
+  ceph_assert(!socket);
+  peer_addr = _peer_addr;
+  peer_type = _peer_type;
+  messenger.register_conn(this);
+  state = state_t::connecting;
+  return seastar::connect(peer_addr.in4_addr())
+    .then([this](seastar::connected_socket fd) {
+      socket.emplace(std::move(fd));
+      // read server's handshake header
+      return socket->read(server_header_size);
+    }).then([this] (bufferlist headerbl) {
       auto p = headerbl.cbegin();
       validate_banner(p);
       entity_addr_t saddr, caddr;
@@ -844,10 +786,10 @@ seastar::future<> SocketConnection::client_handshake(entity_type_t peer_type,
       bl.append(buffer::create_static(banner_size, banner));
       ::encode(my_addr, bl, 0);
       h.global_seq = messenger.get_global_seq();
-      return out.write(std::move(bl)).then([this] { return out.flush(); });
+      return socket->write_flush(std::move(bl));
     }).then([=] {
       return seastar::do_until([=] { return state == state_t::open; },
-                               [=] { return connect(peer_type, host_type); });
+                               [=] { return repeat_connect(); });
     }).then([this] {
       // start background processing of tags
       read_tags_until_next_message();
@@ -857,18 +799,25 @@ seastar::future<> SocketConnection::client_handshake(entity_type_t peer_type,
     });
 }
 
-seastar::future<> SocketConnection::server_handshake()
-{
+seastar::future<>
+SocketConnection::start_accept(seastar::connected_socket&& fd,
+                               const entity_addr_t& _peer_addr)
+{
+  ceph_assert(state == state_t::none);
+  ceph_assert(!socket);
+  peer_addr = _peer_addr;
+  socket.emplace(std::move(fd));
+  messenger.accept_conn(this);
+  state = state_t::accepting;
   // encode/send server's handshake header
   bufferlist bl;
   bl.append(buffer::create_static(banner_size, banner));
   ::encode(my_addr, bl, 0);
   ::encode(peer_addr, bl, 0);
-  return out.write(std::move(bl))
-    .then([this] { return out.flush(); })
+  return socket->write_flush(std::move(bl))
     .then([this] {
       // read client's handshake header and connect request
-      return read(client_header_size);
+      return socket->read(client_header_size);
     }).then([this] (bufferlist bl) {
       auto p = bl.cbegin();
       validate_banner(p);
@@ -880,7 +829,7 @@ seastar::future<> SocketConnection::server_handshake()
       }
     }).then([this] {
       return seastar::do_until([this] { return state == state_t::open; },
-                               [this] { return handle_connect(); });
+                               [this] { return repeat_handle_connect(); });
     }).then([this] {
       // start background processing of tags
       read_tags_until_next_message();
index b53f7e1a96211c7474b3cdb3cdf59051e44d0464..87c8e5d123277b71ce09090b4ef072298dcb63ab 100644 (file)
@@ -19,6 +19,7 @@
 
 #include "msg/Policy.h"
 #include "Connection.h"
+#include "Socket.h"
 #include "crimson/thread/Throttle.h"
 
 class AuthAuthorizer;
@@ -32,38 +33,28 @@ using SocketConnectionRef = boost::intrusive_ptr<SocketConnection>;
 
 class SocketConnection : public Connection {
   SocketMessenger& messenger;
-  seastar::connected_socket socket;
-  seastar::input_stream<char> in;
-  seastar::output_stream<char> out;
+  std::optional<Socket> socket;
 
   enum class state_t {
     none,
+    accepting,
+    connecting,
     open,
     standby,
-    closed,
-    wait
+    wait,
+    closing
   };
   state_t state = state_t::none;
 
-  /// become valid only when state is state_t::closed
+  /// becomes valid only when state is state_t::closing
   seastar::shared_future<> close_ready;
 
-  /// buffer state for read()
-  struct Reader {
-    bufferlist buffer;
-    size_t remaining;
-  } r;
-
-  /// read the requested number of bytes into a bufferlist
-  seastar::future<bufferlist> read(size_t bytes);
-
   /// state for handshake
   struct Handshake {
     ceph_msg_connect connect;
     ceph_msg_connect_reply reply;
     bool got_bad_auth = false;
     std::unique_ptr<AuthAuthorizer> authorizer;
-    peer_type_t peer_type;
     std::chrono::milliseconds backoff;
     uint32_t connect_seq = 0;
     uint32_t peer_global_seq = 0;
@@ -72,7 +63,7 @@ class SocketConnection : public Connection {
   } h;
 
   /// server side of handshake negotiation
-  seastar::future<> handle_connect();
+  seastar::future<> repeat_handle_connect();
   seastar::future<> handle_connect_with_existing(SocketConnectionRef existing,
                                                 bufferlist&& authorizer_reply);
   seastar::future<> replace_existing(SocketConnectionRef existing,
@@ -89,7 +80,7 @@ class SocketConnection : public Connection {
   bool require_auth_feature() const;
   uint32_t get_proto_version(entity_type_t peer_type, bool connec) const;
   /// client side of handshake negotiation
-  seastar::future<> connect(entity_type_t peer_type, entity_type_t host_type);
+  seastar::future<> repeat_connect();
   seastar::future<> handle_connect_reply(ceph::net::msgr_tag_t tag);
   void reset_session();
 
@@ -159,15 +150,13 @@ class SocketConnection : public Connection {
 
  public:
   SocketConnection(SocketMessenger& messenger,
-                   const entity_addr_t& my_addr,
-                   const entity_addr_t& peer_addr,
-                   seastar::connected_socket&& socket);
+                   const entity_addr_t& my_addr);
   ~SocketConnection();
 
   Messenger* get_messenger() const override;
 
   int get_peer_type() const override {
-    return h.connect.host_type;
+    return peer_type;
   }
 
   bool is_connected() override;
@@ -180,11 +169,12 @@ class SocketConnection : public Connection {
 
  public:
   /// complete a handshake from the client's perspective
-  seastar::future<> client_handshake(entity_type_t peer_type,
-                                    entity_type_t host_type);
+  seastar::future<> start_connect(const entity_addr_t& peer_addr,
+                                  const entity_type_t& peer_type);
 
   /// complete a handshake from the server's perspective
-  seastar::future<> server_handshake();
+  seastar::future<> start_accept(seastar::connected_socket&& socket,
+                                 const entity_addr_t& peer_addr);
 
   /// read a message from a connection that has completed its handshake
   seastar::future<MessageRef> read_message();
index 1fefd4e1dd2003e3f0f749e120b0e02d657e8763..827267f238dccc4a7c83e51708f8bfbc3710792a 100644 (file)
@@ -42,10 +42,6 @@ void SocketMessenger::bind(const entity_addr_t& addr)
 
 seastar::future<> SocketMessenger::dispatch(SocketConnectionRef conn)
 {
-  auto [i, added] = connections.emplace(conn->get_peer_addr(), conn);
-  std::ignore = i;
-  ceph_assert(added);
-
   return seastar::keep_doing([=] {
       return conn->read_message()
         .then([=] (MessageRef msg) {
@@ -80,11 +76,10 @@ seastar::future<> SocketMessenger::accept(seastar::connected_socket socket,
   entity_addr_t peer_addr;
   peer_addr.set_type(entity_addr_t::TYPE_DEFAULT);
   peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
-  SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(),
-                                                  peer_addr, std::move(socket));
+  SocketConnectionRef conn = new SocketConnection(*this, get_myaddr());
   // initiate the handshake
-  return conn->server_handshake()
-    .then([=] {
+  return conn->start_accept(std::move(socket), peer_addr)
+    .then([this, conn] {
       // notify the dispatcher and allow them to reject the connection
       return seastar::with_gate(pending_dispatch, [=] {
           return dispatcher->ms_handle_accept(conn);
@@ -126,35 +121,30 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp)
   return seastar::now();
 }
 
-seastar::future<ceph::net::ConnectionRef>
-SocketMessenger::connect(const entity_addr_t& addr, entity_type_t peer_type)
+ceph::net::ConnectionRef
+SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type)
 {
-  if (auto found = lookup_conn(addr); found) {
-    return seastar::make_ready_future<ceph::net::ConnectionRef>(found);
+  if (auto found = lookup_conn(peer_addr); found) {
+    return found;
   }
-  return seastar::connect(addr.in4_addr())
-    .then([=] (seastar::connected_socket socket) {
-      SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), addr,
-                                                      std::move(socket));
-      // complete the handshake before returning to the caller
-      return conn->client_handshake(peer_type, get_myname().type())
-        .then([=] {
-          // notify the dispatcher and allow them to reject the connection
-          return seastar::with_gate(pending_dispatch, [=] {
-            return dispatcher->ms_handle_connect(conn);
-          });
-        }).handle_exception([conn] (std::exception_ptr eptr) {
-          // close the connection before returning errors
-          return seastar::make_exception_future<>(eptr)
-            .finally([conn] { return conn->close(); });
-         // TODO: retry on fault
-        }).then([=] {
-          // dispatch replies on this connection
-          dispatch(conn)
-            .handle_exception([] (std::exception_ptr eptr) {});
-          return ConnectionRef(conn);
-        });
+  SocketConnectionRef conn = new SocketConnection(*this, get_myaddr());
+  conn->start_connect(peer_addr, peer_type)
+    .then([this, conn] {
+      // notify the dispatcher and allow them to reject the connection
+      return seastar::with_gate(pending_dispatch, [this, conn] {
+        return dispatcher->ms_handle_connect(conn);
+      });
+    }).handle_exception([conn] (std::exception_ptr eptr) {
+      // close the connection before returning errors
+      return seastar::make_exception_future<>(eptr)
+        .finally([conn] { return conn->close(); });
+      // TODO: retry on fault
+    }).then([this, conn] {
+      // dispatch replies on this connection
+      dispatch(conn)
+        .handle_exception([] (std::exception_ptr eptr) {});
     });
+  return conn;
 }
 
 seastar::future<> SocketMessenger::shutdown()
@@ -163,11 +153,15 @@ seastar::future<> SocketMessenger::shutdown()
     listener->abort_accept();
   }
   // close all connections
-  return seastar::parallel_for_each(connections.begin(), connections.end(),
-    [this] (auto conn) {
-      return conn.second->close();
+  return seastar::parallel_for_each(accepting_conns, [] (auto conn) {
+      return conn->close();
+    }).then([this] {
+      ceph_assert(accepting_conns.empty());
+      return seastar::parallel_for_each(connections, [] (auto conn) {
+          return conn.second->close();
+        });
     }).finally([this] {
-      connections.clear();
+      ceph_assert(connections.empty());
       // closing connections will unblock any dispatchers that were waiting to
       // send(). wait for any pending calls to finish
       return pending_dispatch.close();
@@ -202,6 +196,23 @@ ceph::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t&
   }
 }
 
+void SocketMessenger::accept_conn(SocketConnectionRef conn)
+{
+  accepting_conns.insert(conn);
+}
+
+void SocketMessenger::unaccept_conn(SocketConnectionRef conn)
+{
+  accepting_conns.erase(conn);
+}
+
+void SocketMessenger::register_conn(SocketConnectionRef conn)
+{
+  auto [i, added] = connections.emplace(conn->get_peer_addr(), conn);
+  std::ignore = i;
+  ceph_assert(added);
+}
+
 void SocketMessenger::unregister_conn(SocketConnectionRef conn)
 {
   ceph_assert(conn);
index e025daa733e1ee207420adb5c7056870544cbf65..d2ef0b6456d1caef3e4fba99b0834e778c41d864 100644 (file)
@@ -16,6 +16,7 @@
 
 #include <map>
 #include <optional>
+#include <set>
 #include <seastar/core/gate.hh>
 #include <seastar/core/reactor.hh>
 
@@ -32,6 +33,7 @@ class SocketMessenger final : public Messenger {
   std::optional<seastar::server_socket> listener;
   Dispatcher *dispatcher = nullptr;
   std::map<entity_addr_t, SocketConnectionRef> connections;
+  std::set<SocketConnectionRef> accepting_conns;
   using Throttle = ceph::thread::Throttle;
   ceph::net::PolicySet<Throttle> policy_set;
   seastar::gate pending_dispatch;
@@ -48,8 +50,8 @@ class SocketMessenger final : public Messenger {
 
   seastar::future<> start(Dispatcher *dispatcher) override;
 
-  seastar::future<ConnectionRef> connect(const entity_addr_t& addr,
-                                        entity_type_t peer_type) override;
+  ConnectionRef connect(const entity_addr_t& peer_addr,
+                        const entity_type_t& peer_type) override;
 
   seastar::future<> shutdown() override;
 
@@ -68,6 +70,9 @@ class SocketMessenger final : public Messenger {
   void set_policy_throttler(entity_type_t peer_type, Throttle* throttle);
 
   SocketConnectionRef lookup_conn(const entity_addr_t& addr);
+  void accept_conn(SocketConnectionRef);
+  void unaccept_conn(SocketConnectionRef);
+  void register_conn(SocketConnectionRef);
   void unregister_conn(SocketConnectionRef);
 };