]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore/ObjectDataHandler: handle block not alinged input
authorZhang Song <zhangsong02@qianxin.com>
Thu, 22 Aug 2024 13:17:40 +0000 (21:17 +0800)
committerZhang Song <zhangsong02@qianxin.com>
Fri, 23 Aug 2024 11:19:38 +0000 (19:19 +0800)
Signed-off-by: Zhang Song <zhangsong02@qianxin.com>
src/crimson/os/seastore/object_data_handler.cc
src/crimson/os/seastore/object_data_handler.h

index f27fb0b006f21fc787156e472f8ca4acc957dbea..8840c08124db1eb70f0c421ead23b08c68555fa4 100644 (file)
@@ -625,13 +625,16 @@ std::ostream& operator<<(
  * left_paddr                                  right_paddr
  */
 struct overwrite_plan_t {
-  // addresses
+  // reserved data base of object data
+  laddr_t data_base;
+
+  // addresses about extents
   laddr_t pin_begin;
   laddr_t pin_end;
   paddr_t left_paddr;
   paddr_t right_paddr;
-  laddr_t data_begin;
-  laddr_t data_end;
+  laddr_offset_t data_begin;
+  laddr_offset_t data_end;
   laddr_t aligned_data_begin;
   laddr_t aligned_data_end;
 
@@ -681,7 +684,8 @@ public:
     std::ostream& out,
     const overwrite_plan_t& overwrite_plan) {
     return out << "overwrite_plan_t("
-              << "pin_begin=" << overwrite_plan.pin_begin
+              << "data_base=" << overwrite_plan.data_base
+              << ", pin_begin=" << overwrite_plan.pin_begin
               << ", pin_end=" << overwrite_plan.pin_end
               << ", left_paddr=" << overwrite_plan.left_paddr
               << ", right_paddr=" << overwrite_plan.right_paddr
@@ -697,18 +701,20 @@ public:
               << ")";
   }
 
-  overwrite_plan_t(laddr_t offset,
+  overwrite_plan_t(laddr_t data_base,
+                  objaddr_t offset,
                   extent_len_t len,
                   const lba_pin_list_t& pins,
                   extent_len_t block_size) :
+      data_base(data_base),
       pin_begin(pins.front()->get_key()),
       pin_end(pins.back()->get_key() + pins.back()->get_length()),
       left_paddr(pins.front()->get_val()),
       right_paddr(pins.back()->get_val()),
-      data_begin(offset),
-      data_end(offset + len),
-      aligned_data_begin(data_begin.get_aligned_laddr(block_size)),
-      aligned_data_end(data_end.get_roundup_laddr(block_size)),
+      data_begin(data_base + offset),
+      data_end(data_base + offset + len),
+      aligned_data_begin(data_begin.get_aligned_laddr()),
+      aligned_data_end(data_end.get_roundup_laddr()),
       left_operation(overwrite_operation_t::UNKNOWN),
       right_operation(overwrite_operation_t::UNKNOWN),
       block_size(block_size),
@@ -1076,14 +1082,14 @@ ObjectDataHandler::clear_ret ObjectDataHandler::trim_data_reservation(
     extent_to_write_list_t(),
     [ctx, size, &object_data, this](auto &pins, auto &to_write) {
       LOG_PREFIX(ObjectDataHandler::trim_data_reservation);
-      DEBUGT("object_data: {}~{}",
-            ctx.t,
-            object_data.get_reserved_data_base(),
-            object_data.get_reserved_data_len());
+      auto data_base = object_data.get_reserved_data_base();
+      auto data_len = object_data.get_reserved_data_len();
+      DEBUGT("object_data: {}~{}", ctx.t, data_base, data_len);
+      laddr_t aligned_start = (data_base + size).get_aligned_laddr();
+      loffset_t aligned_length =
+         data_len - aligned_start.get_byte_distance<loffset_t>(data_base);
       return ctx.tm.get_pins(
-       ctx.t,
-       object_data.get_reserved_data_base() + size,
-       object_data.get_reserved_data_len() - size
+       ctx.t, aligned_start, aligned_length
       ).si_then([ctx, size, &pins, &object_data, &to_write](auto _pins) {
        _pins.swap(pins);
        ceph_assert(pins.size());
@@ -1181,12 +1187,13 @@ ObjectDataHandler::clear_ret ObjectDataHandler::trim_data_reservation(
  * optionally on the right.
  */
 extent_to_write_list_t get_to_writes_with_zero_buffer(
+  laddr_t data_base,
   const extent_len_t block_size,
-  laddr_t offset, extent_len_t len,
+  objaddr_t offset, extent_len_t len,
   std::optional<bufferptr> &&headptr, std::optional<bufferptr> &&tailptr)
 {
-  auto zero_left = offset.get_roundup_laddr(block_size);
-  auto zero_right = (offset + len).get_aligned_laddr(block_size);
+  auto zero_left = p2roundup(offset, (objaddr_t)block_size);
+  auto zero_right = p2align(offset + len, (objaddr_t)block_size);
   auto left = headptr ? (offset - headptr->length()) : offset;
   auto right = tailptr ?
     (offset + len + tailptr->length()) :
@@ -1218,7 +1225,8 @@ extent_to_write_list_t get_to_writes_with_zero_buffer(
     assert(bl.length() % block_size == 0);
     assert(bl.length() == (right - left));
     extent_to_write_list_t ret;
-    ret.push_back(extent_to_write_t::create_data(left, bl));
+    ret.push_back(extent_to_write_t::create_data(
+      (data_base + left).checked_to_laddr(), bl));
     return ret;
   } else {
     // reserved section between ends, headptr and tailptr in different extents
@@ -1229,10 +1237,13 @@ extent_to_write_list_t get_to_writes_with_zero_buffer(
       headbl.append_zero(zero_left - left - headbl.length());
       assert(headbl.length() % block_size == 0);
       assert(headbl.length() > 0);
-      ret.push_back(extent_to_write_t::create_data(left, headbl));
+      ret.push_back(extent_to_write_t::create_data(
+                     (data_base + left).checked_to_laddr(), headbl));
     }
     // reserved zero region
-    ret.push_back(extent_to_write_t::create_zero(zero_left, zero_right - zero_left));
+    ret.push_back(extent_to_write_t::create_zero(
+      (data_base + zero_left).checked_to_laddr(),
+      zero_right - zero_left));
     assert(ret.back().len % block_size == 0);
     assert(ret.back().len > 0);
     if (tailptr) {
@@ -1241,7 +1252,8 @@ extent_to_write_list_t get_to_writes_with_zero_buffer(
       tailbl.append_zero(right - zero_right - tailbl.length());
       assert(tailbl.length() % block_size == 0);
       assert(tailbl.length() > 0);
-      ret.push_back(extent_to_write_t::create_data(zero_right, tailbl));
+      ret.push_back(extent_to_write_t::create_data(
+        (data_base + zero_right).checked_to_laddr(), tailbl));
     }
     return ret;
   }
@@ -1263,7 +1275,8 @@ extent_to_write_list_t get_to_writes(laddr_t offset, bufferlist &bl)
 
 ObjectDataHandler::write_ret ObjectDataHandler::overwrite(
   context_t ctx,
-  laddr_t offset,
+  laddr_t data_base,
+  objaddr_t offset,
   extent_len_t len,
   std::optional<bufferlist> &&bl,
   lba_pin_list_t &&_pins)
@@ -1271,11 +1284,11 @@ ObjectDataHandler::write_ret ObjectDataHandler::overwrite(
   if (bl.has_value()) {
     assert(bl->length() == len);
   }
-  overwrite_plan_t overwrite_plan(offset, len, _pins, ctx.tm.get_block_size());
+  overwrite_plan_t overwrite_plan(data_base, offset, len, _pins, ctx.tm.get_block_size());
   return seastar::do_with(
     std::move(_pins),
     extent_to_write_list_t(),
-    [ctx, len, offset, overwrite_plan, bl=std::move(bl), this]
+    [ctx, data_base, len, offset, overwrite_plan, bl=std::move(bl), this]
     (auto &pins, auto &to_write) mutable
   {
     LOG_PREFIX(ObjectDataHandler::overwrite);
@@ -1290,7 +1303,7 @@ ObjectDataHandler::write_ret ObjectDataHandler::overwrite(
       ctx,
       pins.front(),
       overwrite_plan
-    ).si_then([ctx, len, offset, overwrite_plan, bl=std::move(bl),
+    ).si_then([ctx, data_base, len, offset, overwrite_plan, bl=std::move(bl),
                &to_write, &pins, this](auto p) mutable {
       auto &[left_extent, headptr] = p;
       if (left_extent) {
@@ -1304,7 +1317,7 @@ ObjectDataHandler::write_ret ObjectDataHandler::overwrite(
         ctx,
         pins.back(),
         overwrite_plan
-      ).si_then([ctx, len, offset,
+      ).si_then([ctx, data_base, len, offset,
                  pin_begin=overwrite_plan.pin_begin,
                  pin_end=overwrite_plan.pin_end,
                  bl=std::move(bl), headptr=std::move(headptr),
@@ -1324,11 +1337,12 @@ ObjectDataHandler::write_ret ObjectDataHandler::overwrite(
           }
           splice_extent_to_write(
             to_write,
-            get_to_writes(write_offset, write_bl));
+            get_to_writes((data_base + write_offset).checked_to_laddr(), write_bl));
         } else {
           splice_extent_to_write(
             to_write,
             get_to_writes_with_zero_buffer(
+             data_base,
               ctx.tm.get_block_size(),
               offset,
               len,
@@ -1380,14 +1394,20 @@ ObjectDataHandler::zero_ret ObjectDataHandler::zero(
        object_data,
        p2roundup(offset + len, ctx.tm.get_block_size())
       ).si_then([this, ctx, offset, len, &object_data] {
-       auto logical_offset = object_data.get_reserved_data_base() + offset;
+       auto data_base = object_data.get_reserved_data_base();
+       laddr_offset_t l_start = data_base + offset;
+       laddr_offset_t l_end = l_start + len;
+       laddr_t aligned_start = l_start.get_aligned_laddr();
+       loffset_t aligned_length =
+           l_end.get_roundup_laddr().get_byte_distance<
+             loffset_t>(aligned_start);
        return ctx.tm.get_pins(
          ctx.t,
-         logical_offset,
-         len
-       ).si_then([this, ctx, logical_offset, len](auto pins) {
+         aligned_start,
+         aligned_length
+       ).si_then([this, ctx, data_base, offset, len](auto pins) {
          return overwrite(
-           ctx, logical_offset, len,
+           ctx, data_base, offset, len,
            std::nullopt, std::move(pins));
        });
       });
@@ -1415,15 +1435,21 @@ ObjectDataHandler::write_ret ObjectDataHandler::write(
        object_data,
        p2roundup(offset + bl.length(), ctx.tm.get_block_size())
       ).si_then([this, ctx, offset, &object_data, &bl] {
-       auto logical_offset = object_data.get_reserved_data_base() + offset;
+       auto data_base = object_data.get_reserved_data_base();
+       laddr_offset_t l_start = data_base + offset;
+       laddr_offset_t l_end = l_start + bl.length();
+       laddr_t aligned_start = l_start.get_aligned_laddr();
+       loffset_t aligned_length =
+           l_end.get_roundup_laddr().get_byte_distance<
+             loffset_t>(aligned_start);
        return ctx.tm.get_pins(
          ctx.t,
-         logical_offset,
-         bl.length()
-       ).si_then([this, ctx,logical_offset, &bl](
+         aligned_start,
+         aligned_length
+       ).si_then([this, ctx, offset, data_base, &bl](
                   auto pins) {
          return overwrite(
-           ctx, logical_offset, bl.length(),
+           ctx, data_base, offset, bl.length(),
            bufferlist(bl), std::move(pins));
        });
       });
@@ -1451,17 +1477,21 @@ ObjectDataHandler::read_ret ObjectDataHandler::read(
       ceph_assert(!object_data.is_null());
       ceph_assert((obj_offset + len) <= object_data.get_reserved_data_len());
       ceph_assert(len > 0);
-      laddr_t l_start =
+      laddr_offset_t l_start =
         object_data.get_reserved_data_base() + obj_offset;
+      laddr_offset_t l_end = l_start + len;
+      laddr_t aligned_start = l_start.get_aligned_laddr();
+      loffset_t aligned_length =
+         l_end.get_roundup_laddr().get_byte_distance<
+           loffset_t>(aligned_start);
       return ctx.tm.get_pins(
         ctx.t,
-        l_start,
-        len
-      ).si_then([FNAME, ctx, l_start, len, &ret](auto _pins) {
+       aligned_start,
+       aligned_length
+      ).si_then([FNAME, ctx, l_start, l_end, &ret](auto _pins) {
         // offset~len falls within reserved region and len > 0
         ceph_assert(_pins.size() >= 1);
         ceph_assert((*_pins.begin())->get_key() <= l_start);
-        auto l_end = l_start + len;
         return seastar::do_with(
           std::move(_pins),
           l_start,
@@ -1480,9 +1510,9 @@ ObjectDataHandler::read_ret ObjectDataHandler::read(
             ceph_assert(l_current < l_end);
             auto pin_len = pin->get_length();
             assert(pin_len > 0);
-            laddr_t l_pin_end = pin_key + pin_len;
+            laddr_offset_t l_pin_end = pin_key + pin_len;
             ceph_assert(l_current < l_pin_end);
-            laddr_t l_current_end = std::min(l_pin_end, l_end);
+            laddr_offset_t l_current_end = std::min(l_pin_end, l_end);
             if (pin->get_val().is_zero()) {
               DEBUGT("got {}~{} from zero-pin {}~{}",
                 ctx.t,
@@ -1584,19 +1614,24 @@ ObjectDataHandler::fiemap_ret ObjectDataHandler::fiemap(
       ceph_assert(!object_data.is_null());
       ceph_assert((obj_offset + len) <= object_data.get_reserved_data_len());
       ceph_assert(len > 0);
-      laddr_t l_start =
+      laddr_offset_t l_start =
         object_data.get_reserved_data_base() + obj_offset;
+      laddr_offset_t l_end = l_start + len;
+      laddr_t aligned_start = l_start.get_aligned_laddr();
+      loffset_t aligned_length =
+         l_end.get_roundup_laddr().get_byte_distance<
+           loffset_t>(aligned_start);
       return ctx.tm.get_pins(
         ctx.t,
-        l_start,
-        len
+       aligned_start,
+       aligned_length
       ).si_then([l_start, len, &object_data, &ret](auto &&pins) {
        ceph_assert(pins.size() >= 1);
         ceph_assert((*pins.begin())->get_key() <= l_start);
        for (auto &&i: pins) {
          if (!(i->get_val().is_zero())) {
-           auto ret_left = std::max(i->get_key(), l_start);
-           auto ret_right = std::min(
+           laddr_offset_t ret_left = std::max(laddr_offset_t(i->get_key(), 0), l_start);
+           laddr_offset_t ret_right = std::min(
              i->get_key() + i->get_length(),
              l_start + len);
            assert(ret_right > ret_left);
index 8dc52701a29b34366430229aaac434ac1c9283e4..50be20b3706d4b206d6688cf484a7004159b146e 100644 (file)
@@ -229,7 +229,8 @@ private:
   /// Updates region [_offset, _offset + bl.length) to bl
   write_ret overwrite(
     context_t ctx,        ///< [in] ctx
-    laddr_t offset,       ///< [in] write offset
+    laddr_t data_base,    ///< [in] data base laddr
+    objaddr_t offset,     ///< [in] write offset
     extent_len_t len,     ///< [in] len to write, len == bl->length() if bl
     std::optional<bufferlist> &&bl, ///< [in] buffer to write, empty for zeros
     lba_pin_list_t &&pins ///< [in] set of pins overlapping above region