The hb_front messenger is not used yet.
Signed-off-by: Sage Weil <sage@inktank.com>
Messenger *messenger_hbclient = Messenger::create(g_ceph_context,
entity_name_t::OSD(whoami), "hbclient",
getpid());
- Messenger *messenger_hbserver = Messenger::create(g_ceph_context,
- entity_name_t::OSD(whoami), "hbserver",
+ Messenger *messenger_hb_back_server = Messenger::create(g_ceph_context,
+ entity_name_t::OSD(whoami), "hb_back_server",
+ getpid());
+ Messenger *messenger_hb_front_server = Messenger::create(g_ceph_context,
+ entity_name_t::OSD(whoami), "hb_front_server",
getpid());
cluster_messenger->set_cluster_protocol(CEPH_OSD_PROTOCOL);
messenger_hbclient->set_cluster_protocol(CEPH_OSD_PROTOCOL);
- messenger_hbserver->set_cluster_protocol(CEPH_OSD_PROTOCOL);
+ messenger_hb_back_server->set_cluster_protocol(CEPH_OSD_PROTOCOL);
+ messenger_hb_front_server->set_cluster_protocol(CEPH_OSD_PROTOCOL);
cout << "starting osd." << whoami
<< " at " << client_messenger->get_myaddr()
Messenger::Policy::stateless_server(0, 0));
messenger_hbclient->set_policy(entity_name_t::TYPE_OSD,
- Messenger::Policy::lossy_client(0, 0));
- messenger_hbserver->set_policy(entity_name_t::TYPE_OSD,
- Messenger::Policy::stateless_server(0, 0));
+ Messenger::Policy::lossy_client(0, 0));
+ messenger_hb_back_server->set_policy(entity_name_t::TYPE_OSD,
+ Messenger::Policy::stateless_server(0, 0));
+ messenger_hb_front_server->set_policy(entity_name_t::TYPE_OSD,
+ Messenger::Policy::stateless_server(0, 0));
r = client_messenger->bind(g_conf->public_addr);
if (r < 0)
if (hb_addr.is_ip())
hb_addr.set_port(0);
}
- r = messenger_hbserver->bind(hb_addr);
+ r = messenger_hb_back_server->bind(hb_addr);
if (r < 0)
exit(1);
global_init_chdir(g_ceph_context);
osd = new OSD(whoami, cluster_messenger, client_messenger,
- messenger_hbclient, messenger_hbserver,
+ messenger_hbclient, messenger_hb_front_server, messenger_hb_back_server,
&mc,
g_conf->osd_data, g_conf->osd_journal);
client_messenger->start();
messenger_hbclient->start();
- messenger_hbserver->start();
+ messenger_hb_front_server->start();
+ messenger_hb_back_server->start();
cluster_messenger->start();
// install signal handlers
client_messenger->wait();
messenger_hbclient->wait();
- messenger_hbserver->wait();
+ messenger_hb_front_server->wait();
+ messenger_hb_back_server->wait();
cluster_messenger->wait();
unregister_async_signal_handler(SIGHUP, sighup_handler);
delete osd;
delete client_messenger;
delete messenger_hbclient;
- delete messenger_hbserver;
+ delete messenger_hb_front_server;
+ delete messenger_hb_back_server;
delete cluster_messenger;
client_byte_throttler.reset();
client_msg_throttler.reset();
// cons/des
OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
- Messenger *hbclientm, Messenger *hbserverm, MonClient *mc,
+ Messenger *hb_clientm,
+ Messenger *hb_front_serverm,
+ Messenger *hb_back_serverm,
+ MonClient *mc,
const std::string &dev, const std::string &jdev) :
Dispatcher(external_messenger->cct),
osd_lock("OSD::osd_lock"),
paused_recovery(false),
heartbeat_lock("OSD::heartbeat_lock"),
heartbeat_stop(false), heartbeat_need_update(true), heartbeat_epoch(0),
- hbclient_messenger(hbclientm),
- hbserver_messenger(hbserverm),
+ hbclient_messenger(hb_clientm),
+ hb_front_server_messenger(hb_front_serverm),
+ hb_back_server_messenger(hb_back_serverm),
heartbeat_thread(this),
heartbeat_dispatcher(this),
stat_lock("OSD::stat_lock"),
cluster_messenger->add_dispatcher_head(this);
hbclient_messenger->add_dispatcher_head(&heartbeat_dispatcher);
- hbserver_messenger->add_dispatcher_head(&heartbeat_dispatcher);
+ hb_front_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
+ hb_back_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
monc->set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD);
r = monc->init();
client_messenger->shutdown();
cluster_messenger->shutdown();
hbclient_messenger->shutdown();
- hbserver_messenger->shutdown();
+ hb_front_server_messenger->shutdown();
+ hb_back_server_messenger->shutdown();
peering_wq.clear();
return r;
}
curmap->get_epoch(),
MOSDPing::PING_REPLY,
m->stamp);
- hbserver_messenger->send_message(r, m->get_connection());
+ hb_back_server_messenger->send_message(r, m->get_connection());
if (curmap->is_up(from)) {
note_peer_epoch(from, m->map_epoch);
if (name.is_osd() &&
osdmap->is_up(name.num()) &&
(osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() ||
- osdmap->get_hb_addr(name.num()) == con->get_peer_addr())) {
+ osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) {
// remember
epoch_t has = note_peer_epoch(name.num(), epoch);
} else if (!osdmap->is_up(whoami) ||
!osdmap->get_addr(whoami).probably_equals(client_messenger->get_myaddr()) ||
!osdmap->get_cluster_addr(whoami).probably_equals(cluster_messenger->get_myaddr()) ||
- !osdmap->get_hb_back_addr(whoami).probably_equals(hbserver_messenger->get_myaddr())) {
+ !osdmap->get_hb_back_addr(whoami).probably_equals(hb_back_server_messenger->get_myaddr())) {
if (!osdmap->is_up(whoami)) {
if (service.is_preparing_to_stop()) {
service.got_stop_ack();
clog.error() << "map e" << osdmap->get_epoch()
<< " had wrong cluster addr (" << osdmap->get_cluster_addr(whoami)
<< " != my " << cluster_messenger->get_myaddr() << ")";
- else if (!osdmap->get_hb_back_addr(whoami).probably_equals(hbserver_messenger->get_myaddr()))
+ else if (!osdmap->get_hb_back_addr(whoami).probably_equals(hb_back_server_messenger->get_myaddr()))
clog.error() << "map e" << osdmap->get_epoch()
<< " had wrong hb back addr (" << osdmap->get_hb_back_addr(whoami)
- << " != my " << hbserver_messenger->get_myaddr() << ")";
+ << " != my " << hb_back_server_messenger->get_myaddr() << ")";
if (!service.is_stopping()) {
state = STATE_BOOTING;
bind_epoch = osdmap->get_epoch();
int cport = cluster_messenger->get_myaddr().get_port();
- int hbport = hbserver_messenger->get_myaddr().get_port();
+ int hbport = hb_back_server_messenger->get_myaddr().get_port();
int r = cluster_messenger->rebind(hbport);
if (r != 0)
do_shutdown = true; // FIXME: do_restart?
- r = hbserver_messenger->rebind(cport);
+ r = hb_back_server_messenger->rebind(cport);
if (r != 0)
do_shutdown = true; // FIXME: do_restart?
epoch_t heartbeat_epoch; ///< last epoch we updated our heartbeat peers
map<int,HeartbeatInfo> heartbeat_peers; ///< map of osd id to HeartbeatInfo
utime_t last_mon_heartbeat;
- Messenger *hbclient_messenger, *hbserver_messenger;
+ Messenger *hbclient_messenger;
+ Messenger *hb_front_server_messenger;
+ Messenger *hb_back_server_messenger;
void _add_heartbeat_peer(int p);
bool heartbeat_reset(Connection *con);
public:
/* internal and external can point to the same messenger, they will still
* be cleaned up properly*/
- OSD(int id, Messenger *internal, Messenger *external, Messenger *hbmin, Messenger *hbmout,
+ OSD(int id, Messenger *internal, Messenger *external,
+ Messenger *hb_client, Messenger *hb_front_server, Messenger *hb_back_server,
MonClient *mc, const std::string &dev, const std::string &jdev);
~OSD();