From 7d71884bfbf3c1f77032bc3bc0cb9344ea74660a Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Wed, 6 Mar 2019 22:58:48 +0800 Subject: [PATCH] crimson/net: support frame read/write for ProtocolV2 Signed-off-by: Yingxin Cheng --- src/crimson/net/ProtocolV2.cc | 199 ++++++++++++++++++++++++++++++++++ src/crimson/net/ProtocolV2.h | 25 +++++ 2 files changed, 224 insertions(+) diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index c0b81c833f51e..21415e16ab50d 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -86,6 +86,205 @@ void ProtocolV2::start_accept(SocketFRef&& sock, execute_accepting(); } +// TODO: Frame related implementations, probably to a separate class. + +void ProtocolV2::enable_recording() /* Turns on record_io; asserts that rxbuf and txbuf are empty and that recording is not already on. */ +{ + ceph_assert(!rxbuf.length()); + ceph_assert(!txbuf.length()); + ceph_assert(!record_io); + record_io = true; +} + +seastar::future ProtocolV2::read_exactly(size_t bytes) /* Forwards to socket->read_exactly(bytes); when record_io is set, the result is also appended to rxbuf (through bl.share()) before being passed on. */ +{ + if (unlikely(record_io)) { + return socket->read_exactly(bytes) + .then([this] (auto bl) { + rxbuf.append(buffer::create(bl.share())); + return std::move(bl); + }); + } else { + return socket->read_exactly(bytes); + }; /* NOTE(review): the ';' after this closing brace is an empty statement. */ +} + +seastar::future ProtocolV2::read(size_t bytes) /* Forwards to socket->read(bytes); when record_io is set, the returned buffer is also appended to rxbuf. */ +{ + if (unlikely(record_io)) { + return socket->read(bytes) + .then([this] (auto buf) { + rxbuf.append(buf); + return std::move(buf); + }); + } else { + return socket->read(bytes); + } +} + +seastar::future<> ProtocolV2::write(bufferlist&& buf) /* Appends buf to txbuf when record_io is set, then hands buf to socket->write(). */ +{ + if (unlikely(record_io)) { + txbuf.append(buf); + } + return socket->write(std::move(buf)); +} + +seastar::future<> ProtocolV2::write_flush(bufferlist&& buf) /* Appends buf to txbuf when record_io is set, then hands buf to socket->write_flush(). */ +{ + if (unlikely(record_io)) { + txbuf.append(buf); + } + return socket->write_flush(std::move(buf)); +} + +size_t ProtocolV2::get_current_msg_size() const /* Returns the sum of the length fields of rx_segments_desc entries from index 1 onward; asserts that rx_segments_desc is not empty. */ +{ + ceph_assert(!rx_segments_desc.empty()); + size_t sum = 0; + // we don't include SegmentIndex::Msg::HEADER. 
+ for (__u8 idx = 1; idx < rx_segments_desc.size(); idx++) { + sum += rx_segments_desc[idx].length; + } + return sum; +} + +seastar::future ProtocolV2::read_main_preamble() /* Reads FRAME_PREAMBLE_SIZE bytes, compares the computed crc32c with main_preamble.crc, range-checks num_segments, replaces the contents of rx_segments_desc with the preamble's segment descriptors (rx_segments_data is cleared), and yields main_preamble.tag. Each failed check logs an error and calls abort_in_fault(). */ +{ + return read_exactly(FRAME_PREAMBLE_SIZE) + .then([this] (auto bl) { + if (session_stream_handlers.rx) { + session_stream_handlers.rx->reset_rx_handler(); + /* + bl = session_stream_handlers.rx->authenticated_decrypt_update( + std::move(bl), segment_t::DEFAULT_ALIGNMENT); + */ + } + + // I expect ceph_le32 will make the endian conversion for me. Passing + // everything through ::Decode is unnecessary. + const auto& main_preamble = \ + *reinterpret_cast(bl.get()); + + // verify preamble's CRC before any further processing + const auto rx_crc = ceph_crc32c(0, + reinterpret_cast(&main_preamble), + sizeof(main_preamble) - sizeof(main_preamble.crc)); + if (rx_crc != main_preamble.crc) { + logger().error("{} crc mismatch for main preamble rx_crc={} tx_crc={}", + conn, rx_crc, main_preamble.crc); + abort_in_fault(); + } + logger().debug("{} read main preamble: tag={}, len={}", conn, (int)main_preamble.tag, bl.size()); + + // currently we do support between 1 and MAX_NUM_SEGMENTS segments + if (main_preamble.num_segments < 1 || + main_preamble.num_segments > MAX_NUM_SEGMENTS) { + logger().error("{} unsupported num_segments={}", + conn, main_preamble.num_segments); + abort_in_fault(); + } + if (main_preamble.num_segments > MAX_NUM_SEGMENTS) { /* NOTE(review): same upper-bound condition as the check directly above. */ + logger().error("{} num_segments too much: {}", + conn, main_preamble.num_segments); + abort_in_fault(); + } + + rx_segments_desc.clear(); + rx_segments_data.clear(); + + for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) { + logger().debug("{} got new segment: len={} align={}", + conn, main_preamble.segments[idx].length, + main_preamble.segments[idx].alignment); + rx_segments_desc.emplace_back(main_preamble.segments[idx]); + } + + return static_cast(main_preamble.tag); + }); +} + +seastar::future<> ProtocolV2::read_frame_payload() /* Reads one buffer of cur_rx_desc.length bytes per rx_segments_desc entry into rx_segments_data, then reads FRAME_PLAIN_EPILOGUE_SIZE bytes and compares epilogue.crc_values[idx] with each segment's crc32c(-1); a mismatch logs an error and calls abort_in_fault(). The session_stream_handlers.rx branches and the FRAME_FLAGS_LATEABRT case end in ceph_assert(false). Asserts on entry that rx_segments_desc is non-empty and rx_segments_data is empty. */ +{ + 
ceph_assert(!rx_segments_desc.empty()); + ceph_assert(rx_segments_data.empty()); + + return seastar::do_until( + [this] { return rx_segments_desc.size() == rx_segments_data.size(); }, + [this] { + // description of current segment to read + const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size()); + // TODO: create aligned and contiguous buffer from socket + if (cur_rx_desc.alignment != segment_t::DEFAULT_ALIGNMENT) { + logger().debug("{} cannot allocate {} aligned buffer at segment desc index {}", + conn, cur_rx_desc.alignment, rx_segments_data.size()); + } + // TODO: create aligned and contiguous buffer from socket + return read_exactly(cur_rx_desc.length) + .then([this] (auto tmp_bl) { + bufferlist data; + data.append(buffer::create(std::move(tmp_bl))); + logger().debug("{} read frame segment[{}], length={}", + conn, rx_segments_data.size(), data.length()); + if (session_stream_handlers.rx) { + // TODO + ceph_assert(false); + } + rx_segments_data.emplace_back(std::move(data)); + }); + } + ).then([this] { + // TODO: get_epilogue_size() + ceph_assert(!session_stream_handlers.rx); + return read_exactly(FRAME_PLAIN_EPILOGUE_SIZE); + }).then([this] (auto bl) { + logger().debug("{} read frame epilogue length={}", conn, bl.size()); + + __u8 late_flags; /* assigned only in the else branch below; the other branch is ceph_assert(false) */ + if (session_stream_handlers.rx) { + // TODO + ceph_assert(false); + } else { + auto& epilogue = *reinterpret_cast(bl.get()); + for (std::uint8_t idx = 0; idx < rx_segments_data.size(); idx++) { + const __u32 expected_crc = epilogue.crc_values[idx]; + const __u32 calculated_crc = rx_segments_data[idx].crc32c(-1); + if (expected_crc != calculated_crc) { + logger().error("{} message integrity check failed at index {}:" + " expected_crc={} calculated_crc={}", + conn, (unsigned int)idx, expected_crc, calculated_crc); + abort_in_fault(); + } else { + logger().debug("{} message integrity check success at index {}: crc={}", + conn, (unsigned int)idx, expected_crc); + } + } + late_flags = epilogue.late_flags; + } 
+ + // we do have a mechanism that allows transmitter to start sending message + // and abort after putting entire data field on wire. This will be used by + // the kernel client to avoid unnecessary buffering. + if (late_flags & FRAME_FLAGS_LATEABRT) { + // TODO + ceph_assert(false); + } + }); +} + +template +seastar::future<> ProtocolV2::write_frame(F &frame, bool flush) /* Builds the buffer with frame.get_buffer(session_stream_handlers), logs tag and length, then sends it through write_flush() when flush is true and through write() otherwise. */ +{ + auto bl = frame.get_buffer(session_stream_handlers); + logger().debug("{} write frame: tag={}, len={}", conn, + static_cast(frame.tag), bl.length()); + if (flush) { + return write_flush(std::move(bl)); + } else { + return write(std::move(bl)); + } +} + void ProtocolV2::trigger_state(state_t _state, write_state_t _write_state, bool reentrant) { if (!reentrant && _state == state) { diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index 3c761c9fa3cb5..a57d266870600 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -5,6 +5,7 @@ #include "Protocol.h" #include "msg/async/frames_v2.h" +#include "msg/async/crypto_onwire.h" namespace ceph::net { @@ -63,6 +64,30 @@ class ProtocolV2 final : public Protocol { uint64_t global_seq = 0; + // TODO: Frame related implementations, probably to a separate class. + private: + bool record_io = false; /* set by enable_recording(); checked by read_exactly(), read(), write() and write_flush() in ProtocolV2.cc */ + ceph::bufferlist rxbuf; /* data returned by the read helpers is appended here while record_io is set */ + ceph::bufferlist txbuf; /* data passed to the write helpers is appended here while record_io is set */ + + void enable_recording(); + seastar::future read_exactly(size_t bytes); + seastar::future read(size_t bytes); + seastar::future<> write(bufferlist&& buf); + seastar::future<> write_flush(bufferlist&& buf); + + ceph::crypto::onwire::rxtx_t session_stream_handlers; + boost::container::static_vector rx_segments_desc; + boost::container::static_vector rx_segments_data; + + size_t get_current_msg_size() const; + seastar::future read_main_preamble(); + seastar::future<> read_frame_payload(); + template + seastar::future<> write_frame(F &frame, bool flush=true); + private: seastar::future<> fault(); seastar::future<> banner_exchange(); -- 2.39.5