From 5da94738f6058251987132d69ceda8e1cee1fcf5 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Tue, 6 Mar 2018 20:38:03 +0800 Subject: [PATCH] test/crimson: add test_alien_echo Signed-off-by: Kefu Chai --- src/test/crimson/CMakeLists.txt | 9 + src/test/crimson/test_alien_echo.cc | 441 ++++++++++++++++++++++++++++ 2 files changed, 450 insertions(+) create mode 100644 src/test/crimson/test_alien_echo.cc diff --git a/src/test/crimson/CMakeLists.txt b/src/test/crimson/CMakeLists.txt index 691db771fec..61bd791b869 100644 --- a/src/test/crimson/CMakeLists.txt +++ b/src/test/crimson/CMakeLists.txt @@ -20,3 +20,12 @@ set(test_messenger_srcs add_executable(unittest_seastar_messenger ${test_messenger_srcs}) add_ceph_unittest(unittest_seastar_messenger) target_link_libraries(unittest_seastar_messenger ceph-common Seastar::seastar) + +set(test_alien_echo_srcs + test_alien_echo.cc + $ + $ + $) +add_executable(unittest_seastar_echo ${test_alien_echo_srcs}) +add_ceph_unittest(unittest_seastar_echo) +target_link_libraries(unittest_seastar_echo ceph-common global Seastar::seastar) diff --git a/src/test/crimson/test_alien_echo.cc b/src/test/crimson/test_alien_echo.cc new file mode 100644 index 00000000000..b06bc686a17 --- /dev/null +++ b/src/test/crimson/test_alien_echo.cc @@ -0,0 +1,441 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- + +#include "auth/Auth.h" +#include "global/global_init.h" +#include "messages/MPing.h" +#include "msg/Dispatcher.h" +#include "msg/Messenger.h" +#include "crimson/net/Connection.h" +#include "crimson/net/Dispatcher.h" +#include "crimson/net/SocketMessenger.h" +#include "crimson/net/Config.h" +#include "crimson/thread/Condition.h" +#include "crimson/thread/Throttle.h" + +#include +#include +#include +#include + + +enum class echo_role { + as_server, + as_client, +}; + +namespace seastar_pingpong { +struct DummyAuthAuthorizer : public AuthAuthorizer { + DummyAuthAuthorizer() + : AuthAuthorizer(CEPH_AUTH_CEPHX) + {} + bool verify_reply(bufferlist::const_iterator&) override { + return true; + } +}; + +struct Server { + ceph::thread::Throttle byte_throttler; + static constexpr int64_t server_num = 0; + ceph::net::SocketMessenger msgr{entity_name_t::OSD(server_num)}; + struct ServerDispatcher : ceph::net::Dispatcher { + unsigned count = 0; + seastar::condition_variable on_reply; + seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, + MessageRef m) override { + std::cout << "server got ping " << *m << std::endl; + // reply with a pong + return c->send(MessageRef{new MPing(), false}).then([this] { + ++count; + on_reply.signal(); + }); + } + seastar::future + ms_verify_authorizer(peer_type_t peer_type, + auth_proto_t protocol, + bufferlist& auth) override { + return seastar::make_ready_future( + 0, bufferlist{}); + } + seastar::future> + ms_get_authorizer(peer_type_t, bool) override { + return seastar::make_ready_future>( + new DummyAuthAuthorizer{}); + } + } dispatcher; + Server() + : byte_throttler(ceph::net::conf.osd_client_message_size_cap) + { + msgr.set_crc_header(); + msgr.set_crc_data(); + } +}; + +struct Client { + ceph::thread::Throttle byte_throttler; + static constexpr int64_t client_num = 1; + ceph::net::SocketMessenger msgr{entity_name_t::OSD(client_num)}; + struct ClientDispatcher : ceph::net::Dispatcher { + unsigned count = 0; + seastar::condition_variable on_reply; + seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, + MessageRef m) override { + std::cout << "client got pong " << *m << std::endl; + ++count; + on_reply.signal(); + return seastar::now(); + } + } dispatcher; + Client() + : byte_throttler(ceph::net::conf.osd_client_message_size_cap) + { + msgr.set_crc_header(); + msgr.set_crc_data(); + } +}; +} // namespace seastar_pingpong + +namespace native_pingpong { + +constexpr int CEPH_OSD_PROTOCOL = 10; + +struct Server { + Server(CephContext* cct, const entity_inst_t& entity) + : dispatcher(cct) + { + msgr.reset(Messenger::create(cct, cct->_conf->get_val("ms_type"), + entity.name, "pong", entity.addr.get_nonce(), 0)); + msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL); + msgr->set_default_policy(Messenger::Policy::stateless_server(0)); + } + unique_ptr msgr; + struct ServerDispatcher : Dispatcher { + std::mutex mutex; + std::condition_variable on_reply; + bool replied = false; + ServerDispatcher(CephContext* cct) + : Dispatcher(cct) + {} + bool ms_can_fast_dispatch_any() const override { + return true; + } + bool ms_can_fast_dispatch(const Message* m) const override { + return m->get_type() == CEPH_MSG_PING; + } + void ms_fast_dispatch(Message* m) override { + m->get_connection()->send_message(new MPing); + m->put(); + { + std::lock_guard lock{mutex}; + replied = true; + } + on_reply.notify_one(); + } + bool ms_verify_authorizer(Connection *con, int peer_type, int protocol, + bufferlist& authorizer, + bufferlist& authorizer_reply, + bool& isvalid, CryptoKey& session_key) override { + isvalid = true; + return true; + } + bool ms_dispatch(Message*) override { + ceph_abort(); + } + bool ms_handle_reset(Connection*) override { + return true; + } + void ms_handle_remote_reset(Connection*) override { + } + bool ms_handle_refused(Connection*) override { + return true; + } + void echo() { + replied = false; + std::unique_lock lock{mutex}; + return on_reply.wait(lock, [this] { return replied; }); + } + } dispatcher; + void echo() { + dispatcher.echo(); + } +}; + +struct Client { + unique_ptr msgr; + Client(CephContext *cct) + : dispatcher(cct) + { + msgr.reset(Messenger::create(cct, cct->_conf->get_val("ms_type"), + entity_name_t::CLIENT(-1), "ping", + getpid(), 0)); + msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL); + msgr->set_default_policy(Messenger::Policy::lossy_client(0)); + } + struct ClientDispatcher : Dispatcher { + std::mutex mutex; + std::condition_variable on_reply; + bool replied = false; + + ClientDispatcher(CephContext* cct) + : Dispatcher(cct) + {} + bool ms_can_fast_dispatch_any() const override { + return true; + } + bool ms_can_fast_dispatch(const Message* m) const override { + return m->get_type() == CEPH_MSG_PING; + } + void ms_fast_dispatch(Message* m) override { + m->put(); + { + std::lock_guard lock{mutex}; + replied = true; + } + on_reply.notify_one(); + } + bool ms_verify_authorizer(Connection *con, int peer_type, int protocol, + bufferlist& authorizer, + bufferlist& authorizer_reply, + bool& isvalid, CryptoKey& session_key) override { + isvalid = true; + return true; + } + bool ms_dispatch(Message*) override { + ceph_abort(); + } + bool ms_handle_reset(Connection *) override { + return true; + } + void ms_handle_remote_reset(Connection*) override { + } + bool ms_handle_refused(Connection*) override { + return true; + } + bool ping(Messenger* msgr, const entity_inst_t& peer) { + auto conn = msgr->get_connection(peer); + replied = false; + conn->send_message(new MPing); + std::unique_lock lock{mutex}; + return on_reply.wait_for(lock, 500ms, [&] { + return replied; + }); + } + } dispatcher; + void ping(const entity_inst_t& peer) { + dispatcher.ping(msgr.get(), peer); + } +}; +} // namespace native_pingpong + +class SeastarContext { + seastar::file_desc begin_fd; + ceph::thread::Condition on_end; + +public: + SeastarContext() + : begin_fd{seastar::file_desc::eventfd(0, 0)} + {} + + template + std::thread with_seastar(Func&& func) { + return std::thread{[this, func = std::forward(func)] { + // alien: are you ready? + wait_for_seastar(); + // alien: could you help me apply(func)? + func(); + // alien: i've sent my request. have you replied it? + // wait_for_seastar(); + // alien: you are free to go! + on_end.notify(); + }}; + } + + void run(seastar::app_template& app, int argc, char** argv) { + app.run(argc, argv, [this] { + return seastar::now().then([this] { + return set_seastar_ready(); + }).then([this] { + // seastar: let me know once i am free to leave. + return on_end.wait(); + }).handle_exception([](auto ep) { + std::cerr << "Error: " << ep << std::endl; + }).finally([] { + seastar::engine().exit(0); + }); + }); + } + + seastar::future<> set_seastar_ready() { + // seastar: i am ready to serve! + ::eventfd_write(begin_fd.get(), 1); + return seastar::now(); + } + +private: + void wait_for_seastar() { + eventfd_t result = 0; + if (int r = ::eventfd_read(begin_fd.get(), &result); r < 0) { + std::cerr << "unable to eventfd_read():" << errno << std::endl; + } + } +}; + +static seastar::future<> +seastar_echo(SeastarContext& sc, + const entity_addr_t& addr, echo_role role, unsigned count) +{ + std::cout << "seastar/"; + if (role == echo_role::as_server) { + return seastar::do_with(seastar_pingpong::Server{}, + [&addr, count](auto& server) mutable { + std::cout << "server listening at " << addr << std::endl; + // bind the server + server.msgr.set_policy_throttler(entity_name_t::TYPE_OSD, + &server.byte_throttler); + server.msgr.bind(addr); + return server.msgr.start(&server.dispatcher) + .then([&dispatcher=server.dispatcher, count] { + return dispatcher.on_reply.wait([&dispatcher, count] { + return dispatcher.count >= count; + }); + }).finally([&server] { + std::cout << "server shutting down" << std::endl; + return server.msgr.shutdown(); + }); + }); + } else { + return seastar::do_with(seastar_pingpong::Client{}, + [&addr, count](auto& client) { + std::cout << "client sending to " << addr << std::endl; + client.msgr.set_policy_throttler(entity_name_t::TYPE_OSD, + &client.byte_throttler); + return client.msgr.start(&client.dispatcher) + .then([&] { + return client.msgr.connect(addr, entity_name_t::TYPE_OSD); + }).then([&disp=client.dispatcher, count](ceph::net::ConnectionRef conn) { + return seastar::do_until( + [&disp,count] { return disp.count >= count; }, + [&disp,conn] { return conn->send(MessageRef{new MPing(), false}) + .then([&] { return disp.on_reply.wait(); }); + }); + }).finally([&client] { + std::cout << "client shutting down" << std::endl; + return client.msgr.shutdown(); + }); + }); + } +} + +static void ceph_echo(CephContext* cct, + entity_addr_t addr, echo_role role, unsigned count) +{ + std::cout << "ceph/"; + entity_inst_t entity{entity_name_t::OSD(0), addr}; + if (role == echo_role::as_server) { + std::cout << "server listening at " << addr << std::endl; + native_pingpong::Server server{cct, entity}; + server.msgr->bind(addr); + server.msgr->add_dispatcher_head(&server.dispatcher); + server.msgr->start(); + for (unsigned i = 0; i < count; i++) { + server.echo(); + } + server.msgr->shutdown(); + server.msgr->wait(); + } else { + std::cout << "client sending to " << addr << std::endl; + native_pingpong::Client client{cct}; + client.msgr->add_dispatcher_head(&client.dispatcher); + client.msgr->start(); + auto conn = client.msgr->get_connection(entity); + for (unsigned i = 0; i < count; i++) { + std::cout << "seq=" << i << std::endl; + client.ping(entity); + } + client.msgr->shutdown(); + client.msgr->wait(); + } +} + +int main(int argc, char** argv) +{ + namespace po = boost::program_options; + po::options_description desc{"Allowed options"}; + desc.add_options() + ("help,h", "show help message") + ("role", po::value()->default_value("pong"), + "role to play (ping | pong)") + ("test", po::value()->default_value("seastar"), + "messenger to use (seastar | ceph)") + ("port", po::value()->default_value(9010), + "port #") + ("nonce", po::value()->default_value(42), + "a unique number to identify the pong server") + ("count", po::value()->default_value(10), + "stop after sending/echoing MPing messages"); + po::variables_map vm; + std::vector unrecognized_options; + try { + auto parsed = po::command_line_parser(argc, argv) + .options(desc) + .allow_unregistered() + .run(); + po::store(parsed, vm); + if (vm.count("help")) { + std::cout << desc << std::endl; + return 0; + } + po::notify(vm); + unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional); + } catch(const po::error& e) { + std::cerr << "error: " << e.what() << std::endl; + return 1; + } + + entity_addr_t addr; + addr.set_type(addr.TYPE_DEFAULT); + addr.set_family(AF_INET); + addr.set_port(vm["port"].as()); + addr.set_nonce(vm["nonce"].as()); + + echo_role role = echo_role::as_server; + if (vm["role"].as() == "ping") { + role = echo_role::as_client; + } + + auto count = vm["count"].as(); + if (vm["test"].as() == "seastar") { + seastar::app_template app; + SeastarContext sc; + auto job = sc.with_seastar([&] { + auto fut = seastar::alien::submit_to(0, [&sc, &addr, role, count] { + return seastar_echo(sc, addr, role, count); + }); + fut.wait(); + }); + std::vector av{argv[0]}; + std::transform(begin(unrecognized_options), + end(unrecognized_options), + std::back_inserter(av), + [](auto s) { + return const_cast(s.c_str()); + }); + sc.run(app, av.size(), av.data()); + job.join(); + } else { + std::vector args(argv, argv + argc); + auto cct = global_init(nullptr, args, + CEPH_ENTITY_TYPE_CLIENT, + CODE_ENVIRONMENT_UTILITY, + CINIT_FLAG_NO_MON_CONFIG); + common_init_finish(cct.get()); + ceph_echo(cct.get(), addr, role, count); + } +} + +/* + * Local Variables: + * compile-command: "make -j4 \ + * -C ../../../build \ + * unittest_seastar_echo" + * End: + */ -- 2.47.3