#include "include/ceph_assert.h"
+namespace crimson::interruptible {
+
+template <typename, typename>
+class interruptible_future_detail;
+
+}
+
namespace crimson {
template<typename Iterator, typename AsyncAction>
template <class ValueT>
struct errorated_future_marker{};
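+// Hoisted to namespace scope (out of errorator, see below) so that code
+// outside of errorator, e.g. the interruptible-future machinery, can test
+// error types via ::crimson::is_error_v<>.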
+template <class T>
+static inline constexpr bool is_error_v = std::is_base_of_v<error_t<T>, T>;
+
template <class... AllowedErrors>
struct errorator {
- template <class T>
- static inline constexpr bool is_error_v = std::is_base_of_v<error_t<T>, T>;
static_assert((... && is_error_v<AllowedErrors>),
"errorator expects presence of ::is_error in all error types");
}
});
}
+
template <class ErrorFuncHead,
class... ErrorFuncTail>
auto handle_error(ErrorFuncHead&& error_func_head,
template<typename T, typename F>
friend inline auto ::seastar::internal::do_with_impl(T&& rvalue, F&& f);
template<typename T1, typename T2, typename T3_or_F, typename... More>
friend inline auto ::seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, T3_or_F&& rv3, More&&... more);
+ template<typename, typename>
+ friend class ::crimson::interruptible::interruptible_future_detail;
};
class Enabler {};
}
};
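+ // Specialization of seastar::futurize for interruptible futures: invoking
+ // a callable through it yields an interruptible_future_detail, and any
+ // exception thrown synchronously by the callable is captured as an
+ // exceptional future rather than being propagated.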
+ template <typename InterruptCond, typename FutureType>
+ class futurize<
+ ::crimson::interruptible::interruptible_future_detail<
+ InterruptCond, FutureType>> {
+ public:
+ using type = ::crimson::interruptible::interruptible_future_detail<
+ InterruptCond, typename futurize<FutureType>::type>;
+
+ template <typename Func, typename... Args>
+ static type apply(Func&& func, std::tuple<Args...>&& args) {
+ try {
+ return ::seastar::futurize_apply(std::forward<Func>(func),
+ std::forward<std::tuple<Args...>>(args));
+ } catch (...) {
+ return seastar::futurize<
+ ::crimson::interruptible::interruptible_future_detail<
+ InterruptCond, FutureType>>::make_exception_future(
+ std::current_exception());
+ }
+ }
+
+ template <typename Func, typename... Args>
+ static type invoke(Func&& func, Args&&... args) {
+ try {
+ return ::seastar::futurize_invoke(std::forward<Func>(func),
+ std::forward<Args>(args)...);
+ } catch(...) {
+ return seastar::futurize<
+ ::crimson::interruptible::interruptible_future_detail<
+ InterruptCond, FutureType>>::make_exception_future(
+ std::current_exception());
+ }
+ }
+ template <typename Func>
+ static type invoke(Func&& func, seastar::internal::monostate) {
+ try {
+ return ::seastar::futurize_invoke(std::forward<Func>(func));
+ } catch(...) {
+ return seastar::futurize<
+ ::crimson::interruptible::interruptible_future_detail<
+ InterruptCond, FutureType>>::make_exception_future(
+ std::current_exception());
+ }
+ }
+ template <typename Arg>
+ static type make_exception_future(Arg&& arg) {
+ return ::seastar::futurize<
+ ::crimson::interruptible::interruptible_future_detail<
+ InterruptCond, FutureType>>::make_exception_future(
+ std::forward<Arg>(arg));
+ }
+ };
+
template <class ErrorT>
static std::exception_ptr make_exception_ptr(ErrorT&& e) {
// calling via the interface class due to encapsulation and the friend
// relations we were exploiting before.
template <class...>
friend class errorator;
+ template<typename, typename>
+ friend class ::crimson::interruptible::interruptible_future_detail;
}; // class errorator, generic template
// no errors? errorator<>::future is plain seastar::future then!
#include <seastar/core/future-util.hh>
#include "crimson/common/log.h"
+#include "crimson/common/interruptible_future.h"
namespace crimson::common {
-class system_shutdown_exception final : public std::exception{
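+// Common base class for the exceptions used to interrupt interruptible
+// futures, so that interrupt conditions can recognize and handle them
+// uniformly.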
+class interruption : public std::exception
+{};
+
+class system_shutdown_exception final : public interruption{
public:
const char* what() const noexcept final {
return "system shutting down";
}
};
-class actingset_changed final : public std::exception {
+class actingset_changed final : public interruption {
public:
actingset_changed(bool sp) : still_primary(sp) {}
const char* what() const noexcept final {
const bool still_primary;
};
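+// Run opfunc under the interrupt condition InterruptCond and let efunc
+// handle any of the listed InterruptExceptions; when may_loop is set, the
+// operation is retried via seastar::repeat until it reports completion.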
+template <typename InterruptCond, bool may_loop,
+ typename... InterruptExceptions,
+ typename OpFunc, typename OnInterrupt>
+inline seastar::future<> with_interruption(
+ OpFunc&& opfunc, OnInterrupt&& efunc) {
+ if constexpr (may_loop) {
+ return ::seastar::repeat(
+ [opfunc=std::move(opfunc), efunc=std::move(efunc)]() mutable {
+ return ::crimson::interruptible::interruptor<InterruptCond>::template futurize<
+ std::result_of_t<OpFunc()>>::apply(std::move(opfunc), std::make_tuple())
+ .template handle_interruption<InterruptExceptions...>(std::move(efunc));
+ });
+ } else {
+ return ::crimson::interruptible::interruptor<InterruptCond>::template futurize<
+ std::result_of_t<OpFunc()>>::apply(std::move(opfunc), std::make_tuple())
+ .template handle_interruption<InterruptExceptions...>(std::move(efunc));
+ }
+}
+
template<typename Func, typename... Args>
inline seastar::future<> handle_system_shutdown(Func&& func, Args&&... args)
{
typename interruptor<InterruptCond>::template futurize_t<U>;
using core_type::get0;
using core_type::core_type;
+ using core_type::get_exception;
+ using core_type::ignore_ready_future;
[[gnu::always_inline]]
interruptible_future_detail(seastar::future<T>&& base)
}
}
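+ // Like then_interruptible(), but assumes the future's value is a tuple and
+ // unpacks it into func's parameter list via std::apply.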
+ template <typename Func>
+ [[gnu::always_inline]]
+ auto then_unpack_interruptible(Func&& func) {
+ return then_interruptible([func=std::forward<Func>(func)](T&& tuple) mutable {
+ return std::apply(std::forward<Func>(func), std::move(tuple));
+ });
+ }
+
template <typename Func,
typename Result = interrupt_futurize_t<
std::result_of_t<Func(std::exception_ptr)>>>
});
}
+
+ using my_type = interruptible_future_detail<InterruptCond, seastar::future<T>>;
+
+ template <typename Func>
+ [[gnu::always_inline]]
+ my_type finally(Func&& func) {
+ return core_type::finally(std::forward<Func>(func));
+ }
private:
seastar::future<T> to_future() {
return static_cast<core_type&&>(std::move(*this));
std::move(fut));
});
}
- // this is only supposed to be invoked by seastar functions
- template <typename Func,
- typename Result = interrupt_futurize_t<
- std::invoke_result_t<Func>>>
- [[gnu::always_inline]]
- Result finally(Func&& func) {
- return core_type::finally(std::forward<Func>(func));
- }
template <typename Iterator, typename AsyncAction, typename Result,
std::enable_if_t<
is_interruptible_future<Result>::value, int>>
std::forward<ErrorVisitorTailT>(err_func_tail)...));
}
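+ // When interruptible (the default), the error handler runs under the
+ // interrupt condition active at the time of this call; otherwise this
+ // degrades to the plain errorator handle_error().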
- template <typename ErrorFunc>
+ template <bool interruptible = true, typename ErrorFunc>
auto handle_error_interruptible(ErrorFunc&& errfunc) {
- assert(interrupt_cond<InterruptCond>);
- auto fut = core_type::handle_error(
- [errfunc=std::move(errfunc),
- interrupt_condition=interrupt_cond<InterruptCond>]
- (auto&& err) mutable -> decltype(auto) {
- constexpr bool return_void = std::is_void_v<
- std::invoke_result_t<ErrorFunc,
- std::decay_t<decltype(err)>>>;
- constexpr bool return_err = ::crimson::is_error_v<
- std::decay_t<std::invoke_result_t<ErrorFunc,
- std::decay_t<decltype(err)>>>>;
- if constexpr (return_err || return_void) {
- return non_futurized_call_with_interruption(
- interrupt_condition,
- std::move(errfunc),
- std::move(err));
- } else {
- return call_with_interruption(
- interrupt_condition,
- std::move(errfunc),
- std::move(err));
- }
- });
- return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
+ if constexpr (interruptible) {
+ assert(interrupt_cond<InterruptCond>);
+ auto fut = core_type::handle_error(
+ [errfunc=std::move(errfunc),
+ interrupt_condition=interrupt_cond<InterruptCond>]
+ (auto&& err) mutable -> decltype(auto) {
+ constexpr bool return_void = std::is_void_v<
+ std::invoke_result_t<ErrorFunc,
+ std::decay_t<decltype(err)>>>;
+ constexpr bool return_err = ::crimson::is_error_v<
+ std::decay_t<std::invoke_result_t<ErrorFunc,
+ std::decay_t<decltype(err)>>>>;
+ if constexpr (return_err || return_void) {
+ return non_futurized_call_with_interruption(
+ interrupt_condition,
+ std::move(errfunc),
+ std::move(err));
+ } else {
+ return call_with_interruption(
+ interrupt_condition,
+ std::move(errfunc),
+ std::move(err));
+ }
+ });
+ return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
+ } else {
+ return core_type::handle_error(std::forward<ErrorFunc>(errfunc));
+ }
}
template <typename ErrorFuncHead,
std::forward<ErrorFuncHead>(error_func_head),
std::forward<ErrorFuncTail>(error_func_tail)...));
}
+
+ template <typename Func>
+ [[gnu::always_inline]]
+ auto finally(Func&& func) {
+ auto fut = core_type::finally(std::forward<Func>(func));
+ return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
+ }
private:
ErroratedFuture<::crimson::errorated_future_marker<T>>
to_future() {
return fut;
}
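+ // Capture the currently active interrupt condition together with func, so
+ // that invoking the returned wrapper later re-establishes that condition
+ // around the call.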
+ template <typename Func>
+ [[gnu::always_inline]]
+ static auto wrap_function(Func&& func) {
+ return [func=std::forward<Func>(func),
+ interrupt_condition=interrupt_cond<InterruptCond>]() mutable {
+ return call_with_interruption(
+ interrupt_condition,
+ std::forward<Func>(func));
+ };
+ }
+
template <typename Iterator, typename AsyncAction,
typename Result = std::invoke_result_t<AsyncAction, typename Iterator::reference>,
std::enable_if_t<is_interruptible_future<Result>::value, int> = 0>
return ::seastar::internal::when_all_impl(
futurize_invoke_if_func(std::forward<FutOrFuncs>(fut_or_funcs))...);
}
+
+ template <typename... FutOrFuncs>
+ static inline auto when_all_succeed(FutOrFuncs&&... fut_or_funcs) noexcept {
+ return ::seastar::internal::when_all_succeed_impl(
+ futurize_invoke_if_func(std::forward<FutOrFuncs>(fut_or_funcs))...);
+ }
private:
// return true if a new interrupt condition is created and false otherwise
template <typename... Args>
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
#pragma once
#include <seastar/core/future-util.hh>
#include "include/ceph_assert.h"
+#include "crimson/common/interruptible_future.h"
namespace ceph {
class Formatter;
template <typename U>
friend blocking_future_detail<seastar::future<>> join_blocking_futures(U &&u);
+ template <typename InterruptCond, typename T>
+ friend blocking_future_detail<
+ ::crimson::interruptible::interruptible_future<InterruptCond>>
+ join_blocking_interruptible_futures(T&& t);
+
template <typename U>
friend class blocking_future_detail;
blocker,
std::move(fut).then(std::forward<F>(f)));
}
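+ // Interruptible analogue of then() above: chains f via then_interruptible()
+ // and re-wraps the result, preserving the blocker.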
+ template <typename InterruptCond, typename F>
+ auto then_interruptible(F &&f) && {
+ using result = decltype(std::declval<Fut>().then_interruptible(f));
+ return blocking_future_detail<
+ typename ::crimson::interruptible::interruptor<
+ InterruptCond>::template futurize<result>::type>(
+ blocker,
+ std::move(fut).then_interruptible(std::forward<F>(f)));
+ }
};
template <typename T=void>
using blocking_future = blocking_future_detail<seastar::future<T>>;
+template <typename InterruptCond, typename T = void>
+using blocking_interruptible_future = blocking_future_detail<
+ ::crimson::interruptible::interruptible_future<InterruptCond, T>>;
+
+template <typename InterruptCond, typename V, typename U>
+blocking_interruptible_future<InterruptCond, V>
+make_ready_blocking_interruptible_future(U&& args) {
+ return blocking_interruptible_future<InterruptCond, V>(
+ nullptr,
+ seastar::make_ready_future<V>(std::forward<U>(args)));
+}
+
+template <typename InterruptCond, typename V, typename Exception>
+blocking_interruptible_future<InterruptCond, V>
+make_exception_blocking_interruptible_future(Exception&& e) {
+ return blocking_interruptible_future<InterruptCond, V>(
+ nullptr,
+    seastar::make_exception_future<V>(std::forward<Exception>(e)));
+}
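+// A hypothetical use of the two helpers above (cond_t standing in for a
+// real interrupt condition type):
+//   make_ready_blocking_interruptible_future<cond_t, int>(42);
+//   make_exception_blocking_interruptible_future<cond_t, int>(
+//     std::make_exception_ptr(std::runtime_error("whoops")));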
+
template <typename V=void, typename... U>
blocking_future_detail<seastar::future<V>> make_ready_blocking_future(U&&... args) {
return blocking_future<V>(
blocking_future<T> make_blocking_future(seastar::future<T> &&f) {
return blocking_future<T>(this, std::move(f));
}
+
+ template <typename InterruptCond, typename T>
+ blocking_interruptible_future<InterruptCond, T>
+ make_blocking_future(
+ crimson::interruptible::interruptible_future<InterruptCond, T> &&f) {
+ return blocking_interruptible_future<InterruptCond, T>(
+ this, std::move(f));
+ }
+ template <typename InterruptCond, typename T = void>
+ blocking_interruptible_future<InterruptCond, T>
+ make_blocking_interruptible_future(seastar::future<T> &&f) {
+ return blocking_interruptible_future<InterruptCond, T>(
+ this,
+ ::crimson::interruptible::interruptor<InterruptCond>::make_interruptible(
+ std::move(f)));
+ }
+
void dump(ceph::Formatter *f) const;
virtual ~Blocker() = default;
}));
}
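+// Join a collection of blocking_interruptible_futures: ownership of the
+// individual blockers moves into one AggregateBlocker, and the futures are
+// then awaited in parallel under the interrupt condition.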
+template <typename InterruptCond, typename T>
+blocking_interruptible_future<InterruptCond>
+join_blocking_interruptible_futures(T&& t) {
+ std::vector<Blocker*> blockers;
+ blockers.reserve(t.size());
+ for (auto &&bf: t) {
+ blockers.push_back(bf.blocker);
+ bf.blocker = nullptr;
+ }
+ auto agg = std::make_unique<AggregateBlocker>(std::move(blockers));
+ return agg->make_blocking_future(
+ ::crimson::interruptible::interruptor<InterruptCond>::parallel_for_each(
+ std::forward<T>(t),
+ [](auto &&bf) {
+ return std::move(bf.fut);
+ }).then_interruptible([agg=std::move(agg)] {
+ return seastar::make_ready_future<>();
+ }));
+}
+
/**
* Common base for all crimson-osd operations. Mainly provides
});
}
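+ // Interruptible counterparts of with_blocking_future(): the blocker is
+ // registered before waiting and cleared once the wrapped future resolves.
+ // The second overload accepts a future that is already interruptible.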
+ template <typename InterruptCond, typename T>
+ ::crimson::interruptible::interruptible_future<InterruptCond, T>
+ with_blocking_future_interruptible(blocking_future<T> &&f) {
+ if (f.fut.available()) {
+ return std::move(f.fut);
+ }
+ assert(f.blocker);
+ add_blocker(f.blocker);
+ auto fut = std::move(f.fut).then_wrapped([this, blocker=f.blocker](auto &&arg) {
+ clear_blocker(blocker);
+ return std::move(arg);
+ });
+ return ::crimson::interruptible::interruptible_future<
+ InterruptCond, T>(std::move(fut));
+ }
+
+ template <typename InterruptCond, typename T>
+ ::crimson::interruptible::interruptible_future<InterruptCond, T>
+ with_blocking_future_interruptible(
+ blocking_interruptible_future<InterruptCond, T> &&f) {
+ if (f.fut.available()) {
+ return std::move(f.fut);
+ }
+ assert(f.blocker);
+ add_blocker(f.blocker);
+ return std::move(f.fut).template then_wrapped_interruptible(
+ [this, blocker=f.blocker](auto &&arg) {
+ clear_blocker(blocker);
+ return std::move(arg);
+ });
+ }
+
void dump(ceph::Formatter *f);
void dump_brief(ceph::Formatter *f);
virtual ~Operation() = default;
// todo
}
-ECBackend::ll_read_errorator::future<ceph::bufferlist>
+ECBackend::ll_read_ierrorator::future<ceph::bufferlist>
ECBackend::_read(const hobject_t& hoid,
const uint64_t off,
const uint64_t len,
return seastar::make_ready_future<bufferlist>();
}
-seastar::future<crimson::osd::acked_peers_t>
+ECBackend::interruptible_future<crimson::osd::acked_peers_t>
ECBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
const hobject_t& hoid,
ceph::os::Transaction&& txn,
}
void on_actingset_changed(peering_info_t pi) final {}
private:
- ll_read_errorator::future<ceph::bufferlist> _read(const hobject_t& hoid,
- uint64_t off,
- uint64_t len,
- uint32_t flags) override;
- seastar::future<crimson::osd::acked_peers_t>
+ ll_read_ierrorator::future<ceph::bufferlist>
+ _read(const hobject_t& hoid, uint64_t off, uint64_t len, uint32_t flags) override;
+ interruptible_future<crimson::osd::acked_peers_t>
_submit_transaction(std::set<pg_shard_t>&& pg_shards,
const hobject_t& hoid,
ceph::os::Transaction&& txn,
// created for us by `seastar::async` in `::do_op_call()`.
int ret = 0;
using osd_op_errorator = crimson::osd::OpsExecuter::osd_op_errorator;
- reinterpret_cast<crimson::osd::OpsExecuter*>(hctx)->execute_op(op).handle_error(
+ reinterpret_cast<crimson::osd::OpsExecuter*>(hctx)->execute_op(op)
+ .handle_error_interruptible(
osd_op_errorator::all_same_way([&ret] (const std::error_code& err) {
assert(err.value() > 0);
ret = -err.value();
boost::intrusive::list_member_hook<>,
&ObjectContext::list_hook>;
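+ // If InterruptCond is non-void, func is first wrapped with
+ // interruptor<InterruptCond>::wrap_function() so that, once the lock is
+ // acquired, it runs under the interrupt condition captured here.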
- template<RWState::State Type, typename Func>
+ template<RWState::State Type, typename InterruptCond = void, typename Func>
auto with_lock(Func&& func) {
- switch (Type) {
- case RWState::RWWRITE:
- return _with_lock(lock.for_write(), std::forward<Func>(func));
- case RWState::RWREAD:
- return _with_lock(lock.for_read(), std::forward<Func>(func));
- case RWState::RWEXCL:
- return _with_lock(lock.for_excl(), std::forward<Func>(func));
- case RWState::RWNONE:
- return seastar::futurize_invoke(std::forward<Func>(func));
- default:
- assert(0 == "noop");
+ if constexpr (!std::is_void_v<InterruptCond>) {
+ auto wrapper = ::crimson::interruptible::interruptor<InterruptCond>::wrap_function(std::forward<Func>(func));
+ switch (Type) {
+ case RWState::RWWRITE:
+ return _with_lock(lock.for_write(), std::move(wrapper));
+ case RWState::RWREAD:
+ return _with_lock(lock.for_read(), std::move(wrapper));
+ case RWState::RWEXCL:
+ return _with_lock(lock.for_excl(), std::move(wrapper));
+ case RWState::RWNONE:
+ return seastar::futurize_invoke(std::move(wrapper));
+ default:
+ assert(0 == "noop");
+ }
+ } else {
+ switch (Type) {
+ case RWState::RWWRITE:
+ return _with_lock(lock.for_write(), std::forward<Func>(func));
+ case RWState::RWREAD:
+ return _with_lock(lock.for_read(), std::forward<Func>(func));
+ case RWState::RWEXCL:
+ return _with_lock(lock.for_excl(), std::forward<Func>(func));
+ case RWState::RWNONE:
+ return seastar::futurize_invoke(std::forward<Func>(func));
+ default:
+ assert(0 == "noop");
+ }
}
}
- template<RWState::State Type, typename Func>
+ template<RWState::State Type, typename InterruptCond = void, typename Func>
auto with_promoted_lock(Func&& func) {
- switch (Type) {
- case RWState::RWWRITE:
- return _with_lock(lock.excl_from_write(), std::forward<Func>(func));
- case RWState::RWREAD:
- return _with_lock(lock.excl_from_read(), std::forward<Func>(func));
- case RWState::RWEXCL:
- return _with_lock(lock.excl_from_excl(), std::forward<Func>(func));
- case RWState::RWNONE:
- return _with_lock(lock.for_excl(), std::forward<Func>(func));
- default:
- assert(0 == "noop");
+ if constexpr (!std::is_void_v<InterruptCond>) {
+ auto wrapper = ::crimson::interruptible::interruptor<InterruptCond>::wrap_function(std::forward<Func>(func));
+ switch (Type) {
+ case RWState::RWWRITE:
+ return _with_lock(lock.excl_from_write(), std::move(wrapper));
+ case RWState::RWREAD:
+ return _with_lock(lock.excl_from_read(), std::move(wrapper));
+ case RWState::RWEXCL:
+ return _with_lock(lock.excl_from_excl(), std::move(wrapper));
+ case RWState::RWNONE:
+ return _with_lock(lock.for_excl(), std::move(wrapper));
+ default:
+ assert(0 == "noop");
+ }
+ } else {
+ switch (Type) {
+ case RWState::RWWRITE:
+ return _with_lock(lock.excl_from_write(), std::forward<Func>(func));
+ case RWState::RWREAD:
+ return _with_lock(lock.excl_from_read(), std::forward<Func>(func));
+ case RWState::RWEXCL:
+ return _with_lock(lock.excl_from_excl(), std::forward<Func>(func));
+ case RWState::RWNONE:
+ return _with_lock(lock.for_excl(), std::forward<Func>(func));
+ default:
+ assert(0 == "noop");
+ }
}
}
namespace crimson::osd {
-OpsExecuter::call_errorator::future<> OpsExecuter::do_op_call(OSDOp& osd_op)
+OpsExecuter::call_ierrorator::future<> OpsExecuter::do_op_call(OSDOp& osd_op)
{
std::string cname, mname;
ceph::bufferlist indata;
};
}
-OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_watch(
+OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_watch(
OSDOp& osd_op,
ObjectState& os,
ceph::os::Transaction& txn)
});
}
-OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_reconnect(
+OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_reconnect(
OSDOp& osd_op,
ObjectState& os,
ceph::os::Transaction& txn)
}
}
-OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_unwatch(
+OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_unwatch(
OSDOp& osd_op,
ObjectState& os,
ceph::os::Transaction& txn)
});
}
-OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_ping(
+OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_ping(
OSDOp& osd_op,
ObjectState& os,
ceph::os::Transaction& txn)
return seastar::now();
}
-OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch(
+OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch(
OSDOp& osd_op,
ObjectState& os,
ceph::os::Transaction& txn)
return (((uint64_t)e) << 32) | ((uint64_t)(next_notify_id++));
}
-OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_notify(
+OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_notify(
OSDOp& osd_op,
const ObjectState& os)
{
});
}
-OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_notify_ack(
+OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_notify_ack(
OSDOp& osd_op,
const ObjectState& os)
{
});
}
-OpsExecuter::osd_op_errorator::future<>
+OpsExecuter::interruptible_errorated_future<OpsExecuter::osd_op_errorator>
OpsExecuter::execute_op(OSDOp& osd_op)
{
// TODO: dispatch via call table?
return backend.get_xattrs(os, osd_op);
});
case CEPH_OSD_OP_RMXATTR:
- return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
+ return do_write_op(
+ [&osd_op] (auto& backend, auto& os, auto& txn) {
return backend.rm_xattr(os, osd_op, txn);
}, true);
case CEPH_OSD_OP_CREATE:
return filter;
}
-static seastar::future<hobject_t> pgls_filter(
+static PG::interruptible_future<hobject_t> pgls_filter(
const PGLSFilter& filter,
const PGBackend& backend,
const hobject_t& sobj)
if (const auto xattr = filter.get_xattr(); !xattr.empty()) {
logger().debug("pgls_filter: filter is interested in xattr={} for obj={}",
xattr, sobj);
- return backend.getxattr(sobj, xattr).safe_then(
+ return backend.getxattr(sobj, xattr).safe_then_interruptible(
[&filter, sobj] (ceph::bufferptr bp) {
logger().debug("pgls_filter: got xvalue for obj={}", sobj);
}
}
-static seastar::future<ceph::bufferlist> do_pgnls_common(
+static PG::interruptible_future<ceph::bufferlist> do_pgnls_common(
const hobject_t& pg_start,
const hobject_t& pg_end,
const PGBackend& backend,
throw std::invalid_argument("outside of PG bounds");
}
- return backend.list_objects(lower_bound, limit).then(
- [&backend, filter, nspace](auto&& ret) {
+ return backend.list_objects(lower_bound, limit).then_interruptible(
+ [&backend, filter, nspace](auto&& ret)
+ -> PG::interruptible_future<std::tuple<std::vector<hobject_t>, hobject_t>> {
auto& [objects, next] = ret;
auto in_my_namespace = [&nspace](const hobject_t& obj) {
using crimson::common::local_conf;
return obj.get_namespace() == nspace;
}
};
- auto to_pglsed = [&backend, filter] (const hobject_t& obj) {
+ auto to_pglsed = [&backend, filter] (const hobject_t& obj)
+ -> PG::interruptible_future<hobject_t> {
// this transformation looks costly. However, I don't have any
// reason to think PGLS* operations are critical for, let's say,
// general performance.
std::tuple<std::vector<hobject_t>, hobject_t>>(
std::make_tuple(std::move(items), std::move(next)));
});
- }).then(
+ }).then_interruptible(
[pg_end] (auto&& ret) {
auto& [items, next] = ret;
auto is_matched = [] (const auto& obj) {
});
}
-static seastar::future<> do_pgnls(
+static PG::interruptible_future<> do_pgnls(
const PG& pg,
const std::string& nspace,
OSDOp& osd_op)
nspace,
osd_op.op.pgls.count,
nullptr /* no filter */)
- .then([&osd_op](bufferlist bl) {
+ .then_interruptible([&osd_op](bufferlist bl) {
osd_op.outdata = std::move(bl);
return seastar::now();
});
}
-static seastar::future<> do_pgnls_filtered(
+static PG::interruptible_future<> do_pgnls_filtered(
const PG& pg,
const std::string& nspace,
OSDOp& osd_op)
nspace,
osd_op.op.pgls.count,
filter.get())
- .then([&osd_op](bufferlist bl) {
+ .then_interruptible([&osd_op](bufferlist bl) {
osd_op.outdata = std::move(bl);
return seastar::now();
});
});
}
-static seastar::future<ceph::bufferlist> do_pgls_common(
+static PG::interruptible_future<ceph::bufferlist> do_pgls_common(
const hobject_t& pg_start,
const hobject_t& pg_end,
const PGBackend& backend,
}
using entries_t = decltype(pg_ls_response_t::entries);
- return backend.list_objects(lower_bound, limit).then(
+ return backend.list_objects(lower_bound, limit).then_interruptible(
[&backend, filter, nspace](auto&& ret) {
auto& [objects, next] = ret;
- return seastar::when_all(
- seastar::map_reduce(std::move(objects),
- [&backend, filter, nspace](const hobject_t& obj) {
+ return PG::interruptor::when_all(
+ PG::interruptor::map_reduce(std::move(objects),
+ [&backend, filter, nspace](const hobject_t& obj)
+ -> PG::interruptible_future<hobject_t>{
if (obj.get_namespace() == nspace) {
if (filter) {
return pgls_filter(*filter, backend, obj);
return entries;
}),
seastar::make_ready_future<hobject_t>(next));
- }).then([pg_end](auto&& ret) {
+ }).then_interruptible([pg_end](auto&& ret) {
auto entries = std::move(std::get<0>(ret).get0());
auto next = std::move(std::get<1>(ret).get0());
pg_ls_response_t response;
});
}
-static seastar::future<> do_pgls(
+static PG::interruptible_future<> do_pgls(
const PG& pg,
const std::string& nspace,
OSDOp& osd_op)
nspace,
osd_op.op.pgls.count,
nullptr /* no filter */)
- .then([&osd_op](bufferlist bl) {
+ .then_interruptible([&osd_op](bufferlist bl) {
osd_op.outdata = std::move(bl);
return seastar::now();
});
}
-static seastar::future<> do_pgls_filtered(
+static PG::interruptible_future<> do_pgls_filtered(
const PG& pg,
const std::string& nspace,
OSDOp& osd_op)
nspace,
osd_op.op.pgls.count,
filter.get())
- .then([&osd_op](bufferlist bl) {
+ .then_interruptible([&osd_op](bufferlist bl) {
osd_op.outdata = std::move(bl);
return seastar::now();
});
});
}
-seastar::future<>
+PgOpsExecuter::interruptible_future<>
PgOpsExecuter::execute_op(OSDOp& osd_op)
{
logger().warn("handling op {}", ceph_osd_op_name(osd_op.op.op));
#include "crimson/osd/object_context.h"
#include "crimson/common/errorator.h"
+#include "crimson/common/interruptible_future.h"
#include "crimson/common/type_helpers.h"
+#include "crimson/osd/pg_interval_interrupt_condition.h"
#include "crimson/osd/osd_operations/client_request.h"
#include "crimson/osd/osd_operations/peering_event.h"
#include "crimson/osd/shard_services.h"
crimson::ct_error::not_connected,
crimson::ct_error::timed_out>;
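+ // Interruptible counterparts of the errorators above: each *_ierrorator
+ // couples the original error set with IOInterruptCondition, so its futures
+ // are both errorated and interruptible.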
+ using call_ierrorator =
+ ::crimson::interruptible::interruptible_errorator<
+ IOInterruptCondition, call_errorator>;
+ using read_ierrorator =
+ ::crimson::interruptible::interruptible_errorator<
+ IOInterruptCondition, read_errorator>;
+ using write_iertr =
+ ::crimson::interruptible::interruptible_errorator<
+ IOInterruptCondition, write_ertr>;
+ using get_attr_ierrorator =
+ ::crimson::interruptible::interruptible_errorator<
+ IOInterruptCondition, get_attr_errorator>;
+ using watch_ierrorator =
+ ::crimson::interruptible::interruptible_errorator<
+ IOInterruptCondition, watch_errorator>;
+
+ template <typename Errorator, typename T = void>
+ using interruptible_errorated_future =
+ ::crimson::interruptible::interruptible_errorated_future<
+ IOInterruptCondition, Errorator, T>;
+ using interruptor =
+ ::crimson::interruptible::interruptor<IOInterruptCondition>;
+ template <typename T = void>
+ using interruptible_future =
+ ::crimson::interruptible::interruptible_future<
+ IOInterruptCondition, T>;
+
public:
// because OpsExecuter is a pretty heavyweight object we want to ensure
// it's not copied nor even moved by accident. Performance is the sole
get_attr_errorator,
watch_errorator,
PGBackend::stat_errorator>;
+ using osd_op_ierrorator =
+ ::crimson::interruptible::interruptible_errorator<
+ IOInterruptCondition, osd_op_errorator>;
private:
// an operation can be divided into two stages: main and effect-exposing
MainFunc&& main_func,
EffectFunc&& effect_func);
- call_errorator::future<> do_op_call(class OSDOp& osd_op);
- watch_errorator::future<> do_op_watch(
+ call_ierrorator::future<> do_op_call(class OSDOp& osd_op);
+ watch_ierrorator::future<> do_op_watch(
class OSDOp& osd_op,
class ObjectState& os,
ceph::os::Transaction& txn);
- watch_errorator::future<> do_op_watch_subop_watch(
+ watch_ierrorator::future<> do_op_watch_subop_watch(
class OSDOp& osd_op,
class ObjectState& os,
ceph::os::Transaction& txn);
- watch_errorator::future<> do_op_watch_subop_reconnect(
+ watch_ierrorator::future<> do_op_watch_subop_reconnect(
class OSDOp& osd_op,
class ObjectState& os,
ceph::os::Transaction& txn);
- watch_errorator::future<> do_op_watch_subop_unwatch(
+ watch_ierrorator::future<> do_op_watch_subop_unwatch(
class OSDOp& osd_op,
class ObjectState& os,
ceph::os::Transaction& txn);
- watch_errorator::future<> do_op_watch_subop_ping(
+ watch_ierrorator::future<> do_op_watch_subop_ping(
class OSDOp& osd_op,
class ObjectState& os,
ceph::os::Transaction& txn);
- watch_errorator::future<> do_op_notify(
+ watch_ierrorator::future<> do_op_notify(
class OSDOp& osd_op,
const class ObjectState& os);
- watch_errorator::future<> do_op_notify_ack(
+ watch_ierrorator::future<> do_op_notify_ack(
class OSDOp& osd_op,
const class ObjectState& os);
msg(msg) {
}
- osd_op_errorator::future<> execute_op(class OSDOp& osd_op);
+ interruptible_errorated_future<osd_op_errorator>
+ execute_op(class OSDOp& osd_op);
template <typename MutFunc>
- osd_op_errorator::future<> flush_changes(MutFunc&& mut_func) &&;
+ osd_op_ierrorator::future<> flush_changes(MutFunc&& mut_func) &&;
const auto& get_message() const {
return msg;
}
template <typename MutFunc>
-OpsExecuter::osd_op_errorator::future<> OpsExecuter::flush_changes(
+OpsExecuter::osd_op_ierrorator::future<> OpsExecuter::flush_changes(
MutFunc&& mut_func) &&
{
const bool want_mutate = !txn.empty();
// osd_op_params are instantiated by every wr-like operation.
assert(osd_op_params || !want_mutate);
assert(obc);
- auto maybe_mutated = osd_op_errorator::now();
+ auto maybe_mutated = interruptor::make_interruptible(osd_op_errorator::now());
if (want_mutate) {
maybe_mutated = std::forward<MutFunc>(mut_func)(std::move(txn),
std::move(obc),
if (__builtin_expect(op_effects.empty(), true)) {
return maybe_mutated;
} else {
- return maybe_mutated.safe_then([this] {
+ return maybe_mutated.safe_then_interruptible([this] {
// let's do the cleaning of `op_effects` in the destructor
- return crimson::do_for_each(op_effects, [] (auto& op_effect) {
+ return interruptor::do_for_each(op_effects, [] (auto& op_effect) {
return op_effect->execute();
});
});
// PgOpsExecuter -- a class for executing ops targeting a certain PG.
class PgOpsExecuter {
+ template <typename T = void>
+ using interruptible_future =
+ ::crimson::interruptible::interruptible_future<
+ IOInterruptCondition, T>;
+
public:
PgOpsExecuter(const PG& pg, const MOSDOp& msg)
: pg(pg), nspace(msg.get_hobj().nspace) {
}
- seastar::future<> execute_op(class OSDOp& osd_op);
+ interruptible_future<> execute_op(class OSDOp& osd_op);
private:
const PG& pg;
#pragma once
#include "crimson/osd/osd_operation_sequencer.h"
+#include "crimson/osd/pg_interval_interrupt_condition.h"
#include "crimson/osd/scheduler/scheduler.h"
namespace crimson::osd {
template <typename T>
class OperationT : public Operation {
public:
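+ // Bind the interruptible-future machinery to IOInterruptCondition once,
+ // so operation implementations can name these types without spelling out
+ // the full template arguments.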
+ template <typename ValuesT = void>
+ using interruptible_future =
+ ::crimson::interruptible::interruptible_future<
+ ::crimson::osd::IOInterruptCondition, ValuesT>;
+ using interruptor =
+ ::crimson::interruptible::interruptor<
+ ::crimson::osd::IOInterruptCondition>;
static constexpr const char *type_name = OP_NAMES[static_cast<int>(T::type)];
using IRef = boost::intrusive_ptr<T>;
IRef ref = this;
return ss.throttler.with_throttle_while(
this, get_scheduler_params(), [this] {
- return do_recovery();
+ return interruptor::with_interruption([this] {
+ return do_recovery();
+ }, [](std::exception_ptr) {
+ return seastar::make_ready_future<bool>(false);
+ }, pg);
}).handle_exception_type([ref, this](const std::system_error& err) {
if (err.code() == std::make_error_code(std::errc::interrupted)) {
logger().debug("{} recovery interruped: {}", *pg, err.what());
});
}
-seastar::future<bool> UrgentRecovery::do_recovery()
+UrgentRecovery::interruptible_future<bool>
+UrgentRecovery::do_recovery()
{
+ logger().debug("{}: {}", __func__, *this);
if (!pg->has_reset_since(epoch_started)) {
- return with_blocking_future(
+ return with_blocking_future_interruptible<IOInterruptCondition>(
pg->get_recovery_handler()->recover_missing(soid, need)
- ).then([] {
+ ).then_interruptible([] {
return seastar::make_ready_future<bool>(false);
});
}
void UrgentRecovery::print(std::ostream &lhs) const
{
lhs << "UrgentRecovery(" << pg->get_pgid() << ", "
- << soid << ", v" << need << ")";
+ << soid << ", v" << need << ", epoch_started: "
+ << epoch_started << ")";
}
void UrgentRecovery::dump_detail(Formatter *f) const
crimson::osd::scheduler::scheduler_class_t::background_recovery)
{}
-seastar::future<bool> PglogBasedRecovery::do_recovery()
+PglogBasedRecovery::interruptible_future<bool>
+PglogBasedRecovery::do_recovery()
{
if (pg->has_reset_since(epoch_started))
return seastar::make_ready_future<bool>(false);
- return with_blocking_future(
+ return with_blocking_future_interruptible<IOInterruptCondition>(
pg->get_recovery_handler()->start_recovery_ops(
crimson::common::local_conf()->osd_recovery_max_single_start));
}
return pg.backfill_pipeline;
}
-seastar::future<bool> BackfillRecovery::do_recovery()
+BackfillRecovery::interruptible_future<bool>
+BackfillRecovery::do_recovery()
{
logger().debug("{}", __func__);
return seastar::make_ready_future<bool>(false);
}
// TODO: limits
- return with_blocking_future(
+ return with_blocking_future_interruptible<IOInterruptCondition>(
// process_event() of our boost::statechart machine is non-reentrant.
// with the backfill_pipeline we protect it from a second entry from
// the implementation of BackfillListener.
// additionally, this stage serves to synchronize with PeeringEvent.
handle.enter(bp(*pg).process)
- ).then([this] {
+ ).then_interruptible([this] {
pg->get_recovery_handler()->dispatch_backfill_event(std::move(evt));
return seastar::make_ready_future<bool>(false);
});
scheduler_class
};
}
- virtual seastar::future<bool> do_recovery() = 0;
+ virtual interruptible_future<bool> do_recovery() = 0;
ShardServices &ss;
const crimson::osd::scheduler::scheduler_class_t scheduler_class;
};
private:
void dump_detail(Formatter* f) const final;
- seastar::future<bool> do_recovery() override;
+ interruptible_future<bool> do_recovery() override;
const hobject_t soid;
const eversion_t need;
};
epoch_t epoch_started);
private:
- seastar::future<bool> do_recovery() override;
+ interruptible_future<bool> do_recovery() override;
};
class BackfillRecovery final : public BackgroundRecovery {
private:
boost::intrusive_ptr<const boost::statechart::event_base> evt;
PipelineHandle handle;
- seastar::future<bool> do_recovery() override;
+ interruptible_future<bool> do_recovery() override;
};
template <class EventT>
{
logger().debug("{}: start", *this);
- return crimson::common::handle_system_shutdown(
- [this, opref=IRef{this}]() mutable {
- return seastar::repeat([this, opref=std::move(opref)]() mutable {
+ return seastar::repeat([this, opref=IRef{this}]() mutable {
logger().debug("{}: in repeat", *this);
return with_blocking_future(handle.enter(cp().await_map))
- .then([this] {
- return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_min_epoch()));
+ .then([this]() {
+ return with_blocking_future(
+ osd.osdmap_gate.wait_for_map(
+ m->get_min_epoch()));
}).then([this](epoch_t epoch) {
return with_blocking_future(handle.enter(cp().get_pg));
}).then([this] {
return with_blocking_future(osd.wait_for_pg(m->get_spg()));
}).then([this](Ref<PG> pgref) mutable {
- epoch_t same_interval_since = pgref->get_interval_start_epoch();
- logger().debug("{} same_interval_since: {}", *this, same_interval_since);
- return sequencer.start_op(
- handle, prev_op_id, get_id(),
- [this, pgref] {
- PG &pg = *pgref;
- if (pg.can_discard_op(*m)) {
- logger().debug("{} op discarded, {}, same_primary_since: {}",
- *this, pg, pg.get_info().history.same_primary_since);
- return osd.send_incremental_map(conn, m->get_map_epoch());
- }
- return with_blocking_future(
- handle.enter(pp(pg).await_map)
- ).then([this, &pg]() mutable {
- return with_blocking_future(
- pg.osdmap_gate.wait_for_map(m->get_min_epoch()));
- }).then([this, &pg](auto map) mutable {
- return with_blocking_future(
- handle.enter(pp(pg).wait_for_active));
- }).then([this, &pg]() mutable {
- return with_blocking_future(pg.wait_for_active_blocker.wait());
- }).then([this, pgref=std::move(pgref)]() mutable {
- if (m->finish_decode()) {
- m->clear_payload();
- }
- if (is_pg_op()) {
- return process_pg_op(pgref);
- } else {
- return process_op(pgref);
- }
- });
- }).then([this] {
- sequencer.finish_op(get_id());
- return seastar::stop_iteration::yes;
- }).handle_exception_type(
- [this](crimson::common::actingset_changed& e) mutable {
- if (e.is_primary()) {
- logger().debug("operation restart, acting set changed");
- sequencer.maybe_reset(get_id());
- return seastar::stop_iteration::no;
- } else {
- sequencer.abort();
- logger().debug("operation abort, up primary changed");
+ return interruptor::with_interruption([this, pgref]() mutable {
+ epoch_t same_interval_since = pgref->get_interval_start_epoch();
+ logger().debug("{} same_interval_since: {}", *this, same_interval_since);
+ return sequencer.start_op(
+ handle, prev_op_id, get_id(),
+ interruptor::wrap_function(
+ [this, pgref]() mutable -> interruptible_future<> {
+ PG &pg = *pgref;
+ if (pg.can_discard_op(*m)) {
+ return osd.send_incremental_map(conn, m->get_map_epoch());
+ }
+ return with_blocking_future_interruptible<IOInterruptCondition>(
+ handle.enter(pp(pg).await_map)
+ ).then_interruptible([this, &pg]() mutable {
+ return with_blocking_future_interruptible<IOInterruptCondition>(
+ pg.osdmap_gate.wait_for_map(m->get_min_epoch()));
+ }).then_interruptible([this, &pg](auto map) mutable {
+ return with_blocking_future_interruptible<IOInterruptCondition>(
+ handle.enter(pp(pg).wait_for_active));
+ }).then_interruptible([this, &pg]() mutable {
+ return with_blocking_future_interruptible<IOInterruptCondition>(
+ pg.wait_for_active_blocker.wait());
+ }).then_interruptible([this, pgref=std::move(pgref)]() mutable {
+ if (m->finish_decode()) {
+ m->clear_payload();
+ }
+ if (is_pg_op()) {
+ return process_pg_op(pgref);
+ } else {
+ return process_op(pgref);
+ }
+ });
+ })
+ ).then_interruptible([this, pgref]() mutable {
+ sequencer.finish_op(get_id());
return seastar::stop_iteration::yes;
- }
- }).handle_exception_type(
- [](seastar::broken_condition_variable&) {
- return seastar::stop_iteration::yes;
- });
+ });
+ }, [this, pgref](std::exception_ptr eptr) mutable {
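+ // Check the dynamic exception type via __cxa_exception_type() first, so
+ // we only pay for a rethrow when the exception really is an
+ // actingset_changed; anything else is expected to be a system shutdown.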
+ if (*eptr.__cxa_exception_type() ==
+ typeid(::crimson::common::actingset_changed)) {
+ try {
+ std::rethrow_exception(eptr);
+ } catch(::crimson::common::actingset_changed& e) {
+ if (e.is_primary()) {
+ logger().debug("{} operation restart, acting set changed", *this);
+ sequencer.maybe_reset(get_id());
+ return seastar::stop_iteration::no;
+ } else {
+ logger().debug("{} operation abort, up primary changed", *this);
+ sequencer.abort();
+ return seastar::stop_iteration::yes;
+ }
+ }
+ }
+ assert(*eptr.__cxa_exception_type() ==
+ typeid(crimson::common::system_shutdown_exception));
+ crimson::get_logger(ceph_subsys_osd).debug(
+ "{} operation skipped, system shutdown", *this);
+ return seastar::stop_iteration::yes;
+ }, pgref);
});
});
- });
}
-seastar::future<> ClientRequest::process_pg_op(
+ClientRequest::interruptible_future<>
+ClientRequest::process_pg_op(
Ref<PG> &pg)
{
return pg->do_pg_ops(m)
- .then([this, pg=std::move(pg)](Ref<MOSDOpReply> reply) {
+ .then_interruptible([this, pg=std::move(pg)](Ref<MOSDOpReply> reply) {
return conn->send(reply);
});
}
-seastar::future<> ClientRequest::process_op(Ref<PG> &pg)
+ClientRequest::interruptible_future<>
+ClientRequest::process_op(Ref<PG> &pg)
{
- return with_blocking_future(handle.enter(pp(*pg).recover_missing)).then(
+ return with_blocking_future_interruptible<IOInterruptCondition>(
+ handle.enter(pp(*pg).recover_missing))
+ .then_interruptible(
[this, pg]() mutable {
return do_recover_missing(pg);
- }).then([this, pg]() mutable {
- return pg->already_complete(m->get_reqid()).then_unpack(
+ }).then_interruptible([this, pg]() mutable {
+ return pg->already_complete(m->get_reqid()).then_unpack_interruptible(
[this, pg](bool completed, int ret) mutable
- -> PG::load_obc_ertr::future<> {
+ -> PG::load_obc_iertr::future<> {
if (completed) {
auto reply = make_message<MOSDOpReply>(
m.get(), ret, pg->get_osdmap_epoch(),
CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false);
return conn->send(std::move(reply));
} else {
- return with_blocking_future(handle.enter(pp(*pg).get_obc)).then(
- [this, pg]() mutable -> PG::load_obc_ertr::future<> {
+ return with_blocking_future_interruptible<IOInterruptCondition>(
+ handle.enter(pp(*pg).get_obc)).then_interruptible(
+ [this, pg]() mutable -> PG::load_obc_iertr::future<> {
logger().debug("{}: got obc lock", *this);
op_info.set_from_op(&*m, *pg->get_osdmap());
return pg->with_locked_obc(m, op_info, this, [this, pg](auto obc) mutable {
- return with_blocking_future(handle.enter(pp(*pg).process)).then(
+ return with_blocking_future_interruptible<IOInterruptCondition>(
+ handle.enter(pp(*pg).process)).then_interruptible(
[this, pg, obc]() mutable {
return do_process(pg, obc);
});
});
}
});
- }).safe_then([pg=std::move(pg)] {
+ }).safe_then_interruptible([pg=std::move(pg)] {
return seastar::now();
}, PG::load_obc_ertr::all_same_way([](auto &code) {
logger().error("ClientRequest saw error code {}", code);
}));
}
-seastar::future<> ClientRequest::do_recover_missing(Ref<PG>& pg)
+ClientRequest::interruptible_future<>
+ClientRequest::do_recover_missing(Ref<PG>& pg)
{
eversion_t ver;
const hobject_t& soid = m->get_hobj();
}
}
-seastar::future<>
+ClientRequest::interruptible_future<>
ClientRequest::do_process(Ref<PG>& pg, crimson::osd::ObjectContextRef obc)
{
if (!pg->is_primary()) {
return conn->send(std::move(reply));
}
}
- return pg->do_osd_ops(m, obc, op_info).safe_then([this](Ref<MOSDOpReply> reply) {
+ return pg->do_osd_ops(m, obc, op_info).safe_then_interruptible(
+ [this](Ref<MOSDOpReply> reply) -> interruptible_future<> {
return conn->send(std::move(reply));
}, crimson::ct_error::eagain::handle([this, pg]() mutable {
return process_op(pg);
seastar::future<> start();
private:
- seastar::future<> process_pg_op(Ref<PG>& pg);
- seastar::future<> process_op(Ref<PG>& pg);
- seastar::future<> do_recover_missing(Ref<PG>& pgref);
- seastar::future<> do_process(
+ interruptible_future<> do_recover_missing(Ref<PG>& pgref);
+ interruptible_future<> do_process(
Ref<PG>& pg,
crimson::osd::ObjectContextRef obc);
-
+ ::crimson::interruptible::interruptible_future<
+ ::crimson::osd::IOInterruptCondition> process_pg_op(
+ Ref<PG> &pg);
+ ::crimson::interruptible::interruptible_future<
+ ::crimson::osd::IOInterruptCondition> process_op(
+ Ref<PG> &pg);
bool is_pg_op() const;
ConnectionPipeline &cp();
OpSequencer& sequencer;
const uint64_t prev_op_id;
+ template <typename Errorator>
+ using interruptible_errorator =
+ ::crimson::interruptible::interruptible_errorator<
+ ::crimson::osd::IOInterruptCondition,
+ Errorator>;
private:
bool is_misdirected(const PG& pg) const;
};
logger().debug("{}: start", *this);
IRef opref = this;
- return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_min_epoch()))
+ return with_blocking_future(
+ osd.osdmap_gate.wait_for_map(m->get_min_epoch()))
.then([this] (epoch_t epoch) {
return with_blocking_future(osd.wait_for_pg(m->get_spg()));
}).then([this, opref=std::move(opref)] (Ref<PG> pgref) {
- return seastar::do_with(std::move(pgref), std::move(opref),
- [this](auto& pgref, auto& opref) {
- return pgref->get_recovery_backend()->handle_recovery_op(m);
- });
+ return interruptor::with_interruption([this, opref, pgref] {
+ return seastar::do_with(std::move(pgref), std::move(opref),
+ [this](auto& pgref, auto& opref) {
+ return pgref->get_recovery_backend()->handle_recovery_op(m);
+ });
+ }, [](std::exception_ptr) { return seastar::now(); }, pgref);
});
}
{
logger().debug("{} start", *this);
IRef ref = this;
+
return with_blocking_future(handle.enter(cp().await_map))
- .then([this]() {
- return with_blocking_future(osd.osdmap_gate.wait_for_map(req->get_min_epoch()));
- }).then([this](epoch_t epoch) {
- return with_blocking_future(handle.enter(cp().get_pg));
- }).then([this] {
- return with_blocking_future(osd.wait_for_pg(req->get_spg()));
- }).then([this, ref=std::move(ref)](Ref<PG> pg) {
- return pg->handle_rep_op(std::move(req));
- });
+ .then([this]() {
+ return with_blocking_future(osd.osdmap_gate.wait_for_map(req->get_min_epoch()));
+ }).then([this](epoch_t epoch) {
+ return with_blocking_future(handle.enter(cp().get_pg));
+ }).then([this] {
+ return with_blocking_future(osd.wait_for_pg(req->get_spg()));
+ }).then([this, ref=std::move(ref)](Ref<PG> pg) {
+ return interruptor::with_interruption([this, ref, pg] {
+ return pg->handle_rep_op(std::move(req));
+ }, [](std::exception_ptr) { return seastar::now(); }, pg);
+ });
}
}
return seastar::now();
}
-seastar::future<> PG::submit_transaction(const OpInfo& op_info,
+PG::interruptible_future<> PG::submit_transaction(const OpInfo& op_info,
ObjectContextRef&& obc,
ceph::os::Transaction&& txn,
osd_op_params_t&& osd_op_p)
std::move(osd_op_p),
peering_state.get_last_peering_reset(),
map_epoch,
- std::move(log_entries)).then(
+ std::move(log_entries)).then_interruptible(
[this, last_complete=peering_state.get_info().last_complete,
at_version=osd_op_p.at_version](auto acked) {
for (const auto& peer : acked) {
}
}
-seastar::future<Ref<MOSDOpReply>> PG::handle_failed_op(
+PG::interruptible_future<Ref<MOSDOpReply>> PG::handle_failed_op(
const std::error_code& e,
ObjectContextRef obc,
const OpsExecuter& ox,
e.message(),
need_reload_obc);
return (need_reload_obc ? reload_obc(*obc)
- : load_obc_ertr::now()
- ).safe_then([e, &m, obc = std::move(obc), this] {
+ : interruptor::make_interruptible(load_obc_ertr::now())
+ ).safe_then_interruptible([e, &m, obc = std::move(obc), this] {
auto reply = make_message<MOSDOpReply>(
&m, -e.value(), get_osdmap_epoch(), 0, false);
reply->set_enoent_reply_versions(
}, load_obc_ertr::assert_all{ "can't live with object state messed up" });
}
-seastar::future<> PG::repair_object(
+PG::interruptible_future<> PG::repair_object(
Ref<MOSDOp> m,
const hobject_t& oid,
eversion_t& v)
return std::move(fut);
}
-PG::do_osd_ops_ertr::future<Ref<MOSDOpReply>>
+PG::do_osd_ops_iertr::future<Ref<MOSDOpReply>>
PG::do_osd_ops(
Ref<MOSDOp> m,
ObjectContextRef obc,
throw crimson::common::system_shutdown_exception();
}
- using osd_op_errorator = OpsExecuter::osd_op_errorator;
+ using osd_op_ierrorator =
+ OpsExecuter::osd_op_ierrorator;
const auto oid = m->get_snapid() == CEPH_SNAPDIR ? m->get_hobj().get_head()
: m->get_hobj();
auto ox = std::make_unique<OpsExecuter>(
obc, op_info, get_pool().info, get_backend(), *m);
- return crimson::do_for_each(
- m->ops, [obc, m, ox = ox.get()](OSDOp& osd_op) {
+ return interruptor::do_for_each(
+ m->ops.begin(), m->ops.end(), [obc, m, ox = ox.get()](OSDOp& osd_op) {
logger().debug(
"do_osd_ops: {} - object {} - handling op {}",
*m,
obc->obs.oi.soid,
ceph_osd_op_name(osd_op.op.op));
return ox->execute_op(osd_op);
- }).safe_then([this, obc, m, ox = ox.get(), &op_info] {
+ }).safe_then_interruptible([this, obc, m, ox = ox.get(), &op_info] {
logger().debug(
"do_osd_ops: {} - object {} all operations successful",
*m,
[this, m, &op_info] (auto&& txn,
auto&& obc,
auto&& osd_op_p,
- bool user_modify) -> osd_op_errorator::future<> {
+ bool user_modify) -> osd_op_ierrorator::future<> {
logger().debug(
"do_osd_ops: {} - object {} submitting txn",
*m,
std::move(txn),
std::move(osd_op_p));
});
- }).safe_then([this, m, obc, rvec = op_info.allows_returnvec()] {
+ }).safe_then_interruptible_tuple([this, m, obc, rvec = op_info.allows_returnvec()]()
+ -> PG::do_osd_ops_iertr::future<Ref<MOSDOpReply>> {
// TODO: should stop at the first op which returns a negative retval,
// cmpext uses it for returning the index of first unmatched byte
int result = m->ops.empty() ? 0 : m->ops.back().rval.code;
return PG::do_osd_ops_ertr::make_ready_future<Ref<MOSDOpReply>>(
std::move(reply));
}, crimson::ct_error::object_corrupted::handle([m, obc, this] {
- return repair_object(m, obc->obs.oi.soid, obc->obs.oi.version).then([] {
+ return repair_object(m, obc->obs.oi.soid, obc->obs.oi.version)
+ .then_interruptible([] {
return PG::do_osd_ops_ertr::future<Ref<MOSDOpReply>>(
crimson::ct_error::eagain::make());
});
}));
}
-seastar::future<Ref<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
+PG::interruptible_future<Ref<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
{
if (__builtin_expect(stopping, false)) {
throw crimson::common::system_shutdown_exception();
auto ox = std::make_unique<PgOpsExecuter>(std::as_const(*this),
std::as_const(*m));
- return seastar::do_for_each(m->ops, [ox = ox.get()](OSDOp& osd_op) {
+ return interruptor::do_for_each(m->ops, [ox = ox.get()](OSDOp& osd_op) {
logger().debug("will be handling pg op {}", ceph_osd_op_name(osd_op.op.op));
return ox->execute_op(osd_op);
- }).then([m, this, ox = std::move(ox)] {
+ }).then_interruptible([m, this, ox = std::move(ox)] {
auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
false);
return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
- }).handle_exception_type([=](const crimson::osd::error& e) {
+ }).handle_exception_type_interruptible([=](const crimson::osd::error& e) {
auto reply = make_message<MOSDOpReply>(
m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
reply->set_enoent_reply_versions(peering_state.get_info().last_update,
}
template<RWState::State State>
-PG::load_obc_ertr::future<>
+PG::load_obc_iertr::future<>
PG::with_head_obc(hobject_t oid, with_obc_func_t&& func)
{
logger().debug("{} {}", __func__, oid);
assert(oid.is_head());
auto [obc, existed] = shard_services.obc_registry.get_cached_obc(oid);
obc->append_to(obc_set_accessing);
- return obc->with_lock<State>(
+ return obc->with_lock<State, IOInterruptCondition>(
[oid=std::move(oid), existed=existed, obc=obc,
func=std::move(func), this] {
- auto loaded = load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
+ auto loaded = load_obc_iertr::make_ready_future<ObjectContextRef>(obc);
if (existed) {
logger().debug("with_head_obc: found {} in cache", oid);
} else {
logger().debug("with_head_obc: cache miss on {}", oid);
- loaded = obc->with_promoted_lock<State>([this, obc] {
+ loaded = obc->with_promoted_lock<State, IOInterruptCondition>([this, obc] {
return load_head_obc(obc);
});
}
- return loaded.safe_then([func=std::move(func)](auto obc) {
+ return loaded.safe_then_interruptible([func = std::move(func)](auto obc) {
return func(std::move(obc));
});
}).finally([this, pgref, obc=std::move(obc)] {
}
template<RWState::State State>
-PG::load_obc_ertr::future<>
+PG::load_obc_iertr::future<>
PG::with_clone_obc(hobject_t oid, with_obc_func_t&& func)
{
assert(!oid.is_head());
return with_head_obc<RWState::RWREAD>(oid.get_head(),
- [oid, func=std::move(func), this](auto head) -> load_obc_ertr::future<> {
+ [oid, func=std::move(func), this](auto head) -> load_obc_iertr::future<> {
auto coid = resolve_oid(head->get_ro_ss(), oid);
if (!coid) {
// TODO: return crimson::ct_error::enoent::make();
return clone->template with_lock<State>(
[coid=*coid, existed=existed,
head=std::move(head), clone=std::move(clone),
- func=std::move(func), this]() -> load_obc_ertr::future<> {
- auto loaded = load_obc_ertr::make_ready_future<ObjectContextRef>(clone);
+ func=std::move(func), this]() -> load_obc_iertr::future<> {
+ auto loaded = load_obc_iertr::make_ready_future<ObjectContextRef>(clone);
if (existed) {
logger().debug("with_clone_obc: found {} in cache", coid);
} else {
logger().debug("with_clone_obc: cache miss on {}", coid);
loaded = clone->template with_promoted_lock<State>(
[coid, clone, head, this] {
- return backend->load_metadata(coid).safe_then(
+ return backend->load_metadata(coid).safe_then_interruptible(
[coid, clone=std::move(clone), head=std::move(head)](auto md) mutable {
clone->set_clone_state(std::move(md->os), std::move(head));
return clone;
});
});
}
- return loaded.safe_then([func=std::move(func)](auto clone) {
+ return loaded.safe_then_interruptible([func = std::move(func)](auto clone) {
return func(std::move(clone));
});
});
}
// explicitly instantiate the used instantiations
-template PG::load_obc_ertr::future<>
+template PG::load_obc_iertr::future<>
PG::with_head_obc<RWState::RWNONE>(hobject_t, with_obc_func_t&&);
-PG::load_obc_ertr::future<crimson::osd::ObjectContextRef>
+PG::load_obc_iertr::future<crimson::osd::ObjectContextRef>
PG::load_head_obc(ObjectContextRef obc)
{
hobject_t oid = obc->get_oid();
- return backend->load_metadata(oid).safe_then([obc=std::move(obc)](auto md)
+ return backend->load_metadata(oid).safe_then_interruptible(
+ [obc=std::move(obc)](auto md)
-> load_obc_ertr::future<crimson::osd::ObjectContextRef> {
const hobject_t& oid = md->os.oi.soid;
logger().debug(
});
}
-PG::load_obc_ertr::future<>
+PG::load_obc_iertr::future<>
PG::reload_obc(crimson::osd::ObjectContext& obc) const
{
assert(obc.is_head());
- return backend->load_metadata(obc.get_oid()).safe_then([&obc](auto md)
+ return backend->load_metadata(obc.get_oid()).safe_then_interruptible<false>([&obc](auto md)
-> load_obc_ertr::future<> {
logger().debug(
"{}: reloaded obs {} for {}",
});
}
-PG::load_obc_ertr::future<>
+PG::load_obc_iertr::future<>
PG::with_locked_obc(Ref<MOSDOp> &m, const OpInfo &op_info,
Operation *op, PG::with_obc_func_t &&f)
{
};
}
-seastar::future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
+PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
{
if (__builtin_expect(stopping, false)) {
return seastar::make_exception_future<>(
decode(log_entries, p);
peering_state.append_log(std::move(log_entries), req->pg_trim_to,
req->version, req->min_last_complete_ondisk, txn, !txn.empty(), false);
- return shard_services.get_store().do_transaction(coll_ref, std::move(txn))
- .then([req, lcod=peering_state.get_info().last_complete, this] {
+ return interruptor::make_interruptible(shard_services.get_store().do_transaction(
+ coll_ref, std::move(txn))).then_interruptible(
+ [req, lcod=peering_state.get_info().last_complete, this] {
peering_state.update_last_complete_ondisk(lcod);
const auto map_epoch = get_osdmap_epoch();
auto reply = make_message<MOSDRepOpReply>(
return false;
}
-seastar::future<std::tuple<bool, int>>
+PG::interruptible_future<std::tuple<bool, int>>
PG::already_complete(const osd_reqid_t& reqid)
{
eversion_t version;
#include "crimson/osd/object_context.h"
#include "osd/PeeringState.h"
+#include "crimson/common/interruptible_future.h"
#include "crimson/common/type_helpers.h"
#include "crimson/os/futurized_collection.h"
#include "crimson/osd/backfill_state.h"
+#include "crimson/osd/pg_interval_interrupt_condition.h"
#include "crimson/osd/osd_operations/client_request.h"
#include "crimson/osd/osd_operations/peering_event.h"
#include "crimson/osd/osd_operations/replicated_request.h"
seastar::timer<seastar::lowres_clock> renew_lease_timer;
public:
+ template <typename T = void>
+ using interruptible_future =
+ ::crimson::interruptible::interruptible_future<
+ ::crimson::osd::IOInterruptCondition, T>;
+
PG(spg_t pgid,
pg_shard_t pg_shard,
crimson::os::CollectionRef coll_ref,
using load_obc_ertr = crimson::errorator<
crimson::ct_error::object_corrupted>;
-
- load_obc_ertr::future<crimson::osd::ObjectContextRef>
+ using load_obc_iertr =
+ ::crimson::interruptible::interruptible_errorator<
+ ::crimson::osd::IOInterruptCondition,
+ load_obc_ertr>;
+ using interruptor = ::crimson::interruptible::interruptor<
+ ::crimson::osd::IOInterruptCondition>;
+ load_obc_iertr::future<
+ std::pair<crimson::osd::ObjectContextRef, bool>>
+ get_or_load_clone_obc(
+ hobject_t oid, crimson::osd::ObjectContextRef head_obc);
+
+ load_obc_iertr::future<
+ std::pair<crimson::osd::ObjectContextRef, bool>>
+ get_or_load_head_obc(hobject_t oid);
+
+ load_obc_iertr::future<crimson::osd::ObjectContextRef>
load_head_obc(ObjectContextRef obc);
- load_obc_ertr::future<>
+ load_obc_iertr::future<>
reload_obc(crimson::osd::ObjectContext& obc) const;
public:
using with_obc_func_t =
- std::function<load_obc_ertr::future<> (ObjectContextRef)>;
+ std::function<load_obc_iertr::future<> (ObjectContextRef)>;
using obc_accessing_list_t = boost::intrusive::list<
ObjectContext,
obc_accessing_list_t obc_set_accessing;
template<RWState::State State>
- load_obc_ertr::future<> with_head_obc(hobject_t oid, with_obc_func_t&& func);
+ load_obc_iertr::future<> with_head_obc(hobject_t oid, with_obc_func_t&& func);
- load_obc_ertr::future<> with_locked_obc(
+ load_obc_iertr::future<> with_locked_obc(
Ref<MOSDOp> &m,
const OpInfo &op_info,
Operation *op,
with_obc_func_t&& f);
- seastar::future<> handle_rep_op(Ref<MOSDRepOp> m);
+ interruptible_future<> handle_rep_op(Ref<MOSDRepOp> m);
void handle_rep_op_reply(crimson::net::ConnectionRef conn,
const MOSDRepOpReply& m);
private:
template<RWState::State State>
- load_obc_ertr::future<> with_clone_obc(hobject_t oid, with_obc_func_t&& func);
+ load_obc_iertr::future<> with_clone_obc(hobject_t oid, with_obc_func_t&& func);
- load_obc_ertr::future<ObjectContextRef> get_locked_obc(
+ load_obc_iertr::future<ObjectContextRef> get_locked_obc(
Operation *op,
const hobject_t &oid,
RWState::State type);
osd_op_params_t& osd_op_p,
Ref<MOSDOp> m,
const bool user_modify);
- seastar::future<Ref<MOSDOpReply>> handle_failed_op(
+ interruptible_future<Ref<MOSDOpReply>> handle_failed_op(
const std::error_code& e,
ObjectContextRef obc,
const OpsExecuter& ox,
const MOSDOp& m) const;
using do_osd_ops_ertr = crimson::errorator<
crimson::ct_error::eagain>;
- do_osd_ops_ertr::future<Ref<MOSDOpReply>> do_osd_ops(
+ using do_osd_ops_iertr =
+ ::crimson::interruptible::interruptible_errorator<
+ ::crimson::osd::IOInterruptCondition,
+ ::crimson::errorator<crimson::ct_error::eagain>>;
+ do_osd_ops_iertr::future<Ref<MOSDOpReply>> do_osd_ops(
Ref<MOSDOp> m,
ObjectContextRef obc,
const OpInfo &op_info);
- seastar::future<Ref<MOSDOpReply>> do_pg_ops(Ref<MOSDOp> m);
- seastar::future<> submit_transaction(const OpInfo& op_info,
+ interruptible_future<Ref<MOSDOpReply>> do_pg_ops(Ref<MOSDOp> m);
+ interruptible_future<> submit_transaction(const OpInfo& op_info,
ObjectContextRef&& obc,
ceph::os::Transaction&& txn,
osd_op_params_t&& oop);
- seastar::future<> repair_object(Ref<MOSDOp> m,
+ interruptible_future<> repair_object(Ref<MOSDOp> m,
const hobject_t& oid,
eversion_t& v);
return &it->second;
}
}
- seastar::future<std::tuple<bool, int>> already_complete(const osd_reqid_t& reqid);
+ interruptible_future<std::tuple<bool, int>> already_complete(const osd_reqid_t& reqid);
int get_recovery_op_priority() const {
int64_t pri = 0;
get_pool().info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri);
private:
BackfillRecovery::BackfillRecoveryPipeline backfill_pipeline;
+
+ friend class IOInterruptCondition;
};
std::ostream& operator<<(std::ostream&, const PG& pg);
store{store}
{}
-PGBackend::load_metadata_ertr::future<PGBackend::loaded_object_md_t::ref>
+PGBackend::load_metadata_iertr::future<
+  PGBackend::loaded_object_md_t::ref>
PGBackend::load_metadata(const hobject_t& oid)
{
if (__builtin_expect(stopping, false)) {
throw crimson::common::system_shutdown_exception();
}
- return store->get_attrs(
+ return interruptor::make_interruptible(store->get_attrs(
coll,
- ghobject_t{oid, ghobject_t::NO_GEN, shard}).safe_then(
+ ghobject_t{oid, ghobject_t::NO_GEN, shard})).safe_then_interruptible(
[oid](auto &&attrs) -> load_metadata_ertr::future<loaded_object_md_t::ref>{
loaded_object_md_t::ref ret(new loaded_object_md_t());
if (auto oiiter = attrs.find(OI_ATTR); oiiter != attrs.end()) {
}));
}
-seastar::future<crimson::osd::acked_peers_t>
+PGBackend::interruptible_future<crimson::osd::acked_peers_t>
PGBackend::mutate_object(
std::set<pg_shard_t> pg_shards,
crimson::osd::ObjectContextRef &&obc,
return true;
}
-PGBackend::read_errorator::future<>
+PGBackend::read_ierrorator::future<>
PGBackend::read(const ObjectState& os, OSDOp& osd_op)
{
const auto& oi = os.oi;
// read the whole object if length is 0
length = size;
}
- return _read(oi.soid, offset, length, op.flags).safe_then(
+ return _read(oi.soid, offset, length, op.flags).safe_then_interruptible_tuple(
[&oi, &osd_op](auto&& bl) -> read_errorator::future<> {
if (!_read_verify_data(oi, bl)) {
// crc mismatches
read_errorator::pass_further{});
}
-PGBackend::read_errorator::future<>
+PGBackend::read_ierrorator::future<>
PGBackend::sparse_read(const ObjectState& os, OSDOp& osd_op)
{
const auto& op = osd_op.op;
logger().trace("sparse_read: {} {}~{}",
os.oi.soid, op.extent.offset, op.extent.length);
- return store->fiemap(coll, ghobject_t{os.oi.soid},
+ return interruptor::make_interruptible(store->fiemap(coll, ghobject_t{os.oi.soid},
op.extent.offset,
- op.extent.length).then([&os, &osd_op, this](auto&& m) {
+ op.extent.length)).then_interruptible(
+ [&os, &osd_op, this](auto&& m) {
return seastar::do_with(interval_set<uint64_t>{std::move(m)},
[&os, &osd_op, this](auto&& extents) {
- return store->readv(coll, ghobject_t{os.oi.soid},
- extents, osd_op.op.flags).safe_then(
+ return interruptor::make_interruptible(store->readv(coll, ghobject_t{os.oi.soid},
+ extents, osd_op.op.flags)).safe_then_interruptible_tuple(
[&os, &osd_op, &extents](auto&& bl) -> read_errorator::future<> {
if (_read_verify_data(os.oi, bl)) {
osd_op.op.extent.length = bl.length();
}
}
-PGBackend::checksum_errorator::future<>
+PGBackend::checksum_ierrorator::future<>
PGBackend::checksum(const ObjectState& os, OSDOp& osd_op)
{
// sanity tests and normalize the arguments
}
// read the chunk to be checksum'ed
- return _read(os.oi.soid, checksum.offset, checksum.length, osd_op.op.flags).safe_then(
+ return _read(os.oi.soid, checksum.offset, checksum.length, osd_op.op.flags)
+ .safe_then_interruptible(
[&osd_op](auto&& read_bl) mutable -> checksum_errorator::future<> {
auto& checksum = osd_op.op.checksum;
if (read_bl.length() != checksum.length) {
});
}
-PGBackend::cmp_ext_errorator::future<>
+PGBackend::cmp_ext_ierrorator::future<>
PGBackend::cmp_ext(const ObjectState& os, OSDOp& osd_op)
{
const ceph_osd_op& op = osd_op.op;
} else {
ext_len = op.extent.length;
}
- auto read_ext = ll_read_errorator::make_ready_future<ceph::bufferlist>();
+ auto read_ext = ll_read_ierrorator::make_ready_future<ceph::bufferlist>();
if (ext_len == 0) {
logger().debug("{}: zero length extent", __func__);
} else if (!os.exists || os.oi.is_whiteout()) {
} else {
read_ext = _read(os.oi.soid, op.extent.offset, ext_len, 0);
}
- return read_ext.safe_then([&osd_op](auto&& read_bl) {
+ return read_ext.safe_then_interruptible([&osd_op](auto&& read_bl) {
int32_t retcode = 0;
for (unsigned index = 0; index < osd_op.indata.length(); index++) {
char byte_in_op = osd_op.indata[index];
});
}
-PGBackend::stat_errorator::future<> PGBackend::stat(
+PGBackend::stat_ierrorator::future<>
+PGBackend::stat(
const ObjectState& os,
OSDOp& osd_op)
{
}
}
-seastar::future<> PGBackend::write(
+PGBackend::interruptible_future<> PGBackend::write(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& txn,
return seastar::now();
}
-seastar::future<> PGBackend::write_same(
+PGBackend::interruptible_future<> PGBackend::write_same(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& txn,
return seastar::now();
}
-seastar::future<> PGBackend::writefull(
+PGBackend::interruptible_future<> PGBackend::writefull(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& txn,
return seastar::now();
}
-PGBackend::append_errorator::future<> PGBackend::append(
+PGBackend::append_ierrorator::future<> PGBackend::append(
ObjectState& os,
OSDOp& osd_op,
ceph::os::Transaction& txn,
return seastar::now();
}
-PGBackend::write_ertr::future<> PGBackend::truncate(
+PGBackend::write_iertr::future<> PGBackend::truncate(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& txn,
return write_ertr::now();
}
-PGBackend::write_ertr::future<> PGBackend::zero(
+PGBackend::write_iertr::future<> PGBackend::zero(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& txn,
return write_ertr::now();
}
-seastar::future<> PGBackend::create(
+PGBackend::interruptible_future<> PGBackend::create(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& txn)
return seastar::now();
}
-seastar::future<> PGBackend::remove(ObjectState& os,
- ceph::os::Transaction& txn)
+PGBackend::interruptible_future<>
+PGBackend::remove(ObjectState& os, ceph::os::Transaction& txn)
{
// todo: snapset
txn.remove(coll->get_cid(),
return seastar::now();
}
-seastar::future<std::tuple<std::vector<hobject_t>, hobject_t>>
+PGBackend::interruptible_future<std::tuple<std::vector<hobject_t>, hobject_t>>
PGBackend::list_objects(const hobject_t& start, uint64_t limit) const
{
if (__builtin_expect(stopping, false)) {
}
auto gstart = start.is_min() ? ghobject_t{} : ghobject_t{start, 0, shard};
- return store->list_objects(coll,
- gstart,
- ghobject_t::get_max(),
- limit)
- .then([](auto ret) {
+ return interruptor::make_interruptible(store->list_objects(coll,
+ gstart,
+ ghobject_t::get_max(),
+ limit))
+ .then_interruptible([](auto ret) {
auto& [gobjects, next] = ret;
std::vector<hobject_t> objects;
boost::copy(gobjects |
});
}
-seastar::future<> PGBackend::setxattr(
+PGBackend::interruptible_future<> PGBackend::setxattr(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& txn)
//ctx->delta_stats.num_wr++;
}
-PGBackend::get_attr_errorator::future<> PGBackend::getxattr(
+PGBackend::get_attr_ierrorator::future<> PGBackend::getxattr(
const ObjectState& os,
OSDOp& osd_op) const
{
name = "_" + aname;
}
logger().debug("getxattr on obj={} for attr={}", os.oi.soid, name);
- return getxattr(os.oi.soid, name).safe_then([&osd_op] (ceph::bufferptr val) {
+ return getxattr(os.oi.soid, name).safe_then_interruptible(
+ [&osd_op] (ceph::bufferptr val) {
osd_op.outdata.clear();
osd_op.outdata.push_back(std::move(val));
osd_op.op.xattr.value_len = osd_op.outdata.length();
//ctx->delta_stats.num_rd++;
}
-PGBackend::get_attr_errorator::future<ceph::bufferptr> PGBackend::getxattr(
+PGBackend::get_attr_ierrorator::future<ceph::bufferptr>
+PGBackend::getxattr(
const hobject_t& soid,
std::string_view key) const
{
return store->get_attr(coll, ghobject_t{soid}, key);
}
-PGBackend::get_attr_errorator::future<> PGBackend::get_xattrs(
+PGBackend::get_attr_ierrorator::future<> PGBackend::get_xattrs(
const ObjectState& os,
OSDOp& osd_op) const
{
});
}
-PGBackend::rm_xattr_ertr::future<> PGBackend::rm_xattr(
+PGBackend::rm_xattr_iertr::future<>
+PGBackend::rm_xattr(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& txn)
string attr_name{"_"};
bp.copy(osd_op.op.xattr.name_len, attr_name);
txn.rmattr(coll->get_cid(), ghobject_t{os.oi.soid}, attr_name);
- return rm_xattr_ertr::now();
+ return rm_xattr_iertr::now();
}
using get_omap_ertr =
crimson::os::FuturizedStore::read_errorator::extend<
crimson::ct_error::enodata>;
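+// interruptible flavor of the omap read errorator used by the helpers below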
+using get_omap_iertr =
+ ::crimson::interruptible::interruptible_errorator<
+ ::crimson::osd::IOInterruptCondition,
+ get_omap_ertr>;
static
-get_omap_ertr::future<
+get_omap_iertr::future<
crimson::os::FuturizedStore::omap_values_t>
maybe_get_omap_vals_by_keys(
crimson::os::FuturizedStore* store,
}
static
-get_omap_ertr::future<
+get_omap_iertr::future<
std::tuple<bool, crimson::os::FuturizedStore::omap_values_t>>
maybe_get_omap_vals(
crimson::os::FuturizedStore* store,
}
}
-PGBackend::ll_read_errorator::future<ceph::bufferlist>
+PGBackend::ll_read_ierrorator::future<ceph::bufferlist>
PGBackend::omap_get_header(
const crimson::os::CollectionRef& c,
const ghobject_t& oid) const
return store->omap_get_header(c, oid);
}
-PGBackend::ll_read_errorator::future<>
+PGBackend::ll_read_ierrorator::future<>
PGBackend::omap_get_header(
const ObjectState& os,
OSDOp& osd_op) const
{
- return omap_get_header(coll, ghobject_t{os.oi.soid}).safe_then(
+ return omap_get_header(coll, ghobject_t{os.oi.soid}).safe_then_interruptible(
[&osd_op] (ceph::bufferlist&& header) {
osd_op.outdata = std::move(header);
return seastar::now();
});
}
-PGBackend::ll_read_errorator::future<>
+PGBackend::ll_read_ierrorator::future<>
PGBackend::omap_get_keys(
const ObjectState& os,
OSDOp& osd_op) const
std::min(max_return, local_conf()->osd_max_omap_entries_per_request);
// TODO: truly chunk the reading
- return maybe_get_omap_vals(store, coll, os.oi, start_after).safe_then(
+ return maybe_get_omap_vals(store, coll, os.oi, start_after).safe_then_interruptible(
[=, &osd_op](auto ret) {
ceph::bufferlist result;
bool truncated = false;
osd_op.outdata.claim_append(result);
encode(truncated, osd_op.outdata);
return seastar::now();
- }).handle_error(
+ }).handle_error_interruptible(
crimson::ct_error::enodata::handle([&osd_op] {
uint32_t num = 0;
bool truncated = false;
//ctx->delta_stats.num_rd++;
}
-PGBackend::ll_read_errorator::future<>
+PGBackend::ll_read_ierrorator::future<>
PGBackend::omap_get_vals(
const ObjectState& os,
OSDOp& osd_op) const
std::min(max_return, local_conf()->osd_max_omap_entries_per_request);
// TODO: truly chunk the reading
- return maybe_get_omap_vals(store, coll, os.oi, start_after).safe_then(
+ return maybe_get_omap_vals(store, coll, os.oi, start_after)
+ .safe_then_interruptible(
[=, &osd_op] (auto&& ret) {
auto [done, vals] = std::move(ret);
assert(done);
osd_op.outdata.claim_append(result);
encode(truncated, osd_op.outdata);
return ll_read_errorator::now();
- }).handle_error(
+ }).handle_error_interruptible(
crimson::ct_error::enodata::handle([&osd_op] {
encode(uint32_t{0} /* num */, osd_op.outdata);
encode(bool{false} /* truncated */, osd_op.outdata);
//ctx->delta_stats.num_rd++;
}
-PGBackend::ll_read_errorator::future<>
+PGBackend::ll_read_ierrorator::future<>
PGBackend::omap_get_vals_by_keys(
const ObjectState& os,
OSDOp& osd_op) const
} catch (buffer::error&) {
throw crimson::osd::invalid_argument();
}
- return maybe_get_omap_vals_by_keys(store, coll, os.oi, keys_to_get).safe_then(
+ return maybe_get_omap_vals_by_keys(store, coll, os.oi, keys_to_get)
+ .safe_then_interruptible(
[&osd_op] (crimson::os::FuturizedStore::omap_values_t&& vals) {
encode(vals, osd_op.outdata);
return ll_read_errorator::now();
- }).handle_error(
+ }).handle_error_interruptible(
crimson::ct_error::enodata::handle([&osd_op] {
uint32_t num = 0;
encode(num, osd_op.outdata);
//ctx->delta_stats.num_rd++;
}
-seastar::future<> PGBackend::omap_set_vals(
+PGBackend::interruptible_future<>
+PGBackend::omap_set_vals(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& txn,
return seastar::now();
}
-seastar::future<> PGBackend::omap_set_header(
+PGBackend::interruptible_future<>
+PGBackend::omap_set_header(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& txn)
return seastar::now();
}
-seastar::future<> PGBackend::omap_remove_range(
+PGBackend::interruptible_future<> PGBackend::omap_remove_range(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& txn)
return seastar::now();
}
-PGBackend::omap_clear_ertr::future<>
+PGBackend::omap_clear_iertr::future<>
PGBackend::omap_clear(
ObjectState& os,
OSDOp& osd_op,
return omap_clear_ertr::now();
}
-seastar::future<struct stat> PGBackend::stat(
+PGBackend::interruptible_future<struct stat>
+PGBackend::stat(
CollectionRef c,
const ghobject_t& oid) const
{
return store->stat(c, oid);
}
-seastar::future<std::map<uint64_t, uint64_t>>
+PGBackend::interruptible_future<std::map<uint64_t, uint64_t>>
PGBackend::fiemap(
CollectionRef c,
const ghobject_t& oid,
using ec_profile_t = std::map<std::string, std::string>;
// low-level read errorator
using ll_read_errorator = crimson::os::FuturizedStore::read_errorator;
+ using ll_read_ierrorator =
+ ::crimson::interruptible::interruptible_errorator<
+ ::crimson::osd::IOInterruptCondition,
+ ll_read_errorator>;
public:
using load_metadata_ertr = crimson::errorator<
crimson::ct_error::object_corrupted>;
+ using load_metadata_iertr =
+ ::crimson::interruptible::interruptible_errorator<
+ ::crimson::osd::IOInterruptCondition,
+ load_metadata_ertr>;
+ using interruptor =
+ ::crimson::interruptible::interruptor<
+ ::crimson::osd::IOInterruptCondition>;
+ template <typename T = void>
+ using interruptible_future =
+ ::crimson::interruptible::interruptible_future<
+ ::crimson::osd::IOInterruptCondition, T>;
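+ // Store-facing operations below either return interruptible_future<>
+ // directly or pair an existing errorator with IOInterruptCondition
+ // through the *_iertr aliases.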
PGBackend(shard_id_t shard, CollectionRef coll, crimson::os::FuturizedStore* store);
virtual ~PGBackend() = default;
static std::unique_ptr<PGBackend> create(pg_t pgid,
std::map<std::string, ceph::bufferptr, std::less<>>;
using read_errorator = ll_read_errorator::extend<
crimson::ct_error::object_corrupted>;
- read_errorator::future<> read(
+ using read_ierrorator =
+ ::crimson::interruptible::interruptible_errorator<
+ ::crimson::osd::IOInterruptCondition,
+ read_errorator>;
+ read_ierrorator::future<> read(
const ObjectState& os,
OSDOp& osd_op);
- read_errorator::future<> sparse_read(
+ read_ierrorator::future<> sparse_read(
const ObjectState& os,
OSDOp& osd_op);
using checksum_errorator = ll_read_errorator::extend<
crimson::ct_error::object_corrupted,
crimson::ct_error::invarg>;
- checksum_errorator::future<> checksum(
+ using checksum_ierrorator =
+ ::crimson::interruptible::interruptible_errorator<
+ ::crimson::osd::IOInterruptCondition,
+ checksum_errorator>;
+ checksum_ierrorator::future<> checksum(
const ObjectState& os,
OSDOp& osd_op);
using cmp_ext_errorator = ll_read_errorator::extend<
crimson::ct_error::invarg>;
- cmp_ext_errorator::future<> cmp_ext(
+ using cmp_ext_ierrorator =
+ ::crimson::interruptible::interruptible_errorator<
+ ::crimson::osd::IOInterruptCondition,
+ cmp_ext_errorator>;
+ cmp_ext_ierrorator::future<> cmp_ext(
const ObjectState& os,
OSDOp& osd_op);
using stat_errorator = crimson::errorator<crimson::ct_error::enoent>;
- stat_errorator::future<> stat(
+ using stat_ierrorator =
+ ::crimson::interruptible::interruptible_errorator<
+ ::crimson::osd::IOInterruptCondition,
+ stat_errorator>;
+ stat_ierrorator::future<> stat(
const ObjectState& os,
OSDOp& osd_op);
// TODO: switch the entire write family to errorator.
using write_ertr = crimson::errorator<
crimson::ct_error::file_too_large>;
- seastar::future<> create(
+ using write_iertr =
+ ::crimson::interruptible::interruptible_errorator<
+ ::crimson::osd::IOInterruptCondition,
+ write_ertr>;
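+ // only truncate/zero use write_iertr (file_too_large); the rest of the
+ // write family stays on plain interruptible_future (see the TODO above)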
+ interruptible_future<> create(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& trans);
- seastar::future<> remove(
+ interruptible_future<> remove(
ObjectState& os,
ceph::os::Transaction& txn);
- seastar::future<> write(
+ interruptible_future<> write(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& trans,
osd_op_params_t& osd_op_params);
- seastar::future<> write_same(
+ interruptible_future<> write_same(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& trans,
osd_op_params_t& osd_op_params);
- seastar::future<> writefull(
+ interruptible_future<> writefull(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& trans,
osd_op_params_t& osd_op_params);
using append_errorator = crimson::errorator<
crimson::ct_error::invarg>;
- append_errorator::future<> append(
+ using append_ierrorator =
+ ::crimson::interruptible::interruptible_errorator<
+ ::crimson::osd::IOInterruptCondition,
+ append_errorator>;
+ append_ierrorator::future<> append(
ObjectState& os,
OSDOp& osd_op,
ceph::os::Transaction& trans,
osd_op_params_t& osd_op_params);
- write_ertr::future<> truncate(
+ write_iertr::future<> truncate(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& trans,
osd_op_params_t& osd_op_params);
- write_ertr::future<> zero(
+ write_iertr::future<> zero(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& trans,
osd_op_params_t& osd_op_params);
- seastar::future<crimson::osd::acked_peers_t> mutate_object(
+ interruptible_future<crimson::osd::acked_peers_t> mutate_object(
std::set<pg_shard_t> pg_shards,
crimson::osd::ObjectContextRef &&obc,
ceph::os::Transaction&& txn,
epoch_t min_epoch,
epoch_t map_epoch,
std::vector<pg_log_entry_t>&& log_entries);
- seastar::future<std::tuple<std::vector<hobject_t>, hobject_t>> list_objects(
+ interruptible_future<std::tuple<std::vector<hobject_t>, hobject_t>> list_objects(
const hobject_t& start,
uint64_t limit) const;
- seastar::future<> setxattr(
+ interruptible_future<> setxattr(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& trans);
using get_attr_errorator = crimson::os::FuturizedStore::get_attr_errorator;
- get_attr_errorator::future<> getxattr(
+ using get_attr_ierrorator =
+ ::crimson::interruptible::interruptible_errorator<
+ ::crimson::osd::IOInterruptCondition,
+ get_attr_errorator>;
+ get_attr_ierrorator::future<> getxattr(
const ObjectState& os,
OSDOp& osd_op) const;
- get_attr_errorator::future<ceph::bufferptr> getxattr(
+ get_attr_ierrorator::future<ceph::bufferptr> getxattr(
const hobject_t& soid,
std::string_view key) const;
- get_attr_errorator::future<> get_xattrs(
+ get_attr_ierrorator::future<> get_xattrs(
const ObjectState& os,
OSDOp& osd_op) const;
using rm_xattr_ertr = crimson::errorator<crimson::ct_error::enoent>;
- rm_xattr_ertr::future<> rm_xattr(
+ using rm_xattr_iertr =
+ ::crimson::interruptible::interruptible_errorator<
+ ::crimson::osd::IOInterruptCondition,
+ rm_xattr_ertr>;
+ rm_xattr_iertr::future<> rm_xattr(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& trans);
- seastar::future<struct stat> stat(
+ interruptible_future<struct stat> stat(
CollectionRef c,
const ghobject_t& oid) const;
- seastar::future<std::map<uint64_t, uint64_t>> fiemap(
+ interruptible_future<std::map<uint64_t, uint64_t>> fiemap(
CollectionRef c,
const ghobject_t& oid,
uint64_t off,
uint64_t len);
// OMAP
- ll_read_errorator::future<> omap_get_keys(
+ ll_read_ierrorator::future<> omap_get_keys(
const ObjectState& os,
OSDOp& osd_op) const;
- ll_read_errorator::future<> omap_get_vals(
+ ll_read_ierrorator::future<> omap_get_vals(
const ObjectState& os,
OSDOp& osd_op) const;
- ll_read_errorator::future<> omap_get_vals_by_keys(
+ ll_read_ierrorator::future<> omap_get_vals_by_keys(
const ObjectState& os,
OSDOp& osd_op) const;
- seastar::future<> omap_set_vals(
+ interruptible_future<> omap_set_vals(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& trans,
osd_op_params_t& osd_op_params);
- ll_read_errorator::future<ceph::bufferlist> omap_get_header(
+ ll_read_ierrorator::future<ceph::bufferlist> omap_get_header(
const crimson::os::CollectionRef& c,
const ghobject_t& oid) const;
- ll_read_errorator::future<> omap_get_header(
+ ll_read_ierrorator::future<> omap_get_header(
const ObjectState& os,
OSDOp& osd_op) const;
- seastar::future<> omap_set_header(
+ interruptible_future<> omap_set_header(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& trans);
- seastar::future<> omap_remove_range(
+ interruptible_future<> omap_remove_range(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& trans);
using omap_clear_ertr = crimson::errorator<crimson::ct_error::enoent>;
- omap_clear_ertr::future<> omap_clear(
+ using omap_clear_iertr =
+ ::crimson::interruptible::interruptible_errorator<
+ ::crimson::osd::IOInterruptCondition,
+ omap_clear_ertr>;
+ omap_clear_iertr::future<> omap_clear(
ObjectState& os,
OSDOp& osd_op,
ceph::os::Transaction& trans,
std::optional<SnapSet> ss;
using ref = std::unique_ptr<loaded_object_md_t>;
};
- load_metadata_ertr::future<loaded_object_md_t::ref> load_metadata(
+ load_metadata_iertr::future<loaded_object_md_t::ref>
+ load_metadata(
const hobject_t &oid);
private:
- virtual ll_read_errorator::future<ceph::bufferlist> _read(
+ virtual ll_read_ierrorator::future<ceph::bufferlist> _read(
const hobject_t& hoid,
size_t offset,
size_t length,
uint32_t flags) = 0;
bool maybe_create_new_object(ObjectState& os, ceph::os::Transaction& txn);
- virtual seastar::future<crimson::osd::acked_peers_t>
+ virtual interruptible_future<crimson::osd::acked_peers_t>
_submit_transaction(std::set<pg_shard_t>&& pg_shards,
const hobject_t& hoid,
ceph::os::Transaction&& txn,
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
-#include "io_interrupt_condition.h"
+#include "pg_interval_interrupt_condition.h"
#include "pg.h"
+#include "crimson/common/log.h"
+
namespace crimson::osd {
IOInterruptCondition::IOInterruptCondition(Ref<PG>& pg)
: pg(pg), e(pg->get_osdmap_epoch()) {}
-epoch_t IOInterruptCondition::get_current_osdmap_epoch() {
- return pg->get_osdmap_epoch();
+bool IOInterruptCondition::new_interval_created() {
+ bool ret = e < pg->get_interval_start_epoch();
+ if (ret)
+ ::crimson::get_logger(ceph_subsys_osd).debug(
+ "{} new interval, should interrupt, e{}", *pg);
+ return ret;
}
bool IOInterruptCondition::is_stopping() {
+ if (pg->stopping)
+ ::crimson::get_logger(ceph_subsys_osd).debug(
+ "{} shutting down, should interrupt", *pg);
return pg->stopping;
}
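+// Sketch of how these predicates are consumed by the interruptible plumbing
+// (hypothetical caller; the real dispatch lives in interruptible_future_detail):
+//   if (auto [stop, fut] = cond.may_interrupt<seastar::future<>>(); stop)
+//     return std::move(*fut);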
public:
IOInterruptCondition(Ref<PG>& pg);
- epoch_t get_current_osdmap_epoch();
+ bool new_interval_created();
bool is_stopping();
bool is_primary();
- template <typename T>
- std::pair<bool, std::optional<T>> may_interrupt() {
- if (e != get_current_osdmap_epoch()) {
- return std::pair<bool, std::optional<T>>(
- true, seastar::futurize<T>::make_exception_future(
- ::crimson::common::actingset_changed(is_primary())));
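+ // returns {should_interrupt, exceptional future}; a new interval surfaces
+ // as actingset_changed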
+ template <typename Fut>
+ std::pair<bool, std::optional<Fut>> may_interrupt() {
+ if (new_interval_created()) {
+ return {true, seastar::futurize<Fut>::make_exception_future(
+ ::crimson::common::actingset_changed(is_primary()))};
}
if (is_stopping()) {
return {true, seastar::futurize<Fut>::make_exception_future(
pg->get_osdmap_epoch());
}
-crimson::blocking_future<bool>
+PGRecovery::blocking_interruptible_future<bool>
PGRecovery::start_recovery_ops(size_t max_to_start)
{
assert(pg->is_primary());
assert(!pg->is_backfilling());
assert(!pg->get_peering_state().is_deleting());
- std::vector<crimson::blocking_future<>> started;
+ std::vector<blocking_interruptible_future<>> started;
started.reserve(max_to_start);
max_to_start -= start_primary_recovery_ops(max_to_start, &started);
if (max_to_start > 0) {
max_to_start -= start_replica_recovery_ops(max_to_start, &started);
}
- return crimson::join_blocking_futures(std::move(started)).then(
+ return crimson::join_blocking_interruptible_futures<
+ ::crimson::osd::IOInterruptCondition>(std::move(started)).then_interruptible<
+ ::crimson::osd::IOInterruptCondition>(
[this] {
bool done = !pg->get_peering_state().needs_recovery();
if (done) {
size_t PGRecovery::start_primary_recovery_ops(
size_t max_to_start,
- std::vector<crimson::blocking_future<>> *out)
+ std::vector<PGRecovery::blocking_interruptible_future<>> *out)
{
if (!pg->is_recovering()) {
return 0;
// TODO: handle lost/unfound
if (pg->get_recovery_backend()->is_recovering(soid)) {
auto& recovery_waiter = pg->get_recovery_backend()->get_recovering(soid);
- out->push_back(recovery_waiter.wait_for_recovered_blocking());
+ out->push_back(recovery_waiter.wait_for_recovered_blocking<
+ ::crimson::osd::IOInterruptCondition>());
++started;
} else if (pg->get_recovery_backend()->is_recovering(head)) {
++skipped;
size_t PGRecovery::start_replica_recovery_ops(
size_t max_to_start,
- std::vector<crimson::blocking_future<>> *out)
+ std::vector<PGRecovery::blocking_interruptible_future<>> *out)
{
if (!pg->is_recovering()) {
return 0;
if (pg->get_recovery_backend()->is_recovering(soid)) {
logger().debug("{}: already recovering object {}", __func__, soid);
auto& recovery_waiter = pg->get_recovery_backend()->get_recovering(soid);
- out->push_back(recovery_waiter.wait_for_recovered_blocking());
+ out->push_back(recovery_waiter.wait_for_recovered_blocking<
+ ::crimson::osd::IOInterruptCondition>());
started++;
continue;
}
return started;
}
-crimson::blocking_future<> PGRecovery::recover_missing(
+PGRecovery::blocking_interruptible_future<>
+PGRecovery::recover_missing(
const hobject_t &soid, eversion_t need)
{
if (pg->get_peering_state().get_missing_loc().is_deleted(soid)) {
pg->get_recovery_backend()->recover_delete(soid, need));
} else {
return pg->get_recovery_backend()->add_recovering(soid).make_blocking_future(
- pg->get_recovery_backend()->recover_object(soid, need).handle_exception(
+ pg->get_recovery_backend()->recover_object(soid, need)
+ .handle_exception_interruptible(
[=, soid = std::move(soid)] (auto e) {
on_failed_recover({ pg->get_pg_whoami() }, soid, need);
return seastar::make_ready_future<>();
size_t PGRecovery::prep_object_replica_deletes(
const hobject_t& soid,
eversion_t need,
- std::vector<crimson::blocking_future<>> *in_progress)
+ std::vector<PGRecovery::blocking_interruptible_future<>> *in_progress)
{
in_progress->push_back(
pg->get_recovery_backend()->add_recovering(soid).make_blocking_future(
- pg->get_recovery_backend()->push_delete(soid, need).then([=] {
+ pg->get_recovery_backend()->push_delete(soid, need).then_interruptible(
+ [=] {
object_stat_sum_t stat_diff;
stat_diff.num_objects_recovered = 1;
on_global_recover(soid, stat_diff, true);
size_t PGRecovery::prep_object_replica_pushes(
const hobject_t& soid,
eversion_t need,
- std::vector<crimson::blocking_future<>> *in_progress)
+ std::vector<PGRecovery::blocking_interruptible_future<>> *in_progress)
{
in_progress->push_back(
pg->get_recovery_backend()->add_recovering(soid).make_blocking_future(
- pg->get_recovery_backend()->recover_object(soid, need).handle_exception(
+ pg->get_recovery_backend()->recover_object(soid, need)
+ .handle_exception_interruptible(
[=, soid = std::move(soid)] (auto e) {
on_failed_recover({ pg->get_pg_whoami() }, soid, need);
return seastar::make_ready_future<>();
begin,
local_conf()->osd_backfill_scan_min,
local_conf()->osd_backfill_scan_max
- ).then([this] (BackfillInterval bi) {
+ ).then_interruptible([this] (BackfillInterval bi) {
logger().debug("request_primary_scan:{}", __func__);
using BackfillState = crimson::osd::BackfillState;
start_backfill_recovery(BackfillState::PrimaryScanned{ std::move(bi) });
__func__, obj, v);
pg->get_recovery_backend()->add_recovering(obj);
std::ignore = pg->get_recovery_backend()->recover_object(obj, v).\
- handle_exception([] (auto) {
+ handle_exception_interruptible([] (auto) {
ceph_abort_msg("got exception on backfill's push");
return seastar::make_ready_future<>();
- }).then([this, obj] {
+ }).then_interruptible([this, obj] {
logger().debug("enqueue_push:{}", __func__);
using BackfillState = crimson::osd::BackfillState;
start_backfill_recovery(BackfillState::ObjectPushed(std::move(obj)));
#include <seastar/core/future.hh>
#include "crimson/osd/backfill_state.h"
+#include "crimson/osd/pg_interval_interrupt_condition.h"
#include "crimson/osd/osd_operation.h"
#include "crimson/osd/pg_recovery_listener.h"
#include "crimson/osd/scheduler/scheduler.h"
class PGRecovery : public crimson::osd::BackfillState::BackfillListener {
public:
+ template <typename T = void>
+ using blocking_interruptible_future =
+ ::crimson::blocking_interruptible_future<
+ ::crimson::osd::IOInterruptCondition, T>;
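+ // a blocking_future whose inner future is interruptible, so recovery waits
+ // can both be tracked by blockers and cut short on interval change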
PGRecovery(PGRecoveryListener* pg) : pg(pg) {}
virtual ~PGRecovery() {}
void start_pglogbased_recovery();
- crimson::blocking_future<bool> start_recovery_ops(size_t max_to_start);
+ blocking_interruptible_future<bool> start_recovery_ops(size_t max_to_start);
void on_backfill_reserved();
void dispatch_backfill_event(
boost::intrusive_ptr<const boost::statechart::event_base> evt);
PGRecoveryListener* pg;
size_t start_primary_recovery_ops(
size_t max_to_start,
- std::vector<crimson::blocking_future<>> *out);
+ std::vector<blocking_interruptible_future<>> *out);
size_t start_replica_recovery_ops(
size_t max_to_start,
- std::vector<crimson::blocking_future<>> *out);
+ std::vector<blocking_interruptible_future<>> *out);
std::vector<pg_shard_t> get_replica_recovery_order() const {
return pg->get_replica_recovery_order();
}
- crimson::blocking_future<> recover_missing(
+ blocking_interruptible_future<> recover_missing(
const hobject_t &soid, eversion_t need);
size_t prep_object_replica_deletes(
const hobject_t& soid,
eversion_t need,
- std::vector<crimson::blocking_future<>> *in_progress);
+ std::vector<blocking_interruptible_future<>> *in_progress);
size_t prep_object_replica_pushes(
const hobject_t& soid,
eversion_t need,
- std::vector<crimson::blocking_future<>> *in_progress);
+ std::vector<blocking_interruptible_future<>> *in_progress);
void on_local_recover(
const hobject_t& soid,
RecoveryDone{});
}
-seastar::future<> RecoveryBackend::handle_backfill_progress(
+RecoveryBackend::interruptible_future<>
+RecoveryBackend::handle_backfill_progress(
MOSDPGBackfill& m)
{
logger().debug("{}", __func__);
).or_terminate();
}
-seastar::future<> RecoveryBackend::handle_backfill_finish_ack(
+RecoveryBackend::interruptible_future<>
+RecoveryBackend::handle_backfill_finish_ack(
MOSDPGBackfill& m)
{
logger().debug("{}", __func__);
return seastar::now();
}
-seastar::future<> RecoveryBackend::handle_backfill(
+RecoveryBackend::interruptible_future<>
+RecoveryBackend::handle_backfill(
MOSDPGBackfill& m)
{
logger().debug("{}", __func__);
}
}
-seastar::future<> RecoveryBackend::handle_backfill_remove(
+RecoveryBackend::interruptible_future<>
+RecoveryBackend::handle_backfill_remove(
MOSDPGBackfillRemove& m)
{
logger().debug("{} m.ls={}", __func__, m.ls);
).or_terminate();
}
-seastar::future<BackfillInterval> RecoveryBackend::scan_for_backfill(
+RecoveryBackend::interruptible_future<BackfillInterval>
+RecoveryBackend::scan_for_backfill(
const hobject_t& start,
[[maybe_unused]] const std::int64_t min,
const std::int64_t max)
{
logger().debug("{} starting from {}", __func__, start);
auto version_map = seastar::make_lw_shared<std::map<hobject_t, eversion_t>>();
- return backend->list_objects(start, max).then(
+ return backend->list_objects(start, max).then_interruptible(
[this, start, version_map] (auto&& ret) {
auto&& [objects, next] = std::move(ret);
- return seastar::parallel_for_each(std::move(objects),
- [this, version_map] (const hobject_t& object) {
+ return interruptor::parallel_for_each(std::move(objects),
+ [this, version_map] (const hobject_t& object)
+ -> interruptible_future<> {
crimson::osd::ObjectContextRef obc;
if (pg.is_primary()) {
obc = shard_services.obc_registry.maybe_get_cached_obc(object);
}
return seastar::now();
} else {
- return backend->load_metadata(object).safe_then(
+ return backend->load_metadata(object).safe_then_interruptible(
[version_map, object] (auto md) {
if (md->os.exists) {
logger().debug("scan_for_backfill found: {} {}",
return seastar::now();
}, PGBackend::load_metadata_ertr::assert_all{});
}
- }).then([version_map, start=std::move(start), next=std::move(next), this] {
+ }).then_interruptible([version_map, start=std::move(start), next=std::move(next), this] {
BackfillInterval bi;
bi.begin = std::move(start);
bi.end = std::move(next);
});
}
-seastar::future<> RecoveryBackend::handle_scan_get_digest(
+RecoveryBackend::interruptible_future<>
+RecoveryBackend::handle_scan_get_digest(
MOSDPGScan& m)
{
logger().debug("{}", __func__);
std::move(m.begin),
crimson::common::local_conf().get_val<std::int64_t>("osd_backfill_scan_min"),
crimson::common::local_conf().get_val<std::int64_t>("osd_backfill_scan_max")
- ).then([this,
+ ).then_interruptible([this,
query_epoch=m.query_epoch,
conn=m.get_connection()] (auto backfill_interval) {
auto reply = make_message<MOSDPGScan>(
});
}
-seastar::future<> RecoveryBackend::handle_scan_digest(
+RecoveryBackend::interruptible_future<>
+RecoveryBackend::handle_scan_digest(
MOSDPGScan& m)
{
logger().debug("{}", __func__);
return seastar::now();
}
-seastar::future<> RecoveryBackend::handle_scan(
+RecoveryBackend::interruptible_future<>
+RecoveryBackend::handle_scan(
MOSDPGScan& m)
{
logger().debug("{}", __func__);
}
}
-seastar::future<> RecoveryBackend::handle_recovery_op(
+RecoveryBackend::interruptible_future<>
+RecoveryBackend::handle_recovery_op(
Ref<MOSDFastDispatchOp> m)
{
switch (m->get_header().type) {
#include "crimson/common/type_helpers.h"
#include "crimson/os/futurized_store.h"
#include "crimson/os/futurized_collection.h"
+#include "crimson/osd/pg_interval_interrupt_condition.h"
#include "crimson/osd/object_context.h"
#include "crimson/osd/shard_services.h"
class PGBackend;
class RecoveryBackend {
- void handle_backfill_finish(
- MOSDPGBackfill& m);
- seastar::future<> handle_backfill_progress(
- MOSDPGBackfill& m);
- seastar::future<> handle_backfill_finish_ack(
- MOSDPGBackfill& m);
- seastar::future<> handle_backfill(MOSDPGBackfill& m);
-
- seastar::future<> handle_backfill_remove(MOSDPGBackfillRemove& m);
-
- seastar::future<> handle_scan_get_digest(
- MOSDPGScan& m);
- seastar::future<> handle_scan_digest(
- MOSDPGScan& m);
- seastar::future<> handle_scan(
- MOSDPGScan& m);
protected:
class WaitForObjectRecovery;
public:
+ template <typename T = void>
+ using interruptible_future =
+ ::crimson::interruptible::interruptible_future<
+ ::crimson::osd::IOInterruptCondition, T>;
+ using interruptor =
+ ::crimson::interruptible::interruptor<
+ ::crimson::osd::IOInterruptCondition>;
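+ // the MOSDPG* handlers and recovery primitives below all run under
+ // IOInterruptCondition via these aliases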
RecoveryBackend(crimson::osd::PG& pg,
crimson::osd::ShardServices& shard_services,
crimson::os::CollectionRef coll,
return recovering.size();
}
- virtual seastar::future<> handle_recovery_op(
+ virtual interruptible_future<> handle_recovery_op(
Ref<MOSDFastDispatchOp> m);
- virtual seastar::future<> recover_object(
+ virtual interruptible_future<> recover_object(
const hobject_t& soid,
eversion_t need) = 0;
- virtual seastar::future<> recover_delete(
+ virtual interruptible_future<> recover_delete(
const hobject_t& soid,
eversion_t need) = 0;
- virtual seastar::future<> push_delete(
+ virtual interruptible_future<> push_delete(
const hobject_t& soid,
eversion_t need) = 0;
- seastar::future<BackfillInterval> scan_for_backfill(
+ interruptible_future<BackfillInterval> scan_for_backfill(
const hobject_t& from,
std::int64_t min,
std::int64_t max);
seastar::future<> wait_for_recovered() {
return recovered.get_shared_future();
}
- crimson::blocking_future<>
+ template <typename InterruptCond>
+ crimson::blocking_interruptible_future<InterruptCond>
wait_for_recovered_blocking() {
- return make_blocking_future(
+ return make_blocking_interruptible_future<InterruptCond>(
recovered.get_shared_future());
}
seastar::future<> wait_for_pull() {
}
void clean_up(ceph::os::Transaction& t, std::string_view why);
virtual seastar::future<> on_stop() = 0;
+private:
+ void handle_backfill_finish(
+ MOSDPGBackfill& m);
+ interruptible_future<> handle_backfill_progress(
+ MOSDPGBackfill& m);
+ interruptible_future<> handle_backfill_finish_ack(
+ MOSDPGBackfill& m);
+ interruptible_future<> handle_backfill(MOSDPGBackfill& m);
+
+ interruptible_future<> handle_scan_get_digest(
+ MOSDPGScan& m);
+ interruptible_future<> handle_scan_digest(
+ MOSDPGScan& m);
+ interruptible_future<> handle_scan(
+ MOSDPGScan& m);
+ interruptible_future<> handle_backfill_remove(MOSDPGBackfillRemove& m);
};
shard_services{shard_services}
{}
-ReplicatedBackend::ll_read_errorator::future<ceph::bufferlist>
+ReplicatedBackend::ll_read_ierrorator::future<ceph::bufferlist>
ReplicatedBackend::_read(const hobject_t& hoid,
const uint64_t off,
const uint64_t len,
return store->read(coll, ghobject_t{hoid}, off, len, flags);
}
-seastar::future<crimson::osd::acked_peers_t>
+ReplicatedBackend::interruptible_future<crimson::osd::acked_peers_t>
ReplicatedBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
const hobject_t& hoid,
ceph::os::Transaction&& txn,
bufferlist encoded_txn;
encode(txn, encoded_txn);
- return seastar::parallel_for_each(std::move(pg_shards),
+ return interruptor::parallel_for_each(std::move(pg_shards),
[=, encoded_txn=std::move(encoded_txn), txn=std::move(txn)]
(auto pg_shard) mutable {
if (pg_shard == whoami) {
// TODO: set more stuff. e.g., pg_states
return shard_services.send_to_osd(pg_shard.osd, std::move(m), map_epoch);
}
- }).then([this, peers=pending_txn->second.weak_from_this()] {
+ }).then_interruptible([this, peers=pending_txn->second.weak_from_this()] {
if (!peers) {
// for now, only actingset_changed can cause peers
// to be nullptr
return seastar::now();
}
return peers->all_committed.get_shared_future();
- }).then([pending_txn, this] {
+ }).then_interruptible([pending_txn, this] {
auto acked_peers = std::move(pending_txn->second.acked_peers);
pending_trans.erase(pending_txn);
return seastar::make_ready_future<crimson::osd::acked_peers_t>(std::move(acked_peers));
seastar::future<> stop() final;
void on_actingset_changed(peering_info_t pi) final;
private:
- ll_read_errorator::future<ceph::bufferlist> _read(const hobject_t& hoid,
- uint64_t off,
- uint64_t len,
- uint32_t flags) override;
- seastar::future<crimson::osd::acked_peers_t>
+ ll_read_ierrorator::future<ceph::bufferlist>
+ _read(const hobject_t& hoid, uint64_t off,
+ uint64_t len, uint32_t flags) override;
+ interruptible_future<crimson::osd::acked_peers_t>
_submit_transaction(std::set<pg_shard_t>&& pg_shards,
const hobject_t& hoid,
ceph::os::Transaction&& txn,
}
}
-seastar::future<> ReplicatedRecoveryBackend::recover_object(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::recover_object(
const hobject_t& soid,
eversion_t need)
{
// always add_recovering(soid) before recover_object(soid)
assert(is_recovering(soid));
// start tracking the recovery of soid
- return maybe_pull_missing_obj(soid, need).then([this, soid, need] {
+ return maybe_pull_missing_obj(soid, need).then_interruptible([this, soid, need] {
logger().debug("recover_object: loading obc: {}", soid);
return pg.with_head_obc<RWState::RWREAD>(soid,
[this, soid, need](auto obc) {
recovery_waiter.obc = obc;
recovery_waiter.obc->wait_recovery_read();
return maybe_push_shards(soid, need);
- }).handle_error(
+ }).handle_error_interruptible(
crimson::osd::PG::load_obc_ertr::all_same_way([soid](auto& code) {
// TODO: may need eio handling?
logger().error("recover_object saw error code {}, ignoring object {}",
});
}
-seastar::future<>
+RecoveryBackend::interruptible_future<>
ReplicatedRecoveryBackend::maybe_push_shards(
const hobject_t& soid,
eversion_t need)
{
- return seastar::parallel_for_each(get_shards_to_push(soid),
+ return interruptor::parallel_for_each(get_shards_to_push(soid),
[this, need, soid](auto shard) {
- return prep_push(soid, need, shard).then([this, soid, shard](auto push) {
+ return prep_push(soid, need, shard).then_interruptible([this, soid, shard](auto push) {
auto msg = make_message<MOSDPGPush>();
msg->from = pg.get_pg_whoami();
msg->pgid = pg.get_pgid();
msg->min_epoch = pg.get_last_peering_reset();
msg->pushes.push_back(std::move(push));
msg->set_priority(pg.get_recovery_op_priority());
- return shard_services.send_to_osd(shard.osd,
- std::move(msg),
- pg.get_osdmap_epoch()).then(
+ return interruptor::make_interruptible(
+ shard_services.send_to_osd(shard.osd,
+ std::move(msg),
+ pg.get_osdmap_epoch()))
+ .then_interruptible(
[this, soid, shard] {
return recovering.at(soid).wait_for_pushes(shard);
});
});
- }).then([this, soid] {
+ }).then_interruptible([this, soid] {
auto &recovery = recovering.at(soid);
auto push_info = recovery.pushing.begin();
object_stat_sum_t stat = {};
}
pg.get_recovery_handler()->on_global_recover(soid, stat, false);
return seastar::make_ready_future<>();
- }).handle_exception([this, soid](auto e) {
+ }).handle_exception_interruptible([this, soid](auto e) {
auto &recovery = recovering.at(soid);
if (recovery.obc) {
recovery.obc->drop_recovery_read();
});
}
-seastar::future<>
+RecoveryBackend::interruptible_future<>
ReplicatedRecoveryBackend::maybe_pull_missing_obj(
const hobject_t& soid,
eversion_t need)
msg->map_epoch = pg.get_osdmap_epoch();
msg->min_epoch = pg.get_last_peering_reset();
msg->set_pulls({std::move(po)});
- return shard_services.send_to_osd(
- pi.from.osd,
- std::move(msg),
- pg.get_osdmap_epoch()
- ).then([&recovery_waiter] {
+ return interruptor::make_interruptible(
+ shard_services.send_to_osd(
+ pi.from.osd,
+ std::move(msg),
+ pg.get_osdmap_epoch()
+ )).then_interruptible([&recovery_waiter] {
return recovery_waiter.wait_for_pull();
});
}
-seastar::future<> ReplicatedRecoveryBackend::push_delete(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::push_delete(
const hobject_t& soid,
eversion_t need)
{
logger().debug("{}: {}, {}", __func__, soid, need);
- recovering[soid];
epoch_t min_epoch = pg.get_last_peering_reset();
assert(pg.get_acting_recovery_backfill().size() > 0);
- return seastar::parallel_for_each(pg.get_acting_recovery_backfill(),
- [this, soid, need, min_epoch](pg_shard_t shard) {
+ return interruptor::parallel_for_each(pg.get_acting_recovery_backfill(),
+ [this, soid, need, min_epoch](pg_shard_t shard)
+ -> interruptible_future<> {
if (shard == pg.get_pg_whoami())
return seastar::make_ready_future<>();
auto iter = pg.get_shard_missing().find(shard);
pg.get_pg_whoami(), target_pg, pg.get_osdmap_epoch(), min_epoch);
msg->set_priority(pg.get_recovery_op_priority());
msg->objects.push_back(std::make_pair(soid, need));
- return shard_services.send_to_osd(shard.osd, std::move(msg),
- pg.get_osdmap_epoch()).then(
+ return interruptor::make_interruptible(
+ shard_services.send_to_osd(shard.osd, std::move(msg),
+ pg.get_osdmap_epoch())).then_interruptible(
[this, soid, shard] {
return recovering.at(soid).wait_for_pushes(shard);
});
});
}
-seastar::future<> ReplicatedRecoveryBackend::handle_recovery_delete(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::handle_recovery_delete(
Ref<MOSDPGRecoveryDelete> m)
{
logger().debug("{}: {}", __func__, *m);
auto& p = m->objects.front(); //TODO: only one delete per message for now.
- return local_recover_delete(p.first, p.second, pg.get_osdmap_epoch()).then(
+ return local_recover_delete(p.first, p.second, pg.get_osdmap_epoch())
+ .then_interruptible(
[this, m] {
auto reply = make_message<MOSDPGRecoveryDeleteReply>();
reply->from = pg.get_pg_whoami();
});
}
-seastar::future<> ReplicatedRecoveryBackend::on_local_recover_persist(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::on_local_recover_persist(
const hobject_t& soid,
const ObjectRecoveryInfo& _recovery_info,
bool is_delete,
logger().debug("{}", __func__);
ceph::os::Transaction t;
pg.get_recovery_handler()->on_local_recover(soid, _recovery_info, is_delete, t);
- return shard_services.get_store().do_transaction(coll, std::move(t)).then(
+ return interruptor::make_interruptible(
+ shard_services.get_store().do_transaction(coll, std::move(t)))
+ .then_interruptible(
[this, epoch_frozen, last_complete = pg.get_info().last_complete] {
pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete);
return seastar::make_ready_future<>();
});
}
-seastar::future<> ReplicatedRecoveryBackend::local_recover_delete(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::local_recover_delete(
const hobject_t& soid,
eversion_t need,
epoch_t epoch_to_freeze)
{
logger().debug("{}: {}, {}", __func__, soid, need);
- return backend->load_metadata(soid).safe_then([this]
- (auto lomt) {
+ return backend->load_metadata(soid).safe_then_interruptible([this]
+ (auto lomt) -> interruptible_future<> {
if (lomt->os.exists) {
return seastar::do_with(ceph::os::Transaction(),
[this, lomt = std::move(lomt)](auto& txn) {
- return backend->remove(lomt->os, txn).then([this, &txn]() mutable {
+ return backend->remove(lomt->os, txn).then_interruptible(
+ [this, &txn]() mutable {
return shard_services.get_store().do_transaction(coll,
std::move(txn));
});
});
}
return seastar::make_ready_future<>();
- }).safe_then([this, soid, epoch_to_freeze, need] {
+ }).safe_then_interruptible([this, soid, epoch_to_freeze, need] {
ObjectRecoveryInfo recovery_info;
recovery_info.soid = soid;
recovery_info.version = need;
);
}
-seastar::future<> ReplicatedRecoveryBackend::recover_delete(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::recover_delete(
const hobject_t &soid, eversion_t need)
{
logger().debug("{}: {}, {}", __func__, soid, need);
epoch_t cur_epoch = pg.get_osdmap_epoch();
return seastar::do_with(object_stat_sum_t(),
[this, soid, need, cur_epoch](auto& stat_diff) {
- return local_recover_delete(soid, need, cur_epoch).then(
- [this, &stat_diff, cur_epoch, soid, need] {
+ return local_recover_delete(soid, need, cur_epoch).then_interruptible(
+ [this, &stat_diff, cur_epoch, soid, need]()
+ -> interruptible_future<> {
if (!pg.has_reset_since(cur_epoch)) {
bool object_missing = false;
for (const auto& shard : pg.get_acting_recovery_backfill()) {
}
}
return seastar::make_ready_future<>();
- }).then([this, soid, &stat_diff] {
+ }).then_interruptible([this, soid, &stat_diff] {
pg.get_recovery_handler()->on_global_recover(soid, stat_diff, true);
return seastar::make_ready_future<>();
});
});
}
-seastar::future<PushOp>
+RecoveryBackend::interruptible_future<PushOp>
ReplicatedRecoveryBackend::prep_push(
const hobject_t& soid,
eversion_t need,
pi.recovery_progress.omap_complete =
!missing_iter->second.clean_regions.omap_is_dirty();
- return build_push_op(pi.recovery_info, pi.recovery_progress, &pi.stat).then(
+ return build_push_op(pi.recovery_info, pi.recovery_progress, &pi.stat).then_interruptible(
[this, soid, pg_shard](auto pop) {
auto& recovery_waiter = recovering.at(soid);
auto& pi = recovery_waiter.pushing[pg_shard];
pi.recovery_progress = po.recovery_progress;
}
-seastar::future<PushOp> ReplicatedRecoveryBackend::build_push_op(
+RecoveryBackend::interruptible_future<PushOp>
+ReplicatedRecoveryBackend::build_push_op(
const ObjectRecoveryInfo& recovery_info,
const ObjectRecoveryProgress& progress,
object_stat_sum_t* stat)
(auto& new_progress, auto& available, auto& v, auto& pop) {
return read_metadata_for_push_op(recovery_info.soid,
progress, new_progress,
- v, &pop).then([&](eversion_t local_ver) mutable {
+ v, &pop).then_interruptible([&](eversion_t local_ver) mutable {
// If requestor didn't know the version, use ours
if (v == eversion_t()) {
v = local_ver;
progress,
new_progress,
&available, &pop);
- }).then([this, &recovery_info, &progress, &available, &pop]() mutable {
+ }).then_interruptible([this, &recovery_info, &progress, &available, &pop]() mutable {
logger().debug("build_push_op: available: {}, copy_subset: {}",
available, recovery_info.copy_subset);
return read_object_for_push_op(recovery_info.soid,
recovery_info.copy_subset,
progress.data_recovered_to,
available, &pop);
- }).then([&recovery_info, &v, &progress, &new_progress, stat, &pop]
+ }).then_interruptible([&recovery_info, &v, &progress, &new_progress, stat, &pop]
(uint64_t recovered_to) mutable {
new_progress.data_recovered_to = recovered_to;
if (new_progress.is_complete(recovery_info)) {
});
}
-seastar::future<eversion_t>
+RecoveryBackend::interruptible_future<eversion_t>
ReplicatedRecoveryBackend::read_metadata_for_push_op(
const hobject_t& oid,
const ObjectRecoveryProgress& progress,
eversion_t ver,
PushOp* push_op)
{
+ logger().debug("{}, {}", __func__, oid);
if (!progress.first) {
return seastar::make_ready_future<eversion_t>(ver);
}
- return seastar::when_all_succeed(
- backend->omap_get_header(coll, ghobject_t(oid)).handle_error(
- crimson::os::FuturizedStore::read_errorator::all_same_way(
- [] (const std::error_code& e) {
- return seastar::make_ready_future<bufferlist>();
- })),
- store->get_attrs(coll, ghobject_t(oid)).handle_error(
- crimson::os::FuturizedStore::get_attrs_ertr::all_same_way(
- [] (const std::error_code& e) {
- return seastar::make_ready_future<crimson::os::FuturizedStore::attrs_t>();
- }))
- ).then_unpack([&new_progress, push_op](auto bl, auto attrs) {
+ return interruptor::make_interruptible(interruptor::when_all_succeed(
+ backend->omap_get_header(coll, ghobject_t(oid)).handle_error_interruptible<false>(
+ crimson::os::FuturizedStore::read_errorator::all_same_way(
+ [oid] (const std::error_code& e) {
+ logger().debug("read_metadata_for_push_op, error {} when getting omap header: {}", e, oid);
+ return seastar::make_ready_future<bufferlist>();
+ })),
+ interruptor::make_interruptible(store->get_attrs(coll, ghobject_t(oid)))
+ .handle_error_interruptible<false>(
+ crimson::os::FuturizedStore::get_attrs_ertr::all_same_way(
+ [oid] (const std::error_code& e) {
+ logger().debug("read_metadata_for_push_op, error {} when getting attrs: {}", e, oid);
+ return seastar::make_ready_future<crimson::os::FuturizedStore::attrs_t>();
+ }))
+ )).then_unpack_interruptible([&new_progress, push_op](auto bl, auto attrs) {
if (bl.length() == 0) {
logger().warn("read_metadata_for_push_op: fail to read omap header");
} else if (attrs.empty()) {
});
}
-seastar::future<uint64_t>
+RecoveryBackend::interruptible_future<uint64_t>
ReplicatedRecoveryBackend::read_object_for_push_op(
const hobject_t& oid,
const interval_set<uint64_t>& copy_subset,
}
// 1. get the extents in the interested range
return backend->fiemap(coll, ghobject_t{oid},
- 0, copy_subset.range_end()).then_wrapped(
+ 0, copy_subset.range_end()).then_wrapped_interruptible(
[=](auto&& fiemap_included) mutable {
interval_set<uint64_t> extents;
try {
// 3. read the truncated extents
// TODO: check if the returned extents are pruned
return store->readv(coll, ghobject_t{oid}, push_op->data_included, 0);
- }).safe_then([push_op, range_end=copy_subset.range_end()](auto &&bl) {
+ }).safe_then_interruptible([push_op, range_end=copy_subset.range_end()](auto &&bl) {
push_op->data.claim_append(std::move(bl));
uint64_t recovered_to = 0;
if (push_op->data_included.empty()) {
}));
}
-seastar::future<>
+RecoveryBackend::interruptible_future<>
ReplicatedRecoveryBackend::read_omap_for_push_op(
const hobject_t& oid,
const ObjectRecoveryProgress& progress,
return shards;
}
-seastar::future<> ReplicatedRecoveryBackend::handle_pull(Ref<MOSDPGPull> m)
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::handle_pull(Ref<MOSDPGPull> m)
{
logger().debug("{}: {}", __func__, *m);
return seastar::do_with(m->take_pulls(), [this, from=m->from](auto& pulls) {
- return seastar::parallel_for_each(pulls,
+ return interruptor::parallel_for_each(pulls,
[this, from](auto& pull_op) {
const hobject_t& soid = pull_op.soid;
logger().debug("handle_pull: {}", soid);
- return backend->stat(coll, ghobject_t(soid)).then(
+ return backend->stat(coll, ghobject_t(soid)).then_interruptible(
[this, &pull_op](auto st) {
ObjectRecoveryInfo &recovery_info = pull_op.recovery_info;
ObjectRecoveryProgress &progress = pull_op.recovery_progress;
assert(recovery_info.clone_subset.empty());
}
return build_push_op(recovery_info, progress, 0);
- }).then([this, from](auto pop) {
+ }).then_interruptible([this, from](auto pop) {
auto msg = make_message<MOSDPGPush>();
msg->from = pg.get_pg_whoami();
msg->pgid = pg.get_pgid();
});
}
-seastar::future<bool> ReplicatedRecoveryBackend::_handle_pull_response(
+RecoveryBackend::interruptible_future<bool>
+ReplicatedRecoveryBackend::_handle_pull_response(
pg_shard_t from,
PushOp& pop,
PullOp* response,
if (pi.recovery_info.version == eversion_t())
pi.recovery_info.version = pop.version;
- auto prepare_waiter = seastar::make_ready_future<>();
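+ // seed with an interruptible ready future; the first-push branch below
+ // replaces it with the obc-loading future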
+ auto prepare_waiter = interruptor::make_interruptible(
+ seastar::make_ready_future<>());
if (pi.recovery_progress.first) {
prepare_waiter = pg.with_head_obc<RWState::RWNONE>(
pi.recovery_info.soid, [&pi, &recovery_waiter, &pop](auto obc) {
obc->obs.oi.decode(pop.attrset.at(OI_ATTR));
pi.recovery_info.oi = obc->obs.oi;
return crimson::osd::PG::load_obc_ertr::now();
- }).handle_error(crimson::ct_error::assert_all{});
+ }).handle_error_interruptible(crimson::ct_error::assert_all{});
};
- return prepare_waiter.then([this, &pi, &pop, t, response]() mutable {
+ return prepare_waiter.then_interruptible([this, &pi, &pop, t, response]() mutable {
const bool first = pi.recovery_progress.first;
pi.recovery_progress = pop.after_progress;
logger().debug("new recovery_info {}, new progress {}",
return submit_push_data(pi.recovery_info, first, complete, clear_omap,
std::move(data_zeros), std::move(usable_intervals),
std::move(data), std::move(pop.omap_header),
- pop.attrset, std::move(pop.omap_entries), t).then(
+ pop.attrset, std::move(pop.omap_entries), t)
+ .then_interruptible(
[this, response, &pi, &pop, complete, t, bytes_recovered=data.length()] {
pi.stat.num_keys_recovered += pop.omap_entries.size();
pi.stat.num_bytes_recovered += bytes_recovered;
});
}
-seastar::future<> ReplicatedRecoveryBackend::handle_pull_response(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::handle_pull_response(
Ref<MOSDPGPush> m)
{
const PushOp& pop = m->pushes[0]; //TODO: only one push per message for now.
[this, &response](auto& t, auto& m) {
pg_shard_t from = m->from;
PushOp& pop = m->pushes[0]; // only one push per message for now
- return _handle_pull_response(from, pop, &response, &t).then(
+ return _handle_pull_response(from, pop, &response, &t).then_interruptible(
[this, &t](bool complete) {
epoch_t epoch_frozen = pg.get_osdmap_epoch();
return shard_services.get_store().do_transaction(coll, std::move(t))
return seastar::make_ready_future<bool>(complete);
});
});
- }).then([this, m, &response](bool complete) {
+ }).then_interruptible([this, m, &response](bool complete) {
if (complete) {
auto& pop = m->pushes[0];
recovering.at(pop.soid).set_pulled();
});
}
-seastar::future<> ReplicatedRecoveryBackend::_handle_push(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::_handle_push(
pg_shard_t from,
PushOp &pop,
PushReplyOp *response,
return submit_push_data(pop.recovery_info, first, complete, clear_omap,
std::move(data_zeros), std::move(pop.data_included),
std::move(pop.data), std::move(pop.omap_header),
- pop.attrset, std::move(pop.omap_entries), t).then(
+ pop.attrset, std::move(pop.omap_entries), t)
+ .then_interruptible(
[this, complete, &pop, t] {
if (complete) {
pg.get_recovery_handler()->on_local_recover(
});
}
-seastar::future<> ReplicatedRecoveryBackend::handle_push(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::handle_push(
Ref<MOSDPGPush> m)
{
if (pg.is_primary()) {
PushOp& pop = m->pushes[0]; // TODO: only one push per message for now
return seastar::do_with(ceph::os::Transaction(),
[this, m, &pop, &response](auto& t) {
- return _handle_push(m->from, pop, &response, &t).then(
+ return _handle_push(m->from, pop, &response, &t).then_interruptible(
[this, &t] {
epoch_t epoch_frozen = pg.get_osdmap_epoch();
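+      // do_transaction() still returns a plain seastar::future<>, so it
+      // is wrapped explicitly before then_interruptible() can chain on it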
- return shard_services.get_store().do_transaction(coll, std::move(t)).then(
+ return interruptor::make_interruptible(
+ shard_services.get_store().do_transaction(coll, std::move(t))).then_interruptible(
[this, epoch_frozen, last_complete = pg.get_info().last_complete] {
//TODO: this should be grouped with pg.on_local_recover somehow.
pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete);
});
});
- }).then([this, m, &response]() mutable {
+ }).then_interruptible([this, m, &response]() mutable {
auto reply = make_message<MOSDPGPushReply>();
reply->from = pg.get_pg_whoami();
reply->set_priority(m->get_priority());
});
}
-seastar::future<std::optional<PushOp>>
+RecoveryBackend::interruptible_future<std::optional<PushOp>>
ReplicatedRecoveryBackend::_handle_push_reply(
pg_shard_t peer,
const PushReplyOp &op)
bool error = pi.recovery_progress.error;
if (!pi.recovery_progress.data_complete && !error) {
return build_push_op(pi.recovery_info, pi.recovery_progress,
- &pi.stat).then([&pi] (auto pop) {
+ &pi.stat).then_interruptible([&pi] (auto pop) {
pi.recovery_progress = pop.after_progress;
return seastar::make_ready_future<std::optional<PushOp>>(std::move(pop));
- }).handle_exception([recovering_iter, &pi, peer] (auto e) {
+ }).handle_exception_interruptible([recovering_iter, &pi, peer] (auto e) {
pi.recovery_progress.error = true;
recovering_iter->second.set_push_failed(peer, e);
return seastar::make_ready_future<std::optional<PushOp>>();
}
}
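// handle_exception_interruptible() is the interruptible analogue of
// seastar's handle_exception(): it attaches the failure handler while
// keeping the chain an interruptible_future, so the error path remains
// subject to the same interrupt condition as the success path.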
-seastar::future<> ReplicatedRecoveryBackend::handle_push_reply(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::handle_push_reply(
Ref<MOSDPGPushReply> m)
{
logger().debug("{}: {}", __func__, *m);
auto from = m->from;
auto& push_reply = m->replies[0]; //TODO: only one reply per message
- return _handle_push_reply(from, push_reply).then(
+ return _handle_push_reply(from, push_reply).then_interruptible(
[this, from](std::optional<PushOp> push_op) {
if (push_op) {
auto msg = make_message<MOSDPGPush>();
return {intervals_usable, data_usable};
}
-seastar::future<hobject_t>
+RecoveryBackend::interruptible_future<hobject_t>
ReplicatedRecoveryBackend::prep_push_target(
const ObjectRecoveryInfo& recovery_info,
bool first,
return seastar::make_ready_future<hobject_t>(target_oid.hobj);
}
// clone overlap content in local object if using a new object
- return store->stat(coll, ghobject_t(recovery_info.soid)).then(
+ return interruptor::make_interruptible(store->stat(coll, ghobject_t(recovery_info.soid)))
+ .then_interruptible(
[this, &recovery_info, t, target_oid] (auto st) {
// TODO: pg num bytes counting
uint64_t local_size = std::min(recovery_info.size, (uint64_t)st.st_size);
return seastar::make_ready_future<hobject_t>(target_oid.hobj);
});
}
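// Only direct object-store calls (stat() above, do_transaction() elsewhere)
// need interruptor::make_interruptible() wrapping: the recovery helpers now
// return interruptible futures themselves, and plain ready futures such as
// seastar::make_ready_future<hobject_t>() evidently convert implicitly,
// which is why those return statements are untouched by this patch.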
-
-seastar::future<> ReplicatedRecoveryBackend::submit_push_data(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::submit_push_data(
const ObjectRecoveryInfo &recovery_info,
bool first,
bool complete,
logger().debug("{}", __func__);
return prep_push_target(recovery_info, first, complete,
clear_omap, t, attrs,
- std::move(omap_header)).then(
+ std::move(omap_header)).then_interruptible(
[this,
&recovery_info, t,
first, complete,
data_included=std::move(data_included),
omap_entries=std::move(omap_entries),
&attrs](auto target_oid) mutable {
uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL;
// Punch zeros for data, if fiemap indicates nothing but it is marked dirty
if (!data_zeros.empty()) {
}
}
-seastar::future<> ReplicatedRecoveryBackend::handle_recovery_delete_reply(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::handle_recovery_delete_reply(
Ref<MOSDPGRecoveryDeleteReply> m)
{
auto& p = m->objects.front();
return seastar::now();
}
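// The conversion recipe applied throughout this file, in sketch form:
//   seastar::future<T> f();       ->  interruptible_future<T> f();
//   fut.then(c)                   ->  fut.then_interruptible(c)
//   fut.handle_exception(h)       ->  fut.handle_exception_interruptible(h)
//   errfut.handle_error(h)        ->  errfut.handle_error_interruptible(h)
//   plain futures from the store  ->  interruptor::make_interruptible(fut)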
-seastar::future<> ReplicatedRecoveryBackend::handle_recovery_op(Ref<MOSDFastDispatchOp> m)
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::handle_recovery_op(Ref<MOSDFastDispatchOp> m)
{
switch (m->get_header().type) {
case MSG_OSD_PG_PULL:
#pragma once
+#include "crimson/common/interruptible_future.h"
+#include "crimson/osd/pg_interval_interrupt_condition.h"
#include "crimson/osd/recovery_backend.h"
#include "messages/MOSDPGPull.h"
crimson::osd::ShardServices& shard_services,
crimson::os::CollectionRef coll,
PGBackend* backend)
- : RecoveryBackend(pg, shard_services, coll, backend) {}
- seastar::future<> handle_recovery_op(
+ : RecoveryBackend(pg, shard_services, coll, backend)
+ {}
+ interruptible_future<> handle_recovery_op(
Ref<MOSDFastDispatchOp> m) final;
- seastar::future<> recover_object(
+ interruptible_future<> recover_object(
const hobject_t& soid,
eversion_t need) final;
- seastar::future<> recover_delete(
+ interruptible_future<> recover_delete(
const hobject_t& soid,
eversion_t need) final;
- seastar::future<> push_delete(
+ interruptible_future<> push_delete(
const hobject_t& soid,
eversion_t need) final;
protected:
- seastar::future<> handle_pull(
+ interruptible_future<> handle_pull(
Ref<MOSDPGPull> m);
- seastar::future<> handle_pull_response(
+ interruptible_future<> handle_pull_response(
Ref<MOSDPGPush> m);
- seastar::future<> handle_push(
+ interruptible_future<> handle_push(
Ref<MOSDPGPush> m);
- seastar::future<> handle_push_reply(
+ interruptible_future<> handle_push_reply(
Ref<MOSDPGPushReply> m);
- seastar::future<> handle_recovery_delete(
+ interruptible_future<> handle_recovery_delete(
Ref<MOSDPGRecoveryDelete> m);
- seastar::future<> handle_recovery_delete_reply(
+ interruptible_future<> handle_recovery_delete_reply(
Ref<MOSDPGRecoveryDeleteReply> m);
- seastar::future<PushOp> prep_push(
+ interruptible_future<PushOp> prep_push(
const hobject_t& soid,
eversion_t need,
pg_shard_t pg_shard);
eversion_t need);
std::vector<pg_shard_t> get_shards_to_push(
const hobject_t& soid) const;
- seastar::future<PushOp> build_push_op(
+ interruptible_future<PushOp> build_push_op(
const ObjectRecoveryInfo& recovery_info,
const ObjectRecoveryProgress& progress,
object_stat_sum_t* stat);
/// @returns true if this push op is the last push op for
/// recovery @c pop.soid
- seastar::future<bool> _handle_pull_response(
+ interruptible_future<bool> _handle_pull_response(
pg_shard_t from,
PushOp& pop,
PullOp* response,
    const interval_set<uint64_t> &copy_subset,
const interval_set<uint64_t> &intervals_received,
ceph::bufferlist data_received);
- seastar::future<> submit_push_data(
+ interruptible_future<> submit_push_data(
const ObjectRecoveryInfo &recovery_info,
bool first,
bool complete,
void submit_push_complete(
const ObjectRecoveryInfo &recovery_info,
ObjectStore::Transaction *t);
- seastar::future<> _handle_push(
+ interruptible_future<> _handle_push(
pg_shard_t from,
PushOp& pop,
PushReplyOp *response,
ceph::os::Transaction *t);
- seastar::future<std::optional<PushOp>> _handle_push_reply(
+ interruptible_future<std::optional<PushOp>> _handle_push_reply(
pg_shard_t peer,
const PushReplyOp &op);
- seastar::future<> on_local_recover_persist(
+ interruptible_future<> on_local_recover_persist(
const hobject_t& soid,
const ObjectRecoveryInfo& _recovery_info,
bool is_delete,
epoch_t epoch_to_freeze);
- seastar::future<> local_recover_delete(
+ interruptible_future<> local_recover_delete(
const hobject_t& soid,
eversion_t need,
epoch_t epoch_frozen);
}
private:
/// pull missing object from peer
- seastar::future<> maybe_pull_missing_obj(
+ interruptible_future<> maybe_pull_missing_obj(
const hobject_t& soid,
eversion_t need);
/// load object context for recovery if it is not ready yet
using load_obc_ertr = crimson::errorator<
crimson::ct_error::object_corrupted>;
+ using load_obc_iertr =
+ ::crimson::interruptible::interruptible_errorator<
+ ::crimson::osd::IOInterruptCondition,
+ load_obc_ertr>;
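+  // load_obc_iertr layers interruption on top of the errorator: its future
+  // is conceptually an interruptible_future over load_obc_ertr::future, so
+  // the object_corrupted error and an interruption propagate through the
+  // same chain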
- seastar::future<> maybe_push_shards(
+ interruptible_future<> maybe_push_shards(
const hobject_t& soid,
eversion_t need);
/// be relatively small.
///
/// @return @c oi.version
- seastar::future<eversion_t> read_metadata_for_push_op(
+ interruptible_future<eversion_t> read_metadata_for_push_op(
const hobject_t& oid,
const ObjectRecoveryProgress& progress,
ObjectRecoveryProgress& new_progress,
/// @param copy_subset extents we want
/// @param offset the offset in object from where we should read
/// @return the new offset
- seastar::future<uint64_t> read_object_for_push_op(
+ interruptible_future<uint64_t> read_object_for_push_op(
const hobject_t& oid,
const interval_set<uint64_t>& copy_subset,
uint64_t offset,
uint64_t max_len,
PushOp* push_op);
- seastar::future<> read_omap_for_push_op(
+ interruptible_future<> read_omap_for_push_op(
const hobject_t& oid,
const ObjectRecoveryProgress& progress,
ObjectRecoveryProgress& new_progress,
uint64_t* max_len,
PushOp* push_op);
- seastar::future<hobject_t> prep_push_target(
+ interruptible_future<hobject_t> prep_push_target(
const ObjectRecoveryInfo &recovery_info,
bool first,
bool complete,
ObjectStore::Transaction* t,
const map<string, bufferlist> &attrs,
bufferlist&& omap_header);
+ using interruptor = crimson::interruptible::interruptor<
+ crimson::osd::IOInterruptCondition>;
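+  // interruptor bundles the static helpers used in the .cc
+  // (make_interruptible, parallel_for_each, ...) pre-bound to
+  // IOInterruptCondition, so call sites need not repeat the condition type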
};