From: Maya Gilad Date: Thu, 27 May 2021 10:16:34 +0000 (+0300) Subject: msgr: added on-wire-compression to msgr protocol 2.0 X-Git-Tag: v17.1.0~1178^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2e46894f31e7684df5764541f1c2f4dcfcc934cf;p=ceph.git msgr: added on-wire-compression to msgr protocol 2.0 Signed-off-by: Maya Gilad Co-authored-by: Kefu Chai --- diff --git a/src/common/ceph_context.h b/src/common/ceph_context.h index 25f52615b067..dee5157ceee4 100644 --- a/src/common/ceph_context.h +++ b/src/common/ceph_context.h @@ -83,6 +83,9 @@ public: // everything crimson is experimental... return true; } + ceph::PluginRegistry* get_plugin_registry() { + return _plugin_registry; + } CryptoRandom* random() const; PerfCountersCollectionImpl* get_perfcounters_collection(); crimson::common::ConfigProxy& _conf; @@ -92,6 +95,7 @@ public: private: std::unique_ptr _crypto_random; unsigned nref; + ceph::PluginRegistry* _plugin_registry; }; } #else diff --git a/src/common/options/global.yaml.in b/src/common/options/global.yaml.in index 1dd0b57d467f..70bc73a18bc1 100644 --- a/src/common/options/global.yaml.in +++ b/src/common/options/global.yaml.in @@ -979,6 +979,56 @@ options: - ms_service_mode flags: - startup +- name: ms_osd_compress_mode + type: str + level: advanced + desc: Compression policy to use in Messenger for communicating with OSD + default: none + services: + - osd + enum_values: + - none + - force + see_also: + - ms_compress_secure + flags: + - runtime +- name: ms_osd_compress_min_size + type: uint + level: advanced + desc: Minimal message size eligable for on-wire compression + default: 1_K + services: + - osd + see_also: + - ms_osd_compress_mode + flags: + - runtime +- name: ms_osd_compression_algorithm + type: str + level: advanced + desc: Compression algorithm to use in Messenger when communicating with OSD + long_desc: Compression algorithm for connections with OSD in order of preference + default: snappy zlib zstd lz4 + services: + - osd + see_also: + - ms_osd_compress_mode + flags: + - runtime +- name: ms_compress_secure + type: bool + level: advanced + desc: Allowing compression when on-wire encryption is enabled + long_desc: Combining encryption with compression reduces the level of security of + messages between peers. In case both encryption and compression are enabled, + compression setting will be ignored and message will not be compressed. + This behaviour can be override using this setting. + default: false + see_also: + - ms_osd_compress_mode + flags: + - runtime - name: ms_learn_addr_from_peer type: bool level: advanced diff --git a/src/crimson/CMakeLists.txt b/src/crimson/CMakeLists.txt index c58161f267b1..8104227c7e1a 100644 --- a/src/crimson/CMakeLists.txt +++ b/src/crimson/CMakeLists.txt @@ -87,6 +87,7 @@ add_library(crimson-common STATIC ${PROJECT_SOURCE_DIR}/src/common/PluginRegistry.cc ${PROJECT_SOURCE_DIR}/src/common/RefCountedObj.cc ${PROJECT_SOURCE_DIR}/src/common/util.cc + ${PROJECT_SOURCE_DIR}/src/compressor/Compressor.cc ${PROJECT_SOURCE_DIR}/src/crush/builder.c ${PROJECT_SOURCE_DIR}/src/crush/mapper.c ${PROJECT_SOURCE_DIR}/src/crush/crush.c @@ -170,6 +171,7 @@ set(crimson_mon_srcs ${PROJECT_SOURCE_DIR}/src/mon/MonSub.cc) set(crimson_net_srcs ${PROJECT_SOURCE_DIR}/src/msg/async/crypto_onwire.cc + ${PROJECT_SOURCE_DIR}/src/msg/async/compression_onwire.cc ${PROJECT_SOURCE_DIR}/src/msg/async/frames_v2.cc net/Errors.cc net/Messenger.cc diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 370085a43a12..ce6f47301db8 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -304,10 +304,9 @@ seastar::future<> ProtocolV2::read_frame_payload() logger().trace("{} RECV({}) frame epilogue", conn, bl.size()); bool ok = false; try { - rx_frame_asm.disassemble_first_segment(rx_preamble, rx_segments_data[0]); bufferlist rx_epilogue; rx_epilogue.append(buffer::create(std::move(bl))); - ok = rx_frame_asm.disassemble_remaining_segments(rx_segments_data.data(), rx_epilogue); + ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue); } catch (FrameError& e) { logger().error("read_frame_payload: {} {}", conn, e.what()); abort_in_fault(); @@ -1474,6 +1473,7 @@ void ProtocolV2::execute_accepting() INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED); auth_meta = seastar::make_lw_shared(); session_stream_handlers = { nullptr, nullptr }; + session_comp_handlers = { nullptr, nullptr }; enable_recording(); return banner_exchange(false); }).then([this] (auto&& ret) { diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index 319802690cc3..ab6ad86c1bfb 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -8,6 +8,7 @@ #include "Protocol.h" #include "msg/async/frames_v2.h" #include "msg/async/crypto_onwire.h" +#include "msg/async/compression_onwire.h" namespace crimson::net { @@ -122,8 +123,9 @@ class ProtocolV2 final : public Protocol { seastar::future<> write_flush(bufferlist&& buf); ceph::crypto::onwire::rxtx_t session_stream_handlers; - ceph::msgr::v2::FrameAssembler tx_frame_asm{&session_stream_handlers, false}; - ceph::msgr::v2::FrameAssembler rx_frame_asm{&session_stream_handlers, false}; + ceph::compression::onwire::rxtx_t session_comp_handlers; + ceph::msgr::v2::FrameAssembler tx_frame_asm{&session_stream_handlers, false, &session_comp_handlers}; + ceph::msgr::v2::FrameAssembler rx_frame_asm{&session_stream_handlers, false, &session_comp_handlers}; ceph::bufferlist rx_preamble; ceph::msgr::v2::segment_bls_t rx_segments_data; diff --git a/src/include/msgr.h b/src/include/msgr.h index eedb95dd04fe..c8ad48ad1afe 100644 --- a/src/include/msgr.h +++ b/src/include/msgr.h @@ -50,11 +50,19 @@ #define HAVE_MSGR2_FEATURE(x, name) \ (((x) & (CEPH_MSGR2_FEATUREMASK_##name)) == (CEPH_MSGR2_FEATUREMASK_##name)) -DEFINE_MSGR2_FEATURE( 0, 1, REVISION_1) // msgr2.1 +DEFINE_MSGR2_FEATURE(0, 1, REVISION_1) // msgr2.1 +DEFINE_MSGR2_FEATURE(1, 1, COMPRESSION) // on-wire compression -#define CEPH_MSGR2_SUPPORTED_FEATURES (CEPH_MSGR2_FEATURE_REVISION_1) +/* + * Features supported. Should be everything above. + */ +#define CEPH_MSGR2_SUPPORTED_FEATURES \ + (CEPH_MSGR2_FEATURE_REVISION_1 | \ + CEPH_MSGR2_FEATURE_COMPRESSION | \ + 0ULL) + +#define CEPH_MSGR2_REQUIRED_FEATURES (0ULL) -#define CEPH_MSGR2_REQUIRED_FEATURES (0ull) /* diff --git a/src/msg/CMakeLists.txt b/src/msg/CMakeLists.txt index e6d0b589b429..70d6975f39b3 100644 --- a/src/msg/CMakeLists.txt +++ b/src/msg/CMakeLists.txt @@ -3,7 +3,8 @@ set(msg_srcs Message.cc Messenger.cc Connection.cc - msg_types.cc) + msg_types.cc + compressor_registry.cc) list(APPEND msg_srcs async/AsyncConnection.cc @@ -16,6 +17,7 @@ list(APPEND msg_srcs async/PosixStack.cc async/Stack.cc async/crypto_onwire.cc + async/compression_onwire.cc async/frames_v2.cc async/net_handler.cc) diff --git a/src/msg/Messenger.cc b/src/msg/Messenger.cc index fb85f42bc91c..edc74a9a4904 100644 --- a/src/msg/Messenger.cc +++ b/src/msg/Messenger.cc @@ -58,9 +58,11 @@ Messenger::Messenger(CephContext *cct_, entity_name_t w) socket_priority(-1), cct(cct_), crcflags(get_default_crc_flags(cct->_conf)), - auth_registry(cct) + auth_registry(cct), + comp_registry(cct) { auth_registry.refresh_config(); + comp_registry.refresh_config(); } void Messenger::set_endpoint_addr(const entity_addr_t& a, diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index e87f3196b1c9..c832589e88ea 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -34,6 +34,7 @@ #include "auth/Crypto.h" #include "common/item_history.h" #include "auth/AuthRegistry.h" +#include "compressor_registry.h" #include "include/ceph_assert.h" #include @@ -77,7 +78,9 @@ struct Interceptor { READY, HANDLE_MESSAGE, READ_MESSAGE_COMPLETE, - SESSION_RETRY + SESSION_RETRY, + SEND_COMPRESSION_REQUEST, + HANDLE_COMPRESSION_REQUEST }; virtual ~Interceptor() {} @@ -228,6 +231,9 @@ public: auth_server = as; } + // for compression + CompressorRegistry comp_registry; + protected: /** * std::set messenger's address diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index 0f40b32ac46b..23a4dfafbb58 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -26,8 +26,10 @@ std::ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) { << " cs=" << connect_seq << " l=" << connection->policy.lossy << " rev1=" << HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1) - << " rx=" << session_stream_handlers.rx.get() + << " crypto rx=" << session_stream_handlers.rx.get() << " tx=" << session_stream_handlers.tx.get() + << " comp rx=" << session_compression_handlers.rx.get() + << " tx=" << session_compression_handlers.tx.get() << ")."; } @@ -94,8 +96,8 @@ ProtocolV2::ProtocolV2(AsyncConnection *connection) replacing(false), can_write(false), bannerExchangeCallback(nullptr), - tx_frame_asm(&session_stream_handlers, false), - rx_frame_asm(&session_stream_handlers, false), + tx_frame_asm(&session_stream_handlers, false, &session_compression_handlers), + rx_frame_asm(&session_stream_handlers, false, &session_compression_handlers), next_tag(static_cast(0)), keepalive(false) { } @@ -246,15 +248,17 @@ void ProtocolV2::reset_recv_state() { // `write_event()` unlocks it just before calling `write_message()`. // `submit_to()` here is NOT blocking. connection->center->submit_to(connection->center->get_id(), [this] { - ldout(cct, 5) << "reset_recv_state (warped) reseting crypto handlers" + ldout(cct, 5) << "reset_recv_state (warped) reseting crypto and compression handlers" << dendl; // Possibly unnecessary. See the comment in `deactivate_existing`. std::lock_guard l(connection->lock); std::lock_guard wl(connection->write_lock); reset_security(); + reset_compression(); }, /* always_async = */true); } else { reset_security(); + reset_compression(); } // clean read and write callbacks @@ -617,6 +621,14 @@ void ProtocolV2::handle_message_ack(uint64_t seq) { } } +void ProtocolV2::reset_compression() { + ldout(cct, 5) << __func__ << dendl; + + comp_meta = CompConnectionMeta{}; + session_compression_handlers.rx.reset(nullptr); + session_compression_handlers.tx.reset(nullptr); +} + void ProtocolV2::write_event() { ldout(cct, 10) << __func__ << dendl; ssize_t r = 0; @@ -1135,6 +1147,8 @@ CtPtr ProtocolV2::handle_read_frame_dispatch() { case Tag::KEEPALIVE2_ACK: case Tag::ACK: case Tag::WAIT: + case Tag::COMPRESSION_REQUEST: + case Tag::COMPRESSION_DONE: return handle_frame_payload(); case Tag::MESSAGE: return handle_message(); @@ -1249,6 +1263,10 @@ CtPtr ProtocolV2::handle_frame_payload() { return handle_message_ack(payload); case Tag::WAIT: return handle_wait(payload); + case Tag::COMPRESSION_REQUEST: + return handle_compression_request(payload); + case Tag::COMPRESSION_DONE: + return handle_compression_done(payload); default: ceph_abort(); } @@ -1304,11 +1322,9 @@ CtPtr ProtocolV2::handle_read_frame_epilogue_main(rx_buffer_t &&buffer, int r) } CtPtr ProtocolV2::_handle_read_frame_epilogue_main() { - bool aborted; + bool ok = false; try { - rx_frame_asm.disassemble_first_segment(rx_preamble, rx_segments_data[0]); - aborted = !rx_frame_asm.disassemble_remaining_segments( - rx_segments_data.data(), rx_epilogue); + ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue); } catch (FrameError& e) { ldout(cct, 1) << __func__ << " " << e.what() << dendl; return _fault(); @@ -1320,7 +1336,7 @@ CtPtr ProtocolV2::_handle_read_frame_epilogue_main() { // we do have a mechanism that allows transmitter to start sending message // and abort after putting entire data field on wire. This will be used by // the kernel client to avoid unnecessary buffering. - if (aborted) { + if (!ok) { reset_throttle(); state = READY; return CONTINUE(read_frame); @@ -1828,6 +1844,29 @@ CtPtr ProtocolV2::handle_auth_done(ceph::bufferlist &payload) } CtPtr ProtocolV2::finish_client_auth() { + if (HAVE_MSGR2_FEATURE(peer_supported_features, COMPRESSION)) { + return send_compression_request(); + } + + return start_session_connect(); +} + +CtPtr ProtocolV2::finish_server_auth() { + // server had sent AuthDone and client responded with correct pre-auth + // signature. + // We can start conditioanl msgr protocol + if (HAVE_MSGR2_FEATURE(peer_supported_features, COMPRESSION)) { + state = COMPRESSION_ACCEPTING; + } else { + // No msgr protocol features to process + // we can start accepting new sessions/reconnects. + state = SESSION_ACCEPTING; + } + + return CONTINUE(read_frame); +} + +CtPtr ProtocolV2::start_session_connect() { if (!server_cookie) { ceph_assert(connect_seq == 0); state = SESSION_CONNECTING; @@ -2087,6 +2126,40 @@ CtPtr ProtocolV2::handle_server_ident(ceph::bufferlist &payload) return ready(); } +CtPtr ProtocolV2::send_compression_request() { + state = COMPRESSION_CONNECTING; + + const entity_type_t peer_type = connection->get_peer_type(); + comp_meta.con_mode = + static_cast( + messenger->comp_registry.get_mode(peer_type, auth_meta->is_mode_secure())); + const auto preferred_methods = messenger->comp_registry.get_methods(peer_type); + auto comp_req_frame = CompressionRequestFrame::Encode(comp_meta.is_compress(), preferred_methods); + + INTERCEPT(19); + return WRITE(comp_req_frame, "compression request", read_frame); +} + +CtPtr ProtocolV2::handle_compression_done(ceph::bufferlist &payload) { + if (state != COMPRESSION_CONNECTING) { + lderr(cct) << __func__ << " state changed!" << dendl; + return _fault(); + } + + auto response = CompressionDoneFrame::Decode(payload); + ldout(cct, 10) << __func__ << " CompressionDoneFrame(is_compress=" << response.is_compress() + << ", method=" << response.method() << ")" << dendl; + + comp_meta.con_method = static_cast(response.method()); + if (comp_meta.is_compress() != response.is_compress()) { + comp_meta.con_mode = Compressor::COMP_NONE; + } + session_compression_handlers = ceph::compression::onwire::rxtx_t::create_handler_pair( + cct, comp_meta, messenger->comp_registry.get_min_compression_size(connection->get_peer_type())); + + return start_session_connect(); +} + /* Server Protocol Methods */ CtPtr ProtocolV2::start_server_banner_exchange() { @@ -2250,10 +2323,8 @@ CtPtr ProtocolV2::handle_auth_signature(ceph::bufferlist &payload) } if (state == AUTH_ACCEPTING_SIGN) { - // server had sent AuthDone and client responded with correct pre-auth - // signature. we can start accepting new sessions/reconnects. - state = SESSION_ACCEPTING; - return CONTINUE(read_frame); + // this happened on server side + return finish_server_auth(); } else if (state == AUTH_CONNECTING_SIGN) { // this happened at client side return finish_client_auth(); @@ -2664,7 +2735,9 @@ CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing, // this happens in the event center's thread as there should be // no user outside its boundaries (simlarly to e.g. outgoing_bl). auto temp_stream_handlers = std::move(session_stream_handlers); + auto temp_compression_handlers = std::move(session_compression_handlers); exproto->auth_meta = auth_meta; + exproto->comp_meta = comp_meta; ldout(messenger->cct, 5) << __func__ << " stop myself to swap existing" << dendl; @@ -2693,7 +2766,8 @@ CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing, new_worker, new_center, exproto, - temp_stream_handlers=std::move(temp_stream_handlers) + temp_stream_handlers=std::move(temp_stream_handlers), + temp_compression_handlers=std::move(temp_compression_handlers) ](ConnectedSocket &cs) mutable { // we need to delete time event in original thread { @@ -2708,6 +2782,7 @@ CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing, existing->outgoing_bl.clear(); existing->open_write = false; exproto->session_stream_handlers = std::move(temp_stream_handlers); + exproto->session_compression_handlers = std::move(temp_compression_handlers); existing->write_lock.unlock(); if (exproto->state == NONE) { existing->shutdown_socket(); @@ -2895,3 +2970,43 @@ CtPtr ProtocolV2::send_reconnect_ok() { return WRITE(reconnect_ok, "reconnect ok", server_ready); } + + +CtPtr ProtocolV2::handle_compression_request(ceph::bufferlist &payload) { + if (state != COMPRESSION_ACCEPTING) { + lderr(cct) << __func__ << " state changed!" << dendl; + return _fault(); + } + + auto request = CompressionRequestFrame::Decode(payload); + ldout(cct, 10) << __func__ << " CompressionRequestFrame(is_compress=" << request.is_compress() + << ", preferred_methods=" << request.preferred_methods() << ")" << dendl; + + const int peer_type = connection->get_peer_type(); + if (Compressor::CompressionMode mode = messenger->comp_registry.get_mode( + peer_type, auth_meta->is_mode_secure()); + mode != Compressor::COMP_NONE && request.is_compress()) { + comp_meta.con_method = messenger->comp_registry.pick_method(peer_type, request.preferred_methods()); + if (comp_meta.con_method != Compressor::COMP_ALG_NONE) { + comp_meta.con_mode = mode; + } + } else { + comp_meta.con_method = Compressor::COMP_ALG_NONE; + } + + auto response = CompressionDoneFrame::Encode(comp_meta.is_compress(), comp_meta.get_method()); + + INTERCEPT(20); + return WRITE(response, "compression done", finish_compression); +} + +CtPtr ProtocolV2::finish_compression() { + // TODO: having a possibility to check whether we're server or client could + // allow reusing finish_compression(). + + session_compression_handlers = ceph::compression::onwire::rxtx_t::create_handler_pair( + cct, comp_meta, messenger->comp_registry.get_min_compression_size(connection->get_peer_type())); + + state = SESSION_ACCEPTING; + return CONTINUE(read_frame); +} diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index 087553891ef1..6441866fea4c 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -6,6 +6,8 @@ #include "Protocol.h" #include "crypto_onwire.h" +#include "compression_meta.h" +#include "compression_onwire.h" #include "frames_v2.h" class ProtocolV2 : public Protocol { @@ -17,6 +19,7 @@ private: HELLO_CONNECTING, AUTH_CONNECTING, AUTH_CONNECTING_SIGN, + COMPRESSION_CONNECTING, SESSION_CONNECTING, SESSION_RECONNECTING, START_ACCEPT, @@ -25,6 +28,7 @@ private: AUTH_ACCEPTING, AUTH_ACCEPTING_MORE, AUTH_ACCEPTING_SIGN, + COMPRESSION_ACCEPTING, SESSION_ACCEPTING, READY, THROTTLE_MESSAGE, @@ -44,6 +48,7 @@ private: "HELLO_CONNECTING", "AUTH_CONNECTING", "AUTH_CONNECTING_SIGN", + "COMPRESSION_CONNECTING", "SESSION_CONNECTING", "SESSION_RECONNECTING", "START_ACCEPT", @@ -52,6 +57,7 @@ private: "AUTH_ACCEPTING", "AUTH_ACCEPTING_MORE", "AUTH_ACCEPTING_SIGN", + "COMPRESSION_ACCEPTING", "SESSION_ACCEPTING", "READY", "THROTTLE_MESSAGE", @@ -67,7 +73,9 @@ private: // TODO: move into auth_meta? ceph::crypto::onwire::rxtx_t session_stream_handlers; - + ceph::compression::onwire::rxtx_t session_compression_handlers; + +private: entity_name_t peer_name; State state; uint64_t peer_supported_features; // CEPH_MSGR2_FEATURE_* @@ -114,6 +122,7 @@ private: bool keepalive; bool write_in_progress = false; + CompConnectionMeta comp_meta; std::ostream& _conn_prefix(std::ostream *_dout); void run_continuation(Ct *pcontinuation); void run_continuation(Ct &continuation); @@ -143,6 +152,7 @@ private: out_queue_entry_t _get_next_outgoing(); ssize_t write_message(Message *m, bool more); void handle_message_ack(uint64_t seq); + void reset_compression(); CONTINUATION_DECL(ProtocolV2, _wait_for_peer_banner); READ_BPTR_HANDLER_CONTINUATION_DECL(ProtocolV2, _handle_peer_banner); @@ -162,10 +172,12 @@ private: CONTINUATION_DECL(ProtocolV2, throttle_message); CONTINUATION_DECL(ProtocolV2, throttle_bytes); CONTINUATION_DECL(ProtocolV2, throttle_dispatch_queue); + CONTINUATION_DECL(ProtocolV2, finish_compression); Ct *read_frame(); Ct *finish_auth(); Ct *finish_client_auth(); + Ct *finish_server_auth(); Ct *handle_read_frame_preamble_main(rx_buffer_t &&buffer, int r); Ct *read_frame_segment(); Ct *handle_read_frame_segment(rx_buffer_t &&rx_buffer, int r); @@ -174,6 +186,7 @@ private: Ct *_handle_read_frame_epilogue_main(); Ct *handle_read_frame_dispatch(); Ct *handle_frame_payload(); + Ct *finish_compression(); Ct *ready(); @@ -221,6 +234,7 @@ private: Ct *handle_auth_reply_more(ceph::bufferlist &payload); Ct *handle_auth_done(ceph::bufferlist &payload); Ct *handle_auth_signature(ceph::bufferlist &payload); + Ct *start_session_connect(); Ct *send_client_ident(); Ct *send_reconnect(); Ct *handle_ident_missing_features(ceph::bufferlist &payload); @@ -230,6 +244,9 @@ private: Ct *handle_wait(ceph::bufferlist &payload); Ct *handle_reconnect_ok(ceph::bufferlist &payload); Ct *handle_server_ident(ceph::bufferlist &payload); + Ct *send_compression_request(); + Ct *handle_compression_done(ceph::bufferlist &payload); + // Server Protocol CONTINUATION_DECL(ProtocolV2, start_server_banner_exchange); @@ -251,6 +268,7 @@ private: Ct *send_server_ident(); Ct *send_reconnect_ok(); Ct *server_ready(); + Ct *handle_compression_request(ceph::bufferlist &payload); size_t get_current_msg_size() const; }; diff --git a/src/msg/async/compression_meta.h b/src/msg/async/compression_meta.h new file mode 100644 index 000000000000..404e04bec65f --- /dev/null +++ b/src/msg/async/compression_meta.h @@ -0,0 +1,21 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "compressor/Compressor.h" + +struct CompConnectionMeta { + TOPNSPC::Compressor::CompressionMode con_mode = + TOPNSPC::Compressor::COMP_NONE; // negotiated mode + TOPNSPC::Compressor::CompressionAlgorithm con_method = + TOPNSPC::Compressor::COMP_ALG_NONE; // negotiated method + + bool is_compress() const { + return con_mode != TOPNSPC::Compressor::COMP_NONE; + } + TOPNSPC::Compressor::CompressionAlgorithm get_method() const { + return con_method; + } + TOPNSPC::Compressor::CompressionMode get_mode() const { + return con_mode; + } +}; diff --git a/src/msg/async/compression_onwire.cc b/src/msg/async/compression_onwire.cc new file mode 100644 index 000000000000..4c5ffac2c324 --- /dev/null +++ b/src/msg/async/compression_onwire.cc @@ -0,0 +1,86 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "compression_onwire.h" +#include "compression_meta.h" +#include "common/dout.h" + +#define dout_subsys ceph_subsys_ms + +namespace ceph::compression::onwire { + +rxtx_t rxtx_t::create_handler_pair( + CephContext* ctx, + const CompConnectionMeta& comp_meta, + std::uint64_t compress_min_size) +{ + if (comp_meta.is_compress()) { + CompressorRef compressor = Compressor::create(ctx, comp_meta.get_method()); + if (compressor) { + return {std::make_unique(ctx, compressor), + std::make_unique(ctx, compressor, + comp_meta.get_mode(), + compress_min_size)}; + } + } + return {}; +} + +std::optional TxHandler::compress(const ceph::bufferlist &input) +{ + if (m_init_onwire_size < m_min_size) { + ldout(m_cct, 20) << __func__ + << " discovered frame that is smaller than threshold, aborting compression" + << dendl; + return {}; + } + + m_compress_potential -= input.length(); + + ceph::bufferlist out; + if (input.length() == 0) { + ldout(m_cct, 20) << __func__ + << " discovered an empty segment, skipping compression without aborting" + << dendl; + out.clear(); + return out; + } + + boost::optional compressor_message; + if (m_compressor->compress(input, out, compressor_message)) { + return {}; + } else { + ldout(m_cct, 20) << __func__ << " uncompressed.length()=" << input.length() + << " compressed.length()=" << out.length() << dendl; + m_onwire_size += out.length(); + return out; + } +} + +std::optional RxHandler::decompress(const ceph::bufferlist &input) +{ + ceph::bufferlist out; + if (input.length() == 0) { + ldout(m_cct, 20) << __func__ + << " discovered an empty segment, skipping decompression without aborting" + << dendl; + out.clear(); + return out; + } + + boost::optional compressor_message; + if (m_compressor->decompress(input, out, compressor_message)) { + return {}; + } else { + ldout(m_cct, 20) << __func__ << " compressed.length()=" << input.length() + << " uncompressed.length()=" << out.length() << dendl; + return out; + } +} + +void TxHandler::done() +{ + ldout(m_cct, 25) << __func__ << " compression ratio=" << get_ratio() << dendl; +} + +} // namespace ceph::compression::onwire diff --git a/src/msg/async/compression_onwire.h b/src/msg/async/compression_onwire.h new file mode 100644 index 000000000000..e90274a6c5c4 --- /dev/null +++ b/src/msg/async/compression_onwire.h @@ -0,0 +1,102 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_COMPRESSION_ONWIRE_H +#define CEPH_COMPRESSION_ONWIRE_H + +#include "compressor/Compressor.h" +#include "include/buffer.h" + +class CompConnectionMeta; + +namespace ceph::compression::onwire { + using Compressor = TOPNSPC::Compressor; + using CompressorRef = TOPNSPC::CompressorRef; + + class Handler { + public: + Handler(CephContext* const cct, CompressorRef compressor) + : m_cct(cct), m_compressor(compressor) {} + + protected: + CephContext* const m_cct; + CompressorRef m_compressor; + }; + + class RxHandler final : private Handler { + public: + RxHandler(CephContext* const cct, CompressorRef compressor) + : Handler(cct, compressor) {} + ~RxHandler() {}; + + /** + * Decompresses a bufferlist + * + * @param input compressed bufferlist + * @param out decompressed bufferlist + * + * @returns true on success, false on failure + */ + std::optional decompress(const ceph::bufferlist &input); + }; + + class TxHandler final : private Handler { + public: + TxHandler(CephContext* const cct, CompressorRef compressor, int mode, std::uint64_t min_size) + : Handler(cct, compressor), + m_min_size(min_size), + m_mode(static_cast(mode)) + {} + ~TxHandler() {} + + void reset_handler(int num_segments, uint64_t size) { + m_init_onwire_size = size; + m_compress_potential = size; + m_onwire_size = 0; + } + + void done(); + + /** + * Compresses a bufferlist + * + * @param input bufferlist to compress + * @param out compressed bufferlist + * + * @returns true on success, false on failure + */ + std::optional compress(const ceph::bufferlist &input); + + double get_ratio() const { + return get_initial_size() / (double) get_final_size(); + } + + uint64_t get_initial_size() const { + return m_init_onwire_size; + } + + uint64_t get_final_size() const { + return m_onwire_size; + } + + private: + uint64_t m_min_size; + Compressor::CompressionMode m_mode; + + uint64_t m_init_onwire_size; + uint64_t m_onwire_size; + uint64_t m_compress_potential; + }; + + struct rxtx_t { + std::unique_ptr rx; + std::unique_ptr tx; + + static rxtx_t create_handler_pair( + CephContext* ctx, + const CompConnectionMeta& comp_meta, + std::uint64_t compress_min_size); + }; +} + +#endif // CEPH_COMPRESSION_ONWIRE_H diff --git a/src/msg/async/frames_v2.cc b/src/msg/async/frames_v2.cc index 7a0b5907b248..8a2975347636 100644 --- a/src/msg/async/frames_v2.cc +++ b/src/msg/async/frames_v2.cc @@ -72,6 +72,8 @@ void FrameAssembler::fill_preamble(Tag tag, preamble.segments[i].alignment = m_descs[i].align; } preamble.num_segments = m_descs.size(); + preamble.flags = m_flags; + preamble.crc = ceph_crc32c( 0, reinterpret_cast(&preamble), sizeof(preamble) - sizeof(preamble.crc)); @@ -239,13 +241,18 @@ bufferlist FrameAssembler::asm_secure_rev1(const preamble_block_t& preamble, bufferlist FrameAssembler::assemble_frame(Tag tag, bufferlist segment_bls[], const uint16_t segment_aligns[], - size_t segment_count) { + size_t segment_count) { + m_flags = 0; m_descs.resize(calc_num_segments(segment_bls, segment_count)); for (size_t i = 0; i < m_descs.size(); i++) { m_descs[i].logical_len = segment_bls[i].length(); m_descs[i].align = segment_aligns[i]; } + if (m_compression->tx) { + asm_compress(segment_bls); + } + preamble_block_t preamble; fill_preamble(tag, preamble); @@ -317,6 +324,12 @@ Tag FrameAssembler::disassemble_preamble(bufferlist& preamble_bl) { m_descs[i].logical_len = preamble->segments[i].length; m_descs[i].align = preamble->segments[i].alignment; } + + m_flags = preamble->flags; + // If frame has been compressed, + // we need to make sure the compression handler has been setup + ceph_assert_always(!is_compressed() || m_compression->rx); + return static_cast(preamble->tag); } @@ -423,6 +436,19 @@ bool FrameAssembler::disasm_remaining_secure_rev1( return check_epilogue_late_status(epilogue->late_status); } +bool FrameAssembler::disassemble_segments(bufferlist& preamble_bl, + bufferlist segments_bls[], bufferlist& epilogue_bl) const { + disassemble_first_segment(preamble_bl, segments_bls[0]); + if (disassemble_remaining_segments(segments_bls, epilogue_bl)) { + if (is_compressed()) { + disassemble_decompress(segments_bls); + } + return true; + } + + return false; +} + void FrameAssembler::disassemble_first_segment(bufferlist& preamble_bl, bufferlist& segment_bl) const { ceph_assert(!m_descs.empty()); @@ -445,15 +471,15 @@ bool FrameAssembler::disassemble_remaining_segments( // no epilogue if only one segment ceph_assert(epilogue_bl.length() == 0); return true; - } - if (m_crypto->rx) { + } else if (m_crypto->rx) { return disasm_remaining_secure_rev1(segment_bls, epilogue_bl); + } else { + return disasm_remaining_crc_rev1(segment_bls, epilogue_bl); } - return disasm_remaining_crc_rev1(segment_bls, epilogue_bl); - } - if (m_crypto->rx) { + } else if (m_crypto->rx) { return disasm_all_secure_rev0(segment_bls, epilogue_bl); - } + } + return disasm_all_crc_rev0(segment_bls, epilogue_bl); } @@ -469,8 +495,49 @@ std::ostream& operator<<(std::ostream& os, const FrameAssembler& frame_asm) { } os << "rev1=" << frame_asm.m_is_rev1 << " rx=" << frame_asm.m_crypto->rx.get() - << " tx=" << frame_asm.m_crypto->tx.get(); + << " tx=" << frame_asm.m_crypto->tx.get() + << " comp rx=" << frame_asm.m_compression->rx.get() + << " comp tx=" << frame_asm.m_compression->tx.get() + << " compressed=" << frame_asm.is_compressed(); return os; } +void FrameAssembler::asm_compress(bufferlist segment_bls[]) { + std::array compressed; + + m_compression->tx->reset_handler(m_descs.size(), get_frame_logical_len()); + + bool abort = false; + for (size_t i = 0; (i < m_descs.size()) && !abort; i++) { + auto out = m_compression->tx->compress(segment_bls[i]); + if (!out) { + abort = true; + } else { + compressed[i] = std::move(*out); + } + } + + if (!abort) { + m_compression->tx->done(); + + for (size_t i = 0; i < m_descs.size(); i++) { + segment_bls[i].swap(compressed[i]); + m_descs[i].logical_len = segment_bls[i].length(); + } + + m_flags |= FRAME_EARLY_DATA_COMPRESSED; + } +} + +void FrameAssembler::disassemble_decompress(bufferlist segment_bls[]) const { + for (size_t i = 0; i < m_descs.size(); i++) { + auto out = m_compression->rx->decompress(segment_bls[i]); + if (!out) { + throw FrameError("Segment decompression failed"); + } else { + segment_bls[i] = std::move(*out); + } + } +} + } // namespace ceph::msgr::v2 diff --git a/src/msg/async/frames_v2.h b/src/msg/async/frames_v2.h index 94d4d1732b20..5b786e1aff88 100644 --- a/src/msg/async/frames_v2.h +++ b/src/msg/async/frames_v2.h @@ -4,6 +4,7 @@ #include "include/types.h" #include "common/Clock.h" #include "crypto_onwire.h" +#include "compression_onwire.h" #include #include #include @@ -55,7 +56,9 @@ enum class Tag : __u8 { MESSAGE, KEEPALIVE2, KEEPALIVE2_ACK, - ACK + ACK, + COMPRESSION_REQUEST, + COMPRESSION_DONE }; struct segment_t { @@ -102,7 +105,9 @@ struct preamble_block_t { __u8 num_segments; segment_t segments[MAX_NUM_SEGMENTS]; - __u8 _reserved[2]; + + __u8 flags; + __u8 _reserved; // CRC32 for this single preamble block. ceph_le32 crc; @@ -169,6 +174,12 @@ static constexpr uint32_t FRAME_PREAMBLE_WITH_INLINE_SIZE = #define FRAME_LATE_STATUS_RESERVED_FALSE 0xe0 #define FRAME_LATE_STATUS_RESERVED_MASK 0xf0 +// For msgr 2.1, FRAME_EARLY_X flags are sent as part of epilogue. +// +// This flag indicates whether frame segments have been compressed by +// sender, and used in segments' disassemblig phase. +#define FRAME_EARLY_DATA_COMPRESSED 0X1 + struct FrameError : std::runtime_error { using runtime_error::runtime_error; }; @@ -176,11 +187,13 @@ struct FrameError : std::runtime_error { class FrameAssembler { public: // crypto must be non-null - FrameAssembler(const ceph::crypto::onwire::rxtx_t* crypto, bool is_rev1) - : m_crypto(crypto), m_is_rev1(is_rev1) {} + FrameAssembler(const ceph::crypto::onwire::rxtx_t* crypto, bool is_rev1, + const ceph::compression::onwire::rxtx_t* compression) + : m_crypto(crypto), m_is_rev1(is_rev1), m_compression(compression) {} void set_is_rev1(bool is_rev1) { m_descs.clear(); + m_flags = 0; m_is_rev1 = is_rev1; } @@ -299,6 +312,40 @@ public: Tag disassemble_preamble(bufferlist& preamble_bl); + bool disassemble_segments(bufferlist& preamble_bl, + bufferlist segments_bls[], + bufferlist& epilogue_bl) const; + +private: + struct segment_desc_t { + uint32_t logical_len; + uint16_t align; + }; + + uint32_t get_segment_padded_len(size_t seg_idx) const { + return p2roundup(m_descs[seg_idx].logical_len, + CRYPTO_BLOCK_SIZE); + } + + uint32_t get_auth_tag_len() const { + return m_crypto->rx->get_extra_size_at_final(); + } + + bool is_compressed() const { + return m_flags & FRAME_EARLY_DATA_COMPRESSED; + } + + void asm_compress(bufferlist segment_bls[]); + + bufferlist asm_crc_rev0(const preamble_block_t& preamble, + bufferlist segment_bls[]) const; + bufferlist asm_secure_rev0(const preamble_block_t& preamble, + bufferlist segment_bls[]) const; + bufferlist asm_crc_rev1(const preamble_block_t& preamble, + bufferlist segment_bls[]) const; + bufferlist asm_secure_rev1(const preamble_block_t& preamble, + bufferlist segment_bls[]) const; + // Like msgr1, and unlike msgr2.0, msgr2.1 allows interpreting the // first segment before reading in the rest of the frame. // @@ -312,6 +359,7 @@ public: // user-provided buffers // - read in epilogue // - call disassemble_remaining_segments() + // - call disasm_all_decompress() // // For msgr2.0 (set_is_rev1(false)), disassemble_first_segment() is // a noop. To accomodate, disassemble_remaining_segments() always @@ -321,6 +369,7 @@ public: // - read in all segments // - read in epilogue // - call disassemble_remaining_segments() + // - call disasm_all_decompress() // // disassemble_remaining_segments() returns true if the frame is // ready for dispatching, or false if it was aborted by the sender @@ -329,30 +378,7 @@ public: bufferlist& segment_bl) const; bool disassemble_remaining_segments(bufferlist segment_bls[], bufferlist& epilogue_bl) const; - -private: - struct segment_desc_t { - uint32_t logical_len; - uint16_t align; - }; - - uint32_t get_segment_padded_len(size_t seg_idx) const { - return p2roundup(m_descs[seg_idx].logical_len, - CRYPTO_BLOCK_SIZE); - } - - uint32_t get_auth_tag_len() const { - return m_crypto->rx->get_extra_size_at_final(); - } - - bufferlist asm_crc_rev0(const preamble_block_t& preamble, - bufferlist segment_bls[]) const; - bufferlist asm_secure_rev0(const preamble_block_t& preamble, - bufferlist segment_bls[]) const; - bufferlist asm_crc_rev1(const preamble_block_t& preamble, - bufferlist segment_bls[]) const; - bufferlist asm_secure_rev1(const preamble_block_t& preamble, - bufferlist segment_bls[]) const; + void disassemble_decompress(bufferlist segment_bls[]) const; bool disasm_all_crc_rev0(bufferlist segment_bls[], bufferlist& epilogue_bl) const; @@ -372,8 +398,10 @@ private: const FrameAssembler& frame_asm); boost::container::static_vector m_descs; + __u8 m_flags; const ceph::crypto::onwire::rxtx_t* m_crypto; bool m_is_rev1; // msgr2.1? + const ceph::compression::onwire::rxtx_t* m_compression; }; template @@ -837,6 +865,34 @@ protected: using Frame::Frame; }; +struct CompressionRequestFrame : public ControlFrame> { // preferred methods + static const Tag tag = Tag::COMPRESSION_REQUEST; + using ControlFrame::Encode; + using ControlFrame::Decode; + + inline bool &is_compress() { return get_val<0>(); } + inline std::vector &preferred_methods() { return get_val<1>(); } + +protected: + using ControlFrame::ControlFrame; +}; + +struct CompressionDoneFrame : public ControlFrame { // method + static const Tag tag = Tag::COMPRESSION_DONE; + using ControlFrame::Encode; + using ControlFrame::Decode; + + inline bool &is_compress() { return get_val<0>(); } + inline uint32_t &method() { return get_val<1>(); } + +protected: + using ControlFrame::ControlFrame; +}; + } // namespace ceph::msgr::v2 #endif // _MSG_ASYNC_FRAMES_V2_ diff --git a/src/msg/compressor_registry.cc b/src/msg/compressor_registry.cc new file mode 100644 index 000000000000..61efa0dd0993 --- /dev/null +++ b/src/msg/compressor_registry.cc @@ -0,0 +1,126 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "compressor_registry.h" +#include "common/dout.h" + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix *_dout << "CompressorRegistry(" << this << ") " + +CompressorRegistry::CompressorRegistry(CephContext *cct) + : cct(cct) +{ + cct->_conf.add_observer(this); +} + +CompressorRegistry::~CompressorRegistry() +{ + cct->_conf.remove_observer(this); +} + +const char** CompressorRegistry::get_tracked_conf_keys() const +{ + static const char *keys[] = { + "ms_osd_compress_mode", + "ms_osd_compression_algorithm", + "ms_osd_compress_min_size", + "ms_compress_secure", + nullptr + }; + return keys; +} + +void CompressorRegistry::handle_conf_change( + const ConfigProxy& conf, + const std::set& changed) +{ + std::scoped_lock l(lock); + _refresh_config(); +} + +std::vector CompressorRegistry::_parse_method_list(const string& s) +{ + std::vector methods; + + for_each_substr(s, ";,= \t", [&] (auto method) { + ldout(cct,20) << "adding algorithm method: " << method << dendl; + + auto alg_type = Compressor::get_comp_alg_type(method); + if (alg_type) { + methods.push_back(*alg_type); + } else { + ldout(cct,5) << "WARNING: unknown algorithm method " << method << dendl; + } + }); + + if (methods.empty()) { + methods.push_back(Compressor::COMP_ALG_NONE); + } + ldout(cct,20) << __func__ << " " << s << " -> " << methods << dendl; + + return methods; +} + +void CompressorRegistry::_refresh_config() +{ + auto c_mode = Compressor::get_comp_mode_type(cct->_conf.get_val("ms_osd_compress_mode")); + + if (c_mode) { + ms_osd_compress_mode = *c_mode; + } else { + ldout(cct,1) << __func__ << " failed to identify ms_osd_compress_mode " + << ms_osd_compress_mode << dendl; + + ms_osd_compress_mode = Compressor::COMP_NONE; + } + + ms_osd_compression_methods = _parse_method_list(cct->_conf.get_val("ms_osd_compression_algorithm")); + ms_osd_compress_min_size = cct->_conf.get_val("ms_osd_compress_min_size"); + + ms_compress_secure = cct->_conf.get_val("ms_compress_secure"); + + ldout(cct,10) << __func__ << " ms_osd_compression_mode " << ms_osd_compress_mode + << " ms_osd_compression_methods " << ms_osd_compression_methods + << " ms_osd_compress_above_min_size " << ms_osd_compress_min_size + << " ms_compress_secure " << ms_compress_secure + << dendl; +} + +Compressor::CompressionAlgorithm +CompressorRegistry::pick_method(uint32_t peer_type, + const std::vector& preferred_methods) +{ + std::vector allowed_methods = get_methods(peer_type); + auto preferred = std::find_first_of(preferred_methods.begin(), + preferred_methods.end(), + allowed_methods.begin(), + allowed_methods.end()); + if (preferred == preferred_methods.end()) { + ldout(cct,1) << "failed to pick compression method from client's " + << preferred_methods + << " and our " << allowed_methods << dendl; + return Compressor::COMP_ALG_NONE; + } else { + return static_cast(*preferred); + } +} + +Compressor::CompressionMode +CompressorRegistry::get_mode(uint32_t peer_type, bool is_secure) +{ + std::scoped_lock l(lock); + ldout(cct, 20) << __func__ << " peer_type " << peer_type + << " is_secure " << is_secure << dendl; + + if (is_secure && !ms_compress_secure) { + return Compressor::COMP_NONE; + } + + switch (peer_type) { + case CEPH_ENTITY_TYPE_OSD: + return static_cast(ms_osd_compress_mode); + default: + return Compressor::COMP_NONE; + } +} diff --git a/src/msg/compressor_registry.h b/src/msg/compressor_registry.h new file mode 100644 index 000000000000..4a28cc16fa48 --- /dev/null +++ b/src/msg/compressor_registry.h @@ -0,0 +1,69 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include + +#include "compressor/Compressor.h" +#include "common/ceph_mutex.h" +#include "common/ceph_context.h" +#include "common/config_cacher.h" + +class CompressorRegistry : public md_config_obs_t { +public: + CompressorRegistry(CephContext *cct); + ~CompressorRegistry(); + + void refresh_config() { + std::scoped_lock l(lock); + _refresh_config(); + } + + const char** get_tracked_conf_keys() const override; + void handle_conf_change(const ConfigProxy& conf, + const std::set& changed) override; + + Compressor::CompressionAlgorithm pick_method(uint32_t peer_type, + const std::vector& preferred_methods); + + Compressor::CompressionMode get_mode(uint32_t peer_type, bool is_secure); + + const std::vector get_methods(uint32_t peer_type) { + std::scoped_lock l(lock); + switch (peer_type) { + case CEPH_ENTITY_TYPE_OSD: + return ms_osd_compression_methods; + default: + return {}; + } + } + + uint64_t get_min_compression_size(uint32_t peer_type) const { + std::scoped_lock l(lock); + switch (peer_type) { + case CEPH_ENTITY_TYPE_OSD: + return ms_osd_compress_min_size; + default: + return 0; + } + } + + bool get_is_compress_secure() const { + std::scoped_lock l(lock); + return ms_compress_secure; + } + +private: + CephContext *cct; + mutable ceph::mutex lock = ceph::make_mutex("CompressorRegistry::lock"); + + uint32_t ms_osd_compress_mode; + bool ms_compress_secure; + std::uint64_t ms_osd_compress_min_size; + std::vector ms_osd_compression_methods; + + void _refresh_config(); + std::vector _parse_method_list(const string& s); +}; diff --git a/src/test/compressor/CMakeLists.txt b/src/test/compressor/CMakeLists.txt index 937694ced602..3a170a9429b0 100644 --- a/src/test/compressor/CMakeLists.txt +++ b/src/test/compressor/CMakeLists.txt @@ -8,4 +8,4 @@ add_executable(unittest_compression ) add_ceph_unittest(unittest_compression) target_link_libraries(unittest_compression global GTest::GTest) -add_dependencies(unittest_compression ceph_example) +add_dependencies(unittest_compression ceph_example) \ No newline at end of file diff --git a/src/test/msgr/CMakeLists.txt b/src/test/msgr/CMakeLists.txt index 4791f7c2f6e7..beaa7133d8ea 100644 --- a/src/test/msgr/CMakeLists.txt +++ b/src/test/msgr/CMakeLists.txt @@ -31,6 +31,13 @@ add_executable(unittest_frames_v2 test_frames_v2.cc) add_ceph_unittest(unittest_frames_v2) target_link_libraries(unittest_frames_v2 os global ${UNITTEST_LIBS}) +add_executable(unittest_comp_registry + test_comp_registry.cc + $ + ) +add_ceph_unittest(unittest_comp_registry) +target_link_libraries(unittest_comp_registry global) + # test_userspace_event if(HAVE_DPDK) add_executable(ceph_test_userspace_event diff --git a/src/test/msgr/test_comp_registry.cc b/src/test/msgr/test_comp_registry.cc new file mode 100644 index 000000000000..f0271d95dc1d --- /dev/null +++ b/src/test/msgr/test_comp_registry.cc @@ -0,0 +1,98 @@ +// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "include/types.h" +#include "include/stringify.h" +#include "compressor/Compressor.h" +#include "msg/compressor_registry.h" +#include "gtest/gtest.h" +#include "common/ceph_context.h" +#include "global/global_context.h" + +#include + +TEST(CompressorRegistry, con_modes) +{ + auto cct = g_ceph_context; + CompressorRegistry reg(cct); + std::vector methods; + uint32_t method; + uint32_t mode; + + const std::vector snappy_method = { Compressor::COMP_ALG_SNAPPY }; + const std::vector zlib_method = { Compressor::COMP_ALG_ZLIB }; + const std::vector both_methods = { Compressor::COMP_ALG_ZLIB, Compressor::COMP_ALG_SNAPPY}; + const std::vector no_method = { Compressor::COMP_ALG_NONE }; + + cct->_conf.set_val( + "enable_experimental_unrecoverable_data_corrupting_features", "*"); + + // baseline: compression for communication with osd is enabled + cct->_set_module_type(CEPH_ENTITY_TYPE_CLIENT); + cct->_conf.set_val("ms_osd_compress_mode", "force"); + cct->_conf.set_val("ms_osd_compression_algorithm", "snappy"); + cct->_conf.set_val("ms_compress_secure", "false"); + cct->_conf.apply_changes(NULL); + + ASSERT_EQ(reg.get_is_compress_secure(), false); + + methods = reg.get_methods(CEPH_ENTITY_TYPE_MON); + ASSERT_EQ(methods.size(), 0); + method = reg.pick_method(CEPH_ENTITY_TYPE_MON, both_methods); + ASSERT_EQ(method, Compressor::COMP_ALG_NONE); + mode = reg.get_mode(CEPH_ENTITY_TYPE_MON, false); + ASSERT_EQ(mode, Compressor::COMP_NONE); + + methods = reg.get_methods(CEPH_ENTITY_TYPE_OSD); + ASSERT_EQ(methods, snappy_method); + const std::vector rev_both_methods (both_methods.rbegin(), both_methods.rend()); + method = reg.pick_method(CEPH_ENTITY_TYPE_OSD, rev_both_methods); + ASSERT_EQ(method, Compressor::COMP_ALG_SNAPPY); + mode = reg.get_mode(CEPH_ENTITY_TYPE_OSD, false); + ASSERT_EQ(mode, Compressor::COMP_FORCE); + mode = reg.get_mode(CEPH_ENTITY_TYPE_OSD, true); + ASSERT_EQ(mode, Compressor::COMP_NONE); + + method = reg.pick_method(CEPH_ENTITY_TYPE_OSD, zlib_method); + ASSERT_EQ(method, Compressor::COMP_ALG_NONE); + + // disable compression mode + cct->_set_module_type(CEPH_ENTITY_TYPE_CLIENT); + cct->_conf.set_val("ms_osd_compress_mode", "none"); + cct->_conf.apply_changes(NULL); + + mode = reg.get_mode(CEPH_ENTITY_TYPE_OSD, false); + ASSERT_EQ(mode, Compressor::COMP_NONE); + + // no compression methods + cct->_conf.set_val("ms_osd_compress_mode", "force"); + cct->_conf.set_val("ms_osd_compression_algorithm", "none"); + cct->_conf.apply_changes(NULL); + + method = reg.pick_method(CEPH_ENTITY_TYPE_OSD, both_methods); + ASSERT_EQ(method, Compressor::COMP_ALG_NONE); + + // min compression size + cct->_conf.set_val("ms_osd_compress_min_size", "1024"); + cct->_conf.apply_changes(NULL); + + uint32_t s = reg.get_min_compression_size(CEPH_ENTITY_TYPE_OSD); + ASSERT_EQ(s, 1024); + + // allow secure with compression + cct->_conf.set_val("ms_osd_compress_mode", "force"); + cct->_conf.set_val("ms_osd_compression_algorithm", "snappy"); + cct->_conf.set_val("ms_compress_secure", "true"); + cct->_conf.apply_changes(NULL); + + ASSERT_EQ(reg.get_is_compress_secure(), true); + + mode = reg.get_mode(CEPH_ENTITY_TYPE_OSD, true); + ASSERT_EQ(mode, Compressor::COMP_FORCE); + + mode = reg.get_mode(CEPH_ENTITY_TYPE_OSD, false); + ASSERT_EQ(mode, Compressor::COMP_FORCE); + + // back to normalish, for the benefit of the next test(s) + cct->_set_module_type(CEPH_ENTITY_TYPE_CLIENT); +} diff --git a/src/test/msgr/test_frames_v2.cc b/src/test/msgr/test_frames_v2.cc index d384e6a81dbe..725903251a72 100644 --- a/src/test/msgr/test_frames_v2.cc +++ b/src/test/msgr/test_frames_v2.cc @@ -19,6 +19,7 @@ #include #include +#include "msg/async/compression_meta.h" #include "auth/Auth.h" #include "common/ceph_argparse.h" #include "global/global_init.h" @@ -27,6 +28,14 @@ #include +#define COMP_THRESHOLD 1 << 10 +#define EXPECT_COMPRESSED(is_compressed, val1, val2) \ + if (is_compressed && val1 > COMP_THRESHOLD) { \ + EXPECT_GE(val1, val2); \ + } else { \ + EXPECT_EQ(val1, val2); \ + } + namespace ceph::msgr::v2 { // MessageFrame with the first segment not fixed to ceph_msg_header2 @@ -87,19 +96,25 @@ protected: struct mode_t { bool is_rev1; bool is_secure; + bool is_compress; }; static std::ostream& operator<<(std::ostream& os, const mode_t& m) { os << "msgr2." << (m.is_rev1 ? "1" : "0") - << (m.is_secure ? "-secure" : "-crc"); + << (m.is_secure ? "-secure" : "-crc") + << (m.is_compress ? "-compress": "-nocompress"); return os; } static const mode_t modes[] = { - {false, false}, - {false, true}, - {true, false}, - {true, true}, + {false, false, false}, + {false, true, false}, + {true, false, false}, + {true, true, false}, + {false, false, true}, + {false, true, true}, + {true, false, true}, + {true, true, true} }; struct round_trip_instance_t { @@ -151,17 +166,16 @@ bool disassemble_frame(FrameAssembler& frame_asm, bufferlist& frame_bl, if (epilogue_onwire_len > 0) { frame_bl.splice(0, epilogue_onwire_len, &epilogue_bl); } - frame_asm.disassemble_first_segment(preamble_bl, segment_bls[0]); - return frame_asm.disassemble_remaining_segments(segment_bls.data(), - epilogue_bl); + + return frame_asm.disassemble_segments(preamble_bl, segment_bls.data(), epilogue_bl); } class RoundTripTestBase : public ::testing::TestWithParam< std::tuple> { protected: RoundTripTestBase() - : m_tx_frame_asm(&m_tx_crypto, std::get<1>(GetParam()).is_rev1), - m_rx_frame_asm(&m_rx_crypto, std::get<1>(GetParam()).is_rev1), + : m_tx_frame_asm(&m_tx_crypto, std::get<1>(GetParam()).is_rev1, &m_tx_comp), + m_rx_frame_asm(&m_rx_crypto, std::get<1>(GetParam()).is_rev1, &m_rx_comp), m_header(make_bufferlist(std::get<0>(GetParam()).header_len, 'H')), m_front(make_bufferlist(std::get<0>(GetParam()).front_len, 'F')), m_middle(make_bufferlist(std::get<0>(GetParam()).middle_len, 'M')), @@ -181,23 +195,37 @@ protected: g_ceph_context, auth_meta, /*new_nonce_format=*/m.is_rev1, /*crossed=*/true); } + + if (m.is_compress) { + CompConnectionMeta comp_meta; + comp_meta.con_mode = Compressor::COMP_FORCE; + comp_meta.con_method = Compressor::COMP_ALG_SNAPPY; + m_tx_comp = ceph::compression::onwire::rxtx_t::create_handler_pair( + g_ceph_context, comp_meta, /*min_compress_size=*/COMP_THRESHOLD + ); + m_rx_comp = ceph::compression::onwire::rxtx_t::create_handler_pair( + g_ceph_context, comp_meta, /*min_compress_size=*/COMP_THRESHOLD + ); + } } void check_frame_assembler(const FrameAssembler& frame_asm) { const auto& [rti, m] = GetParam(); const auto& onwire_lens = rti.onwire_lens[m.is_rev1 << 1 | m.is_secure]; - EXPECT_EQ(rti.header_len + rti.front_len + rti.middle_len + rti.data_len, + + EXPECT_COMPRESSED(m.is_compress, rti.header_len + rti.front_len + rti.middle_len + rti.data_len, frame_asm.get_frame_logical_len()); ASSERT_EQ(rti.num_segments, frame_asm.get_num_segments()); - EXPECT_EQ(onwire_lens[0], frame_asm.get_preamble_onwire_len()); + EXPECT_COMPRESSED(m.is_compress, onwire_lens[0], frame_asm.get_preamble_onwire_len()); for (size_t i = 0; i < rti.num_segments; i++) { - EXPECT_EQ(onwire_lens[i + 1], frame_asm.get_segment_onwire_len(i)); + EXPECT_COMPRESSED(m.is_compress, onwire_lens[i + 1], frame_asm.get_segment_onwire_len(i)); } - EXPECT_EQ(onwire_lens[rti.num_segments + 1], + EXPECT_COMPRESSED(m.is_compress, onwire_lens[rti.num_segments + 1], frame_asm.get_epilogue_onwire_len()); - EXPECT_EQ(std::accumulate(std::begin(onwire_lens), std::end(onwire_lens), - uint64_t(0)), - frame_asm.get_frame_onwire_len()); + EXPECT_COMPRESSED(m.is_compress, + std::accumulate(std::begin(onwire_lens), std::end(onwire_lens), + uint64_t(0)), + frame_asm.get_frame_onwire_len()); } void test_round_trip() { @@ -224,6 +252,8 @@ protected: ceph::crypto::onwire::rxtx_t m_tx_crypto; ceph::crypto::onwire::rxtx_t m_rx_crypto; + ceph::compression::onwire::rxtx_t m_tx_comp; + ceph::compression::onwire::rxtx_t m_rx_comp; FrameAssembler m_tx_frame_asm; FrameAssembler m_rx_frame_asm;