]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
os/bluestore/compression: Main part of recompression feature
authorAdam Kupczyk <akupczyk@ibm.com>
Tue, 8 Apr 2025 11:03:22 +0000 (11:03 +0000)
committerAdam Kupczyk <akupczyk@ibm.com>
Thu, 24 Apr 2025 06:47:18 +0000 (06:47 +0000)
Add feature of recompression scanner that looks around write region to see how much
would be gained, if we read some more around and wrote more.
Added Compression.h / Compression.cc.
Added debug_bluestore_compression dout.
Created Scanner class.
Provides write_lookaround() for scanning loaded extents.

Signed-off-by: Adam Kupczyk <akupczyk@ibm.com>
src/os/bluestore/BlueStore.cc
src/os/bluestore/BlueStore.h
src/os/bluestore/Compression.cc
src/os/bluestore/Compression.h
src/os/bluestore/Writer.h

index a34c6f08992b8eac8b8bcf655ec0f05a3d2381e3..c7ad989109cb789f548eb14f3244a4575124eec3 100644 (file)
@@ -57,6 +57,7 @@
 #include "common/WorkQueue.h"
 #include "kv/KeyValueHistogram.h"
 #include "Writer.h"
+#include "Compression.h"
 
 #if defined(WITH_LTTNG)
 #define TRACEPOINT_DEFINE
@@ -4405,6 +4406,16 @@ BlueStore::extent_map_t::iterator BlueStore::ExtentMap::maybe_split_at(uint32_t
   return p;
 }
 
