]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedPG: RepModify track epoch_started and bail on interval change
authorSamuel Just <sam.just@inktank.com>
Fri, 22 Jun 2012 17:12:26 +0000 (10:12 -0700)
committerSamuel Just <sam.just@inktank.com>
Thu, 5 Jul 2012 17:15:02 +0000 (10:15 -0700)
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 996d4a553390a0989e337fde2f4fd613c941af39..174eb567922479c179277e0d7d17448418fff98d 100644 (file)
@@ -4212,6 +4212,7 @@ void ReplicatedPG::sub_op_modify(OpRequestRef op)
   rm->ctx = 0;
   rm->ackerosd = ackerosd;
   rm->last_complete = info.last_complete;
+  rm->epoch_started = get_osdmap()->get_epoch();
 
   if (!m->noop) {
     if (m->logbl.length()) {
@@ -4294,31 +4295,38 @@ void ReplicatedPG::sub_op_modify_applied(RepModify *rm)
 {
   lock();
   rm->op->mark_event("sub_op_applied");
-  dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->request << dendl;
-  MOSDSubOp *m = (MOSDSubOp*)rm->op->request;
-  assert(m->get_header().type == MSG_OSD_SUBOP);
-
-  if (!rm->committed) {
-    // send ack to acker only if we haven't sent a commit already
-    MOSDSubOpReply *ack = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
-    ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
-    osd->cluster_messenger->send_message(ack, get_osdmap()->get_cluster_inst(rm->ackerosd));
-  }
 
-  rm->applied = true;
-  bool done = rm->applied && rm->committed;
-
-  assert(info.last_update >= m->version);
-  assert(last_update_applied < m->version);
-  last_update_applied = m->version;
-  if (finalizing_scrub) {
-    assert(active_rep_scrub);
-    if (last_update_applied == active_rep_scrub->scrub_to) {
-      osd->rep_scrub_wq.queue(active_rep_scrub);
-      active_rep_scrub = 0;
+  if (rm->epoch_started >= last_peering_reset) {
+    dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->request << dendl;
+    MOSDSubOp *m = (MOSDSubOp*)rm->op->request;
+    assert(m->get_header().type == MSG_OSD_SUBOP);
+    
+    if (!rm->committed) {
+      // send ack to acker only if we haven't sent a commit already
+      MOSDSubOpReply *ack = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+      ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
+      osd->cluster_messenger->send_message(ack, get_osdmap()->get_cluster_inst(rm->ackerosd));
     }
+    
+    rm->applied = true;
+    
+    assert(info.last_update >= m->version);
+    assert(last_update_applied < m->version);
+    last_update_applied = m->version;
+    if (finalizing_scrub) {
+      assert(active_rep_scrub);
+      if (last_update_applied == active_rep_scrub->scrub_to) {
+       osd->rep_scrub_wq.queue(active_rep_scrub);
+       active_rep_scrub = 0;
+      }
+    }
+  } else {
+    dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->request
+            << " from epoch " << rm->epoch_started << " < last_peering_reset "
+            << last_peering_reset << dendl;
   }
 
+  bool done = rm->applied && rm->committed;
   unlock();
   if (done) {
     delete rm->ctx;
@@ -4332,24 +4340,30 @@ void ReplicatedPG::sub_op_modify_commit(RepModify *rm)
   lock();
   rm->op->mark_event("sub_op_commit");
 
-  // send commit.
-  dout(10) << "sub_op_modify_commit on op " << *rm->op->request
-           << ", sending commit to osd." << rm->ackerosd
-           << dendl;
-
-  log_subop_stats(rm->op, l_osd_sop_w_inb, l_osd_sop_w_lat);
 
-  if (get_osdmap()->is_up(rm->ackerosd)) {
-    last_complete_ondisk = rm->last_complete;
-    MOSDSubOpReply *commit = new MOSDSubOpReply((MOSDSubOp*)rm->op->request, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
-    commit->set_last_complete_ondisk(rm->last_complete);
-    commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
-    osd->cluster_messenger->send_message(commit, get_osdmap()->get_cluster_inst(rm->ackerosd));
+  if (rm->epoch_started >= last_peering_reset) {
+    // send commit.
+    dout(10) << "sub_op_modify_commit on op " << *rm->op->request
+            << ", sending commit to osd." << rm->ackerosd
+            << dendl;
+    
+    if (get_osdmap()->is_up(rm->ackerosd)) {
+      last_complete_ondisk = rm->last_complete;
+      MOSDSubOpReply *commit = new MOSDSubOpReply((MOSDSubOp*)rm->op->request, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
+      commit->set_last_complete_ondisk(rm->last_complete);
+      commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
+      osd->cluster_messenger->send_message(commit, get_osdmap()->get_cluster_inst(rm->ackerosd));
+    }
+    
+    rm->committed = true;
+  } else {
+    dout(10) << "sub_op_modify_commit " << rm << " op " << *rm->op->request
+            << " from epoch " << rm->epoch_started << " < last_peering_reset "
+            << last_peering_reset << dendl;
   }
   
-  rm->committed = true;
+  log_subop_stats(rm->op, l_osd_sop_w_inb, l_osd_sop_w_lat);
   bool done = rm->applied && rm->committed;
-
   unlock();
   if (done) {
     delete rm->ctx;
index 5bf85c0de7251d745f3c85f0133ee115f0fabfc1..e3dfe8df8d944ac1cecde61d466fc0e54c734ec9 100644 (file)
@@ -693,6 +693,7 @@ protected:
     bool applied, committed;
     int ackerosd;
     eversion_t last_complete;
+    epoch_t epoch_started;
 
     uint64_t bytes_written;