From b04daeadf5671911f83ccbbea7bf791ced5f7a51 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Wed, 31 May 2023 15:38:15 +0800 Subject: [PATCH] crimson/tools/perf_crimson_msgr: integrate multi-core messenger as server Signed-off-by: Yingxin Cheng --- src/crimson/tools/perf_crimson_msgr.cc | 144 ++++++++++++++----------- 1 file changed, 82 insertions(+), 62 deletions(-) diff --git a/src/crimson/tools/perf_crimson_msgr.cc b/src/crimson/tools/perf_crimson_msgr.cc index 9761e8db3bfe4..71d74abbcc16a 100644 --- a/src/crimson/tools/perf_crimson_msgr.cc +++ b/src/crimson/tools/perf_crimson_msgr.cc @@ -143,8 +143,6 @@ struct server_config { conf.block_size = options["server-bs"].as(); conf.is_fixed_cpu = options["server-fixed-cpu"].as(); conf.core = options["server-core"].as(); - // TODO - ceph_assert_always(conf.is_fixed_cpu == false); return conf; } }; @@ -158,37 +156,42 @@ static seastar::future<> run( bool crc_enabled) { struct test_state { - struct Server; - using ServerFRef = seastar::foreign_ptr>; - struct Server final - : public crimson::net::Dispatcher { + : public crimson::net::Dispatcher, + public seastar::peering_sharded_service { + // available only in msgr_sid crimson::net::MessengerRef msgr; crimson::auth::DummyAuthClientServer dummy_auth; const seastar::shard_id msgr_sid; std::string lname; - unsigned msg_len; - bufferlist msg_data; + bool is_fixed_cpu = true; bool is_stopped = false; std::optional> fut_report; - Server(unsigned msg_len, bool needs_report) - : msgr_sid{seastar::this_shard_id()}, + // available in all shards + unsigned msg_len; + bufferlist msg_data; + + Server(seastar::shard_id msgr_sid, unsigned msg_len, bool needs_report) + : msgr_sid{msgr_sid}, msg_len{msg_len} { - lname = "server#"; - lname += std::to_string(msgr_sid); + lname = fmt::format("server@{}", msgr_sid); msg_data.append_zero(msg_len); - if (needs_report) { + if (seastar::this_shard_id() == msgr_sid && + needs_report) { start_report(); } } std::optional> ms_dispatch( crimson::net::ConnectionRef c, MessageRef m) override { + assert(c->get_shard_id() == seastar::this_shard_id()); ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); + auto &server = container().local(); + // server replies with MOSDOp to generate server-side write workload const static pg_t pgid; const static object_locator_t oloc; @@ -196,24 +199,27 @@ static seastar::future<> run( pgid.pool(), oloc.nspace); static spg_t spgid(pgid); auto rep = crimson::make_message(0, 0, hobj, spgid, 0, 0, 0); - bufferlist data(msg_data); - rep->write(0, msg_len, data); + bufferlist data(server.msg_data); + rep->write(0, server.msg_len, data); rep->set_tid(m->get_tid()); std::ignore = c->send(std::move(rep)); return {seastar::now()}; } - seastar::future<> init(const entity_addr_t& addr) { - return seastar::smp::submit_to(msgr_sid, [addr, this] { + seastar::future<> init(const entity_addr_t& addr, bool is_fixed_cpu) { + return container().invoke_on( + msgr_sid, [addr, is_fixed_cpu](auto &server) { // server msgr is always with nonce 0 - msgr = crimson::net::Messenger::create( - entity_name_t::OSD(msgr_sid), - lname, 0, true); - msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); - msgr->set_auth_client(&dummy_auth); - msgr->set_auth_server(&dummy_auth); - return msgr->bind(entity_addrvec_t{addr}).safe_then([this] { - return msgr->start({this}); + server.msgr = crimson::net::Messenger::create( + entity_name_t::OSD(server.msgr_sid), + server.lname, 0, is_fixed_cpu); + server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); + server.msgr->set_auth_client(&server.dummy_auth); + server.msgr->set_auth_server(&server.dummy_auth); + server.is_fixed_cpu = is_fixed_cpu; + return server.msgr->bind(entity_addrvec_t{addr} + ).safe_then([&server] { + return server.msgr->start({&server}); }, crimson::net::Messenger::bind_ertr::all_same_way( [addr] (const std::error_code& e) { logger().error("Server: " @@ -222,16 +228,18 @@ static seastar::future<> run( })); }); } + seastar::future<> shutdown() { logger().info("{} shutdown...", lname); - return seastar::smp::submit_to(msgr_sid, [this] { - is_stopped = true; - ceph_assert(msgr); - msgr->stop(); - return msgr->shutdown( - ).then([this] { - if (fut_report.has_value()) { - return std::move(fut_report.value()); + return container().invoke_on( + msgr_sid, [](auto &server) { + server.is_stopped = true; + ceph_assert(server.msgr); + server.msgr->stop(); + return server.msgr->shutdown( + ).then([&server] { + if (server.fut_report.has_value()) { + return std::move(server.fut_report.value()); } else { return seastar::now(); } @@ -239,29 +247,38 @@ static seastar::future<> run( }); } - static seastar::future create( - seastar::shard_id msgr_sid, - unsigned msg_len, - bool needs_report) { - return seastar::smp::submit_to( - msgr_sid, [msg_len, needs_report] { - return seastar::make_foreign( - std::make_unique(msg_len, needs_report)); - }); - } - private: struct TimerReport { unsigned elapsed = 0u; - seastar::future<> ticktock() { - return seastar::sleep(1s).then([this] { + seastar::future<> ticktock(bool is_fixed_cpu) { + return seastar::sleep(1s + ).then([this, is_fixed_cpu] { ++elapsed; - std::ostringstream sout; - sout << elapsed - << "s -- server reactor utilization: " - << get_reactor_utilization(); - std::cout << sout.str() << std::endl; + if (is_fixed_cpu) { + std::ostringstream sout; + sout << elapsed + << "s -- server reactor utilization: " + << get_reactor_utilization(); + std::cout << sout.str() << std::endl; + return seastar::now(); + } else { + return seastar::do_with( + std::vector(seastar::smp::count), + [this](auto &rus) { + return seastar::smp::invoke_on_all([&rus] { + rus[seastar::this_shard_id()] = get_reactor_utilization(); + }).then([this, &rus] { + std::ostringstream sout; + sout << elapsed + << "s -- server reactor utilization: "; + for (double ru : rus) { + sout << ru << ","; + } + std::cout << sout.str() << std::endl; + }); + }); + } }); } }; @@ -274,10 +291,12 @@ static seastar::future<> run( [this](auto &report) { return seastar::do_until( [this] { return is_stopped; }, - [&report] { - return report.ticktock(); + [&report, this] { + return report.ticktock(is_fixed_cpu); } ); + }).then([this] { + logger().info("report is stopped!"); }).forward_to(std::move(pr_report)); } }; @@ -926,7 +945,7 @@ static seastar::future<> run( server_needs_report = true; } return seastar::when_all( - test_state::Server::create( + create_sharded( server_conf.core, server_conf.block_size, server_needs_report), @@ -945,9 +964,8 @@ static seastar::future<> run( "ms_crc_data", crc_enabled ? "true" : "false"); }) ).then([=](auto&& ret) { - auto fp_server = std::move(std::get<0>(ret).get0()); + auto server = std::move(std::get<0>(ret).get0()); auto client = std::move(std::get<1>(ret).get0()); - test_state::Server* server = fp_server.get(); // reserve core 0 for potentially better performance if (mode == perf_mode_t::both) { logger().info("\nperf settings:\n smp={}\n {}\n {}\n", @@ -956,7 +974,9 @@ static seastar::future<> run( ceph_assert(client_conf.num_clients > 0); ceph_assert(seastar::smp::count > server_conf.core + client_conf.num_clients); return seastar::when_all_succeed( - server->init(server_conf.addr), + // it is not reasonable to allow server/client to shared cores for + // performance benchmarking purposes. + server->init(server_conf.addr, server_conf.is_fixed_cpu), client->init() ).then_unpack([client, addr = client_conf.server_addr] { return client->connect_wait_verify(addr); @@ -965,8 +985,8 @@ static seastar::future<> run( return client->dispatch_with_timer(ramptime, msgtime); }).then([client] { return client->shutdown(); - }).then([server, fp_server = std::move(fp_server)] () mutable { - return server->shutdown().then([cleanup = std::move(fp_server)] {}); + }).then([server] { + return server->shutdown(); }); } else if (mode == perf_mode_t::client) { logger().info("\nperf settings:\n smp={}\n {}\n", @@ -986,10 +1006,10 @@ static seastar::future<> run( ceph_assert(seastar::smp::count > server_conf.core); logger().info("\nperf settings:\n smp={}\n {}\n", seastar::smp::count, server_conf.str()); - return seastar::async([server, server_conf, fp_server=std::move(fp_server)] { + return seastar::async([server, server_conf] { // FIXME: SIGINT is not received by stop_signal seastar_apps_lib::stop_signal should_stop; - server->init(server_conf.addr).get(); + server->init(server_conf.addr, server_conf.is_fixed_cpu).get(); should_stop.wait().get(); server->shutdown().get(); }); @@ -1024,7 +1044,7 @@ int main(int argc, char** argv) ("server-fixed-cpu", bpo::value()->default_value(true), "server is in the fixed cpu mode, non-fixed doesn't support the mode both") ("server-core", bpo::value()->default_value(1), - "server running core") + "server messenger running core") ("server-bs", bpo::value()->default_value(0), "server block size") ("crc-enabled", bpo::value()->default_value(false), -- 2.39.5