Seastore will need to use this as well.
Signed-off-by: Samuel Just <sjust@redhat.com>
common/formatter.cc
common/perf_counters_collection.cc
common/log.cc
+ common/operation.cc
common/throttle.cc
common/tri_mutex.cc)
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "operation.h"
+#include "common/Formatter.h"
+
+namespace crimson {
+
+void Operation::dump(ceph::Formatter* f)
+{
+ f->open_object_section("operation");
+ f->dump_string("type", get_type_name());
+ f->dump_unsigned("id", id);
+ {
+ f->open_object_section("detail");
+ dump_detail(f);
+ f->close_section();
+ }
+ f->open_array_section("blockers");
+ for (auto &blocker : blockers) {
+ blocker->dump(f);
+ }
+ f->close_section();
+ f->close_section();
+}
+
+void Operation::dump_brief(ceph::Formatter* f)
+{
+ f->open_object_section("operation");
+ f->dump_string("type", get_type_name());
+ f->dump_unsigned("id", id);
+ f->close_section();
+}
+
+std::ostream &operator<<(std::ostream &lhs, const Operation &rhs) {
+ lhs << rhs.get_type_name() << "(id=" << rhs.get_id() << ", detail=";
+ rhs.print(lhs);
+ lhs << ")";
+ return lhs;
+}
+
+void Blocker::dump(ceph::Formatter* f) const
+{
+ f->open_object_section("blocker");
+ f->dump_string("op_type", get_type_name());
+ {
+ f->open_object_section("detail");
+ dump_detail(f);
+ f->close_section();
+ }
+ f->close_section();
+}
+
+void AggregateBlocker::dump_detail(ceph::Formatter *f) const
+{
+ f->open_array_section("parent_blockers");
+ for (auto b : parent_blockers) {
+ f->open_object_section("parent_blocker");
+ b->dump(f);
+ f->close_section();
+ }
+ f->close_section();
+}
+
+void OrderedPipelinePhase::Handle::exit()
+{
+ if (phase) {
+ phase->mutex.unlock();
+ phase = nullptr;
+ }
+}
+
+blocking_future<> OrderedPipelinePhase::Handle::enter(
+ OrderedPipelinePhase &new_phase)
+{
+ auto fut = new_phase.mutex.lock();
+ exit();
+ phase = &new_phase;
+ return new_phase.make_blocking_future(std::move(fut));
+}
+
+OrderedPipelinePhase::Handle::~Handle()
+{
+ exit();
+}
+
+void OrderedPipelinePhase::dump_detail(ceph::Formatter* f) const
+{
+}
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <algorithm>
+#include <array>
+#include <set>
+#include <vector>
+#include <boost/intrusive/list.hpp>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+#include <seastar/core/shared_mutex.hh>
+#include <seastar/core/future.hh>
+#include <seastar/core/timer.hh>
+#include <seastar/core/lowres_clock.hh>
+#include <seastar/core/future-util.hh>
+
+#include "include/ceph_assert.h"
+
+namespace ceph {
+ class Formatter;
+}
+
+namespace crimson {
+
+using registry_hook_t = boost::intrusive::list_member_hook<
+ boost::intrusive::link_mode<boost::intrusive::auto_unlink>>;
+
+class Operation;
+class Blocker;
+
+/**
+ * Provides an abstraction for registering and unregistering a blocker
+ * for the duration of a future becoming available.
+ */
+template <typename Fut>
+class blocking_future_detail {
+ friend class Operation;
+ friend class Blocker;
+ Blocker *blocker;
+ Fut fut;
+ blocking_future_detail(Blocker *b, Fut &&f)
+ : blocker(b), fut(std::move(f)) {}
+
+ template <typename V, typename U>
+ friend blocking_future_detail<seastar::future<V>> make_ready_blocking_future(U&& args);
+ template <typename V, typename Exception>
+ friend blocking_future_detail<seastar::future<V>>
+ make_exception_blocking_future(Exception&& e);
+
+ template <typename U>
+ friend blocking_future_detail<seastar::future<>> join_blocking_futures(U &&u);
+
+ template <typename U>
+ friend class blocking_future_detail;
+
+public:
+ template <typename F>
+ auto then(F &&f) && {
+ using result = decltype(std::declval<Fut>().then(f));
+ return blocking_future_detail<seastar::futurize_t<result>>(
+ blocker,
+ std::move(fut).then(std::forward<F>(f)));
+ }
+};
+
+template <typename T=void>
+using blocking_future = blocking_future_detail<seastar::future<T>>;
+
+template <typename V, typename U>
+blocking_future_detail<seastar::future<V>> make_ready_blocking_future(U&& args) {
+ return blocking_future<V>(
+ nullptr,
+ seastar::make_ready_future<V>(std::forward<U>(args)));
+}
+
+template <typename V, typename Exception>
+blocking_future_detail<seastar::future<V>>
+make_exception_blocking_future(Exception&& e) {
+ return blocking_future<V>(
+ nullptr,
+ seastar::make_exception_future<V>(e));
+}
+
+/**
+ * Provides an interface for dumping diagnostic information about
+ * why a particular op is not making progress.
+ */
+class Blocker {
+public:
+ template <typename T>
+ blocking_future<T> make_blocking_future(seastar::future<T> &&f) {
+ return blocking_future<T>(this, std::move(f));
+ }
+ void dump(ceph::Formatter *f) const;
+ virtual ~Blocker() = default;
+
+private:
+ virtual void dump_detail(ceph::Formatter *f) const = 0;
+ virtual const char *get_type_name() const = 0;
+};
+
+template <typename T>
+class BlockerT : public Blocker {
+public:
+ virtual ~BlockerT() = default;
+private:
+ const char *get_type_name() const final {
+ return T::type_name;
+ }
+};
+
+class AggregateBlocker : public BlockerT<AggregateBlocker> {
+ std::vector<Blocker*> parent_blockers;
+public:
+ AggregateBlocker(std::vector<Blocker*> &&parent_blockers)
+ : parent_blockers(std::move(parent_blockers)) {}
+ static constexpr const char *type_name = "AggregateBlocker";
+private:
+ void dump_detail(ceph::Formatter *f) const final;
+};
+
+template <typename T>
+blocking_future<> join_blocking_futures(T &&t) {
+ std::vector<Blocker*> blockers;
+ blockers.reserve(t.size());
+ for (auto &&bf: t) {
+ blockers.push_back(bf.blocker);
+ bf.blocker = nullptr;
+ }
+ auto agg = std::make_unique<AggregateBlocker>(std::move(blockers));
+ return agg->make_blocking_future(
+ seastar::parallel_for_each(
+ std::forward<T>(t),
+ [](auto &&bf) {
+ return std::move(bf.fut);
+ }).then([agg=std::move(agg)] {
+ return seastar::make_ready_future<>();
+ }));
+}
+
+
+/**
+ * Common base for all crimson-osd operations. Mainly provides
+ * an interface for registering ops in flight and dumping
+ * diagnostic information.
+ */
+class Operation : public boost::intrusive_ref_counter<
+ Operation, boost::thread_unsafe_counter> {
+ public:
+ uint64_t get_id() const {
+ return id;
+ }
+
+ virtual unsigned get_type() const = 0;
+ virtual const char *get_type_name() const = 0;
+ virtual void print(std::ostream &) const = 0;
+
+ template <typename T>
+ seastar::future<T> with_blocking_future(blocking_future<T> &&f) {
+ if (f.fut.available()) {
+ return std::move(f.fut);
+ }
+ assert(f.blocker);
+ add_blocker(f.blocker);
+ return std::move(f.fut).then_wrapped([this, blocker=f.blocker](auto &&arg) {
+ clear_blocker(blocker);
+ return std::move(arg);
+ });
+ }
+
+ void dump(ceph::Formatter *f);
+ void dump_brief(ceph::Formatter *f);
+ virtual ~Operation() = default;
+
+ private:
+ virtual void dump_detail(ceph::Formatter *f) const = 0;
+
+ registry_hook_t registry_hook;
+
+ std::vector<Blocker*> blockers;
+ uint64_t id = 0;
+ void set_id(uint64_t in_id) {
+ id = in_id;
+ }
+
+ void add_blocker(Blocker *b) {
+ blockers.push_back(b);
+ }
+
+ void clear_blocker(Blocker *b) {
+ auto iter = std::find(blockers.begin(), blockers.end(), b);
+ if (iter != blockers.end()) {
+ blockers.erase(iter);
+ }
+ }
+
+ friend class OperationRegistryI;
+ template <size_t>
+ friend class OperationRegistryT;
+};
+using OperationRef = boost::intrusive_ptr<Operation>;
+
+std::ostream &operator<<(std::ostream &, const Operation &op);
+
+/**
+ * Maintains a set of lists of all active ops.
+ */
+class OperationRegistryI {
+ friend class Operation;
+ seastar::timer<seastar::lowres_clock> shutdown_timer;
+ seastar::promise<> shutdown;
+
+protected:
+ virtual void do_register(Operation *op) = 0;
+ virtual bool registries_empty() const = 0;
+
+public:
+ template <typename T, typename... Args>
+ typename T::IRef create_operation(Args&&... args) {
+ typename T::IRef op = new T(std::forward<Args>(args)...);
+ do_register(&*op);
+ return op;
+ }
+
+ seastar::future<> stop() {
+ shutdown_timer.set_callback([this] {
+ if (registries_empty()) {
+ shutdown.set_value();
+ shutdown_timer.cancel();
+ }
+ });
+ shutdown_timer.arm_periodic(
+ std::chrono::milliseconds(100/*TODO: use option instead*/));
+ return shutdown.get_future();
+ }
+};
+
+
+template <size_t NUM_REGISTRIES>
+class OperationRegistryT : public OperationRegistryI {
+ using op_list_member_option = boost::intrusive::member_hook<
+ Operation,
+ registry_hook_t,
+ &Operation::registry_hook
+ >;
+ using op_list = boost::intrusive::list<
+ Operation,
+ op_list_member_option,
+ boost::intrusive::constant_time_size<false>>;
+
+ std::array<
+ op_list,
+ NUM_REGISTRIES
+ > registries;
+
+ std::array<
+ uint64_t,
+ NUM_REGISTRIES
+ > op_id_counters = {};
+
+protected:
+ void do_register(Operation *op) final {
+ registries[op->get_type()].push_back(*op);
+ op->set_id(op_id_counters[op->get_type()]++);
+ }
+
+ bool registries_empty() const final {
+ return std::all_of(registries.begin(),
+ registries.end(),
+ [](auto& opl) {
+ return opl.empty();
+ });
+ }
+};
+
+/**
+ * Ensures that at most one op may consider itself in the phase at a time.
+ * Ops will see enter() unblock in the order in which they tried to enter
+ * the phase. entering (though not necessarily waiting for the future to
+ * resolve) a new phase prior to exiting the previous one will ensure that
+ * the op ordering is preserved.
+ */
+class OrderedPipelinePhase : public Blocker {
+private:
+ void dump_detail(ceph::Formatter *f) const final;
+ const char *get_type_name() const final {
+ return name;
+ }
+
+public:
+ /**
+ * Used to encapsulate pipeline residency state.
+ */
+ class Handle {
+ OrderedPipelinePhase *phase = nullptr;
+
+ public:
+ Handle() = default;
+
+ Handle(const Handle&) = delete;
+ Handle(Handle&&) = delete;
+ Handle &operator=(const Handle&) = delete;
+ Handle &operator=(Handle&&) = delete;
+
+ /**
+ * Returns a future which unblocks when the handle has entered the passed
+ * OrderedPipelinePhase. If already in a phase, enter will also release
+ * that phase after placing itself in the queue for the next one to preserve
+ * ordering.
+ */
+ blocking_future<> enter(OrderedPipelinePhase &phase);
+
+ /**
+ * Releases the current phase if there is one. Called in ~Handle().
+ */
+ void exit();
+
+ ~Handle();
+ };
+
+ OrderedPipelinePhase(const char *name) : name(name) {}
+
+private:
+ const char * name;
+ seastar::shared_mutex mutex;
+};
+
+}
namespace crimson::osd {
-void Operation::dump(ceph::Formatter* f)
-{
- f->open_object_section("operation");
- f->dump_string("type", get_type_name());
- f->dump_unsigned("id", id);
- {
- f->open_object_section("detail");
- dump_detail(f);
- f->close_section();
- }
- f->open_array_section("blockers");
- for (auto &blocker : blockers) {
- blocker->dump(f);
- }
- f->close_section();
- f->close_section();
-}
-
-void Operation::dump_brief(ceph::Formatter* f)
-{
- f->open_object_section("operation");
- f->dump_string("type", get_type_name());
- f->dump_unsigned("id", id);
- f->close_section();
-}
-
-std::ostream &operator<<(std::ostream &lhs, const Operation &rhs) {
- lhs << rhs.get_type_name() << "(id=" << rhs.get_id() << ", detail=";
- rhs.print(lhs);
- lhs << ")";
- return lhs;
-}
-
-void Blocker::dump(ceph::Formatter* f) const
-{
- f->open_object_section("blocker");
- f->dump_string("op_type", get_type_name());
- {
- f->open_object_section("detail");
- dump_detail(f);
- f->close_section();
- }
- f->close_section();
-}
-
-void AggregateBlocker::dump_detail(ceph::Formatter *f) const
-{
- f->open_array_section("parent_blockers");
- for (auto b : parent_blockers) {
- f->open_object_section("parent_blocker");
- b->dump(f);
- f->close_section();
- }
- f->close_section();
-}
-
OperationThrottler::OperationThrottler(ConfigProxy &conf)
: scheduler(crimson::osd::scheduler::make_scheduler(conf))
{
update_from_config(conf);
}
-
-void OrderedPipelinePhase::Handle::exit()
-{
- if (phase) {
- phase->mutex.unlock();
- phase = nullptr;
- }
-}
-
-blocking_future<> OrderedPipelinePhase::Handle::enter(
- OrderedPipelinePhase &new_phase)
-{
- auto fut = new_phase.mutex.lock();
- exit();
- phase = &new_phase;
- return new_phase.make_blocking_future(std::move(fut));
-}
-
-OrderedPipelinePhase::Handle::~Handle()
-{
- exit();
-}
-
-void OrderedPipelinePhase::dump_detail(ceph::Formatter* f) const
-{
-}
-
}
#pragma once
-#include <algorithm>
-#include <array>
-#include <set>
-#include <vector>
-#include <boost/intrusive/list.hpp>
-#include <boost/intrusive_ptr.hpp>
-#include <boost/smart_ptr/intrusive_ref_counter.hpp>
-#include <seastar/core/shared_mutex.hh>
-#include <seastar/core/future.hh>
-#include <seastar/core/timer.hh>
-#include <seastar/core/lowres_clock.hh>
-
-#include "include/ceph_assert.h"
+#include "crimson/common/operation.h"
#include "crimson/osd/scheduler/scheduler.h"
-namespace ceph {
- class Formatter;
-}
-
namespace crimson::osd {
enum class OperationTypeCode {
(sizeof(OP_NAMES)/sizeof(OP_NAMES[0])) ==
static_cast<int>(OperationTypeCode::last_op));
-class OperationRegistry;
-
-using registry_hook_t = boost::intrusive::list_member_hook<
- boost::intrusive::link_mode<boost::intrusive::auto_unlink>>;
-
-class Operation;
-class Blocker;
-
-/**
- * Provides an abstraction for registering and unregistering a blocker
- * for the duration of a future becoming available.
- */
-template <typename Fut>
-class blocking_future_detail {
- friend class Operation;
- friend class Blocker;
- Blocker *blocker;
- Fut fut;
- blocking_future_detail(Blocker *b, Fut &&f)
- : blocker(b), fut(std::move(f)) {}
-
- template <typename V, typename U>
- friend blocking_future_detail<seastar::future<V>> make_ready_blocking_future(U&& args);
- template <typename V, typename Exception>
- friend blocking_future_detail<seastar::future<V>>
- make_exception_blocking_future(Exception&& e);
-
- template <typename U>
- friend blocking_future_detail<seastar::future<>> join_blocking_futures(U &&u);
-
- template <typename U>
- friend class blocking_future_detail;
-
-public:
- template <typename F>
- auto then(F &&f) && {
- using result = decltype(std::declval<Fut>().then(f));
- return blocking_future_detail<seastar::futurize_t<result>>(
- blocker,
- std::move(fut).then(std::forward<F>(f)));
- }
-};
-
-template <typename T=void>
-using blocking_future = blocking_future_detail<seastar::future<T>>;
-
-template <typename V, typename U>
-blocking_future_detail<seastar::future<V>> make_ready_blocking_future(U&& args) {
- return blocking_future<V>(
- nullptr,
- seastar::make_ready_future<V>(std::forward<U>(args)));
-}
-
-template <typename V, typename Exception>
-blocking_future_detail<seastar::future<V>>
-make_exception_blocking_future(Exception&& e) {
- return blocking_future<V>(
- nullptr,
- seastar::make_exception_future<V>(e));
-}
-
-/**
- * Provides an interface for dumping diagnostic information about
- * why a particular op is not making progress.
- */
-class Blocker {
-public:
- template <typename T>
- blocking_future<T> make_blocking_future(seastar::future<T> &&f) {
- return blocking_future<T>(this, std::move(f));
- }
- void dump(ceph::Formatter *f) const;
- virtual ~Blocker() = default;
-
-private:
- virtual void dump_detail(ceph::Formatter *f) const = 0;
- virtual const char *get_type_name() const = 0;
-};
-
-template <typename T>
-class BlockerT : public Blocker {
-public:
- virtual ~BlockerT() = default;
-private:
- const char *get_type_name() const final {
- return T::type_name;
- }
-};
-
-class AggregateBlocker : public BlockerT<AggregateBlocker> {
- vector<Blocker*> parent_blockers;
-public:
- AggregateBlocker(vector<Blocker*> &&parent_blockers)
- : parent_blockers(std::move(parent_blockers)) {}
- static constexpr const char *type_name = "AggregateBlocker";
-private:
- void dump_detail(ceph::Formatter *f) const final;
-};
-
-template <typename T>
-blocking_future<> join_blocking_futures(T &&t) {
- vector<Blocker*> blockers;
- blockers.reserve(t.size());
- for (auto &&bf: t) {
- blockers.push_back(bf.blocker);
- bf.blocker = nullptr;
- }
- auto agg = std::make_unique<AggregateBlocker>(std::move(blockers));
- return agg->make_blocking_future(
- seastar::parallel_for_each(
- std::forward<T>(t),
- [](auto &&bf) {
- return std::move(bf.fut);
- }).then([agg=std::move(agg)] {
- return seastar::make_ready_future<>();
- }));
-}
-
-
-/**
- * Common base for all crimson-osd operations. Mainly provides
- * an interface for registering ops in flight and dumping
- * diagnostic information.
- */
-class Operation : public boost::intrusive_ref_counter<
- Operation, boost::thread_unsafe_counter> {
- public:
- uint64_t get_id() const {
- return id;
- }
-
- virtual OperationTypeCode get_type() const = 0;
- virtual const char *get_type_name() const = 0;
- virtual void print(std::ostream &) const = 0;
-
- template <typename T>
- seastar::future<T> with_blocking_future(blocking_future<T> &&f) {
- if (f.fut.available()) {
- return std::move(f.fut);
- }
- assert(f.blocker);
- add_blocker(f.blocker);
- return std::move(f.fut).then_wrapped([this, blocker=f.blocker](auto &&arg) {
- clear_blocker(blocker);
- return std::move(arg);
- });
- }
-
- void dump(ceph::Formatter *f);
- void dump_brief(ceph::Formatter *f);
- virtual ~Operation() = default;
-
- private:
- virtual void dump_detail(ceph::Formatter *f) const = 0;
-
- private:
- registry_hook_t registry_hook;
-
- std::vector<Blocker*> blockers;
- uint64_t id = 0;
- void set_id(uint64_t in_id) {
- id = in_id;
- }
-
- void add_blocker(Blocker *b) {
- blockers.push_back(b);
- }
-
- void clear_blocker(Blocker *b) {
- auto iter = std::find(blockers.begin(), blockers.end(), b);
- if (iter != blockers.end()) {
- blockers.erase(iter);
- }
- }
-
- friend class OperationRegistry;
-};
-using OperationRef = boost::intrusive_ptr<Operation>;
-
-std::ostream &operator<<(std::ostream &, const Operation &op);
-
template <typename T>
class OperationT : public Operation {
public:
static constexpr const char *type_name = OP_NAMES[static_cast<int>(T::type)];
using IRef = boost::intrusive_ptr<T>;
- OperationTypeCode get_type() const final {
- return T::type;
+ unsigned get_type() const final {
+ return static_cast<unsigned>(T::type);
}
const char *get_type_name() const final {
/**
* Maintains a set of lists of all active ops.
*/
-class OperationRegistry {
- friend class Operation;
- using op_list_member_option = boost::intrusive::member_hook<
- Operation,
- registry_hook_t,
- &Operation::registry_hook
- >;
- using op_list = boost::intrusive::list<
- Operation,
- op_list_member_option,
- boost::intrusive::constant_time_size<false>>;
-
- std::array<
- op_list,
- static_cast<int>(OperationTypeCode::last_op)
- > registries;
-
- std::array<
- uint64_t,
- static_cast<int>(OperationTypeCode::last_op)
- > op_id_counters = {};
-
- seastar::timer<seastar::lowres_clock> shutdown_timer;
- seastar::promise<> shutdown;
-public:
- template <typename T, typename... Args>
- typename T::IRef create_operation(Args&&... args) {
- typename T::IRef op = new T(std::forward<Args>(args)...);
- registries[static_cast<int>(T::type)].push_back(*op);
- op->set_id(op_id_counters[static_cast<int>(T::type)]++);
- return op;
- }
-
- seastar::future<> stop() {
- shutdown_timer.set_callback([this] {
- if (std::all_of(registries.begin(),
- registries.end(),
- [](auto& opl) {
- return opl.empty();
- })) {
- shutdown.set_value();
- shutdown_timer.cancel();
- }
- });
- shutdown_timer.arm_periodic(std::chrono::milliseconds(100/*TODO: use option instead*/));
- return shutdown.get_future();
- }
-};
+using OSDOperationRegistry = OperationRegistryT<
+ static_cast<size_t>(OperationTypeCode::last_op)
+ >;
/**
* Throttles set of currently running operations
void release_throttle();
};
-/**
- * Ensures that at most one op may consider itself in the phase at a time.
- * Ops will see enter() unblock in the order in which they tried to enter
- * the phase. entering (though not necessarily waiting for the future to
- * resolve) a new phase prior to exiting the previous one will ensure that
- * the op ordering is preserved.
- */
-class OrderedPipelinePhase : public Blocker {
-private:
- void dump_detail(ceph::Formatter *f) const final;
- const char *get_type_name() const final {
- return name;
- }
-
-public:
- /**
- * Used to encapsulate pipeline residency state.
- */
- class Handle {
- OrderedPipelinePhase *phase = nullptr;
-
- public:
- Handle() = default;
-
- Handle(const Handle&) = delete;
- Handle(Handle&&) = delete;
- Handle &operator=(const Handle&) = delete;
- Handle &operator=(Handle&&) = delete;
-
- /**
- * Returns a future which unblocks when the handle has entered the passed
- * OrderedPipelinePhase. If already in a phase, enter will also release
- * that phase after placing itself in the queue for the next one to preserve
- * ordering.
- */
- blocking_future<> enter(OrderedPipelinePhase &phase);
-
- /**
- * Releases the current phase if there is one. Called in ~Handle().
- */
- void exit();
-
- ~Handle();
- };
-
- OrderedPipelinePhase(const char *name) : name(name) {}
-
-private:
- const char * name;
- seastar::shared_mutex mutex;
-};
-
}
}
};
-std::vector<OperationRef> handle_pg_create(
+std::vector<crimson::OperationRef> handle_pg_create(
OSD &osd,
crimson::net::ConnectionRef conn,
compound_state_ref state,
Ref<MOSDPGCreate2> m)
{
- std::vector<OperationRef> ret;
+ std::vector<crimson::OperationRef> ret;
for (auto& [pgid, when] : m->pgs) {
const auto &[created, created_stamp] = when;
auto q = m->pg_extra.find(pgid);
return ret;
}
-struct SubOpBlocker : BlockerT<SubOpBlocker> {
+struct SubOpBlocker : crimson::BlockerT<SubOpBlocker> {
static constexpr const char * type_name = "CompoundOpBlocker";
- std::vector<OperationRef> subops;
- SubOpBlocker(std::vector<OperationRef> &&subops) : subops(subops) {}
+ std::vector<crimson::OperationRef> subops;
+ SubOpBlocker(std::vector<crimson::OperationRef> &&subops)
+ : subops(subops) {}
virtual void dump_detail(Formatter *f) const {
f->open_array_section("dependent_operations");
pg->get_osdmap_epoch());
}
-crimson::osd::blocking_future<bool>
+crimson::blocking_future<bool>
PGRecovery::start_recovery_ops(size_t max_to_start)
{
assert(pg->is_primary());
assert(!pg->is_backfilling());
assert(!pg->get_peering_state().is_deleting());
- std::vector<crimson::osd::blocking_future<>> started;
+ std::vector<crimson::blocking_future<>> started;
started.reserve(max_to_start);
max_to_start -= start_primary_recovery_ops(max_to_start, &started);
if (max_to_start > 0) {
max_to_start -= start_replica_recovery_ops(max_to_start, &started);
}
- return crimson::osd::join_blocking_futures(std::move(started)).then(
+ return crimson::join_blocking_futures(std::move(started)).then(
[this] {
bool done = !pg->get_peering_state().needs_recovery();
if (done) {
size_t PGRecovery::start_primary_recovery_ops(
size_t max_to_start,
- std::vector<crimson::osd::blocking_future<>> *out)
+ std::vector<crimson::blocking_future<>> *out)
{
if (!pg->is_recovering()) {
return 0;
size_t PGRecovery::start_replica_recovery_ops(
size_t max_to_start,
- std::vector<crimson::osd::blocking_future<>> *out)
+ std::vector<crimson::blocking_future<>> *out)
{
if (!pg->is_recovering()) {
return 0;
return started;
}
-crimson::osd::blocking_future<> PGRecovery::recover_missing(
+crimson::blocking_future<> PGRecovery::recover_missing(
const hobject_t &soid, eversion_t need)
{
if (pg->get_peering_state().get_missing_loc().is_deleted(soid)) {
size_t PGRecovery::prep_object_replica_deletes(
const hobject_t& soid,
eversion_t need,
- std::vector<crimson::osd::blocking_future<>> *in_progress)
+ std::vector<crimson::blocking_future<>> *in_progress)
{
in_progress->push_back(
pg->get_recovery_backend()->add_recovering(soid).make_blocking_future(
size_t PGRecovery::prep_object_replica_pushes(
const hobject_t& soid,
eversion_t need,
- std::vector<crimson::osd::blocking_future<>> *in_progress)
+ std::vector<crimson::blocking_future<>> *in_progress)
{
in_progress->push_back(
pg->get_recovery_backend()->add_recovering(soid).make_blocking_future(
virtual ~PGRecovery() {}
void start_pglogbased_recovery();
- crimson::osd::blocking_future<bool> start_recovery_ops(size_t max_to_start);
+ crimson::blocking_future<bool> start_recovery_ops(size_t max_to_start);
void on_backfill_reserved();
void dispatch_backfill_event(
boost::intrusive_ptr<const boost::statechart::event_base> evt);
PGRecoveryListener* pg;
size_t start_primary_recovery_ops(
size_t max_to_start,
- std::vector<crimson::osd::blocking_future<>> *out);
+ std::vector<crimson::blocking_future<>> *out);
size_t start_replica_recovery_ops(
size_t max_to_start,
- std::vector<crimson::osd::blocking_future<>> *out);
+ std::vector<crimson::blocking_future<>> *out);
std::vector<pg_shard_t> get_replica_recovery_order() const {
return pg->get_replica_recovery_order();
}
- crimson::osd::blocking_future<> recover_missing(
+ crimson::blocking_future<> recover_missing(
const hobject_t &soid, eversion_t need);
size_t prep_object_replica_deletes(
const hobject_t& soid,
eversion_t need,
- std::vector<crimson::osd::blocking_future<>> *in_progress);
+ std::vector<crimson::blocking_future<>> *in_progress);
size_t prep_object_replica_pushes(
const hobject_t& soid,
eversion_t need,
- std::vector<crimson::osd::blocking_future<>> *in_progress);
+ std::vector<crimson::blocking_future<>> *in_progress);
void on_local_recover(
const hobject_t& soid,
object_stat_sum_t stat;
};
- class WaitForObjectRecovery : public crimson::osd::BlockerT<WaitForObjectRecovery> {
+ class WaitForObjectRecovery : public crimson::BlockerT<WaitForObjectRecovery> {
seastar::shared_promise<> readable, recovered, pulled;
std::map<pg_shard_t, seastar::shared_promise<>> pushes;
public:
seastar::future<> wait_for_recovered() {
return recovered.get_shared_future();
}
- crimson::osd::blocking_future<>
+ crimson::blocking_future<>
wait_for_recovered_blocking() {
return make_blocking_future(
recovered.get_shared_future());
}
// Op Management
- OperationRegistry registry;
+ OSDOperationRegistry registry;
OperationThrottler throttler;
template <typename T, typename... Args>