]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Add TimeEvent to Event API
authorHaomai Wang <haomaiwang@gmail.com>
Fri, 12 Sep 2014 07:52:24 +0000 (15:52 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Wed, 8 Oct 2014 06:03:17 +0000 (14:03 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/AsyncConnection.cc
src/msg/AsyncMessenger.cc
src/msg/Event.cc
src/msg/Event.h
src/msg/EventEpoll.cc
src/msg/EventEpoll.h

index 7e9dfb09bef1c63d30477d1371352be3410e9bee..6bb4a3fa31cb4e274d4d7aa8e81fc84f6362c81d 100644 (file)
@@ -30,7 +30,7 @@ class C_handle_read : public EventCallback {
 
  public:
   C_handle_read(AsyncConnection *c): conn(c) {}
-  void do_request(int fd, int mask) {
+  void do_request(int fd_or_id, int mask=0) {
     conn->process();
   }
 };
@@ -72,7 +72,7 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
 AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m)
   : Connection(cct, m), async_msgr(m), global_seq(0), connect_seq(0), out_seq(0), in_seq(0), in_seq_acked(0),
     state(STATE_NONE), state_after_send(0), sd(-1),
-    lock("AsyncConnection::lock"), backoff(0),
+    lock("AsyncConnection::lock"),
     got_bad_auth(false), authorizer(NULL), state_offset(0), net(cct), center(&m->center) { }
 
 AsyncConnection::~AsyncConnection()
@@ -182,7 +182,7 @@ int AsyncConnection::_try_send(bufferlist send_bl, bool send)
     // "r" is the remaining length
     sended += msglen - r;
     if (r > 0) {
-      center->create_event(sd, EVENT_WRITABLE, new C_handle_write(this));
+      center->create_file_event(sd, EVENT_WRITABLE, new C_handle_write(this));
       ldout(async_msgr->cct, 5) << __func__ << " remaining " << r
                           << " needed to be sent, creating event for writing"
                           << dendl;
@@ -199,7 +199,7 @@ int AsyncConnection::_try_send(bufferlist send_bl, bool send)
   }
 
   if (!outcoming_bl.length())
-    center->delete_event(sd, EVENT_WRITABLE);
+    center->delete_file_event(sd, EVENT_WRITABLE);
 
   return outcoming_bl.length();
 }
@@ -399,8 +399,8 @@ void AsyncConnection::process()
             ldout(async_msgr->cct,10) << __func__ << " wants " << 1 << " message from policy throttler "
                                 << policy.throttler_messages->get_current() << "/"
                                 << policy.throttler_messages->get_max() << dendl;
-            // FIXME: when to try it again?
-            if (policy.throttler_messages->get_or_fail())
+            // FIXME: may block
+            if (policy.throttler_messages->get())
               state = STATE_OPEN_MESSAGE_THROTTLE_BYTES;
           }
 
@@ -415,8 +415,8 @@ void AsyncConnection::process()
               ldout(async_msgr->cct,10) << __func__ << " wants " << message_size << " bytes from policy throttler "
                   << policy.throttler_bytes->get_current() << "/"
                   << policy.throttler_bytes->get_max() << dendl;
-              // FIXME: when to try it again?
-              if (policy.throttler_bytes->get_or_fail(message_size))
+              // FIXME: may block
+              if (policy.throttler_bytes->get(message_size))
                 state = STATE_OPEN_MESSAGE_READ_FRONT;
             }
           }
@@ -633,7 +633,7 @@ void AsyncConnection::process()
       case STATE_CLOSED:
       {
         ldout(async_msgr->cct, 20) << __func__ << " socket closed" << dendl;
-        center->delete_event(sd, EVENT_READABLE|EVENT_WRITABLE);
+        center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
         break;
       }
 
@@ -716,7 +716,7 @@ int AsyncConnection::_process_connection()
         }
         net.set_socket_options(sd);
 
