]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: Ensure "mark_down" complete when returned
authorHaomai Wang <haomaiwang@gmail.com>
Mon, 24 Nov 2014 02:05:22 +0000 (10:05 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Fri, 28 Nov 2014 05:43:28 +0000 (13:43 +0800)
Now AsyncConnection try to process or modify any own variables within self
thread. It means actual function will be called by event loop not external.
So we can avoid lockdep or unnecessary lock for AsyncConnection.

So "mark_down" should be returned after _stop completed and all events has
be processed.

"external" for unregister_conn is unnecessary, because the caller won't
own AsyncMessenger::Lock again.

Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/msg/async/AsyncMessenger.h

index 457aff815f76f099ba8dcf175a43d111f5b1e788..d7edf86d8fd67439252c5682a49cac893358c806 100644 (file)
@@ -97,6 +97,16 @@ class C_handle_dispatch : public EventCallback {
   }
 };
 
+class C_handle_stop : public EventCallback {
+  AsyncConnectionRef conn;
+
+ public:
+  C_handle_stop(AsyncConnectionRef c): conn(c) {}
+  void do_request(int id) {
+    conn->stop();
+  }
+};
+
 
 static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
 {
@@ -133,6 +143,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCente
   write_handler.reset(new C_handle_write(this));
   reset_handler.reset(new C_handle_reset(async_msgr, this));
   remote_reset_handler.reset(new C_handle_remote_reset(async_msgr, this));
+  stop_handler.reset(new C_handle_stop(this));
   memset(msgvec, 0, sizeof(msgvec));
 }
 
@@ -1818,7 +1829,7 @@ void AsyncConnection::was_session_reset()
   in_seq_acked = 0;
 }
 
-void AsyncConnection::_stop(bool external)
+void AsyncConnection::_stop()
 {
   ldout(async_msgr->cct, 10) << __func__ << dendl;
   center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
@@ -1832,7 +1843,11 @@ void AsyncConnection::_stop(bool external)
   state = STATE_CLOSED;
   ::close(sd);
   sd = -1;
-  async_msgr->unregister_conn(peer_addr, external);
+  async_msgr->unregister_conn(peer_addr);
+  {
+    Mutex::Locker l(stop_lock);
+    stop_cond.Signal();
+  }
   put();
 }
 
index 0e61d8b1e11c6f3e9c84f9c5bc6007d82fd79f06..13cdd568871a5b82bbdc08f7af8ff48075a4b17f 100644 (file)
@@ -52,7 +52,7 @@ class AsyncConnection : public Connection {
   int read_until(uint64_t needed, bufferptr &p);
   int _process_connection();
   void _connect();
-  void _stop(bool external=false);
+  void _stop();
   int handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &r);
   int handle_connect_msg(ceph_msg_connect &m, bufferlist &aubl, bufferlist &bl);
   void was_session_reset();
@@ -126,8 +126,9 @@ class AsyncConnection : public Connection {
   void send_keepalive();
   // Don't call it from AsyncConnection
   void mark_down() {
-    Mutex::Locker l(lock);
-    _stop(true);
+    Mutex::Locker l(stop_lock);
+    center.dispatch_event_external(stop_event);
+    mark_down_cond.Wait(stop_lock);
   }
   void mark_disposable() {
     Mutex::Locker l(lock);
@@ -231,6 +232,8 @@ class AsyncConnection : public Connection {
   EventCallbackRef remote_reset_handler;
   bool keepalive;
   struct iovec msgvec[IOV_LEN];
+  Mutex stop_lock; // used to protect `mark_down_cond`
+  Cond stop_cond;
 
   // Tis section are temp variables used by state transition
 
@@ -264,6 +267,11 @@ class AsyncConnection : public Connection {
   // used by eventcallback
   void handle_write();
   void process();
+  // Helper: only called by C_handle_stop
+  void stop() {
+    Mutex::Locker l(lock);
+    _stop();
+  }
 }; /* AsyncConnection */
 
 typedef boost::intrusive_ptr<AsyncConnection> AsyncConnectionRef;
index 0952b67cf8a06b7e6674ecbe5bbdc9be5efd2cb5..d3875d2526fb6a2440e40f2832c3e38e95c88dba 100644 (file)
@@ -398,16 +398,10 @@ public:
 
   /**
    * Unregister connection from `conns`
-   * `external` is used to indicate whether need to lock AsyncMessenger::lock,
-   * it may call. If external is false, it means that AsyncConnection take the
-   * initiative to unregister
    */
-  void unregister_conn(const entity_addr_t &addr, bool external) {
-    if (!external)
-      lock.Lock();
+  void unregister_conn(const entity_addr_t &addr) {
+    Mutex::Locker l(lock);
     conns.erase(addr);
-    if (!external)
-      lock.Unlock();
   }
   /**
    * @} // AsyncMessenger Internals