return ceph::get_logger(ceph_subsys_ms);
}
-
enum class perf_mode_t {
both,
client,
server
};
-static std::random_device rd;
-static std::default_random_engine rng{rd()};
-
static seastar::future<> run(unsigned rounds,
- double keepalive_ratio,
- int bs,
- int depth,
+ unsigned jobs,
+ unsigned bs,
+ unsigned depth,
std::string addr,
- perf_mode_t mode)
+ perf_mode_t mode,
+ unsigned core)
{
struct test_state {
struct Server final
: public ceph::net::Dispatcher,
public seastar::peering_sharded_service<Server> {
ceph::net::Messenger *msgr = nullptr;
+ const seastar::shard_id sid;
+ const seastar::shard_id msgr_sid;
+ std::string lname;
+
+ Server(unsigned msgr_core)
+ : sid{seastar::engine().cpu_id()},
+ msgr_sid{msgr_core} {
+ lname = "server#";
+ lname += std::to_string(sid);
+ }
Dispatcher* get_local_shard() override {
return &(container().local());
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
// reply
Ref<MOSDOp> req = boost::static_pointer_cast<MOSDOp>(m);
- req->finish_decode();
return c->send(MessageRef{ new MOSDOpReply(req.get(), 0, 0, 0, false), false });
}
- seastar::future<> init(const entity_name_t& name,
- const std::string& lname,
- const uint64_t nonce,
- const entity_addr_t& addr) {
- auto&& fut = ceph::net::Messenger::create(name, lname, nonce, 1);
- return fut.then([this, addr](ceph::net::Messenger *messenger) {
- return container().invoke_on_all([messenger](auto& server) {
- server.msgr = messenger->get_local_shard();
- server.msgr->set_crc_header();
- }).then([messenger, addr] {
- return messenger->bind(entity_addrvec_t{addr});
- }).then([this, messenger] {
- return messenger->start(this);
- });
- });
+ seastar::future<> init(const entity_addr_t& addr) {
+ return container().invoke_on(msgr_sid, [addr] (auto& server) {
+ // server msgr is always with nonce 0
+ auto&& fut = ceph::net::Messenger::create(entity_name_t::OSD(server.sid), server.lname, 0, server.sid);
+ return fut.then([&server, addr](ceph::net::Messenger *messenger) {
+ return server.container().invoke_on_all([messenger](auto& server) {
+ server.msgr = messenger->get_local_shard();
+ }).then([messenger, addr] {
+ return messenger->bind(entity_addrvec_t{addr});
+ }).then([&server, messenger] {
+ return messenger->start(&server);
+ });
+ });
+ });
}
seastar::future<> shutdown() {
- ceph_assert(msgr);
- return msgr->shutdown();
+ logger().info("\n{} shutdown...", lname);
+ return container().invoke_on(msgr_sid, [] (auto& server) {
+ ceph_assert(server.msgr);
+ return server.msgr->shutdown();
+ });
}
};
public seastar::peering_sharded_service<Client> {
struct PingSession : public seastar::enable_shared_from_this<PingSession> {
- unsigned count = 0u;
+ unsigned received_count = 0u;
+ mono_time connecting_time;
mono_time connected_time;
+ mono_time start_time;
mono_time finish_time;
+ seastar::promise<> done;
};
using PingSessionRef = seastar::shared_ptr<PingSession>;
- unsigned rounds;
- std::bernoulli_distribution keepalive_dist;
+ const seastar::shard_id sid;
+ std::string lname;
+
+ const unsigned jobs;
+ const unsigned rounds;
ceph::net::Messenger *msgr = nullptr;
- std::map<ceph::net::Connection*, seastar::promise<>> pending_conns;
- std::map<ceph::net::ConnectionRef, PingSessionRef> sessions;
- int msg_len;
+ const unsigned msg_len;
bufferlist msg_data;
seastar::semaphore depth;
- Client(unsigned rounds, double keepalive_ratio, int msg_len, int depth)
- : rounds(rounds),
- keepalive_dist(std::bernoulli_distribution{keepalive_ratio}),
- depth(depth) {
+ unsigned sent_count = 0u;
+ ceph::net::ConnectionRef active_conn = nullptr;
+ PingSessionRef active_session = nullptr;
+
+ Client(unsigned jobs, unsigned rounds, unsigned msg_len, unsigned depth)
+ : sid{seastar::engine().cpu_id()},
+ jobs{jobs},
+ rounds{rounds/jobs},
+ msg_len{msg_len},
+ depth{depth} {
+ lname = "client#";
+ lname += std::to_string(sid);
bufferptr ptr(msg_len);
memset(ptr.c_str(), 0, msg_len);
msg_data.append(ptr);
}
- PingSessionRef find_session(ceph::net::ConnectionRef c) {
- auto found = sessions.find(c);
- if (found == sessions.end()) {
- ceph_assert(false);
- }
- return found->second;
- }
-
Dispatcher* get_local_shard() override {
return &(container().local());
}
return seastar::now();
}
seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override {
- logger().info("{}: connected to {}", *conn, conn->get_peer_addr());
- auto session = seastar::make_shared<PingSession>();
- auto [i, added] = sessions.emplace(conn, session);
- std::ignore = i;
- ceph_assert(added);
- session->connected_time = mono_clock::now();
+ logger().info("{}: connected", *conn);
+ active_session = seastar::make_shared<PingSession>();
+ active_session->connected_time = mono_clock::now();
return seastar::now();
}
seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
MessageRef m) override {
ceph_assert(m->get_type() == CEPH_MSG_OSD_OPREPLY);
depth.signal(1);
- auto session = find_session(c);
- ++(session->count);
+ ceph_assert(active_session);
+ ++(active_session->received_count);
- if (session->count == rounds) {
- logger().info("{}: finished receiving {} OPREPLYs", *c.get(), session->count);
- session->finish_time = mono_clock::now();
- return container().invoke_on_all([conn = c.get()](auto &client) {
- auto found = client.pending_conns.find(conn);
- ceph_assert(found != client.pending_conns.end());
- found->second.set_value();
- });
- } else {
- return seastar::now();
+ if (active_session->received_count == rounds) {
+ logger().info("{}: finished receiving {} OPREPLYs", *c, active_session->received_count);
+ active_session->finish_time = mono_clock::now();
+ active_session->done.set_value();
}
+ return seastar::now();
}
- seastar::future<> init(const entity_name_t& name,
- const std::string& lname,
- const uint64_t nonce) {
- return ceph::net::Messenger::create(name, lname, nonce, 2)
- .then([this](ceph::net::Messenger *messenger) {
- return container().invoke_on_all([messenger](auto& client) {
- client.msgr = messenger->get_local_shard();
- client.msgr->set_crc_header();
- }).then([this, messenger] {
- return messenger->start(this);
- });
- });
+ // should start messenger at this shard?
+ bool is_active() {
+ ceph_assert(seastar::engine().cpu_id() == sid);
+ return sid != 0 && sid <= jobs;
+ }
+
+ seastar::future<> init() {
+ return container().invoke_on_all([] (auto& client) {
+ if (client.is_active()) {
+ return ceph::net::Messenger::create(entity_name_t::OSD(client.sid), client.lname, client.sid, client.sid)
+ .then([&client] (ceph::net::Messenger *messenger) {
+ client.msgr = messenger;
+ return client.msgr->start(&client);
+ });
+ }
+ return seastar::now();
+ });
}
seastar::future<> shutdown() {
- ceph_assert(msgr);
- return msgr->shutdown();
+ return container().invoke_on_all([] (auto& client) {
+ if (client.is_active()) {
+ logger().info("\n{} shutdown...", client.lname);
+ ceph_assert(client.msgr);
+ return client.msgr->shutdown();
+ }
+ return seastar::now();
+ });
}
- seastar::future<> dispatch_messages(const entity_addr_t& peer_addr, bool foreign_dispatch=true) {
- mono_time start_time = mono_clock::now();
- return msgr->connect(peer_addr, entity_name_t::TYPE_OSD)
- .then([this, foreign_dispatch, start_time](auto conn) {
- return seastar::futurize_apply([this, conn, foreign_dispatch] {
- if (foreign_dispatch) {
- return do_dispatch_messages(&**conn);
- } else {
- // NOTE: this could be faster if we don't switch cores in do_dispatch_messages().
- return container().invoke_on(conn->get()->shard_id(), [conn = &**conn](auto &client) {
- return client.do_dispatch_messages(conn);
- });
- }
- }).finally([this, conn, start_time] {
- return container().invoke_on(conn->get()->shard_id(), [conn, start_time](auto &client) {
- auto session = client.find_session((*conn)->shared_from_this());
- std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
- std::chrono::duration<double> dur_messaging = session->finish_time - session->connected_time;
- logger().info("{}: handshake {}, messaging {}",
- **conn, dur_handshake.count(), dur_messaging.count());
- });
- });
+ seastar::future<> dispatch_messages(const entity_addr_t& peer_addr) {
+ return container().invoke_on_all([peer_addr] (auto& client) {
+ // start clients in active cores (#1 ~ #jobs)
+ if (client.is_active()) {
+ mono_time start_time = mono_clock::now();
+ return client.msgr->connect(peer_addr, entity_name_t::TYPE_OSD)
+ .then([&client] (auto conn) {
+ client.active_conn = conn->release();
+ // make sure handshake won't heart the performance
+ return seastar::sleep(1s);
+ }).then([&client, start_time] {
+ if (!client.active_session) {
+ logger().error("\n{} not connected after 1s!\n", client.lname);
+ ceph_assert(false);
+ }
+ client.active_session->connecting_time = start_time;
+ });
+ }
+ return seastar::now();
+ }).then([this] {
+ logger().info("\nstart sending {} MOSDOps from {} clients",
+ jobs * rounds, jobs);
+ mono_time start_time = mono_clock::now();
+ return container().invoke_on_all([] (auto& client) {
+ if (client.is_active()) {
+ return client.do_dispatch_messages(client.active_conn.get());
+ }
+ return seastar::now();
+ }).then([this, start_time] {
+ std::chrono::duration<double> dur_messaging = mono_clock::now() - start_time;
+ logger().info("\nSummary:\n clients: {}\n MOSDOps: {}\n total time: {}s\n",
+ jobs, jobs * rounds, dur_messaging.count());
});
+ });
}
private:
seastar::future<> send_msg(ceph::net::Connection* conn) {
+ ceph_assert(seastar::engine().cpu_id() == sid);
return depth.wait(1).then([this, conn] {
const static pg_t pgid;
const static object_locator_t oloc;
}
seastar::future<> do_dispatch_messages(ceph::net::Connection* conn) {
- return container().invoke_on_all([conn](auto& client) {
- auto [i, added] = client.pending_conns.emplace(conn, seastar::promise<>());
- std::ignore = i;
- ceph_assert(added);
- }).then([this, conn] {
- return seastar::do_with(0u, 0u,
- [this, conn](auto &count_ping, auto &count_keepalive) {
- return seastar::do_until(
- [this, conn, &count_ping, &count_keepalive] {
- bool stop = (count_ping == rounds);
- if (stop) {
- logger().info("{}: finished sending {} OSDOPs with {} keepalives",
- *conn, count_ping, count_keepalive);
- }
- return stop;
- },
- [this, conn, &count_ping, &count_keepalive] {
- return seastar::repeat([this, conn, &count_ping, &count_keepalive] {
- if (keepalive_dist(rng)) {
- return conn->keepalive()
- .then([&count_keepalive] {
- count_keepalive += 1;
- return seastar::make_ready_future<seastar::stop_iteration>(
- seastar::stop_iteration::no);
- });
- } else {
- return send_msg(conn)
- .then([&count_ping] {
- count_ping += 1;
- return seastar::make_ready_future<seastar::stop_iteration>(
- seastar::stop_iteration::yes);
- });
- }
- });
- }).then([this, conn] {
- auto found = pending_conns.find(conn);
- return found->second.get_future();
- });
- });
- });
+ ceph_assert(seastar::engine().cpu_id() == sid);
+ ceph_assert(sent_count == 0);
+ active_session->start_time = mono_clock::now();
+ return seastar::do_until(
+ [this, conn] {
+ bool stop = (sent_count == rounds);
+ if (stop) {
+ logger().info("{}: finished sending {} OSDOPs",
+ *conn, sent_count);
+ }
+ return stop;
+ },
+ [this, conn] {
+ sent_count += 1;
+ return send_msg(conn);
+ }
+ ).then([this] {
+ return active_session->done.get_future();
+ }).then([this] {
+ std::chrono::duration<double> dur_conn = active_session->connected_time - active_session->connecting_time;
+ std::chrono::duration<double> dur_msg = mono_clock::now() - active_session->start_time;
+ logger().info("\n{}:\n messages: {}\n connect time: {}s\n messaging time: {}s\n",
+ lname, active_session->received_count, dur_conn.count(), dur_msg.count());
+ });
}
};
};
return seastar::when_all_succeed(
- ceph::net::create_sharded<test_state::Server>(),
- ceph::net::create_sharded<test_state::Client>(rounds, keepalive_ratio, bs, depth))
- .then([rounds, keepalive_ratio, addr, mode](test_state::Server *server,
- test_state::Client *client) {
+ ceph::net::create_sharded<test_state::Server>(core),
+ ceph::net::create_sharded<test_state::Client>(jobs, rounds, bs, depth))
+ .then([=](test_state::Server *server,
+ test_state::Client *client) {
entity_addr_t target_addr;
target_addr.parse(addr.c_str(), nullptr);
target_addr.set_type(entity_addr_t::TYPE_LEGACY);
if (mode == perf_mode_t::both) {
+ logger().info("\nperf settings:\n mode=server+client\n server addr={}\n server core={}\n rounds={}\n client jobs={}\n bs={}\n depth={}\n",
+ addr, core, rounds, jobs, bs, depth);
+ ceph_assert(seastar::smp::count >= std::max(1+jobs, 1+core));
+ ceph_assert(core == 0 || core > jobs);
+ ceph_assert(jobs > 0);
return seastar::when_all_succeed(
- server->init(entity_name_t::OSD(0), "server", 0, target_addr),
- client->init(entity_name_t::OSD(1), "client", 0))
- // dispatch pingpoing
+ server->init(target_addr),
+ client->init())
+ // dispatch ops
.then([client, target_addr] {
- return client->dispatch_messages(target_addr, false);
+ return client->dispatch_messages(target_addr);
// shutdown
}).finally([client] {
- logger().info("client shutdown...");
return client->shutdown();
}).finally([server] {
- logger().info("server shutdown...");
return server->shutdown();
});
} else if (mode == perf_mode_t::client) {
- return client->init(entity_name_t::OSD(1), "client", 0)
- // dispatch pingpoing
+ logger().info("\nperf settings:\n mode=client\n server addr={}\n rounds={}\n client jobs={}\n bs={}\n depth={}\n",
+ addr, rounds, jobs, bs, depth);
+ ceph_assert(seastar::smp::count >= 1+jobs);
+ ceph_assert(jobs > 0);
+ return client->init()
+ // dispatch ops
.then([client, target_addr] {
- return client->dispatch_messages(target_addr, false);
+ return client->dispatch_messages(target_addr);
// shutdown
}).finally([client] {
- logger().info("client shutdown...");
return client->shutdown();
});
} else { // mode == perf_mode_t::server
- return server->init(entity_name_t::OSD(0), "server", 0, target_addr)
- // dispatch pingpoing
+ ceph_assert(seastar::smp::count >= 1+core);
+ logger().info("\nperf settings:\n mode=server\n server addr={}\n server core={}\n",
+ addr, core);
+ return server->init(target_addr)
+ // dispatch ops
.then([server] {
return server->msgr->wait();
// shutdown
}).finally([server] {
- logger().info("server shutdown...");
return server->shutdown();
});
}
seastar::app_template app;
app.add_options()
("addr", bpo::value<std::string>()->default_value("0.0.0.0:9010"),
- "start server")
- ("mode", bpo::value<int>()->default_value(0),
+ "server address")
+ ("core", bpo::value<unsigned>()->default_value(0),
+ "server running core")
+ ("mode", bpo::value<unsigned>()->default_value(0),
"0: both, 1:client, 2:server")
("rounds", bpo::value<unsigned>()->default_value(65536),
"number of messaging rounds")
- ("keepalive-ratio", bpo::value<double>()->default_value(0),
- "ratio of keepalive in ping messages")
- ("bs", bpo::value<int>()->default_value(4096),
+ ("jobs", bpo::value<unsigned>()->default_value(1),
+ "number of jobs (client messengers)")
+ ("bs", bpo::value<unsigned>()->default_value(4096),
"block size")
- ("depth", bpo::value<int>()->default_value(512),
+ ("depth", bpo::value<unsigned>()->default_value(512),
"io depth");
return app.run(argc, argv, [&app] {
auto&& config = app.configuration();
auto rounds = config["rounds"].as<unsigned>();
- auto keepalive_ratio = config["keepalive-ratio"].as<double>();
- auto bs = config["bs"].as<int>();
- auto depth = config["depth"].as<int>();
+ auto jobs = config["jobs"].as<unsigned>();
+ auto bs = config["bs"].as<unsigned>();
+ auto depth = config["depth"].as<unsigned>();
auto addr = config["addr"].as<std::string>();
- auto mode = config["mode"].as<int>();
- logger().info("\nsettings:\n addr={}\n mode={}\n rounds={}\n keepalive-ratio={}\n bs={}\n depth={}",
- addr, mode, rounds, keepalive_ratio, bs, depth);
- ceph_assert(mode >= 0 && mode <= 2);
+ auto core = config["core"].as<unsigned>();
+ auto mode = config["mode"].as<unsigned>();
+ ceph_assert(mode <= 2);
auto _mode = static_cast<perf_mode_t>(mode);
- return run(rounds, keepalive_ratio, bs, depth, addr, _mode)
+ return run(rounds, jobs, bs, depth, addr, _mode, core)
.then([] {
- std::cout << "successful" << std::endl;
+ logger().info("\nsuccessful!\n");
}).handle_exception([] (auto eptr) {
- std::cout << "failed" << std::endl;
+ logger().info("\nfailed!\n");
return seastar::make_exception_future<>(eptr);
});
});