// everything crimson is experimental...
return true;
}
+ ceph::PluginRegistry* get_plugin_registry() {
+ return _plugin_registry;
+ }
CryptoRandom* random() const;
PerfCountersCollectionImpl* get_perfcounters_collection();
crimson::common::ConfigProxy& _conf;
private:
std::unique_ptr<CryptoRandom> _crypto_random;
unsigned nref;
+ ceph::PluginRegistry* _plugin_registry;
};
}
#else
- ms_service_mode
flags:
- startup
+- name: ms_osd_compress_mode
+ type: str
+ level: advanced
+ desc: Compression policy to use in Messenger for communicating with OSD
+ default: none
+ services:
+ - osd
+ enum_values:
+ - none
+ - force
+ see_also:
+ - ms_compress_secure
+ flags:
+ - runtime
+- name: ms_osd_compress_min_size
+ type: uint
+ level: advanced
+ desc: Minimal message size eligable for on-wire compression
+ default: 1_K
+ services:
+ - osd
+ see_also:
+ - ms_osd_compress_mode
+ flags:
+ - runtime
+- name: ms_osd_compression_algorithm
+ type: str
+ level: advanced
+ desc: Compression algorithm to use in Messenger when communicating with OSD
+ long_desc: Compression algorithm for connections with OSD in order of preference
+ default: snappy zlib zstd lz4
+ services:
+ - osd
+ see_also:
+ - ms_osd_compress_mode
+ flags:
+ - runtime
+- name: ms_compress_secure
+ type: bool
+ level: advanced
+ desc: Allowing compression when on-wire encryption is enabled
+ long_desc: Combining encryption with compression reduces the level of security of
+ messages between peers. In case both encryption and compression are enabled,
+ compression setting will be ignored and message will not be compressed.
+ This behaviour can be override using this setting.
+ default: false
+ see_also:
+ - ms_osd_compress_mode
+ flags:
+ - runtime
- name: ms_learn_addr_from_peer
type: bool
level: advanced
${PROJECT_SOURCE_DIR}/src/common/PluginRegistry.cc
${PROJECT_SOURCE_DIR}/src/common/RefCountedObj.cc
${PROJECT_SOURCE_DIR}/src/common/util.cc
+ ${PROJECT_SOURCE_DIR}/src/compressor/Compressor.cc
${PROJECT_SOURCE_DIR}/src/crush/builder.c
${PROJECT_SOURCE_DIR}/src/crush/mapper.c
${PROJECT_SOURCE_DIR}/src/crush/crush.c
${PROJECT_SOURCE_DIR}/src/mon/MonSub.cc)
set(crimson_net_srcs
${PROJECT_SOURCE_DIR}/src/msg/async/crypto_onwire.cc
+ ${PROJECT_SOURCE_DIR}/src/msg/async/compression_onwire.cc
${PROJECT_SOURCE_DIR}/src/msg/async/frames_v2.cc
net/Errors.cc
net/Messenger.cc
logger().trace("{} RECV({}) frame epilogue", conn, bl.size());
bool ok = false;
try {
- rx_frame_asm.disassemble_first_segment(rx_preamble, rx_segments_data[0]);
bufferlist rx_epilogue;
rx_epilogue.append(buffer::create(std::move(bl)));
- ok = rx_frame_asm.disassemble_remaining_segments(rx_segments_data.data(), rx_epilogue);
+ ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue);
} catch (FrameError& e) {
logger().error("read_frame_payload: {} {}", conn, e.what());
abort_in_fault();
INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
session_stream_handlers = { nullptr, nullptr };
+ session_comp_handlers = { nullptr, nullptr };
enable_recording();
return banner_exchange(false);
}).then([this] (auto&& ret) {
#include "Protocol.h"
#include "msg/async/frames_v2.h"
#include "msg/async/crypto_onwire.h"
+#include "msg/async/compression_onwire.h"
namespace crimson::net {
seastar::future<> write_flush(bufferlist&& buf);
ceph::crypto::onwire::rxtx_t session_stream_handlers;
- ceph::msgr::v2::FrameAssembler tx_frame_asm{&session_stream_handlers, false};
- ceph::msgr::v2::FrameAssembler rx_frame_asm{&session_stream_handlers, false};
+ ceph::compression::onwire::rxtx_t session_comp_handlers;
+ ceph::msgr::v2::FrameAssembler tx_frame_asm{&session_stream_handlers, false, &session_comp_handlers};
+ ceph::msgr::v2::FrameAssembler rx_frame_asm{&session_stream_handlers, false, &session_comp_handlers};
ceph::bufferlist rx_preamble;
ceph::msgr::v2::segment_bls_t rx_segments_data;
#define HAVE_MSGR2_FEATURE(x, name) \
(((x) & (CEPH_MSGR2_FEATUREMASK_##name)) == (CEPH_MSGR2_FEATUREMASK_##name))
-DEFINE_MSGR2_FEATURE( 0, 1, REVISION_1) // msgr2.1
+DEFINE_MSGR2_FEATURE(0, 1, REVISION_1) // msgr2.1
+DEFINE_MSGR2_FEATURE(1, 1, COMPRESSION) // on-wire compression
-#define CEPH_MSGR2_SUPPORTED_FEATURES (CEPH_MSGR2_FEATURE_REVISION_1)
+/*
+ * Features supported. Should be everything above.
+ */
+#define CEPH_MSGR2_SUPPORTED_FEATURES \
+ (CEPH_MSGR2_FEATURE_REVISION_1 | \
+ CEPH_MSGR2_FEATURE_COMPRESSION | \
+ 0ULL)
+
+#define CEPH_MSGR2_REQUIRED_FEATURES (0ULL)
-#define CEPH_MSGR2_REQUIRED_FEATURES (0ull)
/*
Message.cc
Messenger.cc
Connection.cc
- msg_types.cc)
+ msg_types.cc
+ compressor_registry.cc)
list(APPEND msg_srcs
async/AsyncConnection.cc
async/PosixStack.cc
async/Stack.cc
async/crypto_onwire.cc
+ async/compression_onwire.cc
async/frames_v2.cc
async/net_handler.cc)
socket_priority(-1),
cct(cct_),
crcflags(get_default_crc_flags(cct->_conf)),
- auth_registry(cct)
+ auth_registry(cct),
+ comp_registry(cct)
{
auth_registry.refresh_config();
+ comp_registry.refresh_config();
}
void Messenger::set_endpoint_addr(const entity_addr_t& a,
#include "auth/Crypto.h"
#include "common/item_history.h"
#include "auth/AuthRegistry.h"
+#include "compressor_registry.h"
#include "include/ceph_assert.h"
#include <errno.h>
READY,
HANDLE_MESSAGE,
READ_MESSAGE_COMPLETE,
- SESSION_RETRY
+ SESSION_RETRY,
+ SEND_COMPRESSION_REQUEST,
+ HANDLE_COMPRESSION_REQUEST
};
virtual ~Interceptor() {}
auth_server = as;
}
+ // for compression
+ CompressorRegistry comp_registry;
+
protected:
/**
* std::set messenger's address
<< " cs=" << connect_seq << " l=" << connection->policy.lossy
<< " rev1=" << HAVE_MSGR2_FEATURE(peer_supported_features,
REVISION_1)
- << " rx=" << session_stream_handlers.rx.get()
+ << " crypto rx=" << session_stream_handlers.rx.get()
<< " tx=" << session_stream_handlers.tx.get()
+ << " comp rx=" << session_compression_handlers.rx.get()
+ << " tx=" << session_compression_handlers.tx.get()
<< ").";
}
replacing(false),
can_write(false),
bannerExchangeCallback(nullptr),
- tx_frame_asm(&session_stream_handlers, false),
- rx_frame_asm(&session_stream_handlers, false),
+ tx_frame_asm(&session_stream_handlers, false, &session_compression_handlers),
+ rx_frame_asm(&session_stream_handlers, false, &session_compression_handlers),
next_tag(static_cast<Tag>(0)),
keepalive(false) {
}
// `write_event()` unlocks it just before calling `write_message()`.
// `submit_to()` here is NOT blocking.
connection->center->submit_to(connection->center->get_id(), [this] {
- ldout(cct, 5) << "reset_recv_state (warped) reseting crypto handlers"
+ ldout(cct, 5) << "reset_recv_state (warped) reseting crypto and compression handlers"
<< dendl;
// Possibly unnecessary. See the comment in `deactivate_existing`.
std::lock_guard<std::mutex> l(connection->lock);
std::lock_guard<std::mutex> wl(connection->write_lock);
reset_security();
+ reset_compression();
}, /* always_async = */true);
} else {
reset_security();
+ reset_compression();
}
// clean read and write callbacks
}
}
+void ProtocolV2::reset_compression() {
+ ldout(cct, 5) << __func__ << dendl;
+
+ comp_meta = CompConnectionMeta{};
+ session_compression_handlers.rx.reset(nullptr);
+ session_compression_handlers.tx.reset(nullptr);
+}
+
void ProtocolV2::write_event() {
ldout(cct, 10) << __func__ << dendl;
ssize_t r = 0;
case Tag::KEEPALIVE2_ACK:
case Tag::ACK:
case Tag::WAIT:
+ case Tag::COMPRESSION_REQUEST:
+ case Tag::COMPRESSION_DONE:
return handle_frame_payload();
case Tag::MESSAGE:
return handle_message();
return handle_message_ack(payload);
case Tag::WAIT:
return handle_wait(payload);
+ case Tag::COMPRESSION_REQUEST:
+ return handle_compression_request(payload);
+ case Tag::COMPRESSION_DONE:
+ return handle_compression_done(payload);
default:
ceph_abort();
}
}
CtPtr ProtocolV2::_handle_read_frame_epilogue_main() {
- bool aborted;
+ bool ok = false;
try {
- rx_frame_asm.disassemble_first_segment(rx_preamble, rx_segments_data[0]);
- aborted = !rx_frame_asm.disassemble_remaining_segments(
- rx_segments_data.data(), rx_epilogue);
+ ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue);
} catch (FrameError& e) {
ldout(cct, 1) << __func__ << " " << e.what() << dendl;
return _fault();
// we do have a mechanism that allows transmitter to start sending message
// and abort after putting entire data field on wire. This will be used by
// the kernel client to avoid unnecessary buffering.
- if (aborted) {
+ if (!ok) {
reset_throttle();
state = READY;
return CONTINUE(read_frame);
}
CtPtr ProtocolV2::finish_client_auth() {
+ if (HAVE_MSGR2_FEATURE(peer_supported_features, COMPRESSION)) {
+ return send_compression_request();
+ }
+
+ return start_session_connect();
+}
+
+CtPtr ProtocolV2::finish_server_auth() {
+ // server had sent AuthDone and client responded with correct pre-auth
+ // signature.
+ // We can start conditioanl msgr protocol
+ if (HAVE_MSGR2_FEATURE(peer_supported_features, COMPRESSION)) {
+ state = COMPRESSION_ACCEPTING;
+ } else {
+ // No msgr protocol features to process
+ // we can start accepting new sessions/reconnects.
+ state = SESSION_ACCEPTING;
+ }
+
+ return CONTINUE(read_frame);
+}
+
+CtPtr ProtocolV2::start_session_connect() {
if (!server_cookie) {
ceph_assert(connect_seq == 0);
state = SESSION_CONNECTING;
return ready();
}
+CtPtr ProtocolV2::send_compression_request() {
+ state = COMPRESSION_CONNECTING;
+
+ const entity_type_t peer_type = connection->get_peer_type();
+ comp_meta.con_mode =
+ static_cast<Compressor::CompressionMode>(
+ messenger->comp_registry.get_mode(peer_type, auth_meta->is_mode_secure()));
+ const auto preferred_methods = messenger->comp_registry.get_methods(peer_type);
+ auto comp_req_frame = CompressionRequestFrame::Encode(comp_meta.is_compress(), preferred_methods);
+
+ INTERCEPT(19);
+ return WRITE(comp_req_frame, "compression request", read_frame);
+}
+
+CtPtr ProtocolV2::handle_compression_done(ceph::bufferlist &payload) {
+ if (state != COMPRESSION_CONNECTING) {
+ lderr(cct) << __func__ << " state changed!" << dendl;
+ return _fault();
+ }
+
+ auto response = CompressionDoneFrame::Decode(payload);
+ ldout(cct, 10) << __func__ << " CompressionDoneFrame(is_compress=" << response.is_compress()
+ << ", method=" << response.method() << ")" << dendl;
+
+ comp_meta.con_method = static_cast<Compressor::CompressionAlgorithm>(response.method());
+ if (comp_meta.is_compress() != response.is_compress()) {
+ comp_meta.con_mode = Compressor::COMP_NONE;
+ }
+ session_compression_handlers = ceph::compression::onwire::rxtx_t::create_handler_pair(
+ cct, comp_meta, messenger->comp_registry.get_min_compression_size(connection->get_peer_type()));
+
+ return start_session_connect();
+}
+
/* Server Protocol Methods */
CtPtr ProtocolV2::start_server_banner_exchange() {
}
if (state == AUTH_ACCEPTING_SIGN) {
- // server had sent AuthDone and client responded with correct pre-auth
- // signature. we can start accepting new sessions/reconnects.
- state = SESSION_ACCEPTING;
- return CONTINUE(read_frame);
+ // this happened on server side
+ return finish_server_auth();
} else if (state == AUTH_CONNECTING_SIGN) {
// this happened at client side
return finish_client_auth();
// this happens in the event center's thread as there should be
// no user outside its boundaries (simlarly to e.g. outgoing_bl).
auto temp_stream_handlers = std::move(session_stream_handlers);
+ auto temp_compression_handlers = std::move(session_compression_handlers);
exproto->auth_meta = auth_meta;
+ exproto->comp_meta = comp_meta;
ldout(messenger->cct, 5) << __func__ << " stop myself to swap existing"
<< dendl;
new_worker,
new_center,
exproto,
- temp_stream_handlers=std::move(temp_stream_handlers)
+ temp_stream_handlers=std::move(temp_stream_handlers),
+ temp_compression_handlers=std::move(temp_compression_handlers)
](ConnectedSocket &cs) mutable {
// we need to delete time event in original thread
{
existing->outgoing_bl.clear();
existing->open_write = false;
exproto->session_stream_handlers = std::move(temp_stream_handlers);
+ exproto->session_compression_handlers = std::move(temp_compression_handlers);
existing->write_lock.unlock();
if (exproto->state == NONE) {
existing->shutdown_socket();
return WRITE(reconnect_ok, "reconnect ok", server_ready);
}
+
+
+CtPtr ProtocolV2::handle_compression_request(ceph::bufferlist &payload) {
+ if (state != COMPRESSION_ACCEPTING) {
+ lderr(cct) << __func__ << " state changed!" << dendl;
+ return _fault();
+ }
+
+ auto request = CompressionRequestFrame::Decode(payload);
+ ldout(cct, 10) << __func__ << " CompressionRequestFrame(is_compress=" << request.is_compress()
+ << ", preferred_methods=" << request.preferred_methods() << ")" << dendl;
+
+ const int peer_type = connection->get_peer_type();
+ if (Compressor::CompressionMode mode = messenger->comp_registry.get_mode(
+ peer_type, auth_meta->is_mode_secure());
+ mode != Compressor::COMP_NONE && request.is_compress()) {
+ comp_meta.con_method = messenger->comp_registry.pick_method(peer_type, request.preferred_methods());
+ if (comp_meta.con_method != Compressor::COMP_ALG_NONE) {
+ comp_meta.con_mode = mode;
+ }
+ } else {
+ comp_meta.con_method = Compressor::COMP_ALG_NONE;
+ }
+
+ auto response = CompressionDoneFrame::Encode(comp_meta.is_compress(), comp_meta.get_method());
+
+ INTERCEPT(20);
+ return WRITE(response, "compression done", finish_compression);
+}
+
+CtPtr ProtocolV2::finish_compression() {
+ // TODO: having a possibility to check whether we're server or client could
+ // allow reusing finish_compression().
+
+ session_compression_handlers = ceph::compression::onwire::rxtx_t::create_handler_pair(
+ cct, comp_meta, messenger->comp_registry.get_min_compression_size(connection->get_peer_type()));
+
+ state = SESSION_ACCEPTING;
+ return CONTINUE(read_frame);
+}
#include "Protocol.h"
#include "crypto_onwire.h"
+#include "compression_meta.h"
+#include "compression_onwire.h"
#include "frames_v2.h"
class ProtocolV2 : public Protocol {
HELLO_CONNECTING,
AUTH_CONNECTING,
AUTH_CONNECTING_SIGN,
+ COMPRESSION_CONNECTING,
SESSION_CONNECTING,
SESSION_RECONNECTING,
START_ACCEPT,
AUTH_ACCEPTING,
AUTH_ACCEPTING_MORE,
AUTH_ACCEPTING_SIGN,
+ COMPRESSION_ACCEPTING,
SESSION_ACCEPTING,
READY,
THROTTLE_MESSAGE,
"HELLO_CONNECTING",
"AUTH_CONNECTING",
"AUTH_CONNECTING_SIGN",
+ "COMPRESSION_CONNECTING",
"SESSION_CONNECTING",
"SESSION_RECONNECTING",
"START_ACCEPT",
"AUTH_ACCEPTING",
"AUTH_ACCEPTING_MORE",
"AUTH_ACCEPTING_SIGN",
+ "COMPRESSION_ACCEPTING",
"SESSION_ACCEPTING",
"READY",
"THROTTLE_MESSAGE",
// TODO: move into auth_meta?
ceph::crypto::onwire::rxtx_t session_stream_handlers;
-
+ ceph::compression::onwire::rxtx_t session_compression_handlers;
+
+private:
entity_name_t peer_name;
State state;
uint64_t peer_supported_features; // CEPH_MSGR2_FEATURE_*
bool keepalive;
bool write_in_progress = false;
+ CompConnectionMeta comp_meta;
std::ostream& _conn_prefix(std::ostream *_dout);
void run_continuation(Ct<ProtocolV2> *pcontinuation);
void run_continuation(Ct<ProtocolV2> &continuation);
out_queue_entry_t _get_next_outgoing();
ssize_t write_message(Message *m, bool more);
void handle_message_ack(uint64_t seq);
+ void reset_compression();
CONTINUATION_DECL(ProtocolV2, _wait_for_peer_banner);
READ_BPTR_HANDLER_CONTINUATION_DECL(ProtocolV2, _handle_peer_banner);
CONTINUATION_DECL(ProtocolV2, throttle_message);
CONTINUATION_DECL(ProtocolV2, throttle_bytes);
CONTINUATION_DECL(ProtocolV2, throttle_dispatch_queue);
+ CONTINUATION_DECL(ProtocolV2, finish_compression);
Ct<ProtocolV2> *read_frame();
Ct<ProtocolV2> *finish_auth();
Ct<ProtocolV2> *finish_client_auth();
+ Ct<ProtocolV2> *finish_server_auth();
Ct<ProtocolV2> *handle_read_frame_preamble_main(rx_buffer_t &&buffer, int r);
Ct<ProtocolV2> *read_frame_segment();
Ct<ProtocolV2> *handle_read_frame_segment(rx_buffer_t &&rx_buffer, int r);
Ct<ProtocolV2> *_handle_read_frame_epilogue_main();
Ct<ProtocolV2> *handle_read_frame_dispatch();
Ct<ProtocolV2> *handle_frame_payload();
+ Ct<ProtocolV2> *finish_compression();
Ct<ProtocolV2> *ready();
Ct<ProtocolV2> *handle_auth_reply_more(ceph::bufferlist &payload);
Ct<ProtocolV2> *handle_auth_done(ceph::bufferlist &payload);
Ct<ProtocolV2> *handle_auth_signature(ceph::bufferlist &payload);
+ Ct<ProtocolV2> *start_session_connect();
Ct<ProtocolV2> *send_client_ident();
Ct<ProtocolV2> *send_reconnect();
Ct<ProtocolV2> *handle_ident_missing_features(ceph::bufferlist &payload);
Ct<ProtocolV2> *handle_wait(ceph::bufferlist &payload);
Ct<ProtocolV2> *handle_reconnect_ok(ceph::bufferlist &payload);
Ct<ProtocolV2> *handle_server_ident(ceph::bufferlist &payload);
+ Ct<ProtocolV2> *send_compression_request();
+ Ct<ProtocolV2> *handle_compression_done(ceph::bufferlist &payload);
+
// Server Protocol
CONTINUATION_DECL(ProtocolV2, start_server_banner_exchange);
Ct<ProtocolV2> *send_server_ident();
Ct<ProtocolV2> *send_reconnect_ok();
Ct<ProtocolV2> *server_ready();
+ Ct<ProtocolV2> *handle_compression_request(ceph::bufferlist &payload);
size_t get_current_msg_size() const;
};
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "compressor/Compressor.h"
+
+struct CompConnectionMeta {
+ TOPNSPC::Compressor::CompressionMode con_mode =
+ TOPNSPC::Compressor::COMP_NONE; // negotiated mode
+ TOPNSPC::Compressor::CompressionAlgorithm con_method =
+ TOPNSPC::Compressor::COMP_ALG_NONE; // negotiated method
+
+ bool is_compress() const {
+ return con_mode != TOPNSPC::Compressor::COMP_NONE;
+ }
+ TOPNSPC::Compressor::CompressionAlgorithm get_method() const {
+ return con_method;
+ }
+ TOPNSPC::Compressor::CompressionMode get_mode() const {
+ return con_mode;
+ }
+};
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "compression_onwire.h"
+#include "compression_meta.h"
+#include "common/dout.h"
+
+#define dout_subsys ceph_subsys_ms
+
+namespace ceph::compression::onwire {
+
+rxtx_t rxtx_t::create_handler_pair(
+ CephContext* ctx,
+ const CompConnectionMeta& comp_meta,
+ std::uint64_t compress_min_size)
+{
+ if (comp_meta.is_compress()) {
+ CompressorRef compressor = Compressor::create(ctx, comp_meta.get_method());
+ if (compressor) {
+ return {std::make_unique<RxHandler>(ctx, compressor),
+ std::make_unique<TxHandler>(ctx, compressor,
+ comp_meta.get_mode(),
+ compress_min_size)};
+ }
+ }
+ return {};
+}
+
+std::optional<ceph::bufferlist> TxHandler::compress(const ceph::bufferlist &input)
+{
+ if (m_init_onwire_size < m_min_size) {
+ ldout(m_cct, 20) << __func__
+ << " discovered frame that is smaller than threshold, aborting compression"
+ << dendl;
+ return {};
+ }
+
+ m_compress_potential -= input.length();
+
+ ceph::bufferlist out;
+ if (input.length() == 0) {
+ ldout(m_cct, 20) << __func__
+ << " discovered an empty segment, skipping compression without aborting"
+ << dendl;
+ out.clear();
+ return out;
+ }
+
+ boost::optional<int32_t> compressor_message;
+ if (m_compressor->compress(input, out, compressor_message)) {
+ return {};
+ } else {
+ ldout(m_cct, 20) << __func__ << " uncompressed.length()=" << input.length()
+ << " compressed.length()=" << out.length() << dendl;
+ m_onwire_size += out.length();
+ return out;
+ }
+}
+
+std::optional<ceph::bufferlist> RxHandler::decompress(const ceph::bufferlist &input)
+{
+ ceph::bufferlist out;
+ if (input.length() == 0) {
+ ldout(m_cct, 20) << __func__
+ << " discovered an empty segment, skipping decompression without aborting"
+ << dendl;
+ out.clear();
+ return out;
+ }
+
+ boost::optional<int32_t> compressor_message;
+ if (m_compressor->decompress(input, out, compressor_message)) {
+ return {};
+ } else {
+ ldout(m_cct, 20) << __func__ << " compressed.length()=" << input.length()
+ << " uncompressed.length()=" << out.length() << dendl;
+ return out;
+ }
+}
+
+void TxHandler::done()
+{
+ ldout(m_cct, 25) << __func__ << " compression ratio=" << get_ratio() << dendl;
+}
+
+} // namespace ceph::compression::onwire
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_COMPRESSION_ONWIRE_H
+#define CEPH_COMPRESSION_ONWIRE_H
+
+#include "compressor/Compressor.h"
+#include "include/buffer.h"
+
+class CompConnectionMeta;
+
+namespace ceph::compression::onwire {
+ using Compressor = TOPNSPC::Compressor;
+ using CompressorRef = TOPNSPC::CompressorRef;
+
+ class Handler {
+ public:
+ Handler(CephContext* const cct, CompressorRef compressor)
+ : m_cct(cct), m_compressor(compressor) {}
+
+ protected:
+ CephContext* const m_cct;
+ CompressorRef m_compressor;
+ };
+
+ class RxHandler final : private Handler {
+ public:
+ RxHandler(CephContext* const cct, CompressorRef compressor)
+ : Handler(cct, compressor) {}
+ ~RxHandler() {};
+
+ /**
+ * Decompresses a bufferlist
+ *
+ * @param input compressed bufferlist
+ * @param out decompressed bufferlist
+ *
+ * @returns true on success, false on failure
+ */
+ std::optional<ceph::bufferlist> decompress(const ceph::bufferlist &input);
+ };
+
+ class TxHandler final : private Handler {
+ public:
+ TxHandler(CephContext* const cct, CompressorRef compressor, int mode, std::uint64_t min_size)
+ : Handler(cct, compressor),
+ m_min_size(min_size),
+ m_mode(static_cast<Compressor::CompressionMode>(mode))
+ {}
+ ~TxHandler() {}
+
+ void reset_handler(int num_segments, uint64_t size) {
+ m_init_onwire_size = size;
+ m_compress_potential = size;
+ m_onwire_size = 0;
+ }
+
+ void done();
+
+ /**
+ * Compresses a bufferlist
+ *
+ * @param input bufferlist to compress
+ * @param out compressed bufferlist
+ *
+ * @returns true on success, false on failure
+ */
+ std::optional<ceph::bufferlist> compress(const ceph::bufferlist &input);
+
+ double get_ratio() const {
+ return get_initial_size() / (double) get_final_size();
+ }
+
+ uint64_t get_initial_size() const {
+ return m_init_onwire_size;
+ }
+
+ uint64_t get_final_size() const {
+ return m_onwire_size;
+ }
+
+ private:
+ uint64_t m_min_size;
+ Compressor::CompressionMode m_mode;
+
+ uint64_t m_init_onwire_size;
+ uint64_t m_onwire_size;
+ uint64_t m_compress_potential;
+ };
+
+ struct rxtx_t {
+ std::unique_ptr<RxHandler> rx;
+ std::unique_ptr<TxHandler> tx;
+
+ static rxtx_t create_handler_pair(
+ CephContext* ctx,
+ const CompConnectionMeta& comp_meta,
+ std::uint64_t compress_min_size);
+ };
+}
+
+#endif // CEPH_COMPRESSION_ONWIRE_H
preamble.segments[i].alignment = m_descs[i].align;
}
preamble.num_segments = m_descs.size();
+ preamble.flags = m_flags;
+
preamble.crc = ceph_crc32c(
0, reinterpret_cast<const unsigned char*>(&preamble),
sizeof(preamble) - sizeof(preamble.crc));
bufferlist FrameAssembler::assemble_frame(Tag tag, bufferlist segment_bls[],
const uint16_t segment_aligns[],
- size_t segment_count) {
+ size_t segment_count) {
+ m_flags = 0;
m_descs.resize(calc_num_segments(segment_bls, segment_count));
for (size_t i = 0; i < m_descs.size(); i++) {
m_descs[i].logical_len = segment_bls[i].length();
m_descs[i].align = segment_aligns[i];
}
+ if (m_compression->tx) {
+ asm_compress(segment_bls);
+ }
+
preamble_block_t preamble;
fill_preamble(tag, preamble);
m_descs[i].logical_len = preamble->segments[i].length;
m_descs[i].align = preamble->segments[i].alignment;
}
+
+ m_flags = preamble->flags;
+ // If frame has been compressed,
+ // we need to make sure the compression handler has been setup
+ ceph_assert_always(!is_compressed() || m_compression->rx);
+
return static_cast<Tag>(preamble->tag);
}
return check_epilogue_late_status(epilogue->late_status);
}
+bool FrameAssembler::disassemble_segments(bufferlist& preamble_bl,
+ bufferlist segments_bls[], bufferlist& epilogue_bl) const {
+ disassemble_first_segment(preamble_bl, segments_bls[0]);
+ if (disassemble_remaining_segments(segments_bls, epilogue_bl)) {
+ if (is_compressed()) {
+ disassemble_decompress(segments_bls);
+ }
+ return true;
+ }
+
+ return false;
+}
+
void FrameAssembler::disassemble_first_segment(bufferlist& preamble_bl,
bufferlist& segment_bl) const {
ceph_assert(!m_descs.empty());
// no epilogue if only one segment
ceph_assert(epilogue_bl.length() == 0);
return true;
- }
- if (m_crypto->rx) {
+ } else if (m_crypto->rx) {
return disasm_remaining_secure_rev1(segment_bls, epilogue_bl);
+ } else {
+ return disasm_remaining_crc_rev1(segment_bls, epilogue_bl);
}
- return disasm_remaining_crc_rev1(segment_bls, epilogue_bl);
- }
- if (m_crypto->rx) {
+ } else if (m_crypto->rx) {
return disasm_all_secure_rev0(segment_bls, epilogue_bl);
- }
+ }
+
return disasm_all_crc_rev0(segment_bls, epilogue_bl);
}
}
os << "rev1=" << frame_asm.m_is_rev1
<< " rx=" << frame_asm.m_crypto->rx.get()
- << " tx=" << frame_asm.m_crypto->tx.get();
+ << " tx=" << frame_asm.m_crypto->tx.get()
+ << " comp rx=" << frame_asm.m_compression->rx.get()
+ << " comp tx=" << frame_asm.m_compression->tx.get()
+ << " compressed=" << frame_asm.is_compressed();
return os;
}
+void FrameAssembler::asm_compress(bufferlist segment_bls[]) {
+ std::array<bufferlist, MAX_NUM_SEGMENTS> compressed;
+
+ m_compression->tx->reset_handler(m_descs.size(), get_frame_logical_len());
+
+ bool abort = false;
+ for (size_t i = 0; (i < m_descs.size()) && !abort; i++) {
+ auto out = m_compression->tx->compress(segment_bls[i]);
+ if (!out) {
+ abort = true;
+ } else {
+ compressed[i] = std::move(*out);
+ }
+ }
+
+ if (!abort) {
+ m_compression->tx->done();
+
+ for (size_t i = 0; i < m_descs.size(); i++) {
+ segment_bls[i].swap(compressed[i]);
+ m_descs[i].logical_len = segment_bls[i].length();
+ }
+
+ m_flags |= FRAME_EARLY_DATA_COMPRESSED;
+ }
+}
+
+void FrameAssembler::disassemble_decompress(bufferlist segment_bls[]) const {
+ for (size_t i = 0; i < m_descs.size(); i++) {
+ auto out = m_compression->rx->decompress(segment_bls[i]);
+ if (!out) {
+ throw FrameError("Segment decompression failed");
+ } else {
+ segment_bls[i] = std::move(*out);
+ }
+ }
+}
+
} // namespace ceph::msgr::v2
#include "include/types.h"
#include "common/Clock.h"
#include "crypto_onwire.h"
+#include "compression_onwire.h"
#include <array>
#include <iosfwd>
#include <utility>
MESSAGE,
KEEPALIVE2,
KEEPALIVE2_ACK,
- ACK
+ ACK,
+ COMPRESSION_REQUEST,
+ COMPRESSION_DONE
};
struct segment_t {
__u8 num_segments;
segment_t segments[MAX_NUM_SEGMENTS];
- __u8 _reserved[2];
+
+ __u8 flags;
+ __u8 _reserved;
// CRC32 for this single preamble block.
ceph_le32 crc;
#define FRAME_LATE_STATUS_RESERVED_FALSE 0xe0
#define FRAME_LATE_STATUS_RESERVED_MASK 0xf0
+// For msgr 2.1, FRAME_EARLY_X flags are sent as part of epilogue.
+//
+// This flag indicates whether frame segments have been compressed by
+// sender, and used in segments' disassemblig phase.
+#define FRAME_EARLY_DATA_COMPRESSED 0X1
+
struct FrameError : std::runtime_error {
using runtime_error::runtime_error;
};
class FrameAssembler {
public:
// crypto must be non-null
- FrameAssembler(const ceph::crypto::onwire::rxtx_t* crypto, bool is_rev1)
- : m_crypto(crypto), m_is_rev1(is_rev1) {}
+ FrameAssembler(const ceph::crypto::onwire::rxtx_t* crypto, bool is_rev1,
+ const ceph::compression::onwire::rxtx_t* compression)
+ : m_crypto(crypto), m_is_rev1(is_rev1), m_compression(compression) {}
void set_is_rev1(bool is_rev1) {
m_descs.clear();
+ m_flags = 0;
m_is_rev1 = is_rev1;
}
Tag disassemble_preamble(bufferlist& preamble_bl);
+ bool disassemble_segments(bufferlist& preamble_bl,
+ bufferlist segments_bls[],
+ bufferlist& epilogue_bl) const;
+
+private:
+ struct segment_desc_t {
+ uint32_t logical_len;
+ uint16_t align;
+ };
+
+ uint32_t get_segment_padded_len(size_t seg_idx) const {
+ return p2roundup<uint32_t>(m_descs[seg_idx].logical_len,
+ CRYPTO_BLOCK_SIZE);
+ }
+
+ uint32_t get_auth_tag_len() const {
+ return m_crypto->rx->get_extra_size_at_final();
+ }
+
+ bool is_compressed() const {
+ return m_flags & FRAME_EARLY_DATA_COMPRESSED;
+ }
+
+ void asm_compress(bufferlist segment_bls[]);
+
+ bufferlist asm_crc_rev0(const preamble_block_t& preamble,
+ bufferlist segment_bls[]) const;
+ bufferlist asm_secure_rev0(const preamble_block_t& preamble,
+ bufferlist segment_bls[]) const;
+ bufferlist asm_crc_rev1(const preamble_block_t& preamble,
+ bufferlist segment_bls[]) const;
+ bufferlist asm_secure_rev1(const preamble_block_t& preamble,
+ bufferlist segment_bls[]) const;
+
// Like msgr1, and unlike msgr2.0, msgr2.1 allows interpreting the
// first segment before reading in the rest of the frame.
//
// user-provided buffers
// - read in epilogue
// - call disassemble_remaining_segments()
+ // - call disasm_all_decompress()
//
// For msgr2.0 (set_is_rev1(false)), disassemble_first_segment() is
// a noop. To accomodate, disassemble_remaining_segments() always
// - read in all segments
// - read in epilogue
// - call disassemble_remaining_segments()
+ // - call disasm_all_decompress()
//
// disassemble_remaining_segments() returns true if the frame is
// ready for dispatching, or false if it was aborted by the sender
bufferlist& segment_bl) const;
bool disassemble_remaining_segments(bufferlist segment_bls[],
bufferlist& epilogue_bl) const;
-
-private:
- struct segment_desc_t {
- uint32_t logical_len;
- uint16_t align;
- };
-
- uint32_t get_segment_padded_len(size_t seg_idx) const {
- return p2roundup<uint32_t>(m_descs[seg_idx].logical_len,
- CRYPTO_BLOCK_SIZE);
- }
-
- uint32_t get_auth_tag_len() const {
- return m_crypto->rx->get_extra_size_at_final();
- }
-
- bufferlist asm_crc_rev0(const preamble_block_t& preamble,
- bufferlist segment_bls[]) const;
- bufferlist asm_secure_rev0(const preamble_block_t& preamble,
- bufferlist segment_bls[]) const;
- bufferlist asm_crc_rev1(const preamble_block_t& preamble,
- bufferlist segment_bls[]) const;
- bufferlist asm_secure_rev1(const preamble_block_t& preamble,
- bufferlist segment_bls[]) const;
+ void disassemble_decompress(bufferlist segment_bls[]) const;
bool disasm_all_crc_rev0(bufferlist segment_bls[],
bufferlist& epilogue_bl) const;
const FrameAssembler& frame_asm);
boost::container::static_vector<segment_desc_t, MAX_NUM_SEGMENTS> m_descs;
+ __u8 m_flags;
const ceph::crypto::onwire::rxtx_t* m_crypto;
bool m_is_rev1; // msgr2.1?
+ const ceph::compression::onwire::rxtx_t* m_compression;
};
template <class T, uint16_t... SegmentAlignmentVs>
using Frame::Frame;
};
+struct CompressionRequestFrame : public ControlFrame<CompressionRequestFrame,
+ bool, // is compress
+ std::vector<uint32_t>> { // preferred methods
+ static const Tag tag = Tag::COMPRESSION_REQUEST;
+ using ControlFrame::Encode;
+ using ControlFrame::Decode;
+
+ inline bool &is_compress() { return get_val<0>(); }
+ inline std::vector<uint32_t> &preferred_methods() { return get_val<1>(); }
+
+protected:
+ using ControlFrame::ControlFrame;
+};
+
+struct CompressionDoneFrame : public ControlFrame<CompressionDoneFrame,
+ bool, // is compress
+ uint32_t> { // method
+ static const Tag tag = Tag::COMPRESSION_DONE;
+ using ControlFrame::Encode;
+ using ControlFrame::Decode;
+
+ inline bool &is_compress() { return get_val<0>(); }
+ inline uint32_t &method() { return get_val<1>(); }
+
+protected:
+ using ControlFrame::ControlFrame;
+};
+
} // namespace ceph::msgr::v2
#endif // _MSG_ASYNC_FRAMES_V2_
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "compressor_registry.h"
+#include "common/dout.h"
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix *_dout << "CompressorRegistry(" << this << ") "
+
+CompressorRegistry::CompressorRegistry(CephContext *cct)
+ : cct(cct)
+{
+ cct->_conf.add_observer(this);
+}
+
+CompressorRegistry::~CompressorRegistry()
+{
+ cct->_conf.remove_observer(this);
+}
+
+const char** CompressorRegistry::get_tracked_conf_keys() const
+{
+ static const char *keys[] = {
+ "ms_osd_compress_mode",
+ "ms_osd_compression_algorithm",
+ "ms_osd_compress_min_size",
+ "ms_compress_secure",
+ nullptr
+ };
+ return keys;
+}
+
+void CompressorRegistry::handle_conf_change(
+ const ConfigProxy& conf,
+ const std::set<std::string>& changed)
+{
+ std::scoped_lock l(lock);
+ _refresh_config();
+}
+
+std::vector<uint32_t> CompressorRegistry::_parse_method_list(const string& s)
+{
+ std::vector<uint32_t> methods;
+
+ for_each_substr(s, ";,= \t", [&] (auto method) {
+ ldout(cct,20) << "adding algorithm method: " << method << dendl;
+
+ auto alg_type = Compressor::get_comp_alg_type(method);
+ if (alg_type) {
+ methods.push_back(*alg_type);
+ } else {
+ ldout(cct,5) << "WARNING: unknown algorithm method " << method << dendl;
+ }
+ });
+
+ if (methods.empty()) {
+ methods.push_back(Compressor::COMP_ALG_NONE);
+ }
+ ldout(cct,20) << __func__ << " " << s << " -> " << methods << dendl;
+
+ return methods;
+}
+
+void CompressorRegistry::_refresh_config()
+{
+ auto c_mode = Compressor::get_comp_mode_type(cct->_conf.get_val<string>("ms_osd_compress_mode"));
+
+ if (c_mode) {
+ ms_osd_compress_mode = *c_mode;
+ } else {
+ ldout(cct,1) << __func__ << " failed to identify ms_osd_compress_mode "
+ << ms_osd_compress_mode << dendl;
+
+ ms_osd_compress_mode = Compressor::COMP_NONE;
+ }
+
+ ms_osd_compression_methods = _parse_method_list(cct->_conf.get_val<string>("ms_osd_compression_algorithm"));
+ ms_osd_compress_min_size = cct->_conf.get_val<std::uint64_t>("ms_osd_compress_min_size");
+
+ ms_compress_secure = cct->_conf.get_val<bool>("ms_compress_secure");
+
+ ldout(cct,10) << __func__ << " ms_osd_compression_mode " << ms_osd_compress_mode
+ << " ms_osd_compression_methods " << ms_osd_compression_methods
+ << " ms_osd_compress_above_min_size " << ms_osd_compress_min_size
+ << " ms_compress_secure " << ms_compress_secure
+ << dendl;
+}
+
+Compressor::CompressionAlgorithm
+CompressorRegistry::pick_method(uint32_t peer_type,
+ const std::vector<uint32_t>& preferred_methods)
+{
+ std::vector<uint32_t> allowed_methods = get_methods(peer_type);
+ auto preferred = std::find_first_of(preferred_methods.begin(),
+ preferred_methods.end(),
+ allowed_methods.begin(),
+ allowed_methods.end());
+ if (preferred == preferred_methods.end()) {
+ ldout(cct,1) << "failed to pick compression method from client's "
+ << preferred_methods
+ << " and our " << allowed_methods << dendl;
+ return Compressor::COMP_ALG_NONE;
+ } else {
+ return static_cast<Compressor::CompressionAlgorithm>(*preferred);
+ }
+}
+
+Compressor::CompressionMode
+CompressorRegistry::get_mode(uint32_t peer_type, bool is_secure)
+{
+ std::scoped_lock l(lock);
+ ldout(cct, 20) << __func__ << " peer_type " << peer_type
+ << " is_secure " << is_secure << dendl;
+
+ if (is_secure && !ms_compress_secure) {
+ return Compressor::COMP_NONE;
+ }
+
+ switch (peer_type) {
+ case CEPH_ENTITY_TYPE_OSD:
+ return static_cast<Compressor::CompressionMode>(ms_osd_compress_mode);
+ default:
+ return Compressor::COMP_NONE;
+ }
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <map>
+#include <vector>
+
+#include "compressor/Compressor.h"
+#include "common/ceph_mutex.h"
+#include "common/ceph_context.h"
+#include "common/config_cacher.h"
+
+class CompressorRegistry : public md_config_obs_t {
+public:
+ CompressorRegistry(CephContext *cct);
+ ~CompressorRegistry();
+
+ void refresh_config() {
+ std::scoped_lock l(lock);
+ _refresh_config();
+ }
+
+ const char** get_tracked_conf_keys() const override;
+ void handle_conf_change(const ConfigProxy& conf,
+ const std::set<std::string>& changed) override;
+
+ Compressor::CompressionAlgorithm pick_method(uint32_t peer_type,
+ const std::vector<uint32_t>& preferred_methods);
+
+ Compressor::CompressionMode get_mode(uint32_t peer_type, bool is_secure);
+
+ const std::vector<uint32_t> get_methods(uint32_t peer_type) {
+ std::scoped_lock l(lock);
+ switch (peer_type) {
+ case CEPH_ENTITY_TYPE_OSD:
+ return ms_osd_compression_methods;
+ default:
+ return {};
+ }
+ }
+
+ uint64_t get_min_compression_size(uint32_t peer_type) const {
+ std::scoped_lock l(lock);
+ switch (peer_type) {
+ case CEPH_ENTITY_TYPE_OSD:
+ return ms_osd_compress_min_size;
+ default:
+ return 0;
+ }
+ }
+
+ bool get_is_compress_secure() const {
+ std::scoped_lock l(lock);
+ return ms_compress_secure;
+ }
+
+private:
+ CephContext *cct;
+ mutable ceph::mutex lock = ceph::make_mutex("CompressorRegistry::lock");
+
+ uint32_t ms_osd_compress_mode;
+ bool ms_compress_secure;
+ std::uint64_t ms_osd_compress_min_size;
+ std::vector<uint32_t> ms_osd_compression_methods;
+
+ void _refresh_config();
+ std::vector<uint32_t> _parse_method_list(const string& s);
+};
)
add_ceph_unittest(unittest_compression)
target_link_libraries(unittest_compression global GTest::GTest)
-add_dependencies(unittest_compression ceph_example)
+add_dependencies(unittest_compression ceph_example)
\ No newline at end of file
add_ceph_unittest(unittest_frames_v2)
target_link_libraries(unittest_frames_v2 os global ${UNITTEST_LIBS})
+add_executable(unittest_comp_registry
+ test_comp_registry.cc
+ $<TARGET_OBJECTS:unit-main>
+ )
+add_ceph_unittest(unittest_comp_registry)
+target_link_libraries(unittest_comp_registry global)
+
# test_userspace_event
if(HAVE_DPDK)
add_executable(ceph_test_userspace_event
--- /dev/null
+// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/types.h"
+#include "include/stringify.h"
+#include "compressor/Compressor.h"
+#include "msg/compressor_registry.h"
+#include "gtest/gtest.h"
+#include "common/ceph_context.h"
+#include "global/global_context.h"
+
+#include <sstream>
+
+TEST(CompressorRegistry, con_modes)
+{
+ auto cct = g_ceph_context;
+ CompressorRegistry reg(cct);
+ std::vector<uint32_t> methods;
+ uint32_t method;
+ uint32_t mode;
+
+ const std::vector<uint32_t> snappy_method = { Compressor::COMP_ALG_SNAPPY };
+ const std::vector<uint32_t> zlib_method = { Compressor::COMP_ALG_ZLIB };
+ const std::vector<uint32_t> both_methods = { Compressor::COMP_ALG_ZLIB, Compressor::COMP_ALG_SNAPPY};
+ const std::vector<uint32_t> no_method = { Compressor::COMP_ALG_NONE };
+
+ cct->_conf.set_val(
+ "enable_experimental_unrecoverable_data_corrupting_features", "*");
+
+ // baseline: compression for communication with osd is enabled
+ cct->_set_module_type(CEPH_ENTITY_TYPE_CLIENT);
+ cct->_conf.set_val("ms_osd_compress_mode", "force");
+ cct->_conf.set_val("ms_osd_compression_algorithm", "snappy");
+ cct->_conf.set_val("ms_compress_secure", "false");
+ cct->_conf.apply_changes(NULL);
+
+ ASSERT_EQ(reg.get_is_compress_secure(), false);
+
+ methods = reg.get_methods(CEPH_ENTITY_TYPE_MON);
+ ASSERT_EQ(methods.size(), 0);
+ method = reg.pick_method(CEPH_ENTITY_TYPE_MON, both_methods);
+ ASSERT_EQ(method, Compressor::COMP_ALG_NONE);
+ mode = reg.get_mode(CEPH_ENTITY_TYPE_MON, false);
+ ASSERT_EQ(mode, Compressor::COMP_NONE);
+
+ methods = reg.get_methods(CEPH_ENTITY_TYPE_OSD);
+ ASSERT_EQ(methods, snappy_method);
+ const std::vector<uint32_t> rev_both_methods (both_methods.rbegin(), both_methods.rend());
+ method = reg.pick_method(CEPH_ENTITY_TYPE_OSD, rev_both_methods);
+ ASSERT_EQ(method, Compressor::COMP_ALG_SNAPPY);
+ mode = reg.get_mode(CEPH_ENTITY_TYPE_OSD, false);
+ ASSERT_EQ(mode, Compressor::COMP_FORCE);
+ mode = reg.get_mode(CEPH_ENTITY_TYPE_OSD, true);
+ ASSERT_EQ(mode, Compressor::COMP_NONE);
+
+ method = reg.pick_method(CEPH_ENTITY_TYPE_OSD, zlib_method);
+ ASSERT_EQ(method, Compressor::COMP_ALG_NONE);
+
+ // disable compression mode
+ cct->_set_module_type(CEPH_ENTITY_TYPE_CLIENT);
+ cct->_conf.set_val("ms_osd_compress_mode", "none");
+ cct->_conf.apply_changes(NULL);
+
+ mode = reg.get_mode(CEPH_ENTITY_TYPE_OSD, false);
+ ASSERT_EQ(mode, Compressor::COMP_NONE);
+
+ // no compression methods
+ cct->_conf.set_val("ms_osd_compress_mode", "force");
+ cct->_conf.set_val("ms_osd_compression_algorithm", "none");
+ cct->_conf.apply_changes(NULL);
+
+ method = reg.pick_method(CEPH_ENTITY_TYPE_OSD, both_methods);
+ ASSERT_EQ(method, Compressor::COMP_ALG_NONE);
+
+ // min compression size
+ cct->_conf.set_val("ms_osd_compress_min_size", "1024");
+ cct->_conf.apply_changes(NULL);
+
+ uint32_t s = reg.get_min_compression_size(CEPH_ENTITY_TYPE_OSD);
+ ASSERT_EQ(s, 1024);
+
+ // allow secure with compression
+ cct->_conf.set_val("ms_osd_compress_mode", "force");
+ cct->_conf.set_val("ms_osd_compression_algorithm", "snappy");
+ cct->_conf.set_val("ms_compress_secure", "true");
+ cct->_conf.apply_changes(NULL);
+
+ ASSERT_EQ(reg.get_is_compress_secure(), true);
+
+ mode = reg.get_mode(CEPH_ENTITY_TYPE_OSD, true);
+ ASSERT_EQ(mode, Compressor::COMP_FORCE);
+
+ mode = reg.get_mode(CEPH_ENTITY_TYPE_OSD, false);
+ ASSERT_EQ(mode, Compressor::COMP_FORCE);
+
+ // back to normalish, for the benefit of the next test(s)
+ cct->_set_module_type(CEPH_ENTITY_TYPE_CLIENT);
+}
#include <string>
#include <tuple>
+#include "msg/async/compression_meta.h"
#include "auth/Auth.h"
#include "common/ceph_argparse.h"
#include "global/global_init.h"
#include <gtest/gtest.h>
+#define COMP_THRESHOLD 1 << 10
+#define EXPECT_COMPRESSED(is_compressed, val1, val2) \
+ if (is_compressed && val1 > COMP_THRESHOLD) { \
+ EXPECT_GE(val1, val2); \
+ } else { \
+ EXPECT_EQ(val1, val2); \
+ }
+
namespace ceph::msgr::v2 {
// MessageFrame with the first segment not fixed to ceph_msg_header2
struct mode_t {
bool is_rev1;
bool is_secure;
+ bool is_compress;
};
static std::ostream& operator<<(std::ostream& os, const mode_t& m) {
os << "msgr2." << (m.is_rev1 ? "1" : "0")
- << (m.is_secure ? "-secure" : "-crc");
+ << (m.is_secure ? "-secure" : "-crc")
+ << (m.is_compress ? "-compress": "-nocompress");
return os;
}
static const mode_t modes[] = {
- {false, false},
- {false, true},
- {true, false},
- {true, true},
+ {false, false, false},
+ {false, true, false},
+ {true, false, false},
+ {true, true, false},
+ {false, false, true},
+ {false, true, true},
+ {true, false, true},
+ {true, true, true}
};
struct round_trip_instance_t {
if (epilogue_onwire_len > 0) {
frame_bl.splice(0, epilogue_onwire_len, &epilogue_bl);
}
- frame_asm.disassemble_first_segment(preamble_bl, segment_bls[0]);
- return frame_asm.disassemble_remaining_segments(segment_bls.data(),
- epilogue_bl);
+
+ return frame_asm.disassemble_segments(preamble_bl, segment_bls.data(), epilogue_bl);
}
class RoundTripTestBase : public ::testing::TestWithParam<
std::tuple<round_trip_instance_t, mode_t>> {
protected:
RoundTripTestBase()
- : m_tx_frame_asm(&m_tx_crypto, std::get<1>(GetParam()).is_rev1),
- m_rx_frame_asm(&m_rx_crypto, std::get<1>(GetParam()).is_rev1),
+ : m_tx_frame_asm(&m_tx_crypto, std::get<1>(GetParam()).is_rev1, &m_tx_comp),
+ m_rx_frame_asm(&m_rx_crypto, std::get<1>(GetParam()).is_rev1, &m_rx_comp),
m_header(make_bufferlist(std::get<0>(GetParam()).header_len, 'H')),
m_front(make_bufferlist(std::get<0>(GetParam()).front_len, 'F')),
m_middle(make_bufferlist(std::get<0>(GetParam()).middle_len, 'M')),
g_ceph_context, auth_meta, /*new_nonce_format=*/m.is_rev1,
/*crossed=*/true);
}
+
+ if (m.is_compress) {
+ CompConnectionMeta comp_meta;
+ comp_meta.con_mode = Compressor::COMP_FORCE;
+ comp_meta.con_method = Compressor::COMP_ALG_SNAPPY;
+ m_tx_comp = ceph::compression::onwire::rxtx_t::create_handler_pair(
+ g_ceph_context, comp_meta, /*min_compress_size=*/COMP_THRESHOLD
+ );
+ m_rx_comp = ceph::compression::onwire::rxtx_t::create_handler_pair(
+ g_ceph_context, comp_meta, /*min_compress_size=*/COMP_THRESHOLD
+ );
+ }
}
void check_frame_assembler(const FrameAssembler& frame_asm) {
const auto& [rti, m] = GetParam();
const auto& onwire_lens = rti.onwire_lens[m.is_rev1 << 1 | m.is_secure];
- EXPECT_EQ(rti.header_len + rti.front_len + rti.middle_len + rti.data_len,
+
+ EXPECT_COMPRESSED(m.is_compress, rti.header_len + rti.front_len + rti.middle_len + rti.data_len,
frame_asm.get_frame_logical_len());
ASSERT_EQ(rti.num_segments, frame_asm.get_num_segments());
- EXPECT_EQ(onwire_lens[0], frame_asm.get_preamble_onwire_len());
+ EXPECT_COMPRESSED(m.is_compress, onwire_lens[0], frame_asm.get_preamble_onwire_len());
for (size_t i = 0; i < rti.num_segments; i++) {
- EXPECT_EQ(onwire_lens[i + 1], frame_asm.get_segment_onwire_len(i));
+ EXPECT_COMPRESSED(m.is_compress, onwire_lens[i + 1], frame_asm.get_segment_onwire_len(i));
}
- EXPECT_EQ(onwire_lens[rti.num_segments + 1],
+ EXPECT_COMPRESSED(m.is_compress, onwire_lens[rti.num_segments + 1],
frame_asm.get_epilogue_onwire_len());
- EXPECT_EQ(std::accumulate(std::begin(onwire_lens), std::end(onwire_lens),
- uint64_t(0)),
- frame_asm.get_frame_onwire_len());
+ EXPECT_COMPRESSED(m.is_compress,
+ std::accumulate(std::begin(onwire_lens), std::end(onwire_lens),
+ uint64_t(0)),
+ frame_asm.get_frame_onwire_len());
}
void test_round_trip() {
ceph::crypto::onwire::rxtx_t m_tx_crypto;
ceph::crypto::onwire::rxtx_t m_rx_crypto;
+ ceph::compression::onwire::rxtx_t m_tx_comp;
+ ceph::compression::onwire::rxtx_t m_rx_comp;
FrameAssembler m_tx_frame_asm;
FrameAssembler m_rx_frame_asm;