]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: Check CRC when able to on async read
authorDavid Zafman <dzafman@redhat.com>
Wed, 1 Jul 2015 05:06:22 +0000 (22:06 -0700)
committerDavid Zafman <dzafman@redhat.com>
Thu, 27 Aug 2015 21:03:22 +0000 (14:03 -0700)
Fixes: #12000
Signed-off-by: David Zafman <dzafman@redhat.com>
src/osd/ECBackend.cc
src/osd/ReplicatedPG.cc

index 388a41b5494e6a14c09e81b992897226eb524326..1629cb8c5d33205edf49560de5580b3cb6dfe77d 100644 (file)
@@ -196,6 +196,7 @@ struct OnRecoveryReadComplete :
     : pg(pg), hoid(hoid) {}
   void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) {
     ECBackend::read_result_t &res = in.second;
+    // FIXME???
     assert(res.r == 0);
     assert(res.errors.empty());
     assert(res.returned.size() == 1);
@@ -885,28 +886,31 @@ void ECBackend::handle_sub_read(
   ECSubRead &op,
   ECSubReadReply *reply)
 {
+  shard_id_t shard = get_parent()->whoami_shard().shard;
   for(map<hobject_t, list<boost::tuple<uint64_t, uint64_t, uint32_t> >, hobject_t::BitwiseComparator>::iterator i =
         op.to_read.begin();
       i != op.to_read.end();
       ++i) {
-    for (list<boost::tuple<uint64_t, uint64_t, uint32_t> >::iterator j = i->second.begin();
-        j != i->second.end();
-        ++j) {
+    bufferhash h(-1);
+    uint64_t total_read = 0;
+    list<boost::tuple<uint64_t, uint64_t, uint32_t> >::iterator j;
+    for (j = i->second.begin(); j != i->second.end(); ++j) {
       bufferlist bl;
       int r = store->read(
        coll,
-       ghobject_t(
-         i->first, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+       ghobject_t(i->first, ghobject_t::NO_GEN, shard),
        j->get<0>(),
        j->get<1>(),
        bl, j->get<2>(),
-       false);
+       true); // Allow EIO return
       if (r < 0) {
-       assert(0);
        reply->buffers_read.erase(i->first);
        reply->errors[i->first] = r;
        break;
       } else {
+        dout(20) << __func__ << " read request=" << j->get<1>() << " r=" << r << " len=" << bl.length() << dendl;
+       total_read += r;
+        h << bl;
        reply->buffers_read[i->first].push_back(
          make_pair(
            j->get<0>(),
@@ -914,6 +918,29 @@ void ECBackend::handle_sub_read(
          );
       }
     }
+    // If all reads happened then lets check digest
+    if (j == i->second.end()) {
+      dout(20) << __func__ << ": Checking hash of " << i->first << dendl;
+      ECUtil::HashInfoRef hinfo = get_hash_info(i->first);
+      // This shows that we still need deep scrub because large enough files
+      // are read in sections, so the digest check here won't be done here.
+      if (!hinfo || (total_read == hinfo->get_total_chunk_size() &&
+         h.digest() != hinfo->get_chunk_hash(shard))) {
+        if (!hinfo) {
+         get_parent()->clog_error() << __func__ << ": No hinfo for " << i->first << "\n";
+         dout(5) << __func__ << ": No hinfo for " << i->first << dendl;
+       } else {
+         get_parent()->clog_error() << __func__ << ": Bad hash for " << i->first << " digest 0x"
+                 << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << "\n";
+         dout(5) << __func__ << ": Bad hash for " << i->first << " digest 0x"
+                 << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << dendl;
+       }
+       // Do NOT check osd_read_eio_on_bad_digest here.  We need to report
+       // the state of our chunk in case other chunks could substitute.
+       reply->buffers_read.erase(i->first);
+       reply->errors[i->first] = -EIO;
+      }
+    }
   }
   for (set<hobject_t, hobject_t::BitwiseComparator>::iterator i = op.attrs_to_read.begin();
        i != op.attrs_to_read.end();
@@ -928,7 +955,6 @@ void ECBackend::handle_sub_read(
        *i, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
       reply->attrs_read[*i]);
     if (r < 0) {
-      assert(0);
       reply->buffers_read.erase(*i);
       reply->errors[*i] = r;
     }
@@ -973,7 +999,7 @@ void ECBackend::handle_sub_read_reply(
         op.buffers_read.begin();
        i != op.buffers_read.end();
        ++i) {
-    assert(!op.errors.count(i->first));
+    assert(!op.errors.count(i->first));        // If attribute error we better not have sent a buffer
     if (!rop.to_read.count(i->first)) {
       // We canceled this read! @see filter_read_op
       continue;
@@ -999,7 +1025,7 @@ void ECBackend::handle_sub_read_reply(
   for (map<hobject_t, map<string, bufferlist>, hobject_t::BitwiseComparator>::iterator i = op.attrs_read.begin();
        i != op.attrs_read.end();
        ++i) {
-    assert(!op.errors.count(i->first));
+    assert(!op.errors.count(i->first));        // if read error better not have sent an attribute
     if (!rop.to_read.count(i->first)) {
       // We canceled this read! @see filter_read_op
       continue;
@@ -1019,7 +1045,7 @@ void ECBackend::handle_sub_read_reply(
   }
 
   map<pg_shard_t, set<ceph_tid_t> >::iterator siter =
-shard_to_read_map.find(from);
+                                       shard_to_read_map.find(from);
   assert(siter != shard_to_read_map.end());
   assert(siter->second.count(op.tid));
   siter->second.erase(op.tid);
@@ -1664,6 +1690,8 @@ struct CallClientContexts :
     : ec(ec), status(status), to_read(to_read) {}
   void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) {
     ECBackend::read_result_t &res = in.second;
+    if (res.r != 0)
+      goto out;
     assert(res.returned.size() == to_read.size());
     assert(res.r == 0);
     assert(res.errors.empty());
@@ -1703,12 +1731,13 @@ struct CallClientContexts :
       }
       res.returned.pop_front();
     }
+out:
     status->complete = true;
     list<ECBackend::ClientAsyncReadStatus> &ip =
       ec->in_progress_client_reads;
     while (ip.size() && ip.front().complete) {
       if (ip.front().on_complete) {
-       ip.front().on_complete->complete(0);
+       ip.front().on_complete->complete(res.r);
        ip.front().on_complete = NULL;
       }
       ip.pop_front();
index 16e24c44642a275bbb5f437f2f1424ee18b1f598..4c51f246dea063f167104e80cb96ff3e303cc3b8 100644 (file)
@@ -3650,12 +3650,35 @@ static int check_offset_and_length(uint64_t offset, uint64_t length, uint64_t ma
   return 0;
 }
 
-struct FillInExtent : public Context {
+struct FillInVerifyExtent : public Context {
   ceph_le64 *r;
-  FillInExtent(ceph_le64 *r) : r(r) {}
-  void finish(int _r) {
-    if (_r >= 0) {
-      *r = _r;
+  int32_t *rval;
+  bufferlist *outdatap;
+  boost::optional<uint32_t> maybe_crc;
+  uint64_t size;
+  OSDService *osd;
+  hobject_t soid;
+  __le32 flags;
+  FillInVerifyExtent(ceph_le64 *r, int32_t *rv, bufferlist *blp,
+                    boost::optional<uint32_t> mc, uint64_t size,
+                    OSDService *osd, hobject_t soid, __le32 flags) :
+    r(r), rval(rv), outdatap(blp), maybe_crc(mc),
+    size(size), osd(osd), soid(soid), flags(flags) {}
+  void finish(int len) {
+    *rval = len;
+    *r = len;
+    // whole object?  can we verify the checksum?
+    if (maybe_crc && *r == size) {
+      uint32_t crc = outdatap->crc32c(-1);
+      if (maybe_crc != crc) {
+        osd->clog->error() << std::hex << " full-object read crc 0x" << crc
+                          << " != expected 0x" << *maybe_crc
+                          << std::dec << " on " << soid << "\n";
+        if (!(flags & CEPH_OSD_OP_FLAG_FAILOK)) {
+         *rval = -EIO;
+         *r = 0;
+       }
+      }
     }
   }
 };
@@ -3811,15 +3834,27 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
        // read into a buffer
        bufferlist bl;
+       bool async = false;
        if (trimmed_read && op.extent.length == 0) {
          // read size was trimmed to zero and it is expected to do nothing
          // a read operation of 0 bytes does *not* do nothing, this is why
          // the trimmed_read boolean is needed
        } else if (pool.info.require_rollback()) {
+         async = true;
+         boost::optional<uint32_t> maybe_crc;
+         // If there is a data digest and it is possible we are reading
+         // entire object, pass the digest.  FillInVerifyExtent will
+         // will check the oi.size again.
+         if (oi.is_data_digest() && op.extent.offset == 0 &&
+             op.extent.length >= oi.size)
+           maybe_crc = oi.data_digest;
          ctx->pending_async_reads.push_back(
            make_pair(
              boost::make_tuple(op.extent.offset, op.extent.length, op.flags),
-             make_pair(&osd_op.outdata, new FillInExtent(&op.extent.length))));
+             make_pair(&osd_op.outdata,
+                       new FillInVerifyExtent(&op.extent.length, &osd_op.rval,
+                               &osd_op.outdata, maybe_crc, oi.size, osd,
+                               soid, op.flags))));
          dout(10) << " async_read noted for " << soid << dendl;
        } else {
          int r = pgbackend->objects_read_sync(
@@ -3852,9 +3887,15 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          first_read = false;
          ctx->data_off = op.extent.offset;
        }
+       // XXX the op.extent.length is the requested length for async read
+       // On error this length is changed to 0 after the error comes back.
        ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
        ctx->delta_stats.num_rd++;
 
+       // Skip checking the result and just proceed to the next operation
+       if (async)
+         continue;
+
       }
       break;
 
@@ -6272,6 +6313,10 @@ void ReplicatedPG::complete_read_ctx(int result, OpContext *ctx)
   assert(ctx->async_reads_complete());
 
   for (vector<OSDOp>::iterator p = ctx->ops.begin(); p != ctx->ops.end(); ++p) {
+    if (p->rval < 0 && !(p->op.flags & CEPH_OSD_OP_FLAG_FAILOK)) {
+      result = p->rval;
+      break;
+    }
     ctx->bytes_read += p->outdata.length();
   }
   ctx->reply->claim_op_out_data(ctx->ops);