git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
test/crimson: improved perf tool for crimson-msgr
authorYingxin Cheng <yingxincheng@gmail.com>
Tue, 19 Mar 2019 14:26:59 +0000 (22:26 +0800)
committerKefu Chai <kchai@redhat.com>
Fri, 5 Apr 2019 03:21:18 +0000 (11:21 +0800)
New features:
* --jobs: start multiple client messengers from core #1 ~ #jobs;
* --core: can assign server core to get away from busy client cores;
* --rounds: a client will send <rounds>/<jobs> messages;

Improved:
* Better configuration report;
* Report individual client results plus a summary;
* Validate if CPU number is sufficient before running;
* Sleep 1 second while connecting, so it won't hurt performance;
* Simplify client logic and bug fixes;

Removed unnecessary features:
* finish_decode() for MOSDOp;
* keepalive ratio;

Signed-off-by: Yingxin Cheng <yingxincheng@gmail.com>
src/test/crimson/perf_crimson_msgr.cc

index f197384505f47854e341954b3d591c38b500b019..fc6a202e83a8c04db915f01a9dcd59805b661652 100644 (file)
@@ -32,28 +32,35 @@ seastar::logger& logger() {
   return ceph::get_logger(ceph_subsys_ms);
 }
 
-
 enum class perf_mode_t {
   both,
   client,
   server
 };
 
