]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
async: adjust test_msgr and normalize log output format
authorHaomai Wang <haomaiwang@gmail.com>
Wed, 14 Jan 2015 07:01:37 +0000 (15:01 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Thu, 15 Jan 2015 19:07:13 +0000 (03:07 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/async/AsyncConnection.cc
src/msg/async/Event.cc
src/msg/async/Event.h
src/msg/async/net_handler.cc
src/test/msgr/test_msgr.cc

index 157ec77f7034c6d4a84e82f2975ad298b9ed28d0..556704ec8e4db9a6b4728e595b2938a7e99b105e 100644 (file)
@@ -1578,8 +1578,8 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
     }
 
     ldout(async_msgr->cct, 0) << __func__ << " accept connect_seq " << connect.connect_seq
-                              << " vs existing " << existing->connect_seq
-                              << " state " << existing->state << dendl;
+                              << " vs existing csq=" << existing->connect_seq << " state="
+                              << get_state_name(existing->state) << dendl;
 
     if (connect.connect_seq == 0 && existing->connect_seq > 0) {
       ldout(async_msgr->cct,0) << __func__ << " accept peer reset, then tried to connect to us, replacing" << dendl;
@@ -1677,7 +1677,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   if (existing->replacing || existing->state == STATE_CLOSED) {
     ldout(async_msgr->cct, 1) << __func__ << " existing racing replace or mark_down happened while replacing."
                               << " state=" << get_state_name(existing->state) << dendl;
-    reply.connect_seq = existing->connect_seq + 1;
+    reply.connect_seq = connect.connect_seq + 1;
     r = _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply);
     existing->lock.Unlock();
     if (r < 0)
@@ -1734,7 +1734,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   connect_seq = connect.connect_seq + 1;
   peer_global_seq = connect.global_seq;
   ldout(async_msgr->cct, 10) << __func__ << " accept success, connect_seq = "
-                             << connect_seq << ", sending READY" << dendl;
+                             << connect_seq << " in_seq=" << in_seq << ", sending READY" << dendl;
 
   int next_state;
 
index e54d8cce21b7b485cd2512bdd91677adc61d43e5..2cb9c1daf0fff2ed00b7d0357b85cebe06ee0b34 100644 (file)
 #define dout_subsys ceph_subsys_ms
 
 #undef dout_prefix
-#define dout_prefix *_dout << "Event "
+#define dout_prefix _event_prefix(_dout)
+ostream& EventCenter::_event_prefix(std::ostream *_dout)
+{
+  return *_dout << "Event(" << this << " owner=" << get_owner() << " nevent=" << nevent
+                << " time_id=" << time_event_next_id << ").";
+}
 
 class C_handle_notify : public EventCallback {
  public:
@@ -127,6 +132,10 @@ int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt)
   }
 
   EventCenter::FileEvent *event = _get_file_event(fd);
+  ldout(cct, 20) << __func__ << " create event started fd=" << fd << " mask=" << mask
+                 << " original mask is " << event->mask << dendl;
+  if (event->mask == mask)
+    return 0;
 
   r = driver->add_event(fd, event->mask, mask);
   if (r < 0)
@@ -139,8 +148,8 @@ int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt)
   if (mask & EVENT_WRITABLE) {
     event->write_cb = ctxt;
   }
-  ldout(cct, 10) << __func__ << " create event fd=" << fd << " mask=" << mask
-                 << " now mask is " << event->mask << dendl;
+  ldout(cct, 10) << __func__ << " create event end fd=" << fd << " mask=" << mask
+                 << " original mask is " << event->mask << dendl;
   return 0;
 }
 
@@ -148,6 +157,8 @@ void EventCenter::delete_file_event(int fd, int mask)
 {
   Mutex::Locker l(file_lock);
   EventCenter::FileEvent *event = _get_file_event(fd);
+  ldout(cct, 20) << __func__ << " delete event started fd=" << fd << " mask=" << mask
+                 << " original mask is " << event->mask << dendl;
   if (!event->mask)
     return ;
 
@@ -161,8 +172,8 @@ void EventCenter::delete_file_event(int fd, int mask)
   }
 
   event->mask = event->mask & (~mask);
-  ldout(cct, 10) << __func__ << " delete fd=" << fd << " mask=" << mask
-                 << " now mask is " << event->mask << dendl;
+  ldout(cct, 10) << __func__ << " delete event end fd=" << fd << " mask=" << mask
+                 << " original mask is " << event->mask << dendl;
 }
 
 uint64_t EventCenter::create_time_event(uint64_t microseconds, EventCallbackRef ctxt)
index 85980478cee9a2251634e51609c705738400f5b2..0cc4efd499926218903e7666c7ffd6935f3147e2 100644 (file)
@@ -134,6 +134,8 @@ class EventCenter {
     last_time = time(NULL);
   }
   ~EventCenter();
