if (repop->rep_done)
return;
- if (m) {
-
- // an 'ondisk' reply implies 'ack'. so, prefer to send just one
- // ondisk instead of ack followed by ondisk.
-
- // ondisk?
- if (repop->all_committed) {
-
- if (!repop->log_op_stat) {
- log_op_stats(repop->ctx);
- repop->log_op_stat = true;
- }
- publish_stats_to_osd();
+ // ondisk?
+ if (repop->all_committed) {
+ if (repop->ctx->op && !repop->log_op_stat) {
+ log_op_stats(repop->ctx);
+ repop->log_op_stat = true;
+ }
+ publish_stats_to_osd();
- // send dup commits, in order
- if (waiting_for_ondisk.count(repop->v)) {
- assert(waiting_for_ondisk.begin()->first == repop->v);
- for (list<OpRequestRef>::iterator i = waiting_for_ondisk[repop->v].begin();
- i != waiting_for_ondisk[repop->v].end();
- ++i) {
- osd->reply_op_error(*i, 0, repop->ctx->at_version,
- repop->ctx->user_at_version);
- }
- waiting_for_ondisk.erase(repop->v);
+ // send dup commits, in order
+ if (waiting_for_ondisk.count(repop->v)) {
+ assert(waiting_for_ondisk.begin()->first == repop->v);
+ for (list<OpRequestRef>::iterator i = waiting_for_ondisk[repop->v].begin();
+ i != waiting_for_ondisk[repop->v].end();
+ ++i) {
+ osd->reply_op_error(*i, 0, repop->ctx->at_version,
+ repop->ctx->user_at_version);
}
+ waiting_for_ondisk.erase(repop->v);
+ }
- // clear out acks, we sent the commits above
- if (waiting_for_ack.count(repop->v)) {
- assert(waiting_for_ack.begin()->first == repop->v);
- waiting_for_ack.erase(repop->v);
- }
+ // clear out acks, we sent the commits above
+ if (waiting_for_ack.count(repop->v)) {
+ assert(waiting_for_ack.begin()->first == repop->v);
+ waiting_for_ack.erase(repop->v);
+ }
- if (m->wants_ondisk() && !repop->sent_disk) {
- // send commit.
- MOSDOpReply *reply = repop->ctx->reply;
- if (reply)
- repop->ctx->reply = NULL;
- else {
- reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
- reply->set_reply_versions(repop->ctx->at_version,
- repop->ctx->user_at_version);
- }
- reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
- dout(10) << " sending commit on " << *repop << " " << reply << dendl;
- osd->send_message_osd_client(reply, m->get_connection());
- repop->sent_disk = true;
- repop->ctx->op->mark_commit_sent();
+ if (m && m->wants_ondisk() && !repop->sent_disk) {
+ // send commit.
+ MOSDOpReply *reply = repop->ctx->reply;
+ if (reply)
+ repop->ctx->reply = NULL;
+ else {
+ reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
+ reply->set_reply_versions(repop->ctx->at_version,
+ repop->ctx->user_at_version);
}
+ reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+ dout(10) << " sending commit on " << *repop << " " << reply << dendl;
+ osd->send_message_osd_client(reply, m->get_connection());
+ repop->sent_disk = true;
+ repop->ctx->op->mark_commit_sent();
}
+ }
- // applied?
- if (repop->all_applied) {
+ // applied?
+ if (repop->all_applied) {
- // send dup acks, in order
- if (waiting_for_ack.count(repop->v)) {
- assert(waiting_for_ack.begin()->first == repop->v);
- for (list<OpRequestRef>::iterator i = waiting_for_ack[repop->v].begin();
- i != waiting_for_ack[repop->v].end();
- ++i) {
- MOSDOp *m = (MOSDOp*)(*i)->get_req();
- MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
- reply->set_reply_versions(repop->ctx->at_version,
- repop->ctx->user_at_version);
- reply->add_flags(CEPH_OSD_FLAG_ACK);
- osd->send_message_osd_client(reply, m->get_connection());
- }
- waiting_for_ack.erase(repop->v);
- }
-
- if (m->wants_ack() && !repop->sent_ack && !repop->sent_disk) {
- // send ack
- MOSDOpReply *reply = repop->ctx->reply;
- if (reply)
- repop->ctx->reply = NULL;
- else {
- reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
- reply->set_reply_versions(repop->ctx->at_version,
- repop->ctx->user_at_version);
- }
+ // send dup acks, in order
+ if (waiting_for_ack.count(repop->v)) {
+ assert(waiting_for_ack.begin()->first == repop->v);
+ for (list<OpRequestRef>::iterator i = waiting_for_ack[repop->v].begin();
+ i != waiting_for_ack[repop->v].end();
+ ++i) {
+ MOSDOp *m = (MOSDOp*)(*i)->get_req();
+ MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
+ reply->set_reply_versions(repop->ctx->at_version,
+ repop->ctx->user_at_version);
reply->add_flags(CEPH_OSD_FLAG_ACK);
- dout(10) << " sending ack on " << *repop << " " << reply << dendl;
- assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type);
osd->send_message_osd_client(reply, m->get_connection());
- repop->sent_ack = true;
}
-
- // note the write is now readable (for rlatency calc). note
- // that this will only be defined if the write is readable
- // _prior_ to being committed; it will not get set with
- // writeahead journaling, for instance.
- if (repop->ctx->readable_stamp == utime_t())
- repop->ctx->readable_stamp = ceph_clock_now(cct);
+ waiting_for_ack.erase(repop->v);
}
+
+ if (m && m->wants_ack() && !repop->sent_ack && !repop->sent_disk) {
+ // send ack
+ MOSDOpReply *reply = repop->ctx->reply;
+ if (reply)
+ repop->ctx->reply = NULL;
+ else {
+ reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
+ reply->set_reply_versions(repop->ctx->at_version,
+ repop->ctx->user_at_version);
+ }
+ reply->add_flags(CEPH_OSD_FLAG_ACK);
+ dout(10) << " sending ack on " << *repop << " " << reply << dendl;
+ assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type);
+ osd->send_message_osd_client(reply, m->get_connection());
+ repop->sent_ack = true;
+ }
+
+ // note the write is now readable (for rlatency calc). note
+ // that this will only be defined if the write is readable
+ // _prior_ to being committed; it will not get set with
+ // writeahead journaling, for instance.
+ if (repop->ctx->readable_stamp == utime_t())
+ repop->ctx->readable_stamp = ceph_clock_now(cct);
}
// done.