]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Remove DispatchQueue in AsyncMessenger
authorHaomai Wang <haomaiwang@gmail.com>
Fri, 12 Sep 2014 07:51:52 +0000 (15:51 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Wed, 8 Oct 2014 06:01:56 +0000 (14:01 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/AsyncConnection.cc
src/msg/AsyncConnection.h
src/msg/AsyncMessenger.cc
src/msg/AsyncMessenger.h
src/msg/Event.cc
src/msg/Event.h
src/msg/EventEpoll.cc
src/msg/EventEpoll.h
src/msg/simple/DispatchQueue.h

index 296c4451357df71fefbe961f92ea97bd949e320e..7e9dfb09bef1c63d30477d1371352be3410e9bee 100644 (file)
@@ -1,7 +1,9 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
 // vim: ts=8 sw=2 smarttab
 #include <sys/types.h>
+#include <sys/socket.h>
 #include <unistd.h>
+
 #include "include/Context.h"
 #include "common/errno.h"
 #include "AsyncMessenger.h"
@@ -69,8 +71,8 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
 
 AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m)
   : Connection(cct, m), async_msgr(m), global_seq(0), connect_seq(0), out_seq(0), in_seq(0), in_seq_acked(0),
-    state(STATE_NONE), state_after_send(0), sd(-1), conn_id(m->dispatch_queue.get_id()),
-    in_q(&(m->dispatch_queue)), lock("AsyncConnection::lock"), backoff(0),
+    state(STATE_NONE), state_after_send(0), sd(-1),
+    lock("AsyncConnection::lock"), backoff(0),
     got_bad_auth(false), authorizer(NULL), state_offset(0), net(cct), center(&m->center) { }
 
 AsyncConnection::~AsyncConnection()
@@ -415,32 +417,13 @@ void AsyncConnection::process()
                   << policy.throttler_bytes->get_max() << dendl;
               // FIXME: when to try it again?
               if (policy.throttler_bytes->get_or_fail(message_size))
-                state = STATE_OPEN_MESSAGE_THROTTLE_DISPATCH;
+                state = STATE_OPEN_MESSAGE_READ_FRONT;
             }
           }
 
           break;
         }
 
