From 528813e46b18bba35a04c4c10807ff3fb0e73a4c Mon Sep 17 00:00:00 2001
From: Adam Kupczyk
Date: Tue, 8 Apr 2025 08:36:21 +0000
Subject: [PATCH] os/bluestore: Add do_write_v2_compressed()

Modify do_write_v2() to branch into do_write_v2_compressed().
Segmented and regular cases are recognized and handled properly.
The new do_write_v2_compressed() oversees compression / recompression.

Make one Estimator per Collection. This makes it possible for the
estimator to learn collection-specific compressibility.

In do_write_v2_compressed() use the compressor already selected in
_choose_write_options().

Make Collection create its Estimator on first use.

Signed-off-by: Adam Kupczyk
---
 src/os/bluestore/BlueStore.cc   | 144 +++++++++++++++++++++++++++-----
 src/os/bluestore/BlueStore.h    |  19 ++++-
 src/os/bluestore/Compression.cc |   8 +-
 src/os/bluestore/Compression.h  |   4 +-
 4 files changed, 152 insertions(+), 23 deletions(-)

diff --git a/src/os/bluestore/BlueStore.cc b/src/os/bluestore/BlueStore.cc
index c7ad989109cb7..89c1795d59bcf 100644
--- a/src/os/bluestore/BlueStore.cc
+++ b/src/os/bluestore/BlueStore.cc
@@ -14,6 +14,7 @@
 #include
 #include
+#include
 #include
 #include
 #include
@@ -12886,6 +12887,22 @@
   return r;
 }
 
+void inline BlueStore::_do_read_and_pad(
+  Collection* c,
+  OnodeRef& o,
+  uint32_t offset,
+  uint32_t length,
+  ceph::buffer::list& bl)
+{
+  int r = _do_read(c, o, offset, length, bl, 0);
+  ceph_assert(r >= 0 && r <= (int)length);
+  size_t zlen = length - r;
+  if (zlen > 0) {
+    bl.append_zero(zlen);
+    logger->inc(l_bluestore_write_pad_bytes, zlen);
+  }
+}
+
 int BlueStore::_verify_csum(OnodeRef& o,
   const bluestore_blob_t* blob,
   uint64_t blob_xoffset,
   const bufferlist& bl,
@@ -17559,31 +17576,120 @@ int BlueStore::_do_write_v2(
   if (length == 0) {
     return 0;
   }
-  WriteContext wctx;
-  _choose_write_options(c, o, fadvise_flags, &wctx);
-  if (wctx.compress) {
-    // if we have compression, skip to write_v1
-    return _do_write(txc, c, o, offset, length, bl, fadvise_flags);
-  }
-  if (o->onode.segment_size != 0 && wctx.target_blob_size > o->onode.segment_size) {
-    wctx.target_blob_size = o->onode.segment_size;
-  }
+
   if (bl.length() != length) {
     bl.splice(length, bl.length() - length);
   }
-  BlueStore::Writer wr(this, txc, &wctx, o);
-  uint64_t start = p2align(offset, min_alloc_size);
-  uint64_t end = p2roundup(offset + length, min_alloc_size);
-  wr.left_affected_range = start;
-  wr.right_affected_range = end;
-  std::tie(wr.left_shard_bound, wr.right_shard_bound) =
-    o->extent_map.fault_range_ex(db, start, end - start);
-  wr.do_write(offset, bl);
-  o->extent_map.dirty_range(wr.left_affected_range, wr.right_affected_range - wr.left_affected_range);
-  o->extent_map.maybe_reshard(wr.left_affected_range, wr.right_affected_range);
+
+  WriteContext wctx;
+  _choose_write_options(c, o, fadvise_flags, &wctx);
+  if (wctx.compressor) {
+    uint32_t end = offset + length;
+    uint32_t segment_size = o->onode.segment_size;
+    if (segment_size) {
+      // split data into segments;
+      // the first and last segments will do a lookaround scan
+      uint32_t write_offset = offset;
+      while (write_offset != end) {
+        uint32_t this_segment_begin = p2align(write_offset, segment_size);
+        uint32_t this_segment_end = this_segment_begin + segment_size;
+        uint32_t write_length = std::min(this_segment_end, end) - write_offset;
+        bufferlist chunk;
+        chunk.substr_of(bl, 0, write_length);
+        bl.splice(0, write_length);
+        _do_write_v2_compressed(txc, c, o, wctx, write_offset, write_length, chunk,
+                                this_segment_begin, this_segment_end);
+        write_offset += write_length;
+      }
+    } else {
+      const uint32_t scan_range = 0x20000; // 128 KiB
+      uint32_t scan_left = offset < scan_range ? 0 : offset - scan_range;
+      uint32_t scan_right = end + scan_range;
+      _do_write_v2_compressed(txc, c, o, wctx, offset, length, bl,
+                              scan_left, scan_right);
+    }
+  } else {
+    // normal uncompressed path
+    BlueStore::Writer wr(this, txc, &wctx, o);
+    uint64_t start = p2align(offset, min_alloc_size);
+    uint64_t end = p2roundup(offset + length, min_alloc_size);
+    wr.left_affected_range = start;
+    wr.right_affected_range = end;
+    std::tie(wr.left_shard_bound, wr.right_shard_bound) =
+      o->extent_map.fault_range_ex(db, start, end - start);
+    wr.do_write(offset, bl);
+    o->extent_map.dirty_range(wr.left_affected_range, wr.right_affected_range - wr.left_affected_range);
+    o->extent_map.maybe_reshard(wr.left_affected_range, wr.right_affected_range);
+  }
   return r;
 }
 
+int BlueStore::_do_write_v2_compressed(
+  TransContext *txc,
+  CollectionRef &c,
+  OnodeRef& o,
+  WriteContext& wctx,
+  uint32_t offset, uint32_t length,
+  ceph::buffer::list& input_bl,
+  uint32_t scan_left, uint32_t scan_right)
+{
+  o->extent_map.fault_range(db, scan_left, scan_right - scan_left);
+  if (!c->estimator) c->estimator.reset(create_estimator());
+  Estimator* estimator = c->estimator.get();
+  Scanner scanner(this);
+  scanner.write_lookaround(o.get(), offset, length, scan_left, scan_right, estimator);
+  std::vector<Estimator::region_t> regions;
+  estimator->get_regions(regions);
+  dout(15) << __func__ << " " << std::hex << offset << "~" << length << " -> ";
+  for (const auto& i : regions) {
+    *_dout << i.offset << "~" << i.length << " ";
+  }
+  *_dout << std::dec << dendl;
+  for (const auto& i : regions) {
+    ceph::buffer::list data_bl;
+    if (i.offset <= offset && offset < i.offset + i.length) {
+      // the starting point is within the region, so the end must be too
+      ceph_assert(offset + length <= i.offset + i.length);
+      if (i.offset < offset) {
+        _do_read_and_pad(c.get(), o, i.offset, offset - i.offset, data_bl);
+      }
+      data_bl.claim_append(input_bl);
+      if (offset + length < i.offset + i.length) {
+        ceph::buffer::list right_bl;
+        _do_read_and_pad(c.get(), o, offset + length,
+                         i.offset + i.length - (offset + length), right_bl);
+        data_bl.claim_append(right_bl);
+      }
+    } else {
+      // the starting point is not within the region, so the end must not be either
+      ceph_assert(offset + length < i.offset || offset + length >= i.offset + i.length);
+      _do_read_and_pad(c.get(), o, i.offset, i.length, data_bl);
+    }
+    ceph_assert(data_bl.length() == i.length);
+    Writer::blob_vec bd;
+    int32_t disk_for_compressed;
+    int32_t disk_for_raw;
+    uint32_t au_size = min_alloc_size;
+    uint32_t max_blob_size = c->pool_opts.value_or(
+      pool_opts_t::COMPRESSION_MAX_BLOB_SIZE, (int64_t)comp_max_blob_size.load());
+    disk_for_compressed = estimator->split_and_compress(wctx.compressor, max_blob_size, data_bl, bd);
+    disk_for_raw = p2roundup(i.offset + i.length, au_size) - p2align(i.offset, au_size);
+    BlueStore::Writer wr(this, txc, &wctx, o);
+    if (disk_for_compressed < disk_for_raw) {
+      wr.do_write_with_blobs(i.offset, i.offset + i.length, i.offset + i.length, bd);
+    } else {
+      wr.do_write(i.offset, data_bl);
+    }
+  }
+  estimator->finish();
+  uint32_t changes_start = regions.front().offset;
+  uint32_t changes_end = regions.back().offset + regions.back().length;
+  o->extent_map.compress_extent_map(changes_start, changes_end - changes_start);
+  o->extent_map.dirty_range(changes_start, changes_end - changes_start);
+  o->extent_map.maybe_reshard(changes_start, changes_end);
+  return 0;
+}
+
 int BlueStore::_write(TransContext *txc,
   CollectionRef& c,
   OnodeRef& o,
diff --git a/src/os/bluestore/BlueStore.h b/src/os/bluestore/BlueStore.h
index bd409bdb88d05..be4fa407081f2 100644
--- a/src/os/bluestore/BlueStore.h
+++ b/src/os/bluestore/BlueStore.h
@@ -273,6 +273,8 @@ public:
   struct Onode;
   class Scanner;
   class Estimator;
+  Estimator* create_estimator();
+
   typedef boost::intrusive_ptr<Collection> CollectionRef;
   typedef boost::intrusive_ptr<Onode> OnodeRef;
 
@@ -1691,6 +1693,7 @@
     std::optional<double> compression_req_ratio;
 
     ContextQueue *commit_queue;
+    std::unique_ptr<Estimator> estimator;
 
     OnodeCacheShard* get_onode_cache() const {
       return onode_space.cache;
@@ -3310,6 +3313,13 @@
     uint32_t op_flags = 0,
     uint64_t retry_count = 0);
 
+  void _do_read_and_pad(
+    Collection* c,
+    OnodeRef& o,
+    uint32_t offset,
+    uint32_t length,
+    ceph::buffer::list& bl);
+
   int _do_readv(
     Collection *c,
     OnodeRef& o,
@@ -3834,7 +3844,14 @@
     uint64_t offset, uint64_t length,
     ceph::buffer::list& bl,
     uint32_t fadvise_flags);
-
+  int _do_write_v2_compressed(
+    TransContext *txc,
+    CollectionRef &c,
+    OnodeRef& o,
+    WriteContext& wctx,
+    uint32_t offset, uint32_t length,
+    ceph::buffer::list& bl,
+    uint32_t scan_left, uint32_t scan_right);
   int _touch(TransContext *txc,
     CollectionRef& c,
     OnodeRef& o);
diff --git a/src/os/bluestore/Compression.cc b/src/os/bluestore/Compression.cc
index 7cb38ce52ec2e..6c31699d4e427 100644
--- a/src/os/bluestore/Compression.cc
+++ b/src/os/bluestore/Compression.cc
@@ -50,7 +50,7 @@ using P = BlueStore::printer;
 using Estimator = BlueStore::Estimator;
 using P = BlueStore::printer;
 
-void Estimator::reset()
+void Estimator::cleanup()
 {
   new_size = 0;
   uncompressed_size = 0;
@@ -220,6 +220,12 @@ void Estimator::finish()
   dout(25) << "exp_comp_factor=" << expected_compression_factor
            << " exp_recomp_err=" << expected_recompression_error
            << " exp_pad_exp=" << expected_pad_expansion << dendl;
+  cleanup();
+}
+
+Estimator* BlueStore::create_estimator()
+{
+  return new Estimator(this);
 }
 
 struct scan_blob_element_t {
diff --git a/src/os/bluestore/Compression.h b/src/os/bluestore/Compression.h
index f81d4e81ef682..d266e1d2e79da 100644
--- a/src/os/bluestore/Compression.h
+++ b/src/os/bluestore/Compression.h
@@ -22,8 +22,6 @@ public:
   Estimator(BlueStore* bluestore)
     :bluestore(bluestore) {}
 
-  // Prepare for new write
-  void reset();
   // Inform estimator that an extent is a candidate for recompression.
   // Estimator has to calculate (guess) the cost (size) of the referenced data.
   // 'gain' is the size that will be released should extent be recompressed.
@@ -70,6 +68,8 @@
   uint32_t actual_compressed = 0;
   uint32_t actual_compressed_plus_pad = 0;
   std::map<uint32_t, uint32_t> extra_recompress;
+  // Prepare for new write
+  void cleanup();
 };
-- 
2.39.5
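
The two size computations the patch relies on, the segment split in _do_write_v2 and
the raw-disk estimate in _do_write_v2_compressed, are easiest to see in isolation.
The following standalone sketch mirrors only their arithmetic; it is illustrative and
not part of the patch: p2align/p2roundup are local stand-ins for Ceph's power-of-two
helpers, and the Chunk struct and the names split_into_segments/raw_disk_cost are
hypothetical.

// Standalone sketch of the segment-split and raw-disk-cost arithmetic.
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <vector>

static uint32_t p2align(uint32_t v, uint32_t a)   { return v & ~(a - 1); }
static uint32_t p2roundup(uint32_t v, uint32_t a) { return (v + a - 1) & ~(a - 1); }

struct Chunk { uint32_t offset, length, seg_begin, seg_end; };

// Carve [offset, offset+length) into pieces that never cross a segment
// boundary, the same walk the segment loop in _do_write_v2 performs before
// handing each piece to _do_write_v2_compressed.
static std::vector<Chunk> split_into_segments(uint32_t offset, uint32_t length,
                                              uint32_t segment_size)
{
  std::vector<Chunk> out;
  uint32_t end = offset + length;
  uint32_t write_offset = offset;
  while (write_offset != end) {
    uint32_t seg_begin = p2align(write_offset, segment_size);
    uint32_t seg_end = seg_begin + segment_size;
    uint32_t write_length = std::min(seg_end, end) - write_offset;
    out.push_back({write_offset, write_length, seg_begin, seg_end});
    write_offset += write_length;
  }
  return out;
}

// Disk consumed if a region is written raw: the region rounded out to
// allocation-unit boundaries, as in the disk_for_raw computation above.
static uint32_t raw_disk_cost(uint32_t offset, uint32_t length, uint32_t au_size)
{
  return p2roundup(offset + length, au_size) - p2align(offset, au_size);
}

int main()
{
  // A 150 KiB write at offset 100 KiB with 64 KiB segments splits into
  // pieces of 28 KiB, 64 KiB and 58 KiB.
  for (const Chunk& c : split_into_segments(100 * 1024, 150 * 1024, 64 * 1024)) {
    std::printf("write %u~%u within segment [%u, %u)\n",
                c.offset, c.length, c.seg_begin, c.seg_end);
  }
  // A 5 KiB region starting at 3 KiB still occupies two 4 KiB AUs raw.
  std::printf("raw cost: %u bytes\n", raw_disk_cost(3 * 1024, 5 * 1024, 4096));
  return 0;
}

Compiled with g++ -std=c++17, this prints the three segment pieces and a raw
cost of 8192 bytes, which is the disk_for_raw value the compressed estimate
has to beat before do_write_with_blobs is chosen over a plain do_write.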