]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
os/bluestore: preload all the compressor plugins on mount.
authorIgor Fedotov <ifedotov@croit.io>
Tue, 14 Jan 2025 12:59:08 +0000 (15:59 +0300)
committerIgor Fedotov <igor.fedotov@croit.io>
Wed, 5 Mar 2025 18:28:59 +0000 (21:28 +0300)
Fixes: https://tracker.ceph.com/issues/69507
Signed-off-by: Igor Fedotov <igor.fedotov@croit.io>
(cherry picked from commit a6a499ed5fcadbbd88239fe133115e0f5945a6a7)

src/os/bluestore/BlueStore.cc
src/os/bluestore/BlueStore.h

index 690f0063d76e2cec447f0a7ad9904fa1a3981719..4ebb856f6253c81d70eb563e89c0b8824e1b26b0 100644 (file)
@@ -4795,8 +4795,6 @@ void BlueStore::_set_compression()
     _set_compression_alert(true, s.c_str());
   }
 
-  compressor = nullptr;
-
   if (cct->_conf->bluestore_compression_min_blob_size) {
     comp_min_blob_size = cct->_conf->bluestore_compression_min_blob_size;
   } else {
@@ -4818,19 +4816,36 @@ void BlueStore::_set_compression()
       comp_max_blob_size = cct->_conf->bluestore_compression_max_blob_size_ssd;
     }
   }
