From 1cb78eeb73a6b9d79aa30ebd3adb794234a9c5f3 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Fri, 19 Dec 2014 22:25:58 +0800 Subject: [PATCH] AsyncMessenger: Make learn_addr async to avoid destroying lock rule Make learn_addr become a async op, otherwise holding connection's lock then acquire messenger's lock will destroy lock rule. Signed-off-by: Haomai Wang --- src/msg/async/AsyncConnection.cc | 7 ++++++- src/msg/async/AsyncConnection.h | 2 ++ src/msg/async/AsyncMessenger.cc | 23 ++++++++++++++++------- src/msg/async/AsyncMessenger.h | 4 ++++ 4 files changed, 28 insertions(+), 8 deletions(-) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 5a0cf46c3c284..9caa02f5574c0 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -187,7 +187,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCente state(STATE_NONE), state_after_send(0), sd(-1), lock("AsyncConnection::lock"), open_write(false), keepalive(false), stop_lock("AsyncConnection::stop_lock"), - got_bad_auth(false), authorizer(NULL), replacing(false), + got_bad_auth(false), authorizer(NULL), replacing(false), stopping(0), state_buffer(4096), state_offset(0), net(cct), center(c) { read_handler.reset(new C_handle_read(this)); @@ -975,6 +975,11 @@ int AsyncConnection::_process_connection() } ldout(async_msgr->cct, 20) << __func__ << " connect peer addr for me is " << peer_addr_for_me << dendl; + // TODO: it's tricky that exit loop if exist AsyncMessenger waiting for + // mark_down. Otherwise, it will be deadlock while + // AsyncMessenger::mark_down_all already hold lock. + if (stopping.read()) + break; async_msgr->learned_addr(peer_addr_for_me); ::encode(async_msgr->get_myaddr(), myaddrbl); r = _try_send(myaddrbl); diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index d12b79fa44358..348a87b90250e 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -148,6 +148,7 @@ class AsyncConnection : public Connection { if (center->get_owner() == pthread_self()) { stop(); } else { + stopping.set(1); center->dispatch_event_external(stop_handler); stop_cond.Wait(stop_lock); } @@ -290,6 +291,7 @@ class AsyncConnection : public Connection { // there won't exists conflicting connection so we use // "replacing" to skip RESETSESSION to avoid detect wrong // presentation + atomic_t stopping; // used only for local state, it will be overwrite when state transition bufferptr state_buffer; diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 81772ba24c8b4..6886fe6e2a9af 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -351,7 +351,7 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, : SimplePolicyMessenger(cct, name,mname, _nonce), processor(this, _nonce), lock("AsyncMessenger::lock"), - nonce(_nonce), did_bind(false), + nonce(_nonce), need_addr(true), did_bind(false), global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"), cluster_protocol(0), stopped(true) { @@ -470,8 +470,12 @@ void AsyncMessenger::wait() ldout(cct, 10) << __func__ << ": closing connections" << dendl; while (!conns.empty()) { - AsyncConnectionRef p = conns.begin()->second; + ceph::unordered_map::iterator it = conns.begin(); + AsyncConnectionRef p = it->second; + ldout(cct, 5) << __func__ << " " << it->first << " " << p << dendl; + conns.erase(it); p->mark_down(); + ms_deliver_handle_reset(p.get()); } } lock.Unlock(); @@ -688,11 +692,16 @@ void AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) // this always goes from true -> false under the protection of the // mutex. if it is already false, we need not retake the mutex at // all. + if (!need_addr) + return ; lock.Lock(); - entity_addr_t t = peer_addr_for_me; - t.set_port(my_inst.addr.get_port()); - my_inst.addr.addr = t.addr; - ldout(cct, 1) << __func__ << " learned my addr " << my_inst.addr << dendl; - _init_local_connection(); + if (need_addr) { + need_addr = false; + entity_addr_t t = peer_addr_for_me; + t.set_port(my_inst.addr.get_port()); + my_inst.addr.addr = t.addr; + ldout(cct, 1) << __func__ << " learned my addr " << my_inst.addr << dendl; + _init_local_connection(); + } lock.Unlock(); } diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index 0f758fe474f0f..d911277109682 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -261,6 +261,10 @@ private: /// approximately unique ID set by the Constructor for use in entity_addr_t uint64_t nonce; + /// true, specifying we haven't learned our addr; set false when we find it. + // maybe this should be protected by the lock? + bool need_addr; + /** * The following aren't lock-protected since you shouldn't be able to race * the only writers. -- 2.39.5