state(NONE),
peer_required_features(0),
cookie(0),
+ connect_seq(0),
+ peer_global_seq(0),
message_seq(0),
+ replacing(false),
can_write(false),
- connect_seq(0),
bannerExchangeCallback(nullptr),
next_frame_len(0),
keepalive(false) {
out_queue.clear();
}
+void ProtocolV2::reset_session() {  // Discards this connection's queued traffic and zeroes the per-session counters.
+ ldout(cct, 20) << __func__ << dendl;
+ // Everything below runs with the connection's write_lock held (released on return).
+ std::lock_guard<std::mutex> l(connection->write_lock);
+ if (connection->delay_state) {
+ connection->delay_state->discard();
+ }
+ // Drop pending dispatch-queue entries for this conn_id, the protocol's out queue and the raw outgoing buffer.
+ connection->dispatch_queue->discard_queue(connection->conn_id);
+ discard_out_queue();
+ connection->outcoming_bl.clear();
+ // Queue a remote-reset event for this connection on the dispatch queue.
+ connection->dispatch_queue->queue_remote_reset(connection);
+ // Zero the session identity and sequence counters; a zero cookie is what the connect paths test for "no session".
+ out_seq = 0;
+ in_seq = 0;
+ cookie = 0;
+ connect_seq = 0;
+ peer_global_seq = 0;
+ message_seq = 0;
+ ack_left = 0;
+ can_write = false;  // writes stay disabled until something else re-enables them
+}
+
void ProtocolV2::stop() {
ldout(cct, 2) << __func__ << dendl;
if (state == CLOSED) {
}
}
+// Drops requeued messages (the CEPH_MSG_PRIO_HIGHEST bucket of out_queue)
+// whose sequence number is non-zero and <= `seq`, i.e. messages the peer has
+// already acknowledged receiving.
+//
+// @param out_seq  starting value for the returned counter (note: this
+//                 parameter shadows the member of the same name)
+// @param seq      highest message sequence number acknowledged by the peer
+// @return `seq` when there is no CEPH_MSG_PRIO_HIGHEST bucket at all;
+//         otherwise `out_seq` plus the number of messages discarded.
+//
+// Takes connection->write_lock for the duration of the call.
+uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
+  ldout(cct, 10) << __func__ << " " << seq << dendl;
+  std::lock_guard<std::mutex> l(connection->write_lock);
+
+  // Single lookup instead of count() + operator[] + erase(key).
+  auto requeued = out_queue.find(CEPH_MSG_PRIO_HIGHEST);
+  if (requeued == out_queue.end()) {
+    return seq;
+  }
+
+  auto &rq = requeued->second;
+  uint64_t count = out_seq;
+  while (!rq.empty()) {
+    // Only the Message pointer is needed; do not copy the queued
+    // pair (and with it the encoded bufferlist) on every iteration.
+    Message *m = rq.front().second;
+    if (m->get_seq() == 0 || m->get_seq() > seq) break;
+    ldout(cct, 10) << __func__ << " " << *m << " for resend seq "
+                   << m->get_seq() << " <= " << seq << ", discarding"
+                   << dendl;
+    m->put();
+    rq.pop_front();
+    count++;
+  }
+  if (rq.empty()) out_queue.erase(requeued);
+  return count;
+}
+
void ProtocolV2::reset_recv_state() {
// clean read and write callbacks
connection->pendingReadLen.reset();
requeue_sent();
if (out_queue.empty() && state >= START_ACCEPT &&
- state <= ACCEPTED_CLIENT_IDENT) {
+ state <= ACCEPTING_SESSION) {
ldout(cct, 10) << __func__ << " with nothing to send and in the half "
<< " accept state just closed" << dendl;
+ connection->write_lock.unlock();
stop();
connection->dispatch_queue->queue_reset(connection);
- connection->write_lock.unlock();
return nullptr;
}
+ replacing = false;
connection->fault();
reset_recv_state();
out_queue[m->get_priority()].emplace_back(std::move(bl), m);
ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
<< dendl;
- if (can_write) {
+ if ((!replacing && can_write) || state == STANDBY) {
connection->center->dispatch_event_external(connection->write_handler);
}
}
connection->write_lock.lock();
if (state == STANDBY && !connection->policy.server && is_queued()) {
ldout(cct, 10) << __func__ << " policy.server is false" << dendl;
+ if (cookie) { // only increment connect_seq if there is a session
+ connect_seq++;
+ }
connection->_connect();
} else if (connection->cs && state != NONE && state != CLOSED &&
state != START_CONNECT) {
}
}
-bool ProtocolV2::is_queued() { return false; }
+// True when either the protocol's out_queue or the underlying connection
+// reports pending outbound work.
+bool ProtocolV2::is_queued() {
+  if (!out_queue.empty()) {
+    return true;
+  }
+  return connection->is_queued();
+}
CtPtr ProtocolV2::read(CONTINUATION_PARAM(next, ProtocolV2, char *, int),
int len, char *buffer) {
ceph_assert(state == ACCEPTING);
connection->policy = messenger->get_policy(peer_type);
- ldout(cct, 10) << __func__ << " accept of host_type " << peer_type
+ ldout(cct, 10) << __func__ << " accept of host_type " << (int)peer_type
<< ", policy.lossy=" << connection->policy.lossy
<< " policy.server=" << connection->policy.server
<< " policy.standby=" << connection->policy.standby
this->peer_required_features = peer_required_features;
+ if (cct->_conf->ms_inject_internal_delays &&
+ cct->_conf->ms_inject_socket_failures) {
+ if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
+ ldout(cct, 10) << __func__ << " sleep for "
+ << cct->_conf->ms_inject_internal_delays << dendl;
+ utime_t t;
+ t.set_from_double(cct->_conf->ms_inject_internal_delays);
+ t.sleep();
+ }
+ }
+
CtPtr callback;
callback = bannerExchangeCallback;
bannerExchangeCallback = nullptr;
case Tag::AUTH_DONE:
case Tag::IDENT:
case Tag::IDENT_MISSING_FEATURES:
+ case Tag::SESSION_RECONNECT:
+ case Tag::SESSION_RETRY:
+ case Tag::SESSION_RETRY_GLOBAL:
+ case Tag::SESSION_RECONNECT_OK:
case Tag::KEEPALIVE2:
case Tag::KEEPALIVE2_ACK:
case Tag::ACK:
return READ(next_frame_len, handle_frame_payload);
+ case Tag::SESSION_RESET:
+ return handle_session_reset();
+ case Tag::WAIT:
+ return handle_wait();
case Tag::MESSAGE:
return handle_message();
}
return handle_ident(buffer, next_frame_len);
case Tag::IDENT_MISSING_FEATURES:
return handle_ident_missing_features(buffer, next_frame_len);
+ case Tag::SESSION_RECONNECT:
+ return handle_reconnect(buffer, next_frame_len);
+ case Tag::SESSION_RETRY:
+ return handle_session_retry(buffer, next_frame_len);
+ case Tag::SESSION_RETRY_GLOBAL:
+ return handle_session_retry_global(buffer, next_frame_len);
+ case Tag::SESSION_RECONNECT_OK:
+ return handle_reconnect_ok(buffer, next_frame_len);
case Tag::KEEPALIVE2:
return handle_keepalive2(buffer, next_frame_len);
case Tag::KEEPALIVE2_ACK:
}
}
- state = READY;
+ connection->maybe_start_delay_thread();
+ state = READY;
return CONTINUE(read_frame);
}
ldout(cct, 20) << __func__ << dendl;
state = CONNECTING;
+ global_seq = messenger->get_global_seq();
+
return _banner_exchange(CONTINUATION(post_client_banner_exchange));
}
ldout(cct, 1) << __func__ << " authentication done,"
<< " flags=" << auth_done.flags << dendl;
- return send_client_ident();
+ if (!cookie) {
+ ceph_assert(connect_seq == 0);
+ return send_client_ident();
+ } else { // reconnecting to previous session
+ ceph_assert(connect_seq > 0);
+ return send_reconnect();
+ }
}
CtPtr ProtocolV2::send_client_ident() {
flags |= CEPH_MSG_CONNECT_LOSSY;
}
- cookie = ceph::util::generate_random_number<uint64_t>(0, -1ll);
-
- IdentFrame ident(messenger->get_myaddrs(), messenger->get_myname().num(),
- connection->policy.features_supported,
- connection->policy.features_required, flags, cookie);
+ ClientIdentFrame client_ident(messenger->get_myaddrs(),
+ messenger->get_myname().num(), global_seq,
+ connection->policy.features_supported,
+ connection->policy.features_required, flags);
ldout(cct, 5) << __func__ << " sending identification: "
- << "addrs: " << ident.addrs << " gid: " << ident.gid
- << " features_supported: " << std::hex
- << ident.supported_features
- << " features_required: " << ident.required_features
- << " flags: " << ident.flags << " cookie: " << std::dec
- << ident.cookie << dendl;
-
- bufferlist &bl = ident.get_buffer();
+ << "addrs=" << messenger->get_myaddrs()
+ << " gid=" << messenger->get_myname().num()
+ << " global_seq=" << global_seq
+ << " features_supported=" << std::hex
+ << connection->policy.features_supported
+ << " features_required=" << connection->policy.features_required
+ << " flags=" << flags << std::dec << dendl;
+
+ bufferlist &bl = client_ident.get_buffer();
return WRITE(bl, handle_client_ident_write);
}
return CONTINUE(read_frame);
}
+// Builds a ReconnectFrame from the current session state (cookie, global_seq,
+// connect_seq, in_seq) and writes it; handle_reconnect_write runs on
+// completion.
+CtPtr ProtocolV2::send_reconnect() {
+  ldout(cct, 20) << __func__ << dendl;
+
+  ReconnectFrame reconnect(messenger->get_myaddrs(), cookie, global_seq,
+                           connect_seq, in_seq);
+
+  ldout(cct, 5) << __func__ << " reconnect to session: cookie=" << cookie
+                << " gs=" << global_seq << " cs=" << connect_seq
+                << " ms=" << in_seq << dendl;
+
+  bufferlist &frame_bl = reconnect.get_buffer();
+  return WRITE(frame_bl, handle_reconnect_write);
+}
+
+// Write-completion callback for the reconnect frame: on success keep reading
+// frames, on error fault the connection.
+CtPtr ProtocolV2::handle_reconnect_write(int r) {
+  ldout(cct, 20) << __func__ << " r=" << r << dendl;
+
+  if (r >= 0) {
+    return CONTINUE(read_frame);
+  }
+
+  ldout(cct, 1) << __func__ << " reconnect write failed r=" << r << " ("
+                << cpp_strerror(r) << ")" << dendl;
+  return _fault();
+}
+
CtPtr ProtocolV2::handle_ident_missing_features(char *payload,
uint32_t length) {
ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
return _fault();
}
+// Handles a SESSION_RESET tag: wipe the local session state, then start over
+// by identifying as a new client.
+CtPtr ProtocolV2::handle_session_reset() {
+  ldout(cct, 20) << __func__ << dendl;
+  ldout(cct, 1) << __func__ << " received session reset" << dendl;
+
+  // Clears queues and zeroes cookie/sequence counters.
+  reset_session();
+
+  return send_client_ident();
+}
+
+// Handles a SESSION_RETRY frame: adopt the connect_seq carried by the frame
+// plus one, then try the reconnect again.
+CtPtr ProtocolV2::handle_session_retry(char *payload, uint32_t length) {
+  ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
+
+  RetryFrame retry(payload, length);
+  const auto peer_connect_seq = retry.connect_seq;
+  connect_seq = peer_connect_seq + 1;
+
+  ldout(cct, 1) << __func__
+                << " received session retry connect_seq=" << peer_connect_seq
+                << ", inc to cs=" << connect_seq << dendl;
+
+  return send_reconnect();
+}
+
+// Handles a SESSION_RETRY_GLOBAL frame: ask the messenger for a new
+// global_seq based on the one carried by the frame, then try the reconnect
+// again.
+CtPtr ProtocolV2::handle_session_retry_global(char *payload, uint32_t length) {
+  ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
+
+  RetryGlobalFrame retry(payload, length);
+  const auto peer_global = retry.global_seq;
+  global_seq = messenger->get_global_seq(peer_global);
+
+  ldout(cct, 1) << __func__ << " received session retry global global_seq="
+                << peer_global << ", choose new gs=" << global_seq
+                << dendl;
+
+  return send_reconnect();
+}
+
+// Handles a WAIT tag (the peer reported a connection race): move to the WAIT
+// state and go through the fault path.
+CtPtr ProtocolV2::handle_wait() {
+  ldout(cct, 20) << __func__ << dendl;
+  ldout(cct, 1) << __func__ << " received WAIT (connection race)" << dendl;
+
+  state = WAIT;
+
+  return _fault();
+}
+
+// Handles a SESSION_RECONNECT_OK frame: drop requeued messages the peer
+// already has, clear the backoff, notify the dispatcher and enter READY.
+CtPtr ProtocolV2::handle_reconnect_ok(char *payload, uint32_t length) {
+  ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
+
+  ReconnectOkFrame reconnect_ok(payload, length);
+  const auto peer_received_seq = reconnect_ok.msg_seq;
+  ldout(cct, 5) << __func__
+                << " reconnect accepted: sms=" << peer_received_seq << dendl;
+
+  // Anything requeued with seq <= what the peer reports is not resent.
+  out_seq = discard_requeued_up_to(out_seq, peer_received_seq);
+
+  backoff = utime_t();
+  ldout(cct, 10) << __func__ << " reconnect success " << connect_seq
+                 << ", lossy = " << connection->policy.lossy << ", features "
+                 << connection->get_features() << dendl;
+
+  if (connection->delay_state) {
+    ceph_assert(connection->delay_state->ready());
+  }
+
+  connection->dispatch_queue->queue_connect(connection);
+  messenger->ms_deliver_handle_fast_connect(connection);
+
+  return ready();
+}
+
CtPtr ProtocolV2::handle_server_ident(char *payload, uint32_t length) {
ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
- IdentFrame server_ident(payload, length);
+ ServerIdentFrame server_ident(payload, length);  // decode the peer's identification frame
ldout(cct, 5) << __func__ << " received server identification: "
- << "addrs: " << server_ident.addrs
- << " gid: " << server_ident.gid
- << " features_supported: " << std::hex
+ << "addrs=" << server_ident.addrs << " gid=" << server_ident.gid
+ << " global_seq=" << server_ident.global_seq
+ << " features_supported=" << std::hex
<< server_ident.supported_features
- << " features_required: " << server_ident.required_features
- << " flags: " << server_ident.flags << " cookie: " << std::dec
+ << " features_required=" << server_ident.required_features
+ << " flags=" << server_ident.flags << " cookie=" << std::dec
<< server_ident.cookie << dendl;
+ cookie = server_ident.cookie;  // the session cookie is taken from the peer's frame
+
connection->set_peer_addrs(server_ident.addrs);
connection->peer_global_id = server_ident.gid;
- connection->set_features(server_ident.required_features &
+ connection->set_features(server_ident.supported_features &  // features = peer-supported AND locally-supported
connection->policy.features_supported);
+ peer_global_seq = server_ident.global_seq;
+ // The peer's flags decide whether this connection is treated as lossy.
+ connection->policy.lossy = server_ident.flags & CEPH_MSG_CONNECT_LOSSY;
+
+ backoff = utime_t();  // reset backoff to a default-constructed utime_t
+ ldout(cct, 10) << __func__ << " connect success " << connect_seq
+ << ", lossy = " << connection->policy.lossy << ", features "
+ << connection->get_features() << dendl;
+
+ if (connection->delay_state) {
+ ceph_assert(connection->delay_state->ready());
+ }
+ // Notify the dispatch queue and the messenger that the connection is established.
+ connection->dispatch_queue->queue_connect(connection);
+ messenger->ms_deliver_handle_fast_connect(connection);
return ready();
}
CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) {
ldout(cct, 20) << __func__ << " payload_len=" << std::dec << length << dendl;
- IdentFrame client_ident(payload, length);
+ ClientIdentFrame client_ident(payload, length);
ldout(cct, 5) << __func__ << " received client identification: "
- << "addrs: " << client_ident.addrs
- << " gid: " << client_ident.gid
- << " features_supported: " << std::hex
+ << "addrs=" << client_ident.addrs << " gid=" << client_ident.gid
+ << " global_seq=" << client_ident.global_seq
+ << " features_supported=" << std::hex
<< client_ident.supported_features
- << " features_required: " << client_ident.required_features
- << " flags: " << client_ident.flags << " cookie: " << std::dec
- << client_ident.cookie << dendl;
+ << " features_required=" << client_ident.required_features
+ << " flags=" << client_ident.flags << std::dec << dendl;
if (client_ident.addrs.empty()) {
connection->set_peer_addr(connection->target_addr);
// Should we check if one of the ident.addrs match connection->target_addr
// as we do in ProtocolV1?
connection->set_peer_addrs(client_ident.addrs);
+ connection->target_addr = client_ident.addrs.msgr2_addr();
}
uint64_t feat_missing = connection->policy.features_required &
return WRITE(bl, handle_ident_missing_features_write);
}
- state = ACCEPTED_CLIENT_IDENT;
+ connection_features =
+ client_ident.supported_features & connection->policy.features_supported;
+
+ state = ACCEPTING_SESSION;
+ peer_global_seq = client_ident.global_seq;
+
+ // Looks good so far, let's check if there is already an existing connection
+ // to this peer.
+
+ connection->lock.unlock();
+ AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
+
+ connection->inject_delay();
+
+ connection->lock.lock();
+ if (state != ACCEPTING_SESSION) {
+ ldout(cct, 1) << __func__
+ << " state changed while accept, it must be mark_down"
+ << dendl;
+ ceph_assert(state == CLOSED);
+ return _fault();
+ }
+
+ if (existing) {
+ return handle_existing_connection(existing);
+ }
// if everything is OK reply with server identification
- connection->peer_global_id = client_ident.gid;
- cookie = client_ident.cookie;
+ return send_server_ident();
+}
+
+// Write-completion callback for the "ident missing features" frame: on
+// success keep reading frames, on error fault the connection.
+CtPtr ProtocolV2::handle_ident_missing_features_write(int r) {
+  ldout(cct, 20) << __func__ << " r=" << r << dendl;
+
+  if (r >= 0) {
+    return CONTINUE(read_frame);
+  }
+
+  ldout(cct, 1) << __func__ << " ident missing features write failed r=" << r
+                << " (" << cpp_strerror(r) << ")" << dendl;
+  return _fault();
+}
+
+CtPtr ProtocolV2::handle_reconnect(char *payload, uint32_t length) {  // Handles a SESSION_RECONNECT frame: reset/retry the client, or hand this socket to the existing connection.
+ ldout(cct, 20) << __func__ << " payload_len=" << std::dec << length << dendl;
+
+ ReconnectFrame reconnect(payload, length);
+
+ ldout(cct, 5) << __func__
+ << " received reconnect: cookie=" << reconnect.cookie
+ << " gs=" << reconnect.global_seq
+ << " cs=" << reconnect.connect_seq
+ << " ms=" << reconnect.msg_seq << dendl;
+ // Peer address: taken from the frame, falling back to target_addr when the frame carries none.
+ if (reconnect.addrs.empty()) {
+ connection->set_peer_addr(connection->target_addr);
+ } else {
+ // Should we check if one of the ident.addrs match connection->target_addr
+ // as we do in ProtocolV1?
+ connection->set_peer_addrs(reconnect.addrs);
+ connection->target_addr = reconnect.addrs.msgr2_addr();
+ }
+ // The lookup runs with connection->lock released; the lock is retaken below and the state re-checked.
+ connection->lock.unlock();
+ AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
+
+ connection->inject_delay();
+
+ connection->lock.lock();
+ if (state != ACCEPTING) {
+ ldout(cct, 1) << __func__
+ << " state changed while accept, it must be mark_down"
+ << dendl;
+ ceph_assert(state == CLOSED);
+ return _fault();
+ }
+ // Reset frame prepared up front; it is what every "reset the client" exit below writes.
+ ResetFrame reset;
+ bufferlist &bl = reset.get_buffer();
+
+ if (!existing) {
+ // there is no existing connection therefore cannot reconnect to previous
+ // session
+ ldout(cct, 0) << __func__
+ << " no existing connection exists, reseting client" << dendl;
+ return WRITE(bl, handle_session_reset_write);
+ }
+ // existing->lock is held for the remainder of this function.
+ std::lock_guard<std::mutex> l(existing->lock);
+
+ ProtocolV2 *exproto = dynamic_cast<ProtocolV2 *>(existing->protocol.get());
+ if (!exproto) {  // the existing connection is not running ProtocolV2: treated as fatal
+ ldout(cct, 1) << __func__ << " existing=" << existing << dendl;
+ ceph_assert(false);
+ }
+
+ if (exproto->state == CLOSED) {
+ ldout(cct, 5) << __func__ << " existing " << existing
+ << " already closed. Reseting client" << dendl;
+ return WRITE(bl, handle_session_reset_write);
+ }
+
+ if (exproto->replacing) {  // another replace is in flight: answer with a retry-global frame
+ ldout(cct, 1) << __func__
+ << " existing racing replace happened while replacing."
+ << " existing=" << existing << dendl;
+ RetryGlobalFrame retry(exproto->peer_global_seq);
+ bufferlist &bl = retry.get_buffer();  // NOTE: shadows the reset-frame `bl` declared above
+ return WRITE(bl, handle_session_retry_write);
+ }
+
+ if (!exproto->cookie) {
+ // server connection was reseted, reset client
+ ldout(cct, 5) << __func__ << " no cookie set, reseting client" << dendl;
+ return WRITE(bl, handle_session_reset_write);
+ } else if (exproto->cookie != reconnect.cookie) {
+ ldout(cct, 5) << __func__ << " cookie mismatch sc=" << exproto->cookie
+ << " cc=" << reconnect.cookie << ", reseting client" << dendl;
+ return WRITE(bl, handle_session_reset_write);
+ }
+
+ if (exproto->peer_global_seq > reconnect.global_seq) {  // client's global_seq is older than the one recorded on the existing connection
+ ldout(cct, 5) << __func__
+ << " stale global_seq: sgs=" << exproto->peer_global_seq
+ << " cgs=" << reconnect.global_seq
+ << ", ask client to retry global" << dendl;
+ RetryGlobalFrame retry(exproto->peer_global_seq);
+ bufferlist &bl = retry.get_buffer();
+ return WRITE(bl, handle_session_retry_write);
+ }
+
+ if (exproto->connect_seq >= reconnect.connect_seq) {  // client's connect_seq must be strictly greater than ours to be accepted
+ ldout(cct, 5) << __func__
+ << " stale connect_seq scs=" << exproto->connect_seq
+ << " ccs=" << reconnect.connect_seq
+ << " , ask client to retry" << dendl;
+ RetryFrame retry(exproto->connect_seq);
+ bufferlist &bl = retry.get_buffer();
+ return WRITE(bl, handle_session_retry_write);
+ }
+
+ // everything looks good
+ exproto->connect_seq = reconnect.connect_seq;
+ exproto->message_seq = reconnect.msg_seq;  // send_reconnect_ok() later passes message_seq to discard_requeued_up_to()
+
+ return reuse_connection(existing, exproto, true);
+}
+
+// Write-completion callback for the session reset frame: on success keep
+// reading frames, on error fault the connection.
+CtPtr ProtocolV2::handle_session_reset_write(int r) {
+  ldout(cct, 20) << __func__ << " r=" << r << dendl;
+
+  if (r >= 0) {
+    return CONTINUE(read_frame);
+  }
+
+  ldout(cct, 1) << __func__ << " session reset write failed r=" << r << " ("
+                << cpp_strerror(r) << ")" << dendl;
+  return _fault();
+}
+
+// Write-completion callback for the session retry / retry-global frames: on
+// success keep reading frames, on error fault the connection.
+CtPtr ProtocolV2::handle_session_retry_write(int r) {
+  ldout(cct, 20) << __func__ << " r=" << r << dendl;
+
+  if (r >= 0) {
+    return CONTINUE(read_frame);
+  }
+
+  ldout(cct, 1) << __func__ << " session retry write failed r=" << r << " ("
+                << cpp_strerror(r) << ")" << dendl;
+  return _fault();
+}
+
+CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {  // Decides, for a new client ident, whether to replace, reuse or yield to an already-registered connection to the same peer.
+ ldout(cct, 20) << __func__ << " existing=" << existing << dendl;
+ // existing->lock is held for the remainder of this function.
+ std::lock_guard<std::mutex> l(existing->lock);
+
+ ProtocolV2 *exproto = dynamic_cast<ProtocolV2 *>(existing->protocol.get());
+ if (!exproto) {  // the existing connection is not running ProtocolV2: treated as fatal
+ ldout(cct, 1) << __func__ << " existing=" << existing << dendl;
+ ceph_assert(false);
+ }
+
+ if (exproto->state == CLOSED) {  // nothing to contend with: proceed as a fresh accept
+ ldout(cct, 1) << __func__ << " existing " << existing << " already closed."
+ << dendl;
+ return send_server_ident();
+ }
+
+ if (exproto->replacing) {  // another replace is in flight: tell the client to wait
+ ldout(cct, 1) << __func__
+ << " existing racing replace happened while replacing."
+ << " existing=" << existing << dendl;
+ WaitFrame wait;
+ bufferlist &bl = wait.get_buffer();
+ return WRITE(bl, handle_wait_write);
+ }
+
+ if (existing->policy.lossy) {
+ // existing connection can be thrown out in favor of this one
+ ldout(cct, 1)
+ << __func__
+ << " accept replacing existing (lossy) channel (new one lossy="
+ << connection->policy.lossy << ")" << dendl;
+ existing->protocol->stop();
+ existing->dispatch_queue->queue_reset(existing.get());
+ return send_server_ident();
+ }
+
+ if (exproto->cookie) { // Found previous session
+ // peer has reseted and we're going to reuse the existing connection
+ // by replacing the communication socket
+ if (connection->policy.resetcheck) {
+ exproto->reset_session();
+ }
+ return reuse_connection(existing, exproto, false);
+ }
+
+ if (exproto->state == READY || exproto->state == STANDBY) {
+ ldout(cct, 10) << __func__
+ << " existing connection is READY/STANDBY, lets reuse it"
+ << dendl;
+ return reuse_connection(existing, exproto, false);
+ }
+
+ // Looks like a connection race: server and client are both connecting to
+ // each other at the same time.
+ if (connection->peer_addrs->msgr2_addr() <  // tie-break: the incoming side wins if the peer's address is lower than ours...
+ messenger->get_myaddrs().msgr2_addr() ||
+ existing->policy.server) {  // ...or if the existing connection's policy marks it as server
+ // this connection wins
+ ldout(cct, 10) << __func__
+ << " connection race detected, replacing existing="
+ << existing << " socket by this connection's socket"
+ << dendl;
+ return reuse_connection(existing, exproto, false);
+ } else {
+ // the existing connection wins
+ ldout(cct, 10) << __func__
+ << " connection race detected, this connection loses"
+ << dendl;
+ ceph_assert(connection->peer_addrs->msgr2_addr() >  // NOTE(review): equal addresses would trip this assert — confirm that case cannot occur
+ messenger->get_myaddrs().msgr2_addr());
+ WaitFrame wait;
+ bufferlist &bl = wait.get_buffer();
+ return WRITE(bl, handle_wait_write);
+ }
+}
+
+// Write-completion callback for the WAIT frame: on success keep reading
+// frames, on error fault the connection.
+CtPtr ProtocolV2::handle_wait_write(int r) {
+  ldout(cct, 20) << __func__ << " r=" << r << dendl;
+
+  if (r >= 0) {
+    return CONTINUE(read_frame);
+  }
+
+  ldout(cct, 1) << __func__ << " wait write failed r=" << r << " ("
+                << cpp_strerror(r) << ")" << dendl;
+  return _fault();
+}
+
+CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,  // Moves this connection's socket into `existing` and stops this connection.
+ ProtocolV2 *exproto, bool reconnect) {  // `reconnect`: true -> existing replies with reconnect_ok, false -> with server_ident. Always returns nullptr.
+ ldout(cct, 20) << __func__ << " existing=" << existing
+ << " reconnect=" << reconnect << dendl;
+
+ connection->inject_delay();
+ // existing->write_lock is held until this function returns.
+ std::lock_guard<std::mutex> l(existing->write_lock);
+ // Unregister this connection's socket from this connection's event center.
+ connection->center->delete_file_event(connection->cs.fd(),
+ EVENT_READABLE | EVENT_WRITABLE);
+
+ if (existing->delay_state) {
+ existing->delay_state->flush();
+ ceph_assert(!connection->delay_state);
+ }
+ exproto->reset_recv_state();
+ if (!reconnect) {  // new session: carry over what this connection negotiated with the client
+ exproto->peer_global_seq = peer_global_seq;
+ exproto->connection_features = connection_features;
+ }
+ // Take the socket and remember center/worker before stop() is called on this connection.
+ auto temp_cs = std::move(connection->cs);
+ EventCenter *new_center = connection->center;
+ Worker *new_worker = connection->worker;
+ // avoid _stop shutdown replacing socket
+ // queue a reset on the new connection, which we're dumping for the old
+ stop();
+
+ connection->dispatch_queue->queue_reset(connection);
+ ldout(messenger->cct, 1) << __func__ << " stop myself to swap existing"
+ << dendl;
+ exproto->can_write = false;
+ exproto->replacing = true;  // cleared again in send_server_ident()/send_reconnect_ok()
+ existing->state_offset = 0;
+ // avoid previous thread modify event
+ exproto->state = NONE;
+ existing->state = AsyncConnection::STATE_NONE;
+ // Discard existing prefetch buffer in `recv_buf`
+ existing->recv_start = existing->recv_end = 0;
+ // there shouldn't exist any buffer
+ ceph_assert(connection->recv_start == connection->recv_end);
+ // Step 1 (submitted to existing's current center below): tear down the old socket and adopt the new one.
+ auto deactivate_existing = std::bind(
+ [existing, new_worker, new_center, exproto,
+ reconnect](ConnectedSocket &cs) mutable {
+ // we need to delete time event in original thread
+ {
+ std::lock_guard<std::mutex> l(existing->lock);
+ existing->write_lock.lock();
+ exproto->requeue_sent();
+ existing->outcoming_bl.clear();
+ existing->open_write = false;
+ existing->write_lock.unlock();
+ if (exproto->state == NONE) {  // still in the state set above: perform the swap
+ existing->shutdown_socket();
+ existing->cs = std::move(cs);
+ existing->worker->references--;
+ new_worker->references++;
+ existing->logger = new_worker->get_perf_counter();
+ existing->worker = new_worker;
+ existing->center = new_center;
+ if (existing->delay_state)
+ existing->delay_state->set_center(new_center);
+ } else if (exproto->state == CLOSED) {  // existing was closed meanwhile: close the moved socket on new_center instead
+ auto back_to_close = std::bind(
+ [](ConnectedSocket &cs) mutable { cs.close(); }, std::move(cs));
+ new_center->submit_to(new_center->get_id(),
+ std::move(back_to_close), true);
+ return;
+ } else {
+ ceph_abort();  // any other state is unexpected here
+ }
+ }
+
+ // Before changing existing->center, it may already exists some
+ // events in existing->center's queue. Then if we mark down
+ // `existing`, it will execute in another thread and clean up
+ // connection. Previous event will result in segment fault
+ auto transfer_existing = [existing, exproto, reconnect]() mutable {  // Step 2: runs on existing's (new) center
+ std::lock_guard<std::mutex> l(existing->lock);
+ if (exproto->state == CLOSED) return;
+ ceph_assert(exproto->state == NONE);
+
+ exproto->state = ACCEPTING_SESSION;
+ existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED;
+ existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE,
+ existing->read_handler);
+ if (!reconnect) {
+ CONTINUATION_RUN2(exproto, exproto->send_server_ident())
+ } else {
+ CONTINUATION_RUN2(exproto, exproto->send_reconnect_ok())
+ }
+ };
+ if (existing->center->in_thread())  // already on the target center's thread: run inline
+ transfer_existing();
+ else
+ existing->center->submit_to(existing->center->get_id(),
+ std::move(transfer_existing), true);
+ },
+ std::move(temp_cs));  // the moved socket becomes the bound `cs` argument of the lambda
+
+ existing->center->submit_to(existing->center->get_id(),
+ std::move(deactivate_existing), true);
+ return nullptr;
+}
+
+CtPtr ProtocolV2::send_server_ident() {
+ ldout(cct, 20) << __func__ << dendl;
+
+ // this is required for the case when this connection is being replaced
+ out_seq = discard_requeued_up_to(out_seq, 0);
+ in_seq = 0;
+
+ if (!connection->policy.lossy) {
+ cookie = ceph::util::generate_random_number<uint64_t>(0, -1ll);
+ }
uint64_t flags = 0;
if (connection->policy.lossy) {
flags = flags | CEPH_MSG_CONNECT_LOSSY;
}
- IdentFrame ident(messenger->get_myaddrs(), messenger->get_myname().num(),
- connection->policy.features_supported,
- connection->policy.features_required, flags, cookie);
+
+ uint64_t gs = messenger->get_global_seq();
+ ServerIdentFrame server_ident(
+ messenger->get_myaddrs(), messenger->get_myname().num(), gs,
+ connection->policy.features_supported,
+ connection->policy.features_required, flags, cookie);
ldout(cct, 5) << __func__ << " sending identification: "
- << "addrs: " << ident.addrs << " gid: " << ident.gid
- << " features_supported: " << std::hex
- << ident.supported_features
- << " features_required: " << ident.required_features
- << " flags: " << ident.flags << " cookie: " << std::dec
- << ident.cookie << dendl;
+ << "addrs=" << messenger->get_myaddrs()
+ << " gid=" << messenger->get_myname().num()
+ << " global_seq=" << gs << " features_supported=" << std::hex
+ << connection->policy.features_supported
+ << " features_required=" << connection->policy.features_required
+ << " flags=" << flags << " cookie=" << std::dec << cookie
+ << dendl;
connection->lock.unlock();
// Because "replacing" will prevent other connections preempt this addr,
connection->inject_delay();
connection->lock.lock();
+ replacing = false;
if (r < 0) {
ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
- << connection->peer_addrs.msgr2_addr()
+ << connection->peer_addrs->msgr2_addr()
<< " just fail later one(this)" << dendl;
- ldout(cct, 10) << "accept fault after register" << dendl;
connection->inject_delay();
return _fault();
}
- if (state != ACCEPTED_CLIENT_IDENT) {
+ if (state != ACCEPTING_SESSION) {
ldout(cct, 1) << __func__
<< " state changed while accept_conn, it must be mark_down"
<< dendl;
ceph_assert(state == CLOSED || state == NONE);
- ldout(cct, 10) << "accept fault after register" << dendl;
connection->inject_delay();
return _fault();
}
- bufferlist &bl = ident.get_buffer();
- return WRITE(bl, handle_send_server_ident_write);
+ connection->set_features(connection_features);
+
+ // notify
+ connection->dispatch_queue->queue_accept(connection);
+ messenger->ms_deliver_handle_fast_accept(connection);
+
+ bufferlist &bl = server_ident.get_buffer();
+ return WRITE(bl, handle_server_ident_write);
}
-CtPtr ProtocolV2::handle_ident_missing_features_write(int r) {
+CtPtr ProtocolV2::handle_server_ident_write(int r) {  // Write-completion callback for the server ident frame (see the WRITE in send_server_ident).
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
- ldout(cct, 1) << __func__ << " ident missing features write failed r=" << r
- << " (" << cpp_strerror(r) << ")" << dendl;
+ ldout(cct, 1) << __func__ << " server ident write failed r=" << r << " ("
+ << cpp_strerror(r) << ")" << dendl;
+ connection->inject_delay();
+ return _fault();  // write error: go through the fault path
+ }
+
+ if (connection->delay_state) {
+ ceph_assert(connection->delay_state->ready());
+ }
+ // Ident delivered: finish via ready().
+ return ready();
+}
+
+CtPtr ProtocolV2::send_reconnect_ok() {  // Registers this connection with the messenger and answers a reconnect with a ReconnectOkFrame carrying in_seq.
+ ldout(cct, 20) << __func__ << dendl;
+ // Drop requeued messages up to message_seq (set from the client's reconnect frame in handle_reconnect).
+ out_seq = discard_requeued_up_to(out_seq, message_seq);
+
+ uint64_t ms = in_seq;
+ ReconnectOkFrame reconnect_ok(ms);
+
+ ldout(cct, 5) << __func__ << " sending reconnect_ok: msg_seq=" << ms << dendl;
+
+ connection->lock.unlock();
+ // Because "replacing" will prevent other connections preempt this addr,
+ // it's safe that here we don't acquire Connection's lock
+ ssize_t r = messenger->accept_conn(connection);
+
+ connection->inject_delay();
+
+ connection->lock.lock();
+ replacing = false;  // replacement is over whether or not accept_conn succeeded
+
+ if (r < 0) {
+ ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
+ << connection->peer_addrs->msgr2_addr()
+ << " just fail later one(this)" << dendl;
+ connection->inject_delay();
+ return _fault();
+ }
+ if (state != ACCEPTING_SESSION) {  // state may have changed while the lock was released above
+ ldout(cct, 1) << __func__
+ << " state changed while accept_conn, it must be mark_down"
+ << dendl;
+ ceph_assert(state == CLOSED || state == NONE);
+ connection->inject_delay();
return _fault();
}
connection->dispatch_queue->queue_accept(connection);
messenger->ms_deliver_handle_fast_accept(connection);
- return CONTINUE(read_frame);
+ bufferlist &bl = reconnect_ok.get_buffer();
+ return WRITE(bl, handle_reconnect_ok_write);  // handle_reconnect_ok_write runs when the write completes
}
-CtPtr ProtocolV2::handle_send_server_ident_write(int r) {
+CtPtr ProtocolV2::handle_reconnect_ok_write(int r) {  // Write-completion callback for the reconnect_ok frame (see the WRITE in send_reconnect_ok).
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
- ldout(cct, 1) << __func__ << " server ident write failed r=" << r << " ("
+ ldout(cct, 1) << __func__ << " reconnect ok write failed r=" << r << " ("
<< cpp_strerror(r) << ")" << dendl;
+ connection->inject_delay();
return _fault();
}
+ if (connection->delay_state) {
+ ceph_assert(connection->delay_state->ready());
+ }
+ // reconnect_ok delivered: finish via ready().
return ready();
}
CONNECTING,
START_ACCEPT,
ACCEPTING,
- ACCEPTED_CLIENT_IDENT,
+ ACCEPTING_SESSION,
READY,
THROTTLE_MESSAGE,
THROTTLE_BYTES,
"CONNECTING",
"START_ACCEPT",
"ACCEPTING",
- "ACCEPTED_CLIENT_IDENT",
+ "ACCEPTING_SESSION",
"READY",
"THROTTLE_MESSAGE",
"THROTTLE_BYTES",
return statenames[state];
}
-public:
enum class Tag : uint32_t {
AUTH_REQUEST,
AUTH_BAD_METHOD,
AUTH_DONE,
IDENT,
IDENT_MISSING_FEATURES,
+ SESSION_RECONNECT,
+ SESSION_RESET,
+ SESSION_RETRY,
+ SESSION_RETRY_GLOBAL,
+ SESSION_RECONNECT_OK,
+ WAIT,
MESSAGE,
KEEPALIVE2,
KEEPALIVE2_ACK,
bufferlist auth_payload;
AuthRequestFrame(uint32_t method, bufferlist &auth_payload)
- : Frame(Tag::AUTH_REQUEST),
- method(method),
- len(auth_payload.length()),
- auth_payload(auth_payload) {
+ : Frame(Tag::AUTH_REQUEST) {
encode(method, payload, 0);
- encode(len, payload, 0);
+ encode(auth_payload.length(), payload, 0);
payload.claim_append(auth_payload);
}
std::vector<__u32> allowed_methods;
AuthBadMethodFrame(uint32_t method, std::vector<__u32> methods)
- : Frame(Tag::AUTH_BAD_METHOD),
- method(method),
- allowed_methods(methods) {
+ : Frame(Tag::AUTH_BAD_METHOD) {
encode(method, payload, 0);
- encode((uint32_t)allowed_methods.size(), payload, 0);
- for (const auto &a_meth : allowed_methods) {
+ encode((uint32_t)methods.size(), payload, 0);
+ for (const auto &a_meth : methods) {
encode(a_meth, payload, 0);
}
}
std::string error_msg;
AuthBadAuthFrame(uint32_t error_code, std::string error_msg)
- : Frame(Tag::AUTH_BAD_AUTH),
- error_code(error_code),
- error_msg(error_msg) {
+ : Frame(Tag::AUTH_BAD_AUTH) {
encode(error_code, payload, 0);
encode(error_msg, payload, 0);
}
uint32_t len;
bufferlist auth_payload;
- AuthMoreFrame(bufferlist &auth_payload)
- : Frame(Tag::AUTH_MORE),
- len(auth_payload.length()),
- auth_payload(auth_payload) {
- encode(len, payload, 0);
+ AuthMoreFrame(bufferlist &auth_payload) : Frame(Tag::AUTH_MORE) {
+ encode(auth_payload.length(), payload, 0);
payload.claim_append(auth_payload);
}
struct AuthDoneFrame : public Frame {
uint64_t flags;
- AuthDoneFrame(uint64_t flags) : Frame(Tag::AUTH_DONE), flags(flags) {
+ AuthDoneFrame(uint64_t flags) : Frame(Tag::AUTH_DONE) {  // the flags member is no longer set here; the value is only written to payload
encode(flags, payload, 0);
}
}
};
- struct IdentFrame : public SignedEncryptedFrame {
+ struct ClientIdentFrame : public SignedEncryptedFrame {  // identity frame (Tag::IDENT); takes over from IdentFrame without its cookie field
entity_addrvec_t addrs;
int64_t gid;
+ uint64_t global_seq;  // new field, serialized between gid and supported_features
uint64_t supported_features; // CEPH_FEATURE_*
uint64_t required_features; // CEPH_FEATURE_*
uint64_t flags; // CEPH_MSG_CONNECT_*
- uint64_t cookie;
- IdentFrame(entity_addrvec_t addrs, int64_t gid, uint64_t supported_features,
- uint64_t required_features, uint64_t flags, uint64_t cookie)
- : SignedEncryptedFrame(Tag::IDENT),
- addrs(addrs),
- gid(gid),
- supported_features(supported_features),
- required_features(required_features),
- flags(flags),
- cookie(cookie) {
+ ClientIdentFrame(const entity_addrvec_t &addrs, int64_t gid,  // encoding constructor: writes the arguments into payload; the member fields are not assigned
+ uint64_t global_seq, uint64_t supported_features,
+ uint64_t required_features, uint64_t flags)
+ : SignedEncryptedFrame(Tag::IDENT) {
encode(addrs, payload, -1ll);
encode(gid, payload, -1ll);
+ encode(global_seq, payload, -1ll);
encode(supported_features, payload, -1ll);
encode(required_features, payload, -1ll);
encode(flags, payload, -1ll);
+ }
+
+ ClientIdentFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {  // decoding constructor: fills the fields from length bytes at payload (the parameter shadows the payload buffer used when encoding)
+ bufferlist bl;
+ bl.push_back(buffer::create_static(length, payload));  // wrap the caller's bytes so they can be read through a bufferlist iterator
+ try {
+ auto ti = bl.cbegin();
+ decode_frame(ti);
+ } catch (const buffer::error &e) {  // decode errors are swallowed: fields after the failure point are left unassigned
+ }
+ }
+
+ ClientIdentFrame() : SignedEncryptedFrame() {}  // assigns no fields; ServerIdentFrame's decoding constructor builds on this
+
+ protected:  // decode_frame is callable from this struct and derived frames only
+ void decode_frame(ceph::buffer::list::const_iterator &ti) {  // reads the fields in the same order the encoding constructor writes them
+ decode(addrs, ti);
+ decode(gid, ti);
+ decode(global_seq, ti);
+ decode(supported_features, ti);
+ decode(required_features, ti);
+ decode(flags, ti);
+ }
+ };
+
+ struct ServerIdentFrame : public ClientIdentFrame {  // ClientIdentFrame fields followed by one extra cookie value
+ uint64_t cookie;  // serialized last, after the inherited fields
+
+ ServerIdentFrame(const entity_addrvec_t &addrs, int64_t gid,  // encoding constructor: the base writes the shared fields, then cookie is appended
+ uint64_t global_seq, uint64_t supported_features,
+ uint64_t required_features, uint64_t flags,
+ uint64_t cookie)
+ : ClientIdentFrame(addrs, gid, global_seq, supported_features,
+ required_features, flags) {
encode(cookie, payload, -1ll);
}
- IdentFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
+ ServerIdentFrame(char *payload, uint32_t length) : ClientIdentFrame() {  // decoding constructor: starts from the base default constructor, then parses length bytes at payload
+ bufferlist bl;
+ bl.push_back(buffer::create_static(length, payload));  // wrap the caller's bytes so they can be read through a bufferlist iterator
+ try {
+ auto ti = bl.cbegin();
+ ClientIdentFrame::decode_frame(ti);  // shared fields first, mirroring the encoding order
+ decode(cookie, ti);
+ } catch (const buffer::error &e) {  // decode errors are swallowed: fields after the failure point are left unassigned
+ }
+ }
+ };
+
+ struct ReconnectFrame : public SignedEncryptedFrame {  // frame tagged SESSION_RECONNECT carrying addresses, a cookie and three sequence numbers
+ entity_addrvec_t addrs;
+ uint64_t cookie;
+ uint64_t global_seq;
+ uint64_t connect_seq;
+ uint64_t msg_seq;
+
+ ReconnectFrame(const entity_addrvec_t &addrs, uint64_t cookie,  // encoding constructor: writes the arguments into payload; the member fields are not assigned
+ uint64_t global_seq, uint64_t connect_seq, uint64_t msg_seq)
+ : SignedEncryptedFrame(Tag::SESSION_RECONNECT) {
+ encode(addrs, payload, -1ll);  // addrs is encoded with a features value of -1ll, the integers below with 0
+ encode(cookie, payload, 0);
+ encode(global_seq, payload, 0);
+ encode(connect_seq, payload, 0);
+ encode(msg_seq, payload, 0);
+ }
+
+ ReconnectFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {  // decoding constructor: fills the fields from length bytes at payload, in the order they were encoded
bufferlist bl;
bl.push_back(buffer::create_static(length, payload));
try {
auto ti = bl.cbegin();
decode(addrs, ti);
- decode(gid, ti);
- decode(supported_features, ti);
- decode(required_features, ti);
- decode(flags, ti);
decode(cookie, ti);
+ decode(global_seq, ti);
+ decode(connect_seq, ti);
+ decode(msg_seq, ti);
+ } catch (const buffer::error &e) {  // decode errors are swallowed: fields after the failure point are left unassigned
+ }
+ }
+ };
+
+ struct ResetFrame : public SignedEncryptedFrame {  // frame tagged SESSION_RESET; nothing is encoded into its payload here
+ ResetFrame() : SignedEncryptedFrame(Tag::SESSION_RESET) {}
+ };
+
+  // Frame tagged SESSION_RETRY whose payload is a single connect_seq value.
+  struct RetryFrame : public SignedEncryptedFrame {
+    // Zero by default so the field is well defined even when decoding a
+    // received payload fails; the encoding constructor never stores into it.
+    uint64_t connect_seq = 0;
+
+    // Encoding constructor: serializes the given connect_seq into the frame
+    // payload.
+    RetryFrame(uint64_t connect_seq)
+        : SignedEncryptedFrame(Tag::SESSION_RETRY) {
+      encode(connect_seq, payload, 0);
+    }
+
+    // Decoding constructor: parses connect_seq from `length` bytes at
+    // `payload` (the parameter shadows the payload buffer used when encoding).
+    RetryFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
+      bufferlist bl;
+      bl.push_back(buffer::create_static(length, payload));
+      try {
+        auto ti = bl.cbegin();
+        decode(connect_seq, ti);
+      } catch (const buffer::error &e) {
+        // Best-effort parse: on a short or malformed payload the error is
+        // dropped and connect_seq keeps its zero default.
+      }
+    }
+  };
+
+  // Frame tagged SESSION_RETRY_GLOBAL whose payload is a single global_seq
+  // value.
+  struct RetryGlobalFrame : public SignedEncryptedFrame {
+    // Zero by default so the field is well defined even when decoding a
+    // received payload fails; the encoding constructor never stores into it.
+    uint64_t global_seq = 0;
+
+    // Encoding constructor: serializes the given global_seq into the frame
+    // payload.
+    RetryGlobalFrame(uint64_t global_seq)
+        : SignedEncryptedFrame(Tag::SESSION_RETRY_GLOBAL) {
+      encode(global_seq, payload, 0);
+    }
+
+    // Decoding constructor: parses global_seq from `length` bytes at
+    // `payload` (the parameter shadows the payload buffer used when encoding).
+    RetryGlobalFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
+      bufferlist bl;
+      bl.push_back(buffer::create_static(length, payload));
+      try {
+        auto ti = bl.cbegin();
+        decode(global_seq, ti);
+      } catch (const buffer::error &e) {
+        // Best-effort parse: on a short or malformed payload the error is
+        // dropped and global_seq keeps its zero default.
+      }
+    }
+  };
+
+ struct WaitFrame : public SignedEncryptedFrame {  // frame tagged WAIT; nothing is encoded into its payload here
+ WaitFrame() : SignedEncryptedFrame(Tag::WAIT) {}
+ };
+
+ struct ReconnectOkFrame : public SignedEncryptedFrame {
+ uint64_t msg_seq;
+
+ ReconnectOkFrame(uint64_t msg_seq)
+ : SignedEncryptedFrame(Tag::SESSION_RECONNECT_OK) {
+ encode(msg_seq, payload, 0);
+ }
+
+ ReconnectOkFrame(char *payload, uint32_t length) : SignedEncryptedFrame() {
+ bufferlist bl;
+ bl.push_back(buffer::create_static(length, payload));
+ try {
+ auto ti = bl.cbegin();
+ decode(msg_seq, ti);
} catch (const buffer::error &e) {
}
}
char *temp_buffer;
State state;
uint64_t peer_required_features;
+ uint64_t connection_features;
uint64_t cookie;
+ uint64_t global_seq;
+ uint64_t connect_seq;
+ uint64_t peer_global_seq;
uint64_t message_seq;
+ bool replacing;
bool can_write;
std::map<int, std::list<std::pair<bufferlist, Message *>>> out_queue;
std::list<Message *> sent;
- __u32 connect_seq;
std::atomic<uint64_t> out_seq{0};
std::atomic<uint64_t> in_seq{0};
std::atomic<uint64_t> ack_left{0};
bufferlist &bl);
void requeue_sent();
+ uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq);
void reset_recv_state();
Ct<ProtocolV2> *_fault();
void discard_out_queue();
+ void reset_session();
void prepare_send_message(uint64_t features, Message *m, bufferlist &bl);
Message *_get_next_outgoing(bufferlist *bl);
ssize_t write_message(Message *m, bufferlist &bl, bool more);
CONTINUATION_DECL(ProtocolV2, post_client_banner_exchange);
WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_auth_request_write);
WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_client_ident_write);
+ WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_reconnect_write);
Ct<ProtocolV2> *start_client_banner_exchange();
Ct<ProtocolV2> *post_client_banner_exchange();
Ct<ProtocolV2> *handle_auth_done(char *payload, uint32_t length);
Ct<ProtocolV2> *send_client_ident();
Ct<ProtocolV2> *handle_client_ident_write(int r);
+ Ct<ProtocolV2> *send_reconnect();
+ Ct<ProtocolV2> *handle_reconnect_write(int r);
Ct<ProtocolV2> *handle_ident_missing_features(char *payload, uint32_t length);
+ Ct<ProtocolV2> *handle_session_reset();
+ Ct<ProtocolV2> *handle_session_retry(char *payload, uint32_t length);
+ Ct<ProtocolV2> *handle_session_retry_global(char *payload, uint32_t length);
+ Ct<ProtocolV2> *handle_wait();
+ Ct<ProtocolV2> *handle_reconnect_ok(char *payload, uint32_t length);
Ct<ProtocolV2> *handle_server_ident(char *payload, uint32_t length);
// Server Protocol
WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_auth_done_write);
WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2,
handle_ident_missing_features_write);
- WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_send_server_ident_write);
+ WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_session_reset_write);
+ WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_session_retry_write);
+ WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_wait_write);
+ WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_server_ident_write);
+ WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_reconnect_ok_write);
Ct<ProtocolV2> *start_server_banner_exchange();
Ct<ProtocolV2> *post_server_banner_exchange();
Ct<ProtocolV2> *handle_auth_done_write(int r);
Ct<ProtocolV2> *handle_client_ident(char *payload, uint32_t length);
Ct<ProtocolV2> *handle_ident_missing_features_write(int r);
- Ct<ProtocolV2> *handle_send_server_ident_write(int r);
+ Ct<ProtocolV2> *handle_reconnect(char *payload, uint32_t length);
+ Ct<ProtocolV2> *handle_session_reset_write(int r);
+ Ct<ProtocolV2> *handle_session_retry_write(int r);
+ Ct<ProtocolV2> *handle_existing_connection(AsyncConnectionRef existing);
+ Ct<ProtocolV2> *handle_wait_write(int r);
+ Ct<ProtocolV2> *reuse_connection(AsyncConnectionRef existing,
+ ProtocolV2 *exproto, bool reconnect);
+ Ct<ProtocolV2> *send_server_ident();
+ Ct<ProtocolV2> *handle_server_ident_write(int r);
+ Ct<ProtocolV2> *send_reconnect_ok();
+ Ct<ProtocolV2> *handle_reconnect_ok_write(int r);
};
#endif /* _MSG_ASYNC_PROTOCOL_V2_ */