From: Haomai Wang Date: Fri, 12 Sep 2014 07:52:24 +0000 (+0800) Subject: Add TimeEvent to Event API X-Git-Tag: v0.88~37^2~4^2~36 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=6bb277f6399ab916a607372a9e9f3d9b6e526024;p=ceph.git Add TimeEvent to Event API Signed-off-by: Haomai Wang --- diff --git a/src/msg/AsyncConnection.cc b/src/msg/AsyncConnection.cc index 7e9dfb09bef1..6bb4a3fa31cb 100644 --- a/src/msg/AsyncConnection.cc +++ b/src/msg/AsyncConnection.cc @@ -30,7 +30,7 @@ class C_handle_read : public EventCallback { public: C_handle_read(AsyncConnection *c): conn(c) {} - void do_request(int fd, int mask) { + void do_request(int fd_or_id, int mask=0) { conn->process(); } }; @@ -72,7 +72,7 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off) AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m) : Connection(cct, m), async_msgr(m), global_seq(0), connect_seq(0), out_seq(0), in_seq(0), in_seq_acked(0), state(STATE_NONE), state_after_send(0), sd(-1), - lock("AsyncConnection::lock"), backoff(0), + lock("AsyncConnection::lock"), got_bad_auth(false), authorizer(NULL), state_offset(0), net(cct), center(&m->center) { } AsyncConnection::~AsyncConnection() @@ -182,7 +182,7 @@ int AsyncConnection::_try_send(bufferlist send_bl, bool send) // "r" is the remaining length sended += msglen - r; if (r > 0) { - center->create_event(sd, EVENT_WRITABLE, new C_handle_write(this)); + center->create_file_event(sd, EVENT_WRITABLE, new C_handle_write(this)); ldout(async_msgr->cct, 5) << __func__ << " remaining " << r << " needed to be sent, creating event for writing" << dendl; @@ -199,7 +199,7 @@ int AsyncConnection::_try_send(bufferlist send_bl, bool send) } if (!outcoming_bl.length()) - center->delete_event(sd, EVENT_WRITABLE); + center->delete_file_event(sd, EVENT_WRITABLE); return outcoming_bl.length(); } @@ -399,8 +399,8 @@ void AsyncConnection::process() ldout(async_msgr->cct,10) << __func__ << " wants " << 1 << " message from policy throttler " << policy.throttler_messages->get_current() << "/" << policy.throttler_messages->get_max() << dendl; - // FIXME: when to try it again? - if (policy.throttler_messages->get_or_fail()) + // FIXME: may block + if (policy.throttler_messages->get()) state = STATE_OPEN_MESSAGE_THROTTLE_BYTES; } @@ -415,8 +415,8 @@ void AsyncConnection::process() ldout(async_msgr->cct,10) << __func__ << " wants " << message_size << " bytes from policy throttler " << policy.throttler_bytes->get_current() << "/" << policy.throttler_bytes->get_max() << dendl; - // FIXME: when to try it again? - if (policy.throttler_bytes->get_or_fail(message_size)) + // FIXME: may block + if (policy.throttler_bytes->get(message_size)) state = STATE_OPEN_MESSAGE_READ_FRONT; } } @@ -633,7 +633,7 @@ void AsyncConnection::process() case STATE_CLOSED: { ldout(async_msgr->cct, 20) << __func__ << " socket closed" << dendl; - center->delete_event(sd, EVENT_READABLE|EVENT_WRITABLE); + center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE); break; } @@ -716,7 +716,7 @@ int AsyncConnection::_process_connection() } net.set_socket_options(sd); - center->create_event(sd, EVENT_READABLE, new C_handle_read(this)); + center->create_file_event(sd, EVENT_READABLE, new C_handle_read(this)); state = STATE_CONNECTING_WAIT_BANNER; break; } @@ -1524,7 +1524,8 @@ void AsyncConnection::_connect() ldout(async_msgr->cct, 10) << __func__ << " " << connect_seq << dendl; state = STATE_CONNECTING; - process(); + // rescheduler connection in order to avoid lock dep + center->create_time_event(0, new C_handle_read(this)); } void AsyncConnection::accept(int incoming) @@ -1533,7 +1534,8 @@ void AsyncConnection::accept(int incoming) assert(sd < 0); sd = incoming; - center->create_event(sd, EVENT_READABLE, new C_handle_read(this)); + state = STATE_ACCEPTING; + center->create_file_event(sd, EVENT_READABLE, new C_handle_read(this)); process(); } @@ -1624,7 +1626,7 @@ void AsyncConnection::fault() } shutdown_socket(); - center->delete_event(sd, EVENT_READABLE|EVENT_WRITABLE); + center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE); // requeue sent items requeue_sent(); @@ -1634,18 +1636,17 @@ void AsyncConnection::fault() return; } - //TODO we need to rescheduler connect again!!! if (state != STATE_CONNECTING) { if (policy.server) { ldout(async_msgr->cct, 0) << __func__ << " server, going to standby" << dendl; state = STATE_STANDBY; } else { - ldout(async_msgr->cct,0) << __func__ << " initiating reconnect" << dendl; + ldout(async_msgr->cct, 0) << __func__ << " initiating reconnect" << dendl; connect_seq++; state = STATE_CONNECTING; } backoff = utime_t(); - ldout(async_msgr->cct,0) << __func__ << dendl; + ldout(async_msgr->cct, 0) << __func__ << dendl; } else { if (backoff == utime_t()) { backoff.set_from_double(async_msgr->cct->_conf->ms_initial_backoff); @@ -1655,9 +1656,11 @@ void AsyncConnection::fault() backoff.set_from_double(async_msgr->cct->_conf->ms_max_backoff); } ldout(async_msgr->cct, 10) << __func__ << " waiting " << backoff << dendl; - // TODO wait!!!!! - ldout(async_msgr->cct, 10) << __func__ << " done waiting or woke up" << dendl; } + + uint64_t milliseconds = double(backoff) * 1000; + // woke up again; + center->create_time_event(milliseconds, new C_handle_read(this)); } void AsyncConnection::was_session_reset() diff --git a/src/msg/AsyncMessenger.cc b/src/msg/AsyncMessenger.cc index f18c4494354e..67f65c7de2ca 100644 --- a/src/msg/AsyncMessenger.cc +++ b/src/msg/AsyncMessenger.cc @@ -186,7 +186,7 @@ int Processor::start() // start thread create(); - center->create_event(listen_sd, EVENT_READABLE, new C_handle_accept(this)); + center->create_file_event(listen_sd, EVENT_READABLE, new C_handle_accept(this)); return 0; } @@ -239,7 +239,7 @@ void Processor::stop() done = true; ldout(msgr->cct, 10) << __func__ << " processor" << dendl; - center->delete_event(listen_sd, EVENT_READABLE); + center->delete_file_event(listen_sd, EVENT_READABLE); if (listen_sd >= 0) { ::shutdown(listen_sd, SHUT_RDWR); } @@ -274,7 +274,7 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, center(cct) { ceph_spin_init(&global_seq_lock); - _init_local_connection(); + init_local_connection(); } /** diff --git a/src/msg/Event.cc b/src/msg/Event.cc index 6cdd73e89140..7d8bf9c50cf5 100644 --- a/src/msg/Event.cc +++ b/src/msg/Event.cc @@ -1,5 +1,7 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab +#include + #include "common/errno.h" #include "Event.h" @@ -49,22 +51,29 @@ EventCenter::~EventCenter() delete driver; } -int EventCenter::create_event(int fd, int mask, EventCallback *ctxt) +int EventCenter::create_file_event(int fd, int mask, EventCallback *ctxt) { + int r; Mutex::Locker l(lock); if (events.size() > nevent) { - lderr(cct) << __func__ << " event count is exceed." << dendl; - return -ERANGE; + int new_size = nevent << 2; + ldout(cct, 10) << __func__ << " event count exceed " << nevent << ", expand to " << new_size << dendl; + r = driver->resize_events(new_size); + if (r < 0) { + lderr(cct) << __func__ << " event count is exceed." << dendl; + return -ERANGE; + } + nevent = new_size; } - EventCenter::Event *event = _get_event(fd); + EventCenter::FileEvent *event = _get_file_event(fd); - int r = driver->add_event(fd, event ? event->mask: EVENT_NONE, mask); + r = driver->add_event(fd, event ? event->mask: EVENT_NONE, mask); if (r < 0) return r; if (!event) { - events[fd] = EventCenter::Event(); + events[fd] = EventCenter::FileEvent(); event = &events[fd]; } @@ -82,14 +91,14 @@ int EventCenter::create_event(int fd, int mask, EventCallback *ctxt) return 0; } -void EventCenter::delete_event(int fd, int mask) +void EventCenter::delete_file_event(int fd, int mask) { Mutex::Locker l(lock); - EventCenter::Event *event = _get_event(fd); + EventCenter::FileEvent *event = _get_file_event(fd); driver->del_event(fd, event ? event->mask: EVENT_NONE, mask); if (!event) { - events[fd] = EventCenter::Event(); + events[fd] = EventCenter::FileEvent(); event = &events[fd]; } @@ -103,10 +112,88 @@ void EventCenter::delete_event(int fd, int mask) events.erase(fd); } + +uint64_t EventCenter::create_time_event(uint64_t milliseconds, EventCallback *ctxt) +{ + Mutex::Locker l(lock); + uint64_t id = eventLoop->timeEventNextId++; + EventCenter::TimeEvent event; + utime_t expire; + struct timeval tv; + + expire = ceph_clock_now(cct); + expire.copy_to_timeval(&tv); + tv.tv_sec = expire.tv_sec + milliseconds / 1000; + tv.tv_usec = expire.tv_usec + milliseconds * 1000; + expire.set_from_timeval(&tv); + + event.id = id; + event.time_cb = ctxt; + time_events[expire] = event; + + return id; +} + +void EventCenter::delete_time_event(uint64_t id) +{ + Mutex::Locker l(lock); + for (map::iterator it = time_events.begin(); + it != time_events.end(); it++) { + if (it->second.id == id) { + time_events.erase(it); + return ; + } + } +} + +int EventCenter::_process_time_events() +{ + int processed = 0; + time_t now = time(NULL); + utime_t cur = ceph_clock_now(cct); + + /* If the system clock is moved to the future, and then set back to the + * right value, time events may be delayed in a random way. Often this + * means that scheduled operations will not be performed soon enough. + * + * Here we try to detect system clock skews, and force all the time + * events to be processed ASAP when this happens: the idea is that + * processing events earlier is less dangerous than delaying them + * indefinitely, and practice suggests it is. */ + if (now < last_time) { + for (map::iterator it = time_events.begin(); + it != time_events.end(); ++it) { + it->first = utime_t(); + } + } + last_time = now; + + map::iterator prev; + for (map::iterator it = time_events.begin(); + it != time_events.end(); ) { + prev = it; + if (cur >= it->first) { + FiredEvent event; + e.time_event.id = it->second.id; + e.time_event.time_cb = it->second.time_cb; + e.is_file = false; + event_wq.queue(e); + processed++; + ++it; + time_events.erase(prev); + } else { + break; + } + } + + return processed; +} + int EventCenter::process_events(int timeout_millionseconds) { struct timeval tv; - int j, processed, numevents; + int numevents; + bool trigger_time = false; if (timeout_millionseconds > 0) { tv.tv_sec = timeout_millionseconds / 1000; @@ -117,11 +204,29 @@ int EventCenter::process_events(int timeout_millionseconds) tv.tv_usec = 0; } - processed = 0; - vector fired_events; + Mutex::Locker l(lock); + utime_t shortest = utime_t(&tv); + for (map::iterator it = time_events.begin(); + it != time_events.end(); ++it) { + if (shortes > it->first) { + shortest = it->first; + trigger_time = true; + break; + } + } + shortes.copy_to_timeval(&tv); + + vector fired_events; numevents = driver->event_wait(fired_events, &tv); - for (j = 0; j < numevents; j++) - event_wq.queue(fired_events[j]); + for (int j = 0; j < numevents; j++) { + FiredEvent e; + e.file_event = fired_events[j]; + e.is_file = true + event_wq.queue(e); + } + + if (trigger_time) + numevents += _process_time_events(shortest); return numevents; } diff --git a/src/msg/Event.h b/src/msg/Event.h index c2b5ac657c00..541c0dd30492 100644 --- a/src/msg/Event.h +++ b/src/msg/Event.h @@ -32,16 +32,31 @@ class EventCenter; -// Attention: -// This event library use file description as index to search correspond event -// in `events` and `fired_events`. So it's important to estimate a suitable -// capacity in calling eventcenterInit(capacity). +class EventCallback { -struct FiredEvent { - int mask; + public: + virtual void do_request(int fd_id, int mask=0) = 0; + virtual ~EventCallback() {} // we want a virtual destructor!!! +}; + +struct FiredFileEvent { int fd; + int mask; +}; + +struct FiredTimeEvent { + uint64_t id; + EventCallback *time_cb; +}; + +struct FiredEvent { + union { + FiredFileEvent file_event; + FiredTimeEvent time_event; + }; + bool is_file; - FiredEvent(): mask(0), fd(0) {} + FiredEvent(): is_file(true) {} }; class EventDriver { @@ -50,58 +65,69 @@ class EventDriver { virtual int init(int nevent) = 0; virtual int add_event(int fd, int cur_mask, int mask) = 0; virtual void del_event(int fd, int cur_mask, int del_mask) = 0; - virtual int event_wait(vector &fired_events, struct timeval *tp) = 0; -}; - -class EventCallback { - - public: - virtual void do_request(int fd, int mask) = 0; - virtual ~EventCallback() {} // we want a virtual destructor!!! + virtual int event_wait(vector &fired_events, struct timeval *tp) = 0; + virtual int resize_events(int newsize) = 0; }; class EventCenter { - struct Event { + struct FileEvent { int mask; EventCallback *read_cb; EventCallback *write_cb; - Event(): mask(0), read_cb(NULL), write_cb(NULL) {} + FileEvent(): mask(0), read_cb(NULL), write_cb(NULL) {} + }; + + struct TimeEvent { + uint64_t id; + EventCallback *time_cb; + + TimeEvent(): id(0), time_cb(NULL) {} }; Mutex lock; - map events; + map file_events; + map time_events; EventDriver *driver; CephContext *cct; uint64_t nevent; + uint64_t time_event_next_id; ThreadPool event_tp; + time_t last_time; // last time process time event - Event *_get_event(int fd) { - map::iterator it = events.find(fd); - if (it != events.end()) { + int _process_time_events(); + FileEvent *_get_file_event(int fd) { + map::iterator it = file_events.find(fd); + if (it != file_events.end()) { return &it->second; } return NULL; } + struct EventWQ : public ThreadPool::WorkQueueVal { EventCenter *center; // In order to ensure the file descriptor is unique in conn_queue, // pending is introduced to check // - // - deque conn_queue; - // + deque conn_queue; + // used only by file event map pending; EventWQ(EventCenter *c, time_t timeout, time_t suicide_timeout, ThreadPool *tp) : ThreadPool::WorkQueueVal("Event::EventWQ", timeout, suicide_timeout, tp), center(c) {} void _enqueue(FiredEvent e) { - // Ensure only one thread process one file descriptor - map::iterator it = pending.find(e.fd); - if (it != pending.end()) - it->second |= e.mask; - else - pending[e.fd] = e.mask; + if (e.is_file) { + // Ensure only one thread process one file descriptor + map::iterator it = pending.find(e.file_event.fd); + if (it != pending.end()) { + it->second |= e.file_event.mask; + } else { + pending[e.file_event.fd] = e.file_event.mask; + conn_queue.push_back(e); + } + } else { + conn_queue.push_back(e); + } } void _enqueue_front(FiredEvent e) { assert(0); @@ -114,30 +140,36 @@ class EventCenter { } FiredEvent _dequeue() { assert(!conn_queue.empty()); - FiredEvent e; - e.fd = conn_queue.front(); + FiredEvent e = conn_queue.front(); conn_queue.pop_front(); - assert(pending.count(e.fd)); - e.mask = pending[e.fd]; - pending.erase(e.fd); + if (e.is_file) { + assert(pending.count(e.file_event.fd)); + e.file_event.mask = pending[e.file_event.fd]; + pending.erase(e.file_event.fd); + } return e; } void _process(FiredEvent e, ThreadPool::TPHandle &handle) { - int rfired = 0; - Event *event = center->get_event(e.fd); - if (!event) - return ; - - /* note the event->mask & mask & ... code: maybe an already processed - * event removed an element that fired and we still didn't - * processed, so we check if the event is still valid. */ - if (event->mask & e.mask & EVENT_READABLE) { - rfired = 1; - event->read_cb->do_request(e.fd, e.mask); - } - if (event->mask & e.mask & EVENT_WRITABLE) { - if (!rfired || event->read_cb != event->write_cb) - event->write_cb->do_request(e.fd, e.mask); + if (e.is_file) { + int rfired = 0; + FileEvent *event = center->get_file_event(e.file_event.fd); + if (!event) + return ; + + /* note the event->mask & mask & ... code: maybe an already processed + * event removed an element that fired and we still didn't + * processed, so we check if the event is still valid. */ + if (event->mask & e.file_event.mask & EVENT_READABLE) { + rfired = 1; + event->read_cb->do_request(e.file_event.fd, e.file_event.mask); + } + if (event->mask & e.file_event.mask & EVENT_WRITABLE) { + if (!rfired || event->read_cb != event->write_cb) + event->write_cb->do_request(e.file_event.fd, e.file_event.mask); + } + } else { + e.time_event.time_cb->do_request(e.time_event.id); + delete e.time_event.time_cb; } } void _clear() { @@ -147,17 +179,21 @@ class EventCenter { public: EventCenter(CephContext *c): - lock("EventCenter::lock"), driver(NULL), cct(c), nevent(0), + lock("EventCenter::lock"), driver(NULL), cct(c), nevent(0), time_event_next_id(0), event_tp(c, "EventCenter::event_tp", c->_conf->ms_event_op_threads, "eventcenter_op_threads"), - event_wq(this, c->_conf->ms_event_thread_timeout, c->_conf->ms_event_thread_suicide_timeout, &event_tp) {} + event_wq(this, c->_conf->ms_event_thread_timeout, c->_conf->ms_event_thread_suicide_timeout, &event_tp) { + last_time = time(NULL); + } ~EventCenter(); int init(int nevent); - int create_event(int fd, int mask, EventCallback *ctxt); - void delete_event(int fd, int mask); + int create_file_event(int fd, int mask, EventCallback *ctxt); + uint64_t create_time_event(uint64_t milliseconds, EventCallback *ctxt); + void delete_file_event(int fd, int mask); + void delete_time_event(uint64_t id); int process_events(int timeout_milliseconds); - Event *get_event(int fd) { + FileEvent *get_file_event(int fd) { Mutex::Locker l(lock); - return _get_event(fd); + return _get_file_event(fd); } }; diff --git a/src/msg/EventEpoll.cc b/src/msg/EventEpoll.cc index ae997374d808..a8f18571cf12 100644 --- a/src/msg/EventEpoll.cc +++ b/src/msg/EventEpoll.cc @@ -91,7 +91,13 @@ void EpollDriver::del_event(int fd, int cur_mask, int delmask) } } -int EpollDriver::event_wait(vector &fired_events, struct timeval *tvp) +int EpollDriver::resize_events(int newsize) +{ + state->events = realloc(events, sizeof(struct epoll_event)*newsize); + return 0; +} + +int EpollDriver::event_wait(vector &fired_events, struct timeval *tvp) { int retval, numevents = 0; diff --git a/src/msg/EventEpoll.h b/src/msg/EventEpoll.h index 0c4305321a83..1927a62b987a 100644 --- a/src/msg/EventEpoll.h +++ b/src/msg/EventEpoll.h @@ -31,7 +31,8 @@ class EpollDriver : public EventDriver { int init(int nevent); int add_event(int fd, int cur_mask, int add_mask); void del_event(int fd, int cur_mask, int del_mask); - int event_wait(vector &fired_events, struct timeval *tp); + int resize_events(int newsize); + int event_wait(vector &fired_events, struct timeval *tp); }; #endif