From: Matty Williams Date: Tue, 12 May 2026 15:11:17 +0000 (+0100) Subject: osd: Allow for recovery of OMAP header and entries in EC pools X-Git-Tag: v21.0.1^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a5f4a4902075e343df154da61e3d205d2bd2a5d5;p=ceph.git osd: Allow for recovery of OMAP header and entries in EC pools Add omap fields to read_request_t, read_result_t, ECSubRead and ECSubReadReply. Read and write omap header and entries if !omap_complete. Require omap_complete to finish recovery. Fixes: https://tracker.ceph.com/issues/74244 Signed-off-by: Matty Williams --- diff --git a/src/osd/ECBackend.cc b/src/osd/ECBackend.cc index ea9b5511f65..3d9a054af82 100644 --- a/src/osd/ECBackend.cc +++ b/src/osd/ECBackend.cc @@ -122,6 +122,7 @@ void ECBackend::handle_recovery_push( recovery_backend.handle_recovery_push(op, m, is_repair); if (op.after_progress.data_complete && + op.after_progress.omap_complete && !(get_parent()->pgb_is_primary()) && get_parent()->pg_is_remote_backfilling()) { struct stat st; @@ -571,8 +572,133 @@ void ECBackend::handle_sub_read( reply->attrs_read.erase(*i); reply->buffers_read.erase(*i); reply->errors[*i] = r; + continue; + } + + if (!op.omap_headers_to_read.contains(*i)) { + continue; + } + + if (!get_parent()->get_pool().supports_omap()) { + continue; + } + + // During recovery, if the OI attribute isn't accessible, we speculatively + // say that we want to read omap data. + // Now we have access to the OI attribute so we can determine if omap data + // should be read or not. + auto oi_iter = reply->attrs_read[*i].find(OI_ATTR); + if (oi_iter == reply->attrs_read[*i].end()) { + continue; + } + + try { + object_info_t oi; + auto p = oi_iter->second.cbegin(); + decode(oi, p); + if (!oi.is_omap()) { + reply->omaps_complete[*i] = true; + dout(20) << __func__ << ": object " << *i + << " has no omap flag, skipping omap read" << dendl; + } else { + dout(20) << __func__ << ": object " << *i + << " has omap flag set" << dendl; + } + } catch (ceph::buffer::error& e) { + lgeneric_derr(cct) << __func__ << ": failed to decode OI for " << *i + << ": " << e.what() << dendl; + get_parent()->clog_warn() << "corrupt object info attribute for " << *i; + reply->errors[*i] = -EIO; + continue; + } + } + + if (get_parent()->get_pool().supports_omap()) { + for (set::iterator i = op.omap_headers_to_read.begin(); + i != op.omap_headers_to_read.end(); + ++i) { + dout(20) << __func__ << ": fulfilling omap header request on " + << *i << dendl; + if (reply->errors.contains(*i)) { + continue; + } + + // Skip if attrs already proved omap processing is complete. + if (reply->omaps_complete.contains(*i) && reply->omaps_complete[*i]) { + dout(20) << __func__ << ": skipping omap header read for " << *i + << " (no omap flag)" << dendl; + continue; + } + + int r = switcher->store->omap_get_header( + switcher->ch, + ghobject_t(*i, ghobject_t::NO_GEN, shard), + &reply->omap_headers_read[*i], false); + if (r < 0) { + reply->attrs_read.erase(*i); + reply->omap_headers_read.erase(*i); + reply->buffers_read.erase(*i); + reply->errors[*i] = r; + } + } + + for (auto const& [hoid, read_from] : op.omap_read_from) { + auto const&[start_key, max_bytes] = read_from; + dout(20) << __func__ << ": fulfilling omap read request on " << hoid + << " from key " << start_key << dendl; + + if (reply->errors.contains(hoid)) + continue; + + // Skip if attrs already proved omap processing is complete. + if (reply->omaps_complete.contains(hoid) && reply->omaps_complete[hoid]) { + dout(20) << __func__ << ": skipping omap entries read for " << hoid + << " (no omap flag)" << dendl; + continue; + } + + std::map current_batch; + reply->omaps_complete[hoid] = false; + + uint64_t available = max_bytes; + const auto result = switcher->store->omap_iterate( + switcher->ch, + ghobject_t(hoid, ghobject_t::NO_GEN, shard), + ObjectStore::omap_iter_seek_t{ + .seek_position = start_key, + .seek_type = ObjectStore::omap_iter_seek_t::UPPER_BOUND + }, + [max_entries=cct->_conf->osd_recovery_max_omap_entries_per_chunk, &available, ¤t_batch] + (std::string_view key, std::string_view value) { + const auto num_new_bytes = key.size() + value.size(); + if (auto cur_num_entries = current_batch.size(); cur_num_entries > 0) { + if (max_entries > 0 && cur_num_entries >= max_entries) { + return ObjectStore::omap_iter_ret_t::STOP; + } + if (num_new_bytes >= available) { + return ObjectStore::omap_iter_ret_t::STOP; + } + } + bufferlist val_bl; + val_bl.append(value); + current_batch.insert(make_pair(key, val_bl)); + available -= std::min(available, num_new_bytes); + return ObjectStore::omap_iter_ret_t::NEXT; + }); + + if (result < 0) { + reply->attrs_read.erase(hoid); + reply->omap_headers_read.erase(hoid); + reply->buffers_read.erase(hoid); + reply->errors[hoid] = result; + current_batch.clear(); + } else if (result == 0) { + reply->omaps_complete[hoid] = true; + } + reply->omap_entries_read[hoid] = std::move(current_batch); } } + reply->from = get_parent()->whoami_shard(); reply->tid = op.tid; } @@ -688,7 +814,7 @@ void ECBackend::handle_sub_read_reply( } } for (auto &&[hoid, attr]: op.attrs_read) { - ceph_assert(!op.errors.count(hoid)); + ceph_assert(!op.errors.contains(hoid)); // if read error better not have sent an attribute if (!rop.to_read.contains(hoid)) { // We canceled this read! @see filter_read_op @@ -701,6 +827,49 @@ void ECBackend::handle_sub_read_reply( rop.complete.at(hoid).attrs.emplace(); (*(rop.complete.at(hoid).attrs)).swap(attr); } + if (get_parent()->get_pool().supports_omap()) { + for (auto &&[hoid, header]: op.omap_headers_read) { + ceph_assert(!op.errors.contains(hoid)); + // if read error better not have sent an attribute + if (!rop.to_read.contains(hoid)) { + // We canceled this read! @see filter_read_op + dout(20) << __func__ << " to_read skipping" << dendl; + continue; + } + if (!rop.complete.contains(hoid)) { + rop.complete.emplace(hoid, &sinfo); + } + rop.complete.at(hoid).omap_header.emplace(); + (*(rop.complete.at(hoid).omap_header)).swap(header); + } + for (auto &&[hoid, entries]: op.omap_entries_read) { + ceph_assert(!op.errors.contains(hoid)); + // if read error better not have sent any entries + if (!rop.to_read.contains(hoid)) { + // We canceled this read! @see filter_read_op + dout(20) << __func__ << " to_read skipping" << dendl; + continue; + } + if (!rop.complete.contains(hoid)) { + rop.complete.emplace(hoid, &sinfo); + } + rop.complete.at(hoid).omap_entries.emplace(); + (*(rop.complete.at(hoid).omap_entries)).swap(entries); + } + for (auto &&[hoid, omap_complete]: op.omaps_complete) { + ceph_assert(!op.errors.contains(hoid)); + // if read error better not have sent any entries + if (!rop.to_read.contains(hoid)) { + // We canceled this read! @see filter_read_op + dout(20) << __func__ << " to_read skipping" << dendl; + continue; + } + if (!rop.complete.contains(hoid)) { + rop.complete.emplace(hoid, &sinfo); + } + rop.complete.at(hoid).omap_complete = omap_complete; + } + } for (auto &&[hoid, err]: op.errors) { if (!rop.complete.contains(hoid)) { rop.complete.emplace(hoid, &sinfo); @@ -747,7 +916,13 @@ void ECBackend::handle_sub_read_reply( } int err = -EIO; // If attributes needed but not read. - if (!rop.to_read.at(oid).want_attrs || rop.complete.at(oid).attrs) { + const auto& to_read = rop.to_read.at(oid); + const auto& complete = rop.complete.at(oid); + bool attrs_satisfied = !to_read.want_attrs || complete.attrs; + bool omap_satisfied = !get_parent()->get_pool().supports_omap() + || (!to_read.want_omap_header && !to_read.want_omap_keys) + || (complete.omap_header && complete.omap_complete); + if (attrs_satisfied && omap_satisfied) { err = ec_impl->minimum_to_decode(want_to_read, have, dummy_minimum, nullptr); } @@ -800,6 +975,7 @@ void ECBackend::handle_sub_read_reply( // uncompleted objects rop.to_read.at(oid).shard_reads.clear(); rop.to_read.at(oid).want_attrs = false; + rop.to_read.at(oid).want_omap_header = false; ++is_complete; } } diff --git a/src/osd/ECCommon.cc b/src/osd/ECCommon.cc index 7cfa18470f9..bbbbe6e41cd 100644 --- a/src/osd/ECCommon.cc +++ b/src/osd/ECCommon.cc @@ -359,7 +359,12 @@ int ECCommon::ReadPipeline::get_remaining_shards( read_result_t &read_result, read_request_t &read_request, const bool for_recovery, - bool want_attrs) { + bool want_attrs, + bool want_omap_header) { + if (want_omap_header) { + ceph_assert(get_parent()->get_pool().supports_omap()); + } + set error_shards; for (auto &shard: std::views::keys(read_result.errors)) { error_shards.insert(shard); @@ -380,8 +385,9 @@ int ECCommon::ReadPipeline::get_remaining_shards( return -EIO; } - bool need_attr_request = want_attrs; + bool need_attr_request = want_attrs || want_omap_header; read_request.want_attrs = want_attrs; + read_request.want_omap_header = want_omap_header; // Rather than repeating whole read, we can remove everything we already have. for (auto iter = read_request.shard_reads.begin(); @@ -470,12 +476,26 @@ void ECCommon::ReadPipeline::do_read_op(ReadOp &rop) { map messages; for (auto &&[hoid, read_request]: rop.to_read) { bool need_attrs = read_request.want_attrs; + bool need_omap_header = read_request.want_omap_header; + bool need_omap_keys = read_request.want_omap_keys; + if (need_omap_header || need_omap_keys) { + ceph_assert(get_parent()->get_pool().supports_omap()); + } for (auto &&[shard, shard_read]: read_request.shard_reads) { if (need_attrs && !sinfo.is_nonprimary_shard(shard)) { messages[shard_read.pg_shard].attrs_to_read.insert(hoid); need_attrs = false; } + if (need_omap_header && !sinfo.is_nonprimary_shard(shard)) { + messages[shard_read.pg_shard].omap_headers_to_read.insert(hoid); + need_omap_header = false; + } + if (need_omap_keys && !sinfo.is_nonprimary_shard(shard)) { + messages[shard_read.pg_shard].omap_read_from.insert( + {hoid, {read_request.omap_read_from, read_request.omap_max_bytes}}); + need_omap_keys = false; + } if (shard_read.subchunk) { messages[shard_read.pg_shard].subchunks[hoid] = *shard_read.subchunk; } else { @@ -496,6 +516,8 @@ void ECCommon::ReadPipeline::do_read_op(ReadOp &rop) { } } ceph_assert(!need_attrs); + ceph_assert(!need_omap_header); + ceph_assert(!need_omap_keys); } ceph_assert(reads_sent); @@ -527,7 +549,7 @@ void ECCommon::ReadPipeline::do_read_op(ReadOp &rop) { } m.push_back(std::make_pair(pg_shard.osd, msg)); dout(10) << __func__ << ": will send msg " << *msg - << " to osd." << pg_shard << dendl; + << " to osd." << pg_shard.osd << dendl; } if (!m.empty()) { get_parent()->send_message_osd_cluster(m, get_osdmap_epoch()); @@ -698,7 +720,10 @@ void ECCommon::ReadPipeline::objects_read_and_reconstruct( get_want_to_read_shards(to_read, want_shard_reads); } - read_request_t read_request(to_read, want_shard_reads, false, object_size); + read_request_t read_request( + to_read, want_shard_reads, WantAttrs::No, WantOmapHeader::No, + WantOmapKeys::No, "", 0, object_size + ); const int r = get_min_avail_to_read_shards( hoid, false, @@ -769,11 +794,20 @@ int ECCommon::ReadPipeline::send_all_remaining_reads( dout(10) << __func__ << " want attrs again" << dendl; } + // Check if we need to read omap_header again + const bool want_omap_header = + rop.to_read.at(hoid).want_omap_header && + !rop.complete.at(hoid).omap_header; + if (want_omap_header) { + ceph_assert(get_parent()->get_pool().supports_omap()); + dout(10) << __func__ << " want omap_header again" << dendl; + } + read_request_t &read_request = rop.to_read.at(hoid); // reset the old shard reads, we are going to read them again. read_request.shard_reads.clear(); return get_remaining_shards(hoid, rop.complete.at(hoid), read_request, - rop.for_recovery, want_attrs); + rop.for_recovery, want_attrs, want_omap_header); } void ECCommon::ReadPipeline::kick_reads() { @@ -796,10 +830,14 @@ bool ECCommon::shard_read_t::operator==(const shard_read_t &other) const { bool ECCommon::read_request_t::operator==(const read_request_t &other) const { return to_read == other.to_read && - flags == other.flags && - shard_want_to_read == other.shard_want_to_read && - shard_reads == other.shard_reads && - want_attrs == other.want_attrs; + flags == other.flags && + shard_want_to_read == other.shard_want_to_read && + shard_reads == other.shard_reads && + want_attrs == other.want_attrs && + want_omap_header == other.want_omap_header && + want_omap_keys == other.want_omap_keys && + omap_read_from == other.omap_read_from && + omap_max_bytes == other.omap_max_bytes; } void ECCommon::RMWPipeline::start_rmw(OpRef op) { @@ -1122,7 +1160,8 @@ void ECCommon::RecoveryBackend::handle_recovery_push( ceph_abort(); } - bool oneshot = op.before_progress.first && op.after_progress.data_complete; + bool oneshot = op.before_progress.first + && op.after_progress.data_complete && op.after_progress.omap_complete; ghobject_t tobj; if (oneshot) { tobj = ghobject_t(op.soid, ghobject_t::NO_GEN, @@ -1166,9 +1205,32 @@ void ECCommon::RecoveryBackend::handle_recovery_push( coll, tobj, op.attrset); + if (get_parent()->get_pool().supports_omap() && + !sinfo.is_nonprimary_shard(get_parent()->whoami_shard().shard)) { + dout(20) << __func__ << ": recovery_omap_clear tobj=" << tobj << dendl; + m->t.omap_clear( + coll, + tobj); + dout(20) << __func__ << ": recovery_omap_setheader tobj=" << tobj + << " header_size=" << op.omap_header.length() << dendl; + m->t.omap_setheader( + coll, + tobj, + op.omap_header); + } } - if (op.after_progress.data_complete) { + if (!op.omap_entries.empty()) { + ceph_assert(get_parent()->get_pool().supports_omap()); + if (!sinfo.is_nonprimary_shard(get_parent()->whoami_shard().shard)) { + m->t.omap_setkeys( + coll, + tobj, + op.omap_entries); + } + } + + if (op.after_progress.data_complete && op.after_progress.omap_complete) { uint64_t shard_size = sinfo.object_size_to_shard_size(op.recovery_info.size, get_parent()->whoami_shard().shard); ceph_assert(shard_size >= tobj_size); @@ -1177,7 +1239,8 @@ void ECCommon::RecoveryBackend::handle_recovery_push( } } - if (op.after_progress.data_complete && !oneshot) { + if (op.after_progress.data_complete + && op.after_progress.omap_complete && !oneshot) { dout(10) << __func__ << ": Removing oid " << tobj.hobj << " from the temp collection" << dendl; clear_temp_obj(tobj.hobj); @@ -1189,7 +1252,7 @@ void ECCommon::RecoveryBackend::handle_recovery_push( coll, ghobject_t( op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard)); } - if (op.after_progress.data_complete) { + if (op.after_progress.data_complete && op.after_progress.omap_complete) { if ((get_parent()->pgb_is_primary())) { ceph_assert(recovery_ops.count(op.soid)); ceph_assert(recovery_ops[op.soid].obc); @@ -1296,11 +1359,38 @@ void ECCommon::RecoveryBackend::handle_recovery_read_complete( #endif if (empty_obc) { update_object_size_after_read(op.recovery_info.size, res, req); + + // Check if object has omap flag - if not, mark omap_complete + if (get_parent()->get_pool().supports_omap()) { + if (op.obc && !op.obc->obs.oi.is_omap()) { + op.recovery_progress.omap_complete = true; + dout(10) << __func__ << ": object " << hoid + << " has no omap flag, marking omap_complete" << dendl; + } + } else { + ceph_assert(op.recovery_progress.omap_complete); + } } } ceph_assert(op.xattrs.size()); ceph_assert(op.obc); + if (res.omap_header) { + ceph_assert(get_parent()->get_pool().supports_omap()); + op.omap_header = std::move(res.omap_header); + } + if (res.omap_entries) { + ceph_assert(get_parent()->get_pool().supports_omap()); + if (!res.omap_entries->empty()) { + op.recovery_progress.omap_recovered_to = res.omap_entries->rbegin()->first; + } + op.recovery_info.num_omap_keys += res.omap_entries->size(); + op.omap_entries = std::move(res.omap_entries); + } + if (res.omap_complete) { + op.recovery_progress.omap_complete = true; + } + op.returned_data.emplace(std::move(res.buffers_read)); uint64_t aligned_size = ECUtil::align_next(op.obc->obs.oi.size); @@ -1415,7 +1505,8 @@ void ECCommon::RecoveryBackend::continue_recovery_op( while (1) { switch (op.state) { case RecoveryOp::IDLE: { - ceph_assert(!op.recovery_progress.data_complete); + ceph_assert(!op.recovery_progress.data_complete + || !op.recovery_progress.omap_complete); ECUtil::shard_extent_set_t want(sinfo.get_k_plus_m()); op.state = RecoveryOp::READING; @@ -1426,10 +1517,15 @@ void ECCommon::RecoveryBackend::continue_recovery_op( * return truncated reads. If the object size is known, then attempt * correctly sized reads. */ - uint64_t read_size = get_recovery_chunk_size(); + uint64_t available = get_recovery_chunk_size(); + uint64_t read_size = available; if (op.obc) { - uint64_t read_to_end = ECUtil::align_next(op.obc->obs.oi.size) - - op.recovery_progress.data_recovered_to; + uint64_t aligned_size = ECUtil::align_next(op.obc->obs.oi.size); + uint64_t read_to_end = 0; + + if (aligned_size > op.recovery_progress.data_recovered_to) { + read_to_end = aligned_size - op.recovery_progress.data_recovered_to; + } if (read_to_end < read_size) { read_size = read_to_end; @@ -1439,6 +1535,7 @@ void ECCommon::RecoveryBackend::continue_recovery_op( op.recovery_progress.data_recovered_to, read_size, want); op.recovery_progress.data_recovered_to += read_size; + available -= read_size; // We only need to recover shards that are missing. for (auto shard : shard_id_set::difference(sinfo.get_all_shards(), op.missing_on_shards)) { @@ -1449,15 +1546,35 @@ void ECCommon::RecoveryBackend::continue_recovery_op( op.xattrs = op.obc->attr_cache; } - read_request_t read_request(std::move(want), + const auto want_attrs = ( #ifdef WITH_CRIMSON - op.recovery_progress.first && op.xattrs.count(OI_ATTR) == 0, + op.recovery_progress.first && op.xattrs.count(OI_ATTR) == 0 #else - op.recovery_progress.first && !op.obc, + op.recovery_progress.first && !op.obc #endif - op.obc - ? op.obc->obs.oi.size - : get_recovery_chunk_size()); + ) ? WantAttrs::Yes : WantAttrs::No; + const auto want_omap_header = (op.recovery_progress.first && !op.recovery_progress.omap_complete) + ? WantOmapHeader::Yes + : WantOmapHeader::No; + if (want_omap_header == WantOmapHeader::Yes) { + ceph_assert(get_parent()->get_pool().supports_omap()); + } + const auto want_omap_keys = !op.recovery_progress.omap_complete + ? WantOmapKeys::Yes + : WantOmapKeys::No; + if (want_omap_keys == WantOmapKeys::Yes) { + ceph_assert(get_parent()->get_pool().supports_omap()); + } + const auto chunk_size = op.obc ? op.obc->obs.oi.size : get_recovery_chunk_size(); + read_request_t read_request( + std::move(want), + want_attrs, + want_omap_header, + want_omap_keys, + op.recovery_progress.omap_recovered_to, + available, + chunk_size + ); int r = read_pipeline.get_min_avail_to_read_shards( op.hoid, true, false, read_request); @@ -1472,6 +1589,37 @@ void ECCommon::RecoveryBackend::continue_recovery_op( recovery_ops.erase(op.hoid); return; } + if (get_parent()->get_pool().supports_omap()) { + shard_id_set have; + shard_id_map pg_shards(sinfo.get_k_plus_m()); + read_pipeline.get_all_avail_shards(op.hoid, have, pg_shards, true, {}); + bool found_omap_shard = false; + for (const auto shard : have) { + if (!sinfo.is_nonprimary_shard(shard)) { + const pg_missing_t &missing = get_parent()->get_shard_missing(pg_shards[shard]); + auto miter = missing.get_items().find(op.hoid); + if (miter != missing.get_items().end() && miter->second.clean_regions.omap_is_dirty()) { + dout(20) << __func__ << ": skipping shard " << shard + << " for " << op.hoid << " due to dirty omap" << dendl; + continue; + } + shard_read_t shard_read; + shard_read.pg_shard = pg_shards[shard]; + read_request.shard_reads.insert(shard, shard_read); + found_omap_shard = true; + dout(10) << __func__ << ": selected shard " << shard + << " for omap read of " << op.hoid << dendl; + break; + } + } + if (!found_omap_shard) { + dout(10) << __func__ << ": ERROR: no shard with clean omap found for " + << op.hoid << ", canceling recovery" << dendl; + get_parent()->cancel_pull(op.hoid); + recovery_ops.erase(op.hoid); + return; + } + } if (read_request.shard_reads.empty()) { ceph_assert(op.obc); /* This can happen for several reasons @@ -1543,6 +1691,10 @@ void ECCommon::RecoveryBackend::continue_recovery_op( } else { dout(10) << __func__ << ": push all attrs (not nonprimary)" << dendl; pop.attrset = op.xattrs; + if (op.omap_header) { + ceph_assert(get_parent()->get_pool().supports_omap()); + pop.omap_header = *(op.omap_header); + } } // Following an upgrade, or turning of overwrites, we can take this @@ -1551,6 +1703,10 @@ void ECCommon::RecoveryBackend::continue_recovery_op( pop.attrset.erase(ECUtil::get_hinfo_key()); } } + if (!sinfo.is_nonprimary_shard(pg_shard.shard) && op.omap_entries) { + ceph_assert(get_parent()->get_pool().supports_omap()); + pop.omap_entries = *(op.omap_entries); + } pop.recovery_info = op.recovery_info; pop.before_progress = op.recovery_progress; pop.after_progress = after_progress; @@ -1569,7 +1725,8 @@ void ECCommon::RecoveryBackend::continue_recovery_op( } case RecoveryOp::WRITING: { if (op.waiting_on_pushes.empty()) { - if (op.recovery_progress.data_complete) { + if (op.recovery_progress.data_complete && + op.recovery_progress.omap_complete) { op.state = RecoveryOp::COMPLETE; for (set::iterator i = op.missing_on.begin(); i != op.missing_on.end(); @@ -1585,7 +1742,7 @@ void ECCommon::RecoveryBackend::continue_recovery_op( } object_stat_sum_t stat; stat.num_bytes_recovered = op.recovery_info.size; - stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ? + stat.num_keys_recovered = op.recovery_info.num_omap_keys; stat.num_objects_recovered = 1; // TODO: not in crimson yet if (get_parent()->pg_is_repair()) @@ -1639,15 +1796,42 @@ ECCommon::RecoveryBackend::recover_object( ceph_abort_msg("neither obc nor head set for a snap object"); } } - op.recovery_progress.omap_complete = true; + bool omap_dirty_in_missing = false; + bool omap_shard_missing = false; for (set::const_iterator i = get_parent()->get_acting_recovery_backfill_shards().begin(); i != get_parent()->get_acting_recovery_backfill_shards().end(); ++i) { dout(10) << "checking " << *i << dendl; - if (get_parent()->get_shard_missing(*i).is_missing(hoid)) { + const auto& missing = get_parent()->get_shard_missing(*i); + if (auto it = missing.get_items().find(hoid); + it != missing.get_items().end()) { op.missing_on.insert(*i); op.missing_on_shards.insert(i->shard); + if (get_parent()->get_pool().supports_omap()) { + if (it->second.clean_regions.omap_is_dirty()) { + omap_dirty_in_missing = true; + } + if (!sinfo.is_nonprimary_shard(i->shard)) { + omap_shard_missing = true; + } + } + } + } + if (!get_parent()->get_pool().supports_omap()) { + op.recovery_progress.omap_complete = true; + } else { + if (!obc) { + op.recovery_progress.omap_complete = false; + } else { + op.recovery_progress.omap_complete = !( + op.recovery_info.oi.is_omap() + || omap_dirty_in_missing + || omap_shard_missing + ); + } + if (!op.recovery_progress.omap_complete) { + ceph_assert(get_parent()->get_pool().supports_omap()); } } dout(10) << __func__ << ": built op " << op << dendl; diff --git a/src/osd/ECCommon.h b/src/osd/ECCommon.h index 3065f52f1d7..68b45d89a30 100644 --- a/src/osd/ECCommon.h +++ b/src/osd/ECCommon.h @@ -16,6 +16,7 @@ #pragma once #include +#include #include #include "common/sharedptr_registry.hpp" @@ -110,6 +111,10 @@ struct ECCommon { } }; + enum class WantAttrs : bool { No = false, Yes = true }; + enum class WantOmapHeader : bool { No = false, Yes = true }; + enum class WantOmapKeys : bool { No = false, Yes = true }; + struct read_request_t { const std::list to_read; const uint32_t flags = 0; @@ -117,26 +122,40 @@ struct ECCommon { ECUtil::shard_extent_set_t zeros_for_decode; shard_id_map shard_reads; bool want_attrs = false; + bool want_omap_header = false; + bool want_omap_keys = false; + std::string omap_read_from; + uint64_t omap_max_bytes = 0; uint64_t object_size; read_request_t( const std::list &to_read, const ECUtil::shard_extent_set_t &shard_want_to_read, - bool want_attrs, uint64_t object_size) : + WantAttrs want_attrs, WantOmapHeader want_omap_header, WantOmapKeys want_omap_keys, + std::string omap_read_from, uint64_t omap_max_bytes, uint64_t object_size) : to_read(to_read), flags(to_read.front().flags), shard_want_to_read(shard_want_to_read), zeros_for_decode(shard_want_to_read.get_max_shards()), shard_reads(shard_want_to_read.get_max_shards()), - want_attrs(want_attrs), + want_attrs(static_cast(want_attrs)), + want_omap_header(static_cast(want_omap_header)), + want_omap_keys(static_cast(want_omap_keys)), + omap_read_from(std::move(omap_read_from)), + omap_max_bytes(omap_max_bytes), object_size(object_size) {} read_request_t(const ECUtil::shard_extent_set_t &shard_want_to_read, - bool want_attrs, uint64_t object_size) : + WantAttrs want_attrs, WantOmapHeader want_omap_header, WantOmapKeys want_omap_keys, + std::string omap_read_from, uint64_t omap_max_bytes, uint64_t object_size) : shard_want_to_read(shard_want_to_read), zeros_for_decode(shard_want_to_read.get_max_shards()), shard_reads(shard_want_to_read.get_max_shards()), - want_attrs(want_attrs), + want_attrs(static_cast(want_attrs)), + want_omap_header(static_cast(want_omap_header)), + want_omap_keys(static_cast(want_omap_keys)), + omap_read_from(std::move(omap_read_from)), + omap_max_bytes(omap_max_bytes), object_size(object_size) {} bool operator==(const read_request_t &other) const; @@ -148,6 +167,10 @@ struct ECCommon { << ", zeros_for_decode=" << zeros_for_decode << ", shard_reads=" << shard_reads << ", want_attrs=" << want_attrs + << ", want_omap_header=" << want_omap_header + << ", want_omap_keys=" << want_omap_keys + << ", omap_read_from=" << omap_read_from + << ", omap_max_bytes=" << omap_max_bytes << ")"; } }; @@ -182,6 +205,9 @@ struct ECCommon { int r; std::map errors; std::optional>> attrs; + std::optional omap_header; + std::optional> omap_entries; + bool omap_complete; ECUtil::shard_extent_map_t buffers_read; ECUtil::shard_extent_set_t processed_read_requests; shard_id_set zero_length_reads; @@ -197,6 +223,17 @@ struct ECCommon { } else { os << ", noattrs"; } + if (omap_header) { + os << ", omap_header=" << *(omap_header); + } else { + os << ", no_omap_header"; + } + if (omap_entries) { + os << ", omap_entries_len=" << omap_entries->size(); + } else { + os << ", no_omap_entries"; + } + os << ", omap_complete=" << omap_complete; os << ", buffers_read=" << buffers_read; os << ", processed_read_requests=" << processed_read_requests; os << ", zero_length_reads=" << zero_length_reads << ")"; @@ -415,7 +452,8 @@ struct ECCommon { read_result_t &read_result, read_request_t &read_request, bool for_recovery, - bool want_attrs); + bool want_attrs, + bool want_omap_header); void get_all_avail_shards( const hobject_t &hoid, @@ -591,7 +629,13 @@ struct ECCommon { void backend_read(hobject_t oid, ECUtil::shard_extent_set_t const &request, uint64_t object_size) override { std::map to_read; - to_read.emplace(oid, read_request_t(request, false, object_size)); + to_read.emplace( + oid, + read_request_t( + request, WantAttrs::No, WantOmapHeader::No, WantOmapKeys::No, + "", 0, object_size + ) + ); objects_read_async_no_cache( std::move(to_read), @@ -781,6 +825,9 @@ struct ECCommon { // must be filled if state == WRITING std::optional returned_data; std::map> xattrs; + std::optional omap_header; + std::optional> omap_entries; + ObjectContextRef obc; std::set waiting_on_pushes; diff --git a/src/osd/ECMsgTypes.cc b/src/osd/ECMsgTypes.cc index 5a2381fbfcc..206378fe1f0 100644 --- a/src/osd/ECMsgTypes.cc +++ b/src/osd/ECMsgTypes.cc @@ -219,18 +219,20 @@ void ECSubRead::encode(bufferlist &bl, uint64_t features) const return; } - ENCODE_START(3, 2, bl); + ENCODE_START(4, 2, bl); encode(from, bl); encode(tid, bl); encode(to_read, bl); encode(attrs_to_read, bl); encode(subchunks, bl); + encode(omap_read_from, bl); + encode(omap_headers_to_read, bl); ENCODE_FINISH(bl); } void ECSubRead::decode(bufferlist::const_iterator &bl) { - DECODE_START(3, bl); + DECODE_START(4, bl); decode(from, bl); decode(tid, bl); if (struct_v == 1) { @@ -254,6 +256,13 @@ void ECSubRead::decode(bufferlist::const_iterator &bl) subchunks[i.first].push_back(make_pair(0, 1)); } } + if (struct_v >= 4) { + decode(omap_read_from, bl); + decode(omap_headers_to_read, bl); + } else { + omap_read_from.clear(); + omap_headers_to_read.clear(); + } DECODE_FINISH(bl); } @@ -264,7 +273,10 @@ std::ostream &operator<<( << "ECSubRead(tid=" << rhs.tid << ", to_read=" << rhs.to_read << ", subchunks=" << rhs.subchunks - << ", attrs_to_read=" << rhs.attrs_to_read << ")"; + << ", attrs_to_read=" << rhs.attrs_to_read + << ", omap_headers_to_read=" << rhs.omap_headers_to_read + << ", omap_read_from=" << rhs.omap_read_from + << ")"; } void ECSubRead::dump(Formatter *f) const @@ -291,6 +303,20 @@ void ECSubRead::dump(Formatter *f) const f->with_obj_array_section( "object_attrs_requested"sv, attrs_to_read, [](Formatter& f, const hobject_t& oid) { f.dump_stream("oid") << oid; }); + + // 'object_omap_headers_requested': 'headers_to_read' (set) + f->with_obj_array_section( + "object_omap_headers_requested"sv, omap_headers_to_read, + [](Formatter& f, const hobject_t& oid) { f.dump_stream("oid") << oid; }); + + // 'omap_read_from' (map) + f->with_obj_array_section( + "object"sv, omap_read_from, + [](Formatter& f, const hobject_t& oid, const std::pair& read_from) { + f.dump_stream("oid") << oid; + f.dump_string("start_after", read_from.first); + f.dump_unsigned("max_bytes", read_from.second); + }); } list ECSubRead::generate_test_instances() @@ -324,8 +350,9 @@ void ECSubReadReply::encode(bufferlist &p_bl, bufferlist &d_bl, uint64_t features) const { - uint8_t ver = HAVE_FEATURE(features, SERVER_TENTACLE) ? 2 : 1; - ENCODE_START(ver, ver, p_bl); + uint8_t ver = HAVE_FEATURE(features, SERVER_TENTACLE) ? 3 : 1; + uint8_t compat_ver = HAVE_FEATURE(features, SERVER_TENTACLE) ? 2 : 1; + ENCODE_START(ver, compat_ver, p_bl); encode(from, p_bl); encode(tid, p_bl); if (ver >= 2) { @@ -349,6 +376,11 @@ void ECSubReadReply::encode(bufferlist &p_bl, } encode(attrs_read, p_bl); encode(errors, p_bl); + if (ver >= 3) { + encode(omap_headers_read, p_bl); + encode(omap_entries_read, p_bl); + encode(omaps_complete, p_bl); + } ENCODE_FINISH(p_bl); } @@ -360,7 +392,7 @@ void ECSubReadReply::decode(bufferlist::const_iterator &bl) void ECSubReadReply::decode(bufferlist::const_iterator &p_bl, bufferlist::const_iterator &d_bl) { - DECODE_START(2, p_bl); + DECODE_START(3, p_bl); decode(from, p_bl); decode(tid, p_bl); if (struct_v < 2) { @@ -392,16 +424,41 @@ void ECSubReadReply::decode(bufferlist::const_iterator &p_bl, } decode(attrs_read, p_bl); decode(errors, p_bl); + if (struct_v >= 3) { + decode(omap_headers_read, p_bl); + decode(omap_entries_read, p_bl); + decode(omaps_complete, p_bl); + } else { + omap_headers_read.clear(); + omap_entries_read.clear(); + omaps_complete.clear(); + } + DECODE_FINISH(p_bl); } std::ostream &operator<<( std::ostream &lhs, const ECSubReadReply &rhs) { - return lhs - << "ECSubReadReply(tid=" << rhs.tid - << ", attrs_read=" << rhs.attrs_read.size() - << ")"; + lhs << "ECSubReadReply(tid=" << rhs.tid + << ", attrs_read=" << rhs.attrs_read.size() + << ", omap_headers_read=" << rhs.omap_headers_read.size() + << ", omap_entries_read=" << rhs.omap_entries_read.size() + << ", omaps_complete=["; + + bool first = true; + for (const auto & [hoid, complete] : rhs.omaps_complete) { + if (complete) { + if (!first) { + lhs << ", "; + } + lhs << hoid; + first = false; + } + } + + lhs << "])"; + return lhs; } @@ -449,6 +506,37 @@ void ECSubReadReply::dump(Formatter* f) const f.dump_stream("oid") << oid; f.dump_int("error", err); }); + + // "omap_headers_returned": map + f->with_obj_array_section( + "object_omap_headers"sv, omap_headers_read, + [](Formatter& f, const hobject_t& oid, const ceph::buffer::list& bl) { + f.dump_stream("oid") << oid; + f.dump_unsigned("header_len", bl.length()); + }); + + // "omap_entries_returned" (mapping hobject_t to a table) + f->with_obj_array_section( + "object_omap_entries"sv, omap_entries_read, + [](Formatter& f, const hobject_t& oid, + const std::map& m) { + f.dump_stream("oid") << oid; + f.with_obj_array_section( + "omap_entry", m, + [](Formatter& f, const std::string& key, + const ceph::buffer::list& bl) { + f.dump_string("omap_key", key); + f.dump_unsigned("val_len", bl.length()); + }); + }); + + // "omap_complete": map + f->with_obj_array_section( + "object_omaps_complete"sv, omaps_complete, + [](Formatter& f, const hobject_t& oid, const bool complete) { + f.dump_stream("oid") << oid; + f.dump_unsigned("omap_complete", complete); + }); } list ECSubReadReply::generate_test_instances() diff --git a/src/osd/ECMsgTypes.h b/src/osd/ECMsgTypes.h index 1da28bcf83e..72aace2335c 100644 --- a/src/osd/ECMsgTypes.h +++ b/src/osd/ECMsgTypes.h @@ -117,6 +117,8 @@ struct ECSubRead { std::map >> to_read; std::set attrs_to_read; std::map>> subchunks; + std::set omap_headers_to_read; + std::map> omap_read_from; void encode(ceph::buffer::list &bl, uint64_t features) const; void decode(ceph::buffer::list::const_iterator &bl); void dump(ceph::Formatter *f) const; @@ -130,6 +132,9 @@ struct ECSubReadReply { std::map >> buffers_read; std::map>> attrs_read; std::map errors; + std::map omap_headers_read; + std::map> omap_entries_read; + std::map omaps_complete; void encode(ceph::buffer::list &bl) const; void encode(ceph::buffer::list &p_bl, ceph::buffer::list &d_pl, diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc index 873163fd9a2..d0b82e6509c 100644 --- a/src/osd/osd_types.cc +++ b/src/osd/osd_types.cc @@ -6866,7 +6866,7 @@ void ObjectRecoveryProgress::dump(Formatter *f) const void ObjectRecoveryInfo::encode(ceph::buffer::list &bl, uint64_t features) const { - ENCODE_START(3, 1, bl); + ENCODE_START(4, 1, bl); encode(soid, bl); encode(version, bl); encode(size, bl); @@ -6875,13 +6875,14 @@ void ObjectRecoveryInfo::encode(ceph::buffer::list &bl, uint64_t features) const encode(copy_subset, bl); encode(clone_subset, bl); encode(object_exist, bl); + encode(num_omap_keys, bl); ENCODE_FINISH(bl); } void ObjectRecoveryInfo::decode(ceph::buffer::list::const_iterator &bl, int64_t pool) { - DECODE_START(3, bl); + DECODE_START(4, bl); decode(soid, bl); decode(version, bl); decode(size, bl); @@ -6889,10 +6890,16 @@ void ObjectRecoveryInfo::decode(ceph::buffer::list::const_iterator &bl, decode(ss, bl); decode(copy_subset, bl); decode(clone_subset, bl); - if (struct_v > 2) + if (struct_v > 2) { decode(object_exist, bl); - else + } else { object_exist = false; + } + if (struct_v > 3) { + decode(num_omap_keys, bl); + } else { + num_omap_keys = 0; + } DECODE_FINISH(bl); if (struct_v < 2) { if (!soid.is_max() && soid.pool == -1) @@ -6938,6 +6945,7 @@ void ObjectRecoveryInfo::dump(Formatter *f) const f->dump_stream("copy_subset") << copy_subset; f->dump_stream("clone_subset") << clone_subset; f->dump_stream("object_exist") << object_exist; + f->dump_unsigned("num_omap_keys", num_omap_keys); } ostream& operator<<(ostream& out, const ObjectRecoveryInfo &inf) @@ -6949,8 +6957,9 @@ std::string ObjectRecoveryInfo::fmt_print() const { return fmt::format( "ObjectRecoveryInfo({}@{}, size: {}, copy_subset: {}, " - "clone_subset: {}, snapset: {}, object_exist: {})", - soid, version, size, copy_subset, clone_subset, ss, object_exist); + "clone_subset: {}, snapset: {}, object_exist: {}, num_omap_keys: {})", + soid, version, size, copy_subset, clone_subset, + ss, object_exist, num_omap_keys); } // -- PushReplyOp -- diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 7d697265540..55d533be8c8 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -6536,13 +6536,14 @@ struct ObjectRecoveryInfo { hobject_t soid; eversion_t version; uint64_t size; + uint64_t num_omap_keys; object_info_t oi; SnapSet ss; // only populated if soid is_snap() interval_set copy_subset; std::map> clone_subset; bool object_exist; - ObjectRecoveryInfo() : size(0), object_exist(true) { } + ObjectRecoveryInfo() : size(0), num_omap_keys(0), object_exist(true) { } static std::list generate_test_instances(); void encode(ceph::buffer::list &bl, uint64_t features) const; diff --git a/src/test/osd/TestECBackend.cc b/src/test/osd/TestECBackend.cc index 607e8735191..ebb8c7869d4 100644 --- a/src/test/osd/TestECBackend.cc +++ b/src/test/osd/TestECBackend.cc @@ -540,10 +540,16 @@ TEST(ECCommon, get_min_avail_to_read_shards) { ECUtil::shard_extent_set_t want_to_read(s.get_k_plus_m()); ECUtil::shard_extent_set_t to_read_list(s.get_k_plus_m()); hobject_t hoid; - ECCommon::read_request_t read_request(to_read_list, false, object_size); + ECCommon::read_request_t read_request( + to_read_list, ECCommon::WantAttrs::No, ECCommon::WantOmapHeader::No, + ECCommon::WantOmapKeys::No, "", 0, object_size + ); pipeline.get_min_avail_to_read_shards(hoid, false, false, read_request); - ECCommon::read_request_t ref(to_read_list, false, object_size); + ECCommon::read_request_t ref( + to_read_list, ECCommon::WantAttrs::No, ECCommon::WantOmapHeader::No, + ECCommon::WantOmapKeys::No, "", 0, object_size + ); ASSERT_EQ(read_request, ref); } @@ -557,10 +563,16 @@ TEST(ECCommon, get_min_avail_to_read_shards) { to_read_list[i].insert(int(i) * 2 * align_size, align_size); } - ECCommon::read_request_t read_request(to_read_list, false, object_size); + ECCommon::read_request_t read_request( + to_read_list, ECCommon::WantAttrs::No, ECCommon::WantOmapHeader::No, + ECCommon::WantOmapKeys::No, "", 0, object_size + ); pipeline.get_min_avail_to_read_shards(hoid, false, false, read_request); - ECCommon::read_request_t ref(to_read_list, false, object_size); + ECCommon::read_request_t ref( + to_read_list, ECCommon::WantAttrs::No, ECCommon::WantOmapHeader::No, + ECCommon::WantOmapKeys::No, "", 0, object_size + ); for (shard_id_t shard_id; shard_id < k; ++shard_id) { ref.shard_reads[shard_id].extents = to_read_list[shard_id]; ref.shard_reads[shard_id].subchunk = ecode->default_sub_chunk; @@ -578,12 +590,17 @@ TEST(ECCommon, get_min_avail_to_read_shards) { to_read_list[i].insert(int(i) * 2 * align_size, align_size); } - ECCommon::read_request_t read_request(to_read_list, false, object_size); - + ECCommon::read_request_t read_request( + to_read_list, ECCommon::WantAttrs::No, ECCommon::WantOmapHeader::No, + ECCommon::WantOmapKeys::No, "", 0, object_size + ); pipeline.get_min_avail_to_read_shards(hoid, false, false, read_request); - ECCommon::read_request_t ref(to_read_list, false, object_size); + ECCommon::read_request_t ref( + to_read_list, ECCommon::WantAttrs::No, ECCommon::WantOmapHeader::No, + ECCommon::WantOmapKeys::No, "", 0, object_size + ); for (shard_id_t i; idefault_sub_chunk; @@ -730,7 +771,10 @@ TEST(ECCommon, get_min_avail_to_read_shards) { for (shard_id_t i; i want_to_read(empty_shard_vector); for (shard_id_t i; idefault_sub_chunk; @@ -821,10 +874,16 @@ TEST(ECCommon, shard_read_combo_tests) ec_align_t to_read(12*1024,12*1024, 1); pipeline.get_min_want_to_read_shards(to_read, want_to_read); - ECCommon::read_request_t read_request(want_to_read, false, object_size); + ECCommon::read_request_t read_request( + want_to_read, ECCommon::WantAttrs::No, ECCommon::WantOmapHeader::No, + ECCommon::WantOmapKeys::No, "", 0, object_size + ); pipeline.get_min_avail_to_read_shards(hoid, false, false, read_request); - ECCommon::read_request_t ref(want_to_read, false, object_size); + ECCommon::read_request_t ref( + want_to_read, ECCommon::WantAttrs::No, ECCommon::WantOmapHeader::No, + ECCommon::WantOmapKeys::No, "", 0, object_size + ); { ECCommon::shard_read_t shard_read; shard_read.subchunk = ecode->default_sub_chunk; @@ -921,16 +980,22 @@ TEST(ECCommon, get_remaining_shards) // Mock up a read request ECUtil::shard_extent_set_t to_read(s.get_k_plus_m()); to_read[shard_id_t(0)].insert(0, 4096); - ECCommon::read_request_t read_request(to_read, false, object_size); + ECCommon::read_request_t read_request( + to_read, ECCommon::WantAttrs::No, ECCommon::WantOmapHeader::No, + ECCommon::WantOmapKeys::No, "", 0, object_size + ); int missing_shard = 0; // Mock up a read result. ECCommon::read_result_t read_result(&s); read_result.errors.emplace(pg_shards[missing_shard], -EIO); - pipeline.get_remaining_shards(hoid, read_result, read_request, false, false); + pipeline.get_remaining_shards(hoid, read_result, read_request, false, false, false); - ECCommon::read_request_t ref(to_read, false, object_size); + ECCommon::read_request_t ref( + to_read, ECCommon::WantAttrs::No, ECCommon::WantOmapHeader::No, + ECCommon::WantOmapKeys::No, "", 0, object_size + ); int parity_shard = 4; for (unsigned int i=0; i