From cb7437a700bd550ef6ea0ce0c17bf804333b819f Mon Sep 17 00:00:00 2001 From: =?utf8?q?Rados=C5=82aw=20Zarzy=C5=84ski?= Date: Tue, 19 Sep 2023 14:48:08 +0200 Subject: [PATCH] osd: dissect crimson-shareable parts of ECBackend into ECCommon MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Radosław Zarzyński --- src/osd/ECBackend.cc | 60 +++--- src/osd/ECBackend.h | 447 ++++++++++++++++++++++--------------------- 2 files changed, 258 insertions(+), 249 deletions(-) diff --git a/src/osd/ECBackend.cc b/src/osd/ECBackend.cc index ec8fdc0637a29..a19e95cba9d1c 100644 --- a/src/osd/ECBackend.cc +++ b/src/osd/ECBackend.cc @@ -58,7 +58,7 @@ static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) { static ostream& _prefix(std::ostream *_dout, ECBackend::RMWPipeline *rmw_pipeline) { return rmw_pipeline->get_parent()->gen_dbg_prefix(*_dout); } -static ostream& _prefix(std::ostream *_dout, ECBackend::ReadPipeline *read_pipeline) { +static ostream& _prefix(std::ostream *_dout, ECCommon::ReadPipeline *read_pipeline) { return read_pipeline->get_parent()->gen_dbg_prefix(*_dout); } @@ -112,7 +112,7 @@ static ostream &operator<<( << rhs.get<1>() << ", " << rhs.get<2>() << ")"; } -ostream &operator<<(ostream &lhs, const ECBackend::read_request_t &rhs) +ostream &operator<<(ostream &lhs, const ECCommon::read_request_t &rhs) { return lhs << "read_request_t(to_read=[" << rhs.to_read << "]" << ", need=" << rhs.need @@ -120,7 +120,7 @@ ostream &operator<<(ostream &lhs, const ECBackend::read_request_t &rhs) << ")"; } -ostream &operator<<(ostream &lhs, const ECBackend::read_result_t &rhs) +ostream &operator<<(ostream &lhs, const ECCommon::read_result_t &rhs) { lhs << "read_result_t(r=" << rhs.r << ", errors=" << rhs.errors; @@ -132,7 +132,7 @@ ostream &operator<<(ostream &lhs, const ECBackend::read_result_t &rhs) return lhs << ", returned=" << rhs.returned << ")"; } -ostream &operator<<(ostream &lhs, const ECBackend::ReadOp &rhs) +ostream &operator<<(ostream &lhs, const ECCommon::ReadOp &rhs) { lhs << "ReadOp(tid=" << rhs.tid; if (rhs.op && rhs.op->get_req()) { @@ -147,7 +147,7 @@ ostream &operator<<(ostream &lhs, const ECBackend::ReadOp &rhs) << ", in_progress=" << rhs.in_progress << ")"; } -void ECBackend::ReadOp::dump(Formatter *f) const +void ECCommon::ReadOp::dump(Formatter *f) const { f->dump_unsigned("tid", tid); if (op && op->get_req()) { @@ -237,7 +237,7 @@ PGBackend::RecoveryHandle *ECBackend::open_recovery_op() return new ECRecoveryHandle; } -void ECBackend::_failed_push(const hobject_t &hoid, ECBackend::read_result_t &res) +void ECBackend::_failed_push(const hobject_t &hoid, ECCommon::read_result_t &res) { dout(10) << __func__ << ": Read error " << hoid << " r=" << res.r << " errors=" << res.errors << dendl; @@ -256,7 +256,7 @@ void ECBackend::_failed_push(const hobject_t &hoid, ECBackend::read_result_t &re struct RecoveryMessages { map recovery_reads; + ECCommon::read_request_t> recovery_reads; map> want_to_read; void recovery_read( @@ -272,7 +272,7 @@ struct RecoveryMessages { recovery_reads.insert( make_pair( hoid, - ECBackend::read_request_t( + ECCommon::read_request_t( to_read, need, attrs))); @@ -515,13 +515,13 @@ struct SendPushReplies : public Context { } }; -struct RecoveryReadCompleter : ECBackend::ReadCompleter { +struct RecoveryReadCompleter : ECCommon::ReadCompleter { RecoveryReadCompleter(ECBackend& backend) : backend(backend) {} void finish_single_request( const hobject_t &hoid, - ECBackend::read_result_t &res, + ECCommon::read_result_t &res, list >) override { if (!(res.r == 0 && res.errors.empty())) { @@ -1347,7 +1347,7 @@ void ECBackend::handle_sub_read_reply( } } -void ECBackend::ReadPipeline::complete_read_op(ReadOp &rop) +void ECCommon::ReadPipeline::complete_read_op(ReadOp &rop) { map::iterator reqiter = rop.to_read.begin(); @@ -1374,9 +1374,9 @@ void ECBackend::ReadPipeline::complete_read_op(ReadOp &rop) } struct FinishReadOp : public GenContext { - ECBackend::ReadPipeline& read_pipeline; + ECCommon::ReadPipeline& read_pipeline; ceph_tid_t tid; - FinishReadOp(ECBackend::ReadPipeline& read_pipeline, ceph_tid_t tid) + FinishReadOp(ECCommon::ReadPipeline& read_pipeline, ceph_tid_t tid) : read_pipeline(read_pipeline), tid(tid) {} void finish(ThreadPool::TPHandle&) override { auto ropiter = read_pipeline.tid_to_read_map.find(tid); @@ -1386,7 +1386,7 @@ struct FinishReadOp : public GenContext { }; template -void ECBackend::ReadPipeline::filter_read_op( +void ECCommon::ReadPipeline::filter_read_op( const OSDMapRef& osdmap, ReadOp &op, F&& on_erase) @@ -1462,7 +1462,7 @@ void ECBackend::ReadPipeline::filter_read_op( } template -void ECBackend::ReadPipeline::check_recovery_sources( +void ECCommon::ReadPipeline::check_recovery_sources( const OSDMapRef& osdmap, F&& on_erase) { @@ -1494,7 +1494,7 @@ void ECBackend::check_recovery_sources(const OSDMapRef& osdmap) }); } -void ECBackend::ReadPipeline::on_change() +void ECCommon::ReadPipeline::on_change() { for (map::iterator i = tid_to_read_map.begin(); i != tid_to_read_map.end(); @@ -1663,7 +1663,7 @@ void ECBackend::RMWPipeline::call_write_ordered(std::function &&cb) } } -void ECBackend::ReadPipeline::get_all_avail_shards( +void ECCommon::ReadPipeline::get_all_avail_shards( const hobject_t &hoid, const set &error_shards, set &have, @@ -1767,7 +1767,7 @@ int ECBackend::get_min_avail_to_read_shards( return 0; } -int ECBackend::ReadPipeline::get_remaining_shards( +int ECCommon::ReadPipeline::get_remaining_shards( const hobject_t &hoid, const set &avail, const set &want, @@ -1813,14 +1813,14 @@ int ECBackend::ReadPipeline::get_remaining_shards( return 0; } -void ECBackend::ReadPipeline::start_read_op( +void ECCommon::ReadPipeline::start_read_op( int priority, map> &want_to_read, map &to_read, OpRequestRef _op, bool do_redundant_reads, bool for_recovery, - std::unique_ptr on_complete) + std::unique_ptr on_complete) { ceph_tid_t tid = get_parent()->get_tid(); ceph_assert(!tid_to_read_map.count(tid)); @@ -1843,7 +1843,7 @@ void ECBackend::ReadPipeline::start_read_op( do_read_op(op); } -void ECBackend::ReadPipeline::do_read_op(ReadOp &op) +void ECCommon::ReadPipeline::do_read_op(ReadOp &op) { int priority = op.priority; ceph_tid_t tid = op.tid; @@ -2406,15 +2406,15 @@ void ECBackend::objects_read_async( on_complete))); } -struct ClientReadCompleter : ECBackend::ReadCompleter { - ClientReadCompleter(ECBackend::ReadPipeline &read_pipeline, - ECBackend::ClientAsyncReadStatus *status) +struct ClientReadCompleter : ECCommon::ReadCompleter { + ClientReadCompleter(ECCommon::ReadPipeline &read_pipeline, + ECCommon::ClientAsyncReadStatus *status) : read_pipeline(read_pipeline), status(status) {} void finish_single_request( const hobject_t &hoid, - ECBackend::read_result_t &res, + ECCommon::read_result_t &res, list > to_read) override { extent_map result; @@ -2465,8 +2465,8 @@ out: // NOP } - ECBackend::ReadPipeline &read_pipeline; - ECBackend::ClientAsyncReadStatus *status; + ECCommon::ReadPipeline &read_pipeline; + ECCommon::ClientAsyncReadStatus *status; }; @@ -2481,7 +2481,7 @@ void ECBackend::objects_read_and_reconstruct( *this, reads, fast_read, std::move(func)); } -void ECBackend::ReadPipeline::objects_read_and_reconstruct( +void ECCommon::ReadPipeline::objects_read_and_reconstruct( ECBackend& ec_backend, const map > @@ -2532,7 +2532,7 @@ void ECBackend::ReadPipeline::objects_read_and_reconstruct( } -int ECBackend::ReadPipeline::send_all_remaining_reads( +int ECCommon::ReadPipeline::send_all_remaining_reads( const hobject_t &hoid, ReadOp &rop) { @@ -2568,7 +2568,7 @@ int ECBackend::ReadPipeline::send_all_remaining_reads( return 0; } -void ECBackend::ReadPipeline::kick_reads() +void ECCommon::ReadPipeline::kick_reads() { while (in_progress_client_reads.size() && in_progress_client_reads.front().is_complete()) { diff --git a/src/osd/ECBackend.h b/src/osd/ECBackend.h index 2a351113f3fff..30b6f017f9681 100644 --- a/src/osd/ECBackend.h +++ b/src/osd/ECBackend.h @@ -94,7 +94,231 @@ struct RecoveryMessages; const hobject_t &soid, const object_stat_sum_t &delta_stats) = 0; }; -class ECBackend : public PGBackend { + +struct ECBackend; +struct ECCommon { + struct read_request_t { + const std::list > to_read; + std::map>> need; + bool want_attrs; + read_request_t( + const std::list > &to_read, + const std::map>> &need, + bool want_attrs) + : to_read(to_read), need(need), want_attrs(want_attrs) {} + }; + friend ostream &operator<<(ostream &lhs, const read_request_t &rhs); + struct ReadOp; + /** + * Low level async read mechanism + * + * To avoid duplicating the logic for requesting and waiting for + * multiple object shards, there is a common async read mechanism + * taking a std::map of hobject_t->read_request_t which defines callbacks + * taking read_result_ts as arguments. + * + * tid_to_read_map gives open read ops. check_recovery_sources uses + * shard_to_read_map and ReadOp::source_to_obj to restart reads + * involving down osds. + * + * The user is responsible for specifying replicas on which to read + * and for reassembling the buffer on the other side since client + * reads require the original object buffer while recovery only needs + * the missing pieces. + * + * Rather than handling reads on the primary directly, we simply send + * ourselves a message. This avoids a dedicated primary path for that + * part. + */ + struct read_result_t { + int r; + std::map errors; + std::optional> > attrs; + std::list< + boost::tuple< + uint64_t, uint64_t, std::map > > returned; + read_result_t() : r(0) {} + }; + + struct ReadCompleter { + virtual void finish_single_request( + const hobject_t &hoid, + read_result_t &res, + std::list > to_read) = 0; + + virtual void finish(int priority) && = 0; + + virtual ~ReadCompleter() = default; + }; + + friend struct CallClientContexts; + struct ClientAsyncReadStatus { + unsigned objects_to_read; + GenContextURef > &&> func; + std::map > results; + explicit ClientAsyncReadStatus( + unsigned objects_to_read, + GenContextURef > &&> &&func) + : objects_to_read(objects_to_read), func(std::move(func)) {} + void complete_object( + const hobject_t &hoid, + int err, + extent_map &&buffers) { + ceph_assert(objects_to_read); + --objects_to_read; + ceph_assert(!results.count(hoid)); + results.emplace(hoid, std::make_pair(err, std::move(buffers))); + } + bool is_complete() const { + return objects_to_read == 0; + } + void run() { + func.release()->complete(std::move(results)); + } + }; + + struct ReadOp { + int priority; + ceph_tid_t tid; + OpRequestRef op; // may be null if not on behalf of a client + // True if redundant reads are issued, false otherwise, + // this is useful to tradeoff some resources (redundant ops) for + // low latency read, especially on relatively idle cluster + bool do_redundant_reads; + // True if reading for recovery which could possibly reading only a subset + // of the available shards. + bool for_recovery; + std::unique_ptr on_complete; + + ZTracer::Trace trace; + + std::map> want_to_read; + std::map to_read; + std::map complete; + + std::map> obj_to_source; + std::map > source_to_obj; + + void dump(ceph::Formatter *f) const; + + std::set in_progress; + + ReadOp( + int priority, + ceph_tid_t tid, + bool do_redundant_reads, + bool for_recovery, + std::unique_ptr _on_complete, + OpRequestRef op, + std::map> &&_want_to_read, + std::map &&_to_read) + : priority(priority), + tid(tid), + op(op), + do_redundant_reads(do_redundant_reads), + for_recovery(for_recovery), + on_complete(std::move(_on_complete)), + want_to_read(std::move(_want_to_read)), + to_read(std::move(_to_read)) { + for (auto &&hpair: to_read) { + auto &returned = complete[hpair.first].returned; + for (auto &&extent: hpair.second.to_read) { + returned.push_back( + boost::make_tuple( + extent.get<0>(), + extent.get<1>(), + std::map())); + } + } + } + ReadOp() = delete; + ReadOp(const ReadOp &) = delete; // due to on_complete being unique_ptr + ReadOp(ReadOp &&) = default; + }; + struct ReadPipeline { + void objects_read_and_reconstruct( + ECBackend& ecbackend, + const std::map > + > &reads, + bool fast_read, + GenContextURef > &&> &&func); + + template + void filter_read_op( + const OSDMapRef& osdmap, + ReadOp &op, + F&& on_erase); + + template + void check_recovery_sources(const OSDMapRef& osdmap, F&& on_erase); + + void complete_read_op(ReadOp &rop); + + void start_read_op( + int priority, + std::map> &want_to_read, + std::map &to_read, + OpRequestRef op, + bool do_redundant_reads, + bool for_recovery, + std::unique_ptr on_complete); + + void do_read_op(ReadOp &rop); + + int send_all_remaining_reads( + const hobject_t &hoid, + ReadOp &rop); + + void on_change(); + + void kick_reads(); + + std::map tid_to_read_map; + std::map > shard_to_read_map; + std::list in_progress_client_reads; + + CephContext* cct; + ceph::ErasureCodeInterfaceRef ec_impl; + const ECUtil::stripe_info_t& sinfo; + // TODO: lay an interface down here + ECListener* parent; + + ECListener *get_parent() const { return parent; } + const OSDMapRef& get_osdmap() const { return get_parent()->pgb_get_osdmap(); } + epoch_t get_osdmap_epoch() const { return get_parent()->pgb_get_osdmap_epoch(); } + const pg_info_t &get_info() { return get_parent()->get_info(); } + + ReadPipeline(CephContext* cct, + ceph::ErasureCodeInterfaceRef ec_impl, + const ECUtil::stripe_info_t& sinfo, + ECListener* parent) + : cct(cct), + ec_impl(std::move(ec_impl)), + sinfo(sinfo), + parent(parent) { + } + + int get_remaining_shards( + const hobject_t &hoid, + const std::set &avail, + const std::set &want, + const read_result_t &result, + std::map>> *to_read, + bool for_recovery); + + void get_all_avail_shards( + const hobject_t &hoid, + const std::set &error_shards, + std::set &have, + std::map &shards, + bool for_recovery); + + friend ostream &operator<<(ostream &lhs, const ReadOp &rhs); + friend struct FinishReadOp; + }; +}; + +class ECBackend : public PGBackend, public ECCommon { public: RecoveryHandle *open_recovery_op() override; @@ -205,31 +429,6 @@ public: bool fast_read, GenContextURef > &&> &&func); - friend struct CallClientContexts; - struct ClientAsyncReadStatus { - unsigned objects_to_read; - GenContextURef > &&> func; - std::map > results; - explicit ClientAsyncReadStatus( - unsigned objects_to_read, - GenContextURef > &&> &&func) - : objects_to_read(objects_to_read), func(std::move(func)) {} - void complete_object( - const hobject_t &hoid, - int err, - extent_map &&buffers) { - ceph_assert(objects_to_read); - --objects_to_read; - ceph_assert(!results.count(hoid)); - results.emplace(hoid, std::make_pair(err, std::move(buffers))); - } - bool is_complete() const { - return objects_to_read == 0; - } - void run() { - func.release()->complete(std::move(results)); - } - }; void objects_read_async( const hobject_t &hoid, const std::list, @@ -246,6 +445,7 @@ private: sinfo.get_stripe_width()); } +public: void get_want_to_read_shards(std::set *want_to_read) const { const std::vector &chunk_mapping = ec_impl->get_chunk_mapping(); for (int i = 0; i < (int)ec_impl->get_data_chunk_count(); ++i) { @@ -253,6 +453,7 @@ private: want_to_read->insert(chunk); } } +private: /** * Recovery @@ -350,199 +551,7 @@ private: RecoveryMessages *m); public: - /** - * Low level async read mechanism - * - * To avoid duplicating the logic for requesting and waiting for - * multiple object shards, there is a common async read mechanism - * taking a std::map of hobject_t->read_request_t which defines callbacks - * taking read_result_ts as arguments. - * - * tid_to_read_map gives open read ops. check_recovery_sources uses - * shard_to_read_map and ReadOp::source_to_obj to restart reads - * involving down osds. - * - * The user is responsible for specifying replicas on which to read - * and for reassembling the buffer on the other side since client - * reads require the original object buffer while recovery only needs - * the missing pieces. - * - * Rather than handling reads on the primary directly, we simply send - * ourselves a message. This avoids a dedicated primary path for that - * part. - */ - struct read_result_t { - int r; - std::map errors; - std::optional> > attrs; - std::list< - boost::tuple< - uint64_t, uint64_t, std::map > > returned; - read_result_t() : r(0) {} - }; - struct read_request_t { - const std::list > to_read; - std::map>> need; - bool want_attrs; - read_request_t( - const std::list > &to_read, - const std::map>> &need, - bool want_attrs) - : to_read(to_read), need(need), want_attrs(want_attrs) {} - }; - friend ostream &operator<<(ostream &lhs, const read_request_t &rhs); - - struct ReadCompleter { - virtual void finish_single_request( - const hobject_t &hoid, - ECBackend::read_result_t &res, - std::list > to_read) = 0; - - virtual void finish(int priority) && = 0; - - virtual ~ReadCompleter() = default; - }; - - struct ReadOp { - int priority; - ceph_tid_t tid; - OpRequestRef op; // may be null if not on behalf of a client - // True if redundant reads are issued, false otherwise, - // this is useful to tradeoff some resources (redundant ops) for - // low latency read, especially on relatively idle cluster - bool do_redundant_reads; - // True if reading for recovery which could possibly reading only a subset - // of the available shards. - bool for_recovery; - std::unique_ptr on_complete; - - ZTracer::Trace trace; - - std::map> want_to_read; - std::map to_read; - std::map complete; - - std::map> obj_to_source; - std::map > source_to_obj; - - void dump(ceph::Formatter *f) const; - - std::set in_progress; - - ReadOp( - int priority, - ceph_tid_t tid, - bool do_redundant_reads, - bool for_recovery, - std::unique_ptr _on_complete, - OpRequestRef op, - std::map> &&_want_to_read, - std::map &&_to_read) - : priority(priority), - tid(tid), - op(op), - do_redundant_reads(do_redundant_reads), - for_recovery(for_recovery), - on_complete(std::move(_on_complete)), - want_to_read(std::move(_want_to_read)), - to_read(std::move(_to_read)) { - for (auto &&hpair: to_read) { - auto &returned = complete[hpair.first].returned; - for (auto &&extent: hpair.second.to_read) { - returned.push_back( - boost::make_tuple( - extent.get<0>(), - extent.get<1>(), - std::map())); - } - } - } - ReadOp() = delete; - ReadOp(const ReadOp &) = delete; // due to on_complete being unique_ptr - ReadOp(ReadOp &&) = default; - }; - - struct ReadPipeline { - void objects_read_and_reconstruct( - ECBackend& ecbackend, - const std::map > - > &reads, - bool fast_read, - GenContextURef > &&> &&func); - - template - void filter_read_op( - const OSDMapRef& osdmap, - ReadOp &op, - F&& on_erase); - - template - void check_recovery_sources(const OSDMapRef& osdmap, F&& on_erase); - - void complete_read_op(ReadOp &rop); - - void start_read_op( - int priority, - std::map> &want_to_read, - std::map &to_read, - OpRequestRef op, - bool do_redundant_reads, - bool for_recovery, - std::unique_ptr on_complete); - - void do_read_op(ReadOp &rop); - - int send_all_remaining_reads( - const hobject_t &hoid, - ReadOp &rop); - - void on_change(); - - void kick_reads(); - - std::map tid_to_read_map; - std::map > shard_to_read_map; - std::list in_progress_client_reads; - - CephContext* cct; - ceph::ErasureCodeInterfaceRef ec_impl; - const ECUtil::stripe_info_t& sinfo; - // TODO: lay an interface down here - ECListener* parent; - - ECListener *get_parent() const { return parent; } - const OSDMapRef& get_osdmap() const { return get_parent()->pgb_get_osdmap(); } - epoch_t get_osdmap_epoch() const { return get_parent()->pgb_get_osdmap_epoch(); } - const pg_info_t &get_info() { return get_parent()->get_info(); } - - ReadPipeline(CephContext* cct, - ceph::ErasureCodeInterfaceRef ec_impl, - const ECUtil::stripe_info_t& sinfo, - ECListener* parent) - : cct(cct), - ec_impl(std::move(ec_impl)), - sinfo(sinfo), - parent(parent) { - } - - int get_remaining_shards( - const hobject_t &hoid, - const std::set &avail, - const std::set &want, - const read_result_t &result, - std::map>> *to_read, - bool for_recovery); - - void get_all_avail_shards( - const hobject_t &hoid, - const std::set &error_shards, - std::set &have, - std::map &shards, - bool for_recovery); - - friend ostream &operator<<(ostream &lhs, const ReadOp &rhs); - friend struct FinishReadOp; - } read_pipeline; + struct ReadPipeline read_pipeline; /** -- 2.39.5