]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/os/seastore: add log_manager to handle pgmeta_oid's key-value pairs separately
authormyoungwon oh <ohmyoungwon@gmail.com>
Thu, 28 Aug 2025 02:49:50 +0000 (11:49 +0900)
committermyoungwon oh <ohmyoungwon@gmail.com>
Thu, 19 Feb 2026 07:30:53 +0000 (16:30 +0900)
Signed-off-by: Myoungwon Oh <ohmyoungwon@gmail.com>
src/crimson/os/seastore/CMakeLists.txt
src/crimson/os/seastore/cache.cc
src/crimson/os/seastore/omap_manager/log/log_manager.cc [new file with mode: 0644]
src/crimson/os/seastore/omap_manager/log/log_manager.h [new file with mode: 0644]
src/crimson/os/seastore/omap_manager/log/log_node.cc [new file with mode: 0644]
src/crimson/os/seastore/omap_manager/log/log_node.h [new file with mode: 0644]
src/crimson/os/seastore/seastore.cc
src/crimson/os/seastore/seastore_types.cc
src/crimson/os/seastore/seastore_types.h

index 3c82bfed1cd1d3b8ad442a858cb315617e4656ce..89a2ae0b5a6b30336120a29da2bfd89e93aac7ed 100644 (file)
@@ -20,6 +20,8 @@ set(crimson_seastore_srcs
   omap_manager.cc
   omap_manager/btree/btree_omap_manager.cc
   omap_manager/btree/omap_btree_node_impl.cc
+  omap_manager/log/log_node.cc
+  omap_manager/log/log_manager.cc
   onode.cc
   onode_manager/staged-fltree/node.cc
   onode_manager/staged-fltree/node_extent_manager.cc
index 864654e3488b26a0ee0afc225291f5d5738c9f98..620611a988d3093847dfa79b697af39d74211d5c 100644 (file)
@@ -20,6 +20,7 @@
 #include "crimson/os/seastore/collection_manager/collection_flat_node.h"
 #include "crimson/os/seastore/onode_manager/staged-fltree/node_extent_manager/seastore.h"
 #include "crimson/os/seastore/backref/backref_tree_node.h"
+#include "crimson/os/seastore/omap_manager/log/log_node.h"
 #include "test/crimson/seastore/test_block.h"
 
 using std::string_view;
@@ -174,7 +175,8 @@ void Cache::register_metrics()
     {extent_types_t::TEST_BLOCK,          sm::label_instance("ext", "TEST_BLOCK")},
     {extent_types_t::TEST_BLOCK_PHYSICAL, sm::label_instance("ext", "TEST_BLOCK_PHYSICAL")},
     {extent_types_t::BACKREF_INTERNAL,    sm::label_instance("ext", "BACKREF_INTERNAL")},
-    {extent_types_t::BACKREF_LEAF,        sm::label_instance("ext", "BACKREF_LEAF")}
+    {extent_types_t::BACKREF_LEAF,        sm::label_instance("ext", "BACKREF_LEAF")},
+    {extent_types_t::LOG_NODE,        sm::label_instance("ext", "LOG_NODE")}
   };
   assert(labels_by_ext.size() == (std::size_t)extent_types_t::NONE);
 
