struct client_config {
entity_addr_t server_addr;
unsigned block_size;
- unsigned rounds;
+ unsigned ramptime;
+ unsigned msgtime;
unsigned jobs;
unsigned depth;
std::ostringstream out;
out << "client[>> " << server_addr
<< "](bs=" << block_size
- << ", rounds=" << rounds
+ << ", ramptime=" << ramptime
+ << ", msgtime=" << msgtime
<< ", jobs=" << jobs
<< ", depth=" << depth
<< ")";
conf.server_addr = addr;
conf.block_size = options["cbs"].as<unsigned>();
- conf.rounds = options["rounds"].as<unsigned>();
+ conf.ramptime = options["ramptime"].as<unsigned>();
+ conf.msgtime = options["msgtime"].as<unsigned>();
conf.jobs = options["jobs"].as<unsigned>();
conf.depth = options["depth"].as<unsigned>();
+ ceph_assert(conf.depth % conf.jobs == 0);
return conf;
}
};
}
};
+const unsigned SAMPLE_RATE = 7;
+
static seastar::future<> run(
perf_mode_t mode,
const client_config& client_conf,
});
}
seastar::future<> shutdown() {
- logger().info("\n{} shutdown...", lname);
+ logger().info("{} shutdown...", lname);
return container().invoke_on(msgr_sid, [] (auto& server) {
ceph_assert(server.msgr);
return server.msgr->shutdown();
: public ceph::net::Dispatcher,
public seastar::peering_sharded_service<Client> {
- struct ConnSession {
+ struct ConnStats {
mono_time connecting_time = mono_clock::zero();
mono_time connected_time = mono_clock::zero();
+ unsigned received_count = 0u;
mono_time start_time = mono_clock::zero();
+ unsigned start_count = 0u;
+
+ unsigned sampled_count = 0u;
+ double total_lat_s = 0.0;
+
+ // for reporting only
mono_time finish_time = mono_clock::zero();
- unsigned received_count = 0u;
+ void start() {
+ start_time = mono_clock::now();
+ start_count = received_count;
+ sampled_count = 0u;
+ total_lat_s = 0.0;
+ finish_time = mono_clock::zero();
+ }
+ };
+ ConnStats conn_stats;
- const unsigned SAMPLE_RATE = 7;
+ struct PeriodStats {
+ mono_time start_time = mono_clock::zero();
+ unsigned start_count = 0u;
unsigned sampled_count = 0u;
double total_lat_s = 0.0;
- seastar::promise<> done;
+ // for reporting only
+ mono_time finish_time = mono_clock::zero();
+ unsigned finish_count = 0u;
+ unsigned depth = 0u;
+
+ void reset(unsigned received_count, PeriodStats* snap = nullptr) {
+ if (snap) {
+ snap->start_time = start_time;
+ snap->start_count = start_count;
+ snap->sampled_count = sampled_count;
+ snap->total_lat_s = total_lat_s;
+ snap->finish_time = mono_clock::now();
+ snap->finish_count = received_count;
+ }
+ start_time = mono_clock::now();
+ start_count = received_count;
+ sampled_count = 0u;
+ total_lat_s = 0.0;
+ }
};
- ConnSession conn_session;
+ PeriodStats period_stats;
const seastar::shard_id sid;
std::string lname;
const unsigned jobs;
- const unsigned rounds;
ceph::net::Messenger *msgr = nullptr;
const unsigned msg_len;
bufferlist msg_data;
+ const unsigned nr_depth;
seastar::semaphore depth;
std::vector<mono_time> time_msgs_sent;
ceph::auth::DummyAuthClientServer dummy_auth;
unsigned sent_count = 0u;
ceph::net::ConnectionRef active_conn = nullptr;
- Client(unsigned jobs, unsigned rounds, unsigned msg_len, unsigned depth)
+ bool stop_send = false;
+ seastar::promise<> stopped_send_promise;
+
+ Client(unsigned jobs, unsigned msg_len, unsigned depth)
: sid{seastar::engine().cpu_id()},
jobs{jobs},
- rounds{rounds/jobs},
msg_len{msg_len},
- depth{depth},
- time_msgs_sent{depth, mono_clock::zero()} {
+ nr_depth{depth/jobs},
+ depth{nr_depth},
+ time_msgs_sent{depth/jobs, mono_clock::zero()} {
lname = "client#";
lname += std::to_string(sid);
msg_data.append_zero(msg_len);
}
+ unsigned get_current_depth() const {
+ ceph_assert(depth.available_units() >= 0);
+ return nr_depth - depth.current();
+ }
+
Dispatcher* get_local_shard() override {
return &(container().local());
}
}
seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override {
logger().info("{}: connected", *conn);
- conn_session.connected_time = mono_clock::now();
+ conn_stats.connected_time = mono_clock::now();
return seastar::now();
}
seastar::future<> ms_dispatch(ceph::net::Connection* c,
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
auto msg_id = m->get_tid();
- if (msg_id % conn_session.SAMPLE_RATE == 0) {
+ if (msg_id % SAMPLE_RATE == 0) {
auto index = msg_id % time_msgs_sent.size();
ceph_assert(time_msgs_sent[index] != mono_clock::zero());
std::chrono::duration<double> cur_latency = mono_clock::now() - time_msgs_sent[index];
- conn_session.total_lat_s += cur_latency.count();
- ++(conn_session.sampled_count);
+ conn_stats.total_lat_s += cur_latency.count();
+ ++(conn_stats.sampled_count);
+ period_stats.total_lat_s += cur_latency.count();
+ ++(period_stats.sampled_count);
time_msgs_sent[index] = mono_clock::zero();
}
- ++(conn_session.received_count);
+ ++(conn_stats.received_count);
depth.signal(1);
- if (conn_session.received_count == rounds) {
- logger().info("{}: finished receiving {} REPLYs", *c, conn_session.received_count);
- conn_session.finish_time = mono_clock::now();
- conn_session.done.set_value();
- }
return seastar::now();
}
seastar::future<> shutdown() {
return container().invoke_on_all([] (auto& client) {
if (client.is_active()) {
- logger().info("\n{} shutdown...", client.lname);
+ logger().info("{} shutdown...", client.lname);
ceph_assert(client.msgr);
- return client.msgr->shutdown();
+ return client.msgr->shutdown().then([&client] {
+ return client.stop_dispatch_messages();
+ });
}
return seastar::now();
});
}
- seastar::future<> dispatch_messages(const entity_addr_t& peer_addr) {
+ seastar::future<> connect_wait_verify(const entity_addr_t& peer_addr) {
return container().invoke_on_all([peer_addr] (auto& client) {
// start clients in active cores (#1 ~ #jobs)
if (client.is_active()) {
// make sure handshake won't hurt the performance
return seastar::sleep(1s);
}).then([&client, start_time] {
- if (client.conn_session.connected_time == mono_clock::zero()) {
+ if (client.conn_stats.connected_time == mono_clock::zero()) {
logger().error("\n{} not connected after 1s!\n", client.lname);
ceph_assert(false);
}
- client.conn_session.connecting_time = start_time;
+ client.conn_stats.connecting_time = start_time;
});
}
return seastar::now();
+ });
+ }
+
+ private:
+ class TimerReport {
+ private:
+ const unsigned jobs;
+ const unsigned msgtime;
+ const unsigned bytes_of_block;
+
+ unsigned elapsed = 0u;
+ std::vector<mono_time> start_times;
+ std::vector<PeriodStats> snaps;
+ std::vector<ConnStats> summaries;
+
+ public:
+ TimerReport(unsigned jobs, unsigned msgtime, unsigned bs)
+ : jobs{jobs},
+ msgtime{msgtime},
+ bytes_of_block{bs},
+ start_times{jobs, mono_clock::zero()},
+ snaps{jobs},
+ summaries{jobs} {}
+
+ unsigned get_elapsed() const { return elapsed; }
+
+ PeriodStats& get_snap_by_job(seastar::shard_id sid) {
+ ceph_assert(sid >= 1 && sid <= jobs);
+ return snaps[sid - 1];
+ }
+
+ ConnStats& get_summary_by_job(seastar::shard_id sid) {
+ ceph_assert(sid >= 1 && sid <= jobs);
+ return summaries[sid - 1];
+ }
+
+ bool should_stop() const {
+ return elapsed >= msgtime;
+ }
+
+ seastar::future<> ticktock() {
+ return seastar::sleep(1s).then([this] {
+ ++elapsed;
+ });
+ }
+
+ void report_header() {
+ std::ostringstream sout;
+ sout << std::setfill(' ')
+ << std::setw(7) << "sec"
+ << std::setw(6) << "depth"
+ << std::setw(8) << "IOPS"
+ << std::setw(8) << "MB/s"
+ << std::setw(8) << "lat(ms)";
+ std::cout << sout.str() << std::endl;
+ }
+
+ void report_period() {
+ if (elapsed == 1) {
+ // init this->start_times at the first period
+ for (unsigned i=0; i<jobs; ++i) {
+ start_times[i] = snaps[i].start_time;
+ }
+ }
+ std::chrono::duration<double> elapsed_d = 0s;
+ unsigned depth = 0u;
+ unsigned ops = 0u;
+ unsigned sampled_count = 0u;
+ double total_lat_s = 0.0;
+ for (const auto& snap: snaps) {
+ elapsed_d += (snap.finish_time - snap.start_time);
+ depth += snap.depth;
+ ops += (snap.finish_count - snap.start_count);
+ sampled_count += snap.sampled_count;
+ total_lat_s += snap.total_lat_s;
+ }
+ double elapsed_s = elapsed_d.count() / jobs;
+ double iops = ops/elapsed_s;
+ std::ostringstream sout;
+ sout << setfill(' ')
+ << std::setw(7) << elapsed_s
+ << std::setw(6) << depth
+ << std::setw(8) << iops
+ << std::setw(8) << iops * bytes_of_block / 1048576
+ << std::setw(8) << (total_lat_s / sampled_count * 1000);
+ std::cout << sout.str() << std::endl;
+ }
+
+ void report_summary() const {
+ std::chrono::duration<double> elapsed_d = 0s;
+ unsigned ops = 0u;
+ unsigned sampled_count = 0u;
+ double total_lat_s = 0.0;
+ for (const auto& summary: summaries) {
+ elapsed_d += (summary.finish_time - summary.start_time);
+ ops += (summary.received_count - summary.start_count);
+ sampled_count += summary.sampled_count;
+ total_lat_s += summary.total_lat_s;
+ }
+ double elapsed_s = elapsed_d.count() / jobs;
+ double iops = ops / elapsed_s;
+ std::ostringstream sout;
+ sout << "--------------"
+ << " summary "
+ << "--------------\n"
+ << setfill(' ')
+ << std::setw(7) << elapsed_s
+ << std::setw(6) << "-"
+ << std::setw(8) << iops
+ << std::setw(8) << iops * bytes_of_block / 1048576
+ << std::setw(8) << (total_lat_s / sampled_count * 1000)
+ << "\n";
+ std::cout << sout.str() << std::endl;
+ }
+ };
+
+ seastar::future<> report_period(TimerReport& report) {
+ return container().invoke_on_all([&report] (auto& client) {
+ if (client.is_active()) {
+ PeriodStats& snap = report.get_snap_by_job(client.sid);
+ client.period_stats.reset(client.conn_stats.received_count,
+ &snap);
+ snap.depth = client.get_current_depth();
+ }
+ }).then([&report] {
+ report.report_period();
+ });
+ }
+
+ seastar::future<> report_summary(TimerReport& report) {
+ return container().invoke_on_all([&report] (auto& client) {
+ if (client.is_active()) {
+ ConnStats& summary = report.get_summary_by_job(client.sid);
+ summary = client.conn_stats;
+ summary.finish_time = mono_clock::now();
+ }
+ }).then([&report] {
+ report.report_summary();
+ });
+ }
+
+ public:
+ seastar::future<> dispatch_with_timer(unsigned ramptime, unsigned msgtime) {
+ logger().info("[all clients]: start sending MOSDOps from {} clients", jobs);
+ return container().invoke_on_all([] (auto& client) {
+ if (client.is_active()) {
+ client.do_dispatch_messages(client.active_conn.get());
+ }
+ }).then([this, ramptime] {
+ logger().info("[all clients]: ramping up {} seconds...", ramptime);
+ return seastar::sleep(std::chrono::seconds(ramptime));
}).then([this] {
- logger().info("\nstart sending {} MOSDOps from {} clients",
- jobs * rounds, jobs);
- mono_time start_time = mono_clock::now();
return container().invoke_on_all([] (auto& client) {
if (client.is_active()) {
- return client.do_dispatch_messages(client.active_conn.get());
+ client.conn_stats.start();
+ client.period_stats.reset(client.conn_stats.received_count);
}
- return seastar::now();
- }).then([this, start_time] {
- std::chrono::duration<double> dur_messaging = mono_clock::now() - start_time;
- logger().info("\nSummary:\n clients: {}\n MOSDOps: {}\n total time: {}s\n",
- jobs, jobs * rounds, dur_messaging.count());
+ });
+ }).then([this, msgtime] {
+ logger().info("[all clients]: reporting {} seconds...\n", msgtime);
+ return seastar::do_with(
+ TimerReport(jobs, msgtime, msg_len), [this] (auto& report) {
+ report.report_header();
+ return seastar::do_until(
+ [&report] { return report.should_stop(); },
+ [&report, this] {
+ return report.ticktock().then([&report, this] {
+ // report period every 1s
+ return report_period(report);
+ }).then([&report, this] {
+ // report summary every 10s
+ if (report.get_elapsed() % 10 == 0) {
+ return report_summary(report);
+ } else {
+ return seastar::now();
+ }
+ });
+ }
+ ).then([&report, this] {
+ // report the final summary
+ if (report.get_elapsed() % 10 != 0) {
+ return report_summary(report);
+ } else {
+ return seastar::now();
+ }
+ });
});
});
}
m->set_tid(sent_count);
// sample message latency
- if (sent_count % conn_session.SAMPLE_RATE == 0) {
+ if (sent_count % SAMPLE_RATE == 0) {
auto index = sent_count % time_msgs_sent.size();
ceph_assert(time_msgs_sent[index] == mono_clock::zero());
time_msgs_sent[index] = mono_clock::now();
});
}
- seastar::future<> do_dispatch_messages(ceph::net::Connection* conn) {
+ class DepthBroken: public std::exception {};
+
+ seastar::future<> stop_dispatch_messages() {
+ stop_send = true;
+ depth.broken(DepthBroken());
+ return stopped_send_promise.get_future();
+ }
+
+ void do_dispatch_messages(ceph::net::Connection* conn) {
ceph_assert(seastar::engine().cpu_id() == sid);
ceph_assert(sent_count == 0);
- conn_session.start_time = mono_clock::now();
- return seastar::do_until(
- [this, conn] {
- bool stop = (sent_count == rounds);
- if (stop) {
- logger().info("{}: finished sending {} OSDOPs",
- *conn, sent_count);
- }
- return stop;
- },
+ conn_stats.start_time = mono_clock::now();
+ seastar::do_until(
+ [this] { return stop_send; },
[this, conn] {
sent_count += 1;
return send_msg(conn);
}
- ).then([this] {
- return conn_session.done.get_future();
- }).then([this] {
- std::chrono::duration<double> dur_conn = conn_session.connected_time - conn_session.connecting_time;
- std::chrono::duration<double> dur_msg = mono_clock::now() - conn_session.start_time;
- logger().info("\n{}:\n"
- " messages: {}\n"
+ ).handle_exception_type([] (const DepthBroken& e) {
+ // ok, stopped by stop_dispatch_messages()
+ }).finally([this, conn] {
+ std::chrono::duration<double> dur_conn = conn_stats.connected_time - conn_stats.connecting_time;
+ std::chrono::duration<double> dur_msg = mono_clock::now() - conn_stats.start_time;
+ unsigned ops = conn_stats.received_count - conn_stats.start_count;
+ logger().info("{}: stopped sending OSDOPs.\n"
+ "{}(depth={}):\n"
" connect time: {}s\n"
+ " messages received: {}\n"
" messaging time: {}s\n"
" latency: {}ms\n"
" IOPS: {}\n"
" throughput: {}MB/s\n",
- lname, conn_session.received_count,
- dur_conn.count(), dur_msg.count(),
- conn_session.total_lat_s / conn_session.sampled_count * 1000,
- conn_session.received_count / dur_msg.count(),
- conn_session.received_count / dur_msg.count() * msg_len / 1048576);
+ *conn,
+ lname,
+ nr_depth,
+ dur_conn.count(),
+ ops,
+ dur_msg.count(),
+ conn_stats.total_lat_s / conn_stats.sampled_count * 1000,
+ ops / dur_msg.count(),
+ ops / dur_msg.count() * msg_len / 1048576);
+ stopped_send_promise.set_value();
});
}
};
return seastar::when_all_succeed(
ceph::net::create_sharded<test_state::Server>(server_conf.core, server_conf.block_size),
- ceph::net::create_sharded<test_state::Client>(client_conf.jobs, client_conf.rounds,
+ ceph::net::create_sharded<test_state::Client>(client_conf.jobs,
client_conf.block_size, client_conf.depth))
.then([=](test_state::Server *server,
test_state::Client *client) {
return seastar::when_all_succeed(
server->init(server_conf.addr),
client->init())
- // dispatch ops
.then([client, addr = client_conf.server_addr] {
- return client->dispatch_messages(addr);
- // shutdown
+ return client->connect_wait_verify(addr);
+ }).then([client, ramptime = client_conf.ramptime,
+ msgtime = client_conf.msgtime] {
+ return client->dispatch_with_timer(ramptime, msgtime);
}).finally([client] {
return client->shutdown();
}).finally([server] {
ceph_assert(seastar::smp::count >= 1+client_conf.jobs);
ceph_assert(client_conf.jobs > 0);
return client->init()
- // dispatch ops
.then([client, addr = client_conf.server_addr] {
- return client->dispatch_messages(addr);
- // shutdown
+ return client->connect_wait_verify(addr);
+ }).then([client, ramptime = client_conf.ramptime,
+ msgtime = client_conf.msgtime] {
+ return client->dispatch_with_timer(ramptime, msgtime);
}).finally([client] {
return client->shutdown();
});
"0: both, 1:client, 2:server")
("addr", bpo::value<std::string>()->default_value("v1:0.0.0.0:9010"),
"server address")
- ("rounds", bpo::value<unsigned>()->default_value(65536),
- "number of client messaging rounds")
+ ("ramptime", bpo::value<unsigned>()->default_value(5),
+ "seconds of client ramp-up time")
+ ("msgtime", bpo::value<unsigned>()->default_value(15),
+ "seconds of client messaging time")
("jobs", bpo::value<unsigned>()->default_value(1),
"number of client jobs (messengers)")
("cbs", bpo::value<unsigned>()->default_value(4096),