]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
os/bluestore: Add do_write_v2_compressed()
authorAdam Kupczyk <akupczyk@ibm.com>
Tue, 8 Apr 2025 08:36:21 +0000 (08:36 +0000)
committerAdam Kupczyk <akupczyk@ibm.com>
Thu, 24 Apr 2025 06:47:18 +0000 (06:47 +0000)
Modify do_write_v2() to branch into do_write_v2_compressed().
Segmented and regular cases are recognized and handled properly.
New do_write_v2_compressed() oversees compression / recompression.

Make one Estimator per Collection.
It makes possible for estimator to learn in collection specific compressibility.
In write_v2_compressed use compressor already selected in choose_write_options.
Make Collection create Estimator on first use.

Signed-off-by: Adam Kupczyk <akupczyk@ibm.com>
src/os/bluestore/BlueStore.cc
src/os/bluestore/BlueStore.h
src/os/bluestore/Compression.cc
src/os/bluestore/Compression.h

index c7ad989109cb789f548eb14f3244a4575124eec3..89c1795d59bcf4ee94ea88d56d1ea30393cc6adf 100644 (file)
@@ -14,6 +14,7 @@
 
 #include <bit>
 #include <utility>
+#include <memory>
 #include <unistd.h>
 #include <stdlib.h>
 #include <sys/types.h>
@@ -12886,6 +12887,22 @@ int BlueStore::_do_read(
   return r;
 }
 
