From: Haomai Wang Date: Thu, 15 Jan 2015 07:04:48 +0000 (+0800) Subject: AsyncConnection: Fix memory leak for AsyncConnection X-Git-Tag: v0.93~247^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=4d0e0ae3b5a15bc3532287f2220c9ddd3697c3cd;p=ceph.git AsyncConnection: Fix memory leak for AsyncConnection *_handler will store a reference to AsyncConnection, it need to explicit reset it. Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index d9803fabaae6..9eb584c39df3 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -140,6 +140,16 @@ class C_local_deliver : public EventCallback { } }; + +class C_clean_handler : public EventCallback { + AsyncConnectionRef conn; + public: + C_clean_handler(AsyncConnectionRef c): conn(c) {} + void do_request(int id) { + conn->cleanup_handler(); + } +}; + static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off) { // create a buffer to read into that matches the data alignment @@ -192,9 +202,9 @@ AsyncConnection::~AsyncConnection() assert(sent.empty()); assert(!authorizer); if (recv_buf) - delete recv_buf; + delete[] recv_buf; if (state_buffer) - delete state_buffer; + delete[] state_buffer; } /* return -1 means `fd` occurs error or closed, it should be closed @@ -785,6 +795,7 @@ void AsyncConnection::process() } else { if (session_security->check_message_signature(message)) { ldout(async_msgr->cct, 0) << __func__ << "Signature check failed" << dendl; + message->put(); goto fail; } } @@ -1688,7 +1699,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a if (existing->policy.lossy) { // disconnect from the Connection - center->dispatch_event_external(EventCallbackRef(new C_handle_reset(async_msgr, existing))); + existing->center->dispatch_event_external(existing->reset_handler); existing->_stop(); } else { // queue a reset on the new connection, which we're dumping for the old @@ -1875,16 +1886,19 @@ int AsyncConnection::send_message(Message *m) // we want to handle fault within internal thread center->dispatch_event_external(write_handler); } + } else if (state == STATE_CLOSED) { + ldout(async_msgr->cct, 10) << __func__ << " connection closed." + << " Drop message " << m << dendl; + } else if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection + ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl; + local_messages.push_back(m); + center->dispatch_event_external(local_deliver_handler); } else { out_q[m->get_priority()].push_back(m); - if ((state == STATE_STANDBY || state == STATE_CLOSED) && !policy.server) { + if (state == STATE_STANDBY && !policy.server) { ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state) << " policy.server is false" << dendl; _connect(); - } else if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection - ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl; - local_messages.push_back(m); - center->dispatch_event_external(local_deliver_handler); } else if (sd > 0 && !open_write) { center->dispatch_event_external(write_handler); } @@ -1974,7 +1988,7 @@ void AsyncConnection::fault() return ; } - if (policy.lossy && state != STATE_CONNECTING) { + if (policy.lossy && !(state >= STATE_CONNECTING && state < STATE_CONNECTING_READY)) { ldout(async_msgr->cct, 10) << __func__ << " on lossy channel, failing" << dendl; center->dispatch_event_external(reset_handler); _stop(); @@ -2044,7 +2058,6 @@ void AsyncConnection::was_session_reset() once_session_reset = true; } -// *note: `async` is true only happen when replacing connection process void AsyncConnection::_stop() { assert(lock.is_locked()); @@ -2074,6 +2087,8 @@ void AsyncConnection::_stop() for (set::iterator it = register_time_events.begin(); it != register_time_events.end(); ++it) center->delete_time_event(*it); + // Make sure in-queue events will been processed + center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this))); } int AsyncConnection::_send(Message *m) @@ -2206,8 +2221,10 @@ void AsyncConnection::send_keepalive() { ldout(async_msgr->cct, 10) << __func__ << " started." << dendl; Mutex::Locker l(lock); - keepalive = true; - center->dispatch_event_external(write_handler); + if (state != STATE_CLOSED) { + keepalive = true; + center->dispatch_event_external(write_handler); + } } void AsyncConnection::mark_down() diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index cb53b6545ae8..25a21b4ff2a8 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -285,8 +285,17 @@ class AsyncConnection : public Connection { void wakeup_from(uint64_t id); void local_deliver(); void stop() { - mark_down(); center->dispatch_event_external(reset_handler); + mark_down(); + } + void cleanup_handler() { + read_handler.reset(); + write_handler.reset(); + reset_handler.reset(); + remote_reset_handler.reset(); + connect_handler.reset(); + accept_handler.reset(); + local_deliver_handler.reset(); } }; /* AsyncConnection */ diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 48241aacd4a5..85d480d0eb6b 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -430,6 +430,7 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, AsyncMessenger::~AsyncMessenger() { assert(!did_bind); // either we didn't bind or we shut down the Processor + local_connection->mark_down(); } void AsyncMessenger::ready() @@ -700,7 +701,6 @@ void AsyncMessenger::mark_down_all() set::iterator it = deleted_conns.begin(); AsyncConnectionRef p = *it; ldout(cct, 5) << __func__ << " delete " << p << dendl; - p->put(); deleted_conns.erase(it); } } diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index 44b4da1d5188..e0061aa09142 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -351,7 +351,6 @@ private: Mutex::Locker l(deleted_lock); if (deleted_conns.count(p->second)) { deleted_conns.erase(p->second); - p->second->put(); conns.erase(p); return NULL; } @@ -393,7 +392,6 @@ public: Mutex::Locker l(deleted_lock); if (deleted_conns.count(existing)) { deleted_conns.erase(existing); - existing->put(); } else if (conn != existing) { return -1; } diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index 99a90c50237d..99850cfe317b 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -156,6 +156,11 @@ int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) void EventCenter::delete_file_event(int fd, int mask) { Mutex::Locker l(file_lock); + if (fd > nevent) { + ldout(cct, 1) << __func__ << " delete event fd=" << fd << " exceed nevent=" << nevent + << "mask=" << mask << dendl; + return ; + } EventCenter::FileEvent *event = _get_file_event(fd); ldout(cct, 20) << __func__ << " delete event started fd=" << fd << " mask=" << mask << " original mask is " << event->mask << dendl; @@ -364,7 +369,8 @@ int EventCenter::process_events(int timeout_microseconds) EventCallbackRef e = external_events.front(); external_events.pop_front(); external_lock.Unlock(); - e->do_request(0); + if (e) + e->do_request(0); external_lock.Lock(); } external_lock.Unlock(); diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index 0cc4efd49992..729500c00d18 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -116,6 +116,7 @@ class EventCenter { int process_time_events(); FileEvent *_get_file_event(int fd) { + assert(fd < nevent); FileEvent *p = &file_events[fd]; if (!p->mask) new(p) FileEvent(); diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc index 25cfa6edd113..85ad067978a9 100644 --- a/src/test/msgr/test_msgr.cc +++ b/src/test/msgr/test_msgr.cc @@ -40,11 +40,11 @@ typedef boost::mt11213b gen_type; #if GTEST_HAS_PARAM_TEST #define CHECK_AND_WAIT_TRUE(expr) do { \ - int n = 50; \ + int n = 1000; \ while (--n) { \ if (expr) \ break; \ - usleep(100); \ + usleep(1000); \ } \ } while(0); @@ -140,6 +140,7 @@ class FakeDispatcher : public Dispatcher { } got_new = true; cond.Signal(); + m->put(); return true; } bool ms_handle_reset(Connection *con) { @@ -180,6 +181,7 @@ class FakeDispatcher : public Dispatcher { } got_new = true; cond.Signal(); + m->put(); } bool ms_verify_authorizer(Connection *con, int peer_type, int protocol, @@ -486,6 +488,7 @@ TEST_P(MessengerTest, StatelessTest) { // 2. test for client lossy server_conn->mark_down(); ASSERT_FALSE(server_conn->is_connected()); + conn->send_keepalive(); CHECK_AND_WAIT_TRUE(!conn->is_connected()); ASSERT_FALSE(conn->is_connected()); conn = client_msgr->get_connection(server_msgr->get_myinst()); @@ -534,25 +537,32 @@ TEST_P(MessengerTest, ClientStandbyTest) { ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst()); ASSERT_FALSE(cli_dispatcher.got_remote_reset); + cli_dispatcher.got_connect = false; server_conn->mark_down(); ASSERT_FALSE(server_conn->is_connected()); // client should be standby usleep(300*1000); // client should be standby, so we use original connection { - m = new MPing(); conn->send_keepalive(); + { + Mutex::Locker l(cli_dispatcher.lock); + while (!cli_dispatcher.got_remote_reset) + cli_dispatcher.cond.Wait(cli_dispatcher.lock); + cli_dispatcher.got_remote_reset = false; + while (!cli_dispatcher.got_connect) + cli_dispatcher.cond.Wait(cli_dispatcher.lock); + cli_dispatcher.got_connect = false; + } CHECK_AND_WAIT_TRUE(conn->is_connected()); ASSERT_TRUE(conn->is_connected()); + m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); Mutex::Locker l(cli_dispatcher.lock); while (!cli_dispatcher.got_new) cli_dispatcher.cond.Wait(cli_dispatcher.lock); cli_dispatcher.got_new = false; } - // resetcheck for client, so it discard state previously - ASSERT_TRUE(cli_dispatcher.got_remote_reset); - cli_dispatcher.got_remote_reset = false; ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); server_conn = server_msgr->get_connection(client_msgr->get_myinst()); ASSERT_TRUE(static_cast(server_conn->get_priv())->get_count() == 1); @@ -624,6 +634,7 @@ class SyntheticDispatcher : public Dispatcher { } got_new = true; cond.Signal(); + m->put(); } bool ms_verify_authorizer(Connection *con, int peer_type, int protocol, @@ -788,7 +799,8 @@ class SyntheticWorkload { boost::uniform_int<> u(32, max_message_len); uint64_t value_len = u(rng); bufferptr bp(value_len); - for (uint64_t j = 0; j < value_len; ) { + bp.zero(); + for (uint64_t j = 0; j < value_len-sizeof(i); ) { memcpy(bp.c_str()+j, &i, sizeof(i)); j += 4096; } @@ -882,12 +894,17 @@ class SyntheticWorkload { it != available_servers.end(); ++it) { (*it)->shutdown(); (*it)->wait(); + delete (*it); } + available_servers.clear(); + for (set::iterator it = available_clients.begin(); it != available_clients.end(); ++it) { (*it)->shutdown(); (*it)->wait(); + delete (*it); } + available_clients.clear(); } }; @@ -953,7 +970,7 @@ TEST_P(MessengerTest, SyntheticInjectTest) { class MarkdownDispatcher : public Dispatcher { Mutex lock; - set conns; + set conns; bool last_mark; public: atomic_t count; @@ -982,13 +999,13 @@ class MarkdownDispatcher : public Dispatcher { cerr << __func__ << " conn: " << m->get_connection() << std::endl; Mutex::Locker l(lock); count.inc(); - conns.insert(m->get_connection().get()); + conns.insert(m->get_connection()); if (conns.size() < 2 && !last_mark) return true; last_mark = true; usleep(rand() % 500); - for (set::iterator it = conns.begin(); it != conns.end(); ++it) { + for (set::iterator it = conns.begin(); it != conns.end(); ++it) { if ((*it) != m->get_connection().get()) { (*it)->mark_down(); conns.erase(it); @@ -997,6 +1014,7 @@ class MarkdownDispatcher : public Dispatcher { } if (conns.empty()) last_mark = false; + m->put(); return true; } bool ms_handle_reset(Connection *con) {