]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: Send reads to other shards if erasure coded chunk reads fail
authorDavid Zafman <dzafman@redhat.com>
Tue, 11 Aug 2015 19:51:39 +0000 (12:51 -0700)
committerDavid Zafman <dzafman@redhat.com>
Thu, 27 Aug 2015 21:03:23 +0000 (14:03 -0700)
Handle errors in a common way whether redundant reads or not

Signed-off-by: David Zafman <dzafman@redhat.com>
src/osd/ECBackend.cc
src/osd/ECBackend.h

index 15c7666d974281d1ddcdb560e8892c89489f6cc0..47594cb9328855aef96c7c439deeadffc5e62b80 100644 (file)
@@ -472,7 +472,7 @@ void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
     priority,
     m.reads,
     OpRequestRef(),
-    false);
+    false, true);
 }
 
 void ECBackend::continue_recovery_op(
@@ -1044,8 +1044,6 @@ dout(0) << __func__ << " to_read skipping" << dendl;
        from,
        i->second));
 dout(0) << __func__ << " shard=" << from << " error=" << i->second << dendl;
-    if (!rop.do_redundant_reads && rop.complete[i->first].r == 0)
-      rop.complete[i->first].r = i->second;
   }
 
   map<pg_shard_t, set<ceph_tid_t> >::iterator siter =
@@ -1056,8 +1054,10 @@ dout(0) << __func__ << " shard=" << from << " error=" << i->second << dendl;
 
   assert(rop.in_progress.count(from));
   rop.in_progress.erase(from);
