]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedPG::eval_repop: check waiting_for_* even if !m 3689/head
authorSamuel Just <sjust@redhat.com>
Thu, 5 Feb 2015 17:51:08 +0000 (09:51 -0800)
committerSamuel Just <sjust@redhat.com>
Mon, 9 Feb 2015 17:17:44 +0000 (09:17 -0800)
There can be client ops waiting on a promote now.

Fixes: 10771
Signed-off-by: Samuel Just <sjust@redhat.com>
src/osd/ReplicatedPG.cc

index b615e146b0f687f22e692d919a9221201837c5d4..eb3d83c3ab6e2359d9acb960806ad9aa1b886d8e 100644 (file)
@@ -7409,99 +7409,92 @@ void ReplicatedPG::eval_repop(RepGather *repop)
   if (repop->rep_done)
     return;
 
-  if (m) {
-
-    // an 'ondisk' reply implies 'ack'. so, prefer to send just one
-    // ondisk instead of ack followed by ondisk.
-
-    // ondisk?
-    if (repop->all_committed) {
-
-      if (!repop->log_op_stat) {
-        log_op_stats(repop->ctx);
-        repop->log_op_stat = true;
-      }
-      publish_stats_to_osd();
+  // ondisk?
+  if (repop->all_committed) {
+    if (repop->ctx->op && !repop->log_op_stat) {
+      log_op_stats(repop->ctx);
+      repop->log_op_stat = true;
+    }
+    publish_stats_to_osd();
 
-      // send dup commits, in order
-      if (waiting_for_ondisk.count(repop->v)) {
-       assert(waiting_for_ondisk.begin()->first == repop->v);
-       for (list<OpRequestRef>::iterator i = waiting_for_ondisk[repop->v].begin();
-            i != waiting_for_ondisk[repop->v].end();
-            ++i) {
-         osd->reply_op_error(*i, 0, repop->ctx->at_version,
-                             repop->ctx->user_at_version);
-       }
-       waiting_for_ondisk.erase(repop->v);
+    // send dup commits, in order
+    if (waiting_for_ondisk.count(repop->v)) {
+      assert(waiting_for_ondisk.begin()->first == repop->v);
+      for (list<OpRequestRef>::iterator i = waiting_for_ondisk[repop->v].begin();
+          i != waiting_for_ondisk[repop->v].end();
+          ++i) {
+       osd->reply_op_error(*i, 0, repop->ctx->at_version,
+                           repop->ctx->user_at_version);
       }
+      waiting_for_ondisk.erase(repop->v);
+    }
 
-      // clear out acks, we sent the commits above
-      if (waiting_for_ack.count(repop->v)) {
-       assert(waiting_for_ack.begin()->first == repop->v);
-       waiting_for_ack.erase(repop->v);
-      }
+    // clear out acks, we sent the commits above
+    if (waiting_for_ack.count(repop->v)) {
+      assert(waiting_for_ack.begin()->first == repop->v);
+      waiting_for_ack.erase(repop->v);
+    }
 
-      if (m->wants_ondisk() && !repop->sent_disk) {
-       // send commit.
-       MOSDOpReply *reply = repop->ctx->reply;
-       if (reply)
-         repop->ctx->reply = NULL;
-       else {
-         reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
-         reply->set_reply_versions(repop->ctx->at_version,
-                                   repop->ctx->user_at_version);
-       }
-       reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
-       dout(10) << " sending commit on " << *repop << " " << reply << dendl;
-       osd->send_message_osd_client(reply, m->get_connection());
-       repop->sent_disk = true;
-       repop->ctx->op->mark_commit_sent();
+    if (m && m->wants_ondisk() && !repop->sent_disk) {
+      // send commit.
+      MOSDOpReply *reply = repop->ctx->reply;
+      if (reply)
+       repop->ctx->reply = NULL;
+      else {
+       reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
+       reply->set_reply_versions(repop->ctx->at_version,
+                                 repop->ctx->user_at_version);
       }
+      reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+      dout(10) << " sending commit on " << *repop << " " << reply << dendl;
+      osd->send_message_osd_client(reply, m->get_connection());
+      repop->sent_disk = true;
+      repop->ctx->op->mark_commit_sent();
     }
+  }
 
-    // applied?
-    if (repop->all_applied) {
+  // applied?
+  if (repop->all_applied) {
 
-      // send dup acks, in order
-      if (waiting_for_ack.count(repop->v)) {
-       assert(waiting_for_ack.begin()->first == repop->v);
-       for (list<OpRequestRef>::iterator i = waiting_for_ack[repop->v].begin();
-            i != waiting_for_ack[repop->v].end();
-            ++i) {
-         MOSDOp *m = (MOSDOp*)(*i)->get_req();
-         MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
-         reply->set_reply_versions(repop->ctx->at_version,
-                                   repop->ctx->user_at_version);
-         reply->add_flags(CEPH_OSD_FLAG_ACK);
-         osd->send_message_osd_client(reply, m->get_connection());
-       }
-       waiting_for_ack.erase(repop->v);
-      }
-
-      if (m->wants_ack() && !repop->sent_ack && !repop->sent_disk) {
-       // send ack
-       MOSDOpReply *reply = repop->ctx->reply;
-       if (reply)
-         repop->ctx->reply = NULL;
-       else {
-         reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
-         reply->set_reply_versions(repop->ctx->at_version,
-                                   repop->ctx->user_at_version);
-       }
+    // send dup acks, in order
+    if (waiting_for_ack.count(repop->v)) {
+      assert(waiting_for_ack.begin()->first == repop->v);
+      for (list<OpRequestRef>::iterator i = waiting_for_ack[repop->v].begin();
+          i != waiting_for_ack[repop->v].end();
+          ++i) {
+       MOSDOp *m = (MOSDOp*)(*i)->get_req();
+       MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
+       reply->set_reply_versions(repop->ctx->at_version,
+                                 repop->ctx->user_at_version);
        reply->add_flags(CEPH_OSD_FLAG_ACK);
-       dout(10) << " sending ack on " << *repop << " " << reply << dendl;
-        assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type);
        osd->send_message_osd_client(reply, m->get_connection());
-       repop->sent_ack = true;
       }
-
-      // note the write is now readable (for rlatency calc).  note
-      // that this will only be defined if the write is readable
-      // _prior_ to being committed; it will not get set with
-      // writeahead journaling, for instance.
-      if (repop->ctx->readable_stamp == utime_t())
-       repop->ctx->readable_stamp = ceph_clock_now(cct);
+      waiting_for_ack.erase(repop->v);
     }
+
+    if (m && m->wants_ack() && !repop->sent_ack && !repop->sent_disk) {
+      // send ack
+      MOSDOpReply *reply = repop->ctx->reply;
+      if (reply)
+       repop->ctx->reply = NULL;
+      else {
+       reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
+       reply->set_reply_versions(repop->ctx->at_version,
+                                 repop->ctx->user_at_version);
+      }
+      reply->add_flags(CEPH_OSD_FLAG_ACK);
+      dout(10) << " sending ack on " << *repop << " " << reply << dendl;
+      assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type);
+      osd->send_message_osd_client(reply, m->get_connection());
+      repop->sent_ack = true;
+    }
+
+    // note the write is now readable (for rlatency calc).  note
+    // that this will only be defined if the write is readable
+    // _prior_ to being committed; it will not get set with
+    // writeahead journaling, for instance.
+    if (repop->ctx->readable_stamp == utime_t())
+      repop->ctx->readable_stamp = ceph_clock_now(cct);
   }
 
   // done.