reset_handler.reset(new C_handle_reset(async_msgr, this));
remote_reset_handler.reset(new C_handle_remote_reset(async_msgr, this));
connect_handler.reset(new C_deliver_connect(async_msgr, this));
- accept_handler.reset(new C_deliver_accept(async_msgr, this));
local_deliver_handler.reset(new C_local_deliver(this));
+ wakeup_handler.reset(new C_time_wakeup(this));
memset(msgvec, 0, sizeof(msgvec));
// double recv_max_prefetch see "read_until"
recv_buf = new char[2*recv_max_prefetch];
ldout(async_msgr->cct, 1) << __func__ << " wants 1 message from policy throttle "
<< policy.throttler_messages->get_current() << "/"
<< policy.throttler_messages->get_max() << " failed, just wait." << dendl;
- center->dispatch_event_external(read_handler);
+ // following thread pool deal with th full message queue isn't a
+ // short time, so we can wait a ms.
+ if (register_time_events.empty())
+ register_time_events.insert(center->create_time_event(1000, wakeup_handler));
break;
}
}
ldout(async_msgr->cct, 10) << __func__ << " wants " << message_size << " bytes from policy throttler "
<< policy.throttler_bytes->get_current() << "/"
<< policy.throttler_bytes->get_max() << " failed, just wait." << dendl;
- center->dispatch_event_external(read_handler);
+ // following thread pool deal with th full message queue isn't a
+ // short time, so we can wait a ms.
+ if (register_time_events.empty())
+ register_time_events.insert(center->create_time_event(1000, wakeup_handler));
break;
}
}
goto fail_registered;
// notify
- center->dispatch_event_external(accept_handler);
+ center->dispatch_event_external(EventCallbackRef(new C_deliver_accept(async_msgr, this)));
async_msgr->ms_deliver_handle_fast_accept(this);
once_ready = true;
// woke up again;
register_time_events.insert(center->create_time_event(
- backoff.to_nsec()/1000, EventCallbackRef(new C_time_wakeup(this))));
+ backoff.to_nsec()/1000, wakeup_handler));
}
void AsyncConnection::was_session_reset()
EventCallbackRef reset_handler;
EventCallbackRef remote_reset_handler;
EventCallbackRef connect_handler;
- EventCallbackRef accept_handler;
EventCallbackRef local_deliver_handler;
+ EventCallbackRef wakeup_handler;
bool keepalive;
struct iovec msgvec[IOV_MAX];
char *recv_buf;
reset_handler.reset();
remote_reset_handler.reset();
connect_handler.reset();
- accept_handler.reset();
local_deliver_handler.reset();
+ wakeup_handler.reset();
}
}; /* AsyncConnection */
}
class C_handle_notify : public EventCallback {
+ EventCenter *center;
+
public:
- C_handle_notify() {}
+ C_handle_notify(EventCenter *c): center(c) {}
void do_request(int fd_or_id) {
- char c[100];
- int r = read(fd_or_id, c, 100);
+ char c[256];
+ center->already_wakeup.set(0);
+ int r = read(fd_or_id, c, sizeof(c));
assert(r > 0);
}
};
if (r < 0) {
return -1;
}
+ r = net.set_nonblock(notify_send_fd);
+ if (r < 0) {
+ return -1;
+ }
file_events = static_cast<FileEvent *>(malloc(sizeof(FileEvent)*n));
memset(file_events, 0, sizeof(FileEvent)*n);
nevent = n;
- create_file_event(notify_receive_fd, EVENT_READABLE, EventCallbackRef(new C_handle_notify()));
+ create_file_event(notify_receive_fd, EVENT_READABLE, EventCallbackRef(new C_handle_notify(this)));
return 0;
}
void EventCenter::wakeup()
{
- ldout(cct, 1) << __func__ << dendl;
- char buf[1];
- buf[0] = 'c';
- // wake up "event_wait"
- int n = write(notify_send_fd, buf, 1);
- // FIXME ?
- assert(n == 1);
+ if (!already_wakeup.read()) {
+ ldout(cct, 1) << __func__ << dendl;
+ char buf[1];
+ buf[0] = 'c';
+ // wake up "event_wait"
+ int n = write(notify_send_fd, buf, 1);
+ // FIXME ?
+ assert(n == 1);
+ already_wakeup.set(1);
+ }
}
int EventCenter::process_time_events()
#include <pthread.h>
+#include "include/atomic.h"
#include "include/Context.h"
#include "include/unordered_map.h"
#include "common/WorkQueue.h"
}
public:
+ atomic_t already_wakeup;
+
EventCenter(CephContext *c):
cct(c), nevent(0),
external_lock("AsyncMessenger::external_lock"),
time_lock("AsyncMessenger::time_lock"),
file_events(NULL),
driver(NULL), time_event_next_id(0),
- notify_receive_fd(-1), notify_send_fd(-1), net(c), owner(0) {
+ notify_receive_fd(-1), notify_send_fd(-1), net(c), owner(0), already_wakeup(0) {
last_time = time(NULL);
}
~EventCenter();