]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
constifying entity_inst_t, starting to rework messenger lookup stuff...
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 3 Oct 2006 20:13:56 +0000 (20:13 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 3 Oct 2006 20:13:56 +0000 (20:13 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@906 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/msg/Message.h
ceph/msg/Messenger.h
ceph/msg/NewMessenger.cc
ceph/msg/NewMessenger.h
ceph/msg/tcp.h
ceph/osd/OSD.cc
ceph/osd/OSDMap.h

index 4b64531a08c8a5d127cecc1f3b43cd1ca00dd043..aebaa9d3fafbb574dec06418a4cb3ed318d7d785 100644 (file)
@@ -289,14 +289,14 @@ class entity_inst_t {
   entity_inst_t(tcpaddr_t& a, int r) : addr(a), rank(r) {}
 };
 
-inline bool operator==(entity_inst_t& a, entity_inst_t& b) { return a.rank == b.rank && a.addr == b.addr; }
-inline bool operator!=(entity_inst_t& a, entity_inst_t& b) { return !(a == b); }
-inline bool operator>(entity_inst_t& a, entity_inst_t& b) { return a.rank > b.rank; }
-inline bool operator>=(entity_inst_t& a, entity_inst_t& b) { return a.rank >= b.rank; }
-inline bool operator<(entity_inst_t& a, entity_inst_t& b) { return a.rank < b.rank; }
-inline bool operator<=(entity_inst_t& a, entity_inst_t& b) { return a.rank <= b.rank; }
-
-inline ostream& operator<<(ostream& out, entity_inst_t &i)
+inline bool operator==(const entity_inst_t& a, const entity_inst_t& b) { return a.rank == b.rank && a.addr == b.addr; }
+inline bool operator!=(const entity_inst_t& a, const entity_inst_t& b) { return !(a == b); }
+inline bool operator>(const entity_inst_t& a, const entity_inst_t& b) { return a.rank > b.rank; }
+inline bool operator>=(const entity_inst_t& a, const entity_inst_t& b) { return a.rank >= b.rank; }
+inline bool operator<(const entity_inst_t& a, const entity_inst_t& b) { return a.rank < b.rank; }
+inline bool operator<=(const entity_inst_t& a, const entity_inst_t& b) { return a.rank <= b.rank; }
+
+inline ostream& operator<<(ostream& out, const entity_inst_t &i)
 {
   return out << "rank" << i.rank << "_" << i.addr;
 }
index d7bd4ea7c57b3eedf18f16ab57ac2dd60945b79b..6ce29d3e005954bf4fb8e1967be642eccae9e4ba 100644 (file)
@@ -86,6 +86,10 @@ class Messenger {
   // send message
   virtual void prepare_send_message(msg_addr_t dest) {}
   virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0) = 0;
+  virtual int send_message(Message *m, msg_addr_t dest, const entity_inst_t& inst) {
+       return send_message(m, dest);   // overload me!
+  }
+
 
   // make a procedure call
   virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0);
index 1010886fb36cbb8e8e63ecfbc216a5d0675fb4c0..13fc21713edf6af5de705b98b16eecfc1a631b32 100644 (file)
@@ -904,7 +904,7 @@ int Rank::start_rank(tcpaddr_t& ns)
 /* connect_rank
  * NOTE: assumes rank.lock held.
  */
-Rank::Sender *Rank::connect_rank(entity_inst_t& inst)
+Rank::Sender *Rank::connect_rank(const entity_inst_t& inst)
 {
   assert(rank.lock.is_locked());
   assert(inst != rank.my_inst);
@@ -1091,6 +1091,63 @@ void Rank::prepare_dest(msg_addr_t dest)
   lock.Unlock();
 }
 
+void Rank::submit_message(Message *m, const entity_inst_t& dest_inst)
+{
+  const msg_addr_t dest = m->get_dest();
+
+  // lookup
+  EntityMessenger *entity = 0;
+  Sender *sender = 0;
+
+  lock.Lock();
+  {
+       // local?
+       if (dest_inst.rank == my_inst.rank) {
+         if (local.count(dest)) {
+               // local
+               dout(20) << "submit_message " << *m << " dest " << dest << " local" << endl;
+               if (g_conf.ms_single_dispatch) {
+                 _submit_single_dispatch(m);
+               } else {
+                 entity = local[dest];
+               }
+         } else {
+               // mid-register
+               dout(20) << "submit_message " << *m << " dest " << dest << " local but mid-register, waiting." << endl;
+               assert(0);
+               waiting_for_lookup[dest].push_back(m);
+         }
+       }
+       else {
+         // remote.
+         if (rank_sender.count( dest_inst.rank )) {
+               //&&
+               //rank_sender[dest_inst.rank]->inst == dest_inst) {
+               dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_inst << ", connected." << endl;
+               // connected.
+               sender = rank_sender[ dest_inst.rank ];
+         } else {
+               dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_inst << ", connecting." << endl;
+               // not connected.
+               sender = connect_rank( dest_inst );
+         }
+       }
+  }
+  lock.Unlock();
+  
+  // do it
+  if (entity) {  
+       // local!
+       dout(20) << "submit_message " << *m << " dest " << dest << " local, queueing" << endl;
+       entity->queue_message(m);
+  } 
+  else if (sender) {
+       // remote!
+       dout(20) << "submit_message " << *m << " dest " << dest << " remote, sending" << endl;
+       sender->send(m);
+  } 
+}
+
 
 void Rank::submit_message(Message *m)
 {
@@ -1507,19 +1564,37 @@ void Rank::EntityMessenger::prepare_send_message(msg_addr_t dest)
   rank.prepare_dest(dest);
 }
 
+int Rank::EntityMessenger::send_message(Message *m, msg_addr_t dest, const entity_inst_t& inst)
+{
+  // set envelope
+  m->set_source(get_myaddr(), 0);
+  m->set_dest(dest, 0);
+
+  dout(1) << "--> " 
+                 << m->get_source() //<< ':' << m->get_source_port() 
+                 << " to " << m->get_dest() //<< ':' << m->get_dest_port()
+                 << " ---- " << m->get_type_name() 
+                 << " ---- " << rank.my_inst << " --> " << inst
+                 << " ---- " << m 
+                 << endl;
+
+  rank.submit_message(m, inst);
+
+  return 0;
+}
+
 
 int Rank::EntityMessenger::send_message(Message *m, msg_addr_t dest, int port, int fromport)
 {
   // set envelope
   m->set_source(get_myaddr(), fromport);
   m->set_dest(dest, port);
-  m->set_lamport_send_stamp( get_lamport() );
-  
+
   dout(1) << "--> " 
-                 << m->get_source() << ':' << m->get_source_port() 
-                 << " to " << m->get_dest() << ':' << m->get_dest_port()
+                 << m->get_source() //<< ':' << m->get_source_port() 
+                 << " to " << m->get_dest() //<< ':' << m->get_dest_port()
                  << " ---- " << m->get_type_name() 
-                 << " ---- " << rank.my_inst
+                 << " ---- " << rank.my_inst << " --> ?"
                  << " ---- " << m 
                  << endl;
 
@@ -1609,14 +1684,3 @@ void Rank::mark_up(msg_addr_t a, entity_inst_t& i)
   lock.Unlock();
 }
 
-/*void Rank::EntityMessenger::reset(msg_addr_t a) 
-{
-  assert(a != get_myaddr());
-  if (rank.my_rank == 0) return;
-  rank.lock.Lock();
-  rank.down.erase(a);
-  rank.reset_peer(a);
-  rank.lock.Unlock();
-}
-*/
-
index 387348aafb76ee766451861fbc28d8d1ccd804d9..a63114caeaababcb87e7604ce024a69269ec5fbc 100644 (file)
@@ -106,7 +106,7 @@ class Rank : public Dispatcher {
        Mutex lock;
        Cond cond;
        
-       Sender(entity_inst_t& i) : inst(i), done(false), sd(0) {}
+       Sender(const entity_inst_t& i) : inst(i), done(false), sd(0) {}
        virtual ~Sender() {}
        
        void *entry();
@@ -187,6 +187,7 @@ class Rank : public Dispatcher {
        virtual int shutdown();
        virtual void prepare_send_message(msg_addr_t dest);
        virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0);
+       virtual int send_message(Message *m, msg_addr_t dest, const entity_inst_t& inst);
 
        virtual void mark_down(msg_addr_t a, entity_inst_t& i);
        virtual void mark_up(msg_addr_t a, entity_inst_t& i);
@@ -263,7 +264,7 @@ class Rank : public Dispatcher {
   void handle_register_ack(class MNSRegisterAck *m);
   void handle_lookup_reply(class MNSLookupReply *m);
   
-  Sender *connect_rank(entity_inst_t& inst);
+  Sender *connect_rank(const entity_inst_t& inst);
 
   void mark_down(msg_addr_t addr, entity_inst_t& i);
   void mark_up(msg_addr_t addr, entity_inst_t& i);
@@ -285,6 +286,7 @@ public:
   EntityMessenger *register_entity(msg_addr_t addr);
   void unregister_entity(EntityMessenger *ms);
 
+  void submit_message(Message *m, const entity_inst_t& inst);  
   void prepare_dest(msg_addr_t dest);
   void submit_message(Message *m);  
   void submit_messages(list<Message*>& ls);  
index aac9d98224d98184f4e0a6bad40f97e83a0eb3e1..eb560a09c772c704d3900bfec0371d5edb32066e 100644 (file)
@@ -8,7 +8,7 @@
 
 typedef struct sockaddr_in tcpaddr_t;
 
-inline ostream& operator<<(ostream& out, tcpaddr_t &a)
+inline ostream& operator<<(ostream& out, const tcpaddr_t &a)
 {
   unsigned char addr[4];
   memcpy((char*)addr, (char*)&a.sin_addr.s_addr, 4);
index 4b2e0ff11656b1601b57bb36be7999b9870df7d7..7476ad39df85a9e86f809e2c8ad6f2e4b3d7399a 100644 (file)
@@ -67,7 +67,7 @@
 #define  derr(l)    if (l<=g_conf.debug || l<=g_conf.debug_osd) cerr << g_clock.now() << " osd" << whoami << " " << (osdmap ? osdmap->get_epoch():0) << " "
 
 char *osd_base_path = "./osddata";
-char *ebofs_base_path = "./ebofsdev";
+char *ebofs_base_path = "./dev";
 
 
 
@@ -123,9 +123,9 @@ OSD::OSD(int id, Messenger *m, char *dev)
 
   // init object store
   // try in this order:
-  // ebofsdev/$num
-  // ebofsdev/$hostname
-  // ebofsdev/all
+  // dev/osd$num
+  // dev/osd.$hostname
+  // dev/osd.all
 
   if (dev) {
        strcpy(dev_path,dev);
@@ -134,14 +134,14 @@ OSD::OSD(int id, Messenger *m, char *dev)
        hostname[0] = 0;
        gethostname(hostname,100);
        
-       sprintf(dev_path, "%s/%d", ebofs_base_path, whoami);
+       sprintf(dev_path, "%s/osd%d", ebofs_base_path, whoami);
        
        struct stat sta;
        if (::lstat(dev_path, &sta) != 0)
-         sprintf(dev_path, "%s/%s", ebofs_base_path, hostname);        
+         sprintf(dev_path, "%s/osd.%s", ebofs_base_path, hostname);    
        
        if (::lstat(dev_path, &sta) != 0)
-         sprintf(dev_path, "%s/all", ebofs_base_path);
+         sprintf(dev_path, "%s/osd.all", ebofs_base_path);
   }
 
   if (g_conf.ebofs) {
@@ -484,7 +484,7 @@ void OSD::heartbeat()
           i++) {
        _share_map_outgoing( MSG_ADDR_OSD(*i) );
        messenger->send_message(new MOSDPing(osdmap->get_epoch()), 
-                                                       MSG_ADDR_OSD(*i));
+                                                       MSG_ADDR_OSD(*i), osdmap->get_osd_inst(*i));
   }
 
   if (logger) logger->set("pingset", pingset.size());
index 97d31944ddc0f545b35705ceabb3fc8631d46ea4..b25e7933f231d82e25302543514c55dfbb3c1b4a 100644 (file)
@@ -143,7 +143,11 @@ private:
   bool is_up(int osd) { return !is_down(osd); }
   bool is_out(int osd) { return out_osds.count(osd); }
   bool is_in(int osd) { return !is_out(osd); }
-
+  
+  const entity_inst_t& get_inst(int osd) {
+       assert(osd_inst.count(osd));
+       return osd_inst[osd];
+  }
   bool get_inst(int osd, entity_inst_t& inst) { 
        if (osd_inst.count(osd)) {
          inst = osd_inst[osd];