MOSDOp *rep = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0);
bufferlist data(msg_data);
rep->write(0, msg_len, data);
+ rep->set_tid(m->get_tid());
MessageRef msg = {rep, false};
return c->send(msg);
}
: public ceph::net::Dispatcher,
public seastar::peering_sharded_service<Client> {
- struct PingSession : public seastar::enable_shared_from_this<PingSession> {
+ struct ConnSession {
+ mono_time connecting_time = mono_clock::zero();
+ mono_time connected_time = mono_clock::zero();
+
+ mono_time start_time = mono_clock::zero();
+ mono_time finish_time = mono_clock::zero();
+
unsigned received_count = 0u;
- mono_time connecting_time;
- mono_time connected_time;
- mono_time start_time;
- mono_time finish_time;
+
+ const unsigned SAMPLE_RATE = 7;
+ unsigned sampled_count = 0u;
+ double total_lat_s = 0.0;
+
seastar::promise<> done;
};
- using PingSessionRef = seastar::shared_ptr<PingSession>;
+ ConnSession conn_session;
const seastar::shard_id sid;
std::string lname;
const unsigned msg_len;
bufferlist msg_data;
seastar::semaphore depth;
+ std::vector<mono_time> time_msgs_sent;
ceph::auth::DummyAuthClientServer dummy_auth;
unsigned sent_count = 0u;
ceph::net::ConnectionRef active_conn = nullptr;
- PingSessionRef active_session = nullptr;
Client(unsigned jobs, unsigned rounds, unsigned msg_len, unsigned depth)
: sid{seastar::engine().cpu_id()},
jobs{jobs},
rounds{rounds/jobs},
msg_len{msg_len},
- depth{depth} {
+ depth{depth},
+ time_msgs_sent{depth, mono_clock::zero()} {
lname = "client#";
lname += std::to_string(sid);
msg_data.append_zero(msg_len);
}
seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override {
logger().info("{}: connected", *conn);
- active_session = seastar::make_shared<PingSession>();
- active_session->connected_time = mono_clock::now();
+ conn_session.connected_time = mono_clock::now();
return seastar::now();
}
seastar::future<> ms_dispatch(ceph::net::Connection* c,
MessageRef m) override {
// server replies with MOSDOp to generate server-side write workload
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
+
+ auto msg_id = m->get_tid();
+ if (msg_id % conn_session.SAMPLE_RATE == 0) {
+ auto index = msg_id % time_msgs_sent.size();
+ ceph_assert(time_msgs_sent[index] != mono_clock::zero());
+ std::chrono::duration<double> cur_latency = mono_clock::now() - time_msgs_sent[index];
+ conn_session.total_lat_s += cur_latency.count();
+ ++(conn_session.sampled_count);
+ time_msgs_sent[index] = mono_clock::zero();
+ }
+
+ ++(conn_session.received_count);
depth.signal(1);
- ceph_assert(active_session);
- ++(active_session->received_count);
- if (active_session->received_count == rounds) {
- logger().info("{}: finished receiving {} REPLYs", *c, active_session->received_count);
- active_session->finish_time = mono_clock::now();
- active_session->done.set_value();
+ if (conn_session.received_count == rounds) {
+ logger().info("{}: finished receiving {} REPLYs", *c, conn_session.received_count);
+ conn_session.finish_time = mono_clock::now();
+ conn_session.done.set_value();
}
return seastar::now();
}
return client.msgr->connect(peer_addr, entity_name_t::TYPE_OSD)
.then([&client] (auto conn) {
client.active_conn = conn->release();
- // make sure handshake won't heart the performance
+ // make sure handshake won't hurt the performance
return seastar::sleep(1s);
}).then([&client, start_time] {
- if (!client.active_session) {
+ if (client.conn_session.connected_time == mono_clock::zero()) {
logger().error("\n{} not connected after 1s!\n", client.lname);
ceph_assert(false);
}
- client.active_session->connecting_time = start_time;
+ client.conn_session.connecting_time = start_time;
});
}
return seastar::now();
bufferlist data(msg_data);
m->write(0, msg_len, data);
MessageRef msg = {m, false};
+
+ // use tid as the identity of each round
+ m->set_tid(sent_count);
+
+ // sample message latency
+ if (sent_count % conn_session.SAMPLE_RATE == 0) {
+ auto index = sent_count % time_msgs_sent.size();
+ ceph_assert(time_msgs_sent[index] == mono_clock::zero());
+ time_msgs_sent[index] = mono_clock::now();
+ }
+
return conn->send(msg);
});
}
seastar::future<> do_dispatch_messages(ceph::net::Connection* conn) {
ceph_assert(seastar::engine().cpu_id() == sid);
ceph_assert(sent_count == 0);
- active_session->start_time = mono_clock::now();
+ conn_session.start_time = mono_clock::now();
return seastar::do_until(
[this, conn] {
bool stop = (sent_count == rounds);
return send_msg(conn);
}
).then([this] {
- return active_session->done.get_future();
+ return conn_session.done.get_future();
}).then([this] {
- std::chrono::duration<double> dur_conn = active_session->connected_time - active_session->connecting_time;
- std::chrono::duration<double> dur_msg = mono_clock::now() - active_session->start_time;
- logger().info("\n{}:\n messages: {}\n connect time: {}s\n messaging time: {}s\n",
- lname, active_session->received_count, dur_conn.count(), dur_msg.count());
+ std::chrono::duration<double> dur_conn = conn_session.connected_time - conn_session.connecting_time;
+ std::chrono::duration<double> dur_msg = mono_clock::now() - conn_session.start_time;
+ logger().info("\n{}:\n"
+ " messages: {}\n"
+ " connect time: {}s\n"
+ " messaging time: {}s\n"
+ " latency: {}ms\n"
+ " IOPS: {}\n"
+ " throughput: {}MB/s\n",
+ lname, conn_session.received_count,
+ dur_conn.count(), dur_msg.count(),
+ conn_session.total_lat_s / conn_session.sampled_count * 1000,
+ conn_session.received_count / dur_msg.count(),
+ conn_session.received_count / dur_msg.count() * msg_len / 1048576);
});
}
};