]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/os/seastore: support for large kv pair in LogNode
authormyoungwon oh <ohmyoungwon@gmail.com>
Fri, 13 Feb 2026 02:06:02 +0000 (11:06 +0900)
committermyoungwon oh <ohmyoungwon@gmail.com>
Thu, 19 Feb 2026 07:32:11 +0000 (16:32 +0900)
Each log_key_t contains a chunk_idx field to manage values
that span multiple LogNodes when the value size exceeds the
maximum capacity of a single LogNode.
See detailed description in log_manager.h.

Signed-off-by: Myoungwon Oh <ohmyoungwon@gmail.com>
src/crimson/os/seastore/omap_manager/log/log_manager.cc
src/crimson/os/seastore/omap_manager/log/log_manager.h
src/crimson/os/seastore/omap_manager/log/log_node.cc
src/crimson/os/seastore/omap_manager/log/log_node.h

index 065c770e7fc443b7364c70f89e0b79fe9ab75d92..dc14b9f65dae043b0dd6443d628abe636ec5cfbe 100644 (file)
@@ -81,6 +81,11 @@ LogManager::omap_set_keys(
     if (can_ow) {
       ow_done = true;
     }
+    // If multiple blocks are needed to store the kv pair
+    if (log_node->get_max_val_length(k.size()) < v.length()) {
+      co_await _log_set_multi_block_key(log_root, t, log_node, k, v);
+      co_return;
+    }
     co_await _log_set_key(log_root, t, log_node, k, v, can_ow);
     co_return;
   };
