From: Sage Weil Date: Mon, 23 Jul 2012 23:51:03 +0000 (-0700) Subject: osd: fix ACK ordering on resent ops X-Git-Tag: v0.50~9 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=67832c34a228c5cff1941eab3225969b5e66e44b;p=ceph.git osd: fix ACK ordering on resent ops The wait_for_ondisk handling fixed COMMIT ordering, but the ACKs need to go back in the same order too. For example: - op A is queued - client disconnects, both ACK and COMMIT replies are lost - client reconnects - op A and B are sent - op A is queued - op B is applied, ACK is sent - op A and B COMMITs are sent -> client's ack callbacks will see B and then A. Fix this by creating a waiting_for_ack queue as well, and sending ACK responses as needed. Also handle the case where the ACK should be sent immediately when the retry event is received. Fixes: #2823 Signed-off-by: Sage Weil Reviewed-by: Mike Ryan --- diff --git a/src/osd/PG.h b/src/osd/PG.h index 1f7bb6285c7..53358215de7 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -627,7 +627,7 @@ protected: list waiting_for_all_missing; map > waiting_for_missing_object, waiting_for_degraded_object; - map > waiting_for_ondisk; + map > waiting_for_ack, waiting_for_ondisk; map replay_queue; void requeue_object_waiters(map >& m); diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 16af52ea022..79efb7cff81 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -849,8 +849,18 @@ void ReplicatedPG::do_op(OpRequestRef op) if (already_complete(oldv)) { osd->reply_op_error(op, 0, oldv); } else { + if (m->wants_ack()) { + if (already_ack(oldv)) { + MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0); + reply->add_flags(CEPH_OSD_FLAG_ACK); + osd->client_messenger->send_message(reply, m->get_connection()); + } else { + dout(10) << " waiting for " << oldv << " to ack" << dendl; + waiting_for_ack[oldv].push_back(op); + } + } dout(10) << " waiting for " << oldv << " to commit" << dendl; - 
waiting_for_ondisk[oldv].push_back(op); + waiting_for_ondisk[oldv].push_back(op); // always queue ondisk waiters, so that we can requeue if needed op->mark_delayed(); } return; @@ -3533,6 +3543,7 @@ void ReplicatedPG::eval_repop(RepGather *repop) log_op_stats(repop->ctx); update_stats(); + // send dup commits, in order if (waiting_for_ondisk.count(repop->v)) { assert(waiting_for_ondisk.begin()->first == repop->v); for (list<OpRequestRef>::iterator i = waiting_for_ondisk[repop->v].begin(); @@ -3560,6 +3571,21 @@ void ReplicatedPG::eval_repop(RepGather *repop) // applied? if (repop->waitfor_ack.empty()) { + + // send dup acks, in order + if (waiting_for_ack.count(repop->v)) { + assert(waiting_for_ack.begin()->first == repop->v); + for (list<OpRequestRef>::iterator i = waiting_for_ack[repop->v].begin(); + i != waiting_for_ack[repop->v].end(); + ++i) { + MOSDOp *m = (MOSDOp*)(*i)->request; + MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0); + reply->add_flags(CEPH_OSD_FLAG_ACK); + osd->client_messenger->send_message(reply, m->get_connection()); + } + waiting_for_ack.erase(repop->v); + } + if (m->wants_ack() && !repop->sent_ack && !repop->sent_disk) { // send ack MOSDOpReply *reply = repop->ctx->reply; @@ -5785,6 +5811,7 @@ void ReplicatedPG::on_role_change() p++) requeue_ops(p->second); waiting_for_ondisk.clear(); + waiting_for_ack.clear(); } diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index fb9861b0405..a96f62df01e 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -470,17 +470,6 @@ protected: // replica ops // [primary|tail] xlist<RepGather*> repop_queue; - bool already_complete(eversion_t v) { - for (xlist<RepGather*>::iterator i = repop_queue.begin(); - !i.end(); - ++i) { - if ((*i)->v > v) - break; - if (!(*i)->waitfor_disk.empty()) - return false; - } - return true; - } map repop_map; void apply_repop(RepGather *repop); @@ -495,6 +484,31 @@ protected: int result, int ack_type, int fromosd, eversion_t pg_complete_thru=eversion_t(0,0)); + /// true if we can 
send an ondisk/commit for v + bool already_complete(eversion_t v) { + for (xlist<RepGather*>::iterator i = repop_queue.begin(); + !i.end(); + ++i) { + if ((*i)->v > v) + break; + if (!(*i)->waitfor_disk.empty()) + return false; + } + return true; + } + /// true if we can send an ack for v + bool already_ack(eversion_t v) { + for (xlist<RepGather*>::iterator i = repop_queue.begin(); + !i.end(); + ++i) { + if ((*i)->v > v) + break; + if (!(*i)->waitfor_ack.empty()) + return false; + } + return true; + } + friend class C_OSD_OpCommit; friend class C_OSD_OpApplied;