]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/os/seastore: rework transaction loop
authorSamuel Just <sjust@redhat.com>
Tue, 26 Jan 2021 00:15:04 +0000 (16:15 -0800)
committerSamuel Just <sjust@redhat.com>
Wed, 24 Feb 2021 02:43:39 +0000 (18:43 -0800)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/os/seastore/seastore.cc
src/crimson/os/seastore/seastore.h

index 601ce429e445925c2f3a91bcd8aa161c218bc366..d1c60c49c042b4ce4c31d15b631c9978af1dfc65 100644 (file)
@@ -197,75 +197,58 @@ seastar::future<std::map<uint64_t, uint64_t>> SeaStore::fiemap(
   return seastar::make_ready_future<std::map<uint64_t, uint64_t>>();
 }
 
+void SeaStore::on_error(ceph::os::Transaction &t) {
+  logger().error(" transaction dump:\n");
+  JSONFormatter f(true);
+  f.open_object_section("transaction");
+  t.dump(&f);
+  f.close_section();
+  std::stringstream str;
+  f.flush(str);
+  logger().error("{}", str.str());
+  abort();
+}
+
 seastar::future<> SeaStore::do_transaction(
   CollectionRef _ch,
   ceph::os::Transaction&& _t)
 {
-  return seastar::do_with(
-    _t.begin(),
-    transaction_manager.create_transaction(),
-    std::vector<OnodeRef>(),
+  return repeat_with_internal_context(
+    _ch,
     std::move(_t),
-    std::move(_ch),
-    [this](auto &iter, auto &trans, auto &onodes, auto &t, auto &ch) {
+    [this](auto &ctx) {
       return onode_manager->get_or_create_onodes(
-       *trans, iter.get_objects()).safe_then(
-         [this, &iter, &trans, &onodes, &t, &ch](auto &&read_onodes) {
-           onodes = std::move(read_onodes);
-           return seastar::do_until(
-             [&iter]() { return iter.have_op(); },
-             [this, &iter, &trans, &onodes, &t, &ch]() {
-               return _do_transaction_step(trans, ch, onodes, iter).safe_then(
-                 [this, &trans] {
-                   return transaction_manager.submit_transaction(std::move(trans));
-                 }).handle_error(
-                   // TODO: add errorator::do_until
-                   crimson::ct_error::eagain::handle([]() {
-                     // TODO retry
-                   }),
-                   write_ertr::all_same_way([&t](auto e) {
-                     logger().error(" transaction dump:\n");
-                     JSONFormatter f(true);
-                     f.open_object_section("transaction");
-                     t.dump(&f);
-                     f.close_section();
-                     std::stringstream str;
-                     f.flush(str);
-                     logger().error("{}", str.str());
-                     abort();
-                   }));
-             });
-         }).safe_then([this, &trans, &onodes]() {
-           return onode_manager->write_dirty(*trans, onodes);
-         }).safe_then([]() {
-           // TODO: complete transaction!
-           return;
-         }).handle_error(
-           write_ertr::all_same_way([&t](auto e) {
-             logger().error(" transaction dump:\n");
-             JSONFormatter f(true);
-             f.open_object_section("transaction");
-             t.dump(&f);
-             f.close_section();
-             std::stringstream str;
-             f.flush(str);
-             logger().error("{}", str.str());
-             abort();
-           })).then([&t]() {
-             for (auto i : {
-                 t.get_on_applied(),
-                   t.get_on_commit(),
-                   t.get_on_applied_sync()}) {
-               if (i) {
-                 i->complete(0);
-               }
-             }
+       *ctx.transaction, ctx.iter.get_objects()
+      ).safe_then([this, &ctx](auto &&read_onodes) {
+       ctx.onodes = std::move(read_onodes);
+       return crimson::do_until(
+         [this, &ctx] {
+           return _do_transaction_step(
+             ctx, ctx.ch, ctx.onodes, ctx.iter
+           ).safe_then([&ctx] {
+             return seastar::make_ready_future<bool>(!ctx.iter.have_op());
            });
+         });
+      }).safe_then([this, &ctx] {
+       return onode_manager->write_dirty(*ctx.transaction, ctx.onodes);
+      }).safe_then([this, &ctx] {
+       return transaction_manager.submit_transaction(std::move(ctx.transaction));
+      }).safe_then([&ctx]() {
+       for (auto i : {
+           ctx.ext_transaction.get_on_applied(),
+           ctx.ext_transaction.get_on_commit(),
+           ctx.ext_transaction.get_on_applied_sync()}) {
+         if (i) {
+           i->complete(0);
+         }
+       }
+       return tm_ertr::now();
+      });
     });
 }
 
-SeaStore::write_ertr::future<> SeaStore::_do_transaction_step(
-  TransactionRef &trans,
+SeaStore::tm_ret SeaStore::_do_transaction_step(
+  internal_context_t &ctx,
   CollectionRef &col,
   std::vector<OnodeRef> &onodes,
   ceph::os::Transaction::iterator &i)
@@ -279,15 +262,15 @@ SeaStore::write_ertr::future<> SeaStore::_do_transaction_step(
   try {
     switch (auto op = i.decode_op(); op->op) {
     case Transaction::OP_NOP:
-      return write_ertr::now();
+      return tm_ertr::now();
     case Transaction::OP_REMOVE:
     {
-      return _remove(trans, get_onode(op->oid));
+      return _remove(ctx, get_onode(op->oid));
     }
     break;
     case Transaction::OP_TOUCH:
     {
-      return _touch(trans, get_onode(op->oid));
+      return _touch(ctx, get_onode(op->oid));
     }
     break;
     case Transaction::OP_WRITE:
@@ -297,13 +280,13 @@ SeaStore::write_ertr::future<> SeaStore::_do_transaction_step(
       uint32_t fadvise_flags = i.get_fadvise_flags();
       ceph::bufferlist bl;
       i.decode_bl(bl);
-      return _write(trans, get_onode(op->oid), off, len, bl, fadvise_flags);
+      return _write(ctx, get_onode(op->oid), off, len, bl, fadvise_flags);
     }
     break;
     case Transaction::OP_TRUNCATE:
     {
       uint64_t off = op->off;
-      return _truncate(trans, get_onode(op->oid), off);
+      return _truncate(ctx, get_onode(op->oid), off);
     }
     break;
     case Transaction::OP_SETATTR:
@@ -313,34 +296,34 @@ SeaStore::write_ertr::future<> SeaStore::_do_transaction_step(
       i.decode_bl(bl);
       std::map<std::string, bufferptr> to_set;
       to_set[name] = bufferptr(bl.c_str(), bl.length());
-      return _setattrs(trans, get_onode(op->oid), to_set);
+      return _setattrs(ctx, get_onode(op->oid), to_set);
     }
     break;
     case Transaction::OP_MKCOLL:
     {
       coll_t cid = i.get_cid(op->cid);
-      return _create_collection(trans, cid, op->split_bits);
+      return _create_collection(ctx, cid, op->split_bits);
     }
     break;
     case Transaction::OP_OMAP_SETKEYS:
     {
       std::map<std::string, ceph::bufferlist> aset;
       i.decode_attrset(aset);
-      return _omap_set_values(trans, get_onode(op->oid), std::move(aset));
+      return _omap_set_values(ctx, get_onode(op->oid), std::move(aset));
     }
     break;
     case Transaction::OP_OMAP_SETHEADER:
     {
       ceph::bufferlist bl;
       i.decode_bl(bl);
-      return _omap_set_header(trans, get_onode(op->oid), bl);
+      return _omap_set_header(ctx, get_onode(op->oid), bl);
     }
     break;
     case Transaction::OP_OMAP_RMKEYS:
     {
       omap_keys_t keys;
       i.decode_keyset(keys);
-      return _omap_rmkeys(trans, get_onode(op->oid), keys);
+      return _omap_rmkeys(ctx, get_onode(op->oid), keys);
     }
     break;
     case Transaction::OP_OMAP_RMKEYRANGE:
@@ -348,14 +331,14 @@ SeaStore::write_ertr::future<> SeaStore::_do_transaction_step(
       string first, last;
       first = i.decode_string();
       last = i.decode_string();
-      return _omap_rmkeyrange(trans, get_onode(op->oid), first, last);
+      return _omap_rmkeyrange(ctx, get_onode(op->oid), first, last);
     }
     break;
     case Transaction::OP_COLL_HINT:
     {
       ceph::bufferlist hint;
       i.decode_bl(hint);
-      return write_ertr::now();
+      return tm_ertr::now();
     }
     default:
       logger().error("bad op {}", static_cast<unsigned>(op->op));
@@ -367,26 +350,26 @@ SeaStore::write_ertr::future<> SeaStore::_do_transaction_step(
   }
 }
 
-SeaStore::write_ertr::future<> SeaStore::_remove(
-  TransactionRef &trans,
+SeaStore::tm_ret SeaStore::_remove(
+  internal_context_t &ctx,
   OnodeRef &onode)
 {
   logger().debug("{} onode={}",
                 __func__, *onode);
-  return write_ertr::now();
+  return tm_ertr::now();
 }
 
-SeaStore::write_ertr::future<> SeaStore::_touch(
-  TransactionRef &trans,
+SeaStore::tm_ret SeaStore::_touch(
+  internal_context_t &ctx,
   OnodeRef &onode)
 {
   logger().debug("{} onode={}",
                 __func__, *onode);
-  return write_ertr::now();
+  return tm_ertr::now();
 }
 
-SeaStore::write_ertr::future<> SeaStore::_write(
-  TransactionRef &trans,
+SeaStore::tm_ret SeaStore::_write(
+  internal_context_t &ctx,
   OnodeRef &onode,
   uint64_t offset, size_t len, const ceph::bufferlist& bl,
   uint32_t fadvise_flags)
@@ -405,11 +388,11 @@ SeaStore::write_ertr::future<> SeaStore::_write(
     OnodeManager::open_ertr::pass_further{}
   );
   */
-  return write_ertr::now();
+  return tm_ertr::now();
 }
 
-SeaStore::write_ertr::future<> SeaStore::_omap_set_values(
-  TransactionRef &trans,
+SeaStore::tm_ret SeaStore::_omap_set_values(
+  internal_context_t &ctx,
   OnodeRef &onode,
   std::map<std::string, ceph::bufferlist> &&aset)
 {
@@ -417,33 +400,33 @@ SeaStore::write_ertr::future<> SeaStore::_omap_set_values(
     "{}: {} {} keys",
     __func__, *onode, aset.size());
 
-  return write_ertr::now();
+  return tm_ertr::now();
 }
 
-SeaStore::write_ertr::future<> SeaStore::_omap_set_header(
-  TransactionRef &trans,
+SeaStore::tm_ret SeaStore::_omap_set_header(
+  internal_context_t &ctx,
   OnodeRef &onode,
   const ceph::bufferlist &header)
 {
   logger().debug(
     "{}: {} {} bytes",
     __func__, *onode, header.length());
-  return write_ertr::now();
+  return tm_ertr::now();
 }
 
-SeaStore::write_ertr::future<> SeaStore::_omap_rmkeys(
-  TransactionRef &trans,
+SeaStore::tm_ret SeaStore::_omap_rmkeys(
+  internal_context_t &ctx,
   OnodeRef &onode,
   const omap_keys_t& aset)
 {
   logger().debug(
     "{} {} {} keys",
     __func__, *onode, aset.size());
-  return write_ertr::now();
+  return tm_ertr::now();
 }
 
-SeaStore::write_ertr::future<> SeaStore::_omap_rmkeyrange(
-  TransactionRef &trans,
+SeaStore::tm_ret SeaStore::_omap_rmkeyrange(
+  internal_context_t &ctx,
   OnodeRef &onode,
   const std::string &first,
   const std::string &last)
@@ -451,34 +434,34 @@ SeaStore::write_ertr::future<> SeaStore::_omap_rmkeyrange(
   logger().debug(
     "{} {} first={} last={}",
     __func__, *onode, first, last);
-  return write_ertr::now();
+  return tm_ertr::now();
 }
 
-SeaStore::write_ertr::future<> SeaStore::_truncate(
-  TransactionRef &trans,
+SeaStore::tm_ret SeaStore::_truncate(
+  internal_context_t &ctx,
   OnodeRef &onode,
   uint64_t size)
 {
   logger().debug("{} onode={} size={}",
                 __func__, *onode, size);
-  return write_ertr::now();
+  return tm_ertr::now();
 }
 
-SeaStore::write_ertr::future<> SeaStore::_setattrs(
-  TransactionRef &trans,
+SeaStore::tm_ret SeaStore::_setattrs(
+  internal_context_t &ctx,
   OnodeRef &onode,
   std::map<std::string,bufferptr>& aset)
 {
   logger().debug("{} onode={}",
                 __func__, *onode);
-  return write_ertr::now();
+  return tm_ertr::now();
 }
 
-SeaStore::write_ertr::future<> SeaStore::_create_collection(
-  TransactionRef &trans,
+SeaStore::tm_ret SeaStore::_create_collection(
+  internal_context_t &ctx,
   const coll_t& cid, int bits)
 {
-  return write_ertr::now();
+  return tm_ertr::now();
 }
 
 boost::intrusive_ptr<SeastoreCollection> SeaStore::_get_collection(const coll_t& cid)
index 4e57542a14fd9c4c5700b77c4c2c8a0323de740c..25ccdd899c7c2cbfa554ea47b9b0f5d1af375ab9 100644 (file)
@@ -115,54 +115,99 @@ public:
   }
 
 private:
+  struct internal_context_t {
+    CollectionRef ch;
+    ceph::os::Transaction ext_transaction;
+
+    internal_context_t(
+      CollectionRef ch,
+      ceph::os::Transaction &&_ext_transaction)
+      : ch(ch), ext_transaction(std::move(_ext_transaction)),
+       iter(ext_transaction.begin()) {}
+
+    TransactionRef transaction;
+    std::vector<OnodeRef> onodes;
+
+    ceph::os::Transaction::iterator iter;
+
+    void reset(TransactionRef &&t) {
+      transaction = std::move(t);
+      onodes.clear();
+      iter = ext_transaction.begin();
+    }
+  };
+
+  static void on_error(ceph::os::Transaction &t);
+
+  template <typename F>
+  auto repeat_with_internal_context(
+    CollectionRef ch,
+    ceph::os::Transaction &&t,
+    F &&f) {
+    return seastar::do_with(
+      internal_context_t{ ch, std::move(t) },
+      std::forward<F>(f),
+      [](auto &ctx, auto &f) {
+       return repeat_eagain([&]() {
+         ctx.reset(make_transaction());
+         return std::invoke(f, ctx);
+       }).handle_error(
+         crimson::ct_error::eagain::pass_further{},
+         crimson::ct_error::all_same_way([&ctx](auto e) {
+           on_error(ctx.ext_transaction);
+         })
+       );
+      });
+  }
+
   TransactionManager &transaction_manager;
   std::unique_ptr<OnodeManager> onode_manager;
 
-  using write_ertr = crimson::errorator<
-    crimson::ct_error::input_output_error>;
-  write_ertr::future<> _do_transaction_step(
-    TransactionRef &trans,
+  using tm_ertr = TransactionManager::base_ertr;
+  using tm_ret = tm_ertr::future<>;
+  tm_ret _do_transaction_step(
+    internal_context_t &ctx,
     CollectionRef &col,
     std::vector<OnodeRef> &onodes,
     ceph::os::Transaction::iterator &i);
 
-  write_ertr::future<> _remove(
-    TransactionRef &trans,
+  tm_ret _remove(
+    internal_context_t &ctx,
     OnodeRef &onode);
-  write_ertr::future<> _touch(
-    TransactionRef &trans,
+  tm_ret _touch(
+    internal_context_t &ctx,
     OnodeRef &onode);
-  write_ertr::future<> _write(
-    TransactionRef &trans,
+  tm_ret _write(
+    internal_context_t &ctx,
     OnodeRef &onode,
     uint64_t offset, size_t len, const ceph::bufferlist& bl,
     uint32_t fadvise_flags);
-  write_ertr::future<> _omap_set_values(
-    TransactionRef &trans,
+  tm_ret _omap_set_values(
+    internal_context_t &ctx,
     OnodeRef &onode,
     std::map<std::string, ceph::bufferlist> &&aset);
-  write_ertr::future<> _omap_set_header(
-    TransactionRef &trans,
+  tm_ret _omap_set_header(
+    internal_context_t &ctx,
     OnodeRef &onode,
     const ceph::bufferlist &header);
-  write_ertr::future<> _omap_rmkeys(
-    TransactionRef &trans,
+  tm_ret _omap_rmkeys(
+    internal_context_t &ctx,
     OnodeRef &onode,
     const omap_keys_t& aset);
-  write_ertr::future<> _omap_rmkeyrange(
-    TransactionRef &trans,
+  tm_ret _omap_rmkeyrange(
+    internal_context_t &ctx,
     OnodeRef &onode,
     const std::string &first,
     const std::string &last);
-  write_ertr::future<> _truncate(
-    TransactionRef &trans,
+  tm_ret _truncate(
+    internal_context_t &ctx,
     OnodeRef &onode, uint64_t size);
-  write_ertr::future<> _setattrs(
-    TransactionRef &trans,
+  tm_ret _setattrs(
+    internal_context_t &ctx,
     OnodeRef &onode,
     std::map<std::string,bufferptr>& aset);
-  write_ertr::future<> _create_collection(
-    TransactionRef &trans,
+  tm_ret _create_collection(
+    internal_context_t &ctx,
     const coll_t& cid, int bits);
 
   boost::intrusive_ptr<SeastoreCollection> _get_collection(const coll_t& cid);