// TODO: retry on fault
}).then([this] {
// dispatch replies on this connection
- dispatch()
- .handle_exception([] (std::exception_ptr eptr) {});
+ dispatch();
});
});
}
});
}
-seastar::future<>
+void
SocketConnection::accept(seastar::connected_socket&& fd,
const entity_addr_t& _peer_addr)
{
socket.emplace(std::move(fd));
messenger.accept_conn(this);
state = state_t::accepting;
- return seastar::with_gate(pending_dispatch, [this] {
+ seastar::with_gate(pending_dispatch, [this] {
return start_accept()
.then([this] {
// notify the dispatcher and allow them to reject the connection
}).then([this] {
// dispatch messages until the connection closes or the dispatch
// queue shuts down
- return dispatch();
+ dispatch();
});
});
}
-seastar::future<>
+void
SocketConnection::dispatch()
{
- return seastar::with_gate(pending_dispatch, [this] {
+ seastar::with_gate(pending_dispatch, [this] {
return seastar::keep_doing([=] {
return read_message()
.then([=] (MessageRef msg) {
} else {
throw e;
}
+ }).handle_exception([] (std::exception_ptr eptr) {
+ // TODO: handle fault in the open state
});
});
}
seastar::future<> fault();
- seastar::future<> dispatch();
+ void dispatch();
/// start a handshake from the client's perspective,
/// only call when SocketConnection first construct
public:
void connect(const entity_addr_t& peer_addr,
const entity_type_t& peer_type);
- seastar::future<> accept(seastar::connected_socket&& socket,
- const entity_addr_t& peer_addr);
+ void accept(seastar::connected_socket&& socket,
+ const entity_addr_t& peer_addr);
/// read a message from a connection that has completed its handshake
seastar::future<MessageRef> read_message();
listener = seastar::listen(address, lo);
}
-seastar::future<> SocketMessenger::accept(seastar::connected_socket socket,
- seastar::socket_address paddr)
-{
- // allocate the connection
- entity_addr_t peer_addr;
- peer_addr.set_type(entity_addr_t::TYPE_DEFAULT);
- peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
- SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher);
- // initiate the handshake
- return conn->accept(std::move(socket), peer_addr);
-}
-
seastar::future<> SocketMessenger::start(Dispatcher *disp)
{
dispatcher = disp;
return listener->accept()
.then([this] (seastar::connected_socket socket,
seastar::socket_address paddr) {
- // start processing the connection
- accept(std::move(socket), paddr)
- .handle_exception([] (std::exception_ptr eptr) {});
+ // allocate the connection
+ entity_addr_t peer_addr;
+ peer_addr.set_type(entity_addr_t::TYPE_DEFAULT);
+ peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
+ SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher);
// don't wait before accepting another
+ conn->accept(std::move(socket), peer_addr);
});
}).handle_exception_type([this] (const std::system_error& e) {
// stop gracefully on connection_aborted