]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: added on-wire-compression to msgr protocol 2.0 36517/head
authorMaya Gilad <ms.maya.gilad@gmail.com>
Thu, 27 May 2021 10:16:34 +0000 (13:16 +0300)
committerMaya Gilad <ms.maya.gilad@gmail.com>
Tue, 29 Jun 2021 07:32:47 +0000 (10:32 +0300)
Signed-off-by: Maya Gilad <ms.maya.gilad@gmail.com>
Co-authored-by: Kefu Chai <tchaikov@gmail.com>
22 files changed:
src/common/ceph_context.h
src/common/options/global.yaml.in
src/crimson/CMakeLists.txt
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h
src/include/msgr.h
src/msg/CMakeLists.txt
src/msg/Messenger.cc
src/msg/Messenger.h
src/msg/async/ProtocolV2.cc
src/msg/async/ProtocolV2.h
src/msg/async/compression_meta.h [new file with mode: 0644]
src/msg/async/compression_onwire.cc [new file with mode: 0644]
src/msg/async/compression_onwire.h [new file with mode: 0644]
src/msg/async/frames_v2.cc
src/msg/async/frames_v2.h
src/msg/compressor_registry.cc [new file with mode: 0644]
src/msg/compressor_registry.h [new file with mode: 0644]
src/test/compressor/CMakeLists.txt
src/test/msgr/CMakeLists.txt
src/test/msgr/test_comp_registry.cc [new file with mode: 0644]
src/test/msgr/test_frames_v2.cc

index 25f52615b067bd8c1b1c084aac2528ee256702fa..dee5157ceee4b6b4df578b02d029cc88b773c107 100644 (file)
@@ -83,6 +83,9 @@ public:
     // everything crimson is experimental...
     return true;
   }
+  ceph::PluginRegistry* get_plugin_registry() {
+    return _plugin_registry;
+  }
   CryptoRandom* random() const;
   PerfCountersCollectionImpl* get_perfcounters_collection();
   crimson::common::ConfigProxy& _conf;
@@ -92,6 +95,7 @@ public:
 private:
   std::unique_ptr<CryptoRandom> _crypto_random;
   unsigned nref;
+  ceph::PluginRegistry* _plugin_registry;
 };
 }
 #else
index 1dd0b57d467f14d5319a93d61f1e07d488631fe8..70bc73a18bc1dec8b793b19aa11867757f864716 100644 (file)
@@ -979,6 +979,56 @@ options:
   - ms_service_mode
   flags:
   - startup
+- name: ms_osd_compress_mode
+  type: str
+  level: advanced
+  desc: Compression policy to use in Messenger for communicating with OSD
+  default: none
+  services:
+  - osd
+  enum_values:
+  - none
+  - force
+  see_also:
+  - ms_compress_secure
+  flags:
+  - runtime
+- name: ms_osd_compress_min_size
+  type: uint
+  level: advanced
+  desc: Minimal message size eligable for on-wire compression
+  default: 1_K
+  services:
+  - osd
+  see_also:
+  - ms_osd_compress_mode
+  flags:
+  - runtime
+- name: ms_osd_compression_algorithm
+  type: str
+  level: advanced
+  desc: Compression algorithm to use in Messenger when communicating with OSD
+  long_desc: Compression algorithm for connections with OSD in order of preference
+  default: snappy zlib zstd lz4
+  services:
+  - osd
+  see_also:
+  - ms_osd_compress_mode
+  flags:
+  - runtime
+- name: ms_compress_secure
+  type: bool
+  level: advanced
+  desc: Allowing compression when on-wire encryption is enabled
+  long_desc: Combining encryption with compression reduces the level of security of
+    messages between peers. In case both encryption and compression are enabled, 
+    compression setting will be ignored and message will not be compressed. 
+    This behaviour can be override using this setting. 
+  default: false
+  see_also: 
+  - ms_osd_compress_mode
+  flags:
+  - runtime
 - name: ms_learn_addr_from_peer
   type: bool
   level: advanced
