SocketMessenger& messenger)
: Protocol(proto_t::v2, dispatcher, conn),
messenger{messenger},
- protocol_timer{conn},
- tx_frame_asm(&session_stream_handlers, false)
+ protocol_timer{conn}
{}
ProtocolV2::~ProtocolV2() {}
size_t ProtocolV2::get_current_msg_size() const
{
- ceph_assert(!rx_segments_desc.empty());
+ ceph_assert(rx_frame_asm.get_num_segments() > 0);
size_t sum = 0;
// we don't include SegmentIndex::Msg::HEADER.
- for (__u8 idx = 1; idx < rx_segments_desc.size(); idx++) {
- sum += rx_segments_desc[idx].length;
+ for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) {
+ sum += rx_frame_asm.get_segment_logical_len(idx);
}
return sum;
}
{
return read_exactly(sizeof(preamble_block_t))
.then([this] (auto bl) {
- if (session_stream_handlers.rx) {
- session_stream_handlers.rx->reset_rx_handler();
- /*
- bl = session_stream_handlers.rx->authenticated_decrypt_update(
- std::move(bl), segment_t::DEFAULT_ALIGNMENT);
- */
- }
-
- // I expect ceph_le32 will make the endian conversion for me. Passing
- // everything through ::Decode is unnecessary.
- const auto& main_preamble = \
- *reinterpret_cast<const preamble_block_t*>(bl.get());
- logger().trace("{} RECV({}) main preamble: tag={}, num_segments={}, crc={}",
- conn, bl.size(), (int)main_preamble.tag,
- (int)main_preamble.num_segments, main_preamble.crc);
-
- // verify preamble's CRC before any further processing
- const auto rx_crc = ceph_crc32c(0,
- reinterpret_cast<const unsigned char*>(&main_preamble),
- sizeof(main_preamble) - sizeof(main_preamble.crc));
- if (rx_crc != main_preamble.crc) {
- logger().warn("{} crc mismatch for main preamble rx_crc={} tx_crc={}",
- conn, rx_crc, main_preamble.crc);
- abort_in_fault();
- }
-
- // currently we do support between 1 and MAX_NUM_SEGMENTS segments
- if (main_preamble.num_segments < 1 ||
- main_preamble.num_segments > MAX_NUM_SEGMENTS) {
- logger().warn("{} unsupported num_segments={}",
- conn, main_preamble.num_segments);
- abort_in_fault();
- }
- if (main_preamble.num_segments > MAX_NUM_SEGMENTS) {
- logger().warn("{} num_segments too much: {}",
- conn, main_preamble.num_segments);
- abort_in_fault();
- }
-
- rx_segments_desc.clear();
rx_segments_data.clear();
-
- for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) {
- logger().trace("{} GOT frame segment: len={} align={}",
- conn, main_preamble.segments[idx].length,
- main_preamble.segments[idx].alignment);
- rx_segments_desc.emplace_back(main_preamble.segments[idx]);
+ try {
+ bufferlist preamble;
+ preamble.append(buffer::create(std::move(bl)));
+ const Tag tag = rx_frame_asm.disassemble_preamble(rx_preamble);
+ INTERCEPT_FRAME(tag, bp_type_t::READ);
+ return tag;
+ } catch (FrameError& e) {
+ logger().warn("{} read_main_preamble: {}", conn, e.what());
+ abort_in_fault();
}
-
- INTERCEPT_FRAME(main_preamble.tag, bp_type_t::READ);
- return static_cast<Tag>(main_preamble.tag);
});
}
seastar::future<> ProtocolV2::read_frame_payload()
{
- ceph_assert(!rx_segments_desc.empty());
ceph_assert(rx_segments_data.empty());
return seastar::do_until(
- [this] { return rx_segments_desc.size() == rx_segments_data.size(); },
+ [this] { return rx_frame_asm.get_num_segments() == rx_segments_data.size(); },
[this] {
- // description of current segment to read
- const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size());
// TODO: create aligned and contiguous buffer from socket
- if (cur_rx_desc.alignment != segment_t::DEFAULT_ALIGNMENT) {
+ const size_t seg_idx = rx_segments_data.size();
+ if (uint16_t alignment = rx_frame_asm.get_segment_align(seg_idx);
+ alignment != segment_t::DEFAULT_ALIGNMENT) {
logger().trace("{} cannot allocate {} aligned buffer at segment desc index {}",
- conn, cur_rx_desc.alignment, rx_segments_data.size());
+ conn, alignment, rx_segments_data.size());
}
+ uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx);
// TODO: create aligned and contiguous buffer from socket
- return read_exactly(cur_rx_desc.length)
- .then([this] (auto tmp_bl) {
+ return read_exactly(onwire_len).then([this] (auto tmp_bl) {
logger().trace("{} RECV({}) frame segment[{}]",
conn, tmp_bl.size(), rx_segments_data.size());
bufferlist data;
});
}
).then([this] {
- // TODO: get_epilogue_size()
ceph_assert(!session_stream_handlers.rx);
- return read_exactly(sizeof(epilogue_crc_rev0_block_t));
+ return read_exactly(rx_frame_asm.get_epilogue_onwire_len());
}).then([this] (auto bl) {
logger().trace("{} RECV({}) frame epilogue", conn, bl.size());
-
- __u8 late_flags;
- if (session_stream_handlers.rx) {
- // TODO
- ceph_assert(false);
- } else {
- auto& epilogue = *reinterpret_cast<const epilogue_crc_rev0_block_t*>(bl.get());
- for (std::uint8_t idx = 0; idx < rx_segments_data.size(); idx++) {
- const __u32 expected_crc = epilogue.crc_values[idx];
- const __u32 calculated_crc = rx_segments_data[idx].crc32c(-1);
- if (expected_crc != calculated_crc) {
- logger().warn("{} message integrity check failed at index {}:"
- " expected_crc={} calculated_crc={}",
- conn, (unsigned int)idx, expected_crc, calculated_crc);
- abort_in_fault();
- } else {
- logger().trace("{} message integrity check success at index {}: crc={}",
- conn, (unsigned int)idx, expected_crc);
- }
- }
- late_flags = epilogue.late_flags;
+ bool ok = false;
+ try {
+ // TODO: v2.1 rx_frame_asm.disassemble_first_segment();
+ bufferlist rx_epilogue;
+ rx_epilogue.append(buffer::create(std::move(bl)));
+ ok = rx_frame_asm.disassemble_remaining_segments(rx_segments_data.data(), rx_epilogue);
+ } catch (FrameError& e) {
+ logger().error("read_frame_payload: {} {}", conn, e.what());
+ abort_in_fault();
+ } catch (ceph::crypto::onwire::MsgAuthError&) {
+ logger().error("read_frame_payload: {} bad auth tag", conn);
+ abort_in_fault();
}
- logger().trace("{} GOT frame epilogue: late_flags={}",
- conn, (unsigned)late_flags);
-
// we do have a mechanism that allows transmitter to start sending message
// and abort after putting entire data field on wire. This will be used by
// the kernel client to avoid unnecessary buffering.
- if (late_flags & FRAME_LATE_FLAG_ABORTED) {
+ if (!ok) {
// TODO
ceph_assert(false);
}