From bc48f2dfb168b74cf8f00d4064b91b32f3c64ad5 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Mon, 10 Jun 2019 17:29:55 -0700 Subject: [PATCH] crimson/osd: add osd_operation Adds basic machinery for registering in progress operations along with blocking conditions. Signed-off-by: Samuel Just --- src/crimson/osd/CMakeLists.txt | 1 + src/crimson/osd/osd_operation.cc | 79 +++++++++ src/crimson/osd/osd_operation.h | 269 +++++++++++++++++++++++++++++++ 3 files changed, 349 insertions(+) create mode 100644 src/crimson/osd/osd_operation.cc create mode 100644 src/crimson/osd/osd_operation.h diff --git a/src/crimson/osd/CMakeLists.txt b/src/crimson/osd/CMakeLists.txt index ae9a463d142..77ac3858b69 100644 --- a/src/crimson/osd/CMakeLists.txt +++ b/src/crimson/osd/CMakeLists.txt @@ -10,6 +10,7 @@ add_executable(crimson-osd pg_meta.cc replicated_backend.cc shard_services.cc + osd_operation.cc ${PROJECT_SOURCE_DIR}/src/osd/PeeringState.cc ${PROJECT_SOURCE_DIR}/src/osd/PGPeeringEvent.cc ${PROJECT_SOURCE_DIR}/src/osd/PGStateUtils.cc diff --git a/src/crimson/osd/osd_operation.cc b/src/crimson/osd/osd_operation.cc new file mode 100644 index 00000000000..69ec5b7607b --- /dev/null +++ b/src/crimson/osd/osd_operation.cc @@ -0,0 +1,79 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "osd_operation.h" + +namespace ceph::osd { + +void Operation::dump(Formatter *f) +{ + f->open_object_section("operation"); + f->dump_string("type", get_type_name()); + f->dump_unsigned("id", id); + { + f->open_object_section("detail"); + dump_detail(f); + f->close_section(); + } + f->open_array_section("blockers"); + for (auto &blocker : blockers) { + blocker->dump(f); + } + f->close_section(); + f->close_section(); +} + +void Operation::dump_brief(Formatter *f) +{ + f->open_object_section("operation"); + f->dump_string("type", get_type_name()); + f->dump_unsigned("id", id); + f->close_section(); +} + +std::ostream &operator<<(std::ostream &lhs, const Operation &rhs) { + lhs << rhs.get_type_name() << "(id=" << rhs.get_id() << ", detail="; + rhs.print(lhs); + lhs << ")"; + return lhs; +} + +void Blocker::dump(Formatter *f) const +{ + f->open_object_section("blocker"); + f->dump_string("op_type", get_type_name()); + { + f->open_object_section("detail"); + dump_detail(f); + f->close_section(); + } + f->close_section(); +} + +void OrderedPipelinePhase::Handle::exit() +{ + if (phase) { + phase->mutex.unlock(); + phase = nullptr; + } +} + +blocking_future<> OrderedPipelinePhase::Handle::enter( + OrderedPipelinePhase &new_phase) +{ + auto fut = new_phase.mutex.lock(); + exit(); + phase = &new_phase; + return new_phase.make_blocking_future(std::move(fut)); +} + +OrderedPipelinePhase::Handle::~Handle() +{ + exit(); +} + +void OrderedPipelinePhase::dump_detail(Formatter *f) const +{ +} + +} diff --git a/src/crimson/osd/osd_operation.h b/src/crimson/osd/osd_operation.h new file mode 100644 index 00000000000..73ce75476d5 --- /dev/null +++ b/src/crimson/osd/osd_operation.h @@ -0,0 +1,269 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "common/Formatter.h" + +namespace ceph::osd { + +enum class OperationTypeCode { + client_request = 0, + peering_event, + compound_peering_request, + last_op +}; + +static constexpr const char* const OP_NAMES[] = { + "client_write", + "peering_event", + "compound_peering_request" +}; + +// prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry: +static_assert( + (sizeof(OP_NAMES)/sizeof(OP_NAMES[0])) == + static_cast(OperationTypeCode::last_op)); + +class OperationRegistry; + +using registry_hook_t = boost::intrusive::list_member_hook< + boost::intrusive::link_mode>; + +class Operation; +class Blocker; + +/** + * Provides an abstraction for registering and unregistering a blocker + * for the duration of a future becoming available. + */ +template +class blocking_future { + friend class Operation; + friend class Blocker; + Blocker *blocker; + seastar::future fut; + blocking_future(Blocker *b, seastar::future &&f) + : blocker(b), fut(std::move(f)) {} + + template + friend blocking_future make_ready_blocking_future(U&&... args); +}; + +template +blocking_future make_ready_blocking_future(U&&... args) { + return blocking_future( + nullptr, + seastar::make_ready_future(std::forward(args)...)); +} + +/** + * Provides an interface for dumping diagnostic information about + * why a particular op is not making progress. + */ +class Blocker { +protected: + virtual void dump_detail(Formatter *f) const = 0; + +public: + template + blocking_future make_blocking_future(seastar::future &&f) { + return blocking_future(this, std::move(f)); + } + + void dump(Formatter *f) const; + + virtual const char *get_type_name() const = 0; + + virtual ~Blocker() = default; +}; + +template +class BlockerT : public Blocker { +public: + const char *get_type_name() const final { + return T::type_name; + } + + virtual ~BlockerT() = default; +}; + +/** + * Common base for all crimson-osd operations. Mainly provides + * an interface for registering ops in flight and dumping + * diagnostic information. + */ +class Operation : public boost::intrusive_ref_counter< + Operation, boost::thread_unsafe_counter> { + friend class OperationRegistry; + registry_hook_t registry_hook; + + std::vector blockers; + uint64_t id = 0; + void set_id(uint64_t in_id) { + id = in_id; + } +protected: + virtual void dump_detail(Formatter *f) const = 0; + +public: + uint64_t get_id() const { + return id; + } + + virtual OperationTypeCode get_type() const = 0; + virtual const char *get_type_name() const = 0; + virtual void print(std::ostream &) const = 0; + + void add_blocker(Blocker *b) { + blockers.push_back(b); + } + + void clear_blocker(Blocker *b) { + auto iter = std::find(blockers.begin(), blockers.end(), b); + if (iter != blockers.end()) { + blockers.erase(iter); + } + } + + template + seastar::future with_blocking_future(blocking_future &&f) { + if (f.fut.available() || f.fut.failed()) { + return std::move(f.fut); + } + ceph_assert(f.blocker); + add_blocker(f.blocker); + return std::move(f.fut).then_wrapped([this, blocker=f.blocker](auto &&arg) { + clear_blocker(blocker); + return std::move(arg); + }); + } + + void dump(Formatter *f); + void dump_brief(Formatter *f); + virtual ~Operation() = default; +}; +using OperationRef = boost::intrusive_ptr; + +std::ostream &operator<<(std::ostream &, const Operation &op); + +template +class OperationT : public Operation { + +protected: + virtual void dump_detail(Formatter *f) const = 0; + +public: + static constexpr const char *type_name = OP_NAMES[static_cast(T::type)]; + using IRef = boost::intrusive_ptr; + + OperationTypeCode get_type() const final { + return T::type; + } + + const char *get_type_name() const final { + return T::type_name; + } + + virtual ~OperationT() = default; +}; + +/** + * Maintains a set of lists of all active ops. + */ +class OperationRegistry { + friend class Operation; + using op_list_member_option = boost::intrusive::member_hook< + Operation, + registry_hook_t, + &Operation::registry_hook + >; + using op_list = boost::intrusive::list< + Operation, + op_list_member_option, + boost::intrusive::constant_time_size>; + + std::array< + op_list, + static_cast(OperationTypeCode::last_op) + > registries; + + std::array< + uint64_t, + static_cast(OperationTypeCode::last_op) + > op_id_counters = {}; + +public: + template + typename T::IRef create_operation(Args&&... args) { + typename T::IRef op = new T(std::forward(args)...); + registries[static_cast(T::type)].push_back(*op); + op->set_id(op_id_counters[static_cast(T::type)]++); + return op; + } +}; + +/** + * Ensures that at most one op may consider itself in the phase at a time. + * Ops will see enter() unblock in the order in which they tried to enter + * the phase. entering (though not necessarily waiting for the future to + * resolve) a new phase prior to exiting the previous one will ensure that + * the op ordering is preserved. + */ +class OrderedPipelinePhase : public Blocker { + const char * name; + +protected: + virtual void dump_detail(Formatter *f) const final; + const char *get_type_name() const final { + return name; + } + +public: + seastar::shared_mutex mutex; + + /** + * Used to encapsulate pipeline residency state. + */ + class Handle { + OrderedPipelinePhase *phase = nullptr; + + public: + Handle() = default; + + Handle(const Handle&) = delete; + Handle(Handle&&) = delete; + Handle &operator=(const Handle&) = delete; + Handle &operator=(Handle&&) = delete; + + /** + * Returns a future which unblocks when the handle has entered the passed + * OrderedPipelinePhase. If already in a phase, enter will also release + * that phase after placing itself in the queue for the next one to preserve + * ordering. + */ + blocking_future<> enter(OrderedPipelinePhase &phase); + + /** + * Releases the current phase if there is one. Called in ~Handle(). + */ + void exit(); + + ~Handle(); + }; + + OrderedPipelinePhase(const char *name) : name(name) {} +}; + +} -- 2.39.5