]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Use shared_ptr for EventCallback
authorHaomai Wang <haomaiwang@gmail.com>
Wed, 10 Sep 2014 02:50:45 +0000 (10:50 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Wed, 8 Oct 2014 06:04:57 +0000 (14:04 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/AsyncConnection.cc
src/msg/AsyncConnection.h
src/msg/AsyncMessenger.cc
src/msg/Event.cc
src/msg/Event.h
src/msg/EventEpoll.cc

index cf8c226ef00677ef65e5ac2bddf50865ce6b26cb..19b9ed891de6495e00d961157c1d0aa153fed6b4 100644 (file)
@@ -63,12 +63,12 @@ class C_handle_dispatch : public EventCallback {
  public:
   C_handle_dispatch(AsyncMessenger *msgr, Message *m): msgr(msgr), m(m) {}
   void do_request(int id) {
-    msgr->ms_fast_preprocess(m);
-    if (msgr->ms_can_fast_dispatch(m)) {
-      msgr->ms_fast_dispatch(m);
-    } else {
+    //msgr->ms_fast_preprocess(m);
+    //if (msgr->ms_can_fast_dispatch(m)) {
+    //  msgr->ms_fast_dispatch(m);
+    //} else {
       msgr->ms_deliver_dispatch(m);
-    }
+    //}
   }
 };
 
@@ -101,7 +101,12 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m)
     state(STATE_NONE), state_after_send(0), sd(-1),
     lock("AsyncConnection::lock"), open_write(false),
     got_bad_auth(false), authorizer(NULL),
-    state_buffer(4096), state_offset(0), net(cct), center(&m->center) { }
+    state_buffer(4096), state_offset(0), net(cct), center(&m->center)
+{
+  read_handler.reset(new C_handle_read(this));
+  write_handler.reset(new C_handle_write(this));
+  reset_handler.reset(new C_handle_reset(async_msgr, this));
+}
 
 AsyncConnection::~AsyncConnection()
 {
@@ -184,7 +189,7 @@ int AsyncConnection::_try_send(bufferlist send_bl, bool send)
     assert(!outcoming_bl.length());
     connect_seq++;
     state = STATE_CONNECTING;
-    center->create_time_event(0, new C_handle_read(this));
+    center->create_time_event(0, read_handler);
     return 0;
   }
 
@@ -237,7 +242,7 @@ int AsyncConnection::_try_send(bufferlist send_bl, bool send)
   }
 
   if (!open_write && is_queued()) {
-    center->create_file_event(sd, EVENT_WRITABLE, new C_handle_write(this));
+    center->create_file_event(sd, EVENT_WRITABLE, write_handler);
     open_write = true;
   }
 
@@ -286,7 +291,7 @@ void AsyncConnection::process()
   int prev_state = state;
   Mutex::Locker l(lock);
   do {
-    ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state)
+    ldout(async_msgr->cct, 20) << __func__ << " state is " << get_state_name(state)
                                << ", prev state is " << get_state_name(prev_state) << dendl;
     prev_state = state;
     switch (state) {
@@ -656,9 +661,17 @@ void AsyncConnection::process()
           in_seq = message->get_seq();
           ldout(async_msgr->cct, 10) << __func__ << " got message " << message->get_seq()
                                << " " << message << " " << *message << dendl;
-          center->create_time_event(0, new C_handle_dispatch(async_msgr, message));
-
           state = STATE_OPEN;
+
+          async_msgr->ms_fast_preprocess(message);
+          if (async_msgr->ms_can_fast_dispatch(message)) {
+            lock.Unlock();
+            async_msgr->ms_fast_dispatch(message);
+            lock.Lock();
+          } else {
+            center->create_time_event(0, EventCallbackRef(new C_handle_dispatch(async_msgr, message)));
+          }
+
           break;
         }
 
@@ -757,7 +770,7 @@ int AsyncConnection::_process_connection()
         }
         net.set_socket_options(sd);
 
-        center->create_file_event(sd, EVENT_READABLE, new C_handle_read(this));
+        center->create_file_event(sd, EVENT_READABLE, read_handler);
         state = STATE_CONNECTING_WAIT_BANNER;
         break;
       }
@@ -1561,7 +1574,7 @@ void AsyncConnection::_connect()
 
   state = STATE_CONNECTING;
   // rescheduler connection in order to avoid lock dep
-  center->create_time_event(0, new C_handle_read(this));
+  center->create_time_event(0, read_handler);
 }
 
 void AsyncConnection::accept(int incoming)
@@ -1571,7 +1584,7 @@ void AsyncConnection::accept(int incoming)
 
   sd = incoming;
   state = STATE_ACCEPTING;
-  center->create_file_event(sd, EVENT_READABLE, new C_handle_read(this));
+  center->create_file_event(sd, EVENT_READABLE, read_handler);
   process();
 }
 
