From c7eb7ca4437b3192ef2302fa70811658f2b366eb Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Sat, 9 Jan 2021 02:55:08 +0000 Subject: [PATCH] crimson/: move operation out of osd/ into common/ Seastore will need to use this as well. Signed-off-by: Samuel Just --- src/crimson/CMakeLists.txt | 1 + src/crimson/common/operation.cc | 91 +++++ src/crimson/common/operation.h | 330 ++++++++++++++++++ src/crimson/osd/osd_operation.cc | 83 ----- src/crimson/osd/osd_operation.h | 306 +--------------- .../compound_peering_request.cc | 11 +- src/crimson/osd/pg_recovery.cc | 16 +- src/crimson/osd/pg_recovery.h | 12 +- src/crimson/osd/recovery_backend.h | 4 +- src/crimson/osd/shard_services.h | 2 +- 10 files changed, 451 insertions(+), 405 deletions(-) create mode 100644 src/crimson/common/operation.cc create mode 100644 src/crimson/common/operation.h diff --git a/src/crimson/CMakeLists.txt b/src/crimson/CMakeLists.txt index 26f7293365ec..44e65a238f31 100644 --- a/src/crimson/CMakeLists.txt +++ b/src/crimson/CMakeLists.txt @@ -18,6 +18,7 @@ set(crimson_common_srcs common/formatter.cc common/perf_counters_collection.cc common/log.cc + common/operation.cc common/throttle.cc common/tri_mutex.cc) diff --git a/src/crimson/common/operation.cc b/src/crimson/common/operation.cc new file mode 100644 index 000000000000..1657b7a5ebaf --- /dev/null +++ b/src/crimson/common/operation.cc @@ -0,0 +1,91 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "operation.h" +#include "common/Formatter.h" + +namespace crimson { + +void Operation::dump(ceph::Formatter* f) +{ + f->open_object_section("operation"); + f->dump_string("type", get_type_name()); + f->dump_unsigned("id", id); + { + f->open_object_section("detail"); + dump_detail(f); + f->close_section(); + } + f->open_array_section("blockers"); + for (auto &blocker : blockers) { + blocker->dump(f); + } + f->close_section(); + f->close_section(); +} + +void Operation::dump_brief(ceph::Formatter* f) +{ + f->open_object_section("operation"); + f->dump_string("type", get_type_name()); + f->dump_unsigned("id", id); + f->close_section(); +} + +std::ostream &operator<<(std::ostream &lhs, const Operation &rhs) { + lhs << rhs.get_type_name() << "(id=" << rhs.get_id() << ", detail="; + rhs.print(lhs); + lhs << ")"; + return lhs; +} + +void Blocker::dump(ceph::Formatter* f) const +{ + f->open_object_section("blocker"); + f->dump_string("op_type", get_type_name()); + { + f->open_object_section("detail"); + dump_detail(f); + f->close_section(); + } + f->close_section(); +} + +void AggregateBlocker::dump_detail(ceph::Formatter *f) const +{ + f->open_array_section("parent_blockers"); + for (auto b : parent_blockers) { + f->open_object_section("parent_blocker"); + b->dump(f); + f->close_section(); + } + f->close_section(); +} + +void OrderedPipelinePhase::Handle::exit() +{ + if (phase) { + phase->mutex.unlock(); + phase = nullptr; + } +} + +blocking_future<> OrderedPipelinePhase::Handle::enter( + OrderedPipelinePhase &new_phase) +{ + auto fut = new_phase.mutex.lock(); + exit(); + phase = &new_phase; + return new_phase.make_blocking_future(std::move(fut)); +} + +OrderedPipelinePhase::Handle::~Handle() +{ + exit(); +} + +void OrderedPipelinePhase::dump_detail(ceph::Formatter* f) const +{ +} + +} diff --git a/src/crimson/common/operation.h b/src/crimson/common/operation.h new file mode 100644 index 000000000000..1f555e49cb44 --- /dev/null +++ b/src/crimson/common/operation.h @@ -0,0 +1,330 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "include/ceph_assert.h" + +namespace ceph { + class Formatter; +} + +namespace crimson { + +using registry_hook_t = boost::intrusive::list_member_hook< + boost::intrusive::link_mode>; + +class Operation; +class Blocker; + +/** + * Provides an abstraction for registering and unregistering a blocker + * for the duration of a future becoming available. + */ +template +class blocking_future_detail { + friend class Operation; + friend class Blocker; + Blocker *blocker; + Fut fut; + blocking_future_detail(Blocker *b, Fut &&f) + : blocker(b), fut(std::move(f)) {} + + template + friend blocking_future_detail> make_ready_blocking_future(U&& args); + template + friend blocking_future_detail> + make_exception_blocking_future(Exception&& e); + + template + friend blocking_future_detail> join_blocking_futures(U &&u); + + template + friend class blocking_future_detail; + +public: + template + auto then(F &&f) && { + using result = decltype(std::declval().then(f)); + return blocking_future_detail>( + blocker, + std::move(fut).then(std::forward(f))); + } +}; + +template +using blocking_future = blocking_future_detail>; + +template +blocking_future_detail> make_ready_blocking_future(U&& args) { + return blocking_future( + nullptr, + seastar::make_ready_future(std::forward(args))); +} + +template +blocking_future_detail> +make_exception_blocking_future(Exception&& e) { + return blocking_future( + nullptr, + seastar::make_exception_future(e)); +} + +/** + * Provides an interface for dumping diagnostic information about + * why a particular op is not making progress. + */ +class Blocker { +public: + template + blocking_future make_blocking_future(seastar::future &&f) { + return blocking_future(this, std::move(f)); + } + void dump(ceph::Formatter *f) const; + virtual ~Blocker() = default; + +private: + virtual void dump_detail(ceph::Formatter *f) const = 0; + virtual const char *get_type_name() const = 0; +}; + +template +class BlockerT : public Blocker { +public: + virtual ~BlockerT() = default; +private: + const char *get_type_name() const final { + return T::type_name; + } +}; + +class AggregateBlocker : public BlockerT { + std::vector parent_blockers; +public: + AggregateBlocker(std::vector &&parent_blockers) + : parent_blockers(std::move(parent_blockers)) {} + static constexpr const char *type_name = "AggregateBlocker"; +private: + void dump_detail(ceph::Formatter *f) const final; +}; + +template +blocking_future<> join_blocking_futures(T &&t) { + std::vector blockers; + blockers.reserve(t.size()); + for (auto &&bf: t) { + blockers.push_back(bf.blocker); + bf.blocker = nullptr; + } + auto agg = std::make_unique(std::move(blockers)); + return agg->make_blocking_future( + seastar::parallel_for_each( + std::forward(t), + [](auto &&bf) { + return std::move(bf.fut); + }).then([agg=std::move(agg)] { + return seastar::make_ready_future<>(); + })); +} + + +/** + * Common base for all crimson-osd operations. Mainly provides + * an interface for registering ops in flight and dumping + * diagnostic information. + */ +class Operation : public boost::intrusive_ref_counter< + Operation, boost::thread_unsafe_counter> { + public: + uint64_t get_id() const { + return id; + } + + virtual unsigned get_type() const = 0; + virtual const char *get_type_name() const = 0; + virtual void print(std::ostream &) const = 0; + + template + seastar::future with_blocking_future(blocking_future &&f) { + if (f.fut.available()) { + return std::move(f.fut); + } + assert(f.blocker); + add_blocker(f.blocker); + return std::move(f.fut).then_wrapped([this, blocker=f.blocker](auto &&arg) { + clear_blocker(blocker); + return std::move(arg); + }); + } + + void dump(ceph::Formatter *f); + void dump_brief(ceph::Formatter *f); + virtual ~Operation() = default; + + private: + virtual void dump_detail(ceph::Formatter *f) const = 0; + + registry_hook_t registry_hook; + + std::vector blockers; + uint64_t id = 0; + void set_id(uint64_t in_id) { + id = in_id; + } + + void add_blocker(Blocker *b) { + blockers.push_back(b); + } + + void clear_blocker(Blocker *b) { + auto iter = std::find(blockers.begin(), blockers.end(), b); + if (iter != blockers.end()) { + blockers.erase(iter); + } + } + + friend class OperationRegistryI; + template + friend class OperationRegistryT; +}; +using OperationRef = boost::intrusive_ptr; + +std::ostream &operator<<(std::ostream &, const Operation &op); + +/** + * Maintains a set of lists of all active ops. + */ +class OperationRegistryI { + friend class Operation; + seastar::timer shutdown_timer; + seastar::promise<> shutdown; + +protected: + virtual void do_register(Operation *op) = 0; + virtual bool registries_empty() const = 0; + +public: + template + typename T::IRef create_operation(Args&&... args) { + typename T::IRef op = new T(std::forward(args)...); + do_register(&*op); + return op; + } + + seastar::future<> stop() { + shutdown_timer.set_callback([this] { + if (registries_empty()) { + shutdown.set_value(); + shutdown_timer.cancel(); + } + }); + shutdown_timer.arm_periodic( + std::chrono::milliseconds(100/*TODO: use option instead*/)); + return shutdown.get_future(); + } +}; + + +template +class OperationRegistryT : public OperationRegistryI { + using op_list_member_option = boost::intrusive::member_hook< + Operation, + registry_hook_t, + &Operation::registry_hook + >; + using op_list = boost::intrusive::list< + Operation, + op_list_member_option, + boost::intrusive::constant_time_size>; + + std::array< + op_list, + NUM_REGISTRIES + > registries; + + std::array< + uint64_t, + NUM_REGISTRIES + > op_id_counters = {}; + +protected: + void do_register(Operation *op) final { + registries[op->get_type()].push_back(*op); + op->set_id(op_id_counters[op->get_type()]++); + } + + bool registries_empty() const final { + return std::all_of(registries.begin(), + registries.end(), + [](auto& opl) { + return opl.empty(); + }); + } +}; + +/** + * Ensures that at most one op may consider itself in the phase at a time. + * Ops will see enter() unblock in the order in which they tried to enter + * the phase. entering (though not necessarily waiting for the future to + * resolve) a new phase prior to exiting the previous one will ensure that + * the op ordering is preserved. + */ +class OrderedPipelinePhase : public Blocker { +private: + void dump_detail(ceph::Formatter *f) const final; + const char *get_type_name() const final { + return name; + } + +public: + /** + * Used to encapsulate pipeline residency state. + */ + class Handle { + OrderedPipelinePhase *phase = nullptr; + + public: + Handle() = default; + + Handle(const Handle&) = delete; + Handle(Handle&&) = delete; + Handle &operator=(const Handle&) = delete; + Handle &operator=(Handle&&) = delete; + + /** + * Returns a future which unblocks when the handle has entered the passed + * OrderedPipelinePhase. If already in a phase, enter will also release + * that phase after placing itself in the queue for the next one to preserve + * ordering. + */ + blocking_future<> enter(OrderedPipelinePhase &phase); + + /** + * Releases the current phase if there is one. Called in ~Handle(). + */ + void exit(); + + ~Handle(); + }; + + OrderedPipelinePhase(const char *name) : name(name) {} + +private: + const char * name; + seastar::shared_mutex mutex; +}; + +} diff --git a/src/crimson/osd/osd_operation.cc b/src/crimson/osd/osd_operation.cc index b5f3c3cbbc84..0ab3c2cf74d1 100644 --- a/src/crimson/osd/osd_operation.cc +++ b/src/crimson/osd/osd_operation.cc @@ -6,62 +6,6 @@ namespace crimson::osd { -void Operation::dump(ceph::Formatter* f) -{ - f->open_object_section("operation"); - f->dump_string("type", get_type_name()); - f->dump_unsigned("id", id); - { - f->open_object_section("detail"); - dump_detail(f); - f->close_section(); - } - f->open_array_section("blockers"); - for (auto &blocker : blockers) { - blocker->dump(f); - } - f->close_section(); - f->close_section(); -} - -void Operation::dump_brief(ceph::Formatter* f) -{ - f->open_object_section("operation"); - f->dump_string("type", get_type_name()); - f->dump_unsigned("id", id); - f->close_section(); -} - -std::ostream &operator<<(std::ostream &lhs, const Operation &rhs) { - lhs << rhs.get_type_name() << "(id=" << rhs.get_id() << ", detail="; - rhs.print(lhs); - lhs << ")"; - return lhs; -} - -void Blocker::dump(ceph::Formatter* f) const -{ - f->open_object_section("blocker"); - f->dump_string("op_type", get_type_name()); - { - f->open_object_section("detail"); - dump_detail(f); - f->close_section(); - } - f->close_section(); -} - -void AggregateBlocker::dump_detail(ceph::Formatter *f) const -{ - f->open_array_section("parent_blockers"); - for (auto b : parent_blockers) { - f->open_object_section("parent_blocker"); - b->dump(f); - f->close_section(); - } - f->close_section(); -} - OperationThrottler::OperationThrottler(ConfigProxy &conf) : scheduler(crimson::osd::scheduler::make_scheduler(conf)) { @@ -129,31 +73,4 @@ void OperationThrottler::handle_conf_change( update_from_config(conf); } - -void OrderedPipelinePhase::Handle::exit() -{ - if (phase) { - phase->mutex.unlock(); - phase = nullptr; - } -} - -blocking_future<> OrderedPipelinePhase::Handle::enter( - OrderedPipelinePhase &new_phase) -{ - auto fut = new_phase.mutex.lock(); - exit(); - phase = &new_phase; - return new_phase.make_blocking_future(std::move(fut)); -} - -OrderedPipelinePhase::Handle::~Handle() -{ - exit(); -} - -void OrderedPipelinePhase::dump_detail(ceph::Formatter* f) const -{ -} - } diff --git a/src/crimson/osd/osd_operation.h b/src/crimson/osd/osd_operation.h index 5178749b0dda..85f5be2edf03 100644 --- a/src/crimson/osd/osd_operation.h +++ b/src/crimson/osd/osd_operation.h @@ -3,25 +3,9 @@ #pragma once -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "include/ceph_assert.h" +#include "crimson/common/operation.h" #include "crimson/osd/scheduler/scheduler.h" -namespace ceph { - class Formatter; -} - namespace crimson::osd { enum class OperationTypeCode { @@ -52,195 +36,14 @@ static_assert( (sizeof(OP_NAMES)/sizeof(OP_NAMES[0])) == static_cast(OperationTypeCode::last_op)); -class OperationRegistry; - -using registry_hook_t = boost::intrusive::list_member_hook< - boost::intrusive::link_mode>; - -class Operation; -class Blocker; - -/** - * Provides an abstraction for registering and unregistering a blocker - * for the duration of a future becoming available. - */ -template -class blocking_future_detail { - friend class Operation; - friend class Blocker; - Blocker *blocker; - Fut fut; - blocking_future_detail(Blocker *b, Fut &&f) - : blocker(b), fut(std::move(f)) {} - - template - friend blocking_future_detail> make_ready_blocking_future(U&& args); - template - friend blocking_future_detail> - make_exception_blocking_future(Exception&& e); - - template - friend blocking_future_detail> join_blocking_futures(U &&u); - - template - friend class blocking_future_detail; - -public: - template - auto then(F &&f) && { - using result = decltype(std::declval().then(f)); - return blocking_future_detail>( - blocker, - std::move(fut).then(std::forward(f))); - } -}; - -template -using blocking_future = blocking_future_detail>; - -template -blocking_future_detail> make_ready_blocking_future(U&& args) { - return blocking_future( - nullptr, - seastar::make_ready_future(std::forward(args))); -} - -template -blocking_future_detail> -make_exception_blocking_future(Exception&& e) { - return blocking_future( - nullptr, - seastar::make_exception_future(e)); -} - -/** - * Provides an interface for dumping diagnostic information about - * why a particular op is not making progress. - */ -class Blocker { -public: - template - blocking_future make_blocking_future(seastar::future &&f) { - return blocking_future(this, std::move(f)); - } - void dump(ceph::Formatter *f) const; - virtual ~Blocker() = default; - -private: - virtual void dump_detail(ceph::Formatter *f) const = 0; - virtual const char *get_type_name() const = 0; -}; - -template -class BlockerT : public Blocker { -public: - virtual ~BlockerT() = default; -private: - const char *get_type_name() const final { - return T::type_name; - } -}; - -class AggregateBlocker : public BlockerT { - vector parent_blockers; -public: - AggregateBlocker(vector &&parent_blockers) - : parent_blockers(std::move(parent_blockers)) {} - static constexpr const char *type_name = "AggregateBlocker"; -private: - void dump_detail(ceph::Formatter *f) const final; -}; - -template -blocking_future<> join_blocking_futures(T &&t) { - vector blockers; - blockers.reserve(t.size()); - for (auto &&bf: t) { - blockers.push_back(bf.blocker); - bf.blocker = nullptr; - } - auto agg = std::make_unique(std::move(blockers)); - return agg->make_blocking_future( - seastar::parallel_for_each( - std::forward(t), - [](auto &&bf) { - return std::move(bf.fut); - }).then([agg=std::move(agg)] { - return seastar::make_ready_future<>(); - })); -} - - -/** - * Common base for all crimson-osd operations. Mainly provides - * an interface for registering ops in flight and dumping - * diagnostic information. - */ -class Operation : public boost::intrusive_ref_counter< - Operation, boost::thread_unsafe_counter> { - public: - uint64_t get_id() const { - return id; - } - - virtual OperationTypeCode get_type() const = 0; - virtual const char *get_type_name() const = 0; - virtual void print(std::ostream &) const = 0; - - template - seastar::future with_blocking_future(blocking_future &&f) { - if (f.fut.available()) { - return std::move(f.fut); - } - assert(f.blocker); - add_blocker(f.blocker); - return std::move(f.fut).then_wrapped([this, blocker=f.blocker](auto &&arg) { - clear_blocker(blocker); - return std::move(arg); - }); - } - - void dump(ceph::Formatter *f); - void dump_brief(ceph::Formatter *f); - virtual ~Operation() = default; - - private: - virtual void dump_detail(ceph::Formatter *f) const = 0; - - private: - registry_hook_t registry_hook; - - std::vector blockers; - uint64_t id = 0; - void set_id(uint64_t in_id) { - id = in_id; - } - - void add_blocker(Blocker *b) { - blockers.push_back(b); - } - - void clear_blocker(Blocker *b) { - auto iter = std::find(blockers.begin(), blockers.end(), b); - if (iter != blockers.end()) { - blockers.erase(iter); - } - } - - friend class OperationRegistry; -}; -using OperationRef = boost::intrusive_ptr; - -std::ostream &operator<<(std::ostream &, const Operation &op); - template class OperationT : public Operation { public: static constexpr const char *type_name = OP_NAMES[static_cast(T::type)]; using IRef = boost::intrusive_ptr; - OperationTypeCode get_type() const final { - return T::type; + unsigned get_type() const final { + return static_cast(T::type); } const char *get_type_name() const final { @@ -256,54 +59,9 @@ private: /** * Maintains a set of lists of all active ops. */ -class OperationRegistry { - friend class Operation; - using op_list_member_option = boost::intrusive::member_hook< - Operation, - registry_hook_t, - &Operation::registry_hook - >; - using op_list = boost::intrusive::list< - Operation, - op_list_member_option, - boost::intrusive::constant_time_size>; - - std::array< - op_list, - static_cast(OperationTypeCode::last_op) - > registries; - - std::array< - uint64_t, - static_cast(OperationTypeCode::last_op) - > op_id_counters = {}; - - seastar::timer shutdown_timer; - seastar::promise<> shutdown; -public: - template - typename T::IRef create_operation(Args&&... args) { - typename T::IRef op = new T(std::forward(args)...); - registries[static_cast(T::type)].push_back(*op); - op->set_id(op_id_counters[static_cast(T::type)]++); - return op; - } - - seastar::future<> stop() { - shutdown_timer.set_callback([this] { - if (std::all_of(registries.begin(), - registries.end(), - [](auto& opl) { - return opl.empty(); - })) { - shutdown.set_value(); - shutdown_timer.cancel(); - } - }); - shutdown_timer.arm_periodic(std::chrono::milliseconds(100/*TODO: use option instead*/)); - return shutdown.get_future(); - } -}; +using OSDOperationRegistry = OperationRegistryT< + static_cast(OperationTypeCode::last_op) + >; /** * Throttles set of currently running operations @@ -372,56 +130,4 @@ private: void release_throttle(); }; -/** - * Ensures that at most one op may consider itself in the phase at a time. - * Ops will see enter() unblock in the order in which they tried to enter - * the phase. entering (though not necessarily waiting for the future to - * resolve) a new phase prior to exiting the previous one will ensure that - * the op ordering is preserved. - */ -class OrderedPipelinePhase : public Blocker { -private: - void dump_detail(ceph::Formatter *f) const final; - const char *get_type_name() const final { - return name; - } - -public: - /** - * Used to encapsulate pipeline residency state. - */ - class Handle { - OrderedPipelinePhase *phase = nullptr; - - public: - Handle() = default; - - Handle(const Handle&) = delete; - Handle(Handle&&) = delete; - Handle &operator=(const Handle&) = delete; - Handle &operator=(Handle&&) = delete; - - /** - * Returns a future which unblocks when the handle has entered the passed - * OrderedPipelinePhase. If already in a phase, enter will also release - * that phase after placing itself in the queue for the next one to preserve - * ordering. - */ - blocking_future<> enter(OrderedPipelinePhase &phase); - - /** - * Releases the current phase if there is one. Called in ~Handle(). - */ - void exit(); - - ~Handle(); - }; - - OrderedPipelinePhase(const char *name) : name(name) {} - -private: - const char * name; - seastar::shared_mutex mutex; -}; - } diff --git a/src/crimson/osd/osd_operations/compound_peering_request.cc b/src/crimson/osd/osd_operations/compound_peering_request.cc index e55760096e4e..6290b9c77162 100644 --- a/src/crimson/osd/osd_operations/compound_peering_request.cc +++ b/src/crimson/osd/osd_operations/compound_peering_request.cc @@ -57,13 +57,13 @@ public: } }; -std::vector handle_pg_create( +std::vector handle_pg_create( OSD &osd, crimson::net::ConnectionRef conn, compound_state_ref state, Ref m) { - std::vector ret; + std::vector ret; for (auto& [pgid, when] : m->pgs) { const auto &[created, created_stamp] = when; auto q = m->pg_extra.find(pgid); @@ -100,11 +100,12 @@ std::vector handle_pg_create( return ret; } -struct SubOpBlocker : BlockerT { +struct SubOpBlocker : crimson::BlockerT { static constexpr const char * type_name = "CompoundOpBlocker"; - std::vector subops; - SubOpBlocker(std::vector &&subops) : subops(subops) {} + std::vector subops; + SubOpBlocker(std::vector &&subops) + : subops(subops) {} virtual void dump_detail(Formatter *f) const { f->open_array_section("dependent_operations"); diff --git a/src/crimson/osd/pg_recovery.cc b/src/crimson/osd/pg_recovery.cc index 7d70b5e8ffbf..19c8962f6df4 100644 --- a/src/crimson/osd/pg_recovery.cc +++ b/src/crimson/osd/pg_recovery.cc @@ -36,7 +36,7 @@ void PGRecovery::start_pglogbased_recovery() pg->get_osdmap_epoch()); } -crimson::osd::blocking_future +crimson::blocking_future PGRecovery::start_recovery_ops(size_t max_to_start) { assert(pg->is_primary()); @@ -51,13 +51,13 @@ PGRecovery::start_recovery_ops(size_t max_to_start) assert(!pg->is_backfilling()); assert(!pg->get_peering_state().is_deleting()); - std::vector> started; + std::vector> started; started.reserve(max_to_start); max_to_start -= start_primary_recovery_ops(max_to_start, &started); if (max_to_start > 0) { max_to_start -= start_replica_recovery_ops(max_to_start, &started); } - return crimson::osd::join_blocking_futures(std::move(started)).then( + return crimson::join_blocking_futures(std::move(started)).then( [this] { bool done = !pg->get_peering_state().needs_recovery(); if (done) { @@ -94,7 +94,7 @@ PGRecovery::start_recovery_ops(size_t max_to_start) size_t PGRecovery::start_primary_recovery_ops( size_t max_to_start, - std::vector> *out) + std::vector> *out) { if (!pg->is_recovering()) { return 0; @@ -169,7 +169,7 @@ size_t PGRecovery::start_primary_recovery_ops( size_t PGRecovery::start_replica_recovery_ops( size_t max_to_start, - std::vector> *out) + std::vector> *out) { if (!pg->is_recovering()) { return 0; @@ -254,7 +254,7 @@ size_t PGRecovery::start_replica_recovery_ops( return started; } -crimson::osd::blocking_future<> PGRecovery::recover_missing( +crimson::blocking_future<> PGRecovery::recover_missing( const hobject_t &soid, eversion_t need) { if (pg->get_peering_state().get_missing_loc().is_deleted(soid)) { @@ -274,7 +274,7 @@ crimson::osd::blocking_future<> PGRecovery::recover_missing( size_t PGRecovery::prep_object_replica_deletes( const hobject_t& soid, eversion_t need, - std::vector> *in_progress) + std::vector> *in_progress) { in_progress->push_back( pg->get_recovery_backend()->add_recovering(soid).make_blocking_future( @@ -292,7 +292,7 @@ size_t PGRecovery::prep_object_replica_deletes( size_t PGRecovery::prep_object_replica_pushes( const hobject_t& soid, eversion_t need, - std::vector> *in_progress) + std::vector> *in_progress) { in_progress->push_back( pg->get_recovery_backend()->add_recovering(soid).make_blocking_future( diff --git a/src/crimson/osd/pg_recovery.h b/src/crimson/osd/pg_recovery.h index 86f259de5423..19303e58d923 100644 --- a/src/crimson/osd/pg_recovery.h +++ b/src/crimson/osd/pg_recovery.h @@ -22,7 +22,7 @@ public: virtual ~PGRecovery() {} void start_pglogbased_recovery(); - crimson::osd::blocking_future start_recovery_ops(size_t max_to_start); + crimson::blocking_future start_recovery_ops(size_t max_to_start); void on_backfill_reserved(); void dispatch_backfill_event( boost::intrusive_ptr evt); @@ -32,24 +32,24 @@ private: PGRecoveryListener* pg; size_t start_primary_recovery_ops( size_t max_to_start, - std::vector> *out); + std::vector> *out); size_t start_replica_recovery_ops( size_t max_to_start, - std::vector> *out); + std::vector> *out); std::vector get_replica_recovery_order() const { return pg->get_replica_recovery_order(); } - crimson::osd::blocking_future<> recover_missing( + crimson::blocking_future<> recover_missing( const hobject_t &soid, eversion_t need); size_t prep_object_replica_deletes( const hobject_t& soid, eversion_t need, - std::vector> *in_progress); + std::vector> *in_progress); size_t prep_object_replica_pushes( const hobject_t& soid, eversion_t need, - std::vector> *in_progress); + std::vector> *in_progress); void on_local_recover( const hobject_t& soid, diff --git a/src/crimson/osd/recovery_backend.h b/src/crimson/osd/recovery_backend.h index cb0ae9f20565..2216fdeed684 100644 --- a/src/crimson/osd/recovery_backend.h +++ b/src/crimson/osd/recovery_backend.h @@ -127,7 +127,7 @@ protected: object_stat_sum_t stat; }; - class WaitForObjectRecovery : public crimson::osd::BlockerT { + class WaitForObjectRecovery : public crimson::BlockerT { seastar::shared_promise<> readable, recovered, pulled; std::map> pushes; public: @@ -146,7 +146,7 @@ protected: seastar::future<> wait_for_recovered() { return recovered.get_shared_future(); } - crimson::osd::blocking_future<> + crimson::blocking_future<> wait_for_recovered_blocking() { return make_blocking_future( recovered.get_shared_future()); diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h index 2957639c6347..01e0e91e471e 100644 --- a/src/crimson/osd/shard_services.h +++ b/src/crimson/osd/shard_services.h @@ -88,7 +88,7 @@ public: } // Op Management - OperationRegistry registry; + OSDOperationRegistry registry; OperationThrottler throttler; template -- 2.47.3