}
void ProtocolV2::reset_recv_state() {
- if (state == CONNECTING || state == READY) {
+ if ((state >= AUTH_CONNECTING && state <= SESSION_RECONNECTING) ||
+ state == READY) {
auth_meta.reset(new AuthConnectionMeta);
session_stream_handlers.tx.reset(nullptr);
session_stream_handlers.rx.reset(nullptr);
return nullptr;
}
- if (connection->policy.lossy && state != START_CONNECT &&
- state != CONNECTING) {
+ if (connection->policy.lossy &&
+ !(state >= START_CONNECT && state <= SESSION_RECONNECTING)) {
ldout(cct, 2) << __func__ << " on lossy channel, failing" << dendl;
stop();
connection->dispatch_queue->queue_reset(connection);
requeue_sent();
if (out_queue.empty() && state >= START_ACCEPT &&
- state <= ACCEPTING_SESSION && !replacing) {
+ state <= SESSION_ACCEPTING && !replacing) {
ldout(cct, 2) << __func__ << " with nothing to send and in the half "
<< " accept state just closed" << dendl;
connection->write_lock.unlock();
connection->write_lock.unlock();
- if (state != START_CONNECT &&
- state != CONNECTING &&
+ if (!(state >= START_CONNECT && state <= SESSION_RECONNECTING) &&
state != WAIT &&
- state != ACCEPTING_SESSION /* due to connection race */) {
+ state != SESSION_ACCEPTING /* due to connection race */) {
// policy maybe empty when state is in accept
if (connection->policy.server) {
ldout(cct, 1) << __func__ << " server, going to standby" << dendl;
encode((uint16_t)banner_payload.length(), bl, 0);
bl.claim_append(banner_payload);
- INTERCEPT(state == CONNECTING ? 3 : 4);
+ INTERCEPT(state == BANNER_CONNECTING ? 3 : 4);
return WRITE(bl, "banner", _wait_for_peer_banner);
}
next_payload_len = payload_len;
- INTERCEPT(state == CONNECTING ? 5 : 6);
+ INTERCEPT(state == BANNER_CONNECTING ? 5 : 6);
return READ(next_payload_len, _handle_peer_banner_payload);
}
this->connection_features = msgr2_required;
}
+ // at this point we can change how the client protocol behaves based on
+ // this->peer_required_features
+
+ if (state == BANNER_CONNECTING) {
+ state = HELLO_CONNECTING;
+ }
+ else {
+ ceph_assert(state == BANNER_ACCEPTING);
+ state = HELLO_ACCEPTING;
+ }
+
auto hello = HelloFrame::Encode(messenger->get_mytype(),
connection->target_addr);
- INTERCEPT(state == CONNECTING ? 7 : 8);
+ INTERCEPT(state == HELLO_CONNECTING ? 7 : 8);
return WRITE(hello, "hello frame", read_frame);
}
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != HELLO_CONNECTING && state != HELLO_ACCEPTING) {
+ lderr(cct) << __func__ << " not in hello exchange state!" << dendl;
+ return _fault();
+ }
+
auto hello = HelloFrame::Decode(payload);
ldout(cct, 5) << __func__ << " received hello:"
if (connection->get_peer_type() == -1) {
connection->set_peer_type(hello.entity_type());
- ceph_assert(state == ACCEPTING);
+ ceph_assert(state == HELLO_ACCEPTING);
connection->policy = messenger->get_policy(hello.entity_type());
ldout(cct, 10) << __func__ << " accept of host_type "
<< (int)hello.entity_type()
<< " policy.resetcheck=" << connection->policy.resetcheck
<< dendl;
} else {
+ ceph_assert(state == HELLO_CONNECTING);
if (connection->get_peer_type() != hello.entity_type()) {
ldout(cct, 1) << __func__ << " connection peer type does not match what"
<< " peer advertises " << connection->get_peer_type()
switch(sent_tag) {
case Tag::HELLO:
if (received_tag == Tag::HELLO) {
- ceph_assert(state == ACCEPTING);
+ ceph_assert(state == AUTH_ACCEPTING);
return TAG_MASK(Tag::AUTH_REQUEST);
} else {
return TAG_MASK(Tag::HELLO);
return TAG_MASK(Tag::MESSAGE) | TAG_MASK(Tag::KEEPALIVE2) |
TAG_MASK(Tag::KEEPALIVE2_ACK) | TAG_MASK(Tag::ACK);
} else {
- ceph_assert(state == CONNECTING);
+ ceph_assert(state == SESSION_CONNECTING);
return TAG_MASK(Tag::SERVER_IDENT) |
TAG_MASK(Tag::IDENT_MISSING_FEATURES) | TAG_MASK(Tag::WAIT);
}
return TAG_MASK(Tag::MESSAGE) | TAG_MASK(Tag::KEEPALIVE2) |
TAG_MASK(Tag::KEEPALIVE2_ACK) | TAG_MASK(Tag::ACK);
} else {
- ceph_assert(state == CONNECTING);
+ ceph_assert(state == SESSION_RECONNECTING);
return TAG_MASK(Tag::SESSION_RECONNECT_OK) |
TAG_MASK(Tag::SESSION_RESET) | TAG_MASK(Tag::SESSION_RETRY) |
TAG_MASK(Tag::SESSION_RETRY_GLOBAL) | TAG_MASK(Tag::WAIT);
// does it need throttle?
if (next_tag == Tag::MESSAGE) {
+ if (state != READY) {
+ lderr(cct) << __func__ << " not in ready state!" << dendl;
+ return _fault();
+ }
state = THROTTLE_MESSAGE;
return CONTINUE(throttle_message);
} else {
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != READY) {
+ lderr(cct) << __func__ << " not in ready state!" << dendl;
+ return _fault();
+ }
+
auto keepalive_frame = KeepAliveFrame::Decode(session_stream_handlers,
payload);
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != READY) {
+ lderr(cct) << __func__ << " not in ready state!" << dendl;
+ return _fault();
+ }
+
auto keepalive_ack_frame = KeepAliveFrameAck::Decode(session_stream_handlers,
payload);
connection->set_last_keepalive_ack(keepalive_ack_frame.timestamp());
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != READY) {
+ lderr(cct) << __func__ << " not in ready state!" << dendl;
+ return _fault();
+ }
+
auto ack = AckFrame::Decode(session_stream_handlers, payload);
handle_message_ack(ack.seq());
return CONTINUE(read_frame);
INTERCEPT(1);
- state = CONNECTING;
+ state = BANNER_CONNECTING;
global_seq = messenger->get_global_seq();
CtPtr ProtocolV2::post_client_banner_exchange() {
ldout(cct, 20) << __func__ << dendl;
- // at this point we can change how the client protocol behaves based on
- // this->peer_required_features
+ state = AUTH_CONNECTING;
return send_auth_request();
}
connection, am.get(),
&am->auth_method, &preferred_modes, &bl);
connection->lock.lock();
- if (state != State::CONNECTING) {
+ if (state != AUTH_CONNECTING) {
+ ldout(cct, 1) << __func__ << " state changed!" << dendl;
return _fault();
}
if (r < 0) {
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != AUTH_CONNECTING) {
+ lderr(cct) << __func__ << " not in auth connect state!" << dendl;
+ return _fault();
+ }
+
auto bad_method = AuthBadMethodFrame::Decode(payload);
ldout(cct, 1) << __func__ << " method=" << bad_method.method()
<< " result " << cpp_strerror(bad_method.result())
bad_method.allowed_methods(),
bad_method.allowed_modes());
connection->lock.lock();
- if (state != State::CONNECTING || r < 0) {
+ if (state != AUTH_CONNECTING || r < 0) {
return _fault();
}
return send_auth_request(bad_method.allowed_methods());
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != AUTH_CONNECTING) {
+ lderr(cct) << __func__ << " not in auth connect state!" << dendl;
+ return _fault();
+ }
+
auto auth_more = AuthReplyMoreFrame::Decode(payload);
ldout(cct, 5) << __func__
<< " auth reply more len=" << auth_more.auth_payload().length()
int r = messenger->auth_client->handle_auth_reply_more(
connection, am.get(), auth_more.auth_payload(), &reply);
connection->lock.lock();
- if (state != State::CONNECTING) {
+ if (state != AUTH_CONNECTING) {
+ ldout(cct, 1) << __func__ << " state changed!" << dendl;
return _fault();
}
if (r < 0) {
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != AUTH_CONNECTING) {
+ lderr(cct) << __func__ << " not in auth connect state!" << dendl;
+ return _fault();
+ }
+
auto auth_done = AuthDoneFrame::Decode(payload);
ceph_assert(messenger->auth_client);
&am->session_key,
&am->connection_secret);
connection->lock.lock();
- if (state != State::CONNECTING) {
+ if (state != AUTH_CONNECTING) {
+ ldout(cct, 1) << __func__ << " state changed!" << dendl;
return _fault();
}
if (r < 0) {
if (!server_cookie) {
ceph_assert(connect_seq == 0);
+ state = SESSION_CONNECTING;
return send_client_ident();
} else { // reconnecting to previous session
+ state = SESSION_RECONNECTING;
ceph_assert(connect_seq > 0);
return send_reconnect();
}
}
}
connection->lock.lock();
- if (state != CONNECTING) {
+ if (state != SESSION_CONNECTING) {
ldout(cct, 1) << __func__
<< " state changed while learned_addr, mark_down or "
<< " replacing must be happened just now" << dendl;
ldout(cct, 5) << __func__ << " sending identification: "
<< "addrs=" << messenger->get_myaddrs()
- << " target=" << connection->target_addr
+ << " target=" << connection->target_addr
<< " gid=" << messenger->get_myname().num()
<< " global_seq=" << global_seq
<< " features_supported=" << std::hex
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != SESSION_CONNECTING) {
+ lderr(cct) << __func__ << " not in session connect state!" << dendl;
+ return _fault();
+ }
+
auto ident_missing =
IdentMissingFeaturesFrame::Decode(session_stream_handlers, payload);
lderr(cct) << __func__
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != SESSION_RECONNECTING) {
+ lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
+ return _fault();
+ }
+
auto reset = ResetFrame::Decode(session_stream_handlers, payload);
ldout(cct, 1) << __func__ << " received session reset full=" << reset.full()
in_seq = 0;
}
+ state = SESSION_CONNECTING;
return send_client_ident();
}
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != SESSION_RECONNECTING) {
+ lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
+ return _fault();
+ }
+
auto retry = RetryFrame::Decode(session_stream_handlers, payload);
connect_seq = retry.connect_seq() + 1;
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != SESSION_RECONNECTING) {
+ lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
+ return _fault();
+ }
+
auto retry = RetryGlobalFrame::Decode(session_stream_handlers, payload);
global_seq = messenger->get_global_seq(retry.global_seq());
<< " payload.length()=" << payload.length()
<< dendl;
+ if (state != SESSION_CONNECTING && state != SESSION_RECONNECTING) {
+ lderr(cct) << __func__ << " not in session (re)connect state!" << dendl;
+ return _fault();
+ }
+
state = WAIT;
WaitFrame::Decode(session_stream_handlers, payload);
return _fault();
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != SESSION_RECONNECTING) {
+ lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
+ return _fault();
+ }
+
auto reconnect_ok = ReconnectOkFrame::Decode(session_stream_handlers,
payload);
ldout(cct, 5) << __func__
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != SESSION_CONNECTING) {
+ lderr(cct) << __func__ << " not in session connect state!" << dendl;
+ return _fault();
+ }
+
auto server_ident = ServerIdentFrame::Decode(session_stream_handlers,
payload);
ldout(cct, 5) << __func__ << " received server identification:"
INTERCEPT(2);
- state = ACCEPTING;
+ state = BANNER_ACCEPTING;
return _banner_exchange(CONTINUATION(post_server_banner_exchange));
}
CtPtr ProtocolV2::post_server_banner_exchange() {
ldout(cct, 20) << __func__ << dendl;
- // at this point we can change how the server protocol behaves based on
- // this->peer_required_features
+ state = AUTH_ACCEPTING;
return CONTINUE(read_frame);
}
CtPtr ProtocolV2::handle_auth_request(ceph::bufferlist &payload) {
+ ldout(cct, 20) << __func__ << " payload.length()=" << payload.length()
+ << dendl;
+
+ if (state != AUTH_ACCEPTING) {
+ lderr(cct) << __func__ << " not in auth accept state!" << dendl;
+ return _fault();
+ }
+
auto request = AuthRequestFrame::Decode(payload);
ldout(cct, 10) << __func__ << " AuthRequest(method=" << request.method()
<< ", preferred_modes=" << request.preferred_modes()
more, am->auth_method, auth_payload,
&reply);
connection->lock.lock();
- if (state != ACCEPTING) {
+ if (state != AUTH_ACCEPTING && state != AUTH_ACCEPTING_MORE) {
ldout(cct, 1) << __func__
<< " state changed while accept, it must be mark_down"
<< dendl;
ceph_assert(auth_meta);
session_stream_handlers = \
ceph::crypto::onwire::rxtx_t::create_handler_pair(cct, *auth_meta, true);
+
+ state = SESSION_ACCEPTING;
+
auto auth_done = AuthDoneFrame::Encode(connection->peer_global_id,
auth_meta->con_mode,
reply);
return WRITE(auth_done, "auth done", read_frame);
} else if (r == 0) {
+ state = AUTH_ACCEPTING_MORE;
+
auto more = AuthReplyMoreFrame::Encode(reply);
return WRITE(more, "auth reply more", read_frame);
} else if (r == -EBUSY) {
{
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+
+ if (state != AUTH_ACCEPTING_MORE) {
+ lderr(cct) << __func__ << " not in auth accept more state!" << dendl;
+ return _fault();
+ }
+
auto auth_more = AuthRequestMoreFrame::Decode(payload);
return _handle_auth_request(auth_more.auth_payload(), true);
}
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != SESSION_ACCEPTING) {
+ lderr(cct) << __func__ << " not in session accept state!" << dendl;
+ return _fault();
+ }
+
auto client_ident = ClientIdentFrame::Decode(session_stream_handlers,
payload);
connection_features =
client_ident.supported_features() & connection->policy.features_supported;
- state = ACCEPTING_SESSION;
peer_global_seq = client_ident.global_seq();
// Looks good so far, let's check if there is already an existing connection
connection->inject_delay();
connection->lock.lock();
- if (state != ACCEPTING_SESSION) {
+ if (state != SESSION_ACCEPTING) {
ldout(cct, 1) << __func__
<< " state changed while accept, it must be mark_down"
<< dendl;
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != SESSION_ACCEPTING) {
+ lderr(cct) << __func__ << " not in session accept state!" << dendl;
+ return _fault();
+ }
+
auto reconnect = ReconnectFrame::Decode(session_stream_handlers, payload);
ldout(cct, 5) << __func__
connection->inject_delay();
connection->lock.lock();
- if (state != ACCEPTING) {
+ if (state != SESSION_ACCEPTING) {
ldout(cct, 1) << __func__
<< " state changed while accept, it must be mark_down"
<< dendl;
if (exproto->state == CLOSED) return;
ceph_assert(exproto->state == NONE);
- exproto->state = ACCEPTING_SESSION;
+ exproto->state = SESSION_ACCEPTING;
existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED;
existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE,
existing->read_handler);
connection->inject_delay();
return _fault();
}
- if (state != ACCEPTING_SESSION) {
+ if (state != SESSION_ACCEPTING) {
ldout(cct, 1) << __func__
<< " state changed while accept_conn, it must be mark_down"
<< dendl;
connection->inject_delay();
return _fault();
}
- if (state != ACCEPTING_SESSION) {
+ if (state != SESSION_ACCEPTING) {
ldout(cct, 1) << __func__
<< " state changed while accept_conn, it must be mark_down"
<< dendl;