From 898d43dbee821d084ab7aa3700774f0507a0520b Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Wed, 14 Jan 2015 15:01:37 +0800 Subject: [PATCH] async: adjust test_msgr and normalize log output format Signed-off-by: Haomai Wang --- src/msg/async/AsyncConnection.cc | 8 ++++---- src/msg/async/Event.cc | 21 ++++++++++++++++----- src/msg/async/Event.h | 2 ++ src/msg/async/net_handler.cc | 10 +++++----- src/test/msgr/test_msgr.cc | 15 ++++++++------- 5 files changed, 35 insertions(+), 21 deletions(-) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 157ec77f7034c..556704ec8e4db 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -1578,8 +1578,8 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a } ldout(async_msgr->cct, 0) << __func__ << " accept connect_seq " << connect.connect_seq - << " vs existing " << existing->connect_seq - << " state " << existing->state << dendl; + << " vs existing csq=" << existing->connect_seq << " state=" + << get_state_name(existing->state) << dendl; if (connect.connect_seq == 0 && existing->connect_seq > 0) { ldout(async_msgr->cct,0) << __func__ << " accept peer reset, then tried to connect to us, replacing" << dendl; @@ -1677,7 +1677,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a if (existing->replacing || existing->state == STATE_CLOSED) { ldout(async_msgr->cct, 1) << __func__ << " existing racing replace or mark_down happened while replacing." << " state=" << get_state_name(existing->state) << dendl; - reply.connect_seq = existing->connect_seq + 1; + reply.connect_seq = connect.connect_seq + 1; r = _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply); existing->lock.Unlock(); if (r < 0) @@ -1734,7 +1734,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a connect_seq = connect.connect_seq + 1; peer_global_seq = connect.global_seq; ldout(async_msgr->cct, 10) << __func__ << " accept success, connect_seq = " - << connect_seq << ", sending READY" << dendl; + << connect_seq << " in_seq=" << in_seq << ", sending READY" << dendl; int next_state; diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index e54d8cce21b7b..2cb9c1daf0fff 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -32,7 +32,12 @@ #define dout_subsys ceph_subsys_ms #undef dout_prefix -#define dout_prefix *_dout << "Event " +#define dout_prefix _event_prefix(_dout) +ostream& EventCenter::_event_prefix(std::ostream *_dout) +{ + return *_dout << "Event(" << this << " owner=" << get_owner() << " nevent=" << nevent + << " time_id=" << time_event_next_id << ")."; +} class C_handle_notify : public EventCallback { public: @@ -127,6 +132,10 @@ int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) } EventCenter::FileEvent *event = _get_file_event(fd); + ldout(cct, 20) << __func__ << " create event started fd=" << fd << " mask=" << mask + << " original mask is " << event->mask << dendl; + if (event->mask == mask) + return 0; r = driver->add_event(fd, event->mask, mask); if (r < 0) @@ -139,8 +148,8 @@ int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) if (mask & EVENT_WRITABLE) { event->write_cb = ctxt; } - ldout(cct, 10) << __func__ << " create event fd=" << fd << " mask=" << mask - << " now mask is " << event->mask << dendl; + ldout(cct, 10) << __func__ << " create event end fd=" << fd << " mask=" << mask + << " original mask is " << event->mask << dendl; return 0; } @@ -148,6 +157,8 @@ void EventCenter::delete_file_event(int fd, int mask) { Mutex::Locker l(file_lock); EventCenter::FileEvent *event = _get_file_event(fd); + ldout(cct, 20) << __func__ << " delete event started fd=" << fd << " mask=" << mask + << " original mask is " << event->mask << dendl; if (!event->mask) return ; @@ -161,8 +172,8 @@ void EventCenter::delete_file_event(int fd, int mask) } event->mask = event->mask & (~mask); - ldout(cct, 10) << __func__ << " delete fd=" << fd << " mask=" << mask - << " now mask is " << event->mask << dendl; + ldout(cct, 10) << __func__ << " delete event end fd=" << fd << " mask=" << mask + << " original mask is " << event->mask << dendl; } uint64_t EventCenter::create_time_event(uint64_t microseconds, EventCallbackRef ctxt) diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index 85980478cee9a..0cc4efd499926 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -134,6 +134,8 @@ class EventCenter { last_time = time(NULL); } ~EventCenter(); + ostream& _event_prefix(std::ostream *_dout); + int init(int nevent); void set_owner(pthread_t p) { owner = p; } pthread_t get_owner() { return owner; } diff --git a/src/msg/async/net_handler.cc b/src/msg/async/net_handler.cc index afe3909b20094..6f0e88cadd1b1 100644 --- a/src/msg/async/net_handler.cc +++ b/src/msg/async/net_handler.cc @@ -25,7 +25,7 @@ #define dout_subsys ceph_subsys_ms #undef dout_prefix -#define dout_prefix *_dout << "net_handler: " +#define dout_prefix *_dout << "NetHandler " namespace ceph{ @@ -42,7 +42,7 @@ int NetHandler::create_socket(int domain, bool reuse_addr) * will be able to close/open sockets a zillion of times */ if (reuse_addr) { if (::setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) { - lderr(cct) << __func__ << " setsockopt SO_REUSEADDR failed: %s" + lderr(cct) << __func__ << " setsockopt SO_REUSEADDR failed: " << strerror(errno) << dendl; close(s); return -errno; @@ -60,11 +60,11 @@ int NetHandler::set_nonblock(int sd) * Note that fcntl(2) for F_GETFL and F_SETFL can't be * interrupted by a signal. */ if ((flags = fcntl(sd, F_GETFL)) < 0 ) { - lderr(cct) << __func__ << " fcntl(F_GETFL) failed: %s" << strerror(errno) << dendl; + lderr(cct) << __func__ << " fcntl(F_GETFL) failed: " << strerror(errno) << dendl; return -errno; } if (fcntl(sd, F_SETFL, flags | O_NONBLOCK) < 0) { - lderr(cct) << __func__ << " fcntl(F_SETFL,O_NONBLOCK): %s" << strerror(errno) << dendl; + lderr(cct) << __func__ << " fcntl(F_SETFL,O_NONBLOCK): " << strerror(errno) << dendl; return -errno; } @@ -121,7 +121,7 @@ int NetHandler::generic_connect(const entity_addr_t& addr, bool nonblock) if (errno == EINPROGRESS && nonblock) return s; - lderr(cct) << __func__ << " connect: %s " << strerror(errno) << dendl; + lderr(cct) << __func__ << " connect: " << strerror(errno) << dendl; close(s); return -errno; } diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc index d17226367f61f..25cfa6edd113b 100644 --- a/src/test/msgr/test_msgr.cc +++ b/src/test/msgr/test_msgr.cc @@ -748,7 +748,7 @@ class SyntheticWorkload { vector rand_data; public: - static const unsigned max_in_flight = 512; + static const unsigned max_in_flight = 64; static const unsigned max_connections = 128; static const unsigned max_message_len = 1024 * 1024 * 4; @@ -905,14 +905,14 @@ TEST_P(MessengerTest, SyntheticStressTest) { } boost::uniform_int<> true_false(0, 99); int val = true_false(rng); - if (val > 85) { + if (val > 90) { test_msg.generate_connection(); - } else if (val > 70) { + } else if (val > 80) { test_msg.drop_connection(); } else if (val > 10) { test_msg.send_message(); } else { - usleep(rand() % 500 + 100); + usleep(rand() % 1000 + 500); } } test_msg.wait_for_done(); @@ -920,7 +920,7 @@ TEST_P(MessengerTest, SyntheticStressTest) { TEST_P(MessengerTest, SyntheticInjectTest) { - g_ceph_context->_conf->set_val("ms_inject_socket_failures", "10"); + g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30"); g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1"); SyntheticWorkload test_msg(4, 16, GetParam(), 100); for (int i = 0; i < 100; ++i) { @@ -935,9 +935,9 @@ TEST_P(MessengerTest, SyntheticInjectTest) { } boost::uniform_int<> true_false(0, 99); int val = true_false(rng); - if (val > 85) { + if (val > 90) { test_msg.generate_connection(); - } else if (val > 70) { + } else if (val > 80) { test_msg.drop_connection(); } else if (val > 10) { test_msg.send_message(); @@ -1106,6 +1106,7 @@ int main(int argc, char **argv) { g_ceph_context->_conf->set_val("auth_client_required", "none"); g_ceph_context->_conf->set_val("enable_experimental_unrecoverable_data_corrupting_features", "ms-type-async"); g_ceph_context->_conf->set_val("ms_die_on_bad_msg", "true"); + g_ceph_context->_conf->set_val("ms_max_backoff", "1"); common_init_finish(g_ceph_context); ::testing::InitGoogleTest(&argc, argv); -- 2.39.5