]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/: start moving activate to PeeringState
authorSamuel Just <sjust@redhat.com>
Fri, 12 Apr 2019 22:34:25 +0000 (15:34 -0700)
committersjust@redhat.com <sjust@redhat.com>
Wed, 1 May 2019 18:22:21 +0000 (11:22 -0700)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/osd/PG.cc
src/osd/PG.h
src/osd/PeeringState.cc
src/osd/PeeringState.h

index 489669666ef8c97c9a49e066cab98d55689f41ec..7bfd4bb27f92ac0647e0a8c95fc49e167cda41eb 100644 (file)
@@ -581,343 +581,6 @@ void PG::build_might_have_unfound()
   dout(15) << __func__ << ": built " << might_have_unfound << dendl;
 }
 
-void PG::activate(ObjectStore::Transaction& t,
-                 epoch_t activation_epoch,
-                 map<int, map<spg_t,pg_query_t> >& query_map,
-                 map<int,
-                     vector<
-                       pair<pg_notify_t,
-                            PastIntervals> > > *activator_map,
-                  PeeringCtx *ctx)
-{
-  ceph_assert(!is_peered());
-  ceph_assert(scrubber.callbacks.empty());
-  ceph_assert(callbacks_for_degraded_object.empty());
-
-  // twiddle pg state
-  state_clear(PG_STATE_DOWN);
-
-  send_notify = false;
-
-  if (is_primary()) {
-    // only update primary last_epoch_started if we will go active
-    if (acting.size() >= pool.info.min_size) {
-      ceph_assert(cct->_conf->osd_find_best_info_ignore_history_les ||
-            info.last_epoch_started <= activation_epoch);
-      info.last_epoch_started = activation_epoch;
-      info.last_interval_started = info.history.same_interval_since;
-    }
-  } else if (is_acting(pg_whoami)) {
-    /* update last_epoch_started on acting replica to whatever the primary sent
-     * unless it's smaller (could happen if we are going peered rather than
-     * active, see doc/dev/osd_internals/last_epoch_started.rst) */
-    if (info.last_epoch_started < activation_epoch) {
-      info.last_epoch_started = activation_epoch;
-      info.last_interval_started = info.history.same_interval_since;
-    }
-  }
-
-  auto &missing = pg_log.get_missing();
-
-  if (is_primary()) {
-    last_update_ondisk = info.last_update;
-    min_last_complete_ondisk = eversion_t(0,0);  // we don't know (yet)!
-  }
-  last_update_applied = info.last_update;
-  last_rollback_info_trimmed_to_applied = pg_log.get_can_rollback_to();
-
-  need_up_thru = false;
-
-  // write pg info, log
-  dirty_info = true;
-  dirty_big_info = true; // maybe
-
-  // find out when we commit
-  t.register_on_complete(
-    new C_PG_ActivateCommitted(
-      this,
-      get_osdmap_epoch(),
-      activation_epoch));
-  
-  if (is_primary()) {
-    // initialize snap_trimq
-    if (get_osdmap()->require_osd_release < CEPH_RELEASE_MIMIC) {
-      dout(20) << "activate - purged_snaps " << info.purged_snaps
-              << " cached_removed_snaps " << pool.cached_removed_snaps
-              << dendl;
-      snap_trimq = pool.cached_removed_snaps;
-    } else {
-      auto& removed_snaps_queue = get_osdmap()->get_removed_snaps_queue();
-      auto p = removed_snaps_queue.find(info.pgid.pgid.pool());
-      snap_trimq.clear();
-      if (p != removed_snaps_queue.end()) {
-       dout(20) << "activate - purged_snaps " << info.purged_snaps
-                << " removed_snaps " << p->second
-                << dendl;
-       for (auto q : p->second) {
-         snap_trimq.insert(q.first, q.second);
-       }
-      }
-    }
-    interval_set<snapid_t> purged;
-    purged.intersection_of(snap_trimq, info.purged_snaps);
-    snap_trimq.subtract(purged);
-
-    if (get_osdmap()->require_osd_release >= CEPH_RELEASE_MIMIC) {
-      // adjust purged_snaps: PG may have been inactive while snaps were pruned
-      // from the removed_snaps_queue in the osdmap.  update local purged_snaps
-      // reflect only those snaps that we thought were pruned and were still in
-      // the queue.
-      info.purged_snaps.swap(purged);
-    }
-  }
-
-  // init complete pointer
-  if (missing.num_missing() == 0) {
-    dout(10) << "activate - no missing, moving last_complete " << info.last_complete 
-            << " -> " << info.last_update << dendl;
-    info.last_complete = info.last_update;
-    info.stats.stats.sum.num_objects_missing = 0;
-    pg_log.reset_recovery_pointers();
-  } else {
-    dout(10) << "activate - not complete, " << missing << dendl;
-    info.stats.stats.sum.num_objects_missing = missing.num_missing();
-    pg_log.activate_not_complete(info);
-  }
-    
-  log_weirdness();
-
-  // if primary..
-  if (is_primary()) {
-    ceph_assert(ctx);
-    // start up replicas
-
-    ceph_assert(!acting_recovery_backfill.empty());
-    for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
-        i != acting_recovery_backfill.end();
-        ++i) {
-      if (*i == pg_whoami) continue;
-      pg_shard_t peer = *i;
-      ceph_assert(peer_info.count(peer));
-      pg_info_t& pi = peer_info[peer];
-
-      dout(10) << "activate peer osd." << peer << " " << pi << dendl;
-
-      MOSDPGLog *m = 0;
-      ceph_assert(peer_missing.count(peer));
-      pg_missing_t& pm = peer_missing[peer];
-
-      bool needs_past_intervals = pi.dne();
-
-      /*
-       * cover case where peer sort order was different and
-       * last_backfill cannot be interpreted
-       */
-      bool force_restart_backfill =
-       !pi.last_backfill.is_max() &&
-       !pi.last_backfill_bitwise;
-
-      if (pi.last_update == info.last_update && !force_restart_backfill) {
-        // empty log
-       if (!pi.last_backfill.is_max())
-         osd->clog->info() << info.pgid << " continuing backfill to osd."
-                           << peer
-                           << " from (" << pi.log_tail << "," << pi.last_update
-                           << "] " << pi.last_backfill
-                           << " to " << info.last_update;
-       if (!pi.is_empty() && activator_map) {
-         dout(10) << "activate peer osd." << peer << " is up to date, queueing in pending_activators" << dendl;
-         (*activator_map)[peer.osd].emplace_back(
-             pg_notify_t(
-               peer.shard, pg_whoami.shard,
-               get_osdmap_epoch(),
-               get_osdmap_epoch(),
-               info),
-             past_intervals);
-       } else {
-         dout(10) << "activate peer osd." << peer << " is up to date, but sending pg_log anyway" << dendl;
-         m = new MOSDPGLog(
-           i->shard, pg_whoami.shard,
-           get_osdmap_epoch(), info,
-           last_peering_reset);
-       }
-      } else if (
-       pg_log.get_tail() > pi.last_update ||
-       pi.last_backfill == hobject_t() ||
-       force_restart_backfill ||
-       (backfill_targets.count(*i) && pi.last_backfill.is_max())) {
-       /* ^ This last case covers a situation where a replica is not contiguous
-        * with the auth_log, but is contiguous with this replica.  Reshuffling
-        * the active set to handle this would be tricky, so instead we just go
-        * ahead and backfill it anyway.  This is probably preferrable in any
-        * case since the replica in question would have to be significantly
-        * behind.
-        */
-       // backfill
-       osd->clog->debug() << info.pgid << " starting backfill to osd." << peer
-                        << " from (" << pi.log_tail << "," << pi.last_update
-                         << "] " << pi.last_backfill
-                        << " to " << info.last_update;
-
-       pi.last_update = info.last_update;
-       pi.last_complete = info.last_update;
-       pi.set_last_backfill(hobject_t());
-       pi.last_epoch_started = info.last_epoch_started;
-       pi.last_interval_started = info.last_interval_started;
-       pi.history = info.history;
-       pi.hit_set = info.hit_set;
-        // Save num_bytes for reservation request, can't be negative
-        peer_bytes[peer] = std::max<int64_t>(0, pi.stats.stats.sum.num_bytes);
-        pi.stats.stats.clear();
-
-       // initialize peer with our purged_snaps.
-       pi.purged_snaps = info.purged_snaps;
-
-       m = new MOSDPGLog(
-         i->shard, pg_whoami.shard,
-         get_osdmap_epoch(), pi,
-         last_peering_reset /* epoch to create pg at */);
-
-       // send some recent log, so that op dup detection works well.
-       m->log.copy_up_to(pg_log.get_log(), cct->_conf->osd_min_pg_log_entries);
-       m->info.log_tail = m->log.tail;
-       pi.log_tail = m->log.tail;  // sigh...
-
-       pm.clear();
-      } else {
-       // catch up
-       ceph_assert(pg_log.get_tail() <= pi.last_update);
-       m = new MOSDPGLog(
-         i->shard, pg_whoami.shard,
-         get_osdmap_epoch(), info,
-         last_peering_reset /* epoch to create pg at */);
-       // send new stuff to append to replicas log
-       m->log.copy_after(pg_log.get_log(), pi.last_update);
-      }
-
-      // share past_intervals if we are creating the pg on the replica
-      // based on whether our info for that peer was dne() *before*
-      // updating pi.history in the backfill block above.
-      if (m && needs_past_intervals)
-       m->past_intervals = past_intervals;
-
-      // update local version of peer's missing list!
-      if (m && pi.last_backfill != hobject_t()) {
-        for (list<pg_log_entry_t>::iterator p = m->log.log.begin();
-             p != m->log.log.end();
-             ++p) {
-         if (p->soid <= pi.last_backfill &&
-             !p->is_error()) {
-           if (perform_deletes_during_peering() && p->is_delete()) {
-             pm.rm(p->soid, p->version);
-           } else {
-             pm.add_next_event(*p);
-           }
-         }
-       }
-      }
-
-      if (m) {
-       dout(10) << "activate peer osd." << peer << " sending " << m->log << dendl;
-       //m->log.print(cout);
-       osd->send_message_osd_cluster(peer.osd, m, get_osdmap_epoch());
-      }
-
-      // peer now has 
-      pi.last_update = info.last_update;
-
-      // update our missing
-      if (pm.num_missing() == 0) {
-       pi.last_complete = pi.last_update;
-        dout(10) << "activate peer osd." << peer << " " << pi << " uptodate" << dendl;
-      } else {
-        dout(10) << "activate peer osd." << peer << " " << pi << " missing " << pm << dendl;
-      }
-    }
-
-    // Set up missing_loc
-    set<pg_shard_t> complete_shards;
-    for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
-        i != acting_recovery_backfill.end();
-        ++i) {
-      dout(20) << __func__ << " setting up missing_loc from shard " << *i << " " << dendl;
-      if (*i == get_primary()) {
-       missing_loc.add_active_missing(missing);
-        if (!missing.have_missing())
-          complete_shards.insert(*i);
-      } else {
-       auto peer_missing_entry = peer_missing.find(*i);
-       ceph_assert(peer_missing_entry != peer_missing.end());
-       missing_loc.add_active_missing(peer_missing_entry->second);
-        if (!peer_missing_entry->second.have_missing() &&
-           peer_info[*i].last_backfill.is_max())
-         complete_shards.insert(*i);
-      }
-    }
-
-    // If necessary, create might_have_unfound to help us find our unfound objects.
-    // NOTE: It's important that we build might_have_unfound before trimming the
-    // past intervals.
-    might_have_unfound.clear();
-    if (needs_recovery()) {
-      // If only one shard has missing, we do a trick to add all others as recovery
-      // source, this is considered safe since the PGLogs have been merged locally,
-      // and covers vast majority of the use cases, like one OSD/host is down for
-      // a while for hardware repairing
-      if (complete_shards.size() + 1 == acting_recovery_backfill.size()) {
-        missing_loc.add_batch_sources_info(complete_shards, ctx->handle);
-      } else {
-        missing_loc.add_source_info(pg_whoami, info, pg_log.get_missing(),
-                                   ctx->handle);
-        for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
-            i != acting_recovery_backfill.end();
-            ++i) {
-         if (*i == pg_whoami) continue;
-         dout(10) << __func__ << ": adding " << *i << " as a source" << dendl;
-         ceph_assert(peer_missing.count(*i));
-         ceph_assert(peer_info.count(*i));
-         missing_loc.add_source_info(
-           *i,
-           peer_info[*i],
-           peer_missing[*i],
-            ctx->handle);
-        }
-      }
-      for (map<pg_shard_t, pg_missing_t>::iterator i = peer_missing.begin();
-          i != peer_missing.end();
-          ++i) {
-       if (is_acting_recovery_backfill(i->first))
-         continue;
-       ceph_assert(peer_info.count(i->first));
-       search_for_missing(
-         peer_info[i->first],
-         i->second,
-         i->first,
-         ctx);
-      }
-
-      build_might_have_unfound();
-
-      // Always call now so _update_calc_stats() will be accurate
-      discover_all_missing(query_map);
-    }
-
-    // num_objects_degraded if calculated should reflect this too, unless no
-    // missing and we are about to go clean.
-    if (get_osdmap()->get_pg_size(info.pgid.pgid) > actingset.size()) {
-      state_set(PG_STATE_UNDERSIZED);
-    }
-
-    state_set(PG_STATE_ACTIVATING);
-    release_pg_backoffs();
-    projected_last_update = info.last_update;
-  }
-  if (acting.size() >= pool.info.min_size) {
-    PGLogEntryHandler handler{this, &t};
-    pg_log.roll_forward(&handler);
-  }
-}
-
 bool PG::op_has_sufficient_caps(OpRequestRef& op)
 {
   // only check MOSDOp
@@ -961,89 +624,6 @@ bool PG::op_has_sufficient_caps(OpRequestRef& op)
   return cap;
 }
 
