o->extent_map.fault_range(db, scan_left, scan_right - scan_left);
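+ // The estimator is created lazily and reused for subsequent writes
+ // to this collection.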
if (!c->estimator) c->estimator.reset(create_estimator());
Estimator* estimator = c->estimator.get();
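+ // Bind the current write's context; the estimator pulls the compressor
+ // and target blob size from it (see split_and_compress below).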
+ estimator->set_wctx(&wctx);
Scanner scanner(this);
scanner.write_lookaround(o.get(), offset, length, scan_left, scan_right, estimator);
std::vector<Estimator::region_t> regions;
int32_t disk_for_compressed;
int32_t disk_for_raw;
uint32_t au_size = min_alloc_size;
- uint32_t max_blob_size = c->pool_opts.value_or(
- pool_opts_t::COMPRESSION_MAX_BLOB_SIZE, (int64_t)comp_max_blob_size.load());
- disk_for_compressed = estimator->split_and_compress(wctx.compressor, max_blob_size, data_bl, bd);
+ disk_for_compressed = estimator->split_and_compress(data_bl, bd);
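+ // Raw (uncompressed) cost: the region rounded out to allocation-unit
+ // boundaries on both ends.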
disk_for_raw = p2roundup(i.offset + i.length, au_size) - p2align(i.offset, au_size);
BlueStore::Writer wr(this, txc, &wctx, o);
if (disk_for_compressed < disk_for_raw) {
void Estimator::cleanup()
{
+ wctx = nullptr;
new_size = 0;
uncompressed_size = 0;
compressed_occupied = 0;
compressed_size = 0;
+ compressed_area = 0;
total_uncompressed_size = 0;
total_compressed_occupied = 0;
total_compressed_size = 0;
actual_compressed = 0;
actual_compressed_plus_pad = 0;
extra_recompress.clear();
+ single_compressed_blob = true;
+ last_blob = nullptr;
}
+
+void Estimator::set_wctx(const WriteContext* wctx)
+{
+ this->wctx = wctx;
+}
+
inline void Estimator::batch(const BlueStore::Extent* e, uint32_t gain)
{
const Blob *h_Blob = &(*e->blob);
const bluestore_blob_t &h_bblob = h_Blob->get_blob();
if (h_bblob.is_compressed()) {
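+ // Track whether all compressed extents batched so far come from the
+ // same blob; the cost estimate uses this to tax recompression that
+ // would only reproduce the existing blob.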
+ if (last_blob) {
+ if (h_Blob != last_blob) {
+ single_compressed_blob = false;
+ }
+ } else {
+ last_blob = h_Blob;
+ }
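+ // Pro-rate the blob's compressed payload over the part of it that
+ // this extent actually references.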
compressed_size += e->length * h_bblob.get_compressed_payload_length() / h_bblob.get_logical_length();
+ compressed_area += e->length;
compressed_occupied += gain;
} else {
uncompressed_size += e->length;
{
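+ // Expected on-disk size after recompression: uncompressed data shrinks
+ // by the expected compression factor, while already-compressed data is
+ // assumed to keep roughly its current size (small error margin).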
uint32_t cost = uncompressed_size * expected_compression_factor +
compressed_size * expected_recompression_error;
+ if (uncompressed_size == 0 && single_compressed_blob) {
+ // Special case: all batched extents come from compressed blobs.
+ // We want to avoid recompressing into exactly the same blobs.
+ // The cost should grow in proportion to blob size; the rationale is
+ // that recompressing a small blob is likely to provide a gain,
+ // but recompressing a whole large blob is not.
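+ // Example (hypothetical numbers, 4 KiB AU, 64 KiB target blob size):
+ // 6 KiB of estimated compressed data pads up to 8 KiB, so
+ // padding_size = 2 KiB; with compressed_area = 32 KiB the tax is
+ // 2048 * 32768 / 65536 = 1 KiB added to the cost.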
+ uint64_t padding_size = p2nphase<uint64_t>(compressed_size, bluestore->min_alloc_size);
+ uint32_t split_tax = padding_size * compressed_area / wctx->target_blob_size;
+ cost += split_tax;
+ }
uint32_t gain = uncompressed_size + compressed_occupied;
double need_ratio = bluestore->cct->_conf->bluestore_recompression_min_gain;
bool take = gain > cost * need_ratio;
}
int32_t Estimator::split_and_compress(
- CompressorRef compr,
- uint32_t max_blob_size,
ceph::buffer::list& data_bl,
Writer::blob_vec& bd)
{
uint32_t au_size = bluestore->min_alloc_size;
uint32_t size = data_bl.length();
ceph_assert(size > 0);
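+ // Split the data into the fewest blobs that each stay within the
+ // target blob size, then round the per-blob size up to allocation units.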
- uint32_t blobs = (size + max_blob_size - 1) / max_blob_size;
+ uint32_t blobs = (size + wctx->target_blob_size - 1) / wctx->target_blob_size;
uint32_t blob_size = p2roundup(size / blobs, au_size);
std::vector<uint32_t> blob_sizes(blobs);
for (auto& i: blob_sizes) {
// FIXME: memory alignment here is bad
bufferlist t;
std::optional<int32_t> compressor_message;
- int r = compr->compress(bd.back().object_data, t, compressor_message);
+ int r = wctx->compressor->compress(bd.back().object_data, t, compressor_message);
ceph_assert(r == 0);
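+ // Prepend the compression header so the payload can be decoded on read.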
bluestore_compression_header_t chdr;
- chdr.type = compr->get_type();
+ chdr.type = wctx->compressor->get_type();
chdr.length = t.length();
chdr.compressor_message = compressor_message;
encode(chdr, bd.back().disk_data);
Estimator(BlueStore* bluestore)
  : bluestore(bluestore) {}
+ // Each estimator run needs a specific WriteContext.
+ void set_wctx(const WriteContext* wctx);
+
// Inform estimator that an extent is a candidate for recompression.
// Estimator has to calculate (guess) the cost (size) of the referenced data.
// 'gain' is the size that will be released should the extent be recompressed.
void batch(const BlueStore::Extent* e, uint32_t gain);
+ // Collect the regions that the estimator selected for recompression.
void get_regions(std::vector<region_t>& regions);
int32_t split_and_compress(
- CompressorRef compr,
- uint32_t max_blob_size,
ceph::buffer::list& data_bl,
Writer::blob_vec& bd);
double expected_compression_factor = 0.5;
double expected_recompression_error = 1.1;
double expected_pad_expansion = 1.1;
+ const WriteContext* wctx = nullptr; // current write's context, set via set_wctx()
uint32_t new_size = 0; // fresh data to write
uint32_t uncompressed_size = 0; // data that was not compressed
- uint32_t compressed_size = 0; // data of compressed size
+ uint32_t compressed_size = 0; // estimated size of compressed data
+ uint32_t compressed_area = 0; // logical length covered by compressed extents
uint32_t compressed_occupied = 0; // disk size that will be freed
uint32_t total_uncompressed_size = 0;
uint32_t total_compressed_occupied = 0;
uint32_t total_compressed_size = 0;
uint32_t actual_compressed = 0;
uint32_t actual_compressed_plus_pad = 0;
std::map<uint32_t, uint32_t> extra_recompress;
+ bool single_compressed_blob = true; // all batched compressed extents share one blob
+ const Blob* last_blob = nullptr; // last compressed blob seen by batch()
// Prepare for new write
void cleanup();
};