@@ -1584,7 +1597,7 @@ int AsyncConnection::send_message(Message *m)
   Mutex::Locker l(lock);
   out_q[m->get_priority()].push_back(m);
   if (sd > 0 && !open_write) {
-    center->create_file_event(sd, EVENT_WRITABLE, new C_handle_write(this));
+    center->create_file_event(sd, EVENT_WRITABLE, write_handler);
     open_write = true;
   }
   return 0;
@@ -1713,7 +1726,7 @@ void AsyncConnection::fault()
 
   uint64_t milliseconds = double(backoff) * 1000;
   // woke up again;
-  center->create_time_event(milliseconds, new C_handle_read(this));
+  center->create_time_event(milliseconds, read_handler);
 }
 
 void AsyncConnection::was_session_reset()
@@ -1740,7 +1753,7 @@ void AsyncConnection::was_session_reset()
 void AsyncConnection::_stop()
 {
   ldout(async_msgr->cct, 10) << __func__ << dendl;
-  center->create_time_event(0, new C_handle_reset(async_msgr, this));
+  center->create_time_event(0, reset_handler);
   shutdown_socket();
   discard_out_queue();
   outcoming_bl.clear();
index dfc28c8c62fa74a86f26f4e67d11312fe7d7af3f..d4f8395d535aa736ba1db3bf3f9a670258032f06 100644 (file)
@@ -216,6 +216,9 @@ class AsyncConnection : public Connection {
   Mutex lock;
   utime_t backoff;         // backoff time
   bool open_write;
+  EventCallbackRef read_handler;
+  EventCallbackRef write_handler;
+  EventCallbackRef reset_handler;
 
   // Tis section are temp variables used by state transition
 
index abd27622ecc10102892ad2457a4a1fd678d4fe82..00bd5847ca0fab868aff9e745cd75608eec60fb4 100644 (file)
@@ -187,7 +187,7 @@ int Processor::start()
   // start thread
   create();
   if (listen_sd >= 0)
-    center->create_file_event(listen_sd, EVENT_READABLE, new C_handle_accept(this));
+    center->create_file_event(listen_sd, EVENT_READABLE, EventCallbackRef(new C_handle_accept(this)));
 
   return 0;
 }
@@ -217,7 +217,7 @@ void *Processor::entry()
   while (!done) {
     ldout(msgr->cct,20) << __func__ << " calling poll" << dendl;
 
-    r = center->process_events(1000);
+    r = center->process_events(30000);
     if (r < 0) {
       ldout(msgr->cct,20) << __func__ << " process events failed: "
                           << cpp_strerror(errno) << dendl;
@@ -245,6 +245,7 @@ void Processor::stop()
   if (listen_sd >= 0) {
     ::shutdown(listen_sd, SHUT_RDWR);
   }
+  center->stop();
 
   // wait for thread to stop before closing the socket, to avoid
   // racing against fd re-use.
index 71f019fb84d9380a42e0305eabb0aa227e84aa33..4afb7227bf73819fe76f8f5b798861314f0185aa 100644 (file)
@@ -62,7 +62,7 @@ int EventCenter::init(int n)
   notify_send_fd = fds[1];
 
   nevent = n;
-  create_file_event(notify_receive_fd, EVENT_READABLE, new C_handle_notify());
+  create_file_event(notify_receive_fd, EVENT_READABLE, EventCallbackRef(new C_handle_notify()));
   return 0;
 }
 
@@ -77,7 +77,7 @@ EventCenter::~EventCenter()
     ::close(notify_send_fd);
 }
 
-int EventCenter::create_file_event(int fd, int mask, EventCallback *ctxt)
+int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt)
 {
   int r;
   Mutex::Locker l(lock);
@@ -105,13 +105,9 @@ int EventCenter::create_file_event(int fd, int mask, EventCallback *ctxt)
 
   event->mask |= mask;
   if (mask & EVENT_READABLE) {
-    if (event->read_cb)
-      delete event->read_cb;
     event->read_cb = ctxt;
   }
   if (mask & EVENT_WRITABLE) {
-    if (event->write_cb)
-      delete event->write_cb;
     event->write_cb = ctxt;
   }
   ldout(cct, 10) << __func__ << " create event fd=" << fd << " mask=" << mask
@@ -130,12 +126,10 @@ void EventCenter::delete_file_event(int fd, int mask)
   driver->del_event(fd, event->mask, mask);
 
   if (mask & EVENT_READABLE && event->read_cb) {
-    delete event->read_cb;
-    event->read_cb = NULL;
+    event->read_cb.reset();
   }
   if (mask & EVENT_WRITABLE && event->write_cb) {
-    delete event->write_cb;
-    event->write_cb = NULL;
+    event->write_cb.reset();
   }
 
   event->mask = event->mask & (~mask);
@@ -145,7 +139,7 @@ void EventCenter::delete_file_event(int fd, int mask)
                  << " now mask is " << event->mask << dendl;
 }
 
