// cons/des
OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
- Messenger *hbm, MonClient *mc,
+ Messenger *hbinm, Messenger *hboutm, MonClient *mc,
const std::string &dev, const std::string &jdev) :
- Dispatcher(hbm->cct),
+ Dispatcher(external_messenger->cct),
osd_lock("OSD::osd_lock"),
- timer(hbm->cct, osd_lock),
+ timer(external_messenger->cct, osd_lock),
cluster_messenger(internal_messenger),
client_messenger(external_messenger),
monc(mc),
logger(NULL),
store(NULL),
map_in_progress(false),
- clog(hbm->cct, client_messenger, &mc->monmap, mc, LogClient::NO_FLAGS),
+ clog(external_messenger->cct, client_messenger, &mc->monmap, mc, LogClient::NO_FLAGS),
whoami(id),
dev_path(dev), journal_path(jdev),
dispatch_running(false),
ceph_osd_feature_ro_compat,
ceph_osd_feature_incompat),
state(STATE_BOOTING), boot_epoch(0), up_epoch(0),
- op_tp(hbm->cct, "OSD::op_tp", g_conf->osd_op_threads),
- recovery_tp(hbm->cct, "OSD::recovery_tp", g_conf->osd_recovery_threads),
- disk_tp(hbm->cct, "OSD::disk_tp", g_conf->osd_disk_threads),
+ op_tp(external_messenger->cct, "OSD::op_tp", g_conf->osd_op_threads),
+ recovery_tp(external_messenger->cct, "OSD::recovery_tp", g_conf->osd_recovery_threads),
+ disk_tp(external_messenger->cct, "OSD::disk_tp", g_conf->osd_disk_threads),
heartbeat_lock("OSD::heartbeat_lock"),
heartbeat_stop(false), heartbeat_epoch(0),
- heartbeat_messenger(hbm),
+ hbin_messenger(hbinm),
+ hbout_messenger(hboutm),
heartbeat_thread(this),
heartbeat_dispatcher(this),
stat_lock("OSD::stat_lock"),
rep_scrub_wq(this, g_conf->osd_scrub_thread_timeout, &disk_tp),
remove_wq(this, g_conf->osd_remove_thread_timeout, &disk_tp),
watch_lock("OSD::watch_lock"),
- watch_timer(hbm->cct, watch_lock)
+ watch_timer(external_messenger->cct, watch_lock)
{
monc->set_messenger(client_messenger);
client_messenger->add_dispatcher_head(&clog);
cluster_messenger->add_dispatcher_head(this);
- heartbeat_messenger->add_dispatcher_head(&heartbeat_dispatcher);
+ hbin_messenger->add_dispatcher_head(&heartbeat_dispatcher);
+ hbout_messenger->add_dispatcher_head(&heartbeat_dispatcher);
monc->set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD);
monc->init();
client_messenger->shutdown();
cluster_messenger->shutdown();
- if (heartbeat_messenger)
- heartbeat_messenger->shutdown();
+ hbin_messenger->shutdown();
+ hbout_messenger->shutdown();
monc->shutdown();
assert(osd_lock.is_locked());
heartbeat_lock.Lock();
- /*
- for (map<int,epoch_t>::iterator p = heartbeat_to.begin(); p != heartbeat_to.end(); p++)
- if (heartbeat_inst.count(p->first) == 0)
- dout(0) << " no inst for _to " << p->first << dendl;
- for (map<int,epoch_t>::iterator p = heartbeat_from.begin(); p != heartbeat_from.end(); p++)
- if (heartbeat_inst.count(p->first) == 0)
- dout(0) << " no inst for _from " << p->first << dendl;
- */
-
// filter heartbeat_from_stamp to only include osds that remain in
// heartbeat_from.
map<int, utime_t> old_from_stamp;
old_from_stamp.swap(heartbeat_from_stamp);
- map<int, epoch_t> old_to, old_from;
+ map<int, epoch_t> old_from;
map<int, Connection*> old_con;
- old_to.swap(heartbeat_to);
old_from.swap(heartbeat_from);
- old_con.swap(heartbeat_con);
+ old_con.swap(heartbeat_from_con);
utime_t now = ceph_clock_now(g_ceph_context);
heartbeat_epoch = osdmap->get_epoch();
- // grandfather newer _to peers
- for (map<int,epoch_t>::iterator p = old_to.begin();
- p != old_to.end();
- p++) {
- if (p->second > osdmap->get_epoch()) {
- dout(10) << "update_heartbeat_peers: keeping newer _to peer osd" << p->first
- << " " << old_con[p->first]->get_peer_addr()
- << " as of " << p->second << dendl;
- heartbeat_to[p->first] = p->second;
- heartbeat_con[p->first] = old_con[p->first];
- }
- }
-
- // build heartbeat to/from set
+ // build heartbeat from set
for (hash_map<pg_t, PG*>::iterator i = pg_map.begin();
i != pg_map.end();
i++) {
PG *pg = i->second;
// replicas ping primary.
- if (pg->get_role() > 0) {
- assert(pg->acting.size() > 1);
- int p = pg->acting[0];
- if (heartbeat_to.count(p))
- continue;
- heartbeat_to[p] = osdmap->get_epoch();
- heartbeat_con[p] = heartbeat_messenger->get_connection(osdmap->get_hb_inst(p));
- if (old_to.count(p) == 0 || old_con[p] != heartbeat_con[p])
- dout(10) << "update_heartbeat_peers: new _to osd" << p
- << " " << heartbeat_con[p]->get_peer_addr() << dendl;
- }
- else if (pg->get_role() == 0) {
+ if (pg->get_role() == 0) {
assert(pg->acting[0] == whoami);
for (unsigned i=1; i<pg->acting.size(); i++) {
int p = pg->acting[i]; // peer
if (heartbeat_from.count(p))
continue;
heartbeat_from[p] = osdmap->get_epoch();
- if (!heartbeat_con.count(p)) {
- // Don't update _con, might be from a newer map
- heartbeat_con[p] = heartbeat_messenger->get_connection(osdmap->get_hb_inst(p));
- }
- if (old_from_stamp.count(p) && old_from.count(p) &&
- old_con[p] == heartbeat_con[p]) {
+ Connection *con = hbin_messenger->get_connection(osdmap->get_hb_inst(p));
+ heartbeat_from_con[p] = con;
+ if (old_from_stamp.count(p) && old_from.count(p) && old_con[p] == con) {
// have a stamp _AND_ i'm not new to the set
heartbeat_from_stamp[p] = old_from_stamp[p];
} else {
dout(10) << "update_heartbeat_peers: new _from osd" << p
- << " " << heartbeat_con[p]->get_peer_addr() << dendl;
+ << " " << con->get_peer_addr() << dendl;
heartbeat_from_stamp[p] = now;
MOSDPing *m = new MOSDPing(osdmap->get_fsid(), 0, heartbeat_epoch,
- MOSDPing::REQUEST_HEARTBEAT);
- heartbeat_messenger->send_message(m, heartbeat_con[p]);
+ MOSDPing::START_HEARTBEAT);
+ hbin_messenger->send_message(m, con);
}
}
}
}
- map<int, Connection*> down;
-
- for (map<int,epoch_t>::iterator p = old_to.begin();
- p != old_to.end();
- p++) {
- assert(old_con.count(p->first));
- if (heartbeat_to.count(p->first) && heartbeat_con[p->first] == old_con[p->first])
- continue;
- assert(p->second <= osdmap->get_epoch());
-
- // share latest map with this peer so they know not to expect
- // heartbeats from us. otherwise they may mark us down!
- if (osdmap->is_up(p->first) && !is_booting()) {
- dout(10) << "update_heartbeat_peers: sharing map with old _to peer osd" << p->first << dendl;
- _share_map_outgoing(osdmap->get_cluster_inst(p->first));
- }
-
- if (heartbeat_from.count(p->first) && old_con[p->first] == heartbeat_con[p->first]) {
- dout(10) << "update_heartbeat_peers: old _to peer osd" << p->first
- << " " << old_con[p->first]->get_peer_addr()
- << " is still a _from peer, not marking down" << dendl;
- } else {
- dout(10) << "update_heartbeat_peers: will mark down old _to peer osd" << p->first
- << " " << old_con[p->first]->get_peer_addr()
- << " as of " << p->second << dendl;
- down[p->first] = old_con[p->first];
- }
- }
for (map<int,epoch_t>::iterator p = old_from.begin();
p != old_from.end();
p++) {
assert(old_con.count(p->first));
- if (heartbeat_from.count(p->first) && heartbeat_con[p->first] == old_con[p->first])
+ Connection *con = old_con[p->first];
+
+ if (heartbeat_from.count(p->first) && heartbeat_from_con[p->first] == con) {
+ con->put();
continue;
+ }
// share latest map with this peer, just to be nice.
if (osdmap->is_up(p->first) && !is_booting()) {
_share_map_outgoing(osdmap->get_cluster_inst(p->first));
}
- if (heartbeat_to.count(p->first) && old_con[p->first] == heartbeat_con[p->first]) {
- dout(10) << "update_heartbeat_peers: old _from peer osd" << p->first
- << " " << old_con[p->first]->get_peer_addr()
- << " is still a _to peer, not marking down" << dendl;
- } else {
- dout(10) << "update_heartbeat_peers: will mark down old _from peer osd" << p->first
- << " " << old_con[p->first]->get_peer_addr()
- << " as of " << p->second << dendl;
- down[p->first] = old_con[p->first];
- }
- }
- for (map<int, Connection*>::iterator p = down.begin(); p != down.end(); ++p) {
- Connection *con = p->second;
- heartbeat_messenger->mark_disposable(con);
+ dout(10) << "update_heartbeat_peers: will mark down old _from peer osd" << p->first
+ << " " << con->get_peer_addr()
+ << " as of " << p->second << dendl;
+
if (!osdmap->is_up(p->first) && !is_booting()) {
dout(10) << "update_heartbeat_peers: telling old peer osd" << p->first
<< " " << old_con[p->first]->get_peer_addr()
<< " they are down" << dendl;
- heartbeat_messenger->send_message(new MOSDPing(osdmap->get_fsid(), heartbeat_epoch,
- heartbeat_epoch,
- MOSDPing::YOU_DIED), con);
+ hbin_messenger->send_message(new MOSDPing(osdmap->get_fsid(), heartbeat_epoch,
+ heartbeat_epoch,
+ MOSDPing::YOU_DIED), con);
+ hbin_messenger->mark_disposable(con);
+ hbin_messenger->mark_down_on_empty(con);
+ } else {
+ // tell them to stop sending heartbeats
+ hbin_messenger->send_message(new MOSDPing(osdmap->get_fsid(), 0, heartbeat_epoch,
+ MOSDPing::STOP_HEARTBEAT), con);
}
- heartbeat_messenger->mark_down_on_empty(con);
- con->put();
- if (!osdmap->is_up(p->first))
+ if (!osdmap->is_up(p->first)) {
forget_peer_epoch(p->first, osdmap->get_epoch());
+ }
+ con->put();
}
dout(10) << "update_heartbeat_peers: hb to: " << heartbeat_to << dendl;
dout(10) << "update_heartbeat_peers: hb from: " << heartbeat_from << dendl;
- /*
- for (map<int,epoch_t>::iterator p = heartbeat_to.begin(); p != heartbeat_to.end(); p++)
- if (heartbeat_inst.count(p->first) == 0)
- dout(0) << " no inst for _to " << p->first << dendl;
- for (map<int,epoch_t>::iterator p = heartbeat_from.begin(); p != heartbeat_from.end(); p++)
- if (heartbeat_inst.count(p->first) == 0)
- dout(0) << " no inst for _from " << p->first << dendl;
- */
-
heartbeat_lock.Unlock();
}
heartbeat_to.clear();
heartbeat_from.clear();
heartbeat_from_stamp.clear();
- while (!heartbeat_con.empty()) {
- heartbeat_con.begin()->second->put();
- heartbeat_con.erase(heartbeat_con.begin());
+ while (!heartbeat_to_con.empty()) {
+ heartbeat_to_con.begin()->second->put();
+ heartbeat_to_con.erase(heartbeat_to_con.begin());
+ }
+ while (!heartbeat_from_con.empty()) {
+ heartbeat_from_con.begin()->second->put();
+ heartbeat_from_con.erase(heartbeat_from_con.begin());
}
failure_queue.clear();
heartbeat_lock.Unlock();
heartbeat_lock.Lock();
int from = m->get_source().num();
-
bool locked = map_lock.try_get_read();
- // ignore (and mark down connection for) old messages
- epoch_t e = m->map_epoch;
- if (!e)
- e = m->peer_as_of_epoch;
- if (e <= osdmap->get_epoch() &&
- ((heartbeat_to.count(from) == 0 && heartbeat_from.count(from) == 0) ||
- heartbeat_con[from] != m->get_connection())) {
- dout(5) << "handle_osd_ping marking down peer " << m->get_source_inst()
- << " after old message from epoch " << e
- << " <= current " << osdmap->get_epoch() << dendl;
- heartbeat_messenger->mark_down(m->get_connection());
- goto out;
- }
-
switch (m->op) {
- case MOSDPing::REQUEST_HEARTBEAT:
- if (m->peer_as_of_epoch <= osdmap->get_epoch()) {
- dout(5) << "handle_osd_ping ignoring peer " << m->get_source_inst()
- << " request for heartbeats as_of " << m->peer_as_of_epoch
- << " <= current " << osdmap->get_epoch() << dendl;
- } else if (heartbeat_to.count(from) && m->peer_as_of_epoch <= heartbeat_to[from]) {
- dout(5) << "handle_osd_ping ignoring peer " << m->get_source_inst()
- << " request for heartbeats as_of " << m->peer_as_of_epoch
- << " <= current _to as_of " << heartbeat_to[from] << dendl;
+ case MOSDPing::START_HEARTBEAT:
+ if (heartbeat_to.count(from)) {
+ if (heartbeat_to_con[from] != m->get_connection()) {
+ // different connection
+ if (heartbeat_to[from] > m->peer_as_of_epoch) {
+ dout(5) << "handle_osd_ping marking down peer " << m->get_source_inst()
+ << " after old start message from epoch " << m->peer_as_of_epoch
+ << " <= current " << heartbeat_to[from] << dendl;
+ hbout_messenger->mark_down(m->get_connection());
+ } else {
+ dout(5) << "handle_osd_ping replacing old peer "
+ << heartbeat_to_con[from]->get_peer_addr()
+ << " epoch " << heartbeat_to[from]
+ << " with new peer " << m->get_source_inst()
+ << " epoch " << m->peer_as_of_epoch
+ << dendl;
+ hbout_messenger->mark_down(heartbeat_to_con[from]);
+ heartbeat_to_con[from]->put();
+
+ // remember new connection
+ heartbeat_to[from] = m->peer_as_of_epoch;
+ heartbeat_to_con[from] = m->get_connection();
+ heartbeat_to_con[from]->get();
+ }
+ } else {
+ // same connection
+ dout(5) << "handle_osd_ping dup peer " << m->get_source_inst()
+ << " request for heartbeats as_of " << m->peer_as_of_epoch
+ << ", same connection" << dendl;
+ heartbeat_to[from] = m->peer_as_of_epoch;
+ }
} else {
+ // new
dout(5) << "handle_osd_ping peer " << m->get_source_inst()
<< " requesting heartbeats as_of " << m->peer_as_of_epoch << dendl;
heartbeat_to[from] = m->peer_as_of_epoch;
- if (heartbeat_con.count(from))
- heartbeat_con[from]->put();
- heartbeat_con[from] = m->get_connection();
- heartbeat_con[from]->get();
+ heartbeat_to_con[from] = m->get_connection();
+ heartbeat_to_con[from]->get();
- if (locked && m->map_epoch && !is_booting())
+ if (locked && m->map_epoch && !is_booting()) {
_share_map_incoming(m->get_source_inst(), m->map_epoch,
(Session*) m->get_connection()->get_priv());
+ }
}
break;
- case MOSDPing::HEARTBEAT:
- if (heartbeat_from.count(from) &&
- heartbeat_con[from] == m->get_connection()) {
+ case MOSDPing::STOP_HEARTBEAT:
+ {
+ if (heartbeat_to.count(from) && heartbeat_to_con[from] == m->get_connection()) {
+ dout(5) << "handle_osd_ping peer " << m->get_source_inst()
+ << " stopping heartbeats as_of " << m->peer_as_of_epoch << dendl;
+ heartbeat_to.erase(from);
+ //hbout_messenger->mark_down(heartbeat_to_con[from]);
+ heartbeat_to_con[from]->put();
+ heartbeat_to_con.erase(from);
+ } else {
+ dout(5) << "handle_osd_ping peer " << m->get_source_inst()
+ << " ignoring stop request as_of " << m->peer_as_of_epoch << dendl;
+ }
+ }
+ break;
+ case MOSDPing::HEARTBEAT:
+ if (heartbeat_from.count(from) && heartbeat_from_con[from] == m->get_connection()) {
dout(20) << "handle_osd_ping " << m->get_source_inst() << dendl;
note_peer_epoch(from, m->map_epoch);
break;
}
-out:
if (locked)
map_lock.put_read();
i != heartbeat_to.end();
i++) {
int peer = i->first;
- if (heartbeat_con.count(peer)) {
- dout(30) << "heartbeat allocating ping for osd" << peer << dendl;
- Message *m = new MOSDPing(osdmap->get_fsid(),
- map_locked ? osdmap->get_epoch():0,
- i->second, MOSDPing::HEARTBEAT);
- m->set_priority(CEPH_MSG_PRIO_HIGH);
- dout(30) << "heartbeat sending ping to osd" << peer << dendl;
- heartbeat_messenger->send_message(m, heartbeat_con[peer]);
- }
+ dout(30) << "heartbeat allocating ping for osd" << peer << dendl;
+ Message *m = new MOSDPing(osdmap->get_fsid(),
+ map_locked ? osdmap->get_epoch():0,
+ i->second, MOSDPing::HEARTBEAT);
+ m->set_priority(CEPH_MSG_PRIO_HIGH);
+ dout(30) << "heartbeat sending ping to osd" << peer << dendl;
+ hbout_messenger->send_message(m, heartbeat_to_con[peer]);
}
if (map_locked) {
cluster_messenger->set_ip(cluster_addr);
dout(10) << " assuming cluster_addr ip matches client_addr" << dendl;
}
- entity_addr_t hb_addr = heartbeat_messenger->get_myaddr();
+ entity_addr_t hb_addr = hbout_messenger->get_myaddr();
if (hb_addr.is_blank_ip()) {
int port = hb_addr.get_port();
hb_addr = cluster_addr;
hb_addr.set_port(port);
- heartbeat_messenger->set_ip(hb_addr);
+ hbout_messenger->set_ip(hb_addr);
dout(10) << " assuming hb_addr ip matches cluster_addr" << dendl;
}
MOSDBoot *mboot = new MOSDBoot(superblock, hb_addr, cluster_addr);
heartbeat_lock.Lock();
- // note: update_heartbeat_peers will mark down the heartbeat connection.
+ // clean out down osds i am sending heartbeats _to_.
+ // update_heartbeat_peers() will clean out peers i expect heartbeast _from_.
+ if (heartbeat_to.count(peer) &&
+ heartbeat_to[peer] < osdmap->get_epoch()) {
+ dout(10) << "note_down_osd osd" << peer << " marking down hbout connection "
+ << heartbeat_to_con[peer]->get_peer_addr() << dendl;
+ hbout_messenger->mark_down(heartbeat_to_con[peer]);
+ heartbeat_to_con[peer]->put();
+ heartbeat_to_con.erase(peer);
+ heartbeat_to.erase(peer);
+ }
failure_queue.erase(peer);
failure_pending.erase(peer);
} else if (!osdmap->is_up(whoami) ||
!osdmap->get_addr(whoami).probably_equals(client_messenger->get_myaddr()) ||
!osdmap->get_cluster_addr(whoami).probably_equals(cluster_messenger->get_myaddr()) ||
- !osdmap->get_hb_addr(whoami).probably_equals(heartbeat_messenger->get_myaddr())) {
+ !osdmap->get_hb_addr(whoami).probably_equals(hbout_messenger->get_myaddr())) {
if (!osdmap->is_up(whoami))
clog.warn() << "map e" << osdmap->get_epoch()
<< " wrongly marked me down or wrong addr";
clog.warn() << "map e" << osdmap->get_epoch()
<< " had wrong client addr (" << osdmap->get_cluster_addr(whoami)
<< " != my " << cluster_messenger->get_myaddr();
- else if (osdmap->get_hb_addr(whoami).probably_equals(heartbeat_messenger->get_myaddr()))
+ else if (osdmap->get_hb_addr(whoami).probably_equals(hbout_messenger->get_myaddr()))
clog.warn() << "map e" << osdmap->get_epoch()
<< " had wrong client addr (" << osdmap->get_hb_addr(whoami)
- << " != my " << heartbeat_messenger->get_myaddr();
+ << " != my " << hbout_messenger->get_myaddr();
state = STATE_BOOTING;
up_epoch = 0;
do_restart = true;
int cport = cluster_messenger->get_myaddr().get_port();
- int hbport = heartbeat_messenger->get_myaddr().get_port();
+ int hbport = hbout_messenger->get_myaddr().get_port();
int r = cluster_messenger->rebind(hbport);
if (r != 0)
do_shutdown = true; // FIXME: do_restart?
- r = heartbeat_messenger->rebind(cport);
+ r = hbout_messenger->rebind(cport);
if (r != 0)
do_shutdown = true; // FIXME: do_restart?