list<class Message*> waiting_for_active;
hash_map<sobject_t,
list<class Message*> > waiting_for_missing_object, waiting_for_degraded_object;
+ map<eversion_t,list<Message*> > waiting_for_ondisk;
map<eversion_t,class MOSDOp*> replay_queue;
void take_object_waiters(hash_map<sobject_t, list<Message*> >& m);
void queue_snap_trim();
- bool is_dup(osd_reqid_t rid) {
- return log.logged_req(rid);
- }
-
// abstract bits
return;
}
- if (is_dup(ctx->reqid)) {
- dout(3) << "do_op dup " << ctx->reqid << ", doing WRNOOP" << dendl;
- noop = true;
+ eversion_t oldv = log.get_request_version(ctx->reqid);
+ if (oldv != eversion_t()) {
+ dout(3) << "do_op dup " << ctx->reqid << " was " << oldv << dendl;
+ delete ctx;
+ put_object_context(obc);
+ if (oldv >= last_update_ondisk) {
+ osd->reply_op_error(op, 0);
+ } else {
+ dout(10) << " waiting for " << oldv << " to commit" << dendl;
+ waiting_for_ondisk[oldv].push_back(op);
+ }
+ return;
}
// version
dout(10) << "op_commit " << *repop << dendl;
repop->waitfor_disk.erase(osd->get_nodeid());
//repop->waitfor_nvram.erase(osd->get_nodeid());
+
last_update_ondisk = repop->v;
+ if (waiting_for_ondisk.count(repop->v)) {
+ osd->take_waiters(waiting_for_ondisk[repop->v]);
+ waiting_for_ondisk.erase(repop->v);
+ }
+
last_complete_ondisk = repop->pg_local_last_complete;
eval_repop(repop);
}
{
dout(10) << "on_role_change" << dendl;
+ // take commit waiters
+ for (map<eversion_t, list<Message*> >::iterator p = waiting_for_ondisk.begin();
+ p != waiting_for_ondisk.end();
+ p++)
+ osd->take_waiters(p->second);
+ waiting_for_ondisk.clear();
+
// take object waiters
take_object_waiters(waiting_for_missing_object);
take_object_waiters(waiting_for_degraded_object);