monc(mc),
logger(NULL), logger_started(false),
store(NULL),
- logclient(messenger, &mc->monmap, mc),
+ logclient(cluster_messenger, &mc->monmap, mc),
whoami(id),
dev_path(dev), journal_path(jdev),
dispatch_running(false),
scrub_wq(this, &disk_tp),
remove_wq(this, &disk_tp)
{
- monc->set_messenger(messenger);
+ monc->set_messenger(cluster_messenger);
osdmap = 0;
dout(0) << "got SIGTERM, shutting down" << dendl;
Message *m = new MGenericMessage(CEPH_MSG_SHUTDOWN);
m->set_priority(CEPH_MSG_PRIO_HIGHEST);
- messenger->send_message(m, messenger->get_myinst());
+ cluster_messenger->send_message(m, cluster_messenger->get_myinst());
return;
}
if (got_sigterm) {
dout(0) << "got SIGTERM, shutting down" << dendl;
- messenger->send_message(new MGenericMessage(CEPH_MSG_SHUTDOWN),
- messenger->get_myinst());
+ cluster_messenger->send_message(new MGenericMessage(CEPH_MSG_SHUTDOWN),
+ cluster_messenger->get_myinst());
return;
}
q++) {
if (osdmap->is_up(q->first)) {
MOSDPGRemove *m = new MOSDPGRemove(p->first, q->second);
- messenger->send_message(m, osdmap->get_inst(q->first));
+ cluster_messenger->send_message(m, osdmap->get_cluster_inst(q->first));
}
}
remove_list.clear();
entity_addr_t hb_addr = heartbeat_messenger->get_myaddr();
if (hb_addr.is_blank_addr()) {
int port = hb_addr.get_port();
- hb_addr = messenger->get_myaddr();
+ hb_addr = cluster_messenger->get_myaddr();
hb_addr.set_port(port);
}
monc->send_mon_message(new MOSDBoot(superblock, hb_addr));
void OSD::note_down_osd(int osd)
{
- messenger->mark_down(osdmap->get_addr(osd));
+ cluster_messenger->mark_down(osdmap->get_cluster_addr(osd));
heartbeat_lock.Lock();
// all the way?
if (advanced && cur == superblock.newest_map) {
if (osdmap->is_up(whoami) &&
- osdmap->get_addr(whoami) == messenger->get_myaddr()) {
+ osdmap->get_addr(whoami) == client_messenger->get_myaddr()) {
// yay!
activate_map(t, fin->contexts);
if (osdmap->get_epoch() > 0 &&
state != STATE_BOOTING &&
(!osdmap->exists(whoami) ||
- (!osdmap->is_up(whoami) && osdmap->get_addr(whoami) == messenger->get_myaddr()))) {
+ (!osdmap->is_up(whoami) && osdmap->get_addr(whoami) == client_messenger->get_myaddr()))) {
dout(0) << "map says i am down. switching to boot state." << dendl;
//shutdown();
if (!up_epoch &&
osdmap->is_up(whoami) &&
- osdmap->get_inst(whoami) == messenger->get_myinst()) {
+ osdmap->get_inst(whoami) == client_messenger->get_myinst()) {
up_epoch = osdmap->get_epoch();
dout(10) << "up_epoch is " << up_epoch << dendl;
if (!boot_epoch) {
assert(0); // we should have all maps.
}
}
-
+ Messenger *msgr = client_messenger;
+ if (entity_name_t::TYPE_OSD == inst.name._type) msgr = cluster_messenger;
if (lazy)
- messenger->lazy_send_message(m, inst); // only if we already have an open connection
+ msgr->lazy_send_message(m, inst); // only if we already have an open connection
else
- messenger->send_message(m, inst);
+ msgr->send_message(m, inst);
}
bool OSD::get_map_bl(epoch_t e, bufferlist& bl)
}
dout(7) << "do_notify osd" << it->first << " on " << it->second.size() << " PGs" << dendl;
MOSDPGNotify *m = new MOSDPGNotify(osdmap->get_epoch(), it->second);
- _share_map_outgoing(osdmap->get_inst(it->first));
- messenger->send_message(m, osdmap->get_inst(it->first));
+ _share_map_outgoing(osdmap->get_cluster_inst(it->first));
+ cluster_messenger->send_message(m, osdmap->get_cluster_inst(it->first));
}
}
dout(7) << "do_queries querying osd" << who
<< " on " << pit->second.size() << " PGs" << dendl;
MOSDPGQuery *m = new MOSDPGQuery(osdmap->get_epoch(), pit->second);
- _share_map_outgoing(osdmap->get_inst(who));
- messenger->send_message(m, osdmap->get_inst(who));
+ _share_map_outgoing(osdmap->get_cluster_inst(who));
+ cluster_messenger->send_message(m, osdmap->get_cluster_inst(who));
}
}
for (map<int,MOSDPGInfo*>::iterator p = info_map.begin();
p != info_map.end();
++p)
- messenger->send_message(p->second, osdmap->get_inst(p->first));
+ cluster_messenger->send_message(p->second, osdmap->get_inst(p->first));
info_map.clear();
}
dout(10) << *pg << " sending " << mlog->log << " " << mlog->missing << dendl;
//m->log.print(cout);
- _share_map_outgoing(osdmap->get_inst(from));
- messenger->send_message(mlog, m->get_connection());
+ _share_map_outgoing(osdmap->get_cluster_inst(from));
+ cluster_messenger->send_message(mlog, m->get_connection());
}
}
MOSDPGLog *m = new MOSDPGLog(osdmap->get_epoch(), pg->info);
m->missing = pg->missing;
m->log = pg->log;
- _share_map_outgoing(osdmap->get_inst(pg->get_primary()));
- messenger->send_message(m, osdmap->get_inst(pg->get_primary()));
+ _share_map_outgoing(osdmap->get_cluster_inst(pg->get_primary()));
+ cluster_messenger->send_message(m, osdmap->get_cluster_inst(pg->get_primary()));
} else {
dout(10) << *pg << " generated backlog, peering" << dendl;
flags = CEPH_OSD_FLAG_ACK;
MOSDOpReply *reply = new MOSDOpReply(op, err, osdmap->get_epoch(), flags);
- messenger->send_message(reply, op->get_connection());
+ Messenger *msgr = client_messenger;
+ if (op->get_source().is_osd())
+ msgr = cluster_messenger;
+ msgr->send_message(reply, op->get_connection());
op->put();
}
// do this preemptively while we hold osd_lock and pg->lock
// to avoid lock ordering issues later.
for (unsigned i=1; i<pg->acting.size(); i++)
- _share_map_outgoing( osdmap->get_inst(pg->acting[i]) );
+ _share_map_outgoing( osdmap->get_cluster_inst(pg->acting[i]) );
}
osd_lock.Unlock();