@@ -1137,6 +1139,9 @@ CachedExtentRef Cache::alloc_new_non_data_extent_by_type(
     return CachedExtentRef();
   case extent_types_t::TEST_BLOCK_PHYSICAL:
     return alloc_new_non_data_extent<TestBlockPhysical>(t, length, hint, gen);
+  case extent_types_t::LOG_NODE:
+     return alloc_new_non_data_extent<log_manager::LogNode>(
+       t, length, hint, gen);
   case extent_types_t::NONE: {
     ceph_assert(0 == "NONE is an invalid extent type");
     return CachedExtentRef();
@@ -2394,6 +2399,10 @@ Cache::_get_absent_extent_by_type(
   case extent_types_t::TEST_BLOCK_PHYSICAL:
     ret = CachedExtent::make_cached_extent_ref<TestBlockPhysical>(length);
     break;
+  case extent_types_t::LOG_NODE:
+    ret = CachedExtent::make_cached_extent_ref<
+      log_manager::LogNode>(length);
+    break;
   case extent_types_t::NONE:
     ceph_assert(0 == "NONE is an invalid extent type");
     break;
@@ -2516,6 +2525,12 @@ Cache::do_get_caching_extent_by_type(
     ).safe_then([](auto extent) {
       return CachedExtentRef(extent.detach(), false /* add_ref */);
     });
+  case extent_types_t::LOG_NODE:
+    return do_get_caching_extent<log_manager::LogNode>(
+      offset, length, std::move(extent_init_func), std::move(on_cache), p_src
+    ).safe_then([](auto extent) {
+      return CachedExtentRef(extent.detach(), false /* add_ref */);
+    });
   case extent_types_t::NONE: {
     ceph_assert(0 == "NONE is an invalid extent type");
     return get_extent_ertr::make_ready_future<CachedExtentRef>();
diff --git a/src/crimson/os/seastore/omap_manager/log/log_manager.cc b/src/crimson/os/seastore/omap_manager/log/log_manager.cc
new file mode 100644 (file)
index 0000000..6fc0655
--- /dev/null
@@ -0,0 +1,515 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#include <string>
+#include <vector>
+
+#include "crimson/common/log.h"
+#include "crimson/os/seastore/seastore_types.h"
+#include "crimson/os/seastore/transaction_manager.h"
+#include "log_manager.h"
+#include "log_node.h"
+#include "crimson/os/seastore/omap_manager/btree/btree_omap_manager.h"
+
+SET_SUBSYS(seastore_omap);
+
+namespace crimson::os::seastore::log_manager{
+
+base_iertr::future<laddr_t> LogManager::get_dup_addr_from_root(Transaction &t, laddr_t addr) {
+  auto ext = co_await log_load_extent<LogNode>(
+    t, addr, BEGIN_KEY, END_KEY);
+  assert(ext);
+  co_return ext->get_dup_tail_addr();
+}
+
+LogManager::LogManager(
+  TransactionManager &tm)
+  : tm(tm) {}
+
+LogManager::initialize_omap_ret
+LogManager::initialize_omap(Transaction &t, laddr_t hint, omap_type_t omap_type) 
+{
+  LOG_PREFIX(LogManager::initialize_omap);
+  DEBUGT("hint: {}", t, hint);
+  auto extent = co_await tm.alloc_non_data_extent<LogNode>(
+    t, hint, LOG_NODE_BLOCK_SIZE
+  ).handle_error_interruptible(
+    crimson::ct_error::enospc::assert_failure{"unexpected enospc"},
+    TransactionManager::alloc_extent_iertr::pass_further{}
+  );
+  // for dup list
+  auto d_extent = co_await tm.alloc_non_data_extent<LogNode>(
+    t, hint, LOG_NODE_BLOCK_SIZE
+  ).handle_error_interruptible(
+    crimson::ct_error::enospc::assert_failure{"unexpected enospc"},
+    TransactionManager::alloc_extent_iertr::pass_further{}
+  );
+  extent->set_dup_tail_addr(d_extent->get_laddr());
+
+  omap_root_t omap_root;
+  omap_root.update(extent->get_laddr(), 1, hint,
+    omap_type_t::LOG);
+  t.get_omap_tree_stats().extents_num_delta++;
+  co_return std::move(omap_root);
+}
+
+LogManager::omap_set_keys_ret
+LogManager::omap_set_keys(
+  omap_root_t &log_root,
+  Transaction &t, std::map<std::string, ceph::bufferlist>&& _kvs) 
+{
+  LOG_PREFIX(LogManager::omap_set_keys);
+  DEBUGT("enter kv size {}", t, _kvs.size());
+  assert(log_root.get_type() == omap_type_t::LOG);
+
+  auto kvs = std::move(_kvs);
+  auto ext = co_await log_load_extent<LogNode>(
+    t, log_root.addr, BEGIN_KEY, END_KEY);
+  ceph_assert(ext);
+  std::map<std::string, ceph::bufferlist> dup_kvs;
+  for (auto &p : kvs) {
+    CachedExtentRef node;
+    Transaction::get_extent_ret ret;
+    // To find mutable extent in the same transaction
+    ret = t.get_extent(ext->get_paddr(), &node);
+    assert(ret == Transaction::get_extent_ret::PRESENT);
+    assert(node);
+    LogNodeRef log_node = node->template cast<LogNode>();
+    if (is_dup_log_key(p.first)) {
+      dup_kvs[p.first] = p.second;
+      continue;
+    }
+    co_await _log_set_key(log_root, t, log_node, p.first, p.second);
+    co_return;
+  };
+
+  if (!dup_kvs.empty()) {
+    ext = co_await log_load_extent<LogNode>(
+        t,
+        co_await get_dup_addr_from_root(t, log_root.addr),
+        BEGIN_KEY,
+        END_KEY);
+    for (auto &p: dup_kvs) {
+      co_await _log_set_key(log_root, t, ext, p.first, p.second);
+    }
+  }
+
+  co_return;
+}
+
+LogManager::omap_set_key_ret 
+LogManager::omap_set_key(
+  omap_root_t &log_root,
+  Transaction &t,
+  const std::string &key, const ceph::bufferlist &value) 
+{
+  LOG_PREFIX(LogManager::omap_set_key);
+  DEBUGT("enter k={}", t, key);
+  assert(log_root.get_type() == omap_type_t::LOG);
+
+  std::map<std::string, ceph::bufferlist> kvs;
+  kvs.emplace(key, value);
+  co_return co_await omap_set_keys(log_root, t, std::move(kvs));
+}
+
+LogManager::omap_set_key_ret
+LogManager::_log_set_key(omap_root_t &log_root,
+  Transaction &t, LogNodeRef tail,
+  const std::string &key, const ceph::bufferlist &value)
+{
+  LOG_PREFIX(LogManager::_log_set_key);
+  DEBUGT("enter key={}", t, key);
+  assert(tail);
+  if (!tail->expect_overflow(key.size(), value.length())) {
+    auto mut = tm.get_mutable_extent(t, tail)->cast<LogNode>();
+    mut->append_kv(t, key, value);
+    co_return;
+  }
+  auto extent = co_await tm.alloc_non_data_extent<LogNode>(
+    t, log_root.hint, LOG_NODE_BLOCK_SIZE
+  ).handle_error_interruptible(
+    crimson::ct_error::enospc::assert_failure{"unexpected enospc"},
+    omap_set_key_iertr::pass_further{}
+  );
+  assert(extent);
+  if (!is_dup_log_key(key)) {
+    // Normal log key:
+    // Advance the log_root to the new tail extent.
+    // Preserve the existing dup tail by inheriting it from the previous tail.
+    log_root.update(extent->get_laddr(), log_root.depth,
+      log_root.hint, log_root.type);
+    assert(tail->get_dup_tail_addr() != L_ADDR_NULL);
+    extent->set_dup_tail_addr(tail->get_dup_tail_addr());
+  } else {
+    // Dup log key:
+    // Update the dup tail pointer in the current log tail
+    //   to point to the newly created dup extent.
+    auto ext = co_await log_load_extent<LogNode>(
+      t, log_root.addr, BEGIN_KEY, END_KEY);
+    auto mut = tm.get_mutable_extent(t, ext)->cast<LogNode>();
+    mut->set_dup_tail_addr(extent->get_laddr());
+  }
+  extent->append_kv(t, key, value);
+  extent->set_prev_addr(tail->get_laddr());
+  co_return;
+}
+
+std::ostream &LogNode::print_detail_l(std::ostream &out) const
+{
+  laddr_t l = this->get_prev_addr();
+  out << ", prev=" << l
+      << ", num=" << this->get_size()
+      << ", used_space=" << this->use_space()
+      << ", capacity=" << this->get_capacity()
+      << ", last_pos=" << this->get_last_pos();
+  if (has_laddr()) {
+    out << ", begin=" << get_begin()
+       << ", end=" << get_end();
+  }
+  return out;
+}
+
+template <typename T>
+requires std::is_same_v<LogNode, T>
+LogManager::log_load_extent_iertr::future<TCachedExtentRef<T>> 
+LogManager::log_load_extent(
+  Transaction &t,
+  laddr_t laddr,
+  std::string begin,
+  std::string end)
+{
+  LOG_PREFIX(LogManager::log_load_extent);
+  DEBUGT("laddr={}", t, laddr);
+  assert(end <= END_KEY);
+  auto size = LOG_NODE_BLOCK_SIZE;
+  auto maybe_indirect_extent = co_await tm.read_extent<T>(t, laddr, size,
+    [begin=std::move(begin), end=std::move(end)](T &extent) mutable {
+      assert(!extent.is_seen_by_users());
+      extent.init_range(std::move(begin), std::move(end));
+    }
+  ).handle_error_interruptible(
+    log_load_extent_iertr::pass_further{},
+    crimson::ct_error::assert_all{ "Invalid error in log_load_extent" }
+  );
+
+  assert(!maybe_indirect_extent.is_indirect());
+  assert(!maybe_indirect_extent.is_clone);
+  co_return std::move(maybe_indirect_extent.extent);
+}
+
+LogManager::omap_get_value_ret
+LogManager::omap_get_value(
+  const omap_root_t &log_root, Transaction &t, const std::string &key)
+{
+  LOG_PREFIX(LogManager::omap_get_value);
+  DEBUGT("key={}", t, key);
+  assert(log_root.get_type() == omap_type_t::LOG);
+  std::optional<bufferlist> ret;
+  if (!is_dup_log_key(key)) {
+    ret = co_await find_kv(t, log_root.addr, key);
+  } else {
+    ret = co_await find_kv(t, 
+      co_await get_dup_addr_from_root(t, log_root.addr), key);
+  }
+  co_return ret;
+}
+
+LogManager::omap_list_ret
+LogManager::omap_list(
+  const omap_root_t &log_root,
+  Transaction &t,
+  const std::optional<std::string> &first,
+  const std::optional<std::string> &last,
+  OMapManager::omap_list_config_t config)
+{
+  LOG_PREFIX(LogManager::omap_list);
+  DEBUGT("first={}, last={}", t, first, last);
+  assert(log_root.get_type() == omap_type_t::LOG);
+  std::map<std::string, bufferlist> kvs;
+  co_await find_kvs(t, log_root.addr, first, last, kvs);
+  // for dup list
+  co_await find_kvs(t,
+    co_await get_dup_addr_from_root(t, log_root.addr), first, last, kvs);
+  auto ret = omap_list_bare_ret(false, {});
+  auto &[complete, result] = ret;
+  result.insert(kvs.begin(), kvs.end());
+  co_return std::move(ret);
+}
+
+LogManager::omap_list_iertr::future<>
+LogManager::find_kvs(Transaction &t, laddr_t dst,
+  const std::optional<std::string> &first,
+  const std::optional<std::string> &last,
+  std::map<std::string, bufferlist> &kvs)
+{
+  LOG_PREFIX(LogManager::find_kvs);
+  DEBUGT("first={}, last={}, dst={}", t, first, last, dst);
+  if (dst == L_ADDR_NULL) {
+    co_return;
+  }
+  auto extent = co_await log_load_extent<LogNode>(
+    t, dst, BEGIN_KEY, END_KEY);
+  if (extent == nullptr) {
+    co_return;
+  }
+  extent->list(first, last, kvs);
+  co_await find_kvs(t, extent->get_prev_addr(), first, last, kvs);
+  co_return;
+}
+
+
+LogManager::omap_get_value_ret
+LogManager::find_kv(Transaction &t, laddr_t dst, const std::string &key)
+{
+  LOG_PREFIX(LogManager::find_kv);
+  DEBUGT("key={}, dst={}", t, key, dst);
+
+  auto extent = co_await log_load_extent<LogNode>(
+    t, dst, BEGIN_KEY, END_KEY);
+  if (extent == nullptr) {
+    co_return std::nullopt;
+  }
+
+  auto e = co_await extent->get_value(key);
+  if (e == std::nullopt) {
+    if(extent->get_prev_addr() == L_ADDR_NULL) {
+      co_return std::nullopt;
+    }
+    auto ret = co_await find_kv(t, extent->get_prev_addr(), key);
+    co_return ret;
+  }
+  co_return std::move(e);
+}
+
+LogManager::omap_rm_key_ret
+LogManager::remove_node(Transaction &t, LogNodeRef mut, LogNodeRef prev)
+{
+  LOG_PREFIX(LogManager::remove_node);
+  if (prev == nullptr) { 
+    // This is the tail, so just reinitialize the LogNode.
+    // A LogNode for the pg log should preserve the dup tail.
+    laddr_t prev_addr = mut->get_prev_addr();
+    laddr_t dup_tail_addr = mut->get_dup_tail_addr();
+    mut->set_init_vars();
+    mut->set_prev_addr(prev_addr);
+    mut->set_dup_tail_addr(dup_tail_addr);
+    co_return;
+  }
+  assert(mut);
+  DEBUGT("mut={}, prev={}", t, *mut, *prev);
+  laddr_t prev_addr = mut->get_prev_addr();
+  co_await tm.remove(t, mut->get_laddr()
+  ).handle_error_interruptible(
+    omap_rm_key_iertr::pass_further{},
+    crimson::ct_error::assert_all{"Invalid error in remove_node"}
+  );
+  auto mut_prev = tm.get_mutable_extent(t, prev)->template cast<LogNode>();
+  assert(mut_prev);
+  mut_prev->set_prev_addr(prev_addr);
+  co_return;
+}
+
+LogManager::omap_rm_key_ret
+LogManager::remove_kv(Transaction &t, laddr_t dst, const std::string &key, LogNodeRef prev)
+{
+  LOG_PREFIX(LogManager::remove_kv);
+  DEBUGT("key={}, dst={}", t, key, dst);
+
+  auto extent = co_await log_load_extent<LogNode>(
+    t, dst, BEGIN_KEY, END_KEY);
+  if (extent == nullptr) {
+    co_return;
+  }
+
+  auto e = co_await extent->get_value(key);
+  if (e == std::nullopt) {
+    if(extent->get_prev_addr() == L_ADDR_NULL) {
+      co_return;
+    }
+    co_await remove_kv(t, extent->get_prev_addr(), key, extent);
+    co_return;
+  }
+
+  auto mut = tm.get_mutable_extent(t, extent)->template cast<LogNode>();
+  mut->remove_entry(key);
+  if (mut->is_removable()) {
+    co_await remove_node(t, mut, prev);
+    if (prev != nullptr && !is_log_key(key) && mut->get_prev_addr() != L_ADDR_NULL) {
+      mut = co_await log_load_extent<LogNode>(
+       t, prev->get_laddr(), BEGIN_KEY, END_KEY);
+    }
+  }
+  if (!is_log_key(key) && mut->get_prev_addr() != L_ADDR_NULL) {
+    // Remove all duplicate keys
+    co_await remove_kv(t, mut->get_prev_addr(), key, mut);
+  }
+  co_return;
+}
+
+LogManager::omap_rm_key_ret
+LogManager::remove_kvs(Transaction &t, laddr_t dst, 
+  std::optional<std::string> first, 
+  std::optional<std::string> last,
+  LogNodeRef prev)
+{
+  LOG_PREFIX(LogManager::remove_kvs);
+  DEBUGT("first={}, last={}, dst={}", t, first, last, dst);
+      
+  if (dst == L_ADDR_NULL || first == std::nullopt) {
+    co_return;
+  }
+
+  auto extent = co_await log_load_extent<LogNode>(
+    t, dst, BEGIN_KEY, END_KEY);
+  if (extent == nullptr) {
+    co_return;
+  }
+  auto l = last;
+  if (l && (*l).empty()) {
+    l = std::nullopt;
+  }
+
+  laddr_t prev_addr = extent->get_prev_addr();
+
+  if (is_log_key(*first)) {
+    // skip to search due to out of range
+    if (l != std::nullopt && extent->log_has_larger_than(*last)) {
+      co_await remove_kvs(t, prev_addr, first, last, extent);
+      co_return;
+    }
+    // If time-seris log, we don't need traversal anymore
+    if (*first != std::string() && extent->log_less_than(*first)) {
+      co_return;
+    }
+  }
+
+  LogNode::range_t r = extent->has_between(first, l);
+  LogNodeRef p = extent;
+  if (r == LogNode::range_t::HAS_BETWEEN) {
+    auto mut = tm.get_mutable_extent(t, extent)->template cast<LogNode>();
+    assert(mut);
+    auto ret = mut->remove_entries(first, l);
+    assert(ret);
+    DEBUGT("remove {}, extent's last key of deleted entries={}",
+      t, *extent, extent->get_last_key());
+    p = mut;
+    if (mut->is_removable()) {
+      co_await remove_node(t, mut, prev);
+      if (prev != nullptr) {
+       p = co_await log_load_extent<LogNode>(
+         t, prev->get_laddr(), BEGIN_KEY, END_KEY);
+      }
+    }
+  }
+  co_await remove_kvs(t, prev_addr, first, last, p);
+  co_return;
+}
+
+LogManager::omap_rm_key_ret 
+LogManager::omap_rm_key(
+  omap_root_t &log_root,
+  Transaction &t,
+  const std::string &key)
+{
+  LOG_PREFIX(LogManager::omap_rm_key);
+  DEBUGT("key={}", t, key);
+  assert(log_root.get_type() == omap_type_t::LOG);
+  if (!is_dup_log_key(key)) {
+    co_await remove_kv(t, log_root.addr, key, nullptr);
+  } else {
+    co_await remove_kv(t, 
+      co_await get_dup_addr_from_root(t, log_root.addr), key, nullptr);
+  }
+  co_return;
+}
+
+LogManager::omap_rm_key_range_ret 
+LogManager::omap_rm_key_range(
+  omap_root_t &log_root,
+  Transaction &t,
+  const std::string &first,
+  const std::string &last)
+{
+  LOG_PREFIX(LogManager::omap_rm_key_range);
+  DEBUGT("first={}, last={}", t, first, last);
+  assert(log_root.get_type() == omap_type_t::LOG);
+  co_await remove_kvs(t, log_root.addr, first, last, nullptr);
+  // for dup list
+  co_await remove_kvs(t, 
+    co_await get_dup_addr_from_root(t, log_root.addr),
+    first, last, nullptr);
+  co_return;
+}
+
+LogManager::omap_clear_ret
+LogManager::omap_clear(omap_root_t &root, Transaction &t)
+{
+  LOG_PREFIX(LogManager::omap_clear);
+  DEBUGT("enter", t);
+  assert(root.get_type() == omap_type_t::LOG);
+  co_await remove_kvs(t, root.addr,
+    std::optional<std::string>(),
+    std::optional<std::string>(std::nullopt), nullptr);
+  co_await remove_kvs(t, 
+    co_await get_dup_addr_from_root(t, root.addr),
+    std::optional<std::string>(),
+    std::optional<std::string>(std::nullopt), nullptr);
+  co_await tm.remove(t, co_await get_dup_addr_from_root(t, root.addr)
+  ).handle_error_interruptible(
+    omap_clear_iertr::pass_further{},
+    crimson::ct_error::assert_all{"Invalid error in omap_clear"}
+  );
+  co_await tm.remove(t, root.get_location()
+  ).handle_error_interruptible(
+    omap_clear_iertr::pass_further{},
+    crimson::ct_error::assert_all{"Invalid error in omap_clear"}
+  );
+  root.update(
+    L_ADDR_NULL,
+    0, L_ADDR_MIN, root.get_type());
+  co_return;
+}
+
+LogManager::omap_iterate_ret 
+LogManager::omap_iterate(
+  const omap_root_t &log_root,
+  Transaction &t,
+  ObjectStore::omap_iter_seek_t &start_from,
+  omap_iterate_cb_t callback)
+{
+  LOG_PREFIX(LogManager::omap_iterate);
+  DEBUGT("start={}", t, start_from.seek_position);
+  assert(log_root.get_type() == omap_type_t::LOG);
+
+  std::string s = start_from.seek_position;
+  std::map<std::string, bufferlist> kvs;
+  if (start_from.seek_type == ObjectStore::omap_iter_seek_t::LOWER_BOUND) {
+    co_await find_kvs(t, log_root.addr, std::optional<std::string>(s),
+      std::optional<std::string>(std::nullopt), kvs);
+    co_await find_kvs(t,
+      co_await get_dup_addr_from_root(t, log_root.addr),
+      std::optional<std::string>(s),
+      std::optional<std::string>(std::nullopt), kvs);
+  } else {
+    assert(start_from.seek_type == ObjectStore::omap_iter_seek_t::UPPER_BOUND);
+    co_await find_kvs(t, log_root.addr, std::optional<std::string>(std::nullopt),
+      std::optional<std::string>(s), kvs);
+    co_await find_kvs(t, 
+      co_await get_dup_addr_from_root(t, log_root.addr),
+      std::optional<std::string>(std::nullopt),
+      std::optional<std::string>(s), kvs);
+  }
+
+  ObjectStore::omap_iter_ret_t ret;
+  for (auto &p : kvs) {
+    std::string result(p.second.c_str(), p.second.length());
+    ret = callback(p.first, result);
+    if (ret == ObjectStore::omap_iter_ret_t::STOP) {
+      break;
+    }
+  }
+  co_return co_await omap_iterate_iertr::make_ready_future<
+    ObjectStore::omap_iter_ret_t>(std::move(ret));
+}
+
+
+}
diff --git a/src/crimson/os/seastore/omap_manager/log/log_manager.h b/src/crimson/os/seastore/omap_manager/log/log_manager.h
new file mode 100644 (file)
index 0000000..d11dfb9
--- /dev/null
@@ -0,0 +1,315 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include "include/denc.h"
+#include "crimson/os/seastore/seastore_types.h"
+#include "crimson/os/seastore/transaction_manager.h"
+#include "crimson/os/seastore/logging.h"
+#include "crimson/os/seastore/omap_manager.h"
+#include "log_node.h"
+
+namespace crimson::os::seastore::log_manager{
+
+class LogNode;
+using LogNodeRef = TCachedExtentRef<LogNode>;
+
+/*
+ * 
+ * LogManager aims to handle key-value pairs for pgmeta_oid,
+ * specialized in handling pg_log_entry_t.
+ *
+ *
+ * To support ranged operations (insertion and deletion),
+ * LogManager maintains two singl linked lists:
+ *
+ *   1) pg_log_entry_t list (including all KV entries except pg_log_dup_t)
+ *   2) pg_log_dup_t list (duplicate entries only)
+ *
+ * The ONode points to the LogNode that contains the latest
+ * pg_log_entry_t. That pg_log_entry_t, in turn, holds a pointer
+ * to the head of the pg_log_dup_t list.
+ *
+ * Layout:
+ *
+ *                +----------------+
+ *                |     ONode      |
+ *                +----------------+
+ *                        |
+ *                        v
+ *        +----------------------------------+
+ *    <-  | LogNode (latest pg_log_entry_t)  |
+ *        +----------------------------------+
+ *                        |
+ *                        v
+ *        +----------------------------------+
+ *    <-  | LogNode (latest pg_log_dup_t)    |
+ *        +----------------------------------+
+ *
+ * This separation allows efficient ranged updates while
+ * avoiding full traversal of duplicate entries.
+ */
+class LogManager : public OMapManager {
+public:
+  LogManager(TransactionManager &tm);
+  initialize_omap_ret initialize_omap(Transaction &t,
+    laddr_t hint, omap_type_t type) final;
+
+  /**
+   * omap_set_keys
+   *
+   *  Load the LOG tail extent and, for each (key, value):
+   *    - If the key belongs to the LOG node, write via _log_set_key().
+   *
+   * @param root   LOG root the higher layer passed in.
+   * @param t      Transaction context
+   * @param _kvs   Batch of keys to set
+   */
+  omap_set_keys_ret omap_set_keys(omap_root_t &log_root,
+    Transaction &t, std::map<std::string, ceph::bufferlist>&& _kvs) final;
+
+  // see omap_set_keys
+  omap_set_key_ret omap_set_key(
+    omap_root_t &log_root,
+    Transaction &t,
+    const std::string &key,
+    const ceph::bufferlist &value) final;
+
+  /**
+   * omap_get_value
+   *
+   * get a key-value pair in either object's LOG root
+   *
+   * @param root       LOG root the higher layer passed in.
+   * @param t          Transaction context 
+   * @param key        The key to retrieve
+   *
+   */
+  omap_get_value_ret
+  omap_get_value(const omap_root_t &log_root, Transaction &t,
+    const std::string &key) final;
+
+  /**
+   * omap_list
+   *
+   *  1) Resolve LOG from onode.
+   *  2) Collect LOG list's key–values in the range [first, last] with find_kvs().
+   *  3) Initialize an output pair (complete flag, result map), seed it with LOG entries.
+   *  4) Merge base entries into the result map.
+   *
+   * @param root    LOG root the higher layer passed in.
+   * @param t       Transaction context
+   * @param first   Optional lower bound key
+   * @param last    Optional upper bound key
+   * @param config  see OMapManager
+   */
+  omap_list_ret omap_list(
+    const omap_root_t &log_root,
+    Transaction &t,
+    const std::optional<std::string> &first,
+    const std::optional<std::string> &last,
+    OMapManager::omap_list_config_t config =
+    OMapManager::omap_list_config_t()) final;
+
+  /**
+   * omap_rm_key_range
+   *
+   * - Remove entries in the LOG list within [first, last] by walking the LOG list.
+   *
+   * @param root    LOG root the higher layer passed in.
+   * @param t       Transaction context 
+   * @param first   Lower key bound for removal.
+   * @param last    Upper key bound for removal.
+   * @param config  see OMapManager
+   */
+
+  omap_rm_key_range_ret omap_rm_key_range(
+    omap_root_t &log_root,
+    Transaction &t,
+    const std::string &first,
+    const std::string &last) final;
+
+  /**
+   * omap_rm_key
+   *
+   * clear a key in either object's LOG list 
+   *
+   *  - If the key can be satisfied by the LOG list â€” i.e., the LOG list
+   *    contains the relevant entry â€” remove from the LOG by walking the
+   *    list and stop.
+   *
+   * @param root       LOG root the higher layer passed in.
+   * @param t          Transaction context 
+   * @param key        The key to remove.
+   *
+   */
+  omap_rm_key_ret omap_rm_key(
+    omap_root_t &log_root,
+    Transaction &t,
+    const std::string &key) final;
+
+  /**
+   * omap_clear
+   *
+   * clear all entires in object's LOG list 
+   *
+   * @param root       LOG root the higher layer passed in.
+   * @param t          Transaction context 
+   *
+   */
+  omap_clear_ret omap_clear(omap_root_t &log_root,
+    Transaction &t) final;
+
+
+  /**
+   * omap_iterate
+   *
+   * This routine first consults the LOG list (omap_type_t::LOG) to
+   * perform a traveral, invoking the user-provided callback on
+   * those entries
+   *
+   * Ordering & range:
+   *  - If start_from.seek_type == LOWER_BOUND, we fetch keys in the half-open
+   *    range [s, end) from the LOG list.
+   *  - If start_from.seek_type == UPPER_BOUND, we fetch keys in the range
+   *    (start, s] from the LOG list.
+   *
+   * @param root       LOG root the higher layer passed in.
+   * @param t          Transaction context 
+   * @param start_from Seek hint: position string and LOWER/UPPER bound type.
+   * @param callback  
+   *
+   */
+  omap_iterate_ret omap_iterate(
+    const omap_root_t &log_root,
+    Transaction &t,
+    ObjectStore::omap_iter_seek_t &start_from,
+    omap_iterate_cb_t callback
+  ) final;
+
+
+  omap_list_iertr::future<>
+  find_kvs(Transaction &t, laddr_t dst, const std::optional<std::string> &first,
+    const std::optional<std::string> &last, std::map<std::string, bufferlist> &kvs);
+
+  using log_load_extent_iertr = base_iertr;
+  template <typename T>
+  requires std::is_same_v<LogNode, T>
+  log_load_extent_iertr::future<TCachedExtentRef<T>> log_load_extent(
+    Transaction &t, laddr_t laddr, std::string begin, std::string end);
+
+  omap_get_value_ret find_kv(Transaction &t, laddr_t dst, const std::string &key);
+
+  /**
+   * _log_set_key
+   *
+   *  - Fast path: if the current LOG node (tail) has enough space for (key,value),
+   *    get a mutable view within this transaction and append in place.
+   *  - Split path: if appending would overflow the LOG node, allocate
+   *   a fresh LogNode extent,
+   *    make it the new LOG tail (update log_root), append the KV there, and link
+   *    the previous head via prev_addr.
+   *
+   * @param log_root  LOG root descriptor
+   * @param t         Transaction context
+   * @param tail      Current append target
+   * @param key       Key to set/append.
+   * @param value     Value to set/append.
+   *
+   */
+  omap_set_key_ret _log_set_key(omap_root_t &log_root,
+    Transaction &t, LogNodeRef e, const std::string &key,
+    const ceph::bufferlist &value);
+
+  /**
+   * remove_kv
+   *
+   * This function searches for the given @key starting from the LogNode
+   * identified by @dst and recursively traverses the prev chain until
+   * the key is found or the chain is exhausted.
+   *
+   * When the key is found, the corresponding entry is marked as removed
+   * in the node's deletion bitmap. If the node becomes empty as a result,
+   * it may be removed from the chain via remove_node().
+   * For non-log (non-time-series) keys, duplicate keys must not remain.
+   * In this case, once the key is removed from the current node, the
+   * function continues recursively to remove all remaining duplicates
+   * in earlier nodes.
+   *
+   * @param t     Transaction context.
+   * @param dst   Logical address of the LogNode to start searching from.
+   * @param key   The key to be removed.
+   * @param prev  The next LogNode in the chain (nullptr if @dst is the tail).
+   */
+
+  omap_rm_key_ret remove_kv(Transaction &t, laddr_t dst, const std::string &key,
+    LogNodeRef prev);
+  
+  /**
+   * remove_kvs
+   *
+   * Starting at logical address dst, this loads a LogNode extent,
+   * gathers entries in the [first, last] range, and decides
+   * whether the current extent can be removed
+   * If so, it removes the extent and fixes the link pointer
+   * with prev. Otherwise it recurses to the previous extent.
+   *
+   * @param t       Transaction context.
+   * @param dst     Logical address of the starting extent
+   * @param first   lower key bound (optional).
+   * @param last    upper key bound (optional). Empty string => unbounded.
+   * @param prev    The successor of dst in the forward direction (used to fix links
+   *                when dst is removed). For the initial call at tail, pass nullptr.
+   *
+   * @return omap_rm_key_ret 
+   */
+  omap_rm_key_ret remove_kvs(Transaction &t, laddr_t dst,
+    std::optional<std::string> first, 
+    std::optional<std::string> last,
+    LogNodeRef prev);
+
+  
+  /**
+   * remove_node
+   *
+   * If @prev is nullptr, the node is the tail of the chain. In this case,
+   * the node is not physically removed; instead, it is re-initialized
+   * while preserving its prev pointer (if any).
+   *
+   * Otherwise, the node is physically removed from the transaction manager,
+   * and the previous node (@prev) is updated to bypass the removed node
+   * by inheriting its prev address.
+   *
+   * @param t     Transaction context.
+   * @param mut   The LogNode to be removed or re-initialized.
+   * @param prev  The next LogNode in the chain (nullptr if @mut is the tail).
+   */
+  LogManager::omap_rm_key_ret remove_node(Transaction &t,
+    LogNodeRef mut,
+    LogNodeRef prev);
+
+  base_iertr::future<laddr_t> get_dup_addr_from_root(Transaction &t, laddr_t addr);
+
+  TransactionManager &tm;
+};
+
+inline bool is_log_key(std::string s) {
+  pg_log_entry_t e;
+  return (s.size() == e.get_key_name().size() &&
+      (s[0] >= (0 + '0') && s[0] <= (9 + '0'))) ||
+      s.starts_with("dup_");
+}
+
+inline bool is_pg_log_key(const std::string &s) {
+  pg_log_entry_t e;
+  return (s.size() == e.get_key_name().size() &&
+      (s[0] >= (0 + '0') && s[0] <= (9 + '0')));
+}
+
+inline bool is_dup_log_key(const std::string &s) {
+  return s.starts_with("dup_");
+}
+}
diff --git a/src/crimson/os/seastore/omap_manager/log/log_node.cc b/src/crimson/os/seastore/omap_manager/log/log_node.cc
new file mode 100644 (file)
index 0000000..b994635
--- /dev/null
@@ -0,0 +1,262 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#include <string>
+#include <vector>
+
+#include "crimson/common/log.h"
+#include "crimson/os/seastore/seastore_types.h"
+#include "log_node.h"
+
+namespace crimson::os::seastore::log_manager{
+
+void delta_t::replay(LogKVNodeLayout &l) {
+  if (op == op_t::APPEND) {
+    l._append(key, val);
+    return;
+  } else if (op == op_t::ADD_PREV) {
+    l.set_prev_node(prev);
+  } else if (op == op_t::ADD_DUP_ADDR) {
+    l.set_dup_tail(prev);
+  } else if (op == op_t::INIT) {
+    l.set_last_pos(0); 
+    l.set_size(0);
+    l.set_prev_node(L_ADDR_NULL);
+    l.set_reserved_len(0);
+    l.set_reserved_size(0);
+    l.init_bitmap();
+  } else if (op == op_t::REMOVE) {
+    d_bitmap_t bitmap;
+    auto biter = val.cbegin();
+    ceph::decode(bitmap, biter);
+    l._set_d_bitmap(bitmap);
+  }
+
+}
+
+void LogNode::append_kv(Transaction &t, const std::string &key,
+    const ceph::bufferlist &val) {
+  auto p = maybe_get_delta_buffer();
+  if (p) {
+    journal_append(key, val, p);
+    return;
+  }
+  append(key, val);
+}
+
+void LogNode::set_prev_addr(laddr_t l) {
+  auto p = maybe_get_delta_buffer();
+  if (p) {
+    journal_append_prev_addr(l, p);
+    return;
+  }
+  set_prev_node(l);
+}
+
+void LogNode::set_dup_tail_addr(laddr_t l) {
+  auto p = maybe_get_delta_buffer();
+  if (p) {
+    journal_append_dup_tail_addr(l, p);
+    return;
+  }
+  set_dup_tail(l);
+}
+
+void LogNode::set_init_vars() {
+  auto p = maybe_get_delta_buffer();
+  if (p) {
+    journal_append_init(p);
+    return;
+  }
+  init_vars();
+}
+
+void LogNode::append_remove(ceph::bufferlist bl) {
+  auto p = maybe_get_delta_buffer();
+  if (p) {
+    journal_append_remove(p, bl);
+    return;
+  }
+  d_bitmap_t bitmap;
+  auto biter = bl.cbegin();
+  decode(bitmap, biter);
+  _set_d_bitmap(bitmap);
+}
+
+bool LogNode::is_removable() {
+  auto p = maybe_get_delta_buffer();
+  if (p) {
+    auto ret = p->get_latest_d_bitmap();
+    if (ret) {
+      d_bitmap_t bitmap;
+      auto biter = (*ret).cbegin();
+      decode(bitmap, biter);
+      return bitmap.is_all_set(get_size() + get_reserved_size());
+    }
+  }
+  auto bitmap = get_d_bitmap();
+  return bitmap.is_all_set(get_size());
+}
+
+void LogNode::set_cur_bitmap(uint32_t begin, uint32_t end) {
+  d_bitmap_t bitmap = get_d_bitmap();
+  auto p = maybe_get_delta_buffer();
+  if (p) {
+    auto ret = p->get_latest_d_bitmap();
+    if (ret) {
+      auto biter = (*ret).cbegin();
+      decode(bitmap, biter);
+    }
+  } 
+  bitmap.set_bitmap_range(begin, end);
+  bufferlist bl;
+  encode(bitmap, bl);
+  append_remove(bl);
+}
+
+d_bitmap_t LogNode::get_cur_bitmap() {
+  d_bitmap_t bitmap = get_d_bitmap();
+  auto p = maybe_get_delta_buffer();
+  if (p) {
+    auto ret = p->get_latest_d_bitmap();
+    if (ret) {
+      auto biter = (*ret).cbegin();
+      decode(bitmap, biter);
+    } 
+  } 
+  return bitmap;
+}
+
+void LogNode::set_bitmap(d_bitmap_t map) {
+  bufferlist bl;
+  encode(map, bl);
+  append_remove(bl);
+}
+
+template <typename F>
+void LogNode::for_each_live_entry(F&& fn) {
+  d_bitmap_t bitmap;
+  if (auto p = maybe_get_delta_buffer()) {
+    if (auto ret = p->get_latest_d_bitmap()) {
+      auto it = (*ret).cbegin();
+      decode(bitmap, it);
+    }
+  } else {
+    bitmap = get_d_bitmap();
+  }
+
+  uint32_t index = 0;
+  auto iter = iter_begin();
+  while (iter != iter_end()) {
+    if (!bitmap.is_set(index)) {
+      if (fn(*iter, index)) {
+       return;
+      }
+    }
+    ++iter;
+    ++index;
+  }
+}
+
+void LogNode::list(const std::optional<std::string> &first,
+  const std::optional<std::string> &last,
+  std::map<std::string, bufferlist> &kvs) {
+  std::string_view s(*first);
+  std::string_view e = last ? std::string_view(*last) : std::string_view{};
+  for_each_live_entry([&](const auto& ent, uint32_t index) -> bool {
+    const auto k = ent.get_key();
+    if (k >= s && (!last || k <= e)) {
+      kvs[k] = ent.get_val();
+    }
+    return false;
+  });
+}
+
+LogNode::get_value_ret LogNode::get_value(const std::string &key)
+{
+  bufferlist bl;
+  bool found = false;
+  for_each_live_entry([&](const auto& ent, uint32_t index) -> bool {
+    const auto k = ent.get_key();
+    if (k == key) {
+      bl = ent.get_val();
+      /* If key is time-series log,
+       * duplicate does not exist. In this case, return latest one */
+      if (is_log_key(k)) {
+       found = true;
+       return true;
+      }
+    }
+    return false;
+  });
+  if (bl.length() > 0 || found) {
+    return get_value_ret(
+      interruptible::ready_future_marker{},
+      std::move(bl));
+  }
+
+  return get_value_ret(
+    interruptible::ready_future_marker{},
+    std::nullopt);
+}
+
+bool LogNode::remove_entry(const std::string key)
+{
+  auto iter = iter_begin();
+  uint32_t index = 0;
+  while(iter != iter_end()) {
+    if (iter->get_key() == key) {
+      set_cur_bitmap(index, index);
+      /* If key is time-series log,
+       * duplicate key does not exist. In this case, return true */
+      if (is_log_key(key)) {
+       return true;
+      }
+    }
+    index++;
+    iter++;
+  };
+  return false;
+}
+
+
+bool LogNode::log_less_than(std::string_view str) const
+{
+  std::string last_key = get_last_key();
+  if (is_log_key(last_key)) {
+    return last_key < str;
+  }
+  auto iter = iter_begin();
+  bool all_less = false;
+  // perform full traversal to figure out last entry < str
+  while(iter != iter_end()) {
+    std::string key = iter->get_key();
+    if (is_log_key(key)) {
+      all_less = key < str;
+    }
+    iter++;
+  };
+  return all_less;
+}
+
+bool LogNode::log_has_larger_than(std::string_view str) const
+{
+  auto iter = iter_begin();
+  // return true if the first log entry > str
+  while(iter != iter_end()) {
+    std::string key = iter->get_key();
+    if (!is_log_key(key)) {
+      iter++;
+      continue;
+    }
+    return key > str;
+  };
+  return false;
+}
+
+void LogKVNodeLayout::journal_append_remove(
+  delta_buffer_t *recorder, 
+  ceph::bufferlist bl) {
+  recorder->insert_remove(bl);
+}
+
+}
diff --git a/src/crimson/os/seastore/omap_manager/log/log_node.h b/src/crimson/os/seastore/omap_manager/log/log_node.h
new file mode 100644 (file)
index 0000000..7153516
--- /dev/null
@@ -0,0 +1,917 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include "include/denc.h"
+#include "crimson/os/seastore/seastore_types.h"
+#include "crimson/os/seastore/transaction_manager.h"
+#include "crimson/os/seastore/logging.h"
+#include "crimson/os/seastore/omap_manager.h"
+#include "crimson/os/seastore/onode.h"
+#include <seastar/core/future.hh>
+#include <seastar/core/coroutine.hh>
+#include "crimson/common/errorator.h"
+#include "crimson/common/coroutine.h"
+#include "log_manager.h"
+
+namespace crimson::os::seastore::log_manager{
+
+struct LogKVNodeLayout;
+struct delta_t {
+  enum class op_t : uint_fast8_t {
+    APPEND,
+    REMOVE,
+    ADD_PREV,
+    ADD_DUP_ADDR,
+    INIT,
+  } op;
+  std::string key;
+  ceph::bufferlist val;
+  laddr_t prev;
+
+  DENC(delta_t, v, p) {
+    DENC_START(1, 1, p);
+    denc(v.op, p);
+    denc(v.key, p);
+    denc(v.val, p);
+    denc(v.prev, p);
+    DENC_FINISH(p);
+  }
+
+  void replay(LogKVNodeLayout &l);
+};
+
+class delta_buffer_t {
+  std::vector<delta_t> buffer;
+public:
+  bool empty() const {
+    return buffer.empty();
+  }
+  void insert_append(
+    const std::string &key,
+    const ceph::bufferlist &val) {
+    buffer.push_back(
+      delta_t{
+        delta_t::op_t::APPEND,
+        key,
+        val
+      });
+  }
+  void insert_prev_addr(
+      const laddr_t l) {
+    buffer.push_back(
+      delta_t{
+       delta_t::op_t::ADD_PREV,
+       std::string(),
+       bufferlist(),
+       l
+      });
+  }
+
+  void insert_dup_tail_addr(
+      const laddr_t l) {
+    buffer.push_back(
+      delta_t{
+       delta_t::op_t::ADD_DUP_ADDR,
+       std::string(),
+       bufferlist(),
+       l
+      });
+  }
+
+  void insert_init() {
+    buffer.push_back(
+      delta_t{
+       delta_t::op_t::INIT,
+       std::string(),
+       bufferlist(),
+       L_ADDR_NULL
+      });
+  }
+
+  void insert_remove(bufferlist bl) {
+    buffer.push_back(
+      delta_t{
+       delta_t::op_t::REMOVE,
+       std::string(),
+       bl,
+       L_ADDR_NULL
+      });
+  }
+
+  void replay(LogKVNodeLayout &node) {
+    for (auto &i: buffer) {
+      i.replay(node);
+    }
+  }
+
+  void clear() {
+    buffer.clear();
+  }
+
+  std::optional<laddr_t> get_latest_dup_tail_addr() {
+    std::optional<laddr_t> l = std::nullopt;
+    for (auto it = buffer.rbegin(); it != buffer.rend(); ++it) {
+      if (it->op == delta_t::op_t::ADD_DUP_ADDR) {
+        l = it->prev;
+       return l;
+      }
+    }
+    return l;
+  }
+
+  std::optional<laddr_t> get_latest_prev_leaf() {
+    std::optional<laddr_t> l = std::nullopt;
+    for (auto it = buffer.rbegin(); it != buffer.rend(); ++it) {
+      if (it->op == delta_t::op_t::ADD_PREV) {
+        l = it->prev;
+       return l;
+      }
+
+    }
+    return l;
+  }
+
+  std::optional<bufferlist> get_latest_d_bitmap() {
+    std::optional<bufferlist> ret = std::nullopt;
+    for (auto it = buffer.rbegin(); it != buffer.rend(); ++it) {
+      if (it->op == delta_t::op_t::REMOVE) {
+       ret = it->val;
+       return ret;
+      }
+    }
+    return ret;
+  }
+
+  DENC(delta_buffer_t, v, p) {
+    DENC_START(1, 1, p);
+    denc(v.buffer, p);
+    DENC_FINISH(p);
+  }
+
+};
+}
+WRITE_CLASS_DENC(crimson::os::seastore::log_manager::delta_t)
+WRITE_CLASS_DENC(crimson::os::seastore::log_manager::delta_buffer_t)
+
+namespace crimson::os::seastore::log_manager{
+
+constexpr uint32_t LOG_NODE_BLOCK_SIZE = 16384;
+
+const std::string BEGIN_KEY = "";
+const std::string END_KEY(64, (char)(-1));
+
+inline constexpr uint32_t get_log_node_block_size() {
+  return crimson::os::seastore::log_manager::LOG_NODE_BLOCK_SIZE;
+}
+
+struct LogNode;
+using LogNodeRef = TCachedExtentRef<LogNode>;
+
+struct log_key_t {
+  uint16_t key_len = 0;
+  uint16_t val_len = 0;
+
+  log_key_t() = default;
+  log_key_t(uint16_t k_len, uint16_t v_len)
+  : key_len(k_len), val_len(v_len) {}
+
+  DENC(log_key_t, v, p) {
+    DENC_START(1, 1, p);
+    denc(v.key_len, p);
+    denc(v.val_len, p);
+    DENC_FINISH(p);
+  }
+};
+
+struct log_key_le_t {
+  ceph_le16 key_len{0};
+  ceph_le16 val_len{0};
+
+  log_key_le_t() = default;
+  log_key_le_t(const log_key_le_t &) = default;
+  explicit log_key_le_t(const log_key_t &key)
+    : key_len(key.key_len),
+      val_len(key.val_len) {}
+
+  log_key_le_t& operator=(log_key_t key) {
+    key_len = key.key_len;
+    val_len = key.val_len;
+    return *this;
+  }
+
+
+  operator log_key_t() const {
+    return log_key_t{uint16_t(key_len),
+                           uint16_t(val_len)};
+  }
+};
+
+// LogNode assumes that 4KiB of LogNode can contain up to 32 entries.
+// This is because each pg_log_entry has about 256 bytes, including key and value.
+// To cover such range, as a result, bitmap is introduced with uint64_t array. 
+// Note that other small entries (e.g., _epoch, _biginfo, can_rollback_info) 
+// are not updated frequently.
+constexpr uint32_t BITMAP_ARRAY_SIZE = ((LOG_NODE_BLOCK_SIZE / 4096) * 32 + 63) / 64;
+
+struct d_bitmap_t {
+  uint64_t bitmap[BITMAP_ARRAY_SIZE] = {0};
+  static constexpr size_t BITS_PER_WORD = 64;
+
+  d_bitmap_t() = default;
+  void set_bitmap(size_t bit) {
+    const size_t word = bit / BITS_PER_WORD;
+    const size_t offset = bit % BITS_PER_WORD;
+    assert(word < BITMAP_ARRAY_SIZE);
+    bitmap[word] |= (1ULL << offset);
+  }
+  void set_bitmap_range(size_t begin, size_t end) {
+    assert(begin <= end);
+    for (size_t i = begin; i <= end; i++) {
+      set_bitmap(i);
+    }
+  }
+  bool is_set(size_t bit) {
+    const size_t word = bit / BITS_PER_WORD;
+    const size_t offset = bit % BITS_PER_WORD;
+    assert(word < BITMAP_ARRAY_SIZE);
+    return (bitmap[word] & (1ULL << offset)) != 0;
+  }
+  bool is_all_set(uint64_t num) const {
+    constexpr uint64_t ALL_SET = std::numeric_limits<uint64_t>::max();
+    assert(num <= BITMAP_ARRAY_SIZE * BITS_PER_WORD);
+    const size_t full_words = num / BITS_PER_WORD;
+    const size_t rem_bits   = num % BITS_PER_WORD;
+
+    for (size_t i = 0; i < full_words; ++i) {
+      if (bitmap[i] != ALL_SET)
+       return false;
+    }
+
+    if (rem_bits != 0) {
+      const uint64_t mask =
+       (uint64_t{1} << rem_bits) - 1;
+      if ((bitmap[full_words] & mask) != mask) {
+       return false;
+      }
+    }
+    return true;
+  }
+  void init() {
+    for (uint32_t i = 0; i < BITMAP_ARRAY_SIZE; i++) {
+      bitmap[i] = 0;
+    }
+  }
+
+  DENC(d_bitmap_t, v, p) {
+    DENC_START(1, 1, p);
+    for (uint32_t i = 0; i < BITMAP_ARRAY_SIZE; i++) {
+      denc(v.bitmap[i], p);
+    }
+    DENC_FINISH(p);
+  }
+};
+
+struct d_bitmap_le_t {
+  ceph_le64 bitmap[BITMAP_ARRAY_SIZE]{};
+
+  d_bitmap_le_t() = default;
+  operator d_bitmap_t() const {
+    d_bitmap_t tmp;
+    for (uint32_t i = 0; i < BITMAP_ARRAY_SIZE; i++) {
+      tmp.bitmap[i] = uint64_t(bitmap[i]);
+    }
+    return tmp;
+  }
+  d_bitmap_le_t& operator=(d_bitmap_t &_bitmap) {
+    for (uint32_t i = 0; i < BITMAP_ARRAY_SIZE; i++) {
+      bitmap[i] = _bitmap.bitmap[i];
+    }
+    return *this;
+  }
+};
+
+/**
+ * LogKVNodeLayout
+ *
+ *  [ num_keys ][ prev pointer ][ last_pos ][ d_bitmap ][ key entry #1 ][ value #1 ] ...
+ *
+ *  - num_keys:
+ *      Total number of key-value pairs stored in this node.
+ *
+ *  - prev pointer (laddr):
+ *      The location of the prev node (logical address).
+ *      Set to NULL if this is the last node.
+ *
+ *  - last_pos:
+ *      The offset position where the last key-value data ends in this node.
+ *
+ *  - d_bitmap:
+ *      bitmap to keep track of deleted entries.
+ *
+ *  - key entry:
+ *      Format: [ key_len ][ val_len ]
+ *        - key_len:  Length of the key in bytes.
+ *        - val_len:  Length of the value in bytes.
+ *  - val entry:
+ *     Format: [ key_buf ][ val_buf ]
+ *        - key_buf:  Raw key data.
+ *        - val_buf:  Raw value data.
+ *
+ */
+
+class LogKVNodeLayout {
+  using LogKVNodeLayoutRef = boost::intrusive_ptr<LogKVNodeLayout>;
+  char *buf;
+  extent_len_t len = 0;
+
+  uint32_t reserved_len = 0;
+  uint32_t reserved_size = 0;
+  using L = absl::container_internal::Layout<ceph_le32, laddr_le_t, ceph_le32, d_bitmap_le_t, laddr_le_t, log_key_le_t>;
+  static constexpr L layout{1, 1, 1, 1, 1, 1};
+public:
+  template <bool is_const>
+  class iter_t {
+    friend class LogKVNodeLayout;
+    using parent_t = typename crimson::common::maybe_const_t<LogKVNodeLayout, is_const>::type;
+
+    parent_t node;
+    uint32_t pos;
+
+    iter_t(
+      parent_t parent,
+      uint32_t pos) : node(parent), pos(pos) {}
+
+  public:
+    iter_t(const iter_t &) = default;
+    iter_t(iter_t &&) = default;
+    iter_t &operator=(const iter_t &) = default;
+    iter_t &operator=(iter_t &&) = default;
+
+    operator iter_t<!is_const>() const {
+      static_assert(!is_const);
+      return iter_t<!is_const>(node, pos);
+    }
+
+    iter_t &operator*() { return *this; }
+    iter_t *operator->() { return this; }
+
+    iter_t operator++(int) {
+      auto ret = *this;
+      auto last = get_node_key();
+      auto new_pos = node->get_size() == 0 ? 0 :
+       pos + node->get_entry_size(last.key_len, last.val_len);
+      pos = new_pos;
+      return ret;
+    }
+
+    iter_t &operator++() {
+      auto last = get_node_key();
+      auto new_pos = node->get_size() == 0 ? 0 :
+       pos + node->get_entry_size(last.key_len, last.val_len);
+      pos = new_pos;
+      return *this;
+    }
+
+    bool operator==(const iter_t &rhs) const {
+      assert(node == rhs.node);
+      return rhs.pos == pos;
+    }
+
+    bool operator!=(const iter_t &rhs) const {
+      assert(node == rhs.node);
+      return pos != rhs.pos;
+    }
+
+  private:
+    log_key_t get_node_key() const {
+      log_key_le_t kint = *((log_key_le_t*)get_node_key_ptr());
+      return log_key_t(kint);
+    }
+    auto get_node_key_ptr() const {
+      return reinterpret_cast<
+       typename crimson::common::maybe_const_t<char, is_const>::type>(
+         node->get_node_key_ptr()) + pos;
+    }
+
+    uint32_t get_node_val_offset() const {
+      return get_node_key().key_off;
+    }
+    auto get_node_val_ptr() const {
+      return get_node_key_ptr() + sizeof(log_key_t);
+    }
+
+    void set_node_key(log_key_t _lb) {
+      static_assert(!is_const);
+      log_key_le_t lb;
+      lb = _lb;
+      *((log_key_le_t*)get_node_key_ptr()) = lb;
+    }
+
+    void set_node_val(const std::string &key, const ceph::bufferlist &val) {
+      static_assert(!is_const);
+      auto node_key = get_node_key();
+      assert(key.size() == node_key.key_len);
+      assert(val.length() == node_key.val_len);
+      ::memcpy(get_node_val_ptr(), key.data(), key.size());
+      auto bliter = val.begin();
+      bliter.copy(node_key.val_len, get_node_val_ptr() + node_key.key_len);
+    }
+
+  public:
+    std::string get_key() const {
+      return std::string(
+       get_node_val_ptr(),
+       get_node_key().key_len);
+    }
+
+    ceph::bufferlist get_val() const {
+      auto node_key = get_node_key();
+      ceph::bufferlist bl;
+      ceph::bufferptr bptr(
+       get_node_val_ptr() + node_key.key_len,
+       get_node_key().val_len);
+      bl.append(bptr);
+      return bl;
+    }
+  };
+  
+  using const_iterator = iter_t<true>;
+  using iterator = iter_t<false>;
+
+  uint32_t get_size() const {
+    ceph_le32 &size = *layout.template Pointer<0>(buf);
+    return uint32_t(size);
+  }
+
+  laddr_t get_dup_tail() const {
+    laddr_le_t &dup_tail = *layout.template Pointer<4>(buf);
+    return laddr_t(dup_tail);
+  }
+
+  laddr_t get_prev() const {
+    laddr_le_t &prev = *layout.template Pointer<1>(buf);
+    return laddr_t(prev);
+  }
+
+  ceph_le32 *get_size_ptr() {
+    return L::Partial(1, 1, 1, 1, 1).template Pointer<0>(buf);
+  }
+  laddr_le_t *get_node_addr_ptr() {
+    return L::Partial(1, 1, 1, 1, 1).template Pointer<1>(buf);
+  }
+  ceph_le32 *get_last_pos_ptr() {
+    return L::Partial(1, 1, 1, 1, 1).template Pointer<2>(buf);
+  }
+  d_bitmap_le_t *get_d_bitmap_ptr() {
+    return L::Partial(1, 1, 1, 1, 1).template Pointer<3>(buf);
+  }
+  laddr_le_t *get_dup_tail_addr_ptr() {
+    return L::Partial(1, 1, 1, 1, 1).template Pointer<4>(buf);
+  }
+  log_key_le_t *get_node_key_ptr() {
+    return L::Partial(1, 1, 1, 1, 1).template Pointer<5>(buf);
+  }
+  const log_key_le_t *get_node_key_ptr() const {
+    return L::Partial(1, 1, 1, 1, 1).template Pointer<5>(buf);
+  }
+
+  uint32_t get_start_off() const {
+    return layout.Offset<5>();
+  }
+
+  const_iterator iter_rbegin() const {
+    return const_iterator(this, get_last_pos());
+  }
+  const_iterator iter_end() const {
+    const_iterator prev_iter(this, get_last_pos());
+    auto last = prev_iter->get_node_key();
+    return const_iterator(this, get_size() == 0 ? get_last_pos() :
+      get_last_pos() + get_entry_size(last.key_len, last.val_len));
+  }
+
+  iterator iter_begin() {
+    return iterator(
+       this,
+       0);
+  }
+
+  const_iterator iter_begin() const {
+    return iter_cbegin();
+  }
+
+  const_iterator iter_cbegin() const {
+    return const_iterator(
+       this,
+       0);
+  }
+
+  iterator iter_end() {
+    iterator prev_iter(this, get_last_pos());
+    auto last = prev_iter->get_node_key();
+    return iterator(this, get_size() == 0 ? get_last_pos() :
+      get_last_pos() + get_entry_size(last.key_len, last.val_len));
+  }
+
+public:
+  LogKVNodeLayout() : buf(nullptr) {}
+
+  void set_layout_buf(char *_buf, extent_len_t _len) {
+    assert(_len > 0);
+    assert(buf == nullptr);
+    assert(_buf != nullptr);
+    buf = _buf;
+    len = _len;
+  }
+
+  void set_prev_node(laddr_t laddr) {
+    laddr_le_t l;
+    l = laddr;
+    *get_node_addr_ptr() = l;
+  }
+
+  void set_dup_tail(laddr_t laddr) {
+    laddr_le_t l;
+    l = laddr;
+    *get_dup_tail_addr_ptr() = l;
+  }
+
+  void set_size(uint32_t size) {
+    ceph_le32 v(size);
+    *get_size_ptr() = v;
+  }
+
+  void set_last_pos(uint32_t pos) {
+    ceph_assert(pos <= LOG_NODE_BLOCK_SIZE);
+    ceph_le32 p;
+    p = pos;
+    *layout.template Pointer<2>(buf) = p;
+  }
+
+  uint32_t get_last_pos() const {
+    ceph_le32 &pos = *layout.template Pointer<2>(buf);
+    return uint32_t(pos);
+  }
+
+  d_bitmap_t get_d_bitmap() {
+    d_bitmap_le_t &bitmap = *get_d_bitmap_ptr();
+    return d_bitmap_t(bitmap);
+  }
+
+  void _set_d_bitmap(d_bitmap_t &_bitmap) {
+    d_bitmap_le_t bitmap;
+    bitmap = _bitmap;
+    *get_d_bitmap_ptr() = bitmap;
+  }
+
+  void set_d_bitmap(size_t begin, size_t end) {
+    auto bitmap = get_d_bitmap();
+    bitmap.set_bitmap_range(begin, end);
+    _set_d_bitmap(bitmap);
+  }
+
+  void init_bitmap() {
+    d_bitmap_t bitmap;
+    bitmap.init();
+    _set_d_bitmap(bitmap);
+  }
+
+  void set_reserved_len(const uint32_t len) {
+    reserved_len = len;
+  }
+
+  uint32_t get_reserved_len() const {
+    return reserved_len;
+  }
+
+  void set_reserved_size(const uint32_t size) {
+    reserved_size = size;
+  }
+
+  uint32_t get_reserved_size() const {
+    return reserved_size;
+  }
+
+  uint16_t get_entry_size(size_t ksize, size_t vsize) const {
+    return (sizeof(log_key_le_t) + ksize + vsize);
+  }
+
+  uint32_t free_space() const {
+    assert(capacity() >= used_space());
+    return capacity() - used_space();
+  }
+
+  uint32_t capacity() const {
+    return len
+      - (reinterpret_cast<char*>(layout.template Pointer<5>(buf))
+      - reinterpret_cast<char*>(layout.template Pointer<0>(buf)));
+  }
+
+  uint32_t used_space() const {
+    if (get_size() == 0) {
+      return 0;
+    }
+    const_iterator iter(this, get_last_pos());
+    auto k = iter->get_node_key();
+    return get_last_pos() + get_entry_size(k.key_len, k.val_len);
+  }
+
+  void _append(const std::string &key, const ceph::bufferlist &val) {
+    iterator prev_iter(this, get_last_pos());
+    auto last = prev_iter->get_node_key();
+    iterator next_iter(this, get_size() == 0 ? get_last_pos() :
+      get_last_pos() + get_entry_size(last.key_len, last.val_len));
+    next_iter.set_node_key(log_key_t(key.size(), val.length()));
+    next_iter.set_node_val(key, val);
+    if (get_size() >= 1) {
+      set_last_pos(get_last_pos() + get_entry_size(last.key_len, last.val_len));
+    }
+    set_size(get_size() + 1);
+  }
+
+
+  void journal_append(
+    const std::string &key,
+    const ceph::bufferlist &val,
+    delta_buffer_t *recorder) {
+    recorder->insert_append(key, val);
+    reserved_len += this->get_entry_size(key.size(), val.length());
+    reserved_size += 1;
+  }
+
+  void journal_append_prev_addr(
+    const laddr_t l,
+    delta_buffer_t *recorder) {
+    recorder->insert_prev_addr(l);
+  }
+
+  void journal_append_dup_tail_addr(
+    const laddr_t l,
+    delta_buffer_t *recorder) {
+    recorder->insert_dup_tail_addr(l);
+  }
+
+  void journal_append_init(
+    delta_buffer_t *recorder) {
+    recorder->insert_init();
+  }
+
+  void journal_append_remove(delta_buffer_t *recorder, ceph::bufferlist bl);
+
+  void append(
+    const std::string &key,
+    const ceph::bufferlist &val) {
+    _append(key, val);
+  }
+
+  void init_vars() {
+    init_bitmap();
+    set_last_pos(0); 
+    set_size(0);
+    set_prev_node(L_ADDR_NULL);
+    set_dup_tail(L_ADDR_NULL);
+    set_reserved_len(0);
+    set_reserved_size(0);
+    
+  }
+
+  bool expect_overflow(size_t ksize, size_t vsize) const {
+    return free_space() < get_entry_size(ksize, vsize) + reserved_len;
+  }
+
+  std::string get_last_key() const {
+    const_iterator iter(this, get_last_pos());
+    return iter->get_key();
+  }
+};
+
+struct LogNode 
+  : LogicalChildNode,
+    LogKVNodeLayout {
+  static constexpr extent_types_t TYPE = extent_types_t::LOG_NODE;
+  explicit LogNode(ceph::bufferptr &&ptr) : LogicalChildNode(std::move(ptr)) {
+    set_layout_buf(this->get_bptr().c_str(), this->get_bptr().length());
+    set_prev_node(L_ADDR_NULL);
+    set_dup_tail(L_ADDR_NULL);
+  }
+  explicit LogNode(extent_len_t length) : LogicalChildNode(length) {}
+
+  LogNode(const LogNode &rhs)
+    : LogicalChildNode(rhs, share_buffer_t()) {
+    set_layout_buf(this->get_bptr().c_str(), this->get_bptr().length());
+    set_last_pos(*get_last_pos_ptr()); // shared buf
+    set_size(get_size());
+    set_reserved_len(rhs.get_reserved_len());
+    set_reserved_size(rhs.get_reserved_size());
+    set_dup_tail(rhs.get_dup_tail_addr());
+  }
+  ~LogNode() {}
+
+  CachedExtentRef duplicate_for_write(Transaction&) final {
+    assert(delta_buffer.empty());
+    return CachedExtentRef(new LogNode(*this));
+  }
+
+  crimson::os::seastore::extent_types_t get_type() const {
+    return extent_types_t::LOG_NODE;
+  }
+
+  ceph::bufferlist get_delta() {
+    ceph::bufferlist bl;
+    if (!delta_buffer.empty()) {
+      encode(delta_buffer, bl);
+    }
+    return bl;
+  }
+
+  void apply_delta(const ceph::bufferlist &bl) {
+    assert(bl.length());
+    delta_buffer_t buffer;
+    auto bptr = bl.cbegin();
+    decode(buffer, bptr);
+    buffer.replay(*this);
+  }
+
+  mutable delta_buffer_t delta_buffer;
+  delta_buffer_t *maybe_get_delta_buffer() {
+    return is_mutation_pending() ? &delta_buffer : nullptr;
+  }
+
+  void append_kv(Transaction &t, const std::string &key,
+    const ceph::bufferlist &val);
+
+  /*
+   *
+   * set laddr directly if LogNode is not mutating
+   * add laddr to delta_buffer if LogNode is mutating
+   *
+   */
+  void set_prev_addr(laddr_t l);
+
+  void set_init_vars();
+
+  using get_value_ret = OMapManager::omap_get_value_ret;
+  get_value_ret get_value(const std::string &key);
+
+  void set_dup_tail_addr(laddr_t laddr);
+
+  void append_remove(ceph::bufferlist bl);
+
+  // Remove all matching keys in LogNode
+  bool remove_entry(const std::string key);
+
+  void set_cur_bitmap(uint32_t begin, uint32_t end);
+  d_bitmap_t get_cur_bitmap();
+  void set_bitmap(d_bitmap_t map);
+
+  // start and end should exist in the node
+  std::optional<std::string> remove_entries(std::optional<std::string> start,
+    std::optional<std::string> end)
+  {
+    std::string_view s(*start);
+    std::string_view e(*end);
+    if (s == e) {
+      if (remove_entry(*start)) {
+       return *start;
+      }
+      return std::nullopt;
+    }
+
+    auto iter = iter_begin();
+
+    uint32_t index = 0;
+    bool remove = false;
+    std::string last;
+    d_bitmap_t map = get_cur_bitmap();
+    while(iter != iter_end()) {
+      auto key = iter->get_key();
+      if (s <= key && key <= e) {
+       map.set_bitmap(index);
+       remove = true;
+       last = key;
+      }
+      index++;
+      iter++;
+    };
+    if (remove) {
+      set_bitmap(map);
+    }
+    return last;
+  }
+
+  bool is_removable();
+
+  bool log_has_larger_than(std::string_view str) const;
+
+  bool log_less_than(std::string_view str) const;
+
+  enum class range_t : uint8_t {
+    HAS_BETWEEN,
+    NO_BETWEEN,
+  };
+
+  range_t has_between(std::optional<std::string> start,
+    std::optional<std::string> end) {
+    std::string_view s(*start);
+    std::string_view e(*end);
+    auto iter = iter_begin();
+    while(iter != iter_end()) {
+      std::string k = iter->get_key();
+      if (k <= e && k >= s) {
+       return range_t::HAS_BETWEEN;
+      } 
+      iter++;
+    };
+    return range_t::NO_BETWEEN;
+  }
+
+  template <typename F>
+  void for_each_live_entry(F&& fn);
+
+  void list(const std::optional<std::string> &first,
+    const std::optional<std::string> &last,
+    std::map<std::string, bufferlist> &kvs);
+
+  std::ostream &print_detail_l(std::ostream &out) const final;
+
+  laddr_t get_dup_tail_addr() const {
+    if (is_mutation_pending() || is_exist_mutation_pending()) {
+      if (!delta_buffer.empty()) {
+       auto ret = delta_buffer.get_latest_dup_tail_addr();
+       if (ret) {
+         return *ret;
+       }
+      }
+    }
+    return this->get_dup_tail();
+  }
+
+  laddr_t get_prev_addr() const {
+    if (is_mutation_pending() || is_exist_mutation_pending()) {
+      if (!delta_buffer.empty()) {
+       auto ret = delta_buffer.get_latest_prev_leaf();
+       if (ret) {
+         return *ret;
+       }
+      }
+    }
+    return this->get_prev();
+  }
+
+  uint32_t use_space() const {
+    return this->used_space();
+  }
+
+  uint32_t get_capacity() const {
+    return this->capacity();
+  }
+
+  void update_delta() {
+    if (!delta_buffer.empty()) {
+      delta_buffer.replay(*this);
+      delta_buffer.clear();
+    }
+  }
+
+  void logical_on_delta_write() final {
+    update_delta();
+    set_reserved_len(0);
+    set_reserved_size(0);
+  }
+
+  // TODO: consistent view in a transaction
+  void prepare_commit() final {
+    if (is_mutation_pending() || is_exist_mutation_pending()) {
+      ceph_assert(!delta_buffer.empty());
+      update_delta();
+    } else {
+      assert(delta_buffer.empty());
+    }
+  }
+
+  void on_fully_loaded() final {
+    this->set_layout_buf(this->get_bptr().c_str(), this->get_bptr().length());
+  }
+
+  void init_range(std::string _begin, std::string _end) {
+    assert(begin.empty());
+    assert(end.empty());
+    begin = std::move(_begin);
+    end = std::move(_end);
+  }
+
+  std::string begin;
+  std::string end;
+};
+
+}
+WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::log_manager::log_key_t)
+WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::log_manager::d_bitmap_t)
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::os::seastore::log_manager::LogNode> : fmt::ostream_formatter {};
+#endif
+
index e2ea7b98df1e0a6cdabb7e1521e777fd06eda36b..9b0c41f636db9ddc1b70343fe43c982f7d22a2e5 100644 (file)
@@ -31,6 +31,7 @@
 #include "crimson/os/seastore/omap_manager/btree/btree_omap_manager.h"
 #include "crimson/os/seastore/onode_manager.h"
 #include "crimson/os/seastore/object_data_handler.h"
+#include "crimson/os/seastore/omap_manager/log/log_manager.h"
 
 using crimson::common::local_conf;
 
@@ -1789,10 +1790,11 @@ SeaStore::Shard::_do_transaction_step(
        if (op->hint & CEPH_OSD_ALLOC_HINT_FLAG_LOG) {
          ceph_assert(get_omap_root(omap_type_t::LOG, *onode).is_null());
          ceph_assert(get_omap_root(omap_type_t::OMAP, *onode).is_null());
-         // BtreeOMapManager doesn't need a do_with yet.
-         auto mgr = BtreeOMapManager(*transaction_manager);
-         return omaptree_initialize(
-           *ctx.transaction, mgr, omap_type_t::LOG, *onode, *device
+         auto mgr = crimson::os::seastore::log_manager::LogManager(*transaction_manager);
+         return mgr.initialize_omap(
+           *ctx.transaction, 
+           onode->get_metadata_hint(device->get_block_size()),
+           omap_type_t::LOG
          ).si_then([&onode, &ctx](auto new_root) {
            onode->update_omap_root(*ctx.transaction, new_root);
          });
index 618ab2d94fa3a47048afeac27b8ccc3cec598c65..b63aba12ccc697495028072950370b6232a8eb2c 100644 (file)
@@ -270,6 +270,8 @@ std::ostream &operator<<(std::ostream &out, extent_types_t t)
     return out << "BACKREF_INTERNAL";
   case extent_types_t::BACKREF_LEAF:
     return out << "BACKREF_LEAF";
+  case extent_types_t::LOG_NODE:
+    return out << "LOG_NODE";
   case extent_types_t::NONE:
     return out << "NONE";
   default:
index 91197c040d7b2dab427b52f3224e3d6be6473cb4..268befbec64a8c496a9179d936c2b0288ebb163c 100644 (file)
@@ -1480,8 +1480,9 @@ enum class extent_types_t : uint8_t {
   TEST_BLOCK_PHYSICAL = 14,
   BACKREF_INTERNAL = 15,
   BACKREF_LEAF = 16,
+  LOG_NODE = 17,
   // None and the number of valid extent_types_t
-  NONE = 17,
+  NONE = 18,
 };
 using extent_types_le_t = uint8_t;
 constexpr auto EXTENT_TYPES_MAX = static_cast<uint8_t>(extent_types_t::NONE);
@@ -1496,14 +1497,16 @@ constexpr bool is_data_type(extent_types_t type) {
 }
 
 constexpr bool is_logical_metadata_type(extent_types_t type) {
-  return type >= extent_types_t::ROOT_META &&
-         type <= extent_types_t::COLL_BLOCK;
+  return (type >= extent_types_t::ROOT_META &&
+         type <= extent_types_t::COLL_BLOCK) ||
+        type == extent_types_t::LOG_NODE;
 }
 
 constexpr bool is_logical_type(extent_types_t type) {
   if ((type >= extent_types_t::ROOT_META &&
        type <= extent_types_t::OBJECT_DATA_BLOCK) ||
-      type == extent_types_t::TEST_BLOCK) {
+      type == extent_types_t::TEST_BLOCK ||
+      type == extent_types_t::LOG_NODE) {
     assert(is_logical_metadata_type(type) ||
            is_data_type(type));
     return true;
@@ -1557,7 +1560,8 @@ constexpr bool is_backref_mapped_type(extent_types_t type) {
   if ((type >= extent_types_t::LADDR_INTERNAL &&
        type <= extent_types_t::OBJECT_DATA_BLOCK) ||
       type == extent_types_t::TEST_BLOCK ||
-      type == extent_types_t::TEST_BLOCK_PHYSICAL) {
+      type == extent_types_t::TEST_BLOCK_PHYSICAL ||
+      type == extent_types_t::LOG_NODE) {
     assert(is_logical_type(type) ||
           is_lba_node(type) ||
           type == extent_types_t::TEST_BLOCK_PHYSICAL);
@@ -1573,7 +1577,7 @@ constexpr bool is_backref_mapped_type(extent_types_t type) {
 constexpr bool is_real_type(extent_types_t type) {
   if (type <= extent_types_t::OBJECT_DATA_BLOCK ||
       (type >= extent_types_t::TEST_BLOCK &&
-       type <= extent_types_t::BACKREF_LEAF)) {
+       type <= extent_types_t::LOG_NODE)) {
     assert(is_logical_type(type) ||
            is_physical_type(type));
     return true;