From: Haomai Wang Date: Mon, 20 Apr 2015 06:16:35 +0000 (+0800) Subject: AsyncConnection: Fix deadlock cause by throttle block X-Git-Tag: v9.0.1~74^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b05d144c2f16a39f72b7b23650acef8349e92982;p=ceph.git AsyncConnection: Fix deadlock cause by throttle block Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index defbd67aa78f8..dd85ddd45deb0 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -187,8 +187,8 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCente reset_handler.reset(new C_handle_reset(async_msgr, this)); remote_reset_handler.reset(new C_handle_remote_reset(async_msgr, this)); connect_handler.reset(new C_deliver_connect(async_msgr, this)); - accept_handler.reset(new C_deliver_accept(async_msgr, this)); local_deliver_handler.reset(new C_local_deliver(this)); + wakeup_handler.reset(new C_time_wakeup(this)); memset(msgvec, 0, sizeof(msgvec)); // double recv_max_prefetch see "read_until" recv_buf = new char[2*recv_max_prefetch]; @@ -622,7 +622,10 @@ void AsyncConnection::process() ldout(async_msgr->cct, 1) << __func__ << " wants 1 message from policy throttle " << policy.throttler_messages->get_current() << "/" << policy.throttler_messages->get_max() << " failed, just wait." << dendl; - center->dispatch_event_external(read_handler); + // following thread pool deal with th full message queue isn't a + // short time, so we can wait a ms. + if (register_time_events.empty()) + register_time_events.insert(center->create_time_event(1000, wakeup_handler)); break; } } @@ -643,7 +646,10 @@ void AsyncConnection::process() ldout(async_msgr->cct, 10) << __func__ << " wants " << message_size << " bytes from policy throttler " << policy.throttler_bytes->get_current() << "/" << policy.throttler_bytes->get_max() << " failed, just wait." << dendl; - center->dispatch_event_external(read_handler); + // following thread pool deal with th full message queue isn't a + // short time, so we can wait a ms. + if (register_time_events.empty()) + register_time_events.insert(center->create_time_event(1000, wakeup_handler)); break; } } @@ -1844,7 +1850,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a goto fail_registered; // notify - center->dispatch_event_external(accept_handler); + center->dispatch_event_external(EventCallbackRef(new C_deliver_accept(async_msgr, this))); async_msgr->ms_deliver_handle_fast_accept(this); once_ready = true; @@ -2078,7 +2084,7 @@ void AsyncConnection::fault() // woke up again; register_time_events.insert(center->create_time_event( - backoff.to_nsec()/1000, EventCallbackRef(new C_time_wakeup(this)))); + backoff.to_nsec()/1000, wakeup_handler)); } void AsyncConnection::was_session_reset() diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 91c1d6581ac90..b5aac3ece3084 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -224,8 +224,8 @@ class AsyncConnection : public Connection { EventCallbackRef reset_handler; EventCallbackRef remote_reset_handler; EventCallbackRef connect_handler; - EventCallbackRef accept_handler; EventCallbackRef local_deliver_handler; + EventCallbackRef wakeup_handler; bool keepalive; struct iovec msgvec[IOV_MAX]; char *recv_buf; @@ -287,8 +287,8 @@ class AsyncConnection : public Connection { reset_handler.reset(); remote_reset_handler.reset(); connect_handler.reset(); - accept_handler.reset(); local_deliver_handler.reset(); + wakeup_handler.reset(); } }; /* AsyncConnection */ diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index cf60a00607572..fad26148bac3f 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -40,11 +40,14 @@ ostream& EventCenter::_event_prefix(std::ostream *_dout) } class C_handle_notify : public EventCallback { + EventCenter *center; + public: - C_handle_notify() {} + C_handle_notify(EventCenter *c): center(c) {} void do_request(int fd_or_id) { - char c[100]; - int r = read(fd_or_id, c, 100); + char c[256]; + center->already_wakeup.set(0); + int r = read(fd_or_id, c, sizeof(c)); assert(r > 0); } }; @@ -86,12 +89,16 @@ int EventCenter::init(int n) if (r < 0) { return -1; } + r = net.set_nonblock(notify_send_fd); + if (r < 0) { + return -1; + } file_events = static_cast(malloc(sizeof(FileEvent)*n)); memset(file_events, 0, sizeof(FileEvent)*n); nevent = n; - create_file_event(notify_receive_fd, EVENT_READABLE, EventCallbackRef(new C_handle_notify())); + create_file_event(notify_receive_fd, EVENT_READABLE, EventCallbackRef(new C_handle_notify(this))); return 0; } @@ -239,13 +246,16 @@ void EventCenter::delete_time_event(uint64_t id) void EventCenter::wakeup() { - ldout(cct, 1) << __func__ << dendl; - char buf[1]; - buf[0] = 'c'; - // wake up "event_wait" - int n = write(notify_send_fd, buf, 1); - // FIXME ? - assert(n == 1); + if (!already_wakeup.read()) { + ldout(cct, 1) << __func__ << dendl; + char buf[1]; + buf[0] = 'c'; + // wake up "event_wait" + int n = write(notify_send_fd, buf, 1); + // FIXME ? + assert(n == 1); + already_wakeup.set(1); + } } int EventCenter::process_time_events() diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index 729500c00d184..a526188b3bc7d 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -39,6 +39,7 @@ #include +#include "include/atomic.h" #include "include/Context.h" #include "include/unordered_map.h" #include "common/WorkQueue.h" @@ -124,6 +125,8 @@ class EventCenter { } public: + atomic_t already_wakeup; + EventCenter(CephContext *c): cct(c), nevent(0), external_lock("AsyncMessenger::external_lock"), @@ -131,7 +134,7 @@ class EventCenter { time_lock("AsyncMessenger::time_lock"), file_events(NULL), driver(NULL), time_event_next_id(0), - notify_receive_fd(-1), notify_send_fd(-1), net(c), owner(0) { + notify_receive_fd(-1), notify_send_fd(-1), net(c), owner(0), already_wakeup(0) { last_time = time(NULL); } ~EventCenter();