From 1657cacbbda624d60844c0b8d0ff4d7208329398 Mon Sep 17 00:00:00 2001 From: sage Date: Sat, 11 Feb 2006 18:49:25 +0000 Subject: [PATCH] *** empty log message *** git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@615 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/common/Timer.cc | 18 ++++- ceph/config.cc | 2 +- ceph/msg/TCPMessenger.cc | 166 +++++++++++++++++++-------------------- ceph/msg/TCPMessenger.h | 1 + ceph/script/runset.pl | 7 +- ceph/tcpsyn.cc | 15 ++-- 6 files changed, 114 insertions(+), 95 deletions(-) diff --git a/ceph/common/Timer.cc b/ceph/common/Timer.cc index c997f112f9947..a91595cfcf696 100644 --- a/ceph/common/Timer.cc +++ b/ceph/common/Timer.cc @@ -46,7 +46,9 @@ void Timer::timer_thread() // any events due? utime_t next; Context *event = get_next_scheduled(next); - + + list pending; + if (event && now > next) { // move to pending list map< utime_t, set >::iterator it = scheduled.begin(); @@ -60,7 +62,7 @@ void Timer::timer_thread() for (set::iterator cit = it->second.begin(); cit != it->second.end(); cit++) - messenger->queue_callback(*cit); + pending.push_back(*cit); } //pending[t] = it->second; @@ -68,6 +70,18 @@ void Timer::timer_thread() scheduled.erase(t); } + if (!pending.empty()) { + lock.Unlock(); + { // make sure we're not holding any locks whil we talk to the messenger + for (list::iterator cit = pending.begin(); + cit != pending.end(); + cit++) + messenger->queue_callback(*cit); + pending.clear(); + } + lock.Lock(); + } + } else { diff --git a/ceph/config.cc b/ceph/config.cc index 78f548343ed2d..1be492108b8ad 100644 --- a/ceph/config.cc +++ b/ceph/config.cc @@ -65,7 +65,7 @@ md_config_t g_conf = { debug_ns: 0, tcp_skip_rank0: false, - tcp_log: true, + tcp_log: false, // --- client --- diff --git a/ceph/msg/TCPMessenger.cc b/ceph/msg/TCPMessenger.cc index 3e92cc703f4b2..7d533a284b488 100644 --- a/ceph/msg/TCPMessenger.cc +++ b/ceph/msg/TCPMessenger.cc @@ -38,10 +38,10 @@ using namespace __gnu_cxx; #include "common/Logger.h" - #define DBL 18 + TCPMessenger *rankmessenger = 0; // TCPDirectory *nameserver = 0; // only defined on rank 0 @@ -107,7 +107,7 @@ map in_threads; // sd -> threadid // debug #undef dout -#define dout(l) if (l<=g_conf.debug) cout << "[TCP " << my_rank << " " << getpid() << "." << pthread_self() << "] " +#define dout(l) if (l<=g_conf.debug) cout << "[TCP " << my_rank /*(<< " " << getpid() << "." << pthread_self() */ << "] " @@ -118,6 +118,10 @@ int tcp_send(Message *m); void tcpmessenger_kick_dispatch_loop(); +int tcpmessenger_get_rank() +{ + return my_rank; +} /** rankserver @@ -430,11 +434,14 @@ bool tcp_read(int sd, char *buf, int len) { while (len > 0) { int got = ::recv( sd, buf, len, 0 ); - if (got < 0) return false; + if (got < 0) { + dout(DBL) << "tcp_read bailing with " << got << endl; + return false; + } assert(got >= 0); len -= got; buf += got; - //dout(DBL) << "tcp_read got " << got << ", " << len << " left" << endl; + dout(DBL) << "tcp_read got " << got << ", " << len << " left" << endl; } return true; } @@ -479,7 +486,7 @@ Message *tcp_recv(int sd) return 0; } - dout(DBL) << "tcp_recv got envelope type=" << env.type << " src " << env.source << " dst " << env.dest << " nchunks=" << env.nchunks << endl; + dout(DBL) << "tcp_recv got envelope type=" << env.type << " src " << MSG_ADDR_NICE(env.source) << " dst " << MSG_ADDR_NICE(env.dest) << " nchunks=" << env.nchunks << endl; // payload bufferlist blist; @@ -489,7 +496,7 @@ Message *tcp_recv(int sd) bufferptr bp = new buffer(size); - tcp_read( sd, bp.c_str(), size ); + if (!tcp_read( sd, bp.c_str(), size )) return 0; blist.push_back(bp); @@ -682,14 +689,16 @@ void *tcp_inthread(void *r) who = m->get_source(); // give to dispatch loop + size_t sz; incoming_lock.Lock(); + { + incoming.push_back(m); + incoming_cond.Signal(); - stat_inq++; - size_t sz = m->get_payload().length(); - stat_inqb += sz; - - incoming.push_back(m); - incoming_cond.Signal(); + stat_inq++; + sz = m->get_payload().length(); + stat_inqb += sz; + } incoming_lock.Unlock(); if (logger) { @@ -747,92 +756,79 @@ void* tcp_dispatchthread(void*) { dout(5) << "tcp_dispatchthread start pid " << getpid() << endl; - incoming_lock.Lock(); - while (1) { - - // callbacks? + // any pending callbacks? messenger_do_callbacks(); - // timer events? - /*if (pending_timer) { - pending_timer = false; - dout(DBL) << "dispatch: pending timer" << endl; - g_timer.execute_pending(); - } - */ + // inq? + incoming_lock.Lock(); // done? - if (tcp_done && - incoming.empty()) break; - //&& - //!pending_timer) break; + if (tcp_done && incoming.empty()) { + incoming_lock.Unlock(); + break; + } // wait? if (incoming.empty()) { // wait - dout(12) << "dispatch: waiting for incoming messages" << endl; + dout(DBL) << "dispatch: waiting for incoming messages" << endl; incoming_cond.Wait(incoming_lock); + dout(DBL) << "dispatch: woke up" << endl; } - // incoming? - while (!incoming.empty()) { - // grab incoming messages - list in; - in.splice(in.begin(), incoming); - - // drop lock while we deliver - incoming_lock.Unlock(); - - - while (!in.empty()) { - Message *m = in.front(); - in.pop_front(); + // grab incoming messages + list in; + in.splice(in.begin(), incoming); - stat_inq--; - stat_inqb -= m->get_payload().length(); - if (logger) { - logger->set("inq", stat_inq); - logger->set("inqb", stat_inqb); - logger->inc("dis"); - } + // drop lock while we deliver + incoming_lock.Unlock(); - dout(DBL) << "dispatch doing " << *m << endl; + // dispatch! + while (!in.empty()) { + Message *m = in.front(); + in.pop_front(); - // for rankserver? - if (m->get_type() == MSG_NS_CONNECTACK || // i just connected - m->get_dest() == MSG_ADDR_RANK(my_rank)) { - dout(DBL) << " giving to rankserver" << endl; - rankserver.dispatch(m); - continue; - } - - // ok - int dest = m->get_dest(); - directory_lock.Lock(); - if (directory.count(dest)) { - Messenger *who = directory[ dest ]; - directory_lock.Unlock(); - - dout(4) << "---- '" << m->get_type_name() << - "' from " << MSG_ADDR_NICE(m->get_source()) << ':' << m->get_source_port() << - " to " << MSG_ADDR_NICE(m->get_dest()) << ':' << m->get_dest_port() << " ---- " - << m - << endl; - - who->dispatch(m); - } else { - directory_lock.Unlock(); - dout (1) << "---- i don't know who " << MSG_ADDR_NICE(dest) << " " << dest << " is." << endl; - assert(0); - } + stat_inq--; + stat_inqb -= m->get_payload().length(); + if (logger) { + logger->set("inq", stat_inq); + logger->set("inqb", stat_inqb); + logger->inc("dis"); + } + + dout(DBL) << "dispatch doing " << *m << endl; + + // for rankserver? + if (m->get_type() == MSG_NS_CONNECTACK || // i just connected + m->get_dest() == MSG_ADDR_RANK(my_rank)) { + dout(DBL) << " giving to rankserver" << endl; + rankserver.dispatch(m); + continue; + } + + // ok + int dest = m->get_dest(); + directory_lock.Lock(); + if (directory.count(dest)) { + Messenger *who = directory[ dest ]; + directory_lock.Unlock(); + + dout(4) << "---- '" << m->get_type_name() << + "' from " << MSG_ADDR_NICE(m->get_source()) << ':' << m->get_source_port() << + " to " << MSG_ADDR_NICE(m->get_dest()) << ':' << m->get_dest_port() << " ---- " + << m + << endl; + + who->dispatch(m); + } else { + directory_lock.Unlock(); + dout (1) << "---- i don't know who " << MSG_ADDR_NICE(dest) << " " << dest << " is." << endl; + assert(0); } - - incoming_lock.Lock(); } } - incoming_lock.Unlock(); g_timer.shutdown(); @@ -874,9 +870,12 @@ int tcpmessenger_start() void tcpmessenger_kick_dispatch_loop() { + dout(DBL) << "kicking" << endl; incoming_lock.Lock(); + dout(DBL) << "prekick" << endl; incoming_cond.Signal(); incoming_lock.Unlock(); + dout(DBL) << "kicked" << endl; } void tcpmessenger_kick_outgoing_loop() @@ -891,6 +890,7 @@ void tcpmessenger_kick_outgoing_loop() void tcpmessenger_wait() { + dout(10) << "tcpmessenger_wait waking up dispatch loop" << endl; tcpmessenger_kick_dispatch_loop(); void *returnval; @@ -965,6 +965,7 @@ TCPMessenger::TCPMessenger(msg_addr_t myaddr) : Messenger(myaddr) myaddr = register_entity(myaddr); } + // my address set_myaddr( myaddr ); @@ -1060,12 +1061,11 @@ int TCPMessenger::shutdown() if (lastone) { dout(2) << "shutdown last tcpmessenger on rank " << my_rank << " shut down" << endl; //pthread_t whoami = pthread_self(); - + // no more timer events g_timer.unset_messenger(); msgr_callback_kicker = 0; - - + // close incoming sockets //void *r; for (map::iterator it = in_threads.begin(); @@ -1085,7 +1085,7 @@ int TCPMessenger::shutdown() tcpmessenger_kick_outgoing_loop(); - + /* dout(15) << "whoami = " << whoami << ", thread = " << dispatch_thread_id << endl; diff --git a/ceph/msg/TCPMessenger.h b/ceph/msg/TCPMessenger.h index 583edf07f4e4e..0b5c344eb292a 100644 --- a/ceph/msg/TCPMessenger.h +++ b/ceph/msg/TCPMessenger.h @@ -52,6 +52,7 @@ extern void tcpmessenger_stop_nameserver(); // on rank 0 extern void tcpmessenger_start_rankserver(tcpaddr_t& ta); // on all ranks extern void tcpmessenger_stop_rankserver(); // on all ranks +extern int tcpmessenger_get_rank(); inline ostream& operator<<(ostream& out, struct sockaddr_in &a) { diff --git a/ceph/script/runset.pl b/ceph/script/runset.pl index 41d803adcd379..14fed66c4dcdb 100755 --- a/ceph/script/runset.pl +++ b/ceph/script/runset.pl @@ -65,8 +65,8 @@ sub iterate { my @r; my $this; - for my $k (keys %$sim) { - if ($fix->{$k}) { + for my $k (sort keys %$sim) { + if (defined $fix->{$k}) { $this->{$k} = $fix->{$k}; } elsif (!(ref $sim->{$k})) { @@ -94,7 +94,7 @@ sub run { my $h = shift @_; my @fn; - for my $k (keys %$sim) { + for my $k (sort keys %$sim) { next unless ref $sim->{$k}; push(@fn, "$k=$h->{$k}"); } @@ -130,6 +130,7 @@ sub run { print "-> $c\n"; + print " " . `date`; my $r; unless ($fake) { $r = system "$c > log/$fn/o"; diff --git a/ceph/tcpsyn.cc b/ceph/tcpsyn.cc index f6570f106be4d..025e555bcf18d 100644 --- a/ceph/tcpsyn.cc +++ b/ceph/tcpsyn.cc @@ -79,8 +79,9 @@ int main(int argc, char **argv) MDS *mds[NUMMDS]; for (int i=0; iinit(); started++; } @@ -89,8 +90,9 @@ int main(int argc, char **argv) OSD *osd[NUMOSD]; for (int i=0; iinit(); started++; } @@ -145,7 +147,7 @@ int main(int argc, char **argv) nclients++; } if (nclients) { - cerr << "waiting for " << nclients << " clients to finish" << endl; + cerr << nclients << " clients on tcprank " << tcpmessenger_get_rank() << " " << hostname << "." << pid << endl; } for (set::iterator it = clientlist.begin(); @@ -164,7 +166,8 @@ int main(int argc, char **argv) if (myrank && !started) { - dout(1) << "IDLE" << endl; + //dout(1) << "IDLE" << endl; + cerr << "idle on tcprank " << tcpmessenger_get_rank() << " " << hostname << "." << pid << endl; tcpmessenger_stop_rankserver(); } -- 2.39.5