execute_accepting();
}
+// TODO: Frame related implementations, probably to a separate class.
+
+void ProtocolV2::enable_recording()
+{
+ ceph_assert(!rxbuf.length());
+ ceph_assert(!txbuf.length());
+ ceph_assert(!record_io);
+ record_io = true;
+}
+
+seastar::future<Socket::tmp_buf> ProtocolV2::read_exactly(size_t bytes)
+{
+ if (unlikely(record_io)) {
+ return socket->read_exactly(bytes)
+ .then([this] (auto bl) {
+ rxbuf.append(buffer::create(bl.share()));
+ return std::move(bl);
+ });
+ } else {
+ return socket->read_exactly(bytes);
+ };
+}
+
+seastar::future<bufferlist> ProtocolV2::read(size_t bytes)
+{
+ if (unlikely(record_io)) {
+ return socket->read(bytes)
+ .then([this] (auto buf) {
+ rxbuf.append(buf);
+ return std::move(buf);
+ });
+ } else {
+ return socket->read(bytes);
+ }
+}
+
+seastar::future<> ProtocolV2::write(bufferlist&& buf)
+{
+ if (unlikely(record_io)) {
+ txbuf.append(buf);
+ }
+ return socket->write(std::move(buf));
+}
+
+seastar::future<> ProtocolV2::write_flush(bufferlist&& buf)
+{
+ if (unlikely(record_io)) {
+ txbuf.append(buf);
+ }
+ return socket->write_flush(std::move(buf));
+}
+
+size_t ProtocolV2::get_current_msg_size() const
+{
+ ceph_assert(!rx_segments_desc.empty());
+ size_t sum = 0;
+ // we don't include SegmentIndex::Msg::HEADER.
+ for (__u8 idx = 1; idx < rx_segments_desc.size(); idx++) {
+ sum += rx_segments_desc[idx].length;
+ }
+ return sum;
+}
+
+seastar::future<Tag> ProtocolV2::read_main_preamble()
+{
+ return read_exactly(FRAME_PREAMBLE_SIZE)
+ .then([this] (auto bl) {
+ if (session_stream_handlers.rx) {
+ session_stream_handlers.rx->reset_rx_handler();
+ /*
+ bl = session_stream_handlers.rx->authenticated_decrypt_update(
+ std::move(bl), segment_t::DEFAULT_ALIGNMENT);
+ */
+ }
+
+ // I expect ceph_le32 will make the endian conversion for me. Passing
+ // everything through ::Decode is unnecessary.
+ const auto& main_preamble = \
+ *reinterpret_cast<const preamble_block_t*>(bl.get());
+
+ // verify preamble's CRC before any further processing
+ const auto rx_crc = ceph_crc32c(0,
+ reinterpret_cast<const unsigned char*>(&main_preamble),
+ sizeof(main_preamble) - sizeof(main_preamble.crc));
+ if (rx_crc != main_preamble.crc) {
+ logger().error("{} crc mismatch for main preamble rx_crc={} tx_crc={}",
+ conn, rx_crc, main_preamble.crc);
+ abort_in_fault();
+ }
+ logger().debug("{} read main preamble: tag={}, len={}", conn, (int)main_preamble.tag, bl.size());
+
+ // currently we do support between 1 and MAX_NUM_SEGMENTS segments
+ if (main_preamble.num_segments < 1 ||
+ main_preamble.num_segments > MAX_NUM_SEGMENTS) {
+ logger().error("{} unsupported num_segments={}",
+ conn, main_preamble.num_segments);
+ abort_in_fault();
+ }
+ if (main_preamble.num_segments > MAX_NUM_SEGMENTS) {
+ logger().error("{} num_segments too much: {}",
+ conn, main_preamble.num_segments);
+ abort_in_fault();
+ }
+
+ rx_segments_desc.clear();
+ rx_segments_data.clear();
+
+ for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) {
+ logger().debug("{} got new segment: len={} align={}",
+ conn, main_preamble.segments[idx].length,
+ main_preamble.segments[idx].alignment);
+ rx_segments_desc.emplace_back(main_preamble.segments[idx]);
+ }
+
+ return static_cast<Tag>(main_preamble.tag);
+ });
+}
+
+seastar::future<> ProtocolV2::read_frame_payload()
+{
+ ceph_assert(!rx_segments_desc.empty());
+ ceph_assert(rx_segments_data.empty());
+
+ return seastar::do_until(
+ [this] { return rx_segments_desc.size() == rx_segments_data.size(); },
+ [this] {
+ // description of current segment to read
+ const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size());
+ // TODO: create aligned and contiguous buffer from socket
+ if (cur_rx_desc.alignment != segment_t::DEFAULT_ALIGNMENT) {
+ logger().debug("{} cannot allocate {} aligned buffer at segment desc index {}",
+ conn, cur_rx_desc.alignment, rx_segments_data.size());
+ }
+ // TODO: create aligned and contiguous buffer from socket
+ return read_exactly(cur_rx_desc.length)
+ .then([this] (auto tmp_bl) {
+ bufferlist data;
+ data.append(buffer::create(std::move(tmp_bl)));
+ logger().debug("{} read frame segment[{}], length={}",
+ conn, rx_segments_data.size(), data.length());
+ if (session_stream_handlers.rx) {
+ // TODO
+ ceph_assert(false);
+ }
+ rx_segments_data.emplace_back(std::move(data));
+ });
+ }
+ ).then([this] {
+ // TODO: get_epilogue_size()
+ ceph_assert(!session_stream_handlers.rx);
+ return read_exactly(FRAME_PLAIN_EPILOGUE_SIZE);
+ }).then([this] (auto bl) {
+ logger().debug("{} read frame epilogue length={}", conn, bl.size());
+
+ __u8 late_flags;
+ if (session_stream_handlers.rx) {
+ // TODO
+ ceph_assert(false);
+ } else {
+ auto& epilogue = *reinterpret_cast<const epilogue_plain_block_t*>(bl.get());
+ for (std::uint8_t idx = 0; idx < rx_segments_data.size(); idx++) {
+ const __u32 expected_crc = epilogue.crc_values[idx];
+ const __u32 calculated_crc = rx_segments_data[idx].crc32c(-1);
+ if (expected_crc != calculated_crc) {
+ logger().error("{} message integrity check failed at index {}:"
+ " expected_crc={} calculated_crc={}",
+ conn, (unsigned int)idx, expected_crc, calculated_crc);
+ abort_in_fault();
+ } else {
+ logger().debug("{} message integrity check success at index {}: crc={}",
+ conn, (unsigned int)idx, expected_crc);
+ }
+ }
+ late_flags = epilogue.late_flags;
+ }
+
+ // we do have a mechanism that allows transmitter to start sending message
+ // and abort after putting entire data field on wire. This will be used by
+ // the kernel client to avoid unnecessary buffering.
+ if (late_flags & FRAME_FLAGS_LATEABRT) {
+ // TODO
+ ceph_assert(false);
+ }
+ });
+}
+
+template <class F>
+seastar::future<> ProtocolV2::write_frame(F &frame, bool flush)
+{
+ auto bl = frame.get_buffer(session_stream_handlers);
+ logger().debug("{} write frame: tag={}, len={}", conn,
+ static_cast<uint32_t>(frame.tag), bl.length());
+ if (flush) {
+ return write_flush(std::move(bl));
+ } else {
+ return write(std::move(bl));
+ }
+}
+
void ProtocolV2::trigger_state(state_t _state, write_state_t _write_state, bool reentrant)
{
if (!reentrant && _state == state) {