v0.3
+- fix mon osdmap send_full/latest ... never use get_source_inst()!
v0.4
- fix msgr protocol wrt resets, and varying connection close policies
messenger->set_dispatcher(&dispatcher);
// build command
- MMonCommand *m = new MMonCommand(monmap.fsid, messenger->get_myinst());
+ MMonCommand *m = new MMonCommand(monmap.fsid);
m->set_data(indata);
m->cmd.swap(vcmd);
int mon = monmap.pick_mon();
bufferlist bl;
map.encode(bl);
Messenger *messenger = new FakeMessenger(entity_name_t::ADMIN(-1));
- MMonCommand *m = new MMonCommand(monmap->fsid, messenger->get_myinst());
+ MMonCommand *m = new MMonCommand(monmap->fsid);
m->set_data(bl);
m->cmd.push_back("osd");
m->cmd.push_back("setmap");
bufferlist bl;
map.encode(bl);
Messenger *messenger = new FakeMessenger(entity_name_t::ADMIN(-1));
- MMonCommand *m = new MMonCommand(monmap->fsid, messenger->get_myinst());
+ MMonCommand *m = new MMonCommand(monmap->fsid);
m->set_data(bl);
m->cmd.push_back("osd");
m->cmd.push_back("setmap");
__le32 front_len;
__le32 data_off; /* sender: include full offset; receiver: mask against ~PAGE_MASK */
__le32 data_len; /* bytes of data payload */
- struct ceph_entity_inst src, dst;
+ struct ceph_entity_inst src, orig_src, dst;
} __attribute__ ((packed));
struct ceph_msg_footer {
beacon_seq_stamp[beacon_last_seq] = g_clock.now();
int mon = monmap->pick_mon();
- messenger->send_message(new MMDSBeacon(monmap->fsid, messenger->get_myinst(), mdsmap->get_epoch(),
+ messenger->send_message(new MMDSBeacon(monmap->fsid, mdsmap->get_epoch(),
want_state, beacon_last_seq, want_rank),
monmap->get_inst(mon));
class MMDSBeacon : public Message {
ceph_fsid fsid;
- entity_inst_t inst;
epoch_t last_epoch_seen; // include last mdsmap epoch mds has seen to avoid race with monitor decree
__u32 state;
version_t seq;
public:
MMDSBeacon() : Message(MSG_MDS_BEACON) {}
- MMDSBeacon(ceph_fsid &f, entity_inst_t i, epoch_t les, int st, version_t se, int wr) :
+ MMDSBeacon(ceph_fsid &f, epoch_t les, int st, version_t se, int wr) :
Message(MSG_MDS_BEACON),
- fsid(f), inst(i), last_epoch_seen(les), state(st), seq(se), want_rank(wr) { }
+ fsid(f), last_epoch_seen(les), state(st), seq(se), want_rank(wr) { }
ceph_fsid& get_fsid() { return fsid; }
- entity_inst_t& get_mds_inst() { return inst; }
epoch_t get_last_epoch_seen() { return last_epoch_seen; }
int get_state() { return state; }
version_t get_seq() { return seq; }
int get_want_rank() { return want_rank; }
void print(ostream& out) {
- out << "mdsbeacon(" << inst
- << " " << MDSMap::get_state_name(state)
+ out << "mdsbeacon(" << MDSMap::get_state_name(state)
<< " seq " << seq << ")";
}
void encode_payload() {
::encode(fsid, payload);
- ::encode(inst, payload);
::encode(last_epoch_seen, payload);
::encode(state, payload);
::encode(seq, payload);
void decode_payload() {
bufferlist::iterator p = payload.begin();
::decode(fsid, p);
- ::decode(inst, p);
::decode(last_epoch_seen, p);
::decode(state, p);
::decode(seq, p);
class MMonCommand : public Message {
public:
ceph_fsid fsid;
- entity_inst_t inst;
vector<string> cmd;
MMonCommand() : Message(MSG_MON_COMMAND) {}
- MMonCommand(ceph_fsid &f, entity_inst_t i) :
+ MMonCommand(ceph_fsid &f) :
Message(MSG_MON_COMMAND),
- fsid(f), inst(i) { }
+ fsid(f) { }
const char *get_type_name() { return "mon_command"; }
void print(ostream& o) {
void encode_payload() {
::encode(fsid, payload);
- ::encode(inst, payload);
::encode(cmd, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
::decode(fsid, p);
- ::decode(inst, p);
::decode(cmd, p);
}
};
class MOSDBoot : public Message {
public:
- entity_inst_t inst;
OSDSuperblock sb;
MOSDBoot() {}
- MOSDBoot(entity_inst_t i, OSDSuperblock& s) :
+ MOSDBoot(OSDSuperblock& s) :
Message(MSG_OSD_BOOT),
- inst(i),
sb(s) {
}
const char *get_type_name() { return "osd_boot"; }
void print(ostream& out) {
- out << "osd_boot(" << inst << ")";
+ out << "osd_boot(osd" << sb.whoami << ")";
}
void encode_payload() {
- ::encode(inst, payload);
::encode(sb, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
- ::decode(inst, p);
::decode(sb, p);
}
};
class MOSDFailure : public Message {
public:
ceph_fsid fsid;
- entity_inst_t from;
entity_inst_t failed;
epoch_t epoch;
MOSDFailure() : Message(MSG_OSD_FAILURE) {}
- MOSDFailure(ceph_fsid &fs, entity_inst_t fr, entity_inst_t f, epoch_t e) :
+ MOSDFailure(ceph_fsid &fs, entity_inst_t f, epoch_t e) :
Message(MSG_OSD_FAILURE),
- fsid(fs), from(fr), failed(f), epoch(e) {}
+ fsid(fs), failed(f), epoch(e) {}
- entity_inst_t get_from() { return from; }
entity_inst_t get_failed() { return failed; }
epoch_t get_epoch() { return epoch; }
void decode_payload() {
bufferlist::iterator p = payload.begin();
::decode(fsid, p);
- ::decode(from, p);
::decode(failed, p);
::decode(epoch, p);
}
void encode_payload() {
::encode(fsid, payload);
- ::encode(from, payload);
::encode(failed, payload);
::encode(epoch, payload);
}
public:
map<pg_t,pg_stat_t> pg_stat;
osd_stat_t osd_stat;
- entity_inst_t orig_src;
MPGStats() : Message(MSG_PGSTATS) {}
void encode_payload() {
::encode(osd_stat, payload);
::encode(pg_stat, payload);
- ::encode(orig_src, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
::decode(osd_stat, p);
::decode(pg_stat, p);
- ::decode(orig_src, p);
}
};
bool MDSMonitor::preprocess_query(Message *m)
{
- dout(10) << "preprocess_query " << *m << " from " << m->get_source_inst() << dendl;
+ dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
switch (m->get_type()) {
void MDSMonitor::handle_mds_getmap(MMDSGetMap *m)
{
if (m->want <= mdsmap.get_epoch())
- send_full(m->get_source_inst());
+ send_full(m->get_orig_source_inst());
else
- waiting_for_map.push_back(m->get_source_inst());
+ waiting_for_map.push_back(m->get_orig_source_inst());
}
bool MDSMonitor::preprocess_beacon(MMDSBeacon *m)
{
- int from = m->get_mds_inst().name.num();
- entity_addr_t addr = m->get_mds_inst().addr;
+ int from = m->get_orig_source_inst().name.num();
+ entity_addr_t addr = m->get_orig_source_inst().addr;
int state = m->get_state();
version_t seq = m->get_seq();
goto out;
}
- // first time we've seen it?
- if (m->get_mds_inst().addr.ipaddr.sin_addr.s_addr == htonl(INADDR_ANY)) {
- m->get_mds_inst() = m->get_source_inst();
- m->clear_payload();
- }
-
dout(12) << "preprocess_beacon " << *m
- << " from " << m->get_mds_inst()
+ << " from " << m->get_orig_source_inst()
<< dendl;
// fw to leader?
- if (!mon->is_leader()) {
- dout(10) << "fw to leader" << dendl;
- mon->messenger->send_message(m, mon->monmap->get_inst(mon->get_leader()));
- return true;
- }
+ if (!mon->is_leader())
+ return false;
// can i handle this query without a map update?
// note time and reply
dout(15) << "mds_beacon " << *m << " noting time and replying" << dendl;
last_beacon[addr] = g_clock.now();
- mon->messenger->send_message(new MMDSBeacon(mon->monmap->fsid, m->get_mds_inst(),
+ mon->messenger->send_message(new MMDSBeacon(mon->monmap->fsid,
mdsmap.get_epoch(), state, seq, 0),
- m->get_mds_inst());
+ m->get_orig_source_inst());
// done
out:
{
// -- this is an update --
dout(12) << "handle_beacon " << *m
- << " from " << m->get_mds_inst()
+ << " from " << m->get_orig_source_inst()
<< dendl;
- int from = m->get_mds_inst().name.num();
- entity_addr_t addr = m->get_mds_inst().addr;
+ int from = m->get_orig_source_inst().name.num();
+ entity_addr_t addr = m->get_orig_source_inst().addr;
int state = m->get_state();
version_t seq = m->get_seq();
{
if (from < 0) {
dout(10) << "_updated (booted) mds" << from << " " << *m << dendl;
- mon->osdmon->send_latest(m->get_source_inst());
+ mon->osdmon->send_latest(m->get_orig_source_inst());
} else {
dout(10) << "_updated mds" << from << " " << *m << dendl;
}
if (m->get_state() == MDSMap::STATE_STOPPED) {
// send the map manually (they're out of the map, so they won't get it automatic)
- send_latest(m->get_mds_inst());
+ send_latest(m->get_orig_source_inst());
}
delete m;
return;
}
- // first time we've seen it?
- if (m->inst.addr.ipaddr.sin_addr.s_addr == htonl(INADDR_ANY)) {
- m->inst = m->get_source_inst();
- m->clear_payload();
- }
-
dout(0) << "handle_command " << *m << dendl;
string rs;
if (!m->cmd.empty()) {
{
MMonCommandAck *reply = new MMonCommandAck(rc, rs);
reply->set_data(rdata);
- messenger->send_message(reply, m->inst);
+ messenger->send_message(reply, m->get_orig_source_inst());
delete m;
}
bool OSDMonitor::preprocess_query(Message *m)
{
- dout(10) << "preprocess_query " << *m << " from " << m->get_source_inst() << dendl;
+ dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
switch (m->get_type()) {
// READs
bool OSDMonitor::prepare_update(Message *m)
{
- dout(7) << "prepare_update " << *m << " from " << m->get_source_inst() << dendl;
+ dout(7) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
switch (m->get_type()) {
// damp updates
void OSDMonitor::handle_osd_getmap(MOSDGetMap *m)
{
- dout(7) << "handle_osd_getmap from " << m->get_source()
+ dout(7) << "handle_osd_getmap from " << m->get_orig_source()
<< " start " << m->get_start_epoch()
<< dendl;
if (m->get_start_epoch()) {
if (m->get_start_epoch() <= osdmap.get_epoch())
- send_incremental(m->get_source_inst(), m->get_start_epoch());
+ send_incremental(m->get_orig_source_inst(), m->get_start_epoch());
else
- waiting_for_map[m->get_source_inst()] = m->get_start_epoch();
+ waiting_for_map[m->get_orig_source_inst()] = m->get_start_epoch();
} else
- send_full(m->get_source_inst());
+ send_full(m->get_orig_source_inst());
out:
delete m;
*/
// first, verify the reporting host is valid
- if (m->get_source().is_osd()) {
- int from = m->get_source().num();
+ if (m->get_orig_source().is_osd()) {
+ int from = m->get_orig_source().num();
if (!osdmap.exists(from) ||
- osdmap.get_addr(from) != m->get_source_inst().addr ||
+ osdmap.get_addr(from) != m->get_orig_source_inst().addr ||
osdmap.is_down(from)) {
dout(5) << "preprocess_failure from dead osd" << from << ", ignoring" << dendl;
- send_incremental(m->get_from(), m->get_epoch()+1);
+ send_incremental(m->get_orig_source_inst(), m->get_epoch()+1);
goto didit;
}
}
// weird?
if (!osdmap.have_inst(badboy)) {
- dout(5) << "preprocess_failure dne(/dup?): " << m->get_failed() << ", from " << m->get_from() << dendl;
+ dout(5) << "preprocess_failure dne(/dup?): " << m->get_failed() << ", from " << m->get_orig_source_inst() << dendl;
if (m->get_epoch() < osdmap.get_epoch())
- send_incremental(m->get_from(), m->get_epoch()+1);
+ send_incremental(m->get_orig_source_inst(), m->get_epoch()+1);
goto didit;
}
if (osdmap.get_inst(badboy) != m->get_failed()) {
dout(5) << "preprocess_failure wrong osd: report " << m->get_failed() << " != map's " << osdmap.get_inst(badboy)
- << ", from " << m->get_from() << dendl;
+ << ", from " << m->get_orig_source_inst() << dendl;
if (m->get_epoch() < osdmap.get_epoch())
- send_incremental(m->get_from(), m->get_epoch()+1);
+ send_incremental(m->get_orig_source_inst(), m->get_epoch()+1);
goto didit;
}
// already reported?
if (osdmap.is_down(badboy)) {
- dout(5) << "preprocess_failure dup: " << m->get_failed() << ", from " << m->get_from() << dendl;
+ dout(5) << "preprocess_failure dup: " << m->get_failed() << ", from " << m->get_orig_source_inst() << dendl;
if (m->get_epoch() < osdmap.get_epoch())
- send_incremental(m->get_from(), m->get_epoch()+1);
+ send_incremental(m->get_orig_source_inst(), m->get_epoch()+1);
goto didit;
}
- dout(10) << "preprocess_failure new: " << m->get_failed() << ", from " << m->get_from() << dendl;
+ dout(10) << "preprocess_failure new: " << m->get_failed() << ", from " << m->get_orig_source_inst() << dendl;
return false;
didit:
bool OSDMonitor::prepare_failure(MOSDFailure *m)
{
- dout(1) << "prepare_failure " << m->get_failed() << " from " << m->get_from() << dendl;
+ dout(1) << "prepare_failure " << m->get_failed() << " from " << m->get_orig_source_inst() << dendl;
// FIXME
// take their word for it
void OSDMonitor::_reported_failure(MOSDFailure *m)
{
- dout(7) << "_reported_failure on " << m->get_failed() << ", telling " << m->get_from() << dendl;
- send_latest(m->get_from(), m->get_epoch());
+ dout(7) << "_reported_failure on " << m->get_failed() << ", telling " << m->get_orig_source_inst() << dendl;
+ send_latest(m->get_orig_source_inst(), m->get_epoch());
delete m;
}
return true;
}
- // first time we've seen it?
- if (m->inst.addr.ipaddr.sin_addr.s_addr == htonl(INADDR_ANY)) {
- m->inst = m->get_source_inst();
- m->clear_payload();
- }
-
- assert(m->inst.name.is_osd());
- int from = m->inst.name.num();
+ assert(m->get_orig_source_inst().name.is_osd());
+ int from = m->get_orig_source_inst().name.num();
// already booted?
if (osdmap.is_up(from) &&
- osdmap.get_inst(from) == m->inst) {
+ osdmap.get_inst(from) == m->get_orig_source_inst()) {
// yup.
- dout(7) << "preprocess_boot dup from " << m->inst << dendl;
+ dout(7) << "preprocess_boot dup from " << m->get_orig_source_inst() << dendl;
_booted(m);
return true;
}
- dout(10) << "preprocess_boot from " << m->inst << dendl;
+ dout(10) << "preprocess_boot from " << m->get_orig_source_inst() << dendl;
return false;
}
bool OSDMonitor::prepare_boot(MOSDBoot *m)
{
- dout(7) << "prepare_boot from " << m->inst << dendl;
- assert(m->inst.name.is_osd());
- int from = m->inst.name.num();
+ dout(7) << "prepare_boot from " << m->get_orig_source_inst() << dendl;
+ assert(m->get_orig_source().is_osd());
+ int from = m->get_orig_source().num();
// does this osd exist?
if (!osdmap.exists(from)) {
// already up? mark down first?
if (osdmap.is_up(from)) {
dout(7) << "prepare_boot was up, first marking down " << osdmap.get_inst(from) << dendl;
- assert(osdmap.get_inst(from) != m->inst); // preproces should have caught it
+ assert(osdmap.get_inst(from) != m->get_orig_source_inst()); // preprocess_boot should have caught it
// mark previous guy down
pending_inc.new_down[from] = false;
} else {
// mark new guy up.
down_pending_out.erase(from); // if any
- pending_inc.new_up[from] = m->inst.addr;
+ pending_inc.new_up[from] = m->get_orig_source_addr();
// mark in?
pending_inc.new_offload[from] = CEPH_OSD_IN;
void OSDMonitor::_booted(MOSDBoot *m)
{
- dout(7) << "_booted " << m->inst << " w " << m->sb.weight << " from " << m->sb.current_epoch << dendl;
- send_latest(m->inst, m->sb.current_epoch+1);
+ dout(7) << "_booted " << m->get_orig_source_inst()
+ << " w " << m->sb.weight << " from " << m->sb.current_epoch << dendl;
+ send_latest(m->get_orig_source_inst(), m->sb.current_epoch+1);
delete m;
}
bool OSDMonitor::preprocess_alive(MOSDAlive *m)
{
- int from = m->get_source().num();
+ int from = m->get_orig_source().num();
if (osdmap.is_up(from) &&
- osdmap.get_inst(from) == m->get_source_inst() &&
+ osdmap.get_inst(from) == m->get_orig_source_inst() &&
osdmap.get_up_thru(from) >= m->map_epoch) {
// yup.
- dout(7) << "preprocess_alive e" << m->map_epoch << " dup from " << m->get_source_inst() << dendl;
+ dout(7) << "preprocess_alive e" << m->map_epoch << " dup from " << m->get_orig_source_inst() << dendl;
_alive(m);
return true;
}
- dout(10) << "preprocess_alive e" << m->map_epoch << " from " << m->get_source_inst() << dendl;
+ dout(10) << "preprocess_alive e" << m->map_epoch
+ << " from " << m->get_orig_source_inst() << dendl;
return false;
}
bool OSDMonitor::prepare_alive(MOSDAlive *m)
{
- int from = m->get_source().num();
+ int from = m->get_orig_source().num();
- dout(7) << "prepare_alive e" << m->map_epoch << " from " << m->get_source_inst() << dendl;
+ dout(7) << "prepare_alive e" << m->map_epoch << " from " << m->get_orig_source_inst() << dendl;
pending_inc.new_up_thru[from] = m->map_epoch;
paxos->wait_for_commit(new C_Alive(this,m ));
return true;
void OSDMonitor::_alive(MOSDAlive *m)
{
dout(7) << "_alive e" << m->map_epoch
- << " from " << m->get_source_inst()
+ << " from " << m->get_orig_source_inst()
<< dendl;
- send_latest(m->get_source_inst(), m->map_epoch);
+ send_latest(m->get_orig_source_inst(), m->map_epoch);
delete m;
}
bool PGMonitor::preprocess_query(Message *m)
{
- dout(10) << "preprocess_query " << *m << " from " << m->get_source_inst() << dendl;
+ dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
switch (m->get_type()) {
case CEPH_MSG_STATFS:
handle_statfs((MStatfs*)m);
{
MPGStats *stats = (MPGStats*)m;
- if (!stats->get_source().is_mon())
- stats->orig_src = stats->get_source_inst();
-
- int from = m->get_source().num();
+ int from = m->get_orig_source().num();
if (pg_map.osd_stat.count(from) ||
memcmp(&pg_map.osd_stat[from], &stats->osd_stat, sizeof(stats->osd_stat)) != 0)
return false; // new osd stat
p != stats->pg_stat.end();
p++)
ack->pg_stat[p->first] = p->second.reported;
- mon->messenger->send_message(ack, stats->orig_src);
+ mon->messenger->send_message(ack, stats->get_orig_source_inst());
return true;
}
bool PGMonitor::prepare_update(Message *m)
{
- dout(10) << "prepare_update " << *m << " from " << m->get_source_inst() << dendl;
+ dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
switch (m->get_type()) {
case MSG_PGSTATS:
return prepare_pg_stats((MPGStats*)m);
void PGMonitor::handle_statfs(MStatfs *statfs)
{
- dout(10) << "handle_statfs " << *statfs << " from " << statfs->get_source() << dendl;
+ dout(10) << "handle_statfs " << *statfs << " from " << statfs->get_orig_source() << dendl;
// fill out stfs
MStatfsReply *reply = new MStatfsReply(statfs->tid);
reply->stfs.f_objects = pg_map.total_osd_num_objects;
// reply
- mon->messenger->send_message(reply, statfs->get_source_inst());
+ mon->messenger->send_message(reply, statfs->get_orig_source_inst());
delete statfs;
}
bool PGMonitor::prepare_pg_stats(MPGStats *stats)
{
- dout(10) << "prepare_pg_stats " << *stats << " from " << stats->get_source() << dendl;
- int from = stats->get_source().num();
- if (!stats->get_source().is_osd() ||
+ dout(10) << "prepare_pg_stats " << *stats << " from " << stats->get_orig_source() << dendl;
+ int from = stats->get_orig_source().num();
+ if (!stats->get_orig_source().is_osd() ||
!mon->osdmon->osdmap.is_up(from) ||
- stats->get_source_inst() != mon->osdmon->osdmap.get_inst(from)) {
+ stats->get_orig_source_inst() != mon->osdmon->osdmap.get_inst(from)) {
dout(1) << " ignoring stats from non-active osd" << dendl;
}
pg_map.stat_pg_add(pgid, pg_map.pg_stat[pgid]);
}
- paxos->wait_for_commit(new C_Stats(this, ack, stats->orig_src));
+ paxos->wait_for_commit(new C_Stats(this, ack, stats->get_orig_source_inst()));
delete stats;
return true;
}
if (!mon->is_leader()) {
// fw to leader
dout(10) << " fw to leader mon" << mon->get_leader() << dendl;
- mon->messenger->send_message(m, mon->monmap->get_inst(mon->get_leader()));
+ mon->messenger->forward_message(m, mon->monmap->get_inst(mon->get_leader()));
return;
}
{
int mon = monmap->pick_mon();
dout(10) << "send_boot to mon" << mon << dendl;
- messenger->send_message(new MOSDBoot(messenger->get_myinst(), superblock),
+ messenger->send_message(new MOSDBoot(superblock),
monmap->get_inst(mon));
}
int mon = monmap->pick_mon();
while (!failure_queue.empty()) {
int osd = *failure_queue.begin();
- messenger->send_message(new MOSDFailure(monmap->fsid, messenger->get_myinst(),
- osdmap->get_inst(osd), osdmap->get_epoch()),
+ messenger->send_message(new MOSDFailure(monmap->fsid, osdmap->get_inst(osd), osdmap->get_epoch()),
monmap->get_inst(mon));
failure_queue.erase(osd);
}
dout(0) << "ms_handle_failure " << dest << " inst " << inst
<< ", dropping, reporting to mon" << mon
<< dendl;
- messenger->send_message(new MOSDFailure(monmap->fsid, messenger->get_myinst(), inst,
- osdmap->get_epoch()),
+ messenger->send_message(new MOSDFailure(monmap->fsid, inst, osdmap->get_epoch()),
monmap->get_inst(mon));
}
delete m;
$CEPH_BIN/osdmaptool --clobber --createsimple .ceph_monmap 4 --print .ceph_osdmap # --pgbits 2
$CEPH_BIN/cmonctl osd setmap -i .ceph_osdmap
-for osd in 0 #1 2 3 #4 5 6 7 8 9 10 11 12 13 14 15
+for osd in 0 1 2 3 #4 5 6 7 8 9 10 11 12 13 14 15
do
$CEPH_BIN/cosd --mkfs_for_osd $osd dev/osd$osd # initialize empty object store
#valgrind --leak-check=full --show-reachable=yes $CEPH_BIN/cosd dev/osd$osd --debug_ms 1 --debug_osd 20 --debug_filestore 10 1>out/o$osd & #--debug_osd 40