]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: Don't discard out_q and unregister when replacing
authorHaomai Wang <haomaiwang@gmail.com>
Tue, 13 Jan 2015 15:52:27 +0000 (23:52 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Thu, 15 Jan 2015 19:07:13 +0000 (03:07 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/msg/async/EventEpoll.cc

index b7d3b374aff9441addb47e6c10a8b768837d2eaa..bae0511ff52bc3768005a64b9273c6d7de7fd9f0 100644 (file)
@@ -188,6 +188,8 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCente
 
 AsyncConnection::~AsyncConnection()
 {
+  assert(out_q.empty());
+  assert(sent.empty());
   assert(!authorizer);
   if (recv_buf)
     delete recv_buf;
@@ -1555,6 +1557,8 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
     goto fail;
   }
 
+  if (existing == this)
+    existing = NULL;
   if (existing) {
     if (connect.global_seq < existing->peer_global_seq) {
       ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing
@@ -1692,10 +1696,11 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   // Here we use "_stop" instead of "mark_down" because "mark_down" is a async
   // operation, but now we need ensure all variables in `existing` is cleaned up
   // and we will reuse it next.
-  existing->_stop();
+  existing->_stop(true);
   if (existing->policy.lossy) {
     // disconnect from the Connection
     center->dispatch_event_external(EventCallbackRef(new C_handle_reset(async_msgr, existing)));
+    existing->discard_out_queue();
   } else {
     // queue a reset on the new connection, which we're dumping for the old
     center->dispatch_event_external(reset_handler);
@@ -1726,7 +1731,6 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
     swap(existing->sd, sd);
     existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
     existing->open_write = false;
-    existing->discard_out_queue();
     existing->replacing = true;
     _stop();
     existing->lock.Unlock();
@@ -1974,6 +1978,7 @@ void AsyncConnection::fault()
   if (policy.lossy && state != STATE_CONNECTING) {
     ldout(async_msgr->cct, 10) << __func__ << " on lossy channel, failing" << dendl;
     center->dispatch_event_external(reset_handler);
+    discard_out_queue();
     _stop();
     return ;
   }
@@ -2039,14 +2044,17 @@ void AsyncConnection::was_session_reset()
 }
 
 // *note: `async` is true only happen when replacing connection process
-void AsyncConnection::_stop()
+void AsyncConnection::_stop(bool replacing)
 {
   assert(lock.is_locked());
   ldout(async_msgr->cct, 10) << __func__ << dendl;
   if (sd > 0)
     center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
 
-  async_msgr->unregister_conn(this);
+  if (!replacing) {
+    discard_out_queue();
+    async_msgr->unregister_conn(this);
+  }
 
   if (async_msgr->cct->_conf->ms_inject_internal_delays) {
     ldout(msgr->cct, 10) << __func__ << " sleep for "
@@ -2058,7 +2066,6 @@ void AsyncConnection::_stop()
   }
 
   shutdown_socket();
-  discard_out_queue();
   open_write = false;
   state_offset = 0;
   state = STATE_CLOSED;
index 515b1d700d2a042ca3b6fae7dfce9e1c154d80c3..690f5863725d3dd836dee56338fc5bfc723a6972 100644 (file)
@@ -53,7 +53,7 @@ class AsyncConnection : public Connection {
   int read_until(uint64_t needed, char *p);
   int _process_connection();
   void _connect();
-  void _stop();
+  void _stop(bool discard=true);
   int handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &r);
   int handle_connect_msg(ceph_msg_connect &m, bufferlist &aubl, bufferlist &bl);
   void was_session_reset();
index 65e31615fcf47eb2884aceacbba06352c397fcc7..c33dd3d51f886d83704c8e663a9dddb9dab7f829 100644 (file)
@@ -47,7 +47,7 @@ int EpollDriver::init(int nevent)
 int EpollDriver::add_event(int fd, int cur_mask, int add_mask)
 {
   ldout(cct, 20) << __func__ << " add event fd=" << fd << " cur_mask=" << cur_mask
-                 << " add_mask=" << add_mask << dendl;
+                 << " add_mask=" << add_mask << " to " << epfd << dendl;
   struct epoll_event ee;
   /* If the fd was already monitored for some event, we need a MOD
    * operation. Otherwise we need an ADD operation. */
@@ -63,8 +63,8 @@ int EpollDriver::add_event(int fd, int cur_mask, int add_mask)
   ee.data.u64 = 0; /* avoid valgrind warning */
   ee.data.fd = fd;
   if (epoll_ctl(epfd, op, fd, &ee) == -1) {
-    lderr(cct) << __func__ << " unable to add event: "
-                       << cpp_strerror(errno) << dendl;
+    lderr(cct) << __func__ << " epoll_ctl: add fd=" << fd << " failed. "
+               << cpp_strerror(errno) << dendl;
     return -errno;
   }
 
@@ -74,7 +74,7 @@ int EpollDriver::add_event(int fd, int cur_mask, int add_mask)
 void EpollDriver::del_event(int fd, int cur_mask, int delmask)
 {
   ldout(cct, 20) << __func__ << " del event fd=" << fd << " cur_mask=" << cur_mask
-                 << " delmask=" << delmask << dendl;
+                 << " delmask=" << delmask << " to " << epfd << dendl;
   struct epoll_event ee;
   int mask = cur_mask & (~delmask);