From b37416b2f15e2622ed5c9d9176770b5d7a308120 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Rados=C5=82aw=20Zarzy=C5=84ski?= Date: Mon, 11 Sep 2023 14:00:16 +0200 Subject: [PATCH] osd: rework the callback infrastructure around read_pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Radosław Zarzyński --- src/osd/ECBackend.cc | 168 +++++++++++++++++++------------------------ src/osd/ECBackend.h | 29 +++++--- 2 files changed, 92 insertions(+), 105 deletions(-) diff --git a/src/osd/ECBackend.cc b/src/osd/ECBackend.cc index 6c7364f5f835e..386c1eeabe32a 100644 --- a/src/osd/ECBackend.cc +++ b/src/osd/ECBackend.cc @@ -254,48 +254,17 @@ void ECBackend::_failed_push(const hobject_t &hoid, ECBackend::read_result_t &re get_parent()->on_failed_pull(fl, hoid, v); } -struct OnRecoveryReadComplete : - public GenContext { - struct RecoveryMessages* rm; - ECBackend *backend; - hobject_t hoid; - - OnRecoveryReadComplete() = delete; - OnRecoveryReadComplete(RecoveryMessages* rm, ECBackend *backend, const hobject_t &hoid) - : rm(rm), backend(backend), hoid(hoid) {} - void finish(ECBackend::read_result_t &res) override { - if (!(res.r == 0 && res.errors.empty())) { - backend->_failed_push(hoid, res); - return; - } - ceph_assert(res.returned.size() == 1); - backend->handle_recovery_read_complete( - hoid, - res.returned.back(), - res.attrs, - rm); - } -}; - -struct RecoveryMessages : GenContext { - ECBackend *ec; +struct RecoveryMessages { map recovery_reads; - RecoveryMessages* next_recovery_messages = nullptr; map> want_to_read; - RecoveryMessages(ECBackend* ec) : ec(ec) {} - void recovery_read( - ECBackend *ec, const hobject_t &hoid, uint64_t off, uint64_t len, set &&_want_to_read, const map>> &need, bool attrs) { - if (!next_recovery_messages) { - next_recovery_messages = new RecoveryMessages{ec}; - } list > to_read; to_read.push_back(boost::make_tuple(off, len, 0)); ceph_assert(!recovery_reads.count(hoid)); @@ -306,22 +275,12 @@ struct RecoveryMessages : GenContext { ECBackend::read_request_t( to_read, need, - attrs, - new OnRecoveryReadComplete( - next_recovery_messages, - ec, - hoid)))); + attrs))); } map > pushes; map > push_replies; ObjectStore::Transaction t; - RecoveryMessages() = delete; - ~RecoveryMessages() {} - - void finish(int priority) override { - ec->dispatch_recovery_messages(*this, priority); - } }; void ECBackend::handle_recovery_push( @@ -556,6 +515,36 @@ struct SendPushReplies : public Context { } }; +struct RecoveryReadCompleter : ECBackend::ReadCompleter { + RecoveryReadCompleter(ECBackend& backend) + : backend(backend) {} + + void finish_single_request( + const hobject_t &hoid, + ECBackend::read_result_t &res, + list >) override + { + if (!(res.r == 0 && res.errors.empty())) { + backend._failed_push(hoid, res); + return; + } + ceph_assert(res.returned.size() == 1); + backend.handle_recovery_read_complete( + hoid, + res.returned.back(), + res.attrs, + &rm); + } + + void finish(int priority) && override + { + backend.dispatch_recovery_messages(rm, priority); + } + + ECBackend& backend; + RecoveryMessages rm; +}; + void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority) { for (map >::iterator i = m.pushes.begin(); @@ -607,8 +596,9 @@ void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority) m.want_to_read, m.recovery_reads, OpRequestRef(), - false, true, m.next_recovery_messages); - m.next_recovery_messages = nullptr; + false, + true, + std::make_unique(*this)); } void ECBackend::continue_recovery_op( @@ -656,7 +646,6 @@ void ECBackend::continue_recovery_op( return; } m->recovery_read( - this, op.hoid, op.recovery_progress.data_recovered_to, amount, @@ -773,7 +762,7 @@ void ECBackend::run_recovery_op( int priority) { ECRecoveryHandle *h = static_cast(_h); - RecoveryMessages m{this}; + RecoveryMessages m; for (list::iterator i = h->ops.begin(); i != h->ops.end(); ++i) { @@ -884,7 +873,7 @@ bool ECBackend::_handle_message( } case MSG_OSD_PG_PUSH: { auto op = _op->get_req(); - RecoveryMessages rm{this}; + RecoveryMessages rm; for (vector::const_iterator i = op->pushes.begin(); i != op->pushes.end(); ++i) { @@ -896,7 +885,7 @@ bool ECBackend::_handle_message( case MSG_OSD_PG_PUSH_REPLY: { const MOSDPGPushReply *op = static_cast( _op->get_req()); - RecoveryMessages rm{this}; + RecoveryMessages rm; for (vector::const_iterator i = op->replies.begin(); i != op->replies.end(); ++i) { @@ -1366,11 +1355,14 @@ void ECBackend::ReadPipeline::complete_read_op(ReadOp &rop) rop.complete.begin(); ceph_assert(rop.to_read.size() == rop.complete.size()); for (; reqiter != rop.to_read.end(); ++reqiter, ++resiter) { - if (reqiter->second.cb) { - reqiter->second.cb->complete(resiter->second); - reqiter->second.cb = nullptr; - } - } + rop.on_complete->finish_single_request( + reqiter->first, + resiter->second, + reqiter->second.to_read); + } + ceph_assert(rop.on_complete); + std::move(*rop.on_complete).finish(rop.priority); + rop.on_complete = nullptr; // if the read op is over. clean all the data of this tid. for (set::iterator iter = rop.in_progress.begin(); iter != rop.in_progress.end(); @@ -1379,10 +1371,6 @@ void ECBackend::ReadPipeline::complete_read_op(ReadOp &rop) } rop.in_progress.clear(); tid_to_read_map.erase(rop.tid); - if (rop.on_complete) { - rop.on_complete->complete(rop.priority); - rop.on_complete = nullptr; - } } struct FinishReadOp : public GenContext { @@ -1445,10 +1433,6 @@ void ECBackend::ReadPipeline::filter_read_op( read_request_t &req = op.to_read.find(*i)->second; dout(10) << __func__ << ": canceling " << req << " for obj " << *i << dendl; - ceph_assert(req.cb); - delete req.cb; - req.cb = nullptr; - op.to_read.erase(*i); op.complete.erase(*i); on_erase(*i); @@ -1516,13 +1500,6 @@ void ECBackend::ReadPipeline::on_change() i != tid_to_read_map.end(); ++i) { dout(10) << __func__ << ": cancelling " << i->second << dendl; - for (map::iterator j = - i->second.to_read.begin(); - j != i->second.to_read.end(); - ++j) { - delete j->second.cb; - j->second.cb = nullptr; - } } tid_to_read_map.clear(); shard_to_read_map.clear(); @@ -1843,7 +1820,7 @@ void ECBackend::ReadPipeline::start_read_op( OpRequestRef _op, bool do_redundant_reads, bool for_recovery, - GenContext *on_complete) + std::unique_ptr on_complete) { ceph_tid_t tid = get_parent()->get_tid(); ceph_assert(!tid_to_read_map.count(tid)); @@ -1854,7 +1831,7 @@ void ECBackend::ReadPipeline::start_read_op( tid, do_redundant_reads, for_recovery, - on_complete, + std::move(on_complete), _op, std::move(want_to_read), std::move(to_read))).first->second; @@ -2429,19 +2406,17 @@ void ECBackend::objects_read_async( on_complete))); } -struct CallClientContexts : - public GenContext { - hobject_t hoid; - ECBackend::ReadPipeline &read_pipeline; - ECBackend::ClientAsyncReadStatus *status; - list > to_read; - CallClientContexts( - hobject_t hoid, - ECBackend::ReadPipeline &read_pipeline, - ECBackend::ClientAsyncReadStatus *status, - const list > &to_read) - : hoid(hoid), read_pipeline(read_pipeline), status(status), to_read(to_read) {} - void finish(ECBackend::read_result_t &res) override { +struct ClientReadCompleter : ECBackend::ReadCompleter { + ClientReadCompleter(ECBackend::ReadPipeline &read_pipeline, + ECBackend::ClientAsyncReadStatus *status) + : read_pipeline(read_pipeline), + status(status) {} + + void finish_single_request( + const hobject_t &hoid, + ECBackend::read_result_t &res, + list > to_read) override + { extent_map result; if (res.r != 0) goto out; @@ -2484,8 +2459,17 @@ out: status->complete_object(hoid, res.r, std::move(result)); read_pipeline.kick_reads(); } + + void finish(int priority) && override + { + // NOP + } + + ECBackend::ReadPipeline &read_pipeline; + ECBackend::ClientAsyncReadStatus *status; }; + void ECBackend::objects_read_and_reconstruct( const map > @@ -2541,19 +2525,13 @@ void ECBackend::ReadPipeline::objects_read_and_reconstruct( &shards); ceph_assert(r == 0); - CallClientContexts *c = new CallClientContexts( - to_read.first, - *this, - &(in_progress_client_reads.back()), - to_read.second); for_read_op.insert( make_pair( to_read.first, read_request_t( to_read.second, shards, - false, - c))); + false))); obj_want_to_read.insert(make_pair(to_read.first, want_to_read)); } @@ -2562,7 +2540,9 @@ void ECBackend::ReadPipeline::objects_read_and_reconstruct( obj_want_to_read, for_read_op, OpRequestRef(), - fast_read, false, nullptr); + fast_read, + false, + std::make_unique(*this, &(in_progress_client_reads.back()))); }(); return; } @@ -2585,7 +2565,6 @@ int ECBackend::ReadPipeline::send_all_remaining_reads( list > offsets = rop.to_read.find(hoid)->second.to_read; - GenContext *c = rop.to_read.find(hoid)->second.cb; // (Note cuixf) If we need to read attrs and we read failed, try to read again. bool want_attrs = @@ -2601,8 +2580,7 @@ int ECBackend::ReadPipeline::send_all_remaining_reads( read_request_t( offsets, shards, - want_attrs, - c))); + want_attrs))); return 0; } diff --git a/src/osd/ECBackend.h b/src/osd/ECBackend.h index 94267144ba4cd..b859414160bd7 100644 --- a/src/osd/ECBackend.h +++ b/src/osd/ECBackend.h @@ -272,6 +272,7 @@ private: friend struct RecoveryMessages; void dispatch_recovery_messages(RecoveryMessages &m, int priority); friend struct OnRecoveryReadComplete; + friend struct RecoveryReadCompleter; void handle_recovery_read_complete( const hobject_t &hoid, boost::tuple > &to_read, @@ -321,17 +322,25 @@ public: const std::list > to_read; std::map>> need; bool want_attrs; - GenContext *cb; read_request_t( const std::list > &to_read, const std::map>> &need, - bool want_attrs, - GenContext *cb) - : to_read(to_read), need(need), want_attrs(want_attrs), - cb(cb) {} + bool want_attrs) + : to_read(to_read), need(need), want_attrs(want_attrs) {} }; friend ostream &operator<<(ostream &lhs, const read_request_t &rhs); + struct ReadCompleter { + virtual void finish_single_request( + const hobject_t &hoid, + ECBackend::read_result_t &res, + std::list > to_read) = 0; + + virtual void finish(int priority) && = 0; + + virtual ~ReadCompleter() = default; + }; + struct ReadOp { int priority; ceph_tid_t tid; @@ -343,7 +352,7 @@ public: // True if reading for recovery which could possibly reading only a subset // of the available shards. bool for_recovery; - GenContext *on_complete; + std::unique_ptr on_complete; ZTracer::Trace trace; @@ -363,7 +372,7 @@ public: ceph_tid_t tid, bool do_redundant_reads, bool for_recovery, - GenContext *on_complete, + std::unique_ptr _on_complete, OpRequestRef op, std::map> &&_want_to_read, std::map &&_to_read) @@ -372,7 +381,7 @@ public: op(op), do_redundant_reads(do_redundant_reads), for_recovery(for_recovery), - on_complete(on_complete), + on_complete(std::move(_on_complete)), want_to_read(std::move(_want_to_read)), to_read(std::move(_to_read)) { for (auto &&hpair: to_read) { @@ -387,7 +396,7 @@ public: } } ReadOp() = delete; - ReadOp(const ReadOp &) = default; + ReadOp(const ReadOp &) = delete; // due to on_complete being unique_ptr ReadOp(ReadOp &&) = default; }; @@ -417,7 +426,7 @@ public: OpRequestRef op, bool do_redundant_reads, bool for_recovery, - GenContext *on_complete); + std::unique_ptr on_complete); void do_read_op(ReadOp &rop); -- 2.39.5