]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test/ceph_test_msgr: do not use Message::middle for holding transient data 11680/head
authorKefu Chai <kchai@redhat.com>
Fri, 28 Oct 2016 17:54:58 +0000 (01:54 +0800)
committerKefu Chai <kchai@redhat.com>
Fri, 28 Oct 2016 18:06:37 +0000 (02:06 +0800)
Message::middle is used for holding encoded data, so we we can not stuff
it with payload and leave the "payload" field empty. this change
refactors the ceph_test_msgr by introducing a Payload class which
encodes all test data in it.

Fixes: http://tracker.ceph.com/issues/17728
Signed-off-by: Kefu Chai <kchai@redhat.com>
src/test/msgr/test_msgr.cc

index 4b3bc44392c178d3aaff6b50f4f2bca158efe8b4..6ea65ebb11845bda247e87fc75c26174837ed2d1 100644 (file)
@@ -769,6 +769,34 @@ TEST_P(MessengerTest, MessageTest) {
 
 class SyntheticWorkload;
 
+struct Payload {
+  enum Who : uint8_t {
+    PING = 0,
+    PONG = 1,
+  };
+  uint8_t who;
+  uint64_t seq;
+  bufferlist data;
+
+  Payload(Who who, uint64_t seq, const bufferlist& data)
+    : who(who), seq(seq), data(data)
+  {}
+  Payload() = default;
+  DENC(Payload, v, p) {
+    DENC_START(1, 1, p);
+    denc(v.who, p);
+    denc(v.seq, p);
+    denc(v.data, p);
+    DENC_FINISH(p);
+  }
+};
+WRITE_CLASS_DENC(Payload)
+
+ostream& operator<<(ostream& out, const Payload &pl)
+{
+  return out << "reply=" << pl.who << " i = " << pl.seq;
+}
+
 class SyntheticDispatcher : public Dispatcher {
  public:
   Mutex lock;
@@ -779,7 +807,7 @@ class SyntheticDispatcher : public Dispatcher {
   bool got_connect;
   map<ConnectionRef, list<uint64_t> > conn_sent;
   map<uint64_t, bufferlist> sent;
-  atomic_t index;
+  atomic<uint64_t> index;
   SyntheticWorkload *workload;
 
   SyntheticDispatcher(bool s, SyntheticWorkload *wl):
@@ -838,27 +866,24 @@ class SyntheticDispatcher : public Dispatcher {
       return ;
     }
 
-    uint64_t i;
-    bool reply;
-    assert(m->get_middle().length());
-    bufferlist::iterator blp = m->get_middle().begin();
-    ::decode(i, blp);
-    ::decode(reply, blp);
-    if (reply) {
-      lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply=" << reply << " i=" << i << dendl;
-      reply_message(m, i);
+    Payload pl;
+    auto p = m->get_data().begin();
+    ::decode(pl, p);
+    if (pl.who == Payload::PING) {
+      lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
+      reply_message(m, pl);
       m->put();
       Mutex::Locker l(lock);
       got_new = true;
       cond.Signal();
     } else {
       Mutex::Locker l(lock);
-      if (sent.count(i)) {
-       lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply=" << reply << " i=" << i << dendl;
-       ASSERT_EQ(conn_sent[m->get_connection()].front(), i);
-       ASSERT_TRUE(m->get_data().contents_equal(sent[i]));
+      if (sent.count(pl.seq)) {
+       lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
+       ASSERT_EQ(conn_sent[m->get_connection()].front(), pl.seq);
+       ASSERT_TRUE(pl.data.contents_equal(sent[pl.seq]));
        conn_sent[m->get_connection()].pop_front();
-       sent.erase(i);
+       sent.erase(pl.seq);
       }
       m->put();
       got_new = true;
@@ -873,35 +898,29 @@ class SyntheticDispatcher : public Dispatcher {
     return true;
   }
 
-  void reply_message(Message *m, uint64_t i) {
+  void reply_message(const Message *m, Payload& pl) {
+    pl.who = Payload::PONG;
     bufferlist bl;
-    ::encode(i, bl);
-    ::encode(false, bl);
+    ::encode(pl, bl);
     MPing *rm = new MPing();
-    if (m->get_data_len())
-      rm->set_data(m->get_data());
-    if (m->get_middle().length())
-      rm->set_middle(bl);
+    rm->set_data(bl);
     m->get_connection()->send_message(rm);
-    lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << i << dendl;
+    lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << pl.seq << dendl;
   }
 
-  void send_message_wrap(ConnectionRef con, Message *m) {
-    {
+  void send_message_wrap(ConnectionRef con, const bufferlist& data) {
+    Message *m = new MPing();
+    Payload pl{Payload::PING, index++, data};
+    bufferlist bl;
+    ::encode(pl, bl);
+    m->set_data(bl);
+    if (!con->get_messenger()->get_default_policy().lossy) {
       Mutex::Locker l(lock);
-      bufferlist bl;
-      uint64_t i = index.read();
-      index.inc();
-      ::encode(i, bl);
-      ::encode(true, bl);
-      m->set_middle(bl);
-      if (!con->get_messenger()->get_default_policy().lossy) {
-        sent[i] = m->get_data();
-        conn_sent[con].push_back(i);
-      }
-      lderr(g_ceph_context) << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << i << dendl;
+      sent[pl.seq] = pl.data;
+      conn_sent[con].push_back(pl.seq);
     }
-    ASSERT_EQ(con->send_message(m), 0);
+    lderr(g_ceph_context) << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << pl.seq << dendl;
+    ASSERT_EQ(0, con->send_message(m));
   }
 
   uint64_t get_pending() {
@@ -1067,13 +1086,8 @@ class SyntheticWorkload {
       m->set_priority(200);
       conn->send_message(m);
     } else {
-      Message *m = new MPing();
-      bufferlist bl;
       boost::uniform_int<> u(0, rand_data.size()-1);
-      uint64_t index = u(rng);
-      bl = rand_data[index];
-      m->set_data(bl);
-      dispatcher.send_message_wrap(conn, m);
+      dispatcher.send_message_wrap(conn, rand_data[u(rng)]);
     }
   }