}
}
- if (!op->get_source().is_osd()) {
- // REGULAR OP (non-replication)
-
- // note original source
- op->clear_payload(); // and hose encoded payload (in case we forward)
-
- // have pg?
- if (!pg) {
- dout(7) << "hit non-existent pg "
- << pgid
- << ", waiting" << dendl;
- waiting_for_pg[pgid].push_back(op);
+ // we don't need encoded payload anymore
+ op->clear_payload();
+
+ // have pg?
+ if (!pg) {
+ dout(7) << "hit non-existent pg "
+ << pgid
+ << ", waiting" << dendl;
+ waiting_for_pg[pgid].push_back(op);
+ return;
+ }
+
+ // pg must be same-ish...
+ if (!op->is_modify()) {
+ // read
+ if (!pg->same_for_read_since(op->get_map_epoch())) {
+ dout(7) << "handle_rep_op pg changed " << pg->info.history
+ << " after " << op->get_map_epoch()
+ << ", dropping" << dendl;
+ assert(op->get_map_epoch() < osdmap->get_epoch());
+ pg->unlock();
+ delete op;
return;
}
-
- // pg must be same-ish...
- if (!op->is_modify()) {
- // read
- if (!pg->same_for_read_since(op->get_map_epoch())) {
- dout(7) << "handle_rep_op pg changed " << pg->info.history
- << " after " << op->get_map_epoch()
- << ", dropping" << dendl;
- assert(op->get_map_epoch() < osdmap->get_epoch());
- pg->unlock();
- delete op;
- return;
- }
-
- if (op->get_oid().snap > 0) {
- // snap read. hrm.
- // are we missing a revision that we might need?
- // let's get them all.
- for (unsigned i=0; i<op->get_snaps().size(); i++) {
- object_t oid = op->get_oid();
- oid.snap = op->get_snaps()[i];
- if (pg->is_missing_object(oid)) {
- dout(10) << "handle_op _may_ need missing rev " << oid << ", pulling" << dendl;
- pg->wait_for_missing_object(op->get_oid(), op);
- pg->unlock();
- return;
- }
- }
- }
-
- } else {
- // modify
- if ((!pg->is_primary() ||
- !pg->same_for_modify_since(op->get_map_epoch()))) {
- dout(7) << "handle_op pg changed " << pg->info.history
- << " after " << op->get_map_epoch()
- << ", dropping" << dendl;
- assert(op->get_map_epoch() < osdmap->get_epoch());
- pg->unlock();
- delete op;
- return;
- }
-
- // scrubbing?
- if (pg->is_scrubbing()) {
- dout(10) << *pg << " is scrubbing, deferring op " << *op << dendl;
- pg->waiting_for_active.push_back(op);
- pg->unlock();
- return;
- }
- }
- // pg must be active.
- if (!pg->is_active()) {
- // replay?
- if (op->get_version().version > 0) {
- if (op->get_version() > pg->info.last_update) {
- dout(7) << *pg << " queueing replay at " << op->get_version()
- << " for " << *op << dendl;
- pg->replay_queue[op->get_version()] = op;
+ if (op->get_oid().snap > 0) {
+ // snap read. hrm.
+ // are we missing a revision that we might need?
+ // let's get them all.
+ for (unsigned i=0; i<op->get_snaps().size(); i++) {
+ object_t oid = op->get_oid();
+ oid.snap = op->get_snaps()[i];
+ if (pg->is_missing_object(oid)) {
+ dout(10) << "handle_op _may_ need missing rev " << oid << ", pulling" << dendl;
+ pg->wait_for_missing_object(op->get_oid(), op);
pg->unlock();
- return;
- } else {
- dout(7) << *pg << " replay at " << op->get_version() << " <= " << pg->info.last_update
- << " for " << *op
- << ", will queue for WRNOOP" << dendl;
- }
+ return;
+ }
}
-
- dout(7) << *pg << " not active (yet)" << dendl;
- pg->waiting_for_active.push_back(op);
- pg->unlock();
- return;
}
- // missing object?
- if (pg->is_missing_object(op->get_oid())) {
- pg->wait_for_missing_object(op->get_oid(), op);
- pg->unlock();
- return;
- }
-
- dout(10) << "handle_op " << *op << " in " << *pg << dendl;
-
} else {
- // REPLICATION OP (it's from another OSD)
-
- // have pg?
- if (!pg) {
- derr(-7) << "handle_rep_op " << *op
- << " pgid " << pgid << " dne" << dendl;
+ // modify
+ if ((!pg->is_primary() ||
+ !pg->same_for_modify_since(op->get_map_epoch()))) {
+ dout(7) << "handle_op pg changed " << pg->info.history
+ << " after " << op->get_map_epoch()
+ << ", dropping" << dendl;
+ assert(op->get_map_epoch() < osdmap->get_epoch());
+ pg->unlock();
delete op;
- //assert(0); // wtf, shouldn't happen.
return;
}
- // check osd map: same set, or primary+acker?
- if (!pg->same_for_rep_modify_since(op->get_map_epoch())) {
- dout(10) << "handle_rep_op pg changed " << pg->info.history
- << " after " << op->get_map_epoch()
- << ", dropping" << dendl;
+ // scrubbing?
+ if (pg->is_scrubbing()) {
+ dout(10) << *pg << " is scrubbing, deferring op " << *op << dendl;
+ pg->waiting_for_active.push_back(op);
pg->unlock();
- delete op;
return;
}
-
- assert(pg->get_role() >= 0);
- dout(7) << "handle_rep_op " << op << " in " << *pg << dendl;
}
+
+ // pg must be active.
+ if (!pg->is_active()) {
+ // replay?
+ if (op->get_version().version > 0) {
+ if (op->get_version() > pg->info.last_update) {
+ dout(7) << *pg << " queueing replay at " << op->get_version()
+ << " for " << *op << dendl;
+ pg->replay_queue[op->get_version()] = op;
+ pg->unlock();
+ return;
+ } else {
+ dout(7) << *pg << " replay at " << op->get_version() << " <= " << pg->info.last_update
+ << " for " << *op
+ << ", will queue for WRNOOP" << dendl;
+ }
+ }
+
+ dout(7) << *pg << " not active (yet)" << dendl;
+ pg->waiting_for_active.push_back(op);
+ pg->unlock();
+ return;
+ }
+
+ // missing object?
+ if (pg->is_missing_object(op->get_oid())) {
+ pg->wait_for_missing_object(op->get_oid(), op);
+ pg->unlock();
+ return;
+ }
+
+ dout(10) << "handle_op " << *op << " in " << *pg << dendl;
// proprocess op?
if (pg->preprocess_op(op, now)) {
}
PG *pg = _lookup_lock_pg(pgid);
+
+ // same pg?
+ // if pg changes _at all_, we reset and repeer!
+ if (op->map_epoch < pg->info.history.same_since) {
+ dout(10) << "handle_sub_op pg changed " << pg->info.history
+ << " after " << op->map_epoch
+ << ", dropping" << dendl;
+ pg->unlock();
+ delete op;
+ return;
+ }
+
if (g_conf.osd_maxthreads < 1) {
pg->do_sub_op(op); // do it now
} else {
<< dendl;
// sanity checks
- if (op->map_epoch < info.history.same_primary_since) {
- dout(10) << "sub_op_modify discarding old sub_op from "
- << op->map_epoch << " < " << info.history.same_primary_since << dendl;
- delete op;
- return;
- }
- if (!is_active()) {
- dout(10) << "sub_op_modify not active" << dendl;
- delete op;
- return;
- }
+ assert(op->map_epoch >= info.history.same_primary_since);
+ assert(is_active());
assert(is_replica());
// note peer's stat
<< " from " << op->get_source()
<< dendl;
- if (op->map_epoch < info.history.same_primary_since) {
- dout(10) << "sub_op_pull discarding old sub_op from "
- << op->map_epoch << " < " << info.history.same_primary_since << dendl;
- delete op;
- return;
- }
-
assert(!is_primary()); // we should be a replica or stray.
- // push it back!
push(poid, op->get_source().num(), op->data_subset, op->clone_subsets);
delete op;
}
interval_set<__u64> data_subset;
map<pobject_t, interval_set<__u64> > clone_subsets;
- if (!is_primary()) {
- // non-primary should only accept pushes from the current primary.
- if (op->map_epoch < info.history.same_primary_since) {
- dout(10) << "sub_op_push discarding old sub_op from "
- << op->map_epoch << " < " << info.history.same_primary_since << dendl;
- delete op;
- return;
- }
- // FIXME: actually, no, what i really want here is a personal "same_role_since"
- if (!is_active()) {
- dout(10) << "sub_op_push not active" << dendl;
- delete op;
- return;
- }
- } else {
- // primary will accept pushes anytime.
- }
-
// are we missing (this specific version)?
// (if version is wrong, it is either old (we don't want it) or
// newer (peering is buggy))
void ReplicatedPG::on_osd_failure(int o)
{
- dout(10) << "on_osd_failure " << o << dendl;
-
- // artificially ack failed osds
- xlist<RepGather*>::iterator p = repop_queue.begin();
- while (!p.end()) {
- RepGather *repop = *p;
- ++p;
- dout(-1) << " artificialling acking repop tid " << repop->rep_tid << dendl;
- if (repop->waitfor_ack.count(o) ||
- repop->waitfor_nvram.count(o) ||
- repop->waitfor_disk.count(o))
- repop_ack(repop, -EIO, CEPH_OSD_OP_ONDISK, o);
- }
-
- // remove from pushing map
- {
- map<object_t, pair<eversion_t,int> >::iterator p = pulling.begin();
- while (p != pulling.end())
- if (p->second.second == o) {
- dout(10) << " forgetting pull of " << p->first << " " << p->second.first
- << " from osd" << o << dendl;
- pulling.erase(p++);
- } else
- p++;
- }
+ //dout(10) << "on_osd_failure " << o << dendl;
}
void ReplicatedPG::on_shutdown()
{
dout(10) << "on_change" << dendl;
- // apply all local repops
- // (pg is inactive; we will repeer)
- for (xlist<RepGather*>::iterator p = repop_queue.begin();
- !p.end(); ++p)
- if (!(*p)->applied)
- apply_repop(*p);
-
- xlist<RepGather*>::iterator p = repop_queue.begin();
- while (!p.end()) {
- RepGather *repop = *p;
- ++p;
-
- if (!is_primary()) {
- // no longer primary. hose repops.
- dout(-1) << " aborting repop tid " << repop->rep_tid << dendl;
- repop->aborted = true;
- repop->queue_item.remove_myself();
- repop_map.erase(repop->rep_tid);
- repop->put();
- } else {
- // still primary. artificially ack+commit any replicas who dropped out of the pg
- dout(-1) << " checking for dropped osds on repop tid " << repop->rep_tid << dendl;
- set<int> all;
- set_union(repop->waitfor_disk.begin(), repop->waitfor_disk.end(),
- repop->waitfor_ack.begin(), repop->waitfor_ack.end(),
- inserter(all, all.begin()));
- for (set<int>::iterator q = all.begin(); q != all.end(); q++) {
- if (*q == osd->get_nodeid())
- continue;
- bool have = false;
- for (unsigned i=1; i<acting.size(); i++)
- if (acting[i] == *q)
- have = true;
- if (!have)
- repop_ack(repop, -EIO, CEPH_OSD_OP_ONDISK, *q);
- }
- }
+ // apply all repops
+ while (!repop_queue.empty()) {
+ RepGather *repop = repop_queue.front();
+ repop_queue.pop_front();
+ dout(10) << " applying repop tid " << repop->rep_tid << dendl;
+ if (!repop->applied)
+ apply_repop(repop);
+ repop->aborted = true;
+ repop->put();
}
- // clear pushing map
+ // clear pushing/pulling maps
pushing.clear();
+ pulling.clear();
}
void ReplicatedPG::on_role_change()