mon/MonClient.cc
${PROJECT_SOURCE_DIR}/src/mon/MonSub.cc)
set(crimson_net_srcs
+ ${PROJECT_SOURCE_DIR}/src/msg/async/frames_v2.cc
net/Errors.cc
net/Messenger.cc
net/SocketConnection.cc
SocketMessenger& messenger)
: Protocol(proto_t::v2, dispatcher, conn),
messenger{messenger},
- protocol_timer{conn}
+ protocol_timer{conn},
+ tx_frame_asm(&session_stream_handlers)
{}
ProtocolV2::~ProtocolV2() {}
template <class F>
seastar::future<> ProtocolV2::write_frame(F &frame, bool flush)
{
- auto bl = frame.get_buffer(session_stream_handlers);
+ auto bl = frame.get_buffer(tx_frame_asm);
const auto main_preamble = reinterpret_cast<const preamble_block_t*>(bl.front().c_str());
logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}",
conn, bl.length(), (int)main_preamble->tag,
if (unlikely(require_keepalive)) {
auto keepalive_frame = KeepAliveFrame::Encode();
- bl.append(keepalive_frame.get_buffer(session_stream_handlers));
+ bl.append(keepalive_frame.get_buffer(tx_frame_asm));
INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2, bp_type_t::WRITE);
}
if (unlikely(_keepalive_ack.has_value())) {
auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*_keepalive_ack);
- bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
+ bl.append(keepalive_ack_frame.get_buffer(tx_frame_asm));
INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE);
}
if (require_ack && !num_msgs) {
auto ack_frame = AckFrame::Encode(conn.in_seq);
- bl.append(ack_frame.get_buffer(session_stream_handlers));
+ bl.append(ack_frame.get_buffer(tx_frame_asm));
INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE);
}
msg->get_payload(), msg->get_middle(), msg->get_data());
logger().debug("{} --> #{} === {} ({})",
conn, msg->get_seq(), *msg, msg->get_type());
- bl.append(message.get_buffer(session_stream_handlers));
+ bl.append(message.get_buffer(tx_frame_asm));
INTERCEPT_FRAME(ceph::msgr::v2::Tag::MESSAGE, bp_type_t::WRITE);
});
ceph::crypto::onwire::rxtx_t session_stream_handlers;
boost::container::static_vector<ceph::msgr::v2::segment_t,
ceph::msgr::v2::MAX_NUM_SEGMENTS> rx_segments_desc;
+ ceph::msgr::v2::FrameAssembler tx_frame_asm;
ceph::msgr::v2::segment_bls_t rx_segments_data;
size_t get_current_msg_size() const;
async/PosixStack.cc
async/Stack.cc
async/crypto_onwire.cc
+ async/frames_v2.cc
async/net_handler.cc)
if(LINUX)
replacing(false),
can_write(false),
bannerExchangeCallback(nullptr),
+ tx_frame_asm(&session_stream_handlers),
next_tag(static_cast<Tag>(0)),
keepalive(false) {
}
bool ProtocolV2::append_frame(F& frame) {
ceph::bufferlist bl;
try {
- bl = frame.get_buffer(session_stream_handlers);
+ bl = frame.get_buffer(tx_frame_asm);
} catch (ceph::crypto::onwire::TxHandlerError &e) {
ldout(cct, 1) << __func__ << " " << e.what() << dendl;
return false;
}
+
+ ldout(cct, 25) << __func__ << " assembled frame " << bl.length()
+ << " bytes " << tx_frame_asm << dendl;
connection->outgoing_bl.append(bl);
return true;
}
F &frame) {
ceph::bufferlist bl;
try {
- bl = frame.get_buffer(session_stream_handlers);
+ bl = frame.get_buffer(tx_frame_asm);
} catch (ceph::crypto::onwire::TxHandlerError &e) {
ldout(cct, 1) << __func__ << " " << e.what() << dendl;
return _fault();
}
+
+ ldout(cct, 25) << __func__ << " assembled frame " << bl.length()
+ << " bytes " << tx_frame_asm << dendl;
return write(desc, next, bl);
}
boost::container::static_vector<ceph::msgr::v2::segment_t,
ceph::msgr::v2::MAX_NUM_SEGMENTS> rx_segments_desc;
+ ceph::msgr::v2::FrameAssembler tx_frame_asm;
ceph::msgr::v2::segment_bls_t rx_segments_data;
ceph::msgr::v2::Tag next_tag;
utime_t backoff; // backoff time
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2020 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "frames_v2.h"
+
+#include <ostream>
+
+#include <fmt/format.h>
+
+namespace ceph::msgr::v2 {
+
+// Discards trailing empty segments, unless there is just one segment.
+// A frame always has at least one (possibly empty) segment.
+static size_t calc_num_segments(const bufferlist segment_bls[],
+ size_t segment_count) {
+ ceph_assert(segment_count > 0 && segment_count <= MAX_NUM_SEGMENTS);
+ for (size_t i = segment_count; i-- > 0; ) {
+ if (segment_bls[i].length() > 0) {
+ return i + 1;
+ }
+ }
+ return 1;
+}
+
+void FrameAssembler::fill_preamble(Tag tag,
+ preamble_block_t& preamble) const {
+ // FIPS zeroization audit 20191115: this memset is not security related.
+ ::memset(&preamble, 0, sizeof(preamble));
+
+ preamble.tag = static_cast<__u8>(tag);
+ for (size_t i = 0; i < m_descs.size(); i++) {
+ preamble.segments[i].length = m_descs[i].logical_len;
+ preamble.segments[i].alignment = m_descs[i].align;
+ }
+ preamble.num_segments = m_descs.size();
+ preamble.crc = ceph_crc32c(
+ 0, reinterpret_cast<const unsigned char*>(&preamble),
+ sizeof(preamble) - sizeof(preamble.crc));
+}
+
+uint64_t FrameAssembler::get_frame_logical_len() const {
+ ceph_assert(!m_descs.empty());
+ uint64_t logical_len = 0;
+ for (size_t i = 0; i < m_descs.size(); i++) {
+ logical_len += m_descs[i].logical_len;
+ }
+ return logical_len;
+}
+
+uint64_t FrameAssembler::get_frame_onwire_len() const {
+ ceph_assert(!m_descs.empty());
+ uint64_t onwire_len = get_preamble_onwire_len();
+ for (size_t i = 0; i < m_descs.size(); i++) {
+ onwire_len += get_segment_onwire_len(i);
+ }
+ onwire_len += get_epilogue_onwire_len();
+ return onwire_len;
+}
+
+bufferlist FrameAssembler::asm_crc_rev0(const preamble_block_t& preamble,
+ bufferlist segment_bls[]) const {
+ epilogue_plain_block_t epilogue;
+ // FIPS zeroization audit 20191115: this memset is not security related.
+ ::memset(&epilogue, 0, sizeof(epilogue));
+
+ bufferlist frame_bl(sizeof(preamble) + sizeof(epilogue));
+ frame_bl.append(reinterpret_cast<const char*>(&preamble), sizeof(preamble));
+ for (size_t i = 0; i < m_descs.size(); i++) {
+ ceph_assert(segment_bls[i].length() == m_descs[i].logical_len);
+ epilogue.crc_values[i] = segment_bls[i].crc32c(-1);
+ if (segment_bls[i].length() > 0) {
+ frame_bl.claim_append(segment_bls[i]);
+ }
+ }
+ frame_bl.append(reinterpret_cast<const char*>(&epilogue), sizeof(epilogue));
+ return frame_bl;
+}
+
+bufferlist FrameAssembler::asm_secure_rev0(const preamble_block_t& preamble,
+ bufferlist segment_bls[]) const {
+ bufferlist preamble_bl(sizeof(preamble));
+ preamble_bl.append(reinterpret_cast<const char*>(&preamble),
+ sizeof(preamble));
+
+ epilogue_secure_block_t epilogue;
+ // FIPS zeroization audit 20191115: this memset is not security related.
+ ::memset(&epilogue, 0, sizeof(epilogue));
+ bufferlist epilogue_bl(sizeof(epilogue));
+ epilogue_bl.append(reinterpret_cast<const char*>(&epilogue),
+ sizeof(epilogue));
+
+ // preamble + MAX_NUM_SEGMENTS + epilogue
+ uint32_t onwire_lens[MAX_NUM_SEGMENTS + 2];
+ onwire_lens[0] = preamble_bl.length();
+ for (size_t i = 0; i < m_descs.size(); i++) {
+ onwire_lens[i + 1] = segment_bls[i].length(); // already padded
+ }
+ onwire_lens[m_descs.size() + 1] = epilogue_bl.length();
+ m_crypto->tx->reset_tx_handler(onwire_lens,
+ onwire_lens + m_descs.size() + 2);
+ m_crypto->tx->authenticated_encrypt_update(preamble_bl);
+ for (size_t i = 0; i < m_descs.size(); i++) {
+ if (segment_bls[i].length() > 0) {
+ m_crypto->tx->authenticated_encrypt_update(segment_bls[i]);
+ }
+ }
+ m_crypto->tx->authenticated_encrypt_update(epilogue_bl);
+ return m_crypto->tx->authenticated_encrypt_final();
+}
+
+bufferlist FrameAssembler::assemble_frame(Tag tag, bufferlist segment_bls[],
+ const uint16_t segment_aligns[],
+ size_t segment_count) {
+ m_descs.resize(calc_num_segments(segment_bls, segment_count));
+ for (size_t i = 0; i < m_descs.size(); i++) {
+ m_descs[i].logical_len = segment_bls[i].length();
+ m_descs[i].align = segment_aligns[i];
+ }
+
+ preamble_block_t preamble;
+ fill_preamble(tag, preamble);
+
+ if (m_crypto->rx) {
+ for (size_t i = 0; i < m_descs.size(); i++) {
+ ceph_assert(segment_bls[i].length() == m_descs[i].logical_len);
+ // We're padding segments to biggest cipher's block size. Although
+ // AES-GCM can live without that as it's a stream cipher, we don't
+ // want to be fixed to stream ciphers only.
+ uint32_t padded_len = get_segment_padded_len(i);
+ if (padded_len > segment_bls[i].length()) {
+ uint32_t pad_len = padded_len - segment_bls[i].length();
+ segment_bls[i].reserve(pad_len);
+ segment_bls[i].append_zero(pad_len);
+ }
+ }
+ return asm_secure_rev0(preamble, segment_bls);
+ }
+ return asm_crc_rev0(preamble, segment_bls);
+}
+
+std::ostream& operator<<(std::ostream& os, const FrameAssembler& frame_asm) {
+ if (!frame_asm.m_descs.empty()) {
+ os << frame_asm.get_preamble_onwire_len();
+ for (size_t i = 0; i < frame_asm.m_descs.size(); i++) {
+ os << " + " << frame_asm.get_segment_onwire_len(i)
+ << " (logical " << frame_asm.m_descs[i].logical_len
+ << "/" << frame_asm.m_descs[i].align << ")";
+ }
+ os << " + " << frame_asm.get_epilogue_onwire_len() << " ";
+ }
+ os << "rx=" << frame_asm.m_crypto->rx.get()
+ << " tx=" << frame_asm.m_crypto->tx.get();
+ return os;
+}
+
+} // namespace ceph::msgr::v2
#include "common/Clock.h"
#include "crypto_onwire.h"
#include <array>
+#include <iosfwd>
#include <utility>
#include <boost/container/static_vector.hpp>
return p2roundup<uint32_t>(logical_size, CRYPTO_BLOCK_SIZE);
}
-static inline ceph::bufferlist segment_onwire_bufferlist(ceph::bufferlist&& bl)
-{
- const auto padding_size = segment_onwire_size(bl.length()) - bl.length();
- if (padding_size) {
- bl.append_zero(padding_size);
+struct FrameError : std::runtime_error {
+ using runtime_error::runtime_error;
+};
+
+class FrameAssembler {
+public:
+ // crypto must be non-null
+ FrameAssembler(const ceph::crypto::onwire::rxtx_t* crypto)
+ : m_crypto(crypto) {}
+
+ size_t get_num_segments() const {
+ ceph_assert(!m_descs.empty());
+ return m_descs.size();
}
- return std::move(bl);
-}
-template <class T, uint16_t... SegmentAlignmentVs>
-struct Frame {
- static constexpr size_t SegmentsNumV = sizeof...(SegmentAlignmentVs);
- static_assert(SegmentsNumV > 0 && SegmentsNumV <= MAX_NUM_SEGMENTS);
-protected:
- std::array<ceph::bufferlist, SegmentsNumV> segments;
+ uint32_t get_segment_logical_len(size_t seg_idx) const {
+ ceph_assert(seg_idx < m_descs.size());
+ return m_descs[seg_idx].logical_len;
+ }
-private:
- static constexpr std::array<uint16_t, SegmentsNumV> alignments {
- SegmentAlignmentVs...
- };
- ceph::bufferlist::contiguous_filler preamble_filler;
+ uint16_t get_segment_align(size_t seg_idx) const {
+ ceph_assert(seg_idx < m_descs.size());
+ return m_descs[seg_idx].align;
+ }
- __u8 calc_num_segments(const segment_t segments[])
- {
- for (__u8 num = SegmentsNumV; num > 0; num--) {
- if (segments[num-1].length) {
- return num;
- }
+ uint32_t get_preamble_onwire_len() const {
+ return sizeof(preamble_block_t);
+ }
+
+ uint32_t get_segment_onwire_len(size_t seg_idx) const {
+ ceph_assert(seg_idx < m_descs.size());
+ if (m_crypto->rx) {
+ return get_segment_padded_len(seg_idx);
}
- // frame always has at least one segment.
- return 1;
+ return m_descs[seg_idx].logical_len;
}
- // craft the main preamble. It's always present regardless of the number
- // of segments message is composed from.
- void fill_preamble() {
- ceph_assert(std::size(segments) <= MAX_NUM_SEGMENTS);
-
- preamble_block_t main_preamble;
- // FIPS zeroization audit 20191115: this memset is not security related.
- ::memset(&main_preamble, 0, sizeof(main_preamble));
-
- main_preamble.tag = static_cast<__u8>(T::tag);
- ceph_assert(main_preamble.tag != 0);
-
- // implementation detail: the first bufferlist of Frame::segments carries
- // space for preamble. This glueing isn't a part of the onwire format but
- // just our private detail.
- main_preamble.segments[0].length =
- segments[0].length() - FRAME_PREAMBLE_SIZE;
- main_preamble.segments[0].alignment = alignments[0];
-
- // there is no business in issuing frame without at least one segment
- // filled.
- if constexpr(SegmentsNumV > 1) {
- for (__u8 idx = 1; idx < SegmentsNumV; idx++) {
- main_preamble.segments[idx].length = segments[idx].length();
- main_preamble.segments[idx].alignment = alignments[idx];
- }
+ uint32_t get_epilogue_onwire_len() const {
+ ceph_assert(!m_descs.empty());
+ if (m_crypto->rx) {
+ return sizeof(epilogue_secure_block_t) + get_auth_tag_len();
}
- // calculate the number of non-empty segments.
- // TODO: reorder segments to get DATA first
- main_preamble.num_segments = calc_num_segments(main_preamble.segments);
+ return sizeof(epilogue_plain_block_t);
+ }
- main_preamble.crc =
- ceph_crc32c(0, reinterpret_cast<unsigned char *>(&main_preamble),
- sizeof(main_preamble) - sizeof(main_preamble.crc));
+ uint64_t get_frame_logical_len() const;
+ uint64_t get_frame_onwire_len() const;
- preamble_filler.copy_in(sizeof(main_preamble),
- reinterpret_cast<const char *>(&main_preamble));
- }
+ bufferlist assemble_frame(Tag tag, bufferlist segment_bls[],
+ const uint16_t segment_aligns[],
+ size_t segment_count);
- template <size_t... Is>
- void reset_tx_handler(
- ceph::crypto::onwire::rxtx_t &session_stream_handlers,
- std::index_sequence<Is...>)
- {
- session_stream_handlers.tx->reset_tx_handler({ segments[Is].length()...,
- sizeof(epilogue_secure_block_t) });
+private:
+ struct segment_desc_t {
+ uint32_t logical_len;
+ uint16_t align;
+ };
+
+ uint32_t get_segment_padded_len(size_t seg_idx) const {
+ return p2roundup<uint32_t>(m_descs[seg_idx].logical_len,
+ CRYPTO_BLOCK_SIZE);
}
-public:
- ceph::bufferlist get_buffer(
- ceph::crypto::onwire::rxtx_t &session_stream_handlers)
- {
- fill_preamble();
- if (session_stream_handlers.tx) {
- // we're padding segments to biggest cipher's block size. Although
- // AES-GCM can live without that as it's a stream cipher, we don't
- // to be fixed to stream ciphers only.
- for (auto& segment : segments) {
- segment = segment_onwire_bufferlist(std::move(segment));
- }
+ uint32_t get_auth_tag_len() const {
+ return m_crypto->rx->get_extra_size_at_final();
+ }
- // let's cipher allocate one huge buffer for entire ciphertext.
- reset_tx_handler(
- session_stream_handlers, std::make_index_sequence<SegmentsNumV>());
+ bufferlist asm_crc_rev0(const preamble_block_t& preamble,
+ bufferlist segment_bls[]) const;
+ bufferlist asm_secure_rev0(const preamble_block_t& preamble,
+ bufferlist segment_bls[]) const;
- for (auto& segment : segments) {
- if (segment.length()) {
- session_stream_handlers.tx->authenticated_encrypt_update(
- std::move(segment));
- }
- }
+ void fill_preamble(Tag tag, preamble_block_t& preamble) const;
+ friend std::ostream& operator<<(std::ostream& os,
+ const FrameAssembler& frame_asm);
- // in secure mode we craft only the late_flags. Signature (for AES-GCM
- // called auth tag) will be added by the cipher.
- {
- epilogue_secure_block_t epilogue;
- // FIPS zeroization audit 20191115: this memset is not security
- // related.
- ::memset(&epilogue, 0, sizeof(epilogue));
- ceph::bufferlist epilogue_bl;
- epilogue_bl.append(reinterpret_cast<const char*>(&epilogue),
- sizeof(epilogue));
- session_stream_handlers.tx->authenticated_encrypt_update(epilogue_bl);
- }
- return session_stream_handlers.tx->authenticated_encrypt_final();
- } else {
- // plain mode
- epilogue_plain_block_t epilogue;
- // FIPS zeroization audit 20191115: this memset is not security related.
- ::memset(&epilogue, 0, sizeof(epilogue));
-
- ceph::bufferlist::const_iterator hdriter(&segments.front(),
- FRAME_PREAMBLE_SIZE);
- epilogue.crc_values[SegmentIndex::Control::PAYLOAD] =
- hdriter.crc32c(hdriter.get_remaining(), -1);
- if constexpr(SegmentsNumV > 1) {
- for (__u8 idx = 1; idx < SegmentsNumV; idx++) {
- epilogue.crc_values[idx] = segments[idx].crc32c(-1);
- }
- }
+ boost::container::static_vector<segment_desc_t, MAX_NUM_SEGMENTS> m_descs;
+ const ceph::crypto::onwire::rxtx_t* m_crypto;
+};
- ceph::bufferlist ret;
- for (auto& segment : segments) {
- ret.claim_append(segment);
- }
- ret.append(reinterpret_cast<const char*>(&epilogue), sizeof(epilogue));
- return ret;
- }
- }
+template <class T, uint16_t... SegmentAlignmentVs>
+struct Frame {
+ static constexpr size_t SegmentsNumV = sizeof...(SegmentAlignmentVs);
+ static_assert(SegmentsNumV > 0 && SegmentsNumV <= MAX_NUM_SEGMENTS);
+protected:
+ std::array<ceph::bufferlist, SegmentsNumV> segments;
- Frame()
- : preamble_filler(segments.front().append_hole(FRAME_PREAMBLE_SIZE)) {
- }
+private:
+ static constexpr std::array<uint16_t, SegmentsNumV> alignments {
+ SegmentAlignmentVs...
+ };
public:
+ ceph::bufferlist get_buffer(FrameAssembler& tx_frame_asm) {
+ auto bl = tx_frame_asm.assemble_frame(T::tag, segments.data(),
+ alignments.data(), SegmentsNumV);
+ ceph_assert(bl.length() == tx_frame_asm.get_frame_onwire_len());
+ return bl;
+ }
};
-
// ControlFrames are used to manage transceiver state (like connections) and
// orchestrate transfers of MessageFrames. They use only single segment with
// marshalling facilities -- derived classes specify frame structure through