#include "common/Logger.h"
-
#define DBL 18
+
TCPMessenger *rankmessenger = 0; //
TCPDirectory *nameserver = 0; // only defined on rank 0
// debug
#undef dout
-#define dout(l) if (l<=g_conf.debug) cout << "[TCP " << my_rank << " " << getpid() << "." << pthread_self() << "] "
+#define dout(l) if (l<=g_conf.debug) cout << "[TCP " << my_rank /*(<< " " << getpid() << "." << pthread_self() */ << "] "
void tcpmessenger_kick_dispatch_loop();
+int tcpmessenger_get_rank()  // accessor: returns the file-level my_rank (the value the dout prefix prints after "[TCP ")
+{
+ return my_rank;
+}
/** rankserver
{
while (len > 0) {
int got = ::recv( sd, buf, len, 0 );
- if (got < 0) return false;
+ if (got < 0) {
+ dout(DBL) << "tcp_read bailing with " << got << endl;
+ return false;
+ }
assert(got >= 0);
len -= got;
buf += got;
- //dout(DBL) << "tcp_read got " << got << ", " << len << " left" << endl;
+ dout(DBL) << "tcp_read got " << got << ", " << len << " left" << endl;
}
return true;
}
return 0;
}
- dout(DBL) << "tcp_recv got envelope type=" << env.type << " src " << env.source << " dst " << env.dest << " nchunks=" << env.nchunks << endl;
+ dout(DBL) << "tcp_recv got envelope type=" << env.type << " src " << MSG_ADDR_NICE(env.source) << " dst " << MSG_ADDR_NICE(env.dest) << " nchunks=" << env.nchunks << endl;
// payload
bufferlist blist;
bufferptr bp = new buffer(size);
- tcp_read( sd, bp.c_str(), size );
+ if (!tcp_read( sd, bp.c_str(), size )) return 0;
blist.push_back(bp);
who = m->get_source();
// give to dispatch loop
+ size_t sz;
incoming_lock.Lock();
+ {
+ incoming.push_back(m);
+ incoming_cond.Signal();
- stat_inq++;
- size_t sz = m->get_payload().length();
- stat_inqb += sz;
-
- incoming.push_back(m);
- incoming_cond.Signal();
+ stat_inq++;
+ sz = m->get_payload().length();
+ stat_inqb += sz;
+ }
incoming_lock.Unlock();
if (logger) {
{
dout(5) << "tcp_dispatchthread start pid " << getpid() << endl;
- incoming_lock.Lock();
-
while (1) {
-
- // callbacks?
+ // any pending callbacks?
messenger_do_callbacks();
- // timer events?
- /*if (pending_timer) {
- pending_timer = false;
- dout(DBL) << "dispatch: pending timer" << endl;
- g_timer.execute_pending();
- }
- */
+ // inq?
+ incoming_lock.Lock();
// done?
- if (tcp_done &&
- incoming.empty()) break;
- //&&
- //!pending_timer) break;
+ if (tcp_done && incoming.empty()) {
+ incoming_lock.Unlock();
+ break;
+ }
// wait?
if (incoming.empty()) {
// wait
- dout(12) << "dispatch: waiting for incoming messages" << endl;
+ dout(DBL) << "dispatch: waiting for incoming messages" << endl;
incoming_cond.Wait(incoming_lock);
+ dout(DBL) << "dispatch: woke up" << endl;
}
- // incoming?
- while (!incoming.empty()) {
- // grab incoming messages
- list<Message*> in;
- in.splice(in.begin(), incoming);
-
- // drop lock while we deliver
- incoming_lock.Unlock();
-
-
- while (!in.empty()) {
- Message *m = in.front();
- in.pop_front();
+ // grab incoming messages
+ list<Message*> in;
+ in.splice(in.begin(), incoming);
- stat_inq--;
- stat_inqb -= m->get_payload().length();
- if (logger) {
- logger->set("inq", stat_inq);
- logger->set("inqb", stat_inqb);
- logger->inc("dis");
- }
+ // drop lock while we deliver
+ incoming_lock.Unlock();
- dout(DBL) << "dispatch doing " << *m << endl;
+ // dispatch!
+ while (!in.empty()) {
+ Message *m = in.front();
+ in.pop_front();
- // for rankserver?
- if (m->get_type() == MSG_NS_CONNECTACK || // i just connected
- m->get_dest() == MSG_ADDR_RANK(my_rank)) {
- dout(DBL) << " giving to rankserver" << endl;
- rankserver.dispatch(m);
- continue;
- }
-
- // ok
- int dest = m->get_dest();
- directory_lock.Lock();
- if (directory.count(dest)) {
- Messenger *who = directory[ dest ];
- directory_lock.Unlock();
-
- dout(4) << "---- '" << m->get_type_name() <<
- "' from " << MSG_ADDR_NICE(m->get_source()) << ':' << m->get_source_port() <<
- " to " << MSG_ADDR_NICE(m->get_dest()) << ':' << m->get_dest_port() << " ---- "
- << m
- << endl;
-
- who->dispatch(m);
- } else {
- directory_lock.Unlock();
- dout (1) << "---- i don't know who " << MSG_ADDR_NICE(dest) << " " << dest << " is." << endl;
- assert(0);
- }
+ stat_inq--;
+ stat_inqb -= m->get_payload().length();
+ if (logger) {
+ logger->set("inq", stat_inq);
+ logger->set("inqb", stat_inqb);
+ logger->inc("dis");
+ }
+
+ dout(DBL) << "dispatch doing " << *m << endl;
+
+ // for rankserver?
+ if (m->get_type() == MSG_NS_CONNECTACK || // i just connected
+ m->get_dest() == MSG_ADDR_RANK(my_rank)) {
+ dout(DBL) << " giving to rankserver" << endl;
+ rankserver.dispatch(m);
+ continue;
+ }
+
+ // ok
+ int dest = m->get_dest();
+ directory_lock.Lock();
+ if (directory.count(dest)) {
+ Messenger *who = directory[ dest ];
+ directory_lock.Unlock();
+
+ dout(4) << "---- '" << m->get_type_name() <<
+ "' from " << MSG_ADDR_NICE(m->get_source()) << ':' << m->get_source_port() <<
+ " to " << MSG_ADDR_NICE(m->get_dest()) << ':' << m->get_dest_port() << " ---- "
+ << m
+ << endl;
+
+ who->dispatch(m);
+ } else {
+ directory_lock.Unlock();
+ dout (1) << "---- i don't know who " << MSG_ADDR_NICE(dest) << " " << dest << " is." << endl;
+ assert(0);
}
-
- incoming_lock.Lock();
}
}
- incoming_lock.Unlock();
g_timer.shutdown();
// Wake the dispatch loop: signal incoming_cond while holding incoming_lock.
// The dispatch thread blocks in incoming_cond.Wait(incoming_lock) when its
// incoming list is empty, so this signal lets it re-check its exit/queue state.
// The dout(DBL) traces bracket each step (before the lock, under the lock,
// after unlock) to show where a caller is if the kick stalls.
void tcpmessenger_kick_dispatch_loop()
{
+ dout(DBL) << "kicking" << endl;
incoming_lock.Lock();
+ dout(DBL) << "prekick" << endl;  // emitted once incoming_lock is held
incoming_cond.Signal();
incoming_lock.Unlock();
+ dout(DBL) << "kicked" << endl;
}
void tcpmessenger_kick_outgoing_loop()
void tcpmessenger_wait()
{
+ dout(10) << "tcpmessenger_wait waking up dispatch loop" << endl;
tcpmessenger_kick_dispatch_loop();
void *returnval;
myaddr = register_entity(myaddr);
}
+
// my address
set_myaddr( myaddr );
if (lastone) {
dout(2) << "shutdown last tcpmessenger on rank " << my_rank << " shut down" << endl;
//pthread_t whoami = pthread_self();
-
+
// no more timer events
g_timer.unset_messenger();
msgr_callback_kicker = 0;
-
-
+
// close incoming sockets
//void *r;
for (map<int,pthread_t>::iterator it = in_threads.begin();
tcpmessenger_kick_outgoing_loop();
-
+
/*
dout(15) << "whoami = " << whoami << ", thread = " << dispatch_thread_id << endl;
MDS *mds[NUMMDS];
for (int i=0; i<NUMMDS; i++) {
if (myrank != g_conf.tcp_skip_rank0+i) continue;
- cerr << "mds" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
- mds[i] = new MDS(mdc, i, new TCPMessenger(MSG_ADDR_MDS(i)));
+ TCPMessenger *m = new TCPMessenger(MSG_ADDR_MDS(i));
+ cerr << "mds" << i << " on tcprank " << tcpmessenger_get_rank() << " " << hostname << "." << pid << endl;
+ mds[i] = new MDS(mdc, i, m);
mds[i]->init();
started++;
}
OSD *osd[NUMOSD];
for (int i=0; i<NUMOSD; i++) {
if (myrank != g_conf.tcp_skip_rank0+NUMMDS + i) continue;
- cerr << "osd" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
- osd[i] = new OSD(i, new TCPMessenger(MSG_ADDR_OSD(i)));
+ TCPMessenger *m = new TCPMessenger(MSG_ADDR_OSD(i));
+ cerr << "osd" << i << " on tcprank " << tcpmessenger_get_rank() << " " << hostname << "." << pid << endl;
+ osd[i] = new OSD(i, m);
osd[i]->init();
started++;
}
nclients++;
}
if (nclients) {
- cerr << "waiting for " << nclients << " clients to finish" << endl;
+ cerr << nclients << " clients on tcprank " << tcpmessenger_get_rank() << " " << hostname << "." << pid << endl;
}
for (set<int>::iterator it = clientlist.begin();
if (myrank && !started) {
- dout(1) << "IDLE" << endl;
+ //dout(1) << "IDLE" << endl;
+ cerr << "idle on tcprank " << tcpmessenger_get_rank() << " " << hostname << "." << pid << endl;
tcpmessenger_stop_rankserver();
}