From a21f5a37855db4a247d912f097dc220184e268de Mon Sep 17 00:00:00 2001 From: Adam Kupczyk Date: Wed, 9 Apr 2025 16:03:52 +0000 Subject: [PATCH] os/bluestore/compression: Estimator class Add CMake rules to compile. Add bluestore_compression dout subsys. Created Estimator class. It is used by Scanner to decide if specific extent is to be recompressed. Prepare for future machine learning / adaptive algorithm for estimation. So far logic of Estimator is relatively simple. It learns expected recompression values and uses them in next iterations to predict. Signed-off-by: Adam Kupczyk --- src/common/options/global.yaml.in | 15 ++ src/common/subsys.h | 1 + src/crimson/os/alienstore/CMakeLists.txt | 1 + src/os/CMakeLists.txt | 1 + src/os/bluestore/BlueStore.h | 1 + src/os/bluestore/Compression.cc | 216 +++++++++++++++++++++++ src/os/bluestore/Compression.h | 75 ++++++++ 7 files changed, 310 insertions(+) create mode 100644 src/os/bluestore/Compression.cc create mode 100644 src/os/bluestore/Compression.h diff --git a/src/common/options/global.yaml.in b/src/common/options/global.yaml.in index 279156370a514..3e30a52bee614 100644 --- a/src/common/options/global.yaml.in +++ b/src/common/options/global.yaml.in @@ -4768,6 +4768,21 @@ options: flags: - runtime with_legacy: true +- name: bluestore_recompression_min_gain + type: float + level: advanced + desc: Required estimated gain for accepting extents for recompressing. + long_desc: Partial writes over compressed blobs have high cost. New data requires + new allocation units, but whole old blob must remain. To combat it BlueStore + looks around write position to find blobs that will make it profitable to read + and recompress. + fmt_desc: A float value, (size_exact_released / size_expected_after_compression). 
+ default: 1.2 + see_also: + - bluestore_compression_max_blob_size + flags: + - runtime + with_legacy: true # Specifies minimum expected amount of saved allocation units # per single blob to enable compressed blobs garbage collection - name: bluestore_gc_enable_blob_threshold diff --git a/src/common/subsys.h b/src/common/subsys.h index 67bee2a8b5acb..a133b101f1d39 100644 --- a/src/common/subsys.h +++ b/src/common/subsys.h @@ -73,6 +73,7 @@ SUBSYS(throttle, 1, 1) SUBSYS(refs, 0, 0) SUBSYS(compressor, 1, 5) SUBSYS(bluestore, 1, 5) +SUBSYS(bluestore_compression, 1, 5) SUBSYS(bluefs, 1, 5) SUBSYS(bdev, 1, 3) SUBSYS(kstore, 1, 5) diff --git a/src/crimson/os/alienstore/CMakeLists.txt b/src/crimson/os/alienstore/CMakeLists.txt index 70a8fb9848e23..5ecde78b5ff72 100644 --- a/src/crimson/os/alienstore/CMakeLists.txt +++ b/src/crimson/os/alienstore/CMakeLists.txt @@ -60,6 +60,7 @@ set(alien_store_srcs ${PROJECT_SOURCE_DIR}/src/os/bluestore/StupidAllocator.cc ${PROJECT_SOURCE_DIR}/src/os/bluestore/BitmapAllocator.cc ${PROJECT_SOURCE_DIR}/src/os/bluestore/Writer.cc + ${PROJECT_SOURCE_DIR}/src/os/bluestore/Compression.cc ${PROJECT_SOURCE_DIR}/src/os/bluestore/BlueStore_debug.cc ${PROJECT_SOURCE_DIR}/src/os/memstore/MemStore.cc) diff --git a/src/os/CMakeLists.txt b/src/os/CMakeLists.txt index a07822dbca327..c98d09b24e2cc 100644 --- a/src/os/CMakeLists.txt +++ b/src/os/CMakeLists.txt @@ -27,6 +27,7 @@ if(WITH_BLUESTORE) bluestore/Btree2Allocator.cc bluestore/HybridAllocator.cc bluestore/Writer.cc + bluestore/Compression.cc ) endif(WITH_BLUESTORE) diff --git a/src/os/bluestore/BlueStore.h b/src/os/bluestore/BlueStore.h index 9bc60bf77426f..f79e7e4320cca 100644 --- a/src/os/bluestore/BlueStore.h +++ b/src/os/bluestore/BlueStore.h @@ -271,6 +271,7 @@ public: struct BufferSpace; struct Collection; struct Onode; + class Estimator; typedef boost::intrusive_ptr CollectionRef; typedef boost::intrusive_ptr OnodeRef; diff --git a/src/os/bluestore/Compression.cc 
b/src/os/bluestore/Compression.cc
new file mode 100644
index 0000000000000..71f2c9bdfd298
--- /dev/null
+++ b/src/os/bluestore/Compression.cc
@@ -0,0 +1,257 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 IBM
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#include "Compression.h"
+#include "BlueStore.h"
+#include "include/intarith.h"
+#include <limits>
+#include <sstream>
+
+
+// Lets fmt format any type that can be streamed with operator<<.
+template <typename Char>
+struct basic_ostream_formatter : fmt::formatter<std::basic_string_view<Char>, Char> {
+  template <typename T, typename OutputIt>
+  auto format(const T& value, fmt::basic_format_context<OutputIt, Char>& ctx) const
+      -> OutputIt {
+    std::basic_stringstream<Char> ss;
+    ss << value;
+    return fmt::formatter<std::basic_string_view<Char>, Char>::format(
+        ss.view(), ctx);
+  }
+};
+using ostream_formatter = basic_ostream_formatter<char>;
+
+template <> struct fmt::formatter<BlueStore::Blob::printer>
+  : ostream_formatter {};
+template <> struct fmt::formatter<BlueStore::Extent::printer>
+  : ostream_formatter {};
+
+#define dout_context bluestore->cct
+#define dout_subsys ceph_subsys_bluestore_compression
+#undef dout_prefix
+#define dout_prefix *_dout << "bluecompr "
+
+using Estimator = BlueStore::Estimator;
+using P = BlueStore::printer;
+
+// Clears all per-write counters and the ranges selected for
+// (re)compression. The learned expected_* factors are not touched.
+void Estimator::reset()
+{
+  new_size = 0;
+  uncompressed_size = 0;
+  compressed_occupied = 0;
+  compressed_size = 0;
+  total_uncompressed_size = 0;
+  total_compressed_occupied = 0;
+  total_compressed_size = 0;
+  actual_compressed = 0;
+  actual_compressed_plus_pad = 0;
+  extra_recompress.clear();
+}
+
+// Adds extent 'e' to the batch that the next is_worth() call evaluates.
+// For a compressed blob, the extent's share of the compressed payload is
+// estimated proportionally to its length, and 'gain' is added to the
+// space that would be released. For an uncompressed blob only the extent
+// length is counted.
+inline void Estimator::batch(const BlueStore::Extent* e, uint32_t gain)
+{
+  const Blob *h_Blob = &(*e->blob);
+  const bluestore_blob_t &h_bblob = h_Blob->get_blob();
+  if (h_bblob.is_compressed()) {
+    // Multiply in 64 bits; the product of two 32-bit lengths can overflow.
+    compressed_size += uint64_t(e->length) * h_bblob.get_compressed_payload_length()
+      / h_bblob.get_logical_length();
+    compressed_occupied += gain;
+  } else {
+    uncompressed_size += e->length;
+  }
+  dout(20) << fmt::format("Estimator::batch {} gain {:#x}", e->print(P::NICK), gain) << dendl;
+  dout(20) << fmt::format("Estimator::batch non-compr={:#x} compr-occup={:#x} compr-size?={:#x}",
+                          uncompressed_size, compressed_occupied, compressed_size) << dendl;
+}
+
+// Decides whether the extents gathered by batch() are worth recompressing:
+// the released space must exceed the estimated new size multiplied by
+// bluestore_recompression_min_gain. An accepted batch is folded into the
+// total_* counters; the batch counters are cleared either way.
+inline bool Estimator::is_worth()
+{
+  uint32_t cost = uncompressed_size * expected_compression_factor +
+    compressed_size * expected_recompression_error;
+  uint32_t gain = uncompressed_size + compressed_occupied;
+  double need_ratio = bluestore->cct->_conf->bluestore_recompression_min_gain;
+  bool take = gain > cost * need_ratio;
+  if (take) {
+    total_uncompressed_size += uncompressed_size;
+    total_compressed_occupied += compressed_occupied;
+    total_compressed_size += compressed_size;
+  }
+  uncompressed_size = 0;
+  compressed_occupied = 0;
+  compressed_size = 0;
+  return take;
+}
+
+// Decides whether an uncompressed neighbor should be recompressed too.
+// The blob of 'e' must be neither compressed nor shared.
+inline bool Estimator::is_worth(const BlueStore::Extent* e)
+{
+  const Blob *h_Blob = &(*e->blob);
+  const bluestore_blob_t &h_bblob = h_Blob->get_blob();
+  ceph_assert(!h_bblob.is_compressed());
+  ceph_assert(!h_bblob.is_shared());
+  // for now assume it is always worth it
+  total_uncompressed_size += e->length;
+  return true;
+}
+
+// Records extent 'e' as a range that will be recompressed.
+inline void Estimator::mark_recompress(const BlueStore::Extent* e)
+{
+  ceph_assert(!extra_recompress.contains(e->logical_offset));
+  dout(25) << "recompress: " << e->print(P::NICK + P::JUSTID) << dendl;
+  extra_recompress.emplace(e->logical_offset, e->length);
+}
+
+// Records the freshly written range and remembers its length as new_size.
+inline void Estimator::mark_main(uint32_t location, uint32_t length)
+{
+  ceph_assert(!extra_recompress.contains(location));
+  dout(25) << "main data compress: " << std::hex
+           << location << "~" << length << std::dec << dendl;
+  extra_recompress.emplace(location, length);
+  new_size = length;
+}
+
+// Appends the marked ranges to 'regions', merging ranges that directly
+// follow one another into a single continuous region.
+void Estimator::get_regions(std::vector<region_t>& regions)
+{
+  constexpr uint32_t unset = std::numeric_limits<uint32_t>::max();
+  // walk extents to form continuous regions
+  region_t* r = nullptr;
+  uint32_t end = unset;
+  auto i = extra_recompress.begin();
+  while (i != extra_recompress.end()) {
+    dout(25) << std::hex << i->first
+             << "~" << i->second << dendl;
+    if (end == unset) {
+      regions.emplace_back();
+      r = &regions.back();
+      r->offset = i->first;
+      r->length = i->second;
+      end = i->first + i->second;
+    } else {
+      if (i->first == end) {
+        r->length += i->second;
+        end = i->first + i->second;
+      }
+    }
+    ++i;
+    if (i == extra_recompress.end() || i->first != end) {
+      end = unset;
+    }
+  }
+}
+
+// Splits 'data_bl' into blobs, compresses each of them with 'compr' and
+// appends one entry per blob to 'bd'. The per-blob size is derived from
+// 'max_blob_size' and rounded up to the allocation unit. The disk data of
+// a blob is the encoded compression header followed by the compressed
+// payload and p2nphase(len, au_size) bytes of zero padding.
+// Returns the number of bytes needed on disk, padding included.
+int32_t Estimator::split_and_compress(
+  CompressorRef compr,
+  uint32_t max_blob_size,
+  ceph::buffer::list& data_bl,
+  Writer::blob_vec& bd)
+{
+  uint32_t au_size = bluestore->min_alloc_size;
+  uint32_t size = data_bl.length();
+  ceph_assert(size > 0);
+  ceph_assert(max_blob_size > 0);
+  uint32_t blobs = (size + max_blob_size - 1) / max_blob_size;
+  // Divide rounding up, otherwise an unaligned tail could be left out.
+  uint32_t blob_size = p2roundup((size + blobs - 1) / blobs, au_size);
+  std::vector<uint32_t> blob_sizes;
+  blob_sizes.reserve(blobs);
+  // Stop once all data is assigned, so no empty blob is ever produced.
+  while (size > 0) {
+    uint32_t chunk = std::min(size, blob_size);
+    blob_sizes.push_back(chunk);
+    size -= chunk;
+  }
+  int32_t disk_needed = 0;
+  uint32_t bl_src_off = 0;
+  for (auto& i: blob_sizes) {
+    bd.emplace_back();
+    bd.back().real_length = i;
+    bd.back().compressed_length = 0;
+    bd.back().object_data.substr_of(data_bl, bl_src_off, i);
+    bl_src_off += i;
+    // FIXME: memory alignment here is bad
+    bufferlist t;
+    std::optional<int32_t> compressor_message;
+    int r = compr->compress(bd.back().object_data, t, compressor_message);
+    ceph_assert(r == 0);
+    bluestore_compression_header_t chdr;
+    chdr.type = compr->get_type();
+    chdr.length = t.length();
+    chdr.compressor_message = compressor_message;
+    encode(chdr, bd.back().disk_data);
+    bd.back().disk_data.claim_append(t);
+    uint32_t len = bd.back().disk_data.length();
+    bd.back().compressed_length = len;
+    uint32_t rem = p2nphase(len, au_size);
+    if (rem > 0) {
+      bd.back().disk_data.append_zero(rem);
+    }
+    actual_compressed += len;
+    actual_compressed_plus_pad += len + rem;
+    disk_needed += len + rem;
+  }
+  return disk_needed;
+}
+
+// Feedback learn stage: compares the size predicted from the learned
+// factors with the size actually produced by split_and_compress() and
+// moves the factors a small step towards the observed values.
+void Estimator::finish()
+{
+  dout(25) << "new_size=" << new_size
+           << " unc_size=" << total_uncompressed_size
+           << " comp_cost=" << total_compressed_size << dendl;
+  uint32_t sum = new_size + total_uncompressed_size + total_compressed_size;
+  if (sum == 0 || actual_compressed == 0) {
+    // Nothing was compressed; dividing by zero below would poison the
+    // learned factors with NaN/inf, and reset() does not clear them.
+    return;
+  }
+  double expected =
+    (new_size + total_uncompressed_size) * expected_compression_factor +
+    total_compressed_size * expected_recompression_error;
+  double size_misprediction = double(expected - actual_compressed) / actual_compressed;
+  double size_misprediction_weighted = 1.0 / sum * size_misprediction * 0.01;
+  expected_compression_factor -= (new_size + total_uncompressed_size) * size_misprediction_weighted;
+  expected_recompression_error -= total_compressed_size * size_misprediction_weighted;
+
+  double expected_pad = actual_compressed * expected_pad_expansion;
+  double pad_misprediction = (expected_pad - actual_compressed_plus_pad) / actual_compressed;
+  expected_pad_expansion -= pad_misprediction * 0.01;
+  dout(25) << "exp_comp_factor=" << expected_compression_factor
+           << " exp_recomp_err=" << expected_recompression_error
+           << " exp_pad_exp=" << expected_pad_expansion << dendl;
+}
diff --git a/src/os/bluestore/Compression.h b/src/os/bluestore/Compression.h
new file mode 100644
index 0000000000000..17adf458a5fb0
--- /dev/null
+++ b/src/os/bluestore/Compression.h
@@ -0,0 +1,88 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 IBM
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#ifndef COMPRESSION_H_INCLUDED
+#define COMPRESSION_H_INCLUDED
+
+#include "BlueStore.h"
+#include "Writer.h"
+
+// Estimates whether recompressing extents is profitable and learns from
+// the sizes actually produced by compression to adjust later estimates.
+class BlueStore::Estimator {
+public:
+  Estimator(BlueStore* bluestore)
+  :bluestore(bluestore) {}
+
+  // Prepare for new write
+  void reset();
+  // Inform estimator that an extent is a candidate for recompression.
+  // Estimator has to calculate (guess) the cost (size) of the referenced data.
+  // 'gain' is the size that will be released should extent be recompressed.
+  void batch(const BlueStore::Extent* candidate, uint32_t gain);
+  // Lets estimator decide if extents previously passed by batch()
+  // are worth recompressing.
+  // If so (returns true), extents will be added by mark_recompress().
+  bool is_worth();
+  // Lets estimator decide if an uncompressed neighbor should be
+  // recompressed. The extent passed is always uncompressed and
+  // always a direct neighbor to already accepted recompression batch.
+  // If so (returns true), extents will be added by mark_recompress().
+  bool is_worth(const BlueStore::Extent* uncompressed_neighbor);
+
+  // Remember extent 'e' as a range to be recompressed.
+  void mark_recompress(const BlueStore::Extent* e);
+  // Remember the range of freshly written data; sets new_size.
+  void mark_main(uint32_t location, uint32_t length);
+  struct region_t {
+    uint32_t offset; // offset of region
+    uint32_t length; // size of region
+  };
+  // Append the marked ranges to 'regions', merging ranges that directly
+  // follow one another.
+  void get_regions(std::vector<region_t>& regions);
+
+  // Split 'data_bl' into blobs, compress each with 'compr' and append
+  // them to 'bd'. Returns the disk space needed, including padding.
+  int32_t split_and_compress(
+    CompressorRef compr,
+    uint32_t max_blob_size,
+    ceph::buffer::list& data_bl,
+    Writer::blob_vec& bd);
+
+  // Feedback learn stage: adjust expected_* factors using the sizes
+  // accumulated by split_and_compress().
+  void finish();
+
+private:
+  BlueStore* bluestore;
+  // Learned factors; they are not cleared by reset().
+  double expected_compression_factor = 0.5;
+  double expected_recompression_error = 1.1;
+  double expected_pad_expansion = 1.1;
+  uint32_t new_size = 0; // fresh data to write
+  uint32_t uncompressed_size = 0; // data that was not compressed
+  uint32_t compressed_size = 0; // data of compressed size
+  uint32_t compressed_occupied = 0; // disk size that will be freed
+  // totals over everything accepted for recompression since reset()
+  uint32_t total_uncompressed_size = 0;
+  uint32_t total_compressed_size = 0;
+  uint32_t total_compressed_occupied = 0;
+  // accumulated size of compressed, used in feedback learn stage
+  uint32_t actual_compressed = 0;
+  uint32_t actual_compressed_plus_pad = 0;
+  // ranges selected for (re)compression: offset -> length
+  std::map<uint32_t, uint32_t> extra_recompress;
+};
+
+#endif
-- 
2.39.5