]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test/crimson: split async-msgr out of alien_echo
authorYingxin Cheng <yingxincheng@gmail.com>
Mon, 25 Feb 2019 06:32:13 +0000 (14:32 +0800)
committerYingxin Cheng <yingxincheng@gmail.com>
Mon, 25 Feb 2019 06:32:13 +0000 (14:32 +0800)
async-msgr is not compatible with custom-built libs for seastar, so
split it.

Signed-off-by: Yingxin Cheng <yingxincheng@gmail.com>
src/test/crimson/CMakeLists.txt
src/test/crimson/test_alien_echo.cc
src/test/crimson/test_async_echo.cc [new file with mode: 0644]

index 8626b7e6d1d7ac479614e3dacccb869153d4a3e4..69545d5fe8263493f59792fdb9e242d9f1a5a1f3 100644 (file)
@@ -17,7 +17,11 @@ target_link_libraries(perf_crimson_msgr ceph-common crimson)
 
 add_executable(unittest_seastar_echo
   test_alien_echo.cc)
-target_link_libraries(unittest_seastar_echo ceph-common global crimson)
+target_link_libraries(unittest_seastar_echo crimson)
+
+add_executable(unittest_async_echo
+  test_async_echo.cc)
+target_link_libraries(unittest_async_echo ceph-common global)
 
 add_executable(unittest_seastar_thread_pool
   test_thread_pool.cc)
index d9a80966cb01a2329cd99758b9cb6accf354c71e..1dbe81335641b77a864ccfcea0c9dca665008794 100644 (file)
@@ -1,10 +1,7 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
 
 #include "auth/Auth.h"
-#include "global/global_init.h"
 #include "messages/MPing.h"
-#include "msg/Dispatcher.h"
-#include "msg/Messenger.h"
 #include "crimson/net/Connection.h"
 #include "crimson/net/Dispatcher.h"
 #include "crimson/net/Messenger.h"
@@ -98,125 +95,6 @@ struct Client {
 };
 } // namespace seastar_pingpong
 
-namespace native_pingpong {
-
-constexpr int CEPH_OSD_PROTOCOL = 10;
-
-struct Server {
-  Server(CephContext* cct, const entity_inst_t& entity)
-    : dispatcher(cct)
-  {
-    msgr.reset(Messenger::create(cct, "async",
-                                 entity.name, "pong", entity.addr.get_nonce(), 0));
-    msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
-    msgr->set_default_policy(Messenger::Policy::stateless_server(0));
-  }
-  unique_ptr<Messenger> msgr;
-  struct ServerDispatcher : Dispatcher {
-    std::mutex mutex;
-    std::condition_variable on_reply;
-    bool replied = false;
-    ServerDispatcher(CephContext* cct)
-      : Dispatcher(cct)
-    {}
-    bool ms_can_fast_dispatch_any() const override {
-      return true;
-    }
-    bool ms_can_fast_dispatch(const Message* m) const override {
-      return m->get_type() == CEPH_MSG_PING;
-    }
-    void ms_fast_dispatch(Message* m) override {
-      m->get_connection()->send_message(new MPing);
-      m->put();
-      {
-        std::lock_guard lock{mutex};
-        replied = true;
-      }
-      on_reply.notify_one();
-    }
-    bool ms_dispatch(Message*) override {
-      ceph_abort();
-    }
-    bool ms_handle_reset(Connection*) override {
-      return true;
-    }
-    void ms_handle_remote_reset(Connection*) override {
-    }
-    bool ms_handle_refused(Connection*) override {
-      return true;
-    }
-    void echo() {
-      replied = false;
-      std::unique_lock lock{mutex};
-      return on_reply.wait(lock, [this] { return replied; });
-    }
-  } dispatcher;
-  void echo() {
-    dispatcher.echo();
-  }
-};
-
-struct Client {
-  unique_ptr<Messenger> msgr;
-  Client(CephContext *cct)
-    : dispatcher(cct)
-  {
-    msgr.reset(Messenger::create(cct, "async",
-                                 entity_name_t::CLIENT(-1), "ping",
-                                 getpid(), 0));
-    msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
-    msgr->set_default_policy(Messenger::Policy::lossy_client(0));
-  }
-  struct ClientDispatcher : Dispatcher {
-    std::mutex mutex;
-    std::condition_variable on_reply;
-    bool replied = false;
-
-    ClientDispatcher(CephContext* cct)
-      : Dispatcher(cct)
-    {}
-    bool ms_can_fast_dispatch_any() const override {
-      return true;
-    }
-    bool ms_can_fast_dispatch(const Message* m) const override {
-      return m->get_type() == CEPH_MSG_PING;
-    }
-    void ms_fast_dispatch(Message* m) override {
-      m->put();
-      {
-        std::lock_guard lock{mutex};
-        replied = true;
-      }
-      on_reply.notify_one();
-    }
-    bool ms_dispatch(Message*) override {
-      ceph_abort();
-    }
-    bool ms_handle_reset(Connection *) override {
-      return true;
-    }
-    void ms_handle_remote_reset(Connection*) override {
-    }
-    bool ms_handle_refused(Connection*) override {
-      return true;
-    }
-    bool ping(Messenger* msgr, const entity_inst_t& peer) {
-      auto conn = msgr->connect_to(peer.name.type(),
-                                   entity_addrvec_t{peer.addr});
-      replied = false;
-      conn->send_message(new MPing);
-      std::unique_lock lock{mutex};
-      return on_reply.wait_for(lock, 500ms, [&] {
-        return replied;
-      });
-    }
-  } dispatcher;
-  void ping(const entity_inst_t& peer) {
-    dispatcher.ping(msgr.get(), peer);
-  }
-};
-} // namespace native_pingpong
-
 class SeastarContext {
   seastar::file_desc begin_fd;
   ceph::thread::Condition on_end;
@@ -324,38 +202,6 @@ seastar_echo(const entity_addr_t addr, echo_role role, unsigned count)
   }
 }
 
