}
return ++global_seq;
}
- ConnectionRef lookup_conn(const entity_addr_t&) {
- // TODO: replace handling
- return nullptr;
- }
+ virtual ConnectionRef lookup_conn(const entity_addr_t&) = 0;
+ virtual void unregister_conn(ConnectionRef) = 0;
// @returns a tuple of <is_valid, auth_reply, session_key>
virtual seastar::future<msgr_tag_t, /// tag for error, 0 if authorized
return seastar::make_ready_future<seastar::stop_iteration>(
seastar::stop_iteration::no);
});
+ }).handle_exception_type([this] (const std::system_error& e) {
+ if (e.code() == error::read_eof) {
+ close();
+ }
+ throw e;
}).then_wrapped([this] (auto fut) {
// satisfy the message promise
fut.forward_to(std::move(on_message));
seastar::future<> SocketConnection::close()
{
+ get_messenger()->unregister_conn(this);
return seastar::when_all(in.close(), out.close()).discard_result();
}
seastar::future<> SocketConnection::fault()
{
+ if (policy.lossy) {
+ get_messenger()->unregister_conn(this);
+ }
if (h.backoff.count()) {
h.backoff += h.backoff;
} else {
*
*/
+#include <tuple>
#include "auth/Auth.h"
#include "SocketMessenger.h"
#include "SocketConnection.h"
seastar::future<> SocketMessenger::dispatch(ConnectionRef conn)
{
- connections.push_back(conn);
+ auto [i, added] = connections.emplace(conn->get_peer_addr(), conn);
+ std::ignore = i;
+ assert(added);
return seastar::repeat([=] {
return conn->read_message()
SocketMessenger::connect(const entity_addr_t& addr, entity_type_t peer_type,
const entity_addr_t& myaddr, entity_type_t host_type)
{
- if (auto found = std::find_if(connections.begin(),
- connections.end(),
- [&addr](auto conn) {
- return conn->get_peer_addr() == addr;
- });
- found != connections.end()) {
- return seastar::make_ready_future<ceph::net::ConnectionRef>(*found);
+ if (auto found = lookup_conn(addr); found) {
+ return seastar::make_ready_future<ceph::net::ConnectionRef>(found);
}
return seastar::connect(addr.in4_addr())
.then([=] (seastar::connected_socket socket) {
listener->abort_accept();
}
return seastar::parallel_for_each(connections.begin(), connections.end(),
- [this] (ConnectionRef conn) {
- return conn->close();
+ [this] (auto conn) {
+ return conn.second->close();
}).finally([this] { connections.clear(); });
}
+ceph::net::ConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr)
+{
+ if (auto found = connections.find(addr);
+ found != connections.end()) {
+ return found->second;
+ } else {
+ return nullptr;
+ }
+}
+
+void SocketMessenger::unregister_conn(ConnectionRef conn)
+{
+ assert(conn);
+ auto found = connections.find(conn->get_peer_addr());
+ assert(found != connections.end());
+ assert(found->second == conn);
+ connections.erase(found);
+}
+
seastar::future<msgr_tag_t, bufferlist>
SocketMessenger::verify_authorizer(peer_type_t peer_type,
auth_proto_t protocol,
#pragma once
-#include <list>
+#include <map>
#include <boost/optional.hpp>
#include <core/reactor.hh>
boost::optional<seastar::server_socket> listener;
Dispatcher *dispatcher = nullptr;
uint32_t global_seq = 0;
-
- std::list<ConnectionRef> connections;
+ std::map<entity_addr_t, ConnectionRef> connections;
seastar::future<> dispatch(ConnectionRef conn);
entity_type_t host_type) override;
seastar::future<> shutdown() override;
+ ConnectionRef lookup_conn(const entity_addr_t& addr) override;
+ void unregister_conn(ConnectionRef) override;
seastar::future<msgr_tag_t, bufferlist>
verify_authorizer(peer_type_t peer_type,
auth_proto_t protocol,