From d16b15a809ad89d07250562ece4eef9e6ff500f7 Mon Sep 17 00:00:00 2001 From: Igor Fedotov Date: Tue, 14 Jan 2025 15:59:08 +0300 Subject: [PATCH] os/bluestore: preload all the compressor plugins on mount. Fixes: https://tracker.ceph.com/issues/69507 Signed-off-by: Igor Fedotov (cherry picked from commit a6a499ed5fcadbbd88239fe133115e0f5945a6a7) --- src/os/bluestore/BlueStore.cc | 222 ++++++++++++++++++---------------- src/os/bluestore/BlueStore.h | 10 +- 2 files changed, 130 insertions(+), 102 deletions(-) diff --git a/src/os/bluestore/BlueStore.cc b/src/os/bluestore/BlueStore.cc index 690f0063d76e2..4ebb856f6253c 100644 --- a/src/os/bluestore/BlueStore.cc +++ b/src/os/bluestore/BlueStore.cc @@ -4795,8 +4795,6 @@ void BlueStore::_set_compression() _set_compression_alert(true, s.c_str()); } - compressor = nullptr; - if (cct->_conf->bluestore_compression_min_blob_size) { comp_min_blob_size = cct->_conf->bluestore_compression_min_blob_size; } else { @@ -4818,19 +4816,36 @@ void BlueStore::_set_compression() comp_max_blob_size = cct->_conf->bluestore_compression_max_blob_size_ssd; } } - - auto& alg_name = cct->_conf->bluestore_compression_algorithm; - if (!alg_name.empty()) { - compressor = Compressor::create(cct, alg_name); - if (!compressor) { + if (compressors.size() == 0) { + compressors.resize(Compressor::COMP_ALG_LAST); + compressors[Compressor::COMP_ALG_NONE] = nullptr; + int alg = Compressor::COMP_ALG_NONE + 1; + while (alg < Compressor::COMP_ALG_LAST) { + compressors[alg] = Compressor::create(cct, alg); + ++alg; + } + } + auto alg_name = cct->_conf->bluestore_compression_algorithm; + CompressorRef c = + !alg_name.empty() ? Compressor::create(cct, alg_name) : CompressorRef(); + if (c) { + ceph_assert(c->get_type() < int(compressors.size())); + def_compressor_alg = c->get_type(); + alg_name = c->get_type_name(); // let's use actual resulting alg name + } else { + if (!alg_name.empty()) { derr << __func__ << " unable to initialize " << alg_name.c_str() << " compressor" << dendl; _set_compression_alert(false, alg_name.c_str()); + } else { + _clear_compression_alert(); } + def_compressor_alg = Compressor::COMP_ALG_NONE; + alg_name = "(none)"; } - + dout(10) << __func__ << " mode " << Compressor::get_comp_mode_name(comp_mode) - << " alg " << (compressor ? compressor->get_type_name() : "(none)") + << " alg " << alg_name << " min_blob " << comp_min_blob_size << " max_blob " << comp_max_blob_size << dendl; @@ -10861,6 +10876,67 @@ int BlueStore::set_collection_opts( return -ENOENT; std::unique_lock l{c->lock}; c->pool_opts = opts; + + string val; + c->compression_algorithm.reset(); + if (c->pool_opts.get(pool_opts_t::COMPRESSION_ALGORITHM, &val)) { + auto alg = Compressor::get_comp_alg_type(val); + CompressorRef cp; + if (alg.has_value() && *alg != Compressor::COMP_ALG_NONE) { + cp = *alg < compressors.size() ? compressors[*alg] : cp; + if (!cp) { + _set_compression_alert(false, val.c_str()); + derr << __func__ << " unable to load compressor plugin for " << val.c_str() + << dendl; + } else { + ceph_assert(cp->get_type() == *alg); + c->compression_algorithm = cp->get_type(); + } + } + } + c->compression_mode.reset(); + if (c->pool_opts.get(pool_opts_t::COMPRESSION_MODE, &val)) { + auto cm = Compressor::get_comp_mode_type(val); + if (!cm) { + derr << __func__ << " unrecognized compression mode: " << val.c_str() + << dendl; + } else { + c->compression_mode = cm; + } + } + int64_t ival; + c->csum_type.reset(); + if (c->pool_opts.get(pool_opts_t::CSUM_TYPE, &ival)) { + if (ival >= Checksummer::CSUM_MAX) { + derr << __func__ << " unrecognized checksum type: " << ival + << dendl; + } else { + c->csum_type = Checksummer::CSumType(ival); + } + } + c->comp_min_blob_size.reset(); + if (c->pool_opts.get(pool_opts_t::COMPRESSION_MIN_BLOB_SIZE, &ival)) { + if (ival <= 0) { + derr << __func__ << " invalid min compression blob size: " << ival + << dendl; + } else { + c->comp_min_blob_size = ival; + } + } + c->comp_max_blob_size.reset(); + if (c->pool_opts.get(pool_opts_t::COMPRESSION_MAX_BLOB_SIZE, &ival)) { + if (ival <= 0) { + derr << __func__ << " invalid max compression blob size: " << ival + << dendl; + } else { + c->comp_max_blob_size = ival; + } + } + double dval; + c->compression_req_ratio.reset(); + if (c->pool_opts.get(pool_opts_t::COMPRESSION_REQUIRED_RATIO, &dval)) { + c->compression_req_ratio = dval; + } return 0; } @@ -11366,20 +11442,25 @@ int BlueStore::_decompress(bufferlist& source, bufferlist* result) bluestore_compression_header_t chdr; decode(chdr, i); int alg = int(chdr.type); - CompressorRef cp = compressor; - if (!cp || (int)cp->get_type() != alg) { - cp = Compressor::create(cct, alg); - } - + CompressorRef cp = + alg < int(compressors.size()) ? compressors[alg] : CompressorRef(); if (!cp.get()) { - // if compressor isn't available - error, because cannot return - // decompressed data? - - const char* alg_name = Compressor::get_comp_alg_name(alg); - derr << __func__ << " can't load decompressor " << alg_name << dendl; - _set_compression_alert(false, alg_name); - r = -EIO; + if (alg != Compressor::COMP_ALG_NONE) { + // if compressor isn't available - error, because cannot return + // decompressed data? + const char* alg_name = Compressor::get_comp_alg_name(alg); + derr << __func__ << " can't locate compressor plugin for " << alg_name + << dendl; + _set_compression_alert(false, alg_name); + r = -EIO; + } else { + dout(0) << __func__ + << " [warn] Compressed Blob has got no alg in the header " + << dendl; + i.copy_all(*result); + } } else { + ceph_assert((int)cp->get_type() == alg); r = cp->decompress(i, chdr.length, *result, chdr.compressor_message); if (r < 0) { derr << __func__ << " decompression failed with exit code " << r << dendl; @@ -15559,39 +15640,12 @@ int BlueStore::_do_alloc_write( CompressorRef c; double crr = 0; if (wctx->compress) { - c = select_option( - "compression_algorithm", - compressor, - [&]() { - string val; - if (coll->pool_opts.get(pool_opts_t::COMPRESSION_ALGORITHM, &val)) { - CompressorRef cp = compressor; - if (!cp || cp->get_type_name() != val) { - cp = Compressor::create(cct, val); - if (!cp) { - if (_set_compression_alert(false, val.c_str())) { - derr << __func__ << " unable to initialize " << val.c_str() - << " compressor" << dendl; - } - } - } - return std::optional(cp); - } - return std::optional(); - } - ); - - crr = select_option( - "compression_required_ratio", - cct->_conf->bluestore_compression_required_ratio, - [&]() { - double val; - if (coll->pool_opts.get(pool_opts_t::COMPRESSION_REQUIRED_RATIO, &val)) { - return std::optional(val); - } - return std::optional(); - } - ); + c = coll->compression_algorithm.has_value() ? + compressors[*(coll->compression_algorithm)]: + compressors[def_compressor_alg]; + crr = coll->compression_req_ratio.has_value() ? + *(coll->compression_req_ratio) : + cct->_conf->bluestore_compression_required_ratio; } // checksum @@ -16057,34 +16111,16 @@ void BlueStore::_choose_write_options( wctx->csum_order = block_size_order; // checksum - int64_t csum = csum_type.load(); - csum = select_option( - "csum_type", - csum, - [&]() { - int64_t val; - if (c->pool_opts.get(pool_opts_t::CSUM_TYPE, &val)) { - return std::optional(val); - } - return std::optional(); - } - ); - wctx->csum_type = csum; + wctx->csum_type= c->csum_type.has_value() ? + *(c->csum_type): + csum_type.load(); // compression parameters unsigned alloc_hints = o->onode.alloc_hint_flags; - auto cm = select_option( - "compression_mode", - comp_mode.load(), - [&]() { - string val; - if (c->pool_opts.get(pool_opts_t::COMPRESSION_MODE, &val)) { - return std::optional( - Compressor::get_comp_mode_type(val)); - } - return std::optional(); - } - ); + + auto cm = c->compression_mode.has_value() ? + *(c->compression_mode) : + comp_mode.load(); wctx->compress = (cm != Compressor::COMP_NONE) && ((cm == Compressor::COMP_FORCE) || @@ -16109,31 +16145,15 @@ void BlueStore::_choose_write_options( } if (wctx->compress) { - wctx->target_blob_size = select_option( - "compression_max_blob_size", - comp_max_blob_size.load(), - [&]() { - int64_t val; - if (c->pool_opts.get(pool_opts_t::COMPRESSION_MAX_BLOB_SIZE, &val)) { - return std::optional((uint64_t)val); - } - return std::optional(); - } - ); + wctx->target_blob_size = c->comp_max_blob_size.has_value() ? + *(c->comp_max_blob_size): + comp_max_blob_size.load(); } } else { if (wctx->compress) { - wctx->target_blob_size = select_option( - "compression_min_blob_size", - comp_min_blob_size.load(), - [&]() { - int64_t val; - if (c->pool_opts.get(pool_opts_t::COMPRESSION_MIN_BLOB_SIZE, &val)) { - return std::optional((uint64_t)val); - } - return std::optional(); - } - ); + wctx->target_blob_size = c->comp_min_blob_size.has_value() ? + *(c->comp_min_blob_size): + comp_min_blob_size.load(); } } diff --git a/src/os/bluestore/BlueStore.h b/src/os/bluestore/BlueStore.h index 14c09fcf74e3e..63297edd8266d 100644 --- a/src/os/bluestore/BlueStore.h +++ b/src/os/bluestore/BlueStore.h @@ -1527,6 +1527,13 @@ private: //pool options pool_opts_t pool_opts; + std::optional compression_algorithm; + std::optional compression_mode; + std::optional csum_type; + std::optional comp_min_blob_size; + std::optional comp_max_blob_size; + std::optional compression_req_ratio; + ContextQueue *commit_queue; OnodeCacheShard* get_onode_cache() const { @@ -2345,7 +2352,8 @@ private: std::atomic comp_mode = {Compressor::COMP_NONE}; ///< compression mode - CompressorRef compressor; + std::atomic def_compressor_alg = {Compressor::COMP_ALG_NONE}; + std::vector compressors; std::atomic comp_min_blob_size = {0}; std::atomic comp_max_blob_size = {0}; -- 2.39.5