client_trace: 0,
fuse_direct_io: 0,
+ // --- objecter ---
+ objecter_buffer_uncommitted: true,
+
// --- mds ---
mds_cache_size: MDS_CACHE_SIZE,
mds_cache_mid: .7,
else if (strcmp(args[i], "--fakemessenger_serialize") == 0)
g_conf.fakemessenger_serialize = atoi(args[++i]);
+
+ else if (strcmp(args[i], "--objecter_buffer_uncommitted") == 0)
+ g_conf.objecter_buffer_uncommitted = atoi(args[++i]);
+
else if (strcmp(args[i], "--mds_cache_size") == 0)
g_conf.mds_cache_size = atoi(args[++i]);
#include "config.h"
#undef dout
#define dout(x) if (x <= g_conf.debug || x <= g_conf.debug_objecter) cout << messenger->get_myaddr() << ".objecter "
+#define derr(x) if (x <= g_conf.debug || x <= g_conf.debug_objecter) cerr << messenger->get_myaddr() << ".objecter "
// messages ------------------------------
// WRITE
if (wr->tid_version.count(tid)) {
- dout(0) << "kick_requests missing commit, replay write " << tid
- << " v " << wr->tid_version[tid] << endl;
- modifyx_submit(wr, wr->waitfor_commit[tid], tid);
+ if (wr->op == OSD_OP_WRITE &&
+ !g_conf.objecter_buffer_uncommitted) {
+ derr(0) << "kick_requests missing commit, cannot replay: objecter_buffer_uncommitted == FALSE" << endl;
+ } else {
+ dout(0) << "kick_requests missing commit, replay write " << tid
+ << " v " << wr->tid_version[tid] << endl;
+ modifyx_submit(wr, wr->waitfor_commit[tid], tid);
+ }
}
else if (wr->waitfor_ack.count(tid)) {
dout(0) << "kick_requests missing ack, resub write " << tid << endl;
op_modify[tid] = wr;
pg.active_tids.insert(tid);
+ ++num_unacked;
+ ++num_uncommitted;
+
// send
dout(10) << "modifyx_submit " << MOSDOp::get_opname(wr->op) << " tid " << tid
<< " oid " << hex << ex.oid << dec
if (pg.primary() >= 0)
messenger->send_message(m, MSG_ADDR_OSD(pg.primary()), 0);
+ dout(5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << endl;
+
return tid;
}
wr->waitfor_ack.erase(tid);
wr->waitfor_commit.erase(tid);
+ num_uncommitted--;
+
if (wr->waitfor_commit.empty()) {
onack = wr->onack;
oncommit = wr->oncommit;
//dout(15) << " handle_osd_write_reply ack on " << tid << endl;
assert(wr->waitfor_ack.count(tid));
wr->waitfor_ack.erase(tid);
+
+ num_unacked--;
if (wr->tid_version.count(tid) &&
wr->tid_version[tid].version != m->get_version().version) {
if (wr->waitfor_ack.empty()) {
onack = wr->onack;
wr->onack = 0; // only do callback once
+
+ // buffer uncommitted?
+ if (!g_conf.objecter_buffer_uncommitted &&
+ wr->op == OSD_OP_WRITE) {
+ // discard buffer!
+ ((OSDWrite*)wr)->bl.clear();
+ }
}
}