bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX));
encode((uint16_t)banner_payload.length(), bl, 0);
bl.claim_append(banner_payload);
- return write_flush(std::move(bl))
- .then([this] {
+ return write_flush(std::move(bl)).then([this] {
// 2. read peer banner
unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(__le16);
return read_exactly(banner_len); // or read exactly?
.then([this] (Tag tag) {
switch (tag) {
case Tag::AUTH_BAD_METHOD:
- return read_frame_payload()
- .then([this] {
+ return read_frame_payload().then([this] {
// handle_auth_bad_method() logic
auto bad_method = AuthBadMethodFrame::Decode(rx_segments_data.back());
logger().warn("{} got AuthBadMethod, method={} reslt={}, "
return client_auth(bad_method.allowed_methods());
});
case Tag::AUTH_REPLY_MORE:
- return read_frame_payload()
- .then([this] {
+ return read_frame_payload().then([this] {
// handle_auth_reply_more() logic
auto auth_more = AuthReplyMoreFrame::Decode(rx_segments_data.back());
logger().debug("{} auth reply more len={}",
return handle_auth_reply();
});
case Tag::AUTH_DONE:
- return read_frame_payload()
- .then([this] {
+ return read_frame_payload().then([this] {
// handle_auth_done() logic
auto auth_done = AuthDoneFrame::Decode(rx_segments_data.back());
ceph_assert(messenger.get_auth_client());
}
auto frame = AuthRequestFrame::Encode(auth_meta->auth_method, preferred_modes, bl);
- return write_frame(frame)
- .then([this] {
+ return write_frame(frame).then([this] {
return handle_auth_reply();
});
}
seastar::future<bool> ProtocolV2::process_wait()
{
- return read_frame_payload()
- .then([this] {
+ return read_frame_payload().then([this] {
// handle_wait() logic
logger().debug("{} received WAIT (connection race)", conn);
WaitFrame::Decode(rx_segments_data.back());
a.u.sa.sa_family = AF_INET;
a.set_type(entity_addr_t::TYPE_MSGR2);
logger().debug("{} learn from addr {}", conn, a);
- return messenger.learned_addr(a)
- .then([this] {
+ return messenger.learned_addr(a).then([this] {
uint64_t flags = 0;
if (conn.policy.lossy) {
flags |= CEPH_MSG_CONNECT_LOSSY;
}).then([this] (Tag tag) {
switch (tag) {
case Tag::IDENT_MISSING_FEATURES:
- return read_frame_payload()
- .then([this] {
+ return read_frame_payload().then([this] {
// handle_ident_missing_features() logic
auto ident_missing = IdentMissingFeaturesFrame::Decode(rx_segments_data.back());
logger().error("{} client does not support all server features: {}",
case Tag::WAIT:
return process_wait();
case Tag::SERVER_IDENT:
- return read_frame_payload()
- .then([this] {
+ return read_frame_payload().then([this] {
// handle_server_ident() logic
auto server_ident = ServerIdentFrame::Decode(rx_segments_data.back());
logger().debug("{} received server identification:"
" server_cookie={} gs={} cs={} ms={}",
conn, client_cookie, server_cookie,
global_seq, connect_seq, conn.in_seq);
- return write_frame(reconnect)
- .then([this] {
+ return write_frame(reconnect).then([this] {
return read_main_preamble();
}).then([this] (Tag tag) {
switch (tag) {
case Tag::SESSION_RETRY_GLOBAL:
- return read_frame_payload()
- .then([this] {
+ return read_frame_payload().then([this] {
// handle_session_retry_global() logic
auto retry = RetryGlobalFrame::Decode(rx_segments_data.back());
global_seq = messenger.get_global_seq(retry.global_seq());
return client_reconnect();
});
case Tag::SESSION_RETRY:
- return read_frame_payload()
- .then([this] {
+ return read_frame_payload().then([this] {
// handle_session_retry() logic
auto retry = RetryFrame::Decode(rx_segments_data.back());
connect_seq = retry.connect_seq() + 1;
return client_reconnect();
});
case Tag::SESSION_RESET:
- return read_frame_payload()
- .then([this] {
+ return read_frame_payload().then([this] {
// handle_session_reset() logic
auto reset = ResetFrame::Decode(rx_segments_data.back());
logger().warn("{} received session reset full={}", reset.full());
case Tag::WAIT:
return process_wait();
case Tag::SESSION_RECONNECT_OK:
- return read_frame_payload()
- .then([this] {
+ return read_frame_payload().then([this] {
// handle_reconnect_ok() logic
auto reconnect_ok = ReconnectOkFrame::Decode(rx_segments_data.back());
logger().debug("{} received reconnect ok:"
allowed_methods, allowed_modes);
auto bad_method = AuthBadMethodFrame::Encode(
auth_meta->auth_method, r, allowed_methods, allowed_modes);
- return write_frame(bad_method)
- .then([this] {
+ return write_frame(bad_method).then([this] {
return server_auth();
});
}
case 1: {
auto auth_done = AuthDoneFrame::Encode(
conn.peer_global_id, auth_meta->con_mode, reply);
- return write_frame(auth_done)
- .then([this] {
+ return write_frame(auth_done).then([this] {
ceph_assert(auth_meta);
// TODO
ceph_assert(!auth_meta->is_mode_secure());
// auth more
case 0: {
auto more = AuthReplyMoreFrame::Encode(reply);
- return write_frame(more)
- .then([this] {
+ return write_frame(more).then([this] {
return read_main_preamble();
}).then([this] (Tag tag) {
expect_tag(Tag::AUTH_REQUEST_MORE, tag, conn, __func__);
seastar::future<bool> ProtocolV2::send_wait()
{
auto wait = WaitFrame::Encode();
- return write_frame(wait)
- .then([this] {
+ return write_frame(wait).then([this] {
return false;
});
}
if (exproto->state == state_t::CLOSING) {
logger().warn("{} existing {} already closed.", conn, *existing);
- return send_server_ident()
- .then([this] {
+ return send_server_ident().then([this] {
return true;
});
}
" this connection", conn, *existing);
exproto->dispatch_reset();
exproto->close();
- return send_server_ident()
- .then([this] {
+ return send_server_ident().then([this] {
return true;
});
}
seastar::future<bool> ProtocolV2::server_connect()
{
- return read_frame_payload()
- .then([this] {
+ return read_frame_payload().then([this] {
// handle_client_ident() logic
auto client_ident = ClientIdentFrame::Decode(rx_segments_data.back());
logger().debug("{} received client identification: addrs={} target={}"
if (feat_missing) {
logger().warn("{} peer missing required features {}", conn, feat_missing);
auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing);
- return write_frame(ident_missing_features)
- .then([this] {
+ return write_frame(ident_missing_features).then([this] {
return false;
});
}
}
// if everything is OK reply with server identification
- return send_server_ident()
- .then([this] {
+ return send_server_ident().then([this] {
// goto ready
return true;
});
seastar::future<bool> ProtocolV2::send_retry(uint64_t connect_seq)
{
auto retry = RetryFrame::Encode(connect_seq);
- return write_frame(retry)
- .then([this] {
+ return write_frame(retry).then([this] {
return read_reconnect();
});
}
seastar::future<bool> ProtocolV2::send_retry_global(uint64_t global_seq)
{
auto retry = RetryGlobalFrame::Encode(global_seq);
- return write_frame(retry)
- .then([this] {
+ return write_frame(retry).then([this] {
return read_reconnect();
});
}
seastar::future<bool> ProtocolV2::send_reset(bool full)
{
auto reset = ResetFrame::Encode(full);
- return write_frame(reset)
- .then([this] {
+ return write_frame(reset).then([this] {
return read_main_preamble();
}).then([this] (Tag tag) {
expect_tag(Tag::CLIENT_IDENT, tag, conn, "post_send_reset");
seastar::future<bool> ProtocolV2::server_reconnect()
{
- return read_frame_payload()
- .then([this] {
+ return read_frame_payload().then([this] {
// handle_reconnect() logic
auto reconnect = ReconnectFrame::Decode(rx_segments_data.back());
ceph_assert(record_io);
record_io = false;
rxbuf.clear();
- return write_frame(sig_frame)
- .then([this] {
+ return write_frame(sig_frame).then([this] {
return read_main_preamble();
}).then([this] (Tag tag) {
expect_tag(Tag::AUTH_SIGNATURE, tag, conn, "post_finish_auth");
});
}
case Tag::ACK:
- return read_frame_payload()
- .then([this] {
+ return read_frame_payload().then([this] {
// handle_message_ack() logic
auto ack = AckFrame::Decode(rx_segments_data.back());
handle_message_ack(ack.seq());
});
case Tag::KEEPALIVE2:
- return read_frame_payload()
- .then([this] {
+ return read_frame_payload().then([this] {
// handle_keepalive2() logic
auto keepalive_frame = KeepAliveFrame::Decode(rx_segments_data.back());
last_keepalive_ack_to_send = keepalive_frame.timestamp();
notify_keepalive_ack();
});
case Tag::KEEPALIVE2_ACK:
- return read_frame_payload()
- .then([this] {
+ return read_frame_payload().then([this] {
// handle_keepalive2_ack() logic
auto keepalive_ack_frame = KeepAliveFrameAck::Decode(rx_segments_data.back());
conn.last_keepalive_ack = keepalive_ack_frame.timestamp();