From 56896a7ed20869ce91ade4c77c1d6cbab8d50de1 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Sat, 29 Oct 2016 01:54:58 +0800 Subject: [PATCH] test/ceph_test_msgr: do not use Message::middle for holding transient data Message::middle is used for holding encoded data, so we we can not stuff it with payload and leave the "payload" field empty. this change refactors the ceph_test_msgr by introducing a Payload class which encodes all test data in it. Fixes: http://tracker.ceph.com/issues/17728 Signed-off-by: Kefu Chai --- src/test/msgr/test_msgr.cc | 100 +++++++++++++++++++++---------------- 1 file changed, 57 insertions(+), 43 deletions(-) diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc index 4b3bc44392c17..6ea65ebb11845 100644 --- a/src/test/msgr/test_msgr.cc +++ b/src/test/msgr/test_msgr.cc @@ -769,6 +769,34 @@ TEST_P(MessengerTest, MessageTest) { class SyntheticWorkload; +struct Payload { + enum Who : uint8_t { + PING = 0, + PONG = 1, + }; + uint8_t who; + uint64_t seq; + bufferlist data; + + Payload(Who who, uint64_t seq, const bufferlist& data) + : who(who), seq(seq), data(data) + {} + Payload() = default; + DENC(Payload, v, p) { + DENC_START(1, 1, p); + denc(v.who, p); + denc(v.seq, p); + denc(v.data, p); + DENC_FINISH(p); + } +}; +WRITE_CLASS_DENC(Payload) + +ostream& operator<<(ostream& out, const Payload &pl) +{ + return out << "reply=" << pl.who << " i = " << pl.seq; +} + class SyntheticDispatcher : public Dispatcher { public: Mutex lock; @@ -779,7 +807,7 @@ class SyntheticDispatcher : public Dispatcher { bool got_connect; map > conn_sent; map sent; - atomic_t index; + atomic index; SyntheticWorkload *workload; SyntheticDispatcher(bool s, SyntheticWorkload *wl): @@ -838,27 +866,24 @@ class SyntheticDispatcher : public Dispatcher { return ; } - uint64_t i; - bool reply; - assert(m->get_middle().length()); - bufferlist::iterator blp = m->get_middle().begin(); - ::decode(i, blp); - ::decode(reply, blp); - if (reply) { - lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply=" << reply << " i=" << i << dendl; - reply_message(m, i); + Payload pl; + auto p = m->get_data().begin(); + ::decode(pl, p); + if (pl.who == Payload::PING) { + lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl; + reply_message(m, pl); m->put(); Mutex::Locker l(lock); got_new = true; cond.Signal(); } else { Mutex::Locker l(lock); - if (sent.count(i)) { - lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply=" << reply << " i=" << i << dendl; - ASSERT_EQ(conn_sent[m->get_connection()].front(), i); - ASSERT_TRUE(m->get_data().contents_equal(sent[i])); + if (sent.count(pl.seq)) { + lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl; + ASSERT_EQ(conn_sent[m->get_connection()].front(), pl.seq); + ASSERT_TRUE(pl.data.contents_equal(sent[pl.seq])); conn_sent[m->get_connection()].pop_front(); - sent.erase(i); + sent.erase(pl.seq); } m->put(); got_new = true; @@ -873,35 +898,29 @@ class SyntheticDispatcher : public Dispatcher { return true; } - void reply_message(Message *m, uint64_t i) { + void reply_message(const Message *m, Payload& pl) { + pl.who = Payload::PONG; bufferlist bl; - ::encode(i, bl); - ::encode(false, bl); + ::encode(pl, bl); MPing *rm = new MPing(); - if (m->get_data_len()) - rm->set_data(m->get_data()); - if (m->get_middle().length()) - rm->set_middle(bl); + rm->set_data(bl); m->get_connection()->send_message(rm); - lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << i << dendl; + lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << pl.seq << dendl; } - void send_message_wrap(ConnectionRef con, Message *m) { - { + void send_message_wrap(ConnectionRef con, const bufferlist& data) { + Message *m = new MPing(); + Payload pl{Payload::PING, index++, data}; + bufferlist bl; + ::encode(pl, bl); + m->set_data(bl); + if (!con->get_messenger()->get_default_policy().lossy) { Mutex::Locker l(lock); - bufferlist bl; - uint64_t i = index.read(); - index.inc(); - ::encode(i, bl); - ::encode(true, bl); - m->set_middle(bl); - if (!con->get_messenger()->get_default_policy().lossy) { - sent[i] = m->get_data(); - conn_sent[con].push_back(i); - } - lderr(g_ceph_context) << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << i << dendl; + sent[pl.seq] = pl.data; + conn_sent[con].push_back(pl.seq); } - ASSERT_EQ(con->send_message(m), 0); + lderr(g_ceph_context) << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << pl.seq << dendl; + ASSERT_EQ(0, con->send_message(m)); } uint64_t get_pending() { @@ -1067,13 +1086,8 @@ class SyntheticWorkload { m->set_priority(200); conn->send_message(m); } else { - Message *m = new MPing(); - bufferlist bl; boost::uniform_int<> u(0, rand_data.size()-1); - uint64_t index = u(rng); - bl = rand_data[index]; - m->set_data(bl); - dispatcher.send_message_wrap(conn, m); + dispatcher.send_message_wrap(conn, rand_data[u(rng)]); } } -- 2.39.5