]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: clean seastar-msgr class dependencies 24576/head
authorYingxin <yingxin.cheng@intel.com>
Mon, 15 Oct 2018 09:46:55 +0000 (17:46 +0800)
committerYingxin <yingxin.cheng@intel.com>
Mon, 15 Oct 2018 20:43:35 +0000 (04:43 +0800)
Remove protocol-specific interfaces from Messenger/Connection classes,
and let SocketMessenger manage SocketConnection instead of Connection.

Signed-off-by: Yingxin <yingxin.cheng@intel.com>
src/crimson/net/Connection.h
src/crimson/net/Fwd.h
src/crimson/net/Messenger.h
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h
src/crimson/net/SocketMessenger.cc
src/crimson/net/SocketMessenger.h

index e5e2a11f92e557464fe5ef59c37b4362699bafed..6aa600d3b5e2e82b3f8310d76d10a7f131630ac2 100644 (file)
@@ -27,18 +27,16 @@ using seq_num_t = uint64_t;
 class Connection : public boost::intrusive_ref_counter<Connection,
                                                       boost::thread_unsafe_counter> {
  protected:
-  Messenger *const messenger;
   entity_addr_t my_addr;
   entity_addr_t peer_addr;
 
  public:
-  Connection(Messenger *messenger, const entity_addr_t& my_addr,
+  Connection(const entity_addr_t& my_addr,
              const entity_addr_t& peer_addr)
-    : messenger(messenger), my_addr(my_addr), peer_addr(peer_addr) {}
+    : my_addr(my_addr), peer_addr(peer_addr) {}
   virtual ~Connection() {}
 
-  Messenger* get_messenger() const { return messenger; }
-
+  virtual Messenger* get_messenger() const = 0;
   const entity_addr_t& get_my_addr() const { return my_addr; }
   const entity_addr_t& get_peer_addr() const { return peer_addr; }
   virtual int get_peer_type() const = 0;
@@ -46,16 +44,6 @@ class Connection : public boost::intrusive_ref_counter<Connection,
   /// true if the handshake has completed and no errors have been encountered
   virtual bool is_connected() = 0;
 
-  /// complete a handshake from the client's perspective
-  virtual seastar::future<> client_handshake(entity_type_t peer_type,
-                                            entity_type_t host_type) = 0;
-
-  /// complete a handshake from the server's perspective
-  virtual seastar::future<> server_handshake() = 0;
-
-  /// read a message from a connection that has completed its handshake
-  virtual seastar::future<MessageRef> read_message() = 0;
-
   /// send a message over a connection that has completed its handshake
   virtual seastar::future<> send(MessageRef msg) = 0;
 
@@ -65,35 +53,6 @@ class Connection : public boost::intrusive_ref_counter<Connection,
 
   /// close the connection and cancel any any pending futures from read/send
   virtual seastar::future<> close() = 0;
-
-  /// move all messages in the sent list back into the queue
-  virtual void requeue_sent() = 0;
-
-  /// get all messages in the out queue
-  virtual std::tuple<seq_num_t, std::queue<MessageRef>> get_out_queue() = 0;
-
-public:
-  enum class state_t {
-    none,
-    open,
-    standby,
-    closed,
-    wait
-  };
-  /// the number of connections initiated in this session, increment when a
-  /// new connection is established
-  virtual uint32_t connect_seq() const = 0;
-
-  /// the client side should connect us with a gseq. it will be reset with a
-  /// the one of exsting connection if it's greater.
-  virtual uint32_t peer_global_seq() const = 0;
-
-  virtual seq_num_t rx_seq_num() const = 0;
-
-  /// current state of connection
-  virtual state_t get_state() const = 0;
-  virtual bool is_server_side() const = 0;
-  virtual bool is_lossy() const = 0;
 };
 
 } // namespace ceph::net
index d234048a35c32ef179c57b28aef094efd92ee3bc..5aa04812d6021473c8b05a21b668a0c8d313de69 100644 (file)
 
 #include <boost/intrusive_ptr.hpp>
 
