From 7b8148ecf92d0175932823de340df5dea0848f4e Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Tue, 24 Sep 2019 00:01:08 +0200 Subject: [PATCH] crimson: implement ceph::do_{for_each(),do_with()} helpers. Signed-off-by: Radoslaw Zarzynski --- src/crimson/common/errorator.h | 34 ++++++++++++++++++++++++++++++++++ src/crimson/osd/pg.cc | 33 +++++++++++++++------------------ 2 files changed, 49 insertions(+), 18 deletions(-) diff --git a/src/crimson/common/errorator.h b/src/crimson/common/errorator.h index 8f7e2164eba..93995b98022 100644 --- a/src/crimson/common/errorator.h +++ b/src/crimson/common/errorator.h @@ -4,9 +4,37 @@ #pragma once #include +#include namespace crimson { +template +static inline auto do_for_each(Container& c, AsyncAction action) { + using ActionItem = typename Container::value_type; + using ActionReturn = std::invoke_result_t; + using Errorator = typename ActionReturn::errorator_type; + using Futurator = typename Errorator::template futurize; + return typename Futurator::type { + seastar::do_for_each(std::begin(c), std::end(c), + [action = std::move(action)] (auto& item) -> seastar::future<> { + return Errorator::plainify(action(item)); + }) + }; +} + +template +static inline auto do_with(T&& rvalue, F&& f) { + using FuncReturn = decltype(std::move(f)(rvalue)); + using Errorator = typename FuncReturn::errorator_type; + using Futurator = typename Errorator::template futurize; + return typename Futurator::type { + seastar::do_with(std::move(rvalue), + [f = std::move(f)] (T& moved_rvalue) mutable { + return Errorator::plainify(std::move(f)(moved_rvalue)); + }) + }; +} + // define the interface between error types and errorator template class error_t { @@ -613,6 +641,12 @@ private: // * conversion to `std::exception_ptr` in `future::future(ErrorT&&)`. template friend class future; + + template + friend inline auto ::crimson::do_for_each(Container&, AsyncAction); + + template + friend inline auto do_with(T&&, F&&); }; // class errorator, generic template // no errors? errorator<>::future is plain seastar::future then! diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 53849847779..4404e015965 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -438,18 +438,12 @@ seastar::future> PG::do_osd_ops(Ref m) const auto oid = m->get_snapid() == CEPH_SNAPDIR ? m->get_hobj().get_head() : m->get_hobj(); return backend->get_object_state(oid).then([this, m](auto os) mutable { - return seastar::do_with(OpsExecuter{std::move(os), *this/* as const& */, m}, + return crimson::do_with(OpsExecuter{std::move(os), *this/* as const& */, m}, [this, m] (auto& ox) { - return seastar::do_for_each(m->ops, [this, &ox](OSDOp& osd_op) { + return crimson::do_for_each(m->ops, [this, &ox](OSDOp& osd_op) { logger().debug("will be handling op {}", ceph_osd_op_name(osd_op.op.op)); - return ox.execute_osd_op(osd_op).safe_then( - [] { - return seastar::now(); - }, OpsExecuter::osd_op_errorator::all_same_way([] (const std::error_code& err) { - assert(err.value() > 0); - throw ceph::osd::make_error(err.value()); - })); - }).then([this, m, &ox] { + return ox.execute_osd_op(osd_op); + }).safe_then([this, m, &ox] { logger().debug("all operations have been executed successfully"); return std::move(ox).submit_changes([this, m] (auto&& txn, auto&& os) { // XXX: the entire lambda could be scheduled conditionally. ::if_then()? @@ -460,15 +454,18 @@ seastar::future> PG::do_osd_ops(Ref m) return submit_transaction(std::move(os), std::move(txn), *m); } }); - }); - }); - }).then([m,this] { - auto reply = make_message(m.get(), 0, get_osdmap_epoch(), - 0, false); - reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); - return seastar::make_ready_future>(std::move(reply)); + }, OpsExecuter::osd_op_errorator::pass_further{}); + }).safe_then([m,this] { + auto reply = make_message(m.get(), 0, get_osdmap_epoch(), + 0, false); + reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); + return seastar::make_ready_future>(std::move(reply)); + }, OpsExecuter::osd_op_errorator::all_same_way([] (const std::error_code& err) { + assert(err.value() > 0); + throw crimson::osd::make_error(err.value()); + })); }).handle_exception_type([=,&oid](const crimson::osd::error& e) { - logger().debug("got crimson::osd::error while handling object {}: {} ({})", + logger().debug("got ceph::osd::error while handling object {}: {} ({})", oid, e.code(), e.what()); return backend->evict_object_state(oid).then([=] { auto reply = make_message( -- 2.39.5