return repeat_connect();
});
// TODO: handle errors for state_t::connecting
- }).then([this] {
- state = state_t::open;
- // start background processing of tags
- read_tags_until_next_message();
}).then_wrapped([this] (auto fut) {
// satisfy the handshake's promise
fut.forward_to(std::move(h.promise));
.finally([this] { close(); });
// TODO: retry on fault
}).then([this] {
- // dispatch replies on this connection
- dispatch();
+ execute_open();
});
});
}
return repeat_handle_connect();
});
// TODO: handle errors for state_t::accepting
- }).then([this] {
- state = state_t::open;
- // start background processing of tags
- read_tags_until_next_message();
}).then_wrapped([this] (auto fut) {
// satisfy the handshake's promise
fut.forward_to(std::move(h.promise));
return seastar::make_exception_future<>(eptr)
.finally([this] { close(); });
}).then([this] {
- // dispatch messages until the connection closes or the dispatch
- // queue shuts down
- dispatch();
+ execute_open();
});
});
}
void
-SocketConnection::dispatch()
+SocketConnection::execute_open()
{
+ state = state_t::open;
seastar::with_gate(pending_dispatch, [this] {
- return seastar::keep_doing([=] {
+ // start background processing of tags
+ read_tags_until_next_message();
+ return seastar::keep_doing([this] {
return read_message()
- .then([=] (MessageRef msg) {
+ .then([this] (MessageRef msg) {
// start dispatch, ignoring exceptions from the application layer
- seastar::with_gate(messenger.pending_dispatch, [=, msg = std::move(msg)] {
+ seastar::with_gate(messenger.pending_dispatch, [this, msg = std::move(msg)] {
return dispatcher.ms_dispatch(this, std::move(msg))
.handle_exception([] (std::exception_ptr eptr) {});
});
// return immediately to start on the next message
return seastar::now();
});
- }).handle_exception_type([=] (const std::system_error& e) {
+ }).handle_exception_type([this] (const std::system_error& e) {
if (e.code() == error::connection_aborted ||
e.code() == error::connection_reset) {
- return seastar::with_gate(messenger.pending_dispatch, [=] {
+ return seastar::with_gate(messenger.pending_dispatch, [this] {
return dispatcher.ms_handle_reset(this);
});
} else if (e.code() == error::read_eof) {
- return seastar::with_gate(messenger.pending_dispatch, [=] {
+ return seastar::with_gate(messenger.pending_dispatch, [this] {
return dispatcher.ms_handle_remote_reset(this);
});
} else {