AsyncConnection: Fix memory leak for AsyncConnection  [3211/head]
author    Haomai Wang <haomaiwang@gmail.com>
          Thu, 15 Jan 2015 07:04:48 +0000 (15:04 +0800)
committer Haomai Wang <haomaiwang@gmail.com>
          Sat, 17 Jan 2015 11:52:19 +0000 (19:52 +0800)
Each *_handler stores a reference to AsyncConnection, so these handlers need to
be reset explicitly.

Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
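
For context, the leak is a reference cycle: the connection owns EventCallbackRef
members (read_handler, write_handler, reset_handler, ...), and each of those
callbacks holds an AsyncConnectionRef back to the connection. Below is a minimal,
hypothetical sketch of the problem and the fix, using plain std::shared_ptr
instead of Ceph's intrusive refcounting; Connection and cleanup_handlers() here
are illustrative names, not the real classes.

#include <memory>
#include <utility>

struct Connection;                                  // stand-in for AsyncConnection
typedef std::shared_ptr<Connection> ConnectionRef;  // stand-in for AsyncConnectionRef

struct EventCallback {                              // stand-in for Ceph's EventCallback
  ConnectionRef conn;                               // back-reference held by every *_handler
  explicit EventCallback(ConnectionRef c) : conn(std::move(c)) {}
};
typedef std::shared_ptr<EventCallback> EventCallbackRef;

struct Connection {
  EventCallbackRef read_handler, write_handler;     // forward references to the handlers
  void cleanup_handlers() {                         // analogous to cleanup_handler() in the patch
    read_handler.reset();
    write_handler.reset();
  }
};

int main() {
  ConnectionRef conn = std::make_shared<Connection>();
  conn->read_handler  = std::make_shared<EventCallback>(conn);
  conn->write_handler = std::make_shared<EventCallback>(conn);
  // Connection -> handler -> Connection is now a cycle; dropping `conn` alone
  // would leak.  Breaking it requires the explicit resets:
  conn->cleanup_handlers();
  conn.reset();                                     // refcount reaches zero, Connection is freed
  return 0;
}

The patch does the equivalent for the real class: _stop() dispatches a
C_clean_handler event whose do_request() calls cleanup_handler(), which reset()s
all of the handler references once the queued events have drained.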
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h
src/msg/async/Event.cc
src/msg/async/Event.h
src/test/msgr/test_msgr.cc

diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc
index d9803fabaae64d9d9769ac7d90b40a5752bc8c26..9eb584c39df374ba2a95fa876cf64b14c54e40b8 100644 (file)
@@ -140,6 +140,16 @@ class C_local_deliver : public EventCallback {
   }
 };
 
+
+class C_clean_handler : public EventCallback {
+  AsyncConnectionRef conn;
+ public:
+  C_clean_handler(AsyncConnectionRef c): conn(c) {}
+  void do_request(int id) {
+    conn->cleanup_handler();
+  }
+};
+
 static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
 {
   // create a buffer to read into that matches the data alignment
@@ -192,9 +202,9 @@ AsyncConnection::~AsyncConnection()
   assert(sent.empty());
   assert(!authorizer);
   if (recv_buf)
-    delete recv_buf;
+    delete[] recv_buf;
   if (state_buffer)
-    delete state_buffer;
+    delete[] state_buffer;
 }
 
 /* return -1 means `fd` occurs error or closed, it should be closed
@@ -785,6 +795,7 @@ void AsyncConnection::process()
           } else {
             if (session_security->check_message_signature(message)) {
               ldout(async_msgr->cct, 0) << __func__ << "Signature check failed" << dendl;
+              message->put();
               goto fail;
             }
           }
@@ -1688,7 +1699,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
 
   if (existing->policy.lossy) {
     // disconnect from the Connection
-    center->dispatch_event_external(EventCallbackRef(new C_handle_reset(async_msgr, existing)));
+    existing->center->dispatch_event_external(existing->reset_handler);
     existing->_stop();
   } else {
     // queue a reset on the new connection, which we're dumping for the old
@@ -1875,16 +1886,19 @@ int AsyncConnection::send_message(Message *m)
       // we want to handle fault within internal thread
       center->dispatch_event_external(write_handler);
     }
+  } else if (state == STATE_CLOSED) {
+      ldout(async_msgr->cct, 10) << __func__ << " connection closed."
+                                 << " Drop message " << m << dendl;
+  } else if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection
+      ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
+      local_messages.push_back(m);
+      center->dispatch_event_external(local_deliver_handler);
   } else {
     out_q[m->get_priority()].push_back(m);
-    if ((state == STATE_STANDBY || state == STATE_CLOSED) && !policy.server) {
+    if (state == STATE_STANDBY && !policy.server) {
       ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state)
                                  << " policy.server is false" << dendl;
       _connect();
-    } else if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection
-      ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
-      local_messages.push_back(m);
-      center->dispatch_event_external(local_deliver_handler);
     } else if (sd > 0 && !open_write) {
       center->dispatch_event_external(write_handler);
     }
@@ -1974,7 +1988,7 @@ void AsyncConnection::fault()
     return ;
   }
 
-  if (policy.lossy && state != STATE_CONNECTING) {
+  if (policy.lossy && !(state >= STATE_CONNECTING && state < STATE_CONNECTING_READY)) {
     ldout(async_msgr->cct, 10) << __func__ << " on lossy channel, failing" << dendl;
     center->dispatch_event_external(reset_handler);
     _stop();
@@ -2044,7 +2058,6 @@ void AsyncConnection::was_session_reset()
   once_session_reset = true;
 }
 
-// *note: `async` is true only happen when replacing connection process
 void AsyncConnection::_stop()
 {
   assert(lock.is_locked());
@@ -2074,6 +2087,8 @@ void AsyncConnection::_stop()
   for (set<uint64_t>::iterator it = register_time_events.begin();
        it != register_time_events.end(); ++it)
     center->delete_time_event(*it);
+  // Make sure in-queue events will been processed
+  center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this)));
 }
 
 int AsyncConnection::_send(Message *m)
@@ -2206,8 +2221,10 @@ void AsyncConnection::send_keepalive()
 {
   ldout(async_msgr->cct, 10) << __func__ << " started." << dendl;
   Mutex::Locker l(lock);
-  keepalive = true;
-  center->dispatch_event_external(write_handler);
+  if (state != STATE_CLOSED) {
+    keepalive = true;
+    center->dispatch_event_external(write_handler);
+  }
 }
 
 void AsyncConnection::mark_down()
diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h
index cb53b6545ae89072da93e1e0926b8973c0766254..25a21b4ff2a8a23866250887bfe02fd85cb1593f 100644 (file)
@@ -285,8 +285,17 @@ class AsyncConnection : public Connection {
   void wakeup_from(uint64_t id);
   void local_deliver();
   void stop() {
-    mark_down();
     center->dispatch_event_external(reset_handler);
+    mark_down();
+  }
+  void cleanup_handler() {
+    read_handler.reset();
+    write_handler.reset();
+    reset_handler.reset();
+    remote_reset_handler.reset();
+    connect_handler.reset();
+    accept_handler.reset();
+    local_deliver_handler.reset();
   }
 }; /* AsyncConnection */
 
diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc
index 48241aacd4a512b0ea122a3c5c19a8f5624c46d0..85d480d0eb6bd924dd2117a4ba795c4d20501142 100644 (file)
@@ -430,6 +430,7 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
 AsyncMessenger::~AsyncMessenger()
 {
   assert(!did_bind); // either we didn't bind or we shut down the Processor
+  local_connection->mark_down();
 }
 
 void AsyncMessenger::ready()
@@ -700,7 +701,6 @@ void AsyncMessenger::mark_down_all()
       set<AsyncConnectionRef>::iterator it = deleted_conns.begin();
       AsyncConnectionRef p = *it;
       ldout(cct, 5) << __func__ << " delete " << p << dendl;
-      p->put();
       deleted_conns.erase(it);
     }
   }
diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h
index 44b4da1d5188dab76d4ce902e46386ca5fc91bf2..e0061aa0914201befac4d694562da8df8e9e8ef5 100644 (file)
@@ -351,7 +351,6 @@ private:
     Mutex::Locker l(deleted_lock);
     if (deleted_conns.count(p->second)) {
       deleted_conns.erase(p->second);
-      p->second->put();
       conns.erase(p);
       return NULL;
     }
@@ -393,7 +392,6 @@ public:
       Mutex::Locker l(deleted_lock);
       if (deleted_conns.count(existing)) {
         deleted_conns.erase(existing);
-        existing->put();
       } else if (conn != existing) {
         return -1;
       }
diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc
index 99a90c50237dd14904447a50740ad42a64358e71..99850cfe317b64fb3578ecbd542fb4941ec1ab49 100644 (file)
@@ -156,6 +156,11 @@ int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt)
 void EventCenter::delete_file_event(int fd, int mask)
 {
   Mutex::Locker l(file_lock);
+  if (fd > nevent) {
+    ldout(cct, 1) << __func__ << " delete event fd=" << fd << " exceed nevent=" << nevent
+                  << "mask=" << mask << dendl;
+    return ;
+  }
   EventCenter::FileEvent *event = _get_file_event(fd);
   ldout(cct, 20) << __func__ << " delete event started fd=" << fd << " mask=" << mask
                  << " original mask is " << event->mask << dendl;
@@ -364,7 +369,8 @@ int EventCenter::process_events(int timeout_microseconds)
       EventCallbackRef e = external_events.front();
       external_events.pop_front();
       external_lock.Unlock();
-      e->do_request(0);
+      if (e)
+        e->do_request(0);
       external_lock.Lock();
     }
     external_lock.Unlock();
diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h
index 0cc4efd499926218903e7666c7ffd6935f3147e2..729500c00d1840a62af6f101be2d015421eb3f56 100644 (file)
@@ -116,6 +116,7 @@ class EventCenter {
 
   int process_time_events();
   FileEvent *_get_file_event(int fd) {
+    assert(fd < nevent);
     FileEvent *p = &file_events[fd];
     if (!p->mask)
       new(p) FileEvent();
diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc
index 25cfa6edd113b0b14f620befcc9a0bb7f73fe05f..85ad067978a90c7874b3bfb3f8b95f95aceaa2f4 100644 (file)
@@ -40,11 +40,11 @@ typedef boost::mt11213b gen_type;
 #if GTEST_HAS_PARAM_TEST
 
 #define CHECK_AND_WAIT_TRUE(expr) do {  \
-  int n = 50;                           \
+  int n = 1000;                         \
   while (--n) {                         \
     if (expr)                           \
       break;                            \
-    usleep(100);                        \
+    usleep(1000);                       \
   }                                     \
 } while(0);
 
@@ -140,6 +140,7 @@ class FakeDispatcher : public Dispatcher {
     }
     got_new = true;
     cond.Signal();
+    m->put();
     return true;
   }
   bool ms_handle_reset(Connection *con) {
@@ -180,6 +181,7 @@ class FakeDispatcher : public Dispatcher {
     }
     got_new = true;
     cond.Signal();
+    m->put();
   }
 
   bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
@@ -486,6 +488,7 @@ TEST_P(MessengerTest, StatelessTest) {
   // 2. test for client lossy
   server_conn->mark_down();
   ASSERT_FALSE(server_conn->is_connected());
+  conn->send_keepalive();
   CHECK_AND_WAIT_TRUE(!conn->is_connected());
   ASSERT_FALSE(conn->is_connected());
   conn = client_msgr->get_connection(server_msgr->get_myinst());
@@ -534,25 +537,32 @@ TEST_P(MessengerTest, ClientStandbyTest) {
   ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
   ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
   ASSERT_FALSE(cli_dispatcher.got_remote_reset);
+  cli_dispatcher.got_connect = false;
   server_conn->mark_down();
   ASSERT_FALSE(server_conn->is_connected());
   // client should be standby
   usleep(300*1000);
   // client should be standby, so we use original connection
   {
-    m = new MPing();
     conn->send_keepalive();
+    {
+      Mutex::Locker l(cli_dispatcher.lock);
+      while (!cli_dispatcher.got_remote_reset)
+        cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+      cli_dispatcher.got_remote_reset = false;
+      while (!cli_dispatcher.got_connect)
+        cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+      cli_dispatcher.got_connect = false;
+    }
     CHECK_AND_WAIT_TRUE(conn->is_connected());
     ASSERT_TRUE(conn->is_connected());
+    m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
     Mutex::Locker l(cli_dispatcher.lock);
     while (!cli_dispatcher.got_new)
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  // resetcheck for client, so it discard state previously
-  ASSERT_TRUE(cli_dispatcher.got_remote_reset);
-  cli_dispatcher.got_remote_reset = false;
   ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
   server_conn = server_msgr->get_connection(client_msgr->get_myinst());
   ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1);
@@ -624,6 +634,7 @@ class SyntheticDispatcher : public Dispatcher {
     }
     got_new = true;
     cond.Signal();
+    m->put();
   }
 
   bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
@@ -788,7 +799,8 @@ class SyntheticWorkload {
       boost::uniform_int<> u(32, max_message_len);
       uint64_t value_len = u(rng);
       bufferptr bp(value_len);
-      for (uint64_t j = 0; j < value_len; ) {
+      bp.zero();
+      for (uint64_t j = 0; j < value_len-sizeof(i); ) {
         memcpy(bp.c_str()+j, &i, sizeof(i));
         j += 4096;
       }
@@ -882,12 +894,17 @@ class SyntheticWorkload {
          it != available_servers.end(); ++it) {
       (*it)->shutdown();
       (*it)->wait();
+      delete (*it);
     }
+    available_servers.clear();
+
     for (set<Messenger*>::iterator it = available_clients.begin();
          it != available_clients.end(); ++it) {
       (*it)->shutdown();
       (*it)->wait();
+      delete (*it);
     }
+    available_clients.clear();
   }
 };
 
@@ -953,7 +970,7 @@ TEST_P(MessengerTest, SyntheticInjectTest) {
 
 class MarkdownDispatcher : public Dispatcher {
   Mutex lock;
-  set<Connection*> conns;
+  set<ConnectionRef> conns;
   bool last_mark;
  public:
   atomic_t count;
@@ -982,13 +999,13 @@ class MarkdownDispatcher : public Dispatcher {
     cerr << __func__ << " conn: " << m->get_connection() << std::endl;
     Mutex::Locker l(lock);
     count.inc();
-    conns.insert(m->get_connection().get());
+    conns.insert(m->get_connection());
     if (conns.size() < 2 && !last_mark)
       return true;
 
     last_mark = true;
     usleep(rand() % 500);
-    for (set<Connection*>::iterator it = conns.begin(); it != conns.end(); ++it) {
+    for (set<ConnectionRef>::iterator it = conns.begin(); it != conns.end(); ++it) {
       if ((*it) != m->get_connection().get()) {
         (*it)->mark_down();
         conns.erase(it);
@@ -997,6 +1014,7 @@ class MarkdownDispatcher : public Dispatcher {
     }
     if (conns.empty())
       last_mark = false;
+    m->put();
     return true;
   }
   bool ms_handle_reset(Connection *con) {