git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: Fix deadlock caused by throttle block
author Haomai Wang <haomaiwang@gmail.com>
Mon, 20 Apr 2015 06:16:35 +0000 (14:16 +0800)
committer Haomai Wang <haomaiwang@gmail.com>
Mon, 20 Apr 2015 06:51:05 +0000 (14:51 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/msg/async/Event.cc
src/msg/async/Event.h

index defbd67aa78f840ab62f2627efd3780842a6fedb..dd85ddd45deb0e6f95bd032415ae90af65623d27 100644 (file)
@@ -187,8 +187,8 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCente
   reset_handler.reset(new C_handle_reset(async_msgr, this));
   remote_reset_handler.reset(new C_handle_remote_reset(async_msgr, this));
   connect_handler.reset(new C_deliver_connect(async_msgr, this));
-  accept_handler.reset(new C_deliver_accept(async_msgr, this));
   local_deliver_handler.reset(new C_local_deliver(this));
+  wakeup_handler.reset(new C_time_wakeup(this));
   memset(msgvec, 0, sizeof(msgvec));
   // double recv_max_prefetch see "read_until"
   recv_buf = new char[2*recv_max_prefetch];
@@ -622,7 +622,10 @@ void AsyncConnection::process()
               ldout(async_msgr->cct, 1) << __func__ << " wants 1 message from policy throttle "
                                         << policy.throttler_messages->get_current() << "/"
                                         << policy.throttler_messages->get_max() << " failed, just wait." << dendl;
-              center->dispatch_event_external(read_handler);
+              // It takes the following thread pool more than a short time to
+              // work through the full message queue, so we can wait 1 ms.
+              if (register_time_events.empty())
+                register_time_events.insert(center->create_time_event(1000, wakeup_handler));
               break;
             }
           }
@@ -643,7 +646,10 @@ void AsyncConnection::process()
                 ldout(async_msgr->cct, 10) << __func__ << " wants " << message_size << " bytes from policy throttler "
                                            << policy.throttler_bytes->get_current() << "/"
                                            << policy.throttler_bytes->get_max() << " failed, just wait." << dendl;
-                center->dispatch_event_external(read_handler);
+                // It takes the following thread pool more than a short time to
+                // work through the full message queue, so we can wait 1 ms.
+                if (register_time_events.empty())
+                  register_time_events.insert(center->create_time_event(1000, wakeup_handler));
                 break;
               }
             }
@@ -1844,7 +1850,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
     goto fail_registered;
 
   // notify
-  center->dispatch_event_external(accept_handler);
+  center->dispatch_event_external(EventCallbackRef(new C_deliver_accept(async_msgr, this)));
   async_msgr->ms_deliver_handle_fast_accept(this);
   once_ready = true;
 
@@ -2078,7 +2084,7 @@ void AsyncConnection::fault()
 
   // woke up again;
   register_time_events.insert(center->create_time_event(
-          backoff.to_nsec()/1000, EventCallbackRef(new C_time_wakeup(this))));
+          backoff.to_nsec()/1000, wakeup_handler));
 }
 
 void AsyncConnection::was_session_reset()
index 91c1d6581ac908eb943ed7ca81ecf5dccd0926d4..b5aac3ece308497cbdfa9e4c2cc572ed1744abc5 100644 (file)
@@ -224,8 +224,8 @@ class AsyncConnection : public Connection {
   EventCallbackRef reset_handler;
   EventCallbackRef remote_reset_handler;
   EventCallbackRef connect_handler;
-  EventCallbackRef accept_handler;
   EventCallbackRef local_deliver_handler;
+  EventCallbackRef wakeup_handler;
   bool keepalive;
   struct iovec msgvec[IOV_MAX];
   char *recv_buf;
@@ -287,8 +287,8 @@ class AsyncConnection : public Connection {
     reset_handler.reset();
     remote_reset_handler.reset();
     connect_handler.reset();
-    accept_handler.reset();
     local_deliver_handler.reset();
+    wakeup_handler.reset();
   }
 }; /* AsyncConnection */
 
index cf60a00607572ed22d5a040bb0b25d6073df9128..fad26148bac3f001559d3cd7d4f7a917e25995cd 100644 (file)
@@ -40,11 +40,14 @@ ostream& EventCenter::_event_prefix(std::ostream *_dout)
 }
 
 class C_handle_notify : public EventCallback {
+  EventCenter *center;
+
  public:
-  C_handle_notify() {}
+  C_handle_notify(EventCenter *c): center(c) {}
   void do_request(int fd_or_id) {
-    char c[100];
-    int r = read(fd_or_id, c, 100);
+    char c[256];
+    center->already_wakeup.set(0);
+    int r = read(fd_or_id, c, sizeof(c));
     assert(r > 0);
   }
 };
@@ -86,12 +89,16 @@ int EventCenter::init(int n)
   if (r < 0) {
     return -1;
   }
+  r = net.set_nonblock(notify_send_fd);
+  if (r < 0) {
+    return -1;
+  }
 
   file_events = static_cast<FileEvent *>(malloc(sizeof(FileEvent)*n));
   memset(file_events, 0, sizeof(FileEvent)*n);
 
   nevent = n;
-  create_file_event(notify_receive_fd, EVENT_READABLE, EventCallbackRef(new C_handle_notify()));
+  create_file_event(notify_receive_fd, EVENT_READABLE, EventCallbackRef(new C_handle_notify(this)));
   return 0;
 }
 
@@ -239,13 +246,16 @@ void EventCenter::delete_time_event(uint64_t id)
 
 void EventCenter::wakeup()
 {
-  ldout(cct, 1) << __func__ << dendl;
-  char buf[1];
-  buf[0] = 'c';
-  // wake up "event_wait"
-  int n = write(notify_send_fd, buf, 1);
-  // FIXME ?
-  assert(n == 1);
+  if (!already_wakeup.read()) {
+    ldout(cct, 1) << __func__ << dendl;
+    char buf[1];
+    buf[0] = 'c';
+    // wake up "event_wait"
+    int n = write(notify_send_fd, buf, 1);
+    // FIXME ?
+    assert(n == 1);
+    already_wakeup.set(1);
+  }
 }
 
 int EventCenter::process_time_events()
index 729500c00d1840a62af6f101be2d015421eb3f56..a526188b3bc7db9fd3addbca5d8ea79d9b6e62b1 100644 (file)
@@ -39,6 +39,7 @@
 
 #include <pthread.h>
 
+#include "include/atomic.h"
 #include "include/Context.h"
 #include "include/unordered_map.h"
 #include "common/WorkQueue.h"
@@ -124,6 +125,8 @@ class EventCenter {
   }
 
  public:
+  atomic_t already_wakeup;
+
   EventCenter(CephContext *c):
     cct(c), nevent(0),
     external_lock("AsyncMessenger::external_lock"),
@@ -131,7 +134,7 @@ class EventCenter {
     time_lock("AsyncMessenger::time_lock"),
     file_events(NULL),
     driver(NULL), time_event_next_id(0),
-    notify_receive_fd(-1), notify_send_fd(-1), net(c), owner(0) {
+    notify_receive_fd(-1), notify_send_fd(-1), net(c), owner(0), already_wakeup(0) {
     last_time = time(NULL);
   }
   ~EventCenter();