]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: rank is just a bad name for local SimpleMessengers now
authorGreg Farnum <gregf@hq.newdream.net>
Fri, 8 Jan 2010 00:53:51 +0000 (16:53 -0800)
committerGreg Farnum <gregf@hq.newdream.net>
Fri, 8 Jan 2010 00:53:51 +0000 (16:53 -0800)
12 files changed:
src/ceph.cc
src/cfuse.cc
src/cmds.cc
src/cmon.cc
src/cosd.cc
src/csyn.cc
src/dumpjournal.cc
src/libceph.cc
src/librados.cc
src/mon/MonClient.cc
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 66ab8ecd84d16d1ae5dc1835abfc6b80f7be44a2..649a506ea2afa6e038429661f06b8d954758d9f7 100644 (file)
@@ -625,12 +625,11 @@ int main(int argc, const char **argv, const char *envp[])
     return -1;
   
   // start up network
-  SimpleMessenger *rank = new SimpleMessenger();
-  messenger = rank;
-  rank->register_entity(entity_name_t::ADMIN());
+  SimpleMessenger *messenger = new SimpleMessenger();
+  messenger->register_entity(entity_name_t::ADMIN());
   messenger->add_dispatcher_head(&dispatcher);
 
-  rank->start();
+  messenger->start();
 
   mc.set_messenger(messenger);
   mc.init();
@@ -678,8 +677,8 @@ int main(int argc, const char **argv, const char *envp[])
 
 
   // wait for messenger to finish
-  rank->wait();
-  rank->destroy();
+  messenger->wait();
+  messenger->destroy();
   return 0;
 }
 
index ba8dc98c0045f92093267c8f5f406202cbe1705f..a0aeff8f9c72c2cef86f7ccb3966684cea800814 100644 (file)
@@ -68,12 +68,12 @@ int main(int argc, const char **argv, const char *envp[]) {
     return -1;
 
   // start up network
-  SimpleMessenger *rank = new SimpleMessenger();
+  SimpleMessenger *messenger = new SimpleMessenger();
   cout << "mounting ceph" << std::endl;
-  rank->register_entity(entity_name_t::CLIENT());
-  Client *client = new Client(rank, &mc);
+  messenger->register_entity(entity_name_t::CLIENT());
+  Client *client = new Client(messenger, &mc);
 
-  rank->start();
+  messenger->start();
 
   // start client
   client->init();
@@ -100,7 +100,7 @@ int main(int argc, const char **argv, const char *envp[]) {
   delete client;
   
   // wait for messenger to finish
-  rank->wait();
+  messenger->wait();
   
   return 0;
 }
index 205f60614cc55db8730e8747ae03361702f21431..f9a12ce6ea8796eb9bdc0f1a13e63ed56013eef6 100644 (file)
@@ -67,28 +67,27 @@ int main(int argc, const char **argv)
   if (mc.build_initial_monmap() < 0)
     return -1;
 
-  SimpleMessenger *rank = new SimpleMessenger();
-  rank->bind();
+  SimpleMessenger *messenger = new SimpleMessenger();
+  messenger->bind();
   cout << "starting mds." << g_conf.id
-       << " at " << rank->get_rank_addr() 
+       << " at " << messenger->get_ms_addr() 
        << std::endl;
 
-  Messenger *m = rank;
-  rank->register_entity(entity_name_t::MDS(-1));
-  assert_warn(m);
-  if (!m)
+  messenger->register_entity(entity_name_t::MDS(-1));
+  assert_warn(messenger);
+  if (!messenger)
     return 1;
 
-  rank->set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::stateful_server());
-  rank->set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless_peer());
+  messenger->set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::stateful_server());
+  messenger->set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless_peer());
 
-  rank->start();
+  messenger->start();
   
   // start mds
-  MDS *mds = new MDS(g_conf.id, m, &mc);
+  MDS *mds = new MDS(g_conf.id, messenger, &mc);
   mds->init();
   
-  rank->wait();
+  messenger->wait();
 
   // yuck: grab the mds lock, so we can be sure that whoever in *mds 
   // called shutdown finishes what they were doing.
index 883207cdfe3eb4f9a9c777a152a178228da82f11..2bf4d4a19ef186b36ebd29ce3a2c531691a3fd24 100644 (file)
@@ -127,7 +127,7 @@ int main(int argc, const char **argv)
         << "         continuing with monmap configuration" << std::endl;
 
   // bind
-  SimpleMessenger *rank = new SimpleMessenger();
+  SimpleMessenger *messenger = new SimpleMessenger();
 
   cout << "starting mon" << whoami 
        << " at " << monmap.get_inst(whoami).addr
@@ -135,29 +135,28 @@ int main(int argc, const char **argv)
        << " fsid " << monmap.get_fsid()
        << std::endl;
   g_my_addr = monmap.get_inst(whoami).addr;
-  err = rank->bind();
+  err = messenger->bind();
   if (err < 0)
     return 1;
 
   _dout_create_courtesy_output_symlink("mon", whoami);
   
   // start monitor
