From 882bbb930432cb61e33464a378ef0b00b1078aa1 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Rados=C5=82aw=20Zarzy=C5=84ski?= Date: Tue, 19 Sep 2023 15:23:31 +0200 Subject: [PATCH] osd: move ReadPipeline from ECBackend to shareable ECCommon MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Radosław Zarzyński --- src/osd/ECBackend.cc | 26 +-- src/osd/ECBackend.h | 400 +++++++++++++++++++++---------------------- 2 files changed, 213 insertions(+), 213 deletions(-) diff --git a/src/osd/ECBackend.cc b/src/osd/ECBackend.cc index 6750a8a538d..4e8f0b4e019 100644 --- a/src/osd/ECBackend.cc +++ b/src/osd/ECBackend.cc @@ -55,7 +55,7 @@ using ceph::Formatter; static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) { return pgb->get_parent()->gen_dbg_prefix(*_dout); } -static ostream& _prefix(std::ostream *_dout, ECBackend::RMWPipeline *rmw_pipeline) { +static ostream& _prefix(std::ostream *_dout, ECCommon::RMWPipeline *rmw_pipeline) { return rmw_pipeline->get_parent()->gen_dbg_prefix(*_dout); } static ostream& _prefix(std::ostream *_dout, ECCommon::ReadPipeline *read_pipeline) { @@ -66,11 +66,11 @@ struct ECRecoveryHandle : public PGBackend::RecoveryHandle { list ops; }; -ostream &operator<<(ostream &lhs, const ECBackend::RMWPipeline::pipeline_state_t &rhs) { +ostream &operator<<(ostream &lhs, const ECCommon::RMWPipeline::pipeline_state_t &rhs) { switch (rhs.pipeline_state) { - case ECBackend::RMWPipeline::pipeline_state_t::CACHE_VALID: + case ECCommon::RMWPipeline::pipeline_state_t::CACHE_VALID: return lhs << "CACHE_VALID"; - case ECBackend::RMWPipeline::pipeline_state_t::CACHE_INVALID: + case ECCommon::RMWPipeline::pipeline_state_t::CACHE_INVALID: return lhs << "CACHE_INVALID"; default: ceph_abort_msg("invalid pipeline state"); @@ -161,7 +161,7 @@ void ECCommon::ReadOp::dump(Formatter *f) const f->dump_stream("in_progress") << in_progress; } -ostream &operator<<(ostream &lhs, const ECBackend::RMWPipeline::Op &rhs) +ostream &operator<<(ostream &lhs, const ECCommon::RMWPipeline::Op &rhs) { lhs << "Op(" << rhs.hoid << " v=" << rhs.version @@ -1506,7 +1506,7 @@ void ECCommon::ReadPipeline::on_change() in_progress_client_reads.clear(); } -void ECBackend::RMWPipeline::on_change() +void ECCommon::RMWPipeline::on_change() { dout(10) << __func__ << dendl; @@ -1556,7 +1556,7 @@ void ECBackend::dump_recovery_info(Formatter *f) const f->close_section(); } -struct ECClassicalOp : ECBackend::RMWPipeline::Op { +struct ECClassicalOp : ECCommon::RMWPipeline::Op { PGTransactionUPtr t; void generate_transactions( @@ -1652,7 +1652,7 @@ void ECBackend::submit_transaction( rmw_pipeline.start_rmw(std::move(op)); } -void ECBackend::RMWPipeline::call_write_ordered(std::function &&cb) { +void ECCommon::RMWPipeline::call_write_ordered(std::function &&cb) { if (!waiting_state.empty()) { waiting_state.back().on_write.emplace_back(std::move(cb)); } else if (!waiting_reads.empty()) { @@ -1978,7 +1978,7 @@ ECUtil::HashInfoRef ECBackend::get_hash_info( return ref; } -void ECBackend::RMWPipeline::start_rmw(OpRef op) +void ECCommon::RMWPipeline::start_rmw(OpRef op) { ceph_assert(op); dout(10) << __func__ << ": " << *op << dendl; @@ -1989,7 +1989,7 @@ void ECBackend::RMWPipeline::start_rmw(OpRef op) check_ops(); } -bool ECBackend::RMWPipeline::try_state_to_reads() +bool ECCommon::RMWPipeline::try_state_to_reads() { if (waiting_state.empty()) return false; @@ -2063,7 +2063,7 @@ bool ECBackend::RMWPipeline::try_state_to_reads() return true; } -bool ECBackend::RMWPipeline::try_reads_to_commit() +bool ECCommon::RMWPipeline::try_reads_to_commit() { if (waiting_reads.empty()) return false; @@ -2221,7 +2221,7 @@ bool ECBackend::RMWPipeline::try_reads_to_commit() return true; } -bool ECBackend::RMWPipeline::try_finish_rmw() +bool ECCommon::RMWPipeline::try_finish_rmw() { if (waiting_commit.empty()) return false; @@ -2270,7 +2270,7 @@ bool ECBackend::RMWPipeline::try_finish_rmw() return true; } -void ECBackend::RMWPipeline::check_ops() +void ECCommon::RMWPipeline::check_ops() { while (try_state_to_reads() || try_reads_to_commit() || diff --git a/src/osd/ECBackend.h b/src/osd/ECBackend.h index a2769e6cb6d..5fbe8ccbfc6 100644 --- a/src/osd/ECBackend.h +++ b/src/osd/ECBackend.h @@ -341,6 +341,205 @@ struct ECCommon { std::map>> *to_read ///< [out] shards, corresponding subchunks to read ); ///< @return error code, 0 on success }; + + /** + * Client writes + * + * ECTransaction is responsible for generating a transaction for + * each shard to which we need to send the write. As required + * by the PGBackend interface, the ECBackend write mechanism + * passes trim information with the write and last_complete back + * with the reply. + * + * As with client reads, there is a possibility of out-of-order + * completions. Thus, callbacks and completion are called in order + * on the writing std::list. + */ + + struct RMWPipeline { + struct Op : boost::intrusive::list_base_hook<> { + /// From submit_transaction caller, describes operation + hobject_t hoid; + object_stat_sum_t delta_stats; + eversion_t version; + eversion_t trim_to; + std::optional updated_hit_set_history; + std::vector log_entries; + ceph_tid_t tid; + osd_reqid_t reqid; + ZTracer::Trace trace; + + eversion_t roll_forward_to; /// Soon to be generated internally + + /// Ancillary also provided from submit_transaction caller + std::map obc_map; + + /// see call_write_ordered + std::list > on_write; + + /// Generated internally + std::set temp_added; + std::set temp_cleared; + + ECTransaction::WritePlan plan; + bool requires_rmw() const { return !plan.to_read.empty(); } + bool invalidates_cache() const { return plan.invalidates_cache; } + + // must be true if requires_rmw(), must be false if invalidates_cache() + bool using_cache = true; + + /// In progress read state; + std::map pending_read; // subset already being read + std::map remote_read; // subset we must read + std::map remote_read_result; + bool read_in_progress() const { + return !remote_read.empty() && remote_read_result.empty(); + } + + /// In progress write state. + std::set pending_commit; + // we need pending_apply for pre-mimic peers so that we don't issue a + // read on a remote shard before it has applied a previous write. We can + // remove this after nautilus. + std::set pending_apply; + bool write_in_progress() const { + return !pending_commit.empty() || !pending_apply.empty(); + } + + /// optional, may be null, for tracking purposes + OpRequestRef client_op; + + /// pin for cache + ExtentCache::write_pin pin; + + /// Callbacks + Context *on_all_commit = nullptr; + virtual ~Op() { + delete on_all_commit; + } + + virtual void generate_transactions( + ceph::ErasureCodeInterfaceRef &ecimpl, + pg_t pgid, + const ECUtil::stripe_info_t &sinfo, + std::map *written, + std::map *transactions, + DoutPrefixProvider *dpp, + const ceph_release_t require_osd_release = ceph_release_t::unknown) = 0; + }; + using OpRef = std::unique_ptr; + using op_list = boost::intrusive::list; + friend ostream &operator<<(ostream &lhs, const Op &rhs); + + ExtentCache cache; + std::map tid_to_op_map; /// Owns Op structure + /** + * We model the possible rmw states as a std::set of waitlists. + * All writes at this time complete in order, so a write blocked + * at waiting_state blocks all writes behind it as well (same for + * other states). + * + * Future work: We can break this up into a per-object pipeline + * (almost). First, provide an ordering token to submit_transaction + * and require that all operations within a single transaction take + * place on a subset of hobject_t space partitioned by that token + * (the hashid seem about right to me -- even works for temp objects + * if you recall that a temp object created for object head foo will + * only ever be referenced by other transactions on foo and aren't + * reused). Next, factor this part into a class and maintain one per + * ordering token. Next, fixup PrimaryLogPG's repop queue to be + * partitioned by ordering token. Finally, refactor the op pipeline + * so that the log entries passed into submit_transaction aren't + * versioned. We can't assign versions to them until we actually + * submit the operation. That's probably going to be the hard part. + */ + class pipeline_state_t { + enum { + CACHE_VALID = 0, + CACHE_INVALID = 1 + } pipeline_state = CACHE_VALID; + public: + bool caching_enabled() const { + return pipeline_state == CACHE_VALID; + } + bool cache_invalid() const { + return !caching_enabled(); + } + void invalidate() { + pipeline_state = CACHE_INVALID; + } + void clear() { + pipeline_state = CACHE_VALID; + } + friend ostream &operator<<(ostream &lhs, const pipeline_state_t &rhs); + } pipeline_state; + + op_list waiting_state; /// writes waiting on pipe_state + op_list waiting_reads; /// writes waiting on partial stripe reads + op_list waiting_commit; /// writes waiting on initial commit + eversion_t completed_to; + eversion_t committed_to; + void start_rmw(OpRef op); + bool try_state_to_reads(); + bool try_reads_to_commit(); + bool try_finish_rmw(); + void check_ops(); + + void on_change(); + void call_write_ordered(std::function &&cb); + + CephContext* cct; + ECListener *get_parent() const { return parent; } + const OSDMapRef& get_osdmap() const { return get_parent()->pgb_get_osdmap(); } + epoch_t get_osdmap_epoch() const { return get_parent()->pgb_get_osdmap_epoch(); } + const pg_info_t &get_info() { return get_parent()->get_info(); } + + template + void objects_read_async_no_cache( + const std::map &to_read, + Func &&on_complete + ) { + std::map > > _to_read; + for (auto &&hpair: to_read) { + auto &l = _to_read[hpair.first]; + for (auto extent: hpair.second) { + l.emplace_back(extent.first, extent.second, 0); + } + } + ec_backend.objects_read_and_reconstruct( + _to_read, + false, + make_gen_lambda_context< + std::map > &&, Func>( + std::forward(on_complete))); + } + void handle_sub_write( + pg_shard_t from, + OpRequestRef msg, + ECSubWrite &op, + const ZTracer::Trace &trace + ) { + ec_backend.handle_sub_write(from, std::move(msg), op, trace); + } + // end of iface + + ceph::ErasureCodeInterfaceRef ec_impl; + const ECUtil::stripe_info_t& sinfo; + ECListener* parent; + ECCommon& ec_backend; + + RMWPipeline(CephContext* cct, + ceph::ErasureCodeInterfaceRef ec_impl, + const ECUtil::stripe_info_t& sinfo, + ECListener* parent, + ECCommon& ec_backend) + : cct(cct), + ec_impl(std::move(ec_impl)), + sinfo(sinfo), + parent(parent), + ec_backend(ec_backend) { + } + }; }; class ECBackend : public PGBackend, public ECCommon { @@ -567,206 +766,7 @@ private: public: struct ReadPipeline read_pipeline; - - - /** - * Client writes - * - * ECTransaction is responsible for generating a transaction for - * each shard to which we need to send the write. As required - * by the PGBackend interface, the ECBackend write mechanism - * passes trim information with the write and last_complete back - * with the reply. - * - * As with client reads, there is a possibility of out-of-order - * completions. Thus, callbacks and completion are called in order - * on the writing std::list. - */ - - struct RMWPipeline { - struct Op : boost::intrusive::list_base_hook<> { - /// From submit_transaction caller, describes operation - hobject_t hoid; - object_stat_sum_t delta_stats; - eversion_t version; - eversion_t trim_to; - std::optional updated_hit_set_history; - std::vector log_entries; - ceph_tid_t tid; - osd_reqid_t reqid; - ZTracer::Trace trace; - - eversion_t roll_forward_to; /// Soon to be generated internally - - /// Ancillary also provided from submit_transaction caller - std::map obc_map; - - /// see call_write_ordered - std::list > on_write; - - /// Generated internally - std::set temp_added; - std::set temp_cleared; - - ECTransaction::WritePlan plan; - bool requires_rmw() const { return !plan.to_read.empty(); } - bool invalidates_cache() const { return plan.invalidates_cache; } - - // must be true if requires_rmw(), must be false if invalidates_cache() - bool using_cache = true; - - /// In progress read state; - std::map pending_read; // subset already being read - std::map remote_read; // subset we must read - std::map remote_read_result; - bool read_in_progress() const { - return !remote_read.empty() && remote_read_result.empty(); - } - - /// In progress write state. - std::set pending_commit; - // we need pending_apply for pre-mimic peers so that we don't issue a - // read on a remote shard before it has applied a previous write. We can - // remove this after nautilus. - std::set pending_apply; - bool write_in_progress() const { - return !pending_commit.empty() || !pending_apply.empty(); - } - - /// optional, may be null, for tracking purposes - OpRequestRef client_op; - - /// pin for cache - ExtentCache::write_pin pin; - - /// Callbacks - Context *on_all_commit = nullptr; - virtual ~Op() { - delete on_all_commit; - } - - virtual void generate_transactions( - ceph::ErasureCodeInterfaceRef &ecimpl, - pg_t pgid, - const ECUtil::stripe_info_t &sinfo, - std::map *written, - std::map *transactions, - DoutPrefixProvider *dpp, - const ceph_release_t require_osd_release = ceph_release_t::unknown) = 0; - }; - using OpRef = std::unique_ptr; - using op_list = boost::intrusive::list; - friend ostream &operator<<(ostream &lhs, const Op &rhs); - - ExtentCache cache; - std::map tid_to_op_map; /// Owns Op structure - /** - * We model the possible rmw states as a std::set of waitlists. - * All writes at this time complete in order, so a write blocked - * at waiting_state blocks all writes behind it as well (same for - * other states). - * - * Future work: We can break this up into a per-object pipeline - * (almost). First, provide an ordering token to submit_transaction - * and require that all operations within a single transaction take - * place on a subset of hobject_t space partitioned by that token - * (the hashid seem about right to me -- even works for temp objects - * if you recall that a temp object created for object head foo will - * only ever be referenced by other transactions on foo and aren't - * reused). Next, factor this part into a class and maintain one per - * ordering token. Next, fixup PrimaryLogPG's repop queue to be - * partitioned by ordering token. Finally, refactor the op pipeline - * so that the log entries passed into submit_transaction aren't - * versioned. We can't assign versions to them until we actually - * submit the operation. That's probably going to be the hard part. - */ - class pipeline_state_t { - enum { - CACHE_VALID = 0, - CACHE_INVALID = 1 - } pipeline_state = CACHE_VALID; - public: - bool caching_enabled() const { - return pipeline_state == CACHE_VALID; - } - bool cache_invalid() const { - return !caching_enabled(); - } - void invalidate() { - pipeline_state = CACHE_INVALID; - } - void clear() { - pipeline_state = CACHE_VALID; - } - friend ostream &operator<<(ostream &lhs, const pipeline_state_t &rhs); - } pipeline_state; - - op_list waiting_state; /// writes waiting on pipe_state - op_list waiting_reads; /// writes waiting on partial stripe reads - op_list waiting_commit; /// writes waiting on initial commit - eversion_t completed_to; - eversion_t committed_to; - void start_rmw(OpRef op); - bool try_state_to_reads(); - bool try_reads_to_commit(); - bool try_finish_rmw(); - void check_ops(); - - void on_change(); - void call_write_ordered(std::function &&cb); - - CephContext* cct; - ECListener *get_parent() const { return parent; } - const OSDMapRef& get_osdmap() const { return get_parent()->pgb_get_osdmap(); } - epoch_t get_osdmap_epoch() const { return get_parent()->pgb_get_osdmap_epoch(); } - const pg_info_t &get_info() { return get_parent()->get_info(); } - - template - void objects_read_async_no_cache( - const std::map &to_read, - Func &&on_complete - ) { - std::map > > _to_read; - for (auto &&hpair: to_read) { - auto &l = _to_read[hpair.first]; - for (auto extent: hpair.second) { - l.emplace_back(extent.first, extent.second, 0); - } - } - ec_backend.objects_read_and_reconstruct( - _to_read, - false, - make_gen_lambda_context< - std::map > &&, Func>( - std::forward(on_complete))); - } - void handle_sub_write( - pg_shard_t from, - OpRequestRef msg, - ECSubWrite &op, - const ZTracer::Trace &trace - ) { - ec_backend.handle_sub_write(from, std::move(msg), op, trace); - } - // end of iface - - ceph::ErasureCodeInterfaceRef ec_impl; - const ECUtil::stripe_info_t& sinfo; - ECListener* parent; - ECCommon& ec_backend; - - RMWPipeline(CephContext* cct, - ceph::ErasureCodeInterfaceRef ec_impl, - const ECUtil::stripe_info_t& sinfo, - ECListener* parent, - ECCommon& ec_backend) - : cct(cct), - ec_impl(std::move(ec_impl)), - sinfo(sinfo), - parent(parent), - ec_backend(ec_backend) { - } - } rmw_pipeline; + struct RMWPipeline rmw_pipeline; ceph::ErasureCodeInterfaceRef ec_impl; -- 2.39.5