return -errno;
}
+ int r = net.set_nonblock(listen_sd);
+ if (r < 0) {
+ ::close(listen_sd);
+ listen_sd = -1;
+ return -errno;
+ }
// use whatever user specified (if anything)
entity_addr_t listen_addr = bind_addr;
listen_addr.set_family(family);
/* bind to port */
int rc = -1;
- int r = -1;
+ r = -1;
for (int i = 0; i < conf->ms_bind_retry_count; i++) {
if (i > 0) {
if (rc < 0) {
lderr(msgr->cct) << __func__ << " was unable to bind after " << conf->ms_bind_retry_count
<< " attempts: " << cpp_strerror(errno) << dendl;
+ ::close(listen_sd);
+ listen_sd = -1;
return r;
}
if (rc < 0) {
rc = -errno;
lderr(msgr->cct) << __func__ << " failed getsockname: " << cpp_strerror(rc) << dendl;
+ ::close(listen_sd);
+ listen_sd = -1;
return rc;
}
rc = -errno;
lderr(msgr->cct) << __func__ << " unable to listen on " << listen_addr
<< ": " << cpp_strerror(rc) << dendl;
+ ::close(listen_sd);
+ listen_sd = -1;
return rc;
}
ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << sd << dendl;
msgr->add_accept(sd);
- break;
+ continue;
} else {
- ldout(msgr->cct, 0) << __func__ << " no incoming connection? sd = " << sd
- << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+ if (errno == EINTR) {
+ continue;
+ } else if (errno == EAGAIN) {
+ break;
+ } else {
+ errors++;
+ ldout(msgr->cct, 20) << __func__ << " no incoming connection? sd = " << sd
+ << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+ }
}
}
}
{
ldout(cct, 10) << __func__ << " started." << dendl;
pthread_t cur = pthread_self();
- uint64_t send = 0;
for (vector<Worker*>::iterator it = workers.begin(); it != workers.end(); ++it) {
assert(cur != (*it)->center.get_owner());
(*it)->center.dispatch_event_external(EventCallbackRef(new C_barrier(this)));
AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t _nonce)
: SimplePolicyMessenger(cct, name,mname, _nonce),
- processor(this, _nonce),
+ processor(this, cct, _nonce),
lock("AsyncMessenger::lock"),
nonce(_nonce), need_addr(true), did_bind(false),
global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"),
for (set<AsyncConnectionRef>::iterator q = accepting_conns.begin();
q != accepting_conns.end(); ++q) {
AsyncConnectionRef p = *q;
- ldout(cct, 5) << __func__ << " accepting_conn " << p << dendl;
- p->mark_down();
+ ldout(cct, 5) << __func__ << " accepting_conn " << p.get() << dendl;
+ p->stop();
}
accepting_conns.clear();
AsyncConnectionRef p = it->second;
ldout(cct, 5) << __func__ << " mark down " << it->first << " " << p << dendl;
conns.erase(it);
- p->mark_down();
+ p->stop();
}
- while (!deleted_conns.empty()) {
- set<AsyncConnectionRef>::iterator it = deleted_conns.begin();
- AsyncConnectionRef p = *it;
- ldout(cct, 5) << __func__ << " delete " << p << dendl;
- p->put();
- deleted_conns.erase(it);
+ {
+ Mutex::Locker l(deleted_lock);
+ while (!deleted_conns.empty()) {
+ set<AsyncConnectionRef>::iterator it = deleted_conns.begin();
+ AsyncConnectionRef p = *it;
+ ldout(cct, 5) << __func__ << " delete " << p << dendl;
+ p->put();
+ deleted_conns.erase(it);
+ }
}
lock.Unlock();
}
AsyncConnectionRef p = _lookup_conn(addr);
if (p) {
ldout(cct, 1) << __func__ << " " << addr << " -- " << p << dendl;
- p->mark_down();
+ p->stop();
} else {
ldout(cct, 1) << __func__ << " " << addr << " -- connection dne" << dendl;
}