From a7796ae723ac876494afe5393c0e51c792a130f2 Mon Sep 17 00:00:00 2001 From: Yingxin Date: Mon, 17 Dec 2018 21:51:49 +0800 Subject: [PATCH] crimson/net: fix address learning during banner exchange * Don't store my_addr in `Connection`, because my_addr can be learned and thus changed. * Support nonce in SocketMessenger. * Always set nonce when set_myaddr(). * Add learned_addr() for SocketMessenger. * Add side_t and socket_port to show the real connecting ports of the SocketConnection. * Fix bannder exchange logic for addresses, including nonce, type, ip, port, socket_port for my_addr and peer_addr. * Add more detailed logging prefixes for SocketConnection. Signed-off-by: Yingxin --- src/crimson/mon/MonClient.cc | 4 +-- src/crimson/net/Connection.h | 5 +--- src/crimson/net/Messenger.h | 2 +- src/crimson/net/SocketConnection.cc | 44 +++++++++++++++++------------ src/crimson/net/SocketConnection.h | 11 +++++++- src/crimson/net/SocketMessenger.cc | 33 ++++++++++++++++++---- src/crimson/net/SocketMessenger.h | 7 ++++- src/test/crimson/test_alien_echo.cc | 4 +-- src/test/crimson/test_messenger.cc | 12 +++++--- src/test/crimson/test_monc.cc | 2 +- 10 files changed, 85 insertions(+), 39 deletions(-) diff --git a/src/crimson/mon/MonClient.cc b/src/crimson/mon/MonClient.cc index 8817abc193d..058843c62a3 100644 --- a/src/crimson/mon/MonClient.cc +++ b/src/crimson/mon/MonClient.cc @@ -163,7 +163,7 @@ seastar::future Connection::do_auth() return reply.get_future(); }).then([this] (Ref m) { logger().info("mon {} => {} returns {}: {}", - conn->get_my_addr(), + conn->get_messenger()->get_myaddr(), conn->get_peer_addr(), *m, m->result); reply = decltype(reply){}; auto p = m->result_bl.cbegin(); @@ -360,7 +360,7 @@ seastar::future<> Client::handle_auth_reply(ceph::net::ConnectionRef conn, Ref m) { logger().info("mon {} => {} returns {}: {}", - conn->get_my_addr(), + conn->get_messenger()->get_myaddr(), conn->get_peer_addr(), *m, m->result); auto found = std::find_if(pending_conns.begin(), pending_conns.end(), [peer_addr = conn->get_peer_addr()](auto& mc) { diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h index 4e4c1f4220f..d8eb656fd72 100644 --- a/src/crimson/net/Connection.h +++ b/src/crimson/net/Connection.h @@ -27,17 +27,14 @@ using seq_num_t = uint64_t; class Connection : public boost::intrusive_ref_counter { protected: - entity_addr_t my_addr; entity_addr_t peer_addr; peer_type_t peer_type = -1; public: - Connection(const entity_addr_t& my_addr) - : my_addr(my_addr) {} + Connection() {} virtual ~Connection() {} virtual Messenger* get_messenger() const = 0; - const entity_addr_t& get_my_addr() const { return my_addr; } const entity_addr_t& get_peer_addr() const { return peer_addr; } virtual int get_peer_type() const = 0; diff --git a/src/crimson/net/Messenger.h b/src/crimson/net/Messenger.h index 1e2b473a0f7..e78f1a373bc 100644 --- a/src/crimson/net/Messenger.h +++ b/src/crimson/net/Messenger.h @@ -36,7 +36,7 @@ class Messenger { const entity_name_t& get_myname() const { return my_name; } const entity_addr_t& get_myaddr() const { return my_addr; } - void set_myaddr(const entity_addr_t& addr) { + virtual void set_myaddr(const entity_addr_t& addr) { my_addr = addr; } diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 67206259240..c472fb82f2f 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -44,10 +44,8 @@ namespace { } SocketConnection::SocketConnection(SocketMessenger& messenger, - const entity_addr_t& my_addr, Dispatcher& dispatcher) - : Connection(my_addr), - messenger(messenger), + : messenger(messenger), dispatcher(dispatcher), send_ready(h.promise.get_future()) { @@ -587,7 +585,7 @@ SocketConnection::handle_connect_with_existing(SocketConnectionRef existing, buf h.reply.connect_seq = existing->connect_seq() + 1; return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION); } - } else if (peer_addr < my_addr || + } else if (peer_addr < messenger.get_myaddr() || existing->is_server_side()) { // incoming wins return replace_existing(existing, std::move(authorizer_reply)); @@ -801,15 +799,14 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr, ceph_assert(p.end()); validate_peer_addr(saddr, peer_addr); - if (my_addr != caddr) { - // take peer's address for me, but preserve my nonce - caddr.nonce = my_addr.nonce; - my_addr = caddr; - } + side = side_t::connector; + socket_port = caddr.get_port(); + messenger.learned_addr(caddr); + // encode/send client's handshake header bufferlist bl; bl.append(buffer::create_static(banner_size, banner)); - ::encode(my_addr, bl, 0); + ::encode(messenger.get_myaddr(), bl, 0); h.global_seq = messenger.get_global_seq(); return socket->write_flush(std::move(bl)); }).then([=] { @@ -840,17 +837,20 @@ SocketConnection::start_accept(seastar::connected_socket&& fd, { ceph_assert(state == state_t::none); ceph_assert(!socket); - peer_addr = _peer_addr; + peer_addr.u = _peer_addr.u; + peer_addr.set_port(0); + side = side_t::acceptor; + socket_port = _peer_addr.get_port(); socket.emplace(std::move(fd)); messenger.accept_conn(this); logger().debug("{} trigger accepting, was {}", *this, static_cast(state)); state = state_t::accepting; - seastar::with_gate(pending_dispatch, [this] { + seastar::with_gate(pending_dispatch, [this, _peer_addr] { // encode/send server's handshake header bufferlist bl; bl.append(buffer::create_static(banner_size, banner)); - ::encode(my_addr, bl, 0); - ::encode(peer_addr, bl, 0); + ::encode(messenger.get_myaddr(), bl, 0); + ::encode(_peer_addr, bl, 0); return socket->write_flush(std::move(bl)) .then([this] { // read client's handshake header and connect request @@ -861,9 +861,9 @@ SocketConnection::start_accept(seastar::connected_socket&& fd, entity_addr_t addr; ::decode(addr, p); ceph_assert(p.end()); - if (!addr.is_blank_ip()) { - peer_addr = addr; - } + peer_addr.set_type(addr.get_type()); + peer_addr.set_port(addr.get_port()); + peer_addr.set_nonce(addr.get_nonce()); }).then([this] { return seastar::repeat([this] { return repeat_handle_connect(); @@ -941,5 +941,13 @@ seastar::future<> SocketConnection::fault() void SocketConnection::print(ostream& out) const { messenger.print(out); - out << " >> " << peer_addr; + if (side == side_t::none) { + out << " >> " << peer_addr; + } else if (side == side_t::acceptor) { + out << " >> " << peer_addr + << "@" << socket_port; + } else { // side == side_t::connector + out << "@" << socket_port + << " >> " << peer_addr; + } } diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index 38713eb40b5..a48f5aff66b 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -40,6 +40,16 @@ class SocketConnection : public Connection { Dispatcher& dispatcher; seastar::gate pending_dispatch; + // if acceptor side, socket_port is different from peer_addr.get_port(); + // if connector side, socket_port is different from my_addr.get_port(). + enum class side_t { + none, + acceptor, + connector + }; + side_t side = side_t::none; + uint16_t socket_port = 0; + enum class state_t { none, accepting, @@ -157,7 +167,6 @@ class SocketConnection : public Connection { public: SocketConnection(SocketMessenger& messenger, - const entity_addr_t& my_addr, Dispatcher& dispatcher); ~SocketConnection(); diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index 6e23f9d6286..9891d6191f4 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -23,10 +23,19 @@ using namespace ceph::net; SocketMessenger::SocketMessenger(const entity_name_t& myname, - const std::string& logic_name) - : Messenger{myname}, logic_name{logic_name} + const std::string& logic_name, + uint32_t nonce) + : Messenger{myname}, logic_name{logic_name}, nonce{nonce} {} +void SocketMessenger::set_myaddr(const entity_addr_t& addr) +{ + entity_addr_t my_addr = addr; + my_addr.nonce = nonce; + // TODO: propagate to all the cores of the Messenger + Messenger::set_myaddr(my_addr); +} + void SocketMessenger::bind(const entity_addr_t& addr) { if (addr.get_family() != AF_INET) { @@ -53,9 +62,8 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp) seastar::socket_address paddr) { // allocate the connection entity_addr_t peer_addr; - peer_addr.set_type(entity_addr_t::TYPE_DEFAULT); peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); - SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher); + SocketConnectionRef conn = new SocketConnection(*this, *dispatcher); // don't wait before accepting another conn->start_accept(std::move(socket), peer_addr); }); @@ -76,7 +84,7 @@ SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& pe if (auto found = lookup_conn(peer_addr); found) { return found; } - SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher); + SocketConnectionRef conn = new SocketConnection(*this, *dispatcher); conn->start_connect(peer_addr, peer_type); return conn; } @@ -99,6 +107,21 @@ seastar::future<> SocketMessenger::shutdown() }); } +void SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) +{ + if (!get_myaddr().is_blank_ip()) { + // already learned or binded + return; + } + + // Only learn IP address if blank. + entity_addr_t addr = get_myaddr(); + addr.u = peer_addr_for_me.u; + addr.set_type(peer_addr_for_me.get_type()); + addr.set_port(get_myaddr().get_port()); + set_myaddr(addr); +} + void SocketMessenger::set_default_policy(const SocketPolicy& p) { policy_set.set_default(p); diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h index c71443112c1..372de243feb 100644 --- a/src/crimson/net/SocketMessenger.h +++ b/src/crimson/net/SocketMessenger.h @@ -38,13 +38,17 @@ class SocketMessenger final : public Messenger { ceph::net::PolicySet policy_set; // Distinguish messengers with meaningful names for debugging const std::string logic_name; + const uint32_t nonce; seastar::future<> accept(seastar::connected_socket socket, seastar::socket_address paddr); public: SocketMessenger(const entity_name_t& myname, - const std::string& logic_name); + const std::string& logic_name, + uint32_t nonce); + + void set_myaddr(const entity_addr_t& addr) override; void bind(const entity_addr_t& addr) override; @@ -62,6 +66,7 @@ class SocketMessenger final : public Messenger { } public: + void learned_addr(const entity_addr_t &peer_addr_for_me); void set_default_policy(const SocketPolicy& p); void set_policy(entity_type_t peer_type, const SocketPolicy& p); void set_policy_throttler(entity_type_t peer_type, Throttle* throttle); diff --git a/src/test/crimson/test_alien_echo.cc b/src/test/crimson/test_alien_echo.cc index 78ef81d3067..046d48971b4 100644 --- a/src/test/crimson/test_alien_echo.cc +++ b/src/test/crimson/test_alien_echo.cc @@ -39,7 +39,7 @@ struct DummyAuthAuthorizer : public AuthAuthorizer { struct Server { ceph::thread::Throttle byte_throttler; static constexpr int64_t server_num = 0; - ceph::net::SocketMessenger msgr{entity_name_t::OSD(server_num), "server"}; + ceph::net::SocketMessenger msgr{entity_name_t::OSD(server_num), "server", 0}; struct ServerDispatcher : ceph::net::Dispatcher { unsigned count = 0; seastar::condition_variable on_reply; @@ -76,7 +76,7 @@ struct Server { struct Client { ceph::thread::Throttle byte_throttler; static constexpr int64_t client_num = 1; - ceph::net::SocketMessenger msgr{entity_name_t::OSD(client_num), "client"}; + ceph::net::SocketMessenger msgr{entity_name_t::OSD(client_num), "client", 0}; struct ClientDispatcher : ceph::net::Dispatcher { unsigned count = 0; seastar::condition_variable on_reply; diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index a2e19d4b79d..e5a582c80b3 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -22,7 +22,7 @@ static seastar::future<> test_echo(unsigned rounds, entity_addr_t addr; struct { - ceph::net::SocketMessenger messenger{entity_name_t::OSD(1), "server1"}; + ceph::net::SocketMessenger messenger{entity_name_t::OSD(1), "server1", 1}; struct ServerDispatcher : ceph::net::Dispatcher { seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, MessageRef m) override { @@ -38,7 +38,7 @@ static seastar::future<> test_echo(unsigned rounds, struct { unsigned rounds; std::bernoulli_distribution keepalive_dist{}; - ceph::net::SocketMessenger messenger{entity_name_t::OSD(0), "client1"}; + ceph::net::SocketMessenger messenger{entity_name_t::OSD(0), "client1", 2}; struct ClientDispatcher : ceph::net::Dispatcher { seastar::promise reply; unsigned count = 0u; @@ -81,8 +81,10 @@ static seastar::future<> test_echo(unsigned rounds, return seastar::do_with(test_state{}, [rounds, keepalive_ratio] (test_state& t) { // bind the server + t.addr.set_type(entity_addr_t::TYPE_LEGACY); t.addr.set_family(AF_INET); t.addr.set_port(9010); + t.addr.set_nonce(1); t.server.messenger.bind(t.addr); t.client.rounds = rounds; @@ -127,7 +129,7 @@ static seastar::future<> test_concurrent_dispatch() entity_addr_t addr; struct { - ceph::net::SocketMessenger messenger{entity_name_t::OSD(1), "server2"}; + ceph::net::SocketMessenger messenger{entity_name_t::OSD(1), "server2", 3}; class ServerDispatcher : public ceph::net::Dispatcher { int count = 0; seastar::promise<> on_second; // satisfied on second dispatch @@ -151,15 +153,17 @@ static seastar::future<> test_concurrent_dispatch() } server; struct { - ceph::net::SocketMessenger messenger{entity_name_t::OSD(0), "client2"}; + ceph::net::SocketMessenger messenger{entity_name_t::OSD(0), "client2", 4}; ceph::net::Dispatcher dispatcher; } client; }; return seastar::do_with(test_state{}, [] (test_state& t) { // bind the server + t.addr.set_type(entity_addr_t::TYPE_LEGACY); t.addr.set_family(AF_INET); t.addr.set_port(9010); + t.addr.set_nonce(3); t.server.messenger.bind(t.addr); return t.server.messenger.start(&t.server.dispatcher) diff --git a/src/test/crimson/test_monc.cc b/src/test/crimson/test_monc.cc index d90d7905f73..17775d00cb5 100644 --- a/src/test/crimson/test_monc.cc +++ b/src/test/crimson/test_monc.cc @@ -23,7 +23,7 @@ static seastar::future<> test_monc() conf->cluster = cluster; return conf.parse_config_files(conf_file_list); }).then([] { - return seastar::do_with(ceph::net::SocketMessenger{entity_name_t::OSD(0), "monc"}, + return seastar::do_with(ceph::net::SocketMessenger{entity_name_t::OSD(0), "monc", 0}, [](ceph::net::Messenger& msgr) { auto& conf = ceph::common::local_conf(); if (conf->ms_crc_data) { -- 2.39.5