From 2041989cc2ba01cf99c901ce69d17c35715e219c Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Wed, 13 Jun 2018 11:05:26 +0800 Subject: [PATCH] crimson/net: use std::map for tracking connections * s/list/map/ for better lookup performance. * and connection management related changes Signed-off-by: Kefu Chai --- src/crimson/net/Messenger.h | 6 ++--- src/crimson/net/SocketConnection.cc | 9 +++++++ src/crimson/net/SocketMessenger.cc | 37 +++++++++++++++++++++-------- src/crimson/net/SocketMessenger.h | 7 +++--- 4 files changed, 42 insertions(+), 17 deletions(-) diff --git a/src/crimson/net/Messenger.h b/src/crimson/net/Messenger.h index db01ac247a4..989659f7d10 100644 --- a/src/crimson/net/Messenger.h +++ b/src/crimson/net/Messenger.h @@ -60,10 +60,8 @@ class Messenger { } return ++global_seq; } - ConnectionRef lookup_conn(const entity_addr_t&) { - // TODO: replace handling - return nullptr; - } + virtual ConnectionRef lookup_conn(const entity_addr_t&) = 0; + virtual void unregister_conn(ConnectionRef) = 0; // @returns a tuple of virtual seastar::future( seastar::stop_iteration::no); }); + }).handle_exception_type([this] (const std::system_error& e) { + if (e.code() == error::read_eof) { + close(); + } + throw e; }).then_wrapped([this] (auto fut) { // satisfy the message promise fut.forward_to(std::move(on_message)); @@ -296,6 +301,7 @@ seastar::future<> SocketConnection::send(MessageRef msg) seastar::future<> SocketConnection::close() { + get_messenger()->unregister_conn(this); return seastar::when_all(in.close(), out.close()).discard_result(); } @@ -819,6 +825,9 @@ seastar::future<> SocketConnection::server_handshake() seastar::future<> SocketConnection::fault() { + if (policy.lossy) { + get_messenger()->unregister_conn(this); + } if (h.backoff.count()) { h.backoff += h.backoff; } else { diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index ebfc715191f..5a9d283eab0 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -12,6 +12,7 @@ * */ +#include #include "auth/Auth.h" #include "SocketMessenger.h" #include "SocketConnection.h" @@ -40,7 +41,9 @@ void SocketMessenger::bind(const entity_addr_t& addr) seastar::future<> SocketMessenger::dispatch(ConnectionRef conn) { - connections.push_back(conn); + auto [i, added] = connections.emplace(conn->get_peer_addr(), conn); + std::ignore = i; + assert(added); return seastar::repeat([=] { return conn->read_message() @@ -118,13 +121,8 @@ seastar::future SocketMessenger::connect(const entity_addr_t& addr, entity_type_t peer_type, const entity_addr_t& myaddr, entity_type_t host_type) { - if (auto found = std::find_if(connections.begin(), - connections.end(), - [&addr](auto conn) { - return conn->get_peer_addr() == addr; - }); - found != connections.end()) { - return seastar::make_ready_future(*found); + if (auto found = lookup_conn(addr); found) { + return seastar::make_ready_future(found); } return seastar::connect(addr.in4_addr()) .then([=] (seastar::connected_socket socket) { @@ -153,11 +151,30 @@ seastar::future<> SocketMessenger::shutdown() listener->abort_accept(); } return seastar::parallel_for_each(connections.begin(), connections.end(), - [this] (ConnectionRef conn) { - return conn->close(); + [this] (auto conn) { + return conn.second->close(); }).finally([this] { connections.clear(); }); } +ceph::net::ConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr) +{ + if (auto found = connections.find(addr); + found != connections.end()) { + return found->second; + } else { + return nullptr; + } +} + +void SocketMessenger::unregister_conn(ConnectionRef conn) +{ + assert(conn); + auto found = connections.find(conn->get_peer_addr()); + assert(found != connections.end()); + assert(found->second == conn); + connections.erase(found); +} + seastar::future SocketMessenger::verify_authorizer(peer_type_t peer_type, auth_proto_t protocol, diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h index 34bf375f662..68e5dc7382c 100644 --- a/src/crimson/net/SocketMessenger.h +++ b/src/crimson/net/SocketMessenger.h @@ -14,7 +14,7 @@ #pragma once -#include +#include #include #include @@ -26,8 +26,7 @@ class SocketMessenger final : public Messenger { boost::optional listener; Dispatcher *dispatcher = nullptr; uint32_t global_seq = 0; - - std::list connections; + std::map connections; seastar::future<> dispatch(ConnectionRef conn); @@ -47,6 +46,8 @@ class SocketMessenger final : public Messenger { entity_type_t host_type) override; seastar::future<> shutdown() override; + ConnectionRef lookup_conn(const entity_addr_t& addr) override; + void unregister_conn(ConnectionRef) override; seastar::future verify_authorizer(peer_type_t peer_type, auth_proto_t protocol, -- 2.39.5