From: Samuel Just Date: Fri, 22 Jun 2012 17:12:26 +0000 (-0700) Subject: ReplicatedPG: RepModify track epoch_started and bail on interval change X-Git-Tag: v0.50~109^2~2^2~16 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f0b2310f84b134cc565edcb9265d90b68a670cf2;p=ceph.git ReplicatedPG: RepModify track epoch_started and bail on interval change Signed-off-by: Samuel Just --- diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 996d4a553390..174eb5679224 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -4212,6 +4212,7 @@ void ReplicatedPG::sub_op_modify(OpRequestRef op) rm->ctx = 0; rm->ackerosd = ackerosd; rm->last_complete = info.last_complete; + rm->epoch_started = get_osdmap()->get_epoch(); if (!m->noop) { if (m->logbl.length()) { @@ -4294,31 +4295,38 @@ void ReplicatedPG::sub_op_modify_applied(RepModify *rm) { lock(); rm->op->mark_event("sub_op_applied"); - dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->request << dendl; - MOSDSubOp *m = (MOSDSubOp*)rm->op->request; - assert(m->get_header().type == MSG_OSD_SUBOP); - - if (!rm->committed) { - // send ack to acker only if we haven't sent a commit already - MOSDSubOpReply *ack = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); - ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority! - osd->cluster_messenger->send_message(ack, get_osdmap()->get_cluster_inst(rm->ackerosd)); - } - rm->applied = true; - bool done = rm->applied && rm->committed; - - assert(info.last_update >= m->version); - assert(last_update_applied < m->version); - last_update_applied = m->version; - if (finalizing_scrub) { - assert(active_rep_scrub); - if (last_update_applied == active_rep_scrub->scrub_to) { - osd->rep_scrub_wq.queue(active_rep_scrub); - active_rep_scrub = 0; + if (rm->epoch_started >= last_peering_reset) { + dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->request << dendl; + MOSDSubOp *m = (MOSDSubOp*)rm->op->request; + assert(m->get_header().type == MSG_OSD_SUBOP); + + if (!rm->committed) { + // send ack to acker only if we haven't sent a commit already + MOSDSubOpReply *ack = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); + ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority! + osd->cluster_messenger->send_message(ack, get_osdmap()->get_cluster_inst(rm->ackerosd)); } + + rm->applied = true; + + assert(info.last_update >= m->version); + assert(last_update_applied < m->version); + last_update_applied = m->version; + if (finalizing_scrub) { + assert(active_rep_scrub); + if (last_update_applied == active_rep_scrub->scrub_to) { + osd->rep_scrub_wq.queue(active_rep_scrub); + active_rep_scrub = 0; + } + } + } else { + dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->request + << " from epoch " << rm->epoch_started << " < last_peering_reset " + << last_peering_reset << dendl; } + bool done = rm->applied && rm->committed; unlock(); if (done) { delete rm->ctx; @@ -4332,24 +4340,30 @@ void ReplicatedPG::sub_op_modify_commit(RepModify *rm) lock(); rm->op->mark_event("sub_op_commit"); - // send commit. - dout(10) << "sub_op_modify_commit on op " << *rm->op->request - << ", sending commit to osd." << rm->ackerosd - << dendl; - - log_subop_stats(rm->op, l_osd_sop_w_inb, l_osd_sop_w_lat); - if (get_osdmap()->is_up(rm->ackerosd)) { - last_complete_ondisk = rm->last_complete; - MOSDSubOpReply *commit = new MOSDSubOpReply((MOSDSubOp*)rm->op->request, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK); - commit->set_last_complete_ondisk(rm->last_complete); - commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority! - osd->cluster_messenger->send_message(commit, get_osdmap()->get_cluster_inst(rm->ackerosd)); + if (rm->epoch_started >= last_peering_reset) { + // send commit. + dout(10) << "sub_op_modify_commit on op " << *rm->op->request + << ", sending commit to osd." << rm->ackerosd + << dendl; + + if (get_osdmap()->is_up(rm->ackerosd)) { + last_complete_ondisk = rm->last_complete; + MOSDSubOpReply *commit = new MOSDSubOpReply((MOSDSubOp*)rm->op->request, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK); + commit->set_last_complete_ondisk(rm->last_complete); + commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority! + osd->cluster_messenger->send_message(commit, get_osdmap()->get_cluster_inst(rm->ackerosd)); + } + + rm->committed = true; + } else { + dout(10) << "sub_op_modify_commit " << rm << " op " << *rm->op->request + << " from epoch " << rm->epoch_started << " < last_peering_reset " + << last_peering_reset << dendl; } - rm->committed = true; + log_subop_stats(rm->op, l_osd_sop_w_inb, l_osd_sop_w_lat); bool done = rm->applied && rm->committed; - unlock(); if (done) { delete rm->ctx; diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 5bf85c0de725..e3dfe8df8d94 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -693,6 +693,7 @@ protected: bool applied, committed; int ackerosd; eversion_t last_complete; + epoch_t epoch_started; uint64_t bytes_written;