class SyntheticWorkload;
+struct Payload {
+ enum Who : uint8_t {
+ PING = 0,
+ PONG = 1,
+ };
+ uint8_t who;
+ uint64_t seq;
+ bufferlist data;
+
+ Payload(Who who, uint64_t seq, const bufferlist& data)
+ : who(who), seq(seq), data(data)
+ {}
+ Payload() = default;
+ DENC(Payload, v, p) {
+ DENC_START(1, 1, p);
+ denc(v.who, p);
+ denc(v.seq, p);
+ denc(v.data, p);
+ DENC_FINISH(p);
+ }
+};
+WRITE_CLASS_DENC(Payload)
+
+ostream& operator<<(ostream& out, const Payload &pl)
+{
+ return out << "reply=" << pl.who << " i = " << pl.seq;
+}
+
class SyntheticDispatcher : public Dispatcher {
public:
Mutex lock;
bool got_connect;
map<ConnectionRef, list<uint64_t> > conn_sent;
map<uint64_t, bufferlist> sent;
- atomic_t index;
+ atomic<uint64_t> index;
SyntheticWorkload *workload;
SyntheticDispatcher(bool s, SyntheticWorkload *wl):
return ;
}
- uint64_t i;
- bool reply;
- assert(m->get_middle().length());
- bufferlist::iterator blp = m->get_middle().begin();
- ::decode(i, blp);
- ::decode(reply, blp);
- if (reply) {
- lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply=" << reply << " i=" << i << dendl;
- reply_message(m, i);
+ Payload pl;
+ auto p = m->get_data().begin();
+ ::decode(pl, p);
+ if (pl.who == Payload::PING) {
+ lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
+ reply_message(m, pl);
m->put();
Mutex::Locker l(lock);
got_new = true;
cond.Signal();
} else {
Mutex::Locker l(lock);
- if (sent.count(i)) {
- lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply=" << reply << " i=" << i << dendl;
- ASSERT_EQ(conn_sent[m->get_connection()].front(), i);
- ASSERT_TRUE(m->get_data().contents_equal(sent[i]));
+ if (sent.count(pl.seq)) {
+ lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
+ ASSERT_EQ(conn_sent[m->get_connection()].front(), pl.seq);
+ ASSERT_TRUE(pl.data.contents_equal(sent[pl.seq]));
conn_sent[m->get_connection()].pop_front();
- sent.erase(i);
+ sent.erase(pl.seq);
}
m->put();
got_new = true;
return true;
}
- void reply_message(Message *m, uint64_t i) {
+ void reply_message(const Message *m, Payload& pl) {
+ pl.who = Payload::PONG;
bufferlist bl;
- ::encode(i, bl);
- ::encode(false, bl);
+ ::encode(pl, bl);
MPing *rm = new MPing();
- if (m->get_data_len())
- rm->set_data(m->get_data());
- if (m->get_middle().length())
- rm->set_middle(bl);
+ rm->set_data(bl);
m->get_connection()->send_message(rm);
- lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << i << dendl;
+ lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << pl.seq << dendl;
}
- void send_message_wrap(ConnectionRef con, Message *m) {
- {
+ void send_message_wrap(ConnectionRef con, const bufferlist& data) {
+ Message *m = new MPing();
+ Payload pl{Payload::PING, index++, data};
+ bufferlist bl;
+ ::encode(pl, bl);
+ m->set_data(bl);
+ if (!con->get_messenger()->get_default_policy().lossy) {
Mutex::Locker l(lock);
- bufferlist bl;
- uint64_t i = index.read();
- index.inc();
- ::encode(i, bl);
- ::encode(true, bl);
- m->set_middle(bl);
- if (!con->get_messenger()->get_default_policy().lossy) {
- sent[i] = m->get_data();
- conn_sent[con].push_back(i);
- }
- lderr(g_ceph_context) << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << i << dendl;
+ sent[pl.seq] = pl.data;
+ conn_sent[con].push_back(pl.seq);
}
- ASSERT_EQ(con->send_message(m), 0);
+ lderr(g_ceph_context) << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << pl.seq << dendl;
+ ASSERT_EQ(0, con->send_message(m));
}
uint64_t get_pending() {
m->set_priority(200);
conn->send_message(m);
} else {
- Message *m = new MPing();
- bufferlist bl;
boost::uniform_int<> u(0, rand_data.size()-1);
- uint64_t index = u(rng);
- bl = rand_data[index];
- m->set_data(bl);
- dispatcher.send_message_wrap(conn, m);
+ dispatcher.send_message_wrap(conn, rand_data[u(rng)]);
}
}