From: Yingxin Cheng Date: Wed, 28 Sep 2022 08:16:20 +0000 (+0800) Subject: crimson: merge files in tools/crimson into crimson/tools X-Git-Tag: v18.1.0~1057^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=410e13ba43cda01150a1c866253cb4b729eb015b;p=ceph.git crimson: merge files in tools/crimson into crimson/tools Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/tools/CMakeLists.txt b/src/crimson/tools/CMakeLists.txt index 52436c62dc48..d57c3f9cfc44 100644 --- a/src/crimson/tools/CMakeLists.txt +++ b/src/crimson/tools/CMakeLists.txt @@ -7,3 +7,12 @@ add_executable(crimson-store-nbd target_link_libraries(crimson-store-nbd crimson-os) install(TARGETS crimson-store-nbd DESTINATION bin) + +add_executable(perf-crimson-msgr perf_crimson_msgr.cc) +target_link_libraries(perf-crimson-msgr crimson) + +add_executable(perf-async-msgr perf_async_msgr.cc) +target_link_libraries(perf-async-msgr ceph-common global ${ALLOC_LIBS}) + +add_executable(perf-staged-fltree perf_staged_fltree.cc) +target_link_libraries(perf-staged-fltree crimson-seastore) diff --git a/src/crimson/tools/perf_async_msgr.cc b/src/crimson/tools/perf_async_msgr.cc new file mode 100644 index 000000000000..b7b0ca606d35 --- /dev/null +++ b/src/crimson/tools/perf_async_msgr.cc @@ -0,0 +1,141 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- + +#include +#include + +#include "auth/Auth.h" +#include "global/global_init.h" +#include "msg/Dispatcher.h" +#include "msg/Messenger.h" +#include "messages/MOSDOp.h" + +#include "auth/DummyAuth.h" + +namespace { + +constexpr int CEPH_OSD_PROTOCOL = 10; + +struct Server { + Server(CephContext* cct, unsigned msg_len) + : dummy_auth(cct), dispatcher(cct, msg_len) + { + msgr.reset(Messenger::create(cct, "async", entity_name_t::OSD(0), "server", 0)); + dummy_auth.auth_registry.refresh_config(); + msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL); + msgr->set_default_policy(Messenger::Policy::stateless_server(0)); + msgr->set_auth_client(&dummy_auth); + msgr->set_auth_server(&dummy_auth); + msgr->set_require_authorizer(false); + } + DummyAuthClientServer dummy_auth; + std::unique_ptr msgr; + struct ServerDispatcher : Dispatcher { + unsigned msg_len = 0; + bufferlist msg_data; + + ServerDispatcher(CephContext* cct, unsigned msg_len) + : Dispatcher(cct), msg_len(msg_len) + { + msg_data.append_zero(msg_len); + } + bool ms_can_fast_dispatch_any() const override { + return true; + } + bool ms_can_fast_dispatch(const Message* m) const override { + return m->get_type() == CEPH_MSG_OSD_OP; + } + void ms_fast_dispatch(Message* m) override { + ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); + const static pg_t pgid; + const static object_locator_t oloc; + const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(), + pgid.pool(), oloc.nspace); + static spg_t spgid(pgid); + MOSDOp *rep = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0); + bufferlist data(msg_data); + rep->write(0, msg_len, data); + rep->set_tid(m->get_tid()); + m->get_connection()->send_message(rep); + m->put(); + } + bool ms_dispatch(Message*) override { + ceph_abort(); + } + bool ms_handle_reset(Connection*) override { + return true; + } + void ms_handle_remote_reset(Connection*) override { + } + bool ms_handle_refused(Connection*) override { + return true; + } + } dispatcher; +}; + +} + +static void run(CephContext* cct, entity_addr_t addr, unsigned bs) +{ + std::cout << "async server listening at " << addr << std::endl; + Server server{cct, bs}; + server.msgr->bind(addr); + 
server.msgr->add_dispatcher_head(&server.dispatcher); + server.msgr->start(); + server.msgr->wait(); +} + +int main(int argc, char** argv) +{ + namespace po = boost::program_options; + po::options_description desc{"Allowed options"}; + desc.add_options() + ("help,h", "show help message") + ("addr", po::value()->default_value("v2:127.0.0.1:9010"), + "server address(crimson only supports msgr v2 protocol)") + ("bs", po::value()->default_value(0), + "server block size") + ("crc-enabled", po::value()->default_value(false), + "enable CRC checks"); + po::variables_map vm; + std::vector unrecognized_options; + try { + auto parsed = po::command_line_parser(argc, argv) + .options(desc) + .allow_unregistered() + .run(); + po::store(parsed, vm); + if (vm.count("help")) { + std::cout << desc << std::endl; + return 0; + } + po::notify(vm); + unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional); + } catch(const po::error& e) { + std::cerr << "error: " << e.what() << std::endl; + return 1; + } + + auto addr = vm["addr"].as(); + entity_addr_t target_addr; + target_addr.parse(addr.c_str(), nullptr); + ceph_assert_always(target_addr.is_msgr2()); + auto bs = vm["bs"].as(); + auto crc_enabled = vm["crc-enabled"].as(); + + std::vector args(argv, argv + argc); + auto cct = global_init(nullptr, args, + CEPH_ENTITY_TYPE_CLIENT, + CODE_ENVIRONMENT_UTILITY, + CINIT_FLAG_NO_MON_CONFIG); + common_init_finish(cct.get()); + + if (crc_enabled) { + cct->_conf.set_val("ms_crc_header", "true"); + cct->_conf.set_val("ms_crc_data", "true"); + } else { + cct->_conf.set_val("ms_crc_header", "false"); + cct->_conf.set_val("ms_crc_data", "false"); + } + + run(cct.get(), target_addr, bs); +} diff --git a/src/crimson/tools/perf_crimson_msgr.cc b/src/crimson/tools/perf_crimson_msgr.cc new file mode 100644 index 000000000000..e6b5b9a0508c --- /dev/null +++ b/src/crimson/tools/perf_crimson_msgr.cc @@ -0,0 +1,749 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "common/ceph_time.h" +#include "messages/MOSDOp.h" + +#include "crimson/auth/DummyAuth.h" +#include "crimson/common/log.h" +#include "crimson/common/config_proxy.h" +#include "crimson/net/Connection.h" +#include "crimson/net/Dispatcher.h" +#include "crimson/net/Messenger.h" + +using namespace std; +using namespace std::chrono_literals; + +namespace bpo = boost::program_options; + +namespace { + +template +using Ref = boost::intrusive_ptr; + +seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_ms); +} + +template +seastar::future create_sharded(Args... 
args) { + // seems we should only construct/stop shards on #0 + return seastar::smp::submit_to(0, [=] { + auto sharded_obj = seastar::make_lw_shared>(); + return sharded_obj->start(args...).then([sharded_obj]() { + seastar::engine().at_exit([sharded_obj]() { + return sharded_obj->stop().then([sharded_obj] {}); + }); + return sharded_obj.get(); + }); + }).then([] (seastar::sharded *ptr_shard) { + // return the pointer valid for the caller CPU + return &ptr_shard->local(); + }); +} + +enum class perf_mode_t { + both, + client, + server +}; + +struct client_config { + entity_addr_t server_addr; + unsigned block_size; + unsigned ramptime; + unsigned msgtime; + unsigned jobs; + unsigned depth; + + std::string str() const { + std::ostringstream out; + out << "client[>> " << server_addr + << "](bs=" << block_size + << ", ramptime=" << ramptime + << ", msgtime=" << msgtime + << ", jobs=" << jobs + << ", depth=" << depth + << ")"; + return out.str(); + } + + static client_config load(bpo::variables_map& options) { + client_config conf; + entity_addr_t addr; + ceph_assert(addr.parse(options["addr"].as().c_str(), nullptr)); + ceph_assert_always(addr.is_msgr2()); + + conf.server_addr = addr; + conf.block_size = options["cbs"].as(); + conf.ramptime = options["ramptime"].as(); + conf.msgtime = options["msgtime"].as(); + conf.jobs = options["jobs"].as(); + conf.depth = options["depth"].as(); + ceph_assert(conf.depth % conf.jobs == 0); + return conf; + } +}; + +struct server_config { + entity_addr_t addr; + unsigned block_size; + unsigned core; + + std::string str() const { + std::ostringstream out; + out << "server[" << addr + << "](bs=" << block_size + << ", core=" << core + << ")"; + return out.str(); + } + + static server_config load(bpo::variables_map& options) { + server_config conf; + entity_addr_t addr; + ceph_assert(addr.parse(options["addr"].as().c_str(), nullptr)); + ceph_assert_always(addr.is_msgr2()); + + conf.addr = addr; + conf.block_size = options["sbs"].as(); + conf.core = options["core"].as(); + return conf; + } +}; + +const unsigned SAMPLE_RATE = 7; + +static seastar::future<> run( + perf_mode_t mode, + const client_config& client_conf, + const server_config& server_conf, + bool crc_enabled) +{ + struct test_state { + struct Server; + using ServerFRef = seastar::foreign_ptr>; + + struct Server final + : public crimson::net::Dispatcher { + crimson::net::MessengerRef msgr; + crimson::auth::DummyAuthClientServer dummy_auth; + const seastar::shard_id msgr_sid; + std::string lname; + unsigned msg_len; + bufferlist msg_data; + + Server(unsigned msg_len) + : msgr_sid{seastar::this_shard_id()}, + msg_len{msg_len} { + lname = "server#"; + lname += std::to_string(msgr_sid); + msg_data.append_zero(msg_len); + } + + std::optional> ms_dispatch( + crimson::net::ConnectionRef c, MessageRef m) override { + ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); + + // server replies with MOSDOp to generate server-side write workload + const static pg_t pgid; + const static object_locator_t oloc; + const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(), + pgid.pool(), oloc.nspace); + static spg_t spgid(pgid); + auto rep = crimson::make_message(0, 0, hobj, spgid, 0, 0, 0); + bufferlist data(msg_data); + rep->write(0, msg_len, data); + rep->set_tid(m->get_tid()); + std::ignore = c->send(std::move(rep)); + return {seastar::now()}; + } + + seastar::future<> init(const entity_addr_t& addr) { + return seastar::smp::submit_to(msgr_sid, [addr, this] { + // server msgr is always with nonce 0 + msgr = 
crimson::net::Messenger::create(entity_name_t::OSD(msgr_sid), lname, 0); + msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); + msgr->set_auth_client(&dummy_auth); + msgr->set_auth_server(&dummy_auth); + return msgr->bind(entity_addrvec_t{addr}).safe_then([this] { + return msgr->start({this}); + }, crimson::net::Messenger::bind_ertr::all_same_way( + [addr] (const std::error_code& e) { + logger().error("Server: " + "there is another instance running at {}", addr); + ceph_abort(); + })); + }); + } + seastar::future<> shutdown() { + logger().info("{} shutdown...", lname); + return seastar::smp::submit_to(msgr_sid, [this] { + ceph_assert(msgr); + msgr->stop(); + return msgr->shutdown(); + }); + } + seastar::future<> wait() { + return seastar::smp::submit_to(msgr_sid, [this] { + ceph_assert(msgr); + return msgr->wait(); + }); + } + + static seastar::future create(seastar::shard_id msgr_sid, unsigned msg_len) { + return seastar::smp::submit_to(msgr_sid, [msg_len] { + return seastar::make_foreign(std::make_unique(msg_len)); + }); + } + }; + + struct Client final + : public crimson::net::Dispatcher, + public seastar::peering_sharded_service { + + struct ConnStats { + mono_time connecting_time = mono_clock::zero(); + mono_time connected_time = mono_clock::zero(); + unsigned received_count = 0u; + + mono_time start_time = mono_clock::zero(); + unsigned start_count = 0u; + + unsigned sampled_count = 0u; + double total_lat_s = 0.0; + + // for reporting only + mono_time finish_time = mono_clock::zero(); + + void start() { + start_time = mono_clock::now(); + start_count = received_count; + sampled_count = 0u; + total_lat_s = 0.0; + finish_time = mono_clock::zero(); + } + }; + ConnStats conn_stats; + + struct PeriodStats { + mono_time start_time = mono_clock::zero(); + unsigned start_count = 0u; + unsigned sampled_count = 0u; + double total_lat_s = 0.0; + + // for reporting only + mono_time finish_time = mono_clock::zero(); + unsigned finish_count = 0u; + unsigned depth = 0u; + + void reset(unsigned received_count, PeriodStats* snap = nullptr) { + if (snap) { + snap->start_time = start_time; + snap->start_count = start_count; + snap->sampled_count = sampled_count; + snap->total_lat_s = total_lat_s; + snap->finish_time = mono_clock::now(); + snap->finish_count = received_count; + } + start_time = mono_clock::now(); + start_count = received_count; + sampled_count = 0u; + total_lat_s = 0.0; + } + }; + PeriodStats period_stats; + + const seastar::shard_id sid; + std::string lname; + + const unsigned jobs; + crimson::net::MessengerRef msgr; + const unsigned msg_len; + bufferlist msg_data; + const unsigned nr_depth; + seastar::semaphore depth; + std::vector time_msgs_sent; + crimson::auth::DummyAuthClientServer dummy_auth; + + unsigned sent_count = 0u; + crimson::net::ConnectionRef active_conn = nullptr; + + bool stop_send = false; + seastar::promise<> stopped_send_promise; + + Client(unsigned jobs, unsigned msg_len, unsigned depth) + : sid{seastar::this_shard_id()}, + jobs{jobs}, + msg_len{msg_len}, + nr_depth{depth/jobs}, + depth{nr_depth}, + time_msgs_sent{depth/jobs, mono_clock::zero()} { + lname = "client#"; + lname += std::to_string(sid); + msg_data.append_zero(msg_len); + } + + unsigned get_current_depth() const { + ceph_assert(depth.available_units() >= 0); + return nr_depth - depth.current(); + } + + void ms_handle_connect(crimson::net::ConnectionRef conn) override { + conn_stats.connected_time = mono_clock::now(); + } + std::optional> ms_dispatch( + 
crimson::net::ConnectionRef, MessageRef m) override { + // server replies with MOSDOp to generate server-side write workload + ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); + + auto msg_id = m->get_tid(); + if (msg_id % SAMPLE_RATE == 0) { + auto index = msg_id % time_msgs_sent.size(); + ceph_assert(time_msgs_sent[index] != mono_clock::zero()); + std::chrono::duration cur_latency = mono_clock::now() - time_msgs_sent[index]; + conn_stats.total_lat_s += cur_latency.count(); + ++(conn_stats.sampled_count); + period_stats.total_lat_s += cur_latency.count(); + ++(period_stats.sampled_count); + time_msgs_sent[index] = mono_clock::zero(); + } + + ++(conn_stats.received_count); + depth.signal(1); + + return {seastar::now()}; + } + + // should start messenger at this shard? + bool is_active() { + ceph_assert(seastar::this_shard_id() == sid); + return sid != 0 && sid <= jobs; + } + + seastar::future<> init() { + return container().invoke_on_all([] (auto& client) { + if (client.is_active()) { + client.msgr = crimson::net::Messenger::create(entity_name_t::OSD(client.sid), client.lname, client.sid); + client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); + client.msgr->set_require_authorizer(false); + client.msgr->set_auth_client(&client.dummy_auth); + client.msgr->set_auth_server(&client.dummy_auth); + return client.msgr->start({&client}); + } + return seastar::now(); + }); + } + + seastar::future<> shutdown() { + return container().invoke_on_all([] (auto& client) { + if (client.is_active()) { + logger().info("{} shutdown...", client.lname); + ceph_assert(client.msgr); + client.msgr->stop(); + return client.msgr->shutdown().then([&client] { + return client.stop_dispatch_messages(); + }); + } + return seastar::now(); + }); + } + + seastar::future<> connect_wait_verify(const entity_addr_t& peer_addr) { + return container().invoke_on_all([peer_addr] (auto& client) { + // start clients in active cores (#1 ~ #jobs) + if (client.is_active()) { + mono_time start_time = mono_clock::now(); + client.active_conn = client.msgr->connect(peer_addr, entity_name_t::TYPE_OSD); + // make sure handshake won't hurt the performance + return seastar::sleep(1s).then([&client, start_time] { + if (client.conn_stats.connected_time == mono_clock::zero()) { + logger().error("\n{} not connected after 1s!\n", client.lname); + ceph_assert(false); + } + client.conn_stats.connecting_time = start_time; + }); + } + return seastar::now(); + }); + } + + private: + class TimerReport { + private: + const unsigned jobs; + const unsigned msgtime; + const unsigned bytes_of_block; + + unsigned elapsed = 0u; + std::vector start_times; + std::vector snaps; + std::vector summaries; + + public: + TimerReport(unsigned jobs, unsigned msgtime, unsigned bs) + : jobs{jobs}, + msgtime{msgtime}, + bytes_of_block{bs}, + start_times{jobs, mono_clock::zero()}, + snaps{jobs}, + summaries{jobs} {} + + unsigned get_elapsed() const { return elapsed; } + + PeriodStats& get_snap_by_job(seastar::shard_id sid) { + ceph_assert(sid >= 1 && sid <= jobs); + return snaps[sid - 1]; + } + + ConnStats& get_summary_by_job(seastar::shard_id sid) { + ceph_assert(sid >= 1 && sid <= jobs); + return summaries[sid - 1]; + } + + bool should_stop() const { + return elapsed >= msgtime; + } + + seastar::future<> ticktock() { + return seastar::sleep(1s).then([this] { + ++elapsed; + }); + } + + void report_header() { + std::ostringstream sout; + sout << std::setfill(' ') + << std::setw(7) << "sec" + << std::setw(6) << "depth" + << std::setw(8) << "IOPS" + << 
std::setw(8) << "MB/s" + << std::setw(8) << "lat(ms)"; + std::cout << sout.str() << std::endl; + } + + void report_period() { + if (elapsed == 1) { + // init this->start_times at the first period + for (unsigned i=0; i elapsed_d = 0s; + unsigned depth = 0u; + unsigned ops = 0u; + unsigned sampled_count = 0u; + double total_lat_s = 0.0; + for (const auto& snap: snaps) { + elapsed_d += (snap.finish_time - snap.start_time); + depth += snap.depth; + ops += (snap.finish_count - snap.start_count); + sampled_count += snap.sampled_count; + total_lat_s += snap.total_lat_s; + } + double elapsed_s = elapsed_d.count() / jobs; + double iops = ops/elapsed_s; + std::ostringstream sout; + sout << setfill(' ') + << std::setw(7) << elapsed_s + << std::setw(6) << depth + << std::setw(8) << iops + << std::setw(8) << iops * bytes_of_block / 1048576 + << std::setw(8) << (total_lat_s / sampled_count * 1000); + std::cout << sout.str() << std::endl; + } + + void report_summary() const { + std::chrono::duration elapsed_d = 0s; + unsigned ops = 0u; + unsigned sampled_count = 0u; + double total_lat_s = 0.0; + for (const auto& summary: summaries) { + elapsed_d += (summary.finish_time - summary.start_time); + ops += (summary.received_count - summary.start_count); + sampled_count += summary.sampled_count; + total_lat_s += summary.total_lat_s; + } + double elapsed_s = elapsed_d.count() / jobs; + double iops = ops / elapsed_s; + std::ostringstream sout; + sout << "--------------" + << " summary " + << "--------------\n" + << setfill(' ') + << std::setw(7) << elapsed_s + << std::setw(6) << "-" + << std::setw(8) << iops + << std::setw(8) << iops * bytes_of_block / 1048576 + << std::setw(8) << (total_lat_s / sampled_count * 1000) + << "\n"; + std::cout << sout.str() << std::endl; + } + }; + + seastar::future<> report_period(TimerReport& report) { + return container().invoke_on_all([&report] (auto& client) { + if (client.is_active()) { + PeriodStats& snap = report.get_snap_by_job(client.sid); + client.period_stats.reset(client.conn_stats.received_count, + &snap); + snap.depth = client.get_current_depth(); + } + }).then([&report] { + report.report_period(); + }); + } + + seastar::future<> report_summary(TimerReport& report) { + return container().invoke_on_all([&report] (auto& client) { + if (client.is_active()) { + ConnStats& summary = report.get_summary_by_job(client.sid); + summary = client.conn_stats; + summary.finish_time = mono_clock::now(); + } + }).then([&report] { + report.report_summary(); + }); + } + + public: + seastar::future<> dispatch_with_timer(unsigned ramptime, unsigned msgtime) { + logger().info("[all clients]: start sending MOSDOps from {} clients", jobs); + return container().invoke_on_all([] (auto& client) { + if (client.is_active()) { + client.do_dispatch_messages(client.active_conn.get()); + } + }).then([ramptime] { + logger().info("[all clients]: ramping up {} seconds...", ramptime); + return seastar::sleep(std::chrono::seconds(ramptime)); + }).then([this] { + return container().invoke_on_all([] (auto& client) { + if (client.is_active()) { + client.conn_stats.start(); + client.period_stats.reset(client.conn_stats.received_count); + } + }); + }).then([this, msgtime] { + logger().info("[all clients]: reporting {} seconds...\n", msgtime); + return seastar::do_with( + TimerReport(jobs, msgtime, msg_len), [this] (auto& report) { + report.report_header(); + return seastar::do_until( + [&report] { return report.should_stop(); }, + [&report, this] { + return report.ticktock().then([&report, this] { + // report 
period every 1s + return report_period(report); + }).then([&report, this] { + // report summary every 10s + if (report.get_elapsed() % 10 == 0) { + return report_summary(report); + } else { + return seastar::now(); + } + }); + } + ).then([&report, this] { + // report the final summary + if (report.get_elapsed() % 10 != 0) { + return report_summary(report); + } else { + return seastar::now(); + } + }); + }); + }); + } + + private: + seastar::future<> send_msg(crimson::net::Connection* conn) { + ceph_assert(seastar::this_shard_id() == sid); + return depth.wait(1).then([this, conn] { + const static pg_t pgid; + const static object_locator_t oloc; + const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(), + pgid.pool(), oloc.nspace); + static spg_t spgid(pgid); + auto m = crimson::make_message(0, 0, hobj, spgid, 0, 0, 0); + bufferlist data(msg_data); + m->write(0, msg_len, data); + // use tid as the identity of each round + m->set_tid(sent_count); + + // sample message latency + if (sent_count % SAMPLE_RATE == 0) { + auto index = sent_count % time_msgs_sent.size(); + ceph_assert(time_msgs_sent[index] == mono_clock::zero()); + time_msgs_sent[index] = mono_clock::now(); + } + + return conn->send(std::move(m)); + }); + } + + class DepthBroken: public std::exception {}; + + seastar::future<> stop_dispatch_messages() { + stop_send = true; + depth.broken(DepthBroken()); + return stopped_send_promise.get_future(); + } + + void do_dispatch_messages(crimson::net::Connection* conn) { + ceph_assert(seastar::this_shard_id() == sid); + ceph_assert(sent_count == 0); + conn_stats.start_time = mono_clock::now(); + // forwarded to stopped_send_promise + (void) seastar::do_until( + [this] { return stop_send; }, + [this, conn] { + sent_count += 1; + return send_msg(conn); + } + ).handle_exception_type([] (const DepthBroken& e) { + // ok, stopped by stop_dispatch_messages() + }).then([this, conn] { + std::chrono::duration dur_conn = conn_stats.connected_time - conn_stats.connecting_time; + std::chrono::duration dur_msg = mono_clock::now() - conn_stats.start_time; + unsigned ops = conn_stats.received_count - conn_stats.start_count; + logger().info("{}: stopped sending OSDOPs.\n" + "{}(depth={}):\n" + " connect time: {}s\n" + " messages received: {}\n" + " messaging time: {}s\n" + " latency: {}ms\n" + " IOPS: {}\n" + " throughput: {}MB/s\n", + *conn, + lname, + nr_depth, + dur_conn.count(), + ops, + dur_msg.count(), + conn_stats.total_lat_s / conn_stats.sampled_count * 1000, + ops / dur_msg.count(), + ops / dur_msg.count() * msg_len / 1048576); + stopped_send_promise.set_value(); + }); + } + }; + }; + + return seastar::when_all( + test_state::Server::create(server_conf.core, server_conf.block_size), + create_sharded(client_conf.jobs, client_conf.block_size, client_conf.depth), + crimson::common::sharded_conf().start(EntityName{}, std::string_view{"ceph"}).then([] { + return crimson::common::local_conf().start(); + }).then([crc_enabled] { + return crimson::common::local_conf().set_val( + "ms_crc_data", crc_enabled ? 
"true" : "false"); + }) + ).then([=](auto&& ret) { + auto fp_server = std::move(std::get<0>(ret).get0()); + auto client = std::move(std::get<1>(ret).get0()); + test_state::Server* server = fp_server.get(); + if (mode == perf_mode_t::both) { + logger().info("\nperf settings:\n {}\n {}\n", + client_conf.str(), server_conf.str()); + ceph_assert(seastar::smp::count >= 1+client_conf.jobs); + ceph_assert(client_conf.jobs > 0); + ceph_assert(seastar::smp::count >= 1+server_conf.core); + ceph_assert(server_conf.core == 0 || server_conf.core > client_conf.jobs); + return seastar::when_all_succeed( + server->init(server_conf.addr), + client->init() + ).then_unpack([client, addr = client_conf.server_addr] { + return client->connect_wait_verify(addr); + }).then([client, ramptime = client_conf.ramptime, + msgtime = client_conf.msgtime] { + return client->dispatch_with_timer(ramptime, msgtime); + }).then([client] { + return client->shutdown(); + }).then([server, fp_server = std::move(fp_server)] () mutable { + return server->shutdown().then([cleanup = std::move(fp_server)] {}); + }); + } else if (mode == perf_mode_t::client) { + logger().info("\nperf settings:\n {}\n", client_conf.str()); + ceph_assert(seastar::smp::count >= 1+client_conf.jobs); + ceph_assert(client_conf.jobs > 0); + return client->init( + ).then([client, addr = client_conf.server_addr] { + return client->connect_wait_verify(addr); + }).then([client, ramptime = client_conf.ramptime, + msgtime = client_conf.msgtime] { + return client->dispatch_with_timer(ramptime, msgtime); + }).then([client] { + return client->shutdown(); + }); + } else { // mode == perf_mode_t::server + ceph_assert(seastar::smp::count >= 1+server_conf.core); + logger().info("\nperf settings:\n {}\n", server_conf.str()); + return server->init(server_conf.addr + // dispatch ops + ).then([server] { + return server->wait(); + // shutdown + }).then([server, fp_server = std::move(fp_server)] () mutable { + return server->shutdown().then([cleanup = std::move(fp_server)] {}); + }); + } + }).finally([] { + return crimson::common::sharded_conf().stop(); + }); +} + +} + +int main(int argc, char** argv) +{ + seastar::app_template app; + app.add_options() + ("mode", bpo::value()->default_value(0), + "0: both, 1:client, 2:server") + ("addr", bpo::value()->default_value("v2:127.0.0.1:9010"), + "server address(only support msgr v2 protocol)") + ("ramptime", bpo::value()->default_value(5), + "seconds of client ramp-up time") + ("msgtime", bpo::value()->default_value(15), + "seconds of client messaging time") + ("jobs", bpo::value()->default_value(1), + "number of client jobs (messengers)") + ("cbs", bpo::value()->default_value(4096), + "client block size") + ("depth", bpo::value()->default_value(512), + "client io depth") + ("core", bpo::value()->default_value(0), + "server running core") + ("sbs", bpo::value()->default_value(0), + "server block size") + ("crc-enabled", bpo::value()->default_value(false), + "enable CRC checks"); + return app.run(argc, argv, [&app] { + auto&& config = app.configuration(); + auto mode = config["mode"].as(); + ceph_assert(mode <= 2); + auto _mode = static_cast(mode); + bool crc_enabled = config["crc-enabled"].as(); + auto server_conf = server_config::load(config); + auto client_conf = client_config::load(config); + return run(_mode, client_conf, server_conf, crc_enabled + ).then([] { + logger().info("\nsuccessful!\n"); + }).handle_exception([] (auto eptr) { + logger().info("\nfailed!\n"); + return seastar::make_exception_future<>(eptr); + }); + }); +} 
diff --git a/src/crimson/tools/perf_staged_fltree.cc b/src/crimson/tools/perf_staged_fltree.cc new file mode 100644 index 000000000000..81b6217506f0 --- /dev/null +++ b/src/crimson/tools/perf_staged_fltree.cc @@ -0,0 +1,178 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab + +#include + +#include +#include + +#include "crimson/common/config_proxy.h" +#include "crimson/common/log.h" +#include "crimson/common/perf_counters_collection.h" +#include "crimson/os/seastore/onode_manager/staged-fltree/tree_utils.h" +#include "crimson/os/seastore/onode_manager/staged-fltree/node_extent_manager.h" + +#include "test/crimson/seastore/onode_tree/test_value.h" +#include "test/crimson/seastore/transaction_manager_test_state.h" + +using namespace crimson::os::seastore::onode; +namespace bpo = boost::program_options; + +seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_test); +} + +template +class PerfTree : public TMTestState { + public: + PerfTree(bool is_dummy) : is_dummy{is_dummy} {} + + seastar::future<> run(KVPool& kvs, double erase_ratio) { + return tm_setup().then([this, &kvs, erase_ratio] { + return seastar::async([this, &kvs, erase_ratio] { + auto tree = std::make_unique>(kvs, + (is_dummy ? NodeExtentManager::create_dummy(true) + : NodeExtentManager::create_seastore(*tm))); + { + auto t = create_mutate_transaction(); + with_trans_intr(*t, [&](auto &tr){ + return tree->bootstrap(tr); + }).unsafe_get(); + submit_transaction(std::move(t)); + } + { + auto t = create_mutate_transaction(); + with_trans_intr(*t, [&](auto &tr){ + return tree->insert(tr); + }).unsafe_get(); + auto start_time = mono_clock::now(); + submit_transaction(std::move(t)); + std::chrono::duration duration = mono_clock::now() - start_time; + logger().warn("submit_transaction() done! {}s", duration.count()); + } + { + // Note: create_weak_transaction() can also work, but too slow. 
+ auto t = create_read_transaction(); + with_trans_intr(*t, [&](auto &tr){ + return tree->get_stats(tr); + }).unsafe_get(); + + with_trans_intr(*t, [&](auto &tr){ + return tree->validate(tr); + }).unsafe_get(); + } + { + auto t = create_mutate_transaction(); + with_trans_intr(*t, [&](auto &tr){ + return tree->erase(tr, kvs.size() * erase_ratio); + }).unsafe_get(); + submit_transaction(std::move(t)); + } + { + auto t = create_read_transaction(); + with_trans_intr(*t, [&](auto &tr){ + return tree->get_stats(tr); + }).unsafe_get(); + + with_trans_intr(*t, [&](auto &tr){ + return tree->validate(tr); + }).unsafe_get(); + } + tree.reset(); + }); + }).then([this] { + return tm_teardown(); + }); + } + + private: + bool is_dummy; +}; + +template +seastar::future<> run(const bpo::variables_map& config) { + return seastar::async([&config] { + auto backend = config["backend"].as(); + bool is_dummy; + if (backend == "dummy") { + is_dummy = true; + } else if (backend == "seastore") { + is_dummy = false; + } else { + ceph_abort(false && "invalid backend"); + } + auto ns_sizes = config["ns-sizes"].as>(); + auto oid_sizes = config["oid-sizes"].as>(); + auto onode_sizes = config["onode-sizes"].as>(); + auto range2 = config["range2"].as>(); + ceph_assert(range2.size() == 2); + auto range1 = config["range1"].as>(); + ceph_assert(range1.size() == 2); + auto range0 = config["range0"].as>(); + ceph_assert(range0.size() == 2); + auto erase_ratio = config["erase-ratio"].as(); + ceph_assert(erase_ratio >= 0); + ceph_assert(erase_ratio <= 1); + + using crimson::common::sharded_conf; + sharded_conf().start(EntityName{}, std::string_view{"ceph"}).get(); + seastar::engine().at_exit([] { + return sharded_conf().stop(); + }); + + using crimson::common::sharded_perf_coll; + sharded_perf_coll().start().get(); + seastar::engine().at_exit([] { + return sharded_perf_coll().stop(); + }); + + auto kvs = KVPool::create_raw_range( + ns_sizes, oid_sizes, onode_sizes, + {range2[0], range2[1]}, + {range1[0], range1[1]}, + {range0[0], range0[1]}); + PerfTree perf{is_dummy}; + perf.run(kvs, erase_ratio).get0(); + }); +} + + +int main(int argc, char** argv) +{ + seastar::app_template app; + app.add_options() + ("backend", bpo::value()->default_value("dummy"), + "tree backend: dummy, seastore") + ("tracked", bpo::value()->default_value(false), + "track inserted cursors") + ("ns-sizes", bpo::value>()->default_value( + {8, 11, 64, 128, 255, 256}), + "sizes of ns strings") + ("oid-sizes", bpo::value>()->default_value( + {8, 13, 64, 512, 2035, 2048}), + "sizes of oid strings") + ("onode-sizes", bpo::value>()->default_value( + {8, 16, 128, 576, 992, 1200}), + "sizes of onode") + ("range2", bpo::value>()->default_value( + {0, 128}), + "range of shard-pool-crush [a, b)") + ("range1", bpo::value>()->default_value( + {0, 10}), + "range of ns-oid strings [a, b)") + ("range0", bpo::value>()->default_value( + {0, 4}), + "range of snap-gen [a, b)") + ("erase-ratio", bpo::value()->default_value( + 0.8), + "erase-ratio of all the inserted onodes"); + return app.run(argc, argv, [&app] { + auto&& config = app.configuration(); + auto tracked = config["tracked"].as(); + if (tracked) { + return run(config); + } else { + return run(config); + } + }); +} diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt index 4fe63fe4686e..aeb9d0248ea6 100644 --- a/src/tools/CMakeLists.txt +++ b/src/tools/CMakeLists.txt @@ -148,10 +148,6 @@ if(WITH_RBD) endif() endif(WITH_RBD) -if(WITH_SEASTAR) - add_subdirectory(crimson) -endif() - 
add_subdirectory(immutable_object_cache) add_subdirectory(ceph-dencoder) add_subdirectory(erasure-code) diff --git a/src/tools/crimson/CMakeLists.txt b/src/tools/crimson/CMakeLists.txt deleted file mode 100644 index 19a2cfa9170c..000000000000 --- a/src/tools/crimson/CMakeLists.txt +++ /dev/null @@ -1,8 +0,0 @@ -add_executable(perf-crimson-msgr perf_crimson_msgr.cc) -target_link_libraries(perf-crimson-msgr crimson) - -add_executable(perf-async-msgr perf_async_msgr.cc) -target_link_libraries(perf-async-msgr ceph-common global ${ALLOC_LIBS}) - -add_executable(perf-staged-fltree perf_staged_fltree.cc) -target_link_libraries(perf-staged-fltree crimson-seastore) diff --git a/src/tools/crimson/perf_async_msgr.cc b/src/tools/crimson/perf_async_msgr.cc deleted file mode 100644 index b7b0ca606d35..000000000000 --- a/src/tools/crimson/perf_async_msgr.cc +++ /dev/null @@ -1,141 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- - -#include -#include - -#include "auth/Auth.h" -#include "global/global_init.h" -#include "msg/Dispatcher.h" -#include "msg/Messenger.h" -#include "messages/MOSDOp.h" - -#include "auth/DummyAuth.h" - -namespace { - -constexpr int CEPH_OSD_PROTOCOL = 10; - -struct Server { - Server(CephContext* cct, unsigned msg_len) - : dummy_auth(cct), dispatcher(cct, msg_len) - { - msgr.reset(Messenger::create(cct, "async", entity_name_t::OSD(0), "server", 0)); - dummy_auth.auth_registry.refresh_config(); - msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL); - msgr->set_default_policy(Messenger::Policy::stateless_server(0)); - msgr->set_auth_client(&dummy_auth); - msgr->set_auth_server(&dummy_auth); - msgr->set_require_authorizer(false); - } - DummyAuthClientServer dummy_auth; - std::unique_ptr msgr; - struct ServerDispatcher : Dispatcher { - unsigned msg_len = 0; - bufferlist msg_data; - - ServerDispatcher(CephContext* cct, unsigned msg_len) - : Dispatcher(cct), msg_len(msg_len) - { - msg_data.append_zero(msg_len); - } - bool ms_can_fast_dispatch_any() const override { - return true; - } - bool ms_can_fast_dispatch(const Message* m) const override { - return m->get_type() == CEPH_MSG_OSD_OP; - } - void ms_fast_dispatch(Message* m) override { - ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); - const static pg_t pgid; - const static object_locator_t oloc; - const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(), - pgid.pool(), oloc.nspace); - static spg_t spgid(pgid); - MOSDOp *rep = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0); - bufferlist data(msg_data); - rep->write(0, msg_len, data); - rep->set_tid(m->get_tid()); - m->get_connection()->send_message(rep); - m->put(); - } - bool ms_dispatch(Message*) override { - ceph_abort(); - } - bool ms_handle_reset(Connection*) override { - return true; - } - void ms_handle_remote_reset(Connection*) override { - } - bool ms_handle_refused(Connection*) override { - return true; - } - } dispatcher; -}; - -} - -static void run(CephContext* cct, entity_addr_t addr, unsigned bs) -{ - std::cout << "async server listening at " << addr << std::endl; - Server server{cct, bs}; - server.msgr->bind(addr); - server.msgr->add_dispatcher_head(&server.dispatcher); - server.msgr->start(); - server.msgr->wait(); -} - -int main(int argc, char** argv) -{ - namespace po = boost::program_options; - po::options_description desc{"Allowed options"}; - desc.add_options() - ("help,h", "show help message") - ("addr", po::value()->default_value("v2:127.0.0.1:9010"), - "server address(crimson only supports msgr v2 protocol)") - ("bs", 
po::value()->default_value(0), - "server block size") - ("crc-enabled", po::value()->default_value(false), - "enable CRC checks"); - po::variables_map vm; - std::vector unrecognized_options; - try { - auto parsed = po::command_line_parser(argc, argv) - .options(desc) - .allow_unregistered() - .run(); - po::store(parsed, vm); - if (vm.count("help")) { - std::cout << desc << std::endl; - return 0; - } - po::notify(vm); - unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional); - } catch(const po::error& e) { - std::cerr << "error: " << e.what() << std::endl; - return 1; - } - - auto addr = vm["addr"].as(); - entity_addr_t target_addr; - target_addr.parse(addr.c_str(), nullptr); - ceph_assert_always(target_addr.is_msgr2()); - auto bs = vm["bs"].as(); - auto crc_enabled = vm["crc-enabled"].as(); - - std::vector args(argv, argv + argc); - auto cct = global_init(nullptr, args, - CEPH_ENTITY_TYPE_CLIENT, - CODE_ENVIRONMENT_UTILITY, - CINIT_FLAG_NO_MON_CONFIG); - common_init_finish(cct.get()); - - if (crc_enabled) { - cct->_conf.set_val("ms_crc_header", "true"); - cct->_conf.set_val("ms_crc_data", "true"); - } else { - cct->_conf.set_val("ms_crc_header", "false"); - cct->_conf.set_val("ms_crc_data", "false"); - } - - run(cct.get(), target_addr, bs); -} diff --git a/src/tools/crimson/perf_crimson_msgr.cc b/src/tools/crimson/perf_crimson_msgr.cc deleted file mode 100644 index e6b5b9a0508c..000000000000 --- a/src/tools/crimson/perf_crimson_msgr.cc +++ /dev/null @@ -1,749 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include - -#include "common/ceph_time.h" -#include "messages/MOSDOp.h" - -#include "crimson/auth/DummyAuth.h" -#include "crimson/common/log.h" -#include "crimson/common/config_proxy.h" -#include "crimson/net/Connection.h" -#include "crimson/net/Dispatcher.h" -#include "crimson/net/Messenger.h" - -using namespace std; -using namespace std::chrono_literals; - -namespace bpo = boost::program_options; - -namespace { - -template -using Ref = boost::intrusive_ptr; - -seastar::logger& logger() { - return crimson::get_logger(ceph_subsys_ms); -} - -template -seastar::future create_sharded(Args... 
args) { - // seems we should only construct/stop shards on #0 - return seastar::smp::submit_to(0, [=] { - auto sharded_obj = seastar::make_lw_shared>(); - return sharded_obj->start(args...).then([sharded_obj]() { - seastar::engine().at_exit([sharded_obj]() { - return sharded_obj->stop().then([sharded_obj] {}); - }); - return sharded_obj.get(); - }); - }).then([] (seastar::sharded *ptr_shard) { - // return the pointer valid for the caller CPU - return &ptr_shard->local(); - }); -} - -enum class perf_mode_t { - both, - client, - server -}; - -struct client_config { - entity_addr_t server_addr; - unsigned block_size; - unsigned ramptime; - unsigned msgtime; - unsigned jobs; - unsigned depth; - - std::string str() const { - std::ostringstream out; - out << "client[>> " << server_addr - << "](bs=" << block_size - << ", ramptime=" << ramptime - << ", msgtime=" << msgtime - << ", jobs=" << jobs - << ", depth=" << depth - << ")"; - return out.str(); - } - - static client_config load(bpo::variables_map& options) { - client_config conf; - entity_addr_t addr; - ceph_assert(addr.parse(options["addr"].as().c_str(), nullptr)); - ceph_assert_always(addr.is_msgr2()); - - conf.server_addr = addr; - conf.block_size = options["cbs"].as(); - conf.ramptime = options["ramptime"].as(); - conf.msgtime = options["msgtime"].as(); - conf.jobs = options["jobs"].as(); - conf.depth = options["depth"].as(); - ceph_assert(conf.depth % conf.jobs == 0); - return conf; - } -}; - -struct server_config { - entity_addr_t addr; - unsigned block_size; - unsigned core; - - std::string str() const { - std::ostringstream out; - out << "server[" << addr - << "](bs=" << block_size - << ", core=" << core - << ")"; - return out.str(); - } - - static server_config load(bpo::variables_map& options) { - server_config conf; - entity_addr_t addr; - ceph_assert(addr.parse(options["addr"].as().c_str(), nullptr)); - ceph_assert_always(addr.is_msgr2()); - - conf.addr = addr; - conf.block_size = options["sbs"].as(); - conf.core = options["core"].as(); - return conf; - } -}; - -const unsigned SAMPLE_RATE = 7; - -static seastar::future<> run( - perf_mode_t mode, - const client_config& client_conf, - const server_config& server_conf, - bool crc_enabled) -{ - struct test_state { - struct Server; - using ServerFRef = seastar::foreign_ptr>; - - struct Server final - : public crimson::net::Dispatcher { - crimson::net::MessengerRef msgr; - crimson::auth::DummyAuthClientServer dummy_auth; - const seastar::shard_id msgr_sid; - std::string lname; - unsigned msg_len; - bufferlist msg_data; - - Server(unsigned msg_len) - : msgr_sid{seastar::this_shard_id()}, - msg_len{msg_len} { - lname = "server#"; - lname += std::to_string(msgr_sid); - msg_data.append_zero(msg_len); - } - - std::optional> ms_dispatch( - crimson::net::ConnectionRef c, MessageRef m) override { - ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); - - // server replies with MOSDOp to generate server-side write workload - const static pg_t pgid; - const static object_locator_t oloc; - const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(), - pgid.pool(), oloc.nspace); - static spg_t spgid(pgid); - auto rep = crimson::make_message(0, 0, hobj, spgid, 0, 0, 0); - bufferlist data(msg_data); - rep->write(0, msg_len, data); - rep->set_tid(m->get_tid()); - std::ignore = c->send(std::move(rep)); - return {seastar::now()}; - } - - seastar::future<> init(const entity_addr_t& addr) { - return seastar::smp::submit_to(msgr_sid, [addr, this] { - // server msgr is always with nonce 0 - msgr = 
crimson::net::Messenger::create(entity_name_t::OSD(msgr_sid), lname, 0); - msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); - msgr->set_auth_client(&dummy_auth); - msgr->set_auth_server(&dummy_auth); - return msgr->bind(entity_addrvec_t{addr}).safe_then([this] { - return msgr->start({this}); - }, crimson::net::Messenger::bind_ertr::all_same_way( - [addr] (const std::error_code& e) { - logger().error("Server: " - "there is another instance running at {}", addr); - ceph_abort(); - })); - }); - } - seastar::future<> shutdown() { - logger().info("{} shutdown...", lname); - return seastar::smp::submit_to(msgr_sid, [this] { - ceph_assert(msgr); - msgr->stop(); - return msgr->shutdown(); - }); - } - seastar::future<> wait() { - return seastar::smp::submit_to(msgr_sid, [this] { - ceph_assert(msgr); - return msgr->wait(); - }); - } - - static seastar::future create(seastar::shard_id msgr_sid, unsigned msg_len) { - return seastar::smp::submit_to(msgr_sid, [msg_len] { - return seastar::make_foreign(std::make_unique(msg_len)); - }); - } - }; - - struct Client final - : public crimson::net::Dispatcher, - public seastar::peering_sharded_service { - - struct ConnStats { - mono_time connecting_time = mono_clock::zero(); - mono_time connected_time = mono_clock::zero(); - unsigned received_count = 0u; - - mono_time start_time = mono_clock::zero(); - unsigned start_count = 0u; - - unsigned sampled_count = 0u; - double total_lat_s = 0.0; - - // for reporting only - mono_time finish_time = mono_clock::zero(); - - void start() { - start_time = mono_clock::now(); - start_count = received_count; - sampled_count = 0u; - total_lat_s = 0.0; - finish_time = mono_clock::zero(); - } - }; - ConnStats conn_stats; - - struct PeriodStats { - mono_time start_time = mono_clock::zero(); - unsigned start_count = 0u; - unsigned sampled_count = 0u; - double total_lat_s = 0.0; - - // for reporting only - mono_time finish_time = mono_clock::zero(); - unsigned finish_count = 0u; - unsigned depth = 0u; - - void reset(unsigned received_count, PeriodStats* snap = nullptr) { - if (snap) { - snap->start_time = start_time; - snap->start_count = start_count; - snap->sampled_count = sampled_count; - snap->total_lat_s = total_lat_s; - snap->finish_time = mono_clock::now(); - snap->finish_count = received_count; - } - start_time = mono_clock::now(); - start_count = received_count; - sampled_count = 0u; - total_lat_s = 0.0; - } - }; - PeriodStats period_stats; - - const seastar::shard_id sid; - std::string lname; - - const unsigned jobs; - crimson::net::MessengerRef msgr; - const unsigned msg_len; - bufferlist msg_data; - const unsigned nr_depth; - seastar::semaphore depth; - std::vector time_msgs_sent; - crimson::auth::DummyAuthClientServer dummy_auth; - - unsigned sent_count = 0u; - crimson::net::ConnectionRef active_conn = nullptr; - - bool stop_send = false; - seastar::promise<> stopped_send_promise; - - Client(unsigned jobs, unsigned msg_len, unsigned depth) - : sid{seastar::this_shard_id()}, - jobs{jobs}, - msg_len{msg_len}, - nr_depth{depth/jobs}, - depth{nr_depth}, - time_msgs_sent{depth/jobs, mono_clock::zero()} { - lname = "client#"; - lname += std::to_string(sid); - msg_data.append_zero(msg_len); - } - - unsigned get_current_depth() const { - ceph_assert(depth.available_units() >= 0); - return nr_depth - depth.current(); - } - - void ms_handle_connect(crimson::net::ConnectionRef conn) override { - conn_stats.connected_time = mono_clock::now(); - } - std::optional> ms_dispatch( - 
crimson::net::ConnectionRef, MessageRef m) override { - // server replies with MOSDOp to generate server-side write workload - ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); - - auto msg_id = m->get_tid(); - if (msg_id % SAMPLE_RATE == 0) { - auto index = msg_id % time_msgs_sent.size(); - ceph_assert(time_msgs_sent[index] != mono_clock::zero()); - std::chrono::duration cur_latency = mono_clock::now() - time_msgs_sent[index]; - conn_stats.total_lat_s += cur_latency.count(); - ++(conn_stats.sampled_count); - period_stats.total_lat_s += cur_latency.count(); - ++(period_stats.sampled_count); - time_msgs_sent[index] = mono_clock::zero(); - } - - ++(conn_stats.received_count); - depth.signal(1); - - return {seastar::now()}; - } - - // should start messenger at this shard? - bool is_active() { - ceph_assert(seastar::this_shard_id() == sid); - return sid != 0 && sid <= jobs; - } - - seastar::future<> init() { - return container().invoke_on_all([] (auto& client) { - if (client.is_active()) { - client.msgr = crimson::net::Messenger::create(entity_name_t::OSD(client.sid), client.lname, client.sid); - client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); - client.msgr->set_require_authorizer(false); - client.msgr->set_auth_client(&client.dummy_auth); - client.msgr->set_auth_server(&client.dummy_auth); - return client.msgr->start({&client}); - } - return seastar::now(); - }); - } - - seastar::future<> shutdown() { - return container().invoke_on_all([] (auto& client) { - if (client.is_active()) { - logger().info("{} shutdown...", client.lname); - ceph_assert(client.msgr); - client.msgr->stop(); - return client.msgr->shutdown().then([&client] { - return client.stop_dispatch_messages(); - }); - } - return seastar::now(); - }); - } - - seastar::future<> connect_wait_verify(const entity_addr_t& peer_addr) { - return container().invoke_on_all([peer_addr] (auto& client) { - // start clients in active cores (#1 ~ #jobs) - if (client.is_active()) { - mono_time start_time = mono_clock::now(); - client.active_conn = client.msgr->connect(peer_addr, entity_name_t::TYPE_OSD); - // make sure handshake won't hurt the performance - return seastar::sleep(1s).then([&client, start_time] { - if (client.conn_stats.connected_time == mono_clock::zero()) { - logger().error("\n{} not connected after 1s!\n", client.lname); - ceph_assert(false); - } - client.conn_stats.connecting_time = start_time; - }); - } - return seastar::now(); - }); - } - - private: - class TimerReport { - private: - const unsigned jobs; - const unsigned msgtime; - const unsigned bytes_of_block; - - unsigned elapsed = 0u; - std::vector start_times; - std::vector snaps; - std::vector summaries; - - public: - TimerReport(unsigned jobs, unsigned msgtime, unsigned bs) - : jobs{jobs}, - msgtime{msgtime}, - bytes_of_block{bs}, - start_times{jobs, mono_clock::zero()}, - snaps{jobs}, - summaries{jobs} {} - - unsigned get_elapsed() const { return elapsed; } - - PeriodStats& get_snap_by_job(seastar::shard_id sid) { - ceph_assert(sid >= 1 && sid <= jobs); - return snaps[sid - 1]; - } - - ConnStats& get_summary_by_job(seastar::shard_id sid) { - ceph_assert(sid >= 1 && sid <= jobs); - return summaries[sid - 1]; - } - - bool should_stop() const { - return elapsed >= msgtime; - } - - seastar::future<> ticktock() { - return seastar::sleep(1s).then([this] { - ++elapsed; - }); - } - - void report_header() { - std::ostringstream sout; - sout << std::setfill(' ') - << std::setw(7) << "sec" - << std::setw(6) << "depth" - << std::setw(8) << "IOPS" - << 
std::setw(8) << "MB/s" - << std::setw(8) << "lat(ms)"; - std::cout << sout.str() << std::endl; - } - - void report_period() { - if (elapsed == 1) { - // init this->start_times at the first period - for (unsigned i=0; i elapsed_d = 0s; - unsigned depth = 0u; - unsigned ops = 0u; - unsigned sampled_count = 0u; - double total_lat_s = 0.0; - for (const auto& snap: snaps) { - elapsed_d += (snap.finish_time - snap.start_time); - depth += snap.depth; - ops += (snap.finish_count - snap.start_count); - sampled_count += snap.sampled_count; - total_lat_s += snap.total_lat_s; - } - double elapsed_s = elapsed_d.count() / jobs; - double iops = ops/elapsed_s; - std::ostringstream sout; - sout << setfill(' ') - << std::setw(7) << elapsed_s - << std::setw(6) << depth - << std::setw(8) << iops - << std::setw(8) << iops * bytes_of_block / 1048576 - << std::setw(8) << (total_lat_s / sampled_count * 1000); - std::cout << sout.str() << std::endl; - } - - void report_summary() const { - std::chrono::duration elapsed_d = 0s; - unsigned ops = 0u; - unsigned sampled_count = 0u; - double total_lat_s = 0.0; - for (const auto& summary: summaries) { - elapsed_d += (summary.finish_time - summary.start_time); - ops += (summary.received_count - summary.start_count); - sampled_count += summary.sampled_count; - total_lat_s += summary.total_lat_s; - } - double elapsed_s = elapsed_d.count() / jobs; - double iops = ops / elapsed_s; - std::ostringstream sout; - sout << "--------------" - << " summary " - << "--------------\n" - << setfill(' ') - << std::setw(7) << elapsed_s - << std::setw(6) << "-" - << std::setw(8) << iops - << std::setw(8) << iops * bytes_of_block / 1048576 - << std::setw(8) << (total_lat_s / sampled_count * 1000) - << "\n"; - std::cout << sout.str() << std::endl; - } - }; - - seastar::future<> report_period(TimerReport& report) { - return container().invoke_on_all([&report] (auto& client) { - if (client.is_active()) { - PeriodStats& snap = report.get_snap_by_job(client.sid); - client.period_stats.reset(client.conn_stats.received_count, - &snap); - snap.depth = client.get_current_depth(); - } - }).then([&report] { - report.report_period(); - }); - } - - seastar::future<> report_summary(TimerReport& report) { - return container().invoke_on_all([&report] (auto& client) { - if (client.is_active()) { - ConnStats& summary = report.get_summary_by_job(client.sid); - summary = client.conn_stats; - summary.finish_time = mono_clock::now(); - } - }).then([&report] { - report.report_summary(); - }); - } - - public: - seastar::future<> dispatch_with_timer(unsigned ramptime, unsigned msgtime) { - logger().info("[all clients]: start sending MOSDOps from {} clients", jobs); - return container().invoke_on_all([] (auto& client) { - if (client.is_active()) { - client.do_dispatch_messages(client.active_conn.get()); - } - }).then([ramptime] { - logger().info("[all clients]: ramping up {} seconds...", ramptime); - return seastar::sleep(std::chrono::seconds(ramptime)); - }).then([this] { - return container().invoke_on_all([] (auto& client) { - if (client.is_active()) { - client.conn_stats.start(); - client.period_stats.reset(client.conn_stats.received_count); - } - }); - }).then([this, msgtime] { - logger().info("[all clients]: reporting {} seconds...\n", msgtime); - return seastar::do_with( - TimerReport(jobs, msgtime, msg_len), [this] (auto& report) { - report.report_header(); - return seastar::do_until( - [&report] { return report.should_stop(); }, - [&report, this] { - return report.ticktock().then([&report, this] { - // report 
period every 1s - return report_period(report); - }).then([&report, this] { - // report summary every 10s - if (report.get_elapsed() % 10 == 0) { - return report_summary(report); - } else { - return seastar::now(); - } - }); - } - ).then([&report, this] { - // report the final summary - if (report.get_elapsed() % 10 != 0) { - return report_summary(report); - } else { - return seastar::now(); - } - }); - }); - }); - } - - private: - seastar::future<> send_msg(crimson::net::Connection* conn) { - ceph_assert(seastar::this_shard_id() == sid); - return depth.wait(1).then([this, conn] { - const static pg_t pgid; - const static object_locator_t oloc; - const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(), - pgid.pool(), oloc.nspace); - static spg_t spgid(pgid); - auto m = crimson::make_message(0, 0, hobj, spgid, 0, 0, 0); - bufferlist data(msg_data); - m->write(0, msg_len, data); - // use tid as the identity of each round - m->set_tid(sent_count); - - // sample message latency - if (sent_count % SAMPLE_RATE == 0) { - auto index = sent_count % time_msgs_sent.size(); - ceph_assert(time_msgs_sent[index] == mono_clock::zero()); - time_msgs_sent[index] = mono_clock::now(); - } - - return conn->send(std::move(m)); - }); - } - - class DepthBroken: public std::exception {}; - - seastar::future<> stop_dispatch_messages() { - stop_send = true; - depth.broken(DepthBroken()); - return stopped_send_promise.get_future(); - } - - void do_dispatch_messages(crimson::net::Connection* conn) { - ceph_assert(seastar::this_shard_id() == sid); - ceph_assert(sent_count == 0); - conn_stats.start_time = mono_clock::now(); - // forwarded to stopped_send_promise - (void) seastar::do_until( - [this] { return stop_send; }, - [this, conn] { - sent_count += 1; - return send_msg(conn); - } - ).handle_exception_type([] (const DepthBroken& e) { - // ok, stopped by stop_dispatch_messages() - }).then([this, conn] { - std::chrono::duration dur_conn = conn_stats.connected_time - conn_stats.connecting_time; - std::chrono::duration dur_msg = mono_clock::now() - conn_stats.start_time; - unsigned ops = conn_stats.received_count - conn_stats.start_count; - logger().info("{}: stopped sending OSDOPs.\n" - "{}(depth={}):\n" - " connect time: {}s\n" - " messages received: {}\n" - " messaging time: {}s\n" - " latency: {}ms\n" - " IOPS: {}\n" - " throughput: {}MB/s\n", - *conn, - lname, - nr_depth, - dur_conn.count(), - ops, - dur_msg.count(), - conn_stats.total_lat_s / conn_stats.sampled_count * 1000, - ops / dur_msg.count(), - ops / dur_msg.count() * msg_len / 1048576); - stopped_send_promise.set_value(); - }); - } - }; - }; - - return seastar::when_all( - test_state::Server::create(server_conf.core, server_conf.block_size), - create_sharded(client_conf.jobs, client_conf.block_size, client_conf.depth), - crimson::common::sharded_conf().start(EntityName{}, std::string_view{"ceph"}).then([] { - return crimson::common::local_conf().start(); - }).then([crc_enabled] { - return crimson::common::local_conf().set_val( - "ms_crc_data", crc_enabled ? 
"true" : "false"); - }) - ).then([=](auto&& ret) { - auto fp_server = std::move(std::get<0>(ret).get0()); - auto client = std::move(std::get<1>(ret).get0()); - test_state::Server* server = fp_server.get(); - if (mode == perf_mode_t::both) { - logger().info("\nperf settings:\n {}\n {}\n", - client_conf.str(), server_conf.str()); - ceph_assert(seastar::smp::count >= 1+client_conf.jobs); - ceph_assert(client_conf.jobs > 0); - ceph_assert(seastar::smp::count >= 1+server_conf.core); - ceph_assert(server_conf.core == 0 || server_conf.core > client_conf.jobs); - return seastar::when_all_succeed( - server->init(server_conf.addr), - client->init() - ).then_unpack([client, addr = client_conf.server_addr] { - return client->connect_wait_verify(addr); - }).then([client, ramptime = client_conf.ramptime, - msgtime = client_conf.msgtime] { - return client->dispatch_with_timer(ramptime, msgtime); - }).then([client] { - return client->shutdown(); - }).then([server, fp_server = std::move(fp_server)] () mutable { - return server->shutdown().then([cleanup = std::move(fp_server)] {}); - }); - } else if (mode == perf_mode_t::client) { - logger().info("\nperf settings:\n {}\n", client_conf.str()); - ceph_assert(seastar::smp::count >= 1+client_conf.jobs); - ceph_assert(client_conf.jobs > 0); - return client->init( - ).then([client, addr = client_conf.server_addr] { - return client->connect_wait_verify(addr); - }).then([client, ramptime = client_conf.ramptime, - msgtime = client_conf.msgtime] { - return client->dispatch_with_timer(ramptime, msgtime); - }).then([client] { - return client->shutdown(); - }); - } else { // mode == perf_mode_t::server - ceph_assert(seastar::smp::count >= 1+server_conf.core); - logger().info("\nperf settings:\n {}\n", server_conf.str()); - return server->init(server_conf.addr - // dispatch ops - ).then([server] { - return server->wait(); - // shutdown - }).then([server, fp_server = std::move(fp_server)] () mutable { - return server->shutdown().then([cleanup = std::move(fp_server)] {}); - }); - } - }).finally([] { - return crimson::common::sharded_conf().stop(); - }); -} - -} - -int main(int argc, char** argv) -{ - seastar::app_template app; - app.add_options() - ("mode", bpo::value()->default_value(0), - "0: both, 1:client, 2:server") - ("addr", bpo::value()->default_value("v2:127.0.0.1:9010"), - "server address(only support msgr v2 protocol)") - ("ramptime", bpo::value()->default_value(5), - "seconds of client ramp-up time") - ("msgtime", bpo::value()->default_value(15), - "seconds of client messaging time") - ("jobs", bpo::value()->default_value(1), - "number of client jobs (messengers)") - ("cbs", bpo::value()->default_value(4096), - "client block size") - ("depth", bpo::value()->default_value(512), - "client io depth") - ("core", bpo::value()->default_value(0), - "server running core") - ("sbs", bpo::value()->default_value(0), - "server block size") - ("crc-enabled", bpo::value()->default_value(false), - "enable CRC checks"); - return app.run(argc, argv, [&app] { - auto&& config = app.configuration(); - auto mode = config["mode"].as(); - ceph_assert(mode <= 2); - auto _mode = static_cast(mode); - bool crc_enabled = config["crc-enabled"].as(); - auto server_conf = server_config::load(config); - auto client_conf = client_config::load(config); - return run(_mode, client_conf, server_conf, crc_enabled - ).then([] { - logger().info("\nsuccessful!\n"); - }).handle_exception([] (auto eptr) { - logger().info("\nfailed!\n"); - return seastar::make_exception_future<>(eptr); - }); - }); -} 
diff --git a/src/tools/crimson/perf_staged_fltree.cc b/src/tools/crimson/perf_staged_fltree.cc
deleted file mode 100644
index 81b6217506f0..000000000000
--- a/src/tools/crimson/perf_staged_fltree.cc
+++ /dev/null
@@ -1,178 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
-// vim: ts=8 sw=2 smarttab
-
-#include
-
-#include
-#include
-
-#include "crimson/common/config_proxy.h"
-#include "crimson/common/log.h"
-#include "crimson/common/perf_counters_collection.h"
-#include "crimson/os/seastore/onode_manager/staged-fltree/tree_utils.h"
-#include "crimson/os/seastore/onode_manager/staged-fltree/node_extent_manager.h"
-
-#include "test/crimson/seastore/onode_tree/test_value.h"
-#include "test/crimson/seastore/transaction_manager_test_state.h"
-
-using namespace crimson::os::seastore::onode;
-namespace bpo = boost::program_options;
-
-seastar::logger& logger() {
-  return crimson::get_logger(ceph_subsys_test);
-}
-
-template <bool TRACK>
-class PerfTree : public TMTestState {
- public:
-  PerfTree(bool is_dummy) : is_dummy{is_dummy} {}
-
-  seastar::future<> run(KVPool& kvs, double erase_ratio) {
-    return tm_setup().then([this, &kvs, erase_ratio] {
-      return seastar::async([this, &kvs, erase_ratio] {
-        auto tree = std::make_unique>(kvs,
-            (is_dummy ? NodeExtentManager::create_dummy(true)
-                      : NodeExtentManager::create_seastore(*tm)));
-        {
-          auto t = create_mutate_transaction();
-          with_trans_intr(*t, [&](auto &tr){
-            return tree->bootstrap(tr);
-          }).unsafe_get();
-          submit_transaction(std::move(t));
-        }
-        {
-          auto t = create_mutate_transaction();
-          with_trans_intr(*t, [&](auto &tr){
-            return tree->insert(tr);
-          }).unsafe_get();
-          auto start_time = mono_clock::now();
-          submit_transaction(std::move(t));
-          std::chrono::duration<double> duration = mono_clock::now() - start_time;
-          logger().warn("submit_transaction() done! {}s", duration.count());
-        }
-        {
-          // Note: create_weak_transaction() can also work, but too slow.
-          auto t = create_read_transaction();
-          with_trans_intr(*t, [&](auto &tr){
-            return tree->get_stats(tr);
-          }).unsafe_get();
-
-          with_trans_intr(*t, [&](auto &tr){
-            return tree->validate(tr);
-          }).unsafe_get();
-        }
-        {
-          auto t = create_mutate_transaction();
-          with_trans_intr(*t, [&](auto &tr){
-            return tree->erase(tr, kvs.size() * erase_ratio);
-          }).unsafe_get();
-          submit_transaction(std::move(t));
-        }
-        {
-          auto t = create_read_transaction();
-          with_trans_intr(*t, [&](auto &tr){
-            return tree->get_stats(tr);
-          }).unsafe_get();
-
-          with_trans_intr(*t, [&](auto &tr){
-            return tree->validate(tr);
-          }).unsafe_get();
-        }
-        tree.reset();
-      });
-    }).then([this] {
-      return tm_teardown();
-    });
-  }
-
- private:
-  bool is_dummy;
-};
-
-template <bool TRACK>
-seastar::future<> run(const bpo::variables_map& config) {
-  return seastar::async([&config] {
-    auto backend = config["backend"].as<std::string>();
-    bool is_dummy;
-    if (backend == "dummy") {
-      is_dummy = true;
-    } else if (backend == "seastore") {
-      is_dummy = false;
-    } else {
-      ceph_abort(false && "invalid backend");
-    }
-    auto ns_sizes = config["ns-sizes"].as>();
-    auto oid_sizes = config["oid-sizes"].as>();
-    auto onode_sizes = config["onode-sizes"].as>();
-    auto range2 = config["range2"].as>();
-    ceph_assert(range2.size() == 2);
-    auto range1 = config["range1"].as>();
-    ceph_assert(range1.size() == 2);
-    auto range0 = config["range0"].as>();
-    ceph_assert(range0.size() == 2);
-    auto erase_ratio = config["erase-ratio"].as<double>();
-    ceph_assert(erase_ratio >= 0);
-    ceph_assert(erase_ratio <= 1);
-
-    using crimson::common::sharded_conf;
-    sharded_conf().start(EntityName{}, std::string_view{"ceph"}).get();
-    seastar::engine().at_exit([] {
-      return sharded_conf().stop();
-    });
-
-    using crimson::common::sharded_perf_coll;
-    sharded_perf_coll().start().get();
-    seastar::engine().at_exit([] {
-      return sharded_perf_coll().stop();
-    });
-
-    auto kvs = KVPool::create_raw_range(
-        ns_sizes, oid_sizes, onode_sizes,
-        {range2[0], range2[1]},
-        {range1[0], range1[1]},
-        {range0[0], range0[1]});
-    PerfTree<TRACK> perf{is_dummy};
-    perf.run(kvs, erase_ratio).get0();
-  });
-}
-
-
-int main(int argc, char** argv)
-{
-  seastar::app_template app;
-  app.add_options()
-    ("backend", bpo::value<std::string>()->default_value("dummy"),
-     "tree backend: dummy, seastore")
-    ("tracked", bpo::value<bool>()->default_value(false),
-     "track inserted cursors")
-    ("ns-sizes", bpo::value>()->default_value(
-        {8, 11, 64, 128, 255, 256}),
-     "sizes of ns strings")
-    ("oid-sizes", bpo::value>()->default_value(
-        {8, 13, 64, 512, 2035, 2048}),
-     "sizes of oid strings")
-    ("onode-sizes", bpo::value>()->default_value(
-        {8, 16, 128, 576, 992, 1200}),
-     "sizes of onode")
-    ("range2", bpo::value>()->default_value(
-        {0, 128}),
-     "range of shard-pool-crush [a, b)")
-    ("range1", bpo::value>()->default_value(
-        {0, 10}),
-     "range of ns-oid strings [a, b)")
-    ("range0", bpo::value>()->default_value(
-        {0, 4}),
-     "range of snap-gen [a, b)")
-    ("erase-ratio", bpo::value<double>()->default_value(
-        0.8),
-     "erase-ratio of all the inserted onodes");
-  return app.run(argc, argv, [&app] {
-    auto&& config = app.configuration();
-    auto tracked = config["tracked"].as<bool>();
-    if (tracked) {
-      return run<true>(config);
-    } else {
-      return run<false>(config);
-    }
-  });
-}
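
For reference, the perf_crimson_msgr benchmark above is configured entirely through the boost::program_options flags registered in its main() (mode, addr, ramptime, msgtime, jobs, cbs, depth, core, sbs, crc-enabled). A rough usage sketch, assuming the executable is built under its usual perf-crimson-msgr target name and is given enough seastar reactor shards (--smp) to satisfy the smp::count assertions in run():

    perf-crimson-msgr --mode=2 --addr=v2:127.0.0.1:9010 --sbs=4096
    perf-crimson-msgr --mode=1 --addr=v2:127.0.0.1:9010 --jobs=2 --cbs=4096 --depth=512 --smp 3

--mode=0 runs the client and the server in one process, and --crc-enabled=true only toggles ms_crc_data in this tool, as the set_val() call in run() shows.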
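
Likewise, perf_staged_fltree takes its workload shape from the options in its main(): backend, tracked, the ns/oid/onode size vectors, the range vectors, and erase-ratio. A sketch of typical invocations, assuming the executable keeps the perf-staged-fltree name:

    perf-staged-fltree --backend=dummy
    perf-staged-fltree --backend=seastore --tracked=true --erase-ratio=0.5

--backend selects between the dummy NodeExtentManager and a seastore-backed one, and --tracked=true picks the run<true>() instantiation that tracks inserted cursors.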