*_dout << "missing: " << *missing;
*_dout << dendl;
- unreg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
- pg->info.history.merge(info.history);
- reg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
+ // don't update history (yet) if we are active and primary; the replica
+ // may be telling us they have activated (and committed) but we can't
+ // share that until _everyone_ does the same.
+ if (pg->is_active() && pg->is_primary() && pg->is_acting(from) &&
+ pg->info.history.last_epoch_started < pg->info.history.same_acting_since &&
+ info.history.last_epoch_started >= pg->info.history.same_acting_since) {
+ dout(10) << " peer osd" << from << " activated and committed" << dendl;
+ pg->peer_activated.insert(from);
+ if (pg->peer_activated.size() == pg->acting.size())
+ pg->all_activated_and_committed();
+ } else {
+ unreg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
+ pg->info.history.merge(info.history);
+ reg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
+ }
// dump log
dout(15) << *pg << " my log = ";
peer_missing.clear();
need_up_thru = false;
peer_last_complete_ondisk.clear();
+ peer_activated.clear();
min_last_complete_ondisk = eversion_t();
stray_purged.clear();
might_have_unfound.clear();
dout(15) << __func__ << ": built " << might_have_unfound << dendl;
}
+// Completion callback for PG activation.  PG::activate() pushes one of these
+// onto its tfin list "to find out when we commit"; when the callback fires
+// it hands the epoch captured at construction time back to the PG through
+// PG::_activate_committed().
+struct C_PG_ActivateCommitted : public Context {
+  PG *pg;         // PG to notify; stored as a raw pointer, no reference is taken here
+  epoch_t epoch;  // epoch value forwarded to _activate_committed()
+
+  C_PG_ActivateCommitted(PG *target, epoch_t interval_epoch)
+    : pg(target),
+      epoch(interval_epoch) {}
+
+  // The result code r is not inspected.
+  void finish(int r) {
+    pg->_activate_committed(epoch);
+  }
+};
+
void PG::activate(ObjectStore::Transaction& t, list<Context*>& tfin,
map< int, map<pg_t,Query> >& query_map,
map<int, MOSDPGInfo*> *activator_map)
build_might_have_unfound();
}
}
-
- info.history.last_epoch_started = osd->osdmap->get_epoch();
- trim_past_intervals();
-
+
if (role == 0) { // primary state
last_update_ondisk = info.last_update;
min_last_complete_ondisk = eversion_t(0,0); // we don't know (yet)!
// clean up stray objects
clean_up_local(t);
+ // find out when we commit
+ tfin.push_back(new C_PG_ActivateCommitted(this, info.history.same_acting_since));
+
// initialize snap_trimq
if (is_primary()) {
snap_trimq = pool->cached_removed_snaps;
osd->take_waiters(waiting_for_active);
}
+// Called (via C_PG_ActivateCommitted) with the epoch that was captured when
+// the activate callback was queued.
+//
+//  - If that epoch is older than info.history.same_acting_since, the callback
+//    is logged as belonging to an old interval and ignored.
+//  - On the primary, this OSD is added to peer_activated; once the set is as
+//    large as the acting set, all_activated_and_committed() runs.
+//  - Otherwise, a copy of our info with history.last_epoch_started set to e
+//    is sent to acting[0] (logged as "telling primary").  The local info is
+//    not modified on this path.
+void PG::_activate_committed(epoch_t e)
+{
+  const bool stale_interval = e < info.history.same_acting_since;
+  if (stale_interval) {
+    dout(10) << "_activate_committed " << e << ", that was an old interval" << dendl;
+    return;
+  }
+
+  if (!is_primary()) {
+    dout(10) << "_activate_committed " << e << " telling primary" << dendl;
+    MOSDPGInfo *notify = new MOSDPGInfo(osd->osdmap->get_epoch());
+    // Report the commit by advertising e as last_epoch_started in the copy only.
+    PG::Info committed_info = info;
+    committed_info.history.last_epoch_started = e;
+    notify->pg_info.push_back(committed_info);
+    osd->cluster_messenger->send_message(notify, osd->osdmap->get_cluster_inst(acting[0]));
+    return;
+  }
+
+  // Primary: count ourselves among the peers that have activated and committed.
+  peer_activated.insert(osd->whoami);
+  dout(10) << "_activate_committed " << e << " peer_activated now " << peer_activated << dendl;
+  if (peer_activated.size() == acting.size()) {
+    all_activated_and_committed();
+  }
+}
+
+/*
+ * Record that the current interval has been fully activated.
+ *
+ * update info.history.last_epoch_started ONLY after we and all
+ * replicas have activated AND committed the activate transaction
+ * (i.e. the peering results are stable on disk).
+ *
+ * Primary only (asserted).  peer_activated must already contain as
+ * many entries as the acting set (asserted); both visible callers
+ * check that size condition before calling.
+ *
+ * Steps, in order: set last_epoch_started to the current osdmap
+ * epoch, call share_pg_info(), then write the info into a fresh
+ * transaction and queue it on the store.
+ */
+void PG::all_activated_and_committed()
+{
+ dout(10) << "all_activated_and_committed" << dendl;
+ assert(is_primary());
+ assert(peer_activated.size() == acting.size());
+
+ info.history.last_epoch_started = osd->osdmap->get_epoch(); // current map epoch, not same_acting_since
+ share_pg_info(); // NOTE(review): presumably pushes the updated info to the other acting OSDs -- confirm
+
+ ObjectStore::Transaction *t = new ObjectStore::Transaction; // NOTE(review): not deleted here; presumably the store takes ownership once queued -- confirm
+ write_info(*t); // NOTE(review): presumably stages the updated info (incl. history) in t -- confirm
+ int tr = osd->store->queue_transaction(&osr, t);
+ assert(tr == 0); // a non-zero return from queueing is treated as fatal
+}
+
void PG::queue_snap_trim()
{
if (osd->snap_trim_wq.queue(this))
set<int> peer_backlog_requested;
set<int> peer_missing_requested;
set<int> stray_purged; // i deleted these strays; ignore racing PGInfo from them
+ set<int> peer_activated;
// primary-only, recovery-only state
set<int> might_have_unfound; // These osds might have objects on them
void activate(ObjectStore::Transaction& t, list<Context*>& tfin,
map< int, map<pg_t,Query> >& query_map,
map<int, MOSDPGInfo*> *activator_map=0);
+ void _activate_committed(epoch_t e);
+ void all_activated_and_committed();
bool have_unfound() const {
return missing.num_missing() > missing_loc.size();