#include "config.h"
#undef dout
#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_osd) cout << g_clock.now() << " osd" << whoami << " " << (osdmap ? osdmap->get_epoch():0) << " "
+#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_osd) cerr << g_clock.now() << " osd" << whoami << " " << (osdmap ? osdmap->get_epoch():0) << " "
char *osd_base_path = "./osddata";
char *ebofs_base_path = "./ebofsdev";
if (pg->is_crashed() &&
pg->is_replay() &&
pg->get_role() == 0 &&
- pg->info.same_primary_since <= epoch) {
+ pg->info.history.same_primary_since <= epoch) {
ObjectStore::Transaction t;
pg->activate(t);
store->apply_transaction(t);
pg->acting.swap(acting);
pg->last_epoch_started_any =
pg->info.last_epoch_started =
- pg->info.same_primary_since =
- pg->info.same_acker_since =
- pg->info.same_role_since = osdmap->get_epoch();
+ pg->info.history.same_since =
+ pg->info.history.same_primary_since =
+ pg->info.history.same_acker_since = osdmap->get_epoch();
pg->activate(t);
dout(7) << "created " << *pg << endl;
pg->set_role(role);
pg->last_epoch_started_any =
pg->info.last_epoch_started =
- pg->info.same_primary_since =
- pg->info.same_acker_since =
- pg->info.same_role_since = osdmap->get_epoch();
+ pg->info.history.same_primary_since =
+ pg->info.history.same_acker_since =
+ pg->info.history.same_since = osdmap->get_epoch();
pg->activate(t);
dout(7) << "created " << *pg << endl;
}
// get new acting set
- vector<int> acting;
- int nrep = osdmap->pg_to_acting_osds(pgid, acting);
- int role = osdmap->calc_pg_role(whoami, acting, nrep);
+ vector<int> tacting;
+ int nrep = osdmap->pg_to_acting_osds(pgid, tacting);
+ int role = osdmap->calc_pg_role(whoami, tacting, nrep);
// no change?
- if (acting == pg->acting)
+ if (tacting == pg->acting)
continue;
- int primary = -1;
- if (nrep > 0) primary = acting[0];
+ // -- there was a change! --
+ _lock_pg(pgid);
int oldrole = pg->get_role();
int oldprimary = pg->get_primary();
vector<int> oldacting = pg->acting;
// update PG
- pg->acting.swap(acting);
+ pg->acting.swap(tacting);
pg->set_role(role);
// did primary|acker change?
- if (oldprimary != primary) {
- pg->info.same_primary_since = osdmap->get_epoch();
+ pg->info.history.same_since = osdmap->get_epoch();
+ if (oldprimary != pg->get_primary()) {
+ pg->info.history.same_primary_since = osdmap->get_epoch();
pg->cancel_recovery();
}
if (oldacker != pg->get_acker()) {
- pg->info.same_acker_since = osdmap->get_epoch();
+ pg->info.history.same_acker_since = osdmap->get_epoch();
}
- if (oldprimary != primary || oldacker != pg->get_acker()) {
+
+ // deactivate.
+ pg->state_clear(PG::STATE_ACTIVE);
+
+ // discard any repops in progress.
+ if (oldacker == whoami) {
// drop our write-ahead log. (we'll only have on if we were just the acker)
pg->trim_write_ahead();
+
+ // drop repops
+ for (map<tid_t,PG::RepOpGather*>::iterator p = pg->repop_gather.begin();
+ p != pg->repop_gather.end();
+ p++) {
+ dout(-1) << *pg << " discarding repop " << p->second << endl;
+ delete p->second->op;
+ delete p->second;
+ }
+ pg->repop_gather.clear();
+
+ // and repop waiters
+ for (map<tid_t, list<Message*> >::iterator p = pg->waiting_for_repop.begin();
+ p != pg->waiting_for_repop.end();
+ p++)
+ for (list<Message*>::iterator pm = p->second.begin();
+ pm != p->second.end();
+ pm++)
+ delete *pm;
+ pg->waiting_for_repop.clear();
}
if (role != oldrole) {
- pg->info.same_role_since = osdmap->get_epoch();
-
// old primary?
if (oldrole == 0) {
pg->state_clear(PG::STATE_CLEAN);
// new primary?
if (role == 0) {
// i am new primary
- pg->state_clear(PG::STATE_ACTIVE);
pg->state_clear(PG::STATE_STRAY);
pg->last_epoch_started_any = pg->info.last_epoch_started;
} else {
// i am now replica|stray. we need to send a notify.
- pg->state_clear(PG::STATE_ACTIVE);
pg->state_set(PG::STATE_STRAY);
if (nrep == 0) {
}
// my role changed.
- dout(10) << *pg << " " << oldacting << " -> " << acting
+ dout(10) << *pg << " " << oldacting << " -> " << pg->acting
<< ", role " << oldrole << " -> " << role << endl;
} else {
// no role change.
// did primary change?
- if (primary != oldprimary) {
+ if (pg->get_primary() != oldprimary) {
// we need to announce
pg->state_set(PG::STATE_STRAY);
- pg->state_clear(PG::STATE_ACTIVE);
- dout(10) << *pg << " " << oldacting << " -> " << acting
+ dout(10) << *pg << " " << oldacting << " -> " << pg->acting
<< ", acting primary "
- << oldprimary << " -> " << primary
+ << oldprimary << " -> " << pg->get_primary()
<< endl;
} else {
// primary is the same.
if (role == 0) {
// i am (still) primary. but my replica set changed.
- pg->state_clear(PG::STATE_ACTIVE);
pg->state_clear(PG::STATE_CLEAN);
pg->state_clear(PG::STATE_REPLAY);
- dout(10) << *pg << " " << oldacting << " -> " << acting
+ dout(10) << *pg << " " << oldacting << " -> " << pg->acting
<< ", replicas changed" << endl;
// completely restart peering process.
}
}
+
+ _unlock_pg(pgid);
}
}
}
pg->build_prior();
pg->peer(t, query_map);
}
- else if (pg->is_stray()) {
+ else if (pg->is_stray() &&
+ pg->get_primary() >= 0) {
// i am residual|replica
notify_list[pg->get_primary()].push_back(pg->info);
}
}
/**
- * check epochs starting from start to verify the primary hasn't changed
+ * check epochs starting from start to verify the pg acting set hasn't changed
* up until now
*/
-epoch_t OSD::calc_pg_primary_since(int primary, pg_t pgid, epoch_t start)
+void OSD::project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from)
{
+ dout(-15) << "project_pg_history " << hex << pgid << dec
+ << " from " << from << " to " << osdmap->get_epoch()
+ << ", start " << h
+ << endl;
+
+ vector<int> last;
+ osdmap->pg_to_acting_osds(pgid, last);
+
for (epoch_t e = osdmap->get_epoch()-1;
- e >= start;
+ e >= from;
e--) {
// verify during intermediate epoch
- vector<int> acting;
-
OSDMap oldmap;
get_map(e, oldmap);
+
+ vector<int> acting;
oldmap.pg_to_acting_osds(pgid, acting);
- if (acting[0] != primary)
- return e+1; // nope, primary only goes back through e!
- }
+ // acting set change?
+ if (acting != last &&
+ e <= h.same_since) {
+ dout(-15) << "project_pg_history " << hex << pgid << dec << " changed in " << e+1
+ << " from " << acting << " -> " << last << endl;
+ h.same_since = e+1;
+ }
- return start; // same all the way back thru start!
-}
+ // primary change?
+ if (!(!acting.empty() && !last.empty() && acting[0] == last[0]) &&
+ e <= h.same_primary_since) {
+ dout(-15) << "project_pg_history " << hex << pgid << dec << " primary changed in " << e+1 << endl;
+ h.same_primary_since = e+1;
+
+ if (g_conf.osd_rep == OSD_REP_PRIMARY)
+ h.same_acker_since = h.same_primary_since;
+ }
+ // acker change?
+ if (g_conf.osd_rep != OSD_REP_PRIMARY) {
+ if (!(!acting.empty() && !last.empty() && acting[acting.size()-1] == last[last.size()-1]) &&
+ e <= h.same_acker_since) {
+ dout(-15) << "project_pg_history " << hex << pgid << dec << " acker changed in " << e+1 << endl;
+ h.same_acker_since = e+1;
+ }
+ }
+ if (h.same_since > e &&
+ h.same_primary_since > e &&
+ h.same_acker_since > e) break;
+ }
+ dout(-15) << "project_pg_history end " << h << endl;
+}
/** do_notifies
PG *pg;
if (pg_map.count(pgid) == 0) {
- // check mapping.
- vector<int> acting;
- int nrep = osdmap->pg_to_acting_osds(pgid, acting);
- if (!nrep) {
- dout(10) << "handle_pg_notify pg " << hex << pgid << dec << " has null mapping" << endl;
- continue;
- }
-
- // am i still the primary?
- assert(it->same_primary_since <= osdmap->get_epoch());
- if (acting.empty() || acting[0] != whoami) {
- // not primary now, so who cares!
- dout(10) << "handle_pg_notify pg " << hex << pgid << dec << " dne, and i'm not the primary" << endl;
+ // same primary?
+ PG::Info::History history = it->history;
+ project_pg_history(pgid, history, m->get_epoch());
+
+ if (m->get_epoch() < history.same_primary_since) {
+ dout(10) << "handle_pg_notify pg " << hex << pgid << dec << " dne, and primary changed in "
+ << history.same_primary_since << " (msg from " << m->get_epoch() << ")" << endl;
continue;
- } else {
- // ok, well, i'm primary now... was it continuous since caller's epoch?
- epoch_t since = calc_pg_primary_since(whoami, pgid, m->get_epoch());
- if (since > m->get_epoch()) {
- dout(10) << "handle_pg_notify pg " << hex << pgid << dec << " dne, and i wasn't primary during intermediate epoch " << since
- << " (caller " << m->get_epoch() << " < " << since << " < now " << osdmap->get_epoch() << ")" << endl;
- continue;
- }
}
// ok, create PG!
pg = create_pg(pgid, t);
- pg->acting.swap( acting );
+ osdmap->pg_to_acting_osds(pgid, pg->acting);
pg->set_role(0);
- pg->info.same_primary_since = it->same_primary_since;
- pg->info.same_acker_since = it->same_acker_since;
- pg->info.same_role_since = osdmap->get_epoch();
+ pg->info.history = history;
pg->last_epoch_started_any = it->last_epoch_started;
pg->build_prior();
waiting_for_pg.erase(pgid);
}
- pg = _lock_pg(pgid);
+ _lock_pg(pgid);
} else {
// already had it. am i (still) the primary?
pg = _lock_pg(pgid);
- if (pg->is_primary()) {
- if (pg->info.same_primary_since > m->get_epoch()) {
- dout(10) << *pg << " requestor epoch " << m->get_epoch()
- << " < my primary start epoch " << pg->info.same_primary_since
- << endl;
- _unlock_pg(pgid);
- continue;
- }
- } else {
- dout(10) << *pg << " not primary" << endl;
- assert(m->get_epoch() < osdmap->get_epoch());
+ if (m->get_epoch() < pg->info.history.same_primary_since) {
+ dout(10) << *pg << " handle_pg_notify primary changed in "
+ << pg->info.history.same_primary_since
+ << " (msg from " << m->get_epoch() << ")" << endl;
_unlock_pg(pgid);
continue;
}
assert(pg);
dout(7) << "handle_pg_log " << *pg
- << " got " << m->log
+ << " got " << m->log << " " << m->missing
<< " from " << m->get_source() << endl;
ObjectStore::Transaction t;
- if (pg->acting[0] == whoami) {
+ if (pg->is_primary()) {
// i am PRIMARY
assert(pg->peer_log_requested.count(from) ||
pg->peer_summary_requested.count(from));
pg->merge_log(m->log, m->missing, from);
assert(pg->missing.num_lost() == 0);
- // ok active!
- pg->info.same_primary_since = m->info.same_primary_since;
- pg->info.same_acker_since = m->info.same_acker_since;
+ // ok activate!
pg->activate(t);
}
it != m->pg_list.end();
it++) {
pg_t pgid = it->first;
- PG *pg;
+ PG *pg = 0;
if (pg_map.count(pgid) == 0) {
+ // same primary?
+ PG::Info::History history = it->second.history;
+ project_pg_history(pgid, history, m->get_epoch());
+
+ if (m->get_epoch() < history.same_primary_since) {
+ dout(10) << " pg " << hex << pgid << dec << " dne, and primary has changed in "
+ << history.same_primary_since << " (msg from " << m->get_epoch() << ")" << endl;
+ continue;
+ }
+
// get active rush mapping
vector<int> acting;
int nrep = osdmap->pg_to_acting_osds(pgid, acting);
int role = osdmap->calc_pg_role(whoami, acting, nrep);
- if (role == 0) {
- dout(10) << " pg " << hex << pgid << dec << " dne, and i am primary. just waiting for notify." << endl;
- continue;
- }
if (role < 0) {
dout(10) << " pg " << hex << pgid << dec << " dne, and i am not an active replica" << endl;
PG::Info empty(pgid);
notify_list[from].push_back(empty);
continue;
}
-
+ assert(role > 0);
+
ObjectStore::Transaction t;
- PG *pg = create_pg(pgid, t);
+ pg = create_pg(pgid, t);
pg->acting.swap( acting );
pg->set_role(role);
-
- pg->info.same_primary_since = it->second.same_primary_since;
- pg->info.same_acker_since = it->second.same_acker_since;
- pg->info.same_role_since = osdmap->get_epoch();
+ pg->info.history = history;
t.collection_setattr(pgid, "info", (char*)&pg->info, sizeof(pg->info));
store->apply_transaction(t);
dout(10) << *pg << " dne (before), but i am role " << role << endl;
- }
- pg = _lock_pg(pgid);
-
- // verify this is from same primary
- if (pg->is_primary()) {
- dout(10) << *pg << " i am primary, skipping" << endl;
- _unlock_pg(pgid);
- continue;
- } else {
- if (from == pg->acting[0]) {
- if (m->get_epoch() < pg->info.same_primary_since) {
- dout(10) << *pg << " not same primary since " << m->get_epoch() << ", skipping" << endl;
- _unlock_pg(pgid);
- continue;
- }
- } else {
- dout(10) << *pg << " query not from primary, skipping" << endl;
- assert(m->get_epoch() < osdmap->get_epoch());
+ _lock_pg(pgid);
+ } else {
+ pg = _lock_pg(pgid);
+
+ // same primary?
+ if (m->get_epoch() < pg->info.history.same_primary_since) {
+ dout(10) << *pg << " handle_pg_query primary changed in "
+ << pg->info.history.same_primary_since
+ << " (msg from " << m->get_epoch() << ")" << endl;
_unlock_pg(pgid);
continue;
}
}
+ // ok, process query!
+ assert(!pg->acting.empty());
+ assert(from == pg->acting[0]);
+
if (it->second.type == PG::Query::INFO) {
// info
dout(10) << *pg << " sending info" << endl;
/** pull - request object from a peer
*/
-void OSD::pull(PG *pg, object_t oid, eversion_t v)
+void OSD::pull(PG *pg, object_t oid)
{
assert(pg->missing.loc.count(oid));
+ eversion_t v = pg->missing.missing[oid];
int osd = pg->missing.loc[oid];
dout(7) << *pg << " pull " << hex << oid << dec
const eversion_t v = op->get_version();
int from = op->get_source().num();
- dout(7) << "op_pull " << hex << oid << dec << " v " << op->get_version()
+ dout(7) << *pg << " op_pull " << hex << oid << dec << " v " << op->get_version()
<< " from " << op->get_source()
<< endl;
// is a replica asking? are they missing it?
- if (pg->is_primary() &&
- (pg->peer_missing.count(from) == 0 ||
- !pg->peer_missing[from].is_missing(oid))) {
- dout(7) << "op_pull replica isn't actually missing it, we must have already pushed to them" << endl;
- delete op;
- return;
+ if (pg->is_primary()) {
+ // primary
+ assert(pg->peer_missing.count(from)); // we had better know this, from the peering process.
+
+ if (!pg->peer_missing[from].is_missing(oid)) {
+ dout(7) << *pg << " op_pull replica isn't actually missing it, we must have already pushed to them" << endl;
+ delete op;
+ return;
+ }
+
+ // do we have it yet?
+ if (waitfor_missing_object(op, pg))
+ return;
+ } else {
+ // non-primary
+ if (pg->missing.is_missing(oid)) {
+ dout(7) << *pg << " op_pull not primary, and missing " << hex << oid << dec << ", ignoring" << endl;
+ delete op;
+ return;
+ }
}
-
+
// push it back!
push(pg, oid, op->get_source().num());
}
eversion_t v = op->get_version();
if (!pg->missing.is_missing(oid)) {
- dout(7) << "op_push not missing object " << hex << oid << dec << endl;
+ dout(7) << *pg << " op_push not missing " << hex << oid << dec << endl;
return;
}
- dout(7) << "op_push "
+ dout(7) << *pg << " op_push "
<< hex << oid << dec
<< " v " << v
<< " size " << op->get_length() << " " << op->get_data().length()
- << " in " << *pg
<< endl;
assert(op->get_data().length() == op->get_length());
t.setattrs(oid, op->get_attrset());
t.collection_add(pg->info.pgid, oid);
+ // close out pull op?
+ num_pulling--;
+ if (pg->objects_pulling.count(oid))
+ pg->objects_pulling.erase(oid);
+ pg->missing.got(oid, v);
+
+
// raise last_complete?
assert(pg->log.complete_to != pg->log.log.end());
while (pg->log.complete_to != pg->log.log.end()) {
}
dout(10) << *pg << " last_complete now " << pg->info.last_complete << endl;
+
// apply to disk!
t.collection_setattr(pg->info.pgid, "info", &pg->info, sizeof(pg->info));
unsigned r = store->apply_transaction(t);
assert(r == 0);
- // close out pull op?
- num_pulling--;
- if (pg->objects_pulling.count(oid))
- pg->objects_pulling.erase(oid);
- pg->missing.got(oid, v);
// am i primary? are others missing this too?
if (pg->is_primary()) {
for (unsigned i=1; i<pg->acting.size(); i++) {
int peer = pg->acting[i];
- if (pg->peer_missing.count(peer) &&
- pg->peer_missing[peer].is_missing(oid)) {
+ assert(pg->peer_missing.count(peer));
+ if (pg->peer_missing[peer].is_missing(oid)) {
// ok, push it, and they (will) have it now.
pg->peer_missing[peer].got(oid, v);
push(pg, oid, peer);
}
}
-
- // continue recovery
- pg->do_recovery();
}
+ // continue recovery
+ pg->do_recovery();
+
// kick waiters
if (pg->waiting_for_missing_object.count(oid))
take_waiters(pg->waiting_for_missing_object[oid]);
if (read) {
// read. am i the (same) acker?
if (pg->get_acker() != whoami ||
- op->get_map_epoch() < pg->info.same_acker_since) {
+ op->get_map_epoch() < pg->info.history.same_acker_since) {
dout(7) << "acting acker is osd" << pg->get_acker()
- << " since " << pg->info.same_acker_since
+ << " since " << pg->info.history.same_acker_since
<< ", dropping" << endl;
assert(op->get_map_epoch() < osdmap->get_epoch());
+ delete op;
return;
}
} else {
// write. am i the (same) primary?
if (pg->get_primary() != whoami ||
- op->get_map_epoch() < pg->info.same_primary_since) {
+ op->get_map_epoch() < pg->info.history.same_primary_since) {
dout(7) << "acting primary is osd" << pg->get_primary()
- << " since " << pg->info.same_primary_since
+ << " since " << pg->info.history.same_primary_since
<< ", dropping" << endl;
assert(op->get_map_epoch() < osdmap->get_epoch());
+ delete op;
return;
}
}
// have pg?
if (!pg) {
- dout(7) << "handle_rep_op " << op
- << " pgid " << hex << pgid << dec << " dne" << endl;
+ derr(-7) << "handle_rep_op " << op
+ << " pgid " << hex << pgid << dec << " dne" << endl;
delete op;
+ //assert(0); // wtf, shouldn't happen.
return;
}
- // check osd map: same primary+acker?
- if (op->get_map_epoch() != osdmap->get_epoch()) {
- if (op->get_map_epoch() < pg->info.same_primary_since ||
- op->get_map_epoch() < pg->info.same_acker_since) {
- // drop message.
- delete op;
- return;
- }
- assert(pg->get_role() >= 0);
-
- dout(5) << "handle_rep_op map " << op->get_map_epoch() << " != " << osdmap->get_epoch()
- << ", but primary+acker same in " << *pg
- << endl;
+ // check osd map: same set, or primary+acker?
+ if (g_conf.osd_rep == OSD_REP_CHAIN &&
+ op->get_map_epoch() < pg->info.history.same_since) {
+ dout(10) << "handle_rep_op pg changed " << pg->info.history
+ << " after " << op->get_map_epoch()
+ << ", dropping" << endl;
+ delete op;
+ return;
}
-
+ if (g_conf.osd_rep != OSD_REP_CHAIN &&
+ (op->get_map_epoch() < pg->info.history.same_primary_since ||
+ op->get_map_epoch() < pg->info.history.same_acker_since)) {
+ dout(10) << "handle_rep_op pg primary|acker changed " << pg->info.history
+ << " after " << op->get_map_epoch()
+ << ", dropping" << endl;
+ delete op;
+ return;
+ }
+
+ assert(pg->get_role() >= 0);
dout(7) << "handle_rep_op " << op << " in " << *pg << endl;
}
ls.pop_front();
if (pgid) {
- dout(10) << "dequeue_op write pg " << hex << pgid << dec << " op " << op << ", "
+ dout(10) << "dequeue_op " << op << " write pg " << hex << pgid << dec
<< ls.size() << " / " << (pending_ops-1) << " more pending" << endl;
} else {
- dout(10) << "dequeue_op read op " << op << ", "
+ dout(10) << "dequeue_op " << op << " read "
<< ls.size() << " / " << (pending_ops-1) << " more pending" << endl;
}
_unlock_pg(pgid);
}
- dout(10) << "dequeue_op finish op " << op << endl;
+ dout(10) << "dequeue_op " << op << " finish" << endl;
assert(pending_ops > 0);
if (pending_ops > g_conf.osd_max_opq)
*/
void OSD::do_op(Message *m, PG *pg)
{
- //dout(15) << "do_op " << *op << " in " << *pg << endl;
+ //dout(15) << "do_op " << *m << endl;
if (m->get_type() == MSG_OSD_OP) {
MOSDOp *op = (MOSDOp*)m;
<< " in " << *pg
<< ", pulling"
<< endl;
- pull(pg, oid, v);
+ pull(pg, oid);
}
pg->waiting_for_missing_object[oid].push_back(op);
return true;
// adjust peers_complete_thru
if (!repop->pg_complete_thru.empty()) {
eversion_t min = pg->info.last_complete; // hrm....
- for (unsigned i=1; i<pg->acting.size(); i++) {
- if (repop->pg_complete_thru[i] < min) // note: if we haven't heard, it'll be zero, which is what we want.
- min = repop->pg_complete_thru[i];
+ for (unsigned i=0; i<pg->acting.size(); i++) {
+ if (repop->pg_complete_thru[pg->acting[i]] < min) // note: if we haven't heard, it'll be zero, which is what we want.
+ min = repop->pg_complete_thru[pg->acting[i]];
}
if (min > pg->peers_complete_thru) {
- dout(10) << *pg << "put_repop peers_complete_thru " << pg->peers_complete_thru << " -> " << min << endl;
+ dout(10) << "put_repop peers_complete_thru " << pg->peers_complete_thru << " -> " << min << " in " << *pg << endl;
pg->peers_complete_thru = min;
}
}
// are any peers missing this?
for (unsigned i=1; i<pg->acting.size(); i++) {
int peer = pg->acting[i];
- if (pg->peer_missing.count(i) &&
+ if (pg->peer_missing.count(peer) &&
pg->peer_missing[peer].is_missing(oid)) {
// push it before this update. FIXME, this is probably extra much work (eg if we're about to overwrite)
pg->peer_missing[peer].got(oid);
dout(10) << "merge_log missing " << hex << p->first << dec << " " << p->second
<< " also LOST on source, osd" << fromosd << endl;
}
- } else {
+ }
+ else if (p->second <= olog.top) {
dout(10) << "merge_log missing " << hex << p->first << dec << " " << p->second
<< " on source, osd" << fromosd << endl;
missing.loc[p->first] = fromosd;
- }
+ }
}
dout(10) << "merge_log missing " << hex << missing.missing << dec << endl;
}
dout(10) << " querying info from osd" << *it << endl;
- query_map[*it][info.pgid] = Query(Query::INFO, info.same_primary_since, info.same_acker_since);
+ query_map[*it][info.pgid] = Query(Query::INFO, info.history);
peer_info_requested.insert(*it);
}
if (missing_info) return;
}
}
- // gather log?
+ // gather log+missing?
+ // ...from all active
+ for (unsigned i=1; i<acting.size(); i++) {
+ int peer = acting[i];
+ if (peer_log_requested.count(peer)) continue;
+
+ dout(10) << " pulling log from osd" << peer
+ << " from v " << oldest_update_needed
+ << endl;
+ query_map[peer][info.pgid] = Query(Query::LOG, oldest_update_needed, info.history);
+ peer_log_requested[peer] = oldest_update_needed;
+ }
+
+ // ...and the newest too
if (newest_update_osd != osd->whoami) {
if (peer_log_requested.count(newest_update_osd) ||
peer_summary_requested.count(newest_update_osd)) {
<< " v " << newest_update
<< ", querying since " << oldest_update_needed
<< endl;
- query_map[newest_update_osd][info.pgid] = Query(Query::LOG, oldest_update_needed,
- info.same_primary_since, info.same_acker_since);
+ query_map[newest_update_osd][info.pgid] = Query(Query::LOG, oldest_update_needed, info.history);
peer_log_requested[newest_update_osd] = oldest_update_needed;
} else {
dout(10) << " newest update on osd" << newest_update_osd
assert((peer_info[newest_update_osd].last_complete >=
peer_info[newest_update_osd].log_bottom) ||
peer_info[newest_update_osd].log_backlog); // or else we're in trouble.
- query_map[newest_update_osd][info.pgid] = Query(Query::BACKLOG,
- info.same_primary_since, info.same_acker_since);
+ query_map[newest_update_osd][info.pgid] = Query(Query::BACKLOG, info.history);
peer_summary_requested.insert(newest_update_osd);
}
}
} else {
dout(10) << " i have the most up-to-date pg v " << info.last_update << endl;
}
+
+ // did we get them all?
+ bool have_missing = true;
+ for (unsigned i=1; i<acting.size(); i++) {
+ int peer = acting[i];
+ if (peer_missing.count(peer)) continue;
+
+ dout(10) << " waiting for log+missing from osd" << peer << endl;
+ have_missing = false;
+ }
+ if (!have_missing) return;
+
dout(10) << " peers_complete_thru " << peers_complete_thru << endl;
dout(10) << " oldest_update_needed " << oldest_update_needed << endl;
<< ". fetching summary/backlog from osd" << who
<< endl;
assert(who != osd->whoami); // can't be me, or we're in trouble.
- query_map[who][info.pgid] = Query(Query::BACKLOG,
- info.same_primary_since, info.same_acker_since);
+ query_map[who][info.pgid] = Query(Query::BACKLOG, info.history);
peer_summary_requested.insert(who);
}
return;
}
dout(10) << " requesting summary/backlog from osd" << peer << endl;
- query_map[peer][info.pgid] = Query(Query::INFO,
- info.same_primary_since,
- info.same_acker_since);
+ query_map[peer][info.pgid] = Query(Query::INFO, info.history);
peer_summary_requested.insert(peer);
waiting = true;
}
state_set(STATE_REPLAY);
g_timer.add_event_after(g_conf.osd_replay_window,
new OSD::C_Activate(osd, info.pgid, osd->osdmap->get_epoch()));
- } else {
+ }
+ else if (!is_active()) {
// -- ok, activate!
activate(t);
}
void PG::activate(ObjectStore::Transaction& t)
{
+ assert(!is_active());
+
// twiddle pg state
state_set(STATE_ACTIVE);
state_clear(STATE_STRAY);
log.complete_to == log.log.end();
log.requested_to = log.log.end();
}
- else if (is_primary()) {
+ //else if (is_primary()) {
+ else if (true) {
dout(10) << "activate - not complete, " << missing << ", starting recovery" << endl;
// init complete_to
log.complete_to++;
assert(log.complete_to != log.log.end());
}
-
+
// start recovery
log.requested_to = log.complete_to;
do_recovery();
int peer = acting[i];
assert(peer_info.count(peer));
- if (peer_info[peer].is_clean())
- clean_set.insert(peer);
-
-
MOSDPGLog *m = new MOSDPGLog(osd->osdmap->get_epoch(),
info.pgid);
m->info = info;
assert(peer_info[peer].last_update < info.last_update);
m->log.copy_after(log, peer_info[peer].last_update);
}
+
+ // update local version of peer's missing list!
+ {
+ eversion_t plu = peer_info[peer].last_update;
+ Missing& pm = peer_missing[peer];
+ for (list<Log::Entry>::iterator p = m->log.log.begin();
+ p != m->log.log.end();
+ p++)
+ if (p->version > plu)
+ pm.add(p->oid, p->version);
+ }
- dout(10) << "sending " << m->log << " " << m->missing
+ dout(10) << "activate sending " << m->log << " " << m->missing
<< " to osd" << peer << endl;
-
//m->log.print(cout);
-
osd->messenger->send_message(m, MSG_ADDR_OSD(peer));
+
+ // update our missing
+ if (peer_missing[peer].num_missing() == 0) {
+ dout(10) << "activate peer osd" << peer << " already clean, " << peer_info[peer] << endl;
+ assert(peer_info[peer].last_complete == info.last_update);
+ clean_set.insert(peer);
+ } else {
+ dout(10) << "activate peer osd" << peer << " " << peer_info[peer]
+ << " missing " << peer_missing[peer] << endl;
+ }
+
}
+
+ // discard unneeded peering state
+ //peer_log.clear(); // actually, do this carefully, in case peer() is called again.
// all clean?
if (is_all_clean()) {
state_set(STATE_CLEAN);
- dout(10) << "all replicas clean" << endl;
+ dout(10) << "activate all replicas clean" << endl;
clean_replicas();
}
}
if (latest->is_update() &&
!objects_pulling.count(latest->oid) &&
missing.is_missing(latest->oid)) {
- osd->pull(this, latest->oid, latest->version);
+ osd->pull(this, latest->oid);
return true;
}