]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: Fix mark_down problem when calling send_message
authorHaomai Wang <haomaiwang@gmail.com>
Wed, 8 Oct 2014 03:57:49 +0000 (11:57 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Wed, 8 Oct 2014 07:53:13 +0000 (15:53 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/AsyncConnection.cc
src/msg/AsyncConnection.h

index 8c9b79959ee8a97171d2953d1e5c0c0be4fe927d..31a99488c429fb1b15ec55c2e59a5504b6415564 100644 (file)
@@ -83,16 +83,6 @@ class C_handle_dispatch : public EventCallback {
   }
 };
 
-class C_handle_stop : public EventCallback {
-  AsyncConnectionRef conn;
-
- public:
-  C_handle_stop(AsyncConnection *c): conn(c) {}
-  void do_request(int id) {
-    conn->stop();
-  }
-};
-
 
 static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
 {
@@ -731,6 +721,7 @@ void AsyncConnection::process()
 
       case STATE_CLOSED:
         {
+          center->delete_file_event(sd, EVENT_READABLE);
           ldout(async_msgr->cct, 20) << __func__ << " socket closed" << dendl;
           break;
         }
@@ -806,8 +797,10 @@ int AsyncConnection::_process_connection()
 
         global_seq = async_msgr->get_global_seq();
         // close old socket.  this is safe because we stopped the reader thread above.
-        if (sd >= 0)
+        if (sd >= 0) {
+          center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
           ::close(sd);
+        }
 
         sd = net.connect(get_peer_addr());
         if (sd < 0) {
@@ -1800,7 +1793,7 @@ void AsyncConnection::was_session_reset()
   discard_out_queue();
   outcoming_bl.clear();
 
-  center->create_time_event(0, remote_reset_handler);
+  center->dispatch_event_external(remote_reset_handler);
 
   if (randomize_out_seq()) {
     lsubdout(async_msgr->cct,ms,15) << __func__ << " Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
@@ -1811,29 +1804,15 @@ void AsyncConnection::was_session_reset()
   in_seq_acked = 0;
 }
 
-void AsyncConnection::mark_down()
-{
-  Mutex::Locker l(lock);
-  state = STATE_CLOSED;
-  center->dispatch_event_external(EventCallbackRef(new C_handle_stop(this)));
-}
-
-// Who call "_stop():
-// 1. receive STATE_OPEN_TAG_CLOSE
-// 2. fault when policy.lossy
-// 3. mark_down
-// 4. caller from Messenger
 void AsyncConnection::_stop()
 {
   ldout(async_msgr->cct, 10) << __func__ << dendl;
-  center->create_time_event(0, reset_handler);
+  center->dispatch_event_external(reset_handler);
   shutdown_socket();
   discard_out_queue();
   outcoming_bl.clear();
   if (policy.lossy)
     was_session_reset();
-  if (sd >= 0)
-    center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
   open_write = false;
   state = STATE_CLOSED;
 }
index 2a116b5b92eeeeb3e4c535b88240e2ce904f67ce..3c025ff92bae089b3463d89b2abf48191a23794d 100644 (file)
@@ -103,7 +103,10 @@ class AsyncConnection : public Connection {
   int send_message(Message *m);
 
   void send_keepalive();
-  void mark_down();
+  void mark_down() {
+    Mutex::Locker l(lock);
+    _stop();
+  }
   void mark_disposable() {
     Mutex::Locker l(lock);
     policy.lossy = true;
@@ -239,10 +242,6 @@ class AsyncConnection : public Connection {
   // used by eventcallback
   void handle_write();
   void process();
-  void stop() {
-    Mutex::Locker l(lock);
-    _stop();
-  }
 }; /* AsyncConnection */
 
 typedef boost::intrusive_ptr<AsyncConnection> AsyncConnectionRef;