-        center->create_event(sd, EVENT_READABLE, new C_handle_read(this));
+        center->create_file_event(sd, EVENT_READABLE, new C_handle_read(this));
         state = STATE_CONNECTING_WAIT_BANNER;
         break;
       }
@@ -1524,7 +1524,8 @@ void AsyncConnection::_connect()
   ldout(async_msgr->cct, 10) << __func__ << " " << connect_seq << dendl;
 
   state = STATE_CONNECTING;
-  process();
+  // rescheduler connection in order to avoid lock dep
+  center->create_time_event(0, new C_handle_read(this));
 }
 
 void AsyncConnection::accept(int incoming)
@@ -1533,7 +1534,8 @@ void AsyncConnection::accept(int incoming)
   assert(sd < 0);
 
   sd = incoming;
-  center->create_event(sd, EVENT_READABLE, new C_handle_read(this));
+  state = STATE_ACCEPTING;
+  center->create_file_event(sd, EVENT_READABLE, new C_handle_read(this));
   process();
 }
 
@@ -1624,7 +1626,7 @@ void AsyncConnection::fault()
   }
 
   shutdown_socket();
-  center->delete_event(sd, EVENT_READABLE|EVENT_WRITABLE);
+  center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
 
   // requeue sent items
   requeue_sent();
@@ -1634,18 +1636,17 @@ void AsyncConnection::fault()
     return;
   }
 
-  //TODO we need to rescheduler connect again!!!
   if (state != STATE_CONNECTING) {
     if (policy.server) {
       ldout(async_msgr->cct, 0) << __func__ << " server, going to standby" << dendl;
       state = STATE_STANDBY;
     } else {
-      ldout(async_msgr->cct,0) << __func__ << " initiating reconnect" << dendl;
+      ldout(async_msgr->cct, 0) << __func__ << " initiating reconnect" << dendl;
       connect_seq++;
       state = STATE_CONNECTING;
     }
     backoff = utime_t();
-    ldout(async_msgr->cct,0) << __func__ << dendl;
+    ldout(async_msgr->cct, 0) << __func__ << dendl;
   } else {
     if (backoff == utime_t()) {
       backoff.set_from_double(async_msgr->cct->_conf->ms_initial_backoff);
@@ -1655,9 +1656,11 @@ void AsyncConnection::fault()
         backoff.set_from_double(async_msgr->cct->_conf->ms_max_backoff);
     }
     ldout(async_msgr->cct, 10) << __func__ << " waiting " << backoff << dendl;
-    // TODO wait!!!!!
-    ldout(async_msgr->cct, 10) << __func__ << " done waiting or woke up" << dendl;
   }
+
+  uint64_t milliseconds = double(backoff) * 1000;
+  // woke up again;
+  center->create_time_event(milliseconds, new C_handle_read(this));
 }
 
 void AsyncConnection::was_session_reset()
index f18c4494354edf3e0c548e397fb48dc40aa5acc6..67f65c7de2ca50dcd14c19d49dfa8c2bbf833ab1 100644 (file)
@@ -186,7 +186,7 @@ int Processor::start()
 
   // start thread
   create();
-  center->create_event(listen_sd, EVENT_READABLE, new C_handle_accept(this));
+  center->create_file_event(listen_sd, EVENT_READABLE, new C_handle_accept(this));
 
   return 0;
 }
@@ -239,7 +239,7 @@ void Processor::stop()
   done = true;
   ldout(msgr->cct, 10) << __func__ << " processor" << dendl;
 
-  center->delete_event(listen_sd, EVENT_READABLE);
+  center->delete_file_event(listen_sd, EVENT_READABLE);
   if (listen_sd >= 0) {
     ::shutdown(listen_sd, SHUT_RDWR);
   }
