#define READB(L, B, C) read(CONTINUATION(C), L, B)
-#ifdef UNIT_TESTS_BUILT
+#define TAG_MASK(T) (1 << ((uint64_t)(T)-1))
+#ifdef UNIT_TESTS_BUILT
#define INTERCEPT(S) { \
if(connection->interceptor) { \
auto a = connection->interceptor->intercept(connection, (S)); \
can_write(false),
bannerExchangeCallback(nullptr),
next_payload_len(0),
+ sent_tag(static_cast<Tag>(0)),
+ next_tag(static_cast<Tag>(0)),
keepalive(false) {
temp_buffer = new char[4096];
}
connection->pendingReadLen.reset();
connection->writeCallback.reset();
+ sent_tag = static_cast<Tag>(0);
+ next_tag = static_cast<Tag>(0);
+
if (state > THROTTLE_MESSAGE && state <= THROTTLE_DONE &&
connection->policy.throttler_messages) {
ldout(cct, 10) << __func__ << " releasing " << 1
ldout(cct, 5) << __func__ << " sending message m=" << m
<< " seq=" << m->get_seq() << " " << *m << dendl;
+ sent_tag = message.tag;
+
m->trace.event("async writing message");
ldout(cct, 20) << __func__ << " sending m=" << m << " seq=" << m->get_seq()
<< " src=" << entity_name_t(messenger->get_myname())
s = in_seq;
AckFrame ack(session_stream_handlers, in_seq);
connection->outcoming_bl.claim_append(ack.get_buffer());
+ sent_tag = ack.tag;
ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
<< " messages" << dendl;
ack_left -= left;
CtPtr ProtocolV2::write(const std::string &desc,
CONTINUATION_PARAM(next, ProtocolV2),
F &frame) {
+ sent_tag = frame.tag;
return write(desc, CONTINUATION(next), frame.get_buffer());
}
return callback;
}
+uint64_t ProtocolV2::expected_tags(Tag sent_tag, Tag received_tag) {
+ switch(sent_tag) {
+ case Tag::HELLO:
+ if (received_tag == Tag::HELLO) {
+ ceph_assert(state == ACCEPTING);
+ return TAG_MASK(Tag::AUTH_REQUEST);
+ } else {
+ return TAG_MASK(Tag::HELLO);
+ }
+ case Tag::AUTH_REQUEST:
+ case Tag::AUTH_REQUEST_MORE:
+ return TAG_MASK(Tag::AUTH_REPLY_MORE) | TAG_MASK(Tag::AUTH_DONE) |
+ TAG_MASK(Tag::AUTH_BAD_METHOD);
+ case Tag::AUTH_BAD_METHOD:
+ return TAG_MASK(Tag::AUTH_REQUEST);
+ case Tag::AUTH_REPLY_MORE:
+ return TAG_MASK(Tag::AUTH_REQUEST_MORE);
+ case Tag::AUTH_DONE:
+ return TAG_MASK(Tag::CLIENT_IDENT) | TAG_MASK(Tag::SESSION_RECONNECT);
+ case Tag::CLIENT_IDENT:
+ if (state == READY) {
+ return TAG_MASK(Tag::MESSAGE) | TAG_MASK(Tag::KEEPALIVE2) |
+ TAG_MASK(Tag::KEEPALIVE2_ACK) | TAG_MASK(Tag::ACK);
+ } else {
+ ceph_assert(state == CONNECTING);
+ return TAG_MASK(Tag::SERVER_IDENT) |
+ TAG_MASK(Tag::IDENT_MISSING_FEATURES) | TAG_MASK(Tag::WAIT);
+ }
+ case Tag::SESSION_RECONNECT:
+ if (state == READY) {
+ return TAG_MASK(Tag::MESSAGE) | TAG_MASK(Tag::KEEPALIVE2) |
+ TAG_MASK(Tag::KEEPALIVE2_ACK) | TAG_MASK(Tag::ACK);
+ } else {
+ ceph_assert(state == CONNECTING);
+ return TAG_MASK(Tag::SESSION_RECONNECT_OK) |
+ TAG_MASK(Tag::SESSION_RESET) | TAG_MASK(Tag::SESSION_RETRY) |
+ TAG_MASK(Tag::SESSION_RETRY_GLOBAL) | TAG_MASK(Tag::WAIT);
+ }
+ case Tag::SESSION_RESET:
+ return TAG_MASK(Tag::CLIENT_IDENT);
+ case Tag::SESSION_RETRY:
+ case Tag::SESSION_RETRY_GLOBAL:
+ return TAG_MASK(Tag::SESSION_RECONNECT);
+ case Tag::SERVER_IDENT:
+ case Tag::SESSION_RECONNECT_OK:
+ case Tag::KEEPALIVE2:
+ case Tag::KEEPALIVE2_ACK:
+ case Tag::ACK:
+ case Tag::MESSAGE:
+ ceph_assert(state == READY);
+ return TAG_MASK(Tag::MESSAGE) | TAG_MASK(Tag::KEEPALIVE2) |
+ TAG_MASK(Tag::KEEPALIVE2_ACK) | TAG_MASK(Tag::ACK);
+ case Tag::IDENT_MISSING_FEATURES:
+ case Tag::WAIT:
+ return 0; // the peer should reset when receiving these tags
+ }
+ return 0;
+}
+
CtPtr ProtocolV2::read_frame() {
if (state == CLOSED) {
return nullptr;
return _fault();
}
+ Tag received_tag = next_tag;
next_tag = static_cast<Tag>(main_preamble.tag);
+ uint64_t expected_tag_mask = expected_tags(sent_tag, received_tag);
+ if (!(TAG_MASK(next_tag) & expected_tag_mask)) {
+ lderr(cct) << __func__ << " received unexpected tag: expected=0x"
+ << std::hex << expected_tag_mask << " got=0x"
+ << TAG_MASK(next_tag) << std::dec << dendl;
+ return _fault();
+ }
+
rx_segments_desc.clear();
rx_segments_data.clear();
next_payload_len = 0;