list<OpRequestRef> waiting_for_all_missing;
map<hobject_t, list<OpRequestRef> > waiting_for_missing_object,
waiting_for_degraded_object;
- map<eversion_t,list<OpRequestRef> > waiting_for_ondisk;
+ map<eversion_t,list<OpRequestRef> > waiting_for_ack, waiting_for_ondisk;
map<eversion_t,OpRequestRef> replay_queue;
void requeue_object_waiters(map<hobject_t, list<OpRequestRef> >& m);
if (already_complete(oldv)) {
osd->reply_op_error(op, 0, oldv);
} else {
+ if (m->wants_ack()) {
+ if (already_ack(oldv)) {
+ MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
+ reply->add_flags(CEPH_OSD_FLAG_ACK);
+ osd->client_messenger->send_message(reply, m->get_connection());
+ } else {
+ dout(10) << " waiting for " << oldv << " to ack" << dendl;
+ waiting_for_ack[oldv].push_back(op);
+ }
+ }
dout(10) << " waiting for " << oldv << " to commit" << dendl;
- waiting_for_ondisk[oldv].push_back(op);
+ waiting_for_ondisk[oldv].push_back(op); // always queue ondisk waiters, so that we can requeue if needed
op->mark_delayed();
}
return;
log_op_stats(repop->ctx);
update_stats();
+ // send dup commits, in order
if (waiting_for_ondisk.count(repop->v)) {
assert(waiting_for_ondisk.begin()->first == repop->v);
for (list<OpRequestRef>::iterator i = waiting_for_ondisk[repop->v].begin();
// applied?
if (repop->waitfor_ack.empty()) {
+
+ // send dup acks, in order
+ if (waiting_for_ack.count(repop->v)) {
+ assert(waiting_for_ack.begin()->first == repop->v);
+ for (list<OpRequestRef>::iterator i = waiting_for_ack[repop->v].begin();
+ i != waiting_for_ack[repop->v].end();
+ ++i) {
+ MOSDOp *m = (MOSDOp*)(*i)->request;
+ MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
+ reply->add_flags(CEPH_OSD_FLAG_ACK);
+ osd->client_messenger->send_message(reply, m->get_connection());
+ }
+ waiting_for_ack.erase(repop->v);
+ }
+
if (m->wants_ack() && !repop->sent_ack && !repop->sent_disk) {
// send ack
MOSDOpReply *reply = repop->ctx->reply;
p++)
requeue_ops(p->second);
waiting_for_ondisk.clear();
+ waiting_for_ack.clear();
}
// replica ops
// [primary|tail]
xlist<RepGather*> repop_queue;
- bool already_complete(eversion_t v) {
- for (xlist<RepGather*>::iterator i = repop_queue.begin();
- !i.end();
- ++i) {
- if ((*i)->v > v)
- break;
- if (!(*i)->waitfor_disk.empty())
- return false;
- }
- return true;
- }
map<tid_t, RepGather*> repop_map;
void apply_repop(RepGather *repop);
int result, int ack_type,
int fromosd, eversion_t pg_complete_thru=eversion_t(0,0));
+ /// true if we can send an ondisk/commit for v
+ bool already_complete(eversion_t v) {
+ for (xlist<RepGather*>::iterator i = repop_queue.begin();
+ !i.end();
+ ++i) {
+ if ((*i)->v > v)
+ break;
+ if (!(*i)->waitfor_disk.empty())
+ return false;
+ }
+ return true;
+ }
+ /// true if we can send an ack for v
+ bool already_ack(eversion_t v) {
+ for (xlist<RepGather*>::iterator i = repop_queue.begin();
+ !i.end();
+ ++i) {
+ if ((*i)->v > v)
+ break;
+ if (!(*i)->waitfor_ack.empty())
+ return false;
+ }
+ return true;
+ }
+
friend class C_OSD_OpCommit;
friend class C_OSD_OpApplied;