auto ext = co_await log_load_extent<LogNode>(
t, log_root.addr, BEGIN_KEY, END_KEY);
ceph_assert(ext);
- std::pair<std::string, ceph::bufferlist> ow_kv;
- // To prevent missing remove_kv even when overwritten is not done
- bool ow_done = false;
auto resync_node = [&](LogNodeRef e)
-> log_load_extent_iertr::future<CachedExtentRef> {
CachedExtentRef node;
ceph_assert(node);
co_return std::move(node);
};
- auto f = [&](const std::string &k, const bufferlist &v, bool has_ow_key)
+ auto f = [&](const std::string &k, const bufferlist &v)
-> omap_set_key_ret {
CachedExtentRef node = co_await resync_node(ext);
LogNodeRef log_node = node->template cast<LogNode>();
- bool can_ow = has_ow_key && log_node->can_ow();
- if (can_ow) {
- ow_done = true;
- }
// If multiple blocks are needed to store the kv pair
if (log_node->get_max_val_length(k.size()) < v.length()) {
co_await _log_set_multi_block_key(log_root, t, log_node, k, v);
co_return;
}
- co_await _log_set_key(log_root, t, log_node, k, v, can_ow);
+ co_await _log_set_key(log_root, t, log_node, k, v);
co_return;
};
+ auto alloc_log_node = [&](laddr_t prev_laddr)
+ -> omap_set_key_iertr::future<LogNodeRef> {
+ return tm.alloc_non_data_extent<LogNode>(
+ t, log_root.hint, LOG_NODE_BLOCK_SIZE
+ ).handle_error_interruptible(
+ crimson::ct_error::enospc::assert_failure{"unexpected enospc"},
+ omap_set_key_iertr::pass_further{}
+ ).si_then([prev_laddr](auto ext) {
+ assert(ext);
+ ext->set_prev_addr(prev_laddr);
+ return omap_set_key_iertr::make_ready_future<LogNodeRef>(ext);
+ });
+ };
/*
+ * Fast path:
* During a normal write transaction, pgmeta_oid receives two key–value pairs:
* _fastinfo and pg_log_entry. Unlike pg_log_entry, _fastinfo is likely to be
* overwritten in the near future. Storing _fastinfo in an append-only manner
*/
bool has_ow_key = false;
if (kvs.size() == OW_SIZE) {
- for (auto &p : kvs) {
- if (is_ow_key(p.first)) {
- ow_kv.first = p.first;
- ow_kv.second = p.second;
+ std::vector<std::pair<std::string, ceph::bufferlist>> kvs_for_ow;
+ kvs_for_ow.reserve(2);
+ for (const auto& [k, v] : kvs) {
+ if (is_log_key(k)) {
+ kvs_for_ow.insert(kvs_for_ow.begin(), {k, v});
+ } else if (is_ow_key(k)) {
+ kvs_for_ow.push_back({k, v});
has_ow_key = true;
- break;
}
}
+ LogNodeRef cur = ext;
+ if (has_ow_key) {
+ if (!cur->can_ow()) {
+ co_await remove_kv(t, log_root.addr, get_ow_key(), nullptr);
+ CachedExtentRef node = co_await resync_node(cur);
+ cur = node->template cast<LogNode>();
+ }
+ for (auto &p : kvs_for_ow) {
+ if (cur->get_max_val_length(p.first.size()) < p.second.length()) {
+ co_await _log_set_multi_block_key(log_root, t, cur, p.first, p.second);
+ cur = co_await log_load_extent<LogNode>(
+ t, log_root.addr, BEGIN_KEY, END_KEY);
+ continue;
+ }
+ if (cur->expect_overflow(p.first, p.second.length(),
+ !is_ow_key(p.first) ? cur->can_ow() : false)) {
+ // This means the first entry of the new LogNode is not _fastinfo
+ if (!is_ow_key(p.first)) {
+ // remove _fastinfo in old LogNode
+ auto e = co_await cur->get_value(p.first, LogNode::copy_t::SHALLOW);
+ if (e != std::nullopt) {
+ auto mut = tm.get_mutable_extent(t, cur)->template cast<LogNode>();
+ mut->remove_entry(get_ow_key());
+ }
+ }
+ laddr_t dup_addr = cur->get_dup_tail_addr();
+ cur = co_await alloc_log_node(cur->get_laddr());
+ cur->set_dup_tail_addr(dup_addr);
+ log_root.update(cur->get_laddr(), log_root.depth,
+ log_root.hint, log_root.type);
+ }
+ if (cur->is_initial_pending()) {
+ cur->append_kv(t, p.first, p.second);
+ } else {
+ auto mut = tm.get_mutable_extent(t, cur)->cast<LogNode>();
+ if (cur->can_ow() && is_log_key(p.first)) {
+ mut->overwrite_kv(t, p.first, p.second);
+ } else {
+ mut->append_kv(t, p.first, p.second);
+ }
+ cur = mut;
+ }
+ }
+ co_return;
+ }
}
std::map<std::string, ceph::bufferlist> dup_kvs;
if (kvs.size() > BATCH_CREATE_SIZE) {
- auto alloc_log_node = [&](laddr_t prev_laddr)
- -> omap_set_key_iertr::future<LogNodeRef> {
- return tm.alloc_non_data_extent<LogNode>(
- t, log_root.hint, LOG_NODE_BLOCK_SIZE
- ).handle_error_interruptible(
- crimson::ct_error::enospc::assert_failure{"unexpected enospc"},
- omap_set_key_iertr::pass_further{}
- ).si_then([prev_laddr](auto ext) {
- assert(ext);
- ext->set_prev_addr(prev_laddr);
- return omap_set_key_iertr::make_ready_future<LogNodeRef>(ext);
- });
- };
-
LogNodeRef e = co_await alloc_log_node(ext->get_laddr());
LogNodeRef dup_e = co_await alloc_log_node(
co_await get_dup_addr_from_root(t, ext->get_laddr()));
}
for (auto &p : kvs) {
- if (is_ow_key(p.first) && has_ow_key) {
- continue;
- }
if (is_dup_log_key(p.first)) {
dup_kvs[p.first] = p.second;
continue;
co_await remove_kv(t, log_root.addr, p.first, nullptr);
}
laddr_t last_addr = log_root.addr;
- co_await f(p.first, p.second, has_ow_key);
+ co_await f(p.first, p.second);
if (last_addr != log_root.addr) {
ext = co_await log_load_extent<LogNode>(
t, log_root.addr, BEGIN_KEY, END_KEY);
}
}
- if (!ow_kv.first.empty()) {
- if (!ow_done) {
- co_await remove_kv(t, log_root.addr, ow_kv.first, nullptr);
- }
- co_await f(ow_kv.first, ow_kv.second, has_ow_key);
- }
-
-
if (!dup_kvs.empty()) {
laddr_t last_addr = co_await get_dup_addr_from_root(t, log_root.addr);
ext = co_await log_load_extent<LogNode>(t, last_addr, BEGIN_KEY, END_KEY);
for (auto &p: dup_kvs) {
- co_await f(p.first, p.second, false);
+ co_await f(p.first, p.second);
if (&p != &*dup_kvs.rbegin()) {
laddr_t current_addr = co_await get_dup_addr_from_root(t, log_root.addr);
if (last_addr != current_addr) {
LogManager::omap_set_key_ret
LogManager::_log_set_key(omap_root_t &log_root,
Transaction &t, LogNodeRef tail,
- const std::string &key, const ceph::bufferlist &value, bool can_ow)
+ const std::string &key, const ceph::bufferlist &value)
{
LOG_PREFIX(LogManager::_log_set_key);
DEBUGT("enter key={}", t, key);
assert(tail);
- if (!tail->expect_overflow(key, value.length(), can_ow)) {
+ if (!tail->expect_overflow(key.size(), value.length())) {
auto mut = tm.get_mutable_extent(t, tail)->cast<LogNode>();
- if (can_ow) {
- mut->overwrite_kv(t, key, value);
- } else {
- mut->append_kv(t, key, value);
- }
+ mut->append_kv(t, key, value);
co_return;
}
- // This means the first entry of the new LogNode is not _fastinfo
- if (!is_ow_key(key) && can_ow) {
- // remove _fastinfo in old LogNode
- auto e = co_await tail->get_value(key, LogNode::copy_t::SHALLOW);
- if (e != std::nullopt) {
- auto mut = tm.get_mutable_extent(t, tail)->template cast<LogNode>();
- mut->remove_entry(get_ow_key());
- }
- }
-
auto extent = co_await tm.alloc_non_data_extent<LogNode>(
t, log_root.hint, LOG_NODE_BLOCK_SIZE
).handle_error_interruptible(