git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/os/seastore/object_data_handler: LBACursor based overwrite
author: Xuehan Xu <xuxuehan@qianxin.com>
Fri, 21 Mar 2025 02:58:27 +0000 (10:58 +0800)
committer: Xuehan Xu <xuxuehan@qianxin.com>
Tue, 5 Aug 2025 06:33:59 +0000 (14:33 +0800)
This should avoid the unnecessary LBA tree searches performed by the old
implementation of ObjectDataHandler::overwrite()

Overwrites of ObjectDataBlocks are dealt with by first punching holes
in the lba tree and then inserting new extents in the holes.

Specifically, overwrites are classified into two categories:
1. the range of the overwrite falls in a single lba mapping;
2. the range of the overwrite crosses multiple lba mappings.

For the first category, ObjectDataHandler processes the overwrites in
the following way:
1. if the mapping is a pending one (corresponds to a pending extent),
   merge the overwrite with the data of the pending extent;
2. otherwise, if the overwrite can be applied as a delta, apply it as a
   delta-based overwrite;
3. otherwise, punch a hole in the mapping, insert a new extent with the
   data of the overwrite.

For the second category, the overwrite is processed as follows:
1. if the left boundary of the overwrite is inside an existing mapping,
   deal with the mapping in a way similar to the single-mapping
   overwrites;
2. remove all lba mappings that are strictly within the range of the
   overwrite;
3. deal with the right boundary of the overwrite in the same way as the
   left boundary.

Signed-off-by: Xuehan Xu <xuxuehan@qianxin.com>
src/crimson/os/seastore/lba_mapping.h
src/crimson/os/seastore/object_data_handler.cc
src/crimson/os/seastore/object_data_handler.h
src/crimson/os/seastore/seastore_types.h
src/crimson/os/seastore/transaction_manager.h
src/test/crimson/seastore/test_object_data_handler.cc
src/test/crimson/seastore/test_transaction_manager.cc

index ad099e6e94dfd7dcf1b6a19e0c6cca335bb18a05..aa3b0306a93bd6d06e811431df8bf1f51579e162 100644 (file)
@@ -46,6 +46,11 @@ public:
   LBAMapping &operator=(LBAMapping &&) = default;
   ~LBAMapping() = default;
 
+  // whether the mapping corresponds to a pending extent
+  bool is_pending() const {
+    return !is_indirect() && !is_data_stable();
+  }
+
   bool is_linked_direct() const {
     return (bool)direct_cursor;
   }
index 782e0b764a1535de540c6ad165431bf985bd5050..8b5c43f65fc42e436e2e4573bc573a73629487d6 100644 (file)
@@ -7,7 +7,6 @@
 #include "crimson/common/log.h"
 
 #include "crimson/os/seastore/object_data_handler.h"
-#include "crimson/os/seastore/laddr_interval_set.h"
 
 namespace {
   seastar::logger& logger() {
@@ -1055,7 +1054,8 @@ auto with_objects_data(
     });
 }
 