-  Messenger *m = rank;
-  rank->register_entity(entity_name_t::MON(whoami));
-  m->set_default_send_priority(CEPH_MSG_PRIO_HIGH);
-  Monitor *mon = new Monitor(whoami, &store, m, &monmap);
+  messenger->register_entity(entity_name_t::MON(whoami));
+  messenger->set_default_send_priority(CEPH_MSG_PRIO_HIGH);
+  Monitor *mon = new Monitor(whoami, &store, messenger, &monmap);
 
-  rank->start();  // may daemonize
+  messenger->start();  // may daemonize
 
-  rank->set_default_policy(SimpleMessenger::Policy::stateless_server());
-  rank->set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossless_peer());
+  messenger->set_default_policy(SimpleMessenger::Policy::stateless_server());
+  messenger->set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossless_peer());
 
   mon->init();
-  rank->wait();
+  messenger->wait();
 
   store.umount();
   delete mon;
-  rank->destroy();
+  messenger->destroy();
 
   // cd on exit, so that gmon.out (if any) goes into a separate directory for each node.
   char s[20];
index aacd93d9ac1a8d71e50272aeaa39d3796551c5a8..e0687d78a23a14c6cf1b8bff73fef5eba1399def 100644 (file)
@@ -138,50 +138,48 @@ int main(int argc, const char **argv)
   g_my_addr.ss_addr() = mc.get_my_addr().ss_addr();
   g_my_addr.set_port(0);
 
-  SimpleMessenger *rank = new SimpleMessenger();
-  SimpleMessenger *rank_hb = new SimpleMessenger();
-  rank->bind();
-  rank_hb->bind();
+  SimpleMessenger *messenger = new SimpleMessenger();
+  SimpleMessenger *messenger_hb = new SimpleMessenger();
+  messenger->bind();
+  messenger_hb->bind();
 
   cout << "starting osd" << whoami
-       << " at " << rank->get_rank_addr() 
+       << " at " << messenger->get_ms_addr() 
        << " osd_data " << g_conf.osd_data
        << " " << ((g_conf.osd_journal && g_conf.osd_journal[0]) ? g_conf.osd_journal:"(no journal)")
        << " fsid " << mc.monmap.fsid
        << std::endl;
 
   g_timer.shutdown();
-  rank->register_entity(entity_name_t::OSD(whoami));
-  Messenger *m = rank;
-  assert_warn(m);
-  if (!m)
+  messenger->register_entity(entity_name_t::OSD(whoami));
+  assert_warn(messenger);
+  if (!messenger)
     return 1;
-  rank_hb->register_entity(entity_name_t::OSD(whoami));
-  Messenger *hbm = rank_hb;
-  assert_warn(hbm);
-  if (!hbm)
+  messenger_hb->register_entity(entity_name_t::OSD(whoami));
+  assert_warn(messenger_hb);
+  if (!messenger_hb)
     return 1;
 
-  rank->set_default_policy(SimpleMessenger::Policy::stateless_server());
-  rank->set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::client());
-  rank->set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless_peer());
+  messenger->set_default_policy(SimpleMessenger::Policy::stateless_server());
+  messenger->set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::client());
+  messenger->set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless_peer());
 
-  rank->start();
-  rank_hb->start(true);  // only need to daemon() once
+  messenger->start();
+  messenger_hb->start(true);  // only need to daemon() once
 
   // start osd
-  OSD *osd = new OSD(whoami, m, hbm, &mc, g_conf.osd_data, g_conf.osd_journal, mkjournal);
+  OSD *osd = new OSD(whoami, messenger, messenger_hb, &mc, g_conf.osd_data, g_conf.osd_journal, mkjournal);
   if (osd->init() < 0) {
     cout << "error initializing osd" << std::endl;
     return 1;
   }
 
-  rank->wait();
-  rank_hb->wait();
+  messenger->wait();
+  messenger_hb->wait();
   // done
   delete osd;
-  rank->destroy();
-  rank_hb->destroy();
+  messenger->destroy();
+  messenger_hb->destroy();
 
   // cd on exit, so that gmon.out (if any) goes into a separate directory for each node.
   char s[20];
index beda348359abd9d2cd60a90e8b475da6f7df0ab3..1765f044c99d76d63b789dbc4567c333e1eed2b5 100644 (file)
@@ -55,7 +55,7 @@ int main(int argc, const char **argv, char *envp[])
     return -1;
 
   // start up network
-  SimpleMessenger *rank = new SimpleMessenger();
+  SimpleMessenger *messenger = new SimpleMessenger();
   cout << "starting csyn" << std::endl;
 
   list<Client*> clients;
@@ -63,14 +63,14 @@ int main(int argc, const char **argv, char *envp[])
 
   cout << "mounting and starting " << g_conf.num_client << " syn client(s)" << std::endl;
   for (int i=0; i<g_conf.num_client; i++) {
-    rank->register_entity(entity_name_t(entity_name_t::TYPE_CLIENT,-1));
-    Client *client = new Client(rank, &mc);
+    messenger->register_entity(entity_name_t(entity_name_t::TYPE_CLIENT,-1));
+    Client *client = new Client(messenger, &mc);
     SyntheticClient *syn = new SyntheticClient(client);
     clients.push_back(client);
     synclients.push_back(syn);
   }
 
