conf.block_size = options["server-bs"].as<unsigned>();
conf.is_fixed_cpu = options["server-fixed-cpu"].as<bool>();
conf.core = options["server-core"].as<unsigned>();
- // TODO
- ceph_assert_always(conf.is_fixed_cpu == false);
return conf;
}
};
bool crc_enabled)
{
struct test_state {
- struct Server;
- using ServerFRef = seastar::foreign_ptr<std::unique_ptr<Server>>;
-
struct Server final
- : public crimson::net::Dispatcher {
+ : public crimson::net::Dispatcher,
+ public seastar::peering_sharded_service<Server> {
+ // available only in msgr_sid
crimson::net::MessengerRef msgr;
crimson::auth::DummyAuthClientServer dummy_auth;
const seastar::shard_id msgr_sid;
std::string lname;
- unsigned msg_len;
- bufferlist msg_data;
+ bool is_fixed_cpu = true;
bool is_stopped = false;
std::optional<seastar::future<>> fut_report;
- Server(unsigned msg_len, bool needs_report)
- : msgr_sid{seastar::this_shard_id()},
+ // available in all shards
+ unsigned msg_len;
+ bufferlist msg_data;
+
+ Server(seastar::shard_id msgr_sid, unsigned msg_len, bool needs_report)
+ : msgr_sid{msgr_sid},
msg_len{msg_len} {
- lname = "server#";
- lname += std::to_string(msgr_sid);
+ lname = fmt::format("server@{}", msgr_sid);
msg_data.append_zero(msg_len);
- if (needs_report) {
+ if (seastar::this_shard_id() == msgr_sid &&
+ needs_report) {
start_report();
}
}
std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef c, MessageRef m) override {
+ assert(c->get_shard_id() == seastar::this_shard_id());
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
+ auto &server = container().local();
+
// server replies with MOSDOp to generate server-side write workload
const static pg_t pgid;
const static object_locator_t oloc;
pgid.pool(), oloc.nspace);
static spg_t spgid(pgid);
auto rep = crimson::make_message<MOSDOp>(0, 0, hobj, spgid, 0, 0, 0);
- bufferlist data(msg_data);
- rep->write(0, msg_len, data);
+ bufferlist data(server.msg_data);
+ rep->write(0, server.msg_len, data);
rep->set_tid(m->get_tid());
std::ignore = c->send(std::move(rep));
return {seastar::now()};
}
- seastar::future<> init(const entity_addr_t& addr) {
- return seastar::smp::submit_to(msgr_sid, [addr, this] {
+ seastar::future<> init(const entity_addr_t& addr, bool is_fixed_cpu) {
+ return container().invoke_on(
+ msgr_sid, [addr, is_fixed_cpu](auto &server) {
// server msgr is always with nonce 0
- msgr = crimson::net::Messenger::create(
- entity_name_t::OSD(msgr_sid),
- lname, 0, true);
- msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
- msgr->set_auth_client(&dummy_auth);
- msgr->set_auth_server(&dummy_auth);
- return msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
- return msgr->start({this});
+ server.msgr = crimson::net::Messenger::create(
+ entity_name_t::OSD(server.msgr_sid),
+ server.lname, 0, is_fixed_cpu);
+ server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
+ server.msgr->set_auth_client(&server.dummy_auth);
+ server.msgr->set_auth_server(&server.dummy_auth);
+ server.is_fixed_cpu = is_fixed_cpu;
+ return server.msgr->bind(entity_addrvec_t{addr}
+ ).safe_then([&server] {
+ return server.msgr->start({&server});
}, crimson::net::Messenger::bind_ertr::all_same_way(
[addr] (const std::error_code& e) {
logger().error("Server: "
}));
});
}
+
seastar::future<> shutdown() {
logger().info("{} shutdown...", lname);
- return seastar::smp::submit_to(msgr_sid, [this] {
- is_stopped = true;
- ceph_assert(msgr);
- msgr->stop();
- return msgr->shutdown(
- ).then([this] {
- if (fut_report.has_value()) {
- return std::move(fut_report.value());
+ return container().invoke_on(
+ msgr_sid, [](auto &server) {
+ server.is_stopped = true;
+ ceph_assert(server.msgr);
+ server.msgr->stop();
+ return server.msgr->shutdown(
+ ).then([&server] {
+ if (server.fut_report.has_value()) {
+ return std::move(server.fut_report.value());
} else {
return seastar::now();
}
});
}
- static seastar::future<ServerFRef> create(
- seastar::shard_id msgr_sid,
- unsigned msg_len,
- bool needs_report) {
- return seastar::smp::submit_to(
- msgr_sid, [msg_len, needs_report] {
- return seastar::make_foreign(
- std::make_unique<Server>(msg_len, needs_report));
- });
- }
-
private:
struct TimerReport {
unsigned elapsed = 0u;
- seastar::future<> ticktock() {
- return seastar::sleep(1s).then([this] {
+ seastar::future<> ticktock(bool is_fixed_cpu) {
+ return seastar::sleep(1s
+ ).then([this, is_fixed_cpu] {
++elapsed;
- std::ostringstream sout;
- sout << elapsed
- << "s -- server reactor utilization: "
- << get_reactor_utilization();
- std::cout << sout.str() << std::endl;
+ if (is_fixed_cpu) {
+ std::ostringstream sout;
+ sout << elapsed
+ << "s -- server reactor utilization: "
+ << get_reactor_utilization();
+ std::cout << sout.str() << std::endl;
+ return seastar::now();
+ } else {
+ return seastar::do_with(
+ std::vector<double>(seastar::smp::count),
+ [this](auto &rus) {
+ return seastar::smp::invoke_on_all([&rus] {
+ rus[seastar::this_shard_id()] = get_reactor_utilization();
+ }).then([this, &rus] {
+ std::ostringstream sout;
+ sout << elapsed
+ << "s -- server reactor utilization: ";
+ for (double ru : rus) {
+ sout << ru << ",";
+ }
+ std::cout << sout.str() << std::endl;
+ });
+ });
+ }
});
}
};
[this](auto &report) {
return seastar::do_until(
[this] { return is_stopped; },
- [&report] {
- return report.ticktock();
+ [&report, this] {
+ return report.ticktock(is_fixed_cpu);
}
);
+ }).then([this] {
+ logger().info("report is stopped!");
}).forward_to(std::move(pr_report));
}
};
server_needs_report = true;
}
return seastar::when_all(
- test_state::Server::create(
+ create_sharded<test_state::Server>(
server_conf.core,
server_conf.block_size,
server_needs_report),
"ms_crc_data", crc_enabled ? "true" : "false");
})
).then([=](auto&& ret) {
- auto fp_server = std::move(std::get<0>(ret).get0());
+ auto server = std::move(std::get<0>(ret).get0());
auto client = std::move(std::get<1>(ret).get0());
- test_state::Server* server = fp_server.get();
// reserve core 0 for potentially better performance
if (mode == perf_mode_t::both) {
logger().info("\nperf settings:\n smp={}\n {}\n {}\n",
ceph_assert(client_conf.num_clients > 0);
ceph_assert(seastar::smp::count > server_conf.core + client_conf.num_clients);
return seastar::when_all_succeed(
- server->init(server_conf.addr),
+ // it is not reasonable to allow server/client to shared cores for
+ // performance benchmarking purposes.
+ server->init(server_conf.addr, server_conf.is_fixed_cpu),
client->init()
).then_unpack([client, addr = client_conf.server_addr] {
return client->connect_wait_verify(addr);
return client->dispatch_with_timer(ramptime, msgtime);
}).then([client] {
return client->shutdown();
- }).then([server, fp_server = std::move(fp_server)] () mutable {
- return server->shutdown().then([cleanup = std::move(fp_server)] {});
+ }).then([server] {
+ return server->shutdown();
});
} else if (mode == perf_mode_t::client) {
logger().info("\nperf settings:\n smp={}\n {}\n",
ceph_assert(seastar::smp::count > server_conf.core);
logger().info("\nperf settings:\n smp={}\n {}\n",
seastar::smp::count, server_conf.str());
- return seastar::async([server, server_conf, fp_server=std::move(fp_server)] {
+ return seastar::async([server, server_conf] {
// FIXME: SIGINT is not received by stop_signal
seastar_apps_lib::stop_signal should_stop;
- server->init(server_conf.addr).get();
+ server->init(server_conf.addr, server_conf.is_fixed_cpu).get();
should_stop.wait().get();
server->shutdown().get();
});
("server-fixed-cpu", bpo::value<bool>()->default_value(true),
"server is in the fixed cpu mode, non-fixed doesn't support the mode both")
("server-core", bpo::value<unsigned>()->default_value(1),
- "server running core")
+ "server messenger running core")
("server-bs", bpo::value<unsigned>()->default_value(0),
"server block size")
("crc-enabled", bpo::value<bool>()->default_value(false),