]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson: implement ceph::do_{for_each(),do_with()} helpers.
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Mon, 23 Sep 2019 22:01:08 +0000 (00:01 +0200)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Wed, 20 Nov 2019 19:37:43 +0000 (20:37 +0100)
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/crimson/common/errorator.h
src/crimson/osd/pg.cc

index 8f7e2164eba75b095944e5c9a236b7f30852c332..93995b9802221b8cd42954fa217ccefee339c66f 100644 (file)
@@ -4,9 +4,37 @@
 #pragma once
 
 #include <exception>
+#include <seastar/core/future-util.hh>
 
 namespace crimson {
 
+template<class Container, class AsyncAction>
+static inline auto do_for_each(Container& c, AsyncAction action) {
+  using ActionItem = typename Container::value_type;
+  using ActionReturn = std::invoke_result_t<AsyncAction, ActionItem&>;
+  using Errorator = typename ActionReturn::errorator_type;
+  using Futurator = typename Errorator::template futurize<ActionReturn>;
+  return typename Futurator::type {
+    seastar::do_for_each(std::begin(c), std::end(c),
+      [action = std::move(action)] (auto& item) -> seastar::future<> {
+        return Errorator::plainify(action(item));
+      })
+  };
+}
+
+template<typename T, typename F>
+static inline auto do_with(T&& rvalue, F&& f) {
+  using FuncReturn = decltype(std::move(f)(rvalue));
+  using Errorator = typename FuncReturn::errorator_type;
+  using Futurator = typename Errorator::template futurize<FuncReturn>;
+  return typename Futurator::type {
+    seastar::do_with(std::move(rvalue),
+      [f = std::move(f)] (T& moved_rvalue) mutable {
+        return Errorator::plainify(std::move(f)(moved_rvalue));
+      })
+  };
+}
+
 // define the interface between error types and errorator
 template <class ConcreteErrorT>
 class error_t {
@@ -613,6 +641,12 @@ private:
   //  * conversion to `std::exception_ptr` in `future::future(ErrorT&&)`.
   template <class... ValueT>
   friend class future;
+
+  template<class Container, class AsyncAction>
+  friend inline auto ::crimson::do_for_each(Container&, AsyncAction);
+
+  template<typename T, typename F>
+  friend inline auto do_with(T&&, F&&);
 }; // class errorator, generic template
 
 // no errors? errorator<>::future is plain seastar::future then!
index 53849847779707f0a4d327af7ab3ef32016873dd..4404e0159654752de6a6b268454c449d6c641d3e 100644 (file)
@@ -438,18 +438,12 @@ seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(Ref<MOSDOp> m)
   const auto oid = m->get_snapid() == CEPH_SNAPDIR ? m->get_hobj().get_head()
                                                    : m->get_hobj();
   return backend->get_object_state(oid).then([this, m](auto os) mutable {
-    return seastar::do_with(OpsExecuter{std::move(os), *this/* as const& */, m},
+    return crimson::do_with(OpsExecuter{std::move(os), *this/* as const& */, m},
                             [this, m] (auto& ox) {
-      return seastar::do_for_each(m->ops, [this, &ox](OSDOp& osd_op) {
+      return crimson::do_for_each(m->ops, [this, &ox](OSDOp& osd_op) {
         logger().debug("will be handling op {}", ceph_osd_op_name(osd_op.op.op));
-        return ox.execute_osd_op(osd_op).safe_then(
-          [] {
-            return seastar::now();
-          }, OpsExecuter::osd_op_errorator::all_same_way([] (const std::error_code& err) {
-            assert(err.value() > 0);
-            throw ceph::osd::make_error(err.value());
-          }));
-      }).then([this, m, &ox] {
+        return ox.execute_osd_op(osd_op);
+      }).safe_then([this, m, &ox] {
         logger().debug("all operations have been executed successfully");
         return std::move(ox).submit_changes([this, m] (auto&& txn, auto&& os) {
           // XXX: the entire lambda could be scheduled conditionally. ::if_then()?
@@ -460,15 +454,18 @@ seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(Ref<MOSDOp> m)
            return submit_transaction(std::move(os), std::move(txn), *m);
          }
         });
-      });
-    });
-  }).then([m,this] {
-    auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
-                                           0, false);
-    reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
-    return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+      }, OpsExecuter::osd_op_errorator::pass_further{});
+    }).safe_then([m,this] {
+      auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
+                                             0, false);
+      reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+      return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+    }, OpsExecuter::osd_op_errorator::all_same_way([] (const std::error_code& err) {
+      assert(err.value() > 0);
+      throw crimson::osd::make_error(err.value());
+    }));
   }).handle_exception_type([=,&oid](const crimson::osd::error& e) {
-    logger().debug("got crimson::osd::error while handling object {}: {} ({})",
+    logger().debug("got ceph::osd::error while handling object {}: {} ({})",
                    oid, e.code(), e.what());
     return backend->evict_object_state(oid).then([=] {
       auto reply = make_message<MOSDOpReply>(