+// If there exist extent at `offset` return it,
+// otherwise return smallest that `offset < logical_offset`.
+BlueStore::extent_map_t::iterator BlueStore::ExtentMap::seek_nextent(
+  uint64_t offset)
+{
+  Extent dummy(offset);
+  auto p = extent_map.lower_bound(dummy);
+  return p;
+}
+
 bool BlueStore::ExtentMap::has_any_lextents(uint64_t offset, uint64_t length)
 {
   auto fp = seek_lextent(offset);
index f79e7e4320ccaa99e7971f755fe18663fa36daa5..bd409bdb88d055b903cf09caa936034dd716c536 100644 (file)
@@ -271,6 +271,7 @@ public:
   struct BufferSpace;
   struct Collection;
   struct Onode;
+  class Scanner;
   class Estimator;
   typedef boost::intrusive_ptr<Collection> CollectionRef;
   typedef boost::intrusive_ptr<Onode> OnodeRef;
@@ -1171,6 +1172,8 @@ public:
     /// seek to the first lextent including or after offset
     extent_map_t::iterator seek_lextent(uint64_t offset);
     extent_map_t::const_iterator seek_lextent(uint64_t offset) const;
+    /// seek to the exactly the extent, or after offset
+    extent_map_t::iterator seek_nextent(uint64_t offset);
 
     /// split extent
     extent_map_t::iterator split_at(extent_map_t::iterator p, uint32_t offset);
index 71f2c9bdfd2989fe3235ca1cd5c2e07e20d22cfa..7cb38ce52ec2e41cf3135a6252a547da2d3ee8de 100644 (file)
@@ -16,7 +16,6 @@
 #include "include/intarith.h"
 #include <limits>
 
-
 template <typename Char>
 struct basic_ostream_formatter : fmt::formatter<std::basic_string_view<Char>, Char> {
   template <typename T, typename OutputIt>
@@ -40,6 +39,14 @@ template <> struct fmt::formatter<BlueStore::Extent::printer>
 #undef  dout_prefix
 #define dout_prefix *_dout << "bluecompr "
 
+using Extent = BlueStore::Extent;
+using ExtentMap = BlueStore::ExtentMap;
+using Blob = BlueStore::Blob;
+using exmp_cit = BlueStore::extent_map_t::const_iterator;
+using exmp_it = BlueStore::extent_map_t::iterator;
+using Scanner = BlueStore::Scanner;
+using Scan = BlueStore::Scanner::Scan;
+using P = BlueStore::printer;
 using Estimator = BlueStore::Estimator;
 using P = BlueStore::printer;
 
@@ -214,3 +221,654 @@ void Estimator::finish()
            << " exp_recomp_err=" << expected_recompression_error
            << " exp_pad_exp=" << expected_pad_expansion << dendl;
 }
+
+struct scan_blob_element_t {
+  uint32_t blob_use_size = 0;    // how many bytes in object logical mapping are covered by this blob
+  uint32_t visited_size = 0;     // updated during scanning, will be either commited to consumed or reverted from consumed
+  uint32_t consumed_size = 0;    // updated during merging
+  uint32_t consumed_saved_size = 0;
+  uint32_t blob_left = 0;        // beginning of blob in object logical space
+  uint32_t blob_right = 0;       // end of blob in object logical space
+  exmp_cit leftmost_extent;      // leftmost extent that refers the blob
+  exmp_cit rightmost_extent;     // rightmost extent that refers the blob
+  bool     rejected = false;     // blob checked and rejected for recompression
+  scan_blob_element_t(
+    uint32_t blob_use_size,
+    uint32_t visited_size,
+    uint32_t consumed_size,
+    uint32_t consumed_saved_size,
+    uint32_t blob_left,
+    uint32_t blob_right,
+    exmp_cit leftmost_extent,
+    exmp_cit rightmost_extent,
+    bool rejected = false)
+    : blob_use_size(blob_use_size)
+    , visited_size(visited_size)
+    , consumed_size(consumed_size)
+    , consumed_saved_size(consumed_saved_size)
+    , blob_left(blob_left)
+    , blob_right(blob_right)
+    , leftmost_extent(leftmost_extent)
+    , rightmost_extent(rightmost_extent)
+    , rejected(rejected) {}
+};
+using object_scan_info_t = std::map<const Blob*, scan_blob_element_t>;
+
+class BlueStore::Scanner::Scan {
+public:
+  BlueStore* bluestore;
+  BlueStore::Onode* onode;
+  ExtentMap* extent_map;
+  Estimator* estimator;
+  Scan(
+    BlueStore* bluestore,
+    BlueStore::Onode* onode,
+    Estimator* estimator)
+  : bluestore(bluestore)
+  , onode(onode)
+  , extent_map(&onode->extent_map)
+  , estimator(estimator) {}
+  object_scan_info_t scanned_blobs;
+  void on_write_start(
+    uint32_t offset, uint32_t length,
+    uint32_t left_limit, uint32_t right_limit);
+  struct exmp_logical_offset {
+    const Scan &ow;
+    const exmp_cit &cit;
+    exmp_logical_offset(const Scan& ow, const exmp_cit& cit)
+    : ow(ow), cit(cit) {}
+  };
+  friend std::ostream &operator<<(std::ostream &out,
+                                  const exmp_logical_offset &lo);
+
+private:
+  bool is_end(const exmp_cit &it) const {
+    return it == extent_map->extent_map.end();
+  }
+  // range that has aready been processed
+  uint32_t done_left;
+  uint32_t done_right;
+  // range that is allowed for processing
+  uint32_t limit_left;
+  uint32_t limit_right;
+  // range that has already been scanned for blobs
+  uint32_t scanned_left;
+  uint32_t scanned_right;
+  exmp_cit scanned_left_it;  // <- points to first in scanned range
+  exmp_cit scanned_right_it; // <- points to first outside scanned range
+  void scan_for_compressed_blobs(exmp_cit start, exmp_cit finish);
+  bool maybe_expand_scan_range(const exmp_cit &it, uint32_t &left, uint32_t &right);
+  void remove_consumed_blobs();
+  void move_visited_to_consumed();
+  void remove_fully_consumed_blobs();
+  void save_consumed();
+  void restore_consumed();
+  object_scan_info_t::iterator find_least_expanding();
+  void candidate(const Extent *e);
+  void estimate_gain_left(exmp_cit left, uint32_t right);
+  void estimate_gain_right(uint32_t left, exmp_cit right);
+  void add_compress_left(exmp_cit left, uint32_t right);
+  void add_compress_right(uint32_t left, exmp_cit right);
+  void expand_left(exmp_cit &it);
+  void expand_right(exmp_cit &it);
+  exmp_logical_offset lo(const exmp_cit &it) {
+    return exmp_logical_offset(*this, it);
+  }
+};
+template <> struct fmt::formatter<BlueStore::Scanner::Scan::exmp_logical_offset>
+  : ostream_formatter {};
+
+void Scanner::write_lookaround(
+  BlueStore::Onode* onode,
+  uint32_t offset, uint32_t length,
+  uint32_t left_limit, uint32_t right_limit,
+  Estimator* estimator)
+{
+  Scan on_write(bluestore, onode, estimator);
+  on_write.on_write_start(offset, length, left_limit, right_limit);
+}
+
+std::ostream& operator<<(
+  std::ostream& out, const scan_blob_element_t& b)
+{
+  out << fmt::format("blob=[{:#x}-{:x}] extent=[{:#x}-{:x}] {:#x}/{:x}/{:x}({:x}){}",
+    b.blob_left, b.blob_right,
+    b.leftmost_extent->logical_offset, b.rightmost_extent->logical_offset,
+    b.blob_use_size, b.visited_size, b.consumed_size, b.consumed_saved_size,
+    b.rejected?" rejected":"");
+  return out;
+}
+
+std::ostream& operator<<(
+  std::ostream& out, const object_scan_info_t& scanned_blobs)
+{
+  out << "[";
+  for (const auto& it : scanned_blobs) {
+    out << std::endl;
+    out << it.first->print(P::NICK + P::JUSTID) << " ==> " << it.second;
+  }
+  out << "]";
+  return out;
+}
+
+std::ostream& operator<<(
+  std::ostream& out,
+  const Scan::exmp_logical_offset& lo)
+{
+  if (lo.ow.is_end(lo.cit)) {
+    out << "end";
+  } else {
+    out << std::hex << lo.cit->logical_offset << std::dec;
+  }
+  return out;
+}
+
+// Action to do after initial scan_for_compressed_blobs on write region.
+// The compressed blobs that are completely visited are removed, because
+// those blob will no longer exist.
+// The visited size becomes consumed size, because relevant extents will be
+// overwritten with new data.
+void Scan::remove_consumed_blobs()
+{
+  for (auto i = scanned_blobs.begin(); i != scanned_blobs.end();) {
+    if (i->second.visited_size == i->second.blob_use_size) {
+      i = scanned_blobs.erase(i);
+    } else {
+      i->second.consumed_size = i->second.visited_size;
+      ++i;
+    }
+  }
+}
+
+inline void Scan::save_consumed()
+{
+  for (auto& i : scanned_blobs) {
+    i.second.consumed_saved_size = i.second.consumed_size;
+  }
+};
+
+inline void Scan::restore_consumed()
+{
+  for (auto& i : scanned_blobs) {
+    i.second.consumed_size = i.second.consumed_saved_size;
+  }
+};
+
+// Scan to take info on compressed blobs in specified range.
+// It can be used for initial cover for punch_hole() region,
+// or as subsequent scan to fully cover compressed blobs that
+// we intend to become candidates for recompression.
+// start - first extent of region to scan
+// finish - the first extent outside scan region
+void Scan::scan_for_compressed_blobs(
+  exmp_cit start, exmp_cit finish)
+{
+  dout(20) << "scan.compr range " << lo(start) << "-" << lo(finish) << dendl;
+  for (auto it = start; it != finish; ++it) {
+    const Blob* h_Blob = &(*it->blob);
+    const bluestore_blob_t& h_bblob = h_Blob->get_blob();
+    if (h_bblob.is_shared()) {
+      // Do not analyze shared blobs, even compressed ones.
+      // It seems improbable that recompressing it would yield benefits.
+      dout(20) << "scan.compr " << h_Blob->print(P::NICK | P::JUSTID) << " shared, rejected" << dendl;
+      continue;
+    }
+    if (!h_bblob.is_compressed()) {
+      dout(20) << "scan.compr " << h_Blob->print(P::NICK | P::JUSTID) << " non-compressed, rejected" << dendl;
+      continue;
+    }
+    auto bit = scanned_blobs.find(h_Blob);
+    if (bit == scanned_blobs.end()) {
+      // first time we see this blob
+      dout(20) << fmt::format("scan.compr {} new, range [{:#x}-{:x}]",
+        h_Blob->print(P::NICK | P::JUSTID), it->logical_offset, it->logical_end()) << dendl;
+      scanned_blobs.emplace(
+          h_Blob,
+          scan_blob_element_t(
+              h_Blob->get_blob_use_tracker().get_referenced_bytes(),
+              it->length, 0, 0, it->blob_start(), it->blob_end(), it, it));
+      // Do not care that blob_start() or blob_end() might be outside limit_left
+      // or limit_right. It is possible that no extent is on the outside and we
+      // still can fully scan blob.
+    } else {
+      // we are already processing this blob, just update
+      scan_blob_element_t &info = bit->second;
+      info.visited_size += it->length;
+      if (it->logical_offset > info.rightmost_extent->logical_offset) {
+        info.rightmost_extent = it;
+      }
+      if (it->logical_offset < info.leftmost_extent->logical_offset) {
+        info.leftmost_extent = it;
+      }
+      dout(20) << fmt::format("scan.compr {} updated range [{:#x}-{:x}]",
+        h_Blob->print(P::NICK | P::JUSTID),
+        info.leftmost_extent->logical_offset, info.rightmost_extent->logical_end()) << dendl;
+      ceph_assert(info.blob_left == it->blob_start());
+      ceph_assert(info.blob_right == it->blob_end());
+    }
+  }
+  dout(30) << "scan.compr done" << dendl;
+}
+
+// Takes a compressed extent `it` and expands scan range
+// so it would surely encompass all extents that refer to the same blob.
+bool Scan::maybe_expand_scan_range(
+  const exmp_cit& it, uint32_t& left, uint32_t& right)
+{
+  bool expanded = false;
+  const Blob *h_Blob = &(*it->blob);
+  const bluestore_blob_t &h_bblob = h_Blob->get_blob();
+  // we made left_walk up to here, it must be compressed blob
+  if (h_bblob.is_shared()) {
+    dout(30) << "maybe_expand rejecting shared " << h_Blob->print(P::NICK + P::JUSTID) << dendl;
+  } else {
+    ceph_assert(h_bblob.is_compressed());
+    auto i = scanned_blobs.find(h_Blob);
+    if (i == scanned_blobs.end() || i->second.rejected == false) {
+      // We only accept blobs we have not seen before,
+      // or ones that are not fully scanned yet.
+      left = std::min(left, it->blob_start());
+      left = std::max(limit_left, left);
+      right = std::max(right, it->blob_end());
+      right = std::min(limit_right, right);
+      dout(20) << fmt::format("maybe_expand take {} range=[{:#x}-{:x}]",
+        h_Blob->print(P::NICK + P::SDISK + P::SUSE + P::SCHK), left, right) << dendl;
+      expanded = true;
+    } else {
+      dout(30) << "maybe_expand blob aready scanned and rejected" << dendl;
+    }
+  }
+  return expanded;
+}
+
+// Calculate gain and cost of recompressing an extent
+void Scan::candidate(const Extent* e)
+{
+  const Blob *h_Blob = &(*e->blob);
+  const bluestore_blob_t &h_bblob = h_Blob->get_blob();
+  uint32_t gain = 0;
+  dout(30) << "candidate " << e->print(P::NICK + P::JUSTID) << dendl;
+  if (h_bblob.is_shared()) {
+    dout(30) << "candidate ignoring shared " << h_Blob->print(P::NICK + P::JUSTID) << dendl;
+  } else if (h_bblob.is_compressed()) {
+    auto bit = scanned_blobs.find(h_Blob);
+    if (bit != scanned_blobs.end()) {
+      bit->second.consumed_size += e->length;
+      if (bit->second.consumed_size == bit->second.blob_use_size) {
+        // Seen all extents of the blob, can take gain.
+        gain = h_bblob.get_ondisk_length();
+      }
+      estimator->batch(e, gain);
+    } else {
+      derr << "candidate blob not scanned before! " << h_Blob->print(P::NICK + P::SUSE) << dendl;
+      // This blob was seen, but removed as completely consumed?
+      // Is it even possible?
+      ceph_assert(false);
+    }
+  } else {
+    // not compressed
+    gain = e->length;
+    estimator->batch(e, gain);
+  }
+};
+
+// left side variant of estimate
+// 'left' is extent that is the leftmost extent to be analyzed.
+// 'right' is the leftmost logical offset of area already marked for recompression
+void Scan::estimate_gain_left(
+  exmp_cit left, uint32_t right)
+{
+  auto i = left;
+  uint32_t last_offset = i->logical_offset;
+  for (; i != extent_map->extent_map.end() && i->logical_offset < right; ++i) {
+    ceph_assert(i->logical_offset >= last_offset);
+    if (last_offset < i->logical_offset) {
+      // Here we have a hole.
+      // TODO something has to be done with it....
+      // Does it influence compression gain/cost?
+    }
+    candidate(&(*i));
+    last_offset = i->logical_end();
+  }
+  if (last_offset < right) {
+    // Here we have a hole
+    // TODO somehow factor it in compression gain/cost.
+  }
+};
+
+// right side variant of estimate
+// 'left' is the first byte after rightmost logical offset that has already marked for recompression
+// 'right' is extent that is the last that should be analyzed.
+void Scan::estimate_gain_right(
+  uint32_t left, exmp_cit right)
+{
+  uint32_t last_offset = left;
+  for (auto i = extent_map->seek_lextent(left);;) {
+    ceph_assert(i->logical_offset >= last_offset);
+    if (i->logical_offset > last_offset) {
+      // here we have hole
+      // TODO something has to be done with it....
+    }
+    candidate(&(*i));
+    last_offset = i->logical_end();
+    if (i == right)
+      break; // stop once right is processed
+    ++i;
+  };
+};
+
+// Add extents to recompress. Left side variant.
+// 'left' is extent that is the leftmost extent to add.
+// 'right' is the leftmost logical offset of area already marked for recompression
+void Scan::add_compress_left(
+  exmp_cit left, uint32_t right)
+{
+  ceph_assert(left != extent_map->extent_map.end());
+  dout(30) << fmt::format("add_compress_left extents {:#x}(include) .. {:#x}",
+    left->logical_offset, right) << dendl;
+  ceph_assert(!left->blob->get_blob().is_shared());
+  for (auto i = left; i != extent_map->extent_map.end() && i->logical_offset < right; ++i) {
+    if (!i->blob->get_blob().is_shared()) {
+      // we only process not shared blobs
+      estimator->mark_recompress(&(*i));
+    }
+  }
+}
+
+// Add extents to recompress. Right side variant.
+// 'left' is the first byte after rightmost logical offset 
+//        that has already been marked for recompression
+// 'right' is the rightmost extent that should be added
+void Scan::add_compress_right(
+  uint32_t left, exmp_cit right) {
+  ceph_assert(right != extent_map->extent_map.end());
+  dout(30) << fmt::format("add_compress_right {:#x} .. {:#x}(include)",
+    left, right->logical_offset) << dendl;
+  ceph_assert(!right->blob->get_blob().is_shared());
+  for (auto i = extent_map->seek_lextent(left);;) {
+    ceph_assert(i != extent_map->extent_map.end());
+    if (!i->blob->get_blob().is_shared()) {
+      // we only process not shared blobs
+      estimator->mark_recompress(&(*i));
+    }
+    if (i == right)
+      break; // stop once right is processed
+    ++i;
+  }
+}
+
+// Argument `it` points to extent that has already been processed
+// Returned `it` points to extent that has been processed
+void Scan::expand_left(exmp_cit& it)
+{
+  dout(30) << "expand_left start it=" << lo(it) << dendl;
+  uint16_t mode = P::NICK + P::SDISK + P::SUSE;
+  for (; it != extent_map->extent_map.begin();) {
+    --it;
+    const Blob *h_Blob = &(*it->blob);
+    const bluestore_blob_t &h_bblob = h_Blob->get_blob();
+    if (it->logical_offset < limit_left || h_bblob.is_shared() || h_bblob.is_compressed()) {
+      dout(30) << "expand_left stops at " << it->print(mode) << dendl;
+      ++it; //back out
+      break;
+    }
+    if (estimator->is_worth(&(*it))) {
+      dout(20) << "expand_left take " << it->print(mode) << dendl;
+      estimator->mark_recompress(&(*it));
+      done_left = it->logical_offset;
+    } else {
+      dout(20) << "expand_left rejected " << it->print(mode) << dendl;
+      break;
+    }
+  }
+  scanned_left_it = it;
+  scanned_left = it->logical_offset;
+  dout(30) << "expand_left done it=" << lo(it) << dendl;
+}
+
+// Argument `it` points to extent that is a candidate
+// Returned `it` points to extent that has been rejected
+void Scan::expand_right(exmp_cit& it)
+{
+  uint16_t mode = P::NICK + P::SDISK + P::SUSE;
+  dout(30) << "expand_right start it=" << lo(it) << dendl;
+  for (; !is_end(it); ++it) {
+    const Blob *h_Blob = &(*it->blob);
+    const bluestore_blob_t &h_bblob = h_Blob->get_blob();
+    if (it->logical_end() > limit_right || h_bblob.is_shared() || h_bblob.is_compressed()) {
+      dout(30) << "expand_right stops at " << it->print(mode) << dendl;
+      break;
+    }
+    if (estimator->is_worth(&(*it))) {
+      dout(20) << "expand_right take " << it->print(mode) << dendl;
+      estimator->mark_recompress(&(*it));
+      done_right = it->logical_end();
+    } else {
+      dout(20) << "expand_right rejected " << it->print(mode) << dendl;
+      break;
+    }
+  }
+  scanned_right_it = it;
+  scanned_right = is_end(it) ? limit_right : it->logical_offset;
+  dout(30) << "expand_right done it=" << lo(it) << dendl;
+}
+
+// Searches in `scanned_blobs` for a blob that causes least expansion of recompress range.
+// Never selects "rejected" blobs.
+// Uses `done_left` and `done_right` to measure expansion ratio.
+object_scan_info_t::iterator Scanner::Scan::find_least_expanding()
+{
+  auto expansion_size = [&](object_scan_info_t::const_iterator i) -> uint32_t {
+    return done_left - std::min(i->second.leftmost_extent->logical_offset, done_left) +
+           std::max(i->second.rightmost_extent->logical_end(), done_right) - done_right;
+  };
+  if (scanned_blobs.empty()) {
+    return scanned_blobs.end();
+  }
+  // find first non-rejected
+  auto imin = scanned_blobs.begin();
+  for (; imin != scanned_blobs.end() && imin->second.rejected; ++imin) {
+  }
+  if (imin != scanned_blobs.end()) {
+    uint32_t vmin = expansion_size(imin);
+    auto i = imin;
+    ++i;
+    for (; i != scanned_blobs.end(); ++i) {
+      if (!i->second.rejected) {
+        uint32_t v = expansion_size(i);
+        if (v < vmin) {
+          vmin = v;
+          imin = i;
+        }
+      }
+    }
+  }
+  return imin;
+};
+
+// 1. Scan write region for compressed blobs
+// 2. Some compressed blobs might stick out of write region, calculate range that covers them
+// 3. Scan expanded region to make sure compressed blobs are fully scanned
+//              WRITEWRITE
+//           AAaaAaAA  CCCCccCC
+//             BB E        DD
+//     FFFFFFF
+// A, B, C blobs partialy scanned during 1)
+// E blob fully scanned during 1)
+// A - E blobs fully scanned during 3)
+// F blob partially scanned during 3)
+//
+// 4. Order observed compressed blobs from smallest to largest.
+//    Check if melding gives space benefits. Meld.
+// NOTE - partially scanned blobs from step 3 are never necessary for
+// recompression of write region
+// Proof:
+// Notice that compressed blobs cannot cross each other
+// AAABBBAAABBB <- not possible, A or B has to be written first:
+// AAABBBBBBBBB or AAAAAAAAABBB <- only possible overlaps
+// So, when we find outermost extents of compressed blobs:
+//              WRITEWRITE
+//           A BB   A  C       C
+//    F       F            D  D
+//            ^impossible  ^possible
+//            (FAFA)       (CDDC)
+// In the result if we find additional compressed blobs during step 3
+// it will be fully inside range calculated in step 2.
+// 5. When we consumed all compressed blob candidates, 
+//    try to expand on previously non-compressed extents.
+// 6. If blob on left and right are compressed, 
+//    expand scan region and goto 3
+void Scan::on_write_start(
+  uint32_t offset, uint32_t length,
+  uint32_t _limit_left, uint32_t _limit_right)
+{
+  estimator->mark_main(offset, length);
+  done_left = offset;
+  done_right = offset + length;
+  limit_left = _limit_left;
+  limit_right = _limit_right;
+  dout(10) << fmt::format("on_write #1 [{:#x}-{:x}] limit=({:#x}-{:x})",
+    done_left, done_right, limit_left, limit_right) << dendl;
+  dout(20) << "on_write #1 on: "
+    << onode->print(P::NICK + P::SDISK + P::SUSE, limit_left, limit_right) << dendl;
+
+  ceph_assert(limit_left <= done_left && done_right <= limit_right);
+
+  exmp_cit start = extent_map->maybe_split_at(done_left);
+  exmp_cit finish = extent_map->maybe_split_at(done_right);
+  
+  // STEP 1.
+  // Scan blobs in write punch_hole() region.
+  scan_for_compressed_blobs(start, finish);
+  // The region under write will release previous content.
+  // Remove those blobs that will not exist after punch_hole().
+  remove_consumed_blobs();
+
+  scanned_left_it = start;
+  scanned_right_it = finish;
+  scanned_left = done_left;
+  scanned_right = done_right;
+  // Note. It is possible that:
+  // scanned_left_it->logical_offset != scanned_left
+  // scanned_right_it->logical_offset != scanned_right
+  // STEP 2.
+  dout(15) << fmt::format("on_write #2 scan=[{:#x}-{:x}] it={}-{}",
+    scanned_left, scanned_right, lo(scanned_left_it), lo(scanned_right_it)) << dendl;
+
+  uint32_t left = std::numeric_limits<uint32_t>::max();
+  uint32_t right = std::numeric_limits<uint32_t>::min();
+  // Find maximum reach of blobs we scanned.
+  for (auto& i: scanned_blobs) {
+    ceph_assert(i.second.blob_use_size != i.second.consumed_size);
+    left = std::min(left, i.second.blob_left);
+    right = std::max(right, i.second.blob_right);
+  }
+  // Keep scan range withing limit.
+  left = std::max(left, limit_left);
+  right = std::min(right, limit_right);
+  // STEP 3.
+  // `left`, `right` cannot be iterators, because original begin/end of compressed blob
+  // could be overwritten, so there would be no extent that left_it->logical_offset == left.
+  step3:
+  if (left < scanned_left || scanned_right < right) {
+    dout(15) << fmt::format("on_write #3 expand [{:#x}-{:x}] to [{:#x}-{:x}]",
+      scanned_left, scanned_right, left, right) << dendl;
+    if (left < scanned_left) {
+      exmp_cit s = extent_map->seek_nextent(left); //get precisely or next
+      scan_for_compressed_blobs(s, scanned_left_it);
+      scanned_left = left;
+      scanned_left_it = s;
+    }
+    // scan right side
+    if (scanned_right < right) {
+      //exmp_cit s = extent_map->seek_lextent(done_right);
+      exmp_cit f = extent_map->seek_nextent(right);
+      scan_for_compressed_blobs(scanned_right_it, f);
+      scanned_right = right;
+      scanned_right_it = f;
+    }
+    // Now, compressed blobs in write range should be fully visible,
+    // as should compressed blobs from step 6.
+    // The exception is if some extents fell outside processing limit.
+
+    // STEP 4.
+    // We are ready to take blobs in expanding order.
+    // There can be 2 blobs that are same least expanding.
+    // But it is impossible that 2 extents start or end at the same position.
+    dout(20) << "on_write #4 scanned_blobs=" << scanned_blobs << dendl;
+    object_scan_info_t::iterator b_it;
+    while (true) {
+      b_it = find_least_expanding();
+      if (b_it == scanned_blobs.end()) {
+        break;
+      }
+      dout(25) << "on_write #4 test expand " << b_it->first->print(P::NICK | P::JUSTID)
+        << ", " << b_it->second << dendl;
+      save_consumed();
+      // now check if we want to merge this blob that requires least expanding
+      if (b_it->second.leftmost_extent->logical_offset < done_left) {
+        estimate_gain_left(b_it->second.leftmost_extent, done_left);
+      }
+      if (done_right < b_it->second.rightmost_extent->logical_end()) {
+        estimate_gain_right(done_right, b_it->second.rightmost_extent);
+      }
+      if (estimator->is_worth()) {
+        // meld this new blob into re-write region
+        if (b_it->second.leftmost_extent->logical_offset < done_left) {
+          add_compress_left(b_it->second.leftmost_extent, done_left);
+          done_left = b_it->second.leftmost_extent->logical_offset;
+        }
+        if (done_right < b_it->second.rightmost_extent->logical_end()) {
+          add_compress_right(done_right, b_it->second.rightmost_extent);
+          done_right = b_it->second.rightmost_extent->logical_end();
+        }
+        scanned_blobs.erase(b_it);
+      } else {
+        // We do not want to recompress this blob yet.
+        restore_consumed();
+        // Do not select this blob again.
+        b_it->second.rejected = true;
+        // It might get recompressed as a part of larger recompress though.
+      }
+    }
+  } else {
+    dout(15) << fmt::format("on_write #3 scanned=[{:#x}-{:x}]", scanned_left, scanned_right) << dendl;
+  }
+  // STEP 5.
+  // All compressed blobs are within the range we already scanned.
+  // We are free to expand onto uncompressed blobs.
+  exmp_cit right_it = extent_map->seek_nextent(done_right);
+  exmp_cit left_it = extent_map->seek_nextent(done_left);
+  dout(15) << "on_write #5 left=" << lo(left_it) << " right=" << lo(right_it) << dendl;
+  if (!is_end(right_it)) {
+    expand_right(right_it);
+  }
+  if (left_it != extent_map->extent_map.begin()) {
+    expand_left(left_it);
+  }
+  
+  // STEP 6.
+  // now left_it & right_it are either compressed or at allowed limit
+  // attempt to expand scan range
+  dout(15) << "on_write #6 left=" << lo(left_it) << " right=" << lo(right_it) << dendl;
+  bool has_expanded = false;
+  if (right_it != extent_map->extent_map.end()) {
+    if (right_it->logical_end() < limit_right) {
+      dout(30) << "right maybe expand" << dendl;
+      has_expanded |= maybe_expand_scan_range(right_it, left, right);
+    }
+  }
+  if (left_it != extent_map->extent_map.begin()) {
+    --left_it; // left_walk points to processes extent
+    if (limit_left < left_it->logical_offset) {
+      dout(30) << "left maybe expand" << dendl;
+      has_expanded |= maybe_expand_scan_range(left_it, left, right);
+    }
+    ++left_it; //give it back
+  }
+  dout(15) << "on_write #6 expanded=" << has_expanded << dendl;
+  if (has_expanded) {
+    goto step3;
+  }
+  dout(10) << "on_write done" << dendl;
+}
index 17adf458a5fb0a60e3e759fae89db0b0c80094c9..f81d4e81ef682a246ed1877c91e4500947ed5235 100644 (file)
@@ -72,4 +72,18 @@ private:
   std::map<uint32_t, uint32_t> extra_recompress;
 };
 
+
+class BlueStore::Scanner {
+  BlueStore* bluestore;
+public:
+  Scanner(BlueStore* bluestore)
+  :bluestore(bluestore) {}
+
+  void write_lookaround(
+    BlueStore::Onode* onode,
+    uint32_t offset, uint32_t length,
+    uint32_t left_limit, uint32_t right_limit,
+    Estimator* estimator);
+  class Scan;
+};
 #endif
index 3c2449399415e698a1d8068c270572cf2c5cc61c..44ae1bcea47f17136a4aaea9b19ea5d3035dbf49 100644 (file)
@@ -31,6 +31,13 @@ public:
                                 // or contains compressed data. Block aligned.
     bufferlist object_data;     // Object data. Needed to put into caches.
     bool is_compressed() const {return compressed_length != 0;}
+    blob_data_t()
+      : real_length(0), compressed_length(0) {}
+    blob_data_t(
+      uint32_t real_length, uint32_t compressed_length,
+      const bufferlist& disk_data, const bufferlist& object_data)
+      : real_length(real_length), compressed_length(compressed_length),
+        disk_data(disk_data), object_data(object_data) {};
   };
   using blob_vec = std::vector<blob_data_t>;
   struct blob_data_printer {