From: Sage Weil Date: Tue, 1 Jul 2008 14:25:06 +0000 (-0700) Subject: mon: use forward_message, orig_source_inst throughout X-Git-Tag: v0.3~29 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=3dd86f80730ab54838a354b30c04c952beea44ea;p=ceph.git mon: use forward_message, orig_source_inst throughout --- diff --git a/src/TODO b/src/TODO index 1e29b26354d1..e675eca1de22 100644 --- a/src/TODO +++ b/src/TODO @@ -1,4 +1,5 @@ v0.3 +- fix mon osdmap send_full/latest ... never use get_source_inst()! v0.4 - fix msgr protocol wrt resets, and varying connection close policies diff --git a/src/cmonctl.cc b/src/cmonctl.cc index 32663f6d581b..44a8dfe31aff 100644 --- a/src/cmonctl.cc +++ b/src/cmonctl.cc @@ -144,7 +144,7 @@ int main(int argc, const char **argv, const char *envp[]) { messenger->set_dispatcher(&dispatcher); // build command - MMonCommand *m = new MMonCommand(monmap.fsid, messenger->get_myinst()); + MMonCommand *m = new MMonCommand(monmap.fsid); m->set_data(indata); m->cmd.swap(vcmd); int mon = monmap.pick_mon(); diff --git a/src/fakefuse.cc b/src/fakefuse.cc index f799b1ff1c8a..9dce514d2857 100644 --- a/src/fakefuse.cc +++ b/src/fakefuse.cc @@ -127,7 +127,7 @@ int main(int argc, const char **argv) { bufferlist bl; map.encode(bl); Messenger *messenger = new FakeMessenger(entity_name_t::ADMIN(-1)); - MMonCommand *m = new MMonCommand(monmap->fsid, messenger->get_myinst()); + MMonCommand *m = new MMonCommand(monmap->fsid); m->set_data(bl); m->cmd.push_back("osd"); m->cmd.push_back("setmap"); diff --git a/src/fakesyn.cc b/src/fakesyn.cc index 482823868938..213d4285cd3f 100644 --- a/src/fakesyn.cc +++ b/src/fakesyn.cc @@ -126,7 +126,7 @@ int main(int argc, const char **argv) bufferlist bl; map.encode(bl); Messenger *messenger = new FakeMessenger(entity_name_t::ADMIN(-1)); - MMonCommand *m = new MMonCommand(monmap->fsid, messenger->get_myinst()); + MMonCommand *m = new MMonCommand(monmap->fsid); m->set_data(bl); m->cmd.push_back("osd"); m->cmd.push_back("setmap"); diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index d1149aff1dc2..0652a80074be 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -374,7 +374,7 @@ struct ceph_msg_header { __le32 front_len; __le32 data_off; /* sender: include full offset; receiver: mask against ~PAGE_MASK */ __le32 data_len; /* bytes of data payload */ - struct ceph_entity_inst src, dst; + struct ceph_entity_inst src, orig_src, dst; } __attribute__ ((packed)); struct ceph_msg_footer { diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc index 1b3c82f73d81..6ae7e70389e7 100644 --- a/src/mds/MDS.cc +++ b/src/mds/MDS.cc @@ -386,7 +386,7 @@ void MDS::beacon_send() beacon_seq_stamp[beacon_last_seq] = g_clock.now(); int mon = monmap->pick_mon(); - messenger->send_message(new MMDSBeacon(monmap->fsid, messenger->get_myinst(), mdsmap->get_epoch(), + messenger->send_message(new MMDSBeacon(monmap->fsid, mdsmap->get_epoch(), want_state, beacon_last_seq, want_rank), monmap->get_inst(mon)); diff --git a/src/messages/MMDSBeacon.h b/src/messages/MMDSBeacon.h index 133bc396dea0..b64f8ca0823d 100644 --- a/src/messages/MMDSBeacon.h +++ b/src/messages/MMDSBeacon.h @@ -23,7 +23,6 @@ class MMDSBeacon : public Message { ceph_fsid fsid; - entity_inst_t inst; epoch_t last_epoch_seen; // include last mdsmap epoch mds has seen to avoid race with monitor decree __u32 state; version_t seq; @@ -31,12 +30,11 @@ class MMDSBeacon : public Message { public: MMDSBeacon() : Message(MSG_MDS_BEACON) {} - MMDSBeacon(ceph_fsid &f, entity_inst_t i, epoch_t les, int st, version_t se, int wr) : + MMDSBeacon(ceph_fsid &f, epoch_t les, int st, version_t se, int wr) : Message(MSG_MDS_BEACON), - fsid(f), inst(i), last_epoch_seen(les), state(st), seq(se), want_rank(wr) { } + fsid(f), last_epoch_seen(les), state(st), seq(se), want_rank(wr) { } ceph_fsid& get_fsid() { return fsid; } - entity_inst_t& get_mds_inst() { return inst; } epoch_t get_last_epoch_seen() { return last_epoch_seen; } int get_state() { return state; } version_t get_seq() { return seq; } @@ -44,14 +42,12 @@ class MMDSBeacon : public Message { int get_want_rank() { return want_rank; } void print(ostream& out) { - out << "mdsbeacon(" << inst - << " " << MDSMap::get_state_name(state) + out << "mdsbeacon(" << MDSMap::get_state_name(state) << " seq " << seq << ")"; } void encode_payload() { ::encode(fsid, payload); - ::encode(inst, payload); ::encode(last_epoch_seen, payload); ::encode(state, payload); ::encode(seq, payload); @@ -60,7 +56,6 @@ class MMDSBeacon : public Message { void decode_payload() { bufferlist::iterator p = payload.begin(); ::decode(fsid, p); - ::decode(inst, p); ::decode(last_epoch_seen, p); ::decode(state, p); ::decode(seq, p); diff --git a/src/messages/MMonCommand.h b/src/messages/MMonCommand.h index 0c92b8ef2b33..d5fdc78ad152 100644 --- a/src/messages/MMonCommand.h +++ b/src/messages/MMonCommand.h @@ -23,13 +23,12 @@ using std::vector; class MMonCommand : public Message { public: ceph_fsid fsid; - entity_inst_t inst; vector cmd; MMonCommand() : Message(MSG_MON_COMMAND) {} - MMonCommand(ceph_fsid &f, entity_inst_t i) : + MMonCommand(ceph_fsid &f) : Message(MSG_MON_COMMAND), - fsid(f), inst(i) { } + fsid(f) { } const char *get_type_name() { return "mon_command"; } void print(ostream& o) { @@ -43,13 +42,11 @@ class MMonCommand : public Message { void encode_payload() { ::encode(fsid, payload); - ::encode(inst, payload); ::encode(cmd, payload); } void decode_payload() { bufferlist::iterator p = payload.begin(); ::decode(fsid, p); - ::decode(inst, p); ::decode(cmd, p); } }; diff --git a/src/messages/MOSDBoot.h b/src/messages/MOSDBoot.h index 8e22755dfc4a..90d30bfe6e73 100644 --- a/src/messages/MOSDBoot.h +++ b/src/messages/MOSDBoot.h @@ -22,28 +22,24 @@ class MOSDBoot : public Message { public: - entity_inst_t inst; OSDSuperblock sb; MOSDBoot() {} - MOSDBoot(entity_inst_t i, OSDSuperblock& s) : + MOSDBoot(OSDSuperblock& s) : Message(MSG_OSD_BOOT), - inst(i), sb(s) { } const char *get_type_name() { return "osd_boot"; } void print(ostream& out) { - out << "osd_boot(" << inst << ")"; + out << "osd_boot(osd" << sb.whoami << ")"; } void encode_payload() { - ::encode(inst, payload); ::encode(sb, payload); } void decode_payload() { bufferlist::iterator p = payload.begin(); - ::decode(inst, p); ::decode(sb, p); } }; diff --git a/src/messages/MOSDFailure.h b/src/messages/MOSDFailure.h index 264abbf957f2..6a11a9fdd21f 100644 --- a/src/messages/MOSDFailure.h +++ b/src/messages/MOSDFailure.h @@ -22,29 +22,25 @@ class MOSDFailure : public Message { public: ceph_fsid fsid; - entity_inst_t from; entity_inst_t failed; epoch_t epoch; MOSDFailure() : Message(MSG_OSD_FAILURE) {} - MOSDFailure(ceph_fsid &fs, entity_inst_t fr, entity_inst_t f, epoch_t e) : + MOSDFailure(ceph_fsid &fs, entity_inst_t f, epoch_t e) : Message(MSG_OSD_FAILURE), - fsid(fs), from(fr), failed(f), epoch(e) {} + fsid(fs), failed(f), epoch(e) {} - entity_inst_t get_from() { return from; } entity_inst_t get_failed() { return failed; } epoch_t get_epoch() { return epoch; } void decode_payload() { bufferlist::iterator p = payload.begin(); ::decode(fsid, p); - ::decode(from, p); ::decode(failed, p); ::decode(epoch, p); } void encode_payload() { ::encode(fsid, payload); - ::encode(from, payload); ::encode(failed, payload); ::encode(epoch, payload); } diff --git a/src/messages/MPGStats.h b/src/messages/MPGStats.h index 95f67b74902e..31d721f31b08 100644 --- a/src/messages/MPGStats.h +++ b/src/messages/MPGStats.h @@ -21,7 +21,6 @@ class MPGStats : public Message { public: map pg_stat; osd_stat_t osd_stat; - entity_inst_t orig_src; MPGStats() : Message(MSG_PGSTATS) {} @@ -33,13 +32,11 @@ public: void encode_payload() { ::encode(osd_stat, payload); ::encode(pg_stat, payload); - ::encode(orig_src, payload); } void decode_payload() { bufferlist::iterator p = payload.begin(); ::decode(osd_stat, p); ::decode(pg_stat, p); - ::decode(orig_src, p); } }; diff --git a/src/mon/MDSMonitor.cc b/src/mon/MDSMonitor.cc index 7d6a7493669e..37683ef19ed6 100644 --- a/src/mon/MDSMonitor.cc +++ b/src/mon/MDSMonitor.cc @@ -176,7 +176,7 @@ void MDSMonitor::encode_pending(bufferlist &bl) bool MDSMonitor::preprocess_query(Message *m) { - dout(10) << "preprocess_query " << *m << " from " << m->get_source_inst() << dendl; + dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl; switch (m->get_type()) { @@ -200,16 +200,16 @@ bool MDSMonitor::preprocess_query(Message *m) void MDSMonitor::handle_mds_getmap(MMDSGetMap *m) { if (m->want <= mdsmap.get_epoch()) - send_full(m->get_source_inst()); + send_full(m->get_orig_source_inst()); else - waiting_for_map.push_back(m->get_source_inst()); + waiting_for_map.push_back(m->get_orig_source_inst()); } bool MDSMonitor::preprocess_beacon(MMDSBeacon *m) { - int from = m->get_mds_inst().name.num(); - entity_addr_t addr = m->get_mds_inst().addr; + int from = m->get_orig_source_inst().name.num(); + entity_addr_t addr = m->get_orig_source_inst().addr; int state = m->get_state(); version_t seq = m->get_seq(); @@ -218,22 +218,13 @@ bool MDSMonitor::preprocess_beacon(MMDSBeacon *m) goto out; } - // first time we've seen it? - if (m->get_mds_inst().addr.ipaddr.sin_addr.s_addr == htonl(INADDR_ANY)) { - m->get_mds_inst() = m->get_source_inst(); - m->clear_payload(); - } - dout(12) << "preprocess_beacon " << *m - << " from " << m->get_mds_inst() + << " from " << m->get_orig_source_inst() << dendl; // fw to leader? - if (!mon->is_leader()) { - dout(10) << "fw to leader" << dendl; - mon->messenger->send_message(m, mon->monmap->get_inst(mon->get_leader())); - return true; - } + if (!mon->is_leader()) + return false; // can i handle this query without a map update? @@ -278,9 +269,9 @@ bool MDSMonitor::preprocess_beacon(MMDSBeacon *m) // note time and reply dout(15) << "mds_beacon " << *m << " noting time and replying" << dendl; last_beacon[addr] = g_clock.now(); - mon->messenger->send_message(new MMDSBeacon(mon->monmap->fsid, m->get_mds_inst(), + mon->messenger->send_message(new MMDSBeacon(mon->monmap->fsid, mdsmap.get_epoch(), state, seq, 0), - m->get_mds_inst()); + m->get_orig_source_inst()); // done out: @@ -315,10 +306,10 @@ bool MDSMonitor::handle_beacon(MMDSBeacon *m) { // -- this is an update -- dout(12) << "handle_beacon " << *m - << " from " << m->get_mds_inst() + << " from " << m->get_orig_source_inst() << dendl; - int from = m->get_mds_inst().name.num(); - entity_addr_t addr = m->get_mds_inst().addr; + int from = m->get_orig_source_inst().name.num(); + entity_addr_t addr = m->get_orig_source_inst().addr; int state = m->get_state(); version_t seq = m->get_seq(); @@ -445,13 +436,13 @@ void MDSMonitor::_updated(int from, MMDSBeacon *m) { if (from < 0) { dout(10) << "_updated (booted) mds" << from << " " << *m << dendl; - mon->osdmon->send_latest(m->get_source_inst()); + mon->osdmon->send_latest(m->get_orig_source_inst()); } else { dout(10) << "_updated mds" << from << " " << *m << dendl; } if (m->get_state() == MDSMap::STATE_STOPPED) { // send the map manually (they're out of the map, so they won't get it automatic) - send_latest(m->get_mds_inst()); + send_latest(m->get_orig_source_inst()); } delete m; diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 350fc5f0461b..9b7e464ffcc9 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -203,12 +203,6 @@ void Monitor::handle_command(MMonCommand *m) return; } - // first time we've seen it? - if (m->inst.addr.ipaddr.sin_addr.s_addr == htonl(INADDR_ANY)) { - m->inst = m->get_source_inst(); - m->clear_payload(); - } - dout(0) << "handle_command " << *m << dendl; string rs; if (!m->cmd.empty()) { @@ -251,7 +245,7 @@ void Monitor::reply_command(MMonCommand *m, int rc, const string &rs, bufferlist { MMonCommandAck *reply = new MMonCommandAck(rc, rs); reply->set_data(rdata); - messenger->send_message(reply, m->inst); + messenger->send_message(reply, m->get_orig_source_inst()); delete m; } diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc index 855c78750b96..ffefc3ace196 100644 --- a/src/mon/OSDMonitor.cc +++ b/src/mon/OSDMonitor.cc @@ -256,7 +256,7 @@ void OSDMonitor::committed() bool OSDMonitor::preprocess_query(Message *m) { - dout(10) << "preprocess_query " << *m << " from " << m->get_source_inst() << dendl; + dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl; switch (m->get_type()) { // READs @@ -288,7 +288,7 @@ bool OSDMonitor::preprocess_query(Message *m) bool OSDMonitor::prepare_update(Message *m) { - dout(7) << "prepare_update " << *m << " from " << m->get_source_inst() << dendl; + dout(7) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl; switch (m->get_type()) { // damp updates @@ -344,7 +344,7 @@ bool OSDMonitor::should_propose(double& delay) void OSDMonitor::handle_osd_getmap(MOSDGetMap *m) { - dout(7) << "handle_osd_getmap from " << m->get_source() + dout(7) << "handle_osd_getmap from " << m->get_orig_source() << " start " << m->get_start_epoch() << dendl; @@ -355,11 +355,11 @@ void OSDMonitor::handle_osd_getmap(MOSDGetMap *m) if (m->get_start_epoch()) { if (m->get_start_epoch() <= osdmap.get_epoch()) - send_incremental(m->get_source_inst(), m->get_start_epoch()); + send_incremental(m->get_orig_source_inst(), m->get_start_epoch()); else - waiting_for_map[m->get_source_inst()] = m->get_start_epoch(); + waiting_for_map[m->get_orig_source_inst()] = m->get_start_epoch(); } else - send_full(m->get_source_inst()); + send_full(m->get_orig_source_inst()); out: delete m; @@ -391,13 +391,13 @@ bool OSDMonitor::preprocess_failure(MOSDFailure *m) */ // first, verify the reporting host is valid - if (m->get_source().is_osd()) { - int from = m->get_source().num(); + if (m->get_orig_source().is_osd()) { + int from = m->get_orig_source().num(); if (!osdmap.exists(from) || - osdmap.get_addr(from) != m->get_source_inst().addr || + osdmap.get_addr(from) != m->get_orig_source_inst().addr || osdmap.is_down(from)) { dout(5) << "preprocess_failure from dead osd" << from << ", ignoring" << dendl; - send_incremental(m->get_from(), m->get_epoch()+1); + send_incremental(m->get_orig_source_inst(), m->get_epoch()+1); goto didit; } } @@ -405,27 +405,27 @@ bool OSDMonitor::preprocess_failure(MOSDFailure *m) // weird? if (!osdmap.have_inst(badboy)) { - dout(5) << "preprocess_failure dne(/dup?): " << m->get_failed() << ", from " << m->get_from() << dendl; + dout(5) << "preprocess_failure dne(/dup?): " << m->get_failed() << ", from " << m->get_orig_source_inst() << dendl; if (m->get_epoch() < osdmap.get_epoch()) - send_incremental(m->get_from(), m->get_epoch()+1); + send_incremental(m->get_orig_source_inst(), m->get_epoch()+1); goto didit; } if (osdmap.get_inst(badboy) != m->get_failed()) { dout(5) << "preprocess_failure wrong osd: report " << m->get_failed() << " != map's " << osdmap.get_inst(badboy) - << ", from " << m->get_from() << dendl; + << ", from " << m->get_orig_source_inst() << dendl; if (m->get_epoch() < osdmap.get_epoch()) - send_incremental(m->get_from(), m->get_epoch()+1); + send_incremental(m->get_orig_source_inst(), m->get_epoch()+1); goto didit; } // already reported? if (osdmap.is_down(badboy)) { - dout(5) << "preprocess_failure dup: " << m->get_failed() << ", from " << m->get_from() << dendl; + dout(5) << "preprocess_failure dup: " << m->get_failed() << ", from " << m->get_orig_source_inst() << dendl; if (m->get_epoch() < osdmap.get_epoch()) - send_incremental(m->get_from(), m->get_epoch()+1); + send_incremental(m->get_orig_source_inst(), m->get_epoch()+1); goto didit; } - dout(10) << "preprocess_failure new: " << m->get_failed() << ", from " << m->get_from() << dendl; + dout(10) << "preprocess_failure new: " << m->get_failed() << ", from " << m->get_orig_source_inst() << dendl; return false; didit: @@ -435,7 +435,7 @@ bool OSDMonitor::preprocess_failure(MOSDFailure *m) bool OSDMonitor::prepare_failure(MOSDFailure *m) { - dout(1) << "prepare_failure " << m->get_failed() << " from " << m->get_from() << dendl; + dout(1) << "prepare_failure " << m->get_failed() << " from " << m->get_orig_source_inst() << dendl; // FIXME // take their word for it @@ -455,8 +455,8 @@ bool OSDMonitor::prepare_failure(MOSDFailure *m) void OSDMonitor::_reported_failure(MOSDFailure *m) { - dout(7) << "_reported_failure on " << m->get_failed() << ", telling " << m->get_from() << dendl; - send_latest(m->get_from(), m->get_epoch()); + dout(7) << "_reported_failure on " << m->get_failed() << ", telling " << m->get_orig_source_inst() << dendl; + send_latest(m->get_orig_source_inst(), m->get_epoch()); delete m; } @@ -471,33 +471,27 @@ bool OSDMonitor::preprocess_boot(MOSDBoot *m) return true; } - // first time we've seen it? - if (m->inst.addr.ipaddr.sin_addr.s_addr == htonl(INADDR_ANY)) { - m->inst = m->get_source_inst(); - m->clear_payload(); - } - - assert(m->inst.name.is_osd()); - int from = m->inst.name.num(); + assert(m->get_orig_source_inst().name.is_osd()); + int from = m->get_orig_source_inst().name.num(); // already booted? if (osdmap.is_up(from) && - osdmap.get_inst(from) == m->inst) { + osdmap.get_inst(from) == m->get_orig_source_inst()) { // yup. - dout(7) << "preprocess_boot dup from " << m->inst << dendl; + dout(7) << "preprocess_boot dup from " << m->get_orig_source_inst() << dendl; _booted(m); return true; } - dout(10) << "preprocess_boot from " << m->inst << dendl; + dout(10) << "preprocess_boot from " << m->get_orig_source_inst() << dendl; return false; } bool OSDMonitor::prepare_boot(MOSDBoot *m) { - dout(7) << "prepare_boot from " << m->inst << dendl; - assert(m->inst.name.is_osd()); - int from = m->inst.name.num(); + dout(7) << "prepare_boot from " << m->get_orig_source_inst() << dendl; + assert(m->get_orig_source().is_osd()); + int from = m->get_orig_source().num(); // does this osd exist? if (!osdmap.exists(from)) { @@ -509,7 +503,7 @@ bool OSDMonitor::prepare_boot(MOSDBoot *m) // already up? mark down first? if (osdmap.is_up(from)) { dout(7) << "prepare_boot was up, first marking down " << osdmap.get_inst(from) << dendl; - assert(osdmap.get_inst(from) != m->inst); // preproces should have caught it + assert(osdmap.get_inst(from) != m->get_orig_source_inst()); // preproces should have caught it // mark previous guy down pending_inc.new_down[from] = false; @@ -518,7 +512,7 @@ bool OSDMonitor::prepare_boot(MOSDBoot *m) } else { // mark new guy up. down_pending_out.erase(from); // if any - pending_inc.new_up[from] = m->inst.addr; + pending_inc.new_up[from] = m->get_orig_source_addr(); // mark in? pending_inc.new_offload[from] = CEPH_OSD_IN; @@ -534,8 +528,9 @@ bool OSDMonitor::prepare_boot(MOSDBoot *m) void OSDMonitor::_booted(MOSDBoot *m) { - dout(7) << "_booted " << m->inst << " w " << m->sb.weight << " from " << m->sb.current_epoch << dendl; - send_latest(m->inst, m->sb.current_epoch+1); + dout(7) << "_booted " << m->get_orig_source_inst() + << " w " << m->sb.weight << " from " << m->sb.current_epoch << dendl; + send_latest(m->get_orig_source_inst(), m->sb.current_epoch+1); delete m; } @@ -545,25 +540,26 @@ void OSDMonitor::_booted(MOSDBoot *m) bool OSDMonitor::preprocess_alive(MOSDAlive *m) { - int from = m->get_source().num(); + int from = m->get_orig_source().num(); if (osdmap.is_up(from) && - osdmap.get_inst(from) == m->get_source_inst() && + osdmap.get_inst(from) == m->get_orig_source_inst() && osdmap.get_up_thru(from) >= m->map_epoch) { // yup. - dout(7) << "preprocess_alive e" << m->map_epoch << " dup from " << m->get_source_inst() << dendl; + dout(7) << "preprocess_alive e" << m->map_epoch << " dup from " << m->get_orig_source_inst() << dendl; _alive(m); return true; } - dout(10) << "preprocess_alive e" << m->map_epoch << " from " << m->get_source_inst() << dendl; + dout(10) << "preprocess_alive e" << m->map_epoch + << " from " << m->get_orig_source_inst() << dendl; return false; } bool OSDMonitor::prepare_alive(MOSDAlive *m) { - int from = m->get_source().num(); + int from = m->get_orig_source().num(); - dout(7) << "prepare_alive e" << m->map_epoch << " from " << m->get_source_inst() << dendl; + dout(7) << "prepare_alive e" << m->map_epoch << " from " << m->get_orig_source_inst() << dendl; pending_inc.new_up_thru[from] = m->map_epoch; paxos->wait_for_commit(new C_Alive(this,m )); return true; @@ -572,9 +568,9 @@ bool OSDMonitor::prepare_alive(MOSDAlive *m) void OSDMonitor::_alive(MOSDAlive *m) { dout(7) << "_alive e" << m->map_epoch - << " from " << m->get_source_inst() + << " from " << m->get_orig_source_inst() << dendl; - send_latest(m->get_source_inst(), m->map_epoch); + send_latest(m->get_orig_source_inst(), m->map_epoch); delete m; } diff --git a/src/mon/PGMonitor.cc b/src/mon/PGMonitor.cc index e0f30fbe58f0..656bbf30a02d 100644 --- a/src/mon/PGMonitor.cc +++ b/src/mon/PGMonitor.cc @@ -170,7 +170,7 @@ void PGMonitor::encode_pending(bufferlist &bl) bool PGMonitor::preprocess_query(Message *m) { - dout(10) << "preprocess_query " << *m << " from " << m->get_source_inst() << dendl; + dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl; switch (m->get_type()) { case CEPH_MSG_STATFS: handle_statfs((MStatfs*)m); @@ -180,10 +180,7 @@ bool PGMonitor::preprocess_query(Message *m) { MPGStats *stats = (MPGStats*)m; - if (!stats->get_source().is_mon()) - stats->orig_src = stats->get_source_inst(); - - int from = m->get_source().num(); + int from = m->get_orig_source().num(); if (pg_map.osd_stat.count(from) || memcmp(&pg_map.osd_stat[from], &stats->osd_stat, sizeof(stats->osd_stat)) != 0) return false; // new osd stat @@ -201,7 +198,7 @@ bool PGMonitor::preprocess_query(Message *m) p != stats->pg_stat.end(); p++) ack->pg_stat[p->first] = p->second.reported; - mon->messenger->send_message(ack, stats->orig_src); + mon->messenger->send_message(ack, stats->get_orig_source_inst()); return true; } @@ -217,7 +214,7 @@ bool PGMonitor::preprocess_query(Message *m) bool PGMonitor::prepare_update(Message *m) { - dout(10) << "prepare_update " << *m << " from " << m->get_source_inst() << dendl; + dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl; switch (m->get_type()) { case MSG_PGSTATS: return prepare_pg_stats((MPGStats*)m); @@ -240,7 +237,7 @@ void PGMonitor::committed() void PGMonitor::handle_statfs(MStatfs *statfs) { - dout(10) << "handle_statfs " << *statfs << " from " << statfs->get_source() << dendl; + dout(10) << "handle_statfs " << *statfs << " from " << statfs->get_orig_source() << dendl; // fill out stfs MStatfsReply *reply = new MStatfsReply(statfs->tid); @@ -253,17 +250,17 @@ void PGMonitor::handle_statfs(MStatfs *statfs) reply->stfs.f_objects = pg_map.total_osd_num_objects; // reply - mon->messenger->send_message(reply, statfs->get_source_inst()); + mon->messenger->send_message(reply, statfs->get_orig_source_inst()); delete statfs; } bool PGMonitor::prepare_pg_stats(MPGStats *stats) { - dout(10) << "prepare_pg_stats " << *stats << " from " << stats->get_source() << dendl; - int from = stats->get_source().num(); - if (!stats->get_source().is_osd() || + dout(10) << "prepare_pg_stats " << *stats << " from " << stats->get_orig_source() << dendl; + int from = stats->get_orig_source().num(); + if (!stats->get_orig_source().is_osd() || !mon->osdmon->osdmap.is_up(from) || - stats->get_source_inst() != mon->osdmon->osdmap.get_inst(from)) { + stats->get_orig_source_inst() != mon->osdmon->osdmap.get_inst(from)) { dout(1) << " ignoring stats from non-active osd" << dendl; } @@ -314,7 +311,7 @@ bool PGMonitor::prepare_pg_stats(MPGStats *stats) pg_map.stat_pg_add(pgid, pg_map.pg_stat[pgid]); } - paxos->wait_for_commit(new C_Stats(this, ack, stats->orig_src)); + paxos->wait_for_commit(new C_Stats(this, ack, stats->get_orig_source_inst())); delete stats; return true; } diff --git a/src/mon/PaxosService.cc b/src/mon/PaxosService.cc index 186b58d4f094..1a07a05d8790 100644 --- a/src/mon/PaxosService.cc +++ b/src/mon/PaxosService.cc @@ -51,7 +51,7 @@ void PaxosService::dispatch(Message *m) if (!mon->is_leader()) { // fw to leader dout(10) << " fw to leader mon" << mon->get_leader() << dendl; - mon->messenger->send_message(m, mon->monmap->get_inst(mon->get_leader())); + mon->messenger->forward_message(m, mon->monmap->get_inst(mon->get_leader())); return; } diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 01ed7442eb5e..5c753c195719 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -1020,7 +1020,7 @@ void OSD::send_boot() { int mon = monmap->pick_mon(); dout(10) << "send_boot to mon" << mon << dendl; - messenger->send_message(new MOSDBoot(messenger->get_myinst(), superblock), + messenger->send_message(new MOSDBoot(superblock), monmap->get_inst(mon)); } @@ -1063,8 +1063,7 @@ void OSD::send_failures() int mon = monmap->pick_mon(); while (!failure_queue.empty()) { int osd = *failure_queue.begin(); - messenger->send_message(new MOSDFailure(monmap->fsid, messenger->get_myinst(), - osdmap->get_inst(osd), osdmap->get_epoch()), + messenger->send_message(new MOSDFailure(monmap->fsid, osdmap->get_inst(osd), osdmap->get_epoch()), monmap->get_inst(mon)); failure_queue.erase(osd); } diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 2a94f9a03e06..be8ddc3ec0ff 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -1022,8 +1022,7 @@ void Objecter::ms_handle_failure(Message *m, entity_name_t dest, const entity_in dout(0) << "ms_handle_failure " << dest << " inst " << inst << ", dropping, reporting to mon" << mon << dendl; - messenger->send_message(new MOSDFailure(monmap->fsid, messenger->get_myinst(), inst, - osdmap->get_epoch()), + messenger->send_message(new MOSDFailure(monmap->fsid, inst, osdmap->get_epoch()), monmap->get_inst(mon)); } delete m; diff --git a/src/vstartnew.sh b/src/vstartnew.sh index 0a981c95027e..9378b688895e 100755 --- a/src/vstartnew.sh +++ b/src/vstartnew.sh @@ -34,7 +34,7 @@ done $CEPH_BIN/osdmaptool --clobber --createsimple .ceph_monmap 4 --print .ceph_osdmap # --pgbits 2 $CEPH_BIN/cmonctl osd setmap -i .ceph_osdmap -for osd in 0 #1 2 3 #4 5 6 7 8 9 10 11 12 13 14 15 +for osd in 0 1 2 3 #4 5 6 7 8 9 10 11 12 13 14 15 do $CEPH_BIN/cosd --mkfs_for_osd $osd dev/osd$osd # initialize empty object store #valgrind --leak-check=full --show-reachable=yes $CEPH_BIN/cosd dev/osd$osd --debug_ms 1 --debug_osd 20 --debug_filestore 10 1>out/o$osd & #--debug_osd 40