From cc1b2c6f342b17d6e304560c23f4ce310d6690d9 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Fri, 12 Feb 2016 09:32:45 -0800 Subject: [PATCH] ReplicatedPG: move client reply handling out of eval_repop execute_ctx is the only path which actually has a client op, let's clarify eval_repop by moving that logic inline there. This also eliminates most of the repop->ctx users from eval_repop. Signed-off-by: Samuel Just --- src/osd/ReplicatedPG.cc | 100 ++++++++++++++++++++++------------------ src/osd/ReplicatedPG.h | 14 ++---- 2 files changed, 59 insertions(+), 55 deletions(-) diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index d383058997fae..3452cf9cb8d35 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -2961,6 +2961,59 @@ void ReplicatedPG::execute_ctx(OpContext *ctx) } } + // no need to capture PG ref, repop cancel will handle that + // Can capture the ctx by pointer, it's owned by the repop + ctx->register_on_applied( + [m, ctx, this](){ + if (m && m->wants_ack() && !ctx->sent_ack && !ctx->sent_disk) { + // send ack + MOSDOpReply *reply = ctx->reply; + if (reply) + ctx->reply = NULL; + else { + reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true); + reply->set_reply_versions(ctx->at_version, + ctx->user_at_version); + } + reply->add_flags(CEPH_OSD_FLAG_ACK); + dout(10) << " sending ack: " << *m << " " << reply << dendl; + osd->send_message_osd_client(reply, m->get_connection()); + ctx->sent_ack = true; + } + + // note the write is now readable (for rlatency calc). note + // that this will only be defined if the write is readable + // _prior_ to being committed; it will not get set with + // writeahead journaling, for instance. + if (ctx->readable_stamp == utime_t()) + ctx->readable_stamp = ceph_clock_now(cct); + }); + ctx->register_on_commit( + [m, ctx, this](){ + if (ctx->op) + log_op_stats( + ctx); + + publish_stats_to_osd(); + + if (m && m->wants_ondisk() && !ctx->sent_disk) { + // send commit. + MOSDOpReply *reply = ctx->reply; + if (reply) + ctx->reply = NULL; + else { + reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true); + reply->set_reply_versions(ctx->at_version, + ctx->user_at_version); + } + reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); + dout(10) << " sending commit on " << *m << " " << reply << dendl; + osd->send_message_osd_client(reply, m->get_connection()); + ctx->sent_disk = true; + ctx->op->mark_commit_sent(); + } + }); + // issue replica writes ceph_tid_t rep_tid = osd->get_tid(); RepGather *repop = new_repop(ctx, obc, rep_tid); // new repop claims our obc, src_obc refs @@ -8202,18 +8255,12 @@ void ReplicatedPG::eval_repop(RepGather *repop) // ondisk? if (repop->all_committed) { + dout(10) << " commit: " << *repop << dendl; for (auto p = repop->on_committed.begin(); p != repop->on_committed.end(); repop->on_committed.erase(p++)) { (*p)(); } - - if (repop->ctx->op && !repop->log_op_stat) { - log_op_stats(repop->ctx); - repop->log_op_stat = true; - } - publish_stats_to_osd(); - // send dup commits, in order if (waiting_for_ondisk.count(repop->v)) { assert(waiting_for_ondisk.begin()->first == repop->v); @@ -8233,22 +8280,6 @@ void ReplicatedPG::eval_repop(RepGather *repop) waiting_for_ack.erase(repop->v); } - if (m && m->wants_ondisk() && !repop->sent_disk) { - // send commit. - MOSDOpReply *reply = repop->ctx->reply; - if (reply) - repop->ctx->reply = NULL; - else { - reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true); - reply->set_reply_versions(repop->ctx->at_version, - repop->ctx->user_at_version); - } - reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); - dout(10) << " sending commit on " << *repop << " " << reply << dendl; - osd->send_message_osd_client(reply, m->get_connection()); - repop->sent_disk = true; - repop->ctx->op->mark_commit_sent(); - } } // applied? @@ -8276,29 +8307,6 @@ void ReplicatedPG::eval_repop(RepGather *repop) } waiting_for_ack.erase(repop->v); } - - if (m && m->wants_ack() && !repop->sent_ack && !repop->sent_disk) { - // send ack - MOSDOpReply *reply = repop->ctx->reply; - if (reply) - repop->ctx->reply = NULL; - else { - reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true); - reply->set_reply_versions(repop->ctx->at_version, - repop->ctx->user_at_version); - } - reply->add_flags(CEPH_OSD_FLAG_ACK); - dout(10) << " sending ack on " << *repop << " " << reply << dendl; - osd->send_message_osd_client(reply, m->get_connection()); - repop->sent_ack = true; - } - - // note the write is now readable (for rlatency calc). note - // that this will only be defined if the write is readable - // _prior_ to being committed; it will not get set with - // writeahead journaling, for instance. - if (repop->ctx->readable_stamp == utime_t()) - repop->ctx->readable_stamp = ceph_clock_now(cct); } // done. diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 0f05374934961..d458a5495da6a 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -579,6 +579,8 @@ public: on_committed.emplace_back(std::move(f)); } + bool sent_ack; + bool sent_disk; void apply_pending_attrs() { for (map> on_committed; list> on_success; list> on_finish; - bool log_op_stat; RepGather(OpContext *c, ObjectContextRef pi, ceph_tid_t rt, eversion_t lc) : @@ -742,15 +741,12 @@ public: ctx(c), obc(pi), rep_tid(rt), rep_aborted(false), rep_done(false), - all_applied(false), all_committed(false), sent_ack(false), - //sent_nvram(false), - sent_disk(false), + all_applied(false), all_committed(false), pg_local_last_complete(lc), on_applied(std::move(c->on_applied)), on_committed(std::move(c->on_committed)), on_success(std::move(c->on_success)), - on_finish(std::move(c->on_finish)), - log_op_stat(false) { } + on_finish(std::move(c->on_finish)) {} RepGather *get() { nref++; -- 2.39.5