delete ctx;
put_object_context(obc);
put_object_contexts(src_obc);
- if (oldv <= last_update_ondisk) {
+ if (already_complete(oldv)) {
osd->reply_op_error(op, 0, oldv);
} else {
dout(10) << " waiting for " << oldv << " to commit" << dendl;
repop->waitfor_ack.erase(whoami);
last_update_ondisk = repop->v;
- if (waiting_for_ondisk.count(repop->v)) {
- osd->requeue_ops(this, waiting_for_ondisk[repop->v]);
- waiting_for_ondisk.erase(repop->v);
- }
last_complete_ondisk = repop->pg_local_last_complete;
eval_repop(repop);
log_op_stats(repop->ctx);
update_stats();
+ if (waiting_for_ondisk.count(repop->v)) {
+ assert(waiting_for_ondisk.begin()->first == repop->v);
+ for (list<OpRequestRef>::iterator i = waiting_for_ondisk[repop->v].begin();
+ i != waiting_for_ondisk[repop->v].end();
+ ++i) {
+ osd->reply_op_error(*i, 0, repop->v);
+ }
+ waiting_for_ondisk.erase(repop->v);
+ }
+
if (m->wants_ondisk() && !repop->sent_disk) {
// send commit.
MOSDOpReply *reply = repop->ctx->reply;
// replica ops
// [primary|tail]
xlist<RepGather*> repop_queue;
+ bool already_complete(eversion_t v) {
+ for (xlist<RepGather*>::iterator i = repop_queue.begin();
+ !i.end();
+ ++i) {
+ if ((*i)->v > v)
+ break;
+ if (!(*i)->waitfor_disk.empty())
+ return false;
+ }
+ return true;
+ }
map<tid_t, RepGather*> repop_map;
void apply_repop(RepGather *repop);