#include "tcp.cc"
+// help find socket resource leaks: sockopen is the counter the commented-out macro bodies would --/++ on close/open; as written both macros expand to nothing, so the calls are no-ops
+int sockopen = 0;
+#define closed_socket() //dout(20) << "closed_socket " << --sockopen << dendl;
+#define opened_socket() //dout(20) << "opened_socket " << ++sockopen << dendl;
+
+
Rank rank;
#ifdef DARWIN
derr(0) << "got control-c, exiting" << dendl;
// force close listener socket
- if (accepter.listen_sd >= 0)
+ if (accepter.listen_sd >= 0) {
::close(accepter.listen_sd);
+ accepter.listen_sd = -1;
+ closed_socket();
+ }
// force close all pipe sockets, too
for (hash_map<entity_addr_t, Pipe*>::iterator p = rank_pipe.begin();
<< strerror(errno) << dendl;
return -errno;
}
+ opened_socket();
int on = 1;
::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
socklen_t slen = sizeof(addr);
int sd = ::accept(listen_sd, (sockaddr*)&addr, &slen);
if (sd >= 0) {
+ opened_socket();
dout(10) << "accepted incoming on sd " << sd << dendl;
// disable Nagle algorithm?
if (listen_sd >= 0) {
::close(listen_sd);
listen_sd = -1;
+ closed_socket();
}
dout(10) << "accepter stopping" << dendl;
return 0;
pipes.erase(p);
p->join();
dout(10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl;
+ assert(p->sd < 0);
delete p;
dout(10) << "reaper deleted pipe " << p << dendl;
}
#define dout_prefix _pipe_prefix()
// Writes the log-line prefix for this pipe to *_dout and returns that stream.
// The prefix holds the calling thread id (pthread_self()), rank.rank_addr,
// peer_addr and this pipe's pointer; the added (+) lines also include sd,
// peer_global_seq and connect_seq inside the "pipe(...)" section.
ostream& Rank::Pipe::_pipe_prefix() {
return *_dout << dbeginl << pthread_self()
- << " -- " << rank.rank_addr << " >> " << peer_addr << " pipe(" << this << ").";
+ << " -- " << rank.rank_addr << " >> " << peer_addr << " pipe(" << this
+ << " sd=" << sd
+ << " pgs=" << peer_global_seq
+ << " cs=" << connect_seq
+ << ").";
}
int Rank::Pipe::accept()
rc = tcp_read(sd, (char*)&connect, sizeof(connect));
if (rc < 0) {
dout(10) << "accept couldn't read connect" << dendl;
- goto fail;
+ goto fail_unlocked;
}
dout(20) << "accept got peer connect_seq " << connect.connect_seq
<< " global_seq " << connect.global_seq
if (tcp_write(sd, (char*)&gseq, sizeof(gseq)) < 0)
goto fail;
continue;
+ } else {
+ dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
+ << " <= " << connect.global_seq << ", looks ok" << dendl;
}
if (existing->policy.lossy_tx) {
goto fail;
}
+ dout(-10) << "accept connect_seq " << connect.connect_seq
+ << " vs existing " << existing->connect_seq
+ << " state " << existing->state << dendl;
+
if (connect.connect_seq < existing->connect_seq) {
if (connect.connect_seq == 0) {
dout(-10) << "accept peer reset, then tried to connect to us, replacing" << dendl;
fail:
+ rank.lock.Unlock();
+ fail_unlocked:
lock.Lock();
state = STATE_CLOSED;
fault();
if (sd >= 0) {
::close(sd);
sd = -1;
+ closed_socket();
}
__u32 cseq = connect_seq;
__u32 gseq = rank.get_global_seq();
+ // stop reader thread
+ join_reader();
+
lock.Unlock();
- int newsd;
char tag = -1;
int rc;
struct sockaddr_in myAddr;
entity_addr_t paddr;
// create socket?
- newsd = ::socket(AF_INET, SOCK_STREAM, 0);
- if (newsd < 0) {
+ sd = ::socket(AF_INET, SOCK_STREAM, 0);
+ if (sd < 0) {
dout(-1) << "connect couldn't created socket " << strerror(errno) << dendl;
assert(0);
goto fail;
}
+ opened_socket();
// bind any port
myAddr.sin_family = AF_INET;
myAddr.sin_addr.s_addr = htonl(INADDR_ANY);
myAddr.sin_port = htons( 0 );
- rc = ::bind(newsd, (struct sockaddr *) &myAddr, sizeof(myAddr));
- assert(rc>=0);
+ dout(10) << "binding to " << myAddr << dendl;
+ rc = ::bind(sd, (struct sockaddr *)&myAddr, sizeof(myAddr));
+ if (rc < 0) {
+ dout(2) << "bind error " << myAddr
+ << ", " << errno << ": " << strerror(errno) << dendl;
+ goto fail;
+ }
// connect!
dout(10) << "connecting to " << peer_addr.ipaddr << dendl;
- rc = ::connect(newsd, (sockaddr*)&peer_addr.ipaddr, sizeof(peer_addr.ipaddr));
+ rc = ::connect(sd, (sockaddr*)&peer_addr.ipaddr, sizeof(peer_addr.ipaddr));
if (rc < 0) {
dout(2) << "connect error " << peer_addr.ipaddr
<< ", " << errno << ": " << strerror(errno) << dendl;
// disable Nagle algorithm?
if (g_conf.ms_tcp_nodelay) {
int flag = 1;
- int r = ::setsockopt(newsd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag));
+ int r = ::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag));
if (r < 0)
dout(0) << "connect couldn't set TCP_NODELAY: " << strerror(errno) << dendl;
}
// verify banner
// FIXME: this should be non-blocking, or in some other way verify the banner as we get it.
- rc = tcp_read(newsd, (char*)&banner, strlen(CEPH_BANNER));
+ rc = tcp_read(sd, (char*)&banner, strlen(CEPH_BANNER));
if (rc < 0) {
dout(2) << "connect couldn't read banner, " << strerror(errno) << dendl;
goto fail;
msg.msg_iov = msgvec;
msg.msg_iovlen = 1;
msglen = msgvec[0].iov_len;
- if (do_sendmsg(newsd, &msg, msglen)) {
+ if (do_sendmsg(sd, &msg, msglen)) {
dout(2) << "connect couldn't write my banner, " << strerror(errno) << dendl;
goto fail;
}
// identify peer
- rc = tcp_read(newsd, (char*)&paddr, sizeof(paddr));
+ rc = tcp_read(sd, (char*)&paddr, sizeof(paddr));
if (rc < 0) {
dout(2) << "connect couldn't read peer addr, " << strerror(errno) << dendl;
goto fail;
}
- dout(20) << "connect read peer addr " << paddr << " on socket " << newsd << dendl;
+ dout(20) << "connect read peer addr " << paddr << " on socket " << sd << dendl;
if (!peer_addr.is_local_to(paddr)) {
if (paddr.ipaddr.sin_addr.s_addr == 0 &&
peer_addr.ipaddr.sin_port == paddr.ipaddr.sin_port) {
msg.msg_iov = msgvec;
msg.msg_iovlen = 1;
msglen = msgvec[0].iov_len;
- if (do_sendmsg(newsd, &msg, msglen)) {
+ if (do_sendmsg(sd, &msg, msglen)) {
dout(2) << "connect couldn't write my addr, " << strerror(errno) << dendl;
goto fail;
}
msglen = msgvec[0].iov_len;
dout(10) << "connect sending gseq=" << gseq << " cseq=" << cseq << dendl;
- if (do_sendmsg(newsd, &msg, msglen)) {
+ if (do_sendmsg(sd, &msg, msglen)) {
dout(2) << "connect couldn't write gseq, cseq, " << strerror(errno) << dendl;
goto fail;
}
dout(20) << "connect wrote (self +) cseq, waiting for tag" << dendl;
- if (tcp_read(newsd, &tag, 1) < 0) {
+ if (tcp_read(sd, &tag, 1) < 0) {
dout(2) << "connect read tag, seq, " << strerror(errno) << dendl;
goto fail;
}
continue;
}
if (tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
- int rc = tcp_read(newsd, (char*)&gseq, sizeof(gseq));
+ int rc = tcp_read(sd, (char*)&gseq, sizeof(gseq));
if (rc < 0) {
dout(0) << "connect got RETRY_GLOBAL tag but couldn't read gseq" << dendl;
goto fail;
continue;
}
if (tag == CEPH_MSGR_TAG_RETRY_SESSION) {
- int rc = tcp_read(newsd, (char*)&cseq, sizeof(cseq));
+ int rc = tcp_read(sd, (char*)&cseq, sizeof(cseq));
if (rc < 0) {
dout(0) << "connect got RETRY_SESSION tag but couldn't read cseq" << dendl;
goto fail;
// read flags
__u8 flags;
- if (tcp_read(newsd, (char *)&flags, 1) < 0) {
+ if (tcp_read(sd, (char *)&flags, 1) < 0) {
dout(2) << "connect read tag, seq, " << strerror(errno) << dendl;
goto fail;
}
// hooray!
state = STATE_OPEN;
- sd = newsd;
connect_seq = cseq+1;
first_fault = last_attempt = utime_t();
dout(20) << "connect success " << connect_seq << ", lossy_rx = " << lossy_rx << dendl;
dout(3) << "connect fault, but state != connecting, stopping" << dendl;
stop_locked:
- if (newsd >= 0)
- ::close(newsd);
return -1;
}
}
}
-void Rank::Pipe::fault(bool onconnect)
+void Rank::Pipe::fault(bool onconnect, bool onread)
{
assert(lock.is_locked());
cond.Signal();
+ if (onread && state == STATE_CONNECTING) {
+ dout(10) << "fault already connecting, reader shutting down" << dendl;
+ return;
+ }
+
if (!onconnect) dout(2) << "fault " << errno << ": " << strerror(errno) << dendl;
if (state == STATE_CLOSED ||
return;
}
- if (sd >= 0)
+ if (sd >= 0) {
::close(sd);
- sd = -1;
+ sd = -1;
+ closed_socket();
+ }
// lossy channel?
if (policy.lossy_tx) {
cond.Signal();
state = STATE_CLOSED;
- if (sd >= 0)
+ if (sd >= 0) {
::close(sd);
- sd = -1;
+ sd = -1;
+ closed_socket();
+ }
}
lock.Lock();
// loop.
- while (state != STATE_CLOSED) {
+ while (state != STATE_CLOSED &&
+ state != STATE_CONNECTING) {
assert(lock.is_locked());
// sleep if (re)connecting
- if (state == STATE_CONNECTING ||
- state == STATE_STANDBY) {
+ if (state == STATE_STANDBY) {
dout(20) << "reader sleeping during reconnect|standby" << dendl;
cond.Wait(lock);
continue;
if (rc < 0) {
lock.Lock();
dout(2) << "reader couldn't read tag, " << strerror(errno) << dendl;
- fault();
+ fault(false, true);
continue;
}
lock.Lock();
if (rc < 0) {
dout(2) << "reader couldn't read ack seq, " << strerror(errno) << dendl;
- fault();
- } else {
+ fault(false, true);
+ } else if (state != STATE_CLOSED) {
dout(15) << "reader got ack seq " << seq << dendl;
// trim sent list
while (!sent.empty() &&
if (!m) {
derr(2) << "reader read null message, " << strerror(errno) << dendl;
lock.Lock();
- fault();
+ fault(false, true);
continue;
}
// note received seq#
lock.Lock();
+ if (state == STATE_CLOSED ||
+ state == STATE_CONNECTING)
+ continue;
+
if (m->get_seq() <= in_seq) {
dout(-10) << "reader got old message "
<< m->get_seq() << " <= " << in_seq << " " << m << " " << *m
derr(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq
<< " for " << *m << " from " << m->get_source() << dendl;
assert(in_seq == m->get_seq()); // for now!
- fault();
+ fault(false, true);
delete m;
continue;
}
else {
dout(0) << "reader bad tag " << (int)tag << dendl;
lock.Lock();
- fault();
+ fault(false, true);
}
}
// reap?
bool reap = false;
reader_running = false;
- if (!writer_running) reap = true;
+ if (!writer_running)
+ reap = true;
lock.Unlock();
if (reap) {
dout(10) << "reader queueing for reap" << dendl;
- if (sd >= 0) ::close(sd);
+ if (sd >= 0) {
+ ::close(sd);
+ sd = -1;
+ closed_socket();
+ }
rank.lock.Lock();
{
rank.pipe_reap_queue.push_back(this);
if (reap) {
dout(10) << "writer queueing for reap" << dendl;
- if (sd >= 0) ::close(sd);
+ if (sd >= 0) {
+ ::close(sd);
+ sd = -1;
+ closed_socket();
+ }
rank.lock.Lock();
{
rank.pipe_reap_queue.push_back(this);