+  ostream& _event_prefix(std::ostream *_dout);
+
   int init(int nevent);
   void set_owner(pthread_t p) { owner = p; }
   pthread_t get_owner() { return owner; }
index afe3909b200941b2fcb0d1ae632630e38adf876d..6f0e88cadd1b1e6360af49656909b0b4ad8f6577 100644 (file)
@@ -25,7 +25,7 @@
 
 #define dout_subsys ceph_subsys_ms
 #undef dout_prefix
-#define dout_prefix *_dout << "net_handler: "
+#define dout_prefix *_dout << "NetHandler "
 
 namespace ceph{
 
@@ -42,7 +42,7 @@ int NetHandler::create_socket(int domain, bool reuse_addr)
    * will be able to close/open sockets a zillion of times */
   if (reuse_addr) {
     if (::setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
-      lderr(cct) << __func__ << " setsockopt SO_REUSEADDR failed: %s"
+      lderr(cct) << __func__ << " setsockopt SO_REUSEADDR failed: "
                  << strerror(errno) << dendl;
       close(s);
       return -errno;
@@ -60,11 +60,11 @@ int NetHandler::set_nonblock(int sd)
    * Note that fcntl(2) for F_GETFL and F_SETFL can't be
    * interrupted by a signal. */
   if ((flags = fcntl(sd, F_GETFL)) < 0 ) {
-    lderr(cct) << __func__ << " fcntl(F_GETFL) failed: %s" << strerror(errno) << dendl;
+    lderr(cct) << __func__ << " fcntl(F_GETFL) failed: " << strerror(errno) << dendl;
     return -errno;
   }
   if (fcntl(sd, F_SETFL, flags | O_NONBLOCK) < 0) {
-    lderr(cct) << __func__ << " fcntl(F_SETFL,O_NONBLOCK): %s" << strerror(errno) << dendl;
+    lderr(cct) << __func__ << " fcntl(F_SETFL,O_NONBLOCK): " << strerror(errno) << dendl;
     return -errno;
   }
 
@@ -121,7 +121,7 @@ int NetHandler::generic_connect(const entity_addr_t& addr, bool nonblock)
     if (errno == EINPROGRESS && nonblock)
       return s;
 
-    lderr(cct) << __func__ << " connect: %s " << strerror(errno) << dendl;
+    lderr(cct) << __func__ << " connect: " << strerror(errno) << dendl;
     close(s);
     return -errno;
   }
index d17226367f61f7cf300c5a5b6c91eab1ed695b2f..25cfa6edd113b0b14f620befcc9a0bb7f73fe05f 100644 (file)
@@ -748,7 +748,7 @@ class SyntheticWorkload {
   vector<bufferlist> rand_data;
 
  public:
-  static const unsigned max_in_flight = 512;
+  static const unsigned max_in_flight = 64;
   static const unsigned max_connections = 128;
   static const unsigned max_message_len = 1024 * 1024 * 4;
 
@@ -905,14 +905,14 @@ TEST_P(MessengerTest, SyntheticStressTest) {
     }
     boost::uniform_int<> true_false(0, 99);
     int val = true_false(rng);
-    if (val > 85) {
+    if (val > 90) {
       test_msg.generate_connection();
-    } else if (val > 70) {
+    } else if (val > 80) {
       test_msg.drop_connection();
     } else if (val > 10) {
       test_msg.send_message();
     } else {
-      usleep(rand() % 500 + 100);
+      usleep(rand() % 1000 + 500);
     }
   }
   test_msg.wait_for_done();
@@ -920,7 +920,7 @@ TEST_P(MessengerTest, SyntheticStressTest) {
 
 
 TEST_P(MessengerTest, SyntheticInjectTest) {
-  g_ceph_context->_conf->set_val("ms_inject_socket_failures", "10");
+  g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
   g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
   SyntheticWorkload test_msg(4, 16, GetParam(), 100);
   for (int i = 0; i < 100; ++i) {
@@ -935,9 +935,9 @@ TEST_P(MessengerTest, SyntheticInjectTest) {
     }
     boost::uniform_int<> true_false(0, 99);
     int val = true_false(rng);
-    if (val > 85) {
+    if (val > 90) {
       test_msg.generate_connection();
-    } else if (val > 70) {
+    } else if (val > 80) {
       test_msg.drop_connection();
     } else if (val > 10) {
       test_msg.send_message();
@@ -1106,6 +1106,7 @@ int main(int argc, char **argv) {
   g_ceph_context->_conf->set_val("auth_client_required", "none");
   g_ceph_context->_conf->set_val("enable_experimental_unrecoverable_data_corrupting_features", "ms-type-async");
   g_ceph_context->_conf->set_val("ms_die_on_bad_msg", "true");
+  g_ceph_context->_conf->set_val("ms_max_backoff", "1");
   common_init_finish(g_ceph_context);
 
   ::testing::InitGoogleTest(&argc, argv);