continue_recovery_op(rop, m);
}
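+/* Recalculates the zeros-for-decode extents and trims shard_want_to_read
+ * once the true object size has been learnt from a recovery read.
+ */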
+void ECBackend::RecoveryBackend::update_object_size_after_read(
+ uint64_t size,
+ read_result_t &res,
+ read_request_t &req) {
+ // We didn't know the object size before this read, so the zeros-for-decode
+ // calculations will be off. Recalculate them.
+ ECUtil::shard_extent_set_t zero_mask(sinfo.get_k_plus_m());
+ sinfo.ro_size_to_zero_mask(size, zero_mask);
+ ECUtil::shard_extent_set_t read_mask(sinfo.get_k_plus_m());
+ sinfo.ro_size_to_read_mask(size, read_mask);
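+ // read_mask covers the shard extents holding real object data at this
+ // size; zero_mask covers the extents past the data that decode expects
+ // to be zero-filled.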
+ extent_set superset = res.buffers_read.get_extent_superset();
+
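+ // For every shard that completed a read (including a zero-length one),
+ // record the zero extents overlapping the read superset.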
+ for (auto &&[shard, eset] : zero_mask) {
+ eset.intersection_of(superset);
+ if (!eset.empty() &&
+ (res.zero_length_reads.contains(shard) ||
+ res.buffers_read.contains(shard))) {
+ req.zeros_for_decode[shard].insert(eset);
+ }
+ }
+
+ /* Trim shard_want_to_read so that every extent falls within the newly
+ * discovered object size.
+ */
+ for (auto iter = req.shard_want_to_read.begin();
+ iter != req.shard_want_to_read.end();) {
+ auto &&[shard, eset] = *iter;
+ bool erase = false;
+
+ if (read_mask.contains(shard)) {
+ eset.intersection_of(read_mask.get(shard));
+ erase = eset.empty();
+ } else {
+ erase = true;
+ }
+
+ /* Some shards may be empty */
+ if (erase) {
+ iter = req.shard_want_to_read.erase(iter);
+ } else {
+ ++iter;
+ }
+ }
+
+ dout(20) << "Update want and zeros from read:size=" << size
+ << " res=" << res
+ << " req=" << req
+ << dendl;
+}
+
void ECBackend::RecoveryBackend::handle_recovery_read_complete(
const hobject_t &hoid,
- ECUtil::shard_extent_map_t &&buffers_read,
- std::optional<map<string, bufferlist, less<>>> attrs,
+ read_result_t &&res,
read_request_t &req,
RecoveryMessages *m) {
- dout(10) << __func__ << ": returned " << hoid << " " << buffers_read << dendl;
+ dout(10) << __func__ << ": returned " << hoid << " " << res << dendl;
ceph_assert(recovery_ops.contains(hoid));
RecoveryBackend::RecoveryOp &op = recovery_ops[hoid];
- if (attrs) {
- op.xattrs.swap(*attrs);
+ if (res.attrs) {
+ op.xattrs.swap(*(res.attrs));
if (!op.obc) {
// attrs only reference the origin bufferlist (decode from
// ECSubReadReply package) whose size is much greater than attrs
// in recovery.
op.recovery_info.size = op.obc->obs.oi.size;
op.recovery_info.oi = op.obc->obs.oi;
- // We didn't know the size before, meaning the zero for decode calculations
- // will be off. Recalculate them!
- req.object_size = op.obc->obs.oi.size;
- int r = read_pipeline.get_min_avail_to_read_shards(
- op.hoid, true, false, req);
- ceph_assert(r == 0);
+ update_object_size_after_read(op.recovery_info.size, res, req);
}
}
ceph_assert(op.xattrs.size());
ceph_assert(op.obc);
- op.returned_data.emplace(std::move(buffers_read));
-
- ECUtil::shard_extent_set_t read_mask(sinfo.get_k_plus_m());
- sinfo.ro_size_to_read_mask(op.recovery_info.size, read_mask);
- ECUtil::shard_extent_set_t shard_want_to_read(sinfo.get_k_plus_m());
-
- for (auto &[shard, eset] : req.shard_want_to_read) {
- /* Read buffers do not need recovering! */
- if (buffers_read.contains(shard)) {
- continue;
- }
-
- /* Read-buffers will be truncated to the end-of-object. Do not attempt
- * to recover off-the-end.
- */
- shard_want_to_read[shard].intersection_of(read_mask.get(shard),eset);
-
- /* Some shards may be empty */
- if (shard_want_to_read[shard].empty()) {
- shard_want_to_read.erase(shard);
- }
- }
-
+ op.returned_data.emplace(std::move(res.buffers_read));
uint64_t aligned_size = ECUtil::align_next(op.obc->obs.oi.size);
dout(20) << __func__ << " before decode: oid=" << op.hoid << " EC_DEBUG_BUFFERS: "
<< dendl;
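+ // Pad the collected zero extents before decode so each shard buffer
+ // covers the full decode range.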
op.returned_data->add_zero_padding_for_decode(req.zeros_for_decode);
- int r = op.returned_data->decode(ec_impl, shard_want_to_read, aligned_size, get_parent()->get_dpp(), true);
+ int r = op.returned_data->decode(ec_impl, req.shard_want_to_read, aligned_size, get_parent()->get_dpp(), true);
ceph_assert(r == 0);
// Finally, we don't want to write any padding, so truncate the buffer
// to remove it.
op.returned_data->erase_after_ro_offset(aligned_size);
- for (auto &&shard: op.missing_on_shards) {
- if (read_mask.contains(shard) && op.returned_data->contains_shard(shard)) {
- ceph_assert(read_mask.at(shard).range_end() >=
- op.returned_data->get_extent_map(shard).get_end_off());
- }
- }
-
dout(20) << __func__ << ": oid=" << op.hoid << dendl;
dout(20) << __func__ << " after decode: oid=" << op.hoid << " EC_DEBUG_BUFFERS: "
<< op.returned_data->debug_string(2048, 0)
ceph_assert(req.to_read.size() == 0);
backend.handle_recovery_read_complete(
hoid,
- std::move(res.buffers_read),
- res.attrs,
+ std::move(res),
req,
&rm);
}
buffers_read.insert_in_shard(from.shard, offset, buffer_list);
}
rop.debug_log.emplace_back(ECUtil::READ_DONE, op.from, buffers_read);
+
+ // Zero-length reads may need to be zero-padded during recovery.
+ if (!buffers_read.contains_shard(from.shard)) {
+ rop.complete.at(hoid).zero_length_reads.insert(from.shard);
+ }
}
for (auto &&[hoid, req]: rop.to_read) {
if (!rop.complete.contains(hoid)) {
struct read_request_t {
const std::list<ec_align_t> to_read;
const uint32_t flags = 0;
- const ECUtil::shard_extent_set_t shard_want_to_read;
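+ // May be trimmed by update_object_size_after_read() once the true object
+ // size is known.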
+ ECUtil::shard_extent_set_t shard_want_to_read;
ECUtil::shard_extent_set_t zeros_for_decode;
shard_id_map<shard_read_t> shard_reads;
bool want_attrs = false;
std::optional<std::map<std::string, ceph::buffer::list, std::less<>>> attrs;
ECUtil::shard_extent_map_t buffers_read;
ECUtil::shard_extent_set_t processed_read_requests;
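+ // Shards which returned a successful but zero-length read; these may need
+ // zero padding for decode.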
+ shard_id_set zero_length_reads;
read_result_t(const ECUtil::stripe_info_t *sinfo) :
r(0), buffers_read(sinfo),
os << ", noattrs";
}
os << ", buffers_read=" << buffers_read;
- os << ", processed_read_requests=" << processed_read_requests << ")";
+ os << ", processed_read_requests=" << processed_read_requests;
+ os << ", zero_length_reads=" << zero_length_reads << ")";
}
};