From: Haomai Wang Date: Sat, 10 Jan 2015 13:35:04 +0000 (+0800) Subject: AsyncConnection: Avoid calling callback after delteing AsyncMessenger X-Git-Tag: v0.93~247^2~15 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=34cbd4c76c14fbff8f12416c6e6f1ae4b0701d68;p=ceph.git AsyncConnection: Avoid calling callback after delteing AsyncMessenger Now when calling mark_down/mark_down_all, it will dispatch a reset event. If we call Messenger::shutdown/wait, and it will let reset event called after Messenger dealloc. Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index e1d803e7e39b..659eeca4795c 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -2111,7 +2111,6 @@ void AsyncConnection::mark_down() stopping.set(1); Mutex::Locker l(lock); _stop(); - center->dispatch_event_external(reset_handler); } void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp) diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index ab4c445cfe5f..d6c1771495dd 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -286,6 +286,10 @@ class AsyncConnection : public Connection { void process(); void wakeup_from(uint64_t id); void local_deliver(); + void stop() { + mark_down(); + center->dispatch_event_external(reset_handler); + } }; /* AsyncConnection */ typedef boost::intrusive_ptr AsyncConnectionRef; diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 902a684e2bb0..48241aacd4a5 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -106,13 +106,19 @@ int Processor::bind(const entity_addr_t &bind_addr, const set& avoid_ports) return -errno; } + int r = net.set_nonblock(listen_sd); + if (r < 0) { + ::close(listen_sd); + listen_sd = -1; + return -errno; + } // use whatever user specified (if anything) entity_addr_t listen_addr = bind_addr; listen_addr.set_family(family); /* bind to port */ int rc = -1; - int r = -1; + r = -1; for (int i = 0; i < conf->ms_bind_retry_count; i++) { if (i > 0) { @@ -167,6 +173,8 @@ int Processor::bind(const entity_addr_t &bind_addr, const set& avoid_ports) if (rc < 0) { lderr(msgr->cct) << __func__ << " was unable to bind after " << conf->ms_bind_retry_count << " attempts: " << cpp_strerror(errno) << dendl; + ::close(listen_sd); + listen_sd = -1; return r; } @@ -176,6 +184,8 @@ int Processor::bind(const entity_addr_t &bind_addr, const set& avoid_ports) if (rc < 0) { rc = -errno; lderr(msgr->cct) << __func__ << " failed getsockname: " << cpp_strerror(rc) << dendl; + ::close(listen_sd); + listen_sd = -1; return rc; } @@ -187,6 +197,8 @@ int Processor::bind(const entity_addr_t &bind_addr, const set& avoid_ports) rc = -errno; lderr(msgr->cct) << __func__ << " unable to listen on " << listen_addr << ": " << cpp_strerror(rc) << dendl; + ::close(listen_sd); + listen_sd = -1; return rc; } @@ -252,10 +264,17 @@ void Processor::accept() ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << sd << dendl; msgr->add_accept(sd); - break; + continue; } else { - ldout(msgr->cct, 0) << __func__ << " no incoming connection? sd = " << sd - << " errno " << errno << " " << cpp_strerror(errno) << dendl; + if (errno == EINTR) { + continue; + } else if (errno == EAGAIN) { + break; + } else { + errors++; + ldout(msgr->cct, 20) << __func__ << " no incoming connection? sd = " << sd + << " errno " << errno << " " << cpp_strerror(errno) << dendl; + } } } } @@ -371,7 +390,6 @@ void WorkerPool::barrier() { ldout(cct, 10) << __func__ << " started." << dendl; pthread_t cur = pthread_self(); - uint64_t send = 0; for (vector::iterator it = workers.begin(); it != workers.end(); ++it) { assert(cur != (*it)->center.get_owner()); (*it)->center.dispatch_event_external(EventCallbackRef(new C_barrier(this))); @@ -393,7 +411,7 @@ void WorkerPool::barrier() AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, string mname, uint64_t _nonce) : SimplePolicyMessenger(cct, name,mname, _nonce), - processor(this, _nonce), + processor(this, cct, _nonce), lock("AsyncMessenger::lock"), nonce(_nonce), need_addr(true), did_bind(false), global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"), @@ -663,8 +681,8 @@ void AsyncMessenger::mark_down_all() for (set::iterator q = accepting_conns.begin(); q != accepting_conns.end(); ++q) { AsyncConnectionRef p = *q; - ldout(cct, 5) << __func__ << " accepting_conn " << p << dendl; - p->mark_down(); + ldout(cct, 5) << __func__ << " accepting_conn " << p.get() << dendl; + p->stop(); } accepting_conns.clear(); @@ -673,15 +691,18 @@ void AsyncMessenger::mark_down_all() AsyncConnectionRef p = it->second; ldout(cct, 5) << __func__ << " mark down " << it->first << " " << p << dendl; conns.erase(it); - p->mark_down(); + p->stop(); } - while (!deleted_conns.empty()) { - set::iterator it = deleted_conns.begin(); - AsyncConnectionRef p = *it; - ldout(cct, 5) << __func__ << " delete " << p << dendl; - p->put(); - deleted_conns.erase(it); + { + Mutex::Locker l(deleted_lock); + while (!deleted_conns.empty()) { + set::iterator it = deleted_conns.begin(); + AsyncConnectionRef p = *it; + ldout(cct, 5) << __func__ << " delete " << p << dendl; + p->put(); + deleted_conns.erase(it); + } } lock.Unlock(); } @@ -692,7 +713,7 @@ void AsyncMessenger::mark_down(const entity_addr_t& addr) AsyncConnectionRef p = _lookup_conn(addr); if (p) { ldout(cct, 1) << __func__ << " " << addr << " -- " << p << dendl; - p->mark_down(); + p->stop(); } else { ldout(cct, 1) << __func__ << " " << addr << " -- connection dne" << dendl; } diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index f4bb34a8c915..972934ad8efb 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -63,12 +63,13 @@ class Worker : public Thread { */ class Processor { AsyncMessenger *msgr; + NetHandler net; Worker *worker; int listen_sd; uint64_t nonce; public: - Processor(AsyncMessenger *r, uint64_t n): msgr(r), worker(NULL), listen_sd(-1), nonce(n) {} + Processor(AsyncMessenger *r, CephContext *c, uint64_t n): msgr(r), net(c), worker(NULL), listen_sd(-1), nonce(n) {} void stop(); int bind(const entity_addr_t &bind_addr, const set& avoid_ports); diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc index 0928f5c7dc8f..2a95f9ad9172 100644 --- a/src/test/msgr/test_msgr.cc +++ b/src/test/msgr/test_msgr.cc @@ -149,7 +149,6 @@ class FakeDispatcher : public Dispatcher { cerr << __func__ << con << std::endl; Session *s = static_cast(con->get_priv()); if (s) { - Mutex::Locker l(s->lock); s->con.reset(NULL); // break con <-> session ref cycle con->set_priv(NULL); // break ref <-> session cycle, if any s->put();