-void PG::_activate_committed(epoch_t epoch, epoch_t activation_epoch)
-{
-  lock();
-  if (pg_has_reset_since(epoch)) {
-    dout(10) << "_activate_committed " << epoch
-            << ", that was an old interval" << dendl;
-  } else if (is_primary()) {
-    ceph_assert(!peer_activated.count(pg_whoami));
-    peer_activated.insert(pg_whoami);
-    dout(10) << "_activate_committed " << epoch
-            << " peer_activated now " << peer_activated
-            << " last_interval_started " << info.history.last_interval_started
-            << " last_epoch_started " << info.history.last_epoch_started
-            << " same_interval_since " << info.history.same_interval_since << dendl;
-    ceph_assert(!acting_recovery_backfill.empty());
-    if (peer_activated.size() == acting_recovery_backfill.size())
-      all_activated_and_committed();
-  } else {
-    dout(10) << "_activate_committed " << epoch << " telling primary" << dendl;
-    MOSDPGInfo *m = new MOSDPGInfo(epoch);
-    pg_notify_t i = pg_notify_t(
-      get_primary().shard, pg_whoami.shard,
-      get_osdmap_epoch(),
-      get_osdmap_epoch(),
-      info);
-
-    i.info.history.last_epoch_started = activation_epoch;
-    i.info.history.last_interval_started = i.info.history.same_interval_since;
-    if (acting.size() >= pool.info.min_size) {
-      state_set(PG_STATE_ACTIVE);
-    } else {
-      state_set(PG_STATE_PEERED);
-    }
-
-    m->pg_list.emplace_back(i, PastIntervals());
-    osd->send_message_osd_cluster(get_primary().osd, m, get_osdmap_epoch());
-
-    // waiters
-    if (recovery_state.needs_flush() == 0) {
-      requeue_ops(waiting_for_peered);
-    } else if (!waiting_for_peered.empty()) {
-      dout(10) << __func__ << " flushes in progress, moving "
-              << waiting_for_peered.size() << " items to waiting_for_flush"
-              << dendl;
-      ceph_assert(waiting_for_flush.empty());
-      waiting_for_flush.swap(waiting_for_peered);
-    }
-  }
-
-  ceph_assert(!dirty_info);
-
-  unlock();
-}
-
-/*
- * update info.history.last_epoch_started ONLY after we and all
- * replicas have activated AND committed the activate transaction
- * (i.e. the peering results are stable on disk).
- */
-void PG::all_activated_and_committed()
-{
-  dout(10) << "all_activated_and_committed" << dendl;
-  ceph_assert(is_primary());
-  ceph_assert(peer_activated.size() == acting_recovery_backfill.size());
-  ceph_assert(!acting_recovery_backfill.empty());
-  ceph_assert(blocked_by.empty());
-
-  // Degraded?
-  _update_calc_stats();
-  if (info.stats.stats.sum.num_objects_degraded) {
-    state_set(PG_STATE_DEGRADED);
-  } else {
-    state_clear(PG_STATE_DEGRADED);
-  }
-
-  queue_peering_event(
-    PGPeeringEventRef(
-      std::make_shared<PGPeeringEvent>(
-        get_osdmap_epoch(),
-        get_osdmap_epoch(),
-        PeeringState::AllReplicasActivated())));
-}
-
 bool PG::requeue_scrub(bool high_priority)
 {
   ceph_assert(is_locked());
@@ -3130,6 +2710,13 @@ void PG::cancel_remote_recovery_reservation() {
     pg_id);
 }
 
