void Rank::Pipe::close()
{
- if (sent_close) {
- dout(10) << "pipe(" << peer_addr << ' ' << this << ").close already closing" << endl;
- return;
- }
dout(10) << "pipe(" << peer_addr << ' ' << this << ").close" << endl;
// unreg ourselves
{
if (rank.rank_pipe.count(peer_addr) &&
rank.rank_pipe[peer_addr] == this) {
- dout(10) << "pipe(" << peer_addr << ' ' << this << ").close unregistering pipe" << endl;
+ dout(10) << "pipe(" << peer_addr << ' ' << this
+ << ").close unregistering pipe" << endl;
rank.rank_pipe.erase(peer_addr);
}
}
rank.lock.Unlock();
- // queue close message.
- if (socket_error) {
- dout(10) << "pipe(" << peer_addr << ' ' << this << ").close not queueing MSG_CLOSE, socket error" << endl;
- }
- else if (!writer_running) {
- dout(10) << "pipe(" << peer_addr << ' ' << this << ").close not queueing MSG_CLOSE, no writer running" << endl;
+ // queue close message?
+ if (!need_to_send_close) {
+ dout(10) << "pipe(" << peer_addr << ' ' << this
+ << ").close already closing/closed" << endl;
+ return;
+ }
+
+ if (!writer_running) {
+ dout(10) << "pipe(" << peer_addr << ' ' << this
+ << ").close not queueing MSG_CLOSE, no writer running" << endl;
} else {
- dout(10) << "pipe(" << peer_addr << ' ' << this << ").close queueing MSG_CLOSE" << endl;
+ dout(10) << "pipe(" << peer_addr << ' ' << this
+ << ").close queueing MSG_CLOSE" << endl;
lock.Lock();
q.push_back(new MGenericMessage(MSG_CLOSE));
cond.Signal();
- sent_close = true;
+ need_to_send_close = false;
lock.Unlock();
}
}
if (m) {
delete m;
dout(10) << "pipe(" << peer_addr << ' ' << this << ").reader read MSG_CLOSE message" << endl;
+ need_to_send_close = false;
} else {
derr(10) << "pipe(" << peer_addr << ' ' << this << ").reader read null message" << endl;
}
- if (!sent_close)
- close();
+ close();
done = true;
cond.Signal(); // wake up writer too.
msg_envelope_t env;
if (!tcp_read( sd, (char*)&env, sizeof(env) )) {
- socket_error = true;
+ need_to_send_close = false;
return 0;
}
for (int i=0; i<env.nchunks; i++) {
int size;
if (!tcp_read( sd, (char*)&size, sizeof(size) )) {
- socket_error = true;
+ need_to_send_close = false;
return 0;
}
bufferptr bp(size);
if (!tcp_read( sd, bp.c_str(), size )) {
- socket_error = true;
+ need_to_send_close = false;
return 0;
}
if (r < 0) {
derr(1) << "pipe(" << peer_addr << ' ' << this << ").writer error sending envelope for " << *m
<< " to " << m->get_dest() << endl;
- socket_error = true;
+ need_to_send_close = false;
return -1;
}
r = tcp_write( sd, (char*)&size, sizeof(size) );
if (r < 0) {
derr(10) << "pipe(" << peer_addr << ' ' << this << ").writer error sending chunk len for " << *m << " to " << m->get_dest() << endl;
- socket_error = true;
+ need_to_send_close = false;
return -1;
}
r = tcp_write( sd, (*it).c_str(), size );
if (r < 0) {
derr(10) << "pipe(" << peer_addr << ' ' << this << ").writer error sending data chunk for " << *m << " to " << m->get_dest() << endl;
- socket_error = true;
+ need_to_send_close = false;
return -1;
}
i++;
r = tcp_write( sd, (char*)&size, sizeof(size) );
if (r < 0) {
derr(10) << "pipe(" << peer_addr << ' ' << this << ").writer error sending data len for " << *m << " to " << m->get_dest() << endl;
- socket_error = true;
+ need_to_send_close = false;
return -1;
}
dout(20) << "pipe(" << peer_addr << ' ' << this << ").writer data len is " << size << " in " << blist.buffers().size() << " buffers" << endl;
r = tcp_write( sd, (char*)(*it).c_str(), (*it).length() );
if (r < 0) {
derr(10) << "pipe(" << peer_addr << ' ' << this << ").writer error sending data megachunk for " << *m << " to " << m->get_dest() << " : len " << (*it).length() << endl;
- socket_error = true;
+ need_to_send_close = false;
return -1;
}
}
bool done;
entity_addr_t peer_addr;
bool server;
- bool sent_close;
- bool socket_error;
+ bool need_to_send_close;
bool reader_running;
bool writer_running;
void *entry() { pipe->writer(); return 0; }
} writer_thread;
friend class Writer;
-
+
public:
Pipe(int s) : sd(s),
done(false), server(true),
- sent_close(false), socket_error(false),
+ need_to_send_close(true),
reader_running(false), writer_running(false),
reader_thread(this), writer_thread(this) {
// server
reader_thread.create();
}
Pipe(const entity_addr_t &pi) : sd(0),
- done(false), peer_addr(pi), server(false),
- sent_close(false),
- reader_running(false), writer_running(false),
- reader_thread(this), writer_thread(this) {
+ done(false), peer_addr(pi), server(false),
+ need_to_send_close(true),
+ reader_running(false), writer_running(false),
+ reader_thread(this), writer_thread(this) {
// client
writer_running = true;
writer_thread.create();