public:
C_handle_read(AsyncConnection *c): conn(c) {}
- void do_request(int fd, int mask) {
+ void do_request(int fd_or_id, int mask=0) {
conn->process();
}
};
AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m)
: Connection(cct, m), async_msgr(m), global_seq(0), connect_seq(0), out_seq(0), in_seq(0), in_seq_acked(0),
state(STATE_NONE), state_after_send(0), sd(-1),
- lock("AsyncConnection::lock"), backoff(0),
+ lock("AsyncConnection::lock"),
got_bad_auth(false), authorizer(NULL), state_offset(0), net(cct), center(&m->center) { }
AsyncConnection::~AsyncConnection()
// "r" is the remaining length
sended += msglen - r;
if (r > 0) {
- center->create_event(sd, EVENT_WRITABLE, new C_handle_write(this));
+ center->create_file_event(sd, EVENT_WRITABLE, new C_handle_write(this));
ldout(async_msgr->cct, 5) << __func__ << " remaining " << r
<< " needed to be sent, creating event for writing"
<< dendl;
}
if (!outcoming_bl.length())
- center->delete_event(sd, EVENT_WRITABLE);
+ center->delete_file_event(sd, EVENT_WRITABLE);
return outcoming_bl.length();
}
ldout(async_msgr->cct,10) << __func__ << " wants " << 1 << " message from policy throttler "
<< policy.throttler_messages->get_current() << "/"
<< policy.throttler_messages->get_max() << dendl;
- // FIXME: when to try it again?
- if (policy.throttler_messages->get_or_fail())
+ // FIXME: may block
+ if (policy.throttler_messages->get())
state = STATE_OPEN_MESSAGE_THROTTLE_BYTES;
}
ldout(async_msgr->cct,10) << __func__ << " wants " << message_size << " bytes from policy throttler "
<< policy.throttler_bytes->get_current() << "/"
<< policy.throttler_bytes->get_max() << dendl;
- // FIXME: when to try it again?
- if (policy.throttler_bytes->get_or_fail(message_size))
+ // FIXME: may block
+ if (policy.throttler_bytes->get(message_size))
state = STATE_OPEN_MESSAGE_READ_FRONT;
}
}
case STATE_CLOSED:
{
ldout(async_msgr->cct, 20) << __func__ << " socket closed" << dendl;
- center->delete_event(sd, EVENT_READABLE|EVENT_WRITABLE);
+ center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
break;
}
}
net.set_socket_options(sd);
- center->create_event(sd, EVENT_READABLE, new C_handle_read(this));
+ center->create_file_event(sd, EVENT_READABLE, new C_handle_read(this));
state = STATE_CONNECTING_WAIT_BANNER;
break;
}
ldout(async_msgr->cct, 10) << __func__ << " " << connect_seq << dendl;
state = STATE_CONNECTING;
- process();
+ // rescheduler connection in order to avoid lock dep
+ center->create_time_event(0, new C_handle_read(this));
}
void AsyncConnection::accept(int incoming)
assert(sd < 0);
sd = incoming;
- center->create_event(sd, EVENT_READABLE, new C_handle_read(this));
+ state = STATE_ACCEPTING;
+ center->create_file_event(sd, EVENT_READABLE, new C_handle_read(this));
process();
}
}
shutdown_socket();
- center->delete_event(sd, EVENT_READABLE|EVENT_WRITABLE);
+ center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
// requeue sent items
requeue_sent();
return;
}
- //TODO we need to rescheduler connect again!!!
if (state != STATE_CONNECTING) {
if (policy.server) {
ldout(async_msgr->cct, 0) << __func__ << " server, going to standby" << dendl;
state = STATE_STANDBY;
} else {
- ldout(async_msgr->cct,0) << __func__ << " initiating reconnect" << dendl;
+ ldout(async_msgr->cct, 0) << __func__ << " initiating reconnect" << dendl;
connect_seq++;
state = STATE_CONNECTING;
}
backoff = utime_t();
- ldout(async_msgr->cct,0) << __func__ << dendl;
+ ldout(async_msgr->cct, 0) << __func__ << dendl;
} else {
if (backoff == utime_t()) {
backoff.set_from_double(async_msgr->cct->_conf->ms_initial_backoff);
- backoff.set_from_double(async_msgr->cct->_conf->ms_max_backoff);
+ } else {
+ backoff += backoff;
+ if (backoff > async_msgr->cct->_conf->ms_max_backoff)
+ backoff.set_from_double(async_msgr->cct->_conf->ms_max_backoff);
}
ldout(async_msgr->cct, 10) << __func__ << " waiting " << backoff << dendl;
- // TODO wait!!!!!
- ldout(async_msgr->cct, 10) << __func__ << " done waiting or woke up" << dendl;
}
+
+ uint64_t milliseconds = double(backoff) * 1000;
+ // woke up again;
+ center->create_time_event(milliseconds, new C_handle_read(this));
}
void AsyncConnection::was_session_reset()
// start thread
create();
- center->create_event(listen_sd, EVENT_READABLE, new C_handle_accept(this));
+ center->create_file_event(listen_sd, EVENT_READABLE, new C_handle_accept(this));
return 0;
}
done = true;
ldout(msgr->cct, 10) << __func__ << " processor" << dendl;
- center->delete_event(listen_sd, EVENT_READABLE);
+ center->delete_file_event(listen_sd, EVENT_READABLE);
if (listen_sd >= 0) {
::shutdown(listen_sd, SHUT_RDWR);
}
center(cct)
{
ceph_spin_init(&global_seq_lock);
- _init_local_connection();
+ init_local_connection();
}
/**
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
+#include <time.h>
+
#include "common/errno.h"
#include "Event.h"
delete driver;
}
-int EventCenter::create_event(int fd, int mask, EventCallback *ctxt)
+int EventCenter::create_file_event(int fd, int mask, EventCallback *ctxt)
{
+ int r;
Mutex::Locker l(lock);
- if (events.size() > nevent) {
+ if (file_events.size() > nevent) {
- lderr(cct) << __func__ << " event count is exceed." << dendl;
- return -ERANGE;
+ int new_size = nevent << 2;
+ ldout(cct, 10) << __func__ << " event count exceed " << nevent << ", expand to " << new_size << dendl;
+ r = driver->resize_events(new_size);
+ if (r < 0) {
+ lderr(cct) << __func__ << " event count is exceed." << dendl;
+ return -ERANGE;
+ }
+ nevent = new_size;
}
- EventCenter::Event *event = _get_event(fd);
+ EventCenter::FileEvent *event = _get_file_event(fd);
- int r = driver->add_event(fd, event ? event->mask: EVENT_NONE, mask);
+ r = driver->add_event(fd, event ? event->mask: EVENT_NONE, mask);
if (r < 0)
return r;
if (!event) {
- events[fd] = EventCenter::Event();
+ file_events[fd] = EventCenter::FileEvent();
- event = &events[fd];
+ event = &file_events[fd];
}
return 0;
}
-void EventCenter::delete_event(int fd, int mask)
+void EventCenter::delete_file_event(int fd, int mask)
{
Mutex::Locker l(lock);
- EventCenter::Event *event = _get_event(fd);
+ EventCenter::FileEvent *event = _get_file_event(fd);
driver->del_event(fd, event ? event->mask: EVENT_NONE, mask);
if (!event) {
- events[fd] = EventCenter::Event();
+ file_events[fd] = EventCenter::FileEvent();
- event = &events[fd];
+ event = &file_events[fd];
}
- events.erase(fd);
+ file_events.erase(fd);
}
+
+uint64_t EventCenter::create_time_event(uint64_t milliseconds, EventCallback *ctxt)
+{
+ Mutex::Locker l(lock);
+ uint64_t id = time_event_next_id++;
+ EventCenter::TimeEvent event;
+ utime_t expire;
+ struct timeval tv;
+
+ expire = ceph_clock_now(cct);
+ expire.copy_to_timeval(&tv);
+ tv.tv_sec += milliseconds / 1000;
+ tv.tv_usec += (milliseconds % 1000) * 1000;
+ if (tv.tv_usec >= 1000000) {
+ tv.tv_sec += tv.tv_usec / 1000000;
+ tv.tv_usec %= 1000000;
+ }
+ expire.set_from_timeval(&tv);
+
+ event.id = id;
+ event.time_cb = ctxt;
+ time_events[expire] = event;
+
+ return id;
+}
+
+void EventCenter::delete_time_event(uint64_t id)
+{
+ Mutex::Locker l(lock);
+ for (map<utime_t, TimeEvent>::iterator it = time_events.begin();
+ it != time_events.end(); it++) {
+ if (it->second.id == id) {
+ time_events.erase(it);
+ return ;
+ }
+ }
+}
+
+int EventCenter::_process_time_events()
+{
+ int processed = 0;
+ time_t now = time(NULL);
+ utime_t cur = ceph_clock_now(cct);
+
+ /* If the system clock is moved to the future, and then set back to the
+ * right value, time events may be delayed in a random way. Often this
+ * means that scheduled operations will not be performed soon enough.
+ *
+ * Here we try to detect system clock skews, and force all the time
+ * events to be processed ASAP when this happens: the idea is that
+ * processing events earlier is less dangerous than delaying them
+ * indefinitely, and practice suggests it is. */
+ if (now < last_time) {
+ // map keys are const: rebuild the map with keys in the past instead of
+ // assigning to it->first (which does not compile)
+ map<utime_t, TimeEvent> changed;
+ uint64_t i = 0;
+ struct timeval tv;
+ for (map<utime_t, TimeEvent>::iterator it = time_events.begin();
+ it != time_events.end(); ++it, ++i) {
+ tv.tv_sec = 0;
+ tv.tv_usec = i; // unique keys, relative order preserved
+ changed[utime_t(&tv)] = it->second;
+ }
+ time_events.swap(changed);
+ }
+ last_time = now;
+
+ map<utime_t, TimeEvent>::iterator prev;
+ for (map<utime_t, TimeEvent>::iterator it = time_events.begin();
+ it != time_events.end(); ) {
+ prev = it;
+ if (cur >= it->first) {
+ FiredEvent e;
+ e.time_event.id = it->second.id;
+ e.time_event.time_cb = it->second.time_cb;
+ e.is_file = false;
+ event_wq.queue(e);
+ processed++;
+ ++it;
+ time_events.erase(prev);
+ } else {
+ break;
+ }
+ }
+
+ return processed;
+}
+
int EventCenter::process_events(int timeout_millionseconds)
{
struct timeval tv;
- int j, processed, numevents;
+ int numevents;
+ bool trigger_time = false;
if (timeout_millionseconds > 0) {
tv.tv_sec = timeout_millionseconds / 1000;
tv.tv_usec = 0;
}
- processed = 0;
- vector<FiredEvent> fired_events;
+ Mutex::Locker l(lock);
+ if (timeout_millionseconds <= 0) {
+ // no caller timeout: tv was left uninitialized above, poll without blocking
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ }
+ utime_t now = ceph_clock_now(cct);
+ utime_t shortest = now + utime_t(&tv); // absolute deadline of this poll
+ map<utime_t, TimeEvent>::iterator it = time_events.begin();
+ if (it != time_events.end() && it->first < shortest) {
+ // earliest time event expires before the poll deadline: shorten the wait
+ shortest = it->first;
+ trigger_time = true;
+ if (shortest < now)
+ shortest = now;
+ }
+ utime_t delta = shortest - now;
+ delta.copy_to_timeval(&tv);
+
+ vector<FiredFileEvent> fired_events;
numevents = driver->event_wait(fired_events, &tv);
- for (j = 0; j < numevents; j++)
- event_wq.queue(fired_events[j]);
+ for (int j = 0; j < numevents; j++) {
+ FiredEvent e;
+ e.file_event = fired_events[j];
+ e.is_file = true;
+ event_wq.queue(e);
+ }
+
+ if (trigger_time)
+ numevents += _process_time_events();
return numevents;
}
class EventCenter;
-// Attention:
-// This event library use file description as index to search correspond event
-// in `events` and `fired_events`. So it's important to estimate a suitable
-// capacity in calling eventcenterInit(capacity).
+class EventCallback {
-struct FiredEvent {
- int mask;
+ public:
+ virtual void do_request(int fd_id, int mask=0) = 0;
+ virtual ~EventCallback() {} // we want a virtual destructor!!!
+};
+
+struct FiredFileEvent {
int fd;
+ int mask;
+};
+
+struct FiredTimeEvent {
+ uint64_t id;
+ EventCallback *time_cb;
+};
+
+struct FiredEvent {
+ union {
+ FiredFileEvent file_event;
+ FiredTimeEvent time_event;
+ };
+ bool is_file;
- FiredEvent(): mask(0), fd(0) {}
+ FiredEvent(): is_file(true) {}
};
class EventDriver {
virtual int init(int nevent) = 0;
virtual int add_event(int fd, int cur_mask, int mask) = 0;
virtual void del_event(int fd, int cur_mask, int del_mask) = 0;
- virtual int event_wait(vector<FiredEvent> &fired_events, struct timeval *tp) = 0;
-};
-
-class EventCallback {
-
- public:
- virtual void do_request(int fd, int mask) = 0;
- virtual ~EventCallback() {} // we want a virtual destructor!!!
+ virtual int event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tp) = 0;
+ virtual int resize_events(int newsize) = 0;
};
class EventCenter {
- struct Event {
+ struct FileEvent {
int mask;
EventCallback *read_cb;
EventCallback *write_cb;
- Event(): mask(0), read_cb(NULL), write_cb(NULL) {}
+ FileEvent(): mask(0), read_cb(NULL), write_cb(NULL) {}
+ };
+
+ struct TimeEvent {
+ uint64_t id;
+ EventCallback *time_cb;
+
+ TimeEvent(): id(0), time_cb(NULL) {}
};
Mutex lock;
- map<int, Event> events;
+ map<int, FileEvent> file_events;
+ map<utime_t, TimeEvent> time_events;
EventDriver *driver;
CephContext *cct;
uint64_t nevent;
+ uint64_t time_event_next_id;
ThreadPool event_tp;
+ time_t last_time; // last time process time event
- Event *_get_event(int fd) {
- map<int, Event>::iterator it = events.find(fd);
- if (it != events.end()) {
+ int _process_time_events();
+ FileEvent *_get_file_event(int fd) {
+ map<int, FileEvent>::iterator it = file_events.find(fd);
+ if (it != file_events.end()) {
return &it->second;
}
return NULL;
}
+
struct EventWQ : public ThreadPool::WorkQueueVal<FiredEvent> {
EventCenter *center;
// In order to ensure the file descriptor is unique in conn_queue,
// pending is introduced to check
//
- // <File Descriptor>
- deque<int> conn_queue;
- // <File Descriptor, Mask>
+ deque<FiredEvent> conn_queue;
+ // used only by file event <File Descriptor, Mask>
map<int, int> pending;
EventWQ(EventCenter *c, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
: ThreadPool::WorkQueueVal<FiredEvent>("Event::EventWQ", timeout, suicide_timeout, tp), center(c) {}
void _enqueue(FiredEvent e) {
- // Ensure only one thread process one file descriptor
- map<int, int>::iterator it = pending.find(e.fd);
- if (it != pending.end())
- it->second |= e.mask;
- else
- pending[e.fd] = e.mask;
+ if (e.is_file) {
+ // Ensure only one thread process one file descriptor
+ map<int, int>::iterator it = pending.find(e.file_event.fd);
+ if (it != pending.end()) {
+ it->second |= e.file_event.mask;
+ } else {
+ pending[e.file_event.fd] = e.file_event.mask;
+ conn_queue.push_back(e);
+ }
+ } else {
+ conn_queue.push_back(e);
+ }
}
void _enqueue_front(FiredEvent e) {
assert(0);
}
FiredEvent _dequeue() {
assert(!conn_queue.empty());
- FiredEvent e;
- e.fd = conn_queue.front();
+ FiredEvent e = conn_queue.front();
conn_queue.pop_front();
- assert(pending.count(e.fd));
- e.mask = pending[e.fd];
- pending.erase(e.fd);
+ if (e.is_file) {
+ assert(pending.count(e.file_event.fd));
+ e.file_event.mask = pending[e.file_event.fd];
+ pending.erase(e.file_event.fd);
+ }
return e;
}
void _process(FiredEvent e, ThreadPool::TPHandle &handle) {
- int rfired = 0;
- Event *event = center->get_event(e.fd);
- if (!event)
- return ;
-
- /* note the event->mask & mask & ... code: maybe an already processed
- * event removed an element that fired and we still didn't
- * processed, so we check if the event is still valid. */
- if (event->mask & e.mask & EVENT_READABLE) {
- rfired = 1;
- event->read_cb->do_request(e.fd, e.mask);
- }
- if (event->mask & e.mask & EVENT_WRITABLE) {
- if (!rfired || event->read_cb != event->write_cb)
- event->write_cb->do_request(e.fd, e.mask);
+ if (e.is_file) {
+ int rfired = 0;
+ FileEvent *event = center->get_file_event(e.file_event.fd);
+ if (!event)
+ return ;
+
+ /* note the event->mask & mask & ... code: maybe an already processed
+ * event removed an element that fired and we still didn't
+ * processed, so we check if the event is still valid. */
+ if (event->mask & e.file_event.mask & EVENT_READABLE) {
+ rfired = 1;
+ event->read_cb->do_request(e.file_event.fd, e.file_event.mask);
+ }
+ if (event->mask & e.file_event.mask & EVENT_WRITABLE) {
+ if (!rfired || event->read_cb != event->write_cb)
+ event->write_cb->do_request(e.file_event.fd, e.file_event.mask);
+ }
+ } else {
+ e.time_event.time_cb->do_request(e.time_event.id);
+ delete e.time_event.time_cb;
}
}
void _clear() {
public:
EventCenter(CephContext *c):
- lock("EventCenter::lock"), driver(NULL), cct(c), nevent(0),
+ lock("EventCenter::lock"), driver(NULL), cct(c), nevent(0), time_event_next_id(0),
event_tp(c, "EventCenter::event_tp", c->_conf->ms_event_op_threads, "eventcenter_op_threads"),
- event_wq(this, c->_conf->ms_event_thread_timeout, c->_conf->ms_event_thread_suicide_timeout, &event_tp) {}
+ event_wq(this, c->_conf->ms_event_thread_timeout, c->_conf->ms_event_thread_suicide_timeout, &event_tp) {
+ last_time = time(NULL);
+ }
~EventCenter();
int init(int nevent);
- int create_event(int fd, int mask, EventCallback *ctxt);
- void delete_event(int fd, int mask);
+ int create_file_event(int fd, int mask, EventCallback *ctxt);
+ uint64_t create_time_event(uint64_t milliseconds, EventCallback *ctxt);
+ void delete_file_event(int fd, int mask);
+ void delete_time_event(uint64_t id);
int process_events(int timeout_milliseconds);
- Event *get_event(int fd) {
+ FileEvent *get_file_event(int fd) {
Mutex::Locker l(lock);
- return _get_event(fd);
+ return _get_file_event(fd);
}
};
}
}
-int EpollDriver::event_wait(vector<FiredEvent> &fired_events, struct timeval *tvp)
+int EpollDriver::resize_events(int newsize)
+{
+ struct epoll_event *new_events =
+ (struct epoll_event*)realloc(events, sizeof(struct epoll_event)*newsize);
+ if (!new_events)
+ return -ENOMEM; // keep the old buffer valid on failure
+ events = new_events;
+ return 0;
+}
+
+int EpollDriver::event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tvp)
{
int retval, numevents = 0;
int init(int nevent);
int add_event(int fd, int cur_mask, int add_mask);
void del_event(int fd, int cur_mask, int del_mask);
- int event_wait(vector<FiredEvent> &fired_events, struct timeval *tp);
+ int resize_events(int newsize);
+ int event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tp);
};
#endif