@@ -274,7 +274,7 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
     center(cct)
 {
   ceph_spin_init(&global_seq_lock);
-  _init_local_connection();
+  init_local_connection();
 }
 
 /**
index 6cdd73e8914036b4e43df6d933fcf5a2eb799aee..7d8bf9c50cf568e13d2763a7e62e012743e2b0d1 100644 (file)
@@ -1,5 +1,7 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
 // vim: ts=8 sw=2 smarttab
+#include <time.h>
+
 #include "common/errno.h"
 #include "Event.h"
 
@@ -49,22 +51,29 @@ EventCenter::~EventCenter()
     delete driver;
 }
 
-int EventCenter::create_event(int fd, int mask, EventCallback *ctxt)
+int EventCenter::create_file_event(int fd, int mask, EventCallback *ctxt)
 {
+  int r;
   Mutex::Locker l(lock);
   if (events.size() > nevent) {
-    lderr(cct) << __func__ << " event count is exceed." << dendl;
-    return -ERANGE;
+    int new_size = nevent << 2;
+    ldout(cct, 10) << __func__ << " event count exceed " << nevent << ", expand to " << new_size << dendl;
+    r = driver->resize_events(new_size);
+    if (r < 0) {
+      lderr(cct) << __func__ << " event count is exceed." << dendl;
+      return -ERANGE;
+    }
+    nevent = new_size;
   }
 
-  EventCenter::Event *event = _get_event(fd);
+  EventCenter::FileEvent *event = _get_file_event(fd);
 
-  int r = driver->add_event(fd, event ? event->mask: EVENT_NONE, mask);
+  r = driver->add_event(fd, event ? event->mask: EVENT_NONE, mask);
   if (r < 0)
     return r;
 
   if (!event) {
-    events[fd] = EventCenter::Event();
+    events[fd] = EventCenter::FileEvent();
     event = &events[fd];
   }
 
@@ -82,14 +91,14 @@ int EventCenter::create_event(int fd, int mask, EventCallback *ctxt)
   return 0;
 }
 
-void EventCenter::delete_event(int fd, int mask)
+void EventCenter::delete_file_event(int fd, int mask)
 {
   Mutex::Locker l(lock);
 
-  EventCenter::Event *event = _get_event(fd);
+  EventCenter::FileEvent *event = _get_file_event(fd);
   driver->del_event(fd, event ? event->mask: EVENT_NONE, mask);
   if (!event) {
-    events[fd] = EventCenter::Event();
+    events[fd] = EventCenter::FileEvent();
     event = &events[fd];
   }
 
@@ -103,10 +112,88 @@ void EventCenter::delete_event(int fd, int mask)
     events.erase(fd);
 }
 
+
+uint64_t EventCenter::create_time_event(uint64_t milliseconds, EventCallback *ctxt)
+{
+  Mutex::Locker l(lock);
+  uint64_t id = eventLoop->timeEventNextId++;
+  EventCenter::TimeEvent event;
+  utime_t expire;
+  struct timeval tv;
+
+  expire = ceph_clock_now(cct);
+  expire.copy_to_timeval(&tv);
+  tv.tv_sec = expire.tv_sec + milliseconds / 1000;
+  tv.tv_usec = expire.tv_usec + milliseconds * 1000;
+  expire.set_from_timeval(&tv);
+
+  event.id = id;
+  event.time_cb = ctxt;
+  time_events[expire] = event;
+
+  return id;
+}
+
+void EventCenter::delete_time_event(uint64_t id)
+{
+  Mutex::Locker l(lock);
+  for (map<utime_t, TimeEvent>::iterator it = time_events.begin();
+       it != time_events.end(); it++) {
+    if (it->second.id == id) {
+      time_events.erase(it);
+      return ;
+    }
+  }
+}
+
+int EventCenter::_process_time_events()
+{
+  int processed = 0;
+  time_t now = time(NULL);
+  utime_t cur = ceph_clock_now(cct);
+
+  /* If the system clock is moved to the future, and then set back to the
+   * right value, time events may be delayed in a random way. Often this
+   * means that scheduled operations will not be performed soon enough.
+   *
+   * Here we try to detect system clock skews, and force all the time
+   * events to be processed ASAP when this happens: the idea is that
+   * processing events earlier is less dangerous than delaying them
+   * indefinitely, and practice suggests it is. */
+  if (now < last_time) {
+    for (map<utime_t, TimeEvent>::iterator it = time_events.begin();
+          it != time_events.end(); ++it) {
+      it->first = utime_t();
+    }
+  }
+  last_time = now;
+
+  map<utime_t, TimeEvent>::iterator prev;
+  for (map<utime_t, TimeEvent>::iterator it = time_events.begin();
+       it != time_events.end(); ) {
+    prev = it;
+    if (cur >= it->first) {
+      FiredEvent event;
+      e.time_event.id = it->second.id;
+      e.time_event.time_cb = it->second.time_cb;
+      e.is_file = false;
+      event_wq.queue(e);
+      processed++;
+      ++it;
+      time_events.erase(prev);
+    } else {
+      break;
+    }
+  }
+
+  return processed;
+}
+
 int EventCenter::process_events(int timeout_millionseconds)
 {
   struct timeval tv;
-  int j, processed, numevents;
+  int numevents;
+  bool trigger_time = false;
 
   if (timeout_millionseconds > 0) {
     tv.tv_sec = timeout_millionseconds / 1000;
@@ -117,11 +204,29 @@ int EventCenter::process_events(int timeout_millionseconds)
     tv.tv_usec = 0;
   }
 
-  processed = 0;
-  vector<FiredEvent> fired_events;
+  Mutex::Locker l(lock);
+  utime_t shortest = utime_t(&tv);
+  for (map<utime_t, TimeEvent>::iterator it = time_events.begin();
+        it != time_events.end(); ++it) {
+    if (shortes > it->first) {
+      shortest = it->first;
+      trigger_time = true;
+      break;
+    }
+  }
+  shortes.copy_to_timeval(&tv);
+
+  vector<FiredFileEvent> fired_events;
   numevents = driver->event_wait(fired_events, &tv);
-  for (j = 0; j < numevents; j++)
-    event_wq.queue(fired_events[j]);
+  for (int j = 0; j < numevents; j++) {
+    FiredEvent e;
+    e.file_event = fired_events[j];
+    e.is_file = true
+    event_wq.queue(e);
+  }
+
+  if (trigger_time)
+    numevents += _process_time_events(shortest);
 
   return numevents;
 }
