}
}
-seastar::future<bool> ProtocolV2::process_wait()
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::process_wait()
{
return read_frame_payload().then([this] {
// handle_wait() logic
logger().warn("{} GOT WaitFrame", conn);
WaitFrame::Decode(rx_segments_data.back());
- return false;
+ return next_step_t::wait;
});
}
-seastar::future<bool> ProtocolV2::client_connect()
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::client_connect()
{
// send_client_ident() logic
if (!conn.policy.lossy && !client_cookie) {
" (client does not support all server features)",
conn, ident_missing.features());
abort_in_fault();
- // won't be executed
- return false;
+ return next_step_t::none;
});
case Tag::WAIT:
return process_wait();
ceph_abort("unexpected exception from ms_handle_connect()");
});
}).then([this] {
- return true;
+ return next_step_t::ready;
});
default: {
unexpected_tag(tag, conn, "post_client_connect");
- // won't be executed
- return seastar::make_ready_future<bool>(false);
+ return seastar::make_ready_future<next_step_t>(next_step_t::none);
}
}
});
}
-seastar::future<bool> ProtocolV2::client_reconnect()
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::client_reconnect()
{
// send_reconnect() logic
auto reconnect = ReconnectFrame::Encode(messenger.get_myaddrs(),
ceph_abort("unexpected exception from ms_handle_connect()");
});
}).then([this] {
- return true;
+ return next_step_t::ready;
});
default: {
unexpected_tag(tag, conn, "post_client_reconnect");
- // won't be executed
- return seastar::make_ready_future<bool>(false);
+ return seastar::make_ready_future<next_step_t>(next_step_t::none);
}
}
});
ceph_assert(false);
return client_reconnect();
}
- }).then([this] (bool proceed_or_wait) {
- if (proceed_or_wait) {
+ }).then([this] (next_step_t next) {
+ switch (next) {
+ case next_step_t::ready: {
logger().info("{} connected: gs={}, pgs={}, cs={},"
" client_cookie={}, server_cookie={}, in_seq={}, out_seq={}",
conn, global_seq, peer_global_seq, connect_seq,
client_cookie, server_cookie, conn.in_seq, conn.out_seq);
execute_ready();
- } else {
+ break;
+ }
+ case next_step_t::wait: {
execute_wait();
+ break;
+ }
+ default: {
+ ceph_abort("impossible next step");
+ }
}
}).handle_exception([this] (std::exception_ptr eptr) {
// TODO: handle fault in CONNECTING state
});
}
-seastar::future<bool> ProtocolV2::send_wait()
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::send_wait()
{
auto wait = WaitFrame::Encode();
logger().warn("{} WRITE WaitFrame", conn);
return write_frame(wait).then([this] {
- return false;
+ return next_step_t::wait;
});
}
-seastar::future<bool> ProtocolV2::handle_existing_connection(SocketConnectionRef existing)
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::handle_existing_connection(SocketConnectionRef existing)
{
// handle_existing_connection() logic
logger().trace("{} {}: {}", conn, __func__, *existing);
if (exproto->state == state_t::CLOSING) {
logger().warn("{} existing connection {} already closed.", conn, *existing);
- return send_server_ident().then([this] {
- return true;
- });
+ return send_server_ident();
}
if (exproto->state == state_t::REPLACING) {
" this connection", conn, *existing);
exproto->dispatch_reset();
exproto->close();
- return send_server_ident().then([this] {
- return true;
- });
+ return send_server_ident();
}
// TODO: lossless policy
ceph_assert(false);
}
-seastar::future<bool> ProtocolV2::server_connect()
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::server_connect()
{
return read_frame_payload().then([this] {
// handle_client_ident() logic
logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)",
conn, feat_missing);
return write_frame(ident_missing_features).then([this] {
- return false;
+ return next_step_t::wait;
});
}
connection_features =
}
// if everything is OK reply with server identification
- return send_server_ident().then([this] {
- // goto ready
- return true;
- });
+ return send_server_ident();
});
}
-seastar::future<bool> ProtocolV2::read_reconnect()
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::read_reconnect()
{
return read_main_preamble()
.then([this] (Tag tag) {
});
}
-seastar::future<bool> ProtocolV2::send_retry(uint64_t connect_seq)
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::send_retry(uint64_t connect_seq)
{
auto retry = RetryFrame::Encode(connect_seq);
logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq);
});
}
-seastar::future<bool> ProtocolV2::send_retry_global(uint64_t global_seq)
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::send_retry_global(uint64_t global_seq)
{
auto retry = RetryGlobalFrame::Encode(global_seq);
logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq);
});
}
-seastar::future<bool> ProtocolV2::send_reset(bool full)
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::send_reset(bool full)
{
auto reset = ResetFrame::Encode(full);
logger().warn("{} WRITE ResetFrame: full={}", conn, full);
});
}
-seastar::future<bool> ProtocolV2::server_reconnect()
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::server_reconnect()
{
return read_frame_payload().then([this] {
// handle_reconnect() logic
return server_reconnect();
default: {
unexpected_tag(tag, conn, "post_server_auth");
- // won't be executed
- return seastar::make_ready_future<bool>(false);
+ return seastar::make_ready_future<next_step_t>(next_step_t::none);
}
}
- }).then([this] (bool proceed_or_wait) {
- if (proceed_or_wait) {
+ }).then([this] (next_step_t next) {
+ switch (next) {
+ case next_step_t::ready: {
messenger.register_conn(
seastar::static_pointer_cast<SocketConnection>(
conn.shared_from_this()));
conn, global_seq, peer_global_seq, connect_seq,
client_cookie, server_cookie, conn.in_seq, conn.out_seq);
execute_ready();
- } else {
+ break;
+ }
+ case next_step_t::wait: {
execute_server_wait();
+ break;
+ }
+ default: {
+ ceph_abort("impossible next step");
+ }
}
}).handle_exception([this] (std::exception_ptr eptr) {
// TODO: handle fault in ACCEPTING state
// ACCEPTING or REPLACING state
-seastar::future<> ProtocolV2::send_server_ident()
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::send_server_ident()
{
// send_server_ident() logic
});
return write_frame(server_ident);
+ }).then([] {
+ return next_step_t::ready;
});
}
void reset_session(bool full);
seastar::future<entity_type_t, entity_addr_t> banner_exchange();
+ enum class next_step_t {
+ ready,
+ wait,
+ none, // protocol should have been aborted or failed
+ };
+
// CONNECTING (client)
seastar::future<> handle_auth_reply();
inline seastar::future<> client_auth() {
}
seastar::future<> client_auth(std::vector<uint32_t> &allowed_methods);
- seastar::future<bool> process_wait();
- seastar::future<bool> client_connect();
- seastar::future<bool> client_reconnect();
+ seastar::future<next_step_t> process_wait();
+ seastar::future<next_step_t> client_connect();
+ seastar::future<next_step_t> client_reconnect();
void execute_connecting();
// ACCEPTING (server)
seastar::future<> _handle_auth_request(bufferlist& auth_payload, bool more);
seastar::future<> server_auth();
- seastar::future<bool> send_wait();
+ seastar::future<next_step_t> send_wait();
- seastar::future<bool> handle_existing_connection(SocketConnectionRef existing);
- seastar::future<bool> server_connect();
+ seastar::future<next_step_t> handle_existing_connection(SocketConnectionRef existing);
+ seastar::future<next_step_t> server_connect();
- seastar::future<bool> read_reconnect();
- seastar::future<bool> send_retry(uint64_t connect_seq);
- seastar::future<bool> send_retry_global(uint64_t global_seq);
- seastar::future<bool> send_reset(bool full);
- seastar::future<bool> server_reconnect();
+ seastar::future<next_step_t> read_reconnect();
+ seastar::future<next_step_t> send_retry(uint64_t connect_seq);
+ seastar::future<next_step_t> send_retry_global(uint64_t global_seq);
+ seastar::future<next_step_t> send_reset(bool full);
+ seastar::future<next_step_t> server_reconnect();
void execute_accepting();
seastar::future<> finish_auth();
// ACCEPTING/REPLACING (server)
- seastar::future<> send_server_ident();
+ seastar::future<next_step_t> send_server_ident();
// REPLACING (server)
seastar::future<> send_reconnect_ok();