dout(15) << __func__ << ": built " << might_have_unfound << dendl;
}
-void PG::activate(ObjectStore::Transaction& t,
- epoch_t activation_epoch,
- map<int, map<spg_t,pg_query_t> >& query_map,
- map<int,
- vector<
- pair<pg_notify_t,
- PastIntervals> > > *activator_map,
- PeeringCtx *ctx)
-{
- ceph_assert(!is_peered());
- ceph_assert(scrubber.callbacks.empty());
- ceph_assert(callbacks_for_degraded_object.empty());
-
- // twiddle pg state
- state_clear(PG_STATE_DOWN);
-
- send_notify = false;
-
- if (is_primary()) {
- // only update primary last_epoch_started if we will go active
- if (acting.size() >= pool.info.min_size) {
- ceph_assert(cct->_conf->osd_find_best_info_ignore_history_les ||
- info.last_epoch_started <= activation_epoch);
- info.last_epoch_started = activation_epoch;
- info.last_interval_started = info.history.same_interval_since;
- }
- } else if (is_acting(pg_whoami)) {
- /* update last_epoch_started on acting replica to whatever the primary sent
- * unless it's smaller (could happen if we are going peered rather than
- * active, see doc/dev/osd_internals/last_epoch_started.rst) */
- if (info.last_epoch_started < activation_epoch) {
- info.last_epoch_started = activation_epoch;
- info.last_interval_started = info.history.same_interval_since;
- }
- }
-
- auto &missing = pg_log.get_missing();
-
- if (is_primary()) {
- last_update_ondisk = info.last_update;
- min_last_complete_ondisk = eversion_t(0,0); // we don't know (yet)!
- }
- last_update_applied = info.last_update;
- last_rollback_info_trimmed_to_applied = pg_log.get_can_rollback_to();
-
- need_up_thru = false;
-
- // write pg info, log
- dirty_info = true;
- dirty_big_info = true; // maybe
-
- // find out when we commit
- t.register_on_complete(
- new C_PG_ActivateCommitted(
- this,
- get_osdmap_epoch(),
- activation_epoch));
-
- if (is_primary()) {
- // initialize snap_trimq
- if (get_osdmap()->require_osd_release < CEPH_RELEASE_MIMIC) {
- dout(20) << "activate - purged_snaps " << info.purged_snaps
- << " cached_removed_snaps " << pool.cached_removed_snaps
- << dendl;
- snap_trimq = pool.cached_removed_snaps;
- } else {
- auto& removed_snaps_queue = get_osdmap()->get_removed_snaps_queue();
- auto p = removed_snaps_queue.find(info.pgid.pgid.pool());
- snap_trimq.clear();
- if (p != removed_snaps_queue.end()) {
- dout(20) << "activate - purged_snaps " << info.purged_snaps
- << " removed_snaps " << p->second
- << dendl;
- for (auto q : p->second) {
- snap_trimq.insert(q.first, q.second);
- }
- }
- }
- interval_set<snapid_t> purged;
- purged.intersection_of(snap_trimq, info.purged_snaps);
- snap_trimq.subtract(purged);
-
- if (get_osdmap()->require_osd_release >= CEPH_RELEASE_MIMIC) {
- // adjust purged_snaps: PG may have been inactive while snaps were pruned
- // from the removed_snaps_queue in the osdmap. update local purged_snaps
- // reflect only those snaps that we thought were pruned and were still in
- // the queue.
- info.purged_snaps.swap(purged);
- }
- }
-
- // init complete pointer
- if (missing.num_missing() == 0) {
- dout(10) << "activate - no missing, moving last_complete " << info.last_complete
- << " -> " << info.last_update << dendl;
- info.last_complete = info.last_update;
- info.stats.stats.sum.num_objects_missing = 0;
- pg_log.reset_recovery_pointers();
- } else {
- dout(10) << "activate - not complete, " << missing << dendl;
- info.stats.stats.sum.num_objects_missing = missing.num_missing();
- pg_log.activate_not_complete(info);
- }
-
- log_weirdness();
-
- // if primary..
- if (is_primary()) {
- ceph_assert(ctx);
- // start up replicas
-
- ceph_assert(!acting_recovery_backfill.empty());
- for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
- i != acting_recovery_backfill.end();
- ++i) {
- if (*i == pg_whoami) continue;
- pg_shard_t peer = *i;
- ceph_assert(peer_info.count(peer));
- pg_info_t& pi = peer_info[peer];
-
- dout(10) << "activate peer osd." << peer << " " << pi << dendl;
-
- MOSDPGLog *m = 0;
- ceph_assert(peer_missing.count(peer));
- pg_missing_t& pm = peer_missing[peer];
-
- bool needs_past_intervals = pi.dne();
-
- /*
- * cover case where peer sort order was different and
- * last_backfill cannot be interpreted
- */
- bool force_restart_backfill =
- !pi.last_backfill.is_max() &&
- !pi.last_backfill_bitwise;
-
- if (pi.last_update == info.last_update && !force_restart_backfill) {
- // empty log
- if (!pi.last_backfill.is_max())
- osd->clog->info() << info.pgid << " continuing backfill to osd."
- << peer
- << " from (" << pi.log_tail << "," << pi.last_update
- << "] " << pi.last_backfill
- << " to " << info.last_update;
- if (!pi.is_empty() && activator_map) {
- dout(10) << "activate peer osd." << peer << " is up to date, queueing in pending_activators" << dendl;
- (*activator_map)[peer.osd].emplace_back(
- pg_notify_t(
- peer.shard, pg_whoami.shard,
- get_osdmap_epoch(),
- get_osdmap_epoch(),
- info),
- past_intervals);
- } else {
- dout(10) << "activate peer osd." << peer << " is up to date, but sending pg_log anyway" << dendl;
- m = new MOSDPGLog(
- i->shard, pg_whoami.shard,
- get_osdmap_epoch(), info,
- last_peering_reset);
- }
- } else if (
- pg_log.get_tail() > pi.last_update ||
- pi.last_backfill == hobject_t() ||
- force_restart_backfill ||
- (backfill_targets.count(*i) && pi.last_backfill.is_max())) {
- /* ^ This last case covers a situation where a replica is not contiguous
- * with the auth_log, but is contiguous with this replica. Reshuffling
- * the active set to handle this would be tricky, so instead we just go
- * ahead and backfill it anyway. This is probably preferrable in any
- * case since the replica in question would have to be significantly
- * behind.
- */
- // backfill
- osd->clog->debug() << info.pgid << " starting backfill to osd." << peer
- << " from (" << pi.log_tail << "," << pi.last_update
- << "] " << pi.last_backfill
- << " to " << info.last_update;
-
- pi.last_update = info.last_update;
- pi.last_complete = info.last_update;
- pi.set_last_backfill(hobject_t());
- pi.last_epoch_started = info.last_epoch_started;
- pi.last_interval_started = info.last_interval_started;
- pi.history = info.history;
- pi.hit_set = info.hit_set;
- // Save num_bytes for reservation request, can't be negative
- peer_bytes[peer] = std::max<int64_t>(0, pi.stats.stats.sum.num_bytes);
- pi.stats.stats.clear();
-
- // initialize peer with our purged_snaps.
- pi.purged_snaps = info.purged_snaps;
-
- m = new MOSDPGLog(
- i->shard, pg_whoami.shard,
- get_osdmap_epoch(), pi,
- last_peering_reset /* epoch to create pg at */);
-
- // send some recent log, so that op dup detection works well.
- m->log.copy_up_to(pg_log.get_log(), cct->_conf->osd_min_pg_log_entries);
- m->info.log_tail = m->log.tail;
- pi.log_tail = m->log.tail; // sigh...
-
- pm.clear();
- } else {
- // catch up
- ceph_assert(pg_log.get_tail() <= pi.last_update);
- m = new MOSDPGLog(
- i->shard, pg_whoami.shard,
- get_osdmap_epoch(), info,
- last_peering_reset /* epoch to create pg at */);
- // send new stuff to append to replicas log
- m->log.copy_after(pg_log.get_log(), pi.last_update);
- }
-
- // share past_intervals if we are creating the pg on the replica
- // based on whether our info for that peer was dne() *before*
- // updating pi.history in the backfill block above.
- if (m && needs_past_intervals)
- m->past_intervals = past_intervals;
-
- // update local version of peer's missing list!
- if (m && pi.last_backfill != hobject_t()) {
- for (list<pg_log_entry_t>::iterator p = m->log.log.begin();
- p != m->log.log.end();
- ++p) {
- if (p->soid <= pi.last_backfill &&
- !p->is_error()) {
- if (perform_deletes_during_peering() && p->is_delete()) {
- pm.rm(p->soid, p->version);
- } else {
- pm.add_next_event(*p);
- }
- }
- }
- }
-
- if (m) {
- dout(10) << "activate peer osd." << peer << " sending " << m->log << dendl;
- //m->log.print(cout);
- osd->send_message_osd_cluster(peer.osd, m, get_osdmap_epoch());
- }
-
- // peer now has
- pi.last_update = info.last_update;
-
- // update our missing
- if (pm.num_missing() == 0) {
- pi.last_complete = pi.last_update;
- dout(10) << "activate peer osd." << peer << " " << pi << " uptodate" << dendl;
- } else {
- dout(10) << "activate peer osd." << peer << " " << pi << " missing " << pm << dendl;
- }
- }
-
- // Set up missing_loc
- set<pg_shard_t> complete_shards;
- for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
- i != acting_recovery_backfill.end();
- ++i) {
- dout(20) << __func__ << " setting up missing_loc from shard " << *i << " " << dendl;
- if (*i == get_primary()) {
- missing_loc.add_active_missing(missing);
- if (!missing.have_missing())
- complete_shards.insert(*i);
- } else {
- auto peer_missing_entry = peer_missing.find(*i);
- ceph_assert(peer_missing_entry != peer_missing.end());
- missing_loc.add_active_missing(peer_missing_entry->second);
- if (!peer_missing_entry->second.have_missing() &&
- peer_info[*i].last_backfill.is_max())
- complete_shards.insert(*i);
- }
- }
-
- // If necessary, create might_have_unfound to help us find our unfound objects.
- // NOTE: It's important that we build might_have_unfound before trimming the
- // past intervals.
- might_have_unfound.clear();
- if (needs_recovery()) {
- // If only one shard has missing, we do a trick to add all others as recovery
- // source, this is considered safe since the PGLogs have been merged locally,
- // and covers vast majority of the use cases, like one OSD/host is down for
- // a while for hardware repairing
- if (complete_shards.size() + 1 == acting_recovery_backfill.size()) {
- missing_loc.add_batch_sources_info(complete_shards, ctx->handle);
- } else {
- missing_loc.add_source_info(pg_whoami, info, pg_log.get_missing(),
- ctx->handle);
- for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
- i != acting_recovery_backfill.end();
- ++i) {
- if (*i == pg_whoami) continue;
- dout(10) << __func__ << ": adding " << *i << " as a source" << dendl;
- ceph_assert(peer_missing.count(*i));
- ceph_assert(peer_info.count(*i));
- missing_loc.add_source_info(
- *i,
- peer_info[*i],
- peer_missing[*i],
- ctx->handle);
- }
- }
- for (map<pg_shard_t, pg_missing_t>::iterator i = peer_missing.begin();
- i != peer_missing.end();
- ++i) {
- if (is_acting_recovery_backfill(i->first))
- continue;
- ceph_assert(peer_info.count(i->first));
- search_for_missing(
- peer_info[i->first],
- i->second,
- i->first,
- ctx);
- }
-
- build_might_have_unfound();
-
- // Always call now so _update_calc_stats() will be accurate
- discover_all_missing(query_map);
- }
-
- // num_objects_degraded if calculated should reflect this too, unless no
- // missing and we are about to go clean.
- if (get_osdmap()->get_pg_size(info.pgid.pgid) > actingset.size()) {
- state_set(PG_STATE_UNDERSIZED);
- }
-
- state_set(PG_STATE_ACTIVATING);
- release_pg_backoffs();
- projected_last_update = info.last_update;
- }
- if (acting.size() >= pool.info.min_size) {
- PGLogEntryHandler handler{this, &t};
- pg_log.roll_forward(&handler);
- }
-}
-
bool PG::op_has_sufficient_caps(OpRequestRef& op)
{
// only check MOSDOp
return cap;
}
-void PG::_activate_committed(epoch_t epoch, epoch_t activation_epoch)
-{
- lock();
- if (pg_has_reset_since(epoch)) {
- dout(10) << "_activate_committed " << epoch
- << ", that was an old interval" << dendl;
- } else if (is_primary()) {
- ceph_assert(!peer_activated.count(pg_whoami));
- peer_activated.insert(pg_whoami);
- dout(10) << "_activate_committed " << epoch
- << " peer_activated now " << peer_activated
- << " last_interval_started " << info.history.last_interval_started
- << " last_epoch_started " << info.history.last_epoch_started
- << " same_interval_since " << info.history.same_interval_since << dendl;
- ceph_assert(!acting_recovery_backfill.empty());
- if (peer_activated.size() == acting_recovery_backfill.size())
- all_activated_and_committed();
- } else {
- dout(10) << "_activate_committed " << epoch << " telling primary" << dendl;
- MOSDPGInfo *m = new MOSDPGInfo(epoch);
- pg_notify_t i = pg_notify_t(
- get_primary().shard, pg_whoami.shard,
- get_osdmap_epoch(),
- get_osdmap_epoch(),
- info);
-
- i.info.history.last_epoch_started = activation_epoch;
- i.info.history.last_interval_started = i.info.history.same_interval_since;
- if (acting.size() >= pool.info.min_size) {
- state_set(PG_STATE_ACTIVE);
- } else {
- state_set(PG_STATE_PEERED);
- }
-
- m->pg_list.emplace_back(i, PastIntervals());
- osd->send_message_osd_cluster(get_primary().osd, m, get_osdmap_epoch());
-
- // waiters
- if (recovery_state.needs_flush() == 0) {
- requeue_ops(waiting_for_peered);
- } else if (!waiting_for_peered.empty()) {
- dout(10) << __func__ << " flushes in progress, moving "
- << waiting_for_peered.size() << " items to waiting_for_flush"
- << dendl;
- ceph_assert(waiting_for_flush.empty());
- waiting_for_flush.swap(waiting_for_peered);
- }
- }
-
- ceph_assert(!dirty_info);
-
- unlock();
-}
-
-/*
- * update info.history.last_epoch_started ONLY after we and all
- * replicas have activated AND committed the activate transaction
- * (i.e. the peering results are stable on disk).
- */
-void PG::all_activated_and_committed()
-{
- dout(10) << "all_activated_and_committed" << dendl;
- ceph_assert(is_primary());
- ceph_assert(peer_activated.size() == acting_recovery_backfill.size());
- ceph_assert(!acting_recovery_backfill.empty());
- ceph_assert(blocked_by.empty());
-
- // Degraded?
- _update_calc_stats();
- if (info.stats.stats.sum.num_objects_degraded) {
- state_set(PG_STATE_DEGRADED);
- } else {
- state_clear(PG_STATE_DEGRADED);
- }
-
- queue_peering_event(
- PGPeeringEventRef(
- std::make_shared<PGPeeringEvent>(
- get_osdmap_epoch(),
- get_osdmap_epoch(),
- PeeringState::AllReplicasActivated())));
-}
-
bool PG::requeue_scrub(bool high_priority)
{
ceph_assert(is_locked());
pg_id);
}
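+
+// Arrange for on_commit to be delivered to the peering event queue once t
+// commits, by registering a QueuePeeringEvt completion on the transaction.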
+void PG::schedule_event_on_commit(
+ ObjectStore::Transaction &t,
+ PGPeeringEventRef on_commit)
+{
+ t.register_on_commit(new QueuePeeringEvt(this, on_commit));
+}
+
void PG::on_active_exit()
{
backfill_reserving = false;
queue_recovery();
}
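+
+// Invoked (via the PeeringListener) once the activation transaction commits;
+// carries the replica-side waiter handling that used to live in
+// _activate_committed().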
+void PG::on_activate_committed()
+{
+ if (!is_primary()) {
+ // waiters
+ if (recovery_state.needs_flush() == 0) {
+ requeue_ops(waiting_for_peered);
+ } else if (!waiting_for_peered.empty()) {
+ dout(10) << __func__ << " flushes in progress, moving "
+ << waiting_for_peered.size() << " items to waiting_for_flush"
+ << dendl;
+ ceph_assert(waiting_for_flush.empty());
+ waiting_for_flush.swap(waiting_for_peered);
+ }
+ }
+}
void PG::do_replica_scrub_map(OpRequestRef op)
{
PGPeeringEventRef on_preempt) override;
void cancel_remote_recovery_reservation() override;
+ void schedule_event_on_commit(
+ ObjectStore::Transaction &t,
+ PGPeeringEventRef on_commit) override;
+
void on_active_exit() override;
Context *on_clean() override {
try_mark_clean();
return finish_recovery();
}
+ void on_activate_committed() override;
void on_active_actmap() override;
void on_active_advmap(const OSDMapRef &osdmap) override;
void discover_all_missing(std::map<int, map<spg_t,pg_query_t> > &query_map);
void build_might_have_unfound();
- void activate(
- ObjectStore::Transaction& t,
- epoch_t activation_epoch,
- map<int, map<spg_t,pg_query_t> >& query_map,
- map<int,
- vector<pair<pg_notify_t, PastIntervals> > > *activator_map,
- PeeringCtx *ctx);
-
- struct C_PG_ActivateCommitted : public Context {
- PGRef pg;
- epoch_t epoch;
- epoch_t activation_epoch;
- C_PG_ActivateCommitted(PG *p, epoch_t e, epoch_t ae)
- : pg(p), epoch(e), activation_epoch(ae) {}
- void finish(int r) override {
- pg->_activate_committed(epoch, activation_epoch);
- }
- };
- void _activate_committed(epoch_t epoch, epoch_t activation_epoch);
- void all_activated_and_committed();
void proc_primary_info(ObjectStore::Transaction &t, const pg_info_t &info);
#include "messages/MBackfillReserve.h"
#include "messages/MRecoveryReserve.h"
#include "messages/MOSDScrubReserve.h"
+#include "messages/MOSDPGInfo.h"
#define dout_context cct
#define dout_subsys ceph_subsys_osd
i != peer_log_requested.end();
) {
if (!osdmap->is_up(i->osd)) {
- dout(10) << "peer_log_requested removing " << *i << dendl;
+ psdout(10) << "peer_log_requested removing " << *i << dendl;
peer_log_requested.erase(i++);
} else {
++i;
i != peer_missing_requested.end();
) {
if (!osdmap->is_up(i->osd)) {
- dout(10) << "peer_missing_requested removing " << *i << dendl;
+ psdout(10) << "peer_missing_requested removing " << *i << dendl;
peer_missing_requested.erase(i++);
} else {
++i;
++p) {
ceph_assert(!is_acting_recovery_backfill(*p));
if (get_osdmap()->is_up(p->osd)) {
- dout(10) << "sending PGRemove to osd." << *p << dendl;
+ psdout(10) << "sending PGRemove to osd." << *p << dendl;
vector<spg_t> to_remove;
to_remove.push_back(spg_t(info.pgid.pgid, p->shard));
MOSDPGRemove *m = new MOSDPGRemove(
to_remove);
pl->send_cluster_message(p->osd, m, get_osdmap_epoch());
} else {
- dout(10) << "not sending PGRemove to down osd." << *p << dendl;
+ psdout(10) << "not sending PGRemove to down osd." << *p << dendl;
}
peer_missing.erase(*p);
peer_info.erase(*p);
{
map<pg_shard_t, pg_info_t>::iterator p = peer_info.find(from);
if (p != peer_info.end() && p->second.last_update == oinfo.last_update) {
- dout(10) << " got dup osd." << from << " info " << oinfo << ", identical to ours" << dendl;
+ psdout(10) << " got dup osd." << from << " info "
+ << oinfo << ", identical to ours" << dendl;
return false;
}
if (!get_osdmap()->has_been_up_since(from.osd, send_epoch)) {
- dout(10) << " got info " << oinfo << " from down osd." << from
+ psdout(10) << " got info " << oinfo << " from down osd." << from
<< " discarding" << dendl;
return false;
}
- dout(10) << " got osd." << from << " " << oinfo << dendl;
+ psdout(10) << " got osd." << from << " " << oinfo << dendl;
ceph_assert(is_primary());
peer_info[from] = oinfo;
might_have_unfound.insert(from);
// stray?
if (!is_up(from) && !is_acting(from)) {
- dout(10) << " osd." << from << " has stray content: " << oinfo << dendl;
+ psdout(10) << " osd." << from << " has stray content: " << oinfo << dendl;
stray_set.insert(from);
if (is_clean()) {
purge_strays();
{
psdout(20) << "set_last_peering_reset " << get_osdmap_epoch() << dendl;
if (last_peering_reset != get_osdmap_epoch()) {
- dout(10) << "Clearing blocked outgoing recovery messages" << dendl;
+ last_peering_reset = get_osdmap_epoch();
+ psdout(10) << "Clearing blocked outgoing recovery messages" << dendl;
clear_blocked_outgoing();
if (!pl->try_flush_or_schedule_async()) {
psdout(10) << "Beginning to block outgoing recovery messages" << dendl;
osdmap.get(),
lastmap.get(),
info.pgid.pgid)) {
- dout(20) << "new interval newup " << newup
- << " newacting " << newacting << dendl;
+ psdout(20) << "new interval newup " << newup
+ << " newacting " << newacting << dendl;
return true;
}
if (!lastmap->is_up(pg_whoami.osd) && osdmap->is_up(pg_whoami.osd)) {
auto &missing = pg_log.get_missing();
if (missing.num_missing()) {
- dout(10) << __func__ << " primary has " << missing.num_missing()
- << " missing" << dendl;
+ psdout(10) << __func__ << " primary has " << missing.num_missing()
+ << " missing" << dendl;
return true;
}
}
}
- dout(10) << __func__ << " does not need backfill" << dendl;
+ psdout(10) << __func__ << " does not need backfill" << dendl;
return false;
}
return true;
}
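+
+// Moved from PG::activate(). Transitions the PG into the activating state:
+// updates last_epoch_started, initializes snap_trimq and the complete pointer,
+// and on the primary primes each replica via MOSDPGLog or the activator map.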
+void PeeringState::activate(
+ ObjectStore::Transaction& t,
+ epoch_t activation_epoch,
+ map<int, map<spg_t,pg_query_t> >& query_map,
+ map<int,
+ vector<
+ pair<pg_notify_t, PastIntervals> > > *activator_map,
+ PeeringCtx *ctx)
+{
+ ceph_assert(!is_peered());
+ // ceph_assert(scrubber.callbacks.empty()); TODOSAM
+ // ceph_assert(callbacks_for_degraded_object.empty()); TODOSAM
+
+ // twiddle pg state
+ state_clear(PG_STATE_DOWN);
+
+ send_notify = false;
+
+ if (is_primary()) {
+ // only update primary last_epoch_started if we will go active
+ if (acting.size() >= pool.info.min_size) {
+ ceph_assert(cct->_conf->osd_find_best_info_ignore_history_les ||
+ info.last_epoch_started <= activation_epoch);
+ info.last_epoch_started = activation_epoch;
+ info.last_interval_started = info.history.same_interval_since;
+ }
+ } else if (is_acting(pg_whoami)) {
+ /* update last_epoch_started on acting replica to whatever the primary sent
+ * unless it's smaller (could happen if we are going peered rather than
+ * active, see doc/dev/osd_internals/last_epoch_started.rst) */
+ if (info.last_epoch_started < activation_epoch) {
+ info.last_epoch_started = activation_epoch;
+ info.last_interval_started = info.history.same_interval_since;
+ }
+ }
+
+ auto &missing = pg_log.get_missing();
+
+ if (is_primary()) {
+ last_update_ondisk = info.last_update;
+ min_last_complete_ondisk = eversion_t(0,0); // we don't know (yet)!
+ }
+ last_update_applied = info.last_update;
+ last_rollback_info_trimmed_to_applied = pg_log.get_can_rollback_to();
+
+ need_up_thru = false;
+
+ // write pg info, log
+ dirty_info = true;
+ dirty_big_info = true; // maybe
+
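+  // find out when we commit: post an ActivateCommitted event back to the
+  // state machine once the transaction is durable (this replaces the old
+  // C_PG_ActivateCommitted completion)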
+ pl->schedule_event_on_commit(
+ t,
+ std::make_shared<PGPeeringEvent>(
+ get_osdmap_epoch(),
+ get_osdmap_epoch(),
+ ActivateCommitted(
+ get_osdmap_epoch(),
+ activation_epoch)));
+
+ if (is_primary()) {
+ // initialize snap_trimq
+ if (get_osdmap()->require_osd_release < CEPH_RELEASE_MIMIC) {
+ psdout(20) << "activate - purged_snaps " << info.purged_snaps
+ << " cached_removed_snaps " << pool.cached_removed_snaps
+ << dendl;
+ pg->snap_trimq = pool.cached_removed_snaps;
+ } else {
+ auto& removed_snaps_queue = get_osdmap()->get_removed_snaps_queue();
+ auto p = removed_snaps_queue.find(info.pgid.pgid.pool());
+ pg->snap_trimq.clear();
+ if (p != removed_snaps_queue.end()) {
+        psdout(20) << "activate - purged_snaps " << info.purged_snaps
+                   << " removed_snaps " << p->second
+                   << dendl;
+ for (auto q : p->second) {
+ pg->snap_trimq.insert(q.first, q.second);
+ }
+ }
+ }
+ interval_set<snapid_t> purged;
+ purged.intersection_of(pg->snap_trimq, info.purged_snaps);
+ pg->snap_trimq.subtract(purged);
+
+ if (get_osdmap()->require_osd_release >= CEPH_RELEASE_MIMIC) {
+ // adjust purged_snaps: PG may have been inactive while snaps were pruned
+      // from the removed_snaps_queue in the osdmap. update local purged_snaps
+      // to reflect only those snaps that we thought were pruned and were still
+      // in the queue.
+ info.purged_snaps.swap(purged);
+ }
+ }
+
+ // init complete pointer
+ if (missing.num_missing() == 0) {
+ psdout(10) << "activate - no missing, moving last_complete " << info.last_complete
+ << " -> " << info.last_update << dendl;
+ info.last_complete = info.last_update;
+ info.stats.stats.sum.num_objects_missing = 0;
+ pg_log.reset_recovery_pointers();
+ } else {
+ psdout(10) << "activate - not complete, " << missing << dendl;
+ info.stats.stats.sum.num_objects_missing = missing.num_missing();
+ pg_log.activate_not_complete(info);
+ }
+
+ pg->log_weirdness();
+
+ // if primary..
+ if (is_primary()) {
+ ceph_assert(ctx);
+ // start up replicas
+
+ ceph_assert(!acting_recovery_backfill.empty());
+ for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
+ i != acting_recovery_backfill.end();
+ ++i) {
+ if (*i == pg_whoami) continue;
+ pg_shard_t peer = *i;
+ ceph_assert(peer_info.count(peer));
+ pg_info_t& pi = peer_info[peer];
+
+ psdout(10) << "activate peer osd." << peer << " " << pi << dendl;
+
+ MOSDPGLog *m = 0;
+ ceph_assert(peer_missing.count(peer));
+ pg_missing_t& pm = peer_missing[peer];
+
+ bool needs_past_intervals = pi.dne();
+
+ /*
+ * cover case where peer sort order was different and
+ * last_backfill cannot be interpreted
+ */
+ bool force_restart_backfill =
+ !pi.last_backfill.is_max() &&
+ !pi.last_backfill_bitwise;
+
+ if (pi.last_update == info.last_update && !force_restart_backfill) {
+ // empty log
+ if (!pi.last_backfill.is_max())
+ pl->get_clog().info() << info.pgid << " continuing backfill to osd."
+ << peer
+ << " from (" << pi.log_tail << "," << pi.last_update
+ << "] " << pi.last_backfill
+ << " to " << info.last_update;
+ if (!pi.is_empty() && activator_map) {
+ psdout(10) << "activate peer osd." << peer
+ << " is up to date, queueing in pending_activators" << dendl;
+ (*activator_map)[peer.osd].emplace_back(
+ pg_notify_t(
+ peer.shard, pg_whoami.shard,
+ get_osdmap_epoch(),
+ get_osdmap_epoch(),
+ info),
+ past_intervals);
+ } else {
+ psdout(10) << "activate peer osd." << peer
+ << " is up to date, but sending pg_log anyway" << dendl;
+ m = new MOSDPGLog(
+ i->shard, pg_whoami.shard,
+ get_osdmap_epoch(), info,
+ last_peering_reset);
+ }
+ } else if (
+ pg_log.get_tail() > pi.last_update ||
+ pi.last_backfill == hobject_t() ||
+ force_restart_backfill ||
+ (backfill_targets.count(*i) && pi.last_backfill.is_max())) {
+ /* ^ This last case covers a situation where a replica is not contiguous
+ * with the auth_log, but is contiguous with this replica. Reshuffling
+ * the active set to handle this would be tricky, so instead we just go
+       * ahead and backfill it anyway. This is probably preferable in any
+ * case since the replica in question would have to be significantly
+ * behind.
+ */
+ // backfill
+ pl->get_clog().debug() << info.pgid << " starting backfill to osd." << peer
+ << " from (" << pi.log_tail << "," << pi.last_update
+ << "] " << pi.last_backfill
+ << " to " << info.last_update;
+
+ pi.last_update = info.last_update;
+ pi.last_complete = info.last_update;
+ pi.set_last_backfill(hobject_t());
+ pi.last_epoch_started = info.last_epoch_started;
+ pi.last_interval_started = info.last_interval_started;
+ pi.history = info.history;
+ pi.hit_set = info.hit_set;
+ // Save num_bytes for reservation request, can't be negative
+ peer_bytes[peer] = std::max<int64_t>(0, pi.stats.stats.sum.num_bytes);
+ pi.stats.stats.clear();
+
+ // initialize peer with our purged_snaps.
+ pi.purged_snaps = info.purged_snaps;
+
+ m = new MOSDPGLog(
+ i->shard, pg_whoami.shard,
+ get_osdmap_epoch(), pi,
+ last_peering_reset /* epoch to create pg at */);
+
+ // send some recent log, so that op dup detection works well.
+ m->log.copy_up_to(pg_log.get_log(), cct->_conf->osd_min_pg_log_entries);
+ m->info.log_tail = m->log.tail;
+ pi.log_tail = m->log.tail; // sigh...
+
+ pm.clear();
+ } else {
+ // catch up
+ ceph_assert(pg_log.get_tail() <= pi.last_update);
+ m = new MOSDPGLog(
+ i->shard, pg_whoami.shard,
+ get_osdmap_epoch(), info,
+ last_peering_reset /* epoch to create pg at */);
+ // send new stuff to append to replicas log
+ m->log.copy_after(pg_log.get_log(), pi.last_update);
+ }
+
+ // share past_intervals if we are creating the pg on the replica
+ // based on whether our info for that peer was dne() *before*
+ // updating pi.history in the backfill block above.
+ if (m && needs_past_intervals)
+ m->past_intervals = past_intervals;
+
+ // update local version of peer's missing list!
+ if (m && pi.last_backfill != hobject_t()) {
+ for (list<pg_log_entry_t>::iterator p = m->log.log.begin();
+ p != m->log.log.end();
+ ++p) {
+ if (p->soid <= pi.last_backfill &&
+ !p->is_error()) {
+ if (pg->perform_deletes_during_peering() && p->is_delete()) {
+ pm.rm(p->soid, p->version);
+ } else {
+ pm.add_next_event(*p);
+ }
+ }
+ }
+ }
+
+ if (m) {
+        psdout(10) << "activate peer osd." << peer << " sending "
+                   << m->log << dendl;
+ //m->log.print(cout);
+ pl->send_cluster_message(peer.osd, m, get_osdmap_epoch());
+ }
+
+      // peer now has our info and log up to last_update
+ pi.last_update = info.last_update;
+
+ // update our missing
+ if (pm.num_missing() == 0) {
+ pi.last_complete = pi.last_update;
+ psdout(10) << "activate peer osd." << peer << " " << pi
+ << " uptodate" << dendl;
+ } else {
+ psdout(10) << "activate peer osd." << peer << " " << pi
+ << " missing " << pm << dendl;
+ }
+ }
+
+ // Set up missing_loc
+ set<pg_shard_t> complete_shards;
+ for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
+ i != acting_recovery_backfill.end();
+ ++i) {
+ psdout(20) << __func__ << " setting up missing_loc from shard " << *i
+ << " " << dendl;
+ if (*i == get_primary()) {
+ missing_loc.add_active_missing(missing);
+ if (!missing.have_missing())
+ complete_shards.insert(*i);
+ } else {
+ auto peer_missing_entry = peer_missing.find(*i);
+ ceph_assert(peer_missing_entry != peer_missing.end());
+ missing_loc.add_active_missing(peer_missing_entry->second);
+ if (!peer_missing_entry->second.have_missing() &&
+ peer_info[*i].last_backfill.is_max())
+ complete_shards.insert(*i);
+ }
+ }
+
+ // If necessary, create might_have_unfound to help us find our unfound objects.
+ // NOTE: It's important that we build might_have_unfound before trimming the
+ // past intervals.
+ might_have_unfound.clear();
+ if (needs_recovery()) {
+    // If only one shard has missing, we use a trick to add all the others as
+    // recovery sources. This is considered safe since the PGLogs have been
+    // merged locally, and it covers the vast majority of use cases, like one
+    // OSD/host being down for a while for hardware repair.
+ if (complete_shards.size() + 1 == acting_recovery_backfill.size()) {
+ missing_loc.add_batch_sources_info(complete_shards, ctx->handle);
+ } else {
+ missing_loc.add_source_info(pg_whoami, info, pg_log.get_missing(),
+ ctx->handle);
+ for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
+ i != acting_recovery_backfill.end();
+ ++i) {
+ if (*i == pg_whoami) continue;
+ psdout(10) << __func__ << ": adding " << *i << " as a source" << dendl;
+ ceph_assert(peer_missing.count(*i));
+ ceph_assert(peer_info.count(*i));
+ missing_loc.add_source_info(
+ *i,
+ peer_info[*i],
+ peer_missing[*i],
+ ctx->handle);
+ }
+ }
+ for (map<pg_shard_t, pg_missing_t>::iterator i = peer_missing.begin();
+ i != peer_missing.end();
+ ++i) {
+ if (is_acting_recovery_backfill(i->first))
+ continue;
+ ceph_assert(peer_info.count(i->first));
+ pg->search_for_missing(
+ peer_info[i->first],
+ i->second,
+ i->first,
+ ctx);
+ }
+
+ pg->build_might_have_unfound();
+ // Always call now so _update_calc_stats() will be accurate
+ pg->discover_all_missing(query_map);
+ }
+
+ // num_objects_degraded if calculated should reflect this too, unless no
+ // missing and we are about to go clean.
+ if (get_osdmap()->get_pg_size(info.pgid.pgid) > actingset.size()) {
+ state_set(PG_STATE_UNDERSIZED);
+ }
+
+ state_set(PG_STATE_ACTIVATING);
+ pg->release_pg_backoffs();
+ pg->projected_last_update = info.last_update;
+ }
+ if (acting.size() >= pool.info.min_size) {
+ PG::PGLogEntryHandler handler{pg, &t};
+ pg_log.roll_forward(&handler);
+ }
+}
/*------------ Peering State Machine----------------*/
#undef dout_prefix
ceph_assert(ps->is_primary());
psdout(10) << "In Active, about to call activate" << dendl;
ps->start_flush(context< PeeringMachine >().get_cur_transaction());
- pg->activate(*context< PeeringMachine >().get_cur_transaction(),
+ ps->activate(*context< PeeringMachine >().get_cur_transaction(),
ps->get_osdmap_epoch(),
*context< PeeringMachine >().get_query_map(),
context< PeeringMachine >().get_info_map(),
ps->blocked_by.erase(infoevt.from.shard);
pl->publish_stats_to_osd();
if (ps->peer_activated.size() == ps->acting_recovery_backfill.size()) {
- pg->all_activated_and_committed();
+ all_activated_and_committed();
}
}
return discard_event();
return forward_event();
}
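+
+// Primary: our own activation transaction has committed. Record ourselves in
+// peer_activated and, once every shard in acting_recovery_backfill has
+// reported in, declare all_activated_and_committed().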
+boost::statechart::result PeeringState::Active::react(
+ const ActivateCommitted &evt)
+{
+ DECLARE_LOCALS
+ ceph_assert(!ps->peer_activated.count(ps->pg_whoami));
+ ps->peer_activated.insert(ps->pg_whoami);
+ psdout(10) << "_activate_committed " << evt.epoch
+ << " peer_activated now " << ps->peer_activated
+ << " last_interval_started "
+ << ps->info.history.last_interval_started
+ << " last_epoch_started "
+ << ps->info.history.last_epoch_started
+ << " same_interval_since "
+ << ps->info.history.same_interval_since
+ << dendl;
+ ceph_assert(!ps->acting_recovery_backfill.empty());
+ if (ps->peer_activated.size() == ps->acting_recovery_backfill.size())
+ all_activated_and_committed();
+ return discard_event();
+}
+
boost::statechart::result PeeringState::Active::react(const AllReplicasActivated &evt)
{
return discard_event();
}
+/*
+ * update info.history.last_epoch_started ONLY after we and all
+ * replicas have activated AND committed the activate transaction
+ * (i.e. the peering results are stable on disk).
+ */
+void PeeringState::Active::all_activated_and_committed()
+{
+ DECLARE_LOCALS
+ psdout(10) << "all_activated_and_committed" << dendl;
+ ceph_assert(ps->is_primary());
+ ceph_assert(ps->peer_activated.size() == ps->acting_recovery_backfill.size());
+ ceph_assert(!ps->acting_recovery_backfill.empty());
+ ceph_assert(ps->blocked_by.empty());
+
+ // Degraded?
+ // _update_calc_stats(); TODOSAM
+ if (ps->info.stats.stats.sum.num_objects_degraded) {
+ ps->state_set(PG_STATE_DEGRADED);
+ } else {
+ ps->state_clear(PG_STATE_DEGRADED);
+ }
+
+ post_event(PeeringState::AllReplicasActivated());
+}
+
void PeeringState::Active::exit()
{
context< PeeringMachine >().log_exit(state_name, enter_time);
DECLARE_LOCALS
psdout(10) << "In ReplicaActive, about to call activate" << dendl;
map<int, map<spg_t, pg_query_t> > query_map;
- pg->activate(*context< PeeringMachine >().get_cur_transaction(),
+ ps->activate(*context< PeeringMachine >().get_cur_transaction(),
actevt.activation_epoch,
query_map, NULL, NULL);
psdout(10) << "Activate Finished" << dendl;
return discard_event();
}
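+
+// Replica: our activation transaction has committed. Report the updated info
+// back to the primary, flip to ACTIVE (or PEERED when below min_size), and
+// let the PG requeue any ops that were waiting for activation.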
+boost::statechart::result PeeringState::ReplicaActive::react(
+ const ActivateCommitted &evt)
+{
+ DECLARE_LOCALS
+ psdout(10) << "_activate_committed " << evt.epoch
+ << " telling primary" << dendl;
+ MOSDPGInfo *m = new MOSDPGInfo(evt.epoch);
+ pg_notify_t i = pg_notify_t(
+ ps->get_primary().shard, ps->pg_whoami.shard,
+ ps->get_osdmap_epoch(),
+ ps->get_osdmap_epoch(),
+ ps->info);
+
+ i.info.history.last_epoch_started = evt.activation_epoch;
+ i.info.history.last_interval_started = i.info.history.same_interval_since;
+ if (ps->acting.size() >= ps->pool.info.min_size) {
+ ps->state_set(PG_STATE_ACTIVE);
+ } else {
+ ps->state_set(PG_STATE_PEERED);
+ }
+
+ m->pg_list.emplace_back(i, PastIntervals());
+ pl->send_cluster_message(
+ ps->get_primary().osd,
+ m,
+ ps->get_osdmap_epoch());
+
+ pl->on_activate_committed();
+ return discard_event();
+}
+
boost::statechart::result PeeringState::ReplicaActive::react(const MInfoRec& infoevt)
{
DECLARE_LOCALS
PGPeeringEventRef on_preempt) = 0;
virtual void cancel_remote_recovery_reservation() = 0;
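+  // Schedule on_commit to be delivered as a peering event once t commits.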
+ virtual void schedule_event_on_commit(
+ ObjectStore::Transaction &t,
+ PGPeeringEventRef on_commit) = 0;
+
// HB
virtual void set_probe_targets(const set<pg_shard_t> &probe_set) = 0;
virtual void clear_probe_targets() = 0;
virtual void on_activate() = 0;
virtual void on_new_interval() = 0;
virtual Context *on_clean() = 0;
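+  // Called once the transaction registered in activate() commits; currently
+  // used for the replica-side waiter handling.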
+ virtual void on_activate_committed() = 0;
virtual void on_active_exit() = 0;
*out << "Activate from " << activation_epoch;
}
};
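+  // Posted via PeeringListener::schedule_event_on_commit when the transaction
+  // registered in activate() commits. epoch is the map epoch at which the
+  // event was scheduled; activation_epoch is the epoch activation began at.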
+ struct ActivateCommitted : boost::statechart::event< ActivateCommitted > {
+ epoch_t epoch;
+ epoch_t activation_epoch;
+ explicit ActivateCommitted(epoch_t e, epoch_t ae)
+ : boost::statechart::event< ActivateCommitted >(),
+        epoch(e), activation_epoch(ae) {}
+ void print(std::ostream *out) const {
+ *out << "ActivateCommitted from " << activation_epoch
+ << " processed at " << epoch;
+ }
+ };
public:
struct UnfoundBackfill : boost::statechart::event<UnfoundBackfill> {
explicit UnfoundBackfill() {}
boost::statechart::custom_reaction< MLogRec >,
boost::statechart::custom_reaction< MTrim >,
boost::statechart::custom_reaction< Backfilled >,
+ boost::statechart::custom_reaction< ActivateCommitted >,
boost::statechart::custom_reaction< AllReplicasActivated >,
boost::statechart::custom_reaction< DeferRecovery >,
boost::statechart::custom_reaction< DeferBackfill >,
boost::statechart::result react(const Backfilled&) {
return discard_event();
}
+ boost::statechart::result react(const ActivateCommitted&);
boost::statechart::result react(const AllReplicasActivated&);
boost::statechart::result react(const DeferRecovery& evt) {
return discard_event();
boost::statechart::result react(const DoRecovery&) {
return discard_event();
}
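+    // Shared by react(ActivateCommitted) and react(MInfoRec): called once
+    // every shard in acting_recovery_backfill has activated and committed.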
+ void all_activated_and_committed();
};
struct Clean : boost::statechart::state< Clean, Active >, NamedState {
boost::statechart::custom_reaction< MLogRec >,
boost::statechart::custom_reaction< MTrim >,
boost::statechart::custom_reaction< Activate >,
+ boost::statechart::custom_reaction< ActivateCommitted >,
boost::statechart::custom_reaction< DeferRecovery >,
boost::statechart::custom_reaction< DeferBackfill >,
boost::statechart::custom_reaction< UnfoundRecovery >,
boost::statechart::result react(const ActMap&);
boost::statechart::result react(const MQuery&);
boost::statechart::result react(const Activate&);
+ boost::statechart::result react(const ActivateCommitted&);
boost::statechart::result react(const RecoveryDone&) {
return discard_event();
}
unsigned priority = 0;
typedef boost::mpl::list <
boost::statechart::custom_reaction< ActMap >,
+ boost::statechart::custom_reaction< ActivateCommitted >,
boost::statechart::custom_reaction< DeleteSome >
> reactions;
explicit ToDelete(my_context ctx);
// happens if we drop out of Deleting due to reprioritization etc.
return discard_event();
}
+ boost::statechart::result react(const ActivateCommitted&) {
+    // Can happen if we were activated as a stray but not actually pulled
+ // from prior to the pg going clean and sending a delete.
+ return discard_event();
+ }
void exit();
};
bool restrict_to_up_acting,
bool *history_les_bound);
+ void activate(
+ ObjectStore::Transaction& t,
+ epoch_t activation_epoch,
+ map<int, map<spg_t,pg_query_t> >& query_map,
+ map<int, vector<pair<pg_notify_t, PastIntervals> > > *activator_map,
+ PeeringCtx *ctx);
+
public:
PeeringState(
CephContext *cct,