: pg(pg), hoid(hoid) {}
void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) {
ECBackend::read_result_t &res = in.second;
+ // FIXME??? recovery reads currently assume success; a failed read would trip the asserts below
assert(res.r == 0);
assert(res.errors.empty());
assert(res.returned.size() == 1);
ECSubRead &op,
ECSubReadReply *reply)
{
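+ // This shard's id is used both to name the per-shard object and to look up the expected chunk hash.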
+ shard_id_t shard = get_parent()->whoami_shard().shard;
for(map<hobject_t, list<boost::tuple<uint64_t, uint64_t, uint32_t> >, hobject_t::BitwiseComparator>::iterator i =
op.to_read.begin();
i != op.to_read.end();
++i) {
- for (list<boost::tuple<uint64_t, uint64_t, uint32_t> >::iterator j = i->second.begin();
- j != i->second.end();
- ++j) {
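+ // Hash everything read from this object and count the bytes so the chunk digest can be checked once all extents are in.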
+ bufferhash h(-1);
+ uint64_t total_read = 0;
+ list<boost::tuple<uint64_t, uint64_t, uint32_t> >::iterator j;
+ for (j = i->second.begin(); j != i->second.end(); ++j) {
bufferlist bl;
int r = store->read(
coll,
- ghobject_t(
- i->first, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+ ghobject_t(i->first, ghobject_t::NO_GEN, shard),
j->get<0>(),
j->get<1>(),
bl, j->get<2>(),
- false);
+ true); // Allow EIO return
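+ // A failed read is reported back to the primary in reply->errors instead of asserting.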
if (r < 0) {
- assert(0);
reply->buffers_read.erase(i->first);
reply->errors[i->first] = r;
break;
} else {
+ dout(20) << __func__ << " read request=" << j->get<1>() << " r=" << r << " len=" << bl.length() << dendl;
+ total_read += r;
+ h << bl;
reply->buffers_read[i->first].push_back(
make_pair(
j->get<0>(),
bl)
);
}
}
+ // If all reads happened then let's check the digest
+ if (j == i->second.end()) {
+ dout(20) << __func__ << ": Checking hash of " << i->first << dendl;
+ ECUtil::HashInfoRef hinfo = get_hash_info(i->first);
+ // Deep scrub is still needed: large objects are read in sections,
+ // so the digest can only be checked here when the whole chunk was read.
+ if (!hinfo || (total_read == hinfo->get_total_chunk_size() &&
+ h.digest() != hinfo->get_chunk_hash(shard))) {
+ if (!hinfo) {
+ get_parent()->clog_error() << __func__ << ": No hinfo for " << i->first << "\n";
+ dout(5) << __func__ << ": No hinfo for " << i->first << dendl;
+ } else {
+ get_parent()->clog_error() << __func__ << ": Bad hash for " << i->first << " digest 0x"
+ << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << "\n";
+ dout(5) << __func__ << ": Bad hash for " << i->first << " digest 0x"
+ << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << dendl;
+ }
+ // Do NOT check osd_read_eio_on_bad_digest here. We need to report
+ // the state of our chunk in case other chunks could substitute.
+ reply->buffers_read.erase(i->first);
+ reply->errors[i->first] = -EIO;
+ }
+ }
}
for (set<hobject_t, hobject_t::BitwiseComparator>::iterator i = op.attrs_to_read.begin();
i != op.attrs_to_read.end();
*i, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
reply->attrs_read[*i]);
if (r < 0) {
- assert(0);
reply->buffers_read.erase(*i);
reply->errors[*i] = r;
}
op.buffers_read.begin();
i != op.buffers_read.end();
++i) {
- assert(!op.errors.count(i->first));
+ assert(!op.errors.count(i->first)); // if there was an attribute error we should not have sent a buffer
if (!rop.to_read.count(i->first)) {
// We canceled this read! @see filter_read_op
continue;
for (map<hobject_t, map<string, bufferlist>, hobject_t::BitwiseComparator>::iterator i = op.attrs_read.begin();
i != op.attrs_read.end();
++i) {
- assert(!op.errors.count(i->first));
+ assert(!op.errors.count(i->first)); // if there was a read error we should not have sent an attribute
if (!rop.to_read.count(i->first)) {
// We canceled this read! @see filter_read_op
continue;
}
map<pg_shard_t, set<ceph_tid_t> >::iterator siter =
-shard_to_read_map.find(from);
+ shard_to_read_map.find(from);
assert(siter != shard_to_read_map.end());
assert(siter->second.count(op.tid));
siter->second.erase(op.tid);
: ec(ec), status(status), to_read(to_read) {}
void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) {
ECBackend::read_result_t &res = in.second;
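+ // On a read error skip straight to completion; res.r is handed to the waiting client callbacks below.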
+ if (res.r != 0)
+ goto out;
assert(res.returned.size() == to_read.size());
assert(res.r == 0);
assert(res.errors.empty());
}
res.returned.pop_front();
}
+out:
status->complete = true;
list<ECBackend::ClientAsyncReadStatus> &ip =
ec->in_progress_client_reads;
while (ip.size() && ip.front().complete) {
if (ip.front().on_complete) {
- ip.front().on_complete->complete(0);
+ ip.front().on_complete->complete(res.r);
ip.front().on_complete = NULL;
}
ip.pop_front();
return 0;
}
-struct FillInExtent : public Context {
+struct FillInVerifyExtent : public Context {
ceph_le64 *r;
- FillInExtent(ceph_le64 *r) : r(r) {}
- void finish(int _r) {
- if (_r >= 0) {
- *r = _r;
+ int32_t *rval;
+ bufferlist *outdatap;
+ boost::optional<uint32_t> maybe_crc;
+ uint64_t size;
+ OSDService *osd;
+ hobject_t soid;
+ __le32 flags;
+ FillInVerifyExtent(ceph_le64 *r, int32_t *rv, bufferlist *blp,
+ boost::optional<uint32_t> mc, uint64_t size,
+ OSDService *osd, hobject_t soid, __le32 flags) :
+ r(r), rval(rv), outdatap(blp), maybe_crc(mc),
+ size(size), osd(osd), soid(soid), flags(flags) {}
+ void finish(int len) {
+ *rval = len;
+ *r = len;
+ // whole object? can we verify the checksum?
+ if (maybe_crc && *r == size) {
+ uint32_t crc = outdatap->crc32c(-1);
+ if (maybe_crc != crc) {
+ osd->clog->error() << std::hex << " full-object read crc 0x" << crc
+ << " != expected 0x" << *maybe_crc
+ << std::dec << " on " << soid << "\n";
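+ // Only turn the mismatch into EIO (and return no data) if the client did not set FAILOK.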
+ if (!(flags & CEPH_OSD_OP_FLAG_FAILOK)) {
+ *rval = -EIO;
+ *r = 0;
+ }
+ }
}
}
};
// read into a buffer
bufferlist bl;
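+ // Set for reads that are queued asynchronously; their result is checked in the async completion path instead of below.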
+ bool async = false;
if (trimmed_read && op.extent.length == 0) {
// read size was trimmed to zero and it is expected to do nothing
// a read operation of 0 bytes does *not* do nothing, this is why
// the trimmed_read boolean is needed
} else if (pool.info.require_rollback()) {
+ async = true;
+ boost::optional<uint32_t> maybe_crc;
+ // If there is a data digest and it is possible we are reading the
+ // entire object, pass the digest. FillInVerifyExtent checks the
+ // returned length against oi.size again before comparing.
+ if (oi.is_data_digest() && op.extent.offset == 0 &&
+ op.extent.length >= oi.size)
+ maybe_crc = oi.data_digest;
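+ // Queue the async read; FillInVerifyExtent fills in the returned length and rval and verifies the digest when the read completes.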
ctx->pending_async_reads.push_back(
make_pair(
boost::make_tuple(op.extent.offset, op.extent.length, op.flags),
- make_pair(&osd_op.outdata, new FillInExtent(&op.extent.length))));
+ make_pair(&osd_op.outdata,
+ new FillInVerifyExtent(&op.extent.length, &osd_op.rval,
+ &osd_op.outdata, maybe_crc, oi.size, osd,
+ soid, op.flags))));
dout(10) << " async_read noted for " << soid << dendl;
} else {
int r = pgbackend->objects_read_sync(
first_read = false;
ctx->data_off = op.extent.offset;
}
+ // XXX op.extent.length is still the requested length for an async read;
+ // on error it is changed to 0 when the result comes back.
ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
ctx->delta_stats.num_rd++;
+ // Skip checking the result and just proceed to the next operation
+ if (async)
+ continue;
+
}
break;
assert(ctx->async_reads_complete());
for (vector<OSDOp>::iterator p = ctx->ops.begin(); p != ctx->ops.end(); ++p) {
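+ // An async read may have failed; propagate the first error as the op result unless FAILOK was set.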
+ if (p->rval < 0 && !(p->op.flags & CEPH_OSD_OP_FLAG_FAILOK)) {
+ result = p->rval;
+ break;
+ }
ctx->bytes_read += p->outdata.length();
}
ctx->reply->claim_op_out_data(ctx->ops);