-  bool is_complete = rop.in_progress.empty();
-  if (rop.do_redundant_reads) {
+  unsigned is_complete = 0;
+  // For redundant reads check for completion as each shard comes in,
+  // or in a non-recovery read check for completion once all the shards read.
+  if (rop.do_redundant_reads || (!rop.for_recovery && rop.in_progress.empty())) {
     for (map<hobject_t, read_result_t>::const_iterator iter =
         rop.complete.begin();
       iter != rop.complete.end();
@@ -1073,9 +1073,20 @@ dout(0) << __func__ << " have shard=" << j->first.shard << dendl;
       set<int> want_to_read, dummy_minimum;
       get_want_to_read_shards(&want_to_read);
       int err;
+      // XXX: Could just do if (have.size < ec_impl->get_data_chunk_count())
       if ((err = ec_impl->minimum_to_decode(want_to_read, have, &dummy_minimum)) < 0) {
 dout(0) << __func__ << " minimum_to_decode failed" << dendl;
-        if (is_complete) {
+        if (rop.in_progress.empty()) {
+         // If we don't have enough copies and we haven't sent reads for all shards
+         // we can send the rest of the reads, if any.
+         if (!rop.do_redundant_reads) {
+           int r = objects_remaining_read_async(iter->first, rop);
+           if (r == 0) {
+             // We added to in_progress and not incrementing is_complete
+             continue;
+           }
+           // Couldn't read any additional shards so handle as completed with errors
+         }
          if (rop.complete[iter->first].errors.empty()) {
 dout(0) << __func__ << " simply not enough copies err=" << err << dendl;
          } else {
@@ -1084,17 +1095,17 @@ dout(0) << __func__ << " simply not enough copies err=" << err << dendl;
 dout(0) << __func__ << ": Use one of the shard errors err=" << err << dendl;
          }
          rop.complete[iter->first].r = err;
+         ++is_complete;
        }
-        break;
       } else {
-dout(0) << __func__ << " Enough copies have come in ignore errors" << dendl;
-       is_complete = true;
+dout(0) << __func__ << " Enough copies for " << iter->first << " (ignore errors)" << dendl;
+       ++is_complete;
        rop.complete[iter->first].errors.clear();
         assert(rop.complete[iter->first].r == 0);
       }
     }
   }
-  if (is_complete) {
+  if (rop.in_progress.empty() || is_complete == rop.complete.size()) {
 dout(0) << __func__ << " Complete: " << rop << dendl;
     complete_read_op(rop, m);
   } else {
@@ -1470,11 +1481,50 @@ int ECBackend::get_min_avail_to_read_shards(
   return 0;
 }
 
+int ECBackend::get_remaining_shards(
+  const hobject_t &hoid,
+  const set<int> &avail,
+  set<pg_shard_t> *to_read)
+{
+  map<hobject_t, set<pg_shard_t> >::const_iterator miter =
+    get_parent()->get_missing_loc_shards().find(hoid);
+
+  set<int> need;
+  map<shard_id_t, pg_shard_t> shards;
+
+  for (set<pg_shard_t>::const_iterator i =
+        get_parent()->get_acting_shards().begin();
+       i != get_parent()->get_acting_shards().end();
+       ++i) {
+    dout(10) << __func__ << ": checking acting " << *i << dendl;
+    const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
+    if (!missing.is_missing(hoid)) {
+      assert(!need.count(i->shard));
+      need.insert(i->shard);
+      assert(!shards.count(i->shard));
+      shards.insert(make_pair(i->shard, *i));
+    }
+  }
+
+  if (!to_read)
+    return 0;
+
+  for (set<int>::iterator i = need.begin();
+       i != need.end();
+       ++i) {
+    assert(shards.count(shard_id_t(*i)));
+    if (avail.find(*i) == avail.end())
+      to_read->insert(shards[shard_id_t(*i)]);
+  }
+  return 0;
+}
+
 void ECBackend::start_read_op(
   int priority,
   map<hobject_t, read_request_t, hobject_t::BitwiseComparator> &to_read,
   OpRequestRef _op,
-  bool do_redundant_reads)
+  bool do_redundant_reads,
+  bool for_recovery)
 {
   ceph_tid_t tid = get_parent()->get_tid();
   assert(!tid_to_read_map.count(tid));
@@ -1484,6 +1534,7 @@ void ECBackend::start_read_op(
   op.to_read.swap(to_read);
   op.op = _op;
   op.do_redundant_reads = do_redundant_reads;
+  op.for_recovery = for_recovery;
   dout(10) << __func__ << ": starting " << op << dendl;
 
   map<pg_shard_t, ECSubRead> messages;
@@ -1549,6 +1600,71 @@ void ECBackend::start_read_op(
   dout(10) << __func__ << ": started " << op << dendl;
 }
 
+void ECBackend::start_remaining_read_op(
+  ReadOp &op,
+  map<hobject_t, read_request_t, hobject_t::BitwiseComparator> &to_read)
+{
+  int priority = op.priority;
+  ceph_tid_t tid = op.tid;
+  op.to_read.swap(to_read);
+
+  dout(10) << __func__ << ": starting additional " << op << dendl;
+
+  map<pg_shard_t, ECSubRead> messages;
+  for (map<hobject_t, read_request_t>::iterator i = op.to_read.begin();
+       i != op.to_read.end();
+       ++i) {
+    bool need_attrs = i->second.want_attrs;
+    for (set<pg_shard_t>::const_iterator j = i->second.need.begin();
+        j != i->second.need.end();
+        ++j) {
+      if (need_attrs) {
+       messages[*j].attrs_to_read.insert(i->first);
+       need_attrs = false;
+      }
+      op.obj_to_source[i->first].insert(*j);
+      op.source_to_obj[*j].insert(i->first);
+    }
+    for (list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator j =
+          i->second.to_read.begin();
+        j != i->second.to_read.end();
+        ++j) {
+      pair<uint64_t, uint64_t> chunk_off_len =
+       sinfo.aligned_offset_len_to_chunk(make_pair(j->get<0>(), j->get<1>()));
+      for (set<pg_shard_t>::const_iterator k = i->second.need.begin();
+          k != i->second.need.end();
+          ++k) {
+       messages[*k].to_read[i->first].push_back(boost::make_tuple(chunk_off_len.first,
+                                                                   chunk_off_len.second,
+                                                                   j->get<2>()));
+      }
+      assert(!need_attrs);
+    }
+  }
+
+  for (map<pg_shard_t, ECSubRead>::iterator i = messages.begin();
+       i != messages.end();
+       ++i) {
+    op.in_progress.insert(i->first);
+    shard_to_read_map[i->first].insert(op.tid);
+    i->second.tid = tid;
+    MOSDECSubOpRead *msg = new MOSDECSubOpRead;
+    msg->set_priority(priority);
+    msg->pgid = spg_t(
+      get_parent()->whoami_spg_t().pgid,
+      i->first.shard);
+    msg->map_epoch = get_parent()->get_epoch();
+    msg->op = i->second;
+    msg->op.from = get_parent()->whoami_shard();
+    msg->op.tid = tid;
+    get_parent()->send_message_osd_cluster(
+      i->first.osd,
+      msg,
+      get_parent()->get_epoch());
+  }
+  dout(10) << __func__ << ": started additional " << op << dendl;
+}
+
 ECUtil::HashInfoRef ECBackend::get_hash_info(
   const hobject_t &hoid, bool checks)
 {
@@ -1820,11 +1936,47 @@ void ECBackend::objects_read_async(
     cct->_conf->osd_client_op_priority,
     for_read_op,
     OpRequestRef(),
-    fast_read);
+    fast_read, false);
   return;
 }
 
 
+int ECBackend::objects_remaining_read_async(
+  const hobject_t &hoid,
+  ReadOp &rop)
+{
+  set<int> already_read;
+  set<pg_shard_t> ots = rop.obj_to_source[hoid];
+  for (set<pg_shard_t>::iterator i = ots.begin(); i != ots.end(); ++i)
+    already_read.insert(i->shard);
+  dout(10) << __func__ << " have/error shards=" << already_read << dendl;
+  set<pg_shard_t> shards;
+  int r = get_remaining_shards(hoid, already_read, &shards);
+  if (r)
+    return r;
+  if (shards.empty())
+    return -EIO;
+
+  dout(10) << __func__ << " Read remaining shards " << shards << dendl;
+
+  list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets = rop.to_read.find(hoid)->second.to_read;
+  GenContext<pair<RecoveryMessages *, read_result_t& > &> *c = rop.to_read.find(hoid)->second.cb;
+
+  map<hobject_t, read_request_t, hobject_t::BitwiseComparator> for_read_op;
+  for_read_op.insert(
+    make_pair(
+      hoid,
+      read_request_t(
+       hoid,
+       offsets,
+       shards,
+       false,
+       c)));
+
+  start_remaining_read_op(rop, for_read_op);
+  return 0;
+}
+
 int ECBackend::objects_get_attrs(
   const hobject_t &hoid,
   map<string, bufferlist> *out)
index f416e30f3f8d2885968d4f24bcdd6316243b0e98..a039b70c8a8dc0953ec98ed2b09fc0a9aae1f226 100644 (file)
@@ -297,6 +297,9 @@ public:
     // this is useful to tradeoff some resources (redundant ops) for
     // low latency read, especially on relatively idle cluster
     bool do_redundant_reads;
+    // True if reading for recovery which could possibly reading only a subset
+    // of the available shards.
+    bool for_recovery;
 
     map<hobject_t, read_request_t, hobject_t::BitwiseComparator> to_read;
     map<hobject_t, read_result_t, hobject_t::BitwiseComparator> complete;
@@ -320,7 +323,13 @@ public:
     int priority,
     map<hobject_t, read_request_t, hobject_t::BitwiseComparator> &to_read,
     OpRequestRef op,
-    bool do_redundant_reads);
+    bool do_redundant_reads, bool for_recovery);
+
+  void start_remaining_read_op(ReadOp &rop,
+    map<hobject_t, read_request_t, hobject_t::BitwiseComparator> &to_read);
+  int objects_remaining_read_async(
+    const hobject_t &hoid,
+    ReadOp &rop);
 
 
   /**
@@ -470,6 +479,11 @@ public:
     set<pg_shard_t> *to_read   ///< [out] shards to read
     ); ///< @return error code, 0 on success
 
+  int get_remaining_shards(
+    const hobject_t &hoid,
+    const set<int> &avail,
+    set<pg_shard_t> *to_read);
+
   int objects_get_attrs(
     const hobject_t &hoid,
     map<string, bufferlist> *out);