]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test_msgr: Add SyntheticWorkload to do message measurement
authorHaomai Wang <haomaiwang@gmail.com>
Mon, 12 Jan 2015 04:14:39 +0000 (12:14 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Thu, 15 Jan 2015 19:07:12 +0000 (03:07 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/test/msgr/test_msgr.cc

index 639db808689aa61a2f9d5ded12ae710893ac2ad7..5df34f3790e221d0d9659713b742dd7334465307 100644 (file)
 #include "messages/MPing.h"
 #include "messages/MCommand.h"
 
+#include <boost/random/mersenne_twister.hpp>
+#include <boost/random/uniform_int.hpp>
+#include <boost/random/binomial_distribution.hpp>
 #include <gtest/gtest.h>
 
+typedef boost::mt11213b gen_type;
+
 #if GTEST_HAS_PARAM_TEST
 
 #define CHECK_AND_WAIT_TRUE(expr) do {  \
@@ -73,7 +78,7 @@ class FakeDispatcher : public Dispatcher {
 
     Session(ConnectionRef c): RefCountedObject(g_ceph_context), lock("FakeDispatcher::Session::lock"), count(0), con(c) {
     }
-    uint64_t get_count() {return count;}
+    uint64_t get_count() { return count; }
   };
 
   Mutex lock;
@@ -97,6 +102,7 @@ class FakeDispatcher : public Dispatcher {
   }
 
   void ms_handle_fast_connect(Connection *con) {
+    lock.Lock();
     cerr << __func__ << con << std::endl;
     Session *s = static_cast<Session*>(con->get_priv());
     if (!s) {
@@ -105,12 +111,12 @@ class FakeDispatcher : public Dispatcher {
       cerr << __func__ << " con: " << con << " count: " << s->count << std::endl;
     }
     s->put();
-    lock.Lock();
     got_connect = true;
     cond.Signal();
     lock.Unlock();
   }
   void ms_handle_fast_accept(Connection *con) {
+    Mutex::Locker l(lock);
     Session *s = static_cast<Session*>(con->get_priv());
     if (!s) {
       s = new Session(con);
@@ -119,24 +125,25 @@ class FakeDispatcher : public Dispatcher {
     s->put();
   }
   bool ms_dispatch(Message *m) {
+    Mutex::Locker l(lock);
     Session *s = static_cast<Session*>(m->get_connection()->get_priv());
     if (!s) {
       s = new Session(m->get_connection());
       m->get_connection()->set_priv(s->get());
     }
     s->put();
-    Mutex::Locker l(s->lock);
+    Mutex::Locker l1(s->lock);
     s->count++;
     cerr << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << std::endl;
-    if (is_server)
+    if (is_server) {
       reply_message(m);
-    lock.Lock();
+    }
     got_new = true;
     cond.Signal();
-    lock.Unlock();
     return true;
   }
   bool ms_handle_reset(Connection *con) {
+    Mutex::Locker l(lock);
     cerr << __func__ << con << std::endl;
     Session *s = static_cast<Session*>(con->get_priv());
     if (s) {
@@ -147,6 +154,7 @@ class FakeDispatcher : public Dispatcher {
     return true;
   }
   void ms_handle_remote_reset(Connection *con) {
+    Mutex::Locker l(lock);
     cerr << __func__ << con << std::endl;
     Session *s = static_cast<Session*>(con->get_priv());
     if (s) {
@@ -157,22 +165,23 @@ class FakeDispatcher : public Dispatcher {
     got_remote_reset = true;
   }
   void ms_fast_dispatch(Message *m) {
+    Mutex::Locker l(lock);
     Session *s = static_cast<Session*>(m->get_connection()->get_priv());
     if (!s) {
       s = new Session(m->get_connection());
       m->get_connection()->set_priv(s->get());
     }
     s->put();
-    Mutex::Locker (s->lock);
+    Mutex::Locker l1(s->lock);
     s->count++;
     cerr << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << std::endl;
-    if (is_server)
+    if (is_server) {
       reply_message(m);
-    lock.Lock();
+    }
     got_new = true;
     cond.Signal();
-    lock.Unlock();
   }
+
   bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
                             bufferlist& authorizer, bufferlist& authorizer_reply,
                             bool& isvalid, CryptoKey& session_key) {
@@ -180,7 +189,6 @@ class FakeDispatcher : public Dispatcher {
     return true;
   }
 
-
   void reply_message(Message *m) {
     MPing *rm = new MPing();
     m->get_connection()->send_message(rm);
@@ -555,8 +563,119 @@ TEST_P(MessengerTest, ClientStandbyTest) {
   client_msgr->wait();
 }
 
+class SyntheticDispatcher : public Dispatcher {
+ public:
+  Mutex lock;
+  Cond cond;
+  bool is_server;
+  bool got_new;
+  bool got_remote_reset;
+  bool got_connect;
+  map<ConnectionRef, list<uint64_t> > conn_sent;
+  map<uint64_t, bufferlist> sent;
+  atomic_t index;
+
+  SyntheticDispatcher(bool s): Dispatcher(g_ceph_context), lock("SyntheticDispatcher::lock"),
+                          is_server(s), got_new(false), got_remote_reset(false),
+                          got_connect(false), index(0) {}
+  bool ms_can_fast_dispatch_any() const { return true; }
+  bool ms_can_fast_dispatch(Message *m) const {
+    switch (m->get_type()) {
+    case CEPH_MSG_PING:
+    case MSG_COMMAND:
+      return true;
+    default:
+      return false;
+    }
+  }
+
+  void ms_handle_fast_connect(Connection *con) {
+    lock.Lock();
+    got_connect = true;
+    cond.Signal();
+    lock.Unlock();
+  }
+  void ms_handle_fast_accept(Connection *con) { }
+  bool ms_dispatch(Message *m) {
+    assert(0);
+  }
+  bool ms_handle_reset(Connection *con) {
+    return true;
+  }
+  void ms_handle_remote_reset(Connection *con) {
+    Mutex::Locker l(lock);
+    got_remote_reset = true;
+  }
+  void ms_fast_dispatch(Message *m) {
+    Mutex::Locker l(lock);
+    if (is_server) {
+      reply_message(m);
+    } else if (m->get_middle().length()) {
+      bufferlist middle = m->get_middle();
+      uint64_t i;
+      ASSERT_EQ(sizeof(uint64_t), middle.length());
+      memcpy(&i, middle.c_str(), middle.length());
+      if (sent.count(i)) {
+        ASSERT_EQ(conn_sent[m->get_connection()].front(), i);
+        ASSERT_TRUE(m->get_data().contents_equal(sent[i]));
+        conn_sent[m->get_connection()].pop_front();
+        sent.erase(i);
+      }
+    }
+    got_new = true;
+    cond.Signal();
+  }
+
+  bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
+                            bufferlist& authorizer, bufferlist& authorizer_reply,
+                            bool& isvalid, CryptoKey& session_key) {
+    isvalid = true;
+    return true;
+  }
+
+  void reply_message(Message *m) {
+    MPing *rm = new MPing();
+    if (m->get_data_len())
+      rm->set_data(m->get_data());
+    if (m->get_middle().length())
+      rm->set_middle(m->get_middle());
+    m->get_connection()->send_message(rm);
+  }
+
+  void send_message_wrap(ConnectionRef con, Message *m) {
+    {
+      Mutex::Locker l(lock);
+      bufferlist bl;
+      uint64_t i = index.read();
+      index.inc();
+      bufferptr bp(sizeof(i));
+      memcpy(bp.c_str(), (char*)&i, sizeof(i));
+      bl.push_back(bp);
+      m->set_middle(bl);
+      sent[i] = m->get_data();
+      conn_sent[con].push_back(i);
+    }
+    ASSERT_EQ(con->send_message(m), 0);
+  }
+
+  uint64_t get_pending() {
+    Mutex::Locker l(lock);
+    return sent.size();
+  }
+
+  void clear_pending(ConnectionRef con) {
+    Mutex::Locker l(lock);
+
+    for (list<uint64_t>::iterator it = conn_sent[con].begin();
+         it != conn_sent[con].end(); ++it)
+      sent.erase(*it);
+    conn_sent.erase(con);
+  }
+};
+
+
 TEST_P(MessengerTest, MessageTest) {
-  FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+  SyntheticDispatcher cli_dispatcher(false), srv_dispatcher(true);
   entity_addr_t bind_addr;
   bind_addr.parse("127.0.0.1");
   Messenger::Policy p = Messenger::Policy::stateful_server(0, 0);
@@ -594,12 +713,211 @@ TEST_P(MessengerTest, MessageTest) {
     cli_dispatcher.got_new = false;
   }
 
+  // 2. A very large "data"
+  {
+    bufferlist bl;
+    string s("abcdefghijklmnopqrstuvwxyz");
+    for (int i = 0; i < 1024*30; i++)
+      bl.append(s);
+    MPing *m = new MPing();
+    m->set_data(bl);
+    cli_dispatcher.send_message_wrap(conn, m);
+    utime_t t;
+    t += 1000*1000*500;
+    Mutex::Locker l(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.WaitInterval(g_ceph_context, cli_dispatcher.lock, t);
+    ASSERT_TRUE(cli_dispatcher.got_new);
+    cli_dispatcher.got_new = false;
+  }
   server_msgr->shutdown();
   client_msgr->shutdown();
   server_msgr->wait();
   client_msgr->wait();
 }
 
+
+class SyntheticWorkload {
+  Mutex lock;
+  Cond cond;
+  set<Messenger*> available_servers;
+  set<Messenger*> available_clients;
+  map<pair<Messenger*, Messenger*>, ConnectionRef> available_connections;
+  SyntheticDispatcher cli_dispatcher, srv_dispatcher;
+  gen_type rng;
+  vector<bufferlist> rand_data;
+
+ public:
+  static const unsigned max_in_flight = 512;
+  static const unsigned max_connections = 128;
+  static const unsigned max_messege_len = 1024 * 1024 * 4;
+
+  SyntheticWorkload(int servers, int clients, string type, int random_num):
+      lock("SyntheticWorkload::lock"), cli_dispatcher(false), srv_dispatcher(true),
+      rng(time(NULL)) {
+    Messenger *msgr;
+    int base_port = 16800;
+    for (int i = 0; i < servers; ++i) {
+      entity_addr_t bind_addr;
+      char addr[64];
+      snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i);
+      msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0),
+                               "server", getpid()+i);
+      bind_addr.parse(addr);
+      msgr->bind(bind_addr);
+      msgr->add_dispatcher_head(&srv_dispatcher);
+
+      assert(msgr);
+      msgr->set_default_policy(Messenger::Policy::stateless_server(0, 0));
+      available_servers.insert(msgr);
+      msgr->start();
+    }
+
+    for (int i = 0; i < clients; ++i) {
+      msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1),
+                               "client", getpid()+i+servers);
+      assert(msgr);
+      msgr->set_default_policy(Messenger::Policy::lossless_client(0, 0));
+      msgr->add_dispatcher_head(&cli_dispatcher);
+      available_clients.insert(msgr);
+      msgr->start();
+    }
+
+    static const char alphanum[] = "0123456789"
+      "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+      "abcdefghijklmnopqrstuvwxyz";
+
+    for (int i = 0; i < random_num; i++) {
+      bufferlist bl;
+      boost::uniform_int<> u(1, max_messege_len);
+      uint64_t value_len = u(rng);
+      bufferptr bp(value_len);
+      for (uint64_t j = 0; j < value_len; j++)
+        bp[j] = alphanum[rand() % (sizeof(alphanum) - 1)];
+
+      bp[value_len - 1] = '\0';
+      bl.append(bp);
+      rand_data.push_back(bl);
+    }
+  }
+
+  ConnectionRef _get_random_connection(pair<Messenger*, Messenger*> *p) {
+    while (cli_dispatcher.get_pending() > max_in_flight)
+      usleep(500);
+    assert(lock.is_locked());
+    boost::uniform_int<> choose(0, available_connections.size() - 1);
+    int index = choose(rng);
+    map<pair<Messenger*, Messenger*>, ConnectionRef>::iterator i = available_connections.begin();
+    for (; index > 0; --index, ++i) ;
+    if (p)
+      *p = i->first;
+    return i->second;
+  }
+
+  bool can_create_connection() {
+    return available_connections.size() < max_connections;
+  }
+
+  void generate_connection() {
+    Mutex::Locker l(lock);
+    if (!can_create_connection())
+      return ;
+
+    Messenger *server, *client;
+    {
+      boost::uniform_int<> choose(0, available_servers.size() - 1);
+      int index = choose(rng);
+      set<Messenger*>::iterator i = available_servers.begin();
+      for (; index > 0; --index, ++i) ;
+      server = *i;
+    }
+    {
+      boost::uniform_int<> choose(0, available_clients.size() - 1);
+      int index = choose(rng);
+      set<Messenger*>::iterator i = available_clients.begin();
+      for (; index > 0; --index, ++i) ;
+      client = *i;
+    }
+
+    if (!available_connections.count(make_pair(client, server))) {
+      ConnectionRef conn = client->get_connection(server->get_myinst());
+      available_connections[make_pair(client, server)] = conn;
+    }
+  }
+
+  void send_message() {
+    Message *m = new MPing();
+    bufferlist bl;
+    boost::uniform_int<> u(0, rand_data.size()-1);
+    uint64_t index = u(rng);
+    bl = rand_data[index];
+    m->set_data(bl);
+    Mutex::Locker l(lock);
+    ConnectionRef conn = _get_random_connection(NULL);
+    cli_dispatcher.send_message_wrap(conn, m);
+  }
+
+  void drop_connection() {
+    pair<Messenger*, Messenger*> p;
+    Mutex::Locker l(lock);
+    if (available_connections.size() < 10)
+      return;
+    ConnectionRef conn = _get_random_connection(&p);
+    cli_dispatcher.clear_pending(conn);
+    conn->mark_down();
+    ASSERT_EQ(available_connections.erase(p), 1U);
+  }
+
+  void print_internal_state() {
+    Mutex::Locker l(lock);
+    cerr << "available_connections: " << available_connections.size()
+         << " inflight messages: " << cli_dispatcher.get_pending() << std::endl;
+  }
+
+  void wait_for_done() {
+    while (cli_dispatcher.get_pending())
+      usleep(500);
+    for (set<Messenger*>::iterator it = available_servers.begin();
+         it != available_servers.end(); ++it) {
+      (*it)->shutdown();
+      (*it)->wait();
+    }
+    for (set<Messenger*>::iterator it = available_clients.begin();
+         it != available_clients.end(); ++it) {
+      (*it)->shutdown();
+      (*it)->wait();
+    }
+  }
+};
+
+TEST_P(MessengerTest, SyntheticTest) {
+  SyntheticWorkload test_msg(32, 128, GetParam(), 100);
+  for (int i = 0; i < 100; ++i) {
+    if (!(i % 10)) cerr << "seeding connection " << i << std::endl;
+    test_msg.generate_connection();
+  }
+  gen_type rng(time(NULL));
+  for (int i = 0; i < 10000; ++i) {
+    if (!(i % 10)) {
+      cerr << "Op " << i << ": ";
+      test_msg.print_internal_state();
+    }
+    boost::uniform_int<> true_false(0, 99);
+    int val = true_false(rng);
+    if (val > 85) {
+      test_msg.generate_connection();
+    } else if (val > 70) {
+      test_msg.drop_connection();
+    } else if (val > 10) {
+      test_msg.send_message();
+    } else {
+      usleep(rand() % 500 + 100);
+    }
+  }
+  test_msg.wait_for_done();
+}
+
+
 class MarkdownDispatcher : public Dispatcher {
   Mutex lock;
   set<Connection*> conns;