{
dout(10) << __func__ << ": repop tid " << repop->rep_tid << " all applied "
<< dendl;
+ assert(!repop->applies_with_commit);
repop->all_applied = true;
if (!repop->rep_aborted) {
eval_repop(repop);
dout(10) << __func__ << ": repop tid " << repop->rep_tid << " all committed "
<< dendl;
repop->all_committed = true;
+ if (repop->applies_with_commit) {
+ assert(!repop->all_applied);
+ repop->all_applied = true;
+ }
if (!repop->rep_aborted) {
if (repop->v != eversion_t()) {
assert(waiting_for_ack.begin()->first == repop->v);
waiting_for_ack.erase(repop->v);
}
-
}
// applied?
if (repop->all_applied) {
+ if (repop->applies_with_commit) {
+ assert(repop->on_applied.empty());
+ }
dout(10) << " applied: " << *repop << " " << dendl;
for (auto p = repop->on_applied.begin();
p != repop->on_applied.end();
publish_stats_to_osd();
calc_min_last_complete_ondisk();
- for (auto p = repop->on_success.begin();
- p != repop->on_success.end();
- repop->on_success.erase(p++)) {
- (*p)();
- }
-
dout(10) << " removing " << *repop << dendl;
assert(!repop_queue.empty());
dout(20) << " q front is " << *repop_queue.front() << dendl;
if (repop_queue.front() != repop) {
- dout(0) << " removing " << *repop << dendl;
- dout(0) << " q front is " << *repop_queue.front() << dendl;
- assert(repop_queue.front() == repop);
+ if (!repop->applies_with_commit) {
+ dout(0) << " removing " << *repop << dendl;
+ dout(0) << " q front is " << *repop_queue.front() << dendl;
+ assert(repop_queue.front() == repop);
+ }
+ } else {
+ RepGather *to_remove = nullptr;
+ while (!repop_queue.empty() &&
+ (to_remove = repop_queue.front())->rep_done) {
+ repop_queue.pop_front();
+ for (auto p = to_remove->on_success.begin();
+ p != to_remove->on_success.end();
+ to_remove->on_success.erase(p++)) {
+ (*p)();
+ }
+ remove_repop(to_remove);
+ }
}
- repop_queue.pop_front();
- remove_repop(repop);
}
}
else
dout(10) << "new_repop rep_tid " << rep_tid << " (no op)" << dendl;
- RepGather *repop = new RepGather(ctx, rep_tid, info.last_complete);
+ RepGather *repop = new RepGather(
+ ctx, rep_tid, info.last_complete, false);
repop->start = ceph_clock_now(cct);
std::move(op),
std::move(on_complete),
osd->get_tid(),
- info.last_complete);
+ info.last_complete,
+ true);
repop->v = version;
repop->start = ceph_clock_now(cct);
assert(it2 != it->second.waiting_on.end());
it->second.waiting_on.erase(it2);
if (it->second.waiting_on.empty()) {
- pg->repop_all_applied(it->second.repop.get());
pg->repop_all_committed(it->second.repop.get());
pg->log_entry_update_waiting_on.erase(it);
}
pg->unlock();
}
};
- t.register_on_complete(
+ t.register_on_commit(
new OnComplete{this, rep_tid, get_osdmap()->get_epoch()});
} else {
if (on_complete) {
unlock();
});
- /* Hack to work around the fact that ReplicatedBackend sends
- * ack+commit if commit happens first */
- if (pool.info.ec_pool()) {
- t.register_on_complete(complete);
- } else {
+ if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) {
t.register_on_commit(complete);
+ } else {
+ /* Hack to work around the fact that ReplicatedBackend sends
+ * ack+commit if commit happens first
+ *
+ * This behavior is no longer necessary, but we preserve it so old
+ * primaries can keep their repops in order */
+ if (pool.info.ec_pool()) {
+ t.register_on_complete(complete);
+ } else {
+ t.register_on_commit(complete);
+ }
}
t.register_on_applied(
new C_OSD_OnApplied{this, get_osdmap()->get_epoch(), info.last_update});
}
if (it->second.waiting_on.empty()) {
- repop_all_applied(it->second.repop.get());
repop_all_committed(it->second.repop.get());
log_entry_update_waiting_on.erase(it);
}
bool all_applied;
bool all_committed;
+ const bool applies_with_commit;
utime_t start;
list<std::function<void()>> on_success;
list<std::function<void()>> on_finish;
- RepGather(OpContext *c, ceph_tid_t rt,
- eversion_t lc) :
+ RepGather(
+ OpContext *c, ceph_tid_t rt,
+ eversion_t lc,
+ bool applies_with_commit) :
hoid(c->obc->obs.oi.soid),
op(c->op),
queue_item(this),
rep_tid(rt),
rep_aborted(false), rep_done(false),
all_applied(false), all_committed(false),
+ applies_with_commit(applies_with_commit),
pg_local_last_complete(lc),
lock_manager(std::move(c->lock_manager)),
on_applied(std::move(c->on_applied)),
OpRequestRef &&o,
boost::optional<std::function<void(void)> > &&on_complete,
ceph_tid_t rt,
- eversion_t lc) :
+ eversion_t lc,
+ bool applies_with_commit) :
op(o),
queue_item(this),
nref(1),
rep_tid(rt),
rep_aborted(false), rep_done(false),
all_applied(false), all_committed(false),
+ applies_with_commit(applies_with_commit),
pg_local_last_complete(lc),
lock_manager(std::move(manager)) {
if (on_complete) {