+void inline BlueStore::_do_read_and_pad(
+  Collection* c,
+  OnodeRef& o,
+  uint32_t offset,
+  uint32_t length,
+  ceph::buffer::list& bl)
+{
+  int r = _do_read(c, o, offset, length, bl, 0);
+  ceph_assert(r >= 0 && r <= (int)length);
+  size_t zlen = length - r;
+  if (zlen > 0) {
+    bl.append_zero(zlen);
+    logger->inc(l_bluestore_write_pad_bytes, zlen);
+  }
+}
+
 int BlueStore::_verify_csum(OnodeRef& o,
                            const bluestore_blob_t* blob, uint64_t blob_xoffset,
                            const bufferlist& bl,
@@ -17559,31 +17576,120 @@ int BlueStore::_do_write_v2(
   if (length == 0) {
     return 0;
   }
-  WriteContext wctx;
-  _choose_write_options(c, o, fadvise_flags, &wctx);
-  if (wctx.compress) {
-    // if we have compression, skip to write_v1
-    return _do_write(txc, c, o, offset, length, bl, fadvise_flags);
-  }
-  if (o->onode.segment_size != 0 && wctx.target_blob_size > o->onode.segment_size) {
-    wctx.target_blob_size = o->onode.segment_size;
-  }
+
   if (bl.length() != length) {
     bl.splice(length, bl.length() - length);
   }
-  BlueStore::Writer wr(this, txc, &wctx, o);
-  uint64_t start = p2align(offset, min_alloc_size);
-  uint64_t end = p2roundup(offset + length, min_alloc_size);
-  wr.left_affected_range = start;
-  wr.right_affected_range = end;
-  std::tie(wr.left_shard_bound, wr.right_shard_bound) =
-    o->extent_map.fault_range_ex(db, start, end - start);
-  wr.do_write(offset, bl);
-  o->extent_map.dirty_range(wr.left_affected_range, wr.right_affected_range - wr.left_affected_range);
-  o->extent_map.maybe_reshard(wr.left_affected_range, wr.right_affected_range);
+
+  WriteContext wctx;
+  _choose_write_options(c, o, fadvise_flags, &wctx);
+  if (wctx.compressor) {
+    uint32_t end = offset + length;
+    uint32_t segment_size = o->onode.segment_size;
+    if (segment_size) {
+      // split data into segments
+      // first and last segments will do lookaround scan
+      uint32_t write_offset = offset;
+      while (write_offset != end) {
+        uint32_t this_segment_begin = p2align(write_offset, segment_size);
+        uint32_t this_segment_end = this_segment_begin + segment_size;
+        uint32_t write_length = std::min(this_segment_end, end) - write_offset;
+        bufferlist chunk;
+        chunk.substr_of(bl, 0, write_length);
+        bl.splice(0, write_length);
+        _do_write_v2_compressed(txc, c, o, wctx, write_offset, write_length, chunk,
+                                this_segment_begin, this_segment_end);
+        write_offset += write_length;
+      };
+    } else {
+      const uint32_t scan_range = 0x20000; //128kB
+      uint32_t scan_left = offset < scan_range ? 0: offset - scan_range;
+      uint32_t scan_right = end + scan_range;
+      _do_write_v2_compressed(txc, c, o, wctx, offset, length, bl,
+                              scan_left, scan_right);
+    }
+  } else {
+    // normal uncompressed path
+    BlueStore::Writer wr(this, txc, &wctx, o);
+    uint64_t start = p2align(offset, min_alloc_size);
+    uint64_t end = p2roundup(offset + length, min_alloc_size);
+    wr.left_affected_range = start;
+    wr.right_affected_range = end;
+    std::tie(wr.left_shard_bound, wr.right_shard_bound) =
+      o->extent_map.fault_range_ex(db, start, end - start);
+    wr.do_write(offset, bl);
+    o->extent_map.dirty_range(wr.left_affected_range, wr.right_affected_range - wr.left_affected_range);
+    o->extent_map.maybe_reshard(wr.left_affected_range, wr.right_affected_range);
+  }
   return r;
 }
 
+int BlueStore::_do_write_v2_compressed(
+  TransContext *txc,
+  CollectionRef &c,
+  OnodeRef& o,
+  WriteContext& wctx,
+  uint32_t offset, uint32_t length,
+  ceph::buffer::list& input_bl,
+  uint32_t scan_left, uint32_t scan_right)
+{
+  o->extent_map.fault_range(db, scan_left, scan_right - scan_left);
+  if (!c->estimator) c->estimator.reset(create_estimator());
+  Estimator* estimator = c->estimator.get();
+  Scanner scanner(this);
+  scanner.write_lookaround(o.get(), offset, length, scan_left, scan_right, estimator);
+  std::vector<Estimator::region_t> regions;
+  estimator->get_regions(regions);
+  dout(15) << __func__ << " " << std::hex << offset << "~" << length << " -> ";
+  for (const auto& i : regions) {
+    *_dout << i.offset << "~" << i.length << " ";
+  }
+  *_dout << std::dec << dendl;
+  for (const auto& i : regions) {
+    ceph::buffer::list data_bl;
+    if (i.offset <= offset && offset < i.offset + i.length) {
+      // the starting point is withing the region, so the end must too
+      ceph_assert(offset + length <= i.offset + i.length);
+      if (i.offset < offset) {
+        _do_read_and_pad(c.get(), o, i.offset, offset - i.offset, data_bl);
+      }
+      data_bl.claim_append(input_bl);
+      if (offset + length < i.offset + i.length) {
+        ceph::buffer::list right_bl;
+        _do_read_and_pad(c.get(), o, offset + length,
+          i.offset + i.length - (offset + length), right_bl);
+        data_bl.claim_append(right_bl);
+      }
+    } else {
+      // the starting point is not within region, so the end is not allowed either
+      ceph_assert(offset + length < i.offset || offset + length >= i.offset + i.length);
+      _do_read_and_pad(c.get(), o, i.offset, i.length, data_bl);
+    }
+    ceph_assert(data_bl.length() == i.length);
+    Writer::blob_vec bd;
+    int32_t disk_for_compressed;
+    int32_t disk_for_raw;
+    uint32_t au_size = min_alloc_size;
+    uint32_t max_blob_size = c->pool_opts.value_or(
+      pool_opts_t::COMPRESSION_MAX_BLOB_SIZE, (int64_t)comp_max_blob_size.load());
+    disk_for_compressed = estimator->split_and_compress(wctx.compressor, max_blob_size, data_bl, bd);
+    disk_for_raw = p2roundup(i.offset + i.length, au_size) - p2align(i.offset, au_size);
+    BlueStore::Writer wr(this, txc, &wctx, o);
+    if (disk_for_compressed < disk_for_raw) {
+      wr.do_write_with_blobs(i.offset, i.offset + i.length, i.offset + i.length, bd);
+    } else {
+      wr.do_write(i.offset, data_bl);
+    }
+  }
+  estimator->finish();
+  uint32_t changes_start = regions.front().offset;
+  uint32_t changes_end = regions.back().offset + regions.back().length;
+  o->extent_map.compress_extent_map(changes_start, changes_end - changes_start);
+  o->extent_map.dirty_range(changes_start, changes_end - changes_start);
+  o->extent_map.maybe_reshard(changes_start, changes_end);
+  return 0;
+}
+
 int BlueStore::_write(TransContext *txc,
                      CollectionRef& c,
                      OnodeRef& o,
index bd409bdb88d055b903cf09caa936034dd716c536..be4fa407081f232a5b4323d9f72b4c0a457814ed 100644 (file)
@@ -273,6 +273,8 @@ public:
   struct Onode;
   class Scanner;
   class Estimator;
+  Estimator* create_estimator();
+
   typedef boost::intrusive_ptr<Collection> CollectionRef;
   typedef boost::intrusive_ptr<Onode> OnodeRef;
 
@@ -1691,6 +1693,7 @@ public:
     std::optional<double> compression_req_ratio;
 
     ContextQueue *commit_queue;
+    std::unique_ptr<Estimator> estimator;
 
     OnodeCacheShard* get_onode_cache() const {
       return onode_space.cache;
@@ -3310,6 +3313,13 @@ private:
     uint32_t op_flags = 0,
     uint64_t retry_count = 0);
 
+  void _do_read_and_pad(
+    Collection* c,
+    OnodeRef& o,
+    uint32_t offset,
+    uint32_t length,
+    ceph::buffer::list& bl);
+
   int _do_readv(
     Collection *c,
     OnodeRef& o,
@@ -3834,7 +3844,14 @@ private:
     uint64_t offset, uint64_t length,
     ceph::buffer::list& bl,
     uint32_t fadvise_flags);
-
+  int _do_write_v2_compressed(
+    TransContext *txc,
+    CollectionRef &c,
+    OnodeRef& o,
+    WriteContext& wctx,
+    uint32_t offset, uint32_t length,
+    ceph::buffer::list& bl,
+    uint32_t scan_left, uint32_t scan_right);
   int _touch(TransContext *txc,
             CollectionRef& c,
             OnodeRef& o);
index 7cb38ce52ec2e41cf3135a6252a547da2d3ee8de..6c31699d4e4270c3b7afc76ba76fe3056368f8fd 100644 (file)
@@ -50,7 +50,7 @@ using P = BlueStore::printer;
 using Estimator = BlueStore::Estimator;
 using P = BlueStore::printer;
 
-void Estimator::reset()
+void Estimator::cleanup()
 {
   new_size = 0;
   uncompressed_size = 0;
@@ -220,6 +220,12 @@ void Estimator::finish()
   dout(25) << "exp_comp_factor=" << expected_compression_factor
            << " exp_recomp_err=" << expected_recompression_error
            << " exp_pad_exp=" << expected_pad_expansion << dendl;
+  cleanup();
+}
+
+Estimator* BlueStore::create_estimator()
+{
+  return new Estimator(this);
 }
 
 struct scan_blob_element_t {
index f81d4e81ef682a246ed1877c91e4500947ed5235..d266e1d2e79da725c05a21ad1723e4191a74aa78 100644 (file)
@@ -22,8 +22,6 @@ public:
   Estimator(BlueStore* bluestore)
   :bluestore(bluestore) {}
 
-  // Prepare for new write
-  void reset();
   // Inform estimator that an extent is a candidate for recompression.
   // Estimator has to calculate (guess) the cost (size) of the referenced data.
   // 'gain' is the size that will be released should extent be recompressed.
@@ -70,6 +68,8 @@ private:
   uint32_t actual_compressed = 0;
   uint32_t actual_compressed_plus_pad = 0;
   std::map<uint32_t, uint32_t> extra_recompress;
+  // Prepare for new write
+  void cleanup();
 };