From 6c6342898026307264e76ab01c55980ac2936ab0 Mon Sep 17 00:00:00 2001 From: Greg Farnum Date: Thu, 7 Jan 2010 16:53:51 -0800 Subject: [PATCH] msgr: rank is just a bad name for local SimpleMessengers now --- src/ceph.cc | 11 ++- src/cfuse.cc | 10 +-- src/cmds.cc | 23 +++--- src/cmon.cc | 21 +++-- src/cosd.cc | 44 +++++----- src/csyn.cc | 10 +-- src/dumpjournal.cc | 11 ++- src/libceph.cc | 14 ++-- src/librados.cc | 16 ++-- src/mon/MonClient.cc | 12 +-- src/msg/SimpleMessenger.cc | 164 ++++++++++++++++++------------------- src/msg/SimpleMessenger.h | 44 +++++----- 12 files changed, 185 insertions(+), 195 deletions(-) diff --git a/src/ceph.cc b/src/ceph.cc index 66ab8ecd84d16..649a506ea2afa 100644 --- a/src/ceph.cc +++ b/src/ceph.cc @@ -625,12 +625,11 @@ int main(int argc, const char **argv, const char *envp[]) return -1; // start up network - SimpleMessenger *rank = new SimpleMessenger(); - messenger = rank; - rank->register_entity(entity_name_t::ADMIN()); + SimpleMessenger *messenger = new SimpleMessenger(); + messenger->register_entity(entity_name_t::ADMIN()); messenger->add_dispatcher_head(&dispatcher); - rank->start(); + messenger->start(); mc.set_messenger(messenger); mc.init(); @@ -678,8 +677,8 @@ int main(int argc, const char **argv, const char *envp[]) // wait for messenger to finish - rank->wait(); - rank->destroy(); + messenger->wait(); + messenger->destroy(); return 0; } diff --git a/src/cfuse.cc b/src/cfuse.cc index ba8dc98c0045f..a0aeff8f9c72c 100644 --- a/src/cfuse.cc +++ b/src/cfuse.cc @@ -68,12 +68,12 @@ int main(int argc, const char **argv, const char *envp[]) { return -1; // start up network - SimpleMessenger *rank = new SimpleMessenger(); + SimpleMessenger *messenger = new SimpleMessenger(); cout << "mounting ceph" << std::endl; - rank->register_entity(entity_name_t::CLIENT()); - Client *client = new Client(rank, &mc); + messenger->register_entity(entity_name_t::CLIENT()); + Client *client = new Client(messenger, &mc); - rank->start(); + messenger->start(); // start client client->init(); @@ -100,7 +100,7 @@ int main(int argc, const char **argv, const char *envp[]) { delete client; // wait for messenger to finish - rank->wait(); + messenger->wait(); return 0; } diff --git a/src/cmds.cc b/src/cmds.cc index 205f60614cc55..f9a12ce6ea879 100644 --- a/src/cmds.cc +++ b/src/cmds.cc @@ -67,28 +67,27 @@ int main(int argc, const char **argv) if (mc.build_initial_monmap() < 0) return -1; - SimpleMessenger *rank = new SimpleMessenger(); - rank->bind(); + SimpleMessenger *messenger = new SimpleMessenger(); + messenger->bind(); cout << "starting mds." << g_conf.id - << " at " << rank->get_rank_addr() + << " at " << messenger->get_ms_addr() << std::endl; - Messenger *m = rank; - rank->register_entity(entity_name_t::MDS(-1)); - assert_warn(m); - if (!m) + messenger->register_entity(entity_name_t::MDS(-1)); + assert_warn(messenger); + if (!messenger) return 1; - rank->set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::stateful_server()); - rank->set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless_peer()); + messenger->set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::stateful_server()); + messenger->set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless_peer()); - rank->start(); + messenger->start(); // start mds - MDS *mds = new MDS(g_conf.id, m, &mc); + MDS *mds = new MDS(g_conf.id, messenger, &mc); mds->init(); - rank->wait(); + messenger->wait(); // yuck: grab the mds lock, so we can be sure that whoever in *mds // called shutdown finishes what they were doing. diff --git a/src/cmon.cc b/src/cmon.cc index 883207cdfe3eb..2bf4d4a19ef18 100644 --- a/src/cmon.cc +++ b/src/cmon.cc @@ -127,7 +127,7 @@ int main(int argc, const char **argv) << " continuing with monmap configuration" << std::endl; // bind - SimpleMessenger *rank = new SimpleMessenger(); + SimpleMessenger *messenger = new SimpleMessenger(); cout << "starting mon" << whoami << " at " << monmap.get_inst(whoami).addr @@ -135,29 +135,28 @@ int main(int argc, const char **argv) << " fsid " << monmap.get_fsid() << std::endl; g_my_addr = monmap.get_inst(whoami).addr; - err = rank->bind(); + err = messenger->bind(); if (err < 0) return 1; _dout_create_courtesy_output_symlink("mon", whoami); // start monitor - Messenger *m = rank; - rank->register_entity(entity_name_t::MON(whoami)); - m->set_default_send_priority(CEPH_MSG_PRIO_HIGH); - Monitor *mon = new Monitor(whoami, &store, m, &monmap); + messenger->register_entity(entity_name_t::MON(whoami)); + messenger->set_default_send_priority(CEPH_MSG_PRIO_HIGH); + Monitor *mon = new Monitor(whoami, &store, messenger, &monmap); - rank->start(); // may daemonize + messenger->start(); // may daemonize - rank->set_default_policy(SimpleMessenger::Policy::stateless_server()); - rank->set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossless_peer()); + messenger->set_default_policy(SimpleMessenger::Policy::stateless_server()); + messenger->set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossless_peer()); mon->init(); - rank->wait(); + messenger->wait(); store.umount(); delete mon; - rank->destroy(); + messenger->destroy(); // cd on exit, so that gmon.out (if any) goes into a separate directory for each node. char s[20]; diff --git a/src/cosd.cc b/src/cosd.cc index aacd93d9ac1a8..e0687d78a23a1 100644 --- a/src/cosd.cc +++ b/src/cosd.cc @@ -138,50 +138,48 @@ int main(int argc, const char **argv) g_my_addr.ss_addr() = mc.get_my_addr().ss_addr(); g_my_addr.set_port(0); - SimpleMessenger *rank = new SimpleMessenger(); - SimpleMessenger *rank_hb = new SimpleMessenger(); - rank->bind(); - rank_hb->bind(); + SimpleMessenger *messenger = new SimpleMessenger(); + SimpleMessenger *messenger_hb = new SimpleMessenger(); + messenger->bind(); + messenger_hb->bind(); cout << "starting osd" << whoami - << " at " << rank->get_rank_addr() + << " at " << messenger->get_ms_addr() << " osd_data " << g_conf.osd_data << " " << ((g_conf.osd_journal && g_conf.osd_journal[0]) ? g_conf.osd_journal:"(no journal)") << " fsid " << mc.monmap.fsid << std::endl; g_timer.shutdown(); - rank->register_entity(entity_name_t::OSD(whoami)); - Messenger *m = rank; - assert_warn(m); - if (!m) + messenger->register_entity(entity_name_t::OSD(whoami)); + assert_warn(messenger); + if (!messenger) return 1; - rank_hb->register_entity(entity_name_t::OSD(whoami)); - Messenger *hbm = rank_hb; - assert_warn(hbm); - if (!hbm) + messenger_hb->register_entity(entity_name_t::OSD(whoami)); + assert_warn(messenger_hb); + if (!messenger_hb) return 1; - rank->set_default_policy(SimpleMessenger::Policy::stateless_server()); - rank->set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::client()); - rank->set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless_peer()); + messenger->set_default_policy(SimpleMessenger::Policy::stateless_server()); + messenger->set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::client()); + messenger->set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless_peer()); - rank->start(); - rank_hb->start(true); // only need to daemon() once + messenger->start(); + messenger_hb->start(true); // only need to daemon() once // start osd - OSD *osd = new OSD(whoami, m, hbm, &mc, g_conf.osd_data, g_conf.osd_journal, mkjournal); + OSD *osd = new OSD(whoami, messenger, messenger_hb, &mc, g_conf.osd_data, g_conf.osd_journal, mkjournal); if (osd->init() < 0) { cout << "error initializing osd" << std::endl; return 1; } - rank->wait(); - rank_hb->wait(); + messenger->wait(); + messenger_hb->wait(); // done delete osd; - rank->destroy(); - rank_hb->destroy(); + messenger->destroy(); + messenger_hb->destroy(); // cd on exit, so that gmon.out (if any) goes into a separate directory for each node. char s[20]; diff --git a/src/csyn.cc b/src/csyn.cc index beda348359abd..1765f044c99d7 100644 --- a/src/csyn.cc +++ b/src/csyn.cc @@ -55,7 +55,7 @@ int main(int argc, const char **argv, char *envp[]) return -1; // start up network - SimpleMessenger *rank = new SimpleMessenger(); + SimpleMessenger *messenger = new SimpleMessenger(); cout << "starting csyn" << std::endl; list clients; @@ -63,14 +63,14 @@ int main(int argc, const char **argv, char *envp[]) cout << "mounting and starting " << g_conf.num_client << " syn client(s)" << std::endl; for (int i=0; iregister_entity(entity_name_t(entity_name_t::TYPE_CLIENT,-1)); - Client *client = new Client(rank, &mc); + messenger->register_entity(entity_name_t(entity_name_t::TYPE_CLIENT,-1)); + Client *client = new Client(messenger, &mc); SyntheticClient *syn = new SyntheticClient(client); clients.push_back(client); synclients.push_back(syn); } - rank->start(); + messenger->start(); for (list::iterator p = synclients.begin(); p != synclients.end(); @@ -89,7 +89,7 @@ int main(int argc, const char **argv, char *envp[]) } // wait for messenger to finish - rank->wait(); + messenger->wait(); return 0; } diff --git a/src/dumpjournal.cc b/src/dumpjournal.cc index 3169266183181..cc245c0f1000a 100644 --- a/src/dumpjournal.cc +++ b/src/dumpjournal.cc @@ -89,12 +89,11 @@ int main(int argc, const char **argv, const char *envp[]) return -1; // start up network - SimpleMessenger *rank = new SimpleMessenger(); - rank->bind(); + SimpleMessenger *messenger = new SimpleMessenger(); + messenger->bind(); g_conf.daemonize = false; // not us! - rank->start(); - messenger = rank; - rank->register_entity(entity_name_t::ADMIN()); + messenger->start(); + messenger->register_entity(entity_name_t::ADMIN()); messenger->add_dispatcher_head(&dispatcher); inodeno_t ino = MDS_INO_LOG_OFFSET + mds; @@ -131,7 +130,7 @@ int main(int argc, const char **argv, const char *envp[]) messenger->shutdown(); // wait for messenger to finish - rank->wait(); + messenger->wait(); return 0; } diff --git a/src/libceph.cc b/src/libceph.cc index e33abe933d50e..c6951540b795d 100644 --- a/src/libceph.cc +++ b/src/libceph.cc @@ -30,7 +30,7 @@ static int client_initialized = 0; static int client_mount = 0; static Client *client = NULL; static MonClient *monclient = NULL; -static SimpleMessenger *rank = NULL; +static SimpleMessenger *messenger = NULL; extern "C" int ceph_initialize(int argc, const char **argv) { @@ -48,13 +48,13 @@ extern "C" int ceph_initialize(int argc, const char **argv) return -1; //error! } //network connection - rank = new SimpleMessenger(); - rank->register_entity(entity_name_t::CLIENT()); + messenger = new SimpleMessenger(); + messenger->register_entity(entity_name_t::CLIENT()); //at last the client - client = new Client(rank, monclient); + client = new Client(messenger, monclient); - rank->start(); + messenger->start(); client->init(); } @@ -71,8 +71,8 @@ extern "C" void ceph_deinitialize() client->unmount(); client->shutdown(); delete client; - rank->wait(); - rank->destroy(); + messenger->wait(); + messenger->destroy(); delete monclient; } ceph_client_mutex.Unlock(); diff --git a/src/librados.cc b/src/librados.cc index 96e3ec84592b1..0d3cde7bb1e01 100644 --- a/src/librados.cc +++ b/src/librados.cc @@ -48,9 +48,8 @@ using namespace std; class RadosClient : public Dispatcher { OSDMap osdmap; - Messenger *messenger; MonClient monclient; - SimpleMessenger *rank; + SimpleMessenger *messenger; bool _dispatch(Message *m); bool ms_dispatch(Message *m); @@ -76,7 +75,7 @@ class RadosClient : public Dispatcher public: RadosClient() : messenger(NULL), lock("radosclient") { - rank = new SimpleMessenger(); + messenger = new SimpleMessenger(); } ~RadosClient(); @@ -289,10 +288,9 @@ bool RadosClient::init() if (monclient.build_initial_monmap() < 0) return false; - dout(1) << "starting msgr at " << rank->get_rank_addr() << dendl; + dout(1) << "starting msgr at " << messenger->get_ms_addr() << dendl; - rank->register_entity(entity_name_t::CLIENT(-1)); - messenger = rank; + messenger->register_entity(entity_name_t::CLIENT(-1)); assert_warn(messenger); if (!messenger) return false; @@ -306,7 +304,7 @@ bool RadosClient::init() messenger->add_dispatcher_head(this); - rank->start(1); + messenger->start(1); messenger->add_dispatcher_head(this); dout(1) << "setting wanted keys" << dendl; @@ -340,14 +338,14 @@ void RadosClient::shutdown() objecter->shutdown(); lock.Unlock(); messenger->shutdown(); - rank->wait(); + messenger->wait(); dout(1) << "shutdown" << dendl; } RadosClient::~RadosClient() { if (messenger) - messenger->shutdown(); + messenger->destroy(); } diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc index bc701537fe38b..014c64bf77278 100644 --- a/src/mon/MonClient.cc +++ b/src/mon/MonClient.cc @@ -134,13 +134,13 @@ int MonClient::get_monmap_privately() dout(10) << "get_monmap_privately" << dendl; Mutex::Locker l(monc_lock); - SimpleMessenger *rank = NULL; + SimpleMessenger *messenger = NULL; bool temp_msgr = false; if (!messenger) { - messenger = rank = new SimpleMessenger(); - rank->register_entity(entity_name_t::CLIENT(-1)); + messenger = messenger = new SimpleMessenger(); + messenger->register_entity(entity_name_t::CLIENT(-1)); messenger->add_dispatcher_head(this); - rank->start(true); // do not daemonize! + messenger->start(true); // do not daemonize! temp_msgr = true; } @@ -165,8 +165,8 @@ int MonClient::get_monmap_privately() if (temp_msgr) { monc_lock.Unlock(); messenger->shutdown(); - rank->wait(); - rank->destroy(); + messenger->wait(); + messenger->destroy(); messenger = 0; monc_lock.Lock(); } diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 478440e6de2b2..4a156667b53a8 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -38,9 +38,9 @@ #define DOUT_SUBSYS ms #undef dout_prefix -#define dout_prefix _prefix(rank) -static ostream& _prefix(SimpleMessenger *rank) { - return *_dout << dbeginl << pthread_self() << " -- " << rank->rank_addr << " "; +#define dout_prefix _prefix(messenger) +static ostream& _prefix(SimpleMessenger *messenger) { + return *_dout << dbeginl << pthread_self() << " -- " << messenger->ms_addr << " "; } @@ -128,23 +128,23 @@ int SimpleMessenger::Accepter::bind(int64_t force_nonce) return -errno; } - rank->rank_addr = g_my_addr; - if (rank->rank_addr != entity_addr_t()) - rank->need_addr = false; + messenger->ms_addr = g_my_addr; + if (messenger->ms_addr != entity_addr_t()) + messenger->need_addr = false; else - rank->need_addr = true; + messenger->need_addr = true; - if (rank->rank_addr.get_port() == 0) { - rank->rank_addr.in4_addr() = listen_addr; + if (messenger->ms_addr.get_port() == 0) { + messenger->ms_addr.in4_addr() = listen_addr; if (force_nonce >= 0) - rank->rank_addr.nonce = force_nonce; + messenger->ms_addr.nonce = force_nonce; else - rank->rank_addr.nonce = getpid(); // FIXME: pid might not be best choice here. + messenger->ms_addr.nonce = getpid(); // FIXME: pid might not be best choice here. } - rank->rank_addr.erank = 0; + messenger->ms_addr.erank = 0; - dout(1) << "accepter.bind rank_addr is " << rank->rank_addr << " need_addr=" << rank->need_addr << dendl; - rank->did_bind = true; + dout(1) << "accepter.bind ms_addr is " << messenger->ms_addr << " need_addr=" << messenger->need_addr << dendl; + messenger->did_bind = true; return 0; } @@ -209,15 +209,15 @@ void *SimpleMessenger::Accepter::entry() dout(0) << "accepter could't set TCP_NODELAY: " << strerror_r(errno, buf, sizeof(buf)) << dendl; } - rank->lock.Lock(); + messenger->lock.Lock(); - if (!rank->endpoint_stopped) { - Pipe *p = new Pipe(rank, Pipe::STATE_ACCEPTING); + if (!messenger->endpoint_stopped) { + Pipe *p = new Pipe(messenger, Pipe::STATE_ACCEPTING); p->sd = sd; p->start_reader(); - rank->pipes.insert(p); + messenger->pipes.insert(p); } - rank->lock.Unlock(); + messenger->lock.Unlock(); } else { dout(0) << "accepter no incoming connection? sd = " << sd << " errno " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl; @@ -452,7 +452,7 @@ int SimpleMessenger::lazy_send_message(Message *m, entity_inst_t dest) entity_addr_t SimpleMessenger::get_myaddr() { - entity_addr_t a = rank->rank_addr; + entity_addr_t a = messenger->ms_addr; a.erank = 0; return a; } @@ -468,7 +468,7 @@ entity_addr_t SimpleMessenger::get_myaddr() #define dout_prefix _pipe_prefix() ostream& SimpleMessenger::Pipe::_pipe_prefix() { return *_dout << dbeginl << pthread_self() - << " -- " << rank->rank_addr << " >> " << peer_addr << " pipe(" << this + << " -- " << messenger->ms_addr << " >> " << peer_addr << " pipe(" << this << " sd=" << sd << " pgs=" << peer_global_seq << " cs=" << connect_seq @@ -523,7 +523,7 @@ int SimpleMessenger::Pipe::accept() // and my addr bufferlist addrs; - ::encode(rank->rank_addr, addrs); + ::encode(messenger->ms_addr, addrs); // and peer's socket addr (they might not know their ip) entity_addr_t socket_addr; @@ -621,24 +621,24 @@ int SimpleMessenger::Pipe::accept() << " global_seq " << connect.global_seq << dendl; - rank->lock.Lock(); + messenger->lock.Lock(); // note peer's type, flags set_peer_type(connect.host_type); - policy = rank->get_policy(connect.host_type); + policy = messenger->get_policy(connect.host_type); dout(10) << "accept of host_type " << connect.host_type << ", policy.lossy=" << policy.lossy << dendl; memset(&reply, 0, sizeof(reply)); - reply.protocol_version = get_proto_version(rank->my_type, peer_type, false); + reply.protocol_version = get_proto_version(messenger->my_type, peer_type, false); // mismatch? dout(10) << "accept my proto " << reply.protocol_version << ", their proto " << connect.protocol_version << dendl; if (connect.protocol_version != reply.protocol_version) { reply.tag = CEPH_MSGR_TAG_BADPROTOVER; - rank->lock.Unlock(); + messenger->lock.Unlock(); goto reply; } @@ -646,23 +646,23 @@ int SimpleMessenger::Pipe::accept() if (feat_missing) { dout(1) << "peer missing required features " << std::hex << feat_missing << std::dec << dendl; reply.tag = CEPH_MSGR_TAG_FEATURES; - rank->lock.Unlock(); + messenger->lock.Unlock(); goto reply; } - rank->lock.Unlock(); - if (rank->verify_authorizer(connection_state, peer_type, + messenger->lock.Unlock(); + if (messenger->verify_authorizer(connection_state, peer_type, connect.authorizer_protocol, authorizer, authorizer_reply, authorizer_valid) && !authorizer_valid) { dout(0) << "accept bad authorizer" << dendl; reply.tag = CEPH_MSGR_TAG_BADAUTHORIZER; goto reply; } - rank->lock.Lock(); + messenger->lock.Lock(); // existing? - if (rank->rank_pipe.count(peer_addr)) { - existing = rank->rank_pipe[peer_addr]; + if (messenger->rank_pipe.count(peer_addr)) { + existing = messenger->rank_pipe[peer_addr]; existing->pipe_lock.Lock(); if (connect.global_seq < existing->peer_global_seq) { @@ -671,7 +671,7 @@ int SimpleMessenger::Pipe::accept() reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL; reply.global_seq = existing->peer_global_seq; // so we can send it below.. existing->pipe_lock.Unlock(); - rank->lock.Unlock(); + messenger->lock.Unlock(); goto reply; } else { dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq @@ -714,14 +714,14 @@ int SimpleMessenger::Pipe::accept() reply.tag = CEPH_MSGR_TAG_RETRY_SESSION; reply.connect_seq = existing->connect_seq; // so we can send it below.. existing->pipe_lock.Unlock(); - rank->lock.Unlock(); + messenger->lock.Unlock(); goto reply; } } if (connect.connect_seq == existing->connect_seq) { // connection race? - if (peer_addr < rank->rank_addr || + if (peer_addr < messenger->ms_addr || existing->policy.server) { // incoming wins dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq @@ -734,11 +734,11 @@ int SimpleMessenger::Pipe::accept() // our existing outgoing wins dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq << " == " << connect.connect_seq << ", sending WAIT" << dendl; - assert(peer_addr > rank->rank_addr); + assert(peer_addr > messenger->ms_addr); assert(existing->state == STATE_CONNECTING); // this will win reply.tag = CEPH_MSGR_TAG_WAIT; existing->pipe_lock.Unlock(); - rank->lock.Unlock(); + messenger->lock.Unlock(); goto reply; } } @@ -750,7 +750,7 @@ int SimpleMessenger::Pipe::accept() << ", " << existing << ".cseq = " << existing->connect_seq << "), sending RESETSESSION" << dendl; reply.tag = CEPH_MSGR_TAG_RESETSESSION; - rank->lock.Unlock(); + messenger->lock.Unlock(); existing->pipe_lock.Unlock(); goto reply; } @@ -763,7 +763,7 @@ int SimpleMessenger::Pipe::accept() else if (connect.connect_seq > 0) { // we reset, and they are opening a new session dout(0) << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl; - rank->lock.Unlock(); + messenger->lock.Unlock(); reply.tag = CEPH_MSGR_TAG_RESETSESSION; goto reply; } else { @@ -813,7 +813,7 @@ int SimpleMessenger::Pipe::accept() // send READY reply reply.tag = CEPH_MSGR_TAG_READY; - reply.global_seq = rank->get_global_seq(); + reply.global_seq = messenger->get_global_seq(); reply.connect_seq = connect_seq; reply.flags = 0; reply.authorizer_len = authorizer_reply.length(); @@ -822,7 +822,7 @@ int SimpleMessenger::Pipe::accept() // ok! register_pipe(); - rank->lock.Unlock(); + messenger->lock.Unlock(); rc = tcp_write(sd, (char*)&reply, sizeof(reply)); if (rc < 0) @@ -865,7 +865,7 @@ int SimpleMessenger::Pipe::connect() closed_socket(); } __u32 cseq = connect_seq; - __u32 gseq = rank->get_global_seq(); + __u32 gseq = messenger->get_global_seq(); // stop reader thrad join_reader(); @@ -967,10 +967,10 @@ int SimpleMessenger::Pipe::connect() dout(20) << "connect peer addr for me is " << peer_addr_for_me << dendl; - if (rank->need_addr) - rank->learned_addr(peer_addr_for_me); + if (messenger->need_addr) + messenger->learned_addr(peer_addr_for_me); - ::encode(rank->rank_addr, myaddrbl); + ::encode(messenger->ms_addr, myaddrbl); memset(&msg, 0, sizeof(msg)); msgvec[0].iov_base = myaddrbl.c_str(); @@ -982,20 +982,20 @@ int SimpleMessenger::Pipe::connect() dout(2) << "connect couldn't write my addr, " << strerror_r(errno, buf, sizeof(buf)) << dendl; goto fail; } - dout(10) << "connect sent my addr " << rank->rank_addr << dendl; + dout(10) << "connect sent my addr " << messenger->ms_addr << dendl; while (1) { delete authorizer; - authorizer = rank->get_authorizer(peer_type, false); + authorizer = messenger->get_authorizer(peer_type, false); bufferlist authorizer_reply; ceph_msg_connect connect; connect.features = CEPH_FEATURE_SUPPORTED; - connect.host_type = rank->my_type; + connect.host_type = messenger->my_type; connect.global_seq = gseq; connect.connect_seq = cseq; - connect.protocol_version = get_proto_version(rank->my_type, peer_type, true); + connect.protocol_version = get_proto_version(messenger->my_type, peer_type, true); connect.authorizer_protocol = authorizer ? authorizer->protocol : 0; connect.authorizer_len = authorizer ? authorizer->bl.length() : 0; if (authorizer) @@ -1083,7 +1083,7 @@ int SimpleMessenger::Pipe::connect() goto stop_locked; got_bad_auth = true; pipe_lock.Unlock(); - authorizer = rank->get_authorizer(peer_type, true); // try harder + authorizer = messenger->get_authorizer(peer_type, true); // try harder continue; } if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) { @@ -1094,7 +1094,7 @@ int SimpleMessenger::Pipe::connect() continue; } if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) { - gseq = rank->get_global_seq(reply.global_seq); + gseq = messenger->get_global_seq(reply.global_seq); dout(10) << "connect got RETRY_GLOBAL " << reply.global_seq << " chose new " << gseq << dendl; pipe_lock.Unlock(); @@ -1131,10 +1131,10 @@ int SimpleMessenger::Pipe::connect() backoff = utime_t(); dout(20) << "connect success " << connect_seq << ", lossy = " << policy.lossy << dendl; - if (!rank->endpoint_stopped) { + if (!messenger->endpoint_stopped) { Connection * cstate = connection_state->get(); pipe_lock.Unlock(); - rank->dispatch_queue.queue_connect(cstate); + messenger->dispatch_queue.queue_connect(cstate); pipe_lock.Lock(); } @@ -1166,18 +1166,18 @@ int SimpleMessenger::Pipe::connect() void SimpleMessenger::Pipe::register_pipe() { dout(10) << "register_pipe" << dendl; - assert(rank->lock.is_locked()); - assert(rank->rank_pipe.count(peer_addr) == 0); - rank->rank_pipe[peer_addr] = this; + assert(messenger->lock.is_locked()); + assert(messenger->rank_pipe.count(peer_addr) == 0); + messenger->rank_pipe[peer_addr] = this; } void SimpleMessenger::Pipe::unregister_pipe() { - assert(rank->lock.is_locked()); - if (rank->rank_pipe.count(peer_addr) && - rank->rank_pipe[peer_addr] == this) { + assert(messenger->lock.is_locked()); + if (messenger->rank_pipe.count(peer_addr) && + messenger->rank_pipe[peer_addr] == this) { dout(10) << "unregister_pipe" << dendl; - rank->rank_pipe.erase(peer_addr); + messenger->rank_pipe.erase(peer_addr); } else { dout(10) << "unregister_pipe - not registered" << dendl; } @@ -1207,7 +1207,7 @@ void SimpleMessenger::Pipe::requeue_sent() void SimpleMessenger::Pipe::discard_queue() { dout(10) << "discard_queue" << dendl; - DispatchQueue& q = rank->dispatch_queue; + DispatchQueue& q = messenger->dispatch_queue; pipe_lock.Unlock(); xlist* list_on; @@ -1328,10 +1328,10 @@ void SimpleMessenger::Pipe::fail() discard_queue(); - if (!rank->endpoint_stopped) { + if (!messenger->endpoint_stopped) { Connection * cstate = connection_state->get(); pipe_lock.Unlock(); - rank->dispatch_queue.queue_reset(cstate); + messenger->dispatch_queue.queue_reset(cstate); pipe_lock.Lock(); } } @@ -1343,10 +1343,10 @@ void SimpleMessenger::Pipe::was_session_reset() dout(10) << "was_session_reset" << dendl; discard_queue(); - if (!rank->endpoint_stopped) { + if (!messenger->endpoint_stopped) { Connection * cstate = connection_state->get(); pipe_lock.Unlock(); - rank->dispatch_queue.queue_remote_reset(cstate); + messenger->dispatch_queue.queue_remote_reset(cstate); pipe_lock.Lock(); } @@ -1635,12 +1635,12 @@ void SimpleMessenger::Pipe::unlock_maybe_reap() // queue for reap dout(10) << "unlock_maybe_reap queueing for reap" << dendl; - rank->lock.Lock(); + messenger->lock.Lock(); { - rank->pipe_reap_queue.push_back(this); - rank->wait_cond.Signal(); + messenger->pipe_reap_queue.push_back(this); + messenger->wait_cond.Signal(); } - rank->lock.Unlock(); + messenger->lock.Unlock(); } else { pipe_lock.Unlock(); } @@ -2095,20 +2095,20 @@ int SimpleMessenger::start(bool nodaemon) } if (!did_bind) - rank_addr.nonce = getpid(); + ms_addr.nonce = getpid(); - dout(1) << "rank.start" << dendl; + dout(1) << "messenger.start" << dendl; started = true; lock.Unlock(); // daemonize? if (g_conf.daemonize && !nodaemon) { if (Thread::get_num_threads() > 0) { - derr(0) << "rank.start BUG: there are " << Thread::get_num_threads() - << " already started that will now die! call rank.start() sooner." + derr(0) << "messenger.start BUG: there are " << Thread::get_num_threads() + << " already started that will now die! call messenger.start() sooner." << dendl; } - dout(1) << "rank.start daemonizing" << dendl; + dout(1) << "messenger.start daemonizing" << dendl; if (1) { daemon(1, 0); @@ -2156,12 +2156,12 @@ int SimpleMessenger::start(bool nodaemon) /* connect_rank - * NOTE: assumes rank.lock held. + * NOTE: assumes messenger.lock held. */ SimpleMessenger::Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr, int type) { assert(lock.is_locked()); - assert(addr != rank_addr); + assert(addr != ms_addr); dout(10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl; @@ -2238,7 +2238,7 @@ void SimpleMessenger::submit_message(Message *m, const entity_inst_t& dest, bool lock.Lock(); { // local? - if (rank_addr.is_local_to(dest_addr)) { + if (ms_addr.is_local_to(dest_addr)) { if (dest_addr.get_erank() == 0 && !endpoint_stopped) { // local dout(20) << "submit_message " << *m << " local" << dendl; @@ -2293,7 +2293,7 @@ int SimpleMessenger::send_keepalive(entity_inst_t dest) lock.Lock(); { // local? - if (!rank_addr.is_local_to(dest_addr)) { + if (!ms_addr.is_local_to(dest_addr)) { // remote. Pipe *pipe = 0; if (rank_pipe.count( dest_proc_addr )) { @@ -2399,10 +2399,10 @@ void SimpleMessenger::mark_down(entity_addr_t addr) void SimpleMessenger::learned_addr(entity_addr_t peer_addr_for_me) { lock.Lock(); - int port = rank_addr.get_port(); - rank_addr.addr = peer_addr_for_me.addr; - rank_addr.set_port(port); - dout(1) << "learned my addr " << rank_addr << dendl; + int port = ms_addr.get_port(); + ms_addr.addr = peer_addr_for_me.addr; + ms_addr.set_port(port); + dout(1) << "learned my addr " << ms_addr << dendl; need_addr = false; lock.Unlock(); } diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 975d797a9772d..928bb2d2c9919 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -51,8 +51,6 @@ using namespace __gnu_cxx; * the destructor will lead to badness. */ -/* Rank - per-process - */ class SimpleMessenger : public Messenger { public: struct Policy { @@ -78,11 +76,11 @@ private: // incoming class Accepter : public Thread { public: - SimpleMessenger *rank; + SimpleMessenger *messenger; bool done; int listen_sd; - Accepter(SimpleMessenger *r) : rank(r), done(false), listen_sd(-1) {} + Accepter(SimpleMessenger *r) : messenger(r), done(false), listen_sd(-1) {} void *entry(); void stop(); @@ -95,7 +93,7 @@ private: // pipe class Pipe { public: - SimpleMessenger *rank; + SimpleMessenger *messenger; ostream& _pipe_prefix(); enum { @@ -173,7 +171,7 @@ private: public: Pipe(SimpleMessenger *r, int st) : - rank(r), + messenger(r), sd(-1), peer_type(-1), pipe_lock("SimpleMessenger::Pipe::pipe_lock"), state(st), @@ -226,11 +224,11 @@ private: if (!queue_items.count(priority)) queue_items[priority] = new xlist::item(this); pipe_lock.Unlock(); - rank->dispatch_queue.lock.Lock(); - if (rank->dispatch_queue.queued_pipes.empty()) - rank->dispatch_queue.cond.Signal(); - rank->dispatch_queue.queued_pipes[priority].push_back(queue_items[priority]); - rank->dispatch_queue.lock.Unlock(); + messenger->dispatch_queue.lock.Lock(); + if (messenger->dispatch_queue.queued_pipes.empty()) + messenger->dispatch_queue.cond.Signal(); + messenger->dispatch_queue.queued_pipes[priority].push_back(queue_items[priority]); + messenger->dispatch_queue.lock.Unlock(); pipe_lock.Lock(); } @@ -248,9 +246,9 @@ private: //increment queue length counters in_qlen++; - rank->dispatch_queue.qlen_lock.lock(); - ++rank->dispatch_queue.qlen; - rank->dispatch_queue.qlen_lock.unlock(); + messenger->dispatch_queue.qlen_lock.lock(); + ++messenger->dispatch_queue.qlen; + messenger->dispatch_queue.qlen_lock.unlock(); pipe_lock.Unlock(); } @@ -402,7 +400,7 @@ private: // where i listen bool need_addr; - entity_addr_t rank_addr; + entity_addr_t ms_addr; // local bool endpoint_stopped; @@ -422,7 +420,7 @@ private: Pipe *connect_rank(const entity_addr_t& addr, int type); - const entity_addr_t &get_rank_addr() { return rank_addr; } + const entity_addr_t &get_ms_addr() { return ms_addr; } void mark_down(entity_addr_t addr); @@ -459,20 +457,20 @@ private: private: class DispatchThread : public Thread { - SimpleMessenger *rank; + SimpleMessenger *messenger; public: - DispatchThread(SimpleMessenger *_rank) : rank(_rank) {} + DispatchThread(SimpleMessenger *_messenger) : messenger(_messenger) {} void *entry() { - rank->get(); - rank->dispatch_entry(); - rank->put(); + messenger->get(); + messenger->dispatch_entry(); + messenger->put(); return 0; } } dispatch_thread; void dispatch_entry(); - SimpleMessenger *rank; //hack to make dout macro work, will fix + SimpleMessenger *messenger; //hack to make dout macro work, will fix public: SimpleMessenger() : @@ -481,7 +479,7 @@ public: lock("SimpleMessenger::lock"), started(false), did_bind(false), need_addr(true), endpoint_stopped(true), my_type(-1), global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0), - dispatch_thread(this), rank(this) { + dispatch_thread(this), messenger(this) { // for local dmsg delivery dispatch_queue.local_pipe = new Pipe(this, Pipe::STATE_OPEN); } -- 2.39.5