*
*/
+#include <atomic>
#include <iostream>
#include <unistd.h>
#include <stdlib.h>
class SyntheticWorkload;
+struct Payload {
+ enum Who : uint8_t {
+ PING = 0,
+ PONG = 1,
+ };
+ uint8_t who;
+ uint64_t seq;
+ bufferlist data;
+
+ Payload(Who who, uint64_t seq, const bufferlist& data)
+ : who(who), seq(seq), data(data)
+ {}
+ Payload() = default;
+ void encode(bufferlist& bl) const {
+ ::encode(who, bl);
+ ::encode(seq, bl);
+ ::encode(data, bl);
+ }
+ void decode(bufferlist::iterator& p) {
+ ::decode(who, p);
+ ::decode(seq, p);
+ ::decode(data, p);
+ }
+};
+
+ostream& operator<<(ostream& out, const Payload &pl)
+{
+ return out << "reply=" << pl.who << " i = " << pl.seq;
+}
+
class SyntheticDispatcher : public Dispatcher {
public:
Mutex lock;
bool got_connect;
map<ConnectionRef, list<uint64_t> > conn_sent;
map<uint64_t, bufferlist> sent;
- atomic_t index;
+ atomic<uint64_t> index;
SyntheticWorkload *workload;
SyntheticDispatcher(bool s, SyntheticWorkload *wl):
return ;
}
- Mutex::Locker l(lock);
- uint64_t i;
- bool reply;
- assert(m->get_middle().length());
- bufferlist::iterator blp = m->get_middle().begin();
- ::decode(i, blp);
- ::decode(reply, blp);
- if (reply) {
- //cerr << __func__ << " reply=" << reply << " i=" << i << std::endl;
- reply_message(m, i);
- } else if (sent.count(i)) {
- //cerr << __func__ << " reply=" << reply << " i=" << i << std::endl;
- ASSERT_EQ(conn_sent[m->get_connection()].front(), i);
- ASSERT_TRUE(m->get_data().contents_equal(sent[i]));
- conn_sent[m->get_connection()].pop_front();
- sent.erase(i);
+ Payload pl;
+ auto p = m->get_data().begin();
+ pl.decode(p);
+ if (pl.who == Payload::PING) {
+ lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " " << pl << dendl;
+ reply_message(m, pl);
+ m->put();
+ Mutex::Locker l(lock);
+ got_new = true;
+ cond.Signal();
+ } else {
+ Mutex::Locker l(lock);
+ if (sent.count(pl.seq)) {
+ lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " " << pl << dendl;
+ ASSERT_EQ(conn_sent[m->get_connection()].front(), pl.seq);
+ ASSERT_TRUE(pl.data.contents_equal(sent[pl.seq]));
+ conn_sent[m->get_connection()].pop_front();
+ sent.erase(pl.seq);
+ }
+ m->put();
+ got_new = true;
+ cond.Signal();
}
}
return true;
}
- void reply_message(Message *m, uint64_t i) {
+ void reply_message(const Message *m, Payload& pl) {
+ pl.who = Payload::PONG;
bufferlist bl;
- ::encode(i, bl);
- ::encode(false, bl);
+ pl.encode(bl);
MPing *rm = new MPing();
- if (m->get_data_len())
- rm->set_data(m->get_data());
- if (m->get_middle().length())
- rm->set_middle(bl);
+ rm->set_data(bl);
m->get_connection()->send_message(rm);
- lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << i << dendl;
+ lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << pl.seq << dendl;
}
- void send_message_wrap(ConnectionRef con, Message *m) {
- {
+ void send_message_wrap(ConnectionRef con, const bufferlist& data) {
+ Message *m = new MPing();
+ Payload pl{Payload::PING, index++, data};
+ bufferlist bl;
+ pl.encode(bl);
+ m->set_data(bl);
+ if (!con->get_messenger()->get_default_policy().lossy) {
Mutex::Locker l(lock);
- bufferlist bl;
- uint64_t i = index.read();
- index.inc();
- ::encode(i, bl);
- ::encode(true, bl);
- m->set_middle(bl);
- if (!con->get_messenger()->get_default_policy().lossy) {
- sent[i] = m->get_data();
- conn_sent[con].push_back(i);
- }
- //cerr << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << i << std::endl;
+ sent[pl.seq] = pl.data;
+ conn_sent[con].push_back(pl.seq);
}
- ASSERT_EQ(con->send_message(m), 0);
+ lderr(g_ceph_context) << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << pl.seq << dendl;
+ ASSERT_EQ(0, con->send_message(m));
}
uint64_t get_pending() {
m->set_priority(200);
conn->send_message(m);
} else {
- Message *m = new MPing();
- bufferlist bl;
boost::uniform_int<> u(0, rand_data.size()-1);
- uint64_t index = u(rng);
- bl = rand_data[index];
- m->set_data(bl);
- dispatcher.send_message_wrap(conn, m);
+ dispatcher.send_message_wrap(conn, rand_data[u(rng)]);
}
}