#pragma once
#include <exception>
+#include <seastar/core/future-util.hh>
namespace crimson {
+template<class Container, class AsyncAction>
+static inline auto do_for_each(Container& c, AsyncAction action) {
+ using ActionItem = typename Container::value_type;
+ using ActionReturn = std::invoke_result_t<AsyncAction, ActionItem&>;
+ using Errorator = typename ActionReturn::errorator_type;
+ using Futurator = typename Errorator::template futurize<ActionReturn>;
+ return typename Futurator::type {
+ seastar::do_for_each(std::begin(c), std::end(c),
+ [action = std::move(action)] (auto& item) -> seastar::future<> {
+ return Errorator::plainify(action(item));
+ })
+ };
+}
+
+template<typename T, typename F>
+static inline auto do_with(T&& rvalue, F&& f) {
+ using FuncReturn = decltype(std::move(f)(rvalue));
+ using Errorator = typename FuncReturn::errorator_type;
+ using Futurator = typename Errorator::template futurize<FuncReturn>;
+ return typename Futurator::type {
+ seastar::do_with(std::move(rvalue),
+ [f = std::move(f)] (T& moved_rvalue) mutable {
+ return Errorator::plainify(std::move(f)(moved_rvalue));
+ })
+ };
+}
+
// define the interface between error types and errorator
template <class ConcreteErrorT>
class error_t {
// * conversion to `std::exception_ptr` in `future::future(ErrorT&&)`.
template <class... ValueT>
friend class future;
+
+ template<class Container, class AsyncAction>
+ friend inline auto ::crimson::do_for_each(Container&, AsyncAction);
+
+ template<typename T, typename F>
+ friend inline auto do_with(T&&, F&&);
}; // class errorator, generic template
// no errors? errorator<>::future is plain seastar::future then!
const auto oid = m->get_snapid() == CEPH_SNAPDIR ? m->get_hobj().get_head()
: m->get_hobj();
return backend->get_object_state(oid).then([this, m](auto os) mutable {
- return seastar::do_with(OpsExecuter{std::move(os), *this/* as const& */, m},
+ return crimson::do_with(OpsExecuter{std::move(os), *this/* as const& */, m},
[this, m] (auto& ox) {
- return seastar::do_for_each(m->ops, [this, &ox](OSDOp& osd_op) {
+ return crimson::do_for_each(m->ops, [this, &ox](OSDOp& osd_op) {
logger().debug("will be handling op {}", ceph_osd_op_name(osd_op.op.op));
- return ox.execute_osd_op(osd_op).safe_then(
- [] {
- return seastar::now();
- }, OpsExecuter::osd_op_errorator::all_same_way([] (const std::error_code& err) {
- assert(err.value() > 0);
- throw ceph::osd::make_error(err.value());
- }));
- }).then([this, m, &ox] {
+ return ox.execute_osd_op(osd_op);
+ }).safe_then([this, m, &ox] {
logger().debug("all operations have been executed successfully");
return std::move(ox).submit_changes([this, m] (auto&& txn, auto&& os) {
// XXX: the entire lambda could be scheduled conditionally. ::if_then()?
return submit_transaction(std::move(os), std::move(txn), *m);
}
});
- });
- });
- }).then([m,this] {
- auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
- 0, false);
- reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
- return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+ }, OpsExecuter::osd_op_errorator::pass_further{});
+ }).safe_then([m,this] {
+ auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
+ 0, false);
+ reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+ return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+ }, OpsExecuter::osd_op_errorator::all_same_way([] (const std::error_code& err) {
+ assert(err.value() > 0);
+ throw crimson::osd::make_error(err.value());
+ }));
}).handle_exception_type([=,&oid](const crimson::osd::error& e) {
- logger().debug("got crimson::osd::error while handling object {}: {} ({})",
+ logger().debug("got ceph::osd::error while handling object {}: {} ({})",
oid, e.code(), e.what());
return backend->evict_object_state(oid).then([=] {
auto reply = make_message<MOSDOpReply>(