]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson: merge files in tools/crimson into crimson/tools 48277/head
authorYingxin Cheng <yingxin.cheng@intel.com>
Wed, 28 Sep 2022 08:16:20 +0000 (16:16 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 28 Sep 2022 08:19:56 +0000 (16:19 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/tools/CMakeLists.txt
src/crimson/tools/perf_async_msgr.cc [new file with mode: 0644]
src/crimson/tools/perf_crimson_msgr.cc [new file with mode: 0644]
src/crimson/tools/perf_staged_fltree.cc [new file with mode: 0644]
src/tools/CMakeLists.txt
src/tools/crimson/CMakeLists.txt [deleted file]
src/tools/crimson/perf_async_msgr.cc [deleted file]
src/tools/crimson/perf_crimson_msgr.cc [deleted file]
src/tools/crimson/perf_staged_fltree.cc [deleted file]

index 52436c62dc48d7b3e21bcdb2387a38e4e8094395..d57c3f9cfc444b071a4b989592dd8b37a25c2611 100644 (file)
@@ -7,3 +7,12 @@ add_executable(crimson-store-nbd
 target_link_libraries(crimson-store-nbd
   crimson-os)
 install(TARGETS crimson-store-nbd DESTINATION bin)
+
+add_executable(perf-crimson-msgr perf_crimson_msgr.cc)
+target_link_libraries(perf-crimson-msgr crimson)
+
+add_executable(perf-async-msgr perf_async_msgr.cc)
+target_link_libraries(perf-async-msgr ceph-common global ${ALLOC_LIBS})
+
+add_executable(perf-staged-fltree perf_staged_fltree.cc)
+target_link_libraries(perf-staged-fltree crimson-seastore)
diff --git a/src/crimson/tools/perf_async_msgr.cc b/src/crimson/tools/perf_async_msgr.cc
new file mode 100644 (file)
index 0000000..b7b0ca6
--- /dev/null
@@ -0,0 +1,141 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+
+#include <boost/program_options/variables_map.hpp>
+#include <boost/program_options/parsers.hpp>
+
+#include "auth/Auth.h"
+#include "global/global_init.h"
+#include "msg/Dispatcher.h"
+#include "msg/Messenger.h"
+#include "messages/MOSDOp.h"
+
+#include "auth/DummyAuth.h"
+
+namespace {
+
+constexpr int CEPH_OSD_PROTOCOL = 10;
+
+struct Server {
+  Server(CephContext* cct, unsigned msg_len)
+    : dummy_auth(cct), dispatcher(cct, msg_len)
+  {
+    msgr.reset(Messenger::create(cct, "async", entity_name_t::OSD(0), "server", 0));
+    dummy_auth.auth_registry.refresh_config();
+    msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
+    msgr->set_default_policy(Messenger::Policy::stateless_server(0));
+    msgr->set_auth_client(&dummy_auth);
+    msgr->set_auth_server(&dummy_auth);
+    msgr->set_require_authorizer(false);
+  }
+  DummyAuthClientServer dummy_auth;
+  std::unique_ptr<Messenger> msgr;
+  struct ServerDispatcher : Dispatcher {
+    unsigned msg_len = 0;
+    bufferlist msg_data;
+
+    ServerDispatcher(CephContext* cct, unsigned msg_len)
+      : Dispatcher(cct), msg_len(msg_len)
+    {
+      msg_data.append_zero(msg_len);
+    }
+    bool ms_can_fast_dispatch_any() const override {
+      return true;
+    }
+    bool ms_can_fast_dispatch(const Message* m) const override {
+      return m->get_type() == CEPH_MSG_OSD_OP;
+    }
+    void ms_fast_dispatch(Message* m) override {
+      ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
+      const static pg_t pgid;
+      const static object_locator_t oloc;
+      const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
+                                  pgid.pool(), oloc.nspace);
+      static spg_t spgid(pgid);
+      MOSDOp *rep = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0);
+      bufferlist data(msg_data);
+      rep->write(0, msg_len, data);
+      rep->set_tid(m->get_tid());
+      m->get_connection()->send_message(rep);
+      m->put();
+    }
+    bool ms_dispatch(Message*) override {
+      ceph_abort();
+    }
+    bool ms_handle_reset(Connection*) override {
+      return true;
+    }
+    void ms_handle_remote_reset(Connection*) override {
+    }
+    bool ms_handle_refused(Connection*) override {
+      return true;
+    }
+  } dispatcher;
+};
+
+}
+
+static void run(CephContext* cct, entity_addr_t addr, unsigned bs)
+{
+  std::cout << "async server listening at " << addr << std::endl;
+  Server server{cct, bs};
+  server.msgr->bind(addr);
+  server.msgr->add_dispatcher_head(&server.dispatcher);
+  server.msgr->start();
+  server.msgr->wait();
+}
+
+int main(int argc, char** argv)
+{
+  namespace po = boost::program_options;
+  po::options_description desc{"Allowed options"};
+  desc.add_options()
+    ("help,h", "show help message")
+    ("addr", po::value<std::string>()->default_value("v2:127.0.0.1:9010"),
+     "server address(crimson only supports msgr v2 protocol)")
+    ("bs", po::value<unsigned>()->default_value(0),
+     "server block size")
+    ("crc-enabled", po::value<bool>()->default_value(false),
+     "enable CRC checks");
+  po::variables_map vm;
+  std::vector<std::string> unrecognized_options;
+  try {
+    auto parsed = po::command_line_parser(argc, argv)
+      .options(desc)
+      .allow_unregistered()
+      .run();
+    po::store(parsed, vm);
+    if (vm.count("help")) {
+      std::cout << desc << std::endl;
+      return 0;
+    }
+    po::notify(vm);
+    unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional);
+  } catch(const po::error& e) {
+    std::cerr << "error: " << e.what() << std::endl;
+    return 1;
+  }
+
+  auto addr = vm["addr"].as<std::string>();
+  entity_addr_t target_addr;
+  target_addr.parse(addr.c_str(), nullptr);
+  ceph_assert_always(target_addr.is_msgr2());
+  auto bs = vm["bs"].as<unsigned>();
+  auto crc_enabled = vm["crc-enabled"].as<bool>();
+
+  std::vector<const char*> args(argv, argv + argc);
+  auto cct = global_init(nullptr, args,
+                         CEPH_ENTITY_TYPE_CLIENT,
+                         CODE_ENVIRONMENT_UTILITY,
+                         CINIT_FLAG_NO_MON_CONFIG);
+  common_init_finish(cct.get());
+
+  if (crc_enabled) {
+    cct->_conf.set_val("ms_crc_header", "true");
+    cct->_conf.set_val("ms_crc_data", "true");
+  } else {
+    cct->_conf.set_val("ms_crc_header", "false");
+    cct->_conf.set_val("ms_crc_data", "false");
+  }
+
+  run(cct.get(), target_addr, bs);
+}
diff --git a/src/crimson/tools/perf_crimson_msgr.cc b/src/crimson/tools/perf_crimson_msgr.cc
new file mode 100644 (file)
index 0000000..e6b5b9a
--- /dev/null
@@ -0,0 +1,749 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <map>
+#include <random>
+#include <boost/program_options.hpp>
+
+#include <seastar/core/app-template.hh>
+#include <seastar/core/do_with.hh>
+#include <seastar/core/future-util.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/sleep.hh>
+#include <seastar/core/semaphore.hh>
+#include <seastar/core/smp.hh>
+
+#include "common/ceph_time.h"
+#include "messages/MOSDOp.h"
+
+#include "crimson/auth/DummyAuth.h"
+#include "crimson/common/log.h"
+#include "crimson/common/config_proxy.h"
+#include "crimson/net/Connection.h"
+#include "crimson/net/Dispatcher.h"
+#include "crimson/net/Messenger.h"
+
+using namespace std;
+using namespace std::chrono_literals;
+
+namespace bpo = boost::program_options;
+
+namespace {
+
+template<typename Message>
+using Ref = boost::intrusive_ptr<Message>;
+
+seastar::logger& logger() {
+  return crimson::get_logger(ceph_subsys_ms);
+}
+
+template <typename T, typename... Args>
+seastar::future<T*> create_sharded(Args... args) {
+  // seems we should only construct/stop shards on #0
+  return seastar::smp::submit_to(0, [=] {
+    auto sharded_obj = seastar::make_lw_shared<seastar::sharded<T>>();
+    return sharded_obj->start(args...).then([sharded_obj]() {
+      seastar::engine().at_exit([sharded_obj]() {
+          return sharded_obj->stop().then([sharded_obj] {});
+        });
+      return sharded_obj.get();
+    });
+  }).then([] (seastar::sharded<T> *ptr_shard) {
+    // return the pointer valid for the caller CPU
+    return &ptr_shard->local();
+  });
+}
+
+enum class perf_mode_t {
+  both,
+  client,
+  server
+};
+
+struct client_config {
+  entity_addr_t server_addr;
+  unsigned block_size;
+  unsigned ramptime;
+  unsigned msgtime;
+  unsigned jobs;
+  unsigned depth;
+
+  std::string str() const {
+    std::ostringstream out;
+    out << "client[>> " << server_addr
+        << "](bs=" << block_size
+        << ", ramptime=" << ramptime
+        << ", msgtime=" << msgtime
+        << ", jobs=" << jobs
+        << ", depth=" << depth
+        << ")";
+    return out.str();
+  }
+
+  static client_config load(bpo::variables_map& options) {
+    client_config conf;
+    entity_addr_t addr;
+    ceph_assert(addr.parse(options["addr"].as<std::string>().c_str(), nullptr));
+    ceph_assert_always(addr.is_msgr2());
+
+    conf.server_addr = addr;
+    conf.block_size = options["cbs"].as<unsigned>();
+    conf.ramptime = options["ramptime"].as<unsigned>();
+    conf.msgtime = options["msgtime"].as<unsigned>();
+    conf.jobs = options["jobs"].as<unsigned>();
+    conf.depth = options["depth"].as<unsigned>();
+    ceph_assert(conf.depth % conf.jobs == 0);
+    return conf;
+  }
+};
+
+struct server_config {
+  entity_addr_t addr;
+  unsigned block_size;
+  unsigned core;
+
+  std::string str() const {
+    std::ostringstream out;
+    out << "server[" << addr
+        << "](bs=" << block_size
+        << ", core=" << core
+        << ")";
+    return out.str();
+  }
+
+  static server_config load(bpo::variables_map& options) {
+    server_config conf;
+    entity_addr_t addr;
+    ceph_assert(addr.parse(options["addr"].as<std::string>().c_str(), nullptr));
+    ceph_assert_always(addr.is_msgr2());
+
+    conf.addr = addr;
+    conf.block_size = options["sbs"].as<unsigned>();
+    conf.core = options["core"].as<unsigned>();
+    return conf;
+  }
+};
+
+const unsigned SAMPLE_RATE = 7;
+
+static seastar::future<> run(
+    perf_mode_t mode,
+    const client_config& client_conf,
+    const server_config& server_conf,
+    bool crc_enabled)
+{
+  struct test_state {
+    struct Server;
+    using ServerFRef = seastar::foreign_ptr<std::unique_ptr<Server>>;
+
+    struct Server final
+        : public crimson::net::Dispatcher {
+      crimson::net::MessengerRef msgr;
+      crimson::auth::DummyAuthClientServer dummy_auth;
+      const seastar::shard_id msgr_sid;
+      std::string lname;
+      unsigned msg_len;
+      bufferlist msg_data;
+
+      Server(unsigned msg_len)
+        : msgr_sid{seastar::this_shard_id()},
+          msg_len{msg_len} {
+        lname = "server#";
+        lname += std::to_string(msgr_sid);
+        msg_data.append_zero(msg_len);
+      }
+
+      std::optional<seastar::future<>> ms_dispatch(
+          crimson::net::ConnectionRef c, MessageRef m) override {
+        ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
+
+        // server replies with MOSDOp to generate server-side write workload
+        const static pg_t pgid;
+        const static object_locator_t oloc;
+        const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
+                                    pgid.pool(), oloc.nspace);
+        static spg_t spgid(pgid);
+        auto rep = crimson::make_message<MOSDOp>(0, 0, hobj, spgid, 0, 0, 0);
+        bufferlist data(msg_data);
+        rep->write(0, msg_len, data);
+        rep->set_tid(m->get_tid());
+        std::ignore = c->send(std::move(rep));
+        return {seastar::now()};
+      }
+
+      seastar::future<> init(const entity_addr_t& addr) {
+        return seastar::smp::submit_to(msgr_sid, [addr, this] {
+          // server msgr is always with nonce 0
+          msgr = crimson::net::Messenger::create(entity_name_t::OSD(msgr_sid), lname, 0);
+          msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
+          msgr->set_auth_client(&dummy_auth);
+          msgr->set_auth_server(&dummy_auth);
+          return msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
+            return msgr->start({this});
+          }, crimson::net::Messenger::bind_ertr::all_same_way(
+              [addr] (const std::error_code& e) {
+            logger().error("Server: "
+                           "there is another instance running at {}", addr);
+            ceph_abort();
+          }));
+        });
+      }
+      seastar::future<> shutdown() {
+        logger().info("{} shutdown...", lname);
+        return seastar::smp::submit_to(msgr_sid, [this] {
+          ceph_assert(msgr);
+          msgr->stop();
+          return msgr->shutdown();
+        });
+      }
+      seastar::future<> wait() {
+        return seastar::smp::submit_to(msgr_sid, [this] {
+          ceph_assert(msgr);
+          return msgr->wait();
+        });
+      }
+
+      static seastar::future<ServerFRef> create(seastar::shard_id msgr_sid, unsigned msg_len) {
+        return seastar::smp::submit_to(msgr_sid, [msg_len] {
+          return seastar::make_foreign(std::make_unique<Server>(msg_len));
+        });
+      }
+    };
+
+    struct Client final
+        : public crimson::net::Dispatcher,
+          public seastar::peering_sharded_service<Client> {
+
+      struct ConnStats {
+        mono_time connecting_time = mono_clock::zero();
+        mono_time connected_time = mono_clock::zero();
+        unsigned received_count = 0u;
+
+        mono_time start_time = mono_clock::zero();
+        unsigned start_count = 0u;
+
+        unsigned sampled_count = 0u;
+        double total_lat_s = 0.0;
+
+        // for reporting only
+        mono_time finish_time = mono_clock::zero();
+
+        void start() {
+          start_time = mono_clock::now();
+          start_count = received_count;
+          sampled_count = 0u;
+          total_lat_s = 0.0;
+          finish_time = mono_clock::zero();
+        }
+      };
+      ConnStats conn_stats;
+
+      struct PeriodStats {
+        mono_time start_time = mono_clock::zero();
+        unsigned start_count = 0u;
+        unsigned sampled_count = 0u;
+        double total_lat_s = 0.0;
+
+        // for reporting only
+        mono_time finish_time = mono_clock::zero();
+        unsigned finish_count = 0u;
+        unsigned depth = 0u;
+
+        void reset(unsigned received_count, PeriodStats* snap = nullptr) {
+          if (snap) {
+            snap->start_time = start_time;
+            snap->start_count = start_count;
+            snap->sampled_count = sampled_count;
+            snap->total_lat_s = total_lat_s;
+            snap->finish_time = mono_clock::now();
+            snap->finish_count = received_count;
+          }
+          start_time = mono_clock::now();
+          start_count = received_count;
+          sampled_count = 0u;
+          total_lat_s = 0.0;
+        }
+      };
+      PeriodStats period_stats;
+
+      const seastar::shard_id sid;
+      std::string lname;
+
+      const unsigned jobs;
+      crimson::net::MessengerRef msgr;
+      const unsigned msg_len;
+      bufferlist msg_data;
+      const unsigned nr_depth;
+      seastar::semaphore depth;
+      std::vector<mono_time> time_msgs_sent;
+      crimson::auth::DummyAuthClientServer dummy_auth;
+
+      unsigned sent_count = 0u;
+      crimson::net::ConnectionRef active_conn = nullptr;
+
+      bool stop_send = false;
+      seastar::promise<> stopped_send_promise;
+
+      Client(unsigned jobs, unsigned msg_len, unsigned depth)
+        : sid{seastar::this_shard_id()},
+          jobs{jobs},
+          msg_len{msg_len},
+          nr_depth{depth/jobs},
+          depth{nr_depth},
+          time_msgs_sent{depth/jobs, mono_clock::zero()} {
+        lname = "client#";
+        lname += std::to_string(sid);
+        msg_data.append_zero(msg_len);
+      }
+
+      unsigned get_current_depth() const {
+        ceph_assert(depth.available_units() >= 0);
+        return nr_depth - depth.current();
+      }
+
+      void ms_handle_connect(crimson::net::ConnectionRef conn) override {
+        conn_stats.connected_time = mono_clock::now();
+      }
+      std::optional<seastar::future<>> ms_dispatch(
+          crimson::net::ConnectionRef, MessageRef m) override {
+        // server replies with MOSDOp to generate server-side write workload
+        ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
+
+        auto msg_id = m->get_tid();
+        if (msg_id % SAMPLE_RATE == 0) {
+          auto index = msg_id % time_msgs_sent.size();
+          ceph_assert(time_msgs_sent[index] != mono_clock::zero());
+          std::chrono::duration<double> cur_latency = mono_clock::now() - time_msgs_sent[index];
+          conn_stats.total_lat_s += cur_latency.count();
+          ++(conn_stats.sampled_count);
+          period_stats.total_lat_s += cur_latency.count();
+          ++(period_stats.sampled_count);
+          time_msgs_sent[index] = mono_clock::zero();
+        }
+
+        ++(conn_stats.received_count);
+        depth.signal(1);
+
+        return {seastar::now()};
+      }
+
+      // should start messenger at this shard?
+      bool is_active() {
+        ceph_assert(seastar::this_shard_id() == sid);
+        return sid != 0 && sid <= jobs;
+      }
+
+      seastar::future<> init() {
+        return container().invoke_on_all([] (auto& client) {
+          if (client.is_active()) {
+            client.msgr = crimson::net::Messenger::create(entity_name_t::OSD(client.sid), client.lname, client.sid);
+            client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
+            client.msgr->set_require_authorizer(false);
+            client.msgr->set_auth_client(&client.dummy_auth);
+            client.msgr->set_auth_server(&client.dummy_auth);
+            return client.msgr->start({&client});
+          }
+          return seastar::now();
+        });
+      }
+
+      seastar::future<> shutdown() {
+        return container().invoke_on_all([] (auto& client) {
+          if (client.is_active()) {
+            logger().info("{} shutdown...", client.lname);
+            ceph_assert(client.msgr);
+            client.msgr->stop();
+            return client.msgr->shutdown().then([&client] {
+              return client.stop_dispatch_messages();
+            });
+          }
+          return seastar::now();
+        });
+      }
+
+      seastar::future<> connect_wait_verify(const entity_addr_t& peer_addr) {
+        return container().invoke_on_all([peer_addr] (auto& client) {
+          // start clients in active cores (#1 ~ #jobs)
+          if (client.is_active()) {
+            mono_time start_time = mono_clock::now();
+            client.active_conn = client.msgr->connect(peer_addr, entity_name_t::TYPE_OSD);
+            // make sure handshake won't hurt the performance
+            return seastar::sleep(1s).then([&client, start_time] {
+              if (client.conn_stats.connected_time == mono_clock::zero()) {
+                logger().error("\n{} not connected after 1s!\n", client.lname);
+                ceph_assert(false);
+              }
+              client.conn_stats.connecting_time = start_time;
+            });
+          }
+          return seastar::now();
+        });
+      }
+
+     private:
+      class TimerReport {
+       private:
+        const unsigned jobs;
+        const unsigned msgtime;
+        const unsigned bytes_of_block;
+
+        unsigned elapsed = 0u;
+        std::vector<mono_time> start_times;
+        std::vector<PeriodStats> snaps;
+        std::vector<ConnStats> summaries;
+
+       public:
+        TimerReport(unsigned jobs, unsigned msgtime, unsigned bs)
+          : jobs{jobs},
+            msgtime{msgtime},
+            bytes_of_block{bs},
+            start_times{jobs, mono_clock::zero()},
+            snaps{jobs},
+            summaries{jobs} {}
+
+        unsigned get_elapsed() const { return elapsed; }
+
+        PeriodStats& get_snap_by_job(seastar::shard_id sid) {
+          ceph_assert(sid >= 1 && sid <= jobs);
+          return snaps[sid - 1];
+        }
+
+        ConnStats& get_summary_by_job(seastar::shard_id sid) {
+          ceph_assert(sid >= 1 && sid <= jobs);
+          return summaries[sid - 1];
+        }
+
+        bool should_stop() const {
+          return elapsed >= msgtime;
+        }
+
+        seastar::future<> ticktock() {
+          return seastar::sleep(1s).then([this] {
+            ++elapsed;
+          });
+        }
+
+        void report_header() {
+          std::ostringstream sout;
+          sout << std::setfill(' ')
+               << std::setw(7) << "sec"
+               << std::setw(6) << "depth"
+               << std::setw(8) << "IOPS"
+               << std::setw(8) << "MB/s"
+               << std::setw(8) << "lat(ms)";
+          std::cout << sout.str() << std::endl;
+        }
+
+        void report_period() {
+          if (elapsed == 1) {
+            // init this->start_times at the first period
+            for (unsigned i=0; i<jobs; ++i) {
+              start_times[i] = snaps[i].start_time;
+            }
+          }
+          std::chrono::duration<double> elapsed_d = 0s;
+          unsigned depth = 0u;
+          unsigned ops = 0u;
+          unsigned sampled_count = 0u;
+          double total_lat_s = 0.0;
+          for (const auto& snap: snaps) {
+            elapsed_d += (snap.finish_time - snap.start_time);
+            depth += snap.depth;
+            ops += (snap.finish_count - snap.start_count);
+            sampled_count += snap.sampled_count;
+            total_lat_s += snap.total_lat_s;
+          }
+          double elapsed_s = elapsed_d.count() / jobs;
+          double iops = ops/elapsed_s;
+          std::ostringstream sout;
+          sout << setfill(' ')
+               << std::setw(7) << elapsed_s
+               << std::setw(6) << depth
+               << std::setw(8) << iops
+               << std::setw(8) << iops * bytes_of_block / 1048576
+               << std::setw(8) << (total_lat_s / sampled_count * 1000);
+          std::cout << sout.str() << std::endl;
+        }
+
+        void report_summary() const {
+          std::chrono::duration<double> elapsed_d = 0s;
+          unsigned ops = 0u;
+          unsigned sampled_count = 0u;
+          double total_lat_s = 0.0;
+          for (const auto& summary: summaries) {
+            elapsed_d += (summary.finish_time - summary.start_time);
+            ops += (summary.received_count - summary.start_count);
+            sampled_count += summary.sampled_count;
+            total_lat_s += summary.total_lat_s;
+          }
+          double elapsed_s = elapsed_d.count() / jobs;
+          double iops = ops / elapsed_s;
+          std::ostringstream sout;
+          sout << "--------------"
+               << " summary "
+               << "--------------\n"
+               << setfill(' ')
+               << std::setw(7) << elapsed_s
+               << std::setw(6) << "-"
+               << std::setw(8) << iops
+               << std::setw(8) << iops * bytes_of_block / 1048576
+               << std::setw(8) << (total_lat_s / sampled_count * 1000)
+               << "\n";
+          std::cout << sout.str() << std::endl;
+        }
+      };
+
+      seastar::future<> report_period(TimerReport& report) {
+        return container().invoke_on_all([&report] (auto& client) {
+          if (client.is_active()) {
+            PeriodStats& snap = report.get_snap_by_job(client.sid);
+            client.period_stats.reset(client.conn_stats.received_count,
+                                      &snap);
+            snap.depth = client.get_current_depth();
+          }
+        }).then([&report] {
+          report.report_period();
+        });
+      }
+
+      seastar::future<> report_summary(TimerReport& report) {
+        return container().invoke_on_all([&report] (auto& client) {
+          if (client.is_active()) {
+            ConnStats& summary = report.get_summary_by_job(client.sid);
+            summary = client.conn_stats;
+            summary.finish_time = mono_clock::now();
+          }
+        }).then([&report] {
+          report.report_summary();
+        });
+      }
+
+     public:
+      seastar::future<> dispatch_with_timer(unsigned ramptime, unsigned msgtime) {
+        logger().info("[all clients]: start sending MOSDOps from {} clients", jobs);
+        return container().invoke_on_all([] (auto& client) {
+          if (client.is_active()) {
+            client.do_dispatch_messages(client.active_conn.get());
+          }
+        }).then([ramptime] {
+          logger().info("[all clients]: ramping up {} seconds...", ramptime);
+          return seastar::sleep(std::chrono::seconds(ramptime));
+        }).then([this] {
+          return container().invoke_on_all([] (auto& client) {
+            if (client.is_active()) {
+              client.conn_stats.start();
+              client.period_stats.reset(client.conn_stats.received_count);
+            }
+          });
+        }).then([this, msgtime] {
+          logger().info("[all clients]: reporting {} seconds...\n", msgtime);
+          return seastar::do_with(
+              TimerReport(jobs, msgtime, msg_len), [this] (auto& report) {
+            report.report_header();
+            return seastar::do_until(
+              [&report] { return report.should_stop(); },
+              [&report, this] {
+                return report.ticktock().then([&report, this] {
+                  // report period every 1s
+                  return report_period(report);
+                }).then([&report, this] {
+                  // report summary every 10s
+                  if (report.get_elapsed() % 10 == 0) {
+                    return report_summary(report);
+                  } else {
+                    return seastar::now();
+                  }
+                });
+              }
+            ).then([&report, this] {
+              // report the final summary
+              if (report.get_elapsed() % 10 != 0) {
+                return report_summary(report);
+              } else {
+                return seastar::now();
+              }
+            });
+          });
+        });
+      }
+
+     private:
+      seastar::future<> send_msg(crimson::net::Connection* conn) {
+        ceph_assert(seastar::this_shard_id() == sid);
+        return depth.wait(1).then([this, conn] {
+          const static pg_t pgid;
+          const static object_locator_t oloc;
+          const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
+                                      pgid.pool(), oloc.nspace);
+          static spg_t spgid(pgid);
+          auto m = crimson::make_message<MOSDOp>(0, 0, hobj, spgid, 0, 0, 0);
+          bufferlist data(msg_data);
+          m->write(0, msg_len, data);
+          // use tid as the identity of each round
+          m->set_tid(sent_count);
+
+          // sample message latency
+          if (sent_count % SAMPLE_RATE == 0) {
+            auto index = sent_count % time_msgs_sent.size();
+            ceph_assert(time_msgs_sent[index] == mono_clock::zero());
+            time_msgs_sent[index] = mono_clock::now();
+          }
+
+          return conn->send(std::move(m));
+        });
+      }
+
+      class DepthBroken: public std::exception {};
+
+      seastar::future<> stop_dispatch_messages() {
+        stop_send = true;
+        depth.broken(DepthBroken());
+        return stopped_send_promise.get_future();
+      }
+
+      void do_dispatch_messages(crimson::net::Connection* conn) {
+        ceph_assert(seastar::this_shard_id() == sid);
+        ceph_assert(sent_count == 0);
+        conn_stats.start_time = mono_clock::now();
+        // forwarded to stopped_send_promise
+        (void) seastar::do_until(
+          [this] { return stop_send; },
+          [this, conn] {
+            sent_count += 1;
+            return send_msg(conn);
+          }
+        ).handle_exception_type([] (const DepthBroken& e) {
+          // ok, stopped by stop_dispatch_messages()
+        }).then([this, conn] {
+          std::chrono::duration<double> dur_conn = conn_stats.connected_time - conn_stats.connecting_time;
+          std::chrono::duration<double> dur_msg = mono_clock::now() - conn_stats.start_time;
+          unsigned ops = conn_stats.received_count - conn_stats.start_count;
+          logger().info("{}: stopped sending OSDOPs.\n"
+                        "{}(depth={}):\n"
+                        "  connect time: {}s\n"
+                        "  messages received: {}\n"
+                        "  messaging time: {}s\n"
+                        "  latency: {}ms\n"
+                        "  IOPS: {}\n"
+                        "  throughput: {}MB/s\n",
+                        *conn,
+                        lname,
+                        nr_depth,
+                        dur_conn.count(),
+                        ops,
+                        dur_msg.count(),
+                        conn_stats.total_lat_s / conn_stats.sampled_count * 1000,
+                        ops / dur_msg.count(),
+                        ops / dur_msg.count() * msg_len / 1048576);
+          stopped_send_promise.set_value();
+        });
+      }
+    };
+  };
+
+  return seastar::when_all(
+      test_state::Server::create(server_conf.core, server_conf.block_size),
+      create_sharded<test_state::Client>(client_conf.jobs, client_conf.block_size, client_conf.depth),
+      crimson::common::sharded_conf().start(EntityName{}, std::string_view{"ceph"}).then([] {
+        return crimson::common::local_conf().start();
+      }).then([crc_enabled] {
+        return crimson::common::local_conf().set_val(
+            "ms_crc_data", crc_enabled ? "true" : "false");
+      })
+  ).then([=](auto&& ret) {
+    auto fp_server = std::move(std::get<0>(ret).get0());
+    auto client = std::move(std::get<1>(ret).get0());
+    test_state::Server* server = fp_server.get();
+    if (mode == perf_mode_t::both) {
+      logger().info("\nperf settings:\n  {}\n  {}\n",
+                    client_conf.str(), server_conf.str());
+      ceph_assert(seastar::smp::count >= 1+client_conf.jobs);
+      ceph_assert(client_conf.jobs > 0);
+      ceph_assert(seastar::smp::count >= 1+server_conf.core);
+      ceph_assert(server_conf.core == 0 || server_conf.core > client_conf.jobs);
+      return seastar::when_all_succeed(
+        server->init(server_conf.addr),
+        client->init()
+      ).then_unpack([client, addr = client_conf.server_addr] {
+        return client->connect_wait_verify(addr);
+      }).then([client, ramptime = client_conf.ramptime,
+               msgtime = client_conf.msgtime] {
+        return client->dispatch_with_timer(ramptime, msgtime);
+      }).then([client] {
+        return client->shutdown();
+      }).then([server, fp_server = std::move(fp_server)] () mutable {
+        return server->shutdown().then([cleanup = std::move(fp_server)] {});
+      });
+    } else if (mode == perf_mode_t::client) {
+      logger().info("\nperf settings:\n  {}\n", client_conf.str());
+      ceph_assert(seastar::smp::count >= 1+client_conf.jobs);
+      ceph_assert(client_conf.jobs > 0);
+      return client->init(
+      ).then([client, addr = client_conf.server_addr] {
+        return client->connect_wait_verify(addr);
+      }).then([client, ramptime = client_conf.ramptime,
+               msgtime = client_conf.msgtime] {
+        return client->dispatch_with_timer(ramptime, msgtime);
+      }).then([client] {
+        return client->shutdown();
+      });
+    } else { // mode == perf_mode_t::server
+      ceph_assert(seastar::smp::count >= 1+server_conf.core);
+      logger().info("\nperf settings:\n  {}\n", server_conf.str());
+      return server->init(server_conf.addr
+      // dispatch ops
+      ).then([server] {
+        return server->wait();
+      // shutdown
+      }).then([server, fp_server = std::move(fp_server)] () mutable {
+        return server->shutdown().then([cleanup = std::move(fp_server)] {});
+      });
+    }
+  }).finally([] {
+    return crimson::common::sharded_conf().stop();
+  });
+}
+
+}
+
+int main(int argc, char** argv)
+{
+  seastar::app_template app;
+  app.add_options()
+    ("mode", bpo::value<unsigned>()->default_value(0),
+     "0: both, 1:client, 2:server")
+    ("addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9010"),
+     "server address(only support msgr v2 protocol)")
+    ("ramptime", bpo::value<unsigned>()->default_value(5),
+     "seconds of client ramp-up time")
+    ("msgtime", bpo::value<unsigned>()->default_value(15),
+     "seconds of client messaging time")
+    ("jobs", bpo::value<unsigned>()->default_value(1),
+     "number of client jobs (messengers)")
+    ("cbs", bpo::value<unsigned>()->default_value(4096),
+     "client block size")
+    ("depth", bpo::value<unsigned>()->default_value(512),
+     "client io depth")
+    ("core", bpo::value<unsigned>()->default_value(0),
+     "server running core")
+    ("sbs", bpo::value<unsigned>()->default_value(0),
+     "server block size")
+    ("crc-enabled", bpo::value<bool>()->default_value(false),
+     "enable CRC checks");
+  return app.run(argc, argv, [&app] {
+      auto&& config = app.configuration();
+      auto mode = config["mode"].as<unsigned>();
+      ceph_assert(mode <= 2);
+      auto _mode = static_cast<perf_mode_t>(mode);
+      bool crc_enabled = config["crc-enabled"].as<bool>();
+      auto server_conf = server_config::load(config);
+      auto client_conf = client_config::load(config);
+      return run(_mode, client_conf, server_conf, crc_enabled
+      ).then([] {
+          logger().info("\nsuccessful!\n");
+        }).handle_exception([] (auto eptr) {
+          logger().info("\nfailed!\n");
+          return seastar::make_exception_future<>(eptr);
+        });
+    });
+}
diff --git a/src/crimson/tools/perf_staged_fltree.cc b/src/crimson/tools/perf_staged_fltree.cc
new file mode 100644 (file)
index 0000000..81b6217
--- /dev/null
@@ -0,0 +1,178 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <boost/program_options.hpp>
+
+#include <seastar/core/app-template.hh>
+#include <seastar/core/thread.hh>
+
+#include "crimson/common/config_proxy.h"
+#include "crimson/common/log.h"
+#include "crimson/common/perf_counters_collection.h"
+#include "crimson/os/seastore/onode_manager/staged-fltree/tree_utils.h"
+#include "crimson/os/seastore/onode_manager/staged-fltree/node_extent_manager.h"
+
+#include "test/crimson/seastore/onode_tree/test_value.h"
+#include "test/crimson/seastore/transaction_manager_test_state.h"
+
+using namespace crimson::os::seastore::onode;
+namespace bpo = boost::program_options;
+
+seastar::logger& logger() {
+  return crimson::get_logger(ceph_subsys_test);
+}
+
+template <bool TRACK>
+class PerfTree : public TMTestState {
+ public:
+  PerfTree(bool is_dummy) : is_dummy{is_dummy} {}
+
+  seastar::future<> run(KVPool<test_item_t>& kvs, double erase_ratio) {
+    return tm_setup().then([this, &kvs, erase_ratio] {
+      return seastar::async([this, &kvs, erase_ratio] {
+        auto tree = std::make_unique<TreeBuilder<TRACK, ExtendedValue>>(kvs,
+            (is_dummy ? NodeExtentManager::create_dummy(true)
+                      : NodeExtentManager::create_seastore(*tm)));
+        {
+          auto t = create_mutate_transaction();
+          with_trans_intr(*t, [&](auto &tr){
+            return tree->bootstrap(tr);
+          }).unsafe_get();
+          submit_transaction(std::move(t));
+        }
+        {
+          auto t = create_mutate_transaction();
+          with_trans_intr(*t, [&](auto &tr){
+            return tree->insert(tr);
+          }).unsafe_get();
+          auto start_time = mono_clock::now();
+          submit_transaction(std::move(t));
+          std::chrono::duration<double> duration = mono_clock::now() - start_time;
+          logger().warn("submit_transaction() done! {}s", duration.count());
+        }
+        {
+          // Note: create_weak_transaction() can also work, but too slow.
+          auto t = create_read_transaction();
+          with_trans_intr(*t, [&](auto &tr){
+            return tree->get_stats(tr);
+          }).unsafe_get();
+
+          with_trans_intr(*t, [&](auto &tr){
+            return tree->validate(tr);
+          }).unsafe_get();
+        }
+        {
+          auto t = create_mutate_transaction();
+          with_trans_intr(*t, [&](auto &tr){
+            return tree->erase(tr, kvs.size() * erase_ratio);
+          }).unsafe_get();
+          submit_transaction(std::move(t));
+        }
+        {
+          auto t = create_read_transaction();
+          with_trans_intr(*t, [&](auto &tr){
+            return tree->get_stats(tr);
+          }).unsafe_get();
+
+          with_trans_intr(*t, [&](auto &tr){
+            return tree->validate(tr);
+          }).unsafe_get();
+        }
+        tree.reset();
+      });
+    }).then([this] {
+      return tm_teardown();
+    });
+  }
+
+ private:
+  bool is_dummy;
+};
+
+template <bool TRACK>
+seastar::future<> run(const bpo::variables_map& config) {
+  return seastar::async([&config] {
+    auto backend = config["backend"].as<std::string>();
+    bool is_dummy;
+    if (backend == "dummy") {
+      is_dummy = true;
+    } else if (backend == "seastore") {
+      is_dummy = false;
+    } else {
+      ceph_abort(false && "invalid backend");
+    }
+    auto ns_sizes = config["ns-sizes"].as<std::vector<size_t>>();
+    auto oid_sizes = config["oid-sizes"].as<std::vector<size_t>>();
+    auto onode_sizes = config["onode-sizes"].as<std::vector<size_t>>();
+    auto range2 = config["range2"].as<std::vector<int>>();
+    ceph_assert(range2.size() == 2);
+    auto range1 = config["range1"].as<std::vector<unsigned>>();
+    ceph_assert(range1.size() == 2);
+    auto range0 = config["range0"].as<std::vector<unsigned>>();
+    ceph_assert(range0.size() == 2);
+    auto erase_ratio = config["erase-ratio"].as<double>();
+    ceph_assert(erase_ratio >= 0);
+    ceph_assert(erase_ratio <= 1);
+
+    using crimson::common::sharded_conf;
+    sharded_conf().start(EntityName{}, std::string_view{"ceph"}).get();
+    seastar::engine().at_exit([] {
+      return sharded_conf().stop();
+    });
+
+    using crimson::common::sharded_perf_coll;
+    sharded_perf_coll().start().get();
+    seastar::engine().at_exit([] {
+      return sharded_perf_coll().stop();
+    });
+
+    auto kvs = KVPool<test_item_t>::create_raw_range(
+        ns_sizes, oid_sizes, onode_sizes,
+        {range2[0], range2[1]},
+        {range1[0], range1[1]},
+        {range0[0], range0[1]});
+    PerfTree<TRACK> perf{is_dummy};
+    perf.run(kvs, erase_ratio).get0();
+  });
+}
+
+
+int main(int argc, char** argv)
+{
+  seastar::app_template app;
+  app.add_options()
+    ("backend", bpo::value<std::string>()->default_value("dummy"),
+     "tree backend: dummy, seastore")
+    ("tracked", bpo::value<bool>()->default_value(false),
+     "track inserted cursors")
+    ("ns-sizes", bpo::value<std::vector<size_t>>()->default_value(
+        {8, 11, 64, 128, 255, 256}),
+     "sizes of ns strings")
+    ("oid-sizes", bpo::value<std::vector<size_t>>()->default_value(
+        {8, 13, 64, 512, 2035, 2048}),
+     "sizes of oid strings")
+    ("onode-sizes", bpo::value<std::vector<size_t>>()->default_value(
+        {8, 16, 128, 576, 992, 1200}),
+     "sizes of onode")
+    ("range2", bpo::value<std::vector<int>>()->default_value(
+        {0, 128}),
+     "range of shard-pool-crush [a, b)")
+    ("range1", bpo::value<std::vector<unsigned>>()->default_value(
+        {0, 10}),
+     "range of ns-oid strings [a, b)")
+    ("range0", bpo::value<std::vector<unsigned>>()->default_value(
+        {0, 4}),
+     "range of snap-gen [a, b)")
+    ("erase-ratio", bpo::value<double>()->default_value(
+        0.8),
+     "erase-ratio of all the inserted onodes");
+  return app.run(argc, argv, [&app] {
+    auto&& config = app.configuration();
+    auto tracked = config["tracked"].as<bool>();
+    if (tracked) {
+      return run<true>(config);
+    } else {
+      return run<false>(config);
+    }
+  });
+}
index 4fe63fe4686e830d8721dba2966e4a3613245849..aeb9d0248ea6ef70ff98b4644e7b1699aec92974 100644 (file)
@@ -148,10 +148,6 @@ if(WITH_RBD)
   endif()
 endif(WITH_RBD)
 
