From: Yingxin Cheng Date: Mon, 25 Feb 2019 06:32:13 +0000 (+0800) Subject: test/crimson: split async-msgr out of alien_echo X-Git-Tag: v14.1.1~156^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b9b0c61bf73a1d3ecd3b4153021a4a8082639351;p=ceph.git test/crimson: split async-msgr out of alien_echo async-msgr is not compatible with custom-built libs for seastar, so split it. Signed-off-by: Yingxin Cheng --- diff --git a/src/test/crimson/CMakeLists.txt b/src/test/crimson/CMakeLists.txt index 8626b7e6d1d7..69545d5fe826 100644 --- a/src/test/crimson/CMakeLists.txt +++ b/src/test/crimson/CMakeLists.txt @@ -17,7 +17,11 @@ target_link_libraries(perf_crimson_msgr ceph-common crimson) add_executable(unittest_seastar_echo test_alien_echo.cc) -target_link_libraries(unittest_seastar_echo ceph-common global crimson) +target_link_libraries(unittest_seastar_echo crimson) + +add_executable(unittest_async_echo + test_async_echo.cc) +target_link_libraries(unittest_async_echo ceph-common global) add_executable(unittest_seastar_thread_pool test_thread_pool.cc) diff --git a/src/test/crimson/test_alien_echo.cc b/src/test/crimson/test_alien_echo.cc index d9a80966cb01..1dbe81335641 100644 --- a/src/test/crimson/test_alien_echo.cc +++ b/src/test/crimson/test_alien_echo.cc @@ -1,10 +1,7 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- #include "auth/Auth.h" -#include "global/global_init.h" #include "messages/MPing.h" -#include "msg/Dispatcher.h" -#include "msg/Messenger.h" #include "crimson/net/Connection.h" #include "crimson/net/Dispatcher.h" #include "crimson/net/Messenger.h" @@ -98,125 +95,6 @@ struct Client { }; } // namespace seastar_pingpong -namespace native_pingpong { - -constexpr int CEPH_OSD_PROTOCOL = 10; - -struct Server { - Server(CephContext* cct, const entity_inst_t& entity) - : dispatcher(cct) - { - msgr.reset(Messenger::create(cct, "async", - entity.name, "pong", entity.addr.get_nonce(), 0)); - msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL); - msgr->set_default_policy(Messenger::Policy::stateless_server(0)); - } - unique_ptr msgr; - struct ServerDispatcher : Dispatcher { - std::mutex mutex; - std::condition_variable on_reply; - bool replied = false; - ServerDispatcher(CephContext* cct) - : Dispatcher(cct) - {} - bool ms_can_fast_dispatch_any() const override { - return true; - } - bool ms_can_fast_dispatch(const Message* m) const override { - return m->get_type() == CEPH_MSG_PING; - } - void ms_fast_dispatch(Message* m) override { - m->get_connection()->send_message(new MPing); - m->put(); - { - std::lock_guard lock{mutex}; - replied = true; - } - on_reply.notify_one(); - } - bool ms_dispatch(Message*) override { - ceph_abort(); - } - bool ms_handle_reset(Connection*) override { - return true; - } - void ms_handle_remote_reset(Connection*) override { - } - bool ms_handle_refused(Connection*) override { - return true; - } - void echo() { - replied = false; - std::unique_lock lock{mutex}; - return on_reply.wait(lock, [this] { return replied; }); - } - } dispatcher; - void echo() { - dispatcher.echo(); - } -}; - -struct Client { - unique_ptr msgr; - Client(CephContext *cct) - : dispatcher(cct) - { - msgr.reset(Messenger::create(cct, "async", - entity_name_t::CLIENT(-1), "ping", - getpid(), 0)); - msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL); - msgr->set_default_policy(Messenger::Policy::lossy_client(0)); - } - struct ClientDispatcher : Dispatcher { - std::mutex mutex; - std::condition_variable on_reply; - bool replied = false; - - ClientDispatcher(CephContext* cct) - : Dispatcher(cct) - {} - bool ms_can_fast_dispatch_any() const override { - return true; - } - bool ms_can_fast_dispatch(const Message* m) const override { - return m->get_type() == CEPH_MSG_PING; - } - void ms_fast_dispatch(Message* m) override { - m->put(); - { - std::lock_guard lock{mutex}; - replied = true; - } - on_reply.notify_one(); - } - bool ms_dispatch(Message*) override { - ceph_abort(); - } - bool ms_handle_reset(Connection *) override { - return true; - } - void ms_handle_remote_reset(Connection*) override { - } - bool ms_handle_refused(Connection*) override { - return true; - } - bool ping(Messenger* msgr, const entity_inst_t& peer) { - auto conn = msgr->connect_to(peer.name.type(), - entity_addrvec_t{peer.addr}); - replied = false; - conn->send_message(new MPing); - std::unique_lock lock{mutex}; - return on_reply.wait_for(lock, 500ms, [&] { - return replied; - }); - } - } dispatcher; - void ping(const entity_inst_t& peer) { - dispatcher.ping(msgr.get(), peer); - } -}; -} // namespace native_pingpong - class SeastarContext { seastar::file_desc begin_fd; ceph::thread::Condition on_end; @@ -324,38 +202,6 @@ seastar_echo(const entity_addr_t addr, echo_role role, unsigned count) } } -static void ceph_echo(CephContext* cct, - entity_addr_t addr, echo_role role, unsigned count) -{ - std::cout << "ceph/"; - entity_inst_t entity{entity_name_t::OSD(0), addr}; - if (role == echo_role::as_server) { - std::cout << "server listening at " << addr << std::endl; - native_pingpong::Server server{cct, entity}; - server.msgr->bind(addr); - server.msgr->add_dispatcher_head(&server.dispatcher); - server.msgr->start(); - for (unsigned i = 0; i < count; i++) { - server.echo(); - } - server.msgr->shutdown(); - server.msgr->wait(); - } else { - std::cout << "client sending to " << addr << std::endl; - native_pingpong::Client client{cct}; - client.msgr->add_dispatcher_head(&client.dispatcher); - client.msgr->start(); - auto conn = client.msgr->connect_to(entity.name.type(), - entity_addrvec_t{entity.addr}); - for (unsigned i = 0; i < count; i++) { - std::cout << "seq=" << i << std::endl; - client.ping(entity); - } - client.msgr->shutdown(); - client.msgr->wait(); - } -} - int main(int argc, char** argv) { namespace po = boost::program_options; @@ -364,8 +210,6 @@ int main(int argc, char** argv) ("help,h", "show help message") ("role", po::value()->default_value("pong"), "role to play (ping | pong)") - ("test", po::value()->default_value("seastar"), - "messenger to use (seastar | ceph)") ("port", po::value()->default_value(9010), "port #") ("nonce", po::value()->default_value(42), @@ -409,33 +253,23 @@ int main(int argc, char** argv) } auto count = vm["count"].as(); - if (vm["test"].as() == "seastar") { - seastar::app_template app; - SeastarContext sc; - auto job = sc.with_seastar([&] { - auto fut = seastar::alien::submit_to(0, [addr, role, count] { - return seastar_echo(addr, role, count); - }); - fut.wait(); + seastar::app_template app; + SeastarContext sc; + auto job = sc.with_seastar([&] { + auto fut = seastar::alien::submit_to(0, [addr, role, count] { + return seastar_echo(addr, role, count); }); - std::vector av{argv[0]}; - std::transform(begin(unrecognized_options), - end(unrecognized_options), - std::back_inserter(av), - [](auto& s) { - return const_cast(s.c_str()); - }); - sc.run(app, av.size(), av.data()); - job.join(); - } else { - std::vector args(argv, argv + argc); - auto cct = global_init(nullptr, args, - CEPH_ENTITY_TYPE_CLIENT, - CODE_ENVIRONMENT_UTILITY, - CINIT_FLAG_NO_MON_CONFIG); - common_init_finish(cct.get()); - ceph_echo(cct.get(), addr, role, count); - } + fut.wait(); + }); + std::vector av{argv[0]}; + std::transform(begin(unrecognized_options), + end(unrecognized_options), + std::back_inserter(av), + [](auto& s) { + return const_cast(s.c_str()); + }); + sc.run(app, av.size(), av.data()); + job.join(); } /* diff --git a/src/test/crimson/test_async_echo.cc b/src/test/crimson/test_async_echo.cc new file mode 100644 index 000000000000..fef4b4946da6 --- /dev/null +++ b/src/test/crimson/test_async_echo.cc @@ -0,0 +1,226 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- + +#include +#include + +#include "auth/Auth.h" +#include "global/global_init.h" +#include "messages/MPing.h" +#include "msg/Dispatcher.h" +#include "msg/Messenger.h" + +enum class echo_role { + as_server, + as_client, +}; + +namespace native_pingpong { + +constexpr int CEPH_OSD_PROTOCOL = 10; + +struct Server { + Server(CephContext* cct, const entity_inst_t& entity) + : dispatcher(cct) + { + msgr.reset(Messenger::create(cct, "async", + entity.name, "pong", entity.addr.get_nonce(), 0)); + msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL); + msgr->set_default_policy(Messenger::Policy::stateless_server(0)); + } + unique_ptr msgr; + struct ServerDispatcher : Dispatcher { + std::mutex mutex; + std::condition_variable on_reply; + bool replied = false; + ServerDispatcher(CephContext* cct) + : Dispatcher(cct) + {} + bool ms_can_fast_dispatch_any() const override { + return true; + } + bool ms_can_fast_dispatch(const Message* m) const override { + return m->get_type() == CEPH_MSG_PING; + } + void ms_fast_dispatch(Message* m) override { + m->get_connection()->send_message(new MPing); + m->put(); + { + std::lock_guard lock{mutex}; + replied = true; + } + on_reply.notify_one(); + } + bool ms_dispatch(Message*) override { + ceph_abort(); + } + bool ms_handle_reset(Connection*) override { + return true; + } + void ms_handle_remote_reset(Connection*) override { + } + bool ms_handle_refused(Connection*) override { + return true; + } + void echo() { + replied = false; + std::unique_lock lock{mutex}; + return on_reply.wait(lock, [this] { return replied; }); + } + } dispatcher; + void echo() { + dispatcher.echo(); + } +}; + +struct Client { + unique_ptr msgr; + Client(CephContext *cct) + : dispatcher(cct) + { + msgr.reset(Messenger::create(cct, "async", + entity_name_t::CLIENT(-1), "ping", + getpid(), 0)); + msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL); + msgr->set_default_policy(Messenger::Policy::lossy_client(0)); + } + struct ClientDispatcher : Dispatcher { + std::mutex mutex; + std::condition_variable on_reply; + bool replied = false; + + ClientDispatcher(CephContext* cct) + : Dispatcher(cct) + {} + bool ms_can_fast_dispatch_any() const override { + return true; + } + bool ms_can_fast_dispatch(const Message* m) const override { + return m->get_type() == CEPH_MSG_PING; + } + void ms_fast_dispatch(Message* m) override { + m->put(); + { + std::lock_guard lock{mutex}; + replied = true; + } + on_reply.notify_one(); + } + bool ms_dispatch(Message*) override { + ceph_abort(); + } + bool ms_handle_reset(Connection *) override { + return true; + } + void ms_handle_remote_reset(Connection*) override { + } + bool ms_handle_refused(Connection*) override { + return true; + } + bool ping(Messenger* msgr, const entity_inst_t& peer) { + auto conn = msgr->connect_to(peer.name.type(), + entity_addrvec_t{peer.addr}); + replied = false; + conn->send_message(new MPing); + std::unique_lock lock{mutex}; + return on_reply.wait_for(lock, 500ms, [&] { + return replied; + }); + } + } dispatcher; + void ping(const entity_inst_t& peer) { + dispatcher.ping(msgr.get(), peer); + } +}; +} // namespace native_pingpong + +static void ceph_echo(CephContext* cct, + entity_addr_t addr, echo_role role, unsigned count) +{ + std::cout << "ceph/"; + entity_inst_t entity{entity_name_t::OSD(0), addr}; + if (role == echo_role::as_server) { + std::cout << "server listening at " << addr << std::endl; + native_pingpong::Server server{cct, entity}; + server.msgr->bind(addr); + server.msgr->add_dispatcher_head(&server.dispatcher); + server.msgr->start(); + for (unsigned i = 0; i < count; i++) { + server.echo(); + } + server.msgr->shutdown(); + server.msgr->wait(); + } else { + std::cout << "client sending to " << addr << std::endl; + native_pingpong::Client client{cct}; + client.msgr->add_dispatcher_head(&client.dispatcher); + client.msgr->start(); + auto conn = client.msgr->connect_to(entity.name.type(), + entity_addrvec_t{entity.addr}); + for (unsigned i = 0; i < count; i++) { + std::cout << "seq=" << i << std::endl; + client.ping(entity); + } + client.msgr->shutdown(); + client.msgr->wait(); + } +} + +int main(int argc, char** argv) +{ + namespace po = boost::program_options; + po::options_description desc{"Allowed options"}; + desc.add_options() + ("help,h", "show help message") + ("role", po::value()->default_value("pong"), + "role to play (ping | pong)") + ("port", po::value()->default_value(9010), + "port #") + ("nonce", po::value()->default_value(42), + "a unique number to identify the pong server") + ("count", po::value()->default_value(10), + "stop after sending/echoing MPing messages") + ("v2", po::value()->default_value(false), + "using msgr v2 protocol"); + po::variables_map vm; + std::vector unrecognized_options; + try { + auto parsed = po::command_line_parser(argc, argv) + .options(desc) + .allow_unregistered() + .run(); + po::store(parsed, vm); + if (vm.count("help")) { + std::cout << desc << std::endl; + return 0; + } + po::notify(vm); + unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional); + } catch(const po::error& e) { + std::cerr << "error: " << e.what() << std::endl; + return 1; + } + + entity_addr_t addr; + if (vm["v2"].as()) { + addr.set_type(entity_addr_t::TYPE_MSGR2); + } else { + addr.set_type(entity_addr_t::TYPE_LEGACY); + } + addr.set_family(AF_INET); + addr.set_port(vm["port"].as()); + addr.set_nonce(vm["nonce"].as()); + + echo_role role = echo_role::as_server; + if (vm["role"].as() == "ping") { + role = echo_role::as_client; + } + + auto count = vm["count"].as(); + std::vector args(argv, argv + argc); + auto cct = global_init(nullptr, args, + CEPH_ENTITY_TYPE_CLIENT, + CODE_ENVIRONMENT_UTILITY, + CINIT_FLAG_NO_MON_CONFIG); + common_init_finish(cct.get()); + ceph_echo(cct.get(), addr, role, count); +}