From 78b1fb5e9440dcd4ef99301c3ac857385e870cf3 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Thu, 5 Feb 2015 09:51:08 -0800 Subject: [PATCH] ReplicatedPG::eval_repop: check waiting_for_* even if !m There can be client ops waiting on a promote now. Fixes: 10771 Signed-off-by: Samuel Just --- src/osd/ReplicatedPG.cc | 155 +++++++++++++++++++--------------------- 1 file changed, 74 insertions(+), 81 deletions(-) diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index b615e146b0f6..eb3d83c3ab6e 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -7409,99 +7409,92 @@ void ReplicatedPG::eval_repop(RepGather *repop) if (repop->rep_done) return; - if (m) { - - // an 'ondisk' reply implies 'ack'. so, prefer to send just one - // ondisk instead of ack followed by ondisk. - - // ondisk? - if (repop->all_committed) { - - if (!repop->log_op_stat) { - log_op_stats(repop->ctx); - repop->log_op_stat = true; - } - publish_stats_to_osd(); + // ondisk? + if (repop->all_committed) { + if (repop->ctx->op && !repop->log_op_stat) { + log_op_stats(repop->ctx); + repop->log_op_stat = true; + } + publish_stats_to_osd(); - // send dup commits, in order - if (waiting_for_ondisk.count(repop->v)) { - assert(waiting_for_ondisk.begin()->first == repop->v); - for (list::iterator i = waiting_for_ondisk[repop->v].begin(); - i != waiting_for_ondisk[repop->v].end(); - ++i) { - osd->reply_op_error(*i, 0, repop->ctx->at_version, - repop->ctx->user_at_version); - } - waiting_for_ondisk.erase(repop->v); + // send dup commits, in order + if (waiting_for_ondisk.count(repop->v)) { + assert(waiting_for_ondisk.begin()->first == repop->v); + for (list::iterator i = waiting_for_ondisk[repop->v].begin(); + i != waiting_for_ondisk[repop->v].end(); + ++i) { + osd->reply_op_error(*i, 0, repop->ctx->at_version, + repop->ctx->user_at_version); } + waiting_for_ondisk.erase(repop->v); + } - // clear out acks, we sent the commits above - if (waiting_for_ack.count(repop->v)) { - assert(waiting_for_ack.begin()->first == repop->v); - waiting_for_ack.erase(repop->v); - } + // clear out acks, we sent the commits above + if (waiting_for_ack.count(repop->v)) { + assert(waiting_for_ack.begin()->first == repop->v); + waiting_for_ack.erase(repop->v); + } - if (m->wants_ondisk() && !repop->sent_disk) { - // send commit. - MOSDOpReply *reply = repop->ctx->reply; - if (reply) - repop->ctx->reply = NULL; - else { - reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true); - reply->set_reply_versions(repop->ctx->at_version, - repop->ctx->user_at_version); - } - reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); - dout(10) << " sending commit on " << *repop << " " << reply << dendl; - osd->send_message_osd_client(reply, m->get_connection()); - repop->sent_disk = true; - repop->ctx->op->mark_commit_sent(); + if (m && m->wants_ondisk() && !repop->sent_disk) { + // send commit. + MOSDOpReply *reply = repop->ctx->reply; + if (reply) + repop->ctx->reply = NULL; + else { + reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true); + reply->set_reply_versions(repop->ctx->at_version, + repop->ctx->user_at_version); } + reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); + dout(10) << " sending commit on " << *repop << " " << reply << dendl; + osd->send_message_osd_client(reply, m->get_connection()); + repop->sent_disk = true; + repop->ctx->op->mark_commit_sent(); } + } - // applied? - if (repop->all_applied) { + // applied? + if (repop->all_applied) { - // send dup acks, in order - if (waiting_for_ack.count(repop->v)) { - assert(waiting_for_ack.begin()->first == repop->v); - for (list::iterator i = waiting_for_ack[repop->v].begin(); - i != waiting_for_ack[repop->v].end(); - ++i) { - MOSDOp *m = (MOSDOp*)(*i)->get_req(); - MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true); - reply->set_reply_versions(repop->ctx->at_version, - repop->ctx->user_at_version); - reply->add_flags(CEPH_OSD_FLAG_ACK); - osd->send_message_osd_client(reply, m->get_connection()); - } - waiting_for_ack.erase(repop->v); - } - - if (m->wants_ack() && !repop->sent_ack && !repop->sent_disk) { - // send ack - MOSDOpReply *reply = repop->ctx->reply; - if (reply) - repop->ctx->reply = NULL; - else { - reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true); - reply->set_reply_versions(repop->ctx->at_version, - repop->ctx->user_at_version); - } + // send dup acks, in order + if (waiting_for_ack.count(repop->v)) { + assert(waiting_for_ack.begin()->first == repop->v); + for (list::iterator i = waiting_for_ack[repop->v].begin(); + i != waiting_for_ack[repop->v].end(); + ++i) { + MOSDOp *m = (MOSDOp*)(*i)->get_req(); + MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true); + reply->set_reply_versions(repop->ctx->at_version, + repop->ctx->user_at_version); reply->add_flags(CEPH_OSD_FLAG_ACK); - dout(10) << " sending ack on " << *repop << " " << reply << dendl; - assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type); osd->send_message_osd_client(reply, m->get_connection()); - repop->sent_ack = true; } - - // note the write is now readable (for rlatency calc). note - // that this will only be defined if the write is readable - // _prior_ to being committed; it will not get set with - // writeahead journaling, for instance. - if (repop->ctx->readable_stamp == utime_t()) - repop->ctx->readable_stamp = ceph_clock_now(cct); + waiting_for_ack.erase(repop->v); } + + if (m && m->wants_ack() && !repop->sent_ack && !repop->sent_disk) { + // send ack + MOSDOpReply *reply = repop->ctx->reply; + if (reply) + repop->ctx->reply = NULL; + else { + reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true); + reply->set_reply_versions(repop->ctx->at_version, + repop->ctx->user_at_version); + } + reply->add_flags(CEPH_OSD_FLAG_ACK); + dout(10) << " sending ack on " << *repop << " " << reply << dendl; + assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type); + osd->send_message_osd_client(reply, m->get_connection()); + repop->sent_ack = true; + } + + // note the write is now readable (for rlatency calc). note + // that this will only be defined if the write is readable + // _prior_ to being committed; it will not get set with + // writeahead journaling, for instance. + if (repop->ctx->readable_stamp == utime_t()) + repop->ctx->readable_stamp = ceph_clock_now(cct); } // done. -- 2.47.3