-  rank->start();
+  messenger->start();
 
   for (list<SyntheticClient*>::iterator p = synclients.begin(); 
        p != synclients.end();
@@ -89,7 +89,7 @@ int main(int argc, const char **argv, char *envp[])
   }
     
   // wait for messenger to finish
-  rank->wait();
+  messenger->wait();
   
   return 0;
 }
index 3169266183181d9ac504de3e4849197ee7622add..cc245c0f1000a7f45a38cd600d452fa6e7b3dde2 100644 (file)
@@ -89,12 +89,11 @@ int main(int argc, const char **argv, const char *envp[])
     return -1;
   
   // start up network
-  SimpleMessenger *rank = new SimpleMessenger();
-  rank->bind();
+  SimpleMessenger *messenger = new SimpleMessenger();
+  messenger->bind();
   g_conf.daemonize = false; // not us!
-  rank->start();
-  messenger = rank;
-  rank->register_entity(entity_name_t::ADMIN());
+  messenger->start();
+  messenger->register_entity(entity_name_t::ADMIN());
   messenger->add_dispatcher_head(&dispatcher);
 
   inodeno_t ino = MDS_INO_LOG_OFFSET + mds;
@@ -131,7 +130,7 @@ int main(int argc, const char **argv, const char *envp[])
   messenger->shutdown();
 
   // wait for messenger to finish
-  rank->wait();
+  messenger->wait();
   
   return 0;
 }
index e33abe933d50ef186423f5374c7e80937a2c15ba..c6951540b795d03511fa13cc969bcd3fe0471abe 100644 (file)
@@ -30,7 +30,7 @@ static int client_initialized = 0;
 static int client_mount = 0;
 static Client *client = NULL;
 static MonClient *monclient = NULL;
-static SimpleMessenger *rank = NULL;
+static SimpleMessenger *messenger = NULL;
 
 extern "C" int ceph_initialize(int argc, const char **argv)
 {
@@ -48,13 +48,13 @@ extern "C" int ceph_initialize(int argc, const char **argv)
       return -1; //error!
     }
     //network connection
-    rank = new SimpleMessenger();
-    rank->register_entity(entity_name_t::CLIENT());
+    messenger = new SimpleMessenger();
+    messenger->register_entity(entity_name_t::CLIENT());
 
     //at last the client
-    client = new Client(rank, monclient);
+    client = new Client(messenger, monclient);
 
-    rank->start();
+    messenger->start();
 
     client->init();
   }
@@ -71,8 +71,8 @@ extern "C" void ceph_deinitialize()
     client->unmount();
     client->shutdown();
     delete client;
-    rank->wait();
-    rank->destroy();
+    messenger->wait();
+    messenger->destroy();
     delete monclient;
   }
   ceph_client_mutex.Unlock();
index 96e3ec84592b1efdcb0b1ea716162e167b7b0e2e..0d3cde7bb1e011012cff2bced1ad2b8afa96fdab 100644 (file)
@@ -48,9 +48,8 @@ using namespace std;
 class RadosClient : public Dispatcher
 {
   OSDMap osdmap;
-  Messenger *messenger;
   MonClient monclient;
-  SimpleMessenger *rank;
+  SimpleMessenger *messenger;
 
   bool _dispatch(Message *m);
   bool ms_dispatch(Message *m);
@@ -76,7 +75,7 @@ class RadosClient : public Dispatcher
  
 public:
   RadosClient() : messenger(NULL), lock("radosclient") {
-    rank = new SimpleMessenger();
+    messenger = new SimpleMessenger();
   }
 
   ~RadosClient();
@@ -289,10 +288,9 @@ bool RadosClient::init()
   if (monclient.build_initial_monmap() < 0)
     return false;
 
-  dout(1) << "starting msgr at " << rank->get_rank_addr() << dendl;
+  dout(1) << "starting msgr at " << messenger->get_ms_addr() << dendl;
 
-  rank->register_entity(entity_name_t::CLIENT(-1));
-  messenger = rank;
+  messenger->register_entity(entity_name_t::CLIENT(-1));
   assert_warn(messenger);
   if (!messenger)
     return false;
@@ -306,7 +304,7 @@ bool RadosClient::init()
   
   messenger->add_dispatcher_head(this);
 
-  rank->start(1);
+  messenger->start(1);
   messenger->add_dispatcher_head(this);
 
   dout(1) << "setting wanted keys" << dendl;
@@ -340,14 +338,14 @@ void RadosClient::shutdown()
   objecter->shutdown();
   lock.Unlock();
   messenger->shutdown();
-  rank->wait();
+  messenger->wait();
   dout(1) << "shutdown" << dendl;
 }
 
 RadosClient::~RadosClient()
 {
   if (messenger)
-    messenger->shutdown();
+    messenger->destroy();
 }
 
 
