#define READB(L, B, C) read(CONTINUATION(C), L, B)
-#define TAG_MASK(T) (1 << ((uint64_t)(T)-1))
-
#ifdef UNIT_TESTS_BUILT
+
#define INTERCEPT(S) { \
if(connection->interceptor) { \
auto a = connection->interceptor->intercept(connection, (S)); \
can_write(false),
bannerExchangeCallback(nullptr),
next_payload_len(0),
- sent_tag(static_cast<Tag>(0)),
next_tag(static_cast<Tag>(0)),
keepalive(false) {
temp_buffer = new char[4096];
connection->pendingReadLen.reset();
connection->writeCallback.reset();
- sent_tag = static_cast<Tag>(0);
next_tag = static_cast<Tag>(0);
reset_throttle();
ldout(cct, 5) << __func__ << " sending message m=" << m
<< " seq=" << m->get_seq() << " " << *m << dendl;
- sent_tag = message.tag;
-
m->trace.event("async writing message");
ldout(cct, 20) << __func__ << " sending m=" << m << " seq=" << m->get_seq()
<< " src=" << entity_name_t(messenger->get_myname())
s = in_seq;
auto ack = AckFrame::Encode(session_stream_handlers, in_seq);
connection->outcoming_bl.claim_append(ack.get_buffer());
- sent_tag = ack.tag;
ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
<< " messages" << dendl;
ack_left -= left;
CtPtr ProtocolV2::write(const std::string &desc,
CONTINUATION_PARAM(next, ProtocolV2),
F &frame) {
- sent_tag = frame.tag;
return write(desc, CONTINUATION(next), frame.get_buffer());
}
return callback;
}
-uint64_t ProtocolV2::expected_tags(Tag sent_tag, Tag received_tag) {
- switch(sent_tag) {
- case Tag::HELLO:
- if (received_tag == Tag::HELLO) {
- ceph_assert(state == AUTH_ACCEPTING);
- return TAG_MASK(Tag::AUTH_REQUEST);
- } else {
- return TAG_MASK(Tag::HELLO);
- }
- case Tag::AUTH_REQUEST:
- case Tag::AUTH_REQUEST_MORE:
- return TAG_MASK(Tag::AUTH_REPLY_MORE) | TAG_MASK(Tag::AUTH_DONE) |
- TAG_MASK(Tag::AUTH_BAD_METHOD);
- case Tag::AUTH_BAD_METHOD:
- return TAG_MASK(Tag::AUTH_REQUEST);
- case Tag::AUTH_REPLY_MORE:
- return TAG_MASK(Tag::AUTH_REQUEST_MORE);
- case Tag::AUTH_DONE:
- return TAG_MASK(Tag::CLIENT_IDENT) | TAG_MASK(Tag::SESSION_RECONNECT);
- case Tag::CLIENT_IDENT:
- if (state == READY) {
- return TAG_MASK(Tag::MESSAGE) | TAG_MASK(Tag::KEEPALIVE2) |
- TAG_MASK(Tag::KEEPALIVE2_ACK) | TAG_MASK(Tag::ACK);
- } else {
- ceph_assert(state == SESSION_CONNECTING);
- return TAG_MASK(Tag::SERVER_IDENT) |
- TAG_MASK(Tag::IDENT_MISSING_FEATURES) | TAG_MASK(Tag::WAIT);
- }
- case Tag::SESSION_RECONNECT:
- if (state == READY) {
- return TAG_MASK(Tag::MESSAGE) | TAG_MASK(Tag::KEEPALIVE2) |
- TAG_MASK(Tag::KEEPALIVE2_ACK) | TAG_MASK(Tag::ACK);
- } else {
- ceph_assert(state == SESSION_RECONNECTING);
- return TAG_MASK(Tag::SESSION_RECONNECT_OK) |
- TAG_MASK(Tag::SESSION_RESET) | TAG_MASK(Tag::SESSION_RETRY) |
- TAG_MASK(Tag::SESSION_RETRY_GLOBAL) | TAG_MASK(Tag::WAIT);
- }
- case Tag::SESSION_RESET:
- return TAG_MASK(Tag::CLIENT_IDENT);
- case Tag::SESSION_RETRY:
- case Tag::SESSION_RETRY_GLOBAL:
- return TAG_MASK(Tag::SESSION_RECONNECT);
- case Tag::SERVER_IDENT:
- case Tag::SESSION_RECONNECT_OK:
- case Tag::KEEPALIVE2:
- case Tag::KEEPALIVE2_ACK:
- case Tag::ACK:
- case Tag::MESSAGE:
- ceph_assert(state == READY);
- return TAG_MASK(Tag::MESSAGE) | TAG_MASK(Tag::KEEPALIVE2) |
- TAG_MASK(Tag::KEEPALIVE2_ACK) | TAG_MASK(Tag::ACK);
- case Tag::IDENT_MISSING_FEATURES:
- case Tag::WAIT:
- return 0; // the peer should reset when receiving these tags
- }
- return 0;
-}
-
CtPtr ProtocolV2::read_frame() {
if (state == CLOSED) {
return nullptr;
return _fault();
}
- Tag received_tag = next_tag;
next_tag = static_cast<Tag>(main_preamble.tag);
- uint64_t expected_tag_mask = expected_tags(sent_tag, received_tag);
- if (!(TAG_MASK(next_tag) & expected_tag_mask)) {
- lderr(cct) << __func__ << " received unexpected tag: expected=0x"
- << std::hex << expected_tag_mask << " got=0x"
- << TAG_MASK(next_tag) << std::dec << dendl;
- return _fault();
- }
-
rx_segments_desc.clear();
rx_segments_data.clear();