]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/tools/perf_crimson_msgr: misc cleanups
authorYingxin Cheng <yingxin.cheng@intel.com>
Thu, 25 May 2023 04:11:47 +0000 (12:11 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Mon, 3 Jul 2023 05:50:18 +0000 (13:50 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/tools/perf_crimson_msgr.cc

index eaedaf0cd86c46360e60331872e3316a09f00209..43ca33a434ebc442b8499491e06fdf507f4cff05 100644 (file)
@@ -83,14 +83,14 @@ struct client_config {
   static client_config load(bpo::variables_map& options) {
     client_config conf;
     entity_addr_t addr;
-    ceph_assert(addr.parse(options["addr"].as<std::string>().c_str(), nullptr));
+    ceph_assert(addr.parse(options["server-addr"].as<std::string>().c_str(), nullptr));
     ceph_assert_always(addr.is_msgr2());
 
     conf.server_addr = addr;
-    conf.block_size = options["cbs"].as<unsigned>();
+    conf.block_size = options["client-bs"].as<unsigned>();
     conf.ramptime = options["ramptime"].as<unsigned>();
     conf.msgtime = options["msgtime"].as<unsigned>();
-    conf.jobs = options["jobs"].as<unsigned>();
+    conf.jobs = options["client-jobs"].as<unsigned>();
     conf.depth = options["depth"].as<unsigned>();
     ceph_assert(conf.depth % conf.jobs == 0);
     return conf;
@@ -114,12 +114,12 @@ struct server_config {
   static server_config load(bpo::variables_map& options) {
     server_config conf;
     entity_addr_t addr;
-    ceph_assert(addr.parse(options["addr"].as<std::string>().c_str(), nullptr));
+    ceph_assert(addr.parse(options["server-addr"].as<std::string>().c_str(), nullptr));
     ceph_assert_always(addr.is_msgr2());
 
     conf.addr = addr;
-    conf.block_size = options["sbs"].as<unsigned>();
-    conf.core = options["core"].as<unsigned>();
+    conf.block_size = options["server-bs"].as<unsigned>();
+    conf.core = options["server-core"].as<unsigned>();
     return conf;
   }
 };
@@ -205,9 +205,13 @@ static seastar::future<> run(
         });
       }
 
-      static seastar::future<ServerFRef> create(seastar::shard_id msgr_sid, unsigned msg_len) {
-        return seastar::smp::submit_to(msgr_sid, [msg_len] {
-          return seastar::make_foreign(std::make_unique<Server>(msg_len));
+      static seastar::future<ServerFRef> create(
+          seastar::shard_id msgr_sid,
+          unsigned msg_len) {
+        return seastar::smp::submit_to(
+            msgr_sid, [msg_len] {
+          return seastar::make_foreign(
+              std::make_unique<Server>(msg_len));
         });
       }
     };
@@ -225,18 +229,31 @@ static seastar::future<> run(
         unsigned start_count = 0u;
 
         unsigned sampled_count = 0u;
-        double total_lat_s = 0.0;
+        double sampled_total_lat_s = 0.0;
 
         // for reporting only
         mono_time finish_time = mono_clock::zero();
 
-        void start() {
+        void start_connecting() {
+          connecting_time = mono_clock::now();
+        }
+
+        void finish_connecting() {
+          connected_time = mono_clock::now();
+        }
+
+        void start_collect() {
           start_time = mono_clock::now();
           start_count = received_count;
           sampled_count = 0u;
-          total_lat_s = 0.0;
+          sampled_total_lat_s = 0.0;
           finish_time = mono_clock::zero();
         }
+
+        void prepare_summary(const ConnStats &current) {
+          *this = current;
+          finish_time = mono_clock::now();
+        }
       };
       ConnStats conn_stats;
 
@@ -244,26 +261,31 @@ static seastar::future<> run(
         mono_time start_time = mono_clock::zero();
         unsigned start_count = 0u;
         unsigned sampled_count = 0u;
-        double total_lat_s = 0.0;
+        double sampled_total_lat_s = 0.0;
 
         // for reporting only
         mono_time finish_time = mono_clock::zero();
         unsigned finish_count = 0u;
         unsigned depth = 0u;
 
-        void reset(unsigned received_count, PeriodStats* snap = nullptr) {
-          if (snap) {
-            snap->start_time = start_time;
-            snap->start_count = start_count;
-            snap->sampled_count = sampled_count;
-            snap->total_lat_s = total_lat_s;
-            snap->finish_time = mono_clock::now();
-            snap->finish_count = received_count;
-          }
+        void start_collect(unsigned received_count) {
           start_time = mono_clock::now();
           start_count = received_count;
           sampled_count = 0u;
-          total_lat_s = 0.0;
+          sampled_total_lat_s = 0.0;
+        }
+
+        void reset_period(
+            unsigned received_count, unsigned _depth, PeriodStats &snapshot) {
+          snapshot.start_time = start_time;
+          snapshot.start_count = start_count;
+          snapshot.sampled_count = sampled_count;
+          snapshot.sampled_total_lat_s = sampled_total_lat_s;
+          snapshot.finish_time = mono_clock::now();
+          snapshot.finish_count = received_count;
+          snapshot.depth = _depth;
+
+          start_collect(received_count);
         }
       };
       PeriodStats period_stats;
@@ -307,10 +329,12 @@ static seastar::future<> run(
           crimson::net::ConnectionRef conn,
           seastar::shard_id new_shard) override {
         ceph_assert_always(new_shard == seastar::this_shard_id());
-        conn_stats.connected_time = mono_clock::now();
+        conn_stats.finish_connecting();
       }
+
       std::optional<seastar::future<>> ms_dispatch(
           crimson::net::ConnectionRef, MessageRef m) override {
+        assert(is_active());
         // server replies with MOSDOp to generate server-side write workload
         ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
 
@@ -319,9 +343,9 @@ static seastar::future<> run(
           auto index = msg_id % time_msgs_sent.size();
           ceph_assert(time_msgs_sent[index] != mono_clock::zero());
           std::chrono::duration<double> cur_latency = mono_clock::now() - time_msgs_sent[index];
-          conn_stats.total_lat_s += cur_latency.count();
+          conn_stats.sampled_total_lat_s += cur_latency.count();
           ++(conn_stats.sampled_count);
-          period_stats.total_lat_s += cur_latency.count();
+          period_stats.sampled_total_lat_s += cur_latency.count();
           ++(period_stats.sampled_count);
           time_msgs_sent[index] = mono_clock::zero();
         }
@@ -355,15 +379,16 @@ static seastar::future<> run(
 
       seastar::future<> shutdown() {
         return container().invoke_on_all([] (auto& client) {
-          if (client.is_active()) {
-            logger().info("{} shutdown...", client.lname);
-            ceph_assert(client.msgr);
-            client.msgr->stop();
-            return client.msgr->shutdown().then([&client] {
-              return client.stop_dispatch_messages();
-            });
+          if (!client.is_active()) {
+            return seastar::now();
           }
-          return seastar::now();
+
+          logger().info("{} shutdown...", client.lname);
+          ceph_assert(client.msgr);
+          client.msgr->stop();
+          return client.msgr->shutdown().then([&client] {
+            return client.stop_dispatch_messages();
+          });
         });
       }
 
@@ -371,15 +396,14 @@ static seastar::future<> run(
         return container().invoke_on_all([peer_addr] (auto& client) {
           // start clients in active cores (#1 ~ #jobs)
           if (client.is_active()) {
-            mono_time start_time = mono_clock::now();
+            client.conn_stats.start_connecting();
             client.active_conn = client.msgr->connect(peer_addr, entity_name_t::TYPE_OSD);
             // make sure handshake won't hurt the performance
-            return seastar::sleep(1s).then([&client, start_time] {
+            return seastar::sleep(1s).then([&client] {
               if (client.conn_stats.connected_time == mono_clock::zero()) {
                 logger().error("\n{} not connected after 1s!\n", client.lname);
                 ceph_assert(false);
               }
-              client.conn_stats.connecting_time = start_time;
             });
           }
           return seastar::now();
@@ -394,7 +418,6 @@ static seastar::future<> run(
         const unsigned bytes_of_block;
 
         unsigned elapsed = 0u;
-        std::vector<mono_time> start_times;
         std::vector<PeriodStats> snaps;
         std::vector<ConnStats> summaries;
 
@@ -403,7 +426,6 @@ static seastar::future<> run(
           : jobs{jobs},
             msgtime{msgtime},
             bytes_of_block{bs},
-            start_times{jobs, mono_clock::zero()},
             snaps{jobs},
             summaries{jobs} {}
 
@@ -429,7 +451,7 @@ static seastar::future<> run(
           });
         }
 
-        void report_header() {
+        void report_header() const {
           std::ostringstream sout;
           sout << std::setfill(' ')
                << std::setw(7) << "sec"
@@ -441,23 +463,17 @@ static seastar::future<> run(
         }
 
         void report_period() {
-          if (elapsed == 1) {
-            // init this->start_times at the first period
-            for (unsigned i=0; i<jobs; ++i) {
-              start_times[i] = snaps[i].start_time;
-            }
-          }
           std::chrono::duration<double> elapsed_d = 0s;
           unsigned depth = 0u;
           unsigned ops = 0u;
           unsigned sampled_count = 0u;
-          double total_lat_s = 0.0;
+          double sampled_total_lat_s = 0.0;
           for (const auto& snap: snaps) {
             elapsed_d += (snap.finish_time - snap.start_time);
             depth += snap.depth;
             ops += (snap.finish_count - snap.start_count);
             sampled_count += snap.sampled_count;
-            total_lat_s += snap.total_lat_s;
+            sampled_total_lat_s += snap.sampled_total_lat_s;
           }
           double elapsed_s = elapsed_d.count() / jobs;
           double iops = ops/elapsed_s;
@@ -467,7 +483,7 @@ static seastar::future<> run(
                << std::setw(6) << depth
                << std::setw(8) << iops
                << std::setw(8) << iops * bytes_of_block / 1048576
-               << std::setw(8) << (total_lat_s / sampled_count * 1000);
+               << std::setw(8) << (sampled_total_lat_s / sampled_count * 1000);
           std::cout << sout.str() << std::endl;
         }
 
@@ -475,12 +491,12 @@ static seastar::future<> run(
           std::chrono::duration<double> elapsed_d = 0s;
           unsigned ops = 0u;
           unsigned sampled_count = 0u;
-          double total_lat_s = 0.0;
+          double sampled_total_lat_s = 0.0;
           for (const auto& summary: summaries) {
             elapsed_d += (summary.finish_time - summary.start_time);
             ops += (summary.received_count - summary.start_count);
             sampled_count += summary.sampled_count;
-            total_lat_s += summary.total_lat_s;
+            sampled_total_lat_s += summary.sampled_total_lat_s;
           }
           double elapsed_s = elapsed_d.count() / jobs;
           double iops = ops / elapsed_s;
@@ -493,7 +509,7 @@ static seastar::future<> run(
                << std::setw(6) << "-"
                << std::setw(8) << iops
                << std::setw(8) << iops * bytes_of_block / 1048576
-               << std::setw(8) << (total_lat_s / sampled_count * 1000)
+               << std::setw(8) << (sampled_total_lat_s / sampled_count * 1000)
                << "\n";
           std::cout << sout.str() << std::endl;
         }
@@ -503,9 +519,10 @@ static seastar::future<> run(
         return container().invoke_on_all([&report] (auto& client) {
           if (client.is_active()) {
             PeriodStats& snap = report.get_snap_by_job(client.sid);
-            client.period_stats.reset(client.conn_stats.received_count,
-                                      &snap);
-            snap.depth = client.get_current_depth();
+            client.period_stats.reset_period(
+                client.conn_stats.received_count,
+                client.get_current_depth(),
+                snap);
           }
         }).then([&report] {
           report.report_period();
@@ -516,8 +533,7 @@ static seastar::future<> run(
         return container().invoke_on_all([&report] (auto& client) {
           if (client.is_active()) {
             ConnStats& summary = report.get_summary_by_job(client.sid);
-            summary = client.conn_stats;
-            summary.finish_time = mono_clock::now();
+            summary.prepare_summary(client.conn_stats);
           }
         }).then([&report] {
           report.report_summary();
@@ -537,8 +553,8 @@ static seastar::future<> run(
         }).then([this] {
           return container().invoke_on_all([] (auto& client) {
             if (client.is_active()) {
-              client.conn_stats.start();
-              client.period_stats.reset(client.conn_stats.received_count);
+              client.conn_stats.start_collect();
+              client.period_stats.start_collect(client.conn_stats.received_count);
             }
           });
         }).then([this, msgtime] {
@@ -589,7 +605,7 @@ static seastar::future<> run(
           m->set_tid(sent_count);
 
           // sample message latency
-          if (sent_count % SAMPLE_RATE == 0) {
+          if (unlikely(sent_count % SAMPLE_RATE == 0)) {
             auto index = sent_count % time_msgs_sent.size();
             ceph_assert(time_msgs_sent[index] == mono_clock::zero());
             time_msgs_sent[index] = mono_clock::now();
@@ -638,7 +654,7 @@ static seastar::future<> run(
                         dur_conn.count(),
                         ops,
                         dur_msg.count(),
-                        conn_stats.total_lat_s / conn_stats.sampled_count * 1000,
+                        conn_stats.sampled_total_lat_s / conn_stats.sampled_count * 1000,
                         ops / dur_msg.count(),
                         ops / dur_msg.count() * msg_len / 1048576);
           stopped_send_promise.set_value();
@@ -648,9 +664,16 @@ static seastar::future<> run(
   };
 
   return seastar::when_all(
-      test_state::Server::create(server_conf.core, server_conf.block_size),
-      create_sharded<test_state::Client>(client_conf.jobs, client_conf.block_size, client_conf.depth),
-      crimson::common::sharded_conf().start(EntityName{}, std::string_view{"ceph"}).then([] {
+      test_state::Server::create(
+        server_conf.core,
+        server_conf.block_size),
+      create_sharded<test_state::Client>(
+        client_conf.jobs,
+        client_conf.block_size,
+        client_conf.depth),
+      crimson::common::sharded_conf().start(
+        EntityName{}, std::string_view{"ceph"}
+      ).then([] {
         return crimson::common::local_conf().start();
       }).then([crc_enabled] {
         return crimson::common::local_conf().set_val(
@@ -718,21 +741,21 @@ int main(int argc, char** argv)
   app.add_options()
     ("mode", bpo::value<unsigned>()->default_value(0),
      "0: both, 1:client, 2:server")
-    ("addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9010"),
+    ("server-addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9010"),
      "server address(only support msgr v2 protocol)")
     ("ramptime", bpo::value<unsigned>()->default_value(5),
      "seconds of client ramp-up time")
     ("msgtime", bpo::value<unsigned>()->default_value(15),
      "seconds of client messaging time")
-    ("jobs", bpo::value<unsigned>()->default_value(1),
+    ("client-jobs", bpo::value<unsigned>()->default_value(1),
      "number of client jobs (messengers)")
-    ("cbs", bpo::value<unsigned>()->default_value(4096),
+    ("client-bs", bpo::value<unsigned>()->default_value(4096),
      "client block size")
     ("depth", bpo::value<unsigned>()->default_value(512),
      "client io depth")
-    ("core", bpo::value<unsigned>()->default_value(0),
+    ("server-core", bpo::value<unsigned>()->default_value(0),
      "server running core")
-    ("sbs", bpo::value<unsigned>()->default_value(0),
+    ("server-bs", bpo::value<unsigned>()->default_value(0),
      "server block size")
     ("crc-enabled", bpo::value<bool>()->default_value(false),
      "enable CRC checks");