]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: Fix time event is called after AsyncMessenger destruction 3101/head
authorHaomai Wang <haomaiwang@gmail.com>
Tue, 9 Dec 2014 08:55:28 +0000 (16:55 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Thu, 18 Dec 2014 15:36:27 +0000 (23:36 +0800)
AsyncConnection uses time event to handle async event partially, but it's not
very effective so here use dispatch_event_external instead.

And if client try to connect to server after a period, it may be call
AsyncConnection::process which will reference to AsyncMessenger. Since
AsyncMessenger doesn't use reference count, it will result in segment
fault. Now we record time event id and delete these registered time events
when stopping connection.

Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h
src/msg/async/Event.cc
src/msg/async/Event.h

index d7d1281cfd7c65acb6702418f13870fbb7a24dc3..46ea31c7adb17df5c2232fb12a6cdb902d858124 100644 (file)
@@ -39,12 +39,22 @@ ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
         << ").";
 }
 
+class C_time_wakeup : public EventCallback {
+  AsyncConnectionRef conn;
+
+ public:
+  C_time_wakeup(AsyncConnectionRef c): conn(c) {}
+  void do_request(int fd_or_id) {
+    conn->wakeup_from(fd_or_id);
+  }
+};
+
 class C_handle_read : public EventCallback {
   AsyncConnectionRef conn;
 
  public:
   C_handle_read(AsyncConnectionRef c): conn(c) {}
-  void do_request(int fd) {
+  void do_request(int fd_or_id) {
     conn->process();
   }
 };
@@ -277,7 +287,7 @@ int AsyncConnection::_try_send(bufferlist send_bl, bool send)
     assert(!outcoming_bl.length());
     connect_seq++;
     state = STATE_CONNECTING;
-    center->create_time_event(0, read_handler);
+    center->dispatch_event_external(read_handler);
     return 0;
   }
 
@@ -760,7 +770,7 @@ void AsyncConnection::process()
           // if send_message always successfully send, it may have no
           // opportunity to send seq ack. 10 is a experience value.
           if (in_seq > in_seq_acked + 10) {
-            center->create_time_event(2, write_handler);
+            center->dispatch_event_external(write_handler);
           }
 
           state = STATE_OPEN;
@@ -771,7 +781,7 @@ void AsyncConnection::process()
             async_msgr->ms_fast_dispatch(message);
             lock.Lock();
           } else {
-            center->create_time_event(1, EventCallbackRef(new C_handle_dispatch(async_msgr, message)));
+            center->dispatch_event_external(EventCallbackRef(new C_handle_dispatch(async_msgr, message)));
           }
 
           break;
@@ -1158,7 +1168,7 @@ int AsyncConnection::_process_connection()
         // message may in queue between last _try_send and connection ready
         // write event may already notify and we need to force scheduler again
         if (is_queued())
-          center->create_time_event(1, write_handler);
+          center->dispatch_event_external(write_handler);
 
         break;
       }
@@ -1870,7 +1880,8 @@ void AsyncConnection::fault()
   }
 
   // woke up again;
-  center->create_time_event(backoff, read_handler);
+  register_time_events.insert(center->create_time_event(
+          backoff, EventCallbackRef(new C_time_wakeup(this))));
 }
 
 void AsyncConnection::was_session_reset()
@@ -1901,11 +1912,13 @@ void AsyncConnection::_stop()
   if (sd > 0)
     ::close(sd);
   sd = -1;
+  for (set<uint64_t>::iterator it = register_time_events.begin();
+       it != register_time_events.end(); ++it)
+    center->delete_time_event(*it);
   async_msgr->unregister_conn(this);
   // Here we need to dispatch "signal" event, because we want to ensure signal
   // it after all events called by this "_stop" has be done.
   center->dispatch_event_external(signal_handler);
-  put();
 }
 
 int AsyncConnection::_send(Message *m)
@@ -2123,6 +2136,14 @@ void AsyncConnection::handle_write()
   fault();
 }
 
