]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore: convert ObjectDataHandler with interruptible-future
authorYingxin Cheng <yingxin.cheng@intel.com>
Tue, 3 Aug 2021 08:41:26 +0000 (16:41 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Tue, 10 Aug 2021 02:20:38 +0000 (10:20 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/object_data_handler.cc
src/crimson/os/seastore/object_data_handler.h
src/crimson/os/seastore/seastore.cc
src/test/crimson/seastore/test_object_data_handler.cc

index 25bf78661dbdfccc8310124fe7aaa1bb1a0c2676..185c56dd6f46eead6e44bca563474057061ff1bd 100644 (file)
@@ -28,7 +28,7 @@ static constexpr extent_len_t MAX_OBJECT_SIZE = 16<<20;
 #define assert_aligned(x) ceph_assert(((x)%ctx.tm.get_block_size()) == 0)
 
 using context_t = ObjectDataHandler::context_t;
-using get_ertr = ObjectDataHandler::write_ertr;
+using get_iertr = ObjectDataHandler::write_iertr;
 
 auto read_pin(
   context_t ctx,
@@ -36,8 +36,8 @@ auto read_pin(
   return ctx.tm.pin_to_extent<ObjectDataBlock>(
     ctx.t,
     std::move(pin)
-  ).handle_error(
-    get_ertr::pass_further{},
+  ).handle_error_interruptible(
+    get_iertr::pass_further{},
     crimson::ct_error::assert_all{ "read_pin: invalid error" }
   );
 }
@@ -71,16 +71,15 @@ ObjectDataHandler::write_ret do_removals(
   context_t ctx,
   lba_pin_list_t &pins)
 {
-  return crimson::do_for_each(
-    pins.begin(),
-    pins.end(),
+  return trans_intr::do_for_each(
+    pins,
     [ctx](auto &pin) {
       return ctx.tm.dec_ref(
        ctx.t,
        pin->get_laddr()
-      ).safe_then(
+      ).si_then(
        [](auto){},
-       ObjectDataHandler::write_ertr::pass_further{},
+       ObjectDataHandler::write_iertr::pass_further{},
        crimson::ct_error::assert_all{
          "object_data_handler::do_removals invalid error"
        }
@@ -93,9 +92,8 @@ ObjectDataHandler::write_ret do_insertions(
   context_t ctx,
   extent_to_write_list_t &to_write)
 {
-  return crimson::do_for_each(
-    to_write.begin(),
-    to_write.end(),
+  return trans_intr::do_for_each(
+    to_write,
     [ctx](auto &region) {
       if (region.to_write) {
        assert_aligned(region.addr);
@@ -105,7 +103,7 @@ ObjectDataHandler::write_ret do_insertions(
          ctx.t,
          region.addr,
          region.len
-       ).safe_then([&region](auto extent) {
+       ).si_then([&region](auto extent) {
          if (extent->get_laddr() != region.addr) {
            logger().debug(
              "object_data_handler::do_insertions alloc got addr {},"
@@ -117,17 +115,17 @@ ObjectDataHandler::write_ret do_insertions(
          ceph_assert(extent->get_length() == region.len);
          auto iter = region.to_write->cbegin();
          iter.copy(region.len, extent->get_bptr().c_str());
-         return ObjectDataHandler::write_ertr::now();
+         return ObjectDataHandler::write_iertr::now();
        });
       } else {
        return ctx.tm.reserve_region(
          ctx.t,
          region.addr,
          region.len
-       ).safe_then([&region](auto pin) {
+       ).si_then([&region](auto pin) {
          ceph_assert(pin->get_length() == region.len);
          ceph_assert(pin->get_laddr() == region.addr);
-         return ObjectDataHandler::write_ertr::now();
+         return ObjectDataHandler::write_iertr::now();
        });
       }
     });
@@ -143,7 +141,7 @@ ObjectDataHandler::write_ret do_insertions(
 using split_ret_bare = std::pair<
   std::optional<extent_to_write_t>,
   std::optional<bufferptr>>;
-using split_ret = get_ertr::future<split_ret_bare>;
+using split_ret = get_iertr::future<split_ret_bare>;
 split_ret split_pin_left(context_t ctx, LBAPinRef &pin, laddr_t offset)
 {
   const auto pin_offset = pin->get_laddr();
@@ -151,7 +149,7 @@ split_ret split_pin_left(context_t ctx, LBAPinRef &pin, laddr_t offset)
   ceph_assert(offset >= pin_offset);
   if (offset == pin_offset) {
     // Aligned, no tail and no extra extent
-    return get_ertr::make_ready_future<split_ret_bare>(
+    return get_iertr::make_ready_future<split_ret_bare>(
       std::nullopt,
       std::nullopt);
   } else if (pin->get_paddr().is_zero()) {
@@ -163,7 +161,7 @@ split_ret split_pin_left(context_t ctx, LBAPinRef &pin, laddr_t offset)
     auto zero_extent_len = aligned_offset - pin_offset;
     assert_aligned(zero_extent_len);
     auto zero_prepend_len = offset - aligned_offset;
-    return get_ertr::make_ready_future<split_ret_bare>(
+    return get_iertr::make_ready_future<split_ret_bare>(
       (zero_extent_len == 0
        ? std::nullopt
        : std::make_optional(extent_to_write_t(pin_offset, zero_extent_len))),
@@ -173,8 +171,8 @@ split_ret split_pin_left(context_t ctx, LBAPinRef &pin, laddr_t offset)
     // Data, return up to offset to prepend
     auto to_prepend = offset - pin->get_laddr();
     return read_pin(ctx, pin->duplicate()
-    ).safe_then([to_prepend](auto extent) {
-      return get_ertr::make_ready_future<split_ret_bare>(
+    ).si_then([to_prepend](auto extent) {
+      return get_iertr::make_ready_future<split_ret_bare>(
        std::nullopt,
        bufferptr(extent->get_bptr(), 0, to_prepend));
     });
@@ -189,7 +187,7 @@ split_ret split_pin_right(context_t ctx, LBAPinRef &pin, laddr_t end)
   assert_aligned(pin_end);
   ceph_assert(pin_end >= end);
   if (end == pin_end) {
-    return get_ertr::make_ready_future<split_ret_bare>(
+    return get_iertr::make_ready_future<split_ret_bare>(
       std::nullopt,
       std::nullopt);
   } else if (pin->get_paddr().is_zero()) {
@@ -199,7 +197,7 @@ split_ret split_pin_right(context_t ctx, LBAPinRef &pin, laddr_t end)
     auto zero_suffix_len = aligned_end - end;
     auto zero_extent_len = pin_end - aligned_end;
     assert_aligned(zero_extent_len);
-    return get_ertr::make_ready_future<split_ret_bare>(
+    return get_iertr::make_ready_future<split_ret_bare>(
       (zero_extent_len == 0
        ? std::nullopt
        : std::make_optional(extent_to_write_t(aligned_end, zero_extent_len))),
@@ -207,8 +205,8 @@ split_ret split_pin_right(context_t ctx, LBAPinRef &pin, laddr_t end)
     );
   } else {
     return read_pin(ctx, pin->duplicate()
-    ).safe_then([end, pin_begin, pin_end](auto extent) {
-      return get_ertr::make_ready_future<split_ret_bare>(
+    ).si_then([end, pin_begin, pin_end](auto extent) {
+      return get_iertr::make_ready_future<split_ret_bare>(
        std::nullopt,
        bufferptr(
          extent->get_bptr(),
@@ -228,7 +226,7 @@ auto with_object_data(
     std::forward<F>(f),
     [ctx](auto &object_data, auto &f) {
       return std::invoke(f, object_data
-      ).safe_then([ctx, &object_data] {
+      ).si_then([ctx, &object_data] {
        if (object_data.must_update()) {
          ctx.onode.get_mutable_layout(ctx.t).object_data.update(object_data);
        }
@@ -245,18 +243,18 @@ ObjectDataHandler::write_ret ObjectDataHandler::prepare_data_reservation(
   ceph_assert(size <= MAX_OBJECT_SIZE);
   if (!object_data.is_null()) {
     ceph_assert(object_data.get_reserved_data_len() == MAX_OBJECT_SIZE);
-    return write_ertr::now();
+    return write_iertr::now();
   } else {
     return ctx.tm.reserve_region(
       ctx.t,
       0 /* TODO -- pass hint based on object hash */,
       MAX_OBJECT_SIZE
-    ).safe_then([&object_data](auto pin) {
+    ).si_then([&object_data](auto pin) {
       ceph_assert(pin->get_length() == MAX_OBJECT_SIZE);
       object_data.update_reserved(
        pin->get_laddr(),
        pin->get_length());
-      return write_ertr::now();
+      return write_iertr::now();
     });
   }
 }
@@ -275,7 +273,7 @@ ObjectDataHandler::clear_ret ObjectDataHandler::trim_data_reservation(
        ctx.t,
        object_data.get_reserved_data_base() + size,
        object_data.get_reserved_data_len() - size
-      ).safe_then([ctx, size, &pins, &object_data, &to_write](auto _pins) {
+      ).si_then([ctx, size, &pins, &object_data, &to_write](auto _pins) {
        _pins.swap(pins);
        ceph_assert(pins.size());
        auto &pin = *pins.front();
@@ -288,13 +286,13 @@ ObjectDataHandler::clear_ret ObjectDataHandler::trim_data_reservation(
          to_write.emplace_back(
            pin.get_laddr(),
            object_data.get_reserved_data_len() - pin_offset);
-         return clear_ertr::now();
+         return clear_iertr::now();
        } else {
          return read_pin(
            ctx,
            pin.duplicate()
-         ).safe_then([size, pin_offset, &pin, &object_data, &to_write](
-                       auto extent) {
+         ).si_then([size, pin_offset, &pin, &object_data, &to_write](
+                    auto extent) {
            bufferlist bl;
            bl.append(
              bufferptr(
@@ -308,18 +306,18 @@ ObjectDataHandler::clear_ret ObjectDataHandler::trim_data_reservation(
            to_write.emplace_back(
              object_data.get_reserved_data_base() + size,
              object_data.get_reserved_data_len() - size);
-           return clear_ertr::now();
+           return clear_iertr::now();
          });
        }
-      }).safe_then([ctx, &pins] {
+      }).si_then([ctx, &pins] {
        return do_removals(ctx, pins);
-      }).safe_then([ctx, &to_write] {
+      }).si_then([ctx, &to_write] {
        return do_insertions(ctx, to_write);
-      }).safe_then([size, &object_data] {
+      }).si_then([size, &object_data] {
        if (size == 0) {
          object_data.clear();
        }
-       return ObjectDataHandler::clear_ertr::now();
+       return ObjectDataHandler::clear_iertr::now();
       });
     });
 }
@@ -360,8 +358,8 @@ ObjectDataHandler::write_ret ObjectDataHandler::overwrite(
        ctx,
        pins.front(),
        offset
-      ).safe_then([ctx, pin_begin, &offset, &bl, &pins, &to_write](
-                   auto p) {
+      ).si_then([ctx, pin_begin, &offset, &bl, &pins, &to_write](
+                auto p) {
        auto &[left_extent, headptr] = p;
        if (left_extent) {
          ceph_assert(left_extent->addr == pin_begin);
@@ -379,8 +377,8 @@ ObjectDataHandler::write_ret ObjectDataHandler::overwrite(
          ctx,
          pins.back(),
          offset + bl.length());
-      }).safe_then([ctx, pin_end, &offset, &bl, &to_write](
-                    auto p) {
+      }).si_then([ctx, pin_end, &offset, &bl, &to_write](
+                 auto p) {
        auto &[right_extent, tailptr] = p;
        if (tailptr) {
          bl.append(*tailptr);
@@ -391,10 +389,10 @@ ObjectDataHandler::write_ret ObjectDataHandler::overwrite(
          ceph_assert((right_extent->addr  + right_extent->len) == pin_end);
          to_write.push_back(std::move(*right_extent));
        }
-       return write_ertr::now();
-      }).safe_then([ctx, &pins] {
+       return write_iertr::now();
+      }).si_then([ctx, &pins] {
        return do_removals(ctx, pins);
-      }).safe_then([ctx, &to_write] {
+      }).si_then([ctx, &to_write] {
        return do_insertions(ctx, to_write);
       });
     });
@@ -412,14 +410,14 @@ ObjectDataHandler::write_ret ObjectDataHandler::write(
        ctx,
        object_data,
        p2roundup(offset + bl.length(), ctx.tm.get_block_size())
-      ).safe_then([this, ctx, offset, &object_data, &bl] {
+      ).si_then([this, ctx, offset, &object_data, &bl] {
        auto logical_offset = object_data.get_reserved_data_base() + offset;
        return ctx.tm.get_pins(
          ctx.t,
          logical_offset,
          bl.length()
-       ).safe_then([this, ctx,logical_offset, &bl](
-                     auto pins) {
+       ).si_then([this, ctx,logical_offset, &bl](
+                  auto pins) {
          return overwrite(ctx, logical_offset, bufferlist(bl), std::move(pins));
        });
       });
@@ -448,7 +446,7 @@ ObjectDataHandler::read_ret ObjectDataHandler::read(
            ctx.t,
            loffset,
            len
-         ).safe_then([ctx, loffset, len, &ret](auto _pins) {
+         ).si_then([ctx, loffset, len, &ret](auto _pins) {
            // offset~len falls within reserved region and len > 0
            ceph_assert(_pins.size() >= 1);
            ceph_assert((*_pins.begin())->get_laddr() <= loffset);
@@ -456,11 +454,10 @@ ObjectDataHandler::read_ret ObjectDataHandler::read(
              std::move(_pins),
              loffset,
              [ctx, loffset, len, &ret](auto &pins, auto &current) {
-               return crimson::do_for_each(
-                 std::begin(pins),
-                 std::end(pins),
+               return trans_intr::do_for_each(
+                 pins,
                  [ctx, loffset, len, &current, &ret](auto &pin)
-                 -> read_ertr::future<> {
+                 -> read_iertr::future<> {
                    ceph_assert(current <= (loffset + len));
                    ceph_assert(
                      (loffset + len) > pin->get_laddr());
@@ -476,7 +473,7 @@ ObjectDataHandler::read_ret ObjectDataHandler::read(
                      return ctx.tm.pin_to_extent<ObjectDataBlock>(
                        ctx.t,
                        std::move(pin)
-                     ).safe_then([&ret, &current, end](auto extent) {
+                     ).si_then([&ret, &current, end](auto extent) {
                        ceph_assert(
                          (extent->get_laddr() + extent->get_length()) >= end);
                        ceph_assert(end > current);
@@ -487,8 +484,8 @@ ObjectDataHandler::read_ret ObjectDataHandler::read(
                            end - current));
                        current = end;
                        return seastar::now();
-                     }).handle_error(
-                       read_ertr::pass_further{},
+                     }).handle_error_interruptible(
+                       read_iertr::pass_further{},
                        crimson::ct_error::assert_all{
                          "ObjectDataHandler::read hit invalid error"
                        }
@@ -497,7 +494,7 @@ ObjectDataHandler::read_ret ObjectDataHandler::read(
                  });
              });
          });
-       }).safe_then([&ret] {
+       }).si_then([&ret] {
          return std::move(ret);
        });
     });
@@ -518,7 +515,7 @@ ObjectDataHandler::truncate_ret ObjectDataHandler::truncate(
          object_data,
          offset);
       } else {
-       return truncate_ertr::now();
+       return truncate_iertr::now();
       }
     });
 }
index 20efd979824e94b920c0436a9a7fd4bf78b6c0ee..677094a25dbd0ef9bff811846196747961ea07d2 100644 (file)
@@ -48,40 +48,40 @@ using ObjectDataBlockRef = TCachedExtentRef<ObjectDataBlock>;
 
 class ObjectDataHandler {
 public:
-  using base_ertr = with_trans_ertr<TransactionManager::base_iertr>;
+  using base_iertr = TransactionManager::base_iertr;
 
   struct context_t {
-    InterruptedTransactionManager tm;
+    TransactionManager &tm;
     Transaction &t;
     Onode &onode;
   };
 
   /// Writes bl to [offset, offset + bl.length())
-  using write_ertr = base_ertr;
-  using write_ret = write_ertr::future<>;
+  using write_iertr = base_iertr;
+  using write_ret = write_iertr::future<>;
   write_ret write(
     context_t ctx,
     objaddr_t offset,
     const bufferlist &bl);
 
   /// Reads data in [offset, offset + len)
-  using read_ertr = base_ertr;
-  using read_ret = read_ertr::future<bufferlist>;
+  using read_iertr = base_iertr;
+  using read_ret = read_iertr::future<bufferlist>;
   read_ret read(
     context_t ctx,
     objaddr_t offset,
     extent_len_t len);
 
   /// Clears data past offset
-  using truncate_ertr = base_ertr;
-  using truncate_ret = truncate_ertr::future<>;
+  using truncate_iertr = base_iertr;
+  using truncate_ret = truncate_iertr::future<>;
   truncate_ret truncate(
     context_t ctx,
     objaddr_t offset);
 
   /// Clears data and reservation
-  using clear_ertr = base_ertr;
-  using clear_ret = clear_ertr::future<>;
+  using clear_iertr = base_iertr;
+  using clear_ret = clear_iertr::future<>;
   clear_ret clear(context_t ctx);
 
 private:
index ad0df8d0e53b6d7d5a38a37db6c333f9522ada51..44c599b6e809e617a47068c80b4dde6949def097 100644 (file)
@@ -257,7 +257,8 @@ SeaStore::read_errorator::future<ceph::bufferlist> SeaStore::read(
     oid,
     Transaction::src_t::READ,
     op_type_t::READ,
-    [=](auto &t, auto &onode) -> ObjectDataHandler::read_ret {
+    [=](auto &t, auto &onode)
+        -> with_trans_ertr<ObjectDataHandler::read_iertr>::future<bufferlist> {
       size_t size = onode.get_layout().size;
 
       if (offset >= size) {
@@ -268,14 +269,16 @@ SeaStore::read_errorator::future<ceph::bufferlist> SeaStore::read(
        size - offset :
        std::min(size - offset, len);
 
-      return ObjectDataHandler().read(
-       ObjectDataHandler::context_t{
-         *transaction_manager,
-         t,
-         onode,
-       },
-       offset,
-       corrected_len);
+      return with_trans_intr(t, [&](auto &t) {
+        return ObjectDataHandler().read(
+          ObjectDataHandler::context_t{
+            transaction_manager->get_tm(),
+            t,
+            onode,
+          },
+          offset,
+          corrected_len);
+      });
     });
 }
 
@@ -884,14 +887,16 @@ SeaStore::tm_ret SeaStore::_write(
   return seastar::do_with(
     std::move(_bl),
     [=, &ctx, &onode](auto &bl) {
-      return ObjectDataHandler().write(
-       ObjectDataHandler::context_t{
-         *transaction_manager,
-         *ctx.transaction,
-         *onode,
-       },
-       offset,
-       bl);
+      return with_trans_intr(*ctx.transaction, [&](auto &t) {
+        return ObjectDataHandler().write(
+          ObjectDataHandler::context_t{
+            transaction_manager->get_tm(),
+            t,
+            *onode,
+          },
+          offset,
+          bl);
+      });
     });
 }
 
@@ -1017,13 +1022,15 @@ SeaStore::tm_ret SeaStore::_truncate(
   LOG_PREFIX(SeaStore::_truncate);
   DEBUGT("onode={} size={}", *ctx.transaction, *onode, size);
   onode->get_mutable_layout(*ctx.transaction).size = size;
-  return ObjectDataHandler().truncate(
-    ObjectDataHandler::context_t{
-      *transaction_manager,
-      *ctx.transaction,
-      *onode
-    },
-    size);
+  return with_trans_intr(*ctx.transaction, [&](auto &t) {
+    return ObjectDataHandler().truncate(
+      ObjectDataHandler::context_t{
+        transaction_manager->get_tm(),
+        t,
+        *onode
+      },
+      size);
+  });
 }
 
 SeaStore::tm_ret SeaStore::_setattrs(
index 2631ebb1cbe7038f1e3ec4a10b6170bad47d6f4c..f30cd7a9ff2a3db093a09ff01f187be02719b029 100644 (file)
@@ -56,14 +56,16 @@ struct object_data_handler_test_t:
        known_contents,
        offset,
        len));
-    return ObjectDataHandler().write(
-      ObjectDataHandler::context_t{
-       itm,
-       t,
-       *onode,
-      },
-      offset,
-      bl).unsafe_get0();
+    with_trans_intr(t, [&](auto &t) {
+      return ObjectDataHandler().write(
+        ObjectDataHandler::context_t{
+          *tm,
+          t,
+          *onode,
+        },
+        offset,
+        bl);
+    }).unsafe_get0();
   }
   void write(objaddr_t offset, extent_len_t len, char fill) {
     auto t = create_mutate_transaction();
@@ -77,13 +79,15 @@ struct object_data_handler_test_t:
        known_contents.c_str() + offset,
        0,
        size - offset);
-      ObjectDataHandler().truncate(
-       ObjectDataHandler::context_t{
-         itm,
-         t,
-         *onode
-       },
-       offset).unsafe_get0();
+      with_trans_intr(t, [&](auto &t) {
+        return ObjectDataHandler().truncate(
+          ObjectDataHandler::context_t{
+            *tm,
+            t,
+            *onode
+          },
+          offset);
+      }).unsafe_get0();
     }
     size = offset;
   }
@@ -94,14 +98,16 @@ struct object_data_handler_test_t:
   }
 
   void read(Transaction &t, objaddr_t offset, extent_len_t len) {
-    bufferlist bl = ObjectDataHandler().read(
-      ObjectDataHandler::context_t{
-       itm,
-       t,
-       *onode
-      },
-      offset,
-      len).unsafe_get0();
+    bufferlist bl = with_trans_intr(t, [&](auto &t) {
+      return ObjectDataHandler().read(
+        ObjectDataHandler::context_t{
+          *tm,
+          t,
+          *onode
+        },
+        offset,
+        len);
+    }).unsafe_get0();
     bufferlist known;
     known.append(
       bufferptr(