list<Message*> incoming;
Mutex incoming_lock;
Cond incoming_cond;
+list<Message*> outgoing;
+Mutex outgoing_lock;
+Cond outgoing_cond;
struct sockaddr_in *remote_addr;
int *in_sd; // incoming sockets
bool tcp_done = false; // set this flag to stop the event loop
pthread_t dispatch_thread_id = 0; // thread id of the event loop. init value == nobody
+pthread_t out_thread_id = 0; // thread id of the event loop. init value == nobody
pthread_t listen_thread_id = 0;
Mutex sender_lock;
// recv event loop, for unsolicited messages.
+void* tcp_sendthread(void*)
+{
+ outgoing_lock.Lock();
+ while (!tcp_done) {
+
+ while (outgoing.size()) {
+ Message *m = outgoing.front();
+ outgoing.pop_front();
+ tcp_send(m);
+ }
+
+ outgoing_cond.Wait(outgoing_lock);
+
+ }
+ outgoing_lock.Unlock();
+}
+
void* tcpmessenger_loop(void*)
{
dout(5) << "tcpmessenger_loop start pid " << getpid() << endl;
while (incoming.size()) {
Message *m = incoming.front();
incoming.pop_front();
+
+ incoming_lock.Unlock();
int dest = m->get_dest();
if (directory.count(dest)) {
assert(0);
break;
}
+
+ incoming_lock.Lock();
}
}
tcpmessenger_loop,
0);
+
+ dout(5) << "starting outgoing thread" << endl;
+ pthread_create(&out_thread_id,
+ NULL,
+ tcp_sendthread,
+ 0);
+
}
tcp_done = true;
incoming_cond.Signal();
+ outgoing_cond.Signal();
/*
dout(15) << "whoami = " << whoami << ", thread = " << dispatch_thread_id << endl;
m->set_source(myaddr, fromport);
m->set_dest(dest, port);
- tcp_send(m);
+ if (0) {
+ tcp_send(m);
+ } else {
+ outgoing_lock.Lock();
+ outgoing.push_back(m);
+ outgoing_lock.Unlock();
+ outgoing_cond.Signal();
+ }
}