};
struct ClientIdentFrame
- : public SignedEncryptedFrame<ClientIdentFrame, entity_addrvec_t,
- entity_addr_t,
- int64_t,
- uint64_t, uint64_t, uint64_t, uint64_t> {
+ : public SignedEncryptedFrame<ClientIdentFrame,
+ entity_addrvec_t, // my addresses
+ entity_addr_t, // target address
+ int64_t, // global_id
+ uint64_t, // global seq
+ uint64_t, // supported features
+ uint64_t, // required features
+ uint64_t, // flags
+ uint64_t> { // client cookie
const ProtocolV2::Tag tag = ProtocolV2::Tag::CLIENT_IDENT;
using SignedEncryptedFrame::SignedEncryptedFrame;
inline uint64_t &supported_features() { return get_val<4>(); }
inline uint64_t &required_features() { return get_val<5>(); }
inline uint64_t &flags() { return get_val<6>(); }
+ inline uint64_t &cookie() { return get_val<7>(); }
};
struct ServerIdentFrame
};
struct ReconnectFrame
- : public SignedEncryptedFrame<ReconnectFrame, entity_addrvec_t,
- entity_addr_t,
- uint64_t,
- int64_t,
- uint64_t, uint64_t,
- uint64_t, uint64_t,
- uint64_t> {
+ : public SignedEncryptedFrame<ReconnectFrame,
+ entity_addrvec_t, // my addresses
+ uint64_t, // client cookie
+ uint64_t, // server cookie
+ uint64_t, // global sequence
+ uint64_t, // connect sequence
+ uint64_t> { // message sequence
const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RECONNECT;
using SignedEncryptedFrame::SignedEncryptedFrame;
inline entity_addrvec_t &addrs() { return get_val<0>(); }
- inline entity_addr_t &target_addr() { return get_val<1>(); }
- inline uint64_t &cookie() { return get_val<2>(); }
- inline int64_t &gid() { return get_val<3>(); }
- inline uint64_t &global_seq() { return get_val<4>(); }
- inline uint64_t &connect_seq() { return get_val<5>(); }
- inline uint64_t &supported_features() { return get_val<6>(); }
- inline uint64_t &required_features() { return get_val<7>(); }
- inline uint64_t &msg_seq() { return get_val<8>(); }
+ inline uint64_t &client_cookie() { return get_val<1>(); }
+ inline uint64_t &server_cookie() { return get_val<2>(); }
+ inline uint64_t &global_seq() { return get_val<3>(); }
+ inline uint64_t &connect_seq() { return get_val<4>(); }
+ inline uint64_t &msg_seq() { return get_val<5>(); }
};
-struct ResetFrame : public Frame<ResetFrame> {
+struct ResetFrame : public SignedEncryptedFrame<ResetFrame, bool> {
const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RESET;
+ using SignedEncryptedFrame::SignedEncryptedFrame;
+
+ inline bool &full() { return get_val<0>(); }
};
struct RetryFrame : public SignedEncryptedFrame<RetryFrame, uint64_t> {
const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RETRY;
using SignedEncryptedFrame::SignedEncryptedFrame;
- uint64_t connect_seq() { return get_val<0>(); }
+ inline uint64_t &connect_seq() { return get_val<0>(); }
};
struct RetryGlobalFrame
temp_buffer(nullptr),
state(NONE),
peer_required_features(0),
- cookie(0),
+ client_cookie(0),
+ server_cookie(0),
global_seq(0),
connect_seq(0),
peer_global_seq(0),
out_seq = 0;
in_seq = 0;
- cookie = 0;
+ client_cookie = 0;
+ server_cookie = 0;
connect_seq = 0;
peer_global_seq = 0;
message_seq = 0;
connection->fault();
reset_recv_state();
- if (state == ACCEPTING_SESSION && !reconnecting) {
- // if the connection is generating a new session and fails during this
- // process, we need to clean the cookie. Otherwise, the connection might
- // later try to reconnect to a non-existing session, and the other side
- // will force a SESSION_RESET, which will cause a drop of the out_queue.
- cookie = 0;
- }
reconnecting = false;
if (connection->policy.standby && out_queue.empty() && !keepalive &&
backoff.set_from_double(cct->_conf->ms_max_backoff);
}
+ if (server_cookie) {
+ connect_seq++;
+ }
+
global_seq = messenger->get_global_seq();
state = START_CONNECT;
connection->state = AsyncConnection::STATE_CONNECTING;
connection->write_lock.lock();
if (state == STANDBY && !connection->policy.server && is_queued()) {
ldout(cct, 10) << __func__ << " policy.server is false" << dendl;
- if (cookie) { // only increment connect_seq if there is a session
+ if (server_cookie) { // only increment connect_seq if there is a session
connect_seq++;
}
connection->_connect();
case Tag::SERVER_IDENT:
case Tag::IDENT_MISSING_FEATURES:
case Tag::SESSION_RECONNECT:
+ case Tag::SESSION_RESET:
case Tag::SESSION_RETRY:
case Tag::SESSION_RETRY_GLOBAL:
case Tag::SESSION_RECONNECT_OK:
case Tag::KEEPALIVE2_ACK:
case Tag::ACK:
return READ(next_payload_len, handle_frame_payload);
- case Tag::SESSION_RESET:
- return handle_session_reset();
case Tag::WAIT:
return handle_wait();
case Tag::MESSAGE:
return handle_ident_missing_features(buffer, next_payload_len);
case Tag::SESSION_RECONNECT:
return handle_reconnect(buffer, next_payload_len);
+ case Tag::SESSION_RESET:
+ return handle_session_reset(buffer, next_payload_len);
case Tag::SESSION_RETRY:
return handle_session_retry(buffer, next_payload_len);
case Tag::SESSION_RETRY_GLOBAL:
connection->maybe_start_delay_thread();
state = READY;
- ldout(cct, 1) << __func__ << " entity=" << peer_name << " cookie=" << std::hex
- << cookie << std::dec << " in_seq=" << in_seq
+ ldout(cct, 1) << __func__ << " entity=" << peer_name << " client_cookie="
+ << std::hex << client_cookie << " server_cookie="
+ << server_cookie << std::dec << " in_seq=" << in_seq
<< " out_seq=" << out_seq << dendl;
return CONTINUE(read_frame);
auth_meta->connection_secret,
CEPH_FEATURE_MSG_AUTH | CEPH_FEATURE_CEPHX_V2));
- if (!cookie) {
+ if (!server_cookie) {
ceph_assert(connect_seq == 0);
return send_client_ident();
} else { // reconnecting to previous session
CtPtr ProtocolV2::send_client_ident() {
ldout(cct, 20) << __func__ << dendl;
+ if (!connection->policy.lossy && !client_cookie) {
+ client_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
+ }
+
uint64_t flags = 0;
if (connection->policy.lossy) {
flags |= CEPH_MSG_CONNECT_LOSSY;
}
ClientIdentFrame client_ident(this, messenger->get_myaddrs(),
- connection->target_addr,
+ connection->target_addr,
messenger->get_myname().num(), global_seq,
connection->policy.features_supported,
connection->policy.features_required | msgr2_required,
- flags);
+ flags, client_cookie);
ldout(cct, 5) << __func__ << " sending identification: "
<< "addrs=" << messenger->get_myaddrs()
<< " features_supported=" << std::hex
<< connection->policy.features_supported
<< " features_required="
- << (connection->policy.features_required | msgr2_required)
- << " flags=" << flags << std::dec << dendl;
+ << (connection->policy.features_required | msgr2_required)
+ << " flags=" << flags
+ << " cookie=" << client_cookie << std::dec << dendl;
bufferlist &bl = client_ident.get_buffer();
return WRITE(bl, "client ident", read_frame);
ldout(cct, 20) << __func__ << dendl;
ReconnectFrame reconnect(this, messenger->get_myaddrs(),
- connection->target_addr,
- cookie,
- messenger->get_myname().num(),
- global_seq,
+ client_cookie,
+ server_cookie,
+ global_seq,
connect_seq,
- connection->policy.features_supported,
- connection->policy.features_required | msgr2_required,
- in_seq);
+ in_seq);
- ldout(cct, 5) << __func__ << " reconnect to session: cookie=" << cookie
+ ldout(cct, 5) << __func__ << " reconnect to session: client_cookie="
+ << std::hex << client_cookie << " server_cookie="
+ << server_cookie << std::dec
<< " gs=" << global_seq << " cs=" << connect_seq
<< " ms=" << in_seq << dendl;
- bufferlist &bl = reconnect.get_buffer();
- return WRITE(bl, "reconnect", read_frame);
+ return WRITE(reconnect.get_buffer(), "reconnect", read_frame);
}
CtPtr ProtocolV2::handle_ident_missing_features(char *payload,
return _fault();
}
-CtPtr ProtocolV2::handle_session_reset() {
+CtPtr ProtocolV2::handle_session_reset(char *payload, uint32_t length) {
ldout(cct, 20) << __func__ << dendl;
- ldout(cct, 1) << __func__ << " received session reset" << dendl;
- reset_session();
+ ResetFrame reset(this, payload, length);
+
+ ldout(cct, 1) << __func__ << " received session reset full=" << reset.full()
+ << dendl;
+ if (reset.full()) {
+ reset_session();
+ } else {
+ server_cookie = 0;
+ connect_seq = 0;
+ in_seq = 0;
+ }
return send_client_ident();
}
return _fault();
}
- cookie = server_ident.cookie();
+ server_cookie = server_ident.cookie();
connection->set_peer_addrs(server_ident.addrs());
peer_name = entity_name_t(connection->get_peer_type(), server_ident.gid());
ClientIdentFrame client_ident(this, payload, length);
- ldout(cct, 5) << __func__ << " received client identification: "
- << "addrs=" << client_ident.addrs()
- << " target=" << client_ident.target_addr()
+ ldout(cct, 5) << __func__ << " received client identification:"
+ << " addrs=" << client_ident.addrs()
+ << " target=" << client_ident.target_addr()
<< " gid=" << client_ident.gid()
<< " global_seq=" << client_ident.global_seq()
<< " features_supported=" << std::hex
<< client_ident.supported_features()
<< " features_required=" << client_ident.required_features()
- << " flags=" << client_ident.flags() << std::dec << dendl;
+ << " flags=" << client_ident.flags()
+ << " cookie=" << client_ident.cookie() << std::dec << dendl;
+
if (client_ident.addrs().empty() ||
client_ident.addrs().front() == entity_addr_t()) {
return _fault(); // a v2 peer should never do this
peer_name = entity_name_t(connection->get_peer_type(), client_ident.gid());
connection->set_peer_id(client_ident.gid());
+ client_cookie = client_ident.cookie();
+
uint64_t feat_missing =
(connection->policy.features_required | msgr2_required) &
~(uint64_t)client_ident.supported_features();
ReconnectFrame reconnect(this, payload, length);
ldout(cct, 5) << __func__
- << " received reconnect: cookie=" << reconnect.cookie()
- << " target_addr=" << reconnect.target_addr()
- << " gid=" << reconnect.gid()
+ << " received reconnect:"
+ << " client_cookie=" << std::hex << reconnect.client_cookie()
+ << " server_cookie=" << reconnect.server_cookie() << std::dec
<< " gs=" << reconnect.global_seq()
<< " cs=" << reconnect.connect_seq()
<< " ms=" << reconnect.msg_seq()
- << dendl;
-
- if (!messenger->get_myaddrs().contains(reconnect.target_addr())) {
- ldout(cct,5) << __func__ << " peer is trying to reach "
- << reconnect.target_addr()
- << " which is not us (" << messenger->get_myaddrs() << ")"
- << dendl;
- return _fault();
- }
+ << dendl;
// Should we check if one of the ident.addrs match connection->target_addr
// as we do in ProtocolV1?
connection->set_peer_addrs(reconnect.addrs());
connection->target_addr = connection->_infer_target_addr(reconnect.addrs());
-
- peer_name = entity_name_t(connection->get_peer_type(), reconnect.gid());
- connection_features = reconnect.supported_features() &
- connection->policy.features_supported;
peer_global_seq = reconnect.global_seq();
- uint64_t feat_missing =
- (connection->policy.features_required | msgr2_required) &
- ~(uint64_t)reconnect.supported_features();
- if (feat_missing) {
- ldout(cct, 1) << __func__ << " peer missing required features " << std::hex
- << feat_missing << std::dec << dendl;
- IdentMissingFeaturesFrame ident_missing_features(this, feat_missing);
-
- bufferlist &bl = ident_missing_features.get_buffer();
- return WRITE(bl, "ident missing features", read_frame);
- }
-
connection->lock.unlock();
AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
return _fault();
}
- ResetFrame reset;
- bufferlist &bl = reset.get_buffer();
-
if (!existing) {
// there is no existing connection therefore cannot reconnect to previous
// session
ldout(cct, 0) << __func__
<< " no existing connection exists, reseting client" << dendl;
- return WRITE(bl, "session reset", read_frame);
+ ResetFrame reset(this, true);
+ return WRITE(reset.get_buffer(), "session reset", read_frame);
}
std::lock_guard<std::mutex> l(existing->lock);
if (exproto->state == CLOSED) {
ldout(cct, 5) << __func__ << " existing " << existing
<< " already closed. Reseting client" << dendl;
- return WRITE(bl, "session reset", read_frame);
+ ResetFrame reset(this, true);
+ return WRITE(reset.get_buffer(), "session reset", read_frame);
}
if (exproto->replacing) {
return WRITE(bl, "session retry", read_frame);
}
- if (connection->policy.resetcheck) {
- if (!exproto->cookie) {
- // server connection was reseted, reset client
- ldout(cct, 5) << __func__ << " no cookie set, reseting client" << dendl;
- return WRITE(bl, "session reset", read_frame);
- } else if (exproto->cookie != reconnect.cookie()) {
- ldout(cct, 5) << __func__ << " cookie mismatch sc=" << exproto->cookie
- << " cc=" << reconnect.cookie() << ", reseting client"
- << dendl;
- return WRITE(bl, "session reset", read_frame);
- }
- } else {
- if (exproto->cookie == 0) {
- // this happens when:
- // - a connects to b
- // - a sends client_ident
- // - b gets client_ident, sends server_ident and sets cookie X
- // - connection fault
- // - b reconnects to a with cookie X, connect_seq=1
- // - a has cookie==0
- ldout(cct, 5) << __func__ << " no cookie set, setting" << dendl;
- exproto->cookie = reconnect.cookie();
- }
+ if (exproto->client_cookie != reconnect.client_cookie()) {
+ ldout(cct, 1) << __func__ << " existing=" << existing
+ << " client cookie mismatch, I must have reseted:"
+ << " cc=" << std::hex << exproto->client_cookie
+ << " rcc=" << reconnect.client_cookie()
+ << ", reseting client."
+ << dendl;
+ ResetFrame reset(this, connection->policy.resetcheck);
+ return WRITE(reset.get_buffer(), "session reset", read_frame);
+ } else if (exproto->server_cookie == 0) {
+ // this happens when:
+ // - a connects to b
+ // - a sends client_ident
+ // - b gets client_ident, sends server_ident and sets cookie X
+ // - connection fault
+ // - b reconnects to a with cookie X, connect_seq=1
+ // - a has cookie==0
+ ldout(cct, 1) << __func__ << " I was a client and didn't received the"
+ << " server_ident. Asking peer to resume session"
+ << " establishment" << dendl;
+ ResetFrame reset(this, false);
+ return WRITE(reset.get_buffer(), "session reset", read_frame);
}
if (exproto->peer_global_seq > reconnect.global_seq()) {
<< " cgs=" << reconnect.global_seq()
<< ", ask client to retry global" << dendl;
RetryGlobalFrame retry(this, exproto->peer_global_seq);
- bufferlist &bl = retry.get_buffer();
- return WRITE(bl, "session retry", read_frame);
+ return WRITE(retry.get_buffer(), "session retry", read_frame);
}
if (exproto->connect_seq >= reconnect.connect_seq()) {
<< " ccs=" << reconnect.connect_seq()
<< " , ask client to retry" << dendl;
RetryFrame retry(this, exproto->connect_seq);
- bufferlist &bl = retry.get_buffer();
- return WRITE(bl, "session retry", read_frame);
+ return WRITE(retry.get_buffer(), "session retry", read_frame);
}
ldout(cct, 1) << __func__ << " reconnect to existing=" << existing << dendl;
return send_server_ident();
}
- if (exproto->cookie) {
+ if (exproto->server_cookie && exproto->client_cookie &&
+ exproto->client_cookie != client_cookie) {
// Found previous session
// peer has reseted and we're going to reuse the existing connection
// by replacing the communication socket
return reuse_connection(existing, exproto);
}
+ if (exproto->client_cookie == client_cookie) {
+ // session establishment interrupted between client_ident and server_ident,
+ // continuing...
+ ldout(cct, 1) << __func__ << " found previous session existing=" << existing
+ << ", continuing session establishment." << dendl;
+ return reuse_connection(existing, exproto);
+ }
+
if (exproto->state == READY || exproto->state == STANDBY) {
ldout(cct, 1) << __func__ << " existing=" << existing
<< " is READY/STANDBY, lets reuse it" << dendl;
}
exproto->reset_recv_state();
+ exproto->client_cookie = client_cookie;
+
exproto->peer_name = peer_name;
exproto->peer_global_seq = peer_global_seq;
exproto->connection_features = connection_features;
in_seq = 0;
if (!connection->policy.lossy) {
- cookie = ceph::util::generate_random_number<uint64_t>(0, -1ll);
+ server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
}
uint64_t flags = 0;
connection->policy.features_supported,
connection->policy.features_required | msgr2_required,
flags,
- cookie);
+ server_cookie);
ldout(cct, 5) << __func__ << " sending identification:"
<< " addrs=" << messenger->get_myaddrs()
<< " global_seq=" << gs << " features_supported=" << std::hex
<< connection->policy.features_supported
<< " features_required="
- << (connection->policy.features_required | msgr2_required)
- << " flags=" << flags << " cookie=" << std::dec << cookie
+ << (connection->policy.features_required | msgr2_required)
+ << " flags=" << flags << " cookie=" << std::dec << server_cookie
<< dendl;
connection->lock.unlock();