]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/tools/perf_crimson_msgr: integrate multi-core messenger as server
authorYingxin Cheng <yingxin.cheng@intel.com>
Wed, 31 May 2023 07:38:15 +0000 (15:38 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Mon, 3 Jul 2023 05:50:18 +0000 (13:50 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/tools/perf_crimson_msgr.cc

index 9761e8db3bfe470c5c7fecf3da27f83f90aa21ec..71d74abbcc16a52d3a7608e1ffb831d51cf463f3 100644 (file)
@@ -143,8 +143,6 @@ struct server_config {
     conf.block_size = options["server-bs"].as<unsigned>();
     conf.is_fixed_cpu = options["server-fixed-cpu"].as<bool>();
     conf.core = options["server-core"].as<unsigned>();
-    // TODO
-    ceph_assert_always(conf.is_fixed_cpu == false);
     return conf;
   }
 };
@@ -158,37 +156,42 @@ static seastar::future<> run(
     bool crc_enabled)
 {
   struct test_state {
-    struct Server;
-    using ServerFRef = seastar::foreign_ptr<std::unique_ptr<Server>>;
-
     struct Server final
-        : public crimson::net::Dispatcher {
+        : public crimson::net::Dispatcher,
+          public seastar::peering_sharded_service<Server> {
+      // available only in msgr_sid
       crimson::net::MessengerRef msgr;
       crimson::auth::DummyAuthClientServer dummy_auth;
       const seastar::shard_id msgr_sid;
       std::string lname;
-      unsigned msg_len;
-      bufferlist msg_data;
 
+      bool is_fixed_cpu = true;
       bool is_stopped = false;
       std::optional<seastar::future<>> fut_report;
 
-      Server(unsigned msg_len, bool needs_report)
-        : msgr_sid{seastar::this_shard_id()},
+      // available in all shards
+      unsigned msg_len;
+      bufferlist msg_data;
+
+      Server(seastar::shard_id msgr_sid, unsigned msg_len, bool needs_report)
+        : msgr_sid{msgr_sid},
           msg_len{msg_len} {
-        lname = "server#";
-        lname += std::to_string(msgr_sid);
+        lname = fmt::format("server@{}", msgr_sid);
         msg_data.append_zero(msg_len);
 
-        if (needs_report) {
+        if (seastar::this_shard_id() == msgr_sid &&
+            needs_report) {
           start_report();
         }
       }
 
       std::optional<seastar::future<>> ms_dispatch(
           crimson::net::ConnectionRef c, MessageRef m) override {
+        assert(c->get_shard_id() == seastar::this_shard_id());
         ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
 
+        auto &server = container().local();
+
         // server replies with MOSDOp to generate server-side write workload
         const static pg_t pgid;
         const static object_locator_t oloc;
@@ -196,24 +199,27 @@ static seastar::future<> run(
                                     pgid.pool(), oloc.nspace);
         static spg_t spgid(pgid);
         auto rep = crimson::make_message<MOSDOp>(0, 0, hobj, spgid, 0, 0, 0);
-        bufferlist data(msg_data);
-        rep->write(0, msg_len, data);
+        bufferlist data(server.msg_data);
+        rep->write(0, server.msg_len, data);
         rep->set_tid(m->get_tid());
         std::ignore = c->send(std::move(rep));
         return {seastar::now()};
       }
 
-      seastar::future<> init(const entity_addr_t& addr) {
-        return seastar::smp::submit_to(msgr_sid, [addr, this] {
+      seastar::future<> init(const entity_addr_t& addr, bool is_fixed_cpu) {
+        return container().invoke_on(
+            msgr_sid, [addr, is_fixed_cpu](auto &server) {
           // server msgr is always with nonce 0
-          msgr = crimson::net::Messenger::create(
-              entity_name_t::OSD(msgr_sid),
-              lname, 0, true);
-          msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
-          msgr->set_auth_client(&dummy_auth);
-          msgr->set_auth_server(&dummy_auth);
-          return msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
-            return msgr->start({this});
+          server.msgr = crimson::net::Messenger::create(
+              entity_name_t::OSD(server.msgr_sid),
+              server.lname, 0, is_fixed_cpu);
+          server.msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
+          server.msgr->set_auth_client(&server.dummy_auth);
+          server.msgr->set_auth_server(&server.dummy_auth);
+          server.is_fixed_cpu = is_fixed_cpu;
+          return server.msgr->bind(entity_addrvec_t{addr}
+          ).safe_then([&server] {
+            return server.msgr->start({&server});
           }, crimson::net::Messenger::bind_ertr::all_same_way(
               [addr] (const std::error_code& e) {
             logger().error("Server: "
@@ -222,16 +228,18 @@ static seastar::future<> run(
           }));
         });
       }
+
       seastar::future<> shutdown() {
         logger().info("{} shutdown...", lname);
-        return seastar::smp::submit_to(msgr_sid, [this] {
-          is_stopped = true;
-          ceph_assert(msgr);
-          msgr->stop();
-          return msgr->shutdown(
-          ).then([this] {
-            if (fut_report.has_value()) {
-              return std::move(fut_report.value());
+        return container().invoke_on(
+            msgr_sid, [](auto &server) {
+          server.is_stopped = true;
+          ceph_assert(server.msgr);
+          server.msgr->stop();
+          return server.msgr->shutdown(
+          ).then([&server] {
+            if (server.fut_report.has_value()) {
+              return std::move(server.fut_report.value());
             } else {
               return seastar::now();
             }
@@ -239,29 +247,38 @@ static seastar::future<> run(
         });
       }
 
-      static seastar::future<ServerFRef> create(
-          seastar::shard_id msgr_sid,
-          unsigned msg_len,
-          bool needs_report) {
-        return seastar::smp::submit_to(
-            msgr_sid, [msg_len, needs_report] {
-          return seastar::make_foreign(
-              std::make_unique<Server>(msg_len, needs_report));
-        });
-      }
-
     private:
       struct TimerReport {
         unsigned elapsed = 0u;
 
-        seastar::future<> ticktock() {
-          return seastar::sleep(1s).then([this] {
+        seastar::future<> ticktock(bool is_fixed_cpu) {
+          return seastar::sleep(1s
+          ).then([this, is_fixed_cpu] {
             ++elapsed;
-            std::ostringstream sout;
-            sout << elapsed
-                 << "s -- server reactor utilization: "
-                 << get_reactor_utilization();
-            std::cout << sout.str() << std::endl;
+            if (is_fixed_cpu) {
+              std::ostringstream sout;
+              sout << elapsed
+                   << "s -- server reactor utilization: "
+                   << get_reactor_utilization();
+              std::cout << sout.str() << std::endl;
+              return seastar::now();
+            } else {
+              return seastar::do_with(
+                  std::vector<double>(seastar::smp::count),
+                  [this](auto &rus) {
+                return seastar::smp::invoke_on_all([&rus] {
+                  rus[seastar::this_shard_id()] = get_reactor_utilization();
+                }).then([this, &rus] {
+                  std::ostringstream sout;
+                  sout << elapsed
+                       << "s -- server reactor utilization: ";
+                  for (double ru : rus) {
+                    sout << ru << ",";
+                  }
+                  std::cout << sout.str() << std::endl;
+                });
+              });
+            }
           });
         }
       };
@@ -274,10 +291,12 @@ static seastar::future<> run(
             [this](auto &report) {
           return seastar::do_until(
             [this] { return is_stopped; },
-            [&report] {
-              return report.ticktock();
+            [&report, this] {
+              return report.ticktock(is_fixed_cpu);
             }
           );
+        }).then([this] {
+          logger().info("report is stopped!");
         }).forward_to(std::move(pr_report));
       }
     };
@@ -926,7 +945,7 @@ static seastar::future<> run(
     server_needs_report = true;
   }
   return seastar::when_all(
-      test_state::Server::create(
+      create_sharded<test_state::Server>(
         server_conf.core,
         server_conf.block_size,
         server_needs_report),
@@ -945,9 +964,8 @@ static seastar::future<> run(
             "ms_crc_data", crc_enabled ? "true" : "false");
       })
   ).then([=](auto&& ret) {
-    auto fp_server = std::move(std::get<0>(ret).get0());
+    auto server = std::move(std::get<0>(ret).get0());
     auto client = std::move(std::get<1>(ret).get0());
-    test_state::Server* server = fp_server.get();
     // reserve core 0 for potentially better performance
     if (mode == perf_mode_t::both) {
       logger().info("\nperf settings:\n  smp={}\n  {}\n  {}\n",
@@ -956,7 +974,9 @@ static seastar::future<> run(
       ceph_assert(client_conf.num_clients > 0);
       ceph_assert(seastar::smp::count > server_conf.core + client_conf.num_clients);
       return seastar::when_all_succeed(
-        server->init(server_conf.addr),
+        // it is not reasonable to allow server/client to shared cores for
+        // performance benchmarking purposes.
+        server->init(server_conf.addr, server_conf.is_fixed_cpu),
         client->init()
       ).then_unpack([client, addr = client_conf.server_addr] {
         return client->connect_wait_verify(addr);
@@ -965,8 +985,8 @@ static seastar::future<> run(
         return client->dispatch_with_timer(ramptime, msgtime);
       }).then([client] {
         return client->shutdown();
-      }).then([server, fp_server = std::move(fp_server)] () mutable {
-        return server->shutdown().then([cleanup = std::move(fp_server)] {});
+      }).then([server] {
+        return server->shutdown();
       });
     } else if (mode == perf_mode_t::client) {
       logger().info("\nperf settings:\n  smp={}\n  {}\n",
@@ -986,10 +1006,10 @@ static seastar::future<> run(
       ceph_assert(seastar::smp::count > server_conf.core);
       logger().info("\nperf settings:\n  smp={}\n  {}\n",
                     seastar::smp::count, server_conf.str());
-      return seastar::async([server, server_conf, fp_server=std::move(fp_server)] {
+      return seastar::async([server, server_conf] {
         // FIXME: SIGINT is not received by stop_signal
         seastar_apps_lib::stop_signal should_stop;
-        server->init(server_conf.addr).get();
+        server->init(server_conf.addr, server_conf.is_fixed_cpu).get();
         should_stop.wait().get();
         server->shutdown().get();
       });
@@ -1024,7 +1044,7 @@ int main(int argc, char** argv)
     ("server-fixed-cpu", bpo::value<bool>()->default_value(true),
      "server is in the fixed cpu mode, non-fixed doesn't support the mode both")
     ("server-core", bpo::value<unsigned>()->default_value(1),
-     "server running core")
+     "server messenger running core")
     ("server-bs", bpo::value<unsigned>()->default_value(0),
      "server block size")
     ("crc-enabled", bpo::value<bool>()->default_value(false),