index c58161f267b1092e0f7d10874f708b8db766a905..8104227c7e1aa4a2b9cda8e207f71d263901547f 100644 (file)
@@ -87,6 +87,7 @@ add_library(crimson-common STATIC
   ${PROJECT_SOURCE_DIR}/src/common/PluginRegistry.cc
   ${PROJECT_SOURCE_DIR}/src/common/RefCountedObj.cc
   ${PROJECT_SOURCE_DIR}/src/common/util.cc
+  ${PROJECT_SOURCE_DIR}/src/compressor/Compressor.cc
   ${PROJECT_SOURCE_DIR}/src/crush/builder.c
   ${PROJECT_SOURCE_DIR}/src/crush/mapper.c
   ${PROJECT_SOURCE_DIR}/src/crush/crush.c
@@ -170,6 +171,7 @@ set(crimson_mon_srcs
   ${PROJECT_SOURCE_DIR}/src/mon/MonSub.cc)
 set(crimson_net_srcs
   ${PROJECT_SOURCE_DIR}/src/msg/async/crypto_onwire.cc
+  ${PROJECT_SOURCE_DIR}/src/msg/async/compression_onwire.cc
   ${PROJECT_SOURCE_DIR}/src/msg/async/frames_v2.cc
   net/Errors.cc
   net/Messenger.cc
index 370085a43a123e30429917926d699369886a2737..ce6f47301db88dda5bdfb73a20e7f53c55b6cf6d 100644 (file)
@@ -304,10 +304,9 @@ seastar::future<> ProtocolV2::read_frame_payload()
     logger().trace("{} RECV({}) frame epilogue", conn, bl.size());
     bool ok = false;
     try {
-      rx_frame_asm.disassemble_first_segment(rx_preamble, rx_segments_data[0]);
       bufferlist rx_epilogue;
       rx_epilogue.append(buffer::create(std::move(bl)));
-      ok = rx_frame_asm.disassemble_remaining_segments(rx_segments_data.data(), rx_epilogue);
+      ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue);
     } catch (FrameError& e) {
       logger().error("read_frame_payload: {} {}", conn, e.what());
       abort_in_fault();
@@ -1474,6 +1473,7 @@ void ProtocolV2::execute_accepting()
           INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
           auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
           session_stream_handlers = { nullptr, nullptr };
+          session_comp_handlers = { nullptr, nullptr };
           enable_recording();
           return banner_exchange(false);
         }).then([this] (auto&& ret) {
index 319802690cc3ca698a18734926b6ac35aa81c8b0..ab6ad86c1bfb9deb59022dbf106c4b4daae4c988 100644 (file)
@@ -8,6 +8,7 @@
 #include "Protocol.h"
 #include "msg/async/frames_v2.h"
 #include "msg/async/crypto_onwire.h"
+#include "msg/async/compression_onwire.h"
 
 namespace crimson::net {
 
@@ -122,8 +123,9 @@ class ProtocolV2 final : public Protocol {
   seastar::future<> write_flush(bufferlist&& buf);
 
   ceph::crypto::onwire::rxtx_t session_stream_handlers;
-  ceph::msgr::v2::FrameAssembler tx_frame_asm{&session_stream_handlers, false};
-  ceph::msgr::v2::FrameAssembler rx_frame_asm{&session_stream_handlers, false};
+  ceph::compression::onwire::rxtx_t session_comp_handlers;
+  ceph::msgr::v2::FrameAssembler tx_frame_asm{&session_stream_handlers, false, &session_comp_handlers};
+  ceph::msgr::v2::FrameAssembler rx_frame_asm{&session_stream_handlers, false, &session_comp_handlers};
   ceph::bufferlist rx_preamble;
   ceph::msgr::v2::segment_bls_t rx_segments_data;
 
index eedb95dd04febfbf4cddf93a7b443a9741348733..c8ad48ad1afef1306a33662b92daa0e0245b3a0b 100644 (file)
 #define HAVE_MSGR2_FEATURE(x, name) \
        (((x) & (CEPH_MSGR2_FEATUREMASK_##name)) == (CEPH_MSGR2_FEATUREMASK_##name))
 
-DEFINE_MSGR2_FEATURE( 0, 1, REVISION_1)   // msgr2.1
+DEFINE_MSGR2_FEATURE(0, 1, REVISION_1)   // msgr2.1
+DEFINE_MSGR2_FEATURE(1, 1, COMPRESSION)  // on-wire compression
 
-#define CEPH_MSGR2_SUPPORTED_FEATURES (CEPH_MSGR2_FEATURE_REVISION_1)
+/*
+ * Features supported.  Should be everything above.
+ */
+#define CEPH_MSGR2_SUPPORTED_FEATURES \
+       (CEPH_MSGR2_FEATURE_REVISION_1 | \
+        CEPH_MSGR2_FEATURE_COMPRESSION | \
+        0ULL)
+
+#define CEPH_MSGR2_REQUIRED_FEATURES (0ULL)
 
-#define CEPH_MSGR2_REQUIRED_FEATURES  (0ull)
 
 
 /*
index e6d0b589b42997c4def507855fd23a0ecb10249b..70d6975f39b310b9c5d9e22ed178865b4a96cf02 100644 (file)
@@ -3,7 +3,8 @@ set(msg_srcs
   Message.cc
   Messenger.cc
   Connection.cc
-  msg_types.cc)
+  msg_types.cc
+  compressor_registry.cc)
 
 list(APPEND msg_srcs
   async/AsyncConnection.cc
@@ -16,6 +17,7 @@ list(APPEND msg_srcs
   async/PosixStack.cc
   async/Stack.cc
   async/crypto_onwire.cc
+  async/compression_onwire.cc
   async/frames_v2.cc
   async/net_handler.cc)
 
index fb85f42bc91cb16a0d51522e678c28d384d985df..edc74a9a4904b0a50b979d810b404a256f80e51a 100644 (file)
@@ -58,9 +58,11 @@ Messenger::Messenger(CephContext *cct_, entity_name_t w)
     socket_priority(-1),
     cct(cct_),
     crcflags(get_default_crc_flags(cct->_conf)),
-    auth_registry(cct)
+    auth_registry(cct),
+    comp_registry(cct)
 {
   auth_registry.refresh_config();
+  comp_registry.refresh_config();
 }
 
 void Messenger::set_endpoint_addr(const entity_addr_t& a,
index e87f3196b1c9b58e2ab8c2f3a1c93c9b12b23d94..c832589e88ea5ba46530704d9ec70c532b8736ae 100644 (file)
@@ -34,6 +34,7 @@
 #include "auth/Crypto.h"
 #include "common/item_history.h"
 #include "auth/AuthRegistry.h"
+#include "compressor_registry.h"
 #include "include/ceph_assert.h"
 
 #include <errno.h>
@@ -77,7 +78,9 @@ struct Interceptor {
     READY,
     HANDLE_MESSAGE,
     READ_MESSAGE_COMPLETE,
-    SESSION_RETRY
+    SESSION_RETRY,
+    SEND_COMPRESSION_REQUEST,
+    HANDLE_COMPRESSION_REQUEST
   };
 
   virtual ~Interceptor() {}
@@ -228,6 +231,9 @@ public:
     auth_server = as;
   }
 
+  // for compression
+  CompressorRegistry comp_registry;
+
 protected:
   /**
    * std::set messenger's address
index 0f40b32ac46b143448bf594dd70604dc99ec8324..23a4dfafbb5856f74ff3f5845869f59473736588 100644 (file)
@@ -26,8 +26,10 @@ std::ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) {
                 << " cs=" << connect_seq << " l=" << connection->policy.lossy
                 << " rev1=" << HAVE_MSGR2_FEATURE(peer_supported_features,
                                                   REVISION_1)
-                << " rx=" << session_stream_handlers.rx.get()
+                << " crypto rx=" << session_stream_handlers.rx.get()
                 << " tx=" << session_stream_handlers.tx.get()
+                << " comp rx=" << session_compression_handlers.rx.get()
+                << " tx=" << session_compression_handlers.tx.get()
                 << ").";
 }
 
@@ -94,8 +96,8 @@ ProtocolV2::ProtocolV2(AsyncConnection *connection)
       replacing(false),
       can_write(false),
       bannerExchangeCallback(nullptr),
-      tx_frame_asm(&session_stream_handlers, false),
-      rx_frame_asm(&session_stream_handlers, false),
+      tx_frame_asm(&session_stream_handlers, false, &session_compression_handlers),
+      rx_frame_asm(&session_stream_handlers, false, &session_compression_handlers),
       next_tag(static_cast<Tag>(0)),
       keepalive(false) {
 }
@@ -246,15 +248,17 @@ void ProtocolV2::reset_recv_state() {
     // `write_event()` unlocks it just before calling `write_message()`.
     // `submit_to()` here is NOT blocking.
     connection->center->submit_to(connection->center->get_id(), [this] {
-      ldout(cct, 5) << "reset_recv_state (warped) reseting crypto handlers"
+      ldout(cct, 5) << "reset_recv_state (warped) reseting crypto and compression handlers"
                     << dendl;
       // Possibly unnecessary. See the comment in `deactivate_existing`.
       std::lock_guard<std::mutex> l(connection->lock);
       std::lock_guard<std::mutex> wl(connection->write_lock);
       reset_security();
+      reset_compression();
     }, /* always_async = */true);
   } else {
     reset_security();
+    reset_compression();
   }
 
   // clean read and write callbacks
@@ -617,6 +621,14 @@ void ProtocolV2::handle_message_ack(uint64_t seq) {
   }
 }
 
+void ProtocolV2::reset_compression() {
+  ldout(cct, 5) << __func__ << dendl;
+
+  comp_meta = CompConnectionMeta{};
+  session_compression_handlers.rx.reset(nullptr);
+  session_compression_handlers.tx.reset(nullptr);
+}
+
 void ProtocolV2::write_event() {
   ldout(cct, 10) << __func__ << dendl;
   ssize_t r = 0;
@@ -1135,6 +1147,8 @@ CtPtr ProtocolV2::handle_read_frame_dispatch() {
     case Tag::KEEPALIVE2_ACK:
     case Tag::ACK:
     case Tag::WAIT:
+    case Tag::COMPRESSION_REQUEST:
+    case Tag::COMPRESSION_DONE:
       return handle_frame_payload();
     case Tag::MESSAGE:
       return handle_message();
@@ -1249,6 +1263,10 @@ CtPtr ProtocolV2::handle_frame_payload() {
       return handle_message_ack(payload);
     case Tag::WAIT:
       return handle_wait(payload);
+    case Tag::COMPRESSION_REQUEST:
+      return handle_compression_request(payload);
+    case Tag::COMPRESSION_DONE:
+      return handle_compression_done(payload);
     default:
       ceph_abort();
   }
@@ -1304,11 +1322,9 @@ CtPtr ProtocolV2::handle_read_frame_epilogue_main(rx_buffer_t &&buffer, int r)
 }
 
 CtPtr ProtocolV2::_handle_read_frame_epilogue_main() {
-  bool aborted;
+  bool ok = false;
   try {
-    rx_frame_asm.disassemble_first_segment(rx_preamble, rx_segments_data[0]);
-    aborted = !rx_frame_asm.disassemble_remaining_segments(
-        rx_segments_data.data(), rx_epilogue);
+    ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue);
   } catch (FrameError& e) {
     ldout(cct, 1) << __func__ << " " << e.what() << dendl;
     return _fault();
@@ -1320,7 +1336,7 @@ CtPtr ProtocolV2::_handle_read_frame_epilogue_main() {
   // we do have a mechanism that allows transmitter to start sending message
   // and abort after putting entire data field on wire. This will be used by
   // the kernel client to avoid unnecessary buffering.
-  if (aborted) {
+  if (!ok) {
     reset_throttle();
     state = READY;
     return CONTINUE(read_frame);
@@ -1828,6 +1844,29 @@ CtPtr ProtocolV2::handle_auth_done(ceph::bufferlist &payload)
 }
 
 CtPtr ProtocolV2::finish_client_auth() {
+  if (HAVE_MSGR2_FEATURE(peer_supported_features, COMPRESSION)) {
+    return send_compression_request();
+  }
+
+  return start_session_connect();
+}
+
+CtPtr ProtocolV2::finish_server_auth() {
+  // server had sent AuthDone and client responded with correct pre-auth
+  // signature. 
+  // We can start conditioanl msgr protocol
+  if (HAVE_MSGR2_FEATURE(peer_supported_features, COMPRESSION)) {
+    state = COMPRESSION_ACCEPTING;
+  } else {
+    // No msgr protocol features to process
+    // we can start accepting new sessions/reconnects.
+    state = SESSION_ACCEPTING;
+  }
+
+  return CONTINUE(read_frame);
+}
+
+CtPtr ProtocolV2::start_session_connect() {
   if (!server_cookie) {
     ceph_assert(connect_seq == 0);
     state = SESSION_CONNECTING;
@@ -2087,6 +2126,40 @@ CtPtr ProtocolV2::handle_server_ident(ceph::bufferlist &payload)
   return ready();
 }
 
+CtPtr ProtocolV2::send_compression_request() {
+  state = COMPRESSION_CONNECTING;
+
+  const entity_type_t peer_type = connection->get_peer_type();
+  comp_meta.con_mode =
+    static_cast<Compressor::CompressionMode>(
+      messenger->comp_registry.get_mode(peer_type, auth_meta->is_mode_secure()));
+  const auto preferred_methods = messenger->comp_registry.get_methods(peer_type);
+  auto comp_req_frame = CompressionRequestFrame::Encode(comp_meta.is_compress(), preferred_methods);
+
+  INTERCEPT(19);
+  return WRITE(comp_req_frame, "compression request", read_frame);
+}
+
+CtPtr ProtocolV2::handle_compression_done(ceph::bufferlist &payload) {
+  if (state != COMPRESSION_CONNECTING) {
+    lderr(cct) << __func__ << " state changed!" << dendl;
+    return _fault();
+  }
+
+  auto response = CompressionDoneFrame::Decode(payload);
+  ldout(cct, 10) << __func__ << " CompressionDoneFrame(is_compress=" << response.is_compress()
+                << ", method=" << response.method() << ")" << dendl;
+
+  comp_meta.con_method = static_cast<Compressor::CompressionAlgorithm>(response.method());
+  if (comp_meta.is_compress() != response.is_compress()) {
+    comp_meta.con_mode = Compressor::COMP_NONE;
+  }
+  session_compression_handlers = ceph::compression::onwire::rxtx_t::create_handler_pair(
+    cct, comp_meta, messenger->comp_registry.get_min_compression_size(connection->get_peer_type()));
+
+  return start_session_connect();
+}
+
 /* Server Protocol Methods */
 
 CtPtr ProtocolV2::start_server_banner_exchange() {
@@ -2250,10 +2323,8 @@ CtPtr ProtocolV2::handle_auth_signature(ceph::bufferlist &payload)
   }
 
   if (state == AUTH_ACCEPTING_SIGN) {
-    // server had sent AuthDone and client responded with correct pre-auth
-    // signature. we can start accepting new sessions/reconnects.
-    state = SESSION_ACCEPTING;
-    return CONTINUE(read_frame);
+    // this happened on server side
+    return finish_server_auth();
   } else if (state == AUTH_CONNECTING_SIGN) {
     // this happened at client side
     return finish_client_auth();
@@ -2664,7 +2735,9 @@ CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing,
   // this happens in the event center's thread as there should be
   // no user outside its boundaries (simlarly to e.g. outgoing_bl).
   auto temp_stream_handlers = std::move(session_stream_handlers);
+  auto temp_compression_handlers = std::move(session_compression_handlers);
   exproto->auth_meta = auth_meta;
+  exproto->comp_meta = comp_meta;
 
   ldout(messenger->cct, 5) << __func__ << " stop myself to swap existing"
                            << dendl;
@@ -2693,7 +2766,8 @@ CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing,
         new_worker,
         new_center,
         exproto,
-        temp_stream_handlers=std::move(temp_stream_handlers)
+        temp_stream_handlers=std::move(temp_stream_handlers),
+        temp_compression_handlers=std::move(temp_compression_handlers)
       ](ConnectedSocket &cs) mutable {
         // we need to delete time event in original thread
         {
@@ -2708,6 +2782,7 @@ CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing,
           existing->outgoing_bl.clear();
           existing->open_write = false;
           exproto->session_stream_handlers = std::move(temp_stream_handlers);
+          exproto->session_compression_handlers = std::move(temp_compression_handlers);
           existing->write_lock.unlock();
           if (exproto->state == NONE) {
             existing->shutdown_socket();
@@ -2895,3 +2970,43 @@ CtPtr ProtocolV2::send_reconnect_ok() {
 
   return WRITE(reconnect_ok, "reconnect ok", server_ready);
 }
+
+
+CtPtr ProtocolV2::handle_compression_request(ceph::bufferlist &payload) {
+  if (state != COMPRESSION_ACCEPTING) {
+    lderr(cct) << __func__ << " state changed!" << dendl;
+    return _fault();
+  }
+
+  auto request = CompressionRequestFrame::Decode(payload);
+  ldout(cct, 10) << __func__ << " CompressionRequestFrame(is_compress=" << request.is_compress()
+                << ", preferred_methods=" << request.preferred_methods() << ")" << dendl;
+
+  const int peer_type = connection->get_peer_type();
+  if (Compressor::CompressionMode mode = messenger->comp_registry.get_mode(
+        peer_type, auth_meta->is_mode_secure());
+      mode != Compressor::COMP_NONE && request.is_compress()) {
+    comp_meta.con_method = messenger->comp_registry.pick_method(peer_type, request.preferred_methods());
+    if (comp_meta.con_method != Compressor::COMP_ALG_NONE) {
+      comp_meta.con_mode = mode;
+    }
+  } else {
+    comp_meta.con_method = Compressor::COMP_ALG_NONE;
+  }
+  
+  auto response = CompressionDoneFrame::Encode(comp_meta.is_compress(), comp_meta.get_method());
+
+  INTERCEPT(20);
+  return WRITE(response, "compression done", finish_compression);
+}
+
+CtPtr ProtocolV2::finish_compression() {
+  // TODO: having a possibility to check whether we're server or client could
+  // allow reusing finish_compression().
+  
+  session_compression_handlers = ceph::compression::onwire::rxtx_t::create_handler_pair(
+    cct, comp_meta, messenger->comp_registry.get_min_compression_size(connection->get_peer_type()));
+
+  state = SESSION_ACCEPTING;
+  return CONTINUE(read_frame);
+}
index 087553891ef19c060f633095fadfb098639ff35c..6441866fea4c33a6885df660b75dd6608b9384a2 100644 (file)
@@ -6,6 +6,8 @@
 
 #include "Protocol.h"
 #include "crypto_onwire.h"
+#include "compression_meta.h"
+#include "compression_onwire.h"
 #include "frames_v2.h"
 
 class ProtocolV2 : public Protocol {
@@ -17,6 +19,7 @@ private:
     HELLO_CONNECTING,
     AUTH_CONNECTING,
     AUTH_CONNECTING_SIGN,
+    COMPRESSION_CONNECTING,
     SESSION_CONNECTING,
     SESSION_RECONNECTING,
     START_ACCEPT,
@@ -25,6 +28,7 @@ private:
     AUTH_ACCEPTING,
     AUTH_ACCEPTING_MORE,
     AUTH_ACCEPTING_SIGN,
+    COMPRESSION_ACCEPTING,
     SESSION_ACCEPTING,
     READY,
     THROTTLE_MESSAGE,
@@ -44,6 +48,7 @@ private:
                                       "HELLO_CONNECTING",
                                       "AUTH_CONNECTING",
                                       "AUTH_CONNECTING_SIGN",
+                                      "COMPRESSION_CONNECTING",
                                       "SESSION_CONNECTING",
                                       "SESSION_RECONNECTING",
                                       "START_ACCEPT",
@@ -52,6 +57,7 @@ private:
                                       "AUTH_ACCEPTING",
                                       "AUTH_ACCEPTING_MORE",
                                       "AUTH_ACCEPTING_SIGN",
+                                      "COMPRESSION_ACCEPTING",
                                       "SESSION_ACCEPTING",
                                       "READY",
                                       "THROTTLE_MESSAGE",
@@ -67,7 +73,9 @@ private:
 
   // TODO: move into auth_meta?
   ceph::crypto::onwire::rxtx_t session_stream_handlers;
-
+  ceph::compression::onwire::rxtx_t session_compression_handlers;
+  
+private:
   entity_name_t peer_name;
   State state;
   uint64_t peer_supported_features;  // CEPH_MSGR2_FEATURE_*
@@ -114,6 +122,7 @@ private:
   bool keepalive;
   bool write_in_progress = false;
 
+  CompConnectionMeta comp_meta;
   std::ostream& _conn_prefix(std::ostream *_dout);
   void run_continuation(Ct<ProtocolV2> *pcontinuation);
   void run_continuation(Ct<ProtocolV2> &continuation);
@@ -143,6 +152,7 @@ private:
   out_queue_entry_t _get_next_outgoing();
   ssize_t write_message(Message *m, bool more);
   void handle_message_ack(uint64_t seq);
+  void reset_compression();
 
   CONTINUATION_DECL(ProtocolV2, _wait_for_peer_banner);
   READ_BPTR_HANDLER_CONTINUATION_DECL(ProtocolV2, _handle_peer_banner);
@@ -162,10 +172,12 @@ private:
   CONTINUATION_DECL(ProtocolV2, throttle_message);
   CONTINUATION_DECL(ProtocolV2, throttle_bytes);
   CONTINUATION_DECL(ProtocolV2, throttle_dispatch_queue);
+  CONTINUATION_DECL(ProtocolV2, finish_compression);
 
   Ct<ProtocolV2> *read_frame();
   Ct<ProtocolV2> *finish_auth();
   Ct<ProtocolV2> *finish_client_auth();
+  Ct<ProtocolV2> *finish_server_auth();
   Ct<ProtocolV2> *handle_read_frame_preamble_main(rx_buffer_t &&buffer, int r);
   Ct<ProtocolV2> *read_frame_segment();
   Ct<ProtocolV2> *handle_read_frame_segment(rx_buffer_t &&rx_buffer, int r);
@@ -174,6 +186,7 @@ private:
   Ct<ProtocolV2> *_handle_read_frame_epilogue_main();
   Ct<ProtocolV2> *handle_read_frame_dispatch();
   Ct<ProtocolV2> *handle_frame_payload();
+  Ct<ProtocolV2> *finish_compression();
 
   Ct<ProtocolV2> *ready();
 
@@ -221,6 +234,7 @@ private:
   Ct<ProtocolV2> *handle_auth_reply_more(ceph::bufferlist &payload);
   Ct<ProtocolV2> *handle_auth_done(ceph::bufferlist &payload);
   Ct<ProtocolV2> *handle_auth_signature(ceph::bufferlist &payload);
+  Ct<ProtocolV2> *start_session_connect();
   Ct<ProtocolV2> *send_client_ident();
   Ct<ProtocolV2> *send_reconnect();
   Ct<ProtocolV2> *handle_ident_missing_features(ceph::bufferlist &payload);
@@ -230,6 +244,9 @@ private:
   Ct<ProtocolV2> *handle_wait(ceph::bufferlist &payload);
   Ct<ProtocolV2> *handle_reconnect_ok(ceph::bufferlist &payload);
   Ct<ProtocolV2> *handle_server_ident(ceph::bufferlist &payload);
+  Ct<ProtocolV2> *send_compression_request();
+  Ct<ProtocolV2> *handle_compression_done(ceph::bufferlist &payload);
+
 
   // Server Protocol
   CONTINUATION_DECL(ProtocolV2, start_server_banner_exchange);
@@ -251,6 +268,7 @@ private:
   Ct<ProtocolV2> *send_server_ident();
   Ct<ProtocolV2> *send_reconnect_ok();
   Ct<ProtocolV2> *server_ready();
+  Ct<ProtocolV2> *handle_compression_request(ceph::bufferlist &payload);
 
   size_t get_current_msg_size() const;
 };
diff --git a/src/msg/async/compression_meta.h b/src/msg/async/compression_meta.h
new file mode 100644 (file)
index 0000000..404e04b
--- /dev/null
@@ -0,0 +1,21 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "compressor/Compressor.h"
+
+struct CompConnectionMeta {
+  TOPNSPC::Compressor::CompressionMode con_mode =
+    TOPNSPC::Compressor::COMP_NONE;  // negotiated mode
+  TOPNSPC::Compressor::CompressionAlgorithm con_method =
+    TOPNSPC::Compressor::COMP_ALG_NONE; // negotiated method
+
+  bool is_compress() const {
+    return con_mode != TOPNSPC::Compressor::COMP_NONE;
+  }
+  TOPNSPC::Compressor::CompressionAlgorithm get_method() const {
+    return con_method;
+  }
+  TOPNSPC::Compressor::CompressionMode get_mode() const {
+    return con_mode;
+  }
+};
diff --git a/src/msg/async/compression_onwire.cc b/src/msg/async/compression_onwire.cc
new file mode 100644 (file)
index 0000000..4c5ffac
--- /dev/null
@@ -0,0 +1,86 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "compression_onwire.h"
+#include "compression_meta.h"
+#include "common/dout.h"
+
+#define dout_subsys ceph_subsys_ms
+
+namespace ceph::compression::onwire {
+
+rxtx_t rxtx_t::create_handler_pair(
+    CephContext* ctx,
+    const CompConnectionMeta& comp_meta,
+    std::uint64_t compress_min_size)
+{
+  if (comp_meta.is_compress()) {
+     CompressorRef compressor = Compressor::create(ctx, comp_meta.get_method());
+    if (compressor) {
+      return {std::make_unique<RxHandler>(ctx, compressor),
+             std::make_unique<TxHandler>(ctx, compressor,
+                                         comp_meta.get_mode(),
+                                         compress_min_size)};
+    }
+  }
+  return {};
+}
+
+std::optional<ceph::bufferlist> TxHandler::compress(const ceph::bufferlist &input)
+{
+  if (m_init_onwire_size < m_min_size) {
+    ldout(m_cct, 20) << __func__ 
+                    << " discovered frame that is smaller than threshold, aborting compression"
+                    << dendl;
+    return {};
+  }
+
+  m_compress_potential -= input.length();
+
+  ceph::bufferlist out;
+  if (input.length() == 0) {
+    ldout(m_cct, 20) << __func__ 
+                    << " discovered an empty segment, skipping compression without aborting"
+                    << dendl;
+    out.clear();
+    return out;
+  }
+
+  boost::optional<int32_t> compressor_message;
+  if (m_compressor->compress(input, out, compressor_message)) {
+    return {};
+  } else {
+    ldout(m_cct, 20) << __func__ << " uncompressed.length()=" << input.length()
+                     << " compressed.length()=" << out.length() << dendl;
+    m_onwire_size += out.length();
+    return out;
+  }
+}
+
+std::optional<ceph::bufferlist> RxHandler::decompress(const ceph::bufferlist &input)
+{
+  ceph::bufferlist out;
+  if (input.length() == 0) {
+    ldout(m_cct, 20) << __func__
+                    << " discovered an empty segment, skipping decompression without aborting"
+                    << dendl;
+    out.clear();
+    return out;
+  }
+
+  boost::optional<int32_t> compressor_message;
+  if (m_compressor->decompress(input, out, compressor_message)) {
+    return {};
+  } else {
+    ldout(m_cct, 20) << __func__ << " compressed.length()=" << input.length()
+                     << " uncompressed.length()=" << out.length() << dendl;
+    return out;
+  }
+}
+
+void TxHandler::done()
+{
+  ldout(m_cct, 25) << __func__ << " compression ratio=" << get_ratio() << dendl;
+}
+
+} // namespace ceph::compression::onwire
diff --git a/src/msg/async/compression_onwire.h b/src/msg/async/compression_onwire.h
new file mode 100644 (file)
index 0000000..e90274a
--- /dev/null
@@ -0,0 +1,102 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_COMPRESSION_ONWIRE_H
+#define CEPH_COMPRESSION_ONWIRE_H
+
+#include "compressor/Compressor.h"
+#include "include/buffer.h"
+
+class CompConnectionMeta;
+
+namespace ceph::compression::onwire {
+  using Compressor = TOPNSPC::Compressor;
+  using CompressorRef = TOPNSPC::CompressorRef;
+
+  class Handler {
+  public:
+    Handler(CephContext* const cct, CompressorRef compressor)
+      : m_cct(cct), m_compressor(compressor) {}
+
+  protected:
+    CephContext* const m_cct;
+    CompressorRef m_compressor;
+  };
+
+  class RxHandler final : private Handler {
+  public:
+    RxHandler(CephContext* const cct, CompressorRef compressor)
+      : Handler(cct, compressor) {}
+    ~RxHandler() {};
+
+    /**
+     * Decompresses a bufferlist 
+     *
+     * @param input compressed bufferlist
+     * @param out decompressed bufferlist
+     *
+     * @returns true on success, false on failure
+     */
+    std::optional<ceph::bufferlist> decompress(const ceph::bufferlist &input);
+  };
+
+  class TxHandler final : private Handler {
+  public:
+    TxHandler(CephContext* const cct, CompressorRef compressor, int mode, std::uint64_t min_size)
+      : Handler(cct, compressor),
+       m_min_size(min_size),
+       m_mode(static_cast<Compressor::CompressionMode>(mode))
+    {}
+    ~TxHandler() {}
+
+    void reset_handler(int num_segments, uint64_t size) {
+      m_init_onwire_size = size;
+      m_compress_potential = size;
+      m_onwire_size = 0;
+    }
+
+    void done();
+
+    /**
+     * Compresses a bufferlist 
+     *
+     * @param input bufferlist to compress
+     * @param out compressed bufferlist
+     *
+     * @returns true on success, false on failure
+     */
+    std::optional<ceph::bufferlist> compress(const ceph::bufferlist &input);
+
+    double get_ratio() const {
+      return get_initial_size() / (double) get_final_size();
+    }
+
+    uint64_t get_initial_size() const {
+      return m_init_onwire_size;
+    } 
+
+    uint64_t get_final_size() const {
+      return m_onwire_size;
+    }
+
+  private:
+    uint64_t m_min_size; 
+    Compressor::CompressionMode m_mode;
+
+    uint64_t m_init_onwire_size;
+    uint64_t m_onwire_size;
+    uint64_t m_compress_potential;
+  };
+
+  struct rxtx_t {
+    std::unique_ptr<RxHandler> rx;
+    std::unique_ptr<TxHandler> tx;
+
+    static rxtx_t create_handler_pair(
+      CephContext* ctx,
+      const CompConnectionMeta& comp_meta,
+      std::uint64_t compress_min_size);
+  };
+}
+
+#endif // CEPH_COMPRESSION_ONWIRE_H
index 7a0b5907b248944a59ffe60a99020de1baa5d874..8a297534763645433b3c8fedb27044fcaca3ea0f 100644 (file)
@@ -72,6 +72,8 @@ void FrameAssembler::fill_preamble(Tag tag,
     preamble.segments[i].alignment = m_descs[i].align;
   }
   preamble.num_segments = m_descs.size();
+  preamble.flags = m_flags;  
+
   preamble.crc = ceph_crc32c(
       0, reinterpret_cast<const unsigned char*>(&preamble),
       sizeof(preamble) - sizeof(preamble.crc));
@@ -239,13 +241,18 @@ bufferlist FrameAssembler::asm_secure_rev1(const preamble_block_t& preamble,
 
 bufferlist FrameAssembler::assemble_frame(Tag tag, bufferlist segment_bls[],
                                           const uint16_t segment_aligns[],
-                                          size_t segment_count) {
+                                          size_t segment_count) {  
+  m_flags = 0;
   m_descs.resize(calc_num_segments(segment_bls, segment_count));
   for (size_t i = 0; i < m_descs.size(); i++) {
     m_descs[i].logical_len = segment_bls[i].length();
     m_descs[i].align = segment_aligns[i];
   }
 
+  if (m_compression->tx) {   
+    asm_compress(segment_bls);
+  }
+
   preamble_block_t preamble;
   fill_preamble(tag, preamble);
 
@@ -317,6 +324,12 @@ Tag FrameAssembler::disassemble_preamble(bufferlist& preamble_bl) {
     m_descs[i].logical_len = preamble->segments[i].length;
     m_descs[i].align = preamble->segments[i].alignment;
   }
+
+  m_flags = preamble->flags;
+  // If frame has been compressed, 
+  // we need to make sure the compression handler has been setup
+  ceph_assert_always(!is_compressed() || m_compression->rx);
+
   return static_cast<Tag>(preamble->tag);
 }
 
@@ -423,6 +436,19 @@ bool FrameAssembler::disasm_remaining_secure_rev1(
   return check_epilogue_late_status(epilogue->late_status);
 }
 
+bool FrameAssembler::disassemble_segments(bufferlist& preamble_bl, 
+  bufferlist segments_bls[], bufferlist& epilogue_bl) const {
+  disassemble_first_segment(preamble_bl, segments_bls[0]);
+  if (disassemble_remaining_segments(segments_bls, epilogue_bl)) {
+    if (is_compressed()) {
+      disassemble_decompress(segments_bls);
+    }
+    return true;
+  }
+
+  return false;
+}
+
 void FrameAssembler::disassemble_first_segment(bufferlist& preamble_bl,
                                                bufferlist& segment_bl) const {
   ceph_assert(!m_descs.empty());
@@ -445,15 +471,15 @@ bool FrameAssembler::disassemble_remaining_segments(
       // no epilogue if only one segment
       ceph_assert(epilogue_bl.length() == 0);
       return true;
-    }
-    if (m_crypto->rx) {
+    } else if (m_crypto->rx) {
       return disasm_remaining_secure_rev1(segment_bls, epilogue_bl);
+    } else {
+      return disasm_remaining_crc_rev1(segment_bls, epilogue_bl);
     }
-    return disasm_remaining_crc_rev1(segment_bls, epilogue_bl);
-  }
-  if (m_crypto->rx) {
+  } else if (m_crypto->rx) {
     return disasm_all_secure_rev0(segment_bls, epilogue_bl);
-  }
+  } 
+  
   return disasm_all_crc_rev0(segment_bls, epilogue_bl);
 }
 
@@ -469,8 +495,49 @@ std::ostream& operator<<(std::ostream& os, const FrameAssembler& frame_asm) {
   }
   os << "rev1=" << frame_asm.m_is_rev1
      << " rx=" << frame_asm.m_crypto->rx.get()
-     << " tx=" << frame_asm.m_crypto->tx.get();
+     << " tx=" << frame_asm.m_crypto->tx.get()
+     << " comp rx=" << frame_asm.m_compression->rx.get()
+     << " comp tx=" << frame_asm.m_compression->tx.get()
+     << " compressed=" << frame_asm.is_compressed();
   return os;
 }
 
+void FrameAssembler::asm_compress(bufferlist segment_bls[]) {
+  std::array<bufferlist, MAX_NUM_SEGMENTS> compressed;
+
+  m_compression->tx->reset_handler(m_descs.size(), get_frame_logical_len());
+
+  bool abort = false;
+  for (size_t i = 0; (i < m_descs.size()) && !abort; i++) {
+      auto out = m_compression->tx->compress(segment_bls[i]);
+      if (!out) {
+        abort = true;
+      } else {
+        compressed[i] = std::move(*out);
+      }
+  }
+
+  if (!abort) {
+    m_compression->tx->done();
+
+    for (size_t i = 0; i < m_descs.size(); i++) {
+      segment_bls[i].swap(compressed[i]);
+      m_descs[i].logical_len = segment_bls[i].length();
+    }
+
+    m_flags |= FRAME_EARLY_DATA_COMPRESSED;
+  }
+}
+
+void FrameAssembler::disassemble_decompress(bufferlist segment_bls[]) const {
+  for (size_t i = 0; i < m_descs.size(); i++) {
+    auto out = m_compression->rx->decompress(segment_bls[i]);
+    if (!out) {
+      throw FrameError("Segment decompression failed");
+    } else {
+      segment_bls[i] = std::move(*out);
+    }
+  }
+}
+
 }  // namespace ceph::msgr::v2
index 94d4d1732b201fe4c64d5db3f7b0d13ca2379486..5b786e1aff881cd056ebc9de47dc8b391b9b1011 100644 (file)
@@ -4,6 +4,7 @@
 #include "include/types.h"
 #include "common/Clock.h"
 #include "crypto_onwire.h"
+#include "compression_onwire.h"
 #include <array>
 #include <iosfwd>
 #include <utility>
@@ -55,7 +56,9 @@ enum class Tag : __u8 {
   MESSAGE,
   KEEPALIVE2,
   KEEPALIVE2_ACK,
-  ACK
+  ACK,
+  COMPRESSION_REQUEST,
+  COMPRESSION_DONE
 };
 
 struct segment_t {
@@ -102,7 +105,9 @@ struct preamble_block_t {
   __u8 num_segments;
 
   segment_t segments[MAX_NUM_SEGMENTS];
-  __u8 _reserved[2];
+
+  __u8 flags;
+  __u8 _reserved;
 
   // CRC32 for this single preamble block.
   ceph_le32 crc;
@@ -169,6 +174,12 @@ static constexpr uint32_t FRAME_PREAMBLE_WITH_INLINE_SIZE =
 #define FRAME_LATE_STATUS_RESERVED_FALSE  0xe0
 #define FRAME_LATE_STATUS_RESERVED_MASK   0xf0
 
+// For msgr 2.1, FRAME_EARLY_X flags are sent as part of epilogue.
+//
+// This flag indicates whether frame segments have been compressed by 
+// sender, and used in segments' disassemblig phase. 
+#define FRAME_EARLY_DATA_COMPRESSED       0X1
+
 struct FrameError : std::runtime_error {
   using runtime_error::runtime_error;
 };
@@ -176,11 +187,13 @@ struct FrameError : std::runtime_error {
 class FrameAssembler {
 public:
   // crypto must be non-null
-  FrameAssembler(const ceph::crypto::onwire::rxtx_t* crypto, bool is_rev1)
-      : m_crypto(crypto), m_is_rev1(is_rev1) {}
+  FrameAssembler(const ceph::crypto::onwire::rxtx_t* crypto, bool is_rev1, 
+    const ceph::compression::onwire::rxtx_t* compression)
+      : m_crypto(crypto), m_is_rev1(is_rev1), m_compression(compression) {}
 
   void set_is_rev1(bool is_rev1) {
     m_descs.clear();
+    m_flags = 0;
     m_is_rev1 = is_rev1;
   }
 
@@ -299,6 +312,40 @@ public:
 
   Tag disassemble_preamble(bufferlist& preamble_bl);
 
+  bool disassemble_segments(bufferlist& preamble_bl, 
+                            bufferlist segments_bls[], 
+                            bufferlist& epilogue_bl) const;
+
+private:
+  struct segment_desc_t {
+    uint32_t logical_len;
+    uint16_t align;
+  };
+
+  uint32_t get_segment_padded_len(size_t seg_idx) const {
+    return p2roundup<uint32_t>(m_descs[seg_idx].logical_len,
+                               CRYPTO_BLOCK_SIZE);
+  }
+
+  uint32_t get_auth_tag_len() const {
+    return m_crypto->rx->get_extra_size_at_final();
+  }
+
+  bool is_compressed() const { 
+    return m_flags & FRAME_EARLY_DATA_COMPRESSED; 
+  }
+
+  void asm_compress(bufferlist segment_bls[]);
+
+  bufferlist asm_crc_rev0(const preamble_block_t& preamble,
+                          bufferlist segment_bls[]) const;
+  bufferlist asm_secure_rev0(const preamble_block_t& preamble,
+                             bufferlist segment_bls[]) const;
+  bufferlist asm_crc_rev1(const preamble_block_t& preamble,
+                          bufferlist segment_bls[]) const;
+  bufferlist asm_secure_rev1(const preamble_block_t& preamble,
+                             bufferlist segment_bls[]) const;
+
   // Like msgr1, and unlike msgr2.0, msgr2.1 allows interpreting the
   // first segment before reading in the rest of the frame.
   //
@@ -312,6 +359,7 @@ public:
   //   user-provided buffers
   // - read in epilogue
   // - call disassemble_remaining_segments()
+  // - call disasm_all_decompress()
   //
   // For msgr2.0 (set_is_rev1(false)), disassemble_first_segment() is
   // a noop.  To accomodate, disassemble_remaining_segments() always
@@ -321,6 +369,7 @@ public:
   // - read in all segments
   // - read in epilogue
   // - call disassemble_remaining_segments()
+  // - call disasm_all_decompress()
   //
   // disassemble_remaining_segments() returns true if the frame is
   // ready for dispatching, or false if it was aborted by the sender
@@ -329,30 +378,7 @@ public:
                                  bufferlist& segment_bl) const;
   bool disassemble_remaining_segments(bufferlist segment_bls[],
                                       bufferlist& epilogue_bl) const;
-
-private:
-  struct segment_desc_t {
-    uint32_t logical_len;
-    uint16_t align;
-  };
-
-  uint32_t get_segment_padded_len(size_t seg_idx) const {
-    return p2roundup<uint32_t>(m_descs[seg_idx].logical_len,
-                               CRYPTO_BLOCK_SIZE);
-  }
-
-  uint32_t get_auth_tag_len() const {
-    return m_crypto->rx->get_extra_size_at_final();
-  }
-
-  bufferlist asm_crc_rev0(const preamble_block_t& preamble,
-                          bufferlist segment_bls[]) const;
-  bufferlist asm_secure_rev0(const preamble_block_t& preamble,
-                             bufferlist segment_bls[]) const;
-  bufferlist asm_crc_rev1(const preamble_block_t& preamble,
-                          bufferlist segment_bls[]) const;
-  bufferlist asm_secure_rev1(const preamble_block_t& preamble,
-                             bufferlist segment_bls[]) const;
+  void disassemble_decompress(bufferlist segment_bls[]) const;
 
   bool disasm_all_crc_rev0(bufferlist segment_bls[],
                            bufferlist& epilogue_bl) const;
@@ -372,8 +398,10 @@ private:
                                   const FrameAssembler& frame_asm);
 
   boost::container::static_vector<segment_desc_t, MAX_NUM_SEGMENTS> m_descs;
+  __u8 m_flags;
   const ceph::crypto::onwire::rxtx_t* m_crypto;
   bool m_is_rev1;  // msgr2.1?
+  const ceph::compression::onwire::rxtx_t* m_compression;
 };
 
 template <class T, uint16_t... SegmentAlignmentVs>
@@ -837,6 +865,34 @@ protected:
   using Frame::Frame;
 };
 
+struct CompressionRequestFrame : public ControlFrame<CompressionRequestFrame,
+                                              bool, // is compress
+                                              std::vector<uint32_t>> { // preferred methods
+  static const Tag tag = Tag::COMPRESSION_REQUEST;
+  using ControlFrame::Encode;
+  using ControlFrame::Decode;
+
+  inline bool &is_compress() { return get_val<0>(); }
+  inline std::vector<uint32_t> &preferred_methods() { return get_val<1>(); }
+
+protected:
+  using ControlFrame::ControlFrame;
+};
+
+struct CompressionDoneFrame : public ControlFrame<CompressionDoneFrame,
+                                           bool, // is compress
+                                           uint32_t> { // method
+  static const Tag tag = Tag::COMPRESSION_DONE;
+  using ControlFrame::Encode;
+  using ControlFrame::Decode;
+
+  inline bool &is_compress() { return get_val<0>(); }
+  inline uint32_t &method() { return get_val<1>(); }
+
+protected:
+  using ControlFrame::ControlFrame;
+};
+
 } // namespace ceph::msgr::v2
 
 #endif // _MSG_ASYNC_FRAMES_V2_
diff --git a/src/msg/compressor_registry.cc b/src/msg/compressor_registry.cc
new file mode 100644 (file)
index 0000000..61efa0d
--- /dev/null
@@ -0,0 +1,126 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "compressor_registry.h"
+#include "common/dout.h"
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix *_dout << "CompressorRegistry(" << this << ") "
+
+CompressorRegistry::CompressorRegistry(CephContext *cct)
+  : cct(cct)
+{
+  cct->_conf.add_observer(this);
+}
+
+CompressorRegistry::~CompressorRegistry()
+{
+  cct->_conf.remove_observer(this);
+}
+
+const char** CompressorRegistry::get_tracked_conf_keys() const
+{
+  static const char *keys[] = {
+    "ms_osd_compress_mode",
+    "ms_osd_compression_algorithm",
+    "ms_osd_compress_min_size",
+    "ms_compress_secure",
+    nullptr
+  };
+  return keys;
+}
+
+void CompressorRegistry::handle_conf_change(
+  const ConfigProxy& conf,
+  const std::set<std::string>& changed)
+{
+  std::scoped_lock l(lock);
+  _refresh_config();
+}
+
+std::vector<uint32_t> CompressorRegistry::_parse_method_list(const string& s)
+{
+  std::vector<uint32_t> methods;
+
+  for_each_substr(s, ";,= \t", [&] (auto method) {
+    ldout(cct,20) << "adding algorithm method: " << method << dendl;
+
+    auto alg_type = Compressor::get_comp_alg_type(method);
+    if (alg_type) {
+      methods.push_back(*alg_type);
+    } else {
+      ldout(cct,5) << "WARNING: unknown algorithm method " << method << dendl;
+    }
+  });
+
+  if (methods.empty()) {
+    methods.push_back(Compressor::COMP_ALG_NONE);
+  }
+  ldout(cct,20) << __func__ << " " << s << " -> " << methods << dendl;
+
+  return methods;
+}
+
+void CompressorRegistry::_refresh_config()
+{
+  auto c_mode = Compressor::get_comp_mode_type(cct->_conf.get_val<string>("ms_osd_compress_mode"));
+
+  if (c_mode) {
+    ms_osd_compress_mode = *c_mode;
+  } else {
+    ldout(cct,1) << __func__ << " failed to identify ms_osd_compress_mode " 
+      << ms_osd_compress_mode << dendl;
+
+    ms_osd_compress_mode = Compressor::COMP_NONE;
+  }
+
+  ms_osd_compression_methods = _parse_method_list(cct->_conf.get_val<string>("ms_osd_compression_algorithm"));
+  ms_osd_compress_min_size = cct->_conf.get_val<std::uint64_t>("ms_osd_compress_min_size");
+
+  ms_compress_secure = cct->_conf.get_val<bool>("ms_compress_secure");
+
+  ldout(cct,10) << __func__ << " ms_osd_compression_mode " << ms_osd_compress_mode
+    << " ms_osd_compression_methods " << ms_osd_compression_methods
+    << " ms_osd_compress_above_min_size " << ms_osd_compress_min_size
+    << " ms_compress_secure " << ms_compress_secure
+    << dendl;
+}
+
+Compressor::CompressionAlgorithm
+CompressorRegistry::pick_method(uint32_t peer_type,
+                                const std::vector<uint32_t>& preferred_methods)
+{
+  std::vector<uint32_t> allowed_methods = get_methods(peer_type);
+  auto preferred = std::find_first_of(preferred_methods.begin(),
+                                      preferred_methods.end(),
+                                      allowed_methods.begin(),
+                                      allowed_methods.end());
+  if (preferred == preferred_methods.end()) {
+    ldout(cct,1) << "failed to pick compression method from client's "
+                 << preferred_methods
+                 << " and our " << allowed_methods << dendl;
+    return Compressor::COMP_ALG_NONE;
+  } else {
+    return static_cast<Compressor::CompressionAlgorithm>(*preferred);
+  }
+}
+
+Compressor::CompressionMode
+CompressorRegistry::get_mode(uint32_t peer_type, bool is_secure)
+{
+  std::scoped_lock l(lock);
+  ldout(cct, 20) << __func__ << " peer_type " << peer_type 
+    << " is_secure " << is_secure << dendl;
+
+  if (is_secure && !ms_compress_secure) {
+    return Compressor::COMP_NONE;
+  }
+
+  switch (peer_type) {
+  case CEPH_ENTITY_TYPE_OSD:
+    return static_cast<Compressor::CompressionMode>(ms_osd_compress_mode);
+  default:
+    return Compressor::COMP_NONE;
+  }
+}
diff --git a/src/msg/compressor_registry.h b/src/msg/compressor_registry.h
new file mode 100644 (file)
index 0000000..4a28cc1
--- /dev/null
@@ -0,0 +1,69 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <map>
+#include <vector>
+
+#include "compressor/Compressor.h"
+#include "common/ceph_mutex.h"
+#include "common/ceph_context.h"
+#include "common/config_cacher.h"
+
+class CompressorRegistry : public md_config_obs_t {
+public:
+  CompressorRegistry(CephContext *cct);
+  ~CompressorRegistry();
+
+  void refresh_config() {
+    std::scoped_lock l(lock);
+    _refresh_config();
+  }
+
+  const char** get_tracked_conf_keys() const override;
+  void handle_conf_change(const ConfigProxy& conf,
+                          const std::set<std::string>& changed) override;
+
+  Compressor::CompressionAlgorithm pick_method(uint32_t peer_type,
+                                              const std::vector<uint32_t>& preferred_methods);
+
+  Compressor::CompressionMode get_mode(uint32_t peer_type, bool is_secure);
+
+  const std::vector<uint32_t> get_methods(uint32_t peer_type) { 
+    std::scoped_lock l(lock);
+    switch (peer_type) {
+      case CEPH_ENTITY_TYPE_OSD:
+        return ms_osd_compression_methods;
+      default:
+        return {};
+    }
+   }
+
+  uint64_t get_min_compression_size(uint32_t peer_type) const {
+    std::scoped_lock l(lock);
+    switch (peer_type) {
+      case CEPH_ENTITY_TYPE_OSD:
+        return ms_osd_compress_min_size;
+      default:
+        return 0;
+    }
+  }
+
+  bool get_is_compress_secure() const { 
+    std::scoped_lock l(lock);
+    return ms_compress_secure; 
+  }
+
+private:
+  CephContext *cct;
+  mutable ceph::mutex lock = ceph::make_mutex("CompressorRegistry::lock");
+
+  uint32_t ms_osd_compress_mode;
+  bool ms_compress_secure;
+  std::uint64_t ms_osd_compress_min_size;
+  std::vector<uint32_t> ms_osd_compression_methods;
+
+  void _refresh_config();
+  std::vector<uint32_t> _parse_method_list(const string& s);
+};
index 937694ced6022e5bf322dfe6d2330fd83daf5092..3a170a9429b02f36fb21179fdb2beee03aaf3884 100644 (file)
@@ -8,4 +8,4 @@ add_executable(unittest_compression
   )
 add_ceph_unittest(unittest_compression)
 target_link_libraries(unittest_compression global GTest::GTest)
-add_dependencies(unittest_compression ceph_example)
+add_dependencies(unittest_compression ceph_example)
\ No newline at end of file
index 4791f7c2f6e7e4696798740ec19ab09c7a3f275c..beaa7133d8eaf86533388f2785fdf7bc474b4fb1 100644 (file)
@@ -31,6 +31,13 @@ add_executable(unittest_frames_v2 test_frames_v2.cc)
 add_ceph_unittest(unittest_frames_v2)
 target_link_libraries(unittest_frames_v2 os global ${UNITTEST_LIBS})
 
+add_executable(unittest_comp_registry
+  test_comp_registry.cc
+  $<TARGET_OBJECTS:unit-main>
+  )
+add_ceph_unittest(unittest_comp_registry)
+target_link_libraries(unittest_comp_registry global)
+
 # test_userspace_event
 if(HAVE_DPDK)
   add_executable(ceph_test_userspace_event
diff --git a/src/test/msgr/test_comp_registry.cc b/src/test/msgr/test_comp_registry.cc
new file mode 100644 (file)
index 0000000..f0271d9
--- /dev/null
@@ -0,0 +1,98 @@
+// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/types.h"
+#include "include/stringify.h"
+#include "compressor/Compressor.h"
+#include "msg/compressor_registry.h"
+#include "gtest/gtest.h"
+#include "common/ceph_context.h"
+#include "global/global_context.h"
+
+#include <sstream>
+
+TEST(CompressorRegistry, con_modes)
+{
+  auto cct = g_ceph_context;
+  CompressorRegistry reg(cct);
+  std::vector<uint32_t> methods;
+  uint32_t method;
+  uint32_t mode;
+
+  const std::vector<uint32_t> snappy_method = { Compressor::COMP_ALG_SNAPPY };
+  const std::vector<uint32_t> zlib_method = { Compressor::COMP_ALG_ZLIB };
+  const std::vector<uint32_t> both_methods = { Compressor::COMP_ALG_ZLIB, Compressor::COMP_ALG_SNAPPY};
+  const std::vector<uint32_t> no_method = { Compressor::COMP_ALG_NONE };
+
+  cct->_conf.set_val(
+    "enable_experimental_unrecoverable_data_corrupting_features", "*");
+
+  // baseline: compression for communication with osd is enabled
+  cct->_set_module_type(CEPH_ENTITY_TYPE_CLIENT);
+  cct->_conf.set_val("ms_osd_compress_mode", "force");
+  cct->_conf.set_val("ms_osd_compression_algorithm", "snappy");
+  cct->_conf.set_val("ms_compress_secure", "false");
+  cct->_conf.apply_changes(NULL);
+
+  ASSERT_EQ(reg.get_is_compress_secure(), false);
+
+  methods = reg.get_methods(CEPH_ENTITY_TYPE_MON);
+  ASSERT_EQ(methods.size(), 0);
+  method = reg.pick_method(CEPH_ENTITY_TYPE_MON, both_methods);
+  ASSERT_EQ(method, Compressor::COMP_ALG_NONE);
+  mode = reg.get_mode(CEPH_ENTITY_TYPE_MON, false);
+  ASSERT_EQ(mode, Compressor::COMP_NONE);
+
+  methods = reg.get_methods(CEPH_ENTITY_TYPE_OSD);
+  ASSERT_EQ(methods, snappy_method);
+  const std::vector<uint32_t> rev_both_methods (both_methods.rbegin(), both_methods.rend());
+  method = reg.pick_method(CEPH_ENTITY_TYPE_OSD, rev_both_methods);
+  ASSERT_EQ(method, Compressor::COMP_ALG_SNAPPY);
+  mode = reg.get_mode(CEPH_ENTITY_TYPE_OSD, false);
+  ASSERT_EQ(mode, Compressor::COMP_FORCE);
+  mode = reg.get_mode(CEPH_ENTITY_TYPE_OSD, true);
+  ASSERT_EQ(mode, Compressor::COMP_NONE);
+
+  method = reg.pick_method(CEPH_ENTITY_TYPE_OSD, zlib_method);
+  ASSERT_EQ(method, Compressor::COMP_ALG_NONE);
+
+  // disable compression mode
+  cct->_set_module_type(CEPH_ENTITY_TYPE_CLIENT);
+  cct->_conf.set_val("ms_osd_compress_mode", "none");
+  cct->_conf.apply_changes(NULL);
+
+  mode = reg.get_mode(CEPH_ENTITY_TYPE_OSD, false);
+  ASSERT_EQ(mode, Compressor::COMP_NONE);
+
+  // no compression methods
+  cct->_conf.set_val("ms_osd_compress_mode", "force");
+  cct->_conf.set_val("ms_osd_compression_algorithm", "none");
+  cct->_conf.apply_changes(NULL);
+
+  method = reg.pick_method(CEPH_ENTITY_TYPE_OSD, both_methods);
+  ASSERT_EQ(method, Compressor::COMP_ALG_NONE);
+
+  // min compression size
+  cct->_conf.set_val("ms_osd_compress_min_size", "1024");
+  cct->_conf.apply_changes(NULL);
+
+  uint32_t s = reg.get_min_compression_size(CEPH_ENTITY_TYPE_OSD);
+  ASSERT_EQ(s, 1024);
+
+  // allow secure with compression
+  cct->_conf.set_val("ms_osd_compress_mode", "force");
+  cct->_conf.set_val("ms_osd_compression_algorithm", "snappy");
+  cct->_conf.set_val("ms_compress_secure", "true");
+  cct->_conf.apply_changes(NULL);
+
+  ASSERT_EQ(reg.get_is_compress_secure(), true);
+
+  mode = reg.get_mode(CEPH_ENTITY_TYPE_OSD, true);
+  ASSERT_EQ(mode, Compressor::COMP_FORCE);
+
+  mode = reg.get_mode(CEPH_ENTITY_TYPE_OSD, false);
+  ASSERT_EQ(mode, Compressor::COMP_FORCE);
+  
+  // back to normalish, for the benefit of the next test(s)
+  cct->_set_module_type(CEPH_ENTITY_TYPE_CLIENT);  
+}
index d384e6a81dbece6cbbae18a705ed7de0de8d4023..725903251a723d28fcc6fa3f205535b76efa758a 100644 (file)
@@ -19,6 +19,7 @@
 #include <string>
 #include <tuple>
 
+#include "msg/async/compression_meta.h"
 #include "auth/Auth.h"
 #include "common/ceph_argparse.h"
 #include "global/global_init.h"
 
 #include <gtest/gtest.h>
 
+#define COMP_THRESHOLD 1 << 10
+#define EXPECT_COMPRESSED(is_compressed, val1, val2) \
+  if (is_compressed && val1 > COMP_THRESHOLD) { \
+    EXPECT_GE(val1, val2); \
+  } else { \
+    EXPECT_EQ(val1, val2); \
+  }
+
 namespace ceph::msgr::v2 {
 
 // MessageFrame with the first segment not fixed to ceph_msg_header2
@@ -87,19 +96,25 @@ protected:
 struct mode_t {
   bool is_rev1;
   bool is_secure;
+  bool is_compress;
 };
 
 static std::ostream& operator<<(std::ostream& os, const mode_t& m) {
   os << "msgr2." << (m.is_rev1 ? "1" : "0")
-     << (m.is_secure ? "-secure" : "-crc");
+     << (m.is_secure ? "-secure" : "-crc")
+     << (m.is_compress ? "-compress": "-nocompress");
   return os;
 }
 
 static const mode_t modes[] = {
-  {false, false},
-  {false, true},
-  {true, false},
-  {true, true},
+  {false, false, false},
+  {false, true, false},
+  {true, false, false},
+  {true, true, false},
+  {false, false, true},
+  {false, true, true},
+  {true, false, true},
+  {true, true, true}
 };
 
 struct round_trip_instance_t {
@@ -151,17 +166,16 @@ bool disassemble_frame(FrameAssembler& frame_asm, bufferlist& frame_bl,
   if (epilogue_onwire_len > 0) {
     frame_bl.splice(0, epilogue_onwire_len, &epilogue_bl);
   }
-  frame_asm.disassemble_first_segment(preamble_bl, segment_bls[0]);
-  return frame_asm.disassemble_remaining_segments(segment_bls.data(),
-                                                  epilogue_bl);
+  
+  return frame_asm.disassemble_segments(preamble_bl, segment_bls.data(), epilogue_bl);
 }
 
 class RoundTripTestBase : public ::testing::TestWithParam<
                               std::tuple<round_trip_instance_t, mode_t>> {
 protected:
   RoundTripTestBase()
-      : m_tx_frame_asm(&m_tx_crypto, std::get<1>(GetParam()).is_rev1),
-        m_rx_frame_asm(&m_rx_crypto, std::get<1>(GetParam()).is_rev1),
+      : m_tx_frame_asm(&m_tx_crypto, std::get<1>(GetParam()).is_rev1, &m_tx_comp),
+        m_rx_frame_asm(&m_rx_crypto, std::get<1>(GetParam()).is_rev1, &m_rx_comp),
         m_header(make_bufferlist(std::get<0>(GetParam()).header_len, 'H')),
         m_front(make_bufferlist(std::get<0>(GetParam()).front_len, 'F')),
         m_middle(make_bufferlist(std::get<0>(GetParam()).middle_len, 'M')),
@@ -181,23 +195,37 @@ protected:
           g_ceph_context, auth_meta, /*new_nonce_format=*/m.is_rev1,
           /*crossed=*/true);
     }
+    
+    if (m.is_compress) {
+      CompConnectionMeta comp_meta;
+      comp_meta.con_mode = Compressor::COMP_FORCE;
+      comp_meta.con_method = Compressor::COMP_ALG_SNAPPY;
+      m_tx_comp = ceph::compression::onwire::rxtx_t::create_handler_pair(
+        g_ceph_context, comp_meta, /*min_compress_size=*/COMP_THRESHOLD
+      );
+      m_rx_comp = ceph::compression::onwire::rxtx_t::create_handler_pair(
+        g_ceph_context, comp_meta, /*min_compress_size=*/COMP_THRESHOLD
+      );
+    }
   }
 
   void check_frame_assembler(const FrameAssembler& frame_asm) {
     const auto& [rti, m] = GetParam();
     const auto& onwire_lens = rti.onwire_lens[m.is_rev1 << 1 | m.is_secure];
-    EXPECT_EQ(rti.header_len + rti.front_len + rti.middle_len + rti.data_len,
+
+    EXPECT_COMPRESSED(m.is_compress, rti.header_len + rti.front_len + rti.middle_len + rti.data_len,
               frame_asm.get_frame_logical_len());
     ASSERT_EQ(rti.num_segments, frame_asm.get_num_segments());
-    EXPECT_EQ(onwire_lens[0], frame_asm.get_preamble_onwire_len());
+    EXPECT_COMPRESSED(m.is_compress, onwire_lens[0], frame_asm.get_preamble_onwire_len());
     for (size_t i = 0; i < rti.num_segments; i++) {
-      EXPECT_EQ(onwire_lens[i + 1], frame_asm.get_segment_onwire_len(i));
+      EXPECT_COMPRESSED(m.is_compress, onwire_lens[i + 1], frame_asm.get_segment_onwire_len(i));
     }
-    EXPECT_EQ(onwire_lens[rti.num_segments + 1],
+    EXPECT_COMPRESSED(m.is_compress, onwire_lens[rti.num_segments + 1],
               frame_asm.get_epilogue_onwire_len());
-    EXPECT_EQ(std::accumulate(std::begin(onwire_lens), std::end(onwire_lens),
-                              uint64_t(0)),
-              frame_asm.get_frame_onwire_len());
+    EXPECT_COMPRESSED(m.is_compress,
+                      std::accumulate(std::begin(onwire_lens), std::end(onwire_lens),
+                                      uint64_t(0)),
+                      frame_asm.get_frame_onwire_len());
   }
 
   void test_round_trip() {
@@ -224,6 +252,8 @@ protected:
 
   ceph::crypto::onwire::rxtx_t m_tx_crypto;
   ceph::crypto::onwire::rxtx_t m_rx_crypto;
+  ceph::compression::onwire::rxtx_t m_tx_comp;
+  ceph::compression::onwire::rxtx_t m_rx_comp;
   FrameAssembler m_tx_frame_asm;
   FrameAssembler m_rx_frame_asm;