-if(WITH_SEASTAR)
-  add_subdirectory(crimson)
-endif()
-
 add_subdirectory(immutable_object_cache)
 add_subdirectory(ceph-dencoder)
 add_subdirectory(erasure-code)
diff --git a/src/tools/crimson/CMakeLists.txt b/src/tools/crimson/CMakeLists.txt
deleted file mode 100644 (file)
index 19a2cfa..0000000
+++ /dev/null
@@ -1,8 +0,0 @@
-add_executable(perf-crimson-msgr perf_crimson_msgr.cc)
-target_link_libraries(perf-crimson-msgr crimson)
-
-add_executable(perf-async-msgr perf_async_msgr.cc)
-target_link_libraries(perf-async-msgr ceph-common global ${ALLOC_LIBS})
-
-add_executable(perf-staged-fltree perf_staged_fltree.cc)
-target_link_libraries(perf-staged-fltree crimson-seastore)
diff --git a/src/tools/crimson/perf_async_msgr.cc b/src/tools/crimson/perf_async_msgr.cc
deleted file mode 100644 (file)
index b7b0ca6..0000000
+++ /dev/null
@@ -1,141 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
-
-#include <boost/program_options/variables_map.hpp>
-#include <boost/program_options/parsers.hpp>
-
-#include "auth/Auth.h"
-#include "global/global_init.h"
-#include "msg/Dispatcher.h"
-#include "msg/Messenger.h"
-#include "messages/MOSDOp.h"
-
-#include "auth/DummyAuth.h"
-
-namespace {
-
-constexpr int CEPH_OSD_PROTOCOL = 10;
-
-struct Server {
-  Server(CephContext* cct, unsigned msg_len)
-    : dummy_auth(cct), dispatcher(cct, msg_len)
-  {
-    msgr.reset(Messenger::create(cct, "async", entity_name_t::OSD(0), "server", 0));
-    dummy_auth.auth_registry.refresh_config();
-    msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
-    msgr->set_default_policy(Messenger::Policy::stateless_server(0));
-    msgr->set_auth_client(&dummy_auth);
-    msgr->set_auth_server(&dummy_auth);
-    msgr->set_require_authorizer(false);
-  }
-  DummyAuthClientServer dummy_auth;
-  std::unique_ptr<Messenger> msgr;
-  struct ServerDispatcher : Dispatcher {
-    unsigned msg_len = 0;
-    bufferlist msg_data;
-
-    ServerDispatcher(CephContext* cct, unsigned msg_len)
-      : Dispatcher(cct), msg_len(msg_len)
-    {
-      msg_data.append_zero(msg_len);
-    }
-    bool ms_can_fast_dispatch_any() const override {
-      return true;
-    }
-    bool ms_can_fast_dispatch(const Message* m) const override {
-      return m->get_type() == CEPH_MSG_OSD_OP;
-    }
-    void ms_fast_dispatch(Message* m) override {
-      ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
-      const static pg_t pgid;
-      const static object_locator_t oloc;
-      const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
-                                  pgid.pool(), oloc.nspace);
-      static spg_t spgid(pgid);
-      MOSDOp *rep = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0);
-      bufferlist data(msg_data);
-      rep->write(0, msg_len, data);
-      rep->set_tid(m->get_tid());
-      m->get_connection()->send_message(rep);
-      m->put();
-    }
-    bool ms_dispatch(Message*) override {
-      ceph_abort();
-    }
-    bool ms_handle_reset(Connection*) override {
-      return true;
-    }
-    void ms_handle_remote_reset(Connection*) override {
-    }
-    bool ms_handle_refused(Connection*) override {
-      return true;
-    }
-  } dispatcher;
-};
-
-}
-
-static void run(CephContext* cct, entity_addr_t addr, unsigned bs)
-{
-  std::cout << "async server listening at " << addr << std::endl;
-  Server server{cct, bs};
-  server.msgr->bind(addr);
-  server.msgr->add_dispatcher_head(&server.dispatcher);
-  server.msgr->start();
-  server.msgr->wait();
-}
-
-int main(int argc, char** argv)
-{
-  namespace po = boost::program_options;
-  po::options_description desc{"Allowed options"};
-  desc.add_options()
-    ("help,h", "show help message")
-    ("addr", po::value<std::string>()->default_value("v2:127.0.0.1:9010"),
-     "server address(crimson only supports msgr v2 protocol)")
-    ("bs", po::value<unsigned>()->default_value(0),
-     "server block size")
-    ("crc-enabled", po::value<bool>()->default_value(false),
-     "enable CRC checks");
-  po::variables_map vm;
-  std::vector<std::string> unrecognized_options;
-  try {
-    auto parsed = po::command_line_parser(argc, argv)
-      .options(desc)
-      .allow_unregistered()
-      .run();
-    po::store(parsed, vm);
-    if (vm.count("help")) {
-      std::cout << desc << std::endl;
-      return 0;
-    }
-    po::notify(vm);
-    unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional);
-  } catch(const po::error& e) {
-    std::cerr << "error: " << e.what() << std::endl;
-    return 1;
-  }
-
-  auto addr = vm["addr"].as<std::string>();
-  entity_addr_t target_addr;
-  target_addr.parse(addr.c_str(), nullptr);
-  ceph_assert_always(target_addr.is_msgr2());
-  auto bs = vm["bs"].as<unsigned>();
-  auto crc_enabled = vm["crc-enabled"].as<bool>();
-
-  std::vector<const char*> args(argv, argv + argc);
-  auto cct = global_init(nullptr, args,
-                         CEPH_ENTITY_TYPE_CLIENT,
-                         CODE_ENVIRONMENT_UTILITY,
-                         CINIT_FLAG_NO_MON_CONFIG);
-  common_init_finish(cct.get());
-
-  if (crc_enabled) {
-    cct->_conf.set_val("ms_crc_header", "true");
-    cct->_conf.set_val("ms_crc_data", "true");
-  } else {
-    cct->_conf.set_val("ms_crc_header", "false");
-    cct->_conf.set_val("ms_crc_data", "false");
-  }
-
-  run(cct.get(), target_addr, bs);
-}
diff --git a/src/tools/crimson/perf_crimson_msgr.cc b/src/tools/crimson/perf_crimson_msgr.cc
deleted file mode 100644 (file)
index e6b5b9a..0000000
+++ /dev/null
@@ -1,749 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include <map>
-#include <random>
-#include <boost/program_options.hpp>
-
-#include <seastar/core/app-template.hh>
-#include <seastar/core/do_with.hh>
-#include <seastar/core/future-util.hh>
-#include <seastar/core/reactor.hh>
-#include <seastar/core/sleep.hh>
-#include <seastar/core/semaphore.hh>
-#include <seastar/core/smp.hh>
-
-#include "common/ceph_time.h"
-#include "messages/MOSDOp.h"
-
-#include "crimson/auth/DummyAuth.h"
-#include "crimson/common/log.h"
-#include "crimson/common/config_proxy.h"
-#include "crimson/net/Connection.h"
-#include "crimson/net/Dispatcher.h"
-#include "crimson/net/Messenger.h"
-
-using namespace std;
-using namespace std::chrono_literals;
-
-namespace bpo = boost::program_options;
-
-namespace {
-
-template<typename Message>
-using Ref = boost::intrusive_ptr<Message>;
-
-seastar::logger& logger() {
-  return crimson::get_logger(ceph_subsys_ms);
-}
-
-template <typename T, typename... Args>
-seastar::future<T*> create_sharded(Args... args) {
-  // seems we should only construct/stop shards on #0
-  return seastar::smp::submit_to(0, [=] {
-    auto sharded_obj = seastar::make_lw_shared<seastar::sharded<T>>();
-    return sharded_obj->start(args...).then([sharded_obj]() {
-      seastar::engine().at_exit([sharded_obj]() {
-          return sharded_obj->stop().then([sharded_obj] {});
-        });
-      return sharded_obj.get();
-    });
-  }).then([] (seastar::sharded<T> *ptr_shard) {
-    // return the pointer valid for the caller CPU
-    return &ptr_shard->local();
-  });
-}
-
-enum class perf_mode_t {
-  both,
-  client,
-  server
-};
-
-struct client_config {
-  entity_addr_t server_addr;
-  unsigned block_size;
-  unsigned ramptime;
-  unsigned msgtime;
-  unsigned jobs;
-  unsigned depth;
-
-  std::string str() const {
-    std::ostringstream out;
-    out << "client[>> " << server_addr
-        << "](bs=" << block_size
-        << ", ramptime=" << ramptime
-        << ", msgtime=" << msgtime
-        << ", jobs=" << jobs
-        << ", depth=" << depth
-        << ")";
-    return out.str();
-  }
-
-  static client_config load(bpo::variables_map& options) {
-    client_config conf;
-    entity_addr_t addr;
-    ceph_assert(addr.parse(options["addr"].as<std::string>().c_str(), nullptr));
-    ceph_assert_always(addr.is_msgr2());
-
-    conf.server_addr = addr;
-    conf.block_size = options["cbs"].as<unsigned>();
-    conf.ramptime = options["ramptime"].as<unsigned>();
-    conf.msgtime = options["msgtime"].as<unsigned>();
-    conf.jobs = options["jobs"].as<unsigned>();
-    conf.depth = options["depth"].as<unsigned>();
-    ceph_assert(conf.depth % conf.jobs == 0);
-    return conf;
-  }
-};
-
-struct server_config {
-  entity_addr_t addr;
-  unsigned block_size;
-  unsigned core;
-
-  std::string str() const {
-    std::ostringstream out;
-    out << "server[" << addr
-        << "](bs=" << block_size
-        << ", core=" << core
-        << ")";
-    return out.str();
-  }
-
-  static server_config load(bpo::variables_map& options) {
-    server_config conf;
-    entity_addr_t addr;
-    ceph_assert(addr.parse(options["addr"].as<std::string>().c_str(), nullptr));
-    ceph_assert_always(addr.is_msgr2());
-
-    conf.addr = addr;
-    conf.block_size = options["sbs"].as<unsigned>();
-    conf.core = options["core"].as<unsigned>();
-    return conf;
-  }
-};
-
-const unsigned SAMPLE_RATE = 7;
-
-static seastar::future<> run(
-    perf_mode_t mode,
-    const client_config& client_conf,
-    const server_config& server_conf,
-    bool crc_enabled)
-{
-  struct test_state {
-    struct Server;
-    using ServerFRef = seastar::foreign_ptr<std::unique_ptr<Server>>;
-
-    struct Server final
-        : public crimson::net::Dispatcher {
-      crimson::net::MessengerRef msgr;
-      crimson::auth::DummyAuthClientServer dummy_auth;
-      const seastar::shard_id msgr_sid;
-      std::string lname;
-      unsigned msg_len;
-      bufferlist msg_data;
-
-      Server(unsigned msg_len)
-        : msgr_sid{seastar::this_shard_id()},
-          msg_len{msg_len} {
-        lname = "server#";
-        lname += std::to_string(msgr_sid);
-        msg_data.append_zero(msg_len);
-      }
-
-      std::optional<seastar::future<>> ms_dispatch(
-          crimson::net::ConnectionRef c, MessageRef m) override {
-        ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
-
-        // server replies with MOSDOp to generate server-side write workload
-        const static pg_t pgid;
-        const static object_locator_t oloc;
-        const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
-                                    pgid.pool(), oloc.nspace);
-        static spg_t spgid(pgid);
-        auto rep = crimson::make_message<MOSDOp>(0, 0, hobj, spgid, 0, 0, 0);
-        bufferlist data(msg_data);
-        rep->write(0, msg_len, data);
-        rep->set_tid(m->get_tid());
-        std::ignore = c->send(std::move(rep));
-        return {seastar::now()};
-      }
-
-      seastar::future<> init(const entity_addr_t& addr) {
-        return seastar::smp::submit_to(msgr_sid, [addr, this] {
-          // server msgr is always with nonce 0
-          msgr = crimson::net::Messenger::create(entity_name_t::OSD(msgr_sid), lname, 0);
-          msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
-          msgr->set_auth_client(&dummy_auth);
-          msgr->set_auth_server(&dummy_auth);
-          return msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
-            return msgr->start({this});
-          }, crimson::net::Messenger::bind_ertr::all_same_way(
-              [addr] (const std::error_code& e) {
-            logger().error("Server: "
-                           "there is another instance running at {}", addr);
-            ceph_abort();
-          }));
-        });
-      }
-      seastar::future<> shutdown() {
-        logger().info("{} shutdown...", lname);
-        return seastar::smp::submit_to(msgr_sid, [this] {
-          ceph_assert(msgr);
-          msgr->stop();
-          return msgr->shutdown();
-        });
-      }
-      seastar::future<> wait() {
-        return seastar::smp::submit_to(msgr_sid, [this] {
-          ceph_assert(msgr);
-          return msgr->wait();
-        });
-      }
-
-      static seastar::future<ServerFRef> create(seastar::shard_id msgr_sid, unsigned msg_len) {
-        return seastar::smp::submit_to(msgr_sid, [msg_len] {
-          return seastar::make_foreign(std::make_unique<Server>(msg_len));
-        });
-      }
-    };
-
-    struct Client final
-        : public crimson::net::Dispatcher,
-          public seastar::peering_sharded_service<Client> {
-
-      struct ConnStats {
-        mono_time connecting_time = mono_clock::zero();
-        mono_time connected_time = mono_clock::zero();
-        unsigned received_count = 0u;
-
-        mono_time start_time = mono_clock::zero();
-        unsigned start_count = 0u;
-
-        unsigned sampled_count = 0u;
-        double total_lat_s = 0.0;
-
-        // for reporting only
-        mono_time finish_time = mono_clock::zero();
-
-        void start() {
-          start_time = mono_clock::now();
-          start_count = received_count;
-          sampled_count = 0u;
-          total_lat_s = 0.0;
-          finish_time = mono_clock::zero();
-        }
-      };
-      ConnStats conn_stats;
-
-      struct PeriodStats {
-        mono_time start_time = mono_clock::zero();
-        unsigned start_count = 0u;
-        unsigned sampled_count = 0u;
-        double total_lat_s = 0.0;
-
-        // for reporting only
-        mono_time finish_time = mono_clock::zero();
-        unsigned finish_count = 0u;
-        unsigned depth = 0u;
-
-        void reset(unsigned received_count, PeriodStats* snap = nullptr) {
-          if (snap) {
-            snap->start_time = start_time;
-            snap->start_count = start_count;
-            snap->sampled_count = sampled_count;
-            snap->total_lat_s = total_lat_s;
-            snap->finish_time = mono_clock::now();
-            snap->finish_count = received_count;
-          }
-          start_time = mono_clock::now();
-          start_count = received_count;
-          sampled_count = 0u;
-          total_lat_s = 0.0;
-        }
-      };
-      PeriodStats period_stats;
-
-      const seastar::shard_id sid;
-      std::string lname;
-
-      const unsigned jobs;
-      crimson::net::MessengerRef msgr;
-      const unsigned msg_len;
-      bufferlist msg_data;
-      const unsigned nr_depth;
-      seastar::semaphore depth;
-      std::vector<mono_time> time_msgs_sent;
-      crimson::auth::DummyAuthClientServer dummy_auth;
-
-      unsigned sent_count = 0u;
-      crimson::net::ConnectionRef active_conn = nullptr;
-
-      bool stop_send = false;
-      seastar::promise<> stopped_send_promise;
-
-      Client(unsigned jobs, unsigned msg_len, unsigned depth)
-        : sid{seastar::this_shard_id()},
-          jobs{jobs},
-          msg_len{msg_len},
-          nr_depth{depth/jobs},
-          depth{nr_depth},
-          time_msgs_sent{depth/jobs, mono_clock::zero()} {
-        lname = "client#";
-        lname += std::to_string(sid);
-        msg_data.append_zero(msg_len);
-      }
-
-      unsigned get_current_depth() const {
-        ceph_assert(depth.available_units() >= 0);
-        return nr_depth - depth.current();
-      }
-
-      void ms_handle_connect(crimson::net::ConnectionRef conn) override {
-        conn_stats.connected_time = mono_clock::now();
-      }
-      std::optional<seastar::future<>> ms_dispatch(
-          crimson::net::ConnectionRef, MessageRef m) override {
-        // server replies with MOSDOp to generate server-side write workload
-        ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
-
-        auto msg_id = m->get_tid();
-        if (msg_id % SAMPLE_RATE == 0) {
-          auto index = msg_id % time_msgs_sent.size();
-          ceph_assert(time_msgs_sent[index] != mono_clock::zero());
-          std::chrono::duration<double> cur_latency = mono_clock::now() - time_msgs_sent[index];
-          conn_stats.total_lat_s += cur_latency.count();
-          ++(conn_stats.sampled_count);
-          period_stats.total_lat_s += cur_latency.count();
-          ++(period_stats.sampled_count);
-          time_msgs_sent[index] = mono_clock::zero();
-        }
-
-        ++(conn_stats.received_count);
-        depth.signal(1);
-
-        return {seastar::now()};
-      }
-
-      // should start messenger at this shard?
-      bool is_active() {
-        ceph_assert(seastar::this_shard_id() == sid);
-        return sid != 0 && sid <= jobs;
-      }
-
-      seastar::future<> init() {
-        return container().invoke_on_all([] (auto& client) {
-          if (client.is_active()) {
-            client.msgr = crimson::net::Messenger::create(entity_name_t::OSD(client.sid), client.lname, client.sid);
-            client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
-            client.msgr->set_require_authorizer(false);
-            client.msgr->set_auth_client(&client.dummy_auth);
-            client.msgr->set_auth_server(&client.dummy_auth);
-            return client.msgr->start({&client});
-          }
-          return seastar::now();
-        });
-      }
-
-      seastar::future<> shutdown() {
-        return container().invoke_on_all([] (auto& client) {
-          if (client.is_active()) {
-            logger().info("{} shutdown...", client.lname);
-            ceph_assert(client.msgr);
-            client.msgr->stop();
-            return client.msgr->shutdown().then([&client] {
-              return client.stop_dispatch_messages();
-            });
-          }
-          return seastar::now();
-        });
-      }
-
-      seastar::future<> connect_wait_verify(const entity_addr_t& peer_addr) {
-        return container().invoke_on_all([peer_addr] (auto& client) {
-          // start clients in active cores (#1 ~ #jobs)
-          if (client.is_active()) {
-            mono_time start_time = mono_clock::now();
-            client.active_conn = client.msgr->connect(peer_addr, entity_name_t::TYPE_OSD);
-            // make sure handshake won't hurt the performance
-            return seastar::sleep(1s).then([&client, start_time] {
-              if (client.conn_stats.connected_time == mono_clock::zero()) {
-                logger().error("\n{} not connected after 1s!\n", client.lname);
-                ceph_assert(false);
-              }
-              client.conn_stats.connecting_time = start_time;
-            });
-          }
-          return seastar::now();
-        });
-      }
-
-     private:
-      class TimerReport {
-       private:
-        const unsigned jobs;
-        const unsigned msgtime;
-        const unsigned bytes_of_block;
-
-        unsigned elapsed = 0u;
-        std::vector<mono_time> start_times;
-        std::vector<PeriodStats> snaps;
-        std::vector<ConnStats> summaries;
-
-       public:
-        TimerReport(unsigned jobs, unsigned msgtime, unsigned bs)
-          : jobs{jobs},
-            msgtime{msgtime},
-            bytes_of_block{bs},
-            start_times{jobs, mono_clock::zero()},
-            snaps{jobs},
-            summaries{jobs} {}
-
-        unsigned get_elapsed() const { return elapsed; }
-
-        PeriodStats& get_snap_by_job(seastar::shard_id sid) {
-          ceph_assert(sid >= 1 && sid <= jobs);
-          return snaps[sid - 1];
-        }
-
-        ConnStats& get_summary_by_job(seastar::shard_id sid) {
-          ceph_assert(sid >= 1 && sid <= jobs);
-          return summaries[sid - 1];
-        }
-
-        bool should_stop() const {
-          return elapsed >= msgtime;
-        }
-
-        seastar::future<> ticktock() {
-          return seastar::sleep(1s).then([this] {
-            ++elapsed;
-          });
-        }
-
-        void report_header() {
-          std::ostringstream sout;
-          sout << std::setfill(' ')
-               << std::setw(7) << "sec"
-               << std::setw(6) << "depth"
-               << std::setw(8) << "IOPS"
-               << std::setw(8) << "MB/s"
-               << std::setw(8) << "lat(ms)";
-          std::cout << sout.str() << std::endl;
-        }
-
-        void report_period() {
-          if (elapsed == 1) {
-            // init this->start_times at the first period
-            for (unsigned i=0; i<jobs; ++i) {
-              start_times[i] = snaps[i].start_time;
-            }
-          }
-          std::chrono::duration<double> elapsed_d = 0s;
-          unsigned depth = 0u;
-          unsigned ops = 0u;
-          unsigned sampled_count = 0u;
-          double total_lat_s = 0.0;
-          for (const auto& snap: snaps) {
-            elapsed_d += (snap.finish_time - snap.start_time);
-            depth += snap.depth;
-            ops += (snap.finish_count - snap.start_count);
-            sampled_count += snap.sampled_count;
-            total_lat_s += snap.total_lat_s;
-          }
-          double elapsed_s = elapsed_d.count() / jobs;
-          double iops = ops/elapsed_s;
-          std::ostringstream sout;
-          sout << setfill(' ')
-               << std::setw(7) << elapsed_s
-               << std::setw(6) << depth
-               << std::setw(8) << iops
-               << std::setw(8) << iops * bytes_of_block / 1048576
-               << std::setw(8) << (total_lat_s / sampled_count * 1000);
-          std::cout << sout.str() << std::endl;
-        }
-
-        void report_summary() const {
-          std::chrono::duration<double> elapsed_d = 0s;
-          unsigned ops = 0u;
-          unsigned sampled_count = 0u;
-          double total_lat_s = 0.0;
-          for (const auto& summary: summaries) {
-            elapsed_d += (summary.finish_time - summary.start_time);
-            ops += (summary.received_count - summary.start_count);
-            sampled_count += summary.sampled_count;
-            total_lat_s += summary.total_lat_s;
-          }
-          double elapsed_s = elapsed_d.count() / jobs;
-          double iops = ops / elapsed_s;
-          std::ostringstream sout;
-          sout << "--------------"
-               << " summary "
-               << "--------------\n"
-               << setfill(' ')
-               << std::setw(7) << elapsed_s
-               << std::setw(6) << "-"
-               << std::setw(8) << iops
-               << std::setw(8) << iops * bytes_of_block / 1048576
-               << std::setw(8) << (total_lat_s / sampled_count * 1000)
-               << "\n";
-          std::cout << sout.str() << std::endl;
-        }
-      };
-
-      seastar::future<> report_period(TimerReport& report) {
-        return container().invoke_on_all([&report] (auto& client) {
-          if (client.is_active()) {
-            PeriodStats& snap = report.get_snap_by_job(client.sid);
-            client.period_stats.reset(client.conn_stats.received_count,
-                                      &snap);
-            snap.depth = client.get_current_depth();
-          }
-        }).then([&report] {
-          report.report_period();
-        });
-      }
-
-      seastar::future<> report_summary(TimerReport& report) {
-        return container().invoke_on_all([&report] (auto& client) {
-          if (client.is_active()) {
-            ConnStats& summary = report.get_summary_by_job(client.sid);
-            summary = client.conn_stats;
-            summary.finish_time = mono_clock::now();
-          }
-        }).then([&report] {
-          report.report_summary();
-        });
-      }
-
-     public:
-      seastar::future<> dispatch_with_timer(unsigned ramptime, unsigned msgtime) {
-        logger().info("[all clients]: start sending MOSDOps from {} clients", jobs);
-        return container().invoke_on_all([] (auto& client) {
-          if (client.is_active()) {
-            client.do_dispatch_messages(client.active_conn.get());
-          }
-        }).then([ramptime] {
-          logger().info("[all clients]: ramping up {} seconds...", ramptime);
-          return seastar::sleep(std::chrono::seconds(ramptime));
-        }).then([this] {
-          return container().invoke_on_all([] (auto& client) {
-            if (client.is_active()) {
-              client.conn_stats.start();
-              client.period_stats.reset(client.conn_stats.received_count);
-            }
-          });
-        }).then([this, msgtime] {
-          logger().info("[all clients]: reporting {} seconds...\n", msgtime);
-          return seastar::do_with(
-              TimerReport(jobs, msgtime, msg_len), [this] (auto& report) {
-            report.report_header();
-            return seastar::do_until(
-              [&report] { return report.should_stop(); },
-              [&report, this] {
-                return report.ticktock().then([&report, this] {
-                  // report period every 1s
-                  return report_period(report);
-                }).then([&report, this] {
-                  // report summary every 10s
-                  if (report.get_elapsed() % 10 == 0) {
-                    return report_summary(report);
-                  } else {
-                    return seastar::now();
-                  }
-                });
-              }
-            ).then([&report, this] {
-              // report the final summary
-              if (report.get_elapsed() % 10 != 0) {
-                return report_summary(report);
-              } else {
-                return seastar::now();
-              }
-            });
-          });
-        });
-      }
-
-     private:
-      seastar::future<> send_msg(crimson::net::Connection* conn) {
-        ceph_assert(seastar::this_shard_id() == sid);
-        return depth.wait(1).then([this, conn] {
-          const static pg_t pgid;
-          const static object_locator_t oloc;
-          const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
-                                      pgid.pool(), oloc.nspace);
-          static spg_t spgid(pgid);
-          auto m = crimson::make_message<MOSDOp>(0, 0, hobj, spgid, 0, 0, 0);
-          bufferlist data(msg_data);
-          m->write(0, msg_len, data);
-          // use tid as the identity of each round
-          m->set_tid(sent_count);
-
-          // sample message latency
-          if (sent_count % SAMPLE_RATE == 0) {
-            auto index = sent_count % time_msgs_sent.size();
-            ceph_assert(time_msgs_sent[index] == mono_clock::zero());
-            time_msgs_sent[index] = mono_clock::now();
-          }
-
-          return conn->send(std::move(m));
-        });
-      }
-
-      class DepthBroken: public std::exception {};
-
-      seastar::future<> stop_dispatch_messages() {
-        stop_send = true;
-        depth.broken(DepthBroken());
-        return stopped_send_promise.get_future();
-      }
-
-      void do_dispatch_messages(crimson::net::Connection* conn) {
-        ceph_assert(seastar::this_shard_id() == sid);
-        ceph_assert(sent_count == 0);
-        conn_stats.start_time = mono_clock::now();
-        // forwarded to stopped_send_promise
-        (void) seastar::do_until(
-          [this] { return stop_send; },
-          [this, conn] {
-            sent_count += 1;
-            return send_msg(conn);
-          }
-        ).handle_exception_type([] (const DepthBroken& e) {
-          // ok, stopped by stop_dispatch_messages()
-        }).then([this, conn] {
-          std::chrono::duration<double> dur_conn = conn_stats.connected_time - conn_stats.connecting_time;
-          std::chrono::duration<double> dur_msg = mono_clock::now() - conn_stats.start_time;
-          unsigned ops = conn_stats.received_count - conn_stats.start_count;
-          logger().info("{}: stopped sending OSDOPs.\n"
-                        "{}(depth={}):\n"
-                        "  connect time: {}s\n"
-                        "  messages received: {}\n"
-                        "  messaging time: {}s\n"
-                        "  latency: {}ms\n"
-                        "  IOPS: {}\n"
-                        "  throughput: {}MB/s\n",
-                        *conn,
-                        lname,
-                        nr_depth,
-                        dur_conn.count(),
-                        ops,
-                        dur_msg.count(),
-                        conn_stats.total_lat_s / conn_stats.sampled_count * 1000,
-                        ops / dur_msg.count(),
-                        ops / dur_msg.count() * msg_len / 1048576);
-          stopped_send_promise.set_value();
-        });
-      }
-    };
-  };
-
-  return seastar::when_all(
-      test_state::Server::create(server_conf.core, server_conf.block_size),
-      create_sharded<test_state::Client>(client_conf.jobs, client_conf.block_size, client_conf.depth),
-      crimson::common::sharded_conf().start(EntityName{}, std::string_view{"ceph"}).then([] {
-        return crimson::common::local_conf().start();
-      }).then([crc_enabled] {
-        return crimson::common::local_conf().set_val(
-            "ms_crc_data", crc_enabled ? "true" : "false");
-      })
-  ).then([=](auto&& ret) {
-    auto fp_server = std::move(std::get<0>(ret).get0());
-    auto client = std::move(std::get<1>(ret).get0());
-    test_state::Server* server = fp_server.get();
-    if (mode == perf_mode_t::both) {
-      logger().info("\nperf settings:\n  {}\n  {}\n",
-                    client_conf.str(), server_conf.str());
-      ceph_assert(seastar::smp::count >= 1+client_conf.jobs);
-      ceph_assert(client_conf.jobs > 0);
-      ceph_assert(seastar::smp::count >= 1+server_conf.core);
-      ceph_assert(server_conf.core == 0 || server_conf.core > client_conf.jobs);
-      return seastar::when_all_succeed(
-        server->init(server_conf.addr),
-        client->init()
-      ).then_unpack([client, addr = client_conf.server_addr] {
-        return client->connect_wait_verify(addr);
-      }).then([client, ramptime = client_conf.ramptime,
-               msgtime = client_conf.msgtime] {
-        return client->dispatch_with_timer(ramptime, msgtime);
-      }).then([client] {
-        return client->shutdown();
-      }).then([server, fp_server = std::move(fp_server)] () mutable {
-        return server->shutdown().then([cleanup = std::move(fp_server)] {});
-      });
-    } else if (mode == perf_mode_t::client) {
-      logger().info("\nperf settings:\n  {}\n", client_conf.str());
-      ceph_assert(seastar::smp::count >= 1+client_conf.jobs);
-      ceph_assert(client_conf.jobs > 0);
-      return client->init(
-      ).then([client, addr = client_conf.server_addr] {
-        return client->connect_wait_verify(addr);
-      }).then([client, ramptime = client_conf.ramptime,
-               msgtime = client_conf.msgtime] {
-        return client->dispatch_with_timer(ramptime, msgtime);
-      }).then([client] {
-        return client->shutdown();
-      });
-    } else { // mode == perf_mode_t::server
-      ceph_assert(seastar::smp::count >= 1+server_conf.core);
-      logger().info("\nperf settings:\n  {}\n", server_conf.str());
-      return server->init(server_conf.addr
-      // dispatch ops
-      ).then([server] {
-        return server->wait();
-      // shutdown
-      }).then([server, fp_server = std::move(fp_server)] () mutable {
-        return server->shutdown().then([cleanup = std::move(fp_server)] {});
-      });
-    }
-  }).finally([] {
-    return crimson::common::sharded_conf().stop();
-  });
-}
-
-}
-
-int main(int argc, char** argv)
-{
-  seastar::app_template app;
-  app.add_options()
-    ("mode", bpo::value<unsigned>()->default_value(0),
-     "0: both, 1:client, 2:server")
-    ("addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9010"),
-     "server address(only support msgr v2 protocol)")
-    ("ramptime", bpo::value<unsigned>()->default_value(5),
-     "seconds of client ramp-up time")
-    ("msgtime", bpo::value<unsigned>()->default_value(15),
-     "seconds of client messaging time")
-    ("jobs", bpo::value<unsigned>()->default_value(1),
-     "number of client jobs (messengers)")
-    ("cbs", bpo::value<unsigned>()->default_value(4096),
-     "client block size")
-    ("depth", bpo::value<unsigned>()->default_value(512),
-     "client io depth")
-    ("core", bpo::value<unsigned>()->default_value(0),
-     "server running core")
-    ("sbs", bpo::value<unsigned>()->default_value(0),
-     "server block size")
-    ("crc-enabled", bpo::value<bool>()->default_value(false),
-     "enable CRC checks");
-  return app.run(argc, argv, [&app] {
-      auto&& config = app.configuration();
-      auto mode = config["mode"].as<unsigned>();
-      ceph_assert(mode <= 2);
-      auto _mode = static_cast<perf_mode_t>(mode);
-      bool crc_enabled = config["crc-enabled"].as<bool>();
-      auto server_conf = server_config::load(config);
-      auto client_conf = client_config::load(config);
-      return run(_mode, client_conf, server_conf, crc_enabled
-      ).then([] {
-          logger().info("\nsuccessful!\n");
-        }).handle_exception([] (auto eptr) {
-          logger().info("\nfailed!\n");
-          return seastar::make_exception_future<>(eptr);
-        });
-    });
-}
diff --git a/src/tools/crimson/perf_staged_fltree.cc b/src/tools/crimson/perf_staged_fltree.cc
deleted file mode 100644 (file)
index 81b6217..0000000
+++ /dev/null
@@ -1,178 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
-// vim: ts=8 sw=2 smarttab
-
-#include <boost/program_options.hpp>
-
-#include <seastar/core/app-template.hh>
-#include <seastar/core/thread.hh>
-
-#include "crimson/common/config_proxy.h"
-#include "crimson/common/log.h"
-#include "crimson/common/perf_counters_collection.h"
-#include "crimson/os/seastore/onode_manager/staged-fltree/tree_utils.h"
-#include "crimson/os/seastore/onode_manager/staged-fltree/node_extent_manager.h"
-
-#include "test/crimson/seastore/onode_tree/test_value.h"
-#include "test/crimson/seastore/transaction_manager_test_state.h"
-
-using namespace crimson::os::seastore::onode;
-namespace bpo = boost::program_options;
-
-seastar::logger& logger() {
-  return crimson::get_logger(ceph_subsys_test);
-}
-
-template <bool TRACK>
-class PerfTree : public TMTestState {
- public:
-  PerfTree(bool is_dummy) : is_dummy{is_dummy} {}
-
-  seastar::future<> run(KVPool<test_item_t>& kvs, double erase_ratio) {
-    return tm_setup().then([this, &kvs, erase_ratio] {
-      return seastar::async([this, &kvs, erase_ratio] {
-        auto tree = std::make_unique<TreeBuilder<TRACK, ExtendedValue>>(kvs,
-            (is_dummy ? NodeExtentManager::create_dummy(true)
-                      : NodeExtentManager::create_seastore(*tm)));
-        {
-          auto t = create_mutate_transaction();
-          with_trans_intr(*t, [&](auto &tr){
-            return tree->bootstrap(tr);
-          }).unsafe_get();
-          submit_transaction(std::move(t));
-        }
-        {
-          auto t = create_mutate_transaction();
-          with_trans_intr(*t, [&](auto &tr){
-            return tree->insert(tr);
-          }).unsafe_get();
-          auto start_time = mono_clock::now();
-          submit_transaction(std::move(t));
-          std::chrono::duration<double> duration = mono_clock::now() - start_time;
-          logger().warn("submit_transaction() done! {}s", duration.count());
-        }
-        {
-          // Note: create_weak_transaction() can also work, but too slow.
-          auto t = create_read_transaction();
-          with_trans_intr(*t, [&](auto &tr){
-            return tree->get_stats(tr);
-          }).unsafe_get();
-
-          with_trans_intr(*t, [&](auto &tr){
-            return tree->validate(tr);
-          }).unsafe_get();
-        }
-        {
-          auto t = create_mutate_transaction();
-          with_trans_intr(*t, [&](auto &tr){
-            return tree->erase(tr, kvs.size() * erase_ratio);
-          }).unsafe_get();
-          submit_transaction(std::move(t));
-        }
-        {
-          auto t = create_read_transaction();
-          with_trans_intr(*t, [&](auto &tr){
-            return tree->get_stats(tr);
-          }).unsafe_get();
-
-          with_trans_intr(*t, [&](auto &tr){
-            return tree->validate(tr);
-          }).unsafe_get();
-        }
-        tree.reset();
-      });
-    }).then([this] {
-      return tm_teardown();
-    });
-  }
-
- private:
-  bool is_dummy;
-};
-
-template <bool TRACK>
-seastar::future<> run(const bpo::variables_map& config) {
-  return seastar::async([&config] {
-    auto backend = config["backend"].as<std::string>();
-    bool is_dummy;
-    if (backend == "dummy") {
-      is_dummy = true;
-    } else if (backend == "seastore") {
-      is_dummy = false;
-    } else {
-      ceph_abort(false && "invalid backend");
-    }
-    auto ns_sizes = config["ns-sizes"].as<std::vector<size_t>>();
-    auto oid_sizes = config["oid-sizes"].as<std::vector<size_t>>();
-    auto onode_sizes = config["onode-sizes"].as<std::vector<size_t>>();
-    auto range2 = config["range2"].as<std::vector<int>>();
-    ceph_assert(range2.size() == 2);
-    auto range1 = config["range1"].as<std::vector<unsigned>>();
-    ceph_assert(range1.size() == 2);
-    auto range0 = config["range0"].as<std::vector<unsigned>>();
-    ceph_assert(range0.size() == 2);
-    auto erase_ratio = config["erase-ratio"].as<double>();
-    ceph_assert(erase_ratio >= 0);
-    ceph_assert(erase_ratio <= 1);
-
-    using crimson::common::sharded_conf;
-    sharded_conf().start(EntityName{}, std::string_view{"ceph"}).get();
-    seastar::engine().at_exit([] {
-      return sharded_conf().stop();
-    });
-
-    using crimson::common::sharded_perf_coll;
-    sharded_perf_coll().start().get();
-    seastar::engine().at_exit([] {
-      return sharded_perf_coll().stop();
-    });
-
-    auto kvs = KVPool<test_item_t>::create_raw_range(
-        ns_sizes, oid_sizes, onode_sizes,
-        {range2[0], range2[1]},
-        {range1[0], range1[1]},
-        {range0[0], range0[1]});
-    PerfTree<TRACK> perf{is_dummy};
-    perf.run(kvs, erase_ratio).get0();
-  });
-}
-
-
-int main(int argc, char** argv)
-{
-  seastar::app_template app;
-  app.add_options()
-    ("backend", bpo::value<std::string>()->default_value("dummy"),
-     "tree backend: dummy, seastore")
-    ("tracked", bpo::value<bool>()->default_value(false),
-     "track inserted cursors")
-    ("ns-sizes", bpo::value<std::vector<size_t>>()->default_value(
-        {8, 11, 64, 128, 255, 256}),
-     "sizes of ns strings")
-    ("oid-sizes", bpo::value<std::vector<size_t>>()->default_value(
-        {8, 13, 64, 512, 2035, 2048}),
-     "sizes of oid strings")
-    ("onode-sizes", bpo::value<std::vector<size_t>>()->default_value(
-        {8, 16, 128, 576, 992, 1200}),
-     "sizes of onode")
-    ("range2", bpo::value<std::vector<int>>()->default_value(
-        {0, 128}),
-     "range of shard-pool-crush [a, b)")
-    ("range1", bpo::value<std::vector<unsigned>>()->default_value(
-        {0, 10}),
-     "range of ns-oid strings [a, b)")
-    ("range0", bpo::value<std::vector<unsigned>>()->default_value(
-        {0, 4}),
-     "range of snap-gen [a, b)")
-    ("erase-ratio", bpo::value<double>()->default_value(
-        0.8),
-     "erase-ratio of all the inserted onodes");
-  return app.run(argc, argv, [&app] {
-    auto&& config = app.configuration();
-    auto tracked = config["tracked"].as<bool>();
-    if (tracked) {
-      return run<true>(config);
-    } else {
-      return run<false>(config);
-    }
-  });
-}