constexpr int CEPH_OSD_PROTOCOL = 10;
struct Server {
- Server(CephContext* cct)
- : dummy_auth(cct), dispatcher(cct)
+ Server(CephContext* cct, unsigned msg_len)
+ : dummy_auth(cct), dispatcher(cct, msg_len)
{
msgr.reset(Messenger::create(cct, "async", entity_name_t::OSD(0), "server", 0, 0));
dummy_auth.auth_registry.refresh_config();
DummyAuthClientServer dummy_auth;
unique_ptr<Messenger> msgr;
struct ServerDispatcher : Dispatcher {
- ServerDispatcher(CephContext* cct)
- : Dispatcher(cct)
- {}
+ unsigned msg_len = 0;
+ bufferlist msg_data;
+
+ ServerDispatcher(CephContext* cct, unsigned msg_len)
+ : Dispatcher(cct), msg_len(msg_len)
+ {
+ msg_data.append_zero(msg_len);
+ }
bool ms_can_fast_dispatch_any() const override {
return true;
}
}
void ms_fast_dispatch(Message* m) override {
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
- MOSDOp *req = static_cast<MOSDOp*>(m);
- m->get_connection()->send_message(new MOSDOpReply(req, 0, 0, 0, false));
+ const static pg_t pgid;
+ const static object_locator_t oloc;
+ const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
+ pgid.pool(), oloc.nspace);
+ static spg_t spgid(pgid);
+ MOSDOp *rep = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0);
+ bufferlist data(msg_data);
+ rep->write(0, msg_len, data);
+ m->get_connection()->send_message(rep);
m->put();
}
bool ms_dispatch(Message*) override {
}
-static void run(CephContext* cct, entity_addr_t addr)
+static void run(CephContext* cct, entity_addr_t addr, unsigned bs)
{
std::cout << "async server listening at " << addr << std::endl;
- Server server{cct};
+ Server server{cct, bs};
server.msgr->bind(addr);
server.msgr->add_dispatcher_head(&server.dispatcher);
server.msgr->start();
desc.add_options()
("help,h", "show help message")
("addr", po::value<std::string>()->default_value("v1:0.0.0.0:9010"),
- "server address");
+ "server address")
+ ("bs", po::value<unsigned>()->default_value(0),
+ "server block size");
po::variables_map vm;
std::vector<std::string> unrecognized_options;
try {
auto addr = vm["addr"].as<std::string>();
entity_addr_t target_addr;
target_addr.parse(addr.c_str(), nullptr);
+ auto bs = vm["bs"].as<unsigned>();
std::vector<const char*> args(argv, argv + argc);
auto cct = global_init(nullptr, args,
CODE_ENVIRONMENT_UTILITY,
CINIT_FLAG_NO_MON_CONFIG);
common_init_finish(cct.get());
- run(cct.get(), target_addr);
+ run(cct.get(), target_addr, bs);
}
#include "common/ceph_time.h"
#include "messages/MOSDOp.h"
-#include "messages/MOSDOpReply.h"
#include "crimson/auth/DummyAuth.h"
#include "crimson/common/log.h"
static seastar::future<> run(unsigned rounds,
unsigned jobs,
- unsigned bs,
+ unsigned cbs,
+ unsigned sbs,
unsigned depth,
std::string addr,
perf_mode_t mode,
const seastar::shard_id sid;
const seastar::shard_id msgr_sid;
std::string lname;
+ unsigned msg_len;
+ bufferlist msg_data;
- Server(unsigned msgr_core)
+ Server(unsigned msgr_core, unsigned msg_len)
: sid{seastar::engine().cpu_id()},
- msgr_sid{msgr_core} {
+ msgr_sid{msgr_core},
+ msg_len{msg_len} {
lname = "server#";
lname += std::to_string(sid);
+ msg_data.append_zero(msg_len);
}
Dispatcher* get_local_shard() override {
seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
MessageRef m) override {
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
- // reply
- Ref<MOSDOp> req = boost::static_pointer_cast<MOSDOp>(m);
- return c->send(MessageRef{ new MOSDOpReply(req.get(), 0, 0, 0, false), false });
+
+ // server replies with MOSDOp to generate server-side write workload
+ const static pg_t pgid;
+ const static object_locator_t oloc;
+ const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
+ pgid.pool(), oloc.nspace);
+ static spg_t spgid(pgid);
+ MOSDOp *rep = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0);
+ bufferlist data(msg_data);
+ rep->write(0, msg_len, data);
+ MessageRef msg = {rep, false};
+ return c->send(msg);
}
seastar::future<> init(const entity_addr_t& addr) {
depth{depth} {
lname = "client#";
lname += std::to_string(sid);
- bufferptr ptr(msg_len);
- memset(ptr.c_str(), 0, msg_len);
- msg_data.append(ptr);
+ msg_data.append_zero(msg_len);
}
Dispatcher* get_local_shard() override {
}
seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
MessageRef m) override {
- ceph_assert(m->get_type() == CEPH_MSG_OSD_OPREPLY);
+ // server replies with MOSDOp to generate server-side write workload
+ ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
depth.signal(1);
ceph_assert(active_session);
++(active_session->received_count);
if (active_session->received_count == rounds) {
- logger().info("{}: finished receiving {} OPREPLYs", *c, active_session->received_count);
+ logger().info("{}: finished receiving {} REPLYs", *c, active_session->received_count);
active_session->finish_time = mono_clock::now();
active_session->done.set_value();
}
};
return seastar::when_all_succeed(
- ceph::net::create_sharded<test_state::Server>(core),
- ceph::net::create_sharded<test_state::Client>(jobs, rounds, bs, depth))
+ ceph::net::create_sharded<test_state::Server>(core, sbs),
+ ceph::net::create_sharded<test_state::Client>(jobs, rounds, cbs, depth))
.then([=](test_state::Server *server,
test_state::Client *client) {
entity_addr_t target_addr;
target_addr.parse(addr.c_str(), nullptr);
if (mode == perf_mode_t::both) {
- logger().info("\nperf settings:\n mode=server+client\n server addr={}\n server core={}\n rounds={}\n client jobs={}\n bs={}\n depth={}\n",
- addr, core, rounds, jobs, bs, depth);
+ logger().info("\nperf settings:\n mode=server+client\n server addr={}\n server core={}\n rounds={}\n client jobs={}\n client bs={}\n server bs={}\n depth={}\n",
+ addr, core, rounds, jobs, cbs, sbs, depth);
ceph_assert(seastar::smp::count >= std::max(1+jobs, 1+core));
ceph_assert(core == 0 || core > jobs);
ceph_assert(jobs > 0);
return server->shutdown();
});
} else if (mode == perf_mode_t::client) {
- logger().info("\nperf settings:\n mode=client\n server addr={}\n rounds={}\n client jobs={}\n bs={}\n depth={}\n",
- addr, rounds, jobs, bs, depth);
+ logger().info("\nperf settings:\n mode=client\n server addr={}\n rounds={}\n client jobs={}\n client bs={}\n depth={}\n",
+ addr, rounds, jobs, cbs, depth);
ceph_assert(seastar::smp::count >= 1+jobs);
ceph_assert(jobs > 0);
return client->init()
});
} else { // mode == perf_mode_t::server
ceph_assert(seastar::smp::count >= 1+core);
- logger().info("\nperf settings:\n mode=server\n server addr={}\n server core={}\n",
- addr, core);
+ logger().info("\nperf settings:\n mode=server\n server addr={}\n server core={}\n server bs={}\n",
+ addr, core, sbs);
return server->init(target_addr)
// dispatch ops
.then([server] {
"number of messaging rounds")
("jobs", bpo::value<unsigned>()->default_value(1),
"number of jobs (client messengers)")
- ("bs", bpo::value<unsigned>()->default_value(4096),
+ ("cbs", bpo::value<unsigned>()->default_value(4096),
"block size")
+ ("sbs", bpo::value<unsigned>()->default_value(0),
+ "server block size")
("depth", bpo::value<unsigned>()->default_value(512),
"io depth");
return app.run(argc, argv, [&app] {
auto&& config = app.configuration();
auto rounds = config["rounds"].as<unsigned>();
auto jobs = config["jobs"].as<unsigned>();
- auto bs = config["bs"].as<unsigned>();
+ auto cbs = config["cbs"].as<unsigned>();
+ auto sbs = config["sbs"].as<unsigned>();
auto depth = config["depth"].as<unsigned>();
auto addr = config["addr"].as<std::string>();
auto core = config["core"].as<unsigned>();
auto mode = config["mode"].as<unsigned>();
ceph_assert(mode <= 2);
auto _mode = static_cast<perf_mode_t>(mode);
- return run(rounds, jobs, bs, depth, addr, _mode, core)
+ return run(rounds, jobs, cbs, sbs, depth, addr, _mode, core)
.then([] {
logger().info("\nsuccessful!\n");
}).handle_exception([] (auto eptr) {