-      case STATE_OPEN_MESSAGE_THROTTLE_DISPATCH:
-        {
-          uint64_t message_size = current_header.front_len + current_header.middle_len + current_header.data_len;
-          if (message_size) {
-            // throttle total bytes waiting for dispatch.  do this _after_ the
-            // policy throttle, as this one does not deadlock (unless dispatch
-            // blocks indefinitely, which it shouldn't).  in contrast, the
-            // policy throttle carries for the lifetime of the message.
-            ldout(async_msgr->cct,10) << __func__ << " wants " << message_size << " from dispatch throttler "
-                << async_msgr->dispatch_throttler.get_current() << "/"
-                << async_msgr->dispatch_throttler.get_max() << dendl;
-            if (async_msgr->dispatch_throttler.get_or_fail(message_size)) {
-              state = STATE_OPEN_MESSAGE_READ_FRONT;
-              throttle_stamp = ceph_clock_now(async_msgr->cct);
-            }
-          }
-
-          break;
-        }
       case STATE_OPEN_MESSAGE_READ_FRONT:
         {
           // read front
@@ -588,7 +571,7 @@ void AsyncConnection::process()
           //
 
           ceph::shared_ptr<AuthSessionHandler> auth_handler = session_security;
-          if (auth_handler == NULL) {
+          if (auth_handler) {
             ldout(async_msgr->cct, 10) << __func__ << " No session security set" << dendl;
           } else {
             if (auth_handler->check_message_signature(message)) {
@@ -617,7 +600,6 @@ void AsyncConnection::process()
             ldout(async_msgr->cct,0) << __func__ << " got old message "
                     << message->get_seq() << " <= " << in_seq << " " << message << " " << *message
                     << ", discarding" << dendl;
-            async_msgr->dispatch_throttle_release(message->get_dispatch_throttle_size());
             message->put();
             if (has_feature(CEPH_FEATURE_RECONNECT_SEQ) && async_msgr->cct->_conf->ms_die_on_old_message)
               assert(0 == "old msgs despite reconnect_seq feature");
@@ -630,11 +612,11 @@ void AsyncConnection::process()
           in_seq = message->get_seq();
           ldout(async_msgr->cct, 10) << __func__ << " got message " << message->get_seq()
                                << " " << message << " " << *message << dendl;
-          in_q->fast_preprocess(message);
-          if (in_q->can_fast_dispatch(message)) {
-            in_q->fast_dispatch(message);
+          async_msgr->ms_fast_preprocess(message);
+          if (async_msgr->ms_can_fast_dispatch(message)) {
+            async_msgr->ms_fast_dispatch(message);
           } else {
-            in_q->enqueue(message, message->get_priority(), conn_id);
+            async_msgr->ms_deliver_dispatch(message);
           }
 
           state = STATE_OPEN;
@@ -696,11 +678,6 @@ fail:
                             << policy.throttler_bytes->get_max() << dendl;
         policy.throttler_bytes->put(message_size);
       }
-
-      if (state > STATE_OPEN_MESSAGE_THROTTLE_DISPATCH &&
-          state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH) {
-        async_msgr->dispatch_throttle_release(message_size);
-      }
     }
     fault();
     state = STATE_FAULT;
@@ -1009,7 +986,7 @@ int AsyncConnection::_process_connection()
         }
 
         get();
-        async_msgr->dispatch_queue.queue_connect(this);
+        async_msgr->ms_deliver_handle_connect(this);
         get();
         async_msgr->ms_deliver_handle_fast_connect(this);
 
@@ -1448,11 +1425,11 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   if (existing->policy.lossy) {
     // disconnect from the Connection
     existing->get();
-    async_msgr->dispatch_queue.queue_reset(existing);
+    async_msgr->ms_deliver_handle_reset(existing);
   } else {
     // queue a reset on the new connection, which we're dumping for the old
     get();
-    async_msgr->dispatch_queue.queue_reset(this);
+    async_msgr->ms_deliver_handle_reset(this);
 
     // reset the in_seq if this is a hard reset from peer,
     // otherwise we respect our original connection's value
@@ -1501,7 +1478,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
 
   // notify
   get();
-  async_msgr->dispatch_queue.queue_accept(this);
+  async_msgr->ms_deliver_handle_accept(this);
   get();
   async_msgr->ms_deliver_handle_fast_accept(this);
 
@@ -1642,7 +1619,6 @@ void AsyncConnection::fault()
 
   if (policy.lossy && state != STATE_CONNECTING) {
     ldout(async_msgr->cct, 10) << __func__ << " on lossy channel, failing" << dendl;
-    in_q->discard_queue(conn_id);
     _stop();
     return ;
   }
@@ -1687,12 +1663,11 @@ void AsyncConnection::fault()
 void AsyncConnection::was_session_reset()
 {
   ldout(async_msgr->cct,10) << __func__ << "was_session_reset" << dendl;
-  in_q->discard_queue(conn_id);
   discard_out_queue();
   outcoming_bl.clear();
 
   get();
-  async_msgr->dispatch_queue.queue_remote_reset(this);
+  async_msgr->ms_deliver_handle_remote_reset(this);
 
   if (randomize_out_seq()) {
     lsubdout(async_msgr->cct,ms,15) << __func__ << " Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
@@ -1711,7 +1686,7 @@ void AsyncConnection::_stop()
 {
   ldout(async_msgr->cct, 10) << __func__ << dendl;
   get();
-  async_msgr->dispatch_queue.queue_reset(this);
+  async_msgr->ms_deliver_handle_reset(this);
   shutdown_socket();
   discard_out_queue();
   outcoming_bl.clear();
index b97622613ba7582a968b99389910f0e5ff2a79fd..5a472fc2857e5d334c3f3ca23756bed95699ebac 100644 (file)
@@ -143,7 +143,6 @@ class AsyncConnection : public Connection {
     STATE_OPEN_MESSAGE_HEADER,
     STATE_OPEN_MESSAGE_THROTTLE_MESSAGE,
     STATE_OPEN_MESSAGE_THROTTLE_BYTES,
-    STATE_OPEN_MESSAGE_THROTTLE_DISPATCH,
     STATE_OPEN_MESSAGE_READ_FRONT,
     STATE_OPEN_MESSAGE_READ_MIDDLE,
     STATE_OPEN_MESSAGE_READ_DATA_PREPARE,
@@ -182,10 +181,8 @@ class AsyncConnection : public Connection {
   int state_after_send;
   int sd;
   int port;
-  uint64_t conn_id;
   Messenger::Policy policy;
   map<int, list<Message*> > out_q;  // priority queue for outbound msgs
-  DispatchQueue *in_q;
   list<Message*> sent;
   Mutex lock;
   utime_t backoff;         // backoff time
index 7bc8e4cba3625ac99a85c88eab79784ff9657297..f18c4494354edf3e0c548e397fb48dc40aa5acc6 100644 (file)
@@ -266,15 +266,12 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
                                string mname, uint64_t _nonce)
   : SimplePolicyMessenger(cct, name,mname, _nonce),
     processor(this, _nonce, &center),
-    nonce(_nonce),
     lock("AsyncMessenger::lock"),
-    center(cct), did_bind(false),
+    nonce(_nonce), did_bind(false),
     global_seq(0),
     cluster_protocol(0),
     local_connection(new AsyncConnection(cct, this)),
-    dispatch_throttler(cct, string("msgr_dispatch_throttler-") + mname,
-                       cct->_conf->ms_dispatch_throttle_bytes),
-    dispatch_queue(cct, this)
+    center(cct)
 {
   ceph_spin_init(&global_seq_lock);
   _init_local_connection();
@@ -292,7 +289,6 @@ AsyncMessenger::~AsyncMessenger()
 void AsyncMessenger::ready()
 {
   ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
-  dispatch_queue.start();
 
   lock.Lock();
   if (did_bind)
@@ -304,7 +300,6 @@ int AsyncMessenger::shutdown()
 {
   ldout(cct,10) << __func__ << "shutdown " << get_myaddr() << dendl;
   mark_down_all();
-  dispatch_queue.shutdown();
 
   // break ref cycles on the loopback connection
   local_connection->set_priv(NULL);
@@ -372,12 +367,6 @@ void AsyncMessenger::wait()
   }
   lock.Unlock();
 
-  if(dispatch_queue.is_started()) {
-    ldout(cct,10) << __func__ << ": waiting for dispatch queue" << dendl;
-    dispatch_queue.wait();
-    ldout(cct,10) << __func__ << ": dispatch queue is stopped" << dendl;
-  }
-
   // done!  clean up.
   if (did_bind) {
     ldout(cct,20) << __func__ << ": stopping processor thread" << dendl;
@@ -503,7 +492,19 @@ void AsyncMessenger::submit_message(Message *m, AsyncConnection *con,
   if (my_inst.addr == dest_addr) {
     // local
     ldout(cct, 20) << __func__ << " " << *m << " local" << dendl;
-    dispatch_queue.local_delivery(m, m->get_priority());
+    m->set_connection(local_connection.get());
+    m->set_recv_stamp(ceph_clock_now(cct));
+    ms_fast_preprocess(m);
+    if (ms_can_fast_dispatch(m)) {
+      ms_fast_dispatch(m);
+    } else {
+      if (m->get_priority() >= CEPH_MSG_PRIO_LOW) {
+        ms_fast_dispatch(m);
+      } else {
+        ms_deliver_dispatch(m);
+      }
+    }
+
     return;
   }
 
@@ -550,7 +551,7 @@ void AsyncMessenger::mark_down_all()
     ldout(cct, 5) << __func__ << " accepting_conn " << p << dendl;
     p->mark_down();
     p->get();
-    dispatch_queue.queue_reset(p);
+    ms_deliver_handle_reset(p);
   }
   accepting_conns.clear();
 
@@ -561,7 +562,7 @@ void AsyncMessenger::mark_down_all()
     conns.erase(it);
     p->mark_down();
     p->get();
-    dispatch_queue.queue_reset(p);
+    ms_deliver_handle_reset(p);
   }
   lock.Unlock();
 }
@@ -574,7 +575,7 @@ void AsyncMessenger::mark_down(const entity_addr_t& addr)
     ldout(cct, 1) << __func__ << " " << addr << " -- " << p << dendl;
     _stop_conn(p);
     p->get();
-    dispatch_queue.queue_reset(p);
+    ms_deliver_handle_reset(p);
   } else {
     ldout(cct, 1) << __func__ << " " << addr << " -- pipe dne" << dendl;
   }
@@ -624,12 +625,3 @@ void AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
   _init_local_connection();
   lock.Unlock();
 }
-
-void AsyncMessenger::dispatch_throttle_release(uint64_t msize) {
-  if (msize) {
-    ldout(cct,10) << __func__ << " " << msize << " to dispatch throttler "
-                  << dispatch_throttler.get_current() << "/"
-                  << dispatch_throttler.get_max() << dendl;
-    dispatch_throttler.put(msize);
-  }
-}
index 55a21eecf42c5ca750c2eb7def928731cb71571e..f9e9113394b32038b56eecc7ba3139500f0f827a 100644 (file)
@@ -106,11 +106,11 @@ public:
   void set_addr_unknowns(entity_addr_t& addr);
 
   int get_dispatch_queue_len() {
-    return dispatch_queue.get_queue_len();
+    return 0;
   }
 
   double get_dispatch_queue_max_age(utime_t now) {
-    return dispatch_queue.get_max_age(now);
+    return 0;
   }
   /** @} Accessors */
 
@@ -280,7 +280,7 @@ private:
     return p->second;
   }
 
-  void *_stop_conn(AsyncConnection *c) {
+  void _stop_conn(AsyncConnection *c) {
     assert(lock.is_locked());
     if (c) {
       c->mark_down();
@@ -301,11 +301,7 @@ public:
   /// con used for sending messages to ourselves
   ConnectionRef local_connection;
 
-  /// Throttle preventing us from building up a big backlog waiting for dispatch
-  Throttle dispatch_throttler;
-
   EventCenter center;
-  DispatchQueue dispatch_queue;
 
   /**
    * @defgroup AsyncMessenger internals
@@ -376,13 +372,6 @@ public:
     _init_local_connection();
   }
 
-  /**
-   * Release memory accounting back to the dispatch throttler.
-   *
-   * @param msize The amount of memory to release.
-   */
-  void dispatch_throttle_release(uint64_t msize);
-
   /**
    * @} // AsyncMessenger Internals
    */
index d7979712a0a36ad9c4462145ff36ea4a6600558f..6cdd73e8914036b4e43df6d933fcf5a2eb799aee 100644 (file)
@@ -1,9 +1,10 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
 // vim: ts=8 sw=2 smarttab
 #include "common/errno.h"
+#include "Event.h"
 
 #ifdef HAVE_EPOLL
-#include "EventEpoll.h""
+#include "EventEpoll.h"
 #else
 #ifdef HAVE_KQUEUE
 #include "EventKqueue.h"
@@ -23,6 +24,7 @@ int EventCenter::init(int n)
   driver = new KqueueDriver(cct);
 #else
   driver = new SelectDriver(cct);
+#endif
 #endif
 
   if (!driver) {
@@ -41,7 +43,7 @@ int EventCenter::init(int n)
   return 0;
 }
 
-EventCenter::~EventCenter();
+EventCenter::~EventCenter()
 {
   if (driver)
     delete driver;
@@ -50,41 +52,50 @@ EventCenter::~EventCenter();
 int EventCenter::create_event(int fd, int mask, EventCallback *ctxt)
 {
   Mutex::Locker l(lock);
-  if (events.size() > center->nevent) {
+  if (events.size() > nevent) {
     lderr(cct) << __func__ << " event count is exceed." << dendl;
     return -ERANGE;
   }
 
-  int r = driver->add_event(fd, mask);
+  EventCenter::Event *event = _get_event(fd);
+
+  int r = driver->add_event(fd, event ? event->mask: EVENT_NONE, mask);
   if (r < 0)
     return r;
 
-  if (events.find(fd) == events.end()) {
+  if (!event) {
     events[fd] = EventCenter::Event();
+    event = &events[fd];
   }
 
-  EventCenter::Event *event = &events[fd];
-
   event->mask |= mask;
-  if (mask & EVENT_READABLE)
+  if (mask & EVENT_READABLE) {
+    if (event->read_cb)
+      delete event->read_cb;
     event->read_cb = ctxt;
-  if (mask & EVENT_WRITABLE)
+  }
+  if (mask & EVENT_WRITABLE) {
+    if (event->write_cb)
+      delete event->write_cb;
     event->write_cb = ctxt;
+  }
   return 0;
 }
 
-void delete_event(int fd, int mask)
+void EventCenter::delete_event(int fd, int mask)
 {
   Mutex::Locker l(lock);
-  if (event->mask == EVENT_NONE)
-    return;
 
-  driver->del_event(fd, mask);
-  struct event *event = &events[fd];
+  EventCenter::Event *event = _get_event(fd);
+  driver->del_event(fd, event ? event->mask: EVENT_NONE, mask);
+  if (!event) {
+    events[fd] = EventCenter::Event();
+    event = &events[fd];
+  }
 
-  if (mask & EVENT_READABLE)
+  if (event->read_cb)
     delete event->read_cb;
-  if (mask & EVENT_WRITABLE)
+  if (event->write_cb)
     delete event->write_cb;
 
   event->mask = event->mask & (~mask);
@@ -92,10 +103,10 @@ void delete_event(int fd, int mask)
     events.erase(fd);
 }
 
-int process_events(int timeout_millionseconds)
+int EventCenter::process_events(int timeout_millionseconds)
 {
   struct timeval tv;
-  int j, processed, numevents, mask, fd, rfired;
+  int j, processed, numevents;
 
   if (timeout_millionseconds > 0) {
     tv.tv_sec = timeout_millionseconds / 1000;
@@ -110,7 +121,7 @@ int process_events(int timeout_millionseconds)
   vector<FiredEvent> fired_events;
   numevents = driver->event_wait(fired_events, &tv);
   for (j = 0; j < numevents; j++)
-    event_wq.queue(fired_events[i]);
+    event_wq.queue(fired_events[j]);
 
   return numevents;
 }
index ac58f0b46381943f44f329fef72479c057e0a513..c2b5ac657c00bd0ad0563d4eb9e59874f5b580af 100644 (file)
@@ -8,7 +8,7 @@
 #endif
 
 // We use epoll, kqueue, evport, select in descending order by performance.
-#ifdef __linux__
+#if defined(__linux__)
 #define HAVE_EPOLL 1
 #endif
 
@@ -48,9 +48,9 @@ class EventDriver {
  public:
   virtual ~EventDriver() {}       // we want a virtual destructor!!!
   virtual int init(int nevent) = 0;
-  virtual int add_event(int fd, int mask) = 0;
-  virtual void delete_event(int fd, int del_mask) = 0;
-  virtual int event_wait(FiredEvent &fired_events, struct timeval *tp) = 0;
+  virtual int add_event(int fd, int cur_mask, int mask) = 0;
+  virtual void del_event(int fd, int cur_mask, int del_mask) = 0;
+  virtual int event_wait(vector<FiredEvent> &fired_events, struct timeval *tp) = 0;
 };
 
 class EventCallback {
@@ -72,16 +72,14 @@ class EventCenter {
   map<int, Event> events;
   EventDriver *driver;
   CephContext *cct;
-  int nevent;
+  uint64_t nevent;
   ThreadPool event_tp;
 
-  Event *get_event(int fd) {
-    Mutex::Locker l(lock);
+  Event *_get_event(int fd) {
     map<int, Event>::iterator it = events.find(fd);
     if (it != events.end()) {
       return &it->second;
     }
-
     return NULL;
   }
   struct EventWQ : public ThreadPool::WorkQueueVal<FiredEvent> {
@@ -155,8 +153,12 @@ class EventCenter {
   ~EventCenter();
   int init(int nevent);
   int create_event(int fd, int mask, EventCallback *ctxt);
-  int delete_event(int fd, int mask);
+  void delete_event(int fd, int mask);
   int process_events(int timeout_milliseconds);
+  Event *get_event(int fd) {
+    Mutex::Locker l(lock);
+    return _get_event(fd);
+  }
 };
 
 #endif
index ef6825bc68e33afd145d22ef9352b095446c978a..ae997374d80865715791ceeb62bea0f6b30adbbf 100644 (file)
@@ -1,3 +1,4 @@
+#include "common/errno.h"
 #include "EventEpoll.h"
 
 #define dout_subsys ceph_subsys_ms
@@ -7,11 +8,11 @@
 
 int EpollDriver::init(int nevent)
 {
-  events = malloc(sizeof(struct epoll_event)*nevent);
+  events = (struct epoll_event*)malloc(sizeof(struct epoll_event)*nevent);
   if (!events) {
     lderr(cct) << __func__ << " unable to malloc memory: "
-                       << cpp_strerror(errno) << dendl;
-    return -error;
+                           << cpp_strerror(errno) << dendl;
+    return -errno;
   }
   memset(events, 0, sizeof(struct epoll_event)*nevent);
 
@@ -19,13 +20,13 @@ int EpollDriver::init(int nevent)
   if (epfd == -1) {
     lderr(cct) << __func__ << " unable to do epoll_create: "
                        << cpp_strerror(errno) << dendl;
-    return -error;
+    return -errno;
   }
 
   return 0;
 }
 
-int EpollDriver::add_event(int fd, int mask)
+int EpollDriver::add_event(int fd, int cur_mask, int add_mask)
 {
   struct epoll_event ee;
   /* If the fd was already monitored for some event, we need a MOD
@@ -34,30 +35,29 @@ int EpollDriver::add_event(int fd, int mask)
   map<int, int>::iterator it = fds.find(fd);
   if (it == fds.end()) {
     op = EPOLL_CTL_ADD;
-    if (deleted_fds.length()) {
-      pos = deleted.fds.front();
-      deleted.fds.pop_front();
+    if (deleted_fds.size()) {
+      pos = deleted_fds.front();
+      deleted_fds.pop_front();
     } else {
       fds[fd] = pos = next_pos;
       next_pos++;
     }
   } else {
-    pos = it->second;
-    op = events[pos].mask == EVENT_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
+    op = cur_mask == EVENT_NONE ? EPOLL_CTL_ADD: EPOLL_CTL_MOD;
   }
 
   ee.events = 0;
-  mask |= events[pos].mask; /* Merge old events */
-  if (mask & EVENT_READABLE)
+  add_mask |= cur_mask; /* Merge old events */
+  if (add_mask & EVENT_READABLE)
     ee.events |= EPOLLIN;
-  if (mask & EVENT_WRITABLE)
+  if (add_mask & EVENT_WRITABLE)
     ee.events |= EPOLLOUT;
   ee.data.u64 = 0; /* avoid valgrind warning */
   ee.data.fd = fd;
   if (epoll_ctl(epfd, op, fd, &ee) == -1) {
     lderr(cct) << __func__ << " unable to add event: "
                        << cpp_strerror(errno) << dendl;
-    return -error;
+    return -errno;
   }
   return 0;
 }
@@ -67,7 +67,7 @@ void EpollDriver::del_event(int fd, int cur_mask, int delmask)
   struct epoll_event ee;
   map<int, int>::iterator it = fds.find(fd);
   if (it == fds.end())
-    return 0;
+    return ;
 
   int mask = cur_mask & (~delmask);
 
@@ -91,7 +91,7 @@ void EpollDriver::del_event(int fd, int cur_mask, int delmask)
   }
 }
 
-int EpollDriver::event_wait(FiredEvent &fired_events, struct timeval *tvp)
+int EpollDriver::event_wait(vector<FiredEvent> &fired_events, struct timeval *tvp)
 {
   int retval, numevents = 0;
 
index 4513113c866ac625029e3d9f701e5df099679262..0c4305321a83153ead15b134e3b000034cfbafc6 100644 (file)
@@ -29,9 +29,9 @@ class EpollDriver : public EventDriver {
   }
 
   int init(int nevent);
-  int add_event(int fd, int mask);
+  int add_event(int fd, int cur_mask, int add_mask);
   void del_event(int fd, int cur_mask, int del_mask);
-  int event_wait(FiredEvent &fired_events, struct timeval *tp);
+  int event_wait(vector<FiredEvent> &fired_events, struct timeval *tp);
 };
 
 #endif
index 006938107c238218d47cb89654046eab65c25286..5fe17dcf5936723b36c24919ed35ffeaafcfa7d4 100644 (file)
@@ -65,7 +65,7 @@ class DispatchQueue {
   };
     
   CephContext *cct;
-  Messenger *msgr;
+  SimpleMessenger *msgr;
   Mutex lock;
   Cond cond;
 
@@ -191,7 +191,7 @@ class DispatchQueue {
   void shutdown();
   bool is_started() {return dispatch_thread.is_started();}
 
-  DispatchQueue(CephContext *cct, Messenger *msgr)
+  DispatchQueue(CephContext *cct, SimpleMessenger *msgr)
     : cct(cct), msgr(msgr),
       lock("SimpleMessenger::DispatchQeueu::lock"), 
       mqueue(cct->_conf->ms_pq_max_tokens_per_priority,