]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: detect rank addr during connect handshake; clean up addr apis
authorSage Weil <sage@newdream.net>
Wed, 2 Sep 2009 22:55:43 +0000 (15:55 -0700)
committerSage Weil <sage@newdream.net>
Wed, 2 Sep 2009 23:10:06 +0000 (16:10 -0700)
src/client/SyntheticClient.cc
src/mds/MDS.cc
src/messages/MClientMountAck.h
src/messages/MMonMap.h
src/mon/ClientMonitor.cc
src/mon/MonClient.cc
src/mon/Monitor.cc
src/msg/Messenger.h
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 6af0730fba0540c66cf9f94657cf12e72d8e30bf..01d3af6889991a8bb898413b5c31b5ad06e44a70 100644 (file)
@@ -292,8 +292,8 @@ string SyntheticClient::get_sarg(int seq)
     sargs.pop_front();
   }
   if (a.length() == 0 || a == "~") {
-    char s[20];
-    sprintf(s,"/syn.%d.%d", client->whoami.v, seq);
+    char s[30];
+    sprintf(s,"/syn.%lld.%d", (long long)client->whoami.v, seq);
     a = s;
   } 
   return a;
index 402445abc54cd53f04a5fcee30a6a33477e67e61..2dc9e1c2c785eb7de60c0c128dcd475c31d33774 100644 (file)
@@ -387,7 +387,7 @@ int MDS::init()
   want_state = MDSMap::STATE_BOOT;
   beacon_start();
   whoami = -1;
-  messenger->reset_myname(entity_name_t::MDS(whoami));
+  messenger->set_myname(entity_name_t::MDS(whoami));
 
   objecter->init();
    
@@ -641,7 +641,7 @@ void MDS::handle_mds_map(MMDSMap *m)
   if (oldwhoami != whoami) {
     // update messenger.
     dout(1) << "handle_mds_map i am now mds" << whoami << "." << incarnation << dendl;
-    messenger->reset_myname(entity_name_t::MDS(whoami));
+    messenger->set_myname(entity_name_t::MDS(whoami));
 
     // do i need an osdmap?
     if (oldwhoami < 0) {
index e031200bc33cdf8c79a789766e9a74f4f632add3..a526f0e331732a98013d082a0488eb2e43eda154 100644 (file)
@@ -19,7 +19,6 @@
 
 struct MClientMountAck : public Message {
   __s64 client;
-  entity_addr_t addr;
   __s32 result;
   cstring result_msg;
   bufferlist monmap_bl;
@@ -40,14 +39,12 @@ struct MClientMountAck : public Message {
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
     ::decode(client, p);
-    ::decode(addr, p);
     ::decode(result, p);
     ::decode(result_msg, p);
     ::decode(monmap_bl, p);
   }
   void encode_payload() {
     ::encode(client, payload);
-    ::encode(addr, payload);
     ::encode(result, payload);
     ::encode(result_msg, payload);
     ::encode(monmap_bl, payload);
index b2f5d966fc010c46150f60268a5f7a65171c0c92..38e4280b76361806da89da3a8a4cf06334663393 100644 (file)
 
 class MMonMap : public Message {
 public:
-  entity_addr_t addr;
   bufferlist monmapbl;
 
   MMonMap() : Message(CEPH_MSG_MON_MAP) { }
-  MMonMap(entity_addr_t t, bufferlist &bl) : Message(CEPH_MSG_MON_MAP) { 
-    addr = t;
+  MMonMap(bufferlist &bl) : Message(CEPH_MSG_MON_MAP) { 
     monmapbl.claim(bl);
   }
 
   const char *get_type_name() { return "mon_map"; }
 
   void encode_payload() { 
-    ::encode(addr, payload);
     ::encode(monmapbl, payload);
   }
   void decode_payload() { 
     bufferlist::iterator p = payload.begin();
-    ::decode(addr, p);
     ::decode(monmapbl, p);
   }
 };
index cffcaa6611955be4ae0c5802d72228b0f5461d65..3fe2b985b0996bc2711fb9899ccdcf28c638975c 100644 (file)
@@ -236,7 +236,6 @@ void ClientMonitor::_mounted(__s64 client, MClientMount *m)
   // reply with client ticket
   MClientMountAck *ack = new MClientMountAck;
   ack->client = client;
-  ack->addr = to.addr;
   mon->monmap->encode(ack->monmap_bl);
 
   mon->send_reply(m, ack, to);
index 8c69ee8ffb68dafa45996be148ab73a2f6bf347e..e1d6d5e59bb55cb1180690cfecb1e7abdfb9c3af 100644 (file)
@@ -154,6 +154,9 @@ bool MonClient::ms_dispatch(Message *m)
 {
   dout(10) << "dispatch " << *m << dendl;
 
+  if (my_addr == entity_addr_t())
+    my_addr = messenger->get_myaddr();
+
   switch (m->get_type()) {
   case CEPH_MSG_MON_MAP:
     handle_monmap((MMonMap*)m);
@@ -176,10 +179,6 @@ void MonClient::handle_monmap(MMonMap *m)
   dout(10) << "handle_monmap " << *m << dendl;
   monc_lock.Lock();
 
-  my_addr = m->addr;
-  messenger->_set_myaddr(m->addr);
-  dout(10) << " i am " << m->addr << dendl;
-
   bufferlist::iterator p = m->monmapbl.begin();
   ::decode(monmap, p);
   map_cond.Signal();
@@ -247,8 +246,7 @@ void MonClient::handle_mount_ack(MClientMountAck* m)
   bufferlist::iterator p = m->monmap_bl.begin();
   ::decode(monmap, p);
 
-  messenger->_set_myaddr(m->addr);
-  messenger->reset_myname(entity_name_t::CLIENT(m->client));
+  messenger->set_myname(entity_name_t::CLIENT(m->client));
 
   // finish.
   timer.cancel_event(mount_timeout_event);
index f8360d2795117a4e1c36daa9a496fb969d8f290d..89e6694cd49357ff7836a9627086dc60074cf6b6 100644 (file)
@@ -525,7 +525,7 @@ void Monitor::handle_mon_get_map(MMonGetMap *m)
   dout(10) << "handle_mon_get_map" << dendl;
   bufferlist bl;
   monmap->encode(bl);
-  messenger->send_message(new MMonMap(m->get_orig_source_addr(), bl), m->get_orig_source_inst());
+  messenger->send_message(new MMonMap(bl), m->get_orig_source_inst());
   delete m;
 }
 
index e37e4df701c1a5ba3158065bdb7f6e28b8598926..77b675bfea02c100bb0d32389ed8eb2f99e9cb7d 100644 (file)
@@ -36,7 +36,7 @@ class Messenger {
   Dispatcher          *dispatcher;
 
 protected:
-  entity_inst_t _myinst;
+  entity_name_t _my_name;
   int default_send_priority;
 
   atomic_t nref;
@@ -45,7 +45,7 @@ protected:
   Messenger(entity_name_t w) : dispatcher(0),
                               default_send_priority(CEPH_MSG_PRIO_DEFAULT),
                               nref(1) {
-    _myinst.name = w;
+    _my_name = w;
   }
   virtual ~Messenger() {
     assert(nref.test() == 0);
@@ -63,13 +63,11 @@ protected:
   }
   
   // accessors
-  entity_name_t get_myname() { return _myinst.name; }
-  const entity_addr_t& get_myaddr() { return _myinst.addr; }
-  const entity_inst_t& get_myinst() { return _myinst; }
+  entity_name_t get_myname() { return _my_name; }
+  virtual entity_addr_t get_myaddr() = 0;
+  entity_inst_t get_myinst() { return entity_inst_t(get_myname(), get_myaddr()); }
   
-  void _set_myname(entity_name_t m) { _myinst.name = m; }
-  virtual void _set_myaddr(entity_addr_t a) { _myinst.addr = a; }
-  virtual void reset_myname(entity_name_t m) = 0;
+  void set_myname(entity_name_t m) { _my_name = m; }
 
   void set_default_send_priority(int p) { default_send_priority = p; }
   int get_default_send_priority() { return default_send_priority; }
index 4af974d9302fa0cf398c1f8406870229248a0559..55f5d6649cdafcb16a01fba58e8307eca19f76c5 100644 (file)
@@ -144,9 +144,7 @@ int SimpleMessenger::Accepter::bind(int64_t force_nonce)
   }
   rank->rank_addr.erank = 0;
 
-  dout(1) << "accepter.bind rank_addr is " << rank->rank_addr 
-         << " need_addr=" << rank->need_addr
-         << dendl;
+  dout(1) << "accepter.bind rank_addr is " << rank->rank_addr << " need_addr=" << rank->need_addr << dendl;
   return 0;
 }
 
@@ -379,11 +377,12 @@ void SimpleMessenger::Endpoint::prepare_dest(const entity_inst_t& inst)
 int SimpleMessenger::Endpoint::send_message(Message *m, entity_inst_t dest)
 {
   // set envelope
-  m->set_source_inst(_myinst);
-  m->set_orig_source_inst(_myinst);
+  m->get_header().src = get_myinst();
+  m->get_header().orig_src = m->get_header().src;
+
   if (!m->get_priority()) m->set_priority(get_default_send_priority());
  
-  dout(1) << m->get_source()
+  dout(1) << m->get_source_inst()
           << " --> " << dest.name << " " << dest.addr
           << " -- " << *m
          << " -- ?+" << m->get_data().length()
@@ -398,7 +397,8 @@ int SimpleMessenger::Endpoint::send_message(Message *m, entity_inst_t dest)
 int SimpleMessenger::Endpoint::forward_message(Message *m, entity_inst_t dest)
 {
   // set envelope
-  m->set_source_inst(_myinst);
+  m->get_header().src = get_myinst();
+
   if (!m->get_priority()) m->set_priority(get_default_send_priority());
  
   dout(1) << m->get_source()
@@ -418,8 +418,9 @@ int SimpleMessenger::Endpoint::forward_message(Message *m, entity_inst_t dest)
 int SimpleMessenger::Endpoint::lazy_send_message(Message *m, entity_inst_t dest)
 {
   // set envelope
-  m->set_source_inst(_myinst);
-  m->set_orig_source_inst(_myinst);
+  m->get_header().src = get_myinst();
+  m->get_header().orig_src = m->get_header().src;
+
   if (!m->get_priority()) m->set_priority(get_default_send_priority());
  
   dout(1) << "lazy " << m->get_source()
@@ -442,31 +443,22 @@ int SimpleMessenger::Endpoint::send_keepalive(entity_inst_t dest)
 
 
 
-void SimpleMessenger::Endpoint::_set_myaddr(entity_addr_t a)
-{
-  Messenger::_set_myaddr(a);  // still call original
-
-  dout(10) << "_set_myaddr " << a << dendl;
-  rank->rank_addr.ipaddr = a.ipaddr;
-}
-
-void SimpleMessenger::Endpoint::reset_myname(entity_name_t newname)
+void SimpleMessenger::Endpoint::mark_down(entity_addr_t a)
 {
-  entity_name_t oldname = get_myname();
-  dout(10) << "reset_myname " << oldname << " to " << newname << dendl;
-  _set_myname(newname);
+  rank->mark_down(a);
 }
 
 
-void SimpleMessenger::Endpoint::mark_down(entity_addr_t a)
+entity_addr_t SimpleMessenger::Endpoint::get_myaddr()
 {
-  rank->mark_down(a);
+  entity_addr_t a = rank->rank_addr;
+  a.erank = my_rank;
+  return a;  
 }
 
 
 
 
-
 /**************************************
  * Pipe
  */
@@ -877,16 +869,8 @@ int SimpleMessenger::Pipe::connect()
   }
   dout(20) << "connect peer addr for me is " << peer_addr_for_me << dendl;
 
-  if (rank->need_addr) {
-    rank->lock.Lock();
-    entity_addr_t was = rank->rank_addr;
-    rank->rank_addr.ipaddr = peer_addr_for_me.ipaddr;
-    rank->rank_addr.ipaddr.sin_port = was.ipaddr.sin_port;
-    dout(0) << "rank discovered i am " << rank->rank_addr
-           << " (was " << was << ", peer says i am " << peer_addr_for_me << ")" << dendl;
-    rank->need_addr = false;
-    rank->lock.Unlock();
-  }
+  if (rank->need_addr)
+    rank->learned_addr(peer_addr_for_me);
 
   memset(&msg, 0, sizeof(msg));
   msgvec[0].iov_base = (char*)&rank->rank_addr;
@@ -1319,23 +1303,6 @@ void SimpleMessenger::Pipe::reader()
          // find entity
          entity = rank->local[erank];
          entity->get();
-
-         // first message?
-         /*
-         if (entity->need_addr) {
-           entity->_set_myaddr(m->get_dest_inst().addr);
-           dout(2) << "reader entity addr is " << entity->get_myaddr() << dendl;
-           entity->need_addr = false;
-         }
-
-         if (rank->need_addr) {
-           rank->rank_addr = m->get_dest_inst().addr;
-           rank->rank_addr.erank = 0;
-           dout(2) << "reader rank_addr is " << rank->rank_addr << dendl;
-           rank->need_addr = false;
-         }
-         */
-
        } else {
          derr(0) << "reader got message " << *m << ", which isn't local" << dendl;
        }
@@ -2038,14 +2005,8 @@ SimpleMessenger::Endpoint *SimpleMessenger::register_entity(entity_name_t name)
   msgr->get();
   local[erank] = msgr;
   stopped[erank] = false;
-  msgr->_myinst.addr = rank_addr;
-  if (msgr->_myinst.addr.ipaddr == entity_addr_t().ipaddr)
-    msgr->need_addr = true;
-  msgr->_myinst.addr.erank = erank;
 
-  dout(10) << "register_entity " << name << " at " << msgr->_myinst.addr 
-          << " need_addr=" << need_addr
-          << dendl;
+  dout(10) << "register_entity " << name << " at " << msgr->get_myaddr() << dendl;
 
   num_local++;
   
@@ -2261,3 +2222,13 @@ void SimpleMessenger::mark_down(entity_addr_t addr)
   lock.Unlock();
 }
 
+void SimpleMessenger::learned_addr(entity_addr_t peer_addr_for_me)
+{
+  lock.Lock();
+  entity_addr_t was = rank_addr;
+  rank_addr.ipaddr = peer_addr_for_me.ipaddr;
+  rank_addr.ipaddr.sin_port = was.ipaddr.sin_port;
+  dout(1) << "learned my addr " << rank_addr << dendl;
+  need_addr = false;
+  lock.Unlock();
+}
index d16b48d82d29d48345b481bb130c0fb26acbba2d..f33c2626e2b9dd3502a299c370ccc26b2ff454df 100644 (file)
@@ -291,10 +291,7 @@ private:
     bool stop;
     int qlen;
     int my_rank;
-  public:
-    bool need_addr;
 
-  private:
     class DispatchThread : public Thread {
       Endpoint *m;
     public:
@@ -358,7 +355,6 @@ private:
       stop(false),
       qlen(0),
       my_rank(rn),
-      need_addr(false),
       dispatch_thread(this) { }
     ~Endpoint() { }
 
@@ -379,8 +375,8 @@ private:
     
     int get_dispatch_queue_len() { return qlen; }
 
-    void _set_myaddr(entity_addr_t a);
-    void reset_myname(entity_name_t m);
+    entity_addr_t get_myaddr();
+
 
     int shutdown();
     void suicide();
@@ -472,6 +468,8 @@ public:
   void prepare_dest(const entity_inst_t& inst);
   void send_keepalive(const entity_inst_t& addr);  
 
+  void learned_addr(entity_addr_t peer_addr_for_me);
+
   // create a new messenger
   Endpoint *new_entity(entity_name_t addr);