@@ -148,6 +153,18 @@ LogManager::omap_set_keys(
       if (is_dup_log_key(p.first)) {
        cur = dup_e;
       }
+      if (e->get_max_val_length(p.first.size()) < p.second.length()) {
+       co_await _log_set_multi_block_key(log_root, t, cur, p.first, p.second);
+       if (!is_dup_log_key(p.first)) {
+         e = co_await log_load_extent<LogNode>(
+           t, log_root.addr, BEGIN_KEY, END_KEY);
+       } else {
+         dup_e = co_await log_load_extent<LogNode>(
+           t, co_await get_dup_addr_from_root(t, log_root.addr),
+           BEGIN_KEY, END_KEY);
+       }
+       continue;
+      }
       if (cur->expect_overflow(p.first.size(), p.second.length())) {
        cur = co_await alloc_log_node(cur->get_laddr());
        if (!is_dup_log_key(p.first)) {
@@ -220,6 +237,52 @@ LogManager::omap_set_key(
   co_return co_await omap_set_keys(log_root, t, std::move(kvs));
 }
 
+LogManager::omap_set_key_ret
+LogManager::_log_set_multi_block_key(omap_root_t &log_root,
+  Transaction &t, LogNodeRef tail,
+  const std::string &key, const ceph::bufferlist &value)
+{
+  LOG_PREFIX(LogManager::_log_set_multi_block_key);
+  DEBUGT("enter key={}", t, key);
+  size_t length = value.length();
+  size_t max_val_length = tail->get_max_val_length(key.size());
+  assert(max_val_length > 0);
+  uint16_t idx = 1;
+  size_t offset = 0;
+  LogNodeRef cur_extent = tail;
+
+  while (offset < length) {
+    size_t chunk_len = std::min(max_val_length, length - offset);
+      ceph::bufferlist chunk;
+    auto extent = co_await tm.alloc_non_data_extent<LogNode>(
+      t, log_root.hint, LOG_NODE_BLOCK_SIZE
+    ).handle_error_interruptible(
+      crimson::ct_error::enospc::assert_failure{"unexpected enospc"},
+      omap_set_key_iertr::pass_further{}
+    );
+    assert(extent);
+    chunk.substr_of(value, offset, chunk_len);
+    DEBUGT("offset={}, chunk_len={}, idx={}, value length={}",
+         t, offset, chunk_len, idx, value.length());
+    extent->append_multi_block_kv(t, key, chunk, idx);
+    extent->set_prev_addr(cur_extent->get_laddr());
+    cur_extent = extent;
+    offset += chunk_len;
+    idx++;
+  }
+  if (!is_dup_log_key(key)) {
+    log_root.update(cur_extent->get_laddr(), log_root.depth,
+      log_root.hint, log_root.type);
+    cur_extent->set_dup_tail_addr(tail->get_dup_tail_addr());
+  } else {
+    auto ext = co_await log_load_extent<LogNode>(
+      t, log_root.addr, BEGIN_KEY, END_KEY);
+    auto mut = tm.get_mutable_extent(t, ext)->cast<LogNode>();
+    mut->set_dup_tail_addr(cur_extent->get_laddr());
+  }
+  co_return;
+}
+
 LogManager::omap_set_key_ret
 LogManager::_log_set_key(omap_root_t &log_root,
   Transaction &t, LogNodeRef tail,
@@ -375,11 +438,36 @@ LogManager::find_kvs(Transaction &t, laddr_t dst,
   if (extent == nullptr) {
     co_return;
   }
+  if (extent->has_multi_block_kv()) {
+    co_await find_multi_block_kvs(t, extent, first, last, kvs);
+    co_return;
+  }
   extent->list(first, last, kvs);
   co_await find_kvs(t, extent->get_prev_addr(), first, last, kvs);
   co_return;
 }
 
+LogManager::omap_list_iertr::future<>
+LogManager::find_multi_block_kvs(Transaction &t, LogNodeRef extent,
+  const std::optional<std::string> &first,
+  const std::optional<std::string> &last,
+  std::map<std::string, bufferlist> &kvs)
+{
+  LOG_PREFIX(LogManager::find_multi_block_kvs);
+  DEBUGT("first={}, last={}, dst={}", t, first, last, extent->get_laddr());
+  extent->list(first, last, kvs);
+  if (extent->get_prev_addr() == L_ADDR_NULL) {
+    co_return;
+  }
+  auto prev_extent = co_await log_load_extent<LogNode>(
+    t, extent->get_prev_addr(), BEGIN_KEY, END_KEY);
+  if (prev_extent->has_multi_block_kv()) {
+    co_await find_multi_block_kvs(t, prev_extent, first, last, kvs);
+  } else {
+    co_await find_kvs(t, prev_extent->get_laddr(), first, last, kvs);
+  }
+  co_return;
+}
 
 LogManager::omap_get_value_ret
 LogManager::find_kv(Transaction &t, laddr_t dst, const std::string &key)
@@ -392,6 +480,11 @@ LogManager::find_kv(Transaction &t, laddr_t dst, const std::string &key)
   if (extent == nullptr) {
     co_return std::nullopt;
   }
+  if (extent->has_multi_block_kv(key)) {
+    bufferlist buf;
+    co_await find_multi_block_kv(t, key, extent, buf);
+    co_return std::move(buf);
+  }
 
   auto e = co_await extent->get_value(key);
   if (e == std::nullopt) {
@@ -404,6 +497,31 @@ LogManager::find_kv(Transaction &t, laddr_t dst, const std::string &key)
   co_return std::move(e);
 }
 
+LogManager::omap_get_value_iertr::future<>
+LogManager::find_multi_block_kv(Transaction &t, const std::string &key,
+  LogNodeRef extent, bufferlist &buf)
+{
+  LOG_PREFIX(LogManager::find_multi_block_kv);
+  DEBUGT("key={}, dst={}", t, key, extent->get_laddr());
+  auto e = co_await extent->get_value(key);
+  assert(e);
+  if (extent->is_first_multi_block(key)) {
+    buf.append(*e);
+    co_return;
+  } 
+  assert(extent->get_prev_addr() != L_ADDR_NULL);
+  auto prev_extent = co_await log_load_extent<LogNode>(
+    t, extent->get_prev_addr(), BEGIN_KEY, END_KEY);
+  if (prev_extent->has_multi_block_kv(key)) {
+    co_await find_multi_block_kv(t, key, prev_extent, buf);
+  } else {
+    assert(0 == "impossible");
+  }
+  assert(e);
+  buf.append(*e);
+  co_return;
+}
+
 LogManager::omap_rm_key_ret
 LogManager::remove_node(Transaction &t, LogNodeRef mut, LogNodeRef prev)
 {
@@ -438,12 +556,30 @@ LogManager::remove_kv(Transaction &t, laddr_t dst, const std::string &key, LogNo
   LOG_PREFIX(LogManager::remove_kv);
   DEBUGT("key={}, dst={}", t, key, dst);
 
+  if (dst == L_ADDR_NULL) {
+    co_return;
+  }
   auto extent = co_await log_load_extent<LogNode>(
     t, dst, BEGIN_KEY, END_KEY);
   if (extent == nullptr) {
     co_return;
   }
 
+  if (extent->has_multi_block_kv(key)) {
+    auto mut = tm.get_mutable_extent(t, extent)->template cast<LogNode>();
+    mut->remove_entry(key);
+    assert(mut->is_removable());
+    if (mut->is_removable()) {
+      co_await remove_node(t, mut, prev);
+      if (prev != nullptr && mut->get_prev_addr() != L_ADDR_NULL) {
+       mut = co_await log_load_extent<LogNode>(
+         t, prev->get_laddr(), BEGIN_KEY, END_KEY);
+      }
+    }
+    co_await remove_kv(t, mut->get_prev_addr(), key, mut);
+    co_return;
+  }
+
   auto e = co_await extent->get_value(key);
   if (e == std::nullopt) {
     if(extent->get_prev_addr() == L_ADDR_NULL) {
index 0c49de79cf7d7cc620b6506dfaf2dae56767b354..b073611ba94897ded20ff059e938785d737b854c 100644 (file)
@@ -301,6 +301,48 @@ public:
 
   base_iertr::future<laddr_t> get_dup_addr_from_root(Transaction &t, laddr_t addr);
 
+  /**
+   *
+   * Support for multi-block KV pairs
+   *
+   * Each log_key_t contains a chunk_idx field to manage values
+   * that span multiple LogNodes when the value size exceeds the
+   * maximum capacity of a single LogNode.
+   * For simplicity, we always create separate blocks for each chunk,
+   * even if this may introduce some internal fragmentation.
+   * TODO: Implement block packing to improve space efficiency.
+   *
+   * Layout example:
+   *
+   *                     log_root
+   *                         |
+   *                         v
+   *        +-------------------------------+
+   *        |  LogNode (1 KV, chunk_idx:2)  |
+   *        |        (later chunk)          |
+   *        +-------------------------------+
+   *                         |
+   *                         v
+   *        +-------------------------------+
+   *        |  LogNode (1 KV, chunk_idx:1)  |
+   *        |        (earlier chunk)        |
+   *        +-------------------------------+
+   *
+   */
+  omap_get_value_iertr::future<>
+  find_multi_block_kv(Transaction &t, const std::string &key,
+    LogNodeRef extent, bufferlist &buf);
+  omap_list_iertr::future<>
+  find_multi_block_kvs(Transaction &t, LogNodeRef extent,
+    const std::optional<std::string> &first,
+    const std::optional<std::string> &last,
+    std::map<std::string, bufferlist> &kvs);
+  omap_set_key_ret
+  _log_set_multi_block_key(omap_root_t &log_root,
+    Transaction &t, LogNodeRef tail,
+    const std::string &key, const ceph::bufferlist &value);
+
+
   TransactionManager &tm;
 };
 
index 02dd8f7c4536a71b19f4d770851718dd9b864705..4e5363a1d934aa2b26df54583978075978d384fd 100644 (file)
@@ -34,6 +34,12 @@ void delta_t::replay(LogKVNodeLayout &l) {
   }
 }
 
+void LogNode::append_multi_block_kv(Transaction &t, const std::string &key,
+  const ceph::bufferlist &val, const uint16_t idx) {
+  assert(!maybe_get_delta_buffer());
+  _append_multi_block_kv(key, val, idx);
+}
+
 void LogNode::append_kv(Transaction &t, const std::string &key,
     const ceph::bufferlist &val) {
   auto p = maybe_get_delta_buffer();
@@ -181,7 +187,17 @@ void LogNode::list(const std::optional<std::string> &first,
   for_each_live_entry([&](const auto& ent, uint32_t index) -> bool {
     const auto k = ent.get_key();
     if (k >= s && (!last || k <= e)) {
-      kvs[k] = ent.get_val();
+      if (ent.get_chunk_idx() == 0) {
+       // This is not multi block kv pair
+       kvs[k] = ent.get_val();
+      } else {
+       bufferlist head = ent.get_val();
+       auto it = kvs.find(k);
+       if (it != kvs.end()) {
+         head.claim_append(kvs[k]);
+       }
+       kvs[k] = std::move(head);
+      }
     }
     return false;
   });
index fcb57043e4eef21b669db84ffb600272618bbb02..1a091974c2c1a74772f201a6548c6311d4e15574 100644 (file)
@@ -198,15 +198,17 @@ using LogNodeRef = TCachedExtentRef<LogNode>;
 struct log_key_t {
   uint16_t key_len = 0;
   uint16_t val_len = 0;
+  uint16_t chunk_idx = 0;
 
   log_key_t() = default;
-  log_key_t(uint16_t k_len, uint16_t v_len)
-  : key_len(k_len), val_len(v_len) {}
+  log_key_t(uint16_t k_len, uint16_t v_len, uint16_t c_idx = 0)
+  : key_len(k_len), val_len(v_len), chunk_idx(c_idx) {}
 
   DENC(log_key_t, v, p) {
     DENC_START(1, 1, p);
     denc(v.key_len, p);
     denc(v.val_len, p);
+    denc(v.chunk_idx, p);
     DENC_FINISH(p);
   }
 };
@@ -214,23 +216,26 @@ struct log_key_t {
 struct log_key_le_t {
   ceph_le16 key_len{0};
   ceph_le16 val_len{0};
+  ceph_le16 chunk_idx{0};
 
   log_key_le_t() = default;
   log_key_le_t(const log_key_le_t &) = default;
   explicit log_key_le_t(const log_key_t &key)
     : key_len(key.key_len),
-      val_len(key.val_len) {}
+      val_len(key.val_len),
+      chunk_idx(key.chunk_idx) {}
 
   log_key_le_t& operator=(log_key_t key) {
     key_len = key.key_len;
     val_len = key.val_len;
+    chunk_idx = key.chunk_idx;
     return *this;
   }
 
 
   operator log_key_t() const {
     return log_key_t{uint16_t(key_len),
-                           uint16_t(val_len)};
+      uint16_t(val_len), uint16_t(chunk_idx)};
   }
 };
 
@@ -462,6 +467,10 @@ public:
       bl.append(bptr);
       return bl;
     }
+
+    uint64_t get_chunk_idx() const {
+      return get_node_key().chunk_idx;
+    }
   };
   
   using const_iterator = iter_t<true>;
@@ -657,6 +666,18 @@ public:
     set_size(get_size() + 1);
   }
 
+  void _append_multi_block_kv(const std::string &key, const ceph::bufferlist &val,
+    const uint16_t idx) {
+    iterator prev_iter(this, get_last_pos());
+    auto last = prev_iter->get_node_key();
+    iterator next_iter(this, get_size() == 0 ? get_last_pos() :
+      get_last_pos() + get_entry_size(last.key_len, last.val_len));
+    next_iter.set_node_key(log_key_t(key.size(), val.length(), idx));
+    next_iter.set_node_val(key, val);
+    ceph_assert(get_size() == 0);
+    set_size(get_size() + 1);
+  }
+
   void _overwrite(const std::string &key, const ceph::bufferlist &val) {
     iterator iter(this, get_last_pos());
     iter.set_node_key(log_key_t(key.size(), val.length()));
@@ -782,6 +803,9 @@ struct LogNode
     return is_mutation_pending() ? &delta_buffer : nullptr;
   }
 
+  void append_multi_block_kv(Transaction &t, const std::string &key,
+    const ceph::bufferlist &val, const uint16_t idx);
+
   void append_kv(Transaction &t, const std::string &key,
     const ceph::bufferlist &val);
 
@@ -926,6 +950,25 @@ struct LogNode
     return free_space() < get_entry_size(ksize, vsize) + reserved_len;
   }
 
+  size_t get_max_val_length(size_t ksize) {
+    return (capacity() - get_entry_size(ksize, 0));
+  }
+
+  bool is_first_multi_block(const std::string &key) const {
+    auto iter = iter_begin();
+    return (iter->get_chunk_idx() == 1 && iter->get_key() == key);
+  }
+
+  bool has_multi_block_kv() const {
+    auto iter = iter_begin();
+    return (iter->get_chunk_idx() >= 1);
+  }
+
+  bool has_multi_block_kv(const std::string &key) const {
+    auto iter = iter_begin();
+    return (iter->get_chunk_idx() >= 1 && iter->get_key() == key);
+  }
+
   void update_delta() {
     if (!delta_buffer.empty()) {
       delta_buffer.replay(*this);