From 31536662811302b400fbafe4daf512de8bd1c998 Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Thu, 9 May 2024 21:00:05 +0000 Subject: [PATCH] osd: shuffle ECCommon::RecoveryBackend from ECBackend.cc to ECCommon.cc It's just code movement; there are no changes apart from that. Signed-off-by: Radoslaw Zarzynski (cherry picked from commit ef644c9d29b8adaef228a20fc96830724d1fc3f5) --- src/osd/ECBackend.cc | 579 ------------------------------------------ src/osd/ECCommon.cc | 583 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 583 insertions(+), 579 deletions(-) diff --git a/src/osd/ECBackend.cc b/src/osd/ECBackend.cc index bb641d1eefd..c5158d87577 100644 --- a/src/osd/ECBackend.cc +++ b/src/osd/ECBackend.cc @@ -67,17 +67,6 @@ struct ECBackend::ECRecoveryBackend::ECRecoveryHandle : public PGBackend::Recove list ops; }; -void ECCommon::RecoveryBackend::RecoveryOp::dump(Formatter *f) const { - f->dump_stream("hoid") << hoid; - f->dump_stream("v") << v; - f->dump_stream("missing_on") << missing_on; - f->dump_stream("missing_on_shards") << missing_on_shards; - f->dump_stream("recovery_info") << recovery_info; - f->dump_stream("recovery_progress") << recovery_progress; - f->dump_stream("state") << tostr(state); - f->dump_stream("waiting_on_pushes") << waiting_on_pushes; -} - ECBackend::ECBackend( PGBackend::Listener *pg, CephContext *cct, @@ -107,41 +96,10 @@ PGBackend::RecoveryHandle *ECBackend::open_recovery_op() { recovery_backend.open_recovery_op()); } -ECCommon::RecoveryBackend::RecoveryBackend( - CephContext *cct, - const coll_t &coll, - ceph::ErasureCodeInterfaceRef ec_impl, - const ECUtil::stripe_info_t &sinfo, - ReadPipeline &read_pipeline, - ECListener *parent) - : cct(cct), - coll(coll), - ec_impl(std::move(ec_impl)), - sinfo(sinfo), - read_pipeline(read_pipeline), - parent(parent) {} - ECBackend::ECRecoveryBackend::ECRecoveryHandle *ECBackend::ECRecoveryBackend::open_recovery_op() { return new ECRecoveryHandle; } -void 
ECCommon::RecoveryBackend::_failed_push(const hobject_t &hoid, - ECCommon::read_result_t &res) { - dout(10) << __func__ << ": Read error " << hoid << " r=" - << res.r << " errors=" << res.errors << dendl; - dout(10) << __func__ << ": canceling recovery op for obj " << hoid - << dendl; - ceph_assert(recovery_ops.count(hoid)); - eversion_t v = recovery_ops[hoid].v; - recovery_ops.erase(hoid); - - set fl; - for (auto &&i: res.errors) { - fl.insert(i.first); - } - get_parent()->on_failed_pull(fl, hoid, v); -} - void ECBackend::handle_recovery_push( const PushOp &op, RecoveryMessages *m, @@ -176,218 +134,6 @@ void ECBackend::handle_recovery_push( } } -void ECCommon::RecoveryBackend::handle_recovery_push( - const PushOp &op, - RecoveryMessages *m, - bool is_repair) { - if (get_parent()->check_failsafe_full()) { - dout(10) << __func__ << " Out of space (failsafe) processing push request." - << dendl; - ceph_abort(); - } - - bool oneshot = op.before_progress.first && op.after_progress.data_complete; - ghobject_t tobj; - if (oneshot) { - tobj = ghobject_t(op.soid, ghobject_t::NO_GEN, - get_parent()->whoami_shard().shard); - } else { - tobj = ghobject_t(get_parent()->get_temp_recovery_object(op.soid, - op.version), - ghobject_t::NO_GEN, - get_parent()->whoami_shard().shard); - if (op.before_progress.first) { - dout(10) << __func__ << ": Adding oid " - << tobj.hobj << " in the temp collection" << dendl; - add_temp_obj(tobj.hobj); - } - } - - if (op.before_progress.first) { - m->t.remove(coll, tobj); - m->t.touch(coll, tobj); - } - - ceph_assert(op.data.length() == op.data_included.size()); - uint64_t tobj_size = 0; - - uint64_t cursor = 0; - for (auto [off, len] : op.data_included) { - bufferlist bl; - if (len != op.data.length()) { - bl.substr_of(op.data, cursor, len); - } else { - bl = op.data; - } - m->t.write(coll, tobj, off, len, bl); - tobj_size = off + len; - cursor += len; - } - - if (op.before_progress.first) { - ceph_assert(op.attrset.contains(OI_ATTR)); - 
m->t.setattrs( - coll, - tobj, - op.attrset); - } - - if (op.after_progress.data_complete) { - uint64_t shard_size = sinfo.object_size_to_shard_size(op.recovery_info.size, - get_parent()->whoami_shard().shard); - ceph_assert(shard_size >= tobj_size); - if (shard_size != tobj_size) { - m->t.truncate( coll, tobj, shard_size); - } - } - - if (op.after_progress.data_complete && !oneshot) { - dout(10) << __func__ << ": Removing oid " - << tobj.hobj << " from the temp collection" << dendl; - clear_temp_obj(tobj.hobj); - m->t.remove(coll, ghobject_t( - op.soid, ghobject_t::NO_GEN, - get_parent()->whoami_shard().shard)); - m->t.collection_move_rename( - coll, tobj, - coll, ghobject_t( - op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard)); - } - if (op.after_progress.data_complete) { - if ((get_parent()->pgb_is_primary())) { - ceph_assert(recovery_ops.count(op.soid)); - ceph_assert(recovery_ops[op.soid].obc); - if (get_parent()->pg_is_repair() || is_repair) - get_parent()->inc_osd_stat_repaired(); - get_parent()->on_local_recover( - op.soid, - op.recovery_info, - recovery_ops[op.soid].obc, - false, - &m->t); - } else { - // If primary told us this is a repair, bump osd_stat_t::num_objects_repaired - if (is_repair) - get_parent()->inc_osd_stat_repaired(); - get_parent()->on_local_recover( - op.soid, - op.recovery_info, - ObjectContextRef(), - false, - &m->t); - } - } - m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp()); - m->push_replies[get_parent()->primary_shard()].back().soid = op.soid; -} - -void ECCommon::RecoveryBackend::handle_recovery_push_reply( - const PushReplyOp &op, - pg_shard_t from, - RecoveryMessages *m) { - if (!recovery_ops.count(op.soid)) - return; - RecoveryOp &rop = recovery_ops[op.soid]; - ceph_assert(rop.waiting_on_pushes.contains(from)); - rop.waiting_on_pushes.erase(from); - continue_recovery_op(rop, m); -} - -void ECCommon::RecoveryBackend::update_object_size_after_read( - uint64_t size, - read_result_t &res, - 
read_request_t &req) { - // We didn't know the size before, meaning the zero for decode calculations - // will be off. Recalculate them! - ECUtil::shard_extent_set_t zero_mask(sinfo.get_k_plus_m()); - sinfo.ro_size_to_zero_mask(size, zero_mask); - ECUtil::shard_extent_set_t read_mask(sinfo.get_k_plus_m()); - sinfo.ro_size_to_read_mask(size, read_mask); - extent_set superset = res.buffers_read.get_extent_superset(); - - for (auto &&[shard, eset] : zero_mask) { - eset.intersection_of(superset); - if (!eset.empty() && - (res.zero_length_reads.contains(shard) || - res.buffers_read.contains(shard))) { - req.zeros_for_decode[shard].insert(eset); - } - } - - /* Correct the shard_want_to_read, to make sure everything is within scope - * of the newly found object size. - */ - for (auto iter = req.shard_want_to_read.begin(); iter != req.shard_want_to_read.end();) { - auto &&[shard, eset] = *iter; - bool erase = false; - - if (read_mask.contains(shard)) { - eset.intersection_of(read_mask.get(shard)); - erase = eset.empty(); - } else { - erase = true; - } - - /* Some shards may be empty */ - if (erase) { - iter = req.shard_want_to_read.erase(iter); - } else { - ++iter; - } - } - - dout(20) << "Update want and zeros from read:size=" << size - << " res=" << res - << " req=" << req - << dendl; -} - -void ECCommon::RecoveryBackend::handle_recovery_read_complete( - const hobject_t &hoid, - read_result_t &&res, - read_request_t &req, - RecoveryMessages *m) { - dout(10) << __func__ << ": returned " << hoid << " " << res << dendl; - ceph_assert(recovery_ops.contains(hoid)); - RecoveryBackend::RecoveryOp &op = recovery_ops[hoid]; - - if (res.attrs) { - op.xattrs.swap(*(res.attrs)); - const auto empty_obc = !op.obc; - maybe_load_obc(op.xattrs, op); -#ifdef WITH_CRIMSON - ceph_assert(hoid == op.hoid); -#endif - if (empty_obc) { - update_object_size_after_read(op.recovery_info.size, res, req); - } - } - ceph_assert(op.xattrs.size()); - ceph_assert(op.obc); - - 
op.returned_data.emplace(std::move(res.buffers_read)); - uint64_t aligned_size = ECUtil::align_next(op.obc->obs.oi.size); - - dout(20) << __func__ << " before decode: oid=" << op.hoid << " EC_DEBUG_BUFFERS: " - << op.returned_data->debug_string(2048, 0) - << dendl; - - op.returned_data->add_zero_padding_for_decode(req.zeros_for_decode); - int r = op.returned_data->decode(ec_impl, req.shard_want_to_read, aligned_size, get_parent()->get_dpp(), true); - ceph_assert(r == 0); - - // Finally, we don't want to write any padding, so truncate the buffer - // to remove it. - op.returned_data->erase_after_ro_offset(aligned_size); - - dout(20) << __func__ << ": oid=" << op.hoid << dendl; - dout(20) << __func__ << " after decode: oid=" << op.hoid << " EC_DEBUG_BUFFERS: " - << op.returned_data->debug_string(2048, 0) - << dendl; - - continue_recovery_op(op, m); -} - void ECBackend::ECRecoveryBackend::maybe_load_obc( const std::map>& raw_attrs, RecoveryOp &op) @@ -447,34 +193,6 @@ struct SendPushReplies : public Context { } }; -struct RecoveryReadCompleter : ECCommon::ReadCompleter { - RecoveryReadCompleter(ECCommon::RecoveryBackend &backend) - : backend(backend) {} - - void finish_single_request( - const hobject_t &hoid, - ECCommon::read_result_t &&res, - ECCommon::read_request_t &req) override { - if (!(res.r == 0 && res.errors.empty())) { - backend._failed_push(hoid, res); - return; - } - ceph_assert(req.to_read.size() == 0); - backend.handle_recovery_read_complete( - hoid, - std::move(res), - req, - &rm); - } - - void finish(int priority) && override { - backend.dispatch_recovery_messages(rm, priority); - } - - ECCommon::RecoveryBackend &backend; - RecoveryMessages rm; -}; - void ECBackend::ECRecoveryBackend::commit_txn_send_replies( ceph::os::Transaction &&txn, std::map replies) { @@ -487,261 +205,6 @@ void ECBackend::ECRecoveryBackend::commit_txn_send_replies( get_parent()->queue_transaction(std::move(txn)); } -void ECCommon::RecoveryBackend::dispatch_recovery_messages( - 
RecoveryMessages &m, int priority) { - for (map>::iterator i = m.pushes.begin(); - i != m.pushes.end(); - m.pushes.erase(i++)) { - MOSDPGPush *msg = new MOSDPGPush(); - msg->set_priority(priority); - msg->map_epoch = get_parent()->pgb_get_osdmap_epoch(); - msg->min_epoch = get_parent()->get_last_peering_reset_epoch(); - msg->from = get_parent()->whoami_shard(); - msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard); - msg->pushes.swap(i->second); - msg->compute_cost(cct); - msg->is_repair = get_parent()->pg_is_repair(); - get_parent()->send_message_osd_cluster(i->first.osd, msg, msg->map_epoch); - } - std::map replies; - for (map>::iterator i = - m.push_replies.begin(); - i != m.push_replies.end(); - m.push_replies.erase(i++)) { - MOSDPGPushReply *msg = new MOSDPGPushReply(); - msg->set_priority(priority); - msg->map_epoch = get_parent()->pgb_get_osdmap_epoch(); - msg->min_epoch = get_parent()->get_last_peering_reset_epoch(); - msg->from = get_parent()->whoami_shard(); - msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard); - msg->replies.swap(i->second); - msg->compute_cost(cct); - replies.insert(std::pair(i->first.osd, msg)); - } - - if (!replies.empty()) { - dout(20) << __func__ << " recovery_transactions="; - Formatter *f = Formatter::create("json"); - f->open_object_section("t"); - m.t.dump(f); - f->close_section(); - f->flush(*_dout); - delete f; - *_dout << dendl; - commit_txn_send_replies(std::move(m.t), std::move(replies)); - } - - if (m.recovery_reads.empty()) - return; - read_pipeline.start_read_op( - priority, - m.recovery_reads, - false, - true, - std::make_unique(*this)); -} - -void ECCommon::RecoveryBackend::continue_recovery_op( - RecoveryBackend::RecoveryOp &op, - RecoveryMessages *m) { - dout(10) << __func__ << ": continuing " << op << dendl; - using RecoveryOp = RecoveryBackend::RecoveryOp; - while (1) { - switch (op.state) { - case RecoveryOp::IDLE: { - ceph_assert(!op.recovery_progress.data_complete); - 
ECUtil::shard_extent_set_t want(sinfo.get_k_plus_m()); - - op.state = RecoveryOp::READING; - - /* When beginning recovery, the OI may not be known. As such the object - * size is not known. For the first read, attempt to read the default - * size. If this is larger than the object sizes, then the OSD will - * return truncated reads. If the object size is known, then attempt - * correctly sized reads. - */ - uint64_t read_size = get_recovery_chunk_size(); - if (op.obc) { - uint64_t read_to_end = ECUtil::align_next(op.obc->obs.oi.size) - - op.recovery_progress.data_recovered_to; - - if (read_to_end < read_size) { - read_size = read_to_end; - } - } - sinfo.ro_range_to_shard_extent_set_with_parity( - op.recovery_progress.data_recovered_to, read_size, want); - - op.recovery_progress.data_recovered_to += read_size; - - // We only need to recover shards that are missing. - for (auto shard : shard_id_set::difference(sinfo.get_all_shards(), op.missing_on_shards)) { - want.erase(shard); - } - - if (op.recovery_progress.first && op.obc) { - op.xattrs = op.obc->attr_cache; - } - - read_request_t read_request(std::move(want), - op.recovery_progress.first && !op.obc, - op.obc - ? op.obc->obs.oi.size - : get_recovery_chunk_size()); - - int r = read_pipeline.get_min_avail_to_read_shards( - op.hoid, true, false, read_request); - - if (r != 0) { - // we must have lost a recovery source - ceph_assert(!op.recovery_progress.first); - dout(10) << __func__ << ": canceling recovery op for obj " << op.hoid - << dendl; - // in crimson - get_parent()->cancel_pull(op.hoid); - recovery_ops.erase(op.hoid); - return; - } - if (read_request.shard_reads.empty()) { - ceph_assert(op.obc); - /* This can happen for several reasons - * - A zero-sized object. - * - The missing shards have no data. - * - The previous recovery did not need the last data shard. In this - * case, data_recovered_to may indicate that the last shard still - * needs recovery, when it does not. 
- * We can just skip the read and fall through below. - */ - dout(10) << __func__ << " No reads required " << op << dendl; - // Create an empty read result and fall through. - op.returned_data.emplace(&sinfo); - } else { - m->recovery_read( - op.hoid, - read_request); - dout(10) << __func__ << ": IDLE return " << op << dendl; - return; - } - } - [[fallthrough]]; - case RecoveryOp::READING: { - // read completed, start write - ceph_assert(op.xattrs.size()); - ceph_assert(op.returned_data); - dout(20) << __func__ << ": returned_data=" << op.returned_data << dendl; - op.state = RecoveryOp::WRITING; - ObjectRecoveryProgress after_progress = op.recovery_progress; - after_progress.first = false; - if (after_progress.data_recovered_to >= op.obc->obs.oi.size) { - after_progress.data_complete = true; - } - - for (auto &&pg_shard: op.missing_on) { - m->pushes[pg_shard].push_back(PushOp()); - PushOp &pop = m->pushes[pg_shard].back(); - pop.soid = op.hoid; - pop.version = op.recovery_info.oi.get_version_for_shard(pg_shard.shard); - - op.returned_data->get_sparse_buffer(pg_shard.shard, pop.data, pop.data_included); - ceph_assert(pop.data.length() == pop.data_included.size()); - - dout(10) << __func__ << ": pop shard=" << pg_shard - << ", oid=" << pop.soid - << ", before_progress=" << op.recovery_progress - << ", after_progress=" << after_progress - << ", pop.data.length()=" << pop.data.length() - << ", pop.data_included=" << pop.data_included - << ", size=" << op.obc->obs.oi.size << dendl; - - if (op.recovery_progress.first) { - if (sinfo.is_nonprimary_shard(pg_shard.shard)) { - if (pop.version == op.recovery_info.oi.version) { - dout(10) << __func__ << ": copy OI attr only" << dendl; - pop.attrset[OI_ATTR] = op.xattrs[OI_ATTR]; - } else { - // We are recovering a partial write - make sure we push the correct - // version in the OI or a scrub error will occur. 
- object_info_t oi(op.recovery_info.oi); - oi.shard_versions.clear(); - oi.version = pop.version; - dout(10) << __func__ << ": partial write OI attr: oi=" << oi << dendl; - bufferlist bl; - oi.encode(bl, get_osdmap()->get_features( - CEPH_ENTITY_TYPE_OSD, nullptr)); - pop.attrset[OI_ATTR] = bl; - } - } else { - dout(10) << __func__ << ": push all attrs (not nonprimary)" << dendl; - pop.attrset = op.xattrs; - } - - // Following an upgrade, or turning of overwrites, we can take this - // opportunity to clean up hinfo. - if (pop.attrset.contains(ECUtil::get_hinfo_key())) { - pop.attrset.erase(ECUtil::get_hinfo_key()); - } - } - pop.recovery_info = op.recovery_info; - pop.before_progress = op.recovery_progress; - pop.after_progress = after_progress; - if (pg_shard != get_parent()->primary_shard()) { - // already in crimson -- junction point with PeeringState - get_parent()->begin_peer_recover( - pg_shard, - op.hoid); - } - } - op.returned_data.reset(); - op.waiting_on_pushes = op.missing_on; - op.recovery_progress = after_progress; - dout(10) << __func__ << ": READING return " << op << dendl; - return; - } - case RecoveryOp::WRITING: { - if (op.waiting_on_pushes.empty()) { - if (op.recovery_progress.data_complete) { - op.state = RecoveryOp::COMPLETE; - for (set::iterator i = op.missing_on.begin(); - i != op.missing_on.end(); - ++i) { - if (*i != get_parent()->primary_shard()) { - dout(10) << __func__ << ": on_peer_recover on " << *i - << ", obj " << op.hoid << dendl; - get_parent()->on_peer_recover( - *i, - op.hoid, - op.recovery_info); - } - } - object_stat_sum_t stat; - stat.num_bytes_recovered = op.recovery_info.size; - stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ? 
- stat.num_objects_recovered = 1; - // TODO: not in crimson yet - if (get_parent()->pg_is_repair()) - stat.num_objects_repaired = 1; - // pg_recovery.cc in crimson has it - get_parent()->on_global_recover(op.hoid, stat, false); - dout(10) << __func__ << ": WRITING return " << op << dendl; - recovery_ops.erase(op.hoid); - return; - } else { - op.state = RecoveryOp::IDLE; - dout(10) << __func__ << ": WRITING continue " << op << dendl; - continue; - } - } - return; - } - // should never be called once complete - case RecoveryOp::COMPLETE: - default: { - ceph_abort(); - }; - } - } -} - void ECBackend::run_recovery_op( PGBackend::RecoveryHandle *_h, int priority) { @@ -778,48 +241,6 @@ int ECBackend::recover_object( return 0; } -ECCommon::RecoveryBackend::RecoveryOp -ECCommon::RecoveryBackend::recover_object( - const hobject_t &hoid, - eversion_t v, - ObjectContextRef head, - ObjectContextRef obc) { - RecoveryOp op; - op.v = v; - op.hoid = hoid; - op.obc = obc; - op.recovery_info.soid = hoid; - op.recovery_info.version = v; - if (obc) { - op.recovery_info.size = obc->obs.oi.size; - op.recovery_info.oi = obc->obs.oi; - } - if (hoid.is_snap()) { - if (obc) { - ceph_assert(obc->ssc); - op.recovery_info.ss = obc->ssc->snapset; - } else if (head) { - ceph_assert(head->ssc); - op.recovery_info.ss = head->ssc->snapset; - } else { - ceph_abort_msg("neither obc nor head set for a snap object"); - } - } - op.recovery_progress.omap_complete = true; - for (set::const_iterator i = - get_parent()->get_acting_recovery_backfill_shards().begin(); - i != get_parent()->get_acting_recovery_backfill_shards().end(); - ++i) { - dout(10) << "checking " << *i << dendl; - if (get_parent()->get_shard_missing(*i).is_missing(hoid)) { - op.missing_on.insert(*i); - op.missing_on_shards.insert(i->shard); - } - } - dout(10) << __func__ << ": built op " << op << dendl; - return op; -} - bool ECBackend::can_handle_while_inactive( OpRequestRef _op) { return false; diff --git a/src/osd/ECCommon.cc 
b/src/osd/ECCommon.cc index b06c68c1659..46e203fa25c 100644 --- a/src/osd/ECCommon.cc +++ b/src/osd/ECCommon.cc @@ -86,6 +86,18 @@ void ECCommon::ReadOp::dump(Formatter *f) const { f->dump_stream("in_progress") << in_progress; } +void ECCommon::RecoveryBackend::RecoveryOp::dump(Formatter *f) const { + f->dump_stream("hoid") << hoid; + f->dump_stream("v") << v; + f->dump_stream("missing_on") << missing_on; + f->dump_stream("missing_on_shards") << missing_on_shards; + f->dump_stream("recovery_info") << recovery_info; + f->dump_stream("recovery_progress") << recovery_progress; + f->dump_stream("state") << tostr(state); + f->dump_stream("waiting_on_pushes") << waiting_on_pushes; +} + + void ECCommon::ReadPipeline::complete_read_op(ReadOp &&rop) { dout(20) << __func__ << " completing " << rop << dendl; auto req_iter = rop.to_read.begin(); @@ -977,3 +989,574 @@ void ECCommon::RMWPipeline::call_write_ordered(std::function &&cb) { next_write_all_shards = true; extent_cache.add_on_write(std::move(cb)); } + +ECCommon::RecoveryBackend::RecoveryBackend( + CephContext *cct, + const coll_t &coll, + ceph::ErasureCodeInterfaceRef ec_impl, + const ECUtil::stripe_info_t &sinfo, + ReadPipeline &read_pipeline, + ECListener *parent) + : cct(cct), + coll(coll), + ec_impl(std::move(ec_impl)), + sinfo(sinfo), + read_pipeline(read_pipeline), + parent(parent) {} + +void ECCommon::RecoveryBackend::_failed_push(const hobject_t &hoid, + ECCommon::read_result_t &res) { + dout(10) << __func__ << ": Read error " << hoid << " r=" + << res.r << " errors=" << res.errors << dendl; + dout(10) << __func__ << ": canceling recovery op for obj " << hoid + << dendl; + ceph_assert(recovery_ops.count(hoid)); + eversion_t v = recovery_ops[hoid].v; + recovery_ops.erase(hoid); + + set fl; + for (auto &&i: res.errors) { + fl.insert(i.first); + } + get_parent()->on_failed_pull(fl, hoid, v); +} + +void ECCommon::RecoveryBackend::handle_recovery_push( + const PushOp &op, + RecoveryMessages *m, + bool is_repair) { + 
if (get_parent()->check_failsafe_full()) { + dout(10) << __func__ << " Out of space (failsafe) processing push request." + << dendl; + ceph_abort(); + } + + bool oneshot = op.before_progress.first && op.after_progress.data_complete; + ghobject_t tobj; + if (oneshot) { + tobj = ghobject_t(op.soid, ghobject_t::NO_GEN, + get_parent()->whoami_shard().shard); + } else { + tobj = ghobject_t(get_parent()->get_temp_recovery_object(op.soid, + op.version), + ghobject_t::NO_GEN, + get_parent()->whoami_shard().shard); + if (op.before_progress.first) { + dout(10) << __func__ << ": Adding oid " + << tobj.hobj << " in the temp collection" << dendl; + add_temp_obj(tobj.hobj); + } + } + + if (op.before_progress.first) { + m->t.remove(coll, tobj); + m->t.touch(coll, tobj); + } + + ceph_assert(op.data.length() == op.data_included.size()); + uint64_t tobj_size = 0; + + uint64_t cursor = 0; + for (auto [off, len] : op.data_included) { + bufferlist bl; + if (len != op.data.length()) { + bl.substr_of(op.data, cursor, len); + } else { + bl = op.data; + } + m->t.write(coll, tobj, off, len, bl); + tobj_size = off + len; + cursor += len; + } + + if (op.before_progress.first) { + ceph_assert(op.attrset.contains(OI_ATTR)); + m->t.setattrs( + coll, + tobj, + op.attrset); + } + + if (op.after_progress.data_complete) { + uint64_t shard_size = sinfo.object_size_to_shard_size(op.recovery_info.size, + get_parent()->whoami_shard().shard); + ceph_assert(shard_size >= tobj_size); + if (shard_size != tobj_size) { + m->t.truncate( coll, tobj, shard_size); + } + } + + if (op.after_progress.data_complete && !oneshot) { + dout(10) << __func__ << ": Removing oid " + << tobj.hobj << " from the temp collection" << dendl; + clear_temp_obj(tobj.hobj); + m->t.remove(coll, ghobject_t( + op.soid, ghobject_t::NO_GEN, + get_parent()->whoami_shard().shard)); + m->t.collection_move_rename( + coll, tobj, + coll, ghobject_t( + op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard)); + } + if 
(op.after_progress.data_complete) { + if ((get_parent()->pgb_is_primary())) { + ceph_assert(recovery_ops.count(op.soid)); + ceph_assert(recovery_ops[op.soid].obc); + if (get_parent()->pg_is_repair() || is_repair) + get_parent()->inc_osd_stat_repaired(); + get_parent()->on_local_recover( + op.soid, + op.recovery_info, + recovery_ops[op.soid].obc, + false, + &m->t); + } else { + // If primary told us this is a repair, bump osd_stat_t::num_objects_repaired + if (is_repair) + get_parent()->inc_osd_stat_repaired(); + get_parent()->on_local_recover( + op.soid, + op.recovery_info, + ObjectContextRef(), + false, + &m->t); + } + } + m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp()); + m->push_replies[get_parent()->primary_shard()].back().soid = op.soid; +} + +void ECCommon::RecoveryBackend::handle_recovery_push_reply( + const PushReplyOp &op, + pg_shard_t from, + RecoveryMessages *m) { + if (!recovery_ops.count(op.soid)) + return; + RecoveryOp &rop = recovery_ops[op.soid]; + ceph_assert(rop.waiting_on_pushes.contains(from)); + rop.waiting_on_pushes.erase(from); + continue_recovery_op(rop, m); +} + +void ECCommon::RecoveryBackend::update_object_size_after_read( + uint64_t size, + read_result_t &res, + read_request_t &req) { + // We didn't know the size before, meaning the zero for decode calculations + // will be off. Recalculate them! 
+ ECUtil::shard_extent_set_t zero_mask(sinfo.get_k_plus_m()); + sinfo.ro_size_to_zero_mask(size, zero_mask); + ECUtil::shard_extent_set_t read_mask(sinfo.get_k_plus_m()); + sinfo.ro_size_to_read_mask(size, read_mask); + extent_set superset = res.buffers_read.get_extent_superset(); + + for (auto &&[shard, eset] : zero_mask) { + eset.intersection_of(superset); + if (!eset.empty() && + (res.zero_length_reads.contains(shard) || + res.buffers_read.contains(shard))) { + req.zeros_for_decode[shard].insert(eset); + } + } + + /* Correct the shard_want_to_read, to make sure everything is within scope + * of the newly found object size. + */ + for (auto iter = req.shard_want_to_read.begin(); iter != req.shard_want_to_read.end();) { + auto &&[shard, eset] = *iter; + bool erase = false; + + if (read_mask.contains(shard)) { + eset.intersection_of(read_mask.get(shard)); + erase = eset.empty(); + } else { + erase = true; + } + + /* Some shards may be empty */ + if (erase) { + iter = req.shard_want_to_read.erase(iter); + } else { + ++iter; + } + } + + dout(20) << "Update want and zeros from read:size=" << size + << " res=" << res + << " req=" << req + << dendl; +} + +void ECCommon::RecoveryBackend::handle_recovery_read_complete( + const hobject_t &hoid, + read_result_t &&res, + read_request_t &req, + RecoveryMessages *m) { + dout(10) << __func__ << ": returned " << hoid << " " << res << dendl; + ceph_assert(recovery_ops.contains(hoid)); + RecoveryBackend::RecoveryOp &op = recovery_ops[hoid]; + + if (res.attrs) { + op.xattrs.swap(*(res.attrs)); + const auto empty_obc = !op.obc; + maybe_load_obc(op.xattrs, op); +#ifdef WITH_CRIMSON + ceph_assert(hoid == op.hoid); +#endif + if (empty_obc) { + update_object_size_after_read(op.recovery_info.size, res, req); + } + } + ceph_assert(op.xattrs.size()); + ceph_assert(op.obc); + + op.returned_data.emplace(std::move(res.buffers_read)); + uint64_t aligned_size = ECUtil::align_next(op.obc->obs.oi.size); + + dout(20) << __func__ << " before 
decode: oid=" << op.hoid << " EC_DEBUG_BUFFERS: " + << op.returned_data->debug_string(2048, 0) + << dendl; + + op.returned_data->add_zero_padding_for_decode(req.zeros_for_decode); + int r = op.returned_data->decode(ec_impl, req.shard_want_to_read, aligned_size, get_parent()->get_dpp(), true); + ceph_assert(r == 0); + + // Finally, we don't want to write any padding, so truncate the buffer + // to remove it. + op.returned_data->erase_after_ro_offset(aligned_size); + + dout(20) << __func__ << ": oid=" << op.hoid << dendl; + dout(20) << __func__ << " after decode: oid=" << op.hoid << " EC_DEBUG_BUFFERS: " + << op.returned_data->debug_string(2048, 0) + << dendl; + + continue_recovery_op(op, m); +} + + +struct RecoveryReadCompleter : ECCommon::ReadCompleter { + RecoveryReadCompleter(ECCommon::RecoveryBackend &backend) + : backend(backend) {} + + void finish_single_request( + const hobject_t &hoid, + ECCommon::read_result_t &&res, + ECCommon::read_request_t &req) override { + if (!(res.r == 0 && res.errors.empty())) { + backend._failed_push(hoid, res); + return; + } + ceph_assert(req.to_read.size() == 0); + backend.handle_recovery_read_complete( + hoid, + std::move(res), + req, + &rm); + } + + void finish(int priority) && override { + backend.dispatch_recovery_messages(rm, priority); + } + + ECCommon::RecoveryBackend &backend; + RecoveryMessages rm; +}; + +void ECCommon::RecoveryBackend::dispatch_recovery_messages( + RecoveryMessages &m, int priority) { + for (map>::iterator i = m.pushes.begin(); + i != m.pushes.end(); + m.pushes.erase(i++)) { + MOSDPGPush *msg = new MOSDPGPush(); + msg->set_priority(priority); + msg->map_epoch = get_parent()->pgb_get_osdmap_epoch(); + msg->min_epoch = get_parent()->get_last_peering_reset_epoch(); + msg->from = get_parent()->whoami_shard(); + msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard); + msg->pushes.swap(i->second); + msg->compute_cost(cct); + msg->is_repair = get_parent()->pg_is_repair(); + 
get_parent()->send_message_osd_cluster(i->first.osd, msg, msg->map_epoch); + } + std::map replies; + for (map>::iterator i = + m.push_replies.begin(); + i != m.push_replies.end(); + m.push_replies.erase(i++)) { + MOSDPGPushReply *msg = new MOSDPGPushReply(); + msg->set_priority(priority); + msg->map_epoch = get_parent()->pgb_get_osdmap_epoch(); + msg->min_epoch = get_parent()->get_last_peering_reset_epoch(); + msg->from = get_parent()->whoami_shard(); + msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard); + msg->replies.swap(i->second); + msg->compute_cost(cct); + replies.insert(std::pair(i->first.osd, msg)); + } + + if (!replies.empty()) { + dout(20) << __func__ << " recovery_transactions="; + Formatter *f = Formatter::create("json"); + f->open_object_section("t"); + m.t.dump(f); + f->close_section(); + f->flush(*_dout); + delete f; + *_dout << dendl; + commit_txn_send_replies(std::move(m.t), std::move(replies)); + } + + if (m.recovery_reads.empty()) + return; + read_pipeline.start_read_op( + priority, + m.recovery_reads, + false, + true, + std::make_unique(*this)); +} + /* Drives op through the RecoveryOp state machine, looping until it has to wait or the op is finished. IDLE: picks the next read range (get_recovery_chunk_size(), capped to the aligned remainder of the object when op.obc is set), advances data_recovered_to, erases from the wanted set every shard returned by shard_id_set::difference(all shards, op.missing_on_shards), then either queues a recovery read on m and returns, cancels the pull and erases the op when get_min_avail_to_read_shards() returns non-zero, or falls through with an empty result when no shard reads are needed. READING: turns op.returned_data into one PushOp per entry of op.missing_on and returns with the op in WRITING. WRITING: returns while op.waiting_on_pushes is non-empty; otherwise goes back to IDLE for the next range or, when data_complete is set, calls on_peer_recover / on_global_recover on the parent and erases the op from recovery_ops. Reaching COMPLETE or an unknown state aborts. */ +void ECCommon::RecoveryBackend::continue_recovery_op( + RecoveryBackend::RecoveryOp &op, + RecoveryMessages *m) { + dout(10) << __func__ << ": continuing " << op << dendl; + using RecoveryOp = RecoveryBackend::RecoveryOp; + while (1) { + switch (op.state) { + case RecoveryOp::IDLE: { + ceph_assert(!op.recovery_progress.data_complete); + ECUtil::shard_extent_set_t want(sinfo.get_k_plus_m()); + + op.state = RecoveryOp::READING; + + /* When beginning recovery, the OI may not be known. As such the object + * size is not known. For the first read, attempt to read the default + * size. If this is larger than the object sizes, then the OSD will + * return truncated reads. If the object size is known, then attempt + * correctly sized reads. 
+ */ + uint64_t read_size = get_recovery_chunk_size(); + if (op.obc) { + uint64_t read_to_end = ECUtil::align_next(op.obc->obs.oi.size) - + op.recovery_progress.data_recovered_to; + + if (read_to_end < read_size) { + read_size = read_to_end; + } + } + sinfo.ro_range_to_shard_extent_set_with_parity( + op.recovery_progress.data_recovered_to, read_size, want); + + op.recovery_progress.data_recovered_to += read_size; + + // We only need to recover shards that are missing. + for (auto shard : shard_id_set::difference(sinfo.get_all_shards(), op.missing_on_shards)) { + want.erase(shard); + } + + if (op.recovery_progress.first && op.obc) { + op.xattrs = op.obc->attr_cache; + } + + read_request_t read_request(std::move(want), + op.recovery_progress.first && !op.obc, + op.obc + ? op.obc->obs.oi.size + : get_recovery_chunk_size()); + + int r = read_pipeline.get_min_avail_to_read_shards( + op.hoid, true, false, read_request); + + if (r != 0) { + // we must have lost a recovery source + ceph_assert(!op.recovery_progress.first); + dout(10) << __func__ << ": canceling recovery op for obj " << op.hoid + << dendl; + // in crimson + get_parent()->cancel_pull(op.hoid); + recovery_ops.erase(op.hoid); + return; + } + if (read_request.shard_reads.empty()) { + ceph_assert(op.obc); + /* This can happen for several reasons + * - A zero-sized object. + * - The missing shards have no data. + * - The previous recovery did not need the last data shard. In this + * case, data_recovered_to may indicate that the last shard still + * needs recovery, when it does not. + * We can just skip the read and fall through below. + */ + dout(10) << __func__ << " No reads required " << op << dendl; + // Create an empty read result and fall through. 
+ op.returned_data.emplace(&sinfo); + } else { + m->recovery_read( + op.hoid, + read_request); + dout(10) << __func__ << ": IDLE return " << op << dendl; + return; + } + } + [[fallthrough]]; + case RecoveryOp::READING: { + // read completed, start write + ceph_assert(op.xattrs.size()); + ceph_assert(op.returned_data); + dout(20) << __func__ << ": returned_data=" << op.returned_data << dendl; + op.state = RecoveryOp::WRITING; + ObjectRecoveryProgress after_progress = op.recovery_progress; + after_progress.first = false; + /* NOTE(review): op.obc is dereferenced unconditionally in this state although IDLE tolerates a null op.obc; presumably the read-completion path fills it in before this function is re-entered - confirm. */ if (after_progress.data_recovered_to >= op.obc->obs.oi.size) { + after_progress.data_complete = true; + } + + for (auto &&pg_shard: op.missing_on) { + m->pushes[pg_shard].push_back(PushOp()); + PushOp &pop = m->pushes[pg_shard].back(); + pop.soid = op.hoid; + pop.version = op.recovery_info.oi.get_version_for_shard(pg_shard.shard); + + op.returned_data->get_sparse_buffer(pg_shard.shard, pop.data, pop.data_included); + ceph_assert(pop.data.length() == pop.data_included.size()); + + dout(10) << __func__ << ": pop shard=" << pg_shard + << ", oid=" << pop.soid + << ", before_progress=" << op.recovery_progress + << ", after_progress=" << after_progress + << ", pop.data.length()=" << pop.data.length() + << ", pop.data_included=" << pop.data_included + << ", size=" << op.obc->obs.oi.size << dendl; + + if (op.recovery_progress.first) { + if (sinfo.is_nonprimary_shard(pg_shard.shard)) { + if (pop.version == op.recovery_info.oi.version) { + dout(10) << __func__ << ": copy OI attr only" << dendl; + pop.attrset[OI_ATTR] = op.xattrs[OI_ATTR]; + } else { + // We are recovering a partial write - make sure we push the correct + // version in the OI or a scrub error will occur. 
+ object_info_t oi(op.recovery_info.oi); + oi.shard_versions.clear(); + oi.version = pop.version; + dout(10) << __func__ << ": partial write OI attr: oi=" << oi << dendl; + bufferlist bl; + oi.encode(bl, get_osdmap()->get_features( + CEPH_ENTITY_TYPE_OSD, nullptr)); + pop.attrset[OI_ATTR] = bl; + } + } else { + dout(10) << __func__ << ": push all attrs (not nonprimary)" << dendl; + pop.attrset = op.xattrs; + } + + // Following an upgrade, or toggling overwrites, we can take this + // opportunity to clean up hinfo. + if (pop.attrset.contains(ECUtil::get_hinfo_key())) { + pop.attrset.erase(ECUtil::get_hinfo_key()); + } + } + pop.recovery_info = op.recovery_info; + pop.before_progress = op.recovery_progress; + pop.after_progress = after_progress; + if (pg_shard != get_parent()->primary_shard()) { + // already in crimson -- junction point with PeeringState + get_parent()->begin_peer_recover( + pg_shard, + op.hoid); + } + } + op.returned_data.reset(); + op.waiting_on_pushes = op.missing_on; + op.recovery_progress = after_progress; + dout(10) << __func__ << ": READING return " << op << dendl; + return; + } + case RecoveryOp::WRITING: { + if (op.waiting_on_pushes.empty()) { + if (op.recovery_progress.data_complete) { + op.state = RecoveryOp::COMPLETE; + for (set::iterator i = op.missing_on.begin(); + i != op.missing_on.end(); + ++i) { + if (*i != get_parent()->primary_shard()) { + dout(10) << __func__ << ": on_peer_recover on " << *i + << ", obj " << op.hoid << dendl; + get_parent()->on_peer_recover( + *i, + op.hoid, + op.recovery_info); + } + } + object_stat_sum_t stat; + stat.num_bytes_recovered = op.recovery_info.size; + stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ? 
+ stat.num_objects_recovered = 1; + // TODO: not in crimson yet + if (get_parent()->pg_is_repair()) + stat.num_objects_repaired = 1; + // pg_recovery.cc in crimson has it + get_parent()->on_global_recover(op.hoid, stat, false); + dout(10) << __func__ << ": WRITING return " << op << dendl; + recovery_ops.erase(op.hoid); + return; + } else { + op.state = RecoveryOp::IDLE; + dout(10) << __func__ << ": WRITING continue " << op << dendl; + continue; + } + } + return; + } + // should never be called once complete + case RecoveryOp::COMPLETE: + default: { + ceph_abort(); + }; + } + } +} + /* Builds and returns a RecoveryOp for hoid at version v. Size and object info are copied from obc when it is set. For snap objects the snapset is taken from obc->ssc, else from head->ssc, and the call aborts when neither context is supplied. missing_on and missing_on_shards receive every shard from get_acting_recovery_backfill_shards() whose missing set contains hoid. omap_complete is set to true up front. */ +ECCommon::RecoveryBackend::RecoveryOp +ECCommon::RecoveryBackend::recover_object( + const hobject_t &hoid, + eversion_t v, + ObjectContextRef head, + ObjectContextRef obc) { + RecoveryOp op; + op.v = v; + op.hoid = hoid; + op.obc = obc; + op.recovery_info.soid = hoid; + op.recovery_info.version = v; + if (obc) { + op.recovery_info.size = obc->obs.oi.size; + op.recovery_info.oi = obc->obs.oi; + } + if (hoid.is_snap()) { + if (obc) { + ceph_assert(obc->ssc); + op.recovery_info.ss = obc->ssc->snapset; + } else if (head) { + ceph_assert(head->ssc); + op.recovery_info.ss = head->ssc->snapset; + } else { + ceph_abort_msg("neither obc nor head set for a snap object"); + } + } + op.recovery_progress.omap_complete = true; + for (set::const_iterator i = + get_parent()->get_acting_recovery_backfill_shards().begin(); + i != get_parent()->get_acting_recovery_backfill_shards().end(); + ++i) { + dout(10) << "checking " << *i << dendl; + if (get_parent()->get_shard_missing(*i).is_missing(hoid)) { + op.missing_on.insert(*i); + op.missing_on_shards.insert(i->shard); + } + } + dout(10) << __func__ << ": built op " << op << dendl; + return op; +} + +END_IGNORE_DEPRECATED -- 2.39.5