oid,
layout,
osd->osdmap->get_epoch(),
- CEPH_OSD_OP_BALANCEREADS);
+ CEPH_OSD_OP_BALANCEREADS, 0);
do_op(pop);
}
if (is_balanced && !should_balance &&
oid,
layout,
osd->osdmap->get_epoch(),
- CEPH_OSD_OP_UNBALANCEREADS);
+ CEPH_OSD_OP_UNBALANCEREADS, 0);
do_op(pop);
}
}
dout(10) << "put_repop " << *repop << dendl;
// commit?
- if (repop->can_send_commit() &&
- repop->op->wants_commit()) {
- // send commit.
- MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), true);
- dout(10) << "put_repop sending commit on " << *repop << " " << reply << dendl;
- osd->messenger->send_message(reply, repop->op->get_client_inst());
- repop->sent_commit = true;
+ if (repop->can_send_commit()) {
+ if (repop->op->wants_commit()) {
+ // send commit.
+ MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), true);
+ dout(10) << "put_repop sending commit on " << *repop << " " << reply << dendl;
+ osd->messenger->send_message(reply, repop->op->get_client_inst());
+ repop->sent_commit = true;
+ }
}
// ack?
- else if (repop->can_send_ack() &&
- repop->op->wants_ack()) {
+ else if (repop->can_send_ack()) {
// apply
if (!repop->applied)
apply_repop(repop);
- // send ack
- MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), false);
- dout(10) << "put_repop sending ack on " << *repop << " " << reply << dendl;
- osd->messenger->send_message(reply, repop->op->get_client_inst());
- repop->sent_ack = true;
+ if (repop->op->wants_ack()) {
+ // send ack
+ MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), false);
+ dout(10) << "put_repop sending ack on " << *repop << " " << reply << dendl;
+ osd->messenger->send_message(reply, repop->op->get_client_inst());
+ repop->sent_ack = true;
+ }
utime_t now = g_clock.now();
now -= repop->start;
poid.oid,
layout,
osd->osdmap->get_epoch(),
- CEPH_OSD_OP_UNBALANCEREADS);
+ CEPH_OSD_OP_UNBALANCEREADS, 0);
do_op(pop);
}
tids.swap( pg.active_tids );
close_pg( pgid ); // will pbly reopen, unless it's just commits we're missing
+ dout(10) << "kick_requests pg " << pgid << " tids " << tids << dendl;
for (set<tid_t>::iterator p = tids.begin();
p != tids.end();
p++) {
if (op_modify.count(tid)) {
OSDModify *wr = op_modify[tid];
op_modify.erase(tid);
-
+
+ if (wr->onack)
+ num_unacked--;
+ if (wr->oncommit)
+ num_uncommitted--;
+
// WRITE
- if (wr->tid_version.count(tid)) {
- if (wr->op == CEPH_OSD_OP_WRITE &&
- !g_conf.objecter_buffer_uncommitted) {
- dout(0) << "kick_requests missing commit, cannot replay: objecter_buffer_uncommitted == FALSE" << dendl;
- } else {
- dout(3) << "kick_requests missing commit, replay write " << tid
- << " v " << wr->tid_version[tid] << dendl;
- modifyx_submit(wr, wr->waitfor_commit[tid], tid);
- }
- }
- else if (wr->waitfor_ack.count(tid)) {
+ if (wr->waitfor_ack.count(tid)) {
dout(3) << "kick_requests missing ack, resub write " << tid << dendl;
modifyx_submit(wr, wr->waitfor_ack[tid], tid);
- }
+ } else {
+ assert(wr->waitfor_commit.count(tid));
+
+ if (wr->tid_version.count(tid)) {
+ if (wr->op == CEPH_OSD_OP_WRITE &&
+ !g_conf.objecter_buffer_uncommitted) {
+ dout(0) << "kick_requests missing commit, cannot replay: objecter_buffer_uncommitted == FALSE" << dendl;
+ assert(0); // crap. fixme.
+ } else {
+ dout(3) << "kick_requests missing commit, replay write " << tid
+ << " v " << wr->tid_version[tid] << dendl;
+ }
+ } else {
+ dout(3) << "kick_requests missing commit, resub write " << tid << dendl;
+ }
+ modifyx_submit(wr, wr->waitfor_commit[tid], tid);
+ }
}
else if (op_read.count(tid)) {
<< dendl;
if (pg.acker() >= 0) {
- MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
- ex.oid, ex.layout, osdmap->get_epoch(),
- CEPH_OSD_OP_STAT);
+ int flags = st->flags;
+ if (st->onfinish) flags |= CEPH_OSD_OP_ACK;
+ MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
+ ex.oid, ex.layout, osdmap->get_epoch(),
+ CEPH_OSD_OP_STAT, flags);
+
messenger->send_message(m, osdmap->get_inst(pg.acker()));
}
<< dendl;
if (pg.acker() >= 0) {
+ int flags = rd->flags;
+ if (rd->onfinish) flags |= CEPH_OSD_OP_ACK;
MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
ex.oid, ex.layout, osdmap->get_epoch(),
- CEPH_OSD_OP_READ);
+ CEPH_OSD_OP_READ, flags);
m->set_length(ex.length);
m->set_offset(ex.start);
m->set_retry_attempt(retry);
tid = ++last_tid;
assert(client_inc >= 0);
- // add to gather set
- wr->waitfor_ack[tid] = ex;
- wr->waitfor_commit[tid] = ex;
+ // add to gather set(s)
+ int flags = wr->flags;
+ if (wr->onack) {
+ flags |= CEPH_OSD_OP_ACK;
+ wr->waitfor_ack[tid] = ex;
+ ++num_unacked;
+ } else {
+ dout(20) << " note: not requesting ack" << dendl;
+ }
+ if (wr->oncommit) {
+ flags |= CEPH_OSD_OP_SAFE;
+ wr->waitfor_commit[tid] = ex;
+ ++num_uncommitted;
+ } else {
+ dout(20) << " note: not requesting commit" << dendl;
+ }
op_modify[tid] = wr;
pg.active_tids.insert(tid);
pg.last = g_clock.now();
- ++num_unacked;
- ++num_uncommitted;
-
// send?
dout(10) << "modifyx_submit " << MOSDOp::get_opname(wr->op) << " tid " << tid
<< " oid " << ex.oid
if (pg.primary() >= 0) {
MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, tid,
ex.oid, ex.layout, osdmap->get_epoch(),
- wr->op);
+ wr->op, flags);
m->set_length(ex.length);
m->set_offset(ex.start);
if (usetid > 0)
assert(m->get_result() >= 0);
- // ack or safe?
- if (m->is_safe()) {
- assert(wr->tid_version.count(tid) == 0 ||
- m->get_version() == wr->tid_version[tid]);
-
- // remove from tid/osd maps
- assert(pg.active_tids.count(tid));
- pg.active_tids.erase(tid);
- dout(15) << "handle_osd_modify_reply pg " << m->get_pg() << " still has " << pg.active_tids << dendl;
- if (pg.active_tids.empty()) close_pg( m->get_pg() );
-
- // commit.
- op_modify.erase( tid );
- wr->waitfor_ack.erase(tid);
- wr->waitfor_commit.erase(tid);
-
- num_uncommitted--;
-
- if (wr->waitfor_commit.empty()) {
- onack = wr->onack;
- oncommit = wr->oncommit;
- delete wr;
- }
- } else {
- // ack.
- assert(wr->waitfor_ack.count(tid));
+ // ack|commit -> ack
+ if (wr->waitfor_ack.count(tid)) {
wr->waitfor_ack.erase(tid);
-
num_unacked--;
-
+ dout(15) << "handle_osd_modify_reply ack" << dendl;
+
if (wr->tid_version.count(tid) &&
- wr->tid_version[tid].version != m->get_version().version) {
+ wr->tid_version[tid].version != m->get_version().version) {
dout(-10) << "handle_osd_modify_reply WARNING: replay of tid " << tid
- << " did not achieve previous ordering" << dendl;
+ << " did not achieve previous ordering" << dendl;
}
wr->tid_version[tid] = m->get_version();
// buffer uncommitted?
if (!g_conf.objecter_buffer_uncommitted &&
- wr->op == CEPH_OSD_OP_WRITE) {
- // discard buffer!
- ((OSDWrite*)wr)->bl.clear();
+ wr->op == CEPH_OSD_OP_WRITE) {
+ // discard buffer!
+ ((OSDWrite*)wr)->bl.clear();
}
}
}
+ if (m->is_safe()) {
+ // safe
+ assert(wr->tid_version.count(tid) == 0 ||
+ m->get_version() == wr->tid_version[tid]);
+
+ wr->waitfor_commit.erase(tid);
+ num_uncommitted--;
+ dout(15) << "handle_osd_modify_reply safe" << dendl;
+
+ if (wr->waitfor_commit.empty()) {
+ oncommit = wr->oncommit;
+ wr->oncommit = 0;
+ }
+ }
+
+ // done?
+ if (wr->onack == 0 && wr->oncommit == 0) {
+ // remove from tid/osd maps
+ assert(pg.active_tids.count(tid));
+ pg.active_tids.erase(tid);
+ dout(15) << "handle_osd_modify_reply completed. pg " << m->get_pg()
+ << " still has " << pg.active_tids << dendl;
+ if (pg.active_tids.empty())
+ close_pg( m->get_pg() );
+ op_modify.erase( tid );
+ delete wr;
+ }
dout(5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << dendl;
delete m;
}
}
+
+
+void Objecter::dump_active()
+{
+ dout(10) << "dump_active" << dendl;
+
+ for (hash_map<tid_t,OSDStat*>::iterator p = op_stat.begin(); p != op_stat.end(); p++)
+ dout(10) << " stat " << p->first << dendl;
+ for (hash_map<tid_t,OSDRead*>::iterator p = op_read.begin(); p != op_read.end(); p++)
+ dout(10) << " read " << p->first << dendl;
+ for (hash_map<tid_t,OSDModify*>::iterator p = op_modify.begin(); p != op_modify.end(); p++)
+ dout(10) << " modify " << p->first << dendl;
+
+}