From cf04043662ff6d95b65c6665b8b302072212284a Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 20 Mar 2008 11:29:30 -0700 Subject: [PATCH] objecter: on do ack or safe messages if they are requested --- src/mds/MDCache.cc | 11 ++- src/messages/MOSDOp.h | 6 +- src/osd/ReplicatedPG.cc | 36 ++++----- src/osd/ReplicatedPG.h | 6 +- src/osdc/Journaler.cc | 2 +- src/osdc/Objecter.cc | 158 +++++++++++++++++++++++++--------------- src/osdc/Objecter.h | 1 + 7 files changed, 134 insertions(+), 86 deletions(-) diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc index 8cc707d0ba02e..71ae54cbf51d3 100644 --- a/src/mds/MDCache.cc +++ b/src/mds/MDCache.cc @@ -3523,8 +3523,10 @@ void MDCache::shutdown_check() dout(0) << "log len " << mds->mdlog->get_num_events() << dendl; - if (mds->filer->is_active()) - dout(0) << "filer still active" << dendl; + if (mds->objecter->is_active()) { + dout(0) << "objecter still active" << dendl; + mds->objecter->dump_active(); + } } void MDCache::shutdown_start() @@ -3640,8 +3642,9 @@ bool MDCache::shutdown_pass() } // filer active? - if (mds->filer->is_active()) { - dout(7) << "filer still active" << dendl; + if (mds->objecter->is_active()) { + dout(7) << "objecter still active" << dendl; + mds->objecter->dump_active(); return false; } diff --git a/src/messages/MOSDOp.h b/src/messages/MOSDOp.h index b8f45bb702d40..57828abdd6095 100644 --- a/src/messages/MOSDOp.h +++ b/src/messages/MOSDOp.h @@ -100,7 +100,8 @@ public: MOSDOp(entity_inst_t asker, int inc, long tid, - object_t oid, ceph_object_layout ol, epoch_t mapepoch, int op) : + object_t oid, ceph_object_layout ol, epoch_t mapepoch, int op, + int flags) : Message(CEPH_MSG_OSD_OP) { memset(&head, 0, sizeof(head)); head.client_inst.name = asker.name; @@ -111,8 +112,7 @@ public: head.layout = ol; head.osdmap_epoch = cpu_to_le32(mapepoch); head.op = op; - - head.flags = CEPH_OSD_OP_ACK | CEPH_OSD_OP_SAFE; + head.flags = flags; } MOSDOp() {} diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 867ce6c1002a6..81abe4db0d800 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -164,7 +164,7 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op, utime_t now) oid, layout, osd->osdmap->get_epoch(), - CEPH_OSD_OP_BALANCEREADS); + CEPH_OSD_OP_BALANCEREADS, 0); do_op(pop); } if (is_balanced && !should_balance && @@ -178,7 +178,7 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op, utime_t now) oid, layout, osd->osdmap->get_epoch(), - CEPH_OSD_OP_UNBALANCEREADS); + CEPH_OSD_OP_UNBALANCEREADS, 0); do_op(pop); } } @@ -842,27 +842,29 @@ void ReplicatedPG::put_rep_gather(RepGather *repop) dout(10) << "put_repop " << *repop << dendl; // commit? - if (repop->can_send_commit() && - repop->op->wants_commit()) { - // send commit. - MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), true); - dout(10) << "put_repop sending commit on " << *repop << " " << reply << dendl; - osd->messenger->send_message(reply, repop->op->get_client_inst()); - repop->sent_commit = true; + if (repop->can_send_commit()) { + if (repop->op->wants_commit()) { + // send commit. + MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), true); + dout(10) << "put_repop sending commit on " << *repop << " " << reply << dendl; + osd->messenger->send_message(reply, repop->op->get_client_inst()); + repop->sent_commit = true; + } } // ack? - else if (repop->can_send_ack() && - repop->op->wants_ack()) { + else if (repop->can_send_ack()) { // apply if (!repop->applied) apply_repop(repop); - // send ack - MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), false); - dout(10) << "put_repop sending ack on " << *repop << " " << reply << dendl; - osd->messenger->send_message(reply, repop->op->get_client_inst()); - repop->sent_ack = true; + if (repop->op->wants_ack()) { + // send ack + MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), false); + dout(10) << "put_repop sending ack on " << *repop << " " << reply << dendl; + osd->messenger->send_message(reply, repop->op->get_client_inst()); + repop->sent_ack = true; + } utime_t now = g_clock.now(); now -= repop->start; @@ -1157,7 +1159,7 @@ void ReplicatedPG::op_modify(MOSDOp *op) poid.oid, layout, osd->osdmap->get_epoch(), - CEPH_OSD_OP_UNBALANCEREADS); + CEPH_OSD_OP_UNBALANCEREADS, 0); do_op(pop); } diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 45d5da7f3b6be..796f4e518c431 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -55,12 +55,10 @@ public: pg_local_last_complete(lc) { } bool can_send_ack() { - return !sent_ack && !sent_commit && - waitfor_ack.empty(); + return !sent_ack && !sent_commit && waitfor_ack.empty(); } bool can_send_commit() { - return !sent_commit && - waitfor_ack.empty() && waitfor_commit.empty(); + return !sent_commit && waitfor_ack.empty() && waitfor_commit.empty(); } bool can_delete() { return waitfor_ack.empty() && waitfor_commit.empty(); diff --git a/src/osdc/Journaler.cc b/src/osdc/Journaler.cc index 71514dd38808a..db0267b9c8ee0 100644 --- a/src/osdc/Journaler.cc +++ b/src/osdc/Journaler.cc @@ -169,7 +169,7 @@ void Journaler::write_head(Context *oncommit) bufferlist bl; bl.append((char*)&last_written, sizeof(last_written)); filer.write(inode, 0, bl.length(), bl, 0, - 0, + NULL, new C_WriteHead(this, last_written, oncommit)); } diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index eb4153ddaec64..12abe419b6419 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -208,6 +208,7 @@ void Objecter::kick_requests(set& changed_pgs) tids.swap( pg.active_tids ); close_pg( pgid ); // will pbly reopen, unless it's just commits we're missing + dout(10) << "kick_requests pg " << pgid << " tids " << tids << dendl; for (set::iterator p = tids.begin(); p != tids.end(); p++) { @@ -216,22 +217,33 @@ void Objecter::kick_requests(set& changed_pgs) if (op_modify.count(tid)) { OSDModify *wr = op_modify[tid]; op_modify.erase(tid); - + + if (wr->onack) + num_unacked--; + if (wr->oncommit) + num_uncommitted--; + // WRITE - if (wr->tid_version.count(tid)) { - if (wr->op == CEPH_OSD_OP_WRITE && - !g_conf.objecter_buffer_uncommitted) { - dout(0) << "kick_requests missing commit, cannot replay: objecter_buffer_uncommitted == FALSE" << dendl; - } else { - dout(3) << "kick_requests missing commit, replay write " << tid - << " v " << wr->tid_version[tid] << dendl; - modifyx_submit(wr, wr->waitfor_commit[tid], tid); - } - } - else if (wr->waitfor_ack.count(tid)) { + if (wr->waitfor_ack.count(tid)) { dout(3) << "kick_requests missing ack, resub write " << tid << dendl; modifyx_submit(wr, wr->waitfor_ack[tid], tid); - } + } else { + assert(wr->waitfor_commit.count(tid)); + + if (wr->tid_version.count(tid)) { + if (wr->op == CEPH_OSD_OP_WRITE && + !g_conf.objecter_buffer_uncommitted) { + dout(0) << "kick_requests missing commit, cannot replay: objecter_buffer_uncommitted == FALSE" << dendl; + assert(0); // crap. fixme. + } else { + dout(3) << "kick_requests missing commit, replay write " << tid + << " v " << wr->tid_version[tid] << dendl; + } + } else { + dout(3) << "kick_requests missing commit, resub write " << tid << dendl; + } + modifyx_submit(wr, wr->waitfor_commit[tid], tid); + } } else if (op_read.count(tid)) { @@ -355,10 +367,13 @@ tid_t Objecter::stat_submit(OSDStat *st) << dendl; if (pg.acker() >= 0) { - MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid, - ex.oid, ex.layout, osdmap->get_epoch(), - CEPH_OSD_OP_STAT); + int flags = st->flags; + if (st->onfinish) flags |= CEPH_OSD_OP_ACK; + MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid, + ex.oid, ex.layout, osdmap->get_epoch(), + CEPH_OSD_OP_STAT, flags); + messenger->send_message(m, osdmap->get_inst(pg.acker())); } @@ -471,9 +486,11 @@ tid_t Objecter::readx_submit(OSDRead *rd, ObjectExtent &ex, bool retry) << dendl; if (pg.acker() >= 0) { + int flags = rd->flags; + if (rd->onfinish) flags |= CEPH_OSD_OP_ACK; MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid, ex.oid, ex.layout, osdmap->get_epoch(), - CEPH_OSD_OP_READ); + CEPH_OSD_OP_READ, flags); m->set_length(ex.length); m->set_offset(ex.start); m->set_retry_attempt(retry); @@ -729,16 +746,26 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid) tid = ++last_tid; assert(client_inc >= 0); - // add to gather set - wr->waitfor_ack[tid] = ex; - wr->waitfor_commit[tid] = ex; + // add to gather set(s) + int flags = wr->flags; + if (wr->onack) { + flags |= CEPH_OSD_OP_ACK; + wr->waitfor_ack[tid] = ex; + ++num_unacked; + } else { + dout(20) << " note: not requesting ack" << dendl; + } + if (wr->oncommit) { + flags |= CEPH_OSD_OP_SAFE; + wr->waitfor_commit[tid] = ex; + ++num_uncommitted; + } else { + dout(20) << " note: not requesting commit" << dendl; + } op_modify[tid] = wr; pg.active_tids.insert(tid); pg.last = g_clock.now(); - ++num_unacked; - ++num_uncommitted; - // send? dout(10) << "modifyx_submit " << MOSDOp::get_opname(wr->op) << " tid " << tid << " oid " << ex.oid @@ -749,7 +776,7 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid) if (pg.primary() >= 0) { MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, tid, ex.oid, ex.layout, osdmap->get_epoch(), - wr->op); + wr->op, flags); m->set_length(ex.length); m->set_offset(ex.start); if (usetid > 0) @@ -819,40 +846,16 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m) assert(m->get_result() >= 0); - // ack or safe? - if (m->is_safe()) { - assert(wr->tid_version.count(tid) == 0 || - m->get_version() == wr->tid_version[tid]); - - // remove from tid/osd maps - assert(pg.active_tids.count(tid)); - pg.active_tids.erase(tid); - dout(15) << "handle_osd_modify_reply pg " << m->get_pg() << " still has " << pg.active_tids << dendl; - if (pg.active_tids.empty()) close_pg( m->get_pg() ); - - // commit. - op_modify.erase( tid ); - wr->waitfor_ack.erase(tid); - wr->waitfor_commit.erase(tid); - - num_uncommitted--; - - if (wr->waitfor_commit.empty()) { - onack = wr->onack; - oncommit = wr->oncommit; - delete wr; - } - } else { - // ack. - assert(wr->waitfor_ack.count(tid)); + // ack|commit -> ack + if (wr->waitfor_ack.count(tid)) { wr->waitfor_ack.erase(tid); - num_unacked--; - + dout(15) << "handle_osd_modify_reply ack" << dendl; + if (wr->tid_version.count(tid) && - wr->tid_version[tid].version != m->get_version().version) { + wr->tid_version[tid].version != m->get_version().version) { dout(-10) << "handle_osd_modify_reply WARNING: replay of tid " << tid - << " did not achieve previous ordering" << dendl; + << " did not achieve previous ordering" << dendl; } wr->tid_version[tid] = m->get_version(); @@ -862,12 +865,39 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m) // buffer uncommitted? if (!g_conf.objecter_buffer_uncommitted && - wr->op == CEPH_OSD_OP_WRITE) { - // discard buffer! - ((OSDWrite*)wr)->bl.clear(); + wr->op == CEPH_OSD_OP_WRITE) { + // discard buffer! + ((OSDWrite*)wr)->bl.clear(); } } } + if (m->is_safe()) { + // safe + assert(wr->tid_version.count(tid) == 0 || + m->get_version() == wr->tid_version[tid]); + + wr->waitfor_commit.erase(tid); + num_uncommitted--; + dout(15) << "handle_osd_modify_reply safe" << dendl; + + if (wr->waitfor_commit.empty()) { + oncommit = wr->oncommit; + wr->oncommit = 0; + } + } + + // done? + if (wr->onack == 0 && wr->oncommit == 0) { + // remove from tid/osd maps + assert(pg.active_tids.count(tid)); + pg.active_tids.erase(tid); + dout(15) << "handle_osd_modify_reply completed. pg " << m->get_pg() + << " still has " << pg.active_tids << dendl; + if (pg.active_tids.empty()) + close_pg( m->get_pg() ); + op_modify.erase( tid ); + delete wr; + } dout(5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << dendl; @@ -916,3 +946,17 @@ void Objecter::ms_handle_failure(Message *m, entity_name_t dest, const entity_in delete m; } } + + +void Objecter::dump_active() +{ + dout(10) << "dump_active" << dendl; + + for (hash_map::iterator p = op_stat.begin(); p != op_stat.end(); p++) + dout(10) << " stat " << p->first << dendl; + for (hash_map::iterator p = op_read.begin(); p != op_read.end(); p++) + dout(10) << " read " << p->first << dendl; + for (hash_map::iterator p = op_modify.begin(); p != op_modify.end(); p++) + dout(10) << " modify " << p->first << dendl; + +} diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 93c1a2398c16e..8aefd0559be05 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -202,6 +202,7 @@ class Objecter { bool is_active() { return !(op_read.empty() && op_modify.empty()); } + void dump_active(); int get_client_incarnation() { return client_inc; } void set_client_incarnation(int inc) { -- 2.39.5