});
}
+static seastar::future<> test_concurrent_dispatch()
+{
+ struct test_state {
+ entity_addr_t addr;
+
+ struct {
+ ceph::net::SocketMessenger messenger{entity_name_t::OSD(1)};
+ class ServerDispatcher : public ceph::net::Dispatcher {
+ int count = 0;
+ seastar::promise<> on_second; // satisfied on second dispatch
+ seastar::promise<> on_done; // satisfied when first dispatch unblocks
+ public:
+ seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+ MessageRef m) override {
+ switch (++count) {
+ case 1:
+ // block on the first request until we reenter with the second
+ return on_second.get_future().then([=] { on_done.set_value(); });
+ case 2:
+ on_second.set_value();
+ return seastar::now();
+ default:
+ throw std::runtime_error("unexpected count");
+ }
+ }
+ seastar::future<> wait() { return on_done.get_future(); }
+ } dispatcher;
+ } server;
+
+ struct {
+ ceph::net::SocketMessenger messenger{entity_name_t::OSD(0)};
+ ceph::net::Dispatcher dispatcher;
+ } client;
+ };
+ return seastar::do_with(test_state{},
+ [] (test_state& t) {
+ // bind the server
+ t.addr.set_family(AF_INET);
+ t.addr.set_port(9010);
+ t.server.messenger.bind(t.addr);
+
+ return t.server.messenger.start(&t.server.dispatcher)
+ .then([&] {
+ return t.client.messenger.start(&t.client.dispatcher)
+ .then([&] {
+ return t.client.messenger.connect(t.addr,
+ entity_name_t::TYPE_OSD);
+ }).then([] (ceph::net::ConnectionRef conn) {
+ // send two messages
+ conn->send(MessageRef{new MPing, false});
+ conn->send(MessageRef{new MPing, false});
+ }).then([&] {
+ // wait for the server to get both
+ return t.server.dispatcher.wait();
+ }).finally([&] {
+ return t.client.messenger.shutdown();
+ });
+ }).finally([&] {
+ return t.server.messenger.shutdown();
+ });
+ });
+}
+
int main(int argc, char** argv)
{
seastar::app_template app;
verbose = config["verbose"].as<bool>();
auto rounds = config["rounds"].as<unsigned>();
auto keepalive_ratio = config["keepalive-ratio"].as<double>();
- return test_echo(rounds, keepalive_ratio).then([] {
+ return test_echo(rounds, keepalive_ratio)
+ .then([] {
+ return test_concurrent_dispatch();
+ }).then([] {
std::cout << "All tests succeeded" << std::endl;
}).handle_exception([] (auto eptr) {
std::cout << "Test failure" << std::endl;