rx_segments_desc.clear();
rx_segments_data.clear();
- next_payload_len = 0;
if (main_preamble.num_segments > MAX_NUM_SEGMENTS) {
ldout(cct, 30) << __func__
get_onwire_size(main_preamble.segments[idx].length),
main_preamble.segments[idx]
});
- next_payload_len += rx_segments_desc.back().onwire_length;
- }
-
- if (session_stream_handlers.rx) {
- rx_segments_desc.back().onwire_length += \
- session_stream_handlers.rx->get_extra_size_at_final();
- next_payload_len += \
- session_stream_handlers.rx->get_extra_size_at_final();
}
}
}
CtPtr ProtocolV2::handle_read_frame_dispatch() {
- ldout(cct, 10) << __func__ << " next payload_len=" << next_payload_len
+ ldout(cct, 10) << __func__
<< " tag=" << static_cast<uint32_t>(next_tag) << dendl;
switch (next_tag) {
}
if (rx_segments_desc.size() == rx_segments_data.size()) {
- // OK, all segments planned to read are read. Can go with dispatch.
- return handle_read_frame_dispatch();
+ // OK, all segments planned to read are read. Can go with epilogue.
+ if (session_stream_handlers.rx) {
+ return READ(FRAME_EPILOGUE_SIZE, handle_read_frame_epilogue_main);
+ } else {
+ return handle_read_frame_dispatch();
+ }
} else {
// TODO: for makeshift only. This will be more generic and throttled
return read_frame_segment();
front.clear();
middle.clear();
data.clear();
- extra.clear();
+ epilogue.clear();
current_header = header;
// front
ceph_assert(!middle.length());
middle = std::move(rx_segments_data[SegmentIndex::Msg::MIDDLE]);
- next_payload_len -= sizeof(ceph_msg_header2);
- next_payload_len -= front.length();
- next_payload_len -= middle.length();
return read_message_data_prepare();
}
return READB(read_len, bp.c_str(), handle_message_data);
}
- next_payload_len -= rx_segments_desc[SegmentIndex::Msg::DATA].logical.length;
- if (next_payload_len) {
+ state = READ_MESSAGE_COMPLETE;
+ // TODO: implement epilogue for non-secure frames
+ if (session_stream_handlers.rx) {
+ return READ(FRAME_EPILOGUE_SIZE, handle_read_frame_epilogue_main);
+ } else {
+ return handle_read_frame_dispatch();
+ }
+}
+
+CtPtr ProtocolV2::handle_read_frame_epilogue_main(char *buffer, int r) {
+ ldout(cct, 20) << __func__ << " r=" << r << dendl;
+
+ if (r < 0) {
+ ldout(cct, 1) << __func__ << " read data error " << dendl;
+ return _fault();
+ }
+
+ if (session_stream_handlers.rx) {
// if we still have more bytes to read is because we signed or encrypted
// the message payload
- ldout(cct, 1) << __func__ << " reading message payload extra bytes left="
- << next_payload_len << dendl;
+ ldout(cct, 1) << __func__ << " read frame epilogue bytes="
+ << FRAME_EPILOGUE_SIZE << dendl;
ceph_assert(session_stream_handlers.rx && session_stream_handlers.tx &&
auth_meta->is_mode_secure());
- extra.push_back(buffer::create(next_payload_len));
- return READB(next_payload_len, extra.c_str(), handle_message_extra_bytes);
+ ceph_assert(FRAME_EPILOGUE_SIZE == \
+ session_stream_handlers.rx->get_extra_size_at_final());
+
+ // I expect that ::temp_buffer is being used here.
+ epilogue.push_back(buffer::create_static(FRAME_EPILOGUE_SIZE, buffer));
}
- state = READ_MESSAGE_COMPLETE;
+ // FIXME
+ rx_segments_data.back().claim_append(epilogue);
return handle_read_frame_dispatch();
}
return CONTINUE(read_message_data);
}
-CtPtr ProtocolV2::handle_message_extra_bytes(char *buffer, int r) {
- ldout(cct, 20) << __func__ << " r=" << r << dendl;
-
- if (r < 0) {
- ldout(cct, 1) << __func__ << " read message extra bytes error " << dendl;
- return _fault();
- }
-
- state = READ_MESSAGE_COMPLETE;
- return handle_read_frame_dispatch();
-}
-
CtPtr ProtocolV2::handle_message_complete() {
ldout(cct, 20) << __func__ << dendl;
current_header.data_crc, 0, current_header.flags};
if (auth_meta->is_mode_secure()) {
- //msg_payload.claim_append(extra);
-
if (front.length()) {
front = session_stream_handlers.rx->authenticated_decrypt_update(
std::move(front), segment_t::DEFAULT_ALIGNMENT);
middle = session_stream_handlers.rx->authenticated_decrypt_update(
std::move(middle), segment_t::DEFAULT_ALIGNMENT);
}
- if (data.length()) {
- data = session_stream_handlers.rx->authenticated_decrypt_update(
- std::move(data), segment_t::DEFAULT_ALIGNMENT);
- }
+ // FIXME: append epilogue. This is really ugly.
+ data.claim_append(rx_segments_data[SegmentIndex::Msg::DATA]);
try {
- session_stream_handlers.rx->authenticated_decrypt_update_final(
- std::move(extra), segment_t::DEFAULT_ALIGNMENT);
+ data = session_stream_handlers.rx->authenticated_decrypt_update_final(
+ std::move(data), segment_t::DEFAULT_ALIGNMENT);
} catch (ceph::crypto::onwire::MsgAuthError &e) {
ldout(cct, 5) << __func__ << " message authentication failed: "
<< e.what() << dendl;
front.clear();
middle.clear();
data.clear();
- extra.clear();
+ epilogue.clear();
// we might have been reused by another connection
// let's check if that is the case
boost::container::static_vector<ceph::bufferlist,
ceph::msgr::v2::MAX_NUM_SEGMENTS> rx_segments_data;
+ ceph::bufferlist epilogue;
private:
ceph::msgr::v2::Tag sent_tag;
unsigned msg_left;
bufferlist data_buf;
bufferlist::iterator data_blp;
- bufferlist front, middle, data, extra;
+ bufferlist front, middle, data;
bool keepalive;
CONTINUATION_DECL(ProtocolV2, read_frame);
READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_preamble_main);
READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_segment);
+ READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_epilogue_main);
CONTINUATION_DECL(ProtocolV2, throttle_message);
CONTINUATION_DECL(ProtocolV2, throttle_bytes);
CONTINUATION_DECL(ProtocolV2, throttle_dispatch_queue);
CONTINUATION_DECL(ProtocolV2, read_message_data);
READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_data);
- READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_extra_bytes);
Ct<ProtocolV2> *read_frame();
Ct<ProtocolV2> *handle_read_frame_preamble_main(char *buffer, int r);
- Ct<ProtocolV2> *handle_read_frame_dispatch();
Ct<ProtocolV2> *read_frame_segment();
Ct<ProtocolV2> *handle_read_frame_segment(char *buffer, int r);
+ Ct<ProtocolV2> *handle_read_frame_epilogue_main(char *buffer, int r);
+ Ct<ProtocolV2> *handle_read_frame_dispatch();
Ct<ProtocolV2> *handle_frame_payload();
Ct<ProtocolV2> *ready();
Ct<ProtocolV2> *read_message_data_prepare();
Ct<ProtocolV2> *read_message_data();
Ct<ProtocolV2> *handle_message_data(char *buffer, int r);
- Ct<ProtocolV2> *handle_message_extra_bytes(char *buffer, int r);
Ct<ProtocolV2> *handle_message_complete();
Ct<ProtocolV2> *handle_keepalive2(ceph::bufferlist &payload);