From: Samuel Just <sjust@redhat.com>
Date: Fri, 12 Apr 2019 22:34:25 +0000 (-0700)
Subject: osd/: start moving activate to PeeringState
X-Git-Tag: v15.1.0~2774^2~50
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=749a13d86482391fdb93c428879ad396eb61cf46;p=ceph.git

osd/: start moving activate to PeeringState

Signed-off-by: Samuel Just <sjust@redhat.com>
---

diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index 489669666ef..7bfd4bb27f9 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -581,343 +581,6 @@ void PG::build_might_have_unfound()
   dout(15) << __func__ << ": built " << might_have_unfound << dendl;
 }
 
-void PG::activate(ObjectStore::Transaction& t,
-                  epoch_t activation_epoch,
-                  map<int, map<spg_t,pg_query_t> >& query_map,
-                  map<int, vector<pair<pg_notify_t, PastIntervals> > > *activator_map,
-                  PeeringCtx *ctx)
-{
-  ceph_assert(!is_peered());
-  ceph_assert(scrubber.callbacks.empty());
-  ceph_assert(callbacks_for_degraded_object.empty());
-
-  // twiddle pg state
-  state_clear(PG_STATE_DOWN);
-
-  send_notify = false;
-
-  if (is_primary()) {
-    // only update primary last_epoch_started if we will go active
-    if (acting.size() >= pool.info.min_size) {
-      ceph_assert(cct->_conf->osd_find_best_info_ignore_history_les ||
-                  info.last_epoch_started <= activation_epoch);
-      info.last_epoch_started = activation_epoch;
-      info.last_interval_started = info.history.same_interval_since;
-    }
-  } else if (is_acting(pg_whoami)) {
-    /* update last_epoch_started on acting replica to whatever the primary sent
-     * unless it's smaller (could happen if we are going peered rather than
-     * active, see doc/dev/osd_internals/last_epoch_started.rst) */
-    if (info.last_epoch_started < activation_epoch) {
-      info.last_epoch_started = activation_epoch;
-      info.last_interval_started = info.history.same_interval_since;
-    }
-  }
-
-  auto &missing = pg_log.get_missing();
-
-  if (is_primary()) {
-    last_update_ondisk = info.last_update;
-    min_last_complete_ondisk = eversion_t(0,0);  // we don't know (yet)!
-  }
-  last_update_applied = info.last_update;
-  last_rollback_info_trimmed_to_applied = pg_log.get_can_rollback_to();
-
-  need_up_thru = false;
-
-  // write pg info, log
-  dirty_info = true;
-  dirty_big_info = true; // maybe
-
-  // find out when we commit
-  t.register_on_complete(
-    new C_PG_ActivateCommitted(
-      this,
-      get_osdmap_epoch(),
-      activation_epoch));
-
-  if (is_primary()) {
-    // initialize snap_trimq
-    if (get_osdmap()->require_osd_release < CEPH_RELEASE_MIMIC) {
-      dout(20) << "activate - purged_snaps " << info.purged_snaps
-               << " cached_removed_snaps " << pool.cached_removed_snaps
-               << dendl;
-      snap_trimq = pool.cached_removed_snaps;
-    } else {
-      auto& removed_snaps_queue = get_osdmap()->get_removed_snaps_queue();
-      auto p = removed_snaps_queue.find(info.pgid.pgid.pool());
-      snap_trimq.clear();
-      if (p != removed_snaps_queue.end()) {
-        dout(20) << "activate - purged_snaps " << info.purged_snaps
-                 << " removed_snaps " << p->second
-                 << dendl;
-        for (auto q : p->second) {
-          snap_trimq.insert(q.first, q.second);
-        }
-      }
-    }
-    interval_set<snapid_t> purged;
-    purged.intersection_of(snap_trimq, info.purged_snaps);
-    snap_trimq.subtract(purged);
-
-    if (get_osdmap()->require_osd_release >= CEPH_RELEASE_MIMIC) {
-      // adjust purged_snaps: PG may have been inactive while snaps were pruned
-      // from the removed_snaps_queue in the osdmap.  update local purged_snaps
-      // reflect only those snaps that we thought were pruned and were still in
-      // the queue.
-      info.purged_snaps.swap(purged);
-    }
-  }
-
-  // init complete pointer
-  if (missing.num_missing() == 0) {
-    dout(10) << "activate - no missing, moving last_complete " << info.last_complete
-             << " -> " << info.last_update << dendl;
-    info.last_complete = info.last_update;
-    info.stats.stats.sum.num_objects_missing = 0;
-    pg_log.reset_recovery_pointers();
-  } else {
-    dout(10) << "activate - not complete, " << missing << dendl;
-    info.stats.stats.sum.num_objects_missing = missing.num_missing();
-    pg_log.activate_not_complete(info);
-  }
-
-  log_weirdness();
-
-  // if primary..
-  if (is_primary()) {
-    ceph_assert(ctx);
-    // start up replicas
-
-    ceph_assert(!acting_recovery_backfill.empty());
-    for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
-         i != acting_recovery_backfill.end();
-         ++i) {
-      if (*i == pg_whoami) continue;
-      pg_shard_t peer = *i;
-      ceph_assert(peer_info.count(peer));
-      pg_info_t& pi = peer_info[peer];
-
-      dout(10) << "activate peer osd." << peer << " " << pi << dendl;
-
-      MOSDPGLog *m = 0;
-      ceph_assert(peer_missing.count(peer));
-      pg_missing_t& pm = peer_missing[peer];
-
-      bool needs_past_intervals = pi.dne();
-
-      /*
-       * cover case where peer sort order was different and
-       * last_backfill cannot be interpreted
-       */
-      bool force_restart_backfill =
-        !pi.last_backfill.is_max() &&
-        !pi.last_backfill_bitwise;
-
-      if (pi.last_update == info.last_update && !force_restart_backfill) {
-        // empty log
-        if (!pi.last_backfill.is_max())
-          osd->clog->info() << info.pgid << " continuing backfill to osd."
-                            << peer
-                            << " from (" << pi.log_tail << "," << pi.last_update
-                            << "] " << pi.last_backfill
-                            << " to " << info.last_update;
-        if (!pi.is_empty() && activator_map) {
-          dout(10) << "activate peer osd." << peer << " is up to date, queueing in pending_activators" << dendl;
-          (*activator_map)[peer.osd].emplace_back(
-            pg_notify_t(
-              peer.shard, pg_whoami.shard,
-              get_osdmap_epoch(),
-              get_osdmap_epoch(),
-              info),
-            past_intervals);
-        } else {
-          dout(10) << "activate peer osd." << peer << " is up to date, but sending pg_log anyway" << dendl;
-          m = new MOSDPGLog(
-            i->shard, pg_whoami.shard,
-            get_osdmap_epoch(), info,
-            last_peering_reset);
-        }
-      } else if (
-        pg_log.get_tail() > pi.last_update ||
-        pi.last_backfill == hobject_t() ||
-        force_restart_backfill ||
-        (backfill_targets.count(*i) && pi.last_backfill.is_max())) {
-        /* ^ This last case covers a situation where a replica is not contiguous
-         * with the auth_log, but is contiguous with this replica.  Reshuffling
-         * the active set to handle this would be tricky, so instead we just go
-         * ahead and backfill it anyway.  This is probably preferrable in any
-         * case since the replica in question would have to be significantly
-         * behind.
-         */
-        // backfill
-        osd->clog->debug() << info.pgid << " starting backfill to osd." << peer
-                           << " from (" << pi.log_tail << "," << pi.last_update
-                           << "] " << pi.last_backfill
-                           << " to " << info.last_update;
-
-        pi.last_update = info.last_update;
-        pi.last_complete = info.last_update;
-        pi.set_last_backfill(hobject_t());
-        pi.last_epoch_started = info.last_epoch_started;
-        pi.last_interval_started = info.last_interval_started;
-        pi.history = info.history;
-        pi.hit_set = info.hit_set;
-        // Save num_bytes for reservation request, can't be negative
-        peer_bytes[peer] = std::max<int64_t>(0, pi.stats.stats.sum.num_bytes);
-        pi.stats.stats.clear();
-
-        // initialize peer with our purged_snaps.
-        pi.purged_snaps = info.purged_snaps;
-
-        m = new MOSDPGLog(
-          i->shard, pg_whoami.shard,
-          get_osdmap_epoch(), pi,
-          last_peering_reset /* epoch to create pg at */);
-
-        // send some recent log, so that op dup detection works well.
-        m->log.copy_up_to(pg_log.get_log(), cct->_conf->osd_min_pg_log_entries);
-        m->info.log_tail = m->log.tail;
-        pi.log_tail = m->log.tail;  // sigh...
-
-        pm.clear();
-      } else {
-        // catch up
-        ceph_assert(pg_log.get_tail() <= pi.last_update);
-        m = new MOSDPGLog(
-          i->shard, pg_whoami.shard,
-          get_osdmap_epoch(), info,
-          last_peering_reset /* epoch to create pg at */);
-        // send new stuff to append to replicas log
-        m->log.copy_after(pg_log.get_log(), pi.last_update);
-      }
-
-      // share past_intervals if we are creating the pg on the replica
-      // based on whether our info for that peer was dne() *before*
-      // updating pi.history in the backfill block above.
-      if (m && needs_past_intervals)
-        m->past_intervals = past_intervals;
-
-      // update local version of peer's missing list!
-      if (m && pi.last_backfill != hobject_t()) {
-        for (list<pg_log_entry_t>::iterator p = m->log.log.begin();
-             p != m->log.log.end();
-             ++p) {
-          if (p->soid <= pi.last_backfill &&
-              !p->is_error()) {
-            if (perform_deletes_during_peering() && p->is_delete()) {
-              pm.rm(p->soid, p->version);
-            } else {
-              pm.add_next_event(*p);
-            }
-          }
-        }
-      }
-
-      if (m) {
-        dout(10) << "activate peer osd." << peer << " sending " << m->log << dendl;
-        //m->log.print(cout);
-        osd->send_message_osd_cluster(peer.osd, m, get_osdmap_epoch());
-      }
-
-      // peer now has
-      pi.last_update = info.last_update;
-
-      // update our missing
-      if (pm.num_missing() == 0) {
-        pi.last_complete = pi.last_update;
-        dout(10) << "activate peer osd." << peer << " " << pi << " uptodate" << dendl;
-      } else {
-        dout(10) << "activate peer osd." << peer << " " << pi << " missing " << pm << dendl;
-      }
-    }
-
-    // Set up missing_loc
-    set<pg_shard_t> complete_shards;
-    for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
-         i != acting_recovery_backfill.end();
-         ++i) {
-      dout(20) << __func__ << " setting up missing_loc from shard " << *i << " " << dendl;
-      if (*i == get_primary()) {
-        missing_loc.add_active_missing(missing);
-        if (!missing.have_missing())
-          complete_shards.insert(*i);
-      } else {
-        auto peer_missing_entry = peer_missing.find(*i);
-        ceph_assert(peer_missing_entry != peer_missing.end());
-        missing_loc.add_active_missing(peer_missing_entry->second);
-        if (!peer_missing_entry->second.have_missing() &&
-            peer_info[*i].last_backfill.is_max())
-          complete_shards.insert(*i);
-      }
-    }
-
-    // If necessary, create might_have_unfound to help us find our unfound objects.
-    // NOTE: It's important that we build might_have_unfound before trimming the
-    // past intervals.
-    might_have_unfound.clear();
-    if (needs_recovery()) {
-      // If only one shard has missing, we do a trick to add all others as recovery
-      // source, this is considered safe since the PGLogs have been merged locally,
-      // and covers vast majority of the use cases, like one OSD/host is down for
-      // a while for hardware repairing
-      if (complete_shards.size() + 1 == acting_recovery_backfill.size()) {
-        missing_loc.add_batch_sources_info(complete_shards, ctx->handle);
-      } else {
-        missing_loc.add_source_info(pg_whoami, info, pg_log.get_missing(),
-                                    ctx->handle);
-        for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
-             i != acting_recovery_backfill.end();
-             ++i) {
-          if (*i == pg_whoami) continue;
-          dout(10) << __func__ << ": adding " << *i << " as a source" << dendl;
-          ceph_assert(peer_missing.count(*i));
-          ceph_assert(peer_info.count(*i));
-          missing_loc.add_source_info(
-            *i,
-            peer_info[*i],
-            peer_missing[*i],
-            ctx->handle);
-        }
-      }
-      for (map<pg_shard_t, pg_missing_t>::iterator i = peer_missing.begin();
-           i != peer_missing.end();
-           ++i) {
-        if (is_acting_recovery_backfill(i->first))
-          continue;
-        ceph_assert(peer_info.count(i->first));
-        search_for_missing(
-          peer_info[i->first],
-          i->second,
-          i->first,
-          ctx);
-      }
-
-      build_might_have_unfound();
-
-      // Always call now so _update_calc_stats() will be accurate
-      discover_all_missing(query_map);
-    }
-
-    // num_objects_degraded if calculated should reflect this too, unless no
-    // missing and we are about to go clean.
-    if (get_osdmap()->get_pg_size(info.pgid.pgid) > actingset.size()) {
-      state_set(PG_STATE_UNDERSIZED);
-    }
-
-    state_set(PG_STATE_ACTIVATING);
-    release_pg_backoffs();
-    projected_last_update = info.last_update;
-  }
-  if (acting.size() >= pool.info.min_size) {
-    PGLogEntryHandler handler{this, &t};
-    pg_log.roll_forward(&handler);
-  }
-}
-
 bool PG::op_has_sufficient_caps(OpRequestRef& op)
 {
   // only check MOSDOp
@@ -961,89 +624,6 @@ bool PG::op_has_sufficient_caps(OpRequestRef& op)
   return cap;
 }
 
-void PG::_activate_committed(epoch_t epoch, epoch_t activation_epoch)
-{
-  lock();
-  if (pg_has_reset_since(epoch)) {
-    dout(10) << "_activate_committed " << epoch
-             << ", that was an old interval" << dendl;
-  } else if (is_primary()) {
-    ceph_assert(!peer_activated.count(pg_whoami));
-    peer_activated.insert(pg_whoami);
-    dout(10) << "_activate_committed " << epoch
-             << " peer_activated now " << peer_activated
-             << " last_interval_started " << info.history.last_interval_started
-             << " last_epoch_started " << info.history.last_epoch_started
-             << " same_interval_since " << info.history.same_interval_since << dendl;
-    ceph_assert(!acting_recovery_backfill.empty());
-    if (peer_activated.size() == acting_recovery_backfill.size())
-      all_activated_and_committed();
-  } else {
-    dout(10) << "_activate_committed " << epoch << " telling primary" << dendl;
-    MOSDPGInfo *m = new MOSDPGInfo(epoch);
-    pg_notify_t i = pg_notify_t(
-      get_primary().shard, pg_whoami.shard,
-      get_osdmap_epoch(),
-      get_osdmap_epoch(),
-      info);
-
-    i.info.history.last_epoch_started = activation_epoch;
-    i.info.history.last_interval_started = i.info.history.same_interval_since;
-    if (acting.size() >= pool.info.min_size) {
-      state_set(PG_STATE_ACTIVE);
-    } else {
-      state_set(PG_STATE_PEERED);
-    }
-
-    m->pg_list.emplace_back(i, PastIntervals());
-    osd->send_message_osd_cluster(get_primary().osd, m, get_osdmap_epoch());
-
-    // waiters
-    if (recovery_state.needs_flush() == 0) {
-      requeue_ops(waiting_for_peered);
-    } else if (!waiting_for_peered.empty()) {
-      dout(10) << __func__ << " flushes in progress, moving "
-               << waiting_for_peered.size() << " items to waiting_for_flush"
-               << dendl;
-      ceph_assert(waiting_for_flush.empty());
-      waiting_for_flush.swap(waiting_for_peered);
-    }
-  }
-
-  ceph_assert(!dirty_info);
-
-  unlock();
-}
-
-/*
- * update info.history.last_epoch_started ONLY after we and all
- * replicas have activated AND committed the activate transaction
- * (i.e. the peering results are stable on disk).
- */
-void PG::all_activated_and_committed()
-{
-  dout(10) << "all_activated_and_committed" << dendl;
-  ceph_assert(is_primary());
-  ceph_assert(peer_activated.size() == acting_recovery_backfill.size());
-  ceph_assert(!acting_recovery_backfill.empty());
-  ceph_assert(blocked_by.empty());
-
-  // Degraded?
-  _update_calc_stats();
-  if (info.stats.stats.sum.num_objects_degraded) {
-    state_set(PG_STATE_DEGRADED);
-  } else {
-    state_clear(PG_STATE_DEGRADED);
-  }
-
-  queue_peering_event(
-    PGPeeringEventRef(
-      std::make_shared<PGPeeringEvent>(
-        get_osdmap_epoch(),
-        get_osdmap_epoch(),
-        PeeringState::AllReplicasActivated())));
-}
-
 bool PG::requeue_scrub(bool high_priority)
 {
   ceph_assert(is_locked());
@@ -3130,6 +2710,13 @@ void PG::cancel_remote_recovery_reservation() {
     pg_id);
 }
 
+void PG::schedule_event_on_commit(
+  ObjectStore::Transaction &t,
+  PGPeeringEventRef on_commit)
+{
+  t.register_on_commit(new QueuePeeringEvt(this, on_commit));
+}
+
 void PG::on_active_exit()
 {
   backfill_reserving = false;
@@ -3258,6 +2845,21 @@ void PG::on_recovery_reserved()
   queue_recovery();
 }
 
+void PG::on_activate_committed()
+{
+  if (!is_primary()) {
+    // waiters
+    if (recovery_state.needs_flush() == 0) {
+      requeue_ops(waiting_for_peered);
+    } else if (!waiting_for_peered.empty()) {
+      dout(10) << __func__ << " flushes in progress, moving "
+               << waiting_for_peered.size() << " items to waiting_for_flush"
+               << dendl;
+      ceph_assert(waiting_for_flush.empty());
+      waiting_for_flush.swap(waiting_for_peered);
+    }
+  }
+}
 
 void PG::do_replica_scrub_map(OpRequestRef op)
 {
diff --git a/src/osd/PG.h b/src/osd/PG.h
index 1bab7af1b87..e4bdd94e903 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -448,12 +448,17 @@ public:
     PGPeeringEventRef on_preempt) override;
   void cancel_remote_recovery_reservation() override;
 
+  void schedule_event_on_commit(
+    ObjectStore::Transaction &t,
+    PGPeeringEventRef on_commit) override;
+
   void on_active_exit() override;
 
   Context *on_clean() override {
     try_mark_clean();
     return finish_recovery();
   }
+  void on_activate_committed() override;
 
   void on_active_actmap() override;
   void on_active_advmap(const OSDMapRef &osdmap) override;
@@ -1051,26 +1056,6 @@ protected:
   void discover_all_missing(std::map<int, map<spg_t,pg_query_t> > &query_map);
 
   void build_might_have_unfound();
 
-  void activate(
-    ObjectStore::Transaction& t,
-    epoch_t activation_epoch,
-    map<int, map<spg_t,pg_query_t> >& query_map,
-    map<int, vector<pair<pg_notify_t, PastIntervals> > > *activator_map,
-    PeeringCtx *ctx);
-
-  struct C_PG_ActivateCommitted : public Context {
-    PGRef pg;
-    epoch_t epoch;
-    epoch_t activation_epoch;
-    C_PG_ActivateCommitted(PG *p, epoch_t e, epoch_t ae)
-      : pg(p), epoch(e), activation_epoch(ae) {}
-    void finish(int r) override {
-      pg->_activate_committed(epoch, activation_epoch);
-    }
-  };
-  void _activate_committed(epoch_t epoch, epoch_t activation_epoch);
-  void all_activated_and_committed();
 
   void proc_primary_info(ObjectStore::Transaction &t, const pg_info_t &info);
 
diff --git a/src/osd/PeeringState.cc b/src/osd/PeeringState.cc
index 2640dc8ecac..5486008c774 100644
--- a/src/osd/PeeringState.cc
+++ b/src/osd/PeeringState.cc
@@ -11,6 +11,7 @@
 #include "messages/MBackfillReserve.h"
 #include "messages/MRecoveryReserve.h"
 #include "messages/MOSDScrubReserve.h"
+#include "messages/MOSDPGInfo.h"
 
 #define dout_context cct
 #define dout_subsys ceph_subsys_osd
@@ -147,7 +148,7 @@ void PeeringState::check_recovery_sources(const OSDMapRef& osdmap)
        i != peer_log_requested.end();
        ) {
     if (!osdmap->is_up(i->osd)) {
-      dout(10) << "peer_log_requested removing " << *i << dendl;
+      psdout(10) << "peer_log_requested removing " << *i << dendl;
       peer_log_requested.erase(i++);
     } else {
       ++i;
@@ -158,7 +159,7 @@ void PeeringState::check_recovery_sources(const OSDMapRef& osdmap)
        i != peer_missing_requested.end();
        ) {
     if (!osdmap->is_up(i->osd)) {
-      dout(10) << "peer_missing_requested removing " << *i << dendl;
+      psdout(10) << "peer_missing_requested removing " << *i << dendl;
       peer_missing_requested.erase(i++);
     } else {
       ++i;
@@ -198,7 +199,7 @@ void PeeringState::purge_strays()
        ++p) {
     ceph_assert(!is_acting_recovery_backfill(*p));
     if (get_osdmap()->is_up(p->osd)) {
-      dout(10) << "sending PGRemove to osd." << *p << dendl;
+      psdout(10) << "sending PGRemove to osd." << *p << dendl;
       vector<spg_t> to_remove;
       to_remove.push_back(spg_t(info.pgid.pgid, p->shard));
       MOSDPGRemove *m = new MOSDPGRemove(
@@ -206,7 +207,7 @@ void PeeringState::purge_strays()
         to_remove);
       pl->send_cluster_message(p->osd, m, get_osdmap_epoch());
     } else {
-      dout(10) << "not sending PGRemove to down osd." << *p << dendl;
+      psdout(10) << "not sending PGRemove to down osd." << *p << dendl;
     }
     peer_missing.erase(*p);
     peer_info.erase(*p);
@@ -232,17 +233,18 @@ bool PeeringState::proc_replica_info(
 {
   map<pg_shard_t, pg_info_t>::iterator p = peer_info.find(from);
   if (p != peer_info.end() && p->second.last_update == oinfo.last_update) {
-    dout(10) << " got dup osd." << from << " info " << oinfo << ", identical to ours" << dendl;
+    psdout(10) << " got dup osd." << from << " info "
+               << oinfo << ", identical to ours" << dendl;
     return false;
   }
 
   if (!get_osdmap()->has_been_up_since(from.osd, send_epoch)) {
-    dout(10) << " got info " << oinfo << " from down osd." << from
+    psdout(10) << " got info " << oinfo << " from down osd." << from
              << " discarding" << dendl;
     return false;
   }
 
-  dout(10) << " got osd." << from << " " << oinfo << dendl;
+  psdout(10) << " got osd." << from << " " << oinfo << dendl;
   ceph_assert(is_primary());
   peer_info[from] = oinfo;
   might_have_unfound.insert(from);
@@ -251,7 +253,7 @@ bool PeeringState::proc_replica_info(
 
   // stray?
   if (!is_up(from) && !is_acting(from)) {
-    dout(10) << " osd." << from << " has stray content: " << oinfo << dendl;
+    psdout(10) << " osd." << from << " has stray content: " << oinfo << dendl;
     stray_set.insert(from);
     if (is_clean()) {
       purge_strays();
@@ -378,7 +380,8 @@ void PeeringState::set_last_peering_reset()
 {
   psdout(20) << "set_last_peering_reset " << get_osdmap_epoch() << dendl;
   if (last_peering_reset != get_osdmap_epoch()) {
-    dout(10) << "Clearing blocked outgoing recovery messages" << dendl;
+    last_peering_reset = get_osdmap_epoch();
+    psdout(10) << "Clearing blocked outgoing recovery messages" << dendl;
     clear_blocked_outgoing();
     if (!pl->try_flush_or_schedule_async()) {
       psdout(10) << "Beginning to block outgoing recovery messages" << dendl;
@@ -443,8 +446,8 @@ bool PeeringState::should_restart_peering(
         osdmap.get(),
         lastmap.get(),
         info.pgid.pgid)) {
-    dout(20) << "new interval newup " << newup
-             << " newacting " << newacting << dendl;
+    psdout(20) << "new interval newup " << newup
+               << " newacting " << newacting << dendl;
     return true;
   }
   if (!lastmap->is_up(pg_whoami.osd) && osdmap->is_up(pg_whoami.osd)) {
@@ -1032,8 +1035,8 @@ bool PeeringState::needs_recovery() const
 
   auto &missing = pg_log.get_missing();
   if (missing.num_missing()) {
-    dout(10) << __func__ << " primary has " << missing.num_missing()
-      << " missing" << dendl;
+    psdout(10) << __func__ << " primary has " << missing.num_missing()
+               << " missing" << dendl;
     return true;
  }
 
@@ -1078,7 +1081,7 @@ bool PeeringState::needs_backfill() const
     }
   }
 
-  dout(10) << __func__ << " does not need backfill" << dendl;
+  psdout(10) << __func__ << " does not need backfill" << dendl;
   return false;
 }
 
@@ -1836,7 +1839,349 @@ bool PeeringState::choose_acting(pg_shard_t &auth_log_shard_id,
   return true;
 }
 
+void PeeringState::activate(
+  ObjectStore::Transaction& t,
+  epoch_t activation_epoch,
+  map<int, map<spg_t,pg_query_t> >& query_map,
+  map<int, vector<pair<pg_notify_t, PastIntervals> > > *activator_map,
+  PeeringCtx *ctx)
+{
+  ceph_assert(!is_peered());
+  // ceph_assert(scrubber.callbacks.empty()); TODOSAM
+  // ceph_assert(callbacks_for_degraded_object.empty()); TODOSAM
+
+  // twiddle pg state
+  state_clear(PG_STATE_DOWN);
+
+  send_notify = false;
+
+  if (is_primary()) {
+    // only update primary last_epoch_started if we will go active
+    if (acting.size() >= pool.info.min_size) {
+      ceph_assert(cct->_conf->osd_find_best_info_ignore_history_les ||
+                  info.last_epoch_started <= activation_epoch);
+      info.last_epoch_started = activation_epoch;
+      info.last_interval_started = info.history.same_interval_since;
+    }
+  } else if (is_acting(pg_whoami)) {
+    /* update last_epoch_started on acting replica to whatever the primary sent
+     * unless it's smaller (could happen if we are going peered rather than
+     * active, see doc/dev/osd_internals/last_epoch_started.rst) */
+    if (info.last_epoch_started < activation_epoch) {
+      info.last_epoch_started = activation_epoch;
+      info.last_interval_started = info.history.same_interval_since;
+    }
+  }
+
+  auto &missing = pg_log.get_missing();
+
+  if (is_primary()) {
+    last_update_ondisk = info.last_update;
+    min_last_complete_ondisk = eversion_t(0,0);  // we don't know (yet)!
+  }
+  last_update_applied = info.last_update;
+  last_rollback_info_trimmed_to_applied = pg_log.get_can_rollback_to();
+
+  need_up_thru = false;
+
+  // write pg info, log
+  dirty_info = true;
+  dirty_big_info = true; // maybe
+
+  pl->schedule_event_on_commit(
+    t,
+    std::make_shared<PGPeeringEvent>(
+      get_osdmap_epoch(),
+      get_osdmap_epoch(),
+      ActivateCommitted(
+        get_osdmap_epoch(),
+        activation_epoch)));
+
+  if (is_primary()) {
+    // initialize snap_trimq
+    if (get_osdmap()->require_osd_release < CEPH_RELEASE_MIMIC) {
+      psdout(20) << "activate - purged_snaps " << info.purged_snaps
+                 << " cached_removed_snaps " << pool.cached_removed_snaps
+                 << dendl;
+      pg->snap_trimq = pool.cached_removed_snaps;
+    } else {
+      auto& removed_snaps_queue = get_osdmap()->get_removed_snaps_queue();
+      auto p = removed_snaps_queue.find(info.pgid.pgid.pool());
+      pg->snap_trimq.clear();
+      if (p != removed_snaps_queue.end()) {
+        dout(20) << "activate - purged_snaps " << info.purged_snaps
+                 << " removed_snaps " << p->second
+                 << dendl;
+        for (auto q : p->second) {
+          pg->snap_trimq.insert(q.first, q.second);
+        }
+      }
+    }
+    interval_set<snapid_t> purged;
+    purged.intersection_of(pg->snap_trimq, info.purged_snaps);
+    pg->snap_trimq.subtract(purged);
+
+    if (get_osdmap()->require_osd_release >= CEPH_RELEASE_MIMIC) {
+      // adjust purged_snaps: PG may have been inactive while snaps were pruned
+      // from the removed_snaps_queue in the osdmap.  update local purged_snaps
+      // to reflect only those snaps that we thought were pruned and were still
+      // in the queue.
+      info.purged_snaps.swap(purged);
+    }
+  }
+
+  // init complete pointer
+  if (missing.num_missing() == 0) {
+    psdout(10) << "activate - no missing, moving last_complete " << info.last_complete
+               << " -> " << info.last_update << dendl;
+    info.last_complete = info.last_update;
+    info.stats.stats.sum.num_objects_missing = 0;
+    pg_log.reset_recovery_pointers();
+  } else {
+    psdout(10) << "activate - not complete, " << missing << dendl;
+    info.stats.stats.sum.num_objects_missing = missing.num_missing();
+    pg_log.activate_not_complete(info);
+  }
+
+  pg->log_weirdness();
+
+  // if primary..
+  if (is_primary()) {
+    ceph_assert(ctx);
+    // start up replicas
+
+    ceph_assert(!acting_recovery_backfill.empty());
+    for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
+         i != acting_recovery_backfill.end();
+         ++i) {
+      if (*i == pg_whoami) continue;
+      pg_shard_t peer = *i;
+      ceph_assert(peer_info.count(peer));
+      pg_info_t& pi = peer_info[peer];
+
+      psdout(10) << "activate peer osd." << peer << " " << pi << dendl;
+
+      MOSDPGLog *m = 0;
+      ceph_assert(peer_missing.count(peer));
+      pg_missing_t& pm = peer_missing[peer];
+
+      bool needs_past_intervals = pi.dne();
+
+      /*
+       * cover case where peer sort order was different and
+       * last_backfill cannot be interpreted
+       */
+      bool force_restart_backfill =
+        !pi.last_backfill.is_max() &&
+        !pi.last_backfill_bitwise;
+
+      if (pi.last_update == info.last_update && !force_restart_backfill) {
+        // empty log
+        if (!pi.last_backfill.is_max())
+          pl->get_clog().info() << info.pgid << " continuing backfill to osd."
+                                << peer
+                                << " from (" << pi.log_tail << "," << pi.last_update
+                                << "] " << pi.last_backfill
+                                << " to " << info.last_update;
+        if (!pi.is_empty() && activator_map) {
+          psdout(10) << "activate peer osd." << peer
+                     << " is up to date, queueing in pending_activators" << dendl;
+          (*activator_map)[peer.osd].emplace_back(
+            pg_notify_t(
+              peer.shard, pg_whoami.shard,
+              get_osdmap_epoch(),
+              get_osdmap_epoch(),
+              info),
+            past_intervals);
+        } else {
+          psdout(10) << "activate peer osd." << peer
+                     << " is up to date, but sending pg_log anyway" << dendl;
+          m = new MOSDPGLog(
+            i->shard, pg_whoami.shard,
+            get_osdmap_epoch(), info,
+            last_peering_reset);
+        }
+      } else if (
+        pg_log.get_tail() > pi.last_update ||
+        pi.last_backfill == hobject_t() ||
+        force_restart_backfill ||
+        (backfill_targets.count(*i) && pi.last_backfill.is_max())) {
+        /* ^ This last case covers a situation where a replica is not contiguous
+         * with the auth_log, but is contiguous with this replica.  Reshuffling
+         * the active set to handle this would be tricky, so instead we just go
+         * ahead and backfill it anyway.  This is probably preferable in any
+         * case since the replica in question would have to be significantly
+         * behind.
+         */
+        // backfill
+        pl->get_clog().debug() << info.pgid << " starting backfill to osd." << peer
+                               << " from (" << pi.log_tail << "," << pi.last_update
+                               << "] " << pi.last_backfill
+                               << " to " << info.last_update;
+
+        pi.last_update = info.last_update;
+        pi.last_complete = info.last_update;
+        pi.set_last_backfill(hobject_t());
+        pi.last_epoch_started = info.last_epoch_started;
+        pi.last_interval_started = info.last_interval_started;
+        pi.history = info.history;
+        pi.hit_set = info.hit_set;
+        // Save num_bytes for reservation request, can't be negative
+        peer_bytes[peer] = std::max<int64_t>(0, pi.stats.stats.sum.num_bytes);
+        pi.stats.stats.clear();
+
+        // initialize peer with our purged_snaps.
+        pi.purged_snaps = info.purged_snaps;
+
+        m = new MOSDPGLog(
+          i->shard, pg_whoami.shard,
+          get_osdmap_epoch(), pi,
+          last_peering_reset /* epoch to create pg at */);
+
+        // send some recent log, so that op dup detection works well.
+        m->log.copy_up_to(pg_log.get_log(), cct->_conf->osd_min_pg_log_entries);
+        m->info.log_tail = m->log.tail;
+        pi.log_tail = m->log.tail;  // sigh...
+
+        pm.clear();
+      } else {
+        // catch up
+        ceph_assert(pg_log.get_tail() <= pi.last_update);
+        m = new MOSDPGLog(
+          i->shard, pg_whoami.shard,
+          get_osdmap_epoch(), info,
+          last_peering_reset /* epoch to create pg at */);
+        // send new stuff to append to replicas log
+        m->log.copy_after(pg_log.get_log(), pi.last_update);
+      }
+
+      // share past_intervals if we are creating the pg on the replica
+      // based on whether our info for that peer was dne() *before*
+      // updating pi.history in the backfill block above.
+      if (m && needs_past_intervals)
+        m->past_intervals = past_intervals;
+
+      // update local version of peer's missing list!
+      if (m && pi.last_backfill != hobject_t()) {
+        for (list<pg_log_entry_t>::iterator p = m->log.log.begin();
+             p != m->log.log.end();
+             ++p) {
+          if (p->soid <= pi.last_backfill &&
+              !p->is_error()) {
+            if (pg->perform_deletes_during_peering() && p->is_delete()) {
+              pm.rm(p->soid, p->version);
+            } else {
+              pm.add_next_event(*p);
+            }
+          }
+        }
+      }
+
+      if (m) {
+        dout(10) << "activate peer osd." << peer << " sending " << m->log << dendl;
+        //m->log.print(cout);
+        pl->send_cluster_message(peer.osd, m, get_osdmap_epoch());
+      }
+
+      // peer now has
+      pi.last_update = info.last_update;
+
+      // update our missing
+      if (pm.num_missing() == 0) {
+        pi.last_complete = pi.last_update;
+        psdout(10) << "activate peer osd." << peer << " " << pi
+                   << " uptodate" << dendl;
+      } else {
+        psdout(10) << "activate peer osd." << peer << " " << pi
+                   << " missing " << pm << dendl;
+      }
+    }
+
+    // Set up missing_loc
+    set<pg_shard_t> complete_shards;
+    for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
+         i != acting_recovery_backfill.end();
+         ++i) {
+      psdout(20) << __func__ << " setting up missing_loc from shard " << *i
+                 << " " << dendl;
+      if (*i == get_primary()) {
+        missing_loc.add_active_missing(missing);
+        if (!missing.have_missing())
+          complete_shards.insert(*i);
+      } else {
+        auto peer_missing_entry = peer_missing.find(*i);
+        ceph_assert(peer_missing_entry != peer_missing.end());
+        missing_loc.add_active_missing(peer_missing_entry->second);
+        if (!peer_missing_entry->second.have_missing() &&
+            peer_info[*i].last_backfill.is_max())
+          complete_shards.insert(*i);
+      }
+    }
+
+    // If necessary, create might_have_unfound to help us find our unfound objects.
+    // NOTE: It's important that we build might_have_unfound before trimming the
+    // past intervals.
+    might_have_unfound.clear();
+    if (needs_recovery()) {
+      // If only one shard has missing, we do a trick to add all others as recovery
+      // source, this is considered safe since the PGLogs have been merged locally,
+      // and covers vast majority of the use cases, like one OSD/host is down for
+      // a while for hardware repairing
+      if (complete_shards.size() + 1 == acting_recovery_backfill.size()) {
+        missing_loc.add_batch_sources_info(complete_shards, ctx->handle);
+      } else {
+        missing_loc.add_source_info(pg_whoami, info, pg_log.get_missing(),
+                                    ctx->handle);
+        for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
+             i != acting_recovery_backfill.end();
+             ++i) {
+          if (*i == pg_whoami) continue;
+          psdout(10) << __func__ << ": adding " << *i << " as a source" << dendl;
+          ceph_assert(peer_missing.count(*i));
+          ceph_assert(peer_info.count(*i));
+          missing_loc.add_source_info(
+            *i,
+            peer_info[*i],
+            peer_missing[*i],
+            ctx->handle);
+        }
+      }
+      for (map<pg_shard_t, pg_missing_t>::iterator i = peer_missing.begin();
+           i != peer_missing.end();
+           ++i) {
+        if (is_acting_recovery_backfill(i->first))
+          continue;
+        ceph_assert(peer_info.count(i->first));
+        pg->search_for_missing(
+          peer_info[i->first],
+          i->second,
+          i->first,
+          ctx);
+      }
+
+      pg->build_might_have_unfound();
+
+      // Always call now so _update_calc_stats() will be accurate
+      pg->discover_all_missing(query_map);
+    }
+
+    // num_objects_degraded if calculated should reflect this too, unless no
+    // missing and we are about to go clean.
+    if (get_osdmap()->get_pg_size(info.pgid.pgid) > actingset.size()) {
+      state_set(PG_STATE_UNDERSIZED);
+    }
+
+    state_set(PG_STATE_ACTIVATING);
+    pg->release_pg_backoffs();
+    pg->projected_last_update = info.last_update;
+  }
+  if (acting.size() >= pool.info.min_size) {
+    PG::PGLogEntryHandler handler{pg, &t};
+    pg_log.roll_forward(&handler);
+  }
+}
 
 /*------------ Peering State Machine----------------*/
 #undef dout_prefix
@@ -3151,7 +3496,7 @@ PeeringState::Active::Active(my_context ctx)
   ceph_assert(ps->is_primary());
   psdout(10) << "In Active, about to call activate" << dendl;
   ps->start_flush(context< PeeringMachine >().get_cur_transaction());
-  pg->activate(*context< PeeringMachine >().get_cur_transaction(),
+  ps->activate(*context< PeeringMachine >().get_cur_transaction(),
               ps->get_osdmap_epoch(),
               *context< PeeringMachine >().get_query_map(),
               context< PeeringMachine >().get_info_map(),
@@ -3317,7 +3662,7 @@ boost::statechart::result PeeringState::Active::react(const MInfoRec& infoevt)
     ps->blocked_by.erase(infoevt.from.shard);
     pl->publish_stats_to_osd();
     if (ps->peer_activated.size() == ps->acting_recovery_backfill.size()) {
-      pg->all_activated_and_committed();
+      all_activated_and_committed();
     }
   }
   return discard_event();
@@ -3380,6 +3725,27 @@ boost::statechart::result PeeringState::Active::react(const QueryState& q)
   return forward_event();
 }
 
+boost::statechart::result PeeringState::Active::react(
+  const ActivateCommitted &evt)
+{
+  DECLARE_LOCALS
+  ceph_assert(!ps->peer_activated.count(ps->pg_whoami));
+  ps->peer_activated.insert(ps->pg_whoami);
+  psdout(10) << "_activate_committed " << evt.epoch
+             << " peer_activated now " << ps->peer_activated
+             << " last_interval_started "
+             << ps->info.history.last_interval_started
+             << " last_epoch_started "
+             << ps->info.history.last_epoch_started
+             << " same_interval_since "
+             << ps->info.history.same_interval_since
+             << dendl;
+  ceph_assert(!ps->acting_recovery_backfill.empty());
+  if (ps->peer_activated.size() == ps->acting_recovery_backfill.size())
+    all_activated_and_committed();
+  return discard_event();
+}
+
 boost::statechart::result PeeringState::Active::react(const AllReplicasActivated &evt)
 {
@@ -3443,6 +3809,32 @@ boost::statechart::result PeeringState::Active::react(const AllReplicasActivated
   return discard_event();
 }
 
+/*
+ * update info.history.last_epoch_started ONLY after we and all
+ * replicas have activated AND committed the activate transaction
+ * (i.e. the peering results are stable on disk).
+ */
+void PeeringState::Active::all_activated_and_committed()
+{
+  DECLARE_LOCALS
+  psdout(10) << "all_activated_and_committed" << dendl;
+  ceph_assert(ps->is_primary());
+  ceph_assert(ps->peer_activated.size() == ps->acting_recovery_backfill.size());
+  ceph_assert(!ps->acting_recovery_backfill.empty());
+  ceph_assert(ps->blocked_by.empty());
+
+  // Degraded?
+  // _update_calc_stats(); TODOSAM
+  if (ps->info.stats.stats.sum.num_objects_degraded) {
+    ps->state_set(PG_STATE_DEGRADED);
+  } else {
+    ps->state_clear(PG_STATE_DEGRADED);
+  }
+
+  post_event(PeeringState::AllReplicasActivated());
+}
+
+
 void PeeringState::Active::exit()
 {
   context< PeeringMachine >().log_exit(state_name, enter_time);
@@ -3482,13 +3874,44 @@ boost::statechart::result PeeringState::ReplicaActive::react(
   DECLARE_LOCALS
   psdout(10) << "In ReplicaActive, about to call activate" << dendl;
   map<int, map<spg_t, pg_query_t> > query_map;
-  pg->activate(*context< PeeringMachine >().get_cur_transaction(),
+  ps->activate(*context< PeeringMachine >().get_cur_transaction(),
               actevt.activation_epoch,
               query_map, NULL, NULL);
   psdout(10) << "Activate Finished" << dendl;
   return discard_event();
 }
 
+boost::statechart::result PeeringState::ReplicaActive::react(
+  const ActivateCommitted &evt)
+{
+  DECLARE_LOCALS
+  psdout(10) << "_activate_committed " << evt.epoch
+             << " telling primary" << dendl;
+  MOSDPGInfo *m = new MOSDPGInfo(evt.epoch);
+  pg_notify_t i = pg_notify_t(
+    ps->get_primary().shard, ps->pg_whoami.shard,
+    ps->get_osdmap_epoch(),
+    ps->get_osdmap_epoch(),
+    ps->info);
+
+  i.info.history.last_epoch_started = evt.activation_epoch;
+  i.info.history.last_interval_started = i.info.history.same_interval_since;
+  if (ps->acting.size() >= ps->pool.info.min_size) {
+    ps->state_set(PG_STATE_ACTIVE);
+  } else {
+    ps->state_set(PG_STATE_PEERED);
+  }
+
+  m->pg_list.emplace_back(i, PastIntervals());
+  pl->send_cluster_message(
+    ps->get_primary().osd,
+    m,
+    ps->get_osdmap_epoch());
+
+  pl->on_activate_committed();
+  return discard_event();
+}
+
 boost::statechart::result PeeringState::ReplicaActive::react(const MInfoRec& infoevt)
 {
   DECLARE_LOCALS
diff --git a/src/osd/PeeringState.h b/src/osd/PeeringState.h
index 6f89570f96c..ce53bdd49cd 100644
--- a/src/osd/PeeringState.h
+++ b/src/osd/PeeringState.h
@@ -95,6 +95,10 @@ public:
     PGPeeringEventRef on_preempt) = 0;
   virtual void cancel_remote_recovery_reservation() = 0;
 
+  virtual void schedule_event_on_commit(
+    ObjectStore::Transaction &t,
+    PGPeeringEventRef on_commit) = 0;
+
   // HB
   virtual void set_probe_targets(const set<pg_shard_t> &probe_set) = 0;
   virtual void clear_probe_targets() = 0;
@@ -120,6 +124,7 @@ public:
   virtual void on_activate() = 0;
   virtual void on_new_interval() = 0;
   virtual Context *on_clean() = 0;
+  virtual void on_activate_committed() = 0;
 
   virtual void on_active_exit() = 0;
 
@@ -257,6 +262,17 @@ public:
       *out << "Activate from " << activation_epoch;
     }
   };
+  struct ActivateCommitted : boost::statechart::event< ActivateCommitted > {
+    epoch_t epoch;
+    epoch_t activation_epoch;
+    explicit ActivateCommitted(epoch_t e, epoch_t ae)
+      : boost::statechart::event< ActivateCommitted >(),
+        epoch(e),
+        activation_epoch(ae) {}
+    void print(std::ostream *out) const {
+      *out << "ActivateCommitted from " << activation_epoch
+           << " processed at " << epoch;
+    }
+  };
 public:
   struct UnfoundBackfill : boost::statechart::event<UnfoundBackfill> {
     explicit UnfoundBackfill() {}
@@ -601,6 +617,7 @@ public:
       boost::statechart::custom_reaction< MLogRec >,
       boost::statechart::custom_reaction< MTrim >,
      boost::statechart::custom_reaction< Backfilled >,
+      boost::statechart::custom_reaction< ActivateCommitted >,
       boost::statechart::custom_reaction< AllReplicasActivated >,
       boost::statechart::custom_reaction< DeferRecovery >,
       boost::statechart::custom_reaction< DeferBackfill >,
@@ -620,6 +637,7 @@ public:
     boost::statechart::result react(const Backfilled&) {
       return discard_event();
     }
+    boost::statechart::result react(const ActivateCommitted&);
     boost::statechart::result react(const AllReplicasActivated&);
     boost::statechart::result react(const DeferRecovery& evt) {
       return discard_event();
@@ -642,6 +660,7 @@ public:
     boost::statechart::result react(const DoRecovery&) {
       return discard_event();
    }
+    void all_activated_and_committed();
   };
 
   struct Clean : boost::statechart::state< Clean, Active >, NamedState {
@@ -764,6 +783,7 @@ public:
       boost::statechart::custom_reaction< MLogRec >,
       boost::statechart::custom_reaction< MTrim >,
       boost::statechart::custom_reaction< Activate >,
+      boost::statechart::custom_reaction< ActivateCommitted >,
       boost::statechart::custom_reaction< DeferRecovery >,
       boost::statechart::custom_reaction< DeferBackfill >,
      boost::statechart::custom_reaction< UnfoundRecovery >,
@@ -780,6 +800,7 @@ public:
     boost::statechart::result react(const ActMap&);
     boost::statechart::result react(const MQuery&);
     boost::statechart::result react(const Activate&);
+    boost::statechart::result react(const ActivateCommitted&);
     boost::statechart::result react(const RecoveryDone&) {
       return discard_event();
     }
@@ -953,6 +974,7 @@ public:
     unsigned priority = 0;
     typedef boost::mpl::list <
       boost::statechart::custom_reaction< ActMap >,
+      boost::statechart::custom_reaction< ActivateCommitted >,
      boost::statechart::custom_reaction< DeleteSome >
      > reactions;
    explicit ToDelete(my_context ctx);
@@ -961,6 +983,11 @@ public:
      // happens if we drop out of Deleting due to reprioritization etc.
      return discard_event();
    }
+    boost::statechart::result react(const ActivateCommitted&) {
+      // Can happen if we were activated as a stray but not actually pulled
+      // from prior to the pg going clean and sending a delete.
+      return discard_event();
+    }
    void exit();
  };
 
@@ -1305,6 +1332,13 @@ public:
    bool restrict_to_up_acting,
    bool *history_les_bound);
 
+  void activate(
+    ObjectStore::Transaction& t,
+    epoch_t activation_epoch,
+    map<int, map<spg_t,pg_query_t> >& query_map,
+    map<int, vector<pair<pg_notify_t, PastIntervals> > > *activator_map,
+    PeeringCtx *ctx);
+
 public:
  PeeringState(
    CephContext *cct,
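
Note on the control flow introduced above: activation durability is no longer
reported through a PG-bound Context (C_PG_ActivateCommitted calling back into
PG::_activate_committed under the PG lock) but as a peering event.
PeeringState::activate() asks the listener to schedule an ActivateCommitted
event on the transaction; when the ObjectStore commits, the event is queued
like any other peering event and handled by Active (primary) or ReplicaActive
(replica). The old explicit pg_has_reset_since() check disappears, presumably
because stale-epoch events are already discarded by the peering event
machinery.

The following standalone sketch models that pattern. It is not Ceph code: the
types are simplified stand-ins for ObjectStore::Transaction, PGPeeringEvent,
and the PeeringState machine, and exist only to show the shape of on-commit
event queueing.

  // sketch.cc -- illustrative only; build with: c++ -std=c++17 sketch.cc
  #include <functional>
  #include <iostream>
  #include <memory>
  #include <vector>

  using epoch_t = unsigned int;

  // Stand-in for a PGPeeringEvent carrying ActivateCommitted.
  struct PeeringEvent {
    epoch_t epoch;             // epoch at which the event was scheduled
    epoch_t activation_epoch;  // epoch of the Activate that is now durable
  };

  // Stand-in for ObjectStore::Transaction's on-commit hooks.
  struct Transaction {
    std::vector<std::function<void()>> on_commit;
    void register_on_commit(std::function<void()> fn) {
      on_commit.push_back(std::move(fn));
    }
    void commit() {  // simulate the backend making the txn durable
      for (auto &fn : on_commit) fn();
      on_commit.clear();
    }
  };

  // Stand-in for the PeeringState machine plus its listener interface.
  struct PeeringMachine {
    void handle(const PeeringEvent &evt) {
      // Real code: Active/ReplicaActive::react(const ActivateCommitted&),
      // which records the activation or tells the primary about it.
      std::cout << "ActivateCommitted: activation epoch "
                << evt.activation_epoch << ", scheduled at epoch "
                << evt.epoch << "\n";
    }
    // Stand-in for PeeringListener::schedule_event_on_commit().
    void schedule_event_on_commit(Transaction &t,
                                  std::shared_ptr<PeeringEvent> evt) {
      t.register_on_commit([this, evt] { handle(*evt); });
    }
  };

  int main() {
    PeeringMachine machine;
    Transaction t;
    machine.schedule_event_on_commit(
        t, std::make_shared<PeeringEvent>(PeeringEvent{42, 40}));
    // ... the transaction would be populated with pg info/log writes here ...
    t.commit();  // only now does the event reach the state machine
    return 0;
  }

The design payoff, visible in the diff, is that PeeringState never touches the
PG lock or checks for interval resets itself: durability notifications travel
through the same queue as every other peering event, so ordering and staleness
are handled in one place.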