-uint64_t EventCenter::create_time_event(uint64_t milliseconds, EventCallback *ctxt)
+uint64_t EventCenter::create_time_event(uint64_t milliseconds, EventCallbackRef ctxt)
 {
   Mutex::Locker l(lock);
   uint64_t id = time_event_next_id++;
@@ -206,13 +200,17 @@ void EventCenter::start()
   ldout(cct, 1) << __func__ << dendl;
   Mutex::Locker l(lock);
   event_tp.start();
+  tp_stop = false;
 }
 
 void EventCenter::stop()
 {
   ldout(cct, 1) << __func__ << dendl;
   Mutex::Locker l(lock);
-  event_tp.stop();
+  if (!tp_stop) {
+    event_tp.stop();
+    tp_stop = true;
+  }
   char buf[1];
   buf[0] = 'c';
   // wake up "event_wait"
index caebc8af612be96e38d34451de4f2ea0043e69be..e9aad221482b2d448e42d59dcda78b98c1e46f7d 100644 (file)
@@ -40,6 +40,8 @@ class EventCallback {
   virtual ~EventCallback() {}       // we want a virtual destructor!!!
 };
 
+typedef ceph::shared_ptr<EventCallback> EventCallbackRef;
+
 struct FiredFileEvent {
   int fd;
   int mask;
@@ -47,14 +49,12 @@ struct FiredFileEvent {
 
 struct FiredTimeEvent {
   uint64_t id;
-  EventCallback *time_cb;
+  EventCallbackRef time_cb;
 };
 
 struct FiredEvent {
-  union {
-    FiredFileEvent file_event;
-    FiredTimeEvent time_event;
-  };
+  FiredFileEvent file_event;
+  FiredTimeEvent time_event;
   bool is_file;
 
   FiredEvent(): is_file(true) {}
@@ -73,16 +73,16 @@ class EventDriver {
 class EventCenter {
   struct FileEvent {
     int mask;
-    EventCallback *read_cb;
-    EventCallback *write_cb;
-    FileEvent(): mask(0), read_cb(NULL), write_cb(NULL) {}
+    EventCallbackRef read_cb;
+    EventCallbackRef write_cb;
+    FileEvent(): mask(0) {}
   };
 
   struct TimeEvent {
     uint64_t id;
-    EventCallback *time_cb;
+    EventCallbackRef time_cb;
 
-    TimeEvent(): id(0), time_cb(NULL) {}
+    TimeEvent(): id(0) {}
   };
 
   Mutex lock;
@@ -100,6 +100,7 @@ class EventCenter {
   int notify_receive_fd;
   int notify_send_fd;
   utime_t next_wake;
+  bool tp_stop;
 
   int process_time_events();
   FileEvent *_get_file_event(int fd) {
@@ -176,7 +177,6 @@ class EventCenter {
         }
       } else {
         e.time_event.time_cb->do_request(e.time_event.id);
-        delete e.time_event.time_cb;
       }
     }
     void _clear() {
@@ -188,14 +188,14 @@ class EventCenter {
   EventCenter(CephContext *c):
     lock("EventCenter::lock"), driver(NULL), cct(c), nevent(0), time_event_next_id(0),
     event_tp(c, "EventCenter::event_tp", c->_conf->ms_event_op_threads, "eventcenter_op_threads"),
-    notify_receive_fd(-1), notify_send_fd(-1),
+    notify_receive_fd(-1), notify_send_fd(-1),tp_stop(true),
     event_wq(this, c->_conf->ms_event_thread_timeout, c->_conf->ms_event_thread_suicide_timeout, &event_tp) {
     last_time = time(NULL);
   }
   ~EventCenter();
   int init(int nevent);
-  int create_file_event(int fd, int mask, EventCallback *ctxt);
-  uint64_t create_time_event(uint64_t milliseconds, EventCallback *ctxt);
+  int create_file_event(int fd, int mask, EventCallbackRef ctxt);
+  uint64_t create_time_event(uint64_t milliseconds, EventCallbackRef ctxt);
   void delete_file_event(int fd, int mask);
   void delete_time_event(uint64_t id);
   int process_events(int timeout_milliseconds);
index c12e3b36b997205dbd73cbf123e0be54e823e221..68d0e1459fbe181c474e73c35887332907c75a2b 100644 (file)
@@ -47,7 +47,7 @@ int EpollDriver::add_event(int fd, int cur_mask, int add_mask)
     op = cur_mask == EVENT_NONE ? EPOLL_CTL_ADD: EPOLL_CTL_MOD;
   }
 
-  ee.events = 0;
+  ee.events = EPOLLET;
   add_mask |= cur_mask; /* Merge old events */
   if (add_mask & EVENT_READABLE)
     ee.events |= EPOLLIN;
@@ -75,7 +75,7 @@ void EpollDriver::del_event(int fd, int cur_mask, int delmask)
 
   int mask = cur_mask & (~delmask);
 
-  ee.events = EPOLLET;
+  ee.events = 0;
   if (mask & EVENT_READABLE) ee.events |= EPOLLIN;
   if (mask & EVENT_WRITABLE) ee.events |= EPOLLOUT;
   ee.data.u64 = 0; /* avoid valgrind warning */