]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
test/crimson: improved perf_crimson_msgr with timer 28542/head
authorYingxin Cheng <yingxincheng@gmail.com>
Fri, 14 Jun 2019 09:25:48 +0000 (17:25 +0800)
committerYingxin Cheng <yingxincheng@gmail.com>
Fri, 21 Jun 2019 13:12:25 +0000 (21:12 +0800)
Added timer to support:
* Ramp up before collecting perf reports;
* Shutdown based on running seconds instead of rounds;
* Report latency and throughput per second;
* Report summary every 10 seconds and before shutdown;
* Report summary of each job before shutdown;

Signed-off-by: Yingxin Cheng <yingxincheng@gmail.com>
src/test/crimson/perf_crimson_msgr.cc

index 2fffd9b2080e309affaa89fc54d6fe5a551c97da..dacfc0aa81efdbef43a8543653f0814d8787c283 100644 (file)
@@ -41,7 +41,8 @@ enum class perf_mode_t {
 struct client_config {
   entity_addr_t server_addr;
   unsigned block_size;
-  unsigned rounds;
+  unsigned ramptime;
+  unsigned msgtime;
   unsigned jobs;
   unsigned depth;
 
@@ -49,7 +50,8 @@ struct client_config {
     std::ostringstream out;
     out << "client[>> " << server_addr
         << "](bs=" << block_size
-        << ", rounds=" << rounds
+        << ", ramptime=" << ramptime
+        << ", msgtime=" << msgtime
         << ", jobs=" << jobs
         << ", depth=" << depth
         << ")";
@@ -63,9 +65,11 @@ struct client_config {
 
     conf.server_addr = addr;
     conf.block_size = options["cbs"].as<unsigned>();
-    conf.rounds = options["rounds"].as<unsigned>();
+    conf.ramptime = options["ramptime"].as<unsigned>();
+    conf.msgtime = options["msgtime"].as<unsigned>();
     conf.jobs = options["jobs"].as<unsigned>();
     conf.depth = options["depth"].as<unsigned>();
+    ceph_assert(conf.depth % conf.jobs == 0);
     return conf;
   }
 };
@@ -96,6 +100,8 @@ struct server_config {
   }
 };
 
+const unsigned SAMPLE_RATE = 7;
+
 static seastar::future<> run(
     perf_mode_t mode,
     const client_config& client_conf,
@@ -165,7 +171,7 @@ static seastar::future<> run(
         });
       }
       seastar::future<> shutdown() {
-        logger().info("\n{} shutdown...", lname);
+        logger().info("{} shutdown...", lname);
         return container().invoke_on(msgr_sid, [] (auto& server) {
           ceph_assert(server.msgr);
           return server.msgr->shutdown();
@@ -177,31 +183,66 @@ static seastar::future<> run(
         : public ceph::net::Dispatcher,
           public seastar::peering_sharded_service<Client> {
 
-      struct ConnSession {
+      struct ConnStats {
         mono_time connecting_time = mono_clock::zero();
         mono_time connected_time = mono_clock::zero();
+        unsigned received_count = 0u;
 
         mono_time start_time = mono_clock::zero();
+        unsigned start_count = 0u;
+
+        unsigned sampled_count = 0u;
+        double total_lat_s = 0.0;
+
+        // for reporting only
         mono_time finish_time = mono_clock::zero();
 
-        unsigned received_count = 0u;
+        void start() {
+          start_time = mono_clock::now();
+          start_count = received_count;
+          sampled_count = 0u;
+          total_lat_s = 0.0;
+          finish_time = mono_clock::zero();
+        }
+      };
+      ConnStats conn_stats;
 
-        const unsigned SAMPLE_RATE = 7;
+      struct PeriodStats {
+        mono_time start_time = mono_clock::zero();
+        unsigned start_count = 0u;
         unsigned sampled_count = 0u;
         double total_lat_s = 0.0;
 
-        seastar::promise<> done;
+        // for reporting only
+        mono_time finish_time = mono_clock::zero();
+        unsigned finish_count = 0u;
+        unsigned depth = 0u;
+
+        void reset(unsigned received_count, PeriodStats* snap = nullptr) {
+          if (snap) {
+            snap->start_time = start_time;
+            snap->start_count = start_count;
+            snap->sampled_count = sampled_count;
+            snap->total_lat_s = total_lat_s;
+            snap->finish_time = mono_clock::now();
+            snap->finish_count = received_count;
+          }
+          start_time = mono_clock::now();
+          start_count = received_count;
+          sampled_count = 0u;
+          total_lat_s = 0.0;
+        }
       };
-      ConnSession conn_session;
+      PeriodStats period_stats;
 
       const seastar::shard_id sid;
       std::string lname;
 
       const unsigned jobs;
-      const unsigned rounds;
       ceph::net::Messenger *msgr = nullptr;
       const unsigned msg_len;
       bufferlist msg_data;
+      const unsigned nr_depth;
       seastar::semaphore depth;
       std::vector<mono_time> time_msgs_sent;
       ceph::auth::DummyAuthClientServer dummy_auth;
@@ -209,18 +250,26 @@ static seastar::future<> run(
       unsigned sent_count = 0u;
       ceph::net::ConnectionRef active_conn = nullptr;
 
-      Client(unsigned jobs, unsigned rounds, unsigned msg_len, unsigned depth)
+      bool stop_send = false;
+      seastar::promise<> stopped_send_promise;
+
+      Client(unsigned jobs, unsigned msg_len, unsigned depth)
         : sid{seastar::engine().cpu_id()},
           jobs{jobs},
-          rounds{rounds/jobs},
           msg_len{msg_len},
-          depth{depth},
-          time_msgs_sent{depth, mono_clock::zero()} {
+          nr_depth{depth/jobs},
+          depth{nr_depth},
+          time_msgs_sent{depth/jobs, mono_clock::zero()} {
         lname = "client#";
         lname += std::to_string(sid);
         msg_data.append_zero(msg_len);
       }
 
+      unsigned get_current_depth() const {
+        ceph_assert(depth.available_units() >= 0);
+        return nr_depth - depth.current();
+      }
+
       Dispatcher* get_local_shard() override {
         return &(container().local());
       }
@@ -229,7 +278,7 @@ static seastar::future<> run(
       }
       seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override {
         logger().info("{}: connected", *conn);
-        conn_session.connected_time = mono_clock::now();
+        conn_stats.connected_time = mono_clock::now();
         return seastar::now();
       }
       seastar::future<> ms_dispatch(ceph::net::Connection* c,
@@ -238,23 +287,20 @@ static seastar::future<> run(
         ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
 
         auto msg_id = m->get_tid();
-        if (msg_id  % conn_session.SAMPLE_RATE == 0) {
+        if (msg_id SAMPLE_RATE == 0) {
           auto index = msg_id % time_msgs_sent.size();
           ceph_assert(time_msgs_sent[index] != mono_clock::zero());
           std::chrono::duration<double> cur_latency = mono_clock::now() - time_msgs_sent[index];
-          conn_session.total_lat_s += cur_latency.count();
-          ++(conn_session.sampled_count);
+          conn_stats.total_lat_s += cur_latency.count();
+          ++(conn_stats.sampled_count);
+          period_stats.total_lat_s += cur_latency.count();
+          ++(period_stats.sampled_count);
           time_msgs_sent[index] = mono_clock::zero();
         }
 
-        ++(conn_session.received_count);
+        ++(conn_stats.received_count);
         depth.signal(1);
 
-        if (conn_session.received_count == rounds) {
-          logger().info("{}: finished receiving {} REPLYs", *c, conn_session.received_count);
-          conn_session.finish_time = mono_clock::now();
-          conn_session.done.set_value();
-        }
         return seastar::now();
       }
 
@@ -284,15 +330,17 @@ static seastar::future<> run(
       seastar::future<> shutdown() {
         return container().invoke_on_all([] (auto& client) {
           if (client.is_active()) {
-            logger().info("\n{} shutdown...", client.lname);
+            logger().info("{} shutdown...", client.lname);
             ceph_assert(client.msgr);
-            return client.msgr->shutdown();
+            return client.msgr->shutdown().then([&client] {
+              return client.stop_dispatch_messages();
+            });
           }
           return seastar::now();
         });
       }
 
-      seastar::future<> dispatch_messages(const entity_addr_t& peer_addr) {
+      seastar::future<> connect_wait_verify(const entity_addr_t& peer_addr) {
         return container().invoke_on_all([peer_addr] (auto& client) {
           // start clients in active cores (#1 ~ #jobs)
           if (client.is_active()) {
@@ -303,27 +351,200 @@ static seastar::future<> run(
               // make sure handshake won't hurt the performance
               return seastar::sleep(1s);
             }).then([&client, start_time] {
-              if (client.conn_session.connected_time == mono_clock::zero()) {
+              if (client.conn_stats.connected_time == mono_clock::zero()) {
                 logger().error("\n{} not connected after 1s!\n", client.lname);
                 ceph_assert(false);
               }
-              client.conn_session.connecting_time = start_time;
+              client.conn_stats.connecting_time = start_time;
             });
           }
           return seastar::now();
+        });
+      }
+
+     private:
+      class TimerReport {
+       private:
+        const unsigned jobs;
+        const unsigned msgtime;
+        const unsigned bytes_of_block;
+
+        unsigned elapsed = 0u;
+        std::vector<mono_time> start_times;
+        std::vector<PeriodStats> snaps;
+        std::vector<ConnStats> summaries;
+
+       public:
+        TimerReport(unsigned jobs, unsigned msgtime, unsigned bs)
+          : jobs{jobs},
+            msgtime{msgtime},
+            bytes_of_block{bs},
+            start_times{jobs, mono_clock::zero()},
+            snaps{jobs},
+            summaries{jobs} {}
+
+        unsigned get_elapsed() const { return elapsed; }
+
+        PeriodStats& get_snap_by_job(seastar::shard_id sid) {
+          ceph_assert(sid >= 1 && sid <= jobs);
+          return snaps[sid - 1];
+        }
+
+        ConnStats& get_summary_by_job(seastar::shard_id sid) {
+          ceph_assert(sid >= 1 && sid <= jobs);
+          return summaries[sid - 1];
+        }
+
+        bool should_stop() const {
+          return elapsed >= msgtime;
+        }
+
+        seastar::future<> ticktock() {
+          return seastar::sleep(1s).then([this] {
+            ++elapsed;
+          });
+        }
+
+        void report_header() {
+          std::ostringstream sout;
+          sout << std::setfill(' ')
+               << std::setw(7) << "sec"
+               << std::setw(6) << "depth"
+               << std::setw(8) << "IOPS"
+               << std::setw(8) << "MB/s"
+               << std::setw(8) << "lat(ms)";
+          std::cout << sout.str() << std::endl;
+        }
+
+        void report_period() {
+          if (elapsed == 1) {
+            // init this->start_times at the first period
+            for (unsigned i=0; i<jobs; ++i) {
+              start_times[i] = snaps[i].start_time;
+            }
+          }
+          std::chrono::duration<double> elapsed_d = 0s;
+          unsigned depth = 0u;
+          unsigned ops = 0u;
+          unsigned sampled_count = 0u;
+          double total_lat_s = 0.0;
+          for (const auto& snap: snaps) {
+            elapsed_d += (snap.finish_time - snap.start_time);
+            depth += snap.depth;
+            ops += (snap.finish_count - snap.start_count);
+            sampled_count += snap.sampled_count;
+            total_lat_s += snap.total_lat_s;
+          }
+          double elapsed_s = elapsed_d.count() / jobs;
+          double iops = ops/elapsed_s;
+          std::ostringstream sout;
+          sout << setfill(' ')
+               << std::setw(7) << elapsed_s
+               << std::setw(6) << depth
+               << std::setw(8) << iops
+               << std::setw(8) << iops * bytes_of_block / 1048576
+               << std::setw(8) << (total_lat_s / sampled_count * 1000);
+          std::cout << sout.str() << std::endl;
+        }
+
+        void report_summary() const {
+          std::chrono::duration<double> elapsed_d = 0s;
+          unsigned ops = 0u;
+          unsigned sampled_count = 0u;
+          double total_lat_s = 0.0;
+          for (const auto& summary: summaries) {
+            elapsed_d += (summary.finish_time - summary.start_time);
+            ops += (summary.received_count - summary.start_count);
+            sampled_count += summary.sampled_count;
+            total_lat_s += summary.total_lat_s;
+          }
+          double elapsed_s = elapsed_d.count() / jobs;
+          double iops = ops / elapsed_s;
+          std::ostringstream sout;
+          sout << "--------------"
+               << " summary "
+               << "--------------\n"
+               << setfill(' ')
+               << std::setw(7) << elapsed_s
+               << std::setw(6) << "-"
+               << std::setw(8) << iops
+               << std::setw(8) << iops * bytes_of_block / 1048576
+               << std::setw(8) << (total_lat_s / sampled_count * 1000)
+               << "\n";
+          std::cout << sout.str() << std::endl;
+        }
+      };
+
+      seastar::future<> report_period(TimerReport& report) {
+        return container().invoke_on_all([&report] (auto& client) {
+          if (client.is_active()) {
+            PeriodStats& snap = report.get_snap_by_job(client.sid);
+            client.period_stats.reset(client.conn_stats.received_count,
+                                      &snap);
+            snap.depth = client.get_current_depth();
+          }
+        }).then([&report] {
+          report.report_period();
+        });
+      }
+
+      seastar::future<> report_summary(TimerReport& report) {
+        return container().invoke_on_all([&report] (auto& client) {
+          if (client.is_active()) {
+            ConnStats& summary = report.get_summary_by_job(client.sid);
+            summary = client.conn_stats;
+            summary.finish_time = mono_clock::now();
+          }
+        }).then([&report] {
+          report.report_summary();
+        });
+      }
+
+     public:
+      seastar::future<> dispatch_with_timer(unsigned ramptime, unsigned msgtime) {
+        logger().info("[all clients]: start sending MOSDOps from {} clients", jobs);
+        return container().invoke_on_all([] (auto& client) {
+          if (client.is_active()) {
+            client.do_dispatch_messages(client.active_conn.get());
+          }
+        }).then([this, ramptime] {
+          logger().info("[all clients]: ramping up {} seconds...", ramptime);
+          return seastar::sleep(std::chrono::seconds(ramptime));
         }).then([this] {
-          logger().info("\nstart sending {} MOSDOps from {} clients",
-                        jobs * rounds, jobs);
-          mono_time start_time = mono_clock::now();
           return container().invoke_on_all([] (auto& client) {
             if (client.is_active()) {
-              return client.do_dispatch_messages(client.active_conn.get());
+              client.conn_stats.start();
+              client.period_stats.reset(client.conn_stats.received_count);
             }
-            return seastar::now();
-          }).then([this, start_time] {
-            std::chrono::duration<double> dur_messaging = mono_clock::now() - start_time;
-            logger().info("\nSummary:\n  clients: {}\n  MOSDOps: {}\n  total time: {}s\n",
-                          jobs, jobs * rounds, dur_messaging.count());
+          });
+        }).then([this, msgtime] {
+          logger().info("[all clients]: reporting {} seconds...\n", msgtime);
+          return seastar::do_with(
+              TimerReport(jobs, msgtime, msg_len), [this] (auto& report) {
+            report.report_header();
+            return seastar::do_until(
+              [&report] { return report.should_stop(); },
+              [&report, this] {
+                return report.ticktock().then([&report, this] {
+                  // report period every 1s
+                  return report_period(report);
+                }).then([&report, this] {
+                  // report summary every 10s
+                  if (report.get_elapsed() % 10 == 0) {
+                    return report_summary(report);
+                  } else {
+                    return seastar::now();
+                  }
+                });
+              }
+            ).then([&report, this] {
+              // report the final summary
+              if (report.get_elapsed() % 10 != 0) {
+                return report_summary(report);
+              } else {
+                return seastar::now();
+              }
+            });
           });
         });
       }
@@ -346,7 +567,7 @@ static seastar::future<> run(
           m->set_tid(sent_count);
 
           // sample message latency
-          if (sent_count % conn_session.SAMPLE_RATE == 0) {
+          if (sent_count % SAMPLE_RATE == 0) {
             auto index = sent_count % time_msgs_sent.size();
             ceph_assert(time_msgs_sent[index] == mono_clock::zero());
             time_msgs_sent[index] = mono_clock::now();
@@ -356,40 +577,48 @@ static seastar::future<> run(
         });
       }
 
-      seastar::future<> do_dispatch_messages(ceph::net::Connection* conn) {
+      class DepthBroken: public std::exception {};
+
+      seastar::future<> stop_dispatch_messages() {
+        stop_send = true;
+        depth.broken(DepthBroken());
+        return stopped_send_promise.get_future();
+      }
+
+      void do_dispatch_messages(ceph::net::Connection* conn) {
         ceph_assert(seastar::engine().cpu_id() == sid);
         ceph_assert(sent_count == 0);
-        conn_session.start_time = mono_clock::now();
-        return seastar::do_until(
-          [this, conn] {
-            bool stop = (sent_count == rounds);
-            if (stop) {
-              logger().info("{}: finished sending {} OSDOPs",
-                            *conn, sent_count);
-            }
-            return stop;
-          },
+        conn_stats.start_time = mono_clock::now();
+        seastar::do_until(
+          [this] { return stop_send; },
           [this, conn] {
             sent_count += 1;
             return send_msg(conn);
           }
-        ).then([this] {
-          return conn_session.done.get_future();
-        }).then([this] {
-          std::chrono::duration<double> dur_conn = conn_session.connected_time - conn_session.connecting_time;
-          std::chrono::duration<double> dur_msg = mono_clock::now() - conn_session.start_time;
-          logger().info("\n{}:\n"
-                        "  messages: {}\n"
+        ).handle_exception_type([] (const DepthBroken& e) {
+          // ok, stopped by stop_dispatch_messages()
+        }).finally([this, conn] {
+          std::chrono::duration<double> dur_conn = conn_stats.connected_time - conn_stats.connecting_time;
+          std::chrono::duration<double> dur_msg = mono_clock::now() - conn_stats.start_time;
+          unsigned ops = conn_stats.received_count - conn_stats.start_count;
+          logger().info("{}: stopped sending OSDOPs.\n"
+                        "{}(depth={}):\n"
                         "  connect time: {}s\n"
+                        "  messages received: {}\n"
                         "  messaging time: {}s\n"
                         "  latency: {}ms\n"
                         "  IOPS: {}\n"
                         "  throughput: {}MB/s\n",
-                        lname, conn_session.received_count,
-                        dur_conn.count(), dur_msg.count(),
-                        conn_session.total_lat_s / conn_session.sampled_count * 1000,
-                        conn_session.received_count / dur_msg.count(),
-                        conn_session.received_count / dur_msg.count() * msg_len / 1048576);
+                        *conn,
+                        lname,
+                        nr_depth,
+                        dur_conn.count(),
+                        ops,
+                        dur_msg.count(),
+                        conn_stats.total_lat_s / conn_stats.sampled_count * 1000,
+                        ops / dur_msg.count(),
+                        ops / dur_msg.count() * msg_len / 1048576);
+          stopped_send_promise.set_value();
         });
       }
     };
@@ -397,7 +626,7 @@ static seastar::future<> run(
 
   return seastar::when_all_succeed(
       ceph::net::create_sharded<test_state::Server>(server_conf.core, server_conf.block_size),
-      ceph::net::create_sharded<test_state::Client>(client_conf.jobs, client_conf.rounds,
+      ceph::net::create_sharded<test_state::Client>(client_conf.jobs,
                                                     client_conf.block_size, client_conf.depth))
     .then([=](test_state::Server *server,
               test_state::Client *client) {
@@ -411,10 +640,11 @@ static seastar::future<> run(
           return seastar::when_all_succeed(
               server->init(server_conf.addr),
               client->init())
-          // dispatch ops
             .then([client, addr = client_conf.server_addr] {
-              return client->dispatch_messages(addr);
-          // shutdown
+              return client->connect_wait_verify(addr);
+            }).then([client, ramptime = client_conf.ramptime,
+                     msgtime = client_conf.msgtime] {
+              return client->dispatch_with_timer(ramptime, msgtime);
             }).finally([client] {
               return client->shutdown();
             }).finally([server] {
@@ -425,10 +655,11 @@ static seastar::future<> run(
           ceph_assert(seastar::smp::count >= 1+client_conf.jobs);
           ceph_assert(client_conf.jobs > 0);
           return client->init()
-          // dispatch ops
             .then([client, addr = client_conf.server_addr] {
-              return client->dispatch_messages(addr);
-          // shutdown
+              return client->connect_wait_verify(addr);
+            }).then([client, ramptime = client_conf.ramptime,
+                     msgtime = client_conf.msgtime] {
+              return client->dispatch_with_timer(ramptime, msgtime);
             }).finally([client] {
               return client->shutdown();
             });
@@ -457,8 +688,10 @@ int main(int argc, char** argv)
      "0: both, 1:client, 2:server")
     ("addr", bpo::value<std::string>()->default_value("v1:0.0.0.0:9010"),
      "server address")
-    ("rounds", bpo::value<unsigned>()->default_value(65536),
-     "number of client messaging rounds")
+    ("ramptime", bpo::value<unsigned>()->default_value(5),
+     "seconds of client ramp-up time")
+    ("msgtime", bpo::value<unsigned>()->default_value(15),
+     "seconds of client messaging time")
     ("jobs", bpo::value<unsigned>()->default_value(1),
      "number of client jobs (messengers)")
     ("cbs", bpo::value<unsigned>()->default_value(4096),