list<ECCommon::RecoveryBackend::RecoveryOp> ops;
};
-void ECCommon::RecoveryBackend::RecoveryOp::dump(Formatter *f) const {
- f->dump_stream("hoid") << hoid;
- f->dump_stream("v") << v;
- f->dump_stream("missing_on") << missing_on;
- f->dump_stream("missing_on_shards") << missing_on_shards;
- f->dump_stream("recovery_info") << recovery_info;
- f->dump_stream("recovery_progress") << recovery_progress;
- f->dump_stream("state") << tostr(state);
- f->dump_stream("waiting_on_pushes") << waiting_on_pushes;
-}
-
ECBackend::ECBackend(
PGBackend::Listener *pg,
CephContext *cct,
recovery_backend.open_recovery_op());
}
-ECCommon::RecoveryBackend::RecoveryBackend(
- CephContext *cct,
- const coll_t &coll,
- ceph::ErasureCodeInterfaceRef ec_impl,
- const ECUtil::stripe_info_t &sinfo,
- ReadPipeline &read_pipeline,
- ECListener *parent)
- : cct(cct),
- coll(coll),
- ec_impl(std::move(ec_impl)),
- sinfo(sinfo),
- read_pipeline(read_pipeline),
- parent(parent) {}
-
ECBackend::ECRecoveryBackend::ECRecoveryHandle *ECBackend::ECRecoveryBackend::open_recovery_op() {
return new ECRecoveryHandle;
}
-void ECCommon::RecoveryBackend::_failed_push(const hobject_t &hoid,
- ECCommon::read_result_t &res) {
- dout(10) << __func__ << ": Read error " << hoid << " r="
- << res.r << " errors=" << res.errors << dendl;
- dout(10) << __func__ << ": canceling recovery op for obj " << hoid
- << dendl;
- ceph_assert(recovery_ops.count(hoid));
- eversion_t v = recovery_ops[hoid].v;
- recovery_ops.erase(hoid);
-
- set<pg_shard_t> fl;
- for (auto &&i: res.errors) {
- fl.insert(i.first);
- }
- get_parent()->on_failed_pull(fl, hoid, v);
-}
-
void ECBackend::handle_recovery_push(
const PushOp &op,
RecoveryMessages *m,
}
}
-void ECCommon::RecoveryBackend::handle_recovery_push(
- const PushOp &op,
- RecoveryMessages *m,
- bool is_repair) {
- if (get_parent()->check_failsafe_full()) {
- dout(10) << __func__ << " Out of space (failsafe) processing push request."
- << dendl;
- ceph_abort();
- }
-
- bool oneshot = op.before_progress.first && op.after_progress.data_complete;
- ghobject_t tobj;
- if (oneshot) {
- tobj = ghobject_t(op.soid, ghobject_t::NO_GEN,
- get_parent()->whoami_shard().shard);
- } else {
- tobj = ghobject_t(get_parent()->get_temp_recovery_object(op.soid,
- op.version),
- ghobject_t::NO_GEN,
- get_parent()->whoami_shard().shard);
- if (op.before_progress.first) {
- dout(10) << __func__ << ": Adding oid "
- << tobj.hobj << " in the temp collection" << dendl;
- add_temp_obj(tobj.hobj);
- }
- }
-
- if (op.before_progress.first) {
- m->t.remove(coll, tobj);
- m->t.touch(coll, tobj);
- }
-
- ceph_assert(op.data.length() == op.data_included.size());
- uint64_t tobj_size = 0;
-
- uint64_t cursor = 0;
- for (auto [off, len] : op.data_included) {
- bufferlist bl;
- if (len != op.data.length()) {
- bl.substr_of(op.data, cursor, len);
- } else {
- bl = op.data;
- }
- m->t.write(coll, tobj, off, len, bl);
- tobj_size = off + len;
- cursor += len;
- }
-
- if (op.before_progress.first) {
- ceph_assert(op.attrset.contains(OI_ATTR));
- m->t.setattrs(
- coll,
- tobj,
- op.attrset);
- }
-
- if (op.after_progress.data_complete) {
- uint64_t shard_size = sinfo.object_size_to_shard_size(op.recovery_info.size,
- get_parent()->whoami_shard().shard);
- ceph_assert(shard_size >= tobj_size);
- if (shard_size != tobj_size) {
- m->t.truncate( coll, tobj, shard_size);
- }
- }
-
- if (op.after_progress.data_complete && !oneshot) {
- dout(10) << __func__ << ": Removing oid "
- << tobj.hobj << " from the temp collection" << dendl;
- clear_temp_obj(tobj.hobj);
- m->t.remove(coll, ghobject_t(
- op.soid, ghobject_t::NO_GEN,
- get_parent()->whoami_shard().shard));
- m->t.collection_move_rename(
- coll, tobj,
- coll, ghobject_t(
- op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
- }
- if (op.after_progress.data_complete) {
- if ((get_parent()->pgb_is_primary())) {
- ceph_assert(recovery_ops.count(op.soid));
- ceph_assert(recovery_ops[op.soid].obc);
- if (get_parent()->pg_is_repair() || is_repair)
- get_parent()->inc_osd_stat_repaired();
- get_parent()->on_local_recover(
- op.soid,
- op.recovery_info,
- recovery_ops[op.soid].obc,
- false,
- &m->t);
- } else {
- // If primary told us this is a repair, bump osd_stat_t::num_objects_repaired
- if (is_repair)
- get_parent()->inc_osd_stat_repaired();
- get_parent()->on_local_recover(
- op.soid,
- op.recovery_info,
- ObjectContextRef(),
- false,
- &m->t);
- }
- }
- m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp());
- m->push_replies[get_parent()->primary_shard()].back().soid = op.soid;
-}
-
-void ECCommon::RecoveryBackend::handle_recovery_push_reply(
- const PushReplyOp &op,
- pg_shard_t from,
- RecoveryMessages *m) {
- if (!recovery_ops.count(op.soid))
- return;
- RecoveryOp &rop = recovery_ops[op.soid];
- ceph_assert(rop.waiting_on_pushes.contains(from));
- rop.waiting_on_pushes.erase(from);
- continue_recovery_op(rop, m);
-}
-
-void ECCommon::RecoveryBackend::update_object_size_after_read(
- uint64_t size,
- read_result_t &res,
- read_request_t &req) {
- // We didn't know the size before, meaning the zero for decode calculations
- // will be off. Recalculate them!
- ECUtil::shard_extent_set_t zero_mask(sinfo.get_k_plus_m());
- sinfo.ro_size_to_zero_mask(size, zero_mask);
- ECUtil::shard_extent_set_t read_mask(sinfo.get_k_plus_m());
- sinfo.ro_size_to_read_mask(size, read_mask);
- extent_set superset = res.buffers_read.get_extent_superset();
-
- for (auto &&[shard, eset] : zero_mask) {
- eset.intersection_of(superset);
- if (!eset.empty() &&
- (res.zero_length_reads.contains(shard) ||
- res.buffers_read.contains(shard))) {
- req.zeros_for_decode[shard].insert(eset);
- }
- }
-
- /* Correct the shard_want_to_read, to make sure everything is within scope
- * of the newly found object size.
- */
- for (auto iter = req.shard_want_to_read.begin(); iter != req.shard_want_to_read.end();) {
- auto &&[shard, eset] = *iter;
- bool erase = false;
-
- if (read_mask.contains(shard)) {
- eset.intersection_of(read_mask.get(shard));
- erase = eset.empty();
- } else {
- erase = true;
- }
-
- /* Some shards may be empty */
- if (erase) {
- iter = req.shard_want_to_read.erase(iter);
- } else {
- ++iter;
- }
- }
-
- dout(20) << "Update want and zeros from read:size=" << size
- << " res=" << res
- << " req=" << req
- << dendl;
-}
-
-void ECCommon::RecoveryBackend::handle_recovery_read_complete(
- const hobject_t &hoid,
- read_result_t &&res,
- read_request_t &req,
- RecoveryMessages *m) {
- dout(10) << __func__ << ": returned " << hoid << " " << res << dendl;
- ceph_assert(recovery_ops.contains(hoid));
- RecoveryBackend::RecoveryOp &op = recovery_ops[hoid];
-
- if (res.attrs) {
- op.xattrs.swap(*(res.attrs));
- const auto empty_obc = !op.obc;
- maybe_load_obc(op.xattrs, op);
-#ifdef WITH_CRIMSON
- ceph_assert(hoid == op.hoid);
-#endif
- if (empty_obc) {
- update_object_size_after_read(op.recovery_info.size, res, req);
- }
- }
- ceph_assert(op.xattrs.size());
- ceph_assert(op.obc);
-
- op.returned_data.emplace(std::move(res.buffers_read));
- uint64_t aligned_size = ECUtil::align_next(op.obc->obs.oi.size);
-
- dout(20) << __func__ << " before decode: oid=" << op.hoid << " EC_DEBUG_BUFFERS: "
- << op.returned_data->debug_string(2048, 0)
- << dendl;
-
- op.returned_data->add_zero_padding_for_decode(req.zeros_for_decode);
- int r = op.returned_data->decode(ec_impl, req.shard_want_to_read, aligned_size, get_parent()->get_dpp(), true);
- ceph_assert(r == 0);
-
- // Finally, we don't want to write any padding, so truncate the buffer
- // to remove it.
- op.returned_data->erase_after_ro_offset(aligned_size);
-
- dout(20) << __func__ << ": oid=" << op.hoid << dendl;
- dout(20) << __func__ << " after decode: oid=" << op.hoid << " EC_DEBUG_BUFFERS: "
- << op.returned_data->debug_string(2048, 0)
- << dendl;
-
- continue_recovery_op(op, m);
-}
-
void ECBackend::ECRecoveryBackend::maybe_load_obc(
const std::map<std::string, ceph::bufferlist, std::less<>>& raw_attrs,
RecoveryOp &op)
}
};
-struct RecoveryReadCompleter : ECCommon::ReadCompleter {
- RecoveryReadCompleter(ECCommon::RecoveryBackend &backend)
- : backend(backend) {}
-
- void finish_single_request(
- const hobject_t &hoid,
- ECCommon::read_result_t &&res,
- ECCommon::read_request_t &req) override {
- if (!(res.r == 0 && res.errors.empty())) {
- backend._failed_push(hoid, res);
- return;
- }
- ceph_assert(req.to_read.size() == 0);
- backend.handle_recovery_read_complete(
- hoid,
- std::move(res),
- req,
- &rm);
- }
-
- void finish(int priority) && override {
- backend.dispatch_recovery_messages(rm, priority);
- }
-
- ECCommon::RecoveryBackend &backend;
- RecoveryMessages rm;
-};
-
void ECBackend::ECRecoveryBackend::commit_txn_send_replies(
ceph::os::Transaction &&txn,
std::map<int, MOSDPGPushReply*> replies) {
get_parent()->queue_transaction(std::move(txn));
}
-void ECCommon::RecoveryBackend::dispatch_recovery_messages(
- RecoveryMessages &m, int priority) {
- for (map<pg_shard_t, vector<PushOp>>::iterator i = m.pushes.begin();
- i != m.pushes.end();
- m.pushes.erase(i++)) {
- MOSDPGPush *msg = new MOSDPGPush();
- msg->set_priority(priority);
- msg->map_epoch = get_parent()->pgb_get_osdmap_epoch();
- msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
- msg->from = get_parent()->whoami_shard();
- msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
- msg->pushes.swap(i->second);
- msg->compute_cost(cct);
- msg->is_repair = get_parent()->pg_is_repair();
- get_parent()->send_message_osd_cluster(i->first.osd, msg, msg->map_epoch);
- }
- std::map<int, MOSDPGPushReply*> replies;
- for (map<pg_shard_t, vector<PushReplyOp>>::iterator i =
- m.push_replies.begin();
- i != m.push_replies.end();
- m.push_replies.erase(i++)) {
- MOSDPGPushReply *msg = new MOSDPGPushReply();
- msg->set_priority(priority);
- msg->map_epoch = get_parent()->pgb_get_osdmap_epoch();
- msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
- msg->from = get_parent()->whoami_shard();
- msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
- msg->replies.swap(i->second);
- msg->compute_cost(cct);
- replies.insert(std::pair(i->first.osd, msg));
- }
-
- if (!replies.empty()) {
- dout(20) << __func__ << " recovery_transactions=";
- Formatter *f = Formatter::create("json");
- f->open_object_section("t");
- m.t.dump(f);
- f->close_section();
- f->flush(*_dout);
- delete f;
- *_dout << dendl;
- commit_txn_send_replies(std::move(m.t), std::move(replies));
- }
-
- if (m.recovery_reads.empty())
- return;
- read_pipeline.start_read_op(
- priority,
- m.recovery_reads,
- false,
- true,
- std::make_unique<RecoveryReadCompleter>(*this));
-}
-
-void ECCommon::RecoveryBackend::continue_recovery_op(
- RecoveryBackend::RecoveryOp &op,
- RecoveryMessages *m) {
- dout(10) << __func__ << ": continuing " << op << dendl;
- using RecoveryOp = RecoveryBackend::RecoveryOp;
- while (1) {
- switch (op.state) {
- case RecoveryOp::IDLE: {
- ceph_assert(!op.recovery_progress.data_complete);
- ECUtil::shard_extent_set_t want(sinfo.get_k_plus_m());
-
- op.state = RecoveryOp::READING;
-
- /* When beginning recovery, the OI may not be known. As such the object
- * size is not known. For the first read, attempt to read the default
- * size. If this is larger than the object sizes, then the OSD will
- * return truncated reads. If the object size is known, then attempt
- * correctly sized reads.
- */
- uint64_t read_size = get_recovery_chunk_size();
- if (op.obc) {
- uint64_t read_to_end = ECUtil::align_next(op.obc->obs.oi.size) -
- op.recovery_progress.data_recovered_to;
-
- if (read_to_end < read_size) {
- read_size = read_to_end;
- }
- }
- sinfo.ro_range_to_shard_extent_set_with_parity(
- op.recovery_progress.data_recovered_to, read_size, want);
-
- op.recovery_progress.data_recovered_to += read_size;
-
- // We only need to recover shards that are missing.
- for (auto shard : shard_id_set::difference(sinfo.get_all_shards(), op.missing_on_shards)) {
- want.erase(shard);
- }
-
- if (op.recovery_progress.first && op.obc) {
- op.xattrs = op.obc->attr_cache;
- }
-
- read_request_t read_request(std::move(want),
- op.recovery_progress.first && !op.obc,
- op.obc
- ? op.obc->obs.oi.size
- : get_recovery_chunk_size());
-
- int r = read_pipeline.get_min_avail_to_read_shards(
- op.hoid, true, false, read_request);
-
- if (r != 0) {
- // we must have lost a recovery source
- ceph_assert(!op.recovery_progress.first);
- dout(10) << __func__ << ": canceling recovery op for obj " << op.hoid
- << dendl;
- // in crimson
- get_parent()->cancel_pull(op.hoid);
- recovery_ops.erase(op.hoid);
- return;
- }
- if (read_request.shard_reads.empty()) {
- ceph_assert(op.obc);
- /* This can happen for several reasons
- * - A zero-sized object.
- * - The missing shards have no data.
- * - The previous recovery did not need the last data shard. In this
- * case, data_recovered_to may indicate that the last shard still
- * needs recovery, when it does not.
- * We can just skip the read and fall through below.
- */
- dout(10) << __func__ << " No reads required " << op << dendl;
- // Create an empty read result and fall through.
- op.returned_data.emplace(&sinfo);
- } else {
- m->recovery_read(
- op.hoid,
- read_request);
- dout(10) << __func__ << ": IDLE return " << op << dendl;
- return;
- }
- }
- [[fallthrough]];
- case RecoveryOp::READING: {
- // read completed, start write
- ceph_assert(op.xattrs.size());
- ceph_assert(op.returned_data);
- dout(20) << __func__ << ": returned_data=" << op.returned_data << dendl;
- op.state = RecoveryOp::WRITING;
- ObjectRecoveryProgress after_progress = op.recovery_progress;
- after_progress.first = false;
- if (after_progress.data_recovered_to >= op.obc->obs.oi.size) {
- after_progress.data_complete = true;
- }
-
- for (auto &&pg_shard: op.missing_on) {
- m->pushes[pg_shard].push_back(PushOp());
- PushOp &pop = m->pushes[pg_shard].back();
- pop.soid = op.hoid;
- pop.version = op.recovery_info.oi.get_version_for_shard(pg_shard.shard);
-
- op.returned_data->get_sparse_buffer(pg_shard.shard, pop.data, pop.data_included);
- ceph_assert(pop.data.length() == pop.data_included.size());
-
- dout(10) << __func__ << ": pop shard=" << pg_shard
- << ", oid=" << pop.soid
- << ", before_progress=" << op.recovery_progress
- << ", after_progress=" << after_progress
- << ", pop.data.length()=" << pop.data.length()
- << ", pop.data_included=" << pop.data_included
- << ", size=" << op.obc->obs.oi.size << dendl;
-
- if (op.recovery_progress.first) {
- if (sinfo.is_nonprimary_shard(pg_shard.shard)) {
- if (pop.version == op.recovery_info.oi.version) {
- dout(10) << __func__ << ": copy OI attr only" << dendl;
- pop.attrset[OI_ATTR] = op.xattrs[OI_ATTR];
- } else {
- // We are recovering a partial write - make sure we push the correct
- // version in the OI or a scrub error will occur.
- object_info_t oi(op.recovery_info.oi);
- oi.shard_versions.clear();
- oi.version = pop.version;
- dout(10) << __func__ << ": partial write OI attr: oi=" << oi << dendl;
- bufferlist bl;
- oi.encode(bl, get_osdmap()->get_features(
- CEPH_ENTITY_TYPE_OSD, nullptr));
- pop.attrset[OI_ATTR] = bl;
- }
- } else {
- dout(10) << __func__ << ": push all attrs (not nonprimary)" << dendl;
- pop.attrset = op.xattrs;
- }
-
- // Following an upgrade, or turning of overwrites, we can take this
- // opportunity to clean up hinfo.
- if (pop.attrset.contains(ECUtil::get_hinfo_key())) {
- pop.attrset.erase(ECUtil::get_hinfo_key());
- }
- }
- pop.recovery_info = op.recovery_info;
- pop.before_progress = op.recovery_progress;
- pop.after_progress = after_progress;
- if (pg_shard != get_parent()->primary_shard()) {
- // already in crimson -- junction point with PeeringState
- get_parent()->begin_peer_recover(
- pg_shard,
- op.hoid);
- }
- }
- op.returned_data.reset();
- op.waiting_on_pushes = op.missing_on;
- op.recovery_progress = after_progress;
- dout(10) << __func__ << ": READING return " << op << dendl;
- return;
- }
- case RecoveryOp::WRITING: {
- if (op.waiting_on_pushes.empty()) {
- if (op.recovery_progress.data_complete) {
- op.state = RecoveryOp::COMPLETE;
- for (set<pg_shard_t>::iterator i = op.missing_on.begin();
- i != op.missing_on.end();
- ++i) {
- if (*i != get_parent()->primary_shard()) {
- dout(10) << __func__ << ": on_peer_recover on " << *i
- << ", obj " << op.hoid << dendl;
- get_parent()->on_peer_recover(
- *i,
- op.hoid,
- op.recovery_info);
- }
- }
- object_stat_sum_t stat;
- stat.num_bytes_recovered = op.recovery_info.size;
- stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ?
- stat.num_objects_recovered = 1;
- // TODO: not in crimson yet
- if (get_parent()->pg_is_repair())
- stat.num_objects_repaired = 1;
- // pg_recovery.cc in crimson has it
- get_parent()->on_global_recover(op.hoid, stat, false);
- dout(10) << __func__ << ": WRITING return " << op << dendl;
- recovery_ops.erase(op.hoid);
- return;
- } else {
- op.state = RecoveryOp::IDLE;
- dout(10) << __func__ << ": WRITING continue " << op << dendl;
- continue;
- }
- }
- return;
- }
- // should never be called once complete
- case RecoveryOp::COMPLETE:
- default: {
- ceph_abort();
- };
- }
- }
-}
-
void ECBackend::run_recovery_op(
PGBackend::RecoveryHandle *_h,
int priority) {
return 0;
}
-ECCommon::RecoveryBackend::RecoveryOp
-ECCommon::RecoveryBackend::recover_object(
- const hobject_t &hoid,
- eversion_t v,
- ObjectContextRef head,
- ObjectContextRef obc) {
- RecoveryOp op;
- op.v = v;
- op.hoid = hoid;
- op.obc = obc;
- op.recovery_info.soid = hoid;
- op.recovery_info.version = v;
- if (obc) {
- op.recovery_info.size = obc->obs.oi.size;
- op.recovery_info.oi = obc->obs.oi;
- }
- if (hoid.is_snap()) {
- if (obc) {
- ceph_assert(obc->ssc);
- op.recovery_info.ss = obc->ssc->snapset;
- } else if (head) {
- ceph_assert(head->ssc);
- op.recovery_info.ss = head->ssc->snapset;
- } else {
- ceph_abort_msg("neither obc nor head set for a snap object");
- }
- }
- op.recovery_progress.omap_complete = true;
- for (set<pg_shard_t>::const_iterator i =
- get_parent()->get_acting_recovery_backfill_shards().begin();
- i != get_parent()->get_acting_recovery_backfill_shards().end();
- ++i) {
- dout(10) << "checking " << *i << dendl;
- if (get_parent()->get_shard_missing(*i).is_missing(hoid)) {
- op.missing_on.insert(*i);
- op.missing_on_shards.insert(i->shard);
- }
- }
- dout(10) << __func__ << ": built op " << op << dendl;
- return op;
-}
-
bool ECBackend::can_handle_while_inactive(
OpRequestRef _op) {
return false;
f->dump_stream("in_progress") << in_progress;
}
+void ECCommon::RecoveryBackend::RecoveryOp::dump(Formatter *f) const {
+ f->dump_stream("hoid") << hoid;
+ f->dump_stream("v") << v;
+ f->dump_stream("missing_on") << missing_on;
+ f->dump_stream("missing_on_shards") << missing_on_shards;
+ f->dump_stream("recovery_info") << recovery_info;
+ f->dump_stream("recovery_progress") << recovery_progress;
+ f->dump_stream("state") << tostr(state);
+ f->dump_stream("waiting_on_pushes") << waiting_on_pushes;
+}
+
void ECCommon::ReadPipeline::complete_read_op(ReadOp &&rop) {
dout(20) << __func__ << " completing " << rop << dendl;
auto req_iter = rop.to_read.begin();
next_write_all_shards = true;
extent_cache.add_on_write(std::move(cb));
}
+
+ECCommon::RecoveryBackend::RecoveryBackend(
+ CephContext *cct,
+ const coll_t &coll,
+ ceph::ErasureCodeInterfaceRef ec_impl,
+ const ECUtil::stripe_info_t &sinfo,
+ ReadPipeline &read_pipeline,
+ ECListener *parent)
+ : cct(cct),
+ coll(coll),
+ ec_impl(std::move(ec_impl)),
+ sinfo(sinfo),
+ read_pipeline(read_pipeline),
+ parent(parent) {}
+
+void ECCommon::RecoveryBackend::_failed_push(const hobject_t &hoid,
+ ECCommon::read_result_t &res) {
+ dout(10) << __func__ << ": Read error " << hoid << " r="
+ << res.r << " errors=" << res.errors << dendl;
+ dout(10) << __func__ << ": canceling recovery op for obj " << hoid
+ << dendl;
+ ceph_assert(recovery_ops.count(hoid));
+ eversion_t v = recovery_ops[hoid].v;
+ recovery_ops.erase(hoid);
+
+ set<pg_shard_t> fl;
+ for (auto &&i: res.errors) {
+ fl.insert(i.first);
+ }
+ get_parent()->on_failed_pull(fl, hoid, v);
+}
+
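+// Apply a single PushOp to the local shard: write the data (via a temp
+// object unless the push is oneshot), then on completion move it into
+// place and notify the recovery machinery.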
+void ECCommon::RecoveryBackend::handle_recovery_push(
+ const PushOp &op,
+ RecoveryMessages *m,
+ bool is_repair) {
+ if (get_parent()->check_failsafe_full()) {
+ dout(10) << __func__ << " Out of space (failsafe) processing push request."
+ << dendl;
+ ceph_abort();
+ }
+
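+ // A push that both starts and completes the object ("oneshot") can be
+ // written directly to the final shard object; otherwise the data is
+ // staged in a temp object and renamed into place once complete.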
+ bool oneshot = op.before_progress.first && op.after_progress.data_complete;
+ ghobject_t tobj;
+ if (oneshot) {
+ tobj = ghobject_t(op.soid, ghobject_t::NO_GEN,
+ get_parent()->whoami_shard().shard);
+ } else {
+ tobj = ghobject_t(get_parent()->get_temp_recovery_object(op.soid,
+ op.version),
+ ghobject_t::NO_GEN,
+ get_parent()->whoami_shard().shard);
+ if (op.before_progress.first) {
+ dout(10) << __func__ << ": Adding oid "
+ << tobj.hobj << " in the temp collection" << dendl;
+ add_temp_obj(tobj.hobj);
+ }
+ }
+
+ if (op.before_progress.first) {
+ m->t.remove(coll, tobj);
+ m->t.touch(coll, tobj);
+ }
+
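+ // op.data is a compact buffer of just the transmitted extents;
+ // op.data_included records where they live, so its total length must
+ // match the buffer exactly.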
+ ceph_assert(op.data.length() == op.data_included.size());
+ uint64_t tobj_size = 0;
+
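+ // Walk the included extents, carving the matching range out of op.data
+ // and writing it at each extent's offset within the shard object.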
+ uint64_t cursor = 0;
+ for (auto [off, len] : op.data_included) {
+ bufferlist bl;
+ if (len != op.data.length()) {
+ bl.substr_of(op.data, cursor, len);
+ } else {
+ bl = op.data;
+ }
+ m->t.write(coll, tobj, off, len, bl);
+ tobj_size = off + len;
+ cursor += len;
+ }
+
+ if (op.before_progress.first) {
+ ceph_assert(op.attrset.contains(OI_ATTR));
+ m->t.setattrs(
+ coll,
+ tobj,
+ op.attrset);
+ }
+
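+ // Once the data is complete, the shard object may still be short (e.g.
+ // if trailing zeros were not transmitted); extend it to the exact
+ // expected shard size.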
+ if (op.after_progress.data_complete) {
+ uint64_t shard_size = sinfo.object_size_to_shard_size(op.recovery_info.size,
+ get_parent()->whoami_shard().shard);
+ ceph_assert(shard_size >= tobj_size);
+ if (shard_size != tobj_size) {
+ m->t.truncate(coll, tobj, shard_size);
+ }
+ }
+
+ if (op.after_progress.data_complete && !oneshot) {
+ dout(10) << __func__ << ": Removing oid "
+ << tobj.hobj << " from the temp collection" << dendl;
+ clear_temp_obj(tobj.hobj);
+ m->t.remove(coll, ghobject_t(
+ op.soid, ghobject_t::NO_GEN,
+ get_parent()->whoami_shard().shard));
+ m->t.collection_move_rename(
+ coll, tobj,
+ coll, ghobject_t(
+ op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
+ }
+ if (op.after_progress.data_complete) {
+ if ((get_parent()->pgb_is_primary())) {
+ ceph_assert(recovery_ops.count(op.soid));
+ ceph_assert(recovery_ops[op.soid].obc);
+ if (get_parent()->pg_is_repair() || is_repair)
+ get_parent()->inc_osd_stat_repaired();
+ get_parent()->on_local_recover(
+ op.soid,
+ op.recovery_info,
+ recovery_ops[op.soid].obc,
+ false,
+ &m->t);
+ } else {
+ // If primary told us this is a repair, bump osd_stat_t::num_objects_repaired
+ if (is_repair)
+ get_parent()->inc_osd_stat_repaired();
+ get_parent()->on_local_recover(
+ op.soid,
+ op.recovery_info,
+ ObjectContextRef(),
+ false,
+ &m->t);
+ }
+ }
+ m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp());
+ m->push_replies[get_parent()->primary_shard()].back().soid = op.soid;
+}
+
+void ECCommon::RecoveryBackend::handle_recovery_push_reply(
+ const PushReplyOp &op,
+ pg_shard_t from,
+ RecoveryMessages *m) {
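+ // The op may already have been torn down (e.g. by a failed push);
+ // ignore stale replies.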
+ if (!recovery_ops.count(op.soid))
+ return;
+ RecoveryOp &rop = recovery_ops[op.soid];
+ ceph_assert(rop.waiting_on_pushes.contains(from));
+ rop.waiting_on_pushes.erase(from);
+ continue_recovery_op(rop, m);
+}
+
+void ECCommon::RecoveryBackend::update_object_size_after_read(
+ uint64_t size,
+ read_result_t &res,
+ read_request_t &req) {
+ // We didn't know the size before, so the zeros-for-decode calculations
+ // will be off. Recalculate them.
+ ECUtil::shard_extent_set_t zero_mask(sinfo.get_k_plus_m());
+ sinfo.ro_size_to_zero_mask(size, zero_mask);
+ ECUtil::shard_extent_set_t read_mask(sinfo.get_k_plus_m());
+ sinfo.ro_size_to_read_mask(size, read_mask);
+ extent_set superset = res.buffers_read.get_extent_superset();
+
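+ // For each shard we actually read, any part of the read superset that
+ // now falls in the zero region must be supplied as zeros to decode.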
+ for (auto &&[shard, eset] : zero_mask) {
+ eset.intersection_of(superset);
+ if (!eset.empty() &&
+ (res.zero_length_reads.contains(shard) ||
+ res.buffers_read.contains(shard))) {
+ req.zeros_for_decode[shard].insert(eset);
+ }
+ }
+
+ /* Correct the shard_want_to_read, to make sure everything is within scope
+ * of the newly found object size.
+ */
+ for (auto iter = req.shard_want_to_read.begin(); iter != req.shard_want_to_read.end();) {
+ auto &&[shard, eset] = *iter;
+ bool erase = false;
+
+ if (read_mask.contains(shard)) {
+ eset.intersection_of(read_mask.get(shard));
+ erase = eset.empty();
+ } else {
+ erase = true;
+ }
+
+ /* Some shards may be empty */
+ if (erase) {
+ iter = req.shard_want_to_read.erase(iter);
+ } else {
+ ++iter;
+ }
+ }
+
+ dout(20) << "Update want and zeros from read:size=" << size
+ << " res=" << res
+ << " req=" << req
+ << dendl;
+}
+
+void ECCommon::RecoveryBackend::handle_recovery_read_complete(
+ const hobject_t &hoid,
+ read_result_t &&res,
+ read_request_t &req,
+ RecoveryMessages *m) {
+ dout(10) << __func__ << ": returned " << hoid << " " << res << dendl;
+ ceph_assert(recovery_ops.contains(hoid));
+ RecoveryBackend::RecoveryOp &op = recovery_ops[hoid];
+
+ if (res.attrs) {
+ op.xattrs.swap(*(res.attrs));
+ const auto empty_obc = !op.obc;
+ maybe_load_obc(op.xattrs, op);
+#ifdef WITH_CRIMSON
+ ceph_assert(hoid == op.hoid);
+#endif
+ if (empty_obc) {
+ update_object_size_after_read(op.recovery_info.size, res, req);
+ }
+ }
+ ceph_assert(op.xattrs.size());
+ ceph_assert(op.obc);
+
+ op.returned_data.emplace(std::move(res.buffers_read));
+ uint64_t aligned_size = ECUtil::align_next(op.obc->obs.oi.size);
+
+ dout(20) << __func__ << " before decode: oid=" << op.hoid << " EC_DEBUG_BUFFERS: "
+ << op.returned_data->debug_string(2048, 0)
+ << dendl;
+
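+ // Pad the reads out with zeros where the object falls short of a full
+ // stripe, then decode the missing shards.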
+ op.returned_data->add_zero_padding_for_decode(req.zeros_for_decode);
+ int r = op.returned_data->decode(ec_impl, req.shard_want_to_read, aligned_size, get_parent()->get_dpp(), true);
+ ceph_assert(r == 0);
+
+ // Finally, we don't want to write any padding, so truncate the buffer
+ // to remove it.
+ op.returned_data->erase_after_ro_offset(aligned_size);
+
+ dout(20) << __func__ << ": oid=" << op.hoid << dendl;
+ dout(20) << __func__ << " after decode: oid=" << op.hoid << " EC_DEBUG_BUFFERS: "
+ << op.returned_data->debug_string(2048, 0)
+ << dendl;
+
+ continue_recovery_op(op, m);
+}
+
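+// Glue between the read pipeline and recovery: feeds each completed read
+// (or failure) back into the RecoveryBackend, then dispatches the
+// accumulated recovery messages when the whole read op finishes.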
+struct RecoveryReadCompleter : ECCommon::ReadCompleter {
+ RecoveryReadCompleter(ECCommon::RecoveryBackend &backend)
+ : backend(backend) {}
+
+ void finish_single_request(
+ const hobject_t &hoid,
+ ECCommon::read_result_t &&res,
+ ECCommon::read_request_t &req) override {
+ if (!(res.r == 0 && res.errors.empty())) {
+ backend._failed_push(hoid, res);
+ return;
+ }
+ ceph_assert(req.to_read.size() == 0);
+ backend.handle_recovery_read_complete(
+ hoid,
+ std::move(res),
+ req,
+ &rm);
+ }
+
+ void finish(int priority) && override {
+ backend.dispatch_recovery_messages(rm, priority);
+ }
+
+ ECCommon::RecoveryBackend &backend;
+ RecoveryMessages rm;
+};
+
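+// Flush a RecoveryMessages batch: send the queued pushes, hand the push
+// replies (together with the local transaction) to
+// commit_txn_send_replies, and start any queued recovery reads.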
+void ECCommon::RecoveryBackend::dispatch_recovery_messages(
+ RecoveryMessages &m, int priority) {
+ for (map<pg_shard_t, vector<PushOp>>::iterator i = m.pushes.begin();
+ i != m.pushes.end();
+ m.pushes.erase(i++)) {
+ MOSDPGPush *msg = new MOSDPGPush();
+ msg->set_priority(priority);
+ msg->map_epoch = get_parent()->pgb_get_osdmap_epoch();
+ msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
+ msg->from = get_parent()->whoami_shard();
+ msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
+ msg->pushes.swap(i->second);
+ msg->compute_cost(cct);
+ msg->is_repair = get_parent()->pg_is_repair();
+ get_parent()->send_message_osd_cluster(i->first.osd, msg, msg->map_epoch);
+ }
+ std::map<int, MOSDPGPushReply*> replies;
+ for (map<pg_shard_t, vector<PushReplyOp>>::iterator i =
+ m.push_replies.begin();
+ i != m.push_replies.end();
+ m.push_replies.erase(i++)) {
+ MOSDPGPushReply *msg = new MOSDPGPushReply();
+ msg->set_priority(priority);
+ msg->map_epoch = get_parent()->pgb_get_osdmap_epoch();
+ msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
+ msg->from = get_parent()->whoami_shard();
+ msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
+ msg->replies.swap(i->second);
+ msg->compute_cost(cct);
+ replies.insert(std::pair(i->first.osd, msg));
+ }
+
+ if (!replies.empty()) {
+ dout(20) << __func__ << " recovery_transactions=";
+ Formatter *f = Formatter::create("json");
+ f->open_object_section("t");
+ m.t.dump(f);
+ f->close_section();
+ f->flush(*_dout);
+ delete f;
+ *_dout << dendl;
+ commit_txn_send_replies(std::move(m.t), std::move(replies));
+ }
+
+ if (m.recovery_reads.empty())
+ return;
+ read_pipeline.start_read_op(
+ priority,
+ m.recovery_reads,
+ false,
+ true,
+ std::make_unique<RecoveryReadCompleter>(*this));
+}
+
+void ECCommon::RecoveryBackend::continue_recovery_op(
+ RecoveryBackend::RecoveryOp &op,
+ RecoveryMessages *m) {
+ dout(10) << __func__ << ": continuing " << op << dendl;
+ using RecoveryOp = RecoveryBackend::RecoveryOp;
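+ // Drive the per-object state machine: IDLE -> READING -> WRITING,
+ // looping back to IDLE until every chunk is recovered, then COMPLETE.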
+ while (1) {
+ switch (op.state) {
+ case RecoveryOp::IDLE: {
+ ceph_assert(!op.recovery_progress.data_complete);
+ ECUtil::shard_extent_set_t want(sinfo.get_k_plus_m());
+
+ op.state = RecoveryOp::READING;
+
+ /* When beginning recovery, the OI may not be known, so the object size
+ * is not known either. For the first read, attempt to read the default
+ * size; if this is larger than the object size, the OSD will return
+ * truncated reads. Once the object size is known, issue correctly
+ * sized reads.
+ */
+ uint64_t read_size = get_recovery_chunk_size();
+ if (op.obc) {
+ uint64_t read_to_end = ECUtil::align_next(op.obc->obs.oi.size) -
+ op.recovery_progress.data_recovered_to;
+
+ if (read_to_end < read_size) {
+ read_size = read_to_end;
+ }
+ }
+ sinfo.ro_range_to_shard_extent_set_with_parity(
+ op.recovery_progress.data_recovered_to, read_size, want);
+
+ op.recovery_progress.data_recovered_to += read_size;
+
+ // We only need to recover shards that are missing.
+ for (auto shard : shard_id_set::difference(sinfo.get_all_shards(), op.missing_on_shards)) {
+ want.erase(shard);
+ }
+
+ if (op.recovery_progress.first && op.obc) {
+ op.xattrs = op.obc->attr_cache;
+ }
+
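+ // Request attrs with the first read when we do not yet have the OBC.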
+ read_request_t read_request(std::move(want),
+ op.recovery_progress.first && !op.obc,
+ op.obc
+ ? op.obc->obs.oi.size
+ : get_recovery_chunk_size());
+
+ int r = read_pipeline.get_min_avail_to_read_shards(
+ op.hoid, true, false, read_request);
+
+ if (r != 0) {
+ // we must have lost a recovery source
+ ceph_assert(!op.recovery_progress.first);
+ dout(10) << __func__ << ": canceling recovery op for obj " << op.hoid
+ << dendl;
+ // in crimson
+ get_parent()->cancel_pull(op.hoid);
+ recovery_ops.erase(op.hoid);
+ return;
+ }
+ if (read_request.shard_reads.empty()) {
+ ceph_assert(op.obc);
+ /* This can happen for several reasons:
+ * - A zero-sized object.
+ * - The missing shards have no data.
+ * - The previous recovery did not need the last data shard. In this
+ * case, data_recovered_to may indicate that the last shard still
+ * needs recovery, when it does not.
+ * We can just skip the read and fall through below.
+ */
+ dout(10) << __func__ << " No reads required " << op << dendl;
+ // Create an empty read result and fall through.
+ op.returned_data.emplace(&sinfo);
+ } else {
+ m->recovery_read(
+ op.hoid,
+ read_request);
+ dout(10) << __func__ << ": IDLE return " << op << dendl;
+ return;
+ }
+ }
+ [[fallthrough]];
+ case RecoveryOp::READING: {
+ // read completed, start write
+ ceph_assert(op.xattrs.size());
+ ceph_assert(op.returned_data);
+ dout(20) << __func__ << ": returned_data=" << op.returned_data << dendl;
+ op.state = RecoveryOp::WRITING;
+ ObjectRecoveryProgress after_progress = op.recovery_progress;
+ after_progress.first = false;
+ if (after_progress.data_recovered_to >= op.obc->obs.oi.size) {
+ after_progress.data_complete = true;
+ }
+
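+ // Queue a PushOp for every shard that is missing the object: each gets
+ // its sparse shard buffer, plus the attrs on the first push.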
+ for (auto &&pg_shard: op.missing_on) {
+ m->pushes[pg_shard].push_back(PushOp());
+ PushOp &pop = m->pushes[pg_shard].back();
+ pop.soid = op.hoid;
+ pop.version = op.recovery_info.oi.get_version_for_shard(pg_shard.shard);
+
+ op.returned_data->get_sparse_buffer(pg_shard.shard, pop.data, pop.data_included);
+ ceph_assert(pop.data.length() == pop.data_included.size());
+
+ dout(10) << __func__ << ": pop shard=" << pg_shard
+ << ", oid=" << pop.soid
+ << ", before_progress=" << op.recovery_progress
+ << ", after_progress=" << after_progress
+ << ", pop.data.length()=" << pop.data.length()
+ << ", pop.data_included=" << pop.data_included
+ << ", size=" << op.obc->obs.oi.size << dendl;
+
+ if (op.recovery_progress.first) {
+ if (sinfo.is_nonprimary_shard(pg_shard.shard)) {
+ if (pop.version == op.recovery_info.oi.version) {
+ dout(10) << __func__ << ": copy OI attr only" << dendl;
+ pop.attrset[OI_ATTR] = op.xattrs[OI_ATTR];
+ } else {
+ // We are recovering a partial write - make sure we push the correct
+ // version in the OI or a scrub error will occur.
+ object_info_t oi(op.recovery_info.oi);
+ oi.shard_versions.clear();
+ oi.version = pop.version;
+ dout(10) << __func__ << ": partial write OI attr: oi=" << oi << dendl;
+ bufferlist bl;
+ oi.encode(bl, get_osdmap()->get_features(
+ CEPH_ENTITY_TYPE_OSD, nullptr));
+ pop.attrset[OI_ATTR] = bl;
+ }
+ } else {
+ dout(10) << __func__ << ": push all attrs (not nonprimary)" << dendl;
+ pop.attrset = op.xattrs;
+ }
+
+ // Following an upgrade, or turning on overwrites, we can take this
+ // opportunity to clean up hinfo.
+ if (pop.attrset.contains(ECUtil::get_hinfo_key())) {
+ pop.attrset.erase(ECUtil::get_hinfo_key());
+ }
+ }
+ pop.recovery_info = op.recovery_info;
+ pop.before_progress = op.recovery_progress;
+ pop.after_progress = after_progress;
+ if (pg_shard != get_parent()->primary_shard()) {
+ // already in crimson -- junction point with PeeringState
+ get_parent()->begin_peer_recover(
+ pg_shard,
+ op.hoid);
+ }
+ }
+ op.returned_data.reset();
+ op.waiting_on_pushes = op.missing_on;
+ op.recovery_progress = after_progress;
+ dout(10) << __func__ << ": READING return " << op << dendl;
+ return;
+ }
+ case RecoveryOp::WRITING: {
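+ // Once all pushes for this chunk are acknowledged, either the object
+ // is done or we loop back to IDLE for the next chunk; otherwise keep
+ // waiting.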
+ if (op.waiting_on_pushes.empty()) {
+ if (op.recovery_progress.data_complete) {
+ op.state = RecoveryOp::COMPLETE;
+ for (set<pg_shard_t>::iterator i = op.missing_on.begin();
+ i != op.missing_on.end();
+ ++i) {
+ if (*i != get_parent()->primary_shard()) {
+ dout(10) << __func__ << ": on_peer_recover on " << *i
+ << ", obj " << op.hoid << dendl;
+ get_parent()->on_peer_recover(
+ *i,
+ op.hoid,
+ op.recovery_info);
+ }
+ }
+ object_stat_sum_t stat;
+ stat.num_bytes_recovered = op.recovery_info.size;
+ stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ?
+ stat.num_objects_recovered = 1;
+ // TODO: not in crimson yet
+ if (get_parent()->pg_is_repair())
+ stat.num_objects_repaired = 1;
+ // pg_recovery.cc in crimson has it
+ get_parent()->on_global_recover(op.hoid, stat, false);
+ dout(10) << __func__ << ": WRITING return " << op << dendl;
+ recovery_ops.erase(op.hoid);
+ return;
+ } else {
+ op.state = RecoveryOp::IDLE;
+ dout(10) << __func__ << ": WRITING continue " << op << dendl;
+ continue;
+ }
+ }
+ return;
+ }
+ // should never be called once complete
+ case RecoveryOp::COMPLETE:
+ default: {
+ ceph_abort();
+ };
+ }
+ }
+}
+
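+// Build a RecoveryOp for hoid: capture version, size and snapset info,
+// and record which shards the object is missing on.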
+ECCommon::RecoveryBackend::RecoveryOp
+ECCommon::RecoveryBackend::recover_object(
+ const hobject_t &hoid,
+ eversion_t v,
+ ObjectContextRef head,
+ ObjectContextRef obc) {
+ RecoveryOp op;
+ op.v = v;
+ op.hoid = hoid;
+ op.obc = obc;
+ op.recovery_info.soid = hoid;
+ op.recovery_info.version = v;
+ if (obc) {
+ op.recovery_info.size = obc->obs.oi.size;
+ op.recovery_info.oi = obc->obs.oi;
+ }
+ if (hoid.is_snap()) {
+ if (obc) {
+ ceph_assert(obc->ssc);
+ op.recovery_info.ss = obc->ssc->snapset;
+ } else if (head) {
+ ceph_assert(head->ssc);
+ op.recovery_info.ss = head->ssc->snapset;
+ } else {
+ ceph_abort_msg("neither obc nor head set for a snap object");
+ }
+ }
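+ // No omap recovery is needed here; mark it complete from the start.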
+ op.recovery_progress.omap_complete = true;
+ for (set<pg_shard_t>::const_iterator i =
+ get_parent()->get_acting_recovery_backfill_shards().begin();
+ i != get_parent()->get_acting_recovery_backfill_shards().end();
+ ++i) {
+ dout(10) << "checking " << *i << dendl;
+ if (get_parent()->get_shard_missing(*i).is_missing(hoid)) {
+ op.missing_on.insert(*i);
+ op.missing_on_shards.insert(i->shard);
+ }
+ }
+ dout(10) << __func__ << ": built op " << op << dendl;
+ return op;
+}
+
+END_IGNORE_DEPRECATED