#include "crimson/common/log.h"
#include "Config.h"
+#include "Dispatcher.h"
#include "Errors.h"
#include "SocketMessenger.h"
}
SocketConnection::SocketConnection(SocketMessenger& messenger,
- const entity_addr_t& my_addr)
+ const entity_addr_t& my_addr,
+ Dispatcher& dispatcher)
: Connection(my_addr),
messenger(messenger),
+ dispatcher(dispatcher),
send_ready(h.promise.get_future())
{
}
SocketConnection::~SocketConnection()
{
+ ceph_assert(pending_dispatch.is_closed());
// errors were reported to callers of send()
ceph_assert(send_ready.available());
send_ready.ignore_ready_future();
assert(!close_ready.valid());
if (socket) {
- close_ready = socket->close().finally(std::move(cleanup));
+ close_ready = socket->close()
+ .then([this] {
+ return pending_dispatch.close();
+ }).finally(std::move(cleanup));
} else {
ceph_assert(state == state_t::connecting);
- close_ready = seastar::now();
+ close_ready = pending_dispatch.close().finally(std::move(cleanup));
}
state = state_t::closing;
return close_ready.get_future();
}
seastar::future<>
-SocketConnection::start_connect(const entity_addr_t& _peer_addr,
- const entity_type_t& _peer_type)
+SocketConnection::start_connect()
{
- ceph_assert(state == state_t::none);
- ceph_assert(!socket);
- peer_addr = _peer_addr;
- peer_type = _peer_type;
- messenger.register_conn(this);
- state = state_t::connecting;
return seastar::connect(peer_addr.in4_addr())
.then([this](seastar::connected_socket fd) {
if (state == state_t::closing) {
});
}
-seastar::future<>
-SocketConnection::start_accept(seastar::connected_socket&& fd,
- const entity_addr_t& _peer_addr)
+void
+SocketConnection::connect(const entity_addr_t& _peer_addr,
+ const entity_type_t& _peer_type)
{
ceph_assert(state == state_t::none);
ceph_assert(!socket);
peer_addr = _peer_addr;
- socket.emplace(std::move(fd));
- messenger.accept_conn(this);
- state = state_t::accepting;
+ peer_type = _peer_type;
+ messenger.register_conn(this);
+ state = state_t::connecting;
+ seastar::with_gate(pending_dispatch, [this] {
+ return start_connect()
+ .then([this] {
+ // notify the dispatcher and allow them to reject the connection
+ return seastar::with_gate(messenger.pending_dispatch, [this] {
+ return dispatcher.ms_handle_connect(this);
+ });
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ // close the connection before returning errors
+ return seastar::make_exception_future<>(eptr)
+ .finally([this] { close(); });
+ // TODO: retry on fault
+ }).then([this] {
+ // dispatch replies on this connection
+ dispatch()
+ .handle_exception([] (std::exception_ptr eptr) {});
+ });
+ });
+}
+
+seastar::future<>
+SocketConnection::start_accept()
+{
// encode/send server's handshake header
bufferlist bl;
bl.append(buffer::create_static(banner_size, banner));
});
}
+seastar::future<>
+SocketConnection::accept(seastar::connected_socket&& fd,
+ const entity_addr_t& _peer_addr)
+{
+ ceph_assert(state == state_t::none);
+ ceph_assert(!socket);
+ peer_addr = _peer_addr;
+ socket.emplace(std::move(fd));
+ messenger.accept_conn(this);
+ state = state_t::accepting;
+ return seastar::with_gate(pending_dispatch, [this] {
+ return start_accept()
+ .then([this] {
+ // notify the dispatcher and allow them to reject the connection
+ return seastar::with_gate(messenger.pending_dispatch, [=] {
+ return dispatcher.ms_handle_accept(this);
+ });
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ // close the connection before returning errors
+ return seastar::make_exception_future<>(eptr)
+ .finally([this] { close(); });
+ }).then([this] {
+ // dispatch messages until the connection closes or the dispatch
+ // queue shuts down
+ return dispatch();
+ });
+ });
+}
+
+seastar::future<>
+SocketConnection::dispatch()
+{
+ return seastar::with_gate(pending_dispatch, [this] {
+ return seastar::keep_doing([=] {
+ return read_message()
+ .then([=] (MessageRef msg) {
+ // start dispatch, ignoring exceptions from the application layer
+ seastar::with_gate(messenger.pending_dispatch, [=, msg = std::move(msg)] {
+ return dispatcher.ms_dispatch(this, std::move(msg))
+ .handle_exception([] (std::exception_ptr eptr) {});
+ });
+ // return immediately to start on the next message
+ return seastar::now();
+ });
+ }).handle_exception_type([=] (const std::system_error& e) {
+ if (e.code() == error::connection_aborted ||
+ e.code() == error::connection_reset) {
+ return seastar::with_gate(messenger.pending_dispatch, [=] {
+ return dispatcher.ms_handle_reset(this);
+ });
+ } else if (e.code() == error::read_eof) {
+ return seastar::with_gate(messenger.pending_dispatch, [=] {
+ return dispatcher.ms_handle_remote_reset(this);
+ });
+ } else {
+ throw e;
+ }
+ });
+ });
+}
+
seastar::future<> SocketConnection::fault()
{
if (policy.lossy) {
#pragma once
+#include <seastar/core/gate.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/shared_future.hh>
class SocketConnection : public Connection {
SocketMessenger& messenger;
std::optional<Socket> socket;
+ Dispatcher& dispatcher;
+ seastar::gate pending_dispatch;
enum class state_t {
none,
seastar::future<> fault();
+ seastar::future<> dispatch();
+
+ /// start a handshake from the client's perspective,
+ /// only call when SocketConnection first construct
+ seastar::future<> start_connect();
+ /// start a handshake from the server's perspective,
+ /// only call when SocketConnection first construct
+ seastar::future<> start_accept();
+
public:
SocketConnection(SocketMessenger& messenger,
- const entity_addr_t& my_addr);
+ const entity_addr_t& my_addr,
+ Dispatcher& dispatcher);
~SocketConnection();
Messenger* get_messenger() const override;
seastar::future<> close() override;
public:
- /// complete a handshake from the client's perspective
- seastar::future<> start_connect(const entity_addr_t& peer_addr,
- const entity_type_t& peer_type);
-
- /// complete a handshake from the server's perspective
- seastar::future<> start_accept(seastar::connected_socket&& socket,
- const entity_addr_t& peer_addr);
+ void connect(const entity_addr_t& peer_addr,
+ const entity_type_t& peer_type);
+ seastar::future<> accept(seastar::connected_socket&& socket,
+ const entity_addr_t& peer_addr);
/// read a message from a connection that has completed its handshake
seastar::future<MessageRef> read_message();
listener = seastar::listen(address, lo);
}
-seastar::future<> SocketMessenger::dispatch(SocketConnectionRef conn)
-{
- return seastar::keep_doing([=] {
- return conn->read_message()
- .then([=] (MessageRef msg) {
- // start dispatch, ignoring exceptions from the application layer
- seastar::with_gate(pending_dispatch, [=, msg = std::move(msg)] {
- return dispatcher->ms_dispatch(conn, std::move(msg))
- .handle_exception([] (std::exception_ptr eptr) {});
- });
- // return immediately to start on the next message
- return seastar::now();
- });
- }).handle_exception_type([=] (const std::system_error& e) {
- if (e.code() == error::connection_aborted ||
- e.code() == error::connection_reset) {
- return seastar::with_gate(pending_dispatch, [=] {
- return dispatcher->ms_handle_reset(conn);
- });
- } else if (e.code() == error::read_eof) {
- return seastar::with_gate(pending_dispatch, [=] {
- return dispatcher->ms_handle_remote_reset(conn);
- });
- } else {
- throw e;
- }
- });
-}
-
seastar::future<> SocketMessenger::accept(seastar::connected_socket socket,
seastar::socket_address paddr)
{
entity_addr_t peer_addr;
peer_addr.set_type(entity_addr_t::TYPE_DEFAULT);
peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
- SocketConnectionRef conn = new SocketConnection(*this, get_myaddr());
+ SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher);
// initiate the handshake
- return conn->start_accept(std::move(socket), peer_addr)
- .then([this, conn] {
- // notify the dispatcher and allow them to reject the connection
- return seastar::with_gate(pending_dispatch, [=] {
- return dispatcher->ms_handle_accept(conn);
- });
- }).handle_exception([conn] (std::exception_ptr eptr) {
- // close the connection before returning errors
- return seastar::make_exception_future<>(eptr)
- .finally([conn] { return conn->close(); });
- }).then([this, conn] {
- // dispatch messages until the connection closes or the dispatch
- // queue shuts down
- return dispatch(std::move(conn));
- });
+ return conn->accept(std::move(socket), peer_addr);
}
seastar::future<> SocketMessenger::start(Dispatcher *disp)
if (auto found = lookup_conn(peer_addr); found) {
return found;
}
- SocketConnectionRef conn = new SocketConnection(*this, get_myaddr());
- conn->start_connect(peer_addr, peer_type)
- .then([this, conn] {
- // notify the dispatcher and allow them to reject the connection
- return seastar::with_gate(pending_dispatch, [this, conn] {
- return dispatcher->ms_handle_connect(conn);
- });
- }).handle_exception([conn] (std::exception_ptr eptr) {
- // close the connection before returning errors
- return seastar::make_exception_future<>(eptr)
- .finally([conn] { return conn->close(); });
- // TODO: retry on fault
- }).then([this, conn] {
- // dispatch replies on this connection
- dispatch(conn)
- .handle_exception([] (std::exception_ptr eptr) {});
- });
+ SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher);
+ conn->connect(peer_addr, peer_type);
return conn;
}
std::set<SocketConnectionRef> accepting_conns;
using Throttle = ceph::thread::Throttle;
ceph::net::PolicySet<Throttle> policy_set;
- seastar::gate pending_dispatch;
-
- seastar::future<> dispatch(SocketConnectionRef conn);
seastar::future<> accept(seastar::connected_socket socket,
seastar::socket_address paddr);
bool force_new) override;
public:
+ // TODO: change to per-connection messenger gate
+ seastar::gate pending_dispatch;
+
void set_default_policy(const SocketPolicy& p);
void set_policy(entity_type_t peer_type, const SocketPolicy& p);
void set_policy_throttler(entity_type_t peer_type, Throttle* throttle);