-#include "Errors.h"
 #include "msg/msg_types.h"
 #include "msg/Message.h"
 
 using peer_type_t = int;
 using auth_proto_t = int;
 
-class Message;
-using MessageRef = boost::intrusive_ptr<Message>;
-
 namespace ceph::net {
 
 using msgr_tag_t = uint8_t;
@@ -38,4 +34,3 @@ class Dispatcher;
 class Messenger;
 
 } // namespace ceph::net
-
index e67b304f366ed3d580edebbd280efac1c8aad1de..149be598975fbe16dcb1e7bac540dccdf221ffa5 100644 (file)
@@ -60,8 +60,6 @@ class Messenger {
     }
     return ++global_seq;
   }
-  virtual ConnectionRef lookup_conn(const entity_addr_t&) = 0;
-  virtual void unregister_conn(ConnectionRef) = 0;
 
   // @returns a tuple of <is_valid, auth_reply, session_key>
   virtual seastar::future<msgr_tag_t,    /// tag for error, 0 if authorized
index 9595cca9e1a050f372fd4ad318360b2d7ee58099..bc38696cd813b475803165b3bf8247e404d8e381 100644 (file)
  *
  */
 
+#include "SocketConnection.h"
+
 #include <algorithm>
 #include <seastar/core/shared_future.hh>
 #include <seastar/core/sleep.hh>
 #include <seastar/net/packet.hh>
 
-#include "Config.h"
-#include "Messenger.h"
-#include "SocketConnection.h"
-
 #include "include/msgr.h"
 #include "include/random.h"
 #include "auth/Auth.h"
 #include "auth/AuthSessionHandler.h"
-#include "msg/Message.h"
 
 #include "crimson/common/log.h"
+#include "Config.h"
+#include "Errors.h"
+#include "SocketMessenger.h"
 
 using namespace ceph::net;
 
@@ -42,11 +42,12 @@ namespace {
   }
 }
 
-SocketConnection::SocketConnection(Messenger *messenger,
+SocketConnection::SocketConnection(SocketMessenger& messenger,
                                    const entity_addr_t& my_addr,
                                    const entity_addr_t& peer_addr,
                                    seastar::connected_socket&& fd)
-  : Connection(messenger, my_addr, peer_addr),
+  : Connection(my_addr, peer_addr),
+    messenger(messenger),
     socket(std::move(fd)),
     in(socket.input()),
     out(socket.output()),
@@ -61,6 +62,11 @@ SocketConnection::~SocketConnection()
   send_ready.ignore_ready_future();
 }
 
