]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test/ceph_test_msgr: do not use Message::middle for holding transient data
authorKefu Chai <kchai@redhat.com>
Fri, 28 Oct 2016 17:54:58 +0000 (01:54 +0800)
committerKefu Chai <kchai@redhat.com>
Wed, 2 Nov 2016 11:23:24 +0000 (19:23 +0800)
Message::middle is used for holding encoded data, so we we can not stuff
it with payload and leave the "payload" field empty. this change
refactors the ceph_test_msgr by introducing a Payload class which
encodes all test data in it.

Fixes: http://tracker.ceph.com/issues/17728
Signed-off-by: Kefu Chai <kchai@redhat.com>
(cherry picked from commit 56896a7ed20869ce91ade4c77c1d6cbab8d50de1)
Conflicts:
src/test/msgr/test_msgr.cc: do not use the new-style DENC()
framework for implementing the encoder of Payload class. DENC() was
introduced after jewel was released.

src/test/msgr/test_msgr.cc

index 9f78bb5ae68e3ec34a756f98db9060810699537e..43f1060c7cdfc9b1ff8bcb279693ee69c893fdc7 100644 (file)
@@ -14,6 +14,7 @@
  *
  */
 
+#include <atomic>
 #include <iostream>
 #include <unistd.h>
 #include <stdlib.h>
@@ -703,6 +704,36 @@ TEST_P(MessengerTest, MessageTest) {
 
 class SyntheticWorkload;
 
+struct Payload {
+  enum Who : uint8_t {
+    PING = 0,
+    PONG = 1,
+  };
+  uint8_t who;
+  uint64_t seq;
+  bufferlist data;
+
+  Payload(Who who, uint64_t seq, const bufferlist& data)
+    : who(who), seq(seq), data(data)
+  {}
+  Payload() = default;
+  void encode(bufferlist& bl) const {
+    ::encode(who, bl);
+    ::encode(seq, bl);
+    ::encode(data, bl);
+  }
+  void decode(bufferlist::iterator& p) {
+    ::decode(who, p);
+    ::decode(seq, p);
+    ::decode(data, p);
+  }
+};
+
+ostream& operator<<(ostream& out, const Payload &pl)
+{
+  return out << "reply=" << pl.who << " i = " << pl.seq;
+}
+
 class SyntheticDispatcher : public Dispatcher {
  public:
   Mutex lock;
@@ -713,7 +744,7 @@ class SyntheticDispatcher : public Dispatcher {
   bool got_connect;
   map<ConnectionRef, list<uint64_t> > conn_sent;
   map<uint64_t, bufferlist> sent;
-  atomic_t index;
+  atomic<uint64_t> index;
   SyntheticWorkload *workload;
 
   SyntheticDispatcher(bool s, SyntheticWorkload *wl):
@@ -757,22 +788,28 @@ class SyntheticDispatcher : public Dispatcher {
       return ;
     }
 
-    Mutex::Locker l(lock);
-    uint64_t i;
-    bool reply;
-    assert(m->get_middle().length());
-    bufferlist::iterator blp = m->get_middle().begin();
-    ::decode(i, blp);
-    ::decode(reply, blp);
-    if (reply) {
-      //cerr << __func__ << " reply=" << reply << " i=" << i << std::endl;
-      reply_message(m, i);
-    } else if (sent.count(i)) {
-      //cerr << __func__ << " reply=" << reply << " i=" << i << std::endl;
-      ASSERT_EQ(conn_sent[m->get_connection()].front(), i);
-      ASSERT_TRUE(m->get_data().contents_equal(sent[i]));
-      conn_sent[m->get_connection()].pop_front();
-      sent.erase(i);
+    Payload pl;
+    auto p = m->get_data().begin();
+    pl.decode(p);
+    if (pl.who == Payload::PING) {
+      lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " " << pl << dendl;
+      reply_message(m, pl);
+      m->put();
+      Mutex::Locker l(lock);
+      got_new = true;
+      cond.Signal();
+    } else {
+      Mutex::Locker l(lock);
+      if (sent.count(pl.seq)) {
+       lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " " << pl << dendl;
+       ASSERT_EQ(conn_sent[m->get_connection()].front(), pl.seq);
+       ASSERT_TRUE(pl.data.contents_equal(sent[pl.seq]));
+       conn_sent[m->get_connection()].pop_front();
+       sent.erase(pl.seq);
+      }
+      m->put();
+      got_new = true;
+      cond.Signal();
     }
   }
 
@@ -783,35 +820,29 @@ class SyntheticDispatcher : public Dispatcher {
     return true;
   }
 
-  void reply_message(Message *m, uint64_t i) {
+  void reply_message(const Message *m, Payload& pl) {
+    pl.who = Payload::PONG;
     bufferlist bl;
-    ::encode(i, bl);
-    ::encode(false, bl);
+    pl.encode(bl);
     MPing *rm = new MPing();
-    if (m->get_data_len())
-      rm->set_data(m->get_data());
-    if (m->get_middle().length())
-      rm->set_middle(bl);
+    rm->set_data(bl);
     m->get_connection()->send_message(rm);
-    lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << i << dendl;
+    lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << pl.seq << dendl;
   }
 
-  void send_message_wrap(ConnectionRef con, Message *m) {
-    {
+  void send_message_wrap(ConnectionRef con, const bufferlist& data) {
+    Message *m = new MPing();
+    Payload pl{Payload::PING, index++, data};
+    bufferlist bl;
+    pl.encode(bl);
+    m->set_data(bl);
+    if (!con->get_messenger()->get_default_policy().lossy) {
       Mutex::Locker l(lock);
-      bufferlist bl;
-      uint64_t i = index.read();
-      index.inc();
-      ::encode(i, bl);
-      ::encode(true, bl);
-      m->set_middle(bl);
-      if (!con->get_messenger()->get_default_policy().lossy) {
-        sent[i] = m->get_data();
-        conn_sent[con].push_back(i);
-      }
-      //cerr << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << i << std::endl;
+      sent[pl.seq] = pl.data;
+      conn_sent[con].push_back(pl.seq);
     }
-    ASSERT_EQ(con->send_message(m), 0);
+    lderr(g_ceph_context) << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << pl.seq << dendl;
+    ASSERT_EQ(0, con->send_message(m));
   }
 
   uint64_t get_pending() {
@@ -977,13 +1008,8 @@ class SyntheticWorkload {
       m->set_priority(200);
       conn->send_message(m);
     } else {
-      Message *m = new MPing();
-      bufferlist bl;
       boost::uniform_int<> u(0, rand_data.size()-1);
-      uint64_t index = u(rng);
-      bl = rand_data[index];
-      m->set_data(bl);
-      dispatcher.send_message_wrap(conn, m);
+      dispatcher.send_message_wrap(conn, rand_data[u(rng)]);
     }
   }