-static void ceph_echo(CephContext* cct,
-                      entity_addr_t addr, echo_role role, unsigned count)
-{
-  std::cout << "ceph/";
-  entity_inst_t entity{entity_name_t::OSD(0), addr};
-  if (role == echo_role::as_server) {
-    std::cout << "server listening at " << addr << std::endl;
-    native_pingpong::Server server{cct, entity};
-    server.msgr->bind(addr);
-    server.msgr->add_dispatcher_head(&server.dispatcher);
-    server.msgr->start();
-    for (unsigned i = 0; i < count; i++) {
-      server.echo();
-    }
-    server.msgr->shutdown();
-    server.msgr->wait();
-  } else {
-    std::cout << "client sending to " << addr << std::endl;
-    native_pingpong::Client client{cct};
-    client.msgr->add_dispatcher_head(&client.dispatcher);
-    client.msgr->start();
-    auto conn = client.msgr->connect_to(entity.name.type(),
-                                        entity_addrvec_t{entity.addr});
-    for (unsigned i = 0; i < count; i++) {
-      std::cout << "seq=" << i << std::endl;
-      client.ping(entity);
-    }
-    client.msgr->shutdown();
-    client.msgr->wait();
-  }
-}
-
 int main(int argc, char** argv)
 {
   namespace po = boost::program_options;
@@ -364,8 +210,6 @@ int main(int argc, char** argv)
     ("help,h", "show help message")
     ("role", po::value<std::string>()->default_value("pong"),
      "role to play (ping | pong)")
-    ("test", po::value<std::string>()->default_value("seastar"),
-     "messenger to use (seastar | ceph)")
     ("port", po::value<uint16_t>()->default_value(9010),
      "port #")
     ("nonce", po::value<uint32_t>()->default_value(42),
@@ -409,33 +253,23 @@ int main(int argc, char** argv)
   }
 
   auto count = vm["count"].as<unsigned>();
-  if (vm["test"].as<std::string>() == "seastar") {
-    seastar::app_template app;
-    SeastarContext sc;
-    auto job = sc.with_seastar([&] {
-      auto fut = seastar::alien::submit_to(0, [addr, role, count] {
-        return seastar_echo(addr, role, count);
-      });
-      fut.wait();
+  seastar::app_template app;
+  SeastarContext sc;
+  auto job = sc.with_seastar([&] {
+    auto fut = seastar::alien::submit_to(0, [addr, role, count] {
+      return seastar_echo(addr, role, count);
     });
-    std::vector<char*> av{argv[0]};
-    std::transform(begin(unrecognized_options),
-                   end(unrecognized_options),
-                   std::back_inserter(av),
-                   [](auto& s) {
-                     return const_cast<char*>(s.c_str());
-                   });
-    sc.run(app, av.size(), av.data());
-    job.join();
-  } else {
-    std::vector<const char*> args(argv, argv + argc);
-    auto cct = global_init(nullptr, args,
-                           CEPH_ENTITY_TYPE_CLIENT,
-                           CODE_ENVIRONMENT_UTILITY,
-                           CINIT_FLAG_NO_MON_CONFIG);
-    common_init_finish(cct.get());
-    ceph_echo(cct.get(), addr, role, count);
-  }
+    fut.wait();
+  });
+  std::vector<char*> av{argv[0]};
+  std::transform(begin(unrecognized_options),
+                 end(unrecognized_options),
+                 std::back_inserter(av),
+                 [](auto& s) {
+                   return const_cast<char*>(s.c_str());
+                 });
+  sc.run(app, av.size(), av.data());
+  job.join();
 }
 
 /*
diff --git a/src/test/crimson/test_async_echo.cc b/src/test/crimson/test_async_echo.cc
new file mode 100644 (file)
index 0000000..fef4b49
--- /dev/null
@@ -0,0 +1,226 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+
+#include <boost/program_options/variables_map.hpp>
+#include <boost/program_options/parsers.hpp>
+
+#include "auth/Auth.h"
+#include "global/global_init.h"
+#include "messages/MPing.h"
+#include "msg/Dispatcher.h"
+#include "msg/Messenger.h"
+
+enum class echo_role {
+  as_server,
+  as_client,
+};
+
+namespace native_pingpong {
+
+constexpr int CEPH_OSD_PROTOCOL = 10;
+
+struct Server {
+  Server(CephContext* cct, const entity_inst_t& entity)
+    : dispatcher(cct)
+  {
+    msgr.reset(Messenger::create(cct, "async",
+                                 entity.name, "pong", entity.addr.get_nonce(), 0));
+    msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
+    msgr->set_default_policy(Messenger::Policy::stateless_server(0));
+  }
+  unique_ptr<Messenger> msgr;
+  struct ServerDispatcher : Dispatcher {
+    std::mutex mutex;
+    std::condition_variable on_reply;
+    bool replied = false;
+    ServerDispatcher(CephContext* cct)
+      : Dispatcher(cct)
+    {}
+    bool ms_can_fast_dispatch_any() const override {
+      return true;
+    }
+    bool ms_can_fast_dispatch(const Message* m) const override {
+      return m->get_type() == CEPH_MSG_PING;
+    }
+    void ms_fast_dispatch(Message* m) override {
+      m->get_connection()->send_message(new MPing);
+      m->put();
+      {
+        std::lock_guard lock{mutex};
+        replied = true;
+      }
+      on_reply.notify_one();
+    }
+    bool ms_dispatch(Message*) override {
+      ceph_abort();
+    }
+    bool ms_handle_reset(Connection*) override {
+      return true;
+    }
+    void ms_handle_remote_reset(Connection*) override {
+    }
+    bool ms_handle_refused(Connection*) override {
+      return true;
+    }
+    void echo() {
+      replied = false;
+      std::unique_lock lock{mutex};
+      return on_reply.wait(lock, [this] { return replied; });
+    }
+  } dispatcher;
+  void echo() {
+    dispatcher.echo();
+  }
+};
+
+struct Client {
+  unique_ptr<Messenger> msgr;
+  Client(CephContext *cct)
+    : dispatcher(cct)
+  {
+    msgr.reset(Messenger::create(cct, "async",
+                                 entity_name_t::CLIENT(-1), "ping",
+                                 getpid(), 0));
+    msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
+    msgr->set_default_policy(Messenger::Policy::lossy_client(0));
+  }
+  struct ClientDispatcher : Dispatcher {
+    std::mutex mutex;
+    std::condition_variable on_reply;
+    bool replied = false;
+
+    ClientDispatcher(CephContext* cct)
+      : Dispatcher(cct)
+    {}
+    bool ms_can_fast_dispatch_any() const override {
+      return true;
+    }
+    bool ms_can_fast_dispatch(const Message* m) const override {
+      return m->get_type() == CEPH_MSG_PING;
+    }
+    void ms_fast_dispatch(Message* m) override {
+      m->put();
+      {
+        std::lock_guard lock{mutex};
+        replied = true;
+      }
+      on_reply.notify_one();
+    }
+    bool ms_dispatch(Message*) override {
+      ceph_abort();
+    }
+    bool ms_handle_reset(Connection *) override {
+      return true;
+    }
+    void ms_handle_remote_reset(Connection*) override {
+    }
+    bool ms_handle_refused(Connection*) override {
+      return true;
+    }
+    bool ping(Messenger* msgr, const entity_inst_t& peer) {
+      auto conn = msgr->connect_to(peer.name.type(),
+                                   entity_addrvec_t{peer.addr});
+      replied = false;
+      conn->send_message(new MPing);
+      std::unique_lock lock{mutex};
+      return on_reply.wait_for(lock, 500ms, [&] {
+        return replied;
+      });
+    }
+  } dispatcher;
+  void ping(const entity_inst_t& peer) {
+    dispatcher.ping(msgr.get(), peer);
+  }
+};
+} // namespace native_pingpong
+
+static void ceph_echo(CephContext* cct,
+                      entity_addr_t addr, echo_role role, unsigned count)
+{
+  std::cout << "ceph/";
+  entity_inst_t entity{entity_name_t::OSD(0), addr};
+  if (role == echo_role::as_server) {
+    std::cout << "server listening at " << addr << std::endl;
+    native_pingpong::Server server{cct, entity};
+    server.msgr->bind(addr);
+    server.msgr->add_dispatcher_head(&server.dispatcher);
+    server.msgr->start();
+    for (unsigned i = 0; i < count; i++) {
+      server.echo();
+    }
+    server.msgr->shutdown();
+    server.msgr->wait();
+  } else {
+    std::cout << "client sending to " << addr << std::endl;
+    native_pingpong::Client client{cct};
+    client.msgr->add_dispatcher_head(&client.dispatcher);
+    client.msgr->start();
+    auto conn = client.msgr->connect_to(entity.name.type(),
+                                        entity_addrvec_t{entity.addr});
+    for (unsigned i = 0; i < count; i++) {
+      std::cout << "seq=" << i << std::endl;
+      client.ping(entity);
+    }
+    client.msgr->shutdown();
+    client.msgr->wait();
+  }
+}
+
+int main(int argc, char** argv)
+{
+  namespace po = boost::program_options;
+  po::options_description desc{"Allowed options"};
+  desc.add_options()
+    ("help,h", "show help message")
+    ("role", po::value<std::string>()->default_value("pong"),
+     "role to play (ping | pong)")
+    ("port", po::value<uint16_t>()->default_value(9010),
+     "port #")
+    ("nonce", po::value<uint32_t>()->default_value(42),
+     "a unique number to identify the pong server")
+    ("count", po::value<unsigned>()->default_value(10),
+     "stop after sending/echoing <count> MPing messages")
+    ("v2", po::value<bool>()->default_value(false),
+     "using msgr v2 protocol");
+  po::variables_map vm;
+  std::vector<std::string> unrecognized_options;
+  try {
+    auto parsed = po::command_line_parser(argc, argv)
+      .options(desc)
+      .allow_unregistered()
+      .run();
+    po::store(parsed, vm);
+    if (vm.count("help")) {
+      std::cout << desc << std::endl;
+      return 0;
+    }
+    po::notify(vm);
+    unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional);
+  } catch(const po::error& e) {
+    std::cerr << "error: " << e.what() << std::endl;
+    return 1;
+  }
+
+  entity_addr_t addr;
+  if (vm["v2"].as<bool>()) {
+    addr.set_type(entity_addr_t::TYPE_MSGR2);
+  } else {
+    addr.set_type(entity_addr_t::TYPE_LEGACY);
+  }
+  addr.set_family(AF_INET);
+  addr.set_port(vm["port"].as<std::uint16_t>());
+  addr.set_nonce(vm["nonce"].as<std::uint32_t>());
+
+  echo_role role = echo_role::as_server;
+  if (vm["role"].as<std::string>() == "ping") {
+    role = echo_role::as_client;
+  }
+
+  auto count = vm["count"].as<unsigned>();
+  std::vector<const char*> args(argv, argv + argc);
+  auto cct = global_init(nullptr, args,
+                         CEPH_ENTITY_TYPE_CLIENT,
+                         CODE_ENVIRONMENT_UTILITY,
+                         CINIT_FLAG_NO_MON_CONFIG);
+  common_init_finish(cct.get());
+  ceph_echo(cct.get(), addr, role, count);
+}