-ObjectDataHandler::write_ret ObjectDataHandler::prepare_data_reservation(
+ObjectDataHandler::write_iertr::future<std::optional<LBAMapping>>
+ObjectDataHandler::prepare_data_reservation(
   context_t ctx,
   object_data_t &object_data,
   extent_len_t size)
@@ -1068,7 +1068,7 @@ ObjectDataHandler::write_ret ObjectDataHandler::prepare_data_reservation(
            ctx.t,
            object_data.get_reserved_data_base(),
            object_data.get_reserved_data_len());
-    return write_iertr::now();
+    return write_iertr::make_ready_future<std::optional<LBAMapping>>();
   } else {
     DEBUGT("reserving: {}~0x{:x}",
            ctx.t,
@@ -1083,7 +1083,7 @@ ObjectDataHandler::write_ret ObjectDataHandler::prepare_data_reservation(
       object_data.update_reserved(
        pin.get_key(),
        pin.get_length());
-      return write_iertr::now();
+      return std::make_optional<LBAMapping>(std::move(pin));
     }).handle_error_interruptible(
       crimson::ct_error::enospc::assert_failure{"unexpected enospc"},
       write_iertr::pass_further{}
@@ -1091,301 +1091,912 @@ ObjectDataHandler::write_ret ObjectDataHandler::prepare_data_reservation(
   }
 }
 
-ObjectDataHandler::clear_ret ObjectDataHandler::trim_data_reservation(
-  context_t ctx, object_data_t &object_data, extent_len_t size)
+ObjectDataHandler::read_iertr::future<std::optional<bufferlist>> read_mapping(
+  ObjectDataHandler::context_t ctx,
+  LBAMapping read_pos,
+  extent_len_t unaligned_offset,
+  extent_len_t unaligned_len,
+  bool for_zero /* whether this is for zero overwrite*/)
 {
-  ceph_assert(!object_data.is_null());
-  ceph_assert(size <= object_data.get_reserved_data_len());
-  return seastar::do_with(
-    lba_mapping_list_t(),
-    extent_to_write_list_t(),
-    [ctx, size, &object_data, this](auto &pins, auto &to_write) {
-      LOG_PREFIX(ObjectDataHandler::trim_data_reservation);
-      auto data_base = object_data.get_reserved_data_base();
-      auto data_len = object_data.get_reserved_data_len();
-      DEBUGT("object_data: {}~0x{:x}", ctx.t, data_base, data_len);
-      laddr_t aligned_start = (data_base + size).get_aligned_laddr();
-      loffset_t aligned_length =
-         data_len - aligned_start.get_byte_distance<loffset_t>(data_base);
-      return ctx.tm.get_pins(
-       ctx.t, aligned_start, aligned_length
-      ).si_then([ctx, size, &pins, &object_data, &to_write](auto _pins) {
-       _pins.swap(pins);
-       ceph_assert(pins.size());
-       if (!size) {
-         // no need to reserve region if we are truncating the object's
-         // size to 0
-         return clear_iertr::now();
-       }
-       auto &pin = pins.front();
-       ceph_assert(pin.get_key() >= object_data.get_reserved_data_base());
-       ceph_assert(
-         pin.get_key() <= object_data.get_reserved_data_base() + size);
-       auto pin_offset = pin.get_key().template get_byte_distance<extent_len_t>(
-         object_data.get_reserved_data_base());
-       if ((pin.get_key() == (object_data.get_reserved_data_base() + size)) ||
-         (pin.get_val().is_zero())) {
-         /* First pin is exactly at the boundary or is a zero pin.  Either way,
-          * remove all pins and add a single zero pin to the end. */
-         to_write.push_back(extent_to_write_t::create_zero(
-           pin.get_key(),
-           object_data.get_reserved_data_len() - pin_offset));
-         return clear_iertr::now();
-       } else {
-         /* First pin overlaps the boundary and has data, remap it
-          * if aligned or rewrite it if not aligned to size */
-          auto roundup_size = p2roundup(size, ctx.tm.get_block_size());
-          auto append_len = roundup_size - size;
-          if (append_len == 0) {
-            LOG_PREFIX(ObjectDataHandler::trim_data_reservation);
-            TRACET("First pin overlaps the boundary and has aligned data"
-              "create existing at addr:{}, len:0x{:x}",
-              ctx.t, pin.get_key(), size - pin_offset);
-            to_write.push_back(extent_to_write_t::create_existing(
-              pin.duplicate(),
-              pin.get_key(),
-              size - pin_offset));
-           to_write.push_back(extent_to_write_t::create_zero(
-             (object_data.get_reserved_data_base() + roundup_size).checked_to_laddr(),
-             object_data.get_reserved_data_len() - roundup_size));
-            return clear_iertr::now();
-          } else {
-            return ctx.tm.read_pin<ObjectDataBlock>(
-              ctx.t,
-              pin.duplicate()
-            ).si_then([ctx, size, pin_offset, append_len, roundup_size,
-                      &pin, &object_data, &to_write](auto maybe_indirect_extent) {
-              auto read_bl = maybe_indirect_extent.get_bl();
-              ceph::bufferlist write_bl;
-              write_bl.substr_of(read_bl, 0, size - pin_offset);
-              write_bl.append_zero(append_len);
-              LOG_PREFIX(ObjectDataHandler::trim_data_reservation);
-              TRACET("First pin overlaps the boundary and has unaligned data"
-                "create data at addr:{}, len:0x{:x}",
-                ctx.t, pin.get_key(), write_bl.length());
-             to_write.push_back(extent_to_write_t::create_data(
-               pin.get_key(),
-               write_bl));
-             to_write.push_back(extent_to_write_t::create_zero(
-               (object_data.get_reserved_data_base() + roundup_size).checked_to_laddr(),
-               object_data.get_reserved_data_len() - roundup_size));
-              return clear_iertr::now();
-            });
-          }
-       }
-      }).si_then([ctx, size, &to_write, &object_data, &pins, this] {
-        return seastar::do_with(
-          prepare_ops_list(pins, to_write,
-           delta_based_overwrite_max_extent_size),
-          [ctx, size, &object_data](auto &ops) {
-            return do_remappings(ctx, ops.to_remap
-            ).si_then([ctx, &ops] {
-              return do_removals(ctx, ops.to_remove);
-            }).si_then([ctx, &ops] {
-              return do_insertions(ctx, ops.to_insert);
-            }).si_then([size, &object_data] {
-             if (size == 0) {
-               object_data.clear();
-             }
-             return ObjectDataHandler::clear_iertr::now();
-            });
-        });
-      });
+  assert(unaligned_len != 0);
+  if (read_pos.is_zero_reserved()) {
+    if (for_zero) {
+      // if we are doing zero overwrite and the current read_pos
+      // is already a zero-reserved one, don't add any data to it
+      return ObjectDataHandler::read_iertr::make_ready_future<
+       std::optional<bufferlist>>();
+    } else {
+      bufferlist bl;
+      bl.append_zero(unaligned_len);
+      return ObjectDataHandler::read_iertr::make_ready_future<
+       std::optional<bufferlist>>(std::move(bl));
+    }
+  } else {
+    auto aligned_offset = p2align(unaligned_offset, ctx.tm.get_block_size());
+    auto aligned_len =
+      p2roundup(unaligned_offset + unaligned_len,
+               ctx.tm.get_block_size()) - aligned_offset;
+    return ctx.tm.read_pin<ObjectDataBlock>(
+      ctx.t, read_pos, aligned_offset, aligned_len
+    ).si_then([unaligned_offset, unaligned_len, aligned_offset, aligned_len]
+             (auto maybe_indirect_left_extent) {
+      auto read_bl = maybe_indirect_left_extent.get_range(
+       aligned_offset, aligned_len);
+      ceph::bufferlist prepend_bl;
+      prepend_bl.substr_of(
+       read_bl, unaligned_offset - aligned_offset, unaligned_len);
+      return ObjectDataHandler::read_iertr::make_ready_future<
+       std::optional<bufferlist>>(std::move(prepend_bl));
     });
+  }
 }
 
-/**
- * get_to_writes_with_zero_buffer
- *
- * Returns extent_to_write_t's reflecting a zero region extending
- * from offset~len with headbl optionally on the left and tailbl
- * optionally on the right.
- */
-extent_to_write_list_t get_to_writes_with_zero_buffer(
-  laddr_t data_base,
-  const extent_len_t block_size,
-  objaddr_t offset, extent_len_t len,
-  std::optional<ceph::bufferlist> &&headbl,
-  std::optional<ceph::bufferlist> &&tailbl)
+std::ostream& operator<<(
+  std::ostream &out, const overwrite_range_t &overwrite_range) {
+  return out << "overwrite_range_t{" << std::hex
+    << "unaligned_len=0x" << overwrite_range.unaligned_len
+    << ", unaligned_begin=0x" << overwrite_range.unaligned_begin
+    << ", aligned_begin=0x" << overwrite_range.aligned_begin
+    << ", unaligned_end=0x" << overwrite_range.unaligned_end
+    << ", aligned_end=0x" << overwrite_range.aligned_end
+    << ", aligned_len=0x" << overwrite_range.aligned_len << std::dec
+    << "}";
+}
+
+std::ostream& operator<<(std::ostream &out, const data_t &data) {
+  return out << "data_t{" << std::hex
+    << "headbl=0x" << (data.headbl ? data.headbl->length() : 0)
+    << ", bl=0x" << (data.bl ? data.bl->length() : 0)
+    << ", tailbl=0x" << (data.tailbl ? data.tailbl->length() : 0) << std::dec
+    << "}";
+}
+
+ObjectDataHandler::write_ret
+ObjectDataHandler::delta_based_overwrite(
+  context_t ctx,
+  extent_len_t unaligned_offset,
+  extent_len_t unaligned_len,
+  LBAMapping overwrite_mapping,
+  std::optional<bufferlist> data)
 {
-  auto zero_left = p2roundup(offset, (objaddr_t)block_size);
-  auto zero_right = p2align(offset + len, (objaddr_t)block_size);
-  auto left = headbl ? (offset - headbl->length()) : offset;
-  auto right = tailbl ?
-    (offset + len + tailbl->length()) :
-    (offset + len);
-
-  assert(
-    (headbl && ((zero_left - left) ==
-                p2roundup(headbl->length(), block_size))) ^
-    (!headbl && (zero_left == left)));
-  assert(
-    (tailbl && ((right - zero_right) ==
-                p2roundup(tailbl->length(), block_size))) ^
-    (!tailbl && (right == zero_right)));
-
-  assert(right > left);
-
-  // zero region too small for a reserved section,
-  // headbl and tailbl in same extent
-  if (zero_right <= zero_left) {
+  LOG_PREFIX(ObjectDataHandler::delta_based_overwrite);
+  DEBUGT("0x{:x}~0x{:x} {} zero={}",
+    ctx.t, unaligned_offset, unaligned_len, overwrite_mapping, !data.has_value());
+  // delta based overwrite
+  return ctx.tm.read_pin<ObjectDataBlock>(
+    ctx.t,
+    overwrite_mapping
+  ).handle_error_interruptible(
+    TransactionManager::base_iertr::pass_further{},
+    crimson::ct_error::assert_all{
+      "ObjectDataHandler::do_remapping hit invalid error"
+    }
+  ).si_then([ctx](auto maybe_indirect_extent) {
+    assert(!maybe_indirect_extent.is_indirect());
+    return ctx.tm.get_mutable_extent(ctx.t, maybe_indirect_extent.extent);
+  }).si_then([overwrite_mapping, unaligned_offset,
+             unaligned_len, data=std::move(data)](auto extent) {
     bufferlist bl;
-    if (headbl) {
-      bl.append(*headbl);
+    if (data) {
+      bl.append(*data);
+    } else {
+      bl.append_zero(unaligned_len);
+    }
+    auto odblock = extent->template cast<ObjectDataBlock>();
+    odblock->overwrite(unaligned_offset, std::move(bl));
+  });
+}
+
+ObjectDataHandler::write_ret do_zero(
+  context_t ctx,
+  LBAMapping zero_pos,
+  const overwrite_range_t &overwrite_range,
+  data_t &data)
+{
+  assert(!data.bl);
+  auto fut = TransactionManager::get_pin_iertr::make_ready_future<
+    std::optional<LBAMapping>>();
+  if (data.tailbl) {
+    assert(data.tailbl->length() < ctx.tm.get_block_size());
+    data.tailbl->prepend_zero(
+      ctx.tm.get_block_size() - data.tailbl->length());
+    fut = ctx.tm.alloc_data_extents<ObjectDataBlock>(
+      ctx.t,
+      (overwrite_range.aligned_end - ctx.tm.get_block_size()).checked_to_laddr(),
+      ctx.tm.get_block_size(),
+      std::move(zero_pos)
+    ).si_then([ctx, &data](auto extents) {
+      assert(extents.size() == 1);
+      auto &extent = extents.back();
+      auto iter = data.tailbl->cbegin();
+      iter.copy(extent->get_length(), extent->get_bptr().c_str());
+      return ctx.tm.get_pin(ctx.t, *extent);
+    }).si_then([](auto zero_pos) {
+      return std::make_optional<LBAMapping>(std::move(zero_pos));
+    }).handle_error_interruptible(
+      crimson::ct_error::enospc::assert_failure{"unexpected enospc"},
+      TransactionManager::get_pin_iertr::pass_further{}
+    );
+  }
+  fut = fut.si_then([ctx, &overwrite_range, zero_pos=std::move(zero_pos),
+                   &data](auto pin) mutable {
+    if (pin) {
+      zero_pos = std::move(*pin);
+    }
+    auto laddr =
+      (overwrite_range.aligned_begin +
+       (data.headbl ? ctx.tm.get_block_size() : 0)
+      ).checked_to_laddr();
+    auto end =
+      (overwrite_range.aligned_end -
+       (data.tailbl ? ctx.tm.get_block_size() : 0)
+      ).checked_to_laddr();
+    auto len = end.get_byte_distance<extent_len_t>(laddr);
+    return ctx.tm.reserve_region(ctx.t, std::move(zero_pos), laddr, len);
+  }).si_then([](auto zero_pos) {
+    return std::make_optional<LBAMapping>(std::move(zero_pos));
+  }).handle_error_interruptible(
+    crimson::ct_error::enospc::assert_failure{"unexpected enospc"},
+    TransactionManager::get_pin_iertr::pass_further{}
+  );
+  if (data.headbl) {
+    assert(data.headbl->length() < ctx.tm.get_block_size());
+    data.headbl->append_zero(
+      ctx.tm.get_block_size() - data.headbl->length());
+    fut = fut.si_then([ctx, &overwrite_range](auto zero_pos) {
+      return ctx.tm.alloc_data_extents<ObjectDataBlock>(
+       ctx.t,
+       overwrite_range.aligned_begin,
+       ctx.tm.get_block_size(),
+       std::move(*zero_pos));
+    }).si_then([&data](auto extents) {
+      assert(extents.size() == 1);
+      auto &extent = extents.back();
+      auto iter = data.headbl->cbegin();
+      iter.copy(extent->get_length(), extent->get_bptr().c_str());
+      return TransactionManager::get_pin_iertr::make_ready_future<
+       std::optional<LBAMapping>>();
+    }).handle_error_interruptible(
+      crimson::ct_error::enospc::assert_failure{"unexpected enospc"},
+      TransactionManager::get_pin_iertr::pass_further{}
+    );
+  }
+  return fut.discard_result().handle_error_interruptible(
+    ObjectDataHandler::write_iertr::pass_further{},
+    crimson::ct_error::assert_all{"unexpected error"}
+  );
+}
+
+ObjectDataHandler::write_ret do_write(
+  context_t ctx,
+  LBAMapping write_pos,
+  const overwrite_range_t &overwrite_range,
+  data_t &data)
+{
+  assert(data.bl);
+  return ctx.tm.alloc_data_extents<ObjectDataBlock>(
+    ctx.t,
+    overwrite_range.aligned_begin,
+    overwrite_range.aligned_end.template get_byte_distance<
+      extent_len_t>(overwrite_range.aligned_begin),
+    std::move(write_pos)
+  ).si_then([&overwrite_range, &data](auto extents) {
+    auto off = overwrite_range.aligned_begin;
+    auto left = overwrite_range.aligned_end.template get_byte_distance<
+      extent_len_t>(overwrite_range.aligned_begin);
+    bufferlist _bl;
+    if (data.headbl) {
+      _bl.append(*data.headbl);
+    }
+    _bl.append(*data.bl);
+    if (data.tailbl) {
+      _bl.append(*data.tailbl);
+    }
+    auto iter = _bl.cbegin();
+    assert(_bl.length() == left);
+    for (auto &extent : extents) {
+      ceph_assert(left >= extent->get_length());
+      if (extent->get_laddr() != off) {
+       logger().debug(
+         "object_data_handler::do_insertions alloc got addr {},"
+         " should have been {}",
+         extent->get_laddr(),
+         off);
+      }
+      iter.copy(extent->get_length(), extent->get_bptr().c_str());
+      off = (off + extent->get_length()).checked_to_laddr();
+      left -= extent->get_length();
     }
-    bl.append_zero(
-      right - left - bl.length() - (tailbl ? tailbl->length() : 0));
-    if (tailbl) {
-      bl.append(*tailbl);
+    return ObjectDataHandler::write_iertr::now();
+  }).handle_error_interruptible(
+    crimson::ct_error::enospc::assert_failure{"unexpected enospc"},
+    ObjectDataHandler::write_iertr::pass_further{}
+  );
+}
+
+std::ostream& operator<<(std::ostream &out, const edge_t &edge) {
+  out << "edge_t{";
+  switch (edge) {
+  case edge_t::NONE:
+    out << "NONE";
+    break;
+  case edge_t::LEFT:
+    out << "LEFT";
+    break;
+  case edge_t::RIGHT:
+    out << "RIGHT";
+    break;
+  case edge_t::BOTH:
+    out << "BOTH";
+    break;
+  default:
+    ceph_abort();
+  }
+  return out << "}";
+}
+
+// read the padding edge data into data.headbl/data.tailbl, note that
+// the method doesn't expand the overwrite range, as the aligned boundaries
+// are not affected, expands only happens in the merge_pending_edge method.
+ObjectDataHandler::read_iertr::future<>
+ObjectDataHandler::read_unaligned_edge_data(
+  context_t ctx,
+  const overwrite_range_t &overwrite_range,
+  data_t &data,
+  LBAMapping &read_pos,
+  edge_t edge)
+{
+  assert(edge != edge_t::NONE);
+  LOG_PREFIX(ObjectDataHandler::read_unaligned_edge_data);
+  DEBUGT("{} {} {} edge={}", ctx.t, overwrite_range, data, read_pos, edge);
+  std::vector<ObjectDataHandler::read_iertr::future<>> futs;
+  if (edge & edge_t::LEFT) {
+    auto unaligned_off = read_pos.get_key().template get_byte_distance<
+      extent_len_t>(overwrite_range.aligned_begin);
+    auto unaligned_length =
+      overwrite_range.unaligned_begin.template get_byte_distance<
+       extent_len_t>(overwrite_range.aligned_begin);
+    futs.emplace_back(read_mapping(
+      ctx, read_pos, unaligned_off, unaligned_length, !data.bl
+    ).si_then([&data](auto bl) {
+      data.headbl = std::move(bl);
+    }));
+  }
+
+  if (edge & edge_t::RIGHT) {
+    auto unaligned_off =
+      overwrite_range.unaligned_end.template get_byte_distance<
+       extent_len_t>(read_pos.get_key());
+    auto unaligned_length =
+      overwrite_range.aligned_end.template get_byte_distance<
+       extent_len_t>(overwrite_range.unaligned_end);
+    futs.emplace_back(read_mapping(
+       ctx, read_pos, unaligned_off, unaligned_length, !data.bl
+    ).si_then([&data](auto bl) {
+      data.tailbl = std::move(bl);
+    }));
+  }
+
+  // TODO: when_all_succeed should be utilized here, however, it doesn't
+  //      actually work with interruptible errorated futures for now.
+  return trans_intr::parallel_for_each(
+    futs, [](auto &fut) { return std::move(fut); });
+}
+
+// read the pending edge mapping's data into data.headbl/data.tailbl,
+// remove the mapping and expand the overwrite_range; basically, this
+// is equivalent to merge the current overwrite range with the pending
+// edge mapping
+//
+// Note that this method should only be called when the overwrite handle
+// policy is MERGE_PENDING.
+ObjectDataHandler::read_iertr::future<>
+ObjectDataHandler::merge_pending_edge(
+  context_t ctx,
+  overwrite_range_t &overwrite_range,
+  data_t &data,
+  LBAMapping &edge_mapping,
+  edge_t edge)
+{
+  assert(edge != edge_t::NONE);
+  assert(edge_mapping.is_pending());
+  std::vector<ObjectDataHandler::read_iertr::future<>> futs;
+  if (edge & edge_t::LEFT) {
+    auto unaligned_length = edge_mapping.get_key().template get_byte_distance<
+      extent_len_t>(overwrite_range.unaligned_begin);
+    if (unaligned_length != 0) {
+      overwrite_range.expand_begin(edge_mapping.get_key());
+      futs.emplace_back(read_mapping(
+       ctx, edge_mapping, 0, unaligned_length, !data.bl
+      ).si_then([&data](auto bl) {
+       data.headbl = std::move(bl);
+      }));
+    }
+  }
+
+  if (edge & edge_t::RIGHT) {
+    auto unaligned_offset = overwrite_range.unaligned_end.template get_byte_distance<
+      extent_len_t>(edge_mapping.get_key());
+    auto len = edge_mapping.get_length() - unaligned_offset;
+    if (len != 0) {
+      auto end = (edge_mapping.get_key() + edge_mapping.get_length()
+       ).checked_to_laddr();
+      overwrite_range.expand_end(end);
+      futs.emplace_back(read_mapping(
+       ctx, edge_mapping, unaligned_offset, len, !data.bl
+      ).si_then([&data](auto bl) {
+       data.tailbl = std::move(bl);
+      }));
     }
-    assert(bl.length() % block_size == 0);
-    assert(bl.length() == (right - left));
-    extent_to_write_list_t ret;
-    ret.push_back(extent_to_write_t::create_data(
-      (data_base + left).checked_to_laddr(), bl));
-    return ret;
+  }
+
+  // TODO: when_all_succeed should be utilized here, however, it doesn't
+  //      actually work with interruptible errorated futures for now.
+  return trans_intr::parallel_for_each(
+    futs, [](auto &fut) { return std::move(fut); });
+}
+
+ObjectDataHandler::base_iertr::future<LBAMapping>
+ObjectDataHandler::delta_based_edge_overwrite(
+  context_t ctx,
+  overwrite_range_t &overwrite_range,
+  data_t& data,
+  LBAMapping edge_mapping,
+  edge_t edge)
+{
+  LOG_PREFIX(ObjectDataHandler::do_delta_based_edge_push);
+  DEBUGT("{} {} {} {}", ctx.t, overwrite_range, data, edge_mapping, edge);
+  std::optional<bufferlist> bl = std::nullopt;
+  assert(edge != edge_t::BOTH);
+  assert(edge != edge_t::NONE);
+  if (edge == edge_t::LEFT) {
+    assert(overwrite_range.is_begin_in_mapping(edge_mapping));
   } else {
-    // reserved section between ends, headbl and tailbl in different extents
-    extent_to_write_list_t ret;
-    if (headbl) {
-      bufferlist head_zero_bl;
-      head_zero_bl.append(*headbl);
-      head_zero_bl.append_zero(zero_left - left - head_zero_bl.length());
-      assert(head_zero_bl.length() % block_size == 0);
-      assert(head_zero_bl.length() > 0);
-      ret.push_back(extent_to_write_t::create_data(
-        (data_base + left).checked_to_laddr(), head_zero_bl));
+    assert(overwrite_range.is_end_in_mapping(edge_mapping));
+  }
+  if (data.bl) {
+    extent_len_t unaligned_len =
+      (edge == edge_t::LEFT)
+       ? overwrite_range.unaligned_begin.template get_byte_distance<
+           extent_len_t>(edge_mapping.get_key() + edge_mapping.get_length())
+       : overwrite_range.unaligned_end.template get_byte_distance<
+           extent_len_t>(edge_mapping.get_key());
+    extent_len_t unaligned_offset =
+      (edge == edge_t::LEFT) ? 0 : data.bl->length() - unaligned_len;
+    assert(unaligned_offset + unaligned_len <= data.bl->length());
+    bl = std::make_optional<bufferlist>();
+    bl->substr_of(*data.bl, unaligned_offset, unaligned_len);
+    bufferlist t_bl;
+    if (edge == edge_t::LEFT) {
+      t_bl.substr_of(*data.bl, unaligned_len, data.bl->length() - unaligned_len);
+    } else {
+      t_bl.substr_of(*data.bl, 0, unaligned_offset);
     }
-    // reserved zero region
-    ret.push_back(extent_to_write_t::create_zero(
-      (data_base + zero_left).checked_to_laddr(),
-      zero_right - zero_left));
-    assert(ret.back().len % block_size == 0);
-    assert(ret.back().len > 0);
-    if (tailbl) {
-      bufferlist tail_zero_bl;
-      tail_zero_bl.append(*tailbl);
-      tail_zero_bl.append_zero(right - zero_right - tail_zero_bl.length());
-      assert(tail_zero_bl.length() % block_size == 0);
-      assert(tail_zero_bl.length() > 0);
-      ret.push_back(extent_to_write_t::create_data(
-        (data_base + zero_right).checked_to_laddr(), tail_zero_bl));
+    data.bl = std::move(t_bl);
+  }
+  extent_len_t unaligned_overlapped_offset =
+    (edge == edge_t::LEFT)
+      ? overwrite_range.unaligned_begin.template get_byte_distance<
+         extent_len_t>(edge_mapping.get_key())
+      : 0;
+  extent_len_t unaligned_overlapped_len =
+    (edge == edge_t::LEFT)
+      ? overwrite_range.unaligned_begin.template get_byte_distance<
+         extent_len_t>(edge_mapping.get_key() + edge_mapping.get_length())
+      : overwrite_range.unaligned_end.template get_byte_distance<
+         extent_len_t>(edge_mapping.get_key());
+  return delta_based_overwrite(
+    ctx,
+    unaligned_overlapped_offset,
+    unaligned_overlapped_len,
+    edge_mapping, std::move(bl)
+  ).si_then([edge_mapping, &overwrite_range, edge]() mutable {
+    if (edge == edge_t::LEFT) {
+      auto new_begin = edge_mapping.get_key() + edge_mapping.get_length();
+      overwrite_range.shrink_begin(new_begin.checked_to_laddr());
+      return edge_mapping.next();
+    } else {
+      auto new_end = edge_mapping.get_key();
+      overwrite_range.shrink_end(new_end);
+      return base_iertr::make_ready_future<
+       LBAMapping>(std::move(edge_mapping));
     }
-    return ret;
+  });
+}
+
+ObjectDataHandler::write_ret
+ObjectDataHandler::merge_into_mapping(
+  context_t ctx,
+  overwrite_range_t &overwrite_range,
+  data_t &data,
+  LBAMapping edge_mapping)
+{
+  LOG_PREFIX(ObjectDataHandler::merge_into_mapping);
+  DEBUGT("{} {} {}", ctx.t, overwrite_range, data, edge_mapping);
+  assert(overwrite_range.is_range_in_mapping(edge_mapping));
+  return ctx.tm.read_pin<ObjectDataBlock>(ctx.t, edge_mapping
+  ).si_then([&overwrite_range, &data, edge_mapping](auto maybe_indirect_extent) {
+    assert(!maybe_indirect_extent.is_indirect());
+    assert(maybe_indirect_extent.extent);
+    assert(maybe_indirect_extent.extent->is_initial_pending());
+    auto offset = overwrite_range.unaligned_begin.template get_byte_distance<
+      extent_len_t>(edge_mapping.get_key());
+    bufferlist bl;
+    if (data.bl) {
+      bl.append(*data.bl);
+    } else {
+      bl.append_zero(overwrite_range.unaligned_len);
+    }
+    auto iter = bl.cbegin();
+    auto &ptr = maybe_indirect_extent.extent->get_bptr();
+    iter.copy(bl.length(), ptr.c_str() + offset);
+  });
+}
+
+ObjectDataHandler::base_iertr::future<LBAMapping>
+ObjectDataHandler::merge_into_pending_edge(
+  context_t ctx,
+  overwrite_range_t &overwrite_range,
+  data_t &data,
+  LBAMapping edge_mapping,
+  edge_t edge)
+{
+  LOG_PREFIX(ObjectDataHandler::merge_into_pending_edge);
+  DEBUGT("{} {} {} {}", ctx.t, overwrite_range, data, edge_mapping, edge);
+  bufferlist bl;
+  assert(edge != edge_t::BOTH);
+  assert(edge != edge_t::NONE);
+  assert(edge_mapping.is_initial_pending());
+  if (edge == edge_t::LEFT) {
+    assert(overwrite_range.is_begin_in_mapping(edge_mapping));
+  } else {
+    assert(overwrite_range.is_end_in_mapping(edge_mapping));
+  }
+  extent_len_t unaligned_len =
+    (edge == edge_t::LEFT)
+      ? overwrite_range.unaligned_begin.template get_byte_distance<
+         extent_len_t>(edge_mapping.get_key() + edge_mapping.get_length())
+      : overwrite_range.unaligned_end.template get_byte_distance<
+         extent_len_t>(edge_mapping.get_key());
+  if (data.bl) {
+    extent_len_t unaligned_offset =
+      (edge == edge_t::LEFT) ? 0 : data.bl->length() - unaligned_len;
+    assert(unaligned_offset + unaligned_len <= data.bl->length());
+    bl.substr_of(*data.bl, unaligned_offset, unaligned_len);
+    bufferlist t_bl;
+    if (edge == edge_t::LEFT) {
+      t_bl.substr_of(*data.bl, unaligned_len, data.bl->length() - unaligned_len);
+    } else {
+      t_bl.substr_of(*data.bl, 0, unaligned_offset);
+    }
+    data.bl = std::move(t_bl);
+  } else {
+    bl.append_zero(unaligned_len);
   }
+  return ctx.tm.read_pin<ObjectDataBlock>(ctx.t, edge_mapping
+  ).si_then([bl=std::move(bl), &overwrite_range, edge_mapping, edge]
+           (auto maybe_indirect_extent) mutable {
+    assert(!maybe_indirect_extent.is_indirect());
+    assert(maybe_indirect_extent.extent);
+    assert(maybe_indirect_extent.extent->is_initial_pending());
+    extent_len_t offset =
+      (edge == edge_t::LEFT)
+       ? overwrite_range.unaligned_begin.template get_byte_distance<
+           extent_len_t>(edge_mapping.get_key())
+       : 0;
+    auto iter = bl.cbegin();
+    auto &ptr = maybe_indirect_extent.extent->get_bptr();
+    iter.copy(bl.length(), ptr.c_str() + offset);
+    if (edge == edge_t::LEFT) {
+      auto new_begin = edge_mapping.get_key() + edge_mapping.get_length();
+      overwrite_range.shrink_begin(new_begin.checked_to_laddr());
+      return edge_mapping.next();
+    } else {
+      auto new_end = edge_mapping.get_key();
+      overwrite_range.shrink_end(new_end);
+      return base_iertr::make_ready_future<
+       LBAMapping>(std::move(edge_mapping));
+    }
+  });
 }
 
-/**
- * get_to_writes
- *
- * Returns extent_to_write_t's from bl.
- *
- * TODO: probably add some kind of upper limit on extent size.
- */
-extent_to_write_list_t get_to_writes(laddr_t offset, bufferlist &bl)
+ObjectDataHandler::base_iertr::future<LBAMapping>
+ObjectDataHandler::do_merge_based_edge_punch(
+  context_t ctx,
+  overwrite_range_t &overwrite_range,
+  data_t &data,
+  LBAMapping edge_mapping,
+  edge_t edge)
 {
-  auto ret = extent_to_write_list_t();
-  ret.push_back(extent_to_write_t::create_data(offset, bl));
-  return ret;
-};
+  LOG_PREFIX(ObjectDataHandler::do_merge_based_edge_push);
+  DEBUGT("{} {} {} {}", ctx.t, overwrite_range, data, edge_mapping, edge);
+  assert(edge_mapping.is_pending());
+  return merge_pending_edge(ctx, overwrite_range, data, edge_mapping, edge
+  ).si_then([edge_mapping, ctx] {
+    return ctx.tm.remove(ctx.t, std::move(edge_mapping));
+  }).handle_error_interruptible(
+    ObjectDataHandler::base_iertr::pass_further{},
+    crimson::ct_error::assert_all{"unexpected error"}
+  );
+}
 
-ObjectDataHandler::write_ret ObjectDataHandler::overwrite(
+ObjectDataHandler::base_iertr::future<LBAMapping>
+ObjectDataHandler::do_remap_based_edge_punch(
   context_t ctx,
-  laddr_t data_base,
-  objaddr_t offset,
-  extent_len_t len,
-  std::optional<bufferlist> &&bl,
-  lba_mapping_list_t &&_pins)
+  overwrite_range_t &overwrite_range,
+  data_t &data,
+  LBAMapping edge_mapping,
+  edge_t edge)
 {
-  if (bl.has_value()) {
-    assert(bl->length() == len);
+  LOG_PREFIX(ObjectDataHandler::do_remap_based_edge_punch);
+  DEBUGT("{} {} {} {}", ctx.t, overwrite_range, data, edge_mapping, edge);
+  if (edge & edge_t::LEFT) {
+    assert(overwrite_range.is_begin_in_mapping(edge_mapping));
+  } else {
+    assert(edge & edge_t::RIGHT);
+    assert(overwrite_range.is_end_in_mapping(edge_mapping));
   }
-  overwrite_plan_t overwrite_plan(data_base, offset, len, _pins, ctx.tm.get_block_size());
-  return seastar::do_with(
-    std::move(_pins),
-    extent_to_write_list_t(),
-    [ctx, data_base, len, offset, overwrite_plan, bl=std::move(bl), this]
-    (auto &pins, auto &to_write) mutable
-  {
-    LOG_PREFIX(ObjectDataHandler::overwrite);
-    DEBUGT("overwrite: 0x{:x}~0x{:x}",
-           ctx.t,
-           offset,
-           len);
-    ceph_assert(pins.size() >= 1);
-    DEBUGT("overwrite: split overwrite_plan {}", ctx.t, overwrite_plan);
 
-    return operate_left(
-      ctx,
-      pins.front(),
-      overwrite_plan
-    ).si_then([ctx, data_base, len, offset, overwrite_plan, bl=std::move(bl),
-               &to_write, &pins, this](auto p) mutable {
-      auto &[left_extent, headbl] = p;
-      if (left_extent) {
-        ceph_assert(left_extent->addr == overwrite_plan.pin_begin);
-        append_extent_to_write(to_write, std::move(*left_extent));
+  auto fut = ObjectDataHandler::base_iertr::now();
+  if (((edge & edge_t::LEFT) &&
+       !overwrite_range.is_begin_aligned(ctx.tm.get_block_size())) ||
+      ((edge & edge_t::RIGHT) &&
+       !overwrite_range.is_end_aligned(ctx.tm.get_block_size()))) {
+    // if the overwrite range is not aligned,
+    // we need to read the padding data first.
+    fut = read_unaligned_edge_data(
+      ctx, overwrite_range, data, edge_mapping, edge);
+  }
+  return fut.si_then([ctx, edge_mapping, &overwrite_range, edge] {
+    if (edge == edge_t::LEFT) {
+      if (overwrite_range.aligned_begin > edge_mapping.get_key()) {
+       return ctx.tm.cut_mapping<ObjectDataBlock>(
+         ctx.t, overwrite_range.aligned_begin, std::move(edge_mapping), true
+       ).si_then([](auto mapping) {
+         return mapping.next();
+       });
+      } else {
+       // this branch happens when:
+       // "overwrite.aligned_begin == edge_mapping.get_key() &&
+       //  overwrite.unaligned_begin > edge_mapping.get_key()"
+       return ObjectDataHandler::base_iertr::make_ready_future<
+         LBAMapping>(std::move(edge_mapping));
       }
-      if (headbl) {
-        assert(headbl->length() > 0);
+    } else {
+      assert(edge == edge_t::RIGHT);
+      if (overwrite_range.aligned_end <
+               edge_mapping.get_key() + edge_mapping.get_length()) {
+       return ctx.tm.cut_mapping<ObjectDataBlock>(
+         ctx.t, overwrite_range.aligned_end, std::move(edge_mapping), false);
+      } else {
+       // this branch happens when overwrite.aligned_end is equal to
+       // the end of the edge_mapping while overwrite.unaligned_end is
+       // less than that of the edge_mapping.
+       return ctx.tm.remove(ctx.t, std::move(edge_mapping)
+       ).handle_error_interruptible(
+         ObjectDataHandler::base_iertr::pass_further{},
+         crimson::ct_error::assert_all{"unexpected error"}
+       );
       }
-      return operate_right(
-        ctx,
-        pins.back(),
-        overwrite_plan
-      ).si_then([ctx, data_base, len, offset,
-                 pin_begin=overwrite_plan.pin_begin,
-                 pin_end=overwrite_plan.pin_end,
-                 bl=std::move(bl), headbl=std::move(headbl),
-                 &to_write, &pins, this](auto p) mutable {
-        auto &[right_extent, tailbl] = p;
-        if (bl.has_value()) {
-          auto write_offset = offset;
-          bufferlist write_bl;
-          if (headbl) {
-            write_bl.append(*headbl);
-            write_offset = write_offset - headbl->length();
-          }
-          write_bl.claim_append(*bl);
-          if (tailbl) {
-            write_bl.append(*tailbl);
-            assert_aligned(write_bl.length());
-          }
-          splice_extent_to_write(
-            to_write,
-            get_to_writes((data_base + write_offset).checked_to_laddr(), write_bl));
-        } else {
-          splice_extent_to_write(
-            to_write,
-            get_to_writes_with_zero_buffer(
-             data_base,
-              ctx.tm.get_block_size(),
-              offset,
-              len,
-              std::move(headbl),
-              std::move(tailbl)));
-        }
-        if (right_extent) {
-          ceph_assert(right_extent->get_end_addr() == pin_end);
-          append_extent_to_write(to_write, std::move(*right_extent));
-        }
-        assert(to_write.size());
-        assert(pin_begin == to_write.front().addr);
-        assert(pin_end == to_write.back().get_end_addr());
+    }
+  });
+}
 
-        return seastar::do_with(
-          prepare_ops_list(pins, to_write,
-           delta_based_overwrite_max_extent_size),
-          [ctx](auto &ops) {
-            return do_remappings(ctx, ops.to_remap
-            ).si_then([ctx, &ops] {
-              return do_removals(ctx, ops.to_remove);
-            }).si_then([ctx, &ops] {
-              return do_insertions(ctx, ops.to_insert);
-            });
-        });
+// punch the edge mapping following the edge_handle_policy_t.
+// Specifically:
+// 1. edge_handle_policy_t::DELTA_BASED_PUNCH: cut the overlapped part
+//    of data.bl, apply it to the edge_mapping as a mutation and shrink
+//    the overwrite_range.
+// 2. edge_handle_policy_t::MERGE_INPLACE: merge the overwrite data with
+//    that of the edge_mapping, remove the edge_mapping and expand the
+//    overwrite_range.
+// 3. edge_handle_policy_t::REMAP: drop the overlapped part of the edge mapping
+ObjectDataHandler::base_iertr::future<LBAMapping>
+ObjectDataHandler::punch_mapping_on_edge(
+  context_t ctx,
+  overwrite_range_t &overwrite_range,
+  data_t &data,
+  LBAMapping edge_mapping,
+  edge_t edge,
+  op_type_t op_type)
+{
+  assert(edge != edge_t::NONE);
+  LOG_PREFIX(ObjectDataHandler::punch_mapping_on_edge);
+  DEBUGT("{}, {}, {}, {}", ctx.t, overwrite_range, data, edge_mapping, edge);
+  ceph_assert(edge != edge_t::BOTH);
+  assert(edge_mapping.is_viewable());
+
+  auto edge_key = edge_mapping.get_key();
+  auto edge_length = edge_mapping.get_length();
+  laddr_t aligned_overlapped_start =
+    (edge == edge_t::LEFT)
+      ? overwrite_range.aligned_begin
+      : edge_key;
+  extent_len_t aligned_overlapped_len =
+    (edge == edge_t::LEFT)
+      ? overwrite_range.aligned_begin.template get_byte_distance<
+         extent_len_t>(edge_key + edge_length)
+      : overwrite_range.aligned_end.template get_byte_distance<
+         extent_len_t>(edge_key);
+  auto ehpolicy = get_edge_handle_policy(
+    edge_mapping,
+    aligned_overlapped_start,
+    aligned_overlapped_len,
+    op_type);
+  switch (ehpolicy) {
+  case edge_handle_policy_t::DELTA_BASED_PUNCH:
+    return delta_based_edge_overwrite(
+      ctx, overwrite_range, data, std::move(edge_mapping), edge);
+  case edge_handle_policy_t::MERGE_INPLACE:
+    return merge_into_pending_edge(
+      ctx, overwrite_range, data, std::move(edge_mapping), edge);
+  case edge_handle_policy_t::REMAP:
+    return do_remap_based_edge_punch(
+      ctx, overwrite_range, data, std::move(edge_mapping), edge);
+  default:
+    ceph_abort_msg("unexpected edge handling policy");
+  }
+}
+
+// The first step in a multi-mapping-hole-punching scenario: remap the
+// left mapping if it crosses the left edge of the hole's range
+ObjectDataHandler::base_iertr::future<LBAMapping>
+ObjectDataHandler::punch_left_mapping(
+  context_t ctx,
+  overwrite_range_t &overwrite_range,
+  data_t &overwrite_data,
+  LBAMapping left_mapping,
+  op_type_t op_type)
+{
+  if (overwrite_range.unaligned_begin > left_mapping.get_key()) {
+    // left_mapping crosses the left edge
+    assert(overwrite_range.unaligned_begin <
+      left_mapping.get_key() + left_mapping.get_length());
+    return punch_mapping_on_edge(
+      ctx, overwrite_range, overwrite_data,
+      std::move(left_mapping), edge_t::LEFT, op_type);
+  }
+  return ObjectDataHandler::base_iertr::make_ready_future<
+    LBAMapping>(std::move(left_mapping));
+}
+
+// The second step in a multi-mapping-hole-punching scenario: remove
+// all the mappings that are strictly inside the hole's range
+ObjectDataHandler::base_iertr::future<LBAMapping>
+ObjectDataHandler::punch_inner_mappings(
+  context_t ctx,
+  overwrite_range_t &overwrite_range,
+  LBAMapping first_mapping)
+{
+  auto unaligned_len = overwrite_range.unaligned_end.template get_byte_distance<
+    extent_len_t>(overwrite_range.aligned_begin);
+  return ctx.tm.remove_mappings_in_range(
+    ctx.t, overwrite_range.aligned_begin,
+    unaligned_len, std::move(first_mapping));
+}
+
+// The last step in the multi-mapping-hole-punching scenario: remap
+// the right mapping if it crosses the right edge of the hole's range
+ObjectDataHandler::base_iertr::future<LBAMapping>
+ObjectDataHandler::punch_right_mapping(
+  context_t ctx,
+  overwrite_range_t &overwrite_range,
+  data_t &overwrite_data,
+  LBAMapping right_mapping,
+  op_type_t op_type)
+{
+  if (right_mapping.is_end() ||
+      overwrite_range.aligned_end <= right_mapping.get_key()) {
+    return ObjectDataHandler::base_iertr::make_ready_future<
+      LBAMapping>(std::move(right_mapping));
+  }
+  return punch_mapping_on_edge(
+    ctx, overwrite_range, overwrite_data,
+    std::move(right_mapping), edge_t::RIGHT, op_type);
+}
+
+// punch the hole whose range is within a single pending mapping
+ObjectDataHandler::base_iertr::future<LBAMapping>
+ObjectDataHandler::punch_hole_in_pending_mapping(
+  context_t ctx,
+  overwrite_range_t &overwrite_range,
+  data_t &data,
+  LBAMapping mapping)
+{
+  return merge_pending_edge(ctx, overwrite_range, data, mapping, edge_t::BOTH
+  ).si_then([ctx, mapping=std::move(mapping)]() mutable {
+    return ctx.tm.remove(ctx.t, std::move(mapping));
+  }).handle_error_interruptible(
+    ObjectDataHandler::base_iertr::pass_further{},
+    crimson::ct_error::assert_all{"impossible"}
+  );
+}
+
+ObjectDataHandler::base_iertr::future<LBAMapping>
+ObjectDataHandler::punch_multi_mapping_hole(
+  context_t ctx,
+  overwrite_range_t &overwrite_range,
+  data_t &data,
+  LBAMapping left_mapping,
+  op_type_t op_type)
+{
+  return punch_left_mapping(
+    ctx, overwrite_range, data, std::move(left_mapping), op_type
+  ).si_then([this, ctx, &overwrite_range](auto mapping) {
+    return punch_inner_mappings(ctx, overwrite_range, std::move(mapping));
+  }).si_then([this, ctx, &overwrite_range, &data, op_type](auto mapping) {
+    return punch_right_mapping(
+      ctx, overwrite_range, data, std::move(mapping), op_type);
+  });
+}
+
+ObjectDataHandler::write_ret
+ObjectDataHandler::handle_single_mapping_overwrite(
+  context_t ctx,
+  overwrite_range_t &overwrite_range,
+  data_t &data,
+  LBAMapping mapping,
+  op_type_t op_type)
+{
+  auto ehpolicy = get_edge_handle_policy(
+    mapping,
+    overwrite_range.aligned_begin,
+    overwrite_range.aligned_len,
+    op_type);
+  auto do_overwrite = [ctx, &overwrite_range, &data](auto pos) {
+    if (overwrite_range.is_empty()) {
+      // the overwrite is completed in the previous steps,
+      // this can happen if delta based overwrites are involved.
+      return write_iertr::now();
+    }
+    if (overwrite_range.aligned_end.template get_byte_distance<
+         extent_len_t>(overwrite_range.aligned_begin) == ctx.tm.get_block_size()
+       && (data.headbl || data.tailbl)) {
+      // the range to zero is within a block
+      bufferlist bl;
+      if (data.headbl) {
+       bl.append(*data.headbl);
+      }
+      if (!data.bl) {
+       bl.append_zero(overwrite_range.unaligned_len);
+      } else {
+       bl.append(*data.bl);
+      }
+      if (data.tailbl) {
+       bl.append(*data.tailbl);
+      }
+      data.headbl.reset();
+      data.tailbl.reset();
+      data.bl = std::move(bl);
+    }
+    if (data.bl) {
+      return do_write(ctx, std::move(pos), overwrite_range, data);
+    } else {
+      return do_zero(ctx, std::move(pos), overwrite_range, data);
+    }
+  };
+
+  switch (ehpolicy) {
+  case edge_handle_policy_t::DELTA_BASED_PUNCH:
+    {
+      auto unaligned_offset = mapping.get_key().template get_byte_distance<
+       extent_len_t>(overwrite_range.unaligned_begin);
+      auto unaligned_len = overwrite_range.unaligned_len;
+      return delta_based_overwrite(
+       ctx, unaligned_offset, unaligned_len, std::move(mapping), data.bl);
+    }
+  case edge_handle_policy_t::MERGE_INPLACE:
+    {
+      return merge_into_mapping(
+       ctx, overwrite_range, data, std::move(mapping));
+    }
+  case edge_handle_policy_t::REMAP:
+    {
+      auto fut = ObjectDataHandler::base_iertr::now();
+      edge_t edge =  edge_t::NONE;
+      if (!overwrite_range.is_begin_aligned(ctx.tm.get_block_size())) {
+       edge = static_cast<edge_t>(edge | edge_t::LEFT);
+      }
+      if (!overwrite_range.is_end_aligned(ctx.tm.get_block_size())) {
+       edge = static_cast<edge_t>(edge | edge_t::RIGHT);
+      }
+      if (edge != edge_t::NONE) {
+       fut = read_unaligned_edge_data(
+         ctx, overwrite_range, data, mapping, edge);
+      }
+      return fut.si_then([ctx, &overwrite_range, mapping] {
+       return ctx.tm.punch_hole_in_mapping<ObjectDataBlock>(
+         ctx.t, overwrite_range.aligned_begin,
+         overwrite_range.aligned_len, std::move(mapping));
+      }).si_then([do_overwrite=std::move(do_overwrite)](auto pos) {
+       return do_overwrite(std::move(pos));
       });
-    });
+    }
+  default:
+    ceph_abort_msg("unexpected edge handling policy");
+  }
+}
+
+ObjectDataHandler::write_ret
+ObjectDataHandler::handle_multi_mapping_overwrite(
+  context_t ctx,
+  overwrite_range_t &overwrite_range,
+  data_t &data,
+  LBAMapping first_mapping,
+  op_type_t op_type)
+{
+  return punch_multi_mapping_hole(
+    ctx, overwrite_range, data, std::move(first_mapping), op_type
+  ).si_then([ctx, &overwrite_range, &data](auto pos) {
+    if (overwrite_range.is_empty()) {
+      // the overwrite is completed in the previous steps,
+      // this can happen if delta based overwrites are involved.
+      return write_iertr::now();
+    }
+    if (overwrite_range.aligned_end.template get_byte_distance<
+         extent_len_t>(overwrite_range.aligned_begin) == ctx.tm.get_block_size()
+       && (data.headbl || data.tailbl)) {
+      // the range to zero is within a block
+      bufferlist bl;
+      if (data.headbl) {
+       bl.append(*data.headbl);
+      }
+      if (!data.bl) {
+       bl.append_zero(overwrite_range.unaligned_len);
+      } else {
+       bl.append(*data.bl);
+      }
+      if (data.tailbl) {
+       bl.append(*data.tailbl);
+      }
+      data.headbl.reset();
+      data.tailbl.reset();
+      data.bl = std::move(bl);
+    }
+    if (data.bl) {
+      return do_write(ctx, std::move(pos), overwrite_range, data);
+    } else {
+      return do_zero(ctx, std::move(pos), overwrite_range, data);
+    }
+  });
+}
+
+ObjectDataHandler::write_ret ObjectDataHandler::overwrite(
+  context_t ctx,
+  laddr_t data_base,
+  objaddr_t offset,
+  extent_len_t len,
+  std::optional<bufferlist> &&bl,
+  LBAMapping first_mapping)
+{
+  LOG_PREFIX(ObjectDataHandler::overwrite);
+  assert(!bl.has_value() || bl->length() == len);
+  auto unaligned_begin = data_base + offset;
+  auto unaligned_end = data_base + offset + len;
+  assert(first_mapping.get_key() <= unaligned_begin.get_aligned_laddr());
+  DEBUGT(
+    "data_base={}, offset=0x{:x}, len=0x{:x}, "
+    "aligned_begin={}, aligned_end={}",
+    ctx.t, data_base, offset, len,
+    unaligned_begin.get_aligned_laddr(),
+    unaligned_end.get_roundup_laddr());
+  return seastar::do_with(
+    data_t{std::nullopt, std::move(bl), std::nullopt},
+    overwrite_range_t{
+      len,
+      unaligned_begin,
+      unaligned_end},
+    [first_mapping=std::move(first_mapping),
+    this, ctx](auto &data, auto &overwrite_range) {
+    if (overwrite_range.is_range_in_mapping(first_mapping)) {
+      return handle_single_mapping_overwrite(
+       ctx, overwrite_range, data, std::move(first_mapping),
+       data.bl.has_value() ? op_type_t::OVERWRITE : op_type_t::ZERO);
+    } else {
+      return handle_multi_mapping_overwrite(
+       ctx, overwrite_range, data, std::move(first_mapping),
+       data.bl.has_value() ? op_type_t::OVERWRITE : op_type_t::ZERO);
+    }
   });
 }
 
@@ -1409,23 +2020,24 @@ ObjectDataHandler::zero_ret ObjectDataHandler::zero(
        ctx,
        object_data,
        p2roundup(offset + len, ctx.tm.get_block_size())
-      ).si_then([this, ctx, offset, len, &object_data] {
+      ).si_then([this, ctx, offset, len, &object_data](auto mapping) {
        auto data_base = object_data.get_reserved_data_base();
+       if (mapping) {
+         return overwrite(
+           ctx, data_base, offset, len,
+           std::nullopt, std::move(*mapping));
+       }
        laddr_offset_t l_start = data_base + offset;
-       laddr_offset_t l_end = l_start + len;
-       laddr_t aligned_start = l_start.get_aligned_laddr();
-       loffset_t aligned_length =
-           l_end.get_roundup_laddr().get_byte_distance<
-             loffset_t>(aligned_start);
-       return ctx.tm.get_pins(
-         ctx.t,
-         aligned_start,
-         aligned_length
-       ).si_then([this, ctx, data_base, offset, len](auto pins) {
+       return ctx.tm.get_containing_pin(
+         ctx.t, l_start.get_aligned_laddr(ctx.tm.get_block_size())
+       ).si_then([this, ctx, data_base, offset, len](auto pin) {
          return overwrite(
            ctx, data_base, offset, len,
-           std::nullopt, std::move(pins));
-       });
+           std::nullopt, std::move(pin));
+       }).handle_error_interruptible(
+         write_iertr::pass_further{},
+         crimson::ct_error::assert_all("unexpected enoent")
+       );
       });
     });
 }
@@ -1450,28 +2062,56 @@ ObjectDataHandler::write_ret ObjectDataHandler::write(
        ctx,
        object_data,
        p2roundup(offset + bl.length(), ctx.tm.get_block_size())
-      ).si_then([this, ctx, offset, &object_data, &bl] {
+      ).si_then([this, ctx, offset, &object_data, &bl]
+               (auto mapping) -> write_ret {
        auto data_base = object_data.get_reserved_data_base();
+       if (mapping) {
+         return overwrite(
+           ctx, data_base, offset, bl.length(),
+           bufferlist(bl), std::move(*mapping));
+       }
        laddr_offset_t l_start = data_base + offset;
-       laddr_offset_t l_end = l_start + bl.length();
-       laddr_t aligned_start = l_start.get_aligned_laddr();
-       loffset_t aligned_length =
-           l_end.get_roundup_laddr().get_byte_distance<
-             loffset_t>(aligned_start);
-       return ctx.tm.get_pins(
-         ctx.t,
-         aligned_start,
-         aligned_length
-       ).si_then([this, ctx, offset, data_base, &bl](
-                  auto pins) {
+       return ctx.tm.get_containing_pin(
+         ctx.t, l_start.get_aligned_laddr(ctx.tm.get_block_size())
+       ).si_then([this, ctx, offset, data_base, &bl](auto pin) {
          return overwrite(
            ctx, data_base, offset, bl.length(),
-           bufferlist(bl), std::move(pins));
-       });
+           bufferlist(bl), std::move(pin));
+       }).handle_error_interruptible(
+         write_iertr::pass_further{},
+         crimson::ct_error::assert_all{"unexpected enoent"}
+       );
       });
     });
 }
 
+ObjectDataHandler::clear_ret ObjectDataHandler::trim_data_reservation(
+  context_t ctx, object_data_t &object_data, extent_len_t size)
+{
+  LOG_PREFIX(ObjectDataHandler::trim_data_reservation);
+  DEBUGT("0x{:x}~0x{:x}, 0x{:x}",
+    ctx.t, object_data.get_reserved_data_base(),
+    object_data.get_reserved_data_len(), size);
+  ceph_assert(!object_data.is_null());
+  ceph_assert(size <= object_data.get_reserved_data_len());
+  auto data_base = object_data.get_reserved_data_base();
+  auto unaligned_begin = data_base + size;
+  return ctx.tm.get_containing_pin(
+    ctx.t, unaligned_begin.get_aligned_laddr(ctx.tm.get_block_size())
+  ).si_then([ctx, data_base, size, this,
+           unaligned_begin, &object_data](auto mapping) {
+    assert(mapping.get_key() <= unaligned_begin &&
+      mapping.get_key() + mapping.get_length() > unaligned_begin);
+    auto data_len = object_data.get_reserved_data_len();
+    return overwrite(
+      ctx, data_base, size, data_len - size,
+      std::nullopt, std::move(mapping));
+  }).handle_error_interruptible(
+    clear_iertr::pass_further{},
+    crimson::ct_error::assert_all{"unexpected enoent"}
+  );
+}
+
 ObjectDataHandler::read_ret ObjectDataHandler::read(
   context_t ctx,
   objaddr_t obj_offset,
@@ -1680,7 +2320,7 @@ ObjectDataHandler::truncate_ret ObjectDataHandler::truncate(
        return prepare_data_reservation(
          ctx,
          object_data,
-         p2roundup(offset, ctx.tm.get_block_size()));
+         p2roundup(offset, ctx.tm.get_block_size())).discard_result();
       } else {
        return truncate_iertr::now();
       }
@@ -1790,7 +2430,7 @@ ObjectDataHandler::clone_ret ObjectDataHandler::clone(
       ctx,
       d_object_data,
       object_data.get_reserved_data_len()
-    ).si_then([&object_data, &d_object_data, ctx, this] {
+    ).si_then([&object_data, &d_object_data, ctx, this](auto) {
       assert(!object_data.is_null());
       auto base = object_data.get_reserved_data_base();
       auto len = object_data.get_reserved_data_len();
@@ -1804,7 +2444,7 @@ ObjectDataHandler::clone_ret ObjectDataHandler::clone(
        ctx,
        object_data,
        d_object_data.get_reserved_data_len()
-      ).si_then([&d_object_data, ctx, &object_data, base, len, this] {
+      ).si_then([&d_object_data, ctx, &object_data, base, len, this](auto) {
        LOG_PREFIX("ObjectDataHandler::clone");
        DEBUGT("head obj reserve_data_base: {}, len 0x{:x}",
          ctx.t,
@@ -1829,3 +2469,12 @@ ObjectDataHandler::clone_ret ObjectDataHandler::clone(
 }
 
 } // namespace crimson::os::seastore
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::os::seastore::overwrite_range_t>
+  : fmt::ostream_formatter {};
+template <> struct fmt::formatter<crimson::os::seastore::data_t>
+  : fmt::ostream_formatter {};
+template <> struct fmt::formatter<crimson::os::seastore::edge_t>
+  : fmt::ostream_formatter {};
+#endif
index 2c3e41bf4d1d825a7d76744db55caddf1154825b..5656d3d19e0a16f446fc6c612424dcbad07f4bdf 100644 (file)
@@ -10,6 +10,7 @@
 
 #include "test/crimson/seastore/test_block.h" // TODO
 
+#include "crimson/os/seastore/laddr_interval_set.h"
 #include "crimson/os/seastore/onode.h"
 #include "crimson/os/seastore/transaction_manager.h"
 #include "crimson/os/seastore/transaction.h"
@@ -78,6 +79,106 @@ private:
   mutable std::optional<ceph::bufferptr> ptr = std::nullopt;
 };
 
+struct overwrite_range_t {
+  objaddr_t unaligned_len = 0;
+  laddr_offset_t unaligned_begin;
+  laddr_offset_t unaligned_end;
+  laddr_t aligned_begin = L_ADDR_NULL;
+  laddr_t aligned_end = L_ADDR_NULL;
+  objaddr_t aligned_len = 0;
+  overwrite_range_t(
+    objaddr_t unaligned_len,
+    laddr_offset_t unaligned_begin,
+    laddr_offset_t unaligned_end)
+    : unaligned_len(unaligned_len),
+      unaligned_begin(unaligned_begin),
+      unaligned_end(unaligned_end),
+      aligned_begin(unaligned_begin.get_aligned_laddr()),
+      aligned_end(unaligned_end.get_roundup_laddr()),
+      aligned_len(
+       aligned_end.template get_byte_distance<
+         extent_len_t>(aligned_begin))
+  {}
+
+  bool is_empty() const {
+    return unaligned_begin == unaligned_end;
+  }
+  bool is_range_in_mapping(
+    const LBAMapping &mapping) const
+  {
+    return unaligned_begin >= mapping.get_key() &&
+       unaligned_end <= mapping.get_key() + mapping.get_length();
+  }
+  bool is_begin_aligned(size_t alignment) const {
+    return unaligned_begin.is_aligned(alignment);
+  }
+  bool is_end_aligned(size_t alignment) const {
+    return unaligned_end.is_aligned(alignment);
+  }
+#ifndef NDEBUG
+  bool is_begin_in_mapping(const LBAMapping &mapping) const {
+    return unaligned_begin > mapping.get_key() &&
+      unaligned_begin < mapping.get_key() + mapping.get_length();
+  }
+  bool is_end_in_mapping(const LBAMapping &mapping) const {
+    return unaligned_end > mapping.get_key() &&
+      unaligned_end < mapping.get_key() + mapping.get_length();
+  }
+#endif
+  void expand_begin(laddr_t new_begin) {
+    assert(new_begin <= aligned_begin);
+    unaligned_len += new_begin.template get_byte_distance<
+      extent_len_t>(unaligned_begin);
+    aligned_len += new_begin.template get_byte_distance<
+      extent_len_t>(aligned_begin);
+    aligned_begin = new_begin;
+    unaligned_begin = laddr_offset_t{new_begin};
+  }
+  void expand_end(laddr_t new_end) {
+    assert(new_end >= aligned_end);
+    unaligned_len += new_end.template get_byte_distance<
+      extent_len_t>(unaligned_end);
+    aligned_len += new_end.template get_byte_distance<
+      extent_len_t>(aligned_end);
+    aligned_end = new_end;
+    unaligned_end = laddr_offset_t{new_end};
+  }
+  void shrink_begin(laddr_t new_begin) {
+    assert(new_begin >= aligned_begin);
+    unaligned_len -= new_begin.template get_byte_distance<
+      extent_len_t>(unaligned_begin);
+    aligned_len -= new_begin.template get_byte_distance<
+      extent_len_t>(aligned_begin);
+    aligned_begin = new_begin;
+    unaligned_begin = laddr_offset_t{new_begin};
+  }
+  void shrink_end(laddr_t new_end) {
+    assert(new_end <= aligned_end);
+    unaligned_len -= new_end.template get_byte_distance<
+      extent_len_t>(unaligned_end);
+    aligned_len -= new_end.template get_byte_distance<
+      extent_len_t>(aligned_end);
+    aligned_end = new_end;
+    unaligned_end = laddr_offset_t{new_end};
+  }
+};
+std::ostream& operator<<(std::ostream &, const overwrite_range_t &);
+
+struct data_t {
+  std::optional<bufferlist> headbl;
+  std::optional<bufferlist> bl;
+  std::optional<bufferlist> tailbl;
+};
+std::ostream& operator<<(std::ostream &out, const data_t &data);
+
+enum edge_t : uint8_t {
+  NONE = 0x0,
+  LEFT = 0x1,
+  RIGHT = 0x2,
+  BOTH = 0x3
+};
+std::ostream& operator<<(std::ostream &out, const edge_t &edge);
+
 struct ObjectDataBlock : crimson::os::seastore::LogicalChildNode {
   using Ref = TCachedExtentRef<ObjectDataBlock>;
 
@@ -105,6 +206,7 @@ struct ObjectDataBlock : crimson::os::seastore::LogicalChildNode {
   }
 
   void overwrite(extent_len_t offset, bufferlist bl) {
+    assert(is_mutation_pending() || is_exist_mutation_pending());
     block_delta_t b {offset, bl.length(), bl};
     cached_overwrites.add(b);
     delta.push_back(b);
@@ -229,16 +331,16 @@ public:
 private:
   /// Updates region [_offset, _offset + bl.length) to bl
   write_ret overwrite(
-    context_t ctx,        ///< [in] ctx
-    laddr_t data_base,    ///< [in] data base laddr
-    objaddr_t offset,     ///< [in] write offset
-    extent_len_t len,     ///< [in] len to write, len == bl->length() if bl
-    std::optional<bufferlist> &&bl, ///< [in] buffer to write, empty for zeros
-    lba_mapping_list_t &&pins ///< [in] set of pins overlapping above region
-  );
+    context_t ctx,
+    laddr_t data_base,
+    objaddr_t offset,
+    extent_len_t len,
+    std::optional<bufferlist> &&bl,
+    LBAMapping first_mapping);
 
   /// Ensures object_data reserved region is prepared
-  write_ret prepare_data_reservation(
+  write_iertr::future<std::optional<LBAMapping>>
+  prepare_data_reservation(
     context_t ctx,
     object_data_t &object_data,
     extent_len_t size);
@@ -255,6 +357,199 @@ private:
     lba_mapping_list_t &pins,
     laddr_t data_base);
 
+  enum op_type_t : uint8_t {
+    OVERWRITE,
+    ZERO,
+    TRIM
+  };
+  enum edge_handle_policy_t : uint8_t {
+    DELTA_BASED_PUNCH,
+    MERGE_INPLACE,
+    REMAP
+  };
+
+  edge_handle_policy_t get_edge_handle_policy(
+    const LBAMapping &edge_mapping,
+    laddr_t start,
+    extent_len_t len,
+    op_type_t op_type) const
+  {
+#ifndef NDEBUG
+    laddr_interval_set_t range;
+    range.insert(edge_mapping.get_key(), edge_mapping.get_length());
+    assert(range.contains(start, len));
+#endif
+
+    //XXX: may need to adjust once object data partial write is available.
+    if (edge_mapping.is_pending()) {
+      // TODO: all LBAMapping::is_XXX_pending() methods search the parent
+      //       lba nodes, which consumes cpu. Fortunately, this branch happens
+      //       mostly in the recovery case, which is relatively rare compared
+      //       to normal IO processing.
+      if (edge_mapping.is_initial_pending()) {
+       return edge_handle_policy_t::MERGE_INPLACE;
+      } else {
+       return edge_handle_policy_t::DELTA_BASED_PUNCH;
+      }
+    }
+
+    // TODO: allow TRIM to do delta based overwrites. We forbid it
+    // now because it violates unit tests.
+    if (op_type == op_type_t::TRIM ||
+       op_type == op_type_t::ZERO ||
+       len > delta_based_overwrite_max_extent_size ||
+       edge_mapping.is_zero_reserved() ||
+       edge_mapping.is_indirect()) {
+      return edge_handle_policy_t::REMAP;
+    }
+
+    return edge_handle_policy_t::DELTA_BASED_PUNCH;
+  }
+
+  write_ret delta_based_overwrite(
+    context_t ctx,
+    extent_len_t offset,
+    extent_len_t len,
+    LBAMapping mapping,
+    std::optional<bufferlist> data);
+
+  // read the padding edge data into data.headbl/data.tailbl
+  read_iertr::future<> read_unaligned_edge_data(
+    context_t ctx,
+    const overwrite_range_t &overwrite_range,
+    data_t &data,
+    LBAMapping &mapping,
+    edge_t edge);
+
+  // read the pending edge mapping's data into data.headbl/data.tailbl,
+  // remove the mapping and expand the overwrite_range; basically, this
+  // is equivalent to merge the current overwrite range with the pending
+  // edge mapping
+  read_iertr::future<> merge_pending_edge(
+    context_t ctx,
+    overwrite_range_t &overwrite_range,
+    data_t &data,
+    LBAMapping &mapping,
+    edge_t edge);
+
+  // cut the overlapped part of data.bl, apply it to the
+  // edge_mapping as a mutation and shrink the overwrite_range.
+  base_iertr::future<LBAMapping> delta_based_edge_overwrite(
+    context_t ctx,
+    overwrite_range_t &overwrite_range,
+    data_t& data,
+    LBAMapping edge_mapping,
+    edge_t edge);
+
+  // drop the overlapped part of the edge mapping
+  base_iertr::future<LBAMapping> do_remap_based_edge_punch(
+    context_t ctx,
+    overwrite_range_t &overwrite_range,
+    data_t &data,
+    LBAMapping edge_mapping,
+    edge_t edge);
+
+  // merge the overwrite data with that of the edge_mapping,
+  // remove the edge_mapping and expand the overwrite_range.
+  base_iertr::future<LBAMapping> do_merge_based_edge_punch(
+    context_t ctx,
+    overwrite_range_t &overwrite_range,
+    data_t &data,
+    LBAMapping edge_mapping,
+    edge_t edge);
+
+  // punch the edge mapping following the edge_handle_policy_t.
+  // Specifically:
+  // 1. edge_handle_policy_t::DELTA_BASED_PUNCH: cut the overlapped part
+  //    of data.bl, apply it to the edge_mapping as a mutation and shrink
+  //    the overwrite_range.
+  // 2. edge_handle_policy_t::MERGE_INPLACE: merge the overwrite data with
+  //    that of the edge_mapping, remove the edge_mapping and expand the
+  //    overwrite_range.
+  // 3. edge_handle_policy_t::REMAP: drop the overlapped part of the edge mapping
+  base_iertr::future<LBAMapping>
+  punch_mapping_on_edge(
+    context_t ctx,
+    overwrite_range_t &overwrite_range,
+    data_t &data,
+    LBAMapping edge_mapping,
+    edge_t edge,
+    op_type_t op_type);
+
+  // The first step in a multi-mapping-hole-punching scenario: remap the
+  // left mapping if it crosses the left edge of the hole's range
+  base_iertr::future<LBAMapping> punch_left_mapping(
+    context_t ctx,
+    overwrite_range_t &overwrite_range,
+    data_t &overwrite_data,
+    LBAMapping left_mapping,
+    op_type_t op_type);
+
+  // The second step in a multi-mapping-hole-punching scenario: remove
+  // all the mappings that are strictly inside the hole's range
+  base_iertr::future<LBAMapping> punch_inner_mappings(
+    context_t ctx,
+    overwrite_range_t &overwrite_range,
+    LBAMapping mapping /*the first inner mapping*/);
+
+  // The last step in the multi-mapping-hole-punching scenario: remap
+  // the right mapping if it crosses the right edge of the hole's range
+  base_iertr::future<LBAMapping> punch_right_mapping(
+    context_t ctx,
+    overwrite_range_t &overwrite_range,
+    data_t &overwrite_data,
+    LBAMapping right_mapping,
+    op_type_t op_type);
+
+  // punch the hole whose range is within a single pending mapping
+  base_iertr::future<LBAMapping> punch_hole_in_pending_mapping(
+    context_t ctx,
+    overwrite_range_t &overwrite_range,
+    data_t &data,
+    LBAMapping mapping);
+
+  // handle the overwrite the range of which is within a single lba mapping.
+  write_ret handle_single_mapping_overwrite(
+    context_t ctx,
+    overwrite_range_t &overwrite_range,
+    data_t &data,
+    LBAMapping mapping,
+    op_type_t op_type);
+
+  // handle overwrites whose ranges cross multiple lba mappings.
+  write_ret handle_multi_mapping_overwrite(
+    context_t ctx,
+    overwrite_range_t &overwrite_range,
+    data_t &data,
+    LBAMapping mapping,
+    op_type_t op_type);
+
+  // punch a lba hole that crosses multiple lba mappings.
+  base_iertr::future<LBAMapping> punch_multi_mapping_hole(
+    context_t ctx,
+    overwrite_range_t &overwrite_range,
+    data_t &data,
+    LBAMapping left_mapping,
+    op_type_t op_type);
+
+  // merge the data of the range on which the current overwrite and
+  // the pending edge mapping overlaps into the corresponding pending
+  // extent
+  base_iertr::future<LBAMapping> merge_into_pending_edge(
+    context_t ctx,
+    overwrite_range_t &overwrite_range,
+    data_t &data,
+    LBAMapping edge_mapping,
+    edge_t edge);
+
+  // merge the data of the current overwrite into
+  // the pending mapping's extent
+  write_ret merge_into_mapping(
+    context_t ctx,
+    overwrite_range_t &overwrite_range,
+    data_t &data,
+    LBAMapping edge_mapping);
+
 private:
   /**
    * max_object_size
index 4d669f0f2fd4e294d7d23ae8713f48d0d8fcdd92..f3ad75865653830b5574edd6750290cda1de20ee 100644 (file)
@@ -1143,6 +1143,14 @@ public:
       assert(offset < laddr_t::UNIT_SIZE);
       return offset;
     }
+    bool has_offset() const {
+      return offset != 0;
+    }
+    bool is_aligned(size_t alignment) const {
+      assert(alignment % laddr_t::UNIT_SIZE == 0);
+      return !has_offset() &&
+       base % (alignment >> UNIT_SHIFT) == 0;
+    }
     laddr_t checked_to_laddr() const {
       assert(offset == 0);
       return laddr_t(base);
index a912ef7ddb0eb02f6b0ecf5e573221c186ae8f76..a178ab6d17f0fd443140e5644ae5980f1d9b8cd2 100644 (file)
@@ -132,6 +132,16 @@ public:
     });
   }
 
+  get_pin_ret get_pin(Transaction &t, LogicalChildNode &extent) {
+    LOG_PREFIX(TransactionManager::get_pin);
+    SUBDEBUGT(seastore_tm, "{} ...", t, extent);
+    return lba_manager->get_mapping(t, extent
+    ).si_then([FNAME, &t](LBAMapping pin) {
+      SUBDEBUGT(seastore_tm, "got {}", t, pin);
+      return pin;
+    });
+  }
+
   /**
    * get_pins
    *
@@ -531,6 +541,7 @@ public:
     static_assert(is_data_type(T::TYPE));
     // must be user-oriented required by (the potential) maybe_init
     assert(is_user_transaction(t.get_src()));
+    assert(pin.is_indirect() || !pin.is_zero_reserved());
 
     LOG_PREFIX(TransactionManager::remap_pin);
 #ifndef NDEBUG
@@ -1011,6 +1022,205 @@ public:
     return *cache;
   }
 
+  template <typename T, std::size_t N>
+  remap_pin_ret remap_mappings(
+    Transaction &t,
+    LBAMapping mapping,
+    std::array<TransactionManager::remap_entry_t, N> remaps)
+  {
+    if (!mapping.is_indirect() && mapping.is_zero_reserved()) {
+      return seastar::do_with(
+       std::vector<TransactionManager::remap_entry_t>(
+         remaps.begin(), remaps.end()),
+       std::vector<LBAMapping>(),
+       [&t, mapping=std::move(mapping), this]
+       (auto &remaps, auto &mappings) mutable {
+       auto orig_laddr = mapping.get_key();
+       return remove(t, std::move(mapping)
+       ).si_then([&remaps, &t, &mappings, orig_laddr,
+                 this](auto pos) {
+         return seastar::do_with(
+           std::move(pos),
+           [this, &t, &remaps, orig_laddr, &mappings](auto &pos) {
+           return trans_intr::do_for_each(
+             remaps.begin(),
+             remaps.end(),
+             [&t, &pos, orig_laddr, &mappings, this]
+             (const auto &remap) mutable {
+             auto laddr = (orig_laddr + remap.offset).checked_to_laddr();
+             return this->reserve_region(
+               t,
+               std::move(pos),
+               laddr,
+               remap.len
+             ).si_then([&mappings](auto new_mapping) {
+               mappings.emplace_back(new_mapping);
+               return new_mapping.next();
+             }).si_then([&pos](auto new_mapping) {
+               pos = std::move(new_mapping);
+               return seastar::now();
+             });
+           });
+         });
+       }).si_then([&mappings] { return std::move(mappings); });
+      }).handle_error_interruptible(
+       remap_mappings_iertr::pass_further{},
+       crimson::ct_error::assert_all{
+         "remap_mappings hit invalid error"
+       }
+      );
+    } else {
+      return remap_pin<T, N>(
+       t, std::move(mapping), std::move(remaps));
+    }
+  }
+
+  /*
+   * punch_hole_in_mapping
+   *
+   * punch an lba hole inside a single mapping, this requires laddr~len
+   * is within the mapping.
+   *
+   * Return: the position for later inserts, i.e. the mapping next to
+   *        the hole
+   */
+  using punch_mappings_iertr = base_iertr;
+  using punch_mappings_ret = punch_mappings_iertr::future<LBAMapping>;
+  template <typename T>
+  punch_mappings_ret punch_hole_in_mapping(
+    Transaction &t,
+    laddr_t laddr,
+    objaddr_t aligned_len,
+    LBAMapping mapping)
+  {
+    LOG_PREFIX(TransactionManager::punch_hole_in_mapping);
+    SUBDEBUGT(seastore_tm, "{}~{} {}", t, laddr, aligned_len, mapping);
+    assert(!mapping.is_pending());
+    assert(laddr >= mapping.get_key() &&
+       laddr + aligned_len <= mapping.get_key() + mapping.get_length());
+    if (laddr > mapping.get_key()) {
+      if (laddr + aligned_len < mapping.get_key() + mapping.get_length()) {
+       auto offset1 = laddr.template get_byte_distance<
+         extent_len_t>(mapping.get_key());
+       auto offset2 = (laddr + aligned_len).template get_byte_distance<
+         extent_len_t>(mapping.get_key());
+       auto len2 = mapping.get_length() - offset2;
+       return remap_mappings<T, 2>(
+         t,
+         std::move(mapping),
+         std::array{
+           remap_entry_t{0, offset1},
+           remap_entry_t{offset2, len2}}
+       ).si_then([](auto ret) {
+         assert(ret.size() == 2);
+         return std::move(ret.back());
+       });
+      } else {
+       return cut_mapping<T>(t, laddr, std::move(mapping), true
+       ).si_then([](auto mapping) {
+         return mapping.next();
+       });
+      }
+    } else if (laddr + aligned_len < mapping.get_key() + mapping.get_length()) {
+      return cut_mapping<T>(
+       t, (laddr + aligned_len).checked_to_laddr(), std::move(mapping), false);
+    } else {
+      return remove(t, std::move(mapping)
+      ).handle_error_interruptible(
+       punch_mappings_iertr::pass_further{},
+       crimson::ct_error::assert_all{"impossible"}
+      );
+    }
+  }
+
+  /*
+   * cut_mapping
+   *
+   * remove the left/right part of the mapping
+   *
+   * Return: the remaining part of the mapping
+   */
+  using cut_mapping_iertr = punch_mappings_ret;
+  using cut_mapping_ret = punch_mappings_ret;
+  template <typename T>
+  cut_mapping_ret cut_mapping(
+    Transaction &t,
+    laddr_t pivot,
+    LBAMapping mapping,
+    bool keep_left)
+  {
+    LOG_PREFIX(TransactionManager::cut_mapping);
+    SUBDEBUGT(seastore_tm, "{} {} {}",
+      t, pivot, mapping, keep_left ? "LEFT" : "RIGHT");
+    assert(mapping.is_indirect() || mapping.is_data_stable());
+    assert(pivot > mapping.get_key() &&
+      pivot < mapping.get_key() + mapping.get_length());
+    auto offset = keep_left
+      ? 0
+      : pivot.template get_byte_distance<extent_len_t>(mapping.get_key());
+    auto len = keep_left
+      ? pivot.template get_byte_distance<
+       extent_len_t>(mapping.get_key())
+      : pivot.template get_byte_distance<
+       extent_len_t>(mapping.get_key() + mapping.get_length());
+    return remap_mappings<T, 1>(
+      t,
+      std::move(mapping),
+      std::array{remap_entry_t{offset, len}}
+    ).si_then([] (auto ret) {
+      assert(ret.size() == 1);
+      return std::move(ret.back());
+    });
+  }
+
+  /*
+   * remove_mappings_in_range
+   *
+   * remove the mappings that are completely inside the range start~unaligned_len
+   *
+   * Return: the mapping next to the right boundary of the range
+   */
+  punch_mappings_ret remove_mappings_in_range(
+    Transaction &t,
+    laddr_t start,
+    objaddr_t unaligned_len,
+    LBAMapping first_mapping)
+  {
+    LOG_PREFIX(TransactionManager::remove_mappings_in_range);
+    SUBDEBUGT(seastore_tm, "{}~{}, first_mapping: {}",
+      t, start, unaligned_len, first_mapping);
+    // remove all middle mappings
+    return seastar::do_with(
+      std::move(first_mapping),
+      [&t, this, start, unaligned_len](auto &mapping) {
+      return trans_intr::repeat([&t, this, start, unaligned_len, &mapping] {
+       if (mapping.is_end()) {
+         return punch_mappings_iertr::make_ready_future<
+           seastar::stop_iteration>(seastar::stop_iteration::yes);
+       }
+       assert(mapping.get_key() >= start);
+       auto mapping_end =
+         (mapping.get_key() + mapping.get_length()).checked_to_laddr();
+       if (mapping_end > start + unaligned_len) {
+         return punch_mappings_iertr::make_ready_future<
+           seastar::stop_iteration>(seastar::stop_iteration::yes);
+       }
+       return remove(t, std::move(mapping)
+       ).si_then([&mapping](auto next_mapping) {
+         mapping = std::move(next_mapping);
+         return seastar::stop_iteration::no;
+       }).handle_error_interruptible(
+         punch_mappings_iertr::pass_further{},
+         crimson::ct_error::assert_all{
+           "remove_mappings_in_range hit invalid error"
+         }
+       );
+      }).si_then([&mapping] {
+       return std::move(mapping);
+      });
+    });
+  }
+
   ~TransactionManager();
 
 private:
index 2ab65ac2a6e72f3105e1a1a7a2b8d99df204e782..a313f5ea5db42585c28822ea9bd4767d64584d8d 100644 (file)
@@ -452,12 +452,12 @@ struct object_data_handler_test_t:
 
   void write_right() {
     write(0, 128<<10, 'x');
-    write(64<<10, 60<<10, 'a');
+    write(64<<10, 64<<10, 'a');
   }
 
   void write_left() {
     write(0, 128<<10, 'x');
-    write(4<<10, 60<<10, 'a');
+    write(0, 64<<10, 'a');
   }
 
   void write_right_left() {
@@ -469,17 +469,11 @@ struct object_data_handler_test_t:
     write(0, 128<<10, 'x');
 
     auto t = create_mutate_transaction();
-    // normal split
     write(*t, 120<<10, 4<<10, 'a');
-    // not aligned right
     write(*t, 4<<10, 5<<10, 'b');
-    // split right extent of last split result
     write(*t, 32<<10, 4<<10, 'c');
-    // non aligned overwrite
     write(*t, 13<<10, 4<<10, 'd');
-
     write(*t, 64<<10, 32<<10, 'e');
-    // not split right
     write(*t, 60<<10, 8<<10, 'f');
 
     submit_transaction(std::move(t));
@@ -742,13 +736,13 @@ TEST_P(object_data_handler_test_t, multiple_remap) {
     disable_max_extent_size();
     multiple_write();
     auto pins = get_mappings(0, 128<<10);
-    EXPECT_EQ(pins.size(), 3);
+    EXPECT_EQ(pins.size(), 11);
 
-    size_t res[3] = {0, 120<<10, 124<<10};
+    size_t res[11] = {0, 4<<10, 12<<10, 20<<10, 32<<10, 36<<10, 60<<10, 64<<10, 96<<10, 120<<10, 124<<10};
     auto base = pins.front().get_key();
     int i = 0;
     for (auto &pin : pins) {
-      EXPECT_EQ(pin.get_key().get_byte_distance<size_t>(base), res[i]);
+      assert(pin.get_key().get_byte_distance<size_t>(base) == res[i]);
       i++;
     }
     read(0, 128<<10);
index d2deac07a91d9092e55ba33622c11c237f3b4fc6..6459a03fd01fa1f851d6473088b26531cfa8fee2 100644 (file)
@@ -1620,17 +1620,17 @@ struct transaction_manager_test_t :
                continue;
              }
               auto new_off = get_laddr_hint(off << 10)
-                 .get_byte_distance<extent_len_t>(last_pin.get_key());
-              auto new_len = last_pin.get_length() - new_off;
+                 .get_byte_distance<extent_len_t>(last_pin->get_key());
+              auto new_len = last_pin->get_length() - new_off;
               //always remap right extent at new split_point
-             auto pin = remap_pin(t, std::move(last_pin), new_off, new_len);
+             auto pin = remap_pin(t, std::move(*last_pin), new_off, new_len);
               if (!pin) {
                conflicted++;
                return;
              }
               last_pin = std::move(pin);
            }
-            auto last_ext = try_get_extent(t, last_pin.get_key());
+            auto last_ext = try_get_extent(t, last_pin->get_key());
             if (!last_ext) {
               conflicted++;
               return;