]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
test/crimson: perf_crimson/async_server write test support
authorYingxin Cheng <yingxincheng@gmail.com>
Thu, 4 Apr 2019 13:18:00 +0000 (21:18 +0800)
committerYingxin Cheng <yingxincheng@gmail.com>
Tue, 9 Apr 2019 08:06:39 +0000 (16:06 +0800)
Allow server to have write/send workload with specified block size.

Signed-off-by: Yingxin Cheng <yingxincheng@gmail.com>
src/test/crimson/perf_async_msgr.cc
src/test/crimson/perf_crimson_msgr.cc

index ca5780c23b5f0d21b7020e4a45ec53e5addaeab5..d64388ec56cdf1e60a48053a45fb4eef3fe5e950 100644 (file)
@@ -17,8 +17,8 @@ namespace {
 constexpr int CEPH_OSD_PROTOCOL = 10;
 
 struct Server {
-  Server(CephContext* cct)
-    : dummy_auth(cct), dispatcher(cct)
+  Server(CephContext* cct, unsigned msg_len)
+    : dummy_auth(cct), dispatcher(cct, msg_len)
   {
     msgr.reset(Messenger::create(cct, "async", entity_name_t::OSD(0), "server", 0, 0));
     dummy_auth.auth_registry.refresh_config();
@@ -31,9 +31,14 @@ struct Server {
   DummyAuthClientServer dummy_auth;
   unique_ptr<Messenger> msgr;
   struct ServerDispatcher : Dispatcher {
-    ServerDispatcher(CephContext* cct)
-      : Dispatcher(cct)
-    {}
+    unsigned msg_len = 0;
+    bufferlist msg_data;
+
+    ServerDispatcher(CephContext* cct, unsigned msg_len)
+      : Dispatcher(cct), msg_len(msg_len)
+    {
+      msg_data.append_zero(msg_len);
+    }
     bool ms_can_fast_dispatch_any() const override {
       return true;
     }
@@ -42,8 +47,15 @@ struct Server {
     }
     void ms_fast_dispatch(Message* m) override {
       ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
-      MOSDOp *req = static_cast<MOSDOp*>(m);
-      m->get_connection()->send_message(new MOSDOpReply(req, 0, 0, 0, false));
+      const static pg_t pgid;
+      const static object_locator_t oloc;
+      const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
+                                  pgid.pool(), oloc.nspace);
+      static spg_t spgid(pgid);
+      MOSDOp *rep = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0);
+      bufferlist data(msg_data);
+      rep->write(0, msg_len, data);
+      m->get_connection()->send_message(rep);
       m->put();
     }
     bool ms_dispatch(Message*) override {
@@ -62,10 +74,10 @@ struct Server {
 
 }
 
-static void run(CephContext* cct, entity_addr_t addr)
+static void run(CephContext* cct, entity_addr_t addr, unsigned bs)
 {
   std::cout << "async server listening at " << addr << std::endl;
-  Server server{cct};
+  Server server{cct, bs};
   server.msgr->bind(addr);
   server.msgr->add_dispatcher_head(&server.dispatcher);
   server.msgr->start();
@@ -79,7 +91,9 @@ int main(int argc, char** argv)
   desc.add_options()
     ("help,h", "show help message")
     ("addr", po::value<std::string>()->default_value("v1:0.0.0.0:9010"),
-     "server address");
+     "server address")
+    ("bs", po::value<unsigned>()->default_value(0),
+     "server block size");
   po::variables_map vm;
   std::vector<std::string> unrecognized_options;
   try {
@@ -102,6 +116,7 @@ int main(int argc, char** argv)
   auto addr = vm["addr"].as<std::string>();
   entity_addr_t target_addr;
   target_addr.parse(addr.c_str(), nullptr);
+  auto bs = vm["bs"].as<unsigned>();
 
   std::vector<const char*> args(argv, argv + argc);
   auto cct = global_init(nullptr, args,
@@ -109,5 +124,5 @@ int main(int argc, char** argv)
                          CODE_ENVIRONMENT_UTILITY,
                          CINIT_FLAG_NO_MON_CONFIG);
   common_init_finish(cct.get());
-  run(cct.get(), target_addr);
+  run(cct.get(), target_addr, bs);
 }
index 0d3f15e55801b847190968d16567d2e58dbc0aa7..ce2005bdb7fb2736444bf6620acba32987c9432a 100644 (file)
@@ -14,7 +14,6 @@
 
 #include "common/ceph_time.h"
 #include "messages/MOSDOp.h"
-#include "messages/MOSDOpReply.h"
 
 #include "crimson/auth/DummyAuth.h"
 #include "crimson/common/log.h"
@@ -41,7 +40,8 @@ enum class perf_mode_t {
 
 static seastar::future<> run(unsigned rounds,
                              unsigned jobs,
-                             unsigned bs,
+                             unsigned cbs,
+                             unsigned sbs,
                              unsigned depth,
                              std::string addr,
                              perf_mode_t mode,
@@ -56,12 +56,16 @@ static seastar::future<> run(unsigned rounds,
       const seastar::shard_id sid;
       const seastar::shard_id msgr_sid;
       std::string lname;
+      unsigned msg_len;
+      bufferlist msg_data;
 
-      Server(unsigned msgr_core)
+      Server(unsigned msgr_core, unsigned msg_len)
         : sid{seastar::engine().cpu_id()},
-          msgr_sid{msgr_core} {
+          msgr_sid{msgr_core},
+          msg_len{msg_len} {
         lname = "server#";
         lname += std::to_string(sid);
+        msg_data.append_zero(msg_len);
       }
 
       Dispatcher* get_local_shard() override {
@@ -73,9 +77,18 @@ static seastar::future<> run(unsigned rounds,
       seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
                                     MessageRef m) override {
         ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
-        // reply
-        Ref<MOSDOp> req = boost::static_pointer_cast<MOSDOp>(m);
-        return c->send(MessageRef{ new MOSDOpReply(req.get(), 0, 0, 0, false), false });
+
+        // server replies with MOSDOp to generate server-side write workload
+        const static pg_t pgid;
+        const static object_locator_t oloc;
+        const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
+                                    pgid.pool(), oloc.nspace);
+        static spg_t spgid(pgid);
+        MOSDOp *rep = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0);
+        bufferlist data(msg_data);
+        rep->write(0, msg_len, data);
+        MessageRef msg = {rep, false};
+        return c->send(msg);
       }
 
       seastar::future<> init(const entity_addr_t& addr) {
@@ -142,9 +155,7 @@ static seastar::future<> run(unsigned rounds,
           depth{depth} {
         lname = "client#";
         lname += std::to_string(sid);
-        bufferptr ptr(msg_len);
-        memset(ptr.c_str(), 0, msg_len);
-        msg_data.append(ptr);
+        msg_data.append_zero(msg_len);
       }
 
       Dispatcher* get_local_shard() override {
@@ -161,13 +172,14 @@ static seastar::future<> run(unsigned rounds,
       }
       seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
                                     MessageRef m) override {
-        ceph_assert(m->get_type() == CEPH_MSG_OSD_OPREPLY);
+        // server replies with MOSDOp to generate server-side write workload
+        ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
         depth.signal(1);
         ceph_assert(active_session);
         ++(active_session->received_count);
 
         if (active_session->received_count == rounds) {
-          logger().info("{}: finished receiving {} OPREPLYs", *c, active_session->received_count);
+          logger().info("{}: finished receiving {} REPLYs", *c, active_session->received_count);
           active_session->finish_time = mono_clock::now();
           active_session->done.set_value();
         }
@@ -290,15 +302,15 @@ static seastar::future<> run(unsigned rounds,
   };
 
   return seastar::when_all_succeed(
-      ceph::net::create_sharded<test_state::Server>(core),
-      ceph::net::create_sharded<test_state::Client>(jobs, rounds, bs, depth))
+      ceph::net::create_sharded<test_state::Server>(core, sbs),
+      ceph::net::create_sharded<test_state::Client>(jobs, rounds, cbs, depth))
     .then([=](test_state::Server *server,
               test_state::Client *client) {
       entity_addr_t target_addr;
       target_addr.parse(addr.c_str(), nullptr);
       if (mode == perf_mode_t::both) {
-          logger().info("\nperf settings:\n  mode=server+client\n  server addr={}\n  server core={}\n  rounds={}\n  client jobs={}\n  bs={}\n  depth={}\n",
-                        addr, core, rounds, jobs, bs, depth);
+          logger().info("\nperf settings:\n  mode=server+client\n  server addr={}\n  server core={}\n  rounds={}\n  client jobs={}\n  client bs={}\n  server bs={}\n  depth={}\n",
+                        addr, core, rounds, jobs, cbs, sbs, depth);
           ceph_assert(seastar::smp::count >= std::max(1+jobs, 1+core));
           ceph_assert(core == 0 || core > jobs);
           ceph_assert(jobs > 0);
@@ -315,8 +327,8 @@ static seastar::future<> run(unsigned rounds,
               return server->shutdown();
             });
       } else if (mode == perf_mode_t::client) {
-          logger().info("\nperf settings:\n  mode=client\n  server addr={}\n  rounds={}\n  client jobs={}\n  bs={}\n  depth={}\n",
-                        addr, rounds, jobs, bs, depth);
+          logger().info("\nperf settings:\n  mode=client\n  server addr={}\n  rounds={}\n  client jobs={}\n  client bs={}\n  depth={}\n",
+                        addr, rounds, jobs, cbs, depth);
           ceph_assert(seastar::smp::count >= 1+jobs);
           ceph_assert(jobs > 0);
           return client->init()
@@ -329,8 +341,8 @@ static seastar::future<> run(unsigned rounds,
             });
       } else { // mode == perf_mode_t::server
           ceph_assert(seastar::smp::count >= 1+core);
-          logger().info("\nperf settings:\n  mode=server\n  server addr={}\n  server core={}\n",
-                        addr, core);
+          logger().info("\nperf settings:\n  mode=server\n  server addr={}\n  server core={}\n  server bs={}\n",
+                        addr, core, sbs);
           return server->init(target_addr)
           // dispatch ops
             .then([server] {
@@ -359,22 +371,25 @@ int main(int argc, char** argv)
      "number of messaging rounds")
     ("jobs", bpo::value<unsigned>()->default_value(1),
      "number of jobs (client messengers)")
-    ("bs", bpo::value<unsigned>()->default_value(4096),
+    ("cbs", bpo::value<unsigned>()->default_value(4096),
      "block size")
+    ("sbs", bpo::value<unsigned>()->default_value(0),
+     "server block size")
     ("depth", bpo::value<unsigned>()->default_value(512),
      "io depth");
   return app.run(argc, argv, [&app] {
       auto&& config = app.configuration();
       auto rounds = config["rounds"].as<unsigned>();
       auto jobs = config["jobs"].as<unsigned>();
-      auto bs = config["bs"].as<unsigned>();
+      auto cbs = config["cbs"].as<unsigned>();
+      auto sbs = config["sbs"].as<unsigned>();
       auto depth = config["depth"].as<unsigned>();
       auto addr = config["addr"].as<std::string>();
       auto core = config["core"].as<unsigned>();
       auto mode = config["mode"].as<unsigned>();
       ceph_assert(mode <= 2);
       auto _mode = static_cast<perf_mode_t>(mode);
-      return run(rounds, jobs, bs, depth, addr, _mode, core)
+      return run(rounds, jobs, cbs, sbs, depth, addr, _mode, core)
         .then([] {
           logger().info("\nsuccessful!\n");
         }).handle_exception([] (auto eptr) {