-static std::random_device rd;
-static std::default_random_engine rng{rd()};
-
 static seastar::future<> run(unsigned rounds,
-                             double keepalive_ratio,
-                             int bs,
-                             int depth,
+                             unsigned jobs,
+                             unsigned bs,
+                             unsigned depth,
                              std::string addr,
-                             perf_mode_t mode)
+                             perf_mode_t mode,
+                             unsigned core)
 {
   struct test_state {
     struct Server final
         : public ceph::net::Dispatcher,
           public seastar::peering_sharded_service<Server> {
       ceph::net::Messenger *msgr = nullptr;
+      const seastar::shard_id sid;
+      const seastar::shard_id msgr_sid;
+      std::string lname;
+
+      Server(unsigned msgr_core)
+        : sid{seastar::engine().cpu_id()},
+          msgr_sid{msgr_core} {
+        lname = "server#";
+        lname += std::to_string(sid);
+      }
 
       Dispatcher* get_local_shard() override {
         return &(container().local());
@@ -66,29 +73,30 @@ static seastar::future<> run(unsigned rounds,
         ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
         // reply
         Ref<MOSDOp> req = boost::static_pointer_cast<MOSDOp>(m);
-        req->finish_decode();
         return c->send(MessageRef{ new MOSDOpReply(req.get(), 0, 0, 0, false), false });
       }
 
-      seastar::future<> init(const entity_name_t& name,
-                             const std::string& lname,
-                             const uint64_t nonce,
-                             const entity_addr_t& addr) {
-        auto&& fut = ceph::net::Messenger::create(name, lname, nonce, 1);
-        return fut.then([this, addr](ceph::net::Messenger *messenger) {
-            return container().invoke_on_all([messenger](auto& server) {
-                server.msgr = messenger->get_local_shard();
-                server.msgr->set_crc_header();
-              }).then([messenger, addr] {
-                return messenger->bind(entity_addrvec_t{addr});
-              }).then([this, messenger] {
-                return messenger->start(this);
-              });
-          });
+      seastar::future<> init(const entity_addr_t& addr) {
+        return container().invoke_on(msgr_sid, [addr] (auto& server) {
+          // server msgr is always with nonce 0
+          auto&& fut = ceph::net::Messenger::create(entity_name_t::OSD(server.sid), server.lname, 0, server.sid);
+          return fut.then([&server, addr](ceph::net::Messenger *messenger) {
+              return server.container().invoke_on_all([messenger](auto& server) {
+                  server.msgr = messenger->get_local_shard();
+                }).then([messenger, addr] {
+                  return messenger->bind(entity_addrvec_t{addr});
+                }).then([&server, messenger] {
+                  return messenger->start(&server);
+                });
+            });
+        });
       }
       seastar::future<> shutdown() {
-        ceph_assert(msgr);
-        return msgr->shutdown();
+        logger().info("\n{} shutdown...", lname);
+        return container().invoke_on(msgr_sid, [] (auto& server) {
+          ceph_assert(server.msgr);
+          return server.msgr->shutdown();
+        });
       }
     };
 
@@ -97,38 +105,42 @@ static seastar::future<> run(unsigned rounds,
           public seastar::peering_sharded_service<Client> {
 
       struct PingSession : public seastar::enable_shared_from_this<PingSession> {
-        unsigned count = 0u;
+        unsigned received_count = 0u;
+        mono_time connecting_time;
         mono_time connected_time;
+        mono_time start_time;
         mono_time finish_time;
+        seastar::promise<> done;
       };
       using PingSessionRef = seastar::shared_ptr<PingSession>;
 
-      unsigned rounds;
-      std::bernoulli_distribution keepalive_dist;
+      const seastar::shard_id sid;
+      std::string lname;
+
+      const unsigned jobs;
+      const unsigned rounds;
       ceph::net::Messenger *msgr = nullptr;
-      std::map<ceph::net::Connection*, seastar::promise<>> pending_conns;
-      std::map<ceph::net::ConnectionRef, PingSessionRef> sessions;
-      int msg_len;
+      const unsigned msg_len;
       bufferlist msg_data;
       seastar::semaphore depth;
 
-      Client(unsigned rounds, double keepalive_ratio, int msg_len, int depth)
-        : rounds(rounds),
-          keepalive_dist(std::bernoulli_distribution{keepalive_ratio}),
-          depth(depth) {
+      unsigned sent_count = 0u;
+      ceph::net::ConnectionRef active_conn = nullptr;
+      PingSessionRef active_session = nullptr;
+
+      Client(unsigned jobs, unsigned rounds, unsigned msg_len, unsigned depth)
+        : sid{seastar::engine().cpu_id()},
+          jobs{jobs},
+          rounds{rounds/jobs},
+          msg_len{msg_len},
+          depth{depth} {
+        lname = "client#";
+        lname += std::to_string(sid);
         bufferptr ptr(msg_len);
         memset(ptr.c_str(), 0, msg_len);
         msg_data.append(ptr);
       }
 
-      PingSessionRef find_session(ceph::net::ConnectionRef c) {
-        auto found = sessions.find(c);
-        if (found == sessions.end()) {
-          ceph_assert(false);
-        }
-        return found->second;
-      }
-
       Dispatcher* get_local_shard() override {
         return &(container().local());
       }
@@ -136,80 +148,95 @@ static seastar::future<> run(unsigned rounds,
         return seastar::now();
       }
       seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override {
-        logger().info("{}: connected to {}", *conn, conn->get_peer_addr());
-        auto session = seastar::make_shared<PingSession>();
-        auto [i, added] = sessions.emplace(conn, session);
-        std::ignore = i;
-        ceph_assert(added);
-        session->connected_time = mono_clock::now();
+        logger().info("{}: connected", *conn);
+        active_session = seastar::make_shared<PingSession>();
+        active_session->connected_time = mono_clock::now();
         return seastar::now();
       }
       seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
                                     MessageRef m) override {
         ceph_assert(m->get_type() == CEPH_MSG_OSD_OPREPLY);
         depth.signal(1);
-        auto session = find_session(c);
-        ++(session->count);
+        ceph_assert(active_session);
+        ++(active_session->received_count);
 
-        if (session->count == rounds) {
-          logger().info("{}: finished receiving {} OPREPLYs", *c.get(), session->count);
-          session->finish_time = mono_clock::now();
-          return container().invoke_on_all([conn = c.get()](auto &client) {
-              auto found = client.pending_conns.find(conn);
-              ceph_assert(found != client.pending_conns.end());
-              found->second.set_value();
-            });
-        } else {
-          return seastar::now();
+        if (active_session->received_count == rounds) {
+          logger().info("{}: finished receiving {} OPREPLYs", *c, active_session->received_count);
+          active_session->finish_time = mono_clock::now();
+          active_session->done.set_value();
         }
+        return seastar::now();
       }
 
-      seastar::future<> init(const entity_name_t& name,
-                             const std::string& lname,
-                             const uint64_t nonce) {
-        return ceph::net::Messenger::create(name, lname, nonce, 2)
-          .then([this](ceph::net::Messenger *messenger) {
-            return container().invoke_on_all([messenger](auto& client) {
-                client.msgr = messenger->get_local_shard();
-                client.msgr->set_crc_header();
-              }).then([this, messenger] {
-                return messenger->start(this);
-              });
-          });
+      // should start messenger at this shard?
+      bool is_active() {
+        ceph_assert(seastar::engine().cpu_id() == sid);
+        return sid != 0 && sid <= jobs;
+      }
+
+      seastar::future<> init() {
+        return container().invoke_on_all([] (auto& client) {
+          if (client.is_active()) {
+            return ceph::net::Messenger::create(entity_name_t::OSD(client.sid), client.lname, client.sid, client.sid)
+            .then([&client] (ceph::net::Messenger *messenger) {
+              client.msgr = messenger;
+              return client.msgr->start(&client);
+            });
+          }
+          return seastar::now();
+        });
       }
 
       seastar::future<> shutdown() {
-        ceph_assert(msgr);
-        return msgr->shutdown();
+        return container().invoke_on_all([] (auto& client) {
+          if (client.is_active()) {
+            logger().info("\n{} shutdown...", client.lname);
+            ceph_assert(client.msgr);
+            return client.msgr->shutdown();
+          }
+          return seastar::now();
+        });
       }
 
-      seastar::future<> dispatch_messages(const entity_addr_t& peer_addr, bool foreign_dispatch=true) {
-        mono_time start_time = mono_clock::now();
-        return msgr->connect(peer_addr, entity_name_t::TYPE_OSD)
-          .then([this, foreign_dispatch, start_time](auto conn) {
-            return seastar::futurize_apply([this, conn, foreign_dispatch] {
-                if (foreign_dispatch) {
-                  return do_dispatch_messages(&**conn);
-                } else {
-                  // NOTE: this could be faster if we don't switch cores in do_dispatch_messages().
-                  return container().invoke_on(conn->get()->shard_id(), [conn = &**conn](auto &client) {
-                      return client.do_dispatch_messages(conn);
-                    });
-                }
-              }).finally([this, conn, start_time] {
-                return container().invoke_on(conn->get()->shard_id(), [conn, start_time](auto &client) {
-                    auto session = client.find_session((*conn)->shared_from_this());
-                    std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
-                    std::chrono::duration<double> dur_messaging = session->finish_time - session->connected_time;
-                    logger().info("{}: handshake {}, messaging {}",
-                                  **conn, dur_handshake.count(), dur_messaging.count());
-                  });
-              });
+      seastar::future<> dispatch_messages(const entity_addr_t& peer_addr) {
+        return container().invoke_on_all([peer_addr] (auto& client) {
+          // start clients in active cores (#1 ~ #jobs)
+          if (client.is_active()) {
+            mono_time start_time = mono_clock::now();
+            return client.msgr->connect(peer_addr, entity_name_t::TYPE_OSD)
+            .then([&client] (auto conn) {
+              client.active_conn = conn->release();
+              // make sure handshake won't heart the performance
+              return seastar::sleep(1s);
+            }).then([&client, start_time] {
+              if (!client.active_session) {
+                logger().error("\n{} not connected after 1s!\n", client.lname);
+                ceph_assert(false);
+              }
+              client.active_session->connecting_time = start_time;
+            });
+          }
+          return seastar::now();
+        }).then([this] {
+          logger().info("\nstart sending {} MOSDOps from {} clients",
+                        jobs * rounds, jobs);
+          mono_time start_time = mono_clock::now();
+          return container().invoke_on_all([] (auto& client) {
+            if (client.is_active()) {
+              return client.do_dispatch_messages(client.active_conn.get());
+            }
+            return seastar::now();
+          }).then([this, start_time] {
+            std::chrono::duration<double> dur_messaging = mono_clock::now() - start_time;
+            logger().info("\nSummary:\n  clients: {}\n  MOSDOps: {}\n  total time: {}s\n",
+                          jobs, jobs * rounds, dur_messaging.count());
           });
+        });
       }
 
      private:
       seastar::future<> send_msg(ceph::net::Connection* conn) {
+        ceph_assert(seastar::engine().cpu_id() == sid);
         return depth.wait(1).then([this, conn] {
           const static pg_t pgid;
           const static object_locator_t oloc;
@@ -225,91 +252,83 @@ static seastar::future<> run(unsigned rounds,
       }
 
       seastar::future<> do_dispatch_messages(ceph::net::Connection* conn) {
-        return container().invoke_on_all([conn](auto& client) {
-            auto [i, added] = client.pending_conns.emplace(conn, seastar::promise<>());
-            std::ignore = i;
-            ceph_assert(added);
-          }).then([this, conn] {
-            return seastar::do_with(0u, 0u,
-                                    [this, conn](auto &count_ping, auto &count_keepalive) {
-                return seastar::do_until(
-                  [this, conn, &count_ping, &count_keepalive] {
-                    bool stop = (count_ping == rounds);
-                    if (stop) {
-                      logger().info("{}: finished sending {} OSDOPs with {} keepalives",
-                                    *conn, count_ping, count_keepalive);
-                    }
-                    return stop;
-                  },
-                  [this, conn, &count_ping, &count_keepalive] {
-                    return seastar::repeat([this, conn, &count_ping, &count_keepalive] {
-                        if (keepalive_dist(rng)) {
-                          return conn->keepalive()
-                            .then([&count_keepalive] {
-                              count_keepalive += 1;
-                              return seastar::make_ready_future<seastar::stop_iteration>(
-                                seastar::stop_iteration::no);
-                            });
-                        } else {
-                          return send_msg(conn)
-                            .then([&count_ping] {
-                              count_ping += 1;
-                              return seastar::make_ready_future<seastar::stop_iteration>(
-                                seastar::stop_iteration::yes);
-                            });
-                        }
-                      });
-                  }).then([this, conn] {
-                    auto found = pending_conns.find(conn);
-                    return found->second.get_future();
-                  });
-              });
-          });
+        ceph_assert(seastar::engine().cpu_id() == sid);
+        ceph_assert(sent_count == 0);
+        active_session->start_time = mono_clock::now();
+        return seastar::do_until(
+          [this, conn] {
+            bool stop = (sent_count == rounds);
+            if (stop) {
+              logger().info("{}: finished sending {} OSDOPs",
+                            *conn, sent_count);
+            }
+            return stop;
+          },
+          [this, conn] {
+            sent_count += 1;
+            return send_msg(conn);
+          }
+        ).then([this] {
+          return active_session->done.get_future();
+        }).then([this] {
+          std::chrono::duration<double> dur_conn = active_session->connected_time - active_session->connecting_time;
+          std::chrono::duration<double> dur_msg = mono_clock::now() - active_session->start_time;
+          logger().info("\n{}:\n  messages: {}\n  connect time: {}s\n  messaging time: {}s\n",
+                        lname, active_session->received_count, dur_conn.count(), dur_msg.count());
+        });
       }
     };
   };
 
   return seastar::when_all_succeed(
-      ceph::net::create_sharded<test_state::Server>(),
-      ceph::net::create_sharded<test_state::Client>(rounds, keepalive_ratio, bs, depth))
-    .then([rounds, keepalive_ratio, addr, mode](test_state::Server *server,
-                                                test_state::Client *client) {
+      ceph::net::create_sharded<test_state::Server>(core),
+      ceph::net::create_sharded<test_state::Client>(jobs, rounds, bs, depth))
+    .then([=](test_state::Server *server,
+              test_state::Client *client) {
       entity_addr_t target_addr;
       target_addr.parse(addr.c_str(), nullptr);
       target_addr.set_type(entity_addr_t::TYPE_LEGACY);
       if (mode == perf_mode_t::both) {
+          logger().info("\nperf settings:\n  mode=server+client\n  server addr={}\n  server core={}\n  rounds={}\n  client jobs={}\n  bs={}\n  depth={}\n",
+                        addr, core, rounds, jobs, bs, depth);
+          ceph_assert(seastar::smp::count >= std::max(1+jobs, 1+core));
+          ceph_assert(core == 0 || core > jobs);
+          ceph_assert(jobs > 0);
           return seastar::when_all_succeed(
-              server->init(entity_name_t::OSD(0), "server", 0, target_addr),
-              client->init(entity_name_t::OSD(1), "client", 0))
-          // dispatch pingpoing
+              server->init(target_addr),
+              client->init())
+          // dispatch ops
             .then([client, target_addr] {
-              return client->dispatch_messages(target_addr, false);
+              return client->dispatch_messages(target_addr);
           // shutdown
             }).finally([client] {
-              logger().info("client shutdown...");
               return client->shutdown();
             }).finally([server] {
-              logger().info("server shutdown...");
               return server->shutdown();
             });
       } else if (mode == perf_mode_t::client) {
-          return client->init(entity_name_t::OSD(1), "client", 0)
-          // dispatch pingpoing
+          logger().info("\nperf settings:\n  mode=client\n  server addr={}\n  rounds={}\n  client jobs={}\n  bs={}\n  depth={}\n",
+                        addr, rounds, jobs, bs, depth);
+          ceph_assert(seastar::smp::count >= 1+jobs);
+          ceph_assert(jobs > 0);
+          return client->init()
+          // dispatch ops
             .then([client, target_addr] {
-              return client->dispatch_messages(target_addr, false);
+              return client->dispatch_messages(target_addr);
           // shutdown
             }).finally([client] {
-              logger().info("client shutdown...");
               return client->shutdown();
             });
       } else { // mode == perf_mode_t::server
-          return server->init(entity_name_t::OSD(0), "server", 0, target_addr)
-          // dispatch pingpoing
+          ceph_assert(seastar::smp::count >= 1+core);
+          logger().info("\nperf settings:\n  mode=server\n  server addr={}\n  server core={}\n",
+                        addr, core);
+          return server->init(target_addr)
+          // dispatch ops
             .then([server] {
               return server->msgr->wait();
           // shutdown
             }).finally([server] {
-              logger().info("server shutdown...");
               return server->shutdown();
             });
       }
@@ -323,34 +342,35 @@ int main(int argc, char** argv)
   seastar::app_template app;
   app.add_options()
     ("addr", bpo::value<std::string>()->default_value("0.0.0.0:9010"),
-     "start server")
-    ("mode", bpo::value<int>()->default_value(0),
+     "server address")
+    ("core", bpo::value<unsigned>()->default_value(0),
+     "server running core")
+    ("mode", bpo::value<unsigned>()->default_value(0),
      "0: both, 1:client, 2:server")
     ("rounds", bpo::value<unsigned>()->default_value(65536),
      "number of messaging rounds")
-    ("keepalive-ratio", bpo::value<double>()->default_value(0),
-     "ratio of keepalive in ping messages")
-    ("bs", bpo::value<int>()->default_value(4096),
+    ("jobs", bpo::value<unsigned>()->default_value(1),
+     "number of jobs (client messengers)")
+    ("bs", bpo::value<unsigned>()->default_value(4096),
      "block size")
-    ("depth", bpo::value<int>()->default_value(512),
+    ("depth", bpo::value<unsigned>()->default_value(512),
      "io depth");
   return app.run(argc, argv, [&app] {
       auto&& config = app.configuration();
       auto rounds = config["rounds"].as<unsigned>();
-      auto keepalive_ratio = config["keepalive-ratio"].as<double>();
-      auto bs = config["bs"].as<int>();
-      auto depth = config["depth"].as<int>();
+      auto jobs = config["jobs"].as<unsigned>();
+      auto bs = config["bs"].as<unsigned>();
+      auto depth = config["depth"].as<unsigned>();
       auto addr = config["addr"].as<std::string>();
-      auto mode = config["mode"].as<int>();
-      logger().info("\nsettings:\n  addr={}\n  mode={}\n  rounds={}\n  keepalive-ratio={}\n  bs={}\n  depth={}",
-                    addr, mode, rounds, keepalive_ratio, bs, depth);
-      ceph_assert(mode >= 0 && mode <= 2);
+      auto core = config["core"].as<unsigned>();
+      auto mode = config["mode"].as<unsigned>();
+      ceph_assert(mode <= 2);
       auto _mode = static_cast<perf_mode_t>(mode);
-      return run(rounds, keepalive_ratio, bs, depth, addr, _mode)
+      return run(rounds, jobs, bs, depth, addr, _mode, core)
         .then([] {
-          std::cout << "successful" << std::endl;
+          logger().info("\nsuccessful!\n");
         }).handle_exception([] (auto eptr) {
-          std::cout << "failed" << std::endl;
+          logger().info("\nfailed!\n");
           return seastar::make_exception_future<>(eptr);
         });
     });