+ceph::net::Messenger*
+SocketConnection::get_messenger() const {
+  return &messenger;
+}
+
 bool SocketConnection::is_connected()
 {
   return !send_ready.failed();
@@ -284,7 +290,7 @@ bool SocketConnection::update_rx_seq(seq_num_t seq)
 seastar::future<> SocketConnection::write_message(MessageRef msg)
 {
   msg->set_seq(++out_seq);
-  msg->encode(features, get_messenger()->get_crc_flags());
+  msg->encode(features, messenger.get_crc_flags());
   bufferlist bl;
   bl.append(CEPH_MSGR_TAG_MSG);
   auto& header = msg->get_header();
@@ -297,13 +303,13 @@ seastar::future<> SocketConnection::write_message(MessageRef msg)
     bl.append((const char*)&footer, sizeof(footer));
   } else {
     ceph_msg_footer_old old_footer;
-    if (get_messenger()->get_crc_flags() & MSG_CRC_HEADER) {
+    if (messenger.get_crc_flags() & MSG_CRC_HEADER) {
       old_footer.front_crc = footer.front_crc;
       old_footer.middle_crc = footer.middle_crc;
     } else {
       old_footer.front_crc = old_footer.middle_crc = 0;
     }
-    if (get_messenger()->get_crc_flags() & MSG_CRC_DATA) {
+    if (messenger.get_crc_flags() & MSG_CRC_DATA) {
       old_footer.data_crc = footer.data_crc;
     } else {
       old_footer.data_crc = 0;
@@ -359,9 +365,9 @@ seastar::future<> SocketConnection::close()
   state = state_t::closed;
 
   // unregister_conn() drops a reference, so hold another until completion
-  auto cleanup = [conn = ConnectionRef(this)] {};
+  auto cleanup = [conn = SocketConnectionRef(this)] {};
 
-  get_messenger()->unregister_conn(this);
+  messenger.unregister_conn(this);
 
   // close_ready become valid only after state is state_t::closed
   assert(!close_ready.valid());
@@ -503,15 +509,15 @@ seastar::future<> SocketConnection::handle_connect()
         return seastar::make_ready_future<msgr_tag_t, bufferlist>(
             CEPH_MSGR_TAG_FEATURES, bufferlist{});
       }
-      return get_messenger()->verify_authorizer(get_peer_type(),
-                                               h.connect.authorizer_protocol,
-                                               authorizer);
+      return messenger.verify_authorizer(get_peer_type(),
+                                          h.connect.authorizer_protocol,
+                                          authorizer);
     }).then([this] (ceph::net::msgr_tag_t tag, bufferlist&& authorizer_reply) {
       memset(&h.reply, 0, sizeof(h.reply));
       if (tag) {
        return send_connect_reply(tag, std::move(authorizer_reply));
       }
-      if (auto existing = get_messenger()->lookup_conn(peer_addr); existing) {
+      if (auto existing = messenger.lookup_conn(peer_addr); existing) {
        return handle_connect_with_existing(existing, std::move(authorizer_reply));
       } else if (h.connect.connect_seq > 0) {
        return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION,
@@ -546,7 +552,7 @@ seastar::future<>
 SocketConnection::send_connect_reply_ready(msgr_tag_t tag,
                                            bufferlist&& authorizer_reply)
 {
-  h.global_seq = get_messenger()->get_global_seq();
+  h.global_seq = messenger.get_global_seq();
   h.reply.tag = tag;
   h.reply.features = policy.features_supported;
   h.reply.global_seq = h.global_seq;
@@ -607,7 +613,7 @@ SocketConnection::handle_keepalive2_ack()
 }
 
 seastar::future<>
-SocketConnection::handle_connect_with_existing(ConnectionRef existing, bufferlist&& authorizer_reply)
+SocketConnection::handle_connect_with_existing(SocketConnectionRef existing, bufferlist&& authorizer_reply)
 {
   if (h.connect.global_seq < existing->peer_global_seq()) {
     h.reply.global_seq = existing->peer_global_seq();
@@ -633,7 +639,7 @@ SocketConnection::handle_connect_with_existing(ConnectionRef existing, bufferlis
        h.reply.connect_seq = existing->connect_seq() + 1;
        return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
       }
-    } else if (get_peer_addr() < get_my_addr() ||
+    } else if (peer_addr < my_addr ||
               existing->is_server_side()) {
       // incoming wins
       return replace_existing(existing, std::move(authorizer_reply));
@@ -648,7 +654,7 @@ SocketConnection::handle_connect_with_existing(ConnectionRef existing, bufferlis
   }
 }
 
-seastar::future<> SocketConnection::replace_existing(ConnectionRef existing,
+seastar::future<> SocketConnection::replace_existing(SocketConnectionRef existing,
                                                      bufferlist&& authorizer_reply,
                                                     bool is_reset_from_peer)
 {
@@ -659,7 +665,7 @@ seastar::future<> SocketConnection::replace_existing(ConnectionRef existing,
   } else {
     reply_tag = CEPH_MSGR_TAG_READY;
   }
-  get_messenger()->unregister_conn(existing);
+  messenger.unregister_conn(existing);
   if (!existing->is_lossy()) {
     // reset the in_seq if this is a hard reset from peer,
     // otherwise we respect our original connection's value
@@ -685,7 +691,7 @@ seastar::future<> SocketConnection::handle_connect_reply(msgr_tag_t tag)
     }
     h.got_bad_auth = true;
     // try harder
-    return get_messenger()->get_authorizer(h.peer_type, true)
+    return messenger.get_authorizer(h.peer_type, true)
       .then([this](auto&& auth) {
         h.authorizer = std::move(auth);
        return seastar::now();
@@ -694,7 +700,7 @@ seastar::future<> SocketConnection::handle_connect_reply(msgr_tag_t tag)
     reset_session();
     return seastar::now();
   case CEPH_MSGR_TAG_RETRY_GLOBAL:
-    h.global_seq = get_messenger()->get_global_seq(h.reply.global_seq);
+    h.global_seq = messenger.get_global_seq(h.reply.global_seq);
     return seastar::now();
   case CEPH_MSGR_TAG_RETRY_SESSION:
     ceph_assert(h.reply.connect_seq > h.connect_seq);
@@ -779,7 +785,7 @@ seastar::future<> SocketConnection::connect(entity_type_t peer_type,
   // this is fyi, actually, server decides!
   h.connect.flags = policy.lossy ? CEPH_MSG_CONNECT_LOSSY : 0;
 
-  return get_messenger()->get_authorizer(peer_type, false)
+  return messenger.get_authorizer(peer_type, false)
     .then([this](auto&& auth) {
       h.authorizer = std::move(auth);
       bufferlist bl;
@@ -839,7 +845,7 @@ seastar::future<> SocketConnection::client_handshake(entity_type_t peer_type,
       bufferlist bl;
       bl.append(buffer::create_static(banner_size, banner));
       ::encode(my_addr, bl, 0);
-      h.global_seq = get_messenger()->get_global_seq();
+      h.global_seq = messenger.get_global_seq();
       return out.write(std::move(bl)).then([this] { return out.flush(); });
     }).then([=] {
       return seastar::do_until([=] { return state == state_t::open; },
@@ -889,7 +895,7 @@ seastar::future<> SocketConnection::server_handshake()
 seastar::future<> SocketConnection::fault()
 {
   if (policy.lossy) {
-    get_messenger()->unregister_conn(this);
+    messenger.unregister_conn(this);
   }
   if (h.backoff.count()) {
     h.backoff += h.backoff;
index 974c082ddcd2626eeed75b406b7b91239e413baa..b53f7e1a96211c7474b3cdb3cdf59051e44d0464 100644 (file)
 #include "Connection.h"
 #include "crimson/thread/Throttle.h"
 
+class AuthAuthorizer;
 class AuthSessionHandler;
 
 namespace ceph::net {
 
+class SocketMessenger;
+class SocketConnection;
+using SocketConnectionRef = boost::intrusive_ptr<SocketConnection>;
+
 class SocketConnection : public Connection {
+  SocketMessenger& messenger;
   seastar::connected_socket socket;
   seastar::input_stream<char> in;
   seastar::output_stream<char> out;
 
+  enum class state_t {
+    none,
+    open,
+    standby,
+    closed,
+    wait
+  };
   state_t state = state_t::none;
 
   /// become valid only when state is state_t::closed
@@ -60,9 +73,9 @@ class SocketConnection : public Connection {
 
   /// server side of handshake negotiation
   seastar::future<> handle_connect();
-  seastar::future<> handle_connect_with_existing(ConnectionRef existing,
+  seastar::future<> handle_connect_with_existing(SocketConnectionRef existing,
                                                 bufferlist&& authorizer_reply);
-  seastar::future<> replace_existing(ConnectionRef existing,
+  seastar::future<> replace_existing(SocketConnectionRef existing,
                                     bufferlist&& authorizer_reply,
                                     bool is_reset_from_peer = false);
   seastar::future<> send_connect_reply(ceph::net::msgr_tag_t tag,
@@ -74,9 +87,6 @@ class SocketConnection : public Connection {
   seastar::future<> handle_keepalive2_ack();
 
   bool require_auth_feature() const;
-  int get_peer_type() const override {
-    return h.connect.host_type;
-  }
   uint32_t get_proto_version(entity_type_t peer_type, bool connec) const;
   /// client side of handshake negotiation
   seastar::future<> connect(entity_type_t peer_type, entity_type_t host_type);
@@ -148,20 +158,19 @@ class SocketConnection : public Connection {
   seastar::future<> fault();
 
  public:
-  SocketConnection(Messenger *messenger,
+  SocketConnection(SocketMessenger& messenger,
                    const entity_addr_t& my_addr,
                    const entity_addr_t& peer_addr,
                    seastar::connected_socket&& socket);
   ~SocketConnection();
 
-  bool is_connected() override;
-
-  seastar::future<> client_handshake(entity_type_t peer_type,
-                                    entity_type_t host_type) override;
+  Messenger* get_messenger() const override;
 
-  seastar::future<> server_handshake() override;
+  int get_peer_type() const override {
+    return h.connect.host_type;
+  }
 
-  seastar::future<MessageRef> read_message() override;
+  bool is_connected() override;
 
   seastar::future<> send(MessageRef msg) override;
 
@@ -169,31 +178,49 @@ class SocketConnection : public Connection {
 
   seastar::future<> close() override;
 
-  uint32_t connect_seq() const override {
+ public:
+  /// complete a handshake from the client's perspective
+  seastar::future<> client_handshake(entity_type_t peer_type,
+                                    entity_type_t host_type);
+
+  /// complete a handshake from the server's perspective
+  seastar::future<> server_handshake();
+
+  /// read a message from a connection that has completed its handshake
+  seastar::future<MessageRef> read_message();
+
+  /// the number of connections initiated in this session, increment when a
+  /// new connection is established
+  uint32_t connect_seq() const {
     return h.connect_seq;
   }
-  uint32_t peer_global_seq() const override {
+
+  /// the client side should connect us with a gseq. it will be reset with
+  /// the one of exsting connection if it's greater.
+  uint32_t peer_global_seq() const {
     return h.peer_global_seq;
   }
   seq_num_t rx_seq_num() const {
     return in_seq;
   }
-  state_t get_state() const override {
+
+  /// current state of connection
+  state_t get_state() const {
     return state;
   }
-  bool is_server_side() const override {
+  bool is_server_side() const {
     return policy.server;
   }
-  bool is_lossy() const override {
+  bool is_lossy() const {
     return policy.lossy;
   }
 
-private:
-  void requeue_sent() override;
-  std::tuple<seq_num_t, std::queue<MessageRef>> get_out_queue() override {
+  /// move all messages in the sent list back into the queue
+  void requeue_sent();
+
+  std::tuple<seq_num_t, std::queue<MessageRef>> get_out_queue() {
     return {out_seq, std::move(out_q)};
   }
-
 };
 
 } // namespace ceph::net
index f86a52fb752ba8e7a9e4d9a76f580ce544b7ced6..1fefd4e1dd2003e3f0f749e120b0e02d657e8763 100644 (file)
  *
  */
 
+#include "SocketMessenger.h"
+
 #include <tuple>
+
 #include "auth/Auth.h"
-#include "SocketMessenger.h"
-#include "SocketConnection.h"
+#include "Errors.h"
 #include "Dispatcher.h"
-#include "msg/Message.h"
 
 using namespace ceph::net;
 
@@ -39,7 +40,7 @@ void SocketMessenger::bind(const entity_addr_t& addr)
   listener = seastar::listen(address, lo);
 }
 
-seastar::future<> SocketMessenger::dispatch(ConnectionRef conn)
+seastar::future<> SocketMessenger::dispatch(SocketConnectionRef conn)
 {
   auto [i, added] = connections.emplace(conn->get_peer_addr(), conn);
   std::ignore = i;
@@ -79,8 +80,8 @@ seastar::future<> SocketMessenger::accept(seastar::connected_socket socket,
   entity_addr_t peer_addr;
   peer_addr.set_type(entity_addr_t::TYPE_DEFAULT);
   peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
-  ConnectionRef conn = new SocketConnection(this, get_myaddr(),
-                                            peer_addr, std::move(socket));
+  SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(),
+                                                  peer_addr, std::move(socket));
   // initiate the handshake
   return conn->server_handshake()
     .then([=] {
@@ -133,8 +134,8 @@ SocketMessenger::connect(const entity_addr_t& addr, entity_type_t peer_type)
   }
   return seastar::connect(addr.in4_addr())
     .then([=] (seastar::connected_socket socket) {
-      ConnectionRef conn = new SocketConnection(this, get_myaddr(), addr,
-                                                std::move(socket));
+      SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), addr,
+                                                      std::move(socket));
       // complete the handshake before returning to the caller
       return conn->client_handshake(peer_type, get_myname().type())
         .then([=] {
@@ -151,7 +152,7 @@ SocketMessenger::connect(const entity_addr_t& addr, entity_type_t peer_type)
           // dispatch replies on this connection
           dispatch(conn)
             .handle_exception([] (std::exception_ptr eptr) {});
-          return conn;
+          return ConnectionRef(conn);
         });
     });
 }
@@ -191,7 +192,7 @@ void SocketMessenger::set_policy_throttler(entity_type_t peer_type,
   policy_set.set_throttlers(peer_type, throttle, nullptr);
 }
 
-ceph::net::ConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr)
+ceph::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr)
 {
   if (auto found = connections.find(addr);
       found != connections.end()) {
@@ -201,7 +202,7 @@ ceph::net::ConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr)
   }
 }
 
-void SocketMessenger::unregister_conn(ConnectionRef conn)
+void SocketMessenger::unregister_conn(SocketConnectionRef conn)
 {
   ceph_assert(conn);
   auto found = connections.find(conn->get_peer_addr());
index 9297b37087f7559b87076ca341cab2698cf6f9bc..e025daa733e1ee207420adb5c7056870544cbf65 100644 (file)
@@ -21,6 +21,7 @@
 
 #include "msg/Policy.h"
 #include "Messenger.h"
+#include "SocketConnection.h"
 #include "crimson/thread/Throttle.h"
 
 namespace ceph::net {
@@ -30,12 +31,12 @@ using SocketPolicy = ceph::net::Policy<ceph::thread::Throttle>;
 class SocketMessenger final : public Messenger {
   std::optional<seastar::server_socket> listener;
   Dispatcher *dispatcher = nullptr;
-  std::map<entity_addr_t, ConnectionRef> connections;
+  std::map<entity_addr_t, SocketConnectionRef> connections;
   using Throttle = ceph::thread::Throttle;
   ceph::net::PolicySet<Throttle> policy_set;
   seastar::gate pending_dispatch;
 
-  seastar::future<> dispatch(ConnectionRef conn);
+  seastar::future<> dispatch(SocketConnectionRef conn);
 
   seastar::future<> accept(seastar::connected_socket socket,
                            seastar::socket_address paddr);
@@ -51,18 +52,23 @@ class SocketMessenger final : public Messenger {
                                         entity_type_t peer_type) override;
 
   seastar::future<> shutdown() override;
-  void set_default_policy(const SocketPolicy& p);
-  void set_policy(entity_type_t peer_type, const SocketPolicy& p);
-  void set_policy_throttler(entity_type_t peer_type, Throttle* throttle);
-  ConnectionRef lookup_conn(const entity_addr_t& addr) override;
-  void unregister_conn(ConnectionRef) override;
+
   seastar::future<msgr_tag_t, bufferlist>
   verify_authorizer(peer_type_t peer_type,
                    auth_proto_t protocol,
                    bufferlist& auth) override;
+
   seastar::future<std::unique_ptr<AuthAuthorizer>>
   get_authorizer(peer_type_t peer_type,
                 bool force_new) override;
+
+ public:
+  void set_default_policy(const SocketPolicy& p);
+  void set_policy(entity_type_t peer_type, const SocketPolicy& p);
+  void set_policy_throttler(entity_type_t peer_type, Throttle* throttle);
+
+  SocketConnectionRef lookup_conn(const entity_addr_t& addr);
+  void unregister_conn(SocketConnectionRef);
 };
 
 } // namespace ceph::net