+void AsyncConnection::wakeup_from(uint64_t id)
+{
+  lock.Lock();
+  register_time_events.erase(id);
+  lock.Unlock();
+  process();
+}
+
 void AsyncConnection::local_deliver()
 {
   ldout(async_msgr->cct, 10) << __func__ << dendl;
index e2e60fb2943f82a7564c9987d9a6a7ac40a7dede..d12b79fa4435814e906263d9f408861e37617c08 100644 (file)
@@ -264,6 +264,7 @@ class AsyncConnection : public Connection {
   struct iovec msgvec[IOV_LEN];
   Mutex stop_lock; // used to protect `mark_down_cond`
   Cond stop_cond;
+  set<uint64_t> register_time_events; // need to delete it if stop
 
   // Tis section are temp variables used by state transition
 
@@ -303,6 +304,7 @@ class AsyncConnection : public Connection {
   // used by eventcallback
   void handle_write();
   void process();
+  void wakeup_from(uint64_t id);
   // Helper: only called by C_handle_stop
   void stop() {
     Mutex::Locker l(lock);
index 95d7111db051364855cf2356a5b65bc0f4583115..81772ba24c8b4bae2c7877e87c4809c0b08d680e 100644 (file)
@@ -624,7 +624,6 @@ void AsyncMessenger::mark_down_all()
     AsyncConnectionRef p = *q;
     ldout(cct, 5) << __func__ << " accepting_conn " << p << dendl;
     p->mark_down();
-    p->get();
     ms_deliver_handle_reset(p.get());
   }
   accepting_conns.clear();
@@ -635,7 +634,6 @@ void AsyncMessenger::mark_down_all()
     ldout(cct, 5) << __func__ << " " << it->first << " " << p << dendl;
     conns.erase(it);
     p->mark_down();
-    p->get();
     ms_deliver_handle_reset(p.get());
   }
   lock.Unlock();
@@ -648,7 +646,6 @@ void AsyncMessenger::mark_down(const entity_addr_t& addr)
   if (p) {
     ldout(cct, 1) << __func__ << " " << addr << " -- " << p << dendl;
     p->mark_down();
-    p->get();
     ms_deliver_handle_reset(p.get());
   } else {
     ldout(cct, 1) << __func__ << " " << addr << " -- connection dne" << dendl;
index 4d01e266546447b502a6c15b248afba6cf0941c7..0f758fe474f0fe88ab42afdf79c4cecdad80bdee 100644 (file)
@@ -323,6 +323,7 @@ private:
     Mutex::Locker l(deleted_lock);
     if (deleted_conns.count(p->second)) {
       deleted_conns.erase(p->second);
+      p->second->put();
       conns.erase(p);
       return NULL;
     }
index 15dccbee8d1db57f19e75667567f5f52a950cd85..27f2d7d3538379889cc61737d00f399ce29839ec 100644 (file)
@@ -188,6 +188,28 @@ uint64_t EventCenter::create_time_event(uint64_t microseconds, EventCallbackRef
   return id;
 }
 
+// TODO: Ineffective implementation now!
+void EventCenter::delete_time_event(uint64_t id)
+{
+  ldout(cct, 10) << __func__ << " id=" << id << dendl;
+  if (id >= time_event_next_id)
+    return ;
+
+
+  for (map<utime_t, list<TimeEvent> >::iterator it = time_events.begin();
+       it != time_events.end(); ++it) {
+    for (list<TimeEvent>::iterator j = it->second.begin();
+         j != it->second.end(); ++j) {
+      if (j->id == id) {
+        it->second.erase(j);
+        if (it->second.empty())
+          time_events.erase(it);
+        return ;
+      }
+    }
+  }
+}
+
 void EventCenter::wakeup()
 {
   ldout(cct, 1) << __func__ << dendl;
index a35c9fe9ecb3bd4e97f5c5e9a13d56e96f31599f..a6ff341930fd09f7e46715fe6a854e836dedee3c 100644 (file)
@@ -141,6 +141,7 @@ class EventCenter {
   int create_file_event(int fd, int mask, EventCallbackRef ctxt);
   uint64_t create_time_event(uint64_t milliseconds, EventCallbackRef ctxt);
   void delete_file_event(int fd, int mask);
+  void delete_time_event(uint64_t id);
   int process_events(int timeout_microseconds);
   void wakeup();