From 76d780f743cbdd648bd9eca73e0c8d2fc4c10532 Mon Sep 17 00:00:00 2001
From: =?utf8?q?Rados=C5=82aw=20Zarzy=C5=84ski?= <rzarzyns@redhat.com>
Date: Thu, 28 Sep 2023 18:22:41 +0200
Subject: [PATCH] osd: finish shuffling ECCommon::ReadPipeline to ECCommon.cc
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

Signed-off-by: Radosław Zarzyński <rzarzyns@redhat.com>
---
 src/osd/ECBackend.cc | 183 -------------------------------------------
 src/osd/ECCommon.cc  | 169 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 169 insertions(+), 183 deletions(-)

diff --git a/src/osd/ECBackend.cc b/src/osd/ECBackend.cc
index afa5d5bba44e6..e29d181ec0b66 100644
--- a/src/osd/ECBackend.cc
+++ b/src/osd/ECBackend.cc
@@ -55,12 +55,6 @@ using ceph::Formatter;
 static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) {
   return pgb->get_parent()->gen_dbg_prefix(*_dout);
 }
-static ostream& _prefix(std::ostream *_dout, ECCommon::RMWPipeline *rmw_pipeline) {
-  return rmw_pipeline->get_parent()->gen_dbg_prefix(*_dout);
-}
-static ostream& _prefix(std::ostream *_dout, ECCommon::ReadPipeline *read_pipeline) {
-  return read_pipeline->get_parent()->gen_dbg_prefix(*_dout);
-}
 struct ECRecoveryHandle : public PGBackend::RecoveryHandle {
   list<ECBackend::RecoveryOp> ops;
 };
@@ -92,14 +86,6 @@ static ostream &operator<<(ostream &lhs, const map<pg_shard_t, bufferlist> &rhs)
   return lhs << "]";
 }
 
-static ostream &operator<<(
-  ostream &lhs,
-  const boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &rhs)
-{
-  return lhs << "(" << rhs.get<0>() << ", "
-             << rhs.get<1>() << ", " << rhs.get<2>() << ")";
-}
-
 ostream &operator<<(ostream &lhs, const ECBackend::RecoveryOp &rhs)
 {
   return lhs << "RecoveryOp("
@@ -1589,70 +1575,6 @@ void ECBackend::objects_read_async(
                      on_complete)));
 }
 
-struct ClientReadCompleter : ECCommon::ReadCompleter {
-  ClientReadCompleter(ECCommon::ReadPipeline &read_pipeline,
-                      ECCommon::ClientAsyncReadStatus *status)
-    : read_pipeline(read_pipeline),
-      status(status) {}
-
-  void finish_single_request(
-    const hobject_t &hoid,
-    ECCommon::read_result_t &res,
-    list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read) override
-  {
-    extent_map result;
-    if (res.r != 0)
-      goto out;
-    ceph_assert(res.returned.size() == to_read.size());
-    ceph_assert(res.errors.empty());
-    for (auto &&read: to_read) {
-      pair<uint64_t, uint64_t> adjusted =
-        read_pipeline.sinfo.offset_len_to_stripe_bounds(
-          make_pair(read.get<0>(), read.get<1>()));
-      ceph_assert(res.returned.front().get<0>() == adjusted.first);
-      ceph_assert(res.returned.front().get<1>() == adjusted.second);
-      map<int, bufferlist> to_decode;
-      bufferlist bl;
-      for (map<pg_shard_t, bufferlist>::iterator j =
-             res.returned.front().get<2>().begin();
-           j != res.returned.front().get<2>().end();
-           ++j) {
-        to_decode[j->first.shard] = std::move(j->second);
-      }
-      int r = ECUtil::decode(
-        read_pipeline.sinfo,
-        read_pipeline.ec_impl,
-        to_decode,
-        &bl);
-      if (r < 0) {
-        res.r = r;
-        goto out;
-      }
-      bufferlist trimmed;
-      trimmed.substr_of(
-        bl,
-        read.get<0>() - adjusted.first,
-        std::min(read.get<1>(),
-                 bl.length() - (read.get<0>() - adjusted.first)));
-      result.insert(
-        read.get<0>(), trimmed.length(), std::move(trimmed));
-      res.returned.pop_front();
-    }
-out:
-    status->complete_object(hoid, res.r, std::move(result));
-    read_pipeline.kick_reads();
-  }
-
-  void finish(int priority) && override
-  {
-    // NOP
-  }
-
-  ECCommon::ReadPipeline &read_pipeline;
-  ECCommon::ClientAsyncReadStatus *status;
-};
-
-
 void ECBackend::objects_read_and_reconstruct(
   const map<hobject_t,
             std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
@@ -1664,111 +1586,6 @@ void ECBackend::objects_read_and_reconstruct(
     reads, fast_read, std::move(func));
 }
 
-void ECCommon::ReadPipeline::get_want_to_read_shards(
-  std::set<int> *want_to_read) const
-{
-  const std::vector<int> &chunk_mapping = ec_impl->get_chunk_mapping();
-  for (int i = 0; i < (int)ec_impl->get_data_chunk_count(); ++i) {
-    int chunk = (int)chunk_mapping.size() > i ? chunk_mapping[i] : i;
-    want_to_read->insert(chunk);
-  }
-}
-
-void ECCommon::ReadPipeline::objects_read_and_reconstruct(
-  const map<hobject_t,
-            std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
-  > &reads,
-  bool fast_read,
-  GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func)
-{
-  in_progress_client_reads.emplace_back(
-    reads.size(), std::move(func));
-  if (!reads.size()) {
-    kick_reads();
-    return;
-  }
-
-  map<hobject_t, set<int>> obj_want_to_read;
-  set<int> want_to_read;
-  get_want_to_read_shards(&want_to_read);
-
-  map<hobject_t, read_request_t> for_read_op;
-  for (auto &&to_read: reads) {
-    map<pg_shard_t, vector<pair<int, int>>> shards;
-    int r = get_min_avail_to_read_shards(
-      to_read.first,
-      want_to_read,
-      false,
-      fast_read,
-      &shards);
-    ceph_assert(r == 0);
-
-    for_read_op.insert(
-      make_pair(
-        to_read.first,
-        read_request_t(
-          to_read.second,
-          shards,
-          false)));
-    obj_want_to_read.insert(make_pair(to_read.first, want_to_read));
-  }
-
-  start_read_op(
-    CEPH_MSG_PRIO_DEFAULT,
-    obj_want_to_read,
-    for_read_op,
-    OpRequestRef(),
-    fast_read,
-    false,
-    std::make_unique<ClientReadCompleter>(*this, &(in_progress_client_reads.back())));
-}
-
-
-int ECCommon::ReadPipeline::send_all_remaining_reads(
-  const hobject_t &hoid,
-  ReadOp &rop)
-{
-  set<int> already_read;
-  const set<pg_shard_t>& ots = rop.obj_to_source[hoid];
-  for (set<pg_shard_t>::iterator i = ots.begin(); i != ots.end(); ++i)
-    already_read.insert(i->shard);
-  dout(10) << __func__ << " have/error shards=" << already_read << dendl;
-  map<pg_shard_t, vector<pair<int, int>>> shards;
-  int r = get_remaining_shards(hoid, already_read, rop.want_to_read[hoid],
-                               rop.complete[hoid], &shards, rop.for_recovery);
-  if (r)
-    return r;
-
-  list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
-    rop.to_read.find(hoid)->second.to_read;
-
-  // (Note cuixf) If we need to read attrs and we read failed, try to read again.
-  bool want_attrs =
-    rop.to_read.find(hoid)->second.want_attrs &&
-    (!rop.complete[hoid].attrs || rop.complete[hoid].attrs->empty());
-  if (want_attrs) {
-    dout(10) << __func__ << " want attrs again" << dendl;
-  }
-
-  rop.to_read.erase(hoid);
-  rop.to_read.insert(make_pair(
-    hoid,
-    read_request_t(
-      offsets,
-      shards,
-      want_attrs)));
-  return 0;
-}
-
-void ECCommon::ReadPipeline::kick_reads()
-{
-  while (in_progress_client_reads.size() &&
-         in_progress_client_reads.front().is_complete()) {
-    in_progress_client_reads.front().run();
-    in_progress_client_reads.pop_front();
-  }
-}
-
 void ECBackend::kick_reads() {
   read_pipeline.kick_reads();
 }
diff --git a/src/osd/ECCommon.cc b/src/osd/ECCommon.cc
index 7ed15a061f810..e1bbfce7796ce 100644
--- a/src/osd/ECCommon.cc
+++ b/src/osd/ECCommon.cc
@@ -565,6 +565,175 @@ void ECCommon::ReadPipeline::do_read_op(ReadOp &op)
   dout(10) << __func__ << ": started " << op << dendl;
 }
 
+void ECCommon::ReadPipeline::get_want_to_read_shards(
+  std::set<int> *want_to_read) const
+{
+  const std::vector<int> &chunk_mapping = ec_impl->get_chunk_mapping();
+  for (int i = 0; i < (int)ec_impl->get_data_chunk_count(); ++i) {
+    int chunk = (int)chunk_mapping.size() > i ? chunk_mapping[i] : i;
+    want_to_read->insert(chunk);
+  }
+}
+
+struct ClientReadCompleter : ECCommon::ReadCompleter {
+  ClientReadCompleter(ECCommon::ReadPipeline &read_pipeline,
+                      ECCommon::ClientAsyncReadStatus *status)
+    : read_pipeline(read_pipeline),
+      status(status) {}
+
+  void finish_single_request(
+    const hobject_t &hoid,
+    ECCommon::read_result_t &res,
+    list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read) override
+  {
+    extent_map result;
+    if (res.r != 0)
+      goto out;
+    ceph_assert(res.returned.size() == to_read.size());
+    ceph_assert(res.errors.empty());
+    for (auto &&read: to_read) {
+      pair<uint64_t, uint64_t> adjusted =
+        read_pipeline.sinfo.offset_len_to_stripe_bounds(
+          make_pair(read.get<0>(), read.get<1>()));
+      ceph_assert(res.returned.front().get<0>() == adjusted.first);
+      ceph_assert(res.returned.front().get<1>() == adjusted.second);
+      map<int, bufferlist> to_decode;
+      bufferlist bl;
+      for (map<pg_shard_t, bufferlist>::iterator j =
+             res.returned.front().get<2>().begin();
+           j != res.returned.front().get<2>().end();
+           ++j) {
+        to_decode[j->first.shard] = std::move(j->second);
+      }
+      int r = ECUtil::decode(
+        read_pipeline.sinfo,
+        read_pipeline.ec_impl,
+        to_decode,
+        &bl);
+      if (r < 0) {
+        res.r = r;
+        goto out;
+      }
+      bufferlist trimmed;
+      trimmed.substr_of(
+        bl,
+        read.get<0>() - adjusted.first,
+        std::min(read.get<1>(),
+                 bl.length() - (read.get<0>() - adjusted.first)));
+      result.insert(
+        read.get<0>(), trimmed.length(), std::move(trimmed));
+      res.returned.pop_front();
+    }
+out:
+    status->complete_object(hoid, res.r, std::move(result));
+    read_pipeline.kick_reads();
+  }
+
+  void finish(int priority) && override
+  {
+    // NOP
+  }
+
+  ECCommon::ReadPipeline &read_pipeline;
+  ECCommon::ClientAsyncReadStatus *status;
+};
+
+void ECCommon::ReadPipeline::objects_read_and_reconstruct(
+  const map<hobject_t,
+            std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
+  > &reads,
+  bool fast_read,
+  GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func)
+{
+  in_progress_client_reads.emplace_back(
+    reads.size(), std::move(func));
+  if (!reads.size()) {
+    kick_reads();
+    return;
+  }
+
+  map<hobject_t, set<int>> obj_want_to_read;
+  set<int> want_to_read;
+  get_want_to_read_shards(&want_to_read);
+
+  map<hobject_t, read_request_t> for_read_op;
+  for (auto &&to_read: reads) {
+    map<pg_shard_t, vector<pair<int, int>>> shards;
+    int r = get_min_avail_to_read_shards(
+      to_read.first,
+      want_to_read,
+      false,
+      fast_read,
+      &shards);
+    ceph_assert(r == 0);
+
+    for_read_op.insert(
+      make_pair(
+        to_read.first,
+        read_request_t(
+          to_read.second,
+          shards,
+          false)));
+    obj_want_to_read.insert(make_pair(to_read.first, want_to_read));
+  }
+
+  start_read_op(
+    CEPH_MSG_PRIO_DEFAULT,
+    obj_want_to_read,
+    for_read_op,
+    OpRequestRef(),
+    fast_read,
+    false,
+    std::make_unique<ClientReadCompleter>(*this, &(in_progress_client_reads.back())));
+}
+
+
+int ECCommon::ReadPipeline::send_all_remaining_reads(
+  const hobject_t &hoid,
+  ReadOp &rop)
+{
+  set<int> already_read;
+  const set<pg_shard_t>& ots = rop.obj_to_source[hoid];
+  for (set<pg_shard_t>::iterator i = ots.begin(); i != ots.end(); ++i)
+    already_read.insert(i->shard);
+  dout(10) << __func__ << " have/error shards=" << already_read << dendl;
+  map<pg_shard_t, vector<pair<int, int>>> shards;
+  int r = get_remaining_shards(hoid, already_read, rop.want_to_read[hoid],
+                               rop.complete[hoid], &shards, rop.for_recovery);
+  if (r)
+    return r;
+
+  list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
+    rop.to_read.find(hoid)->second.to_read;
+
+  // (Note cuixf) If we need to read attrs and we read failed, try to read again.
+  bool want_attrs =
+    rop.to_read.find(hoid)->second.want_attrs &&
+    (!rop.complete[hoid].attrs || rop.complete[hoid].attrs->empty());
+  if (want_attrs) {
+    dout(10) << __func__ << " want attrs again" << dendl;
+  }
+
+  rop.to_read.erase(hoid);
+  rop.to_read.insert(make_pair(
+    hoid,
+    read_request_t(
+      offsets,
+      shards,
+      want_attrs)));
+  return 0;
+}
+
+void ECCommon::ReadPipeline::kick_reads()
+{
+  while (in_progress_client_reads.size() &&
+         in_progress_client_reads.front().is_complete()) {
+    in_progress_client_reads.front().run();
+    in_progress_client_reads.pop_front();
+  }
+}
+
+
 void ECCommon::RMWPipeline::start_rmw(OpRef op)
 {
   ceph_assert(op);
-- 
2.39.5
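
Note on the moved code: ECCommon::ReadPipeline queues client reads in in_progress_client_reads, and kick_reads() only ever delivers from the front of that list, so results reach callers in submission order even when a later read finishes first; that is why ClientReadCompleter::finish_single_request() ends with read_pipeline.kick_reads(). The sketch below is a minimal, self-contained model of that FIFO completion contract only; it is not Ceph code, and the names (PendingClientRead, MiniReadPipeline, objects_left) are illustrative stand-ins for ClientAsyncReadStatus and the real pipeline types.

// Simplified model of the in-order client-read completion queue that
// ECCommon::ReadPipeline::kick_reads() drains.
#include <deque>
#include <functional>
#include <iostream>

struct PendingClientRead {
  // per-object replies still outstanding for this client read
  unsigned objects_left;
  // delivered exactly once, after all objects have arrived
  std::function<void()> on_complete;

  bool is_complete() const { return objects_left == 0; }
  void complete_object() { if (objects_left) --objects_left; }
  void run() { on_complete(); }
};

struct MiniReadPipeline {
  std::deque<PendingClientRead> in_progress_client_reads;

  // Mirrors ReadPipeline::kick_reads(): only the *front* read may be
  // delivered, so completions are handed back in submission order even
  // if a later read finished first.
  void kick_reads() {
    while (!in_progress_client_reads.empty() &&
           in_progress_client_reads.front().is_complete()) {
      in_progress_client_reads.front().run();
      in_progress_client_reads.pop_front();
    }
  }
};

int main() {
  MiniReadPipeline p;
  p.in_progress_client_reads.push_back({2, [] { std::cout << "read A done\n"; }});
  p.in_progress_client_reads.push_back({1, [] { std::cout << "read B done\n"; }});

  // Read B finishes first, but nothing is delivered yet: A is still in front.
  p.in_progress_client_reads[1].complete_object();
  p.kick_reads();

  // Once A completes, both are delivered in submission order: A, then B.
  p.in_progress_client_reads[0].complete_object();
  p.in_progress_client_reads[0].complete_object();
  p.kick_reads();
  return 0;
}

Running the sketch prints "read A done" followed by "read B done", even though B completed first, which is the ordering guarantee the real in_progress_client_reads FIFO provides to ECBackend's async read callers.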