]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedPG: move client reply handling out of eval_repop
authorSamuel Just <sjust@redhat.com>
Fri, 12 Feb 2016 17:32:45 +0000 (09:32 -0800)
committerSamuel Just <sjust@redhat.com>
Thu, 25 Feb 2016 18:56:40 +0000 (10:56 -0800)
execute_ctx is the only path which actually has a client op,
let's clarify eval_repop by moving that logic inline there.
This also eliminates most of the repop->ctx users from
eval_repop.

Signed-off-by: Samuel Just <sjust@redhat.com>
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index d383058997fae8c82129722e159e8040ca8e8543..3452cf9cb8d35cb86814a4fbe6325bebae2c4e9c 100644 (file)
@@ -2961,6 +2961,59 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
     }
   }
 
+  // no need to capture PG ref, repop cancel will handle that
+  // Can capture the ctx by pointer, it's owned by the repop
+  ctx->register_on_applied(
+    [m, ctx, this](){
+      if (m && m->wants_ack() && !ctx->sent_ack && !ctx->sent_disk) {
+       // send ack
+       MOSDOpReply *reply = ctx->reply;
+       if (reply)
+         ctx->reply = NULL;
+       else {
+         reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
+         reply->set_reply_versions(ctx->at_version,
+                                   ctx->user_at_version);
+       }
+       reply->add_flags(CEPH_OSD_FLAG_ACK);
+       dout(10) << " sending ack: " << *m << " " << reply << dendl;
+       osd->send_message_osd_client(reply, m->get_connection());
+       ctx->sent_ack = true;
+      }
+
+      // note the write is now readable (for rlatency calc).  note
+      // that this will only be defined if the write is readable
+      // _prior_ to being committed; it will not get set with
+      // writeahead journaling, for instance.
+      if (ctx->readable_stamp == utime_t())
+       ctx->readable_stamp = ceph_clock_now(cct);
+    });
+  ctx->register_on_commit(
+    [m, ctx, this](){
+      if (ctx->op)
+       log_op_stats(
+         ctx);
+
+      publish_stats_to_osd();
+
+      if (m && m->wants_ondisk() && !ctx->sent_disk) {
+       // send commit.
+       MOSDOpReply *reply = ctx->reply;
+       if (reply)
+         ctx->reply = NULL;
+       else {
+         reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
+         reply->set_reply_versions(ctx->at_version,
+                                   ctx->user_at_version);
+       }
+       reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+       dout(10) << " sending commit on " << *m << " " << reply << dendl;
+       osd->send_message_osd_client(reply, m->get_connection());
+       ctx->sent_disk = true;
+       ctx->op->mark_commit_sent();
+      }
+    });
+
   // issue replica writes
   ceph_tid_t rep_tid = osd->get_tid();
   RepGather *repop = new_repop(ctx, obc, rep_tid);  // new repop claims our obc, src_obc refs
@@ -8202,18 +8255,12 @@ void ReplicatedPG::eval_repop(RepGather *repop)
 
   // ondisk?
   if (repop->all_committed) {
+    dout(10) << " commit: " << *repop << dendl;
     for (auto p = repop->on_committed.begin();
         p != repop->on_committed.end();
         repop->on_committed.erase(p++)) {
       (*p)();
     }
-
-    if (repop->ctx->op && !repop->log_op_stat) {
-      log_op_stats(repop->ctx);
-      repop->log_op_stat = true;
-    }
-    publish_stats_to_osd();
-
     // send dup commits, in order
     if (waiting_for_ondisk.count(repop->v)) {
       assert(waiting_for_ondisk.begin()->first == repop->v);
@@ -8233,22 +8280,6 @@ void ReplicatedPG::eval_repop(RepGather *repop)
       waiting_for_ack.erase(repop->v);
     }
 
-    if (m && m->wants_ondisk() && !repop->sent_disk) {
-      // send commit.
-      MOSDOpReply *reply = repop->ctx->reply;
-      if (reply)
-       repop->ctx->reply = NULL;
-      else {
-       reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
-       reply->set_reply_versions(repop->ctx->at_version,
-                                 repop->ctx->user_at_version);
-      }
-      reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
-      dout(10) << " sending commit on " << *repop << " " << reply << dendl;
-      osd->send_message_osd_client(reply, m->get_connection());
-      repop->sent_disk = true;
-      repop->ctx->op->mark_commit_sent();
-    }
   }
 
   // applied?
@@ -8276,29 +8307,6 @@ void ReplicatedPG::eval_repop(RepGather *repop)
       }
       waiting_for_ack.erase(repop->v);
     }
-
-    if (m && m->wants_ack() && !repop->sent_ack && !repop->sent_disk) {
-      // send ack
-      MOSDOpReply *reply = repop->ctx->reply;
-      if (reply)
-       repop->ctx->reply = NULL;
-      else {
-       reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
-       reply->set_reply_versions(repop->ctx->at_version,
-                                 repop->ctx->user_at_version);
-      }
-      reply->add_flags(CEPH_OSD_FLAG_ACK);
-      dout(10) << " sending ack on " << *repop << " " << reply << dendl;
-      osd->send_message_osd_client(reply, m->get_connection());
-      repop->sent_ack = true;
-    }
-
-    // note the write is now readable (for rlatency calc).  note
-    // that this will only be defined if the write is readable
-    // _prior_ to being committed; it will not get set with
-    // writeahead journaling, for instance.
-    if (repop->ctx->readable_stamp == utime_t())
-      repop->ctx->readable_stamp = ceph_clock_now(cct);
   }
 
   // done.
index 0f0537493496161cb8b1c88909513746d152f8f2..d458a5495da6a7b5d55eecc03169eb399631ef4d 100644 (file)
@@ -579,6 +579,8 @@ public:
       on_committed.emplace_back(std::move(f));
     }
 
+    bool sent_ack;
+    bool sent_disk;
 
     void apply_pending_attrs() {
       for (map<ObjectContextRef,
@@ -643,6 +645,7 @@ public:
       num_read(0),
       num_write(0),
       copy_cb(NULL),
+      sent_ack(false), sent_disk(false),
       async_read_result(0),
       inflightreads(0),
       lock_to_release(NONE),
@@ -721,9 +724,6 @@ public:
 
     bool all_applied;
     bool all_committed;
-    bool sent_ack;
-    //bool sent_nvram;
-    bool sent_disk;
     
     utime_t   start;
     
@@ -733,7 +733,6 @@ public:
     list<std::function<void()>> on_committed;
     list<std::function<void()>> on_success;
     list<std::function<void()>> on_finish;
-    bool log_op_stat;
     
     RepGather(OpContext *c, ObjectContextRef pi, ceph_tid_t rt,
              eversion_t lc) :
@@ -742,15 +741,12 @@ public:
       ctx(c), obc(pi),
       rep_tid(rt), 
       rep_aborted(false), rep_done(false),
-      all_applied(false), all_committed(false), sent_ack(false),
-      //sent_nvram(false),
-      sent_disk(false),
+      all_applied(false), all_committed(false),
       pg_local_last_complete(lc),
       on_applied(std::move(c->on_applied)),
       on_committed(std::move(c->on_committed)),
       on_success(std::move(c->on_success)),
-      on_finish(std::move(c->on_finish)),
-      log_op_stat(false) { }
+      on_finish(std::move(c->on_finish)) {}
 
     RepGather *get() {
       nref++;