snap_trim_wq.stop();
dout(10) << "snap trim wq stopped" << dendl;
+ // tell each PG in pg_map that we're shutting down, before any PG is released below
+ for (hash_map<pg_t, PG*>::iterator p = pg_map.begin();
+ p != pg_map.end();
+ p++)
+ p->second->on_shutdown();
// zap waiters (bleh, this is messy)
finished_lock.Lock();
for (hash_map<pg_t, PG*>::iterator p = pg_map.begin();
p != pg_map.end();
p++) {
- delete p->second;
+ PG *pg = p->second;
+ pg->lock();
+ pg->put_unlock();  // replaces the former unconditional delete; NOTE(review): presumably drops pg_map's reference and releases the lock taken above -- confirm against PG::put_unlock
}
pg_map.clear();
- // shut everything else down
- //monitor->shutdown();
messenger->shutdown();
return r;
}
if (changed) {
write_info(t);
write_log(t);
+
+ // init complete pointer: with nothing missing, last_complete is moved up to last_update
+ if (missing.num_missing() == 0 &&
+ info.last_complete != info.last_update) {
+ dout(10) << "merge_log - no missing, moving last_complete " << info.last_complete
+ << " -> " << info.last_update << dendl;
+ info.last_complete = info.last_update;
+ }
+
+ if (info.last_complete == info.last_update) {
+ dout(10) << "merge_log - complete" << dendl;
+ log.complete_to = log.log.end();  // complete: both cursors are placed at the end of the log
+ log.requested_to = log.log.end();
+ } else {
+ dout(10) << "merge_log - not complete, " << missing << dendl;
+
+ log.complete_to = log.log.begin();  // scan forward to the first entry whose version is >= last_complete
+ while (log.complete_to->version < info.last_complete) {
+ log.complete_to++;
+ assert(log.complete_to != log.log.end());  // the scan must stop before running off the end of the log
+ }
+ log.requested_to = log.complete_to;  // requested_to starts from the same position as complete_to
+ }
}
}
if (!info.dead_snaps.empty())
queue_snap_trim();
- // init complete pointer
- if (missing.num_missing() == 0 &&
- info.last_complete != info.last_update) {
- dout(10) << "activate - no missing, moving last_complete " << info.last_complete
- << " -> " << info.last_update << dendl;
- info.last_complete = info.last_update;
- }
-
if (info.last_complete == info.last_update) {
dout(10) << "activate - complete" << dendl;
- log.complete_to == log.log.end();
+ log.complete_to = log.log.end();
log.requested_to = log.log.end();
- }
- else if (true) {
+ } else {
dout(10) << "activate - not complete, " << missing << dendl;
-
- // init complete_to
- log.complete_to = log.log.begin();
- while (log.complete_to->version < info.last_complete) {
- log.complete_to++;
- assert(log.complete_to != log.log.end());
- }
+ assert(log.complete_to->version >= info.last_complete);
if (is_primary()) {
// start recovery
dout(10) << "activate - starting recovery" << dendl;
- log.requested_to = log.complete_to;
+ assert(log.requested_to == log.complete_to);
osd->queue_for_recovery(this);
}
- } else {
- dout(10) << "activate - not complete, " << missing << dendl;
}
// if primary..
virtual void on_acker_change() = 0;
virtual void on_role_change() = 0;
virtual void on_change() = 0;
+ virtual void on_shutdown() = 0;
};
WRITE_CLASS_ENCODER(PG::Info::History)
void on_acker_change();
void on_role_change();
void on_change();
+ void on_shutdown() {}
};
#undef dout_prefix
#define dout_prefix _prefix(this, osd->whoami, osd->osdmap)
static ostream& _prefix(PG *pg, int whoami, OSDMap *osdmap) {
- return *_dout << dbeginl<< pthread_self() << " osd" << whoami << " " << (osdmap ? osdmap->get_epoch():0) << " " << *pg << " ";
+ return *_dout << dbeginl << pthread_self() << " osd" << whoami  // debug-line prefix: thread id, then " osd<whoami>",
+ << " " << (osdmap ? osdmap->get_epoch():0) << " " << *pg << " ";  // then the map epoch (0 when osdmap is null) and the PG itself
}
void ReplicatedPG::op_modify_ondisk(RepGather *repop)
{
if (repop->aborted) {
- dout(10) << "op_modify_ondisk " << *repop->op << " -- aborted" << dendl;
+ dout(10) << "op_modify_ondisk " << *repop << " -- aborted" << dendl;
} else {
- dout(10) << "op_modify_ondisk " << *repop->op << dendl;
+ dout(10) << "op_modify_ondisk " << *repop << dendl;
assert(repop->waitfor_disk.count(osd->get_nodeid()));
- repop->waitfor_nvram.erase(osd->get_nodeid());
repop->waitfor_disk.erase(osd->get_nodeid());
+ repop->waitfor_nvram.erase(osd->get_nodeid());
repop->pg_complete_thru[osd->get_nodeid()] = repop->pg_local_last_complete;
eval_repop(repop);
}
repop->start = g_clock.now();
- repop_queue.push_back(repop);
+ repop_queue.push_back(&repop->queue_item);
repop_map[repop->rep_tid] = repop;
repop->get();
missing_loc.erase(poid.oid);
// raise last_complete?
- assert(log.complete_to != log.log.end());
while (log.complete_to != log.log.end()) {
- if (missing.missing.count(log.complete_to->oid)) break;
+ if (missing.missing.count(log.complete_to->oid))
+ break;
if (info.last_complete < log.complete_to->version)
info.last_complete = log.complete_to->version;
log.complete_to++;
dout(10) << "on_osd_failure " << o << dendl;
// artificially ack failed osds
- deque<RepGather*>::iterator p = repop_queue.begin();
- while (p != repop_queue.end()) {
- RepGather *repop = *p++;
+ xlist<RepGather*>::iterator p = repop_queue.begin();
+ while (!p.end()) {
+ RepGather *repop = *p;
+ ++p;
dout(-1) << " artificialling acking repop tid " << repop->rep_tid << dendl;
if (repop->waitfor_ack.count(o) ||
repop->waitfor_nvram.count(o) ||
dout(10) << "on_acker_change" << dendl;
}
+// Shutdown hook for a replicated PG: flush and drop every queued repop.
+//
+// Each repop still linked on repop_queue is applied locally if it has not
+// been applied yet, unlinked from the queue, removed from repop_map, and
+// then has one reference released via put() (presumably the reference
+// taken when the repop was queued -- confirm against the enqueue path).
+void ReplicatedPG::on_shutdown()
+{
+  dout(10) << "on_shutdown" << dendl;
+
+  xlist<RepGather*>::iterator p = repop_queue.begin();
+  while (!p.end()) {
+    RepGather *repop = *p;
+    ++p;  // advance first: the current entry unlinks itself from the list below
+    if (!repop->applied)
+      apply_repop(repop);
+    repop->queue_item.remove_myself();
+    // Forget the tid -> repop mapping too, exactly as on_change() does when
+    // it drops a repop; otherwise repop_map would keep a pointer that
+    // dangles once the last reference is released.
+    repop_map.erase(repop->rep_tid);
+    repop->put();
+  }
+}
+
void ReplicatedPG::on_change()
{
dout(10) << "on_change" << dendl;
// apply all local repops
// (pg is inactive; we will repeer)
- for (deque<RepGather*>::iterator p = repop_queue.begin();
- p != repop_queue.end();
- p++)
+ for (xlist<RepGather*>::iterator p = repop_queue.begin();
+ !p.end(); ++p)
if (!(*p)->applied)
apply_repop(*p);
- deque<RepGather*>::iterator p = repop_queue.begin();
- while (p != repop_queue.end()) {
+ xlist<RepGather*>::iterator p = repop_queue.begin();
+ while (!p.end()) {
RepGather *repop = *p;
+ ++p;
if (!is_primary()) {
// no longer primary. hose repops.
dout(-1) << " aborting repop tid " << repop->rep_tid << dendl;
repop->aborted = true;
- repop_queue.erase(p++);
+ repop->queue_item.remove_myself();
repop_map.erase(repop->rep_tid);
repop->put();
} else {
// still primary. artificially ack+commit any replicas who dropped out of the pg
- p++;
dout(-1) << " checking for dropped osds on repop tid " << repop->rep_tid << dendl;
set<int> all;
set_union(repop->waitfor_disk.begin(), repop->waitfor_disk.end(),
repop->waitfor_ack.begin(), repop->waitfor_ack.end(),
inserter(all, all.begin()));
for (set<int>::iterator q = all.begin(); q != all.end(); q++) {
+ if (*q == osd->get_nodeid())
+ continue;
bool have = false;
for (unsigned i=1; i<acting.size(); i++)
if (acting[i] == *q)
have = true;
if (!have)
- repop_ack(repop, -EIO, true, *q);
+ repop_ack(repop, -EIO, CEPH_OSD_OP_ONDISK, *q);
}
}
}
*/
class RepGather {
public:
+ xlist<RepGather*>::item queue_item;
int nref;
class MOSDOp *op;
RepGather(MOSDOp *o, tid_t rt, eversion_t av, eversion_t lc,
SnapSet& ss, SnapContext& sc) :
+ queue_item(this),
nref(1), op(o), rep_tid(rt),
applied(false), aborted(false),
sent_ack(false), sent_nvram(false), sent_disk(false),
if (--nref == 0) {
+ // Log before freeing: `this` must not be used once `delete this` has run.
+ generic_dout(0) << "deleting " << this << dendl;
delete op;
delete this;
}
}
};
protected:
// replica ops
// [primary|tail]
- deque<RepGather*> repop_queue;
+ xlist<RepGather*> repop_queue;
map<tid_t, RepGather*> repop_map;
void apply_repop(RepGather *repop);
void on_acker_change();
void on_role_change();
void on_change();
+ void on_shutdown();
};
{
out << "repgather(" << &repop << " rep_tid=" << repop.rep_tid
<< " wfack=" << repop.waitfor_ack
- << " wfnvram=" << repop.waitfor_nvram
+ //<< " wfnvram=" << repop.waitfor_nvram
<< " wfdisk=" << repop.waitfor_disk;
out << " pct=" << repop.pg_complete_thru;
out << " op=" << *(repop.op);
- out << " repop=" << &repop;
+ out << " " << &repop;
out << ")";
return out;
}