]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
os/bluestore/compression: Estimator class
authorAdam Kupczyk <akupczyk@ibm.com>
Wed, 9 Apr 2025 16:03:52 +0000 (16:03 +0000)
committerAdam Kupczyk <akupczyk@ibm.com>
Thu, 24 Apr 2025 06:45:53 +0000 (06:45 +0000)
Add CMake rules to compile.
Add bluestore_compression dout subsys.

Created Estimator class.
It is used by the Scanner to decide whether a specific extent is to be recompressed.
Prepare for future machine learning / adaptive algorithm for estimation.

So far the logic of the Estimator is relatively simple.
It learns expected recompression values and uses them in subsequent iterations to predict.

Signed-off-by: Adam Kupczyk <akupczyk@ibm.com>
src/common/options/global.yaml.in
src/common/subsys.h
src/crimson/os/alienstore/CMakeLists.txt
src/os/CMakeLists.txt
src/os/bluestore/BlueStore.h
src/os/bluestore/Compression.cc [new file with mode: 0644]
src/os/bluestore/Compression.h [new file with mode: 0644]

index 279156370a514f308bff8e0bbdd14e805afab8c6..3e30a52bee614ee268d6e499d044a128b292e153 100644 (file)
@@ -4768,6 +4768,21 @@ options:
   flags:
   - runtime
   with_legacy: true
+- name: bluestore_recompression_min_gain
+  type: float
+  level: advanced
+  desc: Required estimated gain for accepting extents for recompressing.
+  long_desc: Partial writes over compressed blobs have a high cost. New data requires
+    new allocation units, but the whole old blob must remain. To combat this, BlueStore
+    looks around the write position to find blobs that make it profitable to read
+    and recompress.
+  fmt_desc: A float value, (size_exact_released / size_expected_after_compression).
+  default: 1.2
+  see_also:
+  - bluestore_compression_max_blob_size
+  flags:
+  - runtime
+  with_legacy: true
 # Specifies minimum expected amount of saved allocation units
 # per single blob to enable compressed blobs garbage collection
 - name: bluestore_gc_enable_blob_threshold
index 67bee2a8b5acba6afee82375d0739bd08bee0e7d..a133b101f1d390da817764091939b469ca1731e7 100644 (file)
@@ -73,6 +73,7 @@ SUBSYS(throttle, 1, 1)
 SUBSYS(refs, 0, 0)
 SUBSYS(compressor, 1, 5)
 SUBSYS(bluestore, 1, 5)
+SUBSYS(bluestore_compression, 1, 5)
 SUBSYS(bluefs, 1, 5)
 SUBSYS(bdev, 1, 3)
 SUBSYS(kstore, 1, 5)
index 70a8fb9848e23afeb05f8876a846cb01aa87280f..5ecde78b5ff726d11ed7db0a22e9c0e77feac3a7 100644 (file)
@@ -60,6 +60,7 @@ set(alien_store_srcs
   ${PROJECT_SOURCE_DIR}/src/os/bluestore/StupidAllocator.cc
   ${PROJECT_SOURCE_DIR}/src/os/bluestore/BitmapAllocator.cc
   ${PROJECT_SOURCE_DIR}/src/os/bluestore/Writer.cc
+  ${PROJECT_SOURCE_DIR}/src/os/bluestore/Compression.cc
   ${PROJECT_SOURCE_DIR}/src/os/bluestore/BlueStore_debug.cc
   ${PROJECT_SOURCE_DIR}/src/os/memstore/MemStore.cc)
 
index a07822dbca32721d6f5261be4a6d88b228c7fe45..c98d09b24e2cc38dcbfe33a1b9048404e97f97bd 100644 (file)
@@ -27,6 +27,7 @@ if(WITH_BLUESTORE)
     bluestore/Btree2Allocator.cc
     bluestore/HybridAllocator.cc
     bluestore/Writer.cc
+    bluestore/Compression.cc
   )
 endif(WITH_BLUESTORE)
 
index 9bc60bf77426f9d042117c0694caa90bb09bac9d..f79e7e4320ccaa99e7971f755fe18663fa36daa5 100644 (file)
@@ -271,6 +271,7 @@ public:
   struct BufferSpace;
   struct Collection;
   struct Onode;
+  class Estimator;
   typedef boost::intrusive_ptr<Collection> CollectionRef;
   typedef boost::intrusive_ptr<Onode> OnodeRef;
 
