From 416d3281f1009ad539ff04c03c2801affa7634d2 Mon Sep 17 00:00:00 2001
From: =?utf8?q?Rados=C5=82aw=20Zarzy=C5=84ski?=
Date: Thu, 6 Jul 2023 13:30:53 +0200
Subject: [PATCH] osd: dissect the EC read pipeline from ECBackend into dedicated class
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

Signed-off-by: Radosław Zarzyński
---
 src/osd/ECBackend.cc | 161 ++++++++++++++++++++++++++++---------------
 src/osd/ECBackend.h  | 125 +++++++++++++++++++++------------
 2 files changed, 188 insertions(+), 98 deletions(-)

diff --git a/src/osd/ECBackend.cc b/src/osd/ECBackend.cc
index 906b1d66450..4bc7695ebb1 100644
--- a/src/osd/ECBackend.cc
+++ b/src/osd/ECBackend.cc
@@ -58,6 +58,9 @@ static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) {
 static ostream& _prefix(std::ostream *_dout, ECBackend::RMWPipeline *rmw_pipeline) {
   return rmw_pipeline->get_parent()->gen_dbg_prefix(*_dout);
 }
+static ostream& _prefix(std::ostream *_dout, ECBackend::ReadPipeline *read_pipeline) {
+  return read_pipeline->get_parent()->gen_dbg_prefix(*_dout);
+}
 
 struct ECRecoveryHandle : public PGBackend::RecoveryHandle {
   list ops;
@@ -221,6 +224,7 @@ ECBackend::ECBackend(
   ErasureCodeInterfaceRef ec_impl,
   uint64_t stripe_width)
   : PGBackend(cct, pg, store, coll, ch),
+    read_pipeline(cct, ec_impl, this->sinfo, get_parent(), *this),
     rmw_pipeline(cct, ec_impl, this->sinfo, get_parent(), *this),
     ec_impl(ec_impl),
     sinfo(ec_impl->get_data_chunk_count(), stripe_width) {
@@ -593,7 +597,7 @@ void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
   if (m.recovery_reads.empty())
     return;
 
-  start_read_op(
+  read_pipeline.start_read_op(
     priority,
     m.want_to_read,
     m.recovery_reads,
@@ -1208,8 +1212,8 @@ void ECBackend::handle_sub_read_reply(
 {
   trace.event("ec sub read reply");
   dout(10) << __func__ << ": reply " << op << dendl;
-  map::iterator iter = tid_to_read_map.find(op.tid);
-  if (iter == tid_to_read_map.end()) {
+  map::iterator iter = read_pipeline.tid_to_read_map.find(op.tid);
+  if (iter == read_pipeline.tid_to_read_map.end()) {
     //canceled
     dout(20) << __func__ << ": dropped " << op << dendl;
     return;
@@ -1265,8 +1269,8 @@ void ECBackend::handle_sub_read_reply(
   }
   map >::iterator siter =
-    shard_to_read_map.find(from);
-  ceph_assert(siter != shard_to_read_map.end());
+    read_pipeline.shard_to_read_map.find(from);
+  ceph_assert(siter != read_pipeline.shard_to_read_map.end());
   ceph_assert(siter->second.count(op.tid));
   siter->second.erase(op.tid);
 
@@ -1298,7 +1302,7 @@ void ECBackend::handle_sub_read_reply(
     // During recovery there may be multiple osds with copies of the same shard,
     // so getting EIO from one may result in multiple passes through this code path.
     if (!rop.do_redundant_reads) {
-      int r = send_all_remaining_reads(iter->first, rop);
+      int r = read_pipeline.send_all_remaining_reads(iter->first, rop);
       if (r == 0) {
         // We changed the rop's to_read and not incrementing is_complete
         need_resend = true;
@@ -1337,18 +1341,18 @@
     }
   }
   if (need_resend) {
-    do_read_op(rop);
+    read_pipeline.do_read_op(rop);
   } else if (rop.in_progress.empty() ||
              is_complete == rop.complete.size()) {
     dout(20) << __func__ << " Complete: " << rop << dendl;
     rop.trace.event("ec read complete");
-    complete_read_op(rop);
+    read_pipeline.complete_read_op(rop);
   } else {
     dout(10) << __func__ << " readop not complete: " << rop << dendl;
   }
 }
 
-void ECBackend::complete_read_op(ReadOp &rop)
+void ECBackend::ReadPipeline::complete_read_op(ReadOp &rop)
 {
   map::iterator reqiter =
     rop.to_read.begin();
@@ -1376,17 +1380,18 @@ void ECBackend::complete_read_op(ReadOp &rop)
 }
 
 struct FinishReadOp : public GenContext {
-  ECBackend *ec;
+  ECBackend::ReadPipeline& read_pipeline;
   ceph_tid_t tid;
-  FinishReadOp(ECBackend *ec, ceph_tid_t tid) : ec(ec), tid(tid) {}
+  FinishReadOp(ECBackend::ReadPipeline& read_pipeline, ceph_tid_t tid)
+    : read_pipeline(read_pipeline), tid(tid) {}
   void finish(ThreadPool::TPHandle&) override {
-    auto ropiter = ec->tid_to_read_map.find(tid);
-    ceph_assert(ropiter != ec->tid_to_read_map.end());
-    ec->complete_read_op(ropiter->second);
+    auto ropiter = read_pipeline.tid_to_read_map.find(tid);
+    ceph_assert(ropiter != read_pipeline.tid_to_read_map.end());
+    read_pipeline.complete_read_op(ropiter->second);
   }
 };
 
-void ECBackend::filter_read_op(
+void ECBackend::ReadPipeline::filter_read_op(
   const OSDMapRef& osdmap,
   ReadOp &op)
 {
@@ -1438,7 +1443,8 @@ void ECBackend::filter_read_op(
     op.to_read.erase(*i);
     op.complete.erase(*i);
-    recovery_ops.erase(*i);
+    // TODO: meh, this doesn't look like a part of the read pipeline
+    //recovery_ops.erase(*i);
   }
 
   if (op.in_progress.empty()) {
@@ -1459,21 +1465,22 @@
      */
     get_parent()->schedule_recovery_work(
       get_parent()->bless_unlocked_gencontext(
-        new FinishReadOp(this, op.tid)),
+        new FinishReadOp(*this, op.tid)),
       1);
   }
 }
 
 void ECBackend::check_recovery_sources(const OSDMapRef& osdmap)
 {
+  // TODO: dissect into ReadPipeline
   set tids_to_filter;
   for (map >::iterator
-         i = shard_to_read_map.begin();
-       i != shard_to_read_map.end();
+         i = read_pipeline.shard_to_read_map.begin();
+       i != read_pipeline.shard_to_read_map.end();
        ) {
     if (osdmap->is_down(i->first.osd)) {
       tids_to_filter.insert(i->second.begin(), i->second.end());
-      shard_to_read_map.erase(i++);
+      read_pipeline.shard_to_read_map.erase(i++);
     } else {
       ++i;
     }
   }
@@ -1481,12 +1488,31 @@ void ECBackend::check_recovery_sources(const OSDMapRef& osdmap)
   for (set::iterator i = tids_to_filter.begin();
        i != tids_to_filter.end();
        ++i) {
-    map::iterator j = tid_to_read_map.find(*i);
-    ceph_assert(j != tid_to_read_map.end());
-    filter_read_op(osdmap, j->second);
+    map::iterator j = read_pipeline.tid_to_read_map.find(*i);
+    ceph_assert(j != read_pipeline.tid_to_read_map.end());
+    read_pipeline.filter_read_op(osdmap, j->second);
   }
 }
 
+void ECBackend::ReadPipeline::on_change()
+{
+  for (map::iterator i = tid_to_read_map.begin();
+       i != tid_to_read_map.end();
+       ++i) {
+    dout(10) << __func__ << ": cancelling " << i->second << dendl;
+    for (map::iterator j =
+           i->second.to_read.begin();
+         j != i->second.to_read.end();
+         ++j) {
+      delete j->second.cb;
+      j->second.cb = nullptr;
+    }
+  }
+  tid_to_read_map.clear();
+  shard_to_read_map.clear();
+  in_progress_client_reads.clear();
+}
+
 void ECBackend::RMWPipeline::on_change()
 {
   dout(10) << __func__ << dendl;
@@ -1506,21 +1532,7 @@ void ECBackend::RMWPipeline::on_change()
 void ECBackend::on_change()
 {
   rmw_pipeline.on_change();
-  for (map::iterator i = tid_to_read_map.begin();
-       i != tid_to_read_map.end();
-       ++i) {
-    dout(10) << __func__ << ": cancelling " << i->second << dendl;
-    for (map::iterator j =
-           i->second.to_read.begin();
-         j != i->second.to_read.end();
-         ++j) {
-      delete j->second.cb;
-      j->second.cb = nullptr;
-    }
-  }
-  tid_to_read_map.clear();
-  shard_to_read_map.clear();
-  in_progress_client_reads.clear();
+  read_pipeline.on_change();
   clear_recovery_state();
 }
 
@@ -1541,8 +1553,8 @@
   }
   f->close_section();
   f->open_array_section("read_ops");
-  for (map::const_iterator i = tid_to_read_map.begin();
-       i != tid_to_read_map.end();
+  for (map::const_iterator i = read_pipeline.tid_to_read_map.begin();
+       i != read_pipeline.tid_to_read_map.end();
        ++i) {
     f->open_object_section("read_op");
     i->second.dump(f);
@@ -1658,7 +1670,7 @@ void ECBackend::RMWPipeline::call_write_ordered(std::function &&cb)
   }
 }
 
-void ECBackend::get_all_avail_shards(
+void ECBackend::ReadPipeline::get_all_avail_shards(
   const hobject_t &hoid,
   const set &error_shards,
   set &have,
@@ -1737,7 +1749,7 @@ int ECBackend::get_min_avail_to_read_shards(
   map shards;
   set error_shards;
 
-  get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);
+  read_pipeline.get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);
 
   map>> need;
   int r = ec_impl->minimum_to_decode(want, have, &need);
@@ -1762,7 +1774,7 @@
   return 0;
 }
 
-int ECBackend::get_remaining_shards(
+int ECBackend::ReadPipeline::get_remaining_shards(
   const hobject_t &hoid,
   const set &avail,
   const set &want,
@@ -1808,7 +1820,7 @@
   return 0;
 }
 
-void ECBackend::start_read_op(
+void ECBackend::ReadPipeline::start_read_op(
   int priority,
   map> &want_to_read,
   map &to_read,
@@ -1838,7 +1850,7 @@
   do_read_op(op);
 }
 
-void ECBackend::do_read_op(ReadOp &op)
+void ECBackend::ReadPipeline::do_read_op(ReadOp &op)
 {
   int priority = op.priority;
   ceph_tid_t tid = op.tid;
@@ -2404,15 +2416,15 @@ void ECBackend::objects_read_async(
 
 struct CallClientContexts :
   public GenContext {
   hobject_t hoid;
-  ECBackend *ec;
+  ECBackend::ReadPipeline &read_pipeline;
   ECBackend::ClientAsyncReadStatus *status;
   list > to_read;
   CallClientContexts(
     hobject_t hoid,
-    ECBackend *ec,
+    ECBackend::ReadPipeline &read_pipeline,
     ECBackend::ClientAsyncReadStatus *status,
     const list > &to_read)
-    : hoid(hoid), ec(ec), status(status), to_read(to_read) {}
+    : hoid(hoid), read_pipeline(read_pipeline), status(status), to_read(to_read) {}
   void finish(ECBackend::read_result_t &res) override {
     extent_map result;
     if (res.r != 0)
@@ -2421,7 +2433,7 @@ struct CallClientContexts :
     ceph_assert(res.errors.empty());
     for (auto &&read: to_read) {
       pair adjusted =
-        ec->sinfo.offset_len_to_stripe_bounds(
+        read_pipeline.sinfo.offset_len_to_stripe_bounds(
           make_pair(read.get<0>(), read.get<1>()));
       ceph_assert(res.returned.front().get<0>() == adjusted.first);
       ceph_assert(res.returned.front().get<1>() == adjusted.second);
@@ -2434,8 +2446,8 @@ struct CallClientContexts :
         to_decode[j->first.shard] = std::move(j->second);
       }
       int r = ECUtil::decode(
-        ec->sinfo,
-        ec->ec_impl,
+        read_pipeline.sinfo,
+        read_pipeline.ec_impl,
         to_decode,
         &bl);
       if (r < 0) {
@@ -2454,7 +2466,7 @@ struct CallClientContexts :
       }
 out:
     status->complete_object(hoid, res.r, std::move(result));
-    ec->kick_reads();
+    read_pipeline.kick_reads();
   }
 };
 
@@ -2465,6 +2477,32 @@ void ECBackend::objects_read_and_reconstruct(
   bool fast_read,
   GenContextURef > &&> &&func)
 {
+  return read_pipeline.objects_read_and_reconstruct(
+    *this, reads, fast_read, std::move(func));
+}
+
+void ECBackend::ReadPipeline::objects_read_and_reconstruct(
+  ECBackend& ecbackend,
+  const map >
+  > &reads,
+  bool fast_read,
+  GenContextURef > &&> &&func)
+{
+  return [this,
+          kick_reads=[this] (auto...) { return this->kick_reads();},
+          get_want_to_read_shards=[&ecbackend] (auto&&... args) {
+            return ecbackend.get_want_to_read_shards(std::forward(args)...);
+          },
+          get_min_avail_to_read_shards=[&ecbackend] (auto&&... args) {
+            return ecbackend.get_min_avail_to_read_shards(std::forward(args)...);
+          },
+          cct=(CephContext*)nullptr,
+          // params
+          &reads,
+          fast_read,
+          func=std::move(func)
+  ]() mutable {
   in_progress_client_reads.emplace_back(
     reads.size(), std::move(func));
   if (!reads.size()) {
@@ -2489,7 +2527,7 @@ void ECBackend::objects_read_and_reconstruct(
     CallClientContexts *c = new CallClientContexts(
       to_read.first,
-      this,
+      *this,
       &(in_progress_client_reads.back()),
       to_read.second);
     for_read_op.insert(
@@ -2509,11 +2547,12 @@ void ECBackend::objects_read_and_reconstruct(
     for_read_op,
     OpRequestRef(),
     fast_read, false, nullptr);
+  }();
   return;
 }
 
-int ECBackend::send_all_remaining_reads(
+int ECBackend::ReadPipeline::send_all_remaining_reads(
   const hobject_t &hoid,
   ReadOp &rop)
 {
@@ -2551,6 +2590,20 @@ int ECBackend::send_all_remaining_reads(
   return 0;
 }
 
+void ECBackend::ReadPipeline::kick_reads()
+{
+  while (in_progress_client_reads.size() &&
+         in_progress_client_reads.front().is_complete()) {
+    in_progress_client_reads.front().run();
+    in_progress_client_reads.pop_front();
+  }
+}
+
+void ECBackend::kick_reads() {
+  read_pipeline.kick_reads();
+}
+
+
 int ECBackend::objects_get_attrs(
   const hobject_t &hoid,
   map> *out)
diff --git a/src/osd/ECBackend.h b/src/osd/ECBackend.h
index d604a176ac7..b3951f664a8 100644
--- a/src/osd/ECBackend.h
+++ b/src/osd/ECBackend.h
@@ -168,7 +168,6 @@ public:
       func.release()->complete(std::move(results));
     }
   };
-  std::list in_progress_client_reads;
   void objects_read_async(
     const hobject_t &hoid,
     const std::list,
@@ -176,13 +175,7 @@ public:
     Context *on_complete,
     bool fast_read = false) override;
 
-  void kick_reads() {
-    while (in_progress_client_reads.size() &&
-           in_progress_client_reads.front().is_complete()) {
-      in_progress_client_reads.front().run();
-      in_progress_client_reads.pop_front();
-    }
-  }
+  void kick_reads();
 
 private:
   friend struct ECRecoveryHandle;
@@ -292,12 +285,6 @@ private:
     const PushReplyOp &op,
     pg_shard_t from,
     RecoveryMessages *m);
-  void get_all_avail_shards(
-    const hobject_t &hoid,
-    const std::set &error_shards,
-    std::set &have,
-    std::map &shards,
-    bool for_recovery);
 
 public:
   /**
@@ -403,28 +390,86 @@ public:
     ReadOp(const ReadOp &) = default;
     ReadOp(ReadOp &&) = default;
   };
-  friend struct FinishReadOp;
-  void filter_read_op(
-    const OSDMapRef& osdmap,
-    ReadOp &op);
-  void complete_read_op(ReadOp &rop);
-  friend ostream &operator<<(ostream &lhs, const ReadOp &rhs);
-
-  std::map tid_to_read_map;
-  std::map > shard_to_read_map;
-  void start_read_op(
-    int priority,
-    std::map> &want_to_read,
-    std::map &to_read,
-    OpRequestRef op,
-    bool do_redundant_reads,
-    bool for_recovery,
-    GenContext *on_complete);
-
-  void do_read_op(ReadOp &rop);
-  int send_all_remaining_reads(
-    const hobject_t &hoid,
-    ReadOp &rop);
+
+  struct ReadPipeline {
+    void objects_read_and_reconstruct(
+      ECBackend& ecbackend,
+      const std::map >
+      > &reads,
+      bool fast_read,
+      GenContextURef > &&> &&func);
+
+    void filter_read_op(
+      const OSDMapRef& osdmap,
+      ReadOp &op);
+
+    void complete_read_op(ReadOp &rop);
+
+    void start_read_op(
+      int priority,
+      std::map> &want_to_read,
+      std::map &to_read,
+      OpRequestRef op,
+      bool do_redundant_reads,
+      bool for_recovery,
+      GenContext *on_complete);
+
+    void do_read_op(ReadOp &rop);
+
+    int send_all_remaining_reads(
+      const hobject_t &hoid,
+      ReadOp &rop);
+
+    void on_change();
+
+    void kick_reads();
+
+    std::map tid_to_read_map;
+    std::map > shard_to_read_map;
+    std::list in_progress_client_reads;
+
+    CephContext* cct;
+    ceph::ErasureCodeInterfaceRef ec_impl;
+    const ECUtil::stripe_info_t& sinfo;
+    PGBackend::Listener* parent;
+    // TODO: lay an interface down here
+    ECBackend& ec_backend;
+
+    PGBackend::Listener *get_parent() const { return parent; }
+    const OSDMapRef& get_osdmap() const { return get_parent()->pgb_get_osdmap(); }
+    epoch_t get_osdmap_epoch() const { return get_parent()->pgb_get_osdmap_epoch(); }
+    const pg_info_t &get_info() { return get_parent()->get_info(); }
+
+    ReadPipeline(CephContext* cct,
+                 ceph::ErasureCodeInterfaceRef ec_impl,
+                 const ECUtil::stripe_info_t& sinfo,
+                 PGBackend::Listener* parent,
+                 ECBackend& ec_backend)
+      : cct(cct),
+        ec_impl(std::move(ec_impl)),
+        sinfo(sinfo),
+        parent(parent),
+        ec_backend(ec_backend) {
+    }
+
+    int get_remaining_shards(
+      const hobject_t &hoid,
+      const std::set &avail,
+      const std::set &want,
+      const read_result_t &result,
+      std::map>> *to_read,
+      bool for_recovery);
+
+    void get_all_avail_shards(
+      const hobject_t &hoid,
+      const std::set &error_shards,
+      std::set &have,
+      std::map &shards,
+      bool for_recovery);
+
+    friend ostream &operator<<(ostream &lhs, const ReadOp &rhs);
+    friend struct FinishReadOp;
+  } read_pipeline;
 
   /**
@@ -721,14 +766,6 @@ public:
     std::map>> *to_read ///< [out] shards, corresponding subchunks to read
    ); ///< @return error code, 0 on success
 
-  int get_remaining_shards(
-    const hobject_t &hoid,
-    const std::set &avail,
-    const std::set &want,
-    const read_result_t &result,
-    std::map>> *to_read,
-    bool for_recovery);
-
   int objects_get_attrs(
     const hobject_t &hoid,
     std::map> *out) override;
-- 
2.39.5
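
Note on the wrapper introduced in ECBackend::ReadPipeline::objects_read_and_reconstruct() above: the moved body runs inside an immediately-invoked lambda whose init-captures (kick_reads, get_want_to_read_shards, get_min_avail_to_read_shards, cct) stand in for names the body previously resolved against ECBackend members, which appears to let the body stay textually unchanged at this stage of the refactor. Below is a minimal, self-contained sketch of that capture-shim pattern; Backend, Pipeline, min_to_read and start_read are hypothetical names invented for illustration, not Ceph code.

#include <iostream>

struct Backend {
  int shards = 3;
  int min_to_read() const { return shards - 1; }  // plays the role of the old class's member
};

struct Pipeline {
  int queued = 0;

  // The "moved" method: its body still calls min_to_read() as if it were a
  // member of this class, but the init-capture binds that name to a shim that
  // forwards to the old owner instead.
  void start_read(Backend& backend, int len) {
    return [this,
            // shim redirecting a former member call back to the old class
            min_to_read = [&backend](auto&&...) { return backend.min_to_read(); },
            len]() mutable {
      // original body, textually unchanged
      queued += len * min_to_read();
      std::cout << "queued " << queued << " chunk reads\n";
    }();
  }
};

int main() {
  Backend b;
  Pipeline p;
  p.start_read(b, 4);  // prints: queued 8 chunk reads
}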