#include "tcp.cc"
-
-// help find socket resource leaks
-//static int sockopen = 0;
-#define closed_socket() //dout(20) << "closed_socket " << --sockopen << dendl;
-#define opened_socket() //dout(20) << "opened_socket " << ++sockopen << dendl;
-
-
-
/********************************************
* Accepter
*/
<< strerror_r(errno, buf, sizeof(buf)) << std::endl;
return -errno;
}
- opened_socket();
// reuse addr+port when possible
int on = 1;
int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen);
if (sd >= 0) {
errors = 0;
- opened_socket();
dout(10) << "accepted incoming on sd " << sd << dendl;
// disable Nagle algorithm?
if (listen_sd >= 0) {
::close(listen_sd);
listen_sd = -1;
- closed_socket();
}
dout(10) << "accepter stopping" << dendl;
return 0;
dout(10) << "connect " << connect_seq << dendl;
assert(pipe_lock.is_locked());
- if (sd >= 0) {
- ::close(sd);
- sd = -1;
- closed_socket();
- }
__u32 cseq = connect_seq;
__u32 gseq = messenger->get_global_seq();
AuthAuthorizer *authorizer = NULL;
bufferlist addrbl, myaddrbl;
+ // close old socket. this is safe because we stopped the reader thread above.
+ if (sd >= 0)
+ ::close(sd);
+
// create socket?
sd = ::socket(peer_addr.get_family(), SOCK_STREAM, 0);
if (sd < 0) {
assert(0);
goto fail;
}
- opened_socket();
char buf[80];
return;
}
- if (sd >= 0) {
- ::close(sd);
- sd = -1;
- closed_socket();
- }
+ shutdown_socket();
// lossy channel?
if (policy.lossy) {
assert(pipe_lock.is_locked());
state = STATE_CLOSED;
cond.Signal();
- if (sd >= 0) {
- ::shutdown(sd, SHUT_RDWR);
- ::close(sd);
- sd = -1;
- }
+ shutdown_socket();
}
char buf[80];
pipe_lock.Lock();
-
while (state != STATE_CLOSED) {// && state != STATE_WAIT) {
dout(10) << "writer: state = " << state << " policy.server=" << policy.server << dendl;
void SimpleMessenger::Pipe::unlock_maybe_reap()
{
if (!reader_running && !writer_running) {
- // close
- if (sd >= 0) {
- ::close(sd);
- sd = -1;
- closed_socket();
- }
-
+ shutdown_socket();
pipe_lock.Unlock();
-
messenger->queue_reap(this);
} else {
pipe_lock.Unlock();
assert(pipes.count(p));
pipes.erase(p);
p->join();
+ if (p->sd >= 0)
+ ::close(p->sd);
dout(10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl;
- assert(p->sd < 0);
if (p->connection_state)
p->connection_state->clear_pipe();
p->put();
public:
Pipe(SimpleMessenger *r, int st) :
messenger(r),
- sd(-1), peer_type(-1),
+ sd(-1),
+ peer_type(-1),
pipe_lock("SimpleMessenger::Pipe::pipe_lock"),
state(st),
connection_state(new Connection),
void requeue_sent(uint64_t max_acked=0);
void discard_queue();
- void force_close() {
- if (sd >= 0) ::close(sd);
+ void shutdown_socket() {
+ if (sd >= 0)
+ ::shutdown(sd, SHUT_RDWR);
}
};