]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/net: fix address learning during banner exchange
authorYingxin <yingxin.cheng@intel.com>
Mon, 17 Dec 2018 13:51:49 +0000 (21:51 +0800)
committerYingxin <yingxincheng@gmail.com>
Thu, 3 Jan 2019 07:50:31 +0000 (15:50 +0800)
* Don't store my_addr in `Connection`, because my_addr can be learned
  and thus changed.
* Support nonce in SocketMessenger.
* Always set nonce when set_myaddr().
* Add learned_addr() for SocketMessenger.
* Add side_t and socket_port to show the real connecting
  ports of the SocketConnection.
* Fix bannder exchange logic for addresses, including nonce, type, ip,
  port, socket_port for my_addr and peer_addr.
* Add more detailed logging prefixes for SocketConnection.

Signed-off-by: Yingxin <yingxin.cheng@intel.com>
src/crimson/mon/MonClient.cc
src/crimson/net/Connection.h
src/crimson/net/Messenger.h
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h
src/crimson/net/SocketMessenger.cc
src/crimson/net/SocketMessenger.h
src/test/crimson/test_alien_echo.cc
src/test/crimson/test_messenger.cc
src/test/crimson/test_monc.cc

index 8817abc193dd6265cda22855d4b44efd10ec037a..058843c62a34846099f14271843a307b6004c3e4 100644 (file)
@@ -163,7 +163,7 @@ seastar::future<bool> Connection::do_auth()
     return reply.get_future();
   }).then([this] (Ref<MAuthReply> m) {
     logger().info("mon {} => {} returns {}: {}",
-                   conn->get_my_addr(),
+                   conn->get_messenger()->get_myaddr(),
                    conn->get_peer_addr(), *m, m->result);
     reply = decltype(reply){};
     auto p = m->result_bl.cbegin();
@@ -360,7 +360,7 @@ seastar::future<> Client::handle_auth_reply(ceph::net::ConnectionRef conn,
                                                Ref<MAuthReply> m)
 {
   logger().info("mon {} => {} returns {}: {}",
-                conn->get_my_addr(),
+                conn->get_messenger()->get_myaddr(),
                 conn->get_peer_addr(), *m, m->result);
   auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
                             [peer_addr = conn->get_peer_addr()](auto& mc) {
index 4e4c1f4220f1ed977752455e1987251c58ef0453..d8eb656fd7292058f3e7b2dbe7ebe526e649bcc4 100644 (file)
@@ -27,17 +27,14 @@ using seq_num_t = uint64_t;
 class Connection : public boost::intrusive_ref_counter<Connection,
                                                       boost::thread_unsafe_counter> {
  protected:
-  entity_addr_t my_addr;
   entity_addr_t peer_addr;
   peer_type_t peer_type = -1;
 
  public:
-  Connection(const entity_addr_t& my_addr)
-    : my_addr(my_addr) {}
+  Connection() {}
   virtual ~Connection() {}
 
   virtual Messenger* get_messenger() const = 0;
-  const entity_addr_t& get_my_addr() const { return my_addr; }
   const entity_addr_t& get_peer_addr() const { return peer_addr; }
   virtual int get_peer_type() const = 0;
 
index 1e2b473a0f7affea84cc4723fbef8d2a05f45bb4..e78f1a373bc3de5b6fb498f66bafbd70d5b2bddf 100644 (file)
@@ -36,7 +36,7 @@ class Messenger {
 
   const entity_name_t& get_myname() const { return my_name; }
   const entity_addr_t& get_myaddr() const { return my_addr; }
-  void set_myaddr(const entity_addr_t& addr) {
+  virtual void set_myaddr(const entity_addr_t& addr) {
     my_addr = addr;
   }
 
index 672062592401dc07e36d84284f49c696958ddc86..c472fb82f2fcad643427afcc9dc26c75b1b0aa0f 100644 (file)
@@ -44,10 +44,8 @@ namespace {
 }
 
 SocketConnection::SocketConnection(SocketMessenger& messenger,
-                                   const entity_addr_t& my_addr,
                                    Dispatcher& dispatcher)
-  : Connection(my_addr),
-    messenger(messenger),
+  : messenger(messenger),
     dispatcher(dispatcher),
     send_ready(h.promise.get_future())
 {
@@ -587,7 +585,7 @@ SocketConnection::handle_connect_with_existing(SocketConnectionRef existing, buf
        h.reply.connect_seq = existing->connect_seq() + 1;
        return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
       }
-    } else if (peer_addr < my_addr ||
+    } else if (peer_addr < messenger.get_myaddr() ||
               existing->is_server_side()) {
       // incoming wins
       return replace_existing(existing, std::move(authorizer_reply));
@@ -801,15 +799,14 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr,
           ceph_assert(p.end());
           validate_peer_addr(saddr, peer_addr);
 
-          if (my_addr != caddr) {
-            // take peer's address for me, but preserve my nonce
-            caddr.nonce = my_addr.nonce;
-            my_addr = caddr;
-          }
+          side = side_t::connector;
+          socket_port = caddr.get_port();
+          messenger.learned_addr(caddr);
+
           // encode/send client's handshake header
           bufferlist bl;
           bl.append(buffer::create_static(banner_size, banner));
-          ::encode(my_addr, bl, 0);
+          ::encode(messenger.get_myaddr(), bl, 0);
           h.global_seq = messenger.get_global_seq();
           return socket->write_flush(std::move(bl));
         }).then([=] {
@@ -840,17 +837,20 @@ SocketConnection::start_accept(seastar::connected_socket&& fd,
 {
   ceph_assert(state == state_t::none);
   ceph_assert(!socket);
-  peer_addr = _peer_addr;
+  peer_addr.u = _peer_addr.u;
+  peer_addr.set_port(0);
+  side = side_t::acceptor;
+  socket_port = _peer_addr.get_port();
   socket.emplace(std::move(fd));
   messenger.accept_conn(this);
   logger().debug("{} trigger accepting, was {}", *this, static_cast<int>(state));
   state = state_t::accepting;
-  seastar::with_gate(pending_dispatch, [this] {
+  seastar::with_gate(pending_dispatch, [this, _peer_addr] {
       // encode/send server's handshake header
       bufferlist bl;
       bl.append(buffer::create_static(banner_size, banner));
-      ::encode(my_addr, bl, 0);
-      ::encode(peer_addr, bl, 0);
+      ::encode(messenger.get_myaddr(), bl, 0);
+      ::encode(_peer_addr, bl, 0);
       return socket->write_flush(std::move(bl))
         .then([this] {
           // read client's handshake header and connect request
@@ -861,9 +861,9 @@ SocketConnection::start_accept(seastar::connected_socket&& fd,
           entity_addr_t addr;
           ::decode(addr, p);
           ceph_assert(p.end());
-          if (!addr.is_blank_ip()) {
-            peer_addr = addr;
-          }
+          peer_addr.set_type(addr.get_type());
+          peer_addr.set_port(addr.get_port());
+          peer_addr.set_nonce(addr.get_nonce());
         }).then([this] {
           return seastar::repeat([this] {
             return repeat_handle_connect();
@@ -941,5 +941,13 @@ seastar::future<> SocketConnection::fault()
 
 void SocketConnection::print(ostream& out) const {
     messenger.print(out);
-    out << " >> " << peer_addr;
+    if (side == side_t::none) {
+      out << " >> " << peer_addr;
+    } else if (side == side_t::acceptor) {
+      out << " >> " << peer_addr
+          << "@" << socket_port;
+    } else { // side == side_t::connector
+      out << "@" << socket_port
+          << " >> " << peer_addr;
+    }
 }
index 38713eb40b58090a422c78637a55c764d6823c66..a48f5aff66b3217e3669060746278de15999aefd 100644 (file)
@@ -40,6 +40,16 @@ class SocketConnection : public Connection {
   Dispatcher& dispatcher;
   seastar::gate pending_dispatch;
 
+  // if acceptor side, socket_port is different from peer_addr.get_port();
+  // if connector side, socket_port is different from my_addr.get_port().
+  enum class side_t {
+    none,
+    acceptor,
+    connector
+  };
+  side_t side = side_t::none;
+  uint16_t socket_port = 0;
+
   enum class state_t {
     none,
     accepting,
@@ -157,7 +167,6 @@ class SocketConnection : public Connection {
 
  public:
   SocketConnection(SocketMessenger& messenger,
-                   const entity_addr_t& my_addr,
                    Dispatcher& dispatcher);
   ~SocketConnection();
 
index 6e23f9d62865d48e860094bb6d0aa83c2286a4f4..9891d6191f49a39f6e68780c0fb597e0223b88de 100644 (file)
 using namespace ceph::net;
 
 SocketMessenger::SocketMessenger(const entity_name_t& myname,
-                                 const std::string& logic_name)
-  : Messenger{myname}, logic_name{logic_name}
+                                 const std::string& logic_name,
+                                 uint32_t nonce)
+  : Messenger{myname}, logic_name{logic_name}, nonce{nonce}
 {}
 
+void SocketMessenger::set_myaddr(const entity_addr_t& addr)
+{
+  entity_addr_t my_addr = addr;
+  my_addr.nonce = nonce;
+  // TODO: propagate to all the cores of the Messenger
+  Messenger::set_myaddr(my_addr);
+}
+
 void SocketMessenger::bind(const entity_addr_t& addr)
 {
   if (addr.get_family() != AF_INET) {
@@ -53,9 +62,8 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp)
                         seastar::socket_address paddr) {
             // allocate the connection
             entity_addr_t peer_addr;
-            peer_addr.set_type(entity_addr_t::TYPE_DEFAULT);
             peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
-            SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher);
+            SocketConnectionRef conn = new SocketConnection(*this, *dispatcher);
             // don't wait before accepting another
             conn->start_accept(std::move(socket), peer_addr);
           });
@@ -76,7 +84,7 @@ SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& pe
   if (auto found = lookup_conn(peer_addr); found) {
     return found;
   }
-  SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher);
+  SocketConnectionRef conn = new SocketConnection(*this, *dispatcher);
   conn->start_connect(peer_addr, peer_type);
   return conn;
 }
@@ -99,6 +107,21 @@ seastar::future<> SocketMessenger::shutdown()
     });
 }
 
+void SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
+{
+  if (!get_myaddr().is_blank_ip()) {
+    // already learned or binded
+    return;
+  }
+
+  // Only learn IP address if blank.
+  entity_addr_t addr = get_myaddr();
+  addr.u = peer_addr_for_me.u;
+  addr.set_type(peer_addr_for_me.get_type());
+  addr.set_port(get_myaddr().get_port());
+  set_myaddr(addr);
+}
+
 void SocketMessenger::set_default_policy(const SocketPolicy& p)
 {
   policy_set.set_default(p);
index c71443112c1252add4639e5662b2355e224cf8de..372de243feb355fedb27e44505e2385145d4ebe5 100644 (file)
@@ -38,13 +38,17 @@ class SocketMessenger final : public Messenger {
   ceph::net::PolicySet<Throttle> policy_set;
   // Distinguish messengers with meaningful names for debugging
   const std::string logic_name;
+  const uint32_t nonce;
 
   seastar::future<> accept(seastar::connected_socket socket,
                            seastar::socket_address paddr);
 
  public:
   SocketMessenger(const entity_name_t& myname,
-                  const std::string& logic_name);
+                  const std::string& logic_name,
+                  uint32_t nonce);
+
+  void set_myaddr(const entity_addr_t& addr) override;
 
   void bind(const entity_addr_t& addr) override;
 
@@ -62,6 +66,7 @@ class SocketMessenger final : public Messenger {
   }
 
  public:
+  void learned_addr(const entity_addr_t &peer_addr_for_me);
   void set_default_policy(const SocketPolicy& p);
   void set_policy(entity_type_t peer_type, const SocketPolicy& p);
   void set_policy_throttler(entity_type_t peer_type, Throttle* throttle);
index 78ef81d3067972a2d4b2a21ad6a1bbe7105692b1..046d48971b41da231e429f164c3a3240ab142b99 100644 (file)
@@ -39,7 +39,7 @@ struct DummyAuthAuthorizer : public AuthAuthorizer {
 struct Server {
   ceph::thread::Throttle byte_throttler;
   static constexpr int64_t server_num = 0;
-  ceph::net::SocketMessenger msgr{entity_name_t::OSD(server_num), "server"};
+  ceph::net::SocketMessenger msgr{entity_name_t::OSD(server_num), "server", 0};
   struct ServerDispatcher : ceph::net::Dispatcher {
     unsigned count = 0;
     seastar::condition_variable on_reply;
@@ -76,7 +76,7 @@ struct Server {
 struct Client {
   ceph::thread::Throttle byte_throttler;
   static constexpr int64_t client_num = 1;
-  ceph::net::SocketMessenger msgr{entity_name_t::OSD(client_num), "client"};
+  ceph::net::SocketMessenger msgr{entity_name_t::OSD(client_num), "client", 0};
   struct ClientDispatcher : ceph::net::Dispatcher {
     unsigned count = 0;
     seastar::condition_variable on_reply;
index a2e19d4b79d0a1e9d52aaf405a0547ab20789818..e5a582c80b378cdb87dcca80ebe2b2924b514df2 100644 (file)
@@ -22,7 +22,7 @@ static seastar::future<> test_echo(unsigned rounds,
     entity_addr_t addr;
 
     struct {
-      ceph::net::SocketMessenger messenger{entity_name_t::OSD(1), "server1"};
+      ceph::net::SocketMessenger messenger{entity_name_t::OSD(1), "server1", 1};
       struct ServerDispatcher : ceph::net::Dispatcher {
         seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
                                       MessageRef m) override {
@@ -38,7 +38,7 @@ static seastar::future<> test_echo(unsigned rounds,
     struct {
       unsigned rounds;
       std::bernoulli_distribution keepalive_dist{};
-      ceph::net::SocketMessenger messenger{entity_name_t::OSD(0), "client1"};
+      ceph::net::SocketMessenger messenger{entity_name_t::OSD(0), "client1", 2};
       struct ClientDispatcher : ceph::net::Dispatcher {
         seastar::promise<MessageRef> reply;
         unsigned count = 0u;
@@ -81,8 +81,10 @@ static seastar::future<> test_echo(unsigned rounds,
   return seastar::do_with(test_state{},
     [rounds, keepalive_ratio] (test_state& t) {
       // bind the server
+      t.addr.set_type(entity_addr_t::TYPE_LEGACY);
       t.addr.set_family(AF_INET);
       t.addr.set_port(9010);
+      t.addr.set_nonce(1);
       t.server.messenger.bind(t.addr);
 
       t.client.rounds = rounds;
@@ -127,7 +129,7 @@ static seastar::future<> test_concurrent_dispatch()
     entity_addr_t addr;
 
     struct {
-      ceph::net::SocketMessenger messenger{entity_name_t::OSD(1), "server2"};
+      ceph::net::SocketMessenger messenger{entity_name_t::OSD(1), "server2", 3};
       class ServerDispatcher : public ceph::net::Dispatcher {
         int count = 0;
         seastar::promise<> on_second; // satisfied on second dispatch
@@ -151,15 +153,17 @@ static seastar::future<> test_concurrent_dispatch()
     } server;
 
     struct {
-      ceph::net::SocketMessenger messenger{entity_name_t::OSD(0), "client2"};
+      ceph::net::SocketMessenger messenger{entity_name_t::OSD(0), "client2", 4};
       ceph::net::Dispatcher dispatcher;
     } client;
   };
   return seastar::do_with(test_state{},
     [] (test_state& t) {
       // bind the server
+      t.addr.set_type(entity_addr_t::TYPE_LEGACY);
       t.addr.set_family(AF_INET);
       t.addr.set_port(9010);
+      t.addr.set_nonce(3);
       t.server.messenger.bind(t.addr);
 
       return t.server.messenger.start(&t.server.dispatcher)
index d90d7905f73116201b626e653493b73d0fe1a5aa..17775d00cb521aefe08d41b7ab421584d1ff2529 100644 (file)
@@ -23,7 +23,7 @@ static seastar::future<> test_monc()
     conf->cluster = cluster;
     return conf.parse_config_files(conf_file_list);
   }).then([] {
-    return seastar::do_with(ceph::net::SocketMessenger{entity_name_t::OSD(0), "monc"},
+    return seastar::do_with(ceph::net::SocketMessenger{entity_name_t::OSD(0), "monc", 0},
                             [](ceph::net::Messenger& msgr) {
       auto& conf = ceph::common::local_conf();
       if (conf->ms_crc_data) {