-
-  auto& alg_name = cct->_conf->bluestore_compression_algorithm;
-  if (!alg_name.empty()) {
-    compressor = Compressor::create(cct, alg_name);
-    if (!compressor) {
+  if (compressors.size() == 0) {
+    compressors.resize(Compressor::COMP_ALG_LAST);
+    compressors[Compressor::COMP_ALG_NONE] = nullptr;
+    int alg = Compressor::COMP_ALG_NONE + 1;
+    while (alg < Compressor::COMP_ALG_LAST) {
+      compressors[alg] = Compressor::create(cct, alg);
+      ++alg;
+    }
+  }
+  auto alg_name = cct->_conf->bluestore_compression_algorithm;
+  CompressorRef c =
+   !alg_name.empty() ? Compressor::create(cct, alg_name) : CompressorRef();
+  if (c) {
+    ceph_assert(c->get_type() < int(compressors.size()));
+    def_compressor_alg = c->get_type();
+    alg_name = c->get_type_name(); // let's use actual resulting alg name
+  } else {
+    if (!alg_name.empty()) {
       derr << __func__ << " unable to initialize " << alg_name.c_str() << " compressor"
            << dendl;
       _set_compression_alert(false, alg_name.c_str());
+    } else {
+      _clear_compression_alert();
     }
+    def_compressor_alg = Compressor::COMP_ALG_NONE;
+    alg_name = "(none)";
   }
+
   dout(10) << __func__ << " mode " << Compressor::get_comp_mode_name(comp_mode)
-          << " alg " << (compressor ? compressor->get_type_name() : "(none)")
+          << " alg " << alg_name
           << " min_blob " << comp_min_blob_size
           << " max_blob " << comp_max_blob_size
           << dendl;
@@ -10861,6 +10876,67 @@ int BlueStore::set_collection_opts(
     return -ENOENT;
   std::unique_lock l{c->lock};
   c->pool_opts = opts;
+
+  string val;
+  c->compression_algorithm.reset();
+  if (c->pool_opts.get(pool_opts_t::COMPRESSION_ALGORITHM, &val)) {
+    auto alg = Compressor::get_comp_alg_type(val);
+    CompressorRef cp;
+    if (alg.has_value() && *alg != Compressor::COMP_ALG_NONE) {
+      cp = *alg < compressors.size() ? compressors[*alg] : cp;
+      if (!cp) {
+        _set_compression_alert(false, val.c_str());
+        derr << __func__ << " unable to load compressor plugin for " << val.c_str()
+             << dendl;
+      } else {
+        ceph_assert(cp->get_type() == *alg);
+        c->compression_algorithm = cp->get_type();
+      }
+    }
+  }
+  c->compression_mode.reset();
+  if (c->pool_opts.get(pool_opts_t::COMPRESSION_MODE, &val)) {
+    auto cm = Compressor::get_comp_mode_type(val);
+    if (!cm) {
+      derr << __func__ << " unrecognized compression mode: " << val.c_str()
+           << dendl;
+    } else {
+      c->compression_mode = cm;
+    }
+  }
+  int64_t ival;
+  c->csum_type.reset();
+  if (c->pool_opts.get(pool_opts_t::CSUM_TYPE, &ival)) {
+    if (ival >= Checksummer::CSUM_MAX) {
+      derr << __func__ << " unrecognized checksum type: " << ival
+           << dendl;
+    } else {
+      c->csum_type = Checksummer::CSumType(ival);
+    }
+  }
+  c->comp_min_blob_size.reset();
+  if (c->pool_opts.get(pool_opts_t::COMPRESSION_MIN_BLOB_SIZE, &ival)) {
+    if (ival <= 0) {
+      derr << __func__ << " invalid min compression blob size: " << ival
+           << dendl;
+    } else {
+      c->comp_min_blob_size = ival;
+    }
+  }
+  c->comp_max_blob_size.reset();
+  if (c->pool_opts.get(pool_opts_t::COMPRESSION_MAX_BLOB_SIZE, &ival)) {
+    if (ival <= 0) {
+      derr << __func__ << " invalid max compression blob size: " << ival
+           << dendl;
+    } else {
+      c->comp_max_blob_size = ival;
+    }
+  }
+  double dval;
+  c->compression_req_ratio.reset();
+  if (c->pool_opts.get(pool_opts_t::COMPRESSION_REQUIRED_RATIO, &dval)) {
+    c->compression_req_ratio = dval;
+  }
   return 0;
 }
 
@@ -11366,20 +11442,25 @@ int BlueStore::_decompress(bufferlist& source, bufferlist* result)
   bluestore_compression_header_t chdr;
   decode(chdr, i);
   int alg = int(chdr.type);
-  CompressorRef cp = compressor;
-  if (!cp || (int)cp->get_type() != alg) {
-    cp = Compressor::create(cct, alg);
-  }
-
+  CompressorRef cp =
+    alg < int(compressors.size()) ? compressors[alg] : CompressorRef();
   if (!cp.get()) {
-    // if compressor isn't available - error, because cannot return
-    // decompressed data?
-    
-    const char* alg_name = Compressor::get_comp_alg_name(alg);
-    derr << __func__ << " can't load decompressor " << alg_name << dendl;
-    _set_compression_alert(false, alg_name);
-    r = -EIO;
+    if (alg != Compressor::COMP_ALG_NONE) {
+      // if compressor isn't available - error, because cannot return
+      // decompressed data?
+      const char* alg_name = Compressor::get_comp_alg_name(alg);
+      derr << __func__ << " can't locate compressor plugin for " << alg_name
+           << dendl;
+      _set_compression_alert(false, alg_name);
+      r = -EIO;
+    } else {
+      dout(0) << __func__
+              << " [warn] Compressed Blob has got no alg in the header "
+              << dendl;
+      i.copy_all(*result);
+    }
   } else {
+    ceph_assert((int)cp->get_type() == alg);
     r = cp->decompress(i, chdr.length, *result, chdr.compressor_message);
     if (r < 0) {
       derr << __func__ << " decompression failed with exit code " << r << dendl;
@@ -15559,39 +15640,12 @@ int BlueStore::_do_alloc_write(
   CompressorRef c;
   double crr = 0;
   if (wctx->compress) {
-    c = select_option(
-      "compression_algorithm",
-      compressor,
-      [&]() {
-        string val;
-        if (coll->pool_opts.get(pool_opts_t::COMPRESSION_ALGORITHM, &val)) {
-          CompressorRef cp = compressor;
-          if (!cp || cp->get_type_name() != val) {
-            cp = Compressor::create(cct, val);
-           if (!cp) {
-             if (_set_compression_alert(false, val.c_str())) {
-               derr << __func__ << " unable to initialize " << val.c_str()
-                    << " compressor" << dendl;
-             }
-           }
-          }
-          return std::optional<CompressorRef>(cp);
-        }
-        return std::optional<CompressorRef>();
-      }
-    );
-
-    crr = select_option(
-      "compression_required_ratio",
-      cct->_conf->bluestore_compression_required_ratio,
-      [&]() {
-        double val;
-        if (coll->pool_opts.get(pool_opts_t::COMPRESSION_REQUIRED_RATIO, &val)) {
-          return std::optional<double>(val);
-        }
-        return std::optional<double>();
-      }
-    );
+    c = coll->compression_algorithm.has_value() ?
+      compressors[*(coll->compression_algorithm)]:
+      compressors[def_compressor_alg];
+    crr = coll->compression_req_ratio.has_value() ?
+      *(coll->compression_req_ratio) :
+      cct->_conf->bluestore_compression_required_ratio;
   }
 
   // checksum
@@ -16057,34 +16111,16 @@ void BlueStore::_choose_write_options(
   wctx->csum_order = block_size_order;
 
   // checksum
-  int64_t csum = csum_type.load();
-  csum = select_option(
-    "csum_type",
-    csum,
-    [&]() {
-      int64_t val;
-      if (c->pool_opts.get(pool_opts_t::CSUM_TYPE, &val)) {
-        return std::optional<int64_t>(val);
-      }
-      return std::optional<int64_t>();
-    }
-  );
-  wctx->csum_type = csum;
+  wctx->csum_type= c->csum_type.has_value() ?
+    *(c->csum_type):
+    csum_type.load();
 
   // compression parameters
   unsigned alloc_hints = o->onode.alloc_hint_flags;
-  auto cm = select_option(
-    "compression_mode",
-    comp_mode.load(),
-    [&]() {
-      string val;
-      if (c->pool_opts.get(pool_opts_t::COMPRESSION_MODE, &val)) {
-       return std::optional<Compressor::CompressionMode>(
-         Compressor::get_comp_mode_type(val));
-      }
-      return std::optional<Compressor::CompressionMode>();
-    }
-  );
+
+  auto cm = c->compression_mode.has_value() ?
+    *(c->compression_mode) :
+    comp_mode.load();
 
   wctx->compress = (cm != Compressor::COMP_NONE) &&
     ((cm == Compressor::COMP_FORCE) ||
@@ -16109,31 +16145,15 @@ void BlueStore::_choose_write_options(
     }
 
     if (wctx->compress) {
-      wctx->target_blob_size = select_option(
-        "compression_max_blob_size",
-        comp_max_blob_size.load(),
-        [&]() {
-          int64_t val;
-          if (c->pool_opts.get(pool_opts_t::COMPRESSION_MAX_BLOB_SIZE, &val)) {
-           return std::optional<uint64_t>((uint64_t)val);
-          }
-          return std::optional<uint64_t>();
-        }
-      );
+      wctx->target_blob_size = c->comp_max_blob_size.has_value() ?
+        *(c->comp_max_blob_size):
+        comp_max_blob_size.load();
     }
   } else {
     if (wctx->compress) {
-      wctx->target_blob_size = select_option(
-        "compression_min_blob_size",
-        comp_min_blob_size.load(),
-        [&]() {
-          int64_t val;
-          if (c->pool_opts.get(pool_opts_t::COMPRESSION_MIN_BLOB_SIZE, &val)) {
-           return std::optional<uint64_t>((uint64_t)val);
-          }
-          return std::optional<uint64_t>();
-        }
-      );
+      wctx->target_blob_size = c->comp_min_blob_size.has_value() ?
+        *(c->comp_min_blob_size):
+        comp_min_blob_size.load();
     }
   }
 
index 14c09fcf74e3e263e88a914a2f5f5259c20c9761..63297edd8266d08419accac6bb1fe757002ff205 100644 (file)
@@ -1527,6 +1527,13 @@ private:
 
     //pool options
     pool_opts_t pool_opts;
+    std::optional<int> compression_algorithm;
+    std::optional<int> compression_mode;
+    std::optional<int> csum_type;
+    std::optional<int64_t> comp_min_blob_size;
+    std::optional<int64_t> comp_max_blob_size;
+    std::optional<double> compression_req_ratio;
+
     ContextQueue *commit_queue;
 
     OnodeCacheShard* get_onode_cache() const {
@@ -2345,7 +2352,8 @@ private:
 
   std::atomic<Compressor::CompressionMode> comp_mode =
     {Compressor::COMP_NONE}; ///< compression mode
-  CompressorRef compressor;
+  std::atomic<int> def_compressor_alg = {Compressor::COMP_ALG_NONE};
+  std::vector<CompressorRef> compressors;
   std::atomic<uint64_t> comp_min_blob_size = {0};
   std::atomic<uint64_t> comp_max_blob_size = {0};