rm->ctx = 0;
rm->ackerosd = ackerosd;
rm->last_complete = info.last_complete;
+ rm->epoch_started = get_osdmap()->get_epoch();
if (!m->noop) {
if (m->logbl.length()) {
{
lock();
rm->op->mark_event("sub_op_applied");
- dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->request << dendl;
- MOSDSubOp *m = (MOSDSubOp*)rm->op->request;
- assert(m->get_header().type == MSG_OSD_SUBOP);
-
- if (!rm->committed) {
- // send ack to acker only if we haven't sent a commit already
- MOSDSubOpReply *ack = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
- ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
- osd->cluster_messenger->send_message(ack, get_osdmap()->get_cluster_inst(rm->ackerosd));
- }
- rm->applied = true;
- bool done = rm->applied && rm->committed;
-
- assert(info.last_update >= m->version);
- assert(last_update_applied < m->version);
- last_update_applied = m->version;
- if (finalizing_scrub) {
- assert(active_rep_scrub);
- if (last_update_applied == active_rep_scrub->scrub_to) {
- osd->rep_scrub_wq.queue(active_rep_scrub);
- active_rep_scrub = 0;
+ if (rm->epoch_started >= last_peering_reset) {
+ dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->request << dendl;
+ MOSDSubOp *m = (MOSDSubOp*)rm->op->request;
+ assert(m->get_header().type == MSG_OSD_SUBOP);
+
+ if (!rm->committed) {
+ // send ack to acker only if we haven't sent a commit already
+ MOSDSubOpReply *ack = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+ ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
+ osd->cluster_messenger->send_message(ack, get_osdmap()->get_cluster_inst(rm->ackerosd));
}
+
+ rm->applied = true;
+
+ assert(info.last_update >= m->version);
+ assert(last_update_applied < m->version);
+ last_update_applied = m->version;
+ if (finalizing_scrub) {
+ assert(active_rep_scrub);
+ if (last_update_applied == active_rep_scrub->scrub_to) {
+ osd->rep_scrub_wq.queue(active_rep_scrub);
+ active_rep_scrub = 0;
+ }
+ }
+ } else {
+ dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->request
+ << " from epoch " << rm->epoch_started << " < last_peering_reset "
+ << last_peering_reset << dendl;
}
+ bool done = rm->applied && rm->committed;
unlock();
if (done) {
delete rm->ctx;
lock();
rm->op->mark_event("sub_op_commit");
- // send commit.
- dout(10) << "sub_op_modify_commit on op " << *rm->op->request
- << ", sending commit to osd." << rm->ackerosd
- << dendl;
-
- log_subop_stats(rm->op, l_osd_sop_w_inb, l_osd_sop_w_lat);
- if (get_osdmap()->is_up(rm->ackerosd)) {
- last_complete_ondisk = rm->last_complete;
- MOSDSubOpReply *commit = new MOSDSubOpReply((MOSDSubOp*)rm->op->request, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
- commit->set_last_complete_ondisk(rm->last_complete);
- commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
- osd->cluster_messenger->send_message(commit, get_osdmap()->get_cluster_inst(rm->ackerosd));
+ if (rm->epoch_started >= last_peering_reset) {
+ // send commit.
+ dout(10) << "sub_op_modify_commit on op " << *rm->op->request
+ << ", sending commit to osd." << rm->ackerosd
+ << dendl;
+
+ if (get_osdmap()->is_up(rm->ackerosd)) {
+ last_complete_ondisk = rm->last_complete;
+ MOSDSubOpReply *commit = new MOSDSubOpReply((MOSDSubOp*)rm->op->request, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
+ commit->set_last_complete_ondisk(rm->last_complete);
+ commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
+ osd->cluster_messenger->send_message(commit, get_osdmap()->get_cluster_inst(rm->ackerosd));
+ }
+
+ rm->committed = true;
+ } else {
+ dout(10) << "sub_op_modify_commit " << rm << " op " << *rm->op->request
+ << " from epoch " << rm->epoch_started << " < last_peering_reset "
+ << last_peering_reset << dendl;
}
- rm->committed = true;
+ log_subop_stats(rm->op, l_osd_sop_w_inb, l_osd_sop_w_lat);
bool done = rm->applied && rm->committed;
-
unlock();
if (done) {
delete rm->ctx;