}
+
+
+
+
+
// ========================================================================
// rep op gather
class C_OSD_ModifyCommit : public Context {
public:
ReplicatedPG *pg;
- tid_t rep_tid;
- eversion_t pg_last_complete;
- C_OSD_ModifyCommit(ReplicatedPG *p, tid_t rt, eversion_t lc) : pg(p), rep_tid(rt), pg_last_complete(lc) {
- pg->get(); // we're copying the pointer
+ ReplicatedPG::RepGather *repop;
+
+ C_OSD_ModifyCommit(ReplicatedPG *p, ReplicatedPG::RepGather *rg) :
+ pg(p), repop(rg) {
+ repop->get();
+ pg->get(); // we're copying the pointer
}
void finish(int r) {
pg->lock();
if (!pg->is_deleted())
- pg->op_modify_ondisk(rep_tid, pg_last_complete);
+ pg->op_modify_ondisk(repop);
+ repop->put();
pg->put_unlock();
}
};
-
-void ReplicatedPG::get_rep_gather(RepGather *repop)
+/** op_modify_commit
+ * transaction commit on the acker.
+ */
+void ReplicatedPG::op_modify_ondisk(RepGather *repop)
{
- //repop->lock.Lock();
- dout(10) << "get_repop " << *repop << dendl;
+ if (repop->aborted) {
+ dout(10) << "op_modify_ondisk " << *repop->op << " -- aborted" << dendl;
+ } else {
+ dout(10) << "op_modify_ondisk " << *repop->op << dendl;
+ assert(repop->waitfor_disk.count(osd->get_nodeid()));
+ repop->waitfor_nvram.erase(osd->get_nodeid());
+ repop->waitfor_disk.erase(osd->get_nodeid());
+ repop->pg_complete_thru[osd->get_nodeid()] = repop->pg_local_last_complete;
+ eval_repop(repop);
+ }
}
+
void ReplicatedPG::apply_repop(RepGather *repop)
{
dout(10) << "apply_repop applying update on " << *repop << dendl;
assert(!repop->applied);
- Context *oncommit = new C_OSD_ModifyCommit(this, repop->rep_tid, repop->pg_local_last_complete);
+ Context *oncommit = new C_OSD_ModifyCommit(this, repop);
unsigned r = osd->store->apply_transaction(repop->t, oncommit);
if (r)
dout(-10) << "apply_repop apply transaction return " << r << " on " << *repop << dendl;
}
update_stats();
-
}
-void ReplicatedPG::put_rep_gather(RepGather *repop)
+void ReplicatedPG::eval_repop(RepGather *repop)
{
- dout(10) << "put_repop " << *repop << dendl;
+ dout(10) << "eval_repop " << *repop << dendl;
// disk?
if (repop->can_send_disk()) {
if (repop->op->wants_ondisk()) {
// send commit.
MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), CEPH_OSD_OP_ONDISK);
- dout(10) << "put_repop sending commit on " << *repop << " " << reply << dendl;
+ dout(10) << " sending commit on " << *repop << " " << reply << dendl;
osd->messenger->send_message(reply, repop->op->get_orig_source_inst());
repop->sent_disk = true;
}
if (repop->op->wants_onnvram()) {
// send commit.
MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), CEPH_OSD_OP_ONNVRAM);
- dout(10) << "put_repop sending onnvram on " << *repop << " " << reply << dendl;
+ dout(10) << " sending onnvram on " << *repop << " " << reply << dendl;
osd->messenger->send_message(reply, repop->op->get_orig_source_inst());
repop->sent_nvram = true;
}
if (repop->op->wants_ack()) {
// send ack
MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), CEPH_OSD_OP_ACK);
- dout(10) << "put_repop sending ack on " << *repop << " " << reply << dendl;
+ dout(10) << " sending ack on " << *repop << " " << reply << dendl;
osd->messenger->send_message(reply, repop->op->get_orig_source_inst());
repop->sent_ack = true;
}
}
if (min > peers_complete_thru) {
- dout(10) << "put_repop peers_complete_thru "
- << peers_complete_thru << " -> " << min
- << dendl;
+ dout(10) << " peers_complete_thru "
+ << peers_complete_thru << " -> " << min
+ << dendl;
peers_complete_thru = min;
}
}
- dout(10) << "put_repop deleting " << *repop << dendl;
-
- assert(rep_gather.count(repop->rep_tid));
- rep_gather.erase(repop->rep_tid);
-
- delete repop->op;
- delete repop;
+ dout(10) << " removing " << *repop << dendl;
+ assert(!repop_queue.empty());
+ assert(repop_queue.front() == repop);
+ repop_queue.pop_front();
+ repop_map.erase(repop->rep_tid);
+ repop->put();
}
}
osd->messenger->send_message(wr, osd->osdmap->get_inst(dest));
}
-ReplicatedPG::RepGather *ReplicatedPG::new_rep_gather(MOSDOp *op, tid_t rep_tid, eversion_t nv,
- SnapSet& snapset, SnapContext& snapc)
+ReplicatedPG::RepGather *ReplicatedPG::new_repop(MOSDOp *op, tid_t rep_tid, eversion_t nv,
+ SnapSet& snapset, SnapContext& snapc)
{
- dout(10) << "new_rep_gather rep_tid " << rep_tid << " on " << *op << dendl;
+ dout(10) << "new_repop rep_tid " << rep_tid << " on " << *op << dendl;
+
RepGather *repop = new RepGather(op, rep_tid, nv, info.last_complete,
snapset, snapc);
- // osds. commits all come to me.
- for (unsigned i=0; i<acting.size(); i++) {
- int osd = acting[i];
- repop->osds.insert(osd);
- repop->waitfor_disk.insert(osd);
- }
-
- // primary. all osds ack to me.
+ // initialize gather sets
for (unsigned i=0; i<acting.size(); i++) {
int osd = acting[i];
repop->waitfor_ack.insert(osd);
+ repop->waitfor_disk.insert(osd);
}
repop->start = g_clock.now();
- rep_gather[repop->rep_tid] = repop;
-
- // anyone waiting? (acks that got here before the op did)
- if (waiting_for_repop.count(repop->rep_tid)) {
- osd->take_waiters(waiting_for_repop[repop->rep_tid]);
- waiting_for_repop.erase(repop->rep_tid);
- }
+ repop_queue.push_back(repop);
+ repop_map[repop->rep_tid] = repop;
+ repop->get();
return repop;
}
-void ReplicatedPG::repop_ack(RepGather *repop,
- int result, int ack_type,
+void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type,
int fromosd, eversion_t pg_complete_thru)
{
MOSDOp *op = repop->op;
<< " ack_type " << ack_type
<< " from osd" << fromosd
<< dendl;
-
- get_rep_gather(repop);
- {
- if (ack_type & CEPH_OSD_OP_ONDISK) {
- // disk
- assert(repop->waitfor_disk.count(fromosd));
- repop->waitfor_disk.erase(fromosd);
- repop->waitfor_nvram.erase(fromosd);
- repop->waitfor_ack.erase(fromosd);
- repop->pg_complete_thru[fromosd] = pg_complete_thru;
- } else if (ack_type & CEPH_OSD_OP_ONNVRAM) {
- // nvram
- repop->waitfor_nvram.erase(fromosd);
- repop->waitfor_ack.erase(fromosd);
- } else {
- // ack
- repop->waitfor_ack.erase(fromosd);
- }
+
+ if (ack_type & CEPH_OSD_OP_ONDISK) {
+ // disk
+ assert(repop->waitfor_disk.count(fromosd));
+ repop->waitfor_disk.erase(fromosd);
+ repop->waitfor_nvram.erase(fromosd);
+ repop->waitfor_ack.erase(fromosd);
+ repop->pg_complete_thru[fromosd] = pg_complete_thru;
+ } else if (ack_type & CEPH_OSD_OP_ONNVRAM) {
+ // nvram
+ repop->waitfor_nvram.erase(fromosd);
+ repop->waitfor_ack.erase(fromosd);
+ } else {
+ // ack
+ repop->waitfor_ack.erase(fromosd);
}
- put_rep_gather(repop);
-}
-
-
-
-
-
-
+ eval_repop(repop);
+}
-/** op_modify_commit
- * transaction commit on the acker.
- */
-void ReplicatedPG::op_modify_ondisk(tid_t rep_tid, eversion_t pg_complete_thru)
-{
- if (rep_gather.count(rep_tid)) {
- RepGather *repop = rep_gather[rep_tid];
-
- dout(10) << "op_modify_ondisk " << *repop->op << dendl;
- get_rep_gather(repop);
- {
- assert(repop->waitfor_disk.count(osd->get_nodeid()));
- repop->waitfor_nvram.erase(osd->get_nodeid());
- repop->waitfor_disk.erase(osd->get_nodeid());
- repop->pg_complete_thru[osd->get_nodeid()] = pg_complete_thru;
- }
- put_rep_gather(repop);
- dout(10) << "op_modify_ondisk done on " << repop << dendl;
- } else {
- dout(10) << "op_modify_ondisk rep_tid " << rep_tid << " dne" << dendl;
- }
-}
-// commit (to disk) callback
-class C_OSD_RepModifyCommit : public Context {
-public:
- ReplicatedPG *pg;
- MOSDSubOp *op;
- int destosd;
-
- eversion_t pg_last_complete;
-
- Mutex lock;
- Cond cond;
- bool acked;
- bool waiting;
-
- C_OSD_RepModifyCommit(ReplicatedPG *p, MOSDSubOp *oo, int dosd, eversion_t lc) :
- pg(p), op(oo), destosd(dosd), pg_last_complete(lc),
- lock("C_OSD_RepModifyCommit::lock"),
- acked(false), waiting(false) {
- pg->get(); // we're copying the pointer.
- }
- void finish(int r) {
- lock.Lock();
- assert(!waiting);
- while (!acked) {
- waiting = true;
- cond.Wait(lock);
- }
- assert(acked);
- lock.Unlock();
-
- pg->lock();
- pg->sub_op_modify_ondisk(op, destosd, pg_last_complete);
- pg->put_unlock();
- }
- void ack() {
- lock.Lock();
- assert(!acked);
- acked = true;
- if (waiting) cond.Signal();
-
- // discard my reference to buffer
- op->get_data().clear();
-
- lock.Unlock();
- }
-};
void ReplicatedPG::op_modify(MOSDOp *op)
{
// issue replica writes
tid_t rep_tid = osd->get_tid();
- RepGather *repop = new_rep_gather(op, rep_tid, av, snapset, snapc);
+ RepGather *repop = new_repop(op, rep_tid, av, snapset, snapc);
+
for (unsigned i=1; i<acting.size(); i++)
issue_repop(repop, acting[i], now);
// (logical) local ack.
// (if alone, this will apply the update.)
- get_rep_gather(repop);
- {
- assert(repop->waitfor_ack.count(whoami));
- repop->waitfor_ack.erase(whoami);
- }
- put_rep_gather(repop);
+ assert(repop->waitfor_ack.count(whoami));
+ repop->waitfor_ack.erase(whoami);
+ eval_repop(repop);
+ repop->put();
}
+
+
// sub op modify
+
+// commit (to disk) callback
+class C_OSD_RepModifyCommit : public Context {
+public:
+ ReplicatedPG *pg;
+
+ MOSDSubOp *op;
+ int destosd;
+
+ eversion_t pg_last_complete;
+
+ Mutex lock;
+ Cond cond;
+ bool acked;
+ bool waiting;
+
+ C_OSD_RepModifyCommit(ReplicatedPG *p, MOSDSubOp *oo, int dosd, eversion_t lc) :
+ pg(p), op(oo), destosd(dosd), pg_last_complete(lc),
+ lock("C_OSD_RepModifyCommit::lock"),
+ acked(false), waiting(false) {
+ pg->get(); // we're copying the pointer.
+ }
+ void finish(int r) {
+ lock.Lock();
+ assert(!waiting);
+ while (!acked) {
+ waiting = true;
+ cond.Wait(lock);
+ }
+ assert(acked);
+ lock.Unlock();
+
+ pg->lock();
+ pg->sub_op_modify_ondisk(op, destosd, pg_last_complete);
+ pg->put_unlock();
+ }
+ void ack() {
+ lock.Lock();
+ assert(!acked);
+ acked = true;
+ if (waiting) cond.Signal();
+
+ // discard my reference to buffer
+ op->get_data().clear();
+
+ lock.Unlock();
+ }
+};
+
void ReplicatedPG::sub_op_modify(MOSDSubOp *op)
{
pobject_t poid = op->poid;
osd->take_peer_stat(fromosd, r->get_peer_stat());
- if (rep_gather.count(rep_tid)) {
+ if (repop_map.count(rep_tid)) {
// oh, good.
- repop_ack(rep_gather[rep_tid],
+ repop_ack(repop_map[rep_tid],
r->get_result(), r->ack_type,
fromosd,
r->get_pg_complete_thru());
- delete r;
- } else {
- // early ack.
- waiting_for_repop[rep_tid].push_back(r);
}
+
+ delete r;
}
{
dout(10) << "on_osd_failure " << o << dendl;
- hash_map<tid_t,RepGather*>::iterator p = rep_gather.begin();
- while (p != rep_gather.end()) {
- RepGather *repop = p->second;
- p++;
- dout(-1) << "checking repop tid " << repop->rep_tid << dendl;
+ // artificially ack failed osds
+ deque<RepGather*>::iterator p = repop_queue.begin();
+ while (p != repop_queue.end()) {
+ RepGather *repop = *p++;
+ dout(-1) << " artificialling acking repop tid " << repop->rep_tid << dendl;
if (repop->waitfor_ack.count(o) ||
+ repop->waitfor_nvram.count(o) ||
repop->waitfor_disk.count(o))
- repop_ack(repop, -1, true, o);
+ repop_ack(repop, -EIO, CEPH_OSD_OP_ONDISK, o);
}
// remove from pushing map
// apply all local repops
// (pg is inactive; we will repeer)
- for (hash_map<tid_t,RepGather*>::iterator p = rep_gather.begin();
- p != rep_gather.end();
+ for (deque<RepGather*>::iterator p = repop_queue.begin();
+ p != repop_queue.end();
p++)
- if (!p->second->applied)
- apply_repop(p->second);
+ if (!(*p)->applied)
+ apply_repop(*p);
- hash_map<tid_t,RepGather*>::iterator p = rep_gather.begin();
- while (p != rep_gather.end()) {
- RepGather *repop = p->second;
+ deque<RepGather*>::iterator p = repop_queue.begin();
+ while (p != repop_queue.end()) {
+ RepGather *repop = *p;
- if (acting.empty() || acting[0] != osd->get_nodeid()) {
+ if (!is_primary()) {
// no longer primary. hose repops.
- dout(-1) << "no longer primary, aborting repop tid " << repop->rep_tid << dendl;
- rep_gather.erase(p++);
- delete repop->op;
- delete repop;
+ dout(-1) << " aborting repop tid " << repop->rep_tid << dendl;
+ repop->aborted = true;
+ repop_queue.erase(p++);
+ repop_map.erase(repop->rep_tid);
+ repop->put();
} else {
// still primary. artificially ack+commit any replicas who dropped out of the pg
p++;
- dout(-1) << "checking repop tid " << repop->rep_tid << dendl;
+ dout(-1) << " checking for dropped osds on repop tid " << repop->rep_tid << dendl;
set<int> all;
set_union(repop->waitfor_disk.begin(), repop->waitfor_disk.end(),
repop->waitfor_ack.begin(), repop->waitfor_ack.end(),
*/
class RepGather {
public:
+ int nref;
+
class MOSDOp *op;
tid_t rep_tid;
ObjectStore::Transaction t;
- bool applied;
+ bool applied, aborted;
pg_stat_t stats;
set<int> waitfor_ack;
set<int> waitfor_nvram;
set<int> waitfor_disk;
+ bool sent_ack, sent_nvram, sent_disk;
utime_t start;
-
- bool sent_ack, sent_nvram, sent_disk;
- set<int> osds;
eversion_t old_version, at_version;
SnapSet snapset;
SnapContext snapc;
- eversion_t pg_local_last_complete;
+ eversion_t pg_local_last_complete;
map<int,eversion_t> pg_complete_thru;
RepGather(MOSDOp *o, tid_t rt, eversion_t av, eversion_t lc,
SnapSet& ss, SnapContext& sc) :
- op(o), rep_tid(rt),
- applied(false),
+ nref(1), op(o), rep_tid(rt),
+ applied(false), aborted(false),
sent_ack(false), sent_nvram(false), sent_disk(false),
at_version(av),
snapset(ss), snapc(sc),
bool can_delete() {
return waitfor_ack.empty() && waitfor_nvram.empty() && waitfor_disk.empty();
}
+
+ void get() {
+ nref++;
+ }
+ void put() {
+ if (--nref == 0) {
+ delete op;
+ delete this;
+ }
+ }
};
protected:
// replica ops
// [primary|tail]
- hash_map<tid_t, RepGather*> rep_gather;
- hash_map<tid_t, list<class Message*> > waiting_for_repop;
-
- // load balancing
- set<object_t> balancing_reads;
- set<object_t> unbalancing_reads;
- hash_map<object_t, list<Message*> > waiting_for_unbalanced_reads; // i.e. primary-lock
+ deque<RepGather*> repop_queue;
+ map<tid_t, RepGather*> repop_map;
- void get_rep_gather(RepGather*);
void apply_repop(RepGather *repop);
- void put_rep_gather(RepGather*);
+ void eval_repop(RepGather*);
void issue_repop(RepGather *repop, int dest, utime_t now);
- RepGather *new_rep_gather(MOSDOp *op, tid_t rep_tid, eversion_t nv,
- SnapSet& snapset, SnapContext& snapc);
+ RepGather *new_repop(MOSDOp *op, tid_t rep_tid, eversion_t nv,
+ SnapSet& snapset, SnapContext& snapc);
void repop_ack(RepGather *repop,
int result, int ack_type,
int fromosd, eversion_t pg_complete_thru=eversion_t(0,0));
+
+ // load balancing
+ set<object_t> balancing_reads;
+ set<object_t> unbalancing_reads;
+ hash_map<object_t, list<Message*> > waiting_for_unbalanced_reads; // i.e. primary-lock
+
// push/pull
map<object_t, pair<eversion_t, int> > pulling; // which objects are currently being pulled, and from where
// modify
- void op_modify_ondisk(tid_t rep_tid, eversion_t pg_complete_thru);
+ void op_modify_ondisk(RepGather *repop);
void sub_op_modify_ondisk(MOSDSubOp *op, int ackerosd, eversion_t last_complete);
void _make_clone(ObjectStore::Transaction& t,