]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncMessenger: Make learn_addr async to avoid destroying lock rule
authorHaomai Wang <haomaiwang@gmail.com>
Fri, 19 Dec 2014 14:25:58 +0000 (22:25 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Tue, 23 Dec 2014 11:22:09 +0000 (19:22 +0800)
Make learn_addr become a async op, otherwise holding connection's
lock then acquire messenger's lock will destroy lock rule.

Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h

index 5a0cf46c3c284bcdb661635a051cddfb7d3ea4c4..9caa02f5574c07494557f22993633a366d7dbc41 100644 (file)
@@ -187,7 +187,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCente
     state(STATE_NONE), state_after_send(0), sd(-1),
     lock("AsyncConnection::lock"), open_write(false), keepalive(false),
     stop_lock("AsyncConnection::stop_lock"),
-    got_bad_auth(false), authorizer(NULL), replacing(false),
+    got_bad_auth(false), authorizer(NULL), replacing(false), stopping(0),
     state_buffer(4096), state_offset(0), net(cct), center(c)
 {
   read_handler.reset(new C_handle_read(this));
@@ -975,6 +975,11 @@ int AsyncConnection::_process_connection()
         }
 
         ldout(async_msgr->cct, 20) << __func__ << " connect peer addr for me is " << peer_addr_for_me << dendl;
+        // TODO: it's tricky that exit loop if exist AsyncMessenger waiting for
+        // mark_down. Otherwise, it will be deadlock while
+        // AsyncMessenger::mark_down_all already hold lock.
+        if (stopping.read())
+          break;
         async_msgr->learned_addr(peer_addr_for_me);
         ::encode(async_msgr->get_myaddr(), myaddrbl);
         r = _try_send(myaddrbl);
index d12b79fa4435814e906263d9f408861e37617c08..348a87b90250e67267d0436dbdeeadaf7d7f7113 100644 (file)
@@ -148,6 +148,7 @@ class AsyncConnection : public Connection {
     if (center->get_owner() == pthread_self()) {
       stop();
     } else {
+      stopping.set(1);
       center->dispatch_event_external(stop_handler);
       stop_cond.Wait(stop_lock);
     }
@@ -290,6 +291,7 @@ class AsyncConnection : public Connection {
                      // there won't exists conflicting connection so we use
                      // "replacing" to skip RESETSESSION to avoid detect wrong
                      // presentation
+  atomic_t stopping;
 
   // used only for local state, it will be overwrite when state transition
   bufferptr state_buffer;
index 81772ba24c8b4bae2c7877e87c4809c0b08d680e..6886fe6e2a9af2691a4f1291c8a972f24b9f7936 100644 (file)
@@ -351,7 +351,7 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
   : SimplePolicyMessenger(cct, name,mname, _nonce),
     processor(this, _nonce),
     lock("AsyncMessenger::lock"),
-    nonce(_nonce), did_bind(false),
+    nonce(_nonce), need_addr(true), did_bind(false),
     global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"),
     cluster_protocol(0), stopped(true)
 {
@@ -470,8 +470,12 @@ void AsyncMessenger::wait()
     ldout(cct, 10) << __func__ << ": closing connections" << dendl;
 
     while (!conns.empty()) {
-      AsyncConnectionRef p = conns.begin()->second;
+      ceph::unordered_map<entity_addr_t, AsyncConnectionRef>::iterator it = conns.begin();
+      AsyncConnectionRef p = it->second;
+      ldout(cct, 5) << __func__ << " " << it->first << " " << p << dendl;
+      conns.erase(it);
       p->mark_down();
+      ms_deliver_handle_reset(p.get());
     }
   }
   lock.Unlock();
@@ -688,11 +692,16 @@ void AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
   // this always goes from true -> false under the protection of the
   // mutex.  if it is already false, we need not retake the mutex at
   // all.
+  if (!need_addr)
+    return ;
   lock.Lock();
-  entity_addr_t t = peer_addr_for_me;
-  t.set_port(my_inst.addr.get_port());
-  my_inst.addr.addr = t.addr;
-  ldout(cct, 1) << __func__ << " learned my addr " << my_inst.addr << dendl;
-  _init_local_connection();
+  if (need_addr) {
+    need_addr = false;
+    entity_addr_t t = peer_addr_for_me;
+    t.set_port(my_inst.addr.get_port());
+    my_inst.addr.addr = t.addr;
+    ldout(cct, 1) << __func__ << " learned my addr " << my_inst.addr << dendl;
+    _init_local_connection();
+  }
   lock.Unlock();
 }
index 0f758fe474f0fe88ab42afdf79c4cecdad80bdee..d9112771096822d486d739e827db7f9acb77874a 100644 (file)
@@ -261,6 +261,10 @@ private:
   /// approximately unique ID set by the Constructor for use in entity_addr_t
   uint64_t nonce;
 
+  /// true, specifying we haven't learned our addr; set false when we find it.
+  // maybe this should be protected by the lock?
+  bool need_addr;
+
   /**
    *  The following aren't lock-protected since you shouldn't be able to race
    *  the only writers.