diff --git a/src/os/bluestore/Compression.cc b/src/os/bluestore/Compression.cc
new file mode 100644 (file)
index 0000000..71f2c9b
--- /dev/null
@@ -0,0 +1,216 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 IBM
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#include "Compression.h"
+#include "BlueStore.h"
+#include "include/intarith.h"
+#include <limits>
+
+
+// Adapter that lets fmt format any type providing operator<<, by routing
+// the value through a std::basic_stringstream. Local stand-in for
+// fmt::ostream_formatter so this file does not pull in fmt/ostream.h.
+// NOTE(review): std::stringstream::view() is C++20 - confirm the build
+// standard for this target.
+template <typename Char>
+struct basic_ostream_formatter : fmt::formatter<std::basic_string_view<Char>, Char> {
+  template <typename T, typename OutputIt>
+  auto format(const T& value, fmt::basic_format_context<OutputIt, Char>& ctx) const
+      -> OutputIt {
+    std::basic_stringstream<Char> ss;
+    ss << value;
+    return fmt::formatter<std::basic_string_view<Char>, Char>::format(
+        ss.view(), ctx);
+  }
+};
+using ostream_formatter = basic_ostream_formatter<char>;
+
+// Make the BlueStore debug printer helpers usable in fmt::format("{}", ...).
+template <> struct fmt::formatter<BlueStore::Blob::printer>
+  : ostream_formatter {};
+template <> struct fmt::formatter<BlueStore::Extent::printer>
+  : ostream_formatter {};
+
+#define dout_context bluestore->cct
+#define dout_subsys ceph_subsys_bluestore_compression
+#undef  dout_prefix
+#define dout_prefix *_dout << "bluecompr "
+
+using Estimator = BlueStore::Estimator;
+using P = BlueStore::printer;
+
+// Clear the per-write bookkeeping before processing a new write.
+// Note: the learned estimation parameters (expected_compression_factor,
+// expected_recompression_error, expected_pad_expansion) intentionally
+// survive reset() - they carry feedback state across write cycles.
+void Estimator::reset()
+{
+  new_size = 0;
+  uncompressed_size = 0;
+  compressed_occupied = 0;
+  compressed_size = 0;
+  total_uncompressed_size = 0;
+  total_compressed_occupied = 0;
+  total_compressed_size = 0;
+  actual_compressed = 0;
+  actual_compressed_plus_pad = 0;
+  extra_recompress.clear();
+}
+inline void Estimator::batch(const BlueStore::Extent* e, uint32_t gain)
+{
+  const Blob *h_Blob = &(*e->blob);
+  const bluestore_blob_t &h_bblob = h_Blob->get_blob();
+  if (h_bblob.is_compressed()) {
+    compressed_size += e->length * h_bblob.get_compressed_payload_length() / h_bblob.get_logical_length();
+    compressed_occupied += gain;
+  } else {
+    uncompressed_size += e->length;
+  }
+  dout(20) << fmt::format("Estimator::batch {} gain {:#x}", e->print(P::NICK), gain) << dendl;
+  dout(20) << fmt::format("Estimator::batch non-compr={:#x} compr-occup={:#x} compr-size?={:#x}",
+    uncompressed_size, compressed_occupied, compressed_size) << dendl;
+}
+
+// Decide whether the extents batched so far are worth recompressing.
+// 'cost' is the predicted on-disk size after recompression (learned
+// factors applied), 'gain' the bytes that would be freed. Accept only if
+// gain exceeds cost by the configured bluestore_recompression_min_gain
+// ratio. The per-batch counters are consumed (zeroed) on both outcomes;
+// on acceptance they are folded into the total_* accumulators that feed
+// the learning step in finish().
+inline bool Estimator::is_worth()
+{
+  uint32_t cost = uncompressed_size * expected_compression_factor +
+                  compressed_size * expected_recompression_error;
+  uint32_t gain = uncompressed_size + compressed_occupied;
+  double need_ratio = bluestore->cct->_conf->bluestore_recompression_min_gain;
+  bool take = gain > cost * need_ratio;
+  if (take) {
+    total_uncompressed_size += uncompressed_size;
+    total_compressed_occupied += compressed_occupied;
+    total_compressed_size += compressed_size;
+  }
+  uncompressed_size = 0;
+  compressed_occupied = 0;
+  compressed_size = 0;
+  return take;
+}
+
+// Decide whether an uncompressed, unshared neighbor extent should be
+// pulled into the recompression. For now this always accepts; it only
+// tallies the extent's length so finish() can learn from the outcome.
+inline bool Estimator::is_worth(const BlueStore::Extent* e)
+{
+  const Blob *h_Blob = &(*e->blob);
+  const bluestore_blob_t &h_bblob = h_Blob->get_blob();
+  ceph_assert(!h_bblob.is_compressed());
+  ceph_assert(!h_bblob.is_shared());
+  // for now assume it always worth
+  total_uncompressed_size += e->length;
+  return true;
+}
+
+// Queue extent 'e' (logical_offset ~ length) for recompression.
+// Each logical offset may be queued at most once.
+inline void Estimator::mark_recompress(const BlueStore::Extent* e)
+{
+  ceph_assert(!extra_recompress.contains(e->logical_offset));
+  dout(25) << "recompress: " << e->print(P::NICK + P::JUSTID) << dendl;
+  extra_recompress.emplace(e->logical_offset, e->length);
+}
+
+// Queue the main (newly written) data region for compression and record
+// its size for the learning step in finish().
+// NOTE(review): new_size is assigned, not accumulated - this assumes
+// mark_main() is called at most once per write cycle; confirm with caller.
+inline void Estimator::mark_main(uint32_t location, uint32_t length)
+{
+  ceph_assert(!extra_recompress.contains(location));
+  dout(25) << "main data compress: " << std::hex
+    << location << "~" << length << std::dec << dendl;
+  extra_recompress.emplace(location, length);
+  new_size = length;
+}
+
+void Estimator::get_regions(std::vector<region_t>& regions)
+{
+  constexpr uint32_t unset = std::numeric_limits<uint32_t>::max();
+  // walk extents to form continous regions
+  region_t* r;
+  uint32_t end = unset;
+  auto i = extra_recompress.begin();
+  while (i != extra_recompress.end()) {
+    dout(25) << std::hex << i->first
+      << "~" << i->second << dendl;
+    if (end == unset) {
+      regions.emplace_back();
+      r = &regions.back();
+      r->offset = i->first;
+      r->length = i->second;
+      end = i->first + i->second;
+    } else {
+      if (i->first == end) {
+        r->length += i->second;
+        end = i->first + i->second;
+      }
+    }
+    ++i;
+    if (i == extra_recompress.end() || i->first != end) {
+      end = unset;
+    }
+  }
+}
+
+// Split 'data_bl' into blobs of at most max_blob_size, compress each
+// chunk and append the resulting blob descriptors to 'bd'.
+// Returns the total number of disk bytes needed: compression header plus
+// compressed payload, padded up to the allocation unit size.
+// Also accumulates actual_compressed / actual_compressed_plus_pad, which
+// feed the learning step in finish().
+int32_t Estimator::split_and_compress(
+  CompressorRef compr,
+  uint32_t max_blob_size,
+  ceph::buffer::list& data_bl,
+  Writer::blob_vec& bd)
+{
+  uint32_t au_size = bluestore->min_alloc_size;
+  uint32_t size = data_bl.length();
+  ceph_assert(size > 0);
+  // number of blobs needed, rounded up
+  uint32_t blobs = (size + max_blob_size - 1) / max_blob_size;
+  // even split of the data, rounded up to the allocation unit size
+  uint32_t blob_size = p2roundup(size / blobs, au_size);
+  std::vector<uint32_t> blob_sizes(blobs);
+  for (auto& i: blob_sizes) {
+    i = std::min(size, blob_size);
+    size -= i;
+  }
+  int32_t disk_needed = 0;
+  uint32_t bl_src_off = 0;
+  for (auto& i: blob_sizes) {
+    bd.emplace_back();
+    bd.back().real_length = i;
+    bd.back().compressed_length = 0;
+    // reference (not copy) this blob's slice of the input data
+    bd.back().object_data.substr_of(data_bl, bl_src_off, i);
+    bl_src_off += i;
+    // FIXME: memory alignment here is bad
+    bufferlist t;
+    std::optional<int32_t> compressor_message;
+    int r = compr->compress(bd.back().object_data, t, compressor_message);
+    ceph_assert(r == 0);
+    // on-disk layout: compression header followed by compressed payload
+    bluestore_compression_header_t chdr;
+    chdr.type = compr->get_type();
+    chdr.length = t.length();
+    chdr.compressor_message = compressor_message;
+    encode(chdr, bd.back().disk_data);
+    bd.back().disk_data.claim_append(t);
+    uint32_t len = bd.back().disk_data.length();
+    bd.back().compressed_length = len;
+    // zero-pad to the allocation unit boundary
+    uint32_t rem = p2nphase(len, au_size);
+    if (rem > 0) {
+      bd.back().disk_data.append_zero(rem);
+    }
+    actual_compressed += len;
+    actual_compressed_plus_pad += len + rem;
+    disk_needed += len + rem;
+  }
+  return disk_needed;
+}
+
+void Estimator::finish()
+{
+  dout(25) << "new_size=" << new_size
+          << " unc_size=" << total_uncompressed_size
+          << " comp_cost=" << total_compressed_size << dendl;
+  uint32_t sum = new_size + total_uncompressed_size + total_compressed_size;
+  double expected =
+    (new_size + total_uncompressed_size) * expected_compression_factor +
+    total_compressed_size * expected_recompression_error;
+  double size_misprediction = double(expected - actual_compressed) / actual_compressed;
+  double size_misprediction_weighted = 1.0 / sum * size_misprediction * 0.01;
+  expected_compression_factor -= (new_size + total_uncompressed_size) * size_misprediction_weighted;
+  expected_recompression_error -= total_compressed_size * size_misprediction_weighted;
+
+  double expected_pad = actual_compressed * expected_pad_expansion;
+  double pad_misprediction = (expected_pad - actual_compressed_plus_pad) / actual_compressed;
+  expected_pad_expansion -= pad_misprediction * 0.01;
+  dout(25) << "exp_comp_factor=" << expected_compression_factor
+           << " exp_recomp_err=" << expected_recompression_error
+           << " exp_pad_exp=" << expected_pad_expansion << dendl;
+}
diff --git a/src/os/bluestore/Compression.h b/src/os/bluestore/Compression.h
new file mode 100644 (file)
index 0000000..17adf45
--- /dev/null
@@ -0,0 +1,75 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 IBM
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#ifndef COMPRESSION_H_INCLUDED
+#define COMPRESSION_H_INCLUDED
+
+#include "BlueStore.h"
+#include "Writer.h"
+
+// Decides whether extents neighboring a write should be recompressed,
+// and learns better size predictions from each write's actual results.
+class BlueStore::Estimator {
+public:
+  Estimator(BlueStore* bluestore)
+  :bluestore(bluestore) {}
+
+  // Prepare for new write
+  void reset();
+  // Inform estimator that an extent is a candidate for recompression.
+  // Estimator has to calculate (guess) the cost (size) of the referenced data.
+  // 'gain' is the size that will be released should extent be recompressed.
+  void batch(const BlueStore::Extent* candidate, uint32_t gain);
+  // Lets estimator decide if extents previously passed by batch()
+  // are worth recompressing.
+  // If so (returns true), extents will be added by mark_recompress().
+  bool is_worth();
+  // Lets estimator decide if an uncompressed neighbor should be
+  // recompressed. The extent passed is always uncompressed and
+  // always a direct neighbor to already accepted recompression batch.
+  // If so (returns true), extents will be added by mark_recompress().
+  bool is_worth(const BlueStore::Extent* uncompressed_neighbor);
+
+  // Queue an extent for recompression (at most once per logical offset).
+  void mark_recompress(const BlueStore::Extent* e);
+  // Queue the main (newly written) data region and record its size.
+  void mark_main(uint32_t location, uint32_t length);
+  // A contiguous logical range to be read and recompressed.
+  struct region_t {
+    uint32_t offset; // offset of region
+    uint32_t length; // size of region
+  };
+  // Coalesce queued extents into contiguous regions (appended to 'regions').
+  void get_regions(std::vector<region_t>& regions);
+
+  // Split data into blobs, compress each; returns disk bytes needed
+  // (header + payload, padded to allocation units).
+  int32_t split_and_compress(
+    CompressorRef compr,
+    uint32_t max_blob_size,
+    ceph::buffer::list& data_bl,
+    Writer::blob_vec& bd);
+
+  // Learning step: adjust the expected_* factors from actual results.
+  void finish();
+
+private:
+  BlueStore* bluestore;
+  // Learned parameters, adjusted by finish() after each write cycle:
+  // predicted compressed/uncompressed size ratio for fresh data,
+  double expected_compression_factor = 0.5;
+  // predicted new/old size ratio when recompressing compressed data,
+  double expected_recompression_error = 1.1;
+  // predicted growth from padding blobs to allocation unit boundaries.
+  double expected_pad_expansion = 1.1;
+  uint32_t new_size = 0;              // fresh data to write
+  uint32_t uncompressed_size = 0;     // data that was not compressed
+  uint32_t compressed_size = 0;       // data of compressed size
+  uint32_t compressed_occupied = 0;   // disk size that will be freed
+  uint32_t total_uncompressed_size = 0;
+  uint32_t total_compressed_size = 0;
+  uint32_t total_compressed_occupied = 0;
+  // accumulated size of compressed, used in feedback learn stage
+  uint32_t actual_compressed = 0;
+  uint32_t actual_compressed_plus_pad = 0;
+  // logical offset -> length of data queued for (re)compression
+  std::map<uint32_t, uint32_t> extra_recompress;
+};
+
+#endif