PG,ReplicatedPG: Generalize missing_loc for ECBackend
author     Samuel Just <sam.just@inktank.com>
           Wed, 29 Jan 2014 21:38:04 +0000 (13:38 -0800)
committer  Samuel Just <sam.just@inktank.com>
           Tue, 18 Feb 2014 04:12:14 +0000 (20:12 -0800)
Prior to EC pools, an object was unfound iff it was missing on the primary
and had no known location.  Now, unfound (unreadable, really) depends on the
PGBackend's requirements for reconstituting an object.  This also means that
recovering an object missing on a replica but not on the primary requires
tracking the missing_loc set.
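
For illustration, a minimal stand-alone sketch of that distinction; this is
not Ceph code (plain ints stand in for pg_shard_t), the replicated predicate
mirrors RPCReadPred in the ReplicatedBackend.h hunk below, and the EC
predicate is a hypothetical simplification, since the real ECBackend
predicate is not part of this commit.

  // Illustration only: shard ids are plain ints standing in for pg_shard_t.
  #include <cassert>
  #include <set>

  // Abstract readability test, mirroring PGBackend::IsReadablePredicate:
  // "have" encodes the shards we can still read from.
  struct IsReadable {
    virtual bool operator()(const std::set<int>& have) const = 0;
    virtual ~IsReadable() {}
  };

  // Replicated pools: readable iff the local copy is present
  // (cf. RPCReadPred below).
  struct ReplicatedReadable : IsReadable {
    int whoami;
    explicit ReplicatedReadable(int w) : whoami(w) {}
    bool operator()(const std::set<int>& have) const override {
      return have.count(whoami);
    }
  };

  // Hypothetical EC analogue (not in this commit): any k distinct shards
  // suffice to reconstruct the object.
  struct ECReadable : IsReadable {
    unsigned k;
    explicit ECReadable(unsigned k_) : k(k_) {}
    bool operator()(const std::set<int>& have) const override {
      return have.size() >= k;
    }
  };

  int main() {
    std::set<int> have = {0, 2};             // shards 0 and 2 are available
    assert(!ReplicatedReadable(1)(have));    // shard 1 lost its copy: unreadable there
    assert(ECReadable(2)(have));             // k=2 shards are enough to reconstruct
    return 0;
  }

This is why unfound can no longer be derived from missing alone: the same
set of available shards is unreadable for one backend and readable for
another.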

Thus, rather than maintaining missing_loc only for objects missing
on the primary, the MissingLoc structure will track all missing
objects actingbackfill-wide until each object is recovered.
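
As a condensed, stand-alone restatement of what the new PG::MissingLoc class
(see the PG.h hunk below) keeps track of, with std::string and int standing
in for hobject_t and pg_shard_t: an object is unfound when it needs recovery
and its currently known locations do not satisfy the backend's
IsRecoverablePredicate.

  // Condensed sketch of the MissingLoc bookkeeping; not the real class.
  #include <functional>
  #include <iostream>
  #include <map>
  #include <set>
  #include <string>

  struct MissingLocSketch {
    // every object missing anywhere in actingbackfill -> version needed
    std::map<std::string, unsigned> needs_recovery_map;
    // object -> shards known to hold a usable copy/shard
    std::map<std::string, std::set<int>> missing_loc;
    // backend-supplied test: can we recover from this set of shards?
    std::function<bool(const std::set<int>&)> is_recoverable;

    bool is_unfound(const std::string& oid) const {
      auto n = needs_recovery_map.find(oid);
      if (n == needs_recovery_map.end())
        return false;                      // nothing to recover
      auto l = missing_loc.find(oid);
      if (l == missing_loc.end())
        return true;                       // no known sources at all
      return !is_recoverable(l->second);   // sources exist but are insufficient
    }

    unsigned num_unfound() const {
      unsigned ret = 0;
      for (const auto& p : needs_recovery_map)
        if (is_unfound(p.first))
          ++ret;
      return ret;
    }
  };

  int main() {
    MissingLocSketch ml;
    ml.is_recoverable = [](const std::set<int>& have) { return have.size() >= 2; };
    ml.needs_recovery_map["obj"] = 7;
    ml.missing_loc["obj"] = {3};
    std::cout << ml.num_unfound() << "\n";  // 1: one shard located, not yet recoverable
    ml.missing_loc["obj"].insert(5);
    std::cout << ml.num_unfound() << "\n";  // 0: two shards known, recoverable
  }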

For simplicity, since we don't really know what objects need recovery
until activation (and since we can't do anything with that information
prior to activation anyway), we defer populating the missing_loc
information until activation.
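
Sketching that activation-time population (loosely following the new code in
PG::activate() below; simplified stand-in types, and the peer
last_update/last_complete/last_backfill checks that the real
add_source_info() performs are omitted):

  #include <map>
  #include <set>
  #include <string>

  // oid -> version needed; a trimmed stand-in for pg_missing_t
  using Missing = std::map<std::string, unsigned>;

  struct MissingLocLite {
    std::map<std::string, unsigned> needs_recovery_map;
    std::map<std::string, std::set<int>> missing_loc;

    void add_active_missing(const Missing& m) {
      needs_recovery_map.insert(m.begin(), m.end());
    }
    // The real add_source_info() also vets the peer's info before trusting
    // it as a source; here a peer simply counts as a source for every
    // needed object it is not missing.
    void add_source_info(int from, const Missing& peer_missing) {
      for (const auto& p : needs_recovery_map)
        if (!peer_missing.count(p.first))
          missing_loc[p.first].insert(from);
    }
  };

  // At activation the primary first records everything that needs recovery
  // anywhere in actingbackfill, then records who can serve each object.
  void populate_missing_loc(MissingLocLite& ml,
                            const std::map<int, Missing>& actingbackfill_missing) {
    for (const auto& p : actingbackfill_missing)
      ml.add_active_missing(p.second);
    for (const auto& p : actingbackfill_missing)
      ml.add_source_info(p.first, p.second);
  }

  int main() {
    MissingLocLite ml;
    std::map<int, Missing> peers = {
      {0, {}},               // primary has everything
      {1, {{"obj", 7}}},     // replica 1 is missing "obj"
    };
    populate_missing_loc(ml, peers);
    // "obj" needs recovery and shard 0 is recorded as a source for it
    return ml.missing_loc["obj"].count(0) ? 0 : 1;
  }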

We need peers to roll back divergent log entries before we attempt to
read the relevant objects.  The simplest way to accomplish this seems to
be to simply always activate peers whenever search_for_missing turns up
missing objects.

With EC pools, having an object locally (i.e., not missing) is necessary but
no longer sufficient for readability.  Thus, we check is_unreadable for cases
where we need to read the object, and reserve is_missing for cases where we
need the object context.
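
Condensed into stand-alone form with simplified types, the split matches the
is_unreadable_object() helper added to ReplicatedPG.h below: is_missing only
consults the primary's own missing set, while is_unreadable additionally asks
MissingLoc whether the acting set can serve the object.

  #include <functional>
  #include <set>
  #include <string>

  struct ReadabilitySketch {
    std::set<std::string> local_missing;   // the primary's own pg_log missing set
    std::set<int> actingset;
    // stand-in for MissingLoc::readable_with_acting(): can the acting set
    // serve this object?
    std::function<bool(const std::string&, const std::set<int>&)> readable_with_acting;

    // need the object context -> only the local missing set matters
    bool is_missing_object(const std::string& oid) const {
      return local_missing.count(oid);
    }
    // need to read the object -> also require that the acting set can serve it
    bool is_unreadable_object(const std::string& oid) const {
      return is_missing_object(oid) || !readable_with_acting(oid, actingset);
    }
  };

  int main() {
    ReadabilitySketch pg;
    pg.actingset = {0, 1, 2};
    pg.readable_with_acting =
      [](const std::string&, const std::set<int>&) { return true; };
    // present locally and servable by the acting set: neither missing nor unreadable
    return pg.is_unreadable_object("foo") ? 1 : 0;
  }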

waiting_for_missing_object becomes waiting_for_unreadable_object in order to
avoid adding another layer of waiting_for_* maps.  Ops on that queue may be
requeued either when the primary's copy is recovered or when the object is no
longer degraded, depending on when the object becomes readable.
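
A stand-alone sketch of those two requeue points, condensed from the
on_local_recover()/on_global_recover() hunks in ReplicatedPG.cc below (Op is
a stand-in for OpRequestRef):

  #include <list>
  #include <map>
  #include <string>

  using Op = int;  // stand-in for OpRequestRef

  struct RequeueSketch {
    std::map<std::string, std::list<Op>> waiting_for_unreadable_object;
    std::list<Op> requeued;  // stand-in for the queue ops go back onto

    void kick(const std::string& oid) {
      auto it = waiting_for_unreadable_object.find(oid);
      if (it == waiting_for_unreadable_object.end())
        return;
      requeued.splice(requeued.end(), it->second);  // requeue_ops()
      waiting_for_unreadable_object.erase(it);
    }

    // on_local_recover(): the primary got its copy back; kick waiters only
    // if the object is now actually readable (it may still be EC-unreadable).
    void on_local_recover(const std::string& oid, bool still_unreadable) {
      if (!still_unreadable)
        kick(oid);
    }

    // on_global_recover(): the object is recovered on every shard, so any
    // remaining waiters can go.
    void on_global_recover(const std::string& oid) {
      kick(oid);
    }
  };

  int main() {
    RequeueSketch pg;
    pg.waiting_for_unreadable_object["foo"] = {1, 2};
    pg.on_local_recover("foo", /*still_unreadable=*/true);  // nothing requeued yet
    pg.on_global_recover("foo");                            // both ops requeued here
    return pg.requeued.size() == 2 ? 0 : 1;
  }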

Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/OSD.cc
src/osd/PG.cc
src/osd/PG.h
src/osd/PGBackend.h
src/osd/ReplicatedBackend.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 7cd40ac775f6ab9103f205fdefa1f3993e7bf8f6..9bd7206ed6d71fdc1c86ab078753c31defb7f10e 100644
@@ -4306,11 +4306,11 @@ void OSD::do_command(Connection *con, tid_t tid, vector<string>& cmd, bufferlist
        pg->pg_log.get_missing().missing.begin();
       for (; mi != mend; ++mi) {
        fout << mi->first << " -> " << mi->second << std::endl;
-       map<hobject_t, set<pg_shard_t> >::const_iterator mli =
-         pg->missing_loc.find(mi->first);
-       if (mli == pg->missing_loc.end())
+       if (!pg->missing_loc.needs_recovery(mi->first))
          continue;
-       const set<pg_shard_t> &mls(mli->second);
+       if (pg->missing_loc.is_unfound(mi->first))
+         fout << " unfound ";
+       const set<pg_shard_t> &mls(pg->missing_loc.get_locations(mi->first));
        if (mls.empty())
          continue;
        fout << "missing_loc: " << mls << std::endl;
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index 398c1f5574d1684e2b18bc2cac165fb475b40100..ded126a31a1084ea5cf7a13b1670fef686be5e7f 100644
 #define dout_subsys ceph_subsys_osd
 #undef dout_prefix
 #define dout_prefix _prefix(_dout, this)
-static ostream& _prefix(std::ostream *_dout, const PG *pg) 
+template <class T>
+static ostream& _prefix(std::ostream *_dout, T *t)
 {
-  return *_dout << pg->gen_prefix();
+  return *_dout << t->gen_prefix();
 }
 
 void PG::get(const string &tag) 
@@ -161,6 +162,7 @@ PG::PG(OSDService *o, OSDMapRef curmap,
   info(p),
   info_struct_v(0),
   coll(p), pg_log(cct), log_oid(loid), biginfo_oid(ioid),
+  missing_loc(this),
   recovery_item(this), scrub_item(this), scrub_finalize_item(this), snap_trim_item(this), stat_queue_item(this),
   recovery_ops_active(0),
   role(0),
@@ -245,7 +247,6 @@ void PG::proc_master_log(
   dout(10) << " peer osd." << from << " now " << oinfo << " " << omissing << dendl;
   might_have_unfound.insert(from);
 
-  search_for_missing(oinfo, &omissing, from);
   peer_missing[from].swap(omissing);
 }
     
@@ -263,7 +264,6 @@ void PG::proc_replica_log(
   dout(10) << " peer osd." << from << " now " << oinfo << " " << omissing << dendl;
   might_have_unfound.insert(from);
 
-  search_for_missing(oinfo, &omissing, from);
   for (map<hobject_t, pg_missing_t::item>::iterator i = omissing.missing.begin();
        i != omissing.missing.end();
        ++i) {
@@ -292,7 +292,8 @@ bool PG::proc_replica_info(pg_shard_t from, const pg_info_t &oinfo)
   reg_next_scrub();
   
   // stray?
-  if (!is_acting(from)) {
+  if ((!is_active() && !is_acting(from)) ||
+      (is_active() && !is_actingbackfill(from))) {
     dout(10) << " osd." << from << " has stray content: " << oinfo << dendl;
     stray_set.insert(from);
     if (is_clean()) {
@@ -374,19 +375,57 @@ void PG::rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead)
  * Instead, we probably want to just iterate over our unfound set.
  */
 bool PG::search_for_missing(
-  const pg_info_t &oinfo, const pg_missing_t *omissing,
-  pg_shard_t fromosd)
-{
-  bool stats_updated = false;
-  bool found_missing = false;
+  const pg_info_t &oinfo, const pg_missing_t &omissing,
+  pg_shard_t from,
+  RecoveryCtx *ctx)
+{
+  unsigned num_unfound_before = missing_loc.num_unfound();
+  bool found_missing = missing_loc.add_source_info(
+    from, oinfo, omissing);
+  if (found_missing && num_unfound_before != missing_loc.num_unfound())
+    publish_stats_to_osd();
+  if (found_missing &&
+    (get_osdmap()->get_features(NULL) & CEPH_FEATURE_OSD_ERASURE_CODES)) {
+    pg_info_t tinfo(oinfo);
+    tinfo.pgid.shard = pg_whoami.shard;
+    (*(ctx->info_map))[from.osd].push_back(
+      make_pair(
+       pg_notify_t(
+         from.shard, pg_whoami.shard,
+         get_osdmap()->get_epoch(),
+         get_osdmap()->get_epoch(),
+         tinfo),
+       past_intervals));
+  }
+  return found_missing;
+}
 
-  // take note that we've probed this peer, for
-  // all_unfound_are_queried_or_lost()'s benefit.
-  peer_missing[fromosd];
+bool PG::MissingLoc::readable_with_acting(
+  const hobject_t &hoid,
+  const set<pg_shard_t> &acting) const {
+  if (!needs_recovery(hoid)) return true;
+  if (!missing_loc.count(hoid)) return false;
+  const set<pg_shard_t> &locs = missing_loc.find(hoid)->second;
+  dout(10) << __func__ << ": locs:" << locs << dendl;
+  set<pg_shard_t> have_acting;
+  for (set<pg_shard_t>::const_iterator i = locs.begin();
+       i != locs.end();
+       ++i) {
+    if (acting.count(*i))
+      have_acting.insert(*i);
+  }
+  return (*is_readable)(have_acting);
+}
 
+bool PG::MissingLoc::add_source_info(
+  pg_shard_t fromosd,
+  const pg_info_t &oinfo,
+  const pg_missing_t &omissing)
+{
+  bool found_missing = false;;
   // found items?
-  for (map<hobject_t,pg_missing_t::item>::const_iterator p = pg_log.get_missing().missing.begin();
-       p != pg_log.get_missing().missing.end();
+  for (map<hobject_t,pg_missing_t::item>::const_iterator p = needs_recovery_map.begin();
+       p != needs_recovery_map.end();
        ++p) {
     const hobject_t &soid(p->first);
     eversion_t need = p->second.need;
@@ -407,53 +446,29 @@ bool PG::search_for_missing(
       continue;
     }
     if (oinfo.last_complete < need) {
-      if (!omissing) {
-       // We know that the peer lacks some objects at the revision we need.
-       // Without the peer's missing set, we don't know whether it has this
-       // particular object or not.
-       dout(10) << __func__ << " " << soid << " " << need
-                << " might also be missing on osd." << fromosd << dendl;
-       continue;
-      }
-
-      if (omissing->is_missing(soid)) {
+      if (omissing.is_missing(soid)) {
        dout(10) << "search_for_missing " << soid << " " << need
                 << " also missing on osd." << fromosd << dendl;
        continue;
       }
     }
+
     dout(10) << "search_for_missing " << soid << " " << need
             << " is on osd." << fromosd << dendl;
 
-    map<hobject_t, set<pg_shard_t> >::iterator ml = missing_loc.find(soid);
-    if (ml == missing_loc.end()) {
-      map<hobject_t, list<OpRequestRef> >::iterator wmo =
-       waiting_for_missing_object.find(soid);
-      if (wmo != waiting_for_missing_object.end()) {
-       requeue_ops(wmo->second);
-      }
-      stats_updated = true;
-      missing_loc[soid].insert(fromosd);
-      missing_loc_sources.insert(fromosd);
-    }
-    else {
-      ml->second.insert(fromosd);
-      missing_loc_sources.insert(fromosd);
-    }
+    missing_loc[soid].insert(fromosd);
+    missing_loc_sources.insert(fromosd);
     found_missing = true;
   }
-  if (stats_updated) {
-    publish_stats_to_osd();
-  }
 
-  dout(20) << "search_for_missing missing " << pg_log.get_missing().missing << dendl;
+  dout(20) << "needs_recovery_map missing " << needs_recovery_map << dendl;
   return found_missing;
 }
 
 void PG::discover_all_missing(map<int, map<spg_t,pg_query_t> > &query_map)
 {
   const pg_missing_t &missing = pg_log.get_missing();
-  assert(missing.have_missing());
+  assert(have_unfound());
 
   dout(10) << __func__ << " "
           << missing.num_missing() << " missing, "
@@ -801,7 +816,6 @@ void PG::clear_primary_state()
   finish_sync_event = 0;  // so that _finish_recvoery doesn't go off in another thread
 
   missing_loc.clear();
-  missing_loc_sources.clear();
 
   pg_log.reset_recovery_pointers();
 
@@ -1372,7 +1386,8 @@ void PG::activate(ObjectStore::Transaction& t,
                  map<int,
                      vector<
                        pair<pg_notify_t,
-                            pg_interval_map_t> > > *activator_map)
+                            pg_interval_map_t> > > *activator_map,
+                  RecoveryCtx *ctx)
 {
   assert(!is_active());
   assert(scrubber.callbacks.empty());
@@ -1401,20 +1416,11 @@ void PG::activate(ObjectStore::Transaction& t,
 
   send_notify = false;
 
-  info.last_epoch_started = query_epoch;
+  if (is_acting(pg_whoami))
+    info.last_epoch_started = query_epoch;
 
   const pg_missing_t &missing = pg_log.get_missing();
 
-  if (is_primary()) {
-    // If necessary, create might_have_unfound to help us find our unfound objects.
-    // NOTE: It's important that we build might_have_unfound before trimming the
-    // past intervals.
-    might_have_unfound.clear();
-    if (missing.have_missing()) {
-      build_might_have_unfound();
-    }
-  }
   if (is_primary()) {
     last_update_ondisk = info.last_update;
     min_last_complete_ondisk = eversion_t(0,0);  // we don't know (yet)!
@@ -1465,6 +1471,7 @@ void PG::activate(ObjectStore::Transaction& t,
 
   // if primary..
   if (is_primary()) {
+    assert(ctx);
     // start up replicas
 
     assert(actingbackfill.size() > 0);
@@ -1566,6 +1573,51 @@ void PG::activate(ObjectStore::Transaction& t,
       }
     }
 
+    // Set up missing_loc
+    for (set<pg_shard_t>::iterator i = actingbackfill.begin();
+        i != actingbackfill.end();
+        ++i) {
+      if (*i == get_primary()) {
+       missing_loc.add_active_missing(pg_log.get_missing());
+      } else {
+       assert(peer_missing.count(*i));
+       missing_loc.add_active_missing(peer_missing[*i]);
+      }
+    }
+    // If necessary, create might_have_unfound to help us find our unfound objects.
+    // NOTE: It's important that we build might_have_unfound before trimming the
+    // past intervals.
+    might_have_unfound.clear();
+    if (needs_recovery()) {
+      missing_loc.add_source_info(pg_whoami, info, pg_log.get_missing());
+      for (set<pg_shard_t>::iterator i = actingbackfill.begin();
+          i != actingbackfill.end();
+          ++i) {
+       if (*i == pg_whoami) continue;
+       dout(10) << __func__ << ": adding " << *i << " as a source" << dendl;
+       assert(peer_missing.count(*i));
+       assert(peer_info.count(*i));
+       missing_loc.add_source_info(
+         *i,
+         peer_info[*i],
+         peer_missing[*i]);
+      }
+      for (map<pg_shard_t, pg_missing_t>::iterator i = peer_missing.begin();
+          i != peer_missing.end();
+          ++i) {
+       if (is_actingbackfill(i->first))
+         continue;
+       assert(peer_info.count(i->first));
+       search_for_missing(
+         peer_info[i->first],
+         i->second,
+         i->first,
+         ctx);
+      }
+
+      build_might_have_unfound();
+    }
+
     // degraded?
     if (get_osdmap()->get_pg_size(info.pgid.pgid) > acting.size())
       state_set(PG_STATE_DEGRADED);
@@ -1938,7 +1990,7 @@ void PG::split_ops(PG *child, unsigned split_bits) {
   unsigned match = child->info.pgid.ps();
   assert(waiting_for_all_missing.empty());
   assert(waiting_for_cache_not_full.empty());
-  assert(waiting_for_missing_object.empty());
+  assert(waiting_for_unreadable_object.empty());
   assert(waiting_for_degraded_object.empty());
   assert(waiting_for_ack.empty());
   assert(waiting_for_ondisk.empty());
@@ -3392,11 +3444,10 @@ void PG::repair_object(
     peer_missing[bad_peer].add(soid, oi.version, eversion_t());
   } else {
     // We should only be scrubbing if the PG is clean.
-    assert(waiting_for_missing_object.empty());
+    assert(waiting_for_unreadable_object.empty());
 
     pg_log.missing_add(soid, oi.version, eversion_t());
-    missing_loc[soid].insert(ok_peer);
-    missing_loc_sources.insert(ok_peer);
+    missing_loc.add_location(soid, ok_peer);
 
     pg_log.set_last_requested(0);
   }
@@ -6037,7 +6088,8 @@ PG::RecoveryState::Active::Active(my_context ctx)
               pg->get_osdmap()->get_epoch(),
               *context< RecoveryMachine >().get_on_safe_context_list(),
               *context< RecoveryMachine >().get_query_map(),
-              context< RecoveryMachine >().get_info_map());
+              context< RecoveryMachine >().get_info_map(),
+              context< RecoveryMachine >().get_recovery_ctx());
   assert(pg->is_active());
   dout(10) << "Activate Finished" << dendl;
 }
@@ -6099,7 +6151,7 @@ boost::statechart::result PG::RecoveryState::Active::react(const ActMap&)
   if (pg->cct->_conf->osd_check_for_log_corruption)
     pg->check_log_for_corruption(pg->osd->store);
 
-  int unfound = pg->pg_log.get_missing().num_missing() - pg->missing_loc.size();
+  int unfound = pg->missing_loc.num_unfound();
   if (unfound > 0 &&
       pg->all_unfound_are_queried_or_lost(pg->get_osdmap())) {
     if (pg->cct->_conf->osd_auto_mark_unfound_lost) {
@@ -6179,8 +6231,14 @@ boost::statechart::result PG::RecoveryState::Active::react(const MLogRec& logevt
   dout(10) << "searching osd." << logevt.from
            << " log for unfound items" << dendl;
   PG *pg = context< RecoveryMachine >().pg;
-  bool got_missing = pg->search_for_missing(logevt.msg->info,
-                                            &logevt.msg->missing, logevt.from);
+  pg->proc_replica_log(
+    *context<RecoveryMachine>().get_cur_transaction(),
+    logevt.msg->info, logevt.msg->log, logevt.msg->missing, logevt.from);
+  bool got_missing = pg->search_for_missing(
+    pg->peer_info[logevt.from],
+    pg->peer_missing[logevt.from],
+    logevt.from,
+    context< RecoveryMachine >().get_recovery_ctx());
   if (got_missing)
     pg->osd->queue_for_recovery(pg);
   return discard_event();
@@ -6292,7 +6350,7 @@ boost::statechart::result PG::RecoveryState::ReplicaActive::react(
   pg->activate(*context< RecoveryMachine >().get_cur_transaction(),
               actevt.query_epoch,
               *context< RecoveryMachine >().get_on_safe_context_list(),
-              query_map, NULL);
+              query_map, NULL, NULL);
   dout(10) << "Activate Finished" << dendl;
   return discard_event();
 }
@@ -6927,7 +6985,6 @@ PG::RecoveryState::GetMissing::GetMissing(my_context ctx)
       //        can infer the rest!
       dout(10) << " osd." << *i << " has no missing, identical log" << dendl;
       pg->peer_missing[*i];
-      pg->search_for_missing(pi, &pg->peer_missing[*i], *i);
       continue;
     }
 
@@ -7072,17 +7129,10 @@ boost::statechart::result PG::RecoveryState::WaitUpThru::react(const ActMap& am)
 
 boost::statechart::result PG::RecoveryState::WaitUpThru::react(const MLogRec& logevt)
 {
-  dout(10) << "searching osd." << logevt.from
-           << " log for unfound items" << dendl;
+  dout(10) << "Noting missing from osd." << logevt.from << dendl;
   PG *pg = context< RecoveryMachine >().pg;
-  bool got_missing = pg->search_for_missing(logevt.msg->info,
-                                            &logevt.msg->missing, logevt.from);
-
-  // hmm.. should we?
-  (void)got_missing;
-  //if (got_missing)
-  //pg->osd->queue_for_recovery(pg);
-
+  pg->peer_missing[logevt.from].swap(logevt.msg->missing);
+  pg->peer_info[logevt.from] = logevt.msg->info;
   return discard_event();
 }
 
diff --git a/src/osd/PG.h b/src/osd/PG.h
index 769f55f33794b6c2963058f735056fa79d34ae94..db19107c04bc3d05eed3453fb97eea5081a79eca 100644
@@ -300,8 +300,114 @@ public:
   }
   hobject_t    log_oid;
   hobject_t    biginfo_oid;
-  map<hobject_t, set<pg_shard_t> > missing_loc;
-  set<pg_shard_t> missing_loc_sources;           // superset of missing_loc locations
+
+  class MissingLoc {
+    map<hobject_t, pg_missing_t::item> needs_recovery_map;
+    map<hobject_t, set<pg_shard_t> > missing_loc;
+    set<pg_shard_t> missing_loc_sources;
+    PG *pg;
+    boost::scoped_ptr<PGBackend::IsReadablePredicate> is_readable;
+    boost::scoped_ptr<PGBackend::IsRecoverablePredicate> is_recoverable;
+    set<pg_shard_t> empty_set;
+  public:
+    MissingLoc(PG *pg)
+      : pg(pg) {}
+    void set_backend_predicates(
+      PGBackend::IsReadablePredicate *_is_readable,
+      PGBackend::IsRecoverablePredicate *_is_recoverable) {
+      is_readable.reset(_is_readable);
+      is_recoverable.reset(_is_recoverable);
+    }
+    string gen_prefix() const { return pg->gen_prefix(); }
+    bool needs_recovery(
+      const hobject_t &hoid,
+      eversion_t *v = 0) const {
+      map<hobject_t, pg_missing_t::item>::const_iterator i =
+       needs_recovery_map.find(hoid);
+      if (i == needs_recovery_map.end())
+       return false;
+      if (v)
+       *v = i->second.need;
+      return true;
+    }
+    bool is_unfound(const hobject_t &hoid) const {
+      return needs_recovery(hoid) && (
+       !missing_loc.count(hoid) ||
+       !(*is_recoverable)(missing_loc.find(hoid)->second));
+    }
+    bool readable_with_acting(
+      const hobject_t &hoid,
+      const set<pg_shard_t> &acting) const;
+    uint64_t num_unfound() const {
+      uint64_t ret = 0;
+      for (map<hobject_t, pg_missing_t::item>::const_iterator i =
+            needs_recovery_map.begin();
+          i != needs_recovery_map.end();
+          ++i) {
+       if (is_unfound(i->first))
+         ++ret;
+      }
+      return ret;
+    }
+
+    void clear() {
+      needs_recovery_map.clear();
+      missing_loc.clear();
+      missing_loc_sources.clear();
+    }
+
+    void add_location(const hobject_t &hoid, pg_shard_t location) {
+      missing_loc[hoid].insert(location);
+    }
+    void remove_location(const hobject_t &hoid, pg_shard_t location) {
+      missing_loc[hoid].erase(location);
+    }
+    void add_active_missing(const pg_missing_t &missing) {
+      for (map<hobject_t, pg_missing_t::item>::const_iterator i =
+            missing.missing.begin();
+          i != missing.missing.end();
+          ++i) {
+       map<hobject_t, pg_missing_t::item>::const_iterator j =
+         needs_recovery_map.find(i->first);
+       if (j == needs_recovery_map.end()) {
+         needs_recovery_map.insert(*i);
+       } else {
+         assert(i->second.need == j->second.need);
+       }
+      }
+    }
+    void revise_need(const hobject_t &hoid, eversion_t need) {
+      assert(needs_recovery(hoid));
+      needs_recovery_map[hoid].need = need;
+    }
+
+    /// Adds info about a possible recovery source
+    bool add_source_info(
+      pg_shard_t source,           ///< [in] source
+      const pg_info_t &oinfo,      ///< [in] info
+      const pg_missing_t &omissing ///< [in] (optional) missing
+      ); ///< @return whether a new object location was discovered
+
+    /// Uses osdmap to update structures for now down sources
+    void check_recovery_sources(const OSDMapRef osdmap);
+
+    /// Call when hoid is no longer missing in acting set
+    void recovered(const hobject_t &hoid) {
+      needs_recovery_map.erase(hoid);
+      missing_loc.erase(hoid);
+    }
+
+    const set<pg_shard_t> &get_locations(const hobject_t &hoid) const {
+      return missing_loc.count(hoid) ?
+       missing_loc.find(hoid)->second : empty_set;
+    }
+    const map<hobject_t, set<pg_shard_t> > &get_missing_locs() const {
+      return missing_loc;
+    }
+    const map<hobject_t, pg_missing_t::item> &get_needs_recovery() const {
+      return needs_recovery_map;
+    }
+  } missing_loc;
   
   interval_set<snapid_t> snap_collections; // obsolete
   map<epoch_t,pg_interval_t> past_intervals;
@@ -540,7 +646,7 @@ protected:
   list<OpRequestRef>            waiting_for_active;
   list<OpRequestRef>            waiting_for_cache_not_full;
   list<OpRequestRef>            waiting_for_all_missing;
-  map<hobject_t, list<OpRequestRef> > waiting_for_missing_object,
+  map<hobject_t, list<OpRequestRef> > waiting_for_unreadable_object,
                             waiting_for_degraded_object,
                             waiting_for_blocked_object;
   // Callbacks should assume pg (and nothing else) is locked
@@ -730,8 +836,9 @@ public:
     pg_log_t &olog, pg_shard_t from);
   void rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead);
   bool search_for_missing(
-    const pg_info_t &oinfo, const pg_missing_t *omissing,
-    pg_shard_t fromosd);
+    const pg_info_t &oinfo, const pg_missing_t &omissing,
+    pg_shard_t fromosd,
+    RecoveryCtx*);
 
   void check_for_lost_objects();
   void forget_lost_objects();
@@ -779,17 +886,18 @@ public:
     list<Context*>& tfin,
     map<int, map<spg_t,pg_query_t> >& query_map,
     map<int,
-        vector<pair<pg_notify_t, pg_interval_map_t> > > *activator_map=0);
+      vector<pair<pg_notify_t, pg_interval_map_t> > > *activator_map,
+    RecoveryCtx *ctx);
   void _activate_committed(epoch_t e);
   void all_activated_and_committed();
 
   void proc_primary_info(ObjectStore::Transaction &t, const pg_info_t &info);
 
   bool have_unfound() const { 
-    return pg_log.get_missing().num_missing() > missing_loc.size();
+    return missing_loc.num_unfound();
   }
   int get_num_unfound() const {
-    return pg_log.get_missing().num_missing() - missing_loc.size();
+    return missing_loc.num_unfound();
   }
 
   virtual void check_local() = 0;
@@ -1273,6 +1381,8 @@ public:
        return &(state->rctx->on_applied->contexts);
       }
 
+      RecoveryCtx *get_recovery_ctx() { return state->rctx; }
+
       void send_notify(pg_shard_t to,
                       const pg_notify_t &info, const pg_interval_map_t &pi) {
        assert(state->rctx->notify_list);
diff --git a/src/osd/PGBackend.h b/src/osd/PGBackend.h
index eb515217ddf292e18d978575a9d69db02cbc1036..689e47b9d0e011c27a5bd5a42118698a1397f1ca 100644
    };
    virtual IsRecoverablePredicate *get_is_recoverable_predicate() = 0;
 
+   class IsReadablePredicate {
+   public:
+     /**
+      * have encodes the shards available
+      */
+     virtual bool operator()(const set<pg_shard_t> &have) const = 0;
+     virtual ~IsReadablePredicate() {}
+   };
+   virtual IsReadablePredicate *get_is_readable_predicate() = 0;
+
    void temp_colls(list<coll_t> *out) {
      if (temp_created)
        out->push_back(temp_coll);
diff --git a/src/osd/ReplicatedBackend.h b/src/osd/ReplicatedBackend.h
index ef2153e292d192c418a1910d2a56fe3af09a0fab..f910b2660e415e039c1742c85f5d8b2a0c4f1ba0 100644
@@ -80,6 +80,18 @@ public:
     return new RPCRecPred;
   }
 
+  class RPCReadPred : public IsReadablePredicate {
+    pg_shard_t whoami;
+  public:
+    RPCReadPred(pg_shard_t whoami) : whoami(whoami) {}
+    bool operator()(const set<pg_shard_t> &have) const {
+      return have.count(whoami);
+    }
+  };
+  IsReadablePredicate *get_is_readable_predicate() {
+    return new RPCReadPred(get_parent()->whoami_shard());
+  }
+
   virtual void dump_recovery_info(Formatter *f) const {
     {
       f->open_array_section("pull_from_peer");
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index a6e958d6452be31bd8e88f1ecc9b8f5fe71e62dd..ac3abb1efcb03568af506ef81d09930428b3653a 100644
@@ -255,14 +255,17 @@ void ReplicatedPG::on_local_recover(
     t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc));
 
     publish_stats_to_osd();
-    if (waiting_for_missing_object.count(hoid)) {
-      dout(20) << " kicking waiters on " << hoid << dendl;
-      requeue_ops(waiting_for_missing_object[hoid]);
-      waiting_for_missing_object.erase(hoid);
-      if (pg_log.get_missing().missing.size() == 0) {
-       requeue_ops(waiting_for_all_missing);
-       waiting_for_all_missing.clear();
-      }
+    assert(missing_loc.needs_recovery(hoid));
+    missing_loc.add_location(hoid, pg_whoami);
+    if (!is_unreadable_object(hoid) &&
+        waiting_for_unreadable_object.count(hoid)) {
+      dout(20) << " kicking unreadable waiters on " << hoid << dendl;
+      requeue_ops(waiting_for_unreadable_object[hoid]);
+      waiting_for_unreadable_object.erase(hoid);
+    }
+    if (pg_log.get_missing().missing.size() == 0) {
+      requeue_ops(waiting_for_all_missing);
+      waiting_for_all_missing.clear();
     }
   } else {
     t->register_on_applied(
@@ -285,6 +288,7 @@ void ReplicatedPG::on_local_recover(
 void ReplicatedPG::on_global_recover(
   const hobject_t &soid)
 {
+  missing_loc.recovered(soid);
   publish_stats_to_osd();
   dout(10) << "pushed " << soid << " to all replicas" << dendl;
   map<hobject_t, ObjectContextRef>::iterator i = recovering.find(soid);
@@ -297,7 +301,13 @@ void ReplicatedPG::on_global_recover(
   }
   recovering.erase(i);
   finish_recovery_op(soid);
+  if (waiting_for_unreadable_object.count(soid)) {
+    dout(20) << " kicking unreadable waiters on " << soid << dendl;
+    requeue_ops(waiting_for_unreadable_object[soid]);
+    waiting_for_unreadable_object.erase(soid);
+  }
   if (waiting_for_degraded_object.count(soid)) {
+    dout(20) << " kicking degraded waiters on " << soid << dendl;
     requeue_ops(waiting_for_degraded_object[soid]);
     waiting_for_degraded_object.erase(soid);
   }
@@ -380,36 +390,32 @@ bool ReplicatedPG::same_for_rep_modify_since(epoch_t e)
 // ====================
 // missing objects
 
-bool ReplicatedPG::is_missing_object(const hobject_t& soid)
+bool ReplicatedPG::is_missing_object(const hobject_t& soid) const
 {
   return pg_log.get_missing().missing.count(soid);
 }
 
-void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, OpRequestRef op)
+void ReplicatedPG::wait_for_unreadable_object(
+  const hobject_t& soid, OpRequestRef op)
 {
-  assert(is_missing_object(soid));
-
-  const pg_missing_t &missing = pg_log.get_missing();
+  assert(is_unreadable_object(soid));
 
-  // we don't have it (yet).
-  map<hobject_t, pg_missing_t::item>::const_iterator g = missing.missing.find(soid);
-  assert(g != missing.missing.end());
-  const eversion_t &v(g->second.need);
+  eversion_t v;
+  bool needs_recovery = missing_loc.needs_recovery(soid, &v);
+  assert(needs_recovery);
 
   map<hobject_t, ObjectContextRef>::const_iterator p = recovering.find(soid);
   if (p != recovering.end()) {
     dout(7) << "missing " << soid << " v " << v << ", already recovering." << dendl;
-  }
-  else if (missing_loc.find(soid) == missing_loc.end()) {
+  } else if (missing_loc.is_unfound(soid)) {
     dout(7) << "missing " << soid << " v " << v << ", is unfound." << dendl;
-  }
-  else {
+  } else {
     dout(7) << "missing " << soid << " v " << v << ", recovering." << dendl;
     PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
     recover_missing(soid, v, cct->_conf->osd_client_op_priority, h);
     pgbackend->run_recovery_op(h, cct->_conf->osd_client_op_priority);
   }
-  waiting_for_missing_object[soid].push_back(op);
+  waiting_for_unreadable_object[soid].push_back(op);
   op->mark_delayed("waiting for missing object");
 }
 
@@ -641,7 +647,7 @@ int ReplicatedPG::do_command(cmdmap_t cmdmap, ostream& ss,
       return -EROFS;
     }
 
-    int unfound = missing.num_missing() - missing_loc.size();
+    int unfound = missing_loc.num_unfound();
     if (!unfound) {
       ss << "pg has no unfound objects";
       return 0;  // make command idempotent
@@ -694,13 +700,13 @@ int ReplicatedPG::do_command(cmdmap_t cmdmap, ostream& ss,
        p->second.dump(f.get());  // have, need keys
        {
          f->open_array_section("locations");
-         map<hobject_t,set<pg_shard_t> >::iterator q =
-           missing_loc.find(p->first);
-         if (q != missing_loc.end())
-           for (set<pg_shard_t>::iterator r = q->second.begin();
-                r != q->second.end();
+         if (missing_loc.needs_recovery(p->first)) {
+           for (set<pg_shard_t>::iterator r =
+                  missing_loc.get_locations(p->first).begin();
+                r != missing_loc.get_locations(p->first).end();
                 ++r)
              f->dump_stream("shard") << *r;
+         }
          f->close_section();
        }
        f->close_section();
@@ -1019,6 +1025,9 @@ ReplicatedPG::ReplicatedPG(OSDService *o, OSDMapRef curmap,
   temp_seq(0),
   snap_trimmer_machine(this)
 { 
+  missing_loc.set_backend_predicates(
+    pgbackend->get_is_readable_predicate(),
+    pgbackend->get_is_recoverable_predicate());
   snap_trimmer_machine.initiate();
 }
 
@@ -1179,8 +1188,8 @@ void ReplicatedPG::do_op(OpRequestRef op)
   }
 
   // missing object?
-  if (is_missing_object(head)) {
-    wait_for_missing_object(head, op);
+  if (is_unreadable_object(head)) {
+    wait_for_unreadable_object(head, op);
     return;
   }
 
@@ -1195,7 +1204,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
                    CEPH_SNAPDIR, m->get_pg().ps(), info.pgid.pool(),
                    m->get_object_locator().nspace);
   if (is_missing_object(snapdir)) {
-    wait_for_missing_object(snapdir, op);
+    wait_for_unreadable_object(snapdir, op);
     return;
   }
 
@@ -1231,7 +1240,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
         !(m->get_flags() & CEPH_OSD_FLAG_LOCALIZE_READS))) {
       // missing the specific snap we need; requeue and wait.
       assert(!can_create); // only happens on a read
-      wait_for_missing_object(missing_oid, op);
+      wait_for_unreadable_object(missing_oid, op);
       return;
     }
   }
@@ -1309,11 +1318,11 @@ void ReplicatedPG::do_op(OpRequestRef op)
        int r;
 
        if (src_oid.is_head() && is_missing_object(src_oid)) {
-         wait_for_missing_object(src_oid, op);
+         wait_for_unreadable_object(src_oid, op);
        } else if ((r = find_object_context(
                      src_oid, &sobc, false, &wait_oid)) == -EAGAIN) {
          // missing the specific snap we need; requeue and wait.
-         wait_for_missing_object(wait_oid, op);
+         wait_for_unreadable_object(wait_oid, op);
        } else if (r) {
          if (!maybe_handle_cache(op, write_ordered, sobc, r, wait_oid, true))
            osd->reply_op_error(op, r);
@@ -1373,7 +1382,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
        int r = find_object_context(clone_oid, &sobc, false, &wait_oid);
        if (r == -EAGAIN) {
          // missing the specific snap we need; requeue and wait.
-         wait_for_missing_object(wait_oid, op);
+         wait_for_unreadable_object(wait_oid, op);
        } else if (r) {
          if (!maybe_handle_cache(op, write_ordered, sobc, r, wait_oid, true))
            osd->reply_op_error(op, r);
@@ -4512,7 +4521,7 @@ int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
     assert(is_missing_object(missing_oid));
     dout(20) << "_rollback_to attempted to roll back to a missing object "
             << missing_oid << " (requested snapid: ) " << snapid << dendl;
-    wait_for_missing_object(missing_oid, ctx->op);
+    wait_for_unreadable_object(missing_oid, ctx->op);
     return ret;
   }
   if (maybe_handle_cache(ctx->op, true, rollback_to, ret, missing_oid, true)) {
@@ -7456,8 +7465,7 @@ int ReplicatedPG::recover_missing(
   int priority,
   PGBackend::RecoveryHandle *h)
 {
-  map<hobject_t, set<pg_shard_t> >::iterator q = missing_loc.find(soid);
-  if (q == missing_loc.end()) {
+  if (missing_loc.is_unfound(soid)) {
     dout(7) << "pull " << soid
            << " v " << v 
            << " but it is unfound" << dendl;
@@ -8368,10 +8376,6 @@ void ReplicatedPG::_applied_recovered_object_replica()
 void ReplicatedPG::recover_got(hobject_t oid, eversion_t v)
 {
   dout(10) << "got missing " << oid << " v " << v << dendl;
-  if (pg_log.get_missing().is_missing(oid, v)) {
-      if (is_primary())
-       missing_loc.erase(oid);
-  }
   pg_log.recover_got(oid, v, info);
   if (pg_log.get_log().complete_to != pg_log.get_log().log.end()) {
     dout(10) << "last_complete now " << info.last_complete
@@ -8498,18 +8502,10 @@ void ReplicatedPG::failed_push(pg_shard_t from, const hobject_t &soid)
 {
   assert(recovering.count(soid));
   recovering.erase(soid);
-  map<hobject_t,set<pg_shard_t> >::iterator p = missing_loc.find(soid);
-  if (p != missing_loc.end()) {
-    dout(0) << "_failed_push " << soid << " from shard " << from
-           << ", reps on " << p->second << dendl;
-
-    p->second.erase(from);          // forget about this (bad) peer replica
-    if (p->second.empty())
-      missing_loc.erase(p);
-  } else {
-    dout(0) << "_failed_push " << soid << " from shard " << from
-           << " but not in missing_loc ???" << dendl;
-  }
+  missing_loc.remove_location(soid, from);
+  dout(0) << "_failed_push " << soid << " from shard " << from
+         << ", reps on " << missing_loc.get_locations(soid)
+         << " unfound? " << missing_loc.is_unfound(soid) << dendl;
   finish_recovery_op(soid);  // close out this attempt,
 }
 
@@ -8575,8 +8571,8 @@ ObjectContextRef ReplicatedPG::mark_object_lost(ObjectStore::Transaction *t,
   // Wake anyone waiting for this object. Now that it's been marked as lost,
   // we will just return an error code.
   map<hobject_t, list<OpRequestRef> >::iterator wmo =
-    waiting_for_missing_object.find(oid);
-  if (wmo != waiting_for_missing_object.end()) {
+    waiting_for_unreadable_object.find(oid);
+  if (wmo != waiting_for_unreadable_object.end()) {
     requeue_ops(wmo->second);
   }
 
@@ -8629,7 +8625,7 @@ void ReplicatedPG::mark_all_unfound_lost(int what)
   map<hobject_t, pg_missing_t::item>::const_iterator mend = missing.missing.end();
   while (m != mend) {
     const hobject_t &oid(m->first);
-    if (missing_loc.find(oid) != missing_loc.end()) {
+    if (!missing_loc.is_unfound(oid)) {
       // We only care about unfound objects
       ++m;
       continue;
@@ -8661,6 +8657,7 @@ void ReplicatedPG::mark_all_unfound_lost(int what)
        // we are now missing the new version; recovery code will sort it out.
        ++m;
        pg_log.revise_need(oid, info.last_update);
+       missing_loc.revise_need(oid, info.last_update);
        break;
       }
       /** fall-thru **/
@@ -8679,6 +8676,7 @@ void ReplicatedPG::mark_all_unfound_lost(int what)
          assert(0 == "not implemented.. tho i'm not sure how useful it really would be.");
        }
        pg_log.missing_rm(m++);
+       missing_loc.recovered(oid);
       }
       break;
 
@@ -8885,9 +8883,9 @@ void ReplicatedPG::on_change(ObjectStore::Transaction *t)
 
   // requeue object waiters
   if (is_primary()) {
-    requeue_object_waiters(waiting_for_missing_object);
+    requeue_object_waiters(waiting_for_unreadable_object);
   } else {
-    waiting_for_missing_object.clear();
+    waiting_for_unreadable_object.clear();
   }
   for (map<hobject_t,list<OpRequestRef> >::iterator p = waiting_for_degraded_object.begin();
        p != waiting_for_degraded_object.end();
@@ -8948,7 +8946,6 @@ void ReplicatedPG::on_pool_change()
 void ReplicatedPG::_clear_recovery_state()
 {
   missing_loc.clear();
-  missing_loc_sources.clear();
 #ifdef DEBUG_RECOVERY_OIDS
   recovering_oids.clear();
 #endif
@@ -8981,6 +8978,34 @@ void ReplicatedPG::check_recovery_sources(const OSDMapRef osdmap)
    * check that any peers we are planning to (or currently) pulling
    * objects from are dealt with.
    */
+  missing_loc.check_recovery_sources(osdmap);
+  pgbackend->check_recovery_sources(osdmap);
+
+  for (set<pg_shard_t>::iterator i = peer_log_requested.begin();
+       i != peer_log_requested.end();
+       ) {
+    if (!osdmap->is_up(i->osd)) {
+      dout(10) << "peer_log_requested removing " << *i << dendl;
+      peer_log_requested.erase(i++);
+    } else {
+      ++i;
+    }
+  }
+
+  for (set<pg_shard_t>::iterator i = peer_missing_requested.begin();
+       i != peer_missing_requested.end();
+       ) {
+    if (!osdmap->is_up(i->osd)) {
+      dout(10) << "peer_missing_requested removing " << *i << dendl;
+      peer_missing_requested.erase(i++);
+    } else {
+      ++i;
+    }
+  }
+}
+
+void PG::MissingLoc::check_recovery_sources(const OSDMapRef osdmap)
+{
   set<pg_shard_t> now_down;
   for (set<pg_shard_t>::iterator p = missing_loc_sources.begin();
        p != missing_loc_sources.end();
@@ -8993,7 +9018,6 @@ void ReplicatedPG::check_recovery_sources(const OSDMapRef osdmap)
     now_down.insert(*p);
     missing_loc_sources.erase(p++);
   }
-  pgbackend->check_recovery_sources(osdmap);
 
   if (now_down.empty()) {
     dout(10) << "check_recovery_sources no source osds (" << missing_loc_sources << ") went down" << dendl;
@@ -9009,7 +9033,6 @@ void ReplicatedPG::check_recovery_sources(const OSDMapRef osdmap)
        if (now_down.count(*q)) {
          p->second.erase(q++);
        } else {
-         assert(missing_loc_sources.count(*q));
          ++q;
        }
       if (p->second.empty())
@@ -9018,28 +9041,6 @@ void ReplicatedPG::check_recovery_sources(const OSDMapRef osdmap)
        ++p;
     }
   }
-
-  for (set<pg_shard_t>::iterator i = peer_log_requested.begin();
-       i != peer_log_requested.end();
-       ) {
-    if (!osdmap->is_up(i->osd)) {
-      dout(10) << "peer_log_requested removing " << *i << dendl;
-      peer_log_requested.erase(i++);
-    } else {
-      ++i;
-    }
-  }
-
-  for (set<pg_shard_t>::iterator i = peer_missing_requested.begin();
-       i != peer_missing_requested.end();
-       ) {
-    if (!osdmap->is_up(i->osd)) {
-      dout(10) << "peer_missing_requested removing " << *i << dendl;
-      peer_missing_requested.erase(i++);
-    } else {
-      ++i;
-    }
-  }
 }
   
 
@@ -9124,6 +9125,12 @@ bool ReplicatedPG::start_recovery_ops(
   assert(recovering.empty());
   assert(recovery_ops_active == 0);
 
+  dout(10) << __func__ << " needs_recovery: "
+          << missing_loc.get_needs_recovery()
+          << dendl;
+  dout(10) << __func__ << " missing_loc: "
+          << missing_loc.get_missing_locs()
+          << dendl;
   int unfound = get_num_unfound();
   if (unfound) {
     dout(10) << " still have " << unfound << " unfound" << dendl;
@@ -9221,7 +9228,7 @@ int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle)
 
     eversion_t need = item.need;
 
-    bool unfound = (missing_loc.find(soid) == missing_loc.end());
+    bool unfound = missing_loc.is_unfound(soid);
 
     dout(10) << "recover_primary "
              << soid << " " << item.need
@@ -9262,6 +9269,7 @@ int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle)
              t->setattr(coll, soid, OI_ATTR, b2);
 
              recover_got(soid, latest->version);
+             missing_loc.add_location(soid, pg_whoami);
 
              ++active_pushes;
 
@@ -9289,16 +9297,16 @@ int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle)
            eversion_t alternate_need = latest->reverting_to;
            dout(10) << " need to pull prior_version " << alternate_need << " for revert " << item << dendl;
 
-           set<pg_shard_t>& loc = missing_loc[soid];
            for (map<pg_shard_t, pg_missing_t>::iterator p = peer_missing.begin();
                 p != peer_missing.end();
                 ++p)
              if (p->second.is_missing(soid, need) &&
                  p->second.missing[soid].have == alternate_need) {
-               missing_loc_sources.insert(p->first);
-               loc.insert(p->first);
+               missing_loc.add_location(soid, p->first);
              }
-           dout(10) << " will pull " << alternate_need << " or " << need << " from one of " << loc << dendl;
+           dout(10) << " will pull " << alternate_need << " or " << need
+                    << " from one of " << missing_loc.get_locations(soid)
+                    << dendl;
            unfound = false;
          }
        }
@@ -9351,6 +9359,7 @@ int ReplicatedPG::prep_object_replica_pushes(
   ObjectContextRef obc = get_object_context(soid, false);
   if (!obc) {
     pg_log.missing_add(soid, v, eversion_t());
+    missing_loc.remove_location(soid, pg_whoami);
     bool uhoh = true;
     assert(actingbackfill.size() > 0);
     for (set<pg_shard_t>::iterator i = actingbackfill.begin();
@@ -9359,8 +9368,7 @@ int ReplicatedPG::prep_object_replica_pushes(
       if (*i == get_primary()) continue;
       pg_shard_t peer = *i;
       if (!peer_missing[peer].is_missing(soid, v)) {
-       missing_loc[soid].insert(peer);
-       missing_loc_sources.insert(peer);
+       missing_loc.add_location(soid, peer);
        dout(10) << info.pgid << " unexpectedly missing " << soid << " v" << v
                 << ", there should be a copy on shard " << peer << dendl;
        uhoh = false;
@@ -9370,7 +9378,8 @@ int ReplicatedPG::prep_object_replica_pushes(
       osd->clog.error() << info.pgid << " missing primary copy of " << soid << ", unfound\n";
     else
       osd->clog.error() << info.pgid << " missing primary copy of " << soid
-                       << ", will try copies on " << missing_loc[soid] << "\n";
+                       << ", will try copies on " << missing_loc.get_locations(soid)
+                       << "\n";
     return 0;
   }
 
@@ -9468,7 +9477,7 @@ int ReplicatedPG::recover_replicas(int max, ThreadPool::TPHandle &handle)
       }
 
       if (pg_log.get_missing().is_missing(soid)) {
-       if (missing_loc.find(soid) == missing_loc.end())
+       if (missing_loc.is_unfound(soid))
          dout(10) << __func__ << ": " << soid << " still unfound" << dendl;
        else
          dout(10) << __func__ << ": " << soid << " still missing on primary" << dendl;
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index f331ca97fccaace465b5155243c753282643d163..10298efbe74dec441b000da5b444ccd7b5b08367 100644
@@ -304,7 +304,7 @@ public:
   std::string gen_dbg_prefix() const { return gen_prefix(); }
   
   const map<hobject_t, set<pg_shard_t> > &get_missing_loc_shards() const {
-    return missing_loc;
+    return missing_loc.get_missing_locs();
   }
   const map<pg_shard_t, pg_missing_t> &get_shard_missing() const {
     return peer_missing;
@@ -1272,8 +1272,12 @@ public:
   bool same_for_modify_since(epoch_t e);
   bool same_for_rep_modify_since(epoch_t e);
 
-  bool is_missing_object(const hobject_t& oid);
-  void wait_for_missing_object(const hobject_t& oid, OpRequestRef op);
+  bool is_missing_object(const hobject_t& oid) const;
+  bool is_unreadable_object(const hobject_t &oid) const {
+    return is_missing_object(oid) ||
+      !missing_loc.readable_with_acting(oid, actingset);
+  }
+  void wait_for_unreadable_object(const hobject_t& oid, OpRequestRef op);
   void wait_for_all_missing(OpRequestRef op);
 
   bool is_degraded_object(const hobject_t& oid);