index bc701537fe38b3754d37e373c3e2c31d802df024..014c64bf77278ce17e109c1cc1d3062c455b0a15 100644 (file)
@@ -134,13 +134,13 @@ int MonClient::get_monmap_privately()
   dout(10) << "get_monmap_privately" << dendl;
   Mutex::Locker l(monc_lock);
   
-  SimpleMessenger *rank = NULL; 
+  SimpleMessenger *messenger = NULL; 
   bool temp_msgr = false;
   if (!messenger) {
-    messenger = rank = new SimpleMessenger();
-    rank->register_entity(entity_name_t::CLIENT(-1));
+    messenger = messenger = new SimpleMessenger();
+    messenger->register_entity(entity_name_t::CLIENT(-1));
     messenger->add_dispatcher_head(this);
-    rank->start(true);  // do not daemonize!
+    messenger->start(true);  // do not daemonize!
     temp_msgr = true; 
   }
   
@@ -165,8 +165,8 @@ int MonClient::get_monmap_privately()
   if (temp_msgr) {
     monc_lock.Unlock();
     messenger->shutdown();
-    rank->wait();
-    rank->destroy();
+    messenger->wait();
+    messenger->destroy();
     messenger = 0;
     monc_lock.Lock();
   }
index 478440e6de2b229967b1bbc5dd8d202a840ae6f8..4a156667b53a8bda1af42375d51965e87194563e 100644 (file)
@@ -38,9 +38,9 @@
 
 #define DOUT_SUBSYS ms
 #undef dout_prefix
-#define dout_prefix _prefix(rank)
-static ostream& _prefix(SimpleMessenger *rank) {
-  return *_dout << dbeginl << pthread_self() << " -- " << rank->rank_addr << " ";
+#define dout_prefix _prefix(messenger)
+static ostream& _prefix(SimpleMessenger *messenger) {
+  return *_dout << dbeginl << pthread_self() << " -- " << messenger->ms_addr << " ";
 }
 
 
@@ -128,23 +128,23 @@ int SimpleMessenger::Accepter::bind(int64_t force_nonce)
     return -errno;
   }
   
-  rank->rank_addr = g_my_addr;
-  if (rank->rank_addr != entity_addr_t())
-    rank->need_addr = false;
+  messenger->ms_addr = g_my_addr;
+  if (messenger->ms_addr != entity_addr_t())
+    messenger->need_addr = false;
   else 
-    rank->need_addr = true;
+    messenger->need_addr = true;
 
-  if (rank->rank_addr.get_port() == 0) {
-    rank->rank_addr.in4_addr() = listen_addr;
+  if (messenger->ms_addr.get_port() == 0) {
+    messenger->ms_addr.in4_addr() = listen_addr;
     if (force_nonce >= 0)
-      rank->rank_addr.nonce = force_nonce;
+      messenger->ms_addr.nonce = force_nonce;
     else
-      rank->rank_addr.nonce = getpid(); // FIXME: pid might not be best choice here.
+      messenger->ms_addr.nonce = getpid(); // FIXME: pid might not be best choice here.
   }
-  rank->rank_addr.erank = 0;
+  messenger->ms_addr.erank = 0;
 
-  dout(1) << "accepter.bind rank_addr is " << rank->rank_addr << " need_addr=" << rank->need_addr << dendl;
-  rank->did_bind = true;
+  dout(1) << "accepter.bind ms_addr is " << messenger->ms_addr << " need_addr=" << messenger->need_addr << dendl;
+  messenger->did_bind = true;
   return 0;
 }
 
@@ -209,15 +209,15 @@ void *SimpleMessenger::Accepter::entry()
          dout(0) << "accepter could't set TCP_NODELAY: " << strerror_r(errno, buf, sizeof(buf)) << dendl;
       }
       
-      rank->lock.Lock();
+      messenger->lock.Lock();
 
-      if (!rank->endpoint_stopped) {
-       Pipe *p = new Pipe(rank, Pipe::STATE_ACCEPTING);
+      if (!messenger->endpoint_stopped) {
+       Pipe *p = new Pipe(messenger, Pipe::STATE_ACCEPTING);
        p->sd = sd;
        p->start_reader();
-       rank->pipes.insert(p);
+       messenger->pipes.insert(p);
       }
-      rank->lock.Unlock();
+      messenger->lock.Unlock();
     } else {
       dout(0) << "accepter no incoming connection?  sd = " << sd
              << " errno " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl;
@@ -452,7 +452,7 @@ int SimpleMessenger::lazy_send_message(Message *m, entity_inst_t dest)
 
 entity_addr_t SimpleMessenger::get_myaddr()
 {
-  entity_addr_t a = rank->rank_addr;
+  entity_addr_t a = messenger->ms_addr;
   a.erank = 0;
   return a;  
 }
@@ -468,7 +468,7 @@ entity_addr_t SimpleMessenger::get_myaddr()
 #define dout_prefix _pipe_prefix()
 ostream& SimpleMessenger::Pipe::_pipe_prefix() {
   return *_dout << dbeginl << pthread_self()
-               << " -- " << rank->rank_addr << " >> " << peer_addr << " pipe(" << this
+               << " -- " << messenger->ms_addr << " >> " << peer_addr << " pipe(" << this
                << " sd=" << sd
                << " pgs=" << peer_global_seq
                << " cs=" << connect_seq
@@ -523,7 +523,7 @@ int SimpleMessenger::Pipe::accept()
 
   // and my addr
   bufferlist addrs;
-  ::encode(rank->rank_addr, addrs);
+  ::encode(messenger->ms_addr, addrs);
 
   // and peer's socket addr (they might not know their ip)
   entity_addr_t socket_addr;
@@ -621,24 +621,24 @@ int SimpleMessenger::Pipe::accept()
             << " global_seq " << connect.global_seq
             << dendl;
     
-    rank->lock.Lock();
+    messenger->lock.Lock();
 
     // note peer's type, flags
     set_peer_type(connect.host_type);
-    policy = rank->get_policy(connect.host_type);
+    policy = messenger->get_policy(connect.host_type);
     dout(10) << "accept of host_type " << connect.host_type
             << ", policy.lossy=" << policy.lossy
             << dendl;
 
     memset(&reply, 0, sizeof(reply));
-    reply.protocol_version = get_proto_version(rank->my_type, peer_type, false);
+    reply.protocol_version = get_proto_version(messenger->my_type, peer_type, false);
 
     // mismatch?
     dout(10) << "accept my proto " << reply.protocol_version
             << ", their proto " << connect.protocol_version << dendl;
     if (connect.protocol_version != reply.protocol_version) {
       reply.tag = CEPH_MSGR_TAG_BADPROTOVER;
-      rank->lock.Unlock();
+      messenger->lock.Unlock();
       goto reply;
     }
 
@@ -646,23 +646,23 @@ int SimpleMessenger::Pipe::accept()
     if (feat_missing) {
       dout(1) << "peer missing required features " << std::hex << feat_missing << std::dec << dendl;
       reply.tag = CEPH_MSGR_TAG_FEATURES;
-      rank->lock.Unlock();
+      messenger->lock.Unlock();
       goto reply;
     }
     
-    rank->lock.Unlock();
-    if (rank->verify_authorizer(connection_state, peer_type,
+    messenger->lock.Unlock();
+    if (messenger->verify_authorizer(connection_state, peer_type,
                                connect.authorizer_protocol, authorizer, authorizer_reply, authorizer_valid) &&
        !authorizer_valid) {
       dout(0) << "accept bad authorizer" << dendl;
       reply.tag = CEPH_MSGR_TAG_BADAUTHORIZER;
       goto reply;
     }
-    rank->lock.Lock();
+    messenger->lock.Lock();
     
     // existing?
-    if (rank->rank_pipe.count(peer_addr)) {
-      existing = rank->rank_pipe[peer_addr];
+    if (messenger->rank_pipe.count(peer_addr)) {
+      existing = messenger->rank_pipe[peer_addr];
       existing->pipe_lock.Lock();
 
       if (connect.global_seq < existing->peer_global_seq) {
@@ -671,7 +671,7 @@ int SimpleMessenger::Pipe::accept()
        reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL;
        reply.global_seq = existing->peer_global_seq;  // so we can send it below..
        existing->pipe_lock.Unlock();
-       rank->lock.Unlock();
+       messenger->lock.Unlock();
        goto reply;
       } else {
        dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
@@ -714,14 +714,14 @@ int SimpleMessenger::Pipe::accept()
          reply.tag = CEPH_MSGR_TAG_RETRY_SESSION;
          reply.connect_seq = existing->connect_seq;  // so we can send it below..
          existing->pipe_lock.Unlock();
-         rank->lock.Unlock();
+         messenger->lock.Unlock();
          goto reply;
        }
       }
 
       if (connect.connect_seq == existing->connect_seq) {
        // connection race?
-       if (peer_addr < rank->rank_addr ||
+       if (peer_addr < messenger->ms_addr ||
            existing->policy.server) {
          // incoming wins
          dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
@@ -734,11 +734,11 @@ int SimpleMessenger::Pipe::accept()
          // our existing outgoing wins
          dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
                   << " == " << connect.connect_seq << ", sending WAIT" << dendl;
-         assert(peer_addr > rank->rank_addr);
+         assert(peer_addr > messenger->ms_addr);
          assert(existing->state == STATE_CONNECTING); // this will win
          reply.tag = CEPH_MSGR_TAG_WAIT;
          existing->pipe_lock.Unlock();
-         rank->lock.Unlock();
+         messenger->lock.Unlock();
          goto reply;
        }
       }
@@ -750,7 +750,7 @@ int SimpleMessenger::Pipe::accept()
                 << ", " << existing << ".cseq = " << existing->connect_seq
                 << "), sending RESETSESSION" << dendl;
        reply.tag = CEPH_MSGR_TAG_RESETSESSION;
-       rank->lock.Unlock();
+       messenger->lock.Unlock();
        existing->pipe_lock.Unlock();
        goto reply;
       }
@@ -763,7 +763,7 @@ int SimpleMessenger::Pipe::accept()
     else if (connect.connect_seq > 0) {
       // we reset, and they are opening a new session
       dout(0) << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl;
-      rank->lock.Unlock();
+      messenger->lock.Unlock();
       reply.tag = CEPH_MSGR_TAG_RESETSESSION;
       goto reply;
     } else {
@@ -813,7 +813,7 @@ int SimpleMessenger::Pipe::accept()
 
   // send READY reply
   reply.tag = CEPH_MSGR_TAG_READY;
-  reply.global_seq = rank->get_global_seq();
+  reply.global_seq = messenger->get_global_seq();
   reply.connect_seq = connect_seq;
   reply.flags = 0;
   reply.authorizer_len = authorizer_reply.length();
@@ -822,7 +822,7 @@ int SimpleMessenger::Pipe::accept()
 
   // ok!
   register_pipe();
-  rank->lock.Unlock();
+  messenger->lock.Unlock();
 
   rc = tcp_write(sd, (char*)&reply, sizeof(reply));
   if (rc < 0)
@@ -865,7 +865,7 @@ int SimpleMessenger::Pipe::connect()
     closed_socket();
   }
   __u32 cseq = connect_seq;
-  __u32 gseq = rank->get_global_seq();
+  __u32 gseq = messenger->get_global_seq();
 
   // stop reader thrad
   join_reader();
@@ -967,10 +967,10 @@ int SimpleMessenger::Pipe::connect()
 
   dout(20) << "connect peer addr for me is " << peer_addr_for_me << dendl;
 
-  if (rank->need_addr)
-    rank->learned_addr(peer_addr_for_me);
+  if (messenger->need_addr)
+    messenger->learned_addr(peer_addr_for_me);
 
-  ::encode(rank->rank_addr, myaddrbl);
+  ::encode(messenger->ms_addr, myaddrbl);
 
   memset(&msg, 0, sizeof(msg));
   msgvec[0].iov_base = myaddrbl.c_str();
@@ -982,20 +982,20 @@ int SimpleMessenger::Pipe::connect()
     dout(2) << "connect couldn't write my addr, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
     goto fail;
   }
-  dout(10) << "connect sent my addr " << rank->rank_addr << dendl;
+  dout(10) << "connect sent my addr " << messenger->ms_addr << dendl;
 
 
   while (1) {
     delete authorizer;
-    authorizer = rank->get_authorizer(peer_type, false);
+    authorizer = messenger->get_authorizer(peer_type, false);
     bufferlist authorizer_reply;
 
     ceph_msg_connect connect;
     connect.features = CEPH_FEATURE_SUPPORTED;
-    connect.host_type = rank->my_type;
+    connect.host_type = messenger->my_type;
     connect.global_seq = gseq;
     connect.connect_seq = cseq;
-    connect.protocol_version = get_proto_version(rank->my_type, peer_type, true);
+    connect.protocol_version = get_proto_version(messenger->my_type, peer_type, true);
     connect.authorizer_protocol = authorizer ? authorizer->protocol : 0;
     connect.authorizer_len = authorizer ? authorizer->bl.length() : 0;
     if (authorizer) 
@@ -1083,7 +1083,7 @@ int SimpleMessenger::Pipe::connect()
         goto stop_locked;
       got_bad_auth = true;
       pipe_lock.Unlock();
-      authorizer = rank->get_authorizer(peer_type, true);  // try harder
+      authorizer = messenger->get_authorizer(peer_type, true);  // try harder
       continue;
     }
     if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
@@ -1094,7 +1094,7 @@ int SimpleMessenger::Pipe::connect()
       continue;
     }
     if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
-      gseq = rank->get_global_seq(reply.global_seq);
+      gseq = messenger->get_global_seq(reply.global_seq);
       dout(10) << "connect got RETRY_GLOBAL " << reply.global_seq
               << " chose new " << gseq << dendl;
       pipe_lock.Unlock();
@@ -1131,10 +1131,10 @@ int SimpleMessenger::Pipe::connect()
       backoff = utime_t();
       dout(20) << "connect success " << connect_seq << ", lossy = " << policy.lossy << dendl;
       
-      if (!rank->endpoint_stopped) {
+      if (!messenger->endpoint_stopped) {
        Connection * cstate = connection_state->get();
        pipe_lock.Unlock();
-       rank->dispatch_queue.queue_connect(cstate);
+       messenger->dispatch_queue.queue_connect(cstate);
        pipe_lock.Lock();
       }
       
@@ -1166,18 +1166,18 @@ int SimpleMessenger::Pipe::connect()
 void SimpleMessenger::Pipe::register_pipe()
 {
   dout(10) << "register_pipe" << dendl;
-  assert(rank->lock.is_locked());
-  assert(rank->rank_pipe.count(peer_addr) == 0);
-  rank->rank_pipe[peer_addr] = this;
+  assert(messenger->lock.is_locked());
+  assert(messenger->rank_pipe.count(peer_addr) == 0);
+  messenger->rank_pipe[peer_addr] = this;
 }
 
 void SimpleMessenger::Pipe::unregister_pipe()
 {
-  assert(rank->lock.is_locked());
-  if (rank->rank_pipe.count(peer_addr) &&
-      rank->rank_pipe[peer_addr] == this) {
+  assert(messenger->lock.is_locked());
+  if (messenger->rank_pipe.count(peer_addr) &&
+      messenger->rank_pipe[peer_addr] == this) {
     dout(10) << "unregister_pipe" << dendl;
-    rank->rank_pipe.erase(peer_addr);
+    messenger->rank_pipe.erase(peer_addr);
   } else {
     dout(10) << "unregister_pipe - not registered" << dendl;
   }
@@ -1207,7 +1207,7 @@ void SimpleMessenger::Pipe::requeue_sent()
 void SimpleMessenger::Pipe::discard_queue()
 {
   dout(10) << "discard_queue" << dendl;
-  DispatchQueue& q = rank->dispatch_queue;
+  DispatchQueue& q = messenger->dispatch_queue;
 
   pipe_lock.Unlock();
   xlist<Pipe *>* list_on;
@@ -1328,10 +1328,10 @@ void SimpleMessenger::Pipe::fail()
 
   discard_queue();
   
-  if (!rank->endpoint_stopped) {
+  if (!messenger->endpoint_stopped) {
     Connection * cstate = connection_state->get();
     pipe_lock.Unlock();
-    rank->dispatch_queue.queue_reset(cstate);
+    messenger->dispatch_queue.queue_reset(cstate);
     pipe_lock.Lock();
   }
 }
@@ -1343,10 +1343,10 @@ void SimpleMessenger::Pipe::was_session_reset()
   dout(10) << "was_session_reset" << dendl;
   discard_queue();
 
-  if (!rank->endpoint_stopped) {
+  if (!messenger->endpoint_stopped) {
     Connection * cstate = connection_state->get();
     pipe_lock.Unlock();
-    rank->dispatch_queue.queue_remote_reset(cstate);
+    messenger->dispatch_queue.queue_remote_reset(cstate);
     pipe_lock.Lock();
   }
 
@@ -1635,12 +1635,12 @@ void SimpleMessenger::Pipe::unlock_maybe_reap()
 
     // queue for reap
     dout(10) << "unlock_maybe_reap queueing for reap" << dendl;
-    rank->lock.Lock();
+    messenger->lock.Lock();
     {
-      rank->pipe_reap_queue.push_back(this);
-      rank->wait_cond.Signal();
+      messenger->pipe_reap_queue.push_back(this);
+      messenger->wait_cond.Signal();
     }
-    rank->lock.Unlock();
+    messenger->lock.Unlock();
   } else {
     pipe_lock.Unlock();
   }
@@ -2095,20 +2095,20 @@ int SimpleMessenger::start(bool nodaemon)
   }
 
   if (!did_bind)
-    rank_addr.nonce = getpid();
+    ms_addr.nonce = getpid();
 
-  dout(1) << "rank.start" << dendl;
+  dout(1) << "messenger.start" << dendl;
   started = true;
   lock.Unlock();
 
   // daemonize?
   if (g_conf.daemonize && !nodaemon) {
     if (Thread::get_num_threads() > 0) {
-      derr(0) << "rank.start BUG: there are " << Thread::get_num_threads()
-             << " already started that will now die!  call rank.start() sooner." 
+      derr(0) << "messenger.start BUG: there are " << Thread::get_num_threads()
+             << " already started that will now die!  call messenger.start() sooner." 
              << dendl;
     }
-    dout(1) << "rank.start daemonizing" << dendl;
+    dout(1) << "messenger.start daemonizing" << dendl;
 
     if (1) {
       daemon(1, 0);
@@ -2156,12 +2156,12 @@ int SimpleMessenger::start(bool nodaemon)
 
 
 /* connect_rank
- * NOTE: assumes rank.lock held.
+ * NOTE: assumes messenger.lock held.
  */
 SimpleMessenger::Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr, int type)
 {
   assert(lock.is_locked());
-  assert(addr != rank_addr);
+  assert(addr != ms_addr);
   
   dout(10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl;
   
@@ -2238,7 +2238,7 @@ void SimpleMessenger::submit_message(Message *m, const entity_inst_t& dest, bool
   lock.Lock();
   {
     // local?
-    if (rank_addr.is_local_to(dest_addr)) {
+    if (ms_addr.is_local_to(dest_addr)) {
       if (dest_addr.get_erank() == 0 && !endpoint_stopped) {
         // local
         dout(20) << "submit_message " << *m << " local" << dendl;
@@ -2293,7 +2293,7 @@ int SimpleMessenger::send_keepalive(entity_inst_t dest)
   lock.Lock();
   {
     // local?
-    if (!rank_addr.is_local_to(dest_addr)) {
+    if (!ms_addr.is_local_to(dest_addr)) {
       // remote.
       Pipe *pipe = 0;
       if (rank_pipe.count( dest_proc_addr )) {
@@ -2399,10 +2399,10 @@ void SimpleMessenger::mark_down(entity_addr_t addr)
 void SimpleMessenger::learned_addr(entity_addr_t peer_addr_for_me)
 {
   lock.Lock();
-  int port = rank_addr.get_port();
-  rank_addr.addr = peer_addr_for_me.addr;
-  rank_addr.set_port(port);
-  dout(1) << "learned my addr " << rank_addr << dendl;
+  int port = ms_addr.get_port();
+  ms_addr.addr = peer_addr_for_me.addr;
+  ms_addr.set_port(port);
+  dout(1) << "learned my addr " << ms_addr << dendl;
   need_addr = false;
   lock.Unlock();
 }
index 975d797a9772daeb85295e23b934b016bebde6e7..928bb2d2c9919f6e13404ccb1ea98cd2c47f4f43 100644 (file)
@@ -51,8 +51,6 @@ using namespace __gnu_cxx;
  * the destructor will lead to badness.
  */
 
-/* Rank - per-process
- */
 class SimpleMessenger : public Messenger {
 public:
   struct Policy {
@@ -78,11 +76,11 @@ private:
   // incoming
   class Accepter : public Thread {
   public:
-    SimpleMessenger *rank;
+    SimpleMessenger *messenger;
     bool done;
     int listen_sd;
     
-    Accepter(SimpleMessenger *r) : rank(r), done(false), listen_sd(-1) {}
+    Accepter(SimpleMessenger *r) : messenger(r), done(false), listen_sd(-1) {}
     
     void *entry();
     void stop();
@@ -95,7 +93,7 @@ private:
   // pipe
   class Pipe {
   public:
-    SimpleMessenger *rank;
+    SimpleMessenger *messenger;
     ostream& _pipe_prefix();
 
     enum {
@@ -173,7 +171,7 @@ private:
     
   public:
     Pipe(SimpleMessenger *r, int st) : 
-      rank(r),
+      messenger(r),
       sd(-1), peer_type(-1),
       pipe_lock("SimpleMessenger::Pipe::pipe_lock"),
       state(st), 
@@ -226,11 +224,11 @@ private:
       if (!queue_items.count(priority))
        queue_items[priority] = new xlist<Pipe *>::item(this);
       pipe_lock.Unlock();
-      rank->dispatch_queue.lock.Lock();
-      if (rank->dispatch_queue.queued_pipes.empty())
-       rank->dispatch_queue.cond.Signal();
-      rank->dispatch_queue.queued_pipes[priority].push_back(queue_items[priority]);
-      rank->dispatch_queue.lock.Unlock();
+      messenger->dispatch_queue.lock.Lock();
+      if (messenger->dispatch_queue.queued_pipes.empty())
+       messenger->dispatch_queue.cond.Signal();
+      messenger->dispatch_queue.queued_pipes[priority].push_back(queue_items[priority]);
+      messenger->dispatch_queue.lock.Unlock();
       pipe_lock.Lock();
     }
 
@@ -248,9 +246,9 @@ private:
 
       //increment queue length counters
       in_qlen++;
-      rank->dispatch_queue.qlen_lock.lock();
-      ++rank->dispatch_queue.qlen;
-      rank->dispatch_queue.qlen_lock.unlock();
+      messenger->dispatch_queue.qlen_lock.lock();
+      ++messenger->dispatch_queue.qlen;
+      messenger->dispatch_queue.qlen_lock.unlock();
 
       pipe_lock.Unlock();
     }
@@ -402,7 +400,7 @@ private:
 
   // where i listen
   bool need_addr;
-  entity_addr_t rank_addr;
+  entity_addr_t ms_addr;
   
   // local
   bool endpoint_stopped;
@@ -422,7 +420,7 @@ private:
       
   Pipe *connect_rank(const entity_addr_t& addr, int type);
 
-  const entity_addr_t &get_rank_addr() { return rank_addr; }
+  const entity_addr_t &get_ms_addr() { return ms_addr; }
 
   void mark_down(entity_addr_t addr);
 
@@ -459,20 +457,20 @@ private:
 
 private:
   class DispatchThread : public Thread {
-    SimpleMessenger *rank;
+    SimpleMessenger *messenger;
   public:
-    DispatchThread(SimpleMessenger *_rank) : rank(_rank) {}
+    DispatchThread(SimpleMessenger *_messenger) : messenger(_messenger) {}
     void *entry() {
-      rank->get();
-      rank->dispatch_entry();
-      rank->put();
+      messenger->get();
+      messenger->dispatch_entry();
+      messenger->put();
       return 0;
     }
   } dispatch_thread;
 
   void dispatch_entry();
 
-  SimpleMessenger *rank; //hack to make dout macro work, will fix
+  SimpleMessenger *messenger; //hack to make dout macro work, will fix
 
 public:
   SimpleMessenger() :
@@ -481,7 +479,7 @@ public:
     lock("SimpleMessenger::lock"), started(false), did_bind(false), need_addr(true),
     endpoint_stopped(true), my_type(-1),
     global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0),
-    dispatch_thread(this), rank(this) {
+    dispatch_thread(this), messenger(this) {
     // for local dmsg delivery
     dispatch_queue.local_pipe = new Pipe(this, Pipe::STATE_OPEN);
   }