index c2b5ac657c00bd0ad0563d4eb9e59874f5b580af..541c0dd304922d8b80371d47f2776cccc384fa2d 100644 (file)
 
 class EventCenter;
 
-// Attention:
-// This event library use file description as index to search correspond event
-// in `events` and `fired_events`. So it's important to estimate a suitable
-// capacity in calling eventcenterInit(capacity).
+class EventCallback {
 
-struct FiredEvent {
-  int mask;
+ public:
+  virtual void do_request(int fd_id, int mask=0) = 0;
+  virtual ~EventCallback() {}       // we want a virtual destructor!!!
+};
+
+struct FiredFileEvent {
   int fd;
+  int mask;
+};
+
+struct FiredTimeEvent {
+  uint64_t id;
+  EventCallback *time_cb;
+};
+
+struct FiredEvent {
+  union {
+    FiredFileEvent file_event;
+    FiredTimeEvent time_event;
+  };
+  bool is_file;
 
-  FiredEvent(): mask(0), fd(0) {}
+  FiredEvent(): is_file(true) {}
 };
 
 class EventDriver {
@@ -50,58 +65,69 @@ class EventDriver {
   virtual int init(int nevent) = 0;
   virtual int add_event(int fd, int cur_mask, int mask) = 0;
   virtual void del_event(int fd, int cur_mask, int del_mask) = 0;
-  virtual int event_wait(vector<FiredEvent> &fired_events, struct timeval *tp) = 0;
-};
-
-class EventCallback {
-
- public:
-  virtual void do_request(int fd, int mask) = 0;
-  virtual ~EventCallback() {}       // we want a virtual destructor!!!
+  virtual int event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tp) = 0;
+  virtual int resize_events(int newsize) = 0;
 };
 
 class EventCenter {
-  struct Event {
+  struct FileEvent {
     int mask;
     EventCallback *read_cb;
     EventCallback *write_cb;
-    Event(): mask(0), read_cb(NULL), write_cb(NULL) {}
+    FileEvent(): mask(0), read_cb(NULL), write_cb(NULL) {}
+  };
+
+  struct TimeEvent {
+    uint64_t id;
+    EventCallback *time_cb;
+
+    TimeEvent(): id(0), time_cb(NULL) {}
   };
 
   Mutex lock;
-  map<int, Event> events;
+  map<int, FileEvent> file_events;
+  map<utime_t, TimeEvent> time_events;
   EventDriver *driver;
   CephContext *cct;
   uint64_t nevent;
+  uint64_t time_event_next_id;
   ThreadPool event_tp;
+  time_t last_time; // last time process time event
 
-  Event *_get_event(int fd) {
-    map<int, Event>::iterator it = events.find(fd);
-    if (it != events.end()) {
+  int _process_time_events();
+  FileEvent *_get_file_event(int fd) {
+    map<int, FileEvent>::iterator it = file_events.find(fd);
+    if (it != file_events.end()) {
       return &it->second;
     }
     return NULL;
   }
+
   struct EventWQ : public ThreadPool::WorkQueueVal<FiredEvent> {
     EventCenter *center;
     // In order to ensure the file descriptor is unique in conn_queue,
     // pending is introduced to check
     //
-    // <File Descriptor>
-    deque<int> conn_queue;
-    // <File Descriptor, Mask>
+    deque<FiredEvent> conn_queue;
+    // used only by file event <File Descriptor, Mask>
     map<int, int> pending;
 
     EventWQ(EventCenter *c, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
       : ThreadPool::WorkQueueVal<FiredEvent>("Event::EventWQ", timeout, suicide_timeout, tp), center(c) {}
 
     void _enqueue(FiredEvent e) {
-      // Ensure only one thread process one file descriptor
-      map<int, int>::iterator it = pending.find(e.fd);
-      if (it != pending.end())
-        it->second |= e.mask;
-      else
-        pending[e.fd] = e.mask;
+      if (e.is_file) {
+        // Ensure only one thread process one file descriptor
+        map<int, int>::iterator it = pending.find(e.file_event.fd);
+        if (it != pending.end()) {
+          it->second |= e.file_event.mask;
+        } else {
+          pending[e.file_event.fd] = e.file_event.mask;
+          conn_queue.push_back(e);
+        }
+      } else {
+        conn_queue.push_back(e);
+      }
     }
     void _enqueue_front(FiredEvent e) {
       assert(0);
@@ -114,30 +140,36 @@ class EventCenter {
     }
     FiredEvent _dequeue() {
       assert(!conn_queue.empty());
-      FiredEvent e;
-      e.fd = conn_queue.front();
+      FiredEvent e = conn_queue.front();
       conn_queue.pop_front();
-      assert(pending.count(e.fd));
-      e.mask = pending[e.fd];
-      pending.erase(e.fd);
+      if (e.is_file) {
+        assert(pending.count(e.file_event.fd));
+        e.file_event.mask = pending[e.file_event.fd];
+        pending.erase(e.file_event.fd);
+      }
       return e;
     }
     void _process(FiredEvent e, ThreadPool::TPHandle &handle) {
-      int rfired = 0;
-      Event *event = center->get_event(e.fd);
-      if (!event)
-        return ;
-
-      /* note the event->mask & mask & ... code: maybe an already processed
-       * event removed an element that fired and we still didn't
-       * processed, so we check if the event is still valid. */
-      if (event->mask & e.mask & EVENT_READABLE) {
-        rfired = 1;
-        event->read_cb->do_request(e.fd, e.mask);
-      }
-      if (event->mask & e.mask & EVENT_WRITABLE) {
-        if (!rfired || event->read_cb != event->write_cb)
-          event->write_cb->do_request(e.fd, e.mask);
+      if (e.is_file) {
+        int rfired = 0;
+        FileEvent *event = center->get_file_event(e.file_event.fd);
+        if (!event)
+          return ;
+
+        /* note the event->mask & mask & ... code: maybe an already processed
+        * event removed an element that fired and we still didn't
+        * processed, so we check if the event is still valid. */
+        if (event->mask & e.file_event.mask & EVENT_READABLE) {
+          rfired = 1;
+          event->read_cb->do_request(e.file_event.fd, e.file_event.mask);
+        }
+        if (event->mask & e.file_event.mask & EVENT_WRITABLE) {
+          if (!rfired || event->read_cb != event->write_cb)
+            event->write_cb->do_request(e.file_event.fd, e.file_event.mask);
+        }
+      } else {
+        e.time_event.time_cb->do_request(e.time_event.id);
+        delete e.time_event.time_cb;
       }
     }
     void _clear() {
@@ -147,17 +179,21 @@ class EventCenter {
 
  public:
   EventCenter(CephContext *c):
-    lock("EventCenter::lock"), driver(NULL), cct(c), nevent(0),
+    lock("EventCenter::lock"), driver(NULL), cct(c), nevent(0), time_event_next_id(0),
     event_tp(c, "EventCenter::event_tp", c->_conf->ms_event_op_threads, "eventcenter_op_threads"),
-    event_wq(this, c->_conf->ms_event_thread_timeout, c->_conf->ms_event_thread_suicide_timeout, &event_tp) {}
+    event_wq(this, c->_conf->ms_event_thread_timeout, c->_conf->ms_event_thread_suicide_timeout, &event_tp) {
+    last_time = time(NULL);
+  }
   ~EventCenter();
   int init(int nevent);
-  int create_event(int fd, int mask, EventCallback *ctxt);
-  void delete_event(int fd, int mask);
+  int create_file_event(int fd, int mask, EventCallback *ctxt);
+  uint64_t create_time_event(uint64_t milliseconds, EventCallback *ctxt);
+  void delete_file_event(int fd, int mask);
+  void delete_time_event(uint64_t id);
   int process_events(int timeout_milliseconds);
-  Event *get_event(int fd) {
+  FileEvent *get_file_event(int fd) {
     Mutex::Locker l(lock);
-    return _get_event(fd);
+    return _get_file_event(fd);
   }
 };
 
index ae997374d80865715791ceeb62bea0f6b30adbbf..a8f18571cf12168c0a2dc4e131324ca24a63f84b 100644 (file)
@@ -91,7 +91,13 @@ void EpollDriver::del_event(int fd, int cur_mask, int delmask)
   }
 }
 
-int EpollDriver::event_wait(vector<FiredEvent> &fired_events, struct timeval *tvp)
+int EpollDriver::resize_events(int newsize)
+{
+  state->events = realloc(events, sizeof(struct epoll_event)*newsize);
+  return 0;
+}
+
+int EpollDriver::event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tvp)
 {
   int retval, numevents = 0;
 
index 0c4305321a83153ead15b134e3b000034cfbafc6..1927a62b987ae2106b4f4cbaf356e0c524f071c3 100644 (file)
@@ -31,7 +31,8 @@ class EpollDriver : public EventDriver {
   int init(int nevent);
   int add_event(int fd, int cur_mask, int add_mask);
   void del_event(int fd, int cur_mask, int del_mask);
-  int event_wait(vector<FiredEvent> &fired_events, struct timeval *tp);
+  int resize_events(int newsize);
+  int event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tp);
 };
 
 #endif