]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
*** empty log message ***
authorsage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Sat, 11 Feb 2006 18:49:25 +0000 (18:49 +0000)
committersage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Sat, 11 Feb 2006 18:49:25 +0000 (18:49 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@615 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/common/Timer.cc
ceph/config.cc
ceph/msg/TCPMessenger.cc
ceph/msg/TCPMessenger.h
ceph/script/runset.pl
ceph/tcpsyn.cc

index c997f112f9947c4e50ae0d646ee3a80602092843..a91595cfcf69642e83614fa787f6264a1968647a 100644 (file)
@@ -46,7 +46,9 @@ void Timer::timer_thread()
        // any events due?
        utime_t next;
        Context *event = get_next_scheduled(next);
-         
+       
+       list<Context*> pending;
+       
        if (event && now > next) {
          // move to pending list
          map< utime_t, set<Context*> >::iterator it = scheduled.begin();
@@ -60,7 +62,7 @@ void Timer::timer_thread()
                  for (set<Context*>::iterator cit = it->second.begin();
                           cit != it->second.end();
                           cit++)
-                       messenger->queue_callback(*cit);
+                       pending.push_back(*cit);
                }
 
                //pending[t] = it->second;
@@ -68,6 +70,18 @@ void Timer::timer_thread()
                scheduled.erase(t);
          }
 