+void PG::schedule_event_on_commit(
+  ObjectStore::Transaction &t,
+  PGPeeringEventRef on_commit)
+{
+  t.register_on_commit(new QueuePeeringEvt(this, on_commit));
+}
+
 void PG::on_active_exit()
 {
   backfill_reserving = false;
@@ -3258,6 +2845,21 @@ void PG::on_recovery_reserved()
   queue_recovery();
 }
 
+void PG::on_activate_committed()
+{
+  if (!is_primary()) {
+    // waiters
+    if (recovery_state.needs_flush() == 0) {
+      requeue_ops(waiting_for_peered);
+    } else if (!waiting_for_peered.empty()) {
+      dout(10) << __func__ << " flushes in progress, moving "
+              << waiting_for_peered.size() << " items to waiting_for_flush"
+              << dendl;
+      ceph_assert(waiting_for_flush.empty());
+      waiting_for_flush.swap(waiting_for_peered);
+    }
+  }
+}
 
 void PG::do_replica_scrub_map(OpRequestRef op)
 {
index 1bab7af1b878ca3410be74099e80e7a472105bef..e4bdd94e903bed1d10e3585a9435aa10d604ab13 100644 (file)
@@ -448,12 +448,17 @@ public:
     PGPeeringEventRef on_preempt) override;
   void cancel_remote_recovery_reservation() override;
 
+  void schedule_event_on_commit(
+    ObjectStore::Transaction &t,
+    PGPeeringEventRef on_commit) override;
+
   void on_active_exit() override;
 
   Context *on_clean() override {
     try_mark_clean();
     return finish_recovery();
   }
+  void on_activate_committed() override;
 
   void on_active_actmap() override;
   void on_active_advmap(const OSDMapRef &osdmap) override;
@@ -1051,26 +1056,6 @@ protected:
   void discover_all_missing(std::map<int, map<spg_t,pg_query_t> > &query_map);
   
   void build_might_have_unfound();
-  void activate(
-    ObjectStore::Transaction& t,
-    epoch_t activation_epoch,
-    map<int, map<spg_t,pg_query_t> >& query_map,
-    map<int,
-      vector<pair<pg_notify_t, PastIntervals> > > *activator_map,
-    PeeringCtx *ctx);
-
-  struct C_PG_ActivateCommitted : public Context {
-    PGRef pg;
-    epoch_t epoch;
-    epoch_t activation_epoch;
-    C_PG_ActivateCommitted(PG *p, epoch_t e, epoch_t ae)
-      : pg(p), epoch(e), activation_epoch(ae) {}
-    void finish(int r) override {
-      pg->_activate_committed(epoch, activation_epoch);
-    }
-  };
-  void _activate_committed(epoch_t epoch, epoch_t activation_epoch);
-  void all_activated_and_committed();
 
   void proc_primary_info(ObjectStore::Transaction &t, const pg_info_t &info);
 
index 2640dc8ecac012daf9541e90e256d93b400457d8..5486008c774e3c16ff06367f9876b7524940b3c8 100644 (file)
@@ -11,6 +11,7 @@
 #include "messages/MBackfillReserve.h"
 #include "messages/MRecoveryReserve.h"
 #include "messages/MOSDScrubReserve.h"
+#include "messages/MOSDPGInfo.h"
 
 #define dout_context cct
 #define dout_subsys ceph_subsys_osd
@@ -147,7 +148,7 @@ void PeeringState::check_recovery_sources(const OSDMapRef& osdmap)
        i != peer_log_requested.end();
        ) {
     if (!osdmap->is_up(i->osd)) {
-      dout(10) << "peer_log_requested removing " << *i << dendl;
+      psdout(10) << "peer_log_requested removing " << *i << dendl;
       peer_log_requested.erase(i++);
     } else {
       ++i;
@@ -158,7 +159,7 @@ void PeeringState::check_recovery_sources(const OSDMapRef& osdmap)
        i != peer_missing_requested.end();
        ) {
     if (!osdmap->is_up(i->osd)) {
-      dout(10) << "peer_missing_requested removing " << *i << dendl;
+      psdout(10) << "peer_missing_requested removing " << *i << dendl;
       peer_missing_requested.erase(i++);
     } else {
       ++i;
@@ -198,7 +199,7 @@ void PeeringState::purge_strays()
        ++p) {
     ceph_assert(!is_acting_recovery_backfill(*p));
     if (get_osdmap()->is_up(p->osd)) {
-      dout(10) << "sending PGRemove to osd." << *p << dendl;
+      psdout(10) << "sending PGRemove to osd." << *p << dendl;
       vector<spg_t> to_remove;
       to_remove.push_back(spg_t(info.pgid.pgid, p->shard));
       MOSDPGRemove *m = new MOSDPGRemove(
@@ -206,7 +207,7 @@ void PeeringState::purge_strays()
        to_remove);
       pl->send_cluster_message(p->osd, m, get_osdmap_epoch());
     } else {
-      dout(10) << "not sending PGRemove to down osd." << *p << dendl;
+      psdout(10) << "not sending PGRemove to down osd." << *p << dendl;
     }
     peer_missing.erase(*p);
     peer_info.erase(*p);
@@ -232,17 +233,18 @@ bool PeeringState::proc_replica_info(
 {
   map<pg_shard_t, pg_info_t>::iterator p = peer_info.find(from);
   if (p != peer_info.end() && p->second.last_update == oinfo.last_update) {
-    dout(10) << " got dup osd." << from << " info " << oinfo << ", identical to ours" << dendl;
+    psdout(10) << " got dup osd." << from << " info "
+              << oinfo << ", identical to ours" << dendl;
     return false;
   }
 
   if (!get_osdmap()->has_been_up_since(from.osd, send_epoch)) {
-    dout(10) << " got info " << oinfo << " from down osd." << from
+    psdout(10) << " got info " << oinfo << " from down osd." << from
             << " discarding" << dendl;
     return false;
   }
 
-  dout(10) << " got osd." << from << " " << oinfo << dendl;
+  psdout(10) << " got osd." << from << " " << oinfo << dendl;
   ceph_assert(is_primary());
   peer_info[from] = oinfo;
   might_have_unfound.insert(from);
@@ -251,7 +253,7 @@ bool PeeringState::proc_replica_info(
 
   // stray?
   if (!is_up(from) && !is_acting(from)) {
-    dout(10) << " osd." << from << " has stray content: " << oinfo << dendl;
+    psdout(10) << " osd." << from << " has stray content: " << oinfo << dendl;
     stray_set.insert(from);
     if (is_clean()) {
       purge_strays();
@@ -378,7 +380,8 @@ void PeeringState::set_last_peering_reset()
 {
   psdout(20) << "set_last_peering_reset " << get_osdmap_epoch() << dendl;
   if (last_peering_reset != get_osdmap_epoch()) {
-    dout(10) << "Clearing blocked outgoing recovery messages" << dendl;
+    last_peering_reset = get_osdmap_epoch();
+    psdout(10) << "Clearing blocked outgoing recovery messages" << dendl;
     clear_blocked_outgoing();
     if (!pl->try_flush_or_schedule_async()) {
       psdout(10) << "Beginning to block outgoing recovery messages" << dendl;
@@ -443,8 +446,8 @@ bool PeeringState::should_restart_peering(
        osdmap.get(),
        lastmap.get(),
        info.pgid.pgid)) {
-    dout(20) << "new interval newup " << newup
-            << " newacting " << newacting << dendl;
+    psdout(20) << "new interval newup " << newup
+              << " newacting " << newacting << dendl;
     return true;
   }
   if (!lastmap->is_up(pg_whoami.osd) && osdmap->is_up(pg_whoami.osd)) {
@@ -1032,8 +1035,8 @@ bool PeeringState::needs_recovery() const
   auto &missing = pg_log.get_missing();
 
   if (missing.num_missing()) {
-    dout(10) << __func__ << " primary has " << missing.num_missing()
-      << " missing" << dendl;
+    psdout(10) << __func__ << " primary has " << missing.num_missing()
+              << " missing" << dendl;
     return true;
   }
 
@@ -1078,7 +1081,7 @@ bool PeeringState::needs_backfill() const
     }
   }
 
-  dout(10) << __func__ << " does not need backfill" << dendl;
+  psdout(10) << __func__ << " does not need backfill" << dendl;
   return false;
 }
 
@@ -1836,7 +1839,349 @@ bool PeeringState::choose_acting(pg_shard_t &auth_log_shard_id,
   return true;
 }
 
+void PeeringState::activate(
+  ObjectStore::Transaction& t,
+  epoch_t activation_epoch,
+  map<int, map<spg_t,pg_query_t> >& query_map,
+  map<int,
+  vector<
+  pair<pg_notify_t, PastIntervals> > > *activator_map,
+  PeeringCtx *ctx)
+{
+  ceph_assert(!is_peered());
+  // ceph_assert(scrubber.callbacks.empty()); TODOSAM
+  // ceph_assert(callbacks_for_degraded_object.empty()); TODOSAM
+
+  // twiddle pg state
+  state_clear(PG_STATE_DOWN);
+
+  send_notify = false;
+
+  if (is_primary()) {
+    // only update primary last_epoch_started if we will go active
+    if (acting.size() >= pool.info.min_size) {
+      ceph_assert(cct->_conf->osd_find_best_info_ignore_history_les ||
+            info.last_epoch_started <= activation_epoch);
+      info.last_epoch_started = activation_epoch;
+      info.last_interval_started = info.history.same_interval_since;
+    }
+  } else if (is_acting(pg_whoami)) {
+    /* update last_epoch_started on acting replica to whatever the primary sent
+     * unless it's smaller (could happen if we are going peered rather than
+     * active, see doc/dev/osd_internals/last_epoch_started.rst) */
+    if (info.last_epoch_started < activation_epoch) {
+      info.last_epoch_started = activation_epoch;
+      info.last_interval_started = info.history.same_interval_since;
+    }
+  }
+
+  auto &missing = pg_log.get_missing();
+
+  if (is_primary()) {
+    last_update_ondisk = info.last_update;
+    min_last_complete_ondisk = eversion_t(0,0);  // we don't know (yet)!
+  }
+  last_update_applied = info.last_update;
+  last_rollback_info_trimmed_to_applied = pg_log.get_can_rollback_to();
+
+  need_up_thru = false;
+
+  // write pg info, log
+  dirty_info = true;
+  dirty_big_info = true; // maybe
+
+  pl->schedule_event_on_commit(
+    t,
+    std::make_shared<PGPeeringEvent>(
+      get_osdmap_epoch(),
+      get_osdmap_epoch(),
+      ActivateCommitted(
+       get_osdmap_epoch(),
+       activation_epoch)));
+
+  if (is_primary()) {
+    // initialize snap_trimq
+    if (get_osdmap()->require_osd_release < CEPH_RELEASE_MIMIC) {
+      psdout(20) << "activate - purged_snaps " << info.purged_snaps
+                << " cached_removed_snaps " << pool.cached_removed_snaps
+                << dendl;
+      pg->snap_trimq = pool.cached_removed_snaps;
+    } else {
+      auto& removed_snaps_queue = get_osdmap()->get_removed_snaps_queue();
+      auto p = removed_snaps_queue.find(info.pgid.pgid.pool());
+      pg->snap_trimq.clear();
+      if (p != removed_snaps_queue.end()) {
+       dout(20) << "activate - purged_snaps " << info.purged_snaps
+                << " removed_snaps " << p->second
+                << dendl;
+       for (auto q : p->second) {
+         pg->snap_trimq.insert(q.first, q.second);
+       }
+      }
+    }
+    interval_set<snapid_t> purged;
+    purged.intersection_of(pg->snap_trimq, info.purged_snaps);
+    pg->snap_trimq.subtract(purged);
+
+    if (get_osdmap()->require_osd_release >= CEPH_RELEASE_MIMIC) {
+      // adjust purged_snaps: PG may have been inactive while snaps were pruned
+      // from the removed_snaps_queue in the osdmap.  update local purged_snaps
+      // reflect only those snaps that we thought were pruned and were still in
+      // the queue.
+      info.purged_snaps.swap(purged);
+    }
+  }
+
+  // init complete pointer
+  if (missing.num_missing() == 0) {
+    psdout(10) << "activate - no missing, moving last_complete " << info.last_complete
+            << " -> " << info.last_update << dendl;
+    info.last_complete = info.last_update;
+    info.stats.stats.sum.num_objects_missing = 0;
+    pg_log.reset_recovery_pointers();
+  } else {
+    psdout(10) << "activate - not complete, " << missing << dendl;
+    info.stats.stats.sum.num_objects_missing = missing.num_missing();
+    pg_log.activate_not_complete(info);
+  }
+
+  pg->log_weirdness();
+
+  // if primary..
+  if (is_primary()) {
+    ceph_assert(ctx);
+    // start up replicas
+
+    ceph_assert(!acting_recovery_backfill.empty());
+    for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
+        i != acting_recovery_backfill.end();
+        ++i) {
+      if (*i == pg_whoami) continue;
+      pg_shard_t peer = *i;
+      ceph_assert(peer_info.count(peer));
+      pg_info_t& pi = peer_info[peer];
+
+      psdout(10) << "activate peer osd." << peer << " " << pi << dendl;
+
+      MOSDPGLog *m = 0;
+      ceph_assert(peer_missing.count(peer));
+      pg_missing_t& pm = peer_missing[peer];
+
+      bool needs_past_intervals = pi.dne();
+
+      /*
+       * cover case where peer sort order was different and
+       * last_backfill cannot be interpreted
+       */
+      bool force_restart_backfill =
+       !pi.last_backfill.is_max() &&
+       !pi.last_backfill_bitwise;
+
+      if (pi.last_update == info.last_update && !force_restart_backfill) {
+        // empty log
+       if (!pi.last_backfill.is_max())
+         pl->get_clog().info() << info.pgid << " continuing backfill to osd."
+                               << peer
+                               << " from (" << pi.log_tail << "," << pi.last_update
+                               << "] " << pi.last_backfill
+                               << " to " << info.last_update;
+       if (!pi.is_empty() && activator_map) {
+         psdout(10) << "activate peer osd." << peer
+                    << " is up to date, queueing in pending_activators" << dendl;
+         (*activator_map)[peer.osd].emplace_back(
+           pg_notify_t(
+             peer.shard, pg_whoami.shard,
+             get_osdmap_epoch(),
+             get_osdmap_epoch(),
+             info),
+           past_intervals);
+       } else {
+         psdout(10) << "activate peer osd." << peer
+                    << " is up to date, but sending pg_log anyway" << dendl;
+         m = new MOSDPGLog(
+           i->shard, pg_whoami.shard,
+           get_osdmap_epoch(), info,
+           last_peering_reset);
+       }
+      } else if (
+       pg_log.get_tail() > pi.last_update ||
+       pi.last_backfill == hobject_t() ||
+       force_restart_backfill ||
+       (backfill_targets.count(*i) && pi.last_backfill.is_max())) {
+       /* ^ This last case covers a situation where a replica is not contiguous
+        * with the auth_log, but is contiguous with this replica.  Reshuffling
+        * the active set to handle this would be tricky, so instead we just go
+        * ahead and backfill it anyway.  This is probably preferrable in any
+        * case since the replica in question would have to be significantly
+        * behind.
+        */
+       // backfill
+       pl->get_clog().debug() << info.pgid << " starting backfill to osd." << peer
+                              << " from (" << pi.log_tail << "," << pi.last_update
+                              << "] " << pi.last_backfill
+                              << " to " << info.last_update;
+
+       pi.last_update = info.last_update;
+       pi.last_complete = info.last_update;
+       pi.set_last_backfill(hobject_t());
+       pi.last_epoch_started = info.last_epoch_started;
+       pi.last_interval_started = info.last_interval_started;
+       pi.history = info.history;
+       pi.hit_set = info.hit_set;
+        // Save num_bytes for reservation request, can't be negative
+        peer_bytes[peer] = std::max<int64_t>(0, pi.stats.stats.sum.num_bytes);
+        pi.stats.stats.clear();
+
+       // initialize peer with our purged_snaps.
+       pi.purged_snaps = info.purged_snaps;
+
+       m = new MOSDPGLog(
+         i->shard, pg_whoami.shard,
+         get_osdmap_epoch(), pi,
+         last_peering_reset /* epoch to create pg at */);
+
+       // send some recent log, so that op dup detection works well.
+       m->log.copy_up_to(pg_log.get_log(), cct->_conf->osd_min_pg_log_entries);
+       m->info.log_tail = m->log.tail;
+       pi.log_tail = m->log.tail;  // sigh...
+
+       pm.clear();
+      } else {
+       // catch up
+       ceph_assert(pg_log.get_tail() <= pi.last_update);
+       m = new MOSDPGLog(
+         i->shard, pg_whoami.shard,
+         get_osdmap_epoch(), info,
+         last_peering_reset /* epoch to create pg at */);
+       // send new stuff to append to replicas log
+       m->log.copy_after(pg_log.get_log(), pi.last_update);
+      }
+
+      // share past_intervals if we are creating the pg on the replica
+      // based on whether our info for that peer was dne() *before*
+      // updating pi.history in the backfill block above.
+      if (m && needs_past_intervals)
+       m->past_intervals = past_intervals;
+
+      // update local version of peer's missing list!
+      if (m && pi.last_backfill != hobject_t()) {
+        for (list<pg_log_entry_t>::iterator p = m->log.log.begin();
+             p != m->log.log.end();
+             ++p) {
+         if (p->soid <= pi.last_backfill &&
+             !p->is_error()) {
+           if (pg->perform_deletes_during_peering() && p->is_delete()) {
+             pm.rm(p->soid, p->version);
+           } else {
+             pm.add_next_event(*p);
+           }
+         }
+       }
+      }
+
+      if (m) {
+       dout(10) << "activate peer osd." << peer << " sending " << m->log << dendl;
+       //m->log.print(cout);
+       pl->send_cluster_message(peer.osd, m, get_osdmap_epoch());
+      }
+
+      // peer now has
+      pi.last_update = info.last_update;
+
+      // update our missing
+      if (pm.num_missing() == 0) {
+       pi.last_complete = pi.last_update;
+        psdout(10) << "activate peer osd." << peer << " " << pi
+                  << " uptodate" << dendl;
+      } else {
+        psdout(10) << "activate peer osd." << peer << " " << pi
+                  << " missing " << pm << dendl;
+      }
+    }
+
+    // Set up missing_loc
+    set<pg_shard_t> complete_shards;
+    for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
+        i != acting_recovery_backfill.end();
+        ++i) {
+      psdout(20) << __func__ << " setting up missing_loc from shard " << *i
+                << " " << dendl;
+      if (*i == get_primary()) {
+       missing_loc.add_active_missing(missing);
+        if (!missing.have_missing())
+          complete_shards.insert(*i);
+      } else {
+       auto peer_missing_entry = peer_missing.find(*i);
+       ceph_assert(peer_missing_entry != peer_missing.end());
+       missing_loc.add_active_missing(peer_missing_entry->second);
+        if (!peer_missing_entry->second.have_missing() &&
+           peer_info[*i].last_backfill.is_max())
+         complete_shards.insert(*i);
+      }
+    }
+
+    // If necessary, create might_have_unfound to help us find our unfound objects.
+    // NOTE: It's important that we build might_have_unfound before trimming the
+    // past intervals.
+    might_have_unfound.clear();
+    if (needs_recovery()) {
+      // If only one shard has missing, we do a trick to add all others as recovery
+      // source, this is considered safe since the PGLogs have been merged locally,
+      // and covers vast majority of the use cases, like one OSD/host is down for
+      // a while for hardware repairing
+      if (complete_shards.size() + 1 == acting_recovery_backfill.size()) {
+        missing_loc.add_batch_sources_info(complete_shards, ctx->handle);
+      } else {
+        missing_loc.add_source_info(pg_whoami, info, pg_log.get_missing(),
+                                   ctx->handle);
+        for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
+            i != acting_recovery_backfill.end();
+            ++i) {
+         if (*i == pg_whoami) continue;
+         psdout(10) << __func__ << ": adding " << *i << " as a source" << dendl;
+         ceph_assert(peer_missing.count(*i));
+         ceph_assert(peer_info.count(*i));
+         missing_loc.add_source_info(
+           *i,
+           peer_info[*i],
+           peer_missing[*i],
+            ctx->handle);
+        }
+      }
+      for (map<pg_shard_t, pg_missing_t>::iterator i = peer_missing.begin();
+          i != peer_missing.end();
+          ++i) {
+       if (is_acting_recovery_backfill(i->first))
+         continue;
+       ceph_assert(peer_info.count(i->first));
+       pg->search_for_missing(
+         peer_info[i->first],
+         i->second,
+         i->first,
+         ctx);
+      }
+
+      pg->build_might_have_unfound();
 
+      // Always call now so _update_calc_stats() will be accurate
+      pg->discover_all_missing(query_map);
+    }
+
+    // num_objects_degraded if calculated should reflect this too, unless no
+    // missing and we are about to go clean.
+    if (get_osdmap()->get_pg_size(info.pgid.pgid) > actingset.size()) {
+      state_set(PG_STATE_UNDERSIZED);
+    }
+
+    state_set(PG_STATE_ACTIVATING);
+    pg->release_pg_backoffs();
+    pg->projected_last_update = info.last_update;
+  }
+  if (acting.size() >= pool.info.min_size) {
+    PG::PGLogEntryHandler handler{pg, &t};
+    pg_log.roll_forward(&handler);
+  }
+}
 
 /*------------ Peering State Machine----------------*/
 #undef dout_prefix
@@ -3151,7 +3496,7 @@ PeeringState::Active::Active(my_context ctx)
   ceph_assert(ps->is_primary());
   psdout(10) << "In Active, about to call activate" << dendl;
   ps->start_flush(context< PeeringMachine >().get_cur_transaction());
-  pg->activate(*context< PeeringMachine >().get_cur_transaction(),
+  ps->activate(*context< PeeringMachine >().get_cur_transaction(),
               ps->get_osdmap_epoch(),
               *context< PeeringMachine >().get_query_map(),
               context< PeeringMachine >().get_info_map(),
@@ -3317,7 +3662,7 @@ boost::statechart::result PeeringState::Active::react(const MInfoRec& infoevt)
     ps->blocked_by.erase(infoevt.from.shard);
     pl->publish_stats_to_osd();
     if (ps->peer_activated.size() == ps->acting_recovery_backfill.size()) {
-      pg->all_activated_and_committed();
+      all_activated_and_committed();
     }
   }
   return discard_event();
@@ -3380,6 +3725,27 @@ boost::statechart::result PeeringState::Active::react(const QueryState& q)
   return forward_event();
 }
 
+boost::statechart::result PeeringState::Active::react(
+  const ActivateCommitted &evt)
+{
+  DECLARE_LOCALS
+  ceph_assert(!ps->peer_activated.count(ps->pg_whoami));
+  ps->peer_activated.insert(ps->pg_whoami);
+  psdout(10) << "_activate_committed " << evt.epoch
+            << " peer_activated now " << ps->peer_activated
+            << " last_interval_started "
+            << ps->info.history.last_interval_started
+            << " last_epoch_started "
+            << ps->info.history.last_epoch_started
+            << " same_interval_since "
+            << ps->info.history.same_interval_since
+            << dendl;
+  ceph_assert(!ps->acting_recovery_backfill.empty());
+  if (ps->peer_activated.size() == ps->acting_recovery_backfill.size())
+    all_activated_and_committed();
+  return discard_event();
+}
+
 boost::statechart::result PeeringState::Active::react(const AllReplicasActivated &evt)
 {
 
@@ -3443,6 +3809,32 @@ boost::statechart::result PeeringState::Active::react(const AllReplicasActivated
   return discard_event();
 }
 
+/*
+ * update info.history.last_epoch_started ONLY after we and all
+ * replicas have activated AND committed the activate transaction
+ * (i.e. the peering results are stable on disk).
+ */
+void PeeringState::Active::all_activated_and_committed()
+{
+  DECLARE_LOCALS
+  psdout(10) << "all_activated_and_committed" << dendl;
+  ceph_assert(ps->is_primary());
+  ceph_assert(ps->peer_activated.size() == ps->acting_recovery_backfill.size());
+  ceph_assert(!ps->acting_recovery_backfill.empty());
+  ceph_assert(ps->blocked_by.empty());
+
+  // Degraded?
+  // _update_calc_stats(); TODOSAM
+  if (ps->info.stats.stats.sum.num_objects_degraded) {
+    ps->state_set(PG_STATE_DEGRADED);
+  } else {
+    ps->state_clear(PG_STATE_DEGRADED);
+  }
+
+  post_event(PeeringState::AllReplicasActivated());
+}
+
+
 void PeeringState::Active::exit()
 {
   context< PeeringMachine >().log_exit(state_name, enter_time);
@@ -3482,13 +3874,44 @@ boost::statechart::result PeeringState::ReplicaActive::react(
   DECLARE_LOCALS
   psdout(10) << "In ReplicaActive, about to call activate" << dendl;
   map<int, map<spg_t, pg_query_t> > query_map;
-  pg->activate(*context< PeeringMachine >().get_cur_transaction(),
+  ps->activate(*context< PeeringMachine >().get_cur_transaction(),
               actevt.activation_epoch,
               query_map, NULL, NULL);
   psdout(10) << "Activate Finished" << dendl;
   return discard_event();
 }
 
+boost::statechart::result PeeringState::ReplicaActive::react(
+  const ActivateCommitted &evt)
+{
+  DECLARE_LOCALS
+  psdout(10) << "_activate_committed " << evt.epoch
+          << " telling primary" << dendl;
+  MOSDPGInfo *m = new MOSDPGInfo(evt.epoch);
+  pg_notify_t i = pg_notify_t(
+    ps->get_primary().shard, ps->pg_whoami.shard,
+    ps->get_osdmap_epoch(),
+    ps->get_osdmap_epoch(),
+    ps->info);
+
+  i.info.history.last_epoch_started = evt.activation_epoch;
+  i.info.history.last_interval_started = i.info.history.same_interval_since;
+  if (ps->acting.size() >= ps->pool.info.min_size) {
+    ps->state_set(PG_STATE_ACTIVE);
+  } else {
+    ps->state_set(PG_STATE_PEERED);
+  }
+
+  m->pg_list.emplace_back(i, PastIntervals());
+  pl->send_cluster_message(
+    ps->get_primary().osd,
+    m,
+    ps->get_osdmap_epoch());
+
+  pl->on_activate_committed();
+  return discard_event();
+}
+
 boost::statechart::result PeeringState::ReplicaActive::react(const MInfoRec& infoevt)
 {
   DECLARE_LOCALS
index 6f89570f96c64352290c13117a1d532ffaa19fdb..ce53bdd49cdd81e44dcbf02c69242a74a561627c 100644 (file)
@@ -95,6 +95,10 @@ public:
       PGPeeringEventRef on_preempt) = 0;
     virtual void cancel_remote_recovery_reservation() = 0;
 
+    virtual void schedule_event_on_commit(
+      ObjectStore::Transaction &t,
+      PGPeeringEventRef on_commit) = 0;
+
     // HB
     virtual void set_probe_targets(const set<pg_shard_t> &probe_set) = 0;
     virtual void clear_probe_targets() = 0;
@@ -120,6 +124,7 @@ public:
     virtual void on_activate() = 0;
     virtual void on_new_interval() = 0;
     virtual Context *on_clean() = 0;
+    virtual void on_activate_committed() = 0;
 
     virtual void on_active_exit() = 0;
 
@@ -257,6 +262,17 @@ public:
       *out << "Activate from " << activation_epoch;
     }
   };
+  struct ActivateCommitted : boost::statechart::event< ActivateCommitted > {
+    epoch_t epoch;
+    epoch_t activation_epoch;
+    explicit ActivateCommitted(epoch_t e, epoch_t ae)
+      : boost::statechart::event< ActivateCommitted >(),
+       activation_epoch(ae) {}
+    void print(std::ostream *out) const {
+      *out << "ActivateCommitted from " << activation_epoch
+          << " processed at " << epoch;
+    }
+  };
 public:
   struct UnfoundBackfill : boost::statechart::event<UnfoundBackfill> {
     explicit UnfoundBackfill() {}
@@ -601,6 +617,7 @@ public:
       boost::statechart::custom_reaction< MLogRec >,
       boost::statechart::custom_reaction< MTrim >,
       boost::statechart::custom_reaction< Backfilled >,
+      boost::statechart::custom_reaction< ActivateCommitted >,
       boost::statechart::custom_reaction< AllReplicasActivated >,
       boost::statechart::custom_reaction< DeferRecovery >,
       boost::statechart::custom_reaction< DeferBackfill >,
@@ -620,6 +637,7 @@ public:
     boost::statechart::result react(const Backfilled&) {
       return discard_event();
     }
+    boost::statechart::result react(const ActivateCommitted&);
     boost::statechart::result react(const AllReplicasActivated&);
     boost::statechart::result react(const DeferRecovery& evt) {
       return discard_event();
@@ -642,6 +660,7 @@ public:
     boost::statechart::result react(const DoRecovery&) {
       return discard_event();
     }
+    void all_activated_and_committed();
   };
 
   struct Clean : boost::statechart::state< Clean, Active >, NamedState {
@@ -764,6 +783,7 @@ public:
       boost::statechart::custom_reaction< MLogRec >,
       boost::statechart::custom_reaction< MTrim >,
       boost::statechart::custom_reaction< Activate >,
+      boost::statechart::custom_reaction< ActivateCommitted >,
       boost::statechart::custom_reaction< DeferRecovery >,
       boost::statechart::custom_reaction< DeferBackfill >,
       boost::statechart::custom_reaction< UnfoundRecovery >,
@@ -780,6 +800,7 @@ public:
     boost::statechart::result react(const ActMap&);
     boost::statechart::result react(const MQuery&);
     boost::statechart::result react(const Activate&);
+    boost::statechart::result react(const ActivateCommitted&);
     boost::statechart::result react(const RecoveryDone&) {
       return discard_event();
     }
@@ -953,6 +974,7 @@ public:
     unsigned priority = 0;
     typedef boost::mpl::list <
       boost::statechart::custom_reaction< ActMap >,
+      boost::statechart::custom_reaction< ActivateCommitted >,
       boost::statechart::custom_reaction< DeleteSome >
       > reactions;
     explicit ToDelete(my_context ctx);
@@ -961,6 +983,11 @@ public:
       // happens if we drop out of Deleting due to reprioritization etc.
       return discard_event();
     }
+    boost::statechart::result react(const ActivateCommitted&) {
+      // Can happens if we were activated as a stray but not actually pulled
+      // from prior to the pg going clean and sending a delete.
+      return discard_event();
+    }
     void exit();
   };
 
@@ -1305,6 +1332,13 @@ public:
                     bool restrict_to_up_acting,
                     bool *history_les_bound);
 
+  void activate(
+    ObjectStore::Transaction& t,
+    epoch_t activation_epoch,
+    map<int, map<spg_t,pg_query_t> >& query_map,
+    map<int, vector<pair<pg_notify_t, PastIntervals> > > *activator_map,
+    PeeringCtx *ctx);
+
 public:
   PeeringState(
     CephContext *cct,