}
}
+ // no need to capture PG ref, repop cancel will handle that
+ // Can capture the ctx by pointer, it's owned by the repop
+ ctx->register_on_applied(
+ [m, ctx, this](){
+ if (m && m->wants_ack() && !ctx->sent_ack && !ctx->sent_disk) {
+ // send ack
+ MOSDOpReply *reply = ctx->reply;
+ if (reply)
+ ctx->reply = NULL;
+ else {
+ reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
+ reply->set_reply_versions(ctx->at_version,
+ ctx->user_at_version);
+ }
+ reply->add_flags(CEPH_OSD_FLAG_ACK);
+ dout(10) << " sending ack: " << *m << " " << reply << dendl;
+ osd->send_message_osd_client(reply, m->get_connection());
+ ctx->sent_ack = true;
+ }
+
+ // note the write is now readable (for rlatency calc). note
+ // that this will only be defined if the write is readable
+ // _prior_ to being committed; it will not get set with
+ // writeahead journaling, for instance.
+ if (ctx->readable_stamp == utime_t())
+ ctx->readable_stamp = ceph_clock_now(cct);
+ });
+ ctx->register_on_commit(
+ [m, ctx, this](){
+ if (ctx->op)
+ log_op_stats(
+ ctx);
+
+ publish_stats_to_osd();
+
+ if (m && m->wants_ondisk() && !ctx->sent_disk) {
+ // send commit.
+ MOSDOpReply *reply = ctx->reply;
+ if (reply)
+ ctx->reply = NULL;
+ else {
+ reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
+ reply->set_reply_versions(ctx->at_version,
+ ctx->user_at_version);
+ }
+ reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+ dout(10) << " sending commit on " << *m << " " << reply << dendl;
+ osd->send_message_osd_client(reply, m->get_connection());
+ ctx->sent_disk = true;
+ ctx->op->mark_commit_sent();
+ }
+ });
+
// issue replica writes
ceph_tid_t rep_tid = osd->get_tid();
RepGather *repop = new_repop(ctx, obc, rep_tid); // new repop claims our obc, src_obc refs
// ondisk?
if (repop->all_committed) {
+ dout(10) << " commit: " << *repop << dendl;
for (auto p = repop->on_committed.begin();
p != repop->on_committed.end();
repop->on_committed.erase(p++)) {
(*p)();
}
-
- if (repop->ctx->op && !repop->log_op_stat) {
- log_op_stats(repop->ctx);
- repop->log_op_stat = true;
- }
- publish_stats_to_osd();
-
// send dup commits, in order
if (waiting_for_ondisk.count(repop->v)) {
assert(waiting_for_ondisk.begin()->first == repop->v);
waiting_for_ack.erase(repop->v);
}
- if (m && m->wants_ondisk() && !repop->sent_disk) {
- // send commit.
- MOSDOpReply *reply = repop->ctx->reply;
- if (reply)
- repop->ctx->reply = NULL;
- else {
- reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
- reply->set_reply_versions(repop->ctx->at_version,
- repop->ctx->user_at_version);
- }
- reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
- dout(10) << " sending commit on " << *repop << " " << reply << dendl;
- osd->send_message_osd_client(reply, m->get_connection());
- repop->sent_disk = true;
- repop->ctx->op->mark_commit_sent();
- }
}
// applied?
}
waiting_for_ack.erase(repop->v);
}
-
- if (m && m->wants_ack() && !repop->sent_ack && !repop->sent_disk) {
- // send ack
- MOSDOpReply *reply = repop->ctx->reply;
- if (reply)
- repop->ctx->reply = NULL;
- else {
- reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
- reply->set_reply_versions(repop->ctx->at_version,
- repop->ctx->user_at_version);
- }
- reply->add_flags(CEPH_OSD_FLAG_ACK);
- dout(10) << " sending ack on " << *repop << " " << reply << dendl;
- osd->send_message_osd_client(reply, m->get_connection());
- repop->sent_ack = true;
- }
-
- // note the write is now readable (for rlatency calc). note
- // that this will only be defined if the write is readable
- // _prior_ to being committed; it will not get set with
- // writeahead journaling, for instance.
- if (repop->ctx->readable_stamp == utime_t())
- repop->ctx->readable_stamp = ceph_clock_now(cct);
}
// done.
on_committed.emplace_back(std::move(f));
}
+ bool sent_ack;
+ bool sent_disk;
void apply_pending_attrs() {
for (map<ObjectContextRef,
num_read(0),
num_write(0),
copy_cb(NULL),
+ sent_ack(false), sent_disk(false),
async_read_result(0),
inflightreads(0),
lock_to_release(NONE),
bool all_applied;
bool all_committed;
- bool sent_ack;
- //bool sent_nvram;
- bool sent_disk;
utime_t start;
list<std::function<void()>> on_committed;
list<std::function<void()>> on_success;
list<std::function<void()>> on_finish;
- bool log_op_stat;
RepGather(OpContext *c, ObjectContextRef pi, ceph_tid_t rt,
eversion_t lc) :
ctx(c), obc(pi),
rep_tid(rt),
rep_aborted(false), rep_done(false),
- all_applied(false), all_committed(false), sent_ack(false),
- //sent_nvram(false),
- sent_disk(false),
+ all_applied(false), all_committed(false),
pg_local_last_complete(lc),
on_applied(std::move(c->on_applied)),
on_committed(std::move(c->on_committed)),
on_success(std::move(c->on_success)),
- on_finish(std::move(c->on_finish)),
- log_op_stat(false) { }
+ on_finish(std::move(c->on_finish)) {}
RepGather *get() {
nref++;