+         if (!pending.empty()) {
+               lock.Unlock();
+               { // make sure we're not holding any locks whil we talk to the messenger
+                 for (list<Context*>::iterator cit = pending.begin();
+                          cit != pending.end();
+                          cit++)
+                       messenger->queue_callback(*cit);
+                 pending.clear();
+               }
+               lock.Lock();
+         }
+
        }
 
        else {
index 78f548343ed2dff8885c8898a9e42c195e6dc453..1be492108b8add99d04cbd6da584cfa77aabb8ca 100644 (file)
@@ -65,7 +65,7 @@ md_config_t g_conf = {
   debug_ns: 0,
   
   tcp_skip_rank0: false,
-  tcp_log: true,
+  tcp_log: false,
 
 
   // --- client ---
index 3e92cc703f4b2c8487199ffbfd6b5ed4127bc5e7..7d533a284b48818a1c6c7d400e0f5a6c5b681937 100644 (file)
@@ -38,10 +38,10 @@ using namespace __gnu_cxx;
 
 #include "common/Logger.h"
 
-
 #define DBL 18
 
 
+
 TCPMessenger *rankmessenger = 0; // 
 
 TCPDirectory *nameserver = 0;    // only defined on rank 0
@@ -107,7 +107,7 @@ map<int, pthread_t>      in_threads;    // sd -> threadid
 
 // debug
 #undef dout
-#define  dout(l)    if (l<=g_conf.debug) cout << "[TCP " << my_rank << " " << getpid() << "." << pthread_self() << "] "
+#define  dout(l)    if (l<=g_conf.debug) cout << "[TCP " << my_rank /*(<< " " << getpid() << "." << pthread_self() */  << "] "
 
 
 
@@ -118,6 +118,10 @@ int tcp_send(Message *m);
 void tcpmessenger_kick_dispatch_loop();
 
 
+int tcpmessenger_get_rank()
+{
+  return my_rank;
+}
 
 
 /** rankserver
@@ -430,11 +434,14 @@ bool tcp_read(int sd, char *buf, int len)
 {
   while (len > 0) {
        int got = ::recv( sd, buf, len, 0 );
-       if (got < 0) return false;
+       if (got < 0) {
+         dout(DBL) << "tcp_read bailing with " << got << endl;
+         return false;
+       }
        assert(got >= 0);
        len -= got;
        buf += got;
-       //dout(DBL) << "tcp_read got " << got << ", " << len << " left" << endl;
+       dout(DBL) << "tcp_read got " << got << ", " << len << " left" << endl;
   }
   return true;
 }
@@ -479,7 +486,7 @@ Message *tcp_recv(int sd)
        return 0;
   }
 
-  dout(DBL) << "tcp_recv got envelope type=" << env.type << " src " << env.source << " dst " << env.dest << " nchunks=" << env.nchunks << endl;
+  dout(DBL) << "tcp_recv got envelope type=" << env.type << " src " << MSG_ADDR_NICE(env.source) << " dst " << MSG_ADDR_NICE(env.dest) << " nchunks=" << env.nchunks << endl;
   
   // payload
   bufferlist blist;
@@ -489,7 +496,7 @@ Message *tcp_recv(int sd)
 
        bufferptr bp = new buffer(size);
        
-       tcp_read( sd, bp.c_str(), size );
+       if (!tcp_read( sd, bp.c_str(), size )) return 0;
 
        blist.push_back(bp);
 
@@ -682,14 +689,16 @@ void *tcp_inthread(void *r)
        who = m->get_source();
 
        // give to dispatch loop
+       size_t sz;
        incoming_lock.Lock();
+       {
+         incoming.push_back(m);
+         incoming_cond.Signal();
 
-       stat_inq++;
-       size_t sz = m->get_payload().length();
-       stat_inqb += sz;
-
-       incoming.push_back(m);
-       incoming_cond.Signal();
+         stat_inq++;
+         sz = m->get_payload().length();
+         stat_inqb += sz;
+       }
        incoming_lock.Unlock();
 
        if (logger) {
@@ -747,92 +756,79 @@ void* tcp_dispatchthread(void*)
 {
   dout(5) << "tcp_dispatchthread start pid " << getpid() << endl;
 
-  incoming_lock.Lock();
-
   while (1) {
-
-       // callbacks?
+       // any pending callbacks?
        messenger_do_callbacks();
 
-       // timer events?
-       /*if (pending_timer) {
-         pending_timer = false;
-         dout(DBL) << "dispatch: pending timer" << endl;
-         g_timer.execute_pending();
-       }
-       */
+       // inq?
+       incoming_lock.Lock();
 
        // done?
-       if (tcp_done &&
-               incoming.empty()) break;
-               //&&
-               //!pending_timer) break;
+       if (tcp_done && incoming.empty()) {
+         incoming_lock.Unlock();
+         break;
+       }
 
        // wait?
        if (incoming.empty()) {
          // wait
-         dout(12) << "dispatch: waiting for incoming messages" << endl;
+         dout(DBL) << "dispatch: waiting for incoming messages" << endl;
          incoming_cond.Wait(incoming_lock);
+         dout(DBL) << "dispatch: woke up" << endl;
        }
 
-       // incoming?
-       while (!incoming.empty()) {
-         // grab incoming messages
-         list<Message*> in;
-         in.splice(in.begin(), incoming);
-
-         // drop lock while we deliver
-         incoming_lock.Unlock();
-
-
-         while (!in.empty()) {
-               Message *m = in.front();
-               in.pop_front();
+       // grab incoming messages  
+       list<Message*> in;
+       in.splice(in.begin(), incoming);
 
-               stat_inq--;
-               stat_inqb -= m->get_payload().length();
-               if (logger) {
-                 logger->set("inq", stat_inq);
-                 logger->set("inqb", stat_inqb);
-                 logger->inc("dis");
-               }
+       // drop lock while we deliver
+       incoming_lock.Unlock();
 
-               dout(DBL) << "dispatch doing " << *m << endl;
+       // dispatch!
+       while (!in.empty()) {
+         Message *m = in.front();
+         in.pop_front();
          
-               // for rankserver?
-               if (m->get_type() == MSG_NS_CONNECTACK ||        // i just connected
-                       m->get_dest() == MSG_ADDR_RANK(my_rank)) {
-                 dout(DBL) <<  " giving to rankserver" << endl;
-                 rankserver.dispatch(m);
-                 continue;
-               }
-
-               // ok
-               int dest = m->get_dest();
-               directory_lock.Lock();
-               if (directory.count(dest)) {
-                 Messenger *who = directory[ dest ];
-                 directory_lock.Unlock();                
-
-                 dout(4) << "---- '" << m->get_type_name() << 
-                       "' from " << MSG_ADDR_NICE(m->get_source()) << ':' << m->get_source_port() <<
-                       " to " << MSG_ADDR_NICE(m->get_dest()) << ':' << m->get_dest_port() << " ---- " 
-                                 << m 
-                                 << endl;
-                 
-                 who->dispatch(m);
-               } else {
-                 directory_lock.Unlock();
-                 dout (1) << "---- i don't know who " << MSG_ADDR_NICE(dest) << " " << dest << " is." << endl;
-                 assert(0);
-               }
+         stat_inq--;
+         stat_inqb -= m->get_payload().length();
+         if (logger) {
+               logger->set("inq", stat_inq);
+               logger->set("inqb", stat_inqb);
+               logger->inc("dis");
+         }
+         
+         dout(DBL) << "dispatch doing " << *m << endl;
+         
+         // for rankserver?
+         if (m->get_type() == MSG_NS_CONNECTACK ||        // i just connected
+                 m->get_dest() == MSG_ADDR_RANK(my_rank)) {
+               dout(DBL) <<  " giving to rankserver" << endl;
+               rankserver.dispatch(m);
+               continue;
+         }
+         
+         // ok
+         int dest = m->get_dest();
+         directory_lock.Lock();
+         if (directory.count(dest)) {
+               Messenger *who = directory[ dest ];
+               directory_lock.Unlock();                  
+               
+               dout(4) << "---- '" << m->get_type_name() << 
+                 "' from " << MSG_ADDR_NICE(m->get_source()) << ':' << m->get_source_port() <<
+                 " to " << MSG_ADDR_NICE(m->get_dest()) << ':' << m->get_dest_port() << " ---- " 
+                               << m 
+                               << endl;
+               
+               who->dispatch(m);
+         } else {
+               directory_lock.Unlock();
+               dout (1) << "---- i don't know who " << MSG_ADDR_NICE(dest) << " " << dest << " is." << endl;
+               assert(0);
          }
-
-         incoming_lock.Lock();
        }
   }
 
-  incoming_lock.Unlock();
 
   g_timer.shutdown();
 
@@ -874,9 +870,12 @@ int tcpmessenger_start()
 
 void tcpmessenger_kick_dispatch_loop()
 {
+  dout(DBL) << "kicking" << endl;
   incoming_lock.Lock();
+  dout(DBL) << "prekick" << endl;
   incoming_cond.Signal();
   incoming_lock.Unlock();
+  dout(DBL) << "kicked" << endl;
 }
 
 void tcpmessenger_kick_outgoing_loop()
@@ -891,6 +890,7 @@ void tcpmessenger_kick_outgoing_loop()
 
 void tcpmessenger_wait()
 {
+  dout(10) << "tcpmessenger_wait waking up dispatch loop" << endl;
   tcpmessenger_kick_dispatch_loop();
 
   void *returnval;
@@ -965,6 +965,7 @@ TCPMessenger::TCPMessenger(msg_addr_t myaddr) : Messenger(myaddr)
        myaddr = register_entity(myaddr);
   }
 
+
   // my address
   set_myaddr( myaddr );
 
@@ -1060,12 +1061,11 @@ int TCPMessenger::shutdown()
   if (lastone) {
        dout(2) << "shutdown last tcpmessenger on rank " << my_rank << " shut down" << endl;
        //pthread_t whoami = pthread_self();
-
+       
        // no more timer events
        g_timer.unset_messenger();
        msgr_callback_kicker = 0;
-
-  
+       
        // close incoming sockets
        //void *r;
        for (map<int,pthread_t>::iterator it = in_threads.begin();
@@ -1085,7 +1085,7 @@ int TCPMessenger::shutdown()
 
        tcpmessenger_kick_outgoing_loop();
 
-
+       
        /*
 
        dout(15) << "whoami = " << whoami << ", thread = " << dispatch_thread_id << endl;
index 583edf07f4e4e3e4f1812f14aa06aa02ca4dee26..0b5c344eb292a6a88acce372541b232bedf76622 100644 (file)
@@ -52,6 +52,7 @@ extern void tcpmessenger_stop_nameserver();   // on rank 0
 extern void tcpmessenger_start_rankserver(tcpaddr_t& ta);  // on all ranks
 extern void tcpmessenger_stop_rankserver();   // on all ranks
 
+extern int tcpmessenger_get_rank();
 
 inline ostream& operator<<(ostream& out, struct sockaddr_in &a)
 {
index 41d803adcd3790128c651aea2b5f460edd2c80ba..14fed66c4dcdb931ec93f37a0b01f287ccef6d6a 100755 (executable)
@@ -65,8 +65,8 @@ sub iterate {
        my @r;
 
        my $this;
-       for my $k (keys %$sim) {
-               if ($fix->{$k}) {
+       for my $k (sort keys %$sim) {
+               if (defined $fix->{$k}) {
                        $this->{$k} = $fix->{$k};
                }
                elsif (!(ref $sim->{$k})) {
@@ -94,7 +94,7 @@ sub run {
        my $h = shift @_;
 
        my @fn;
-       for my $k (keys %$sim) {
+       for my $k (sort keys %$sim) {
                next unless ref $sim->{$k};
                push(@fn, "$k=$h->{$k}");
        }
@@ -130,6 +130,7 @@ sub run {
 
        
        print "-> $c\n";
+       print "   " . `date`;
        my $r;
        unless ($fake) {
                $r = system "$c > log/$fn/o";
index f6570f106be4d1eff146ed0f708964c41779a8f0..025e555bcf18d98bdbb550e783361c97f379a5c3 100644 (file)
@@ -79,8 +79,9 @@ int main(int argc, char **argv)
   MDS *mds[NUMMDS];
   for (int i=0; i<NUMMDS; i++) {
        if (myrank != g_conf.tcp_skip_rank0+i) continue;
-       cerr << "mds" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
-       mds[i] = new MDS(mdc, i, new TCPMessenger(MSG_ADDR_MDS(i)));
+       TCPMessenger *m = new TCPMessenger(MSG_ADDR_MDS(i));
+       cerr << "mds" << i << " on tcprank " << tcpmessenger_get_rank() << " " << hostname << "." << pid << endl;
+       mds[i] = new MDS(mdc, i, m);
        mds[i]->init();
        started++;
   }
@@ -89,8 +90,9 @@ int main(int argc, char **argv)
   OSD *osd[NUMOSD];
   for (int i=0; i<NUMOSD; i++) {
        if (myrank != g_conf.tcp_skip_rank0+NUMMDS + i) continue;
-       cerr << "osd" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
-       osd[i] = new OSD(i, new TCPMessenger(MSG_ADDR_OSD(i)));
+       TCPMessenger *m = new TCPMessenger(MSG_ADDR_OSD(i));
+       cerr << "osd" << i << " on tcprank " << tcpmessenger_get_rank() <<  " " << hostname << "." << pid << endl;
+       osd[i] = new OSD(i, m);
        osd[i]->init();
        started++;
   }
@@ -145,7 +147,7 @@ int main(int argc, char **argv)
        nclients++;
   }
   if (nclients) {
-       cerr << "waiting for " << nclients << " clients to finish" << endl;
+       cerr << nclients << " clients on tcprank " << tcpmessenger_get_rank() << " " << hostname << "." << pid << endl;
   }
 
   for (set<int>::iterator it = clientlist.begin();
@@ -164,7 +166,8 @@ int main(int argc, char **argv)
   
 
   if (myrank && !started) {
-       dout(1) << "IDLE" << endl;
+       //dout(1) << "IDLE" << endl;
+       cerr << "idle on tcprank " << tcpmessenger_get_rank() << " " << hostname << "." << pid << endl; 
        tcpmessenger_stop_rankserver();
   }