]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
test/crimson: inject Socket layer delays/failures msgr utest 44261/head
authorMatan Breizman <mbreizma@redhat.com>
Mon, 29 Nov 2021 14:21:40 +0000 (14:21 +0000)
committerMatan Breizman <mbreizma@redhat.com>
Sun, 6 Feb 2022 08:45:49 +0000 (08:45 +0000)
Signed-off-by: Matan Breizman <mbreizma@redhat.com>
src/crimson/net/Socket.cc
src/crimson/net/Socket.h
src/test/crimson/CMakeLists.txt
src/test/crimson/test_messenger_thrash.cc [new file with mode: 0644]
src/test/crimson/test_socket.cc

index 642bda4921f08439d2f11b9a51a7e1afcaebe4bf..d3139c928a803838aac0f678947ea8c86167aedd 100644 (file)
@@ -8,6 +8,8 @@
 #include "crimson/common/log.h"
 #include "Errors.h"
 
+using crimson::common::local_conf;
+
 namespace crimson::net {
 
 namespace {
@@ -72,7 +74,10 @@ seastar::future<bufferlist> Socket::read(size_t bytes)
       if (r.remaining) { // throw on short reads
         throw std::system_error(make_error_code(error::read_eof));
       }
-      return seastar::make_ready_future<bufferlist>(std::move(r.buffer));
+      inject_failure();
+      return inject_delay().then([this] {
+        return seastar::make_ready_future<bufferlist>(std::move(r.buffer));
+      });
     });
 #ifdef UNIT_TESTS_BUILT
   }).then([this] (auto buf) {
@@ -96,7 +101,11 @@ Socket::read_exactly(size_t bytes) {
       if (buf.size() < bytes) {
         throw std::system_error(make_error_code(error::read_eof));
       }
-      return seastar::make_ready_future<tmp_buf>(std::move(buf));
+      inject_failure();
+      return inject_delay(
+      ).then([buf = std::move(buf)] () mutable {
+        return seastar::make_ready_future<tmp_buf>(std::move(buf));
+      });
     });
 #ifdef UNIT_TESTS_BUILT
   }).then([this] (auto buf) {
@@ -132,6 +141,7 @@ seastar::future<> Socket::close() {
   closed = true;
 #endif
   return seastar::when_all_succeed(
+    inject_delay(),
     in.close(),
     close_and_handle_errors(out)
   ).then_unpack([] {
@@ -142,6 +152,33 @@ seastar::future<> Socket::close() {
   });
 }
 
+seastar::future<> Socket::inject_delay () {
+  if (float delay_period = local_conf()->ms_inject_internal_delays;
+      delay_period) {
+    logger().debug("{}: sleep for {}",
+                  __func__,
+                  delay_period);
+    return seastar::sleep(
+      std::chrono::milliseconds((int)(delay_period * 1000.0)));
+  }
+  return seastar::now();
+}
+
+void Socket::inject_failure()
+{
+  if (local_conf()->ms_inject_socket_failures) {
+    uint64_t rand =
+      ceph::util::generate_random_number<uint64_t>(1, RAND_MAX);
+      if (rand % local_conf()->ms_inject_socket_failures == 0) {
+      if (true) {
+        logger().warn("{} injecting socket failure", __func__);
+       throw std::system_error(make_error_code(
+         crimson::net::error::negotiation_failure));
+      }
+    }
+  }
+}
+
 #ifdef UNIT_TESTS_BUILT
 seastar::future<> Socket::try_trap_pre(bp_action_t& trap) {
   auto action = trap;
index 98e84a26c637fd49a749323badabb4831ad710d4..a533d3180b2daaa68e876981a0633d83a5ae5677 100644 (file)
@@ -55,8 +55,11 @@ class Socket
 
   static seastar::future<SocketRef>
   connect(const entity_addr_t& peer_addr) {
-    return seastar::connect(peer_addr.in4_addr()
-    ).then([] (seastar::connected_socket socket) {
+    inject_failure();
+    return inject_delay(
+    ).then([peer_addr] {
+      return seastar::connect(peer_addr.in4_addr());
+    }).then([] (seastar::connected_socket socket) {
       return std::make_unique<Socket>(
         std::move(socket), side_t::connector, 0, construct_tag{});
     });
@@ -70,9 +73,14 @@ class Socket
 
   seastar::future<> write(packet&& buf) {
 #ifdef UNIT_TESTS_BUILT
-    return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable {
+    return try_trap_pre(next_trap_write
+    ).then([buf = std::move(buf), this] () mutable {
 #endif
-      return out.write(std::move(buf));
+      inject_failure();
+      return inject_delay(
+      ).then([buf = std::move(buf), this] () mutable {
+        return out.write(std::move(buf));
+      });
 #ifdef UNIT_TESTS_BUILT
     }).then([this] {
       return try_trap_post(next_trap_write);
@@ -80,13 +88,20 @@ class Socket
 #endif
   }
   seastar::future<> flush() {
-    return out.flush();
+    inject_failure();
+    return inject_delay().then([this] {
+      return out.flush();
+    });
   }
   seastar::future<> write_flush(packet&& buf) {
 #ifdef UNIT_TESTS_BUILT
     return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable {
 #endif
-      return out.write(std::move(buf)).then([this] { return out.flush(); });
+      inject_failure();
+      return inject_delay(
+      ).then([buf = std::move(buf), this] () mutable {
+        return out.write(std::move(buf)).then([this] { return out.flush(); });
+      });
 #ifdef UNIT_TESTS_BUILT
     }).then([this] {
       return try_trap_post(next_trap_write);
@@ -100,6 +115,10 @@ class Socket
   /// Socket can only be closed once.
   seastar::future<> close();
 
+  static seastar::future<> inject_delay();
+
+  static void inject_failure();
+
   // shutdown input_stream only, for tests
   void force_shutdown_in() {
     socket.shutdown_input();
index 6c415ffa52b26e8b24457f719e8a436283832303..6e39d338a4d34a342cde1c8f81adbd5731a10a52 100644 (file)
@@ -83,6 +83,11 @@ target_link_libraries(
   unittest-interruptible-future
   crimson-common)
 
+add_executable(unittest-seastar-messenger-thrash test_messenger_thrash.cc)
+add_ceph_unittest(unittest-seastar-messenger-thrash
+  --memory 256M --smp 1)
+target_link_libraries(unittest-seastar-messenger-thrash crimson)
+
 add_subdirectory(seastore)
 
 add_library(crimson-gtest STATIC
diff --git a/src/test/crimson/test_messenger_thrash.cc b/src/test/crimson/test_messenger_thrash.cc
new file mode 100644 (file)
index 0000000..ff75070
--- /dev/null
@@ -0,0 +1,660 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/ceph_argparse.h"
+#include "messages/MPing.h"
+#include "messages/MCommand.h"
+#include "crimson/auth/DummyAuth.h"
+#include "crimson/common/log.h"
+#include "crimson/net/Connection.h"
+#include "crimson/net/Dispatcher.h"
+#include "crimson/net/Messenger.h"
+
+#include <map>
+#include <random>
+#include <fmt/format.h>
+#include <fmt/ostream.h>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/do_with.hh>
+#include <seastar/core/future-util.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/sleep.hh>
+#include <seastar/core/with_timeout.hh>
+
+using namespace std::chrono_literals;
+namespace bpo = boost::program_options;
+using crimson::common::local_conf;
+using payload_seq_t = uint64_t;
+
+struct Payload {
+  enum Who : uint8_t {
+    PING = 0,
+    PONG = 1,
+  };
+  uint8_t who = 0;
+  payload_seq_t seq = 0;
+  bufferlist data;
+
+  Payload(Who who, uint64_t seq, const bufferlist& data)
+    : who(who), seq(seq), data(data)
+  {}
+  Payload() = default;
+  DENC(Payload, v, p) {
+    DENC_START(1, 1, p);
+    denc(v.who, p);
+    denc(v.seq, p);
+    denc(v.data, p);
+    DENC_FINISH(p);
+  }
+};
+WRITE_CLASS_DENC(Payload)
+
+std::ostream& operator<<(std::ostream& out, const Payload &pl)
+{
+  return out << "reply=" << pl.who << " i = " << pl.seq;
+}
+
+
+namespace {
+
+seastar::logger& logger() {
+  return crimson::get_logger(ceph_subsys_test);
+}
+
+std::random_device rd;
+std::default_random_engine rng{rd()};
+std::uniform_int_distribution<> prob(0,99);
+bool verbose = false;
+
+entity_addr_t get_server_addr() {
+  static int port = 16800;
+  ++port;
+  entity_addr_t saddr;
+  saddr.parse("127.0.0.1", nullptr);
+  saddr.set_port(port);
+  return saddr;
+}
+
+uint64_t get_nonce() {
+  static uint64_t nonce = 1;
+  ++nonce;
+  return nonce;
+}
+
+struct thrash_params_t {
+  std::size_t servers;
+  std::size_t clients;
+  std::size_t connections;
+  std::size_t random_op;
+};
+
+class SyntheticWorkload;
+
+class SyntheticDispatcher final
+    : public crimson::net::Dispatcher {
+  public:
+  std::map<crimson::net::ConnectionRef, std::deque<payload_seq_t> > conn_sent;
+  std::map<payload_seq_t, bufferlist> sent;
+  unsigned index;
+  SyntheticWorkload *workload;
+
+  SyntheticDispatcher(bool s, SyntheticWorkload *wl):
+    index(0), workload(wl) {
+  }
+
+  std::optional<seastar::future<>> ms_dispatch(crimson::net::ConnectionRef con,
+                                               MessageRef m) {
+    if (verbose) {
+      logger().warn("{}: con = {}", __func__, con);
+    }
+    // MSG_COMMAND is used to disorganize regular message flow
+    if (m->get_type() == MSG_COMMAND) {
+      return seastar::now();
+    }
+
+    Payload pl;
+    auto p = m->get_data().cbegin();
+    decode(pl, p);
+    if (pl.who == Payload::PING) {
+      logger().info(" {} conn= {} {}", __func__,
+        m->get_connection(), pl);
+      return reply_message(m, pl);
+    } else {
+      ceph_assert(pl.who == Payload::PONG);
+      if (sent.count(pl.seq)) {
+        logger().info(" {} conn= {} {}", __func__,
+          m->get_connection(), pl);
+        ceph_assert(conn_sent[m->get_connection()].front() == pl.seq);
+        ceph_assert(pl.data.contents_equal(sent[pl.seq]));
+        conn_sent[m->get_connection()].pop_front();
+        sent.erase(pl.seq);
+      }
+
+      return seastar::now();
+    }
+  }
+
+  void ms_handle_accept(crimson::net::ConnectionRef conn) {
+    logger().info("{} - Connection:{}", __func__, conn);
+  }
+
+  void ms_handle_connect(crimson::net::ConnectionRef conn) {
+    logger().info("{} - Connection:{}", __func__, conn);
+  }
+
+  void ms_handle_reset(crimson::net::ConnectionRef con, bool is_replace);
+
+  void ms_handle_remote_reset(crimson::net::ConnectionRef con) {
+    clear_pending(con);
+  }
+
+  std::optional<seastar::future<>> reply_message(const MessageRef m, Payload& pl) {
+    pl.who = Payload::PONG;
+    bufferlist bl;
+    encode(pl, bl);
+    auto rm = crimson::make_message<MPing>();
+    rm->set_data(bl);
+    if (verbose) {
+      logger().info("{} conn= {} reply i= {}",
+        __func__, m->get_connection(), pl.seq);
+    }
+    return m->get_connection()->send(std::move(rm));
+  }
+
+  seastar::future<> send_message_wrap(crimson::net::ConnectionRef con,
+                                      const bufferlist& data) {
+    auto m = crimson::make_message<MPing>();
+    Payload pl{Payload::PING, index++, data};
+    bufferlist bl;
+    encode(pl, bl);
+    m->set_data(bl);
+    sent[pl.seq] = pl.data;
+    conn_sent[con].push_back(pl.seq);
+    logger().info("{} conn= {} send i= {}",
+      __func__, con, pl.seq);
+
+    return con->send(std::move(m));
+  }
+
+  uint64_t get_num_pending_msgs() {
+    return sent.size();
+  }
+
+  void clear_pending(crimson::net::ConnectionRef con) {
+    for (std::deque<uint64_t>::iterator it = conn_sent[con].begin();
+         it != conn_sent[con].end(); ++it)
+      sent.erase(*it);
+    conn_sent.erase(con);
+  }
+
+  void print() {
+    for (auto && [conn, list] : conn_sent) {
+      if (!list.empty()) {
+        logger().info("{} {} wait {}", __func__,
+                      conn, list.size());
+      }
+    }
+  }
+};
+
+class SyntheticWorkload {
+  std::set<crimson::net::MessengerRef> available_servers;
+  std::set<crimson::net::MessengerRef> available_clients;
+  crimson::net::SocketPolicy server_policy;
+  crimson::net::SocketPolicy client_policy;
+  std::map<crimson::net::ConnectionRef,
+    std::pair<crimson::net::MessengerRef,
+    crimson::net::MessengerRef>> available_connections;
+  SyntheticDispatcher dispatcher;
+  std::vector<bufferlist> rand_data;
+  crimson::auth::DummyAuthClientServer dummy_auth;
+
+  seastar::future<crimson::net::ConnectionRef> get_random_connection() {
+    return seastar::do_until(
+      [this] { return dispatcher.get_num_pending_msgs() <= max_in_flight; },
+      [] { return seastar::sleep(100ms); }
+    ).then([this] {
+      boost::uniform_int<> choose(0, available_connections.size() - 1);
+      int index = choose(rng);
+      std::map<crimson::net::ConnectionRef,
+        std::pair<crimson::net::MessengerRef, crimson::net::MessengerRef>>::iterator i
+        = available_connections.begin();
+      for (; index > 0; --index, ++i) ;
+      return seastar::make_ready_future<crimson::net::ConnectionRef>(i->first);
+   });
+  }
+
+ public:
+   const unsigned min_connections = 10;
+   const unsigned max_in_flight = 64;
+   const unsigned max_connections = 128;
+   const unsigned max_message_len = 1024 * 1024 * 4;
+   const uint64_t  servers, clients;
+
+   SyntheticWorkload(int servers, int clients, int random_num,
+                     crimson::net::SocketPolicy srv_policy,
+                     crimson::net::SocketPolicy cli_policy)
+     : server_policy(srv_policy),
+       client_policy(cli_policy),
+       dispatcher(false, this),
+       servers(servers),
+       clients(clients) {
+
+     for (int i = 0; i < random_num; i++) {
+       bufferlist bl;
+       boost::uniform_int<> u(32, max_message_len);
+       uint64_t value_len = u(rng);
+       bufferptr bp(value_len);
+       bp.zero();
+       for (uint64_t j = 0; j < value_len-sizeof(i); ) {
+         memcpy(bp.c_str()+j, &i, sizeof(i));
+         j += 4096;
+       }
+
+       bl.append(bp);
+       rand_data.push_back(bl);
+     }
+   }
+
+
+   bool can_create_connection() {
+     return available_connections.size() < max_connections;
+   }
+
+   seastar::future<> maybe_generate_connection() {
+     if (!can_create_connection()) {
+       return seastar::now();
+     }
+     crimson::net::MessengerRef server, client;
+     {
+       boost::uniform_int<> choose(0, available_servers.size() - 1);
+       int index = choose(rng);
+       std::set<crimson::net::MessengerRef>::iterator i
+         = available_servers.begin();
+       for (; index > 0; --index, ++i) ;
+       server = *i;
+     }
+     {
+       boost::uniform_int<> choose(0, available_clients.size() - 1);
+       int index = choose(rng);
+       std::set<crimson::net::MessengerRef>::iterator i
+         = available_clients.begin();
+       for (; index > 0; --index, ++i) ;
+       client = *i;
+     }
+
+
+     std::pair<crimson::net::MessengerRef, crimson::net::MessengerRef>
+       connected_pair;
+     {
+       crimson::net::ConnectionRef conn = client->connect(
+                                            server->get_myaddr(),
+                                            entity_name_t::TYPE_OSD);
+       connected_pair = std::make_pair(client, server);
+       available_connections[conn] = connected_pair;
+     }
+     return seastar::now();
+   }
+
+   seastar::future<> random_op (const uint64_t& iter) {
+     return seastar::do_with(iter, [this] (uint64_t& iter) {
+       return seastar::do_until(
+         [&] { return iter == 0; },
+         [&, this]
+       {
+         if (!(iter % 10)) {
+           logger().info("{} Op {} : ", __func__ ,iter);
+           print_internal_state();
+         }
+         --iter;
+         int val = prob(rng);
+         if(val > 90) {
+           return maybe_generate_connection();
+         } else if (val > 80) {
+           return drop_connection();
+         } else if (val > 10) {
+           return send_message();
+         } else {
+           return seastar::sleep(
+             std::chrono::milliseconds(rand() % 1000 + 500));
+         }
+       });
+     });
+   }
+
+   seastar::future<> generate_connections (const uint64_t& iter) {
+     return seastar::do_with(iter, [this] (uint64_t& iter) {
+       return seastar::do_until(
+         [&] { return iter == 0; },
+         [&, this]
+       {
+         --iter;
+         if (!(connections_count() % 10)) {
+          logger().info("seeding connection {}",
+                        connections_count());
+         }
+         return maybe_generate_connection();
+       });
+     });
+   }
+
+   seastar::future<> init_server(const entity_name_t& name,
+                          const std::string& lname,
+                          const uint64_t nonce,
+                          const entity_addr_t& addr) {
+     crimson::net::MessengerRef msgr =
+       crimson::net::Messenger::create(name, lname, nonce);
+     msgr->set_default_policy(server_policy);
+     msgr->set_require_authorizer(false);
+     msgr->set_auth_client(&dummy_auth);
+     msgr->set_auth_server(&dummy_auth);
+     available_servers.insert(msgr);
+     return msgr->bind(entity_addrvec_t{addr}).safe_then(
+         [this, msgr] {
+       return msgr->start({&dispatcher});
+     }, crimson::net::Messenger::bind_ertr::all_same_way(
+         [addr] (const std::error_code& e) {
+       logger().error("{} test_messenger_thrash(): "
+                      "there is another instance running at {}",
+                       __func__, addr);
+       ceph_abort();
+     }));
+   }
+
+   seastar::future<> init_client(const entity_name_t& name,
+                          const std::string& lname,
+                          const uint64_t nonce) {
+     crimson::net::MessengerRef msgr =
+       crimson::net::Messenger::create(name, lname, nonce);
+     msgr->set_default_policy(client_policy);
+     msgr->set_auth_client(&dummy_auth);
+     msgr->set_auth_server(&dummy_auth);
+     available_clients.insert(msgr);
+     return msgr->start({&dispatcher});
+   }
+
+   seastar::future<> send_message() {
+     return get_random_connection()
+     .then([this] (crimson::net::ConnectionRef conn) {
+       boost::uniform_int<> true_false(0, 99);
+       int val = true_false(rng);
+       if (val >= 95) {
+         uuid_d uuid;
+         uuid.generate_random();
+         auto m = crimson::make_message<MCommand>(uuid);
+         std::vector<std::string> cmds;
+         cmds.push_back("command");
+         m->cmd = cmds;
+         m->set_priority(200);
+         return conn->send(std::move(m));
+       } else {
+         boost::uniform_int<> u(0, rand_data.size()-1);
+         return dispatcher.send_message_wrap(conn, rand_data[u(rng)]);
+       }
+     });
+   }
+
+   seastar::future<> drop_connection() {
+     if (available_connections.size() < min_connections) {
+       return seastar::now();
+     }
+
+     return get_random_connection()
+     .then([this] (crimson::net::ConnectionRef conn) {
+       dispatcher.clear_pending(conn);
+       conn->mark_down();
+       if (!client_policy.server &&
+           client_policy.standby) {
+         // it's a lossless policy, so we need to mark down each side
+         std::pair<crimson::net::MessengerRef, crimson::net::MessengerRef> &p =
+           available_connections[conn];
+         if (!p.first->get_default_policy().server &&
+             !p.second->get_default_policy().server) {
+             //verify that equal-to operator applies here
+           ceph_assert(conn->get_messenger() == p.first.get());
+           crimson::net::ConnectionRef peer = p.second->connect(
+             p.first->get_myaddr(), p.first->get_mytype());
+           peer->mark_down();
+           dispatcher.clear_pending(peer);
+           available_connections.erase(peer);
+         }
+       }
+       ceph_assert(available_connections.erase(conn) == 1U);
+       return seastar::now();
+     });
+   }
+
+   void print_internal_state(bool detail=false) {
+     logger().info("available_connections: {} inflight messages: {}",
+       available_connections.size(),
+       dispatcher.get_num_pending_msgs());
+     if (detail && !available_connections.empty()) {
+       dispatcher.print();
+     }
+   }
+
+   seastar::future<> wait_for_done() {
+     int i = 0;
+     return seastar::do_until(
+       [this] { return !dispatcher.get_num_pending_msgs(); },
+       [this, &i]
+     {
+       if (i++ % 50 == 0){
+         print_internal_state(true);
+       }
+       return seastar::sleep(100ms);
+     }).then([this] {
+       return seastar::do_for_each(available_servers, [] (auto server) {
+        if (verbose) {
+           logger().info("server {} shutdown" , server->get_myaddrs());
+        }
+         server->stop();
+         return server->shutdown();
+       });
+     }).then([this] {
+       available_servers.clear();
+     }).then([this] {
+       return seastar::do_for_each(available_clients, [] (auto client) {
+        if (verbose) {
+           logger().info("client {} shutdown" , client->get_myaddrs());
+        }
+         client->stop();
+         return client->shutdown();
+       });
+     }).then([this] {
+       available_clients.clear();
+     });
+   }
+
+   void handle_reset(crimson::net::ConnectionRef con) {
+     available_connections.erase(con);
+   }
+
+   uint64_t servers_count() {
+     return available_servers.size();
+   }
+
+   uint64_t clients_count() {
+     return available_clients.size();
+   }
+
+   uint64_t connections_count() {
+     return available_connections.size();
+   }
+};
+
+void SyntheticDispatcher::ms_handle_reset(crimson::net::ConnectionRef con,
+                                          bool is_replace) {
+  workload->handle_reset(con);
+  clear_pending(con);
+}
+
+seastar::future<> reset_conf() {
+  return seastar::when_all_succeed(
+    local_conf().set_val("ms_inject_socket_failures", "0"),
+    local_conf().set_val("ms_inject_internal_delays", "0"),
+    local_conf().set_val("ms_inject_delay_probability", "0"),
+    local_conf().set_val("ms_inject_delay_max", "0")
+  ).then_unpack([] {
+    return seastar::now();
+  });
+}
+
+// Testing Crimson messenger (with msgr-v2 protocol) robustness against
+// network delays and failures. The test includes stress tests and
+// socket level delays/failures injection tests, letting time
+// and randomness achieve the best test coverage.
+
+// Test Parameters:
+// Clients: 8             (stateful)
+// Servers: 32            (lossless)
+// Connections: 100       (Generated between random clients/server)
+// Random Operations: 120 (Generate/Drop Connection, Send Message, Sleep)
+seastar::future<> test_stress(thrash_params_t tp)
+{
+
+  logger().info("test_stress():");
+
+  SyntheticWorkload test_msg(tp.servers, tp.clients, 100,
+                             crimson::net::SocketPolicy::stateful_server(0),
+                             crimson::net::SocketPolicy::lossless_client(0));
+
+  return seastar::do_with(test_msg, [tp]
+    (SyntheticWorkload& test_msg) {
+      return seastar::do_until([&test_msg] {
+        return test_msg.servers_count() == test_msg.servers; },
+      [&test_msg] {
+        entity_addr_t bind_addr = get_server_addr();
+        bind_addr.set_type(entity_addr_t::TYPE_MSGR2);
+       uint64_t server_num = get_nonce();
+        return test_msg.init_server(entity_name_t::OSD(server_num),
+                             "server", server_num , bind_addr);
+      }).then([&test_msg] {
+      return seastar::do_until([&test_msg] {
+        return test_msg.clients_count() == test_msg.clients; },
+      [&test_msg] {
+        return test_msg.init_client(entity_name_t::CLIENT(-1),
+                             "client", get_nonce());
+       });
+      }).then([&test_msg, tp] {
+        return test_msg.generate_connections(tp.connections);
+      }).then([&test_msg, tp] {
+        return test_msg.random_op(tp.random_op);
+      }).then([&test_msg] {
+       return test_msg.wait_for_done();
+      }).then([] {
+        logger().info("test_stress() DONE");
+      }).handle_exception([] (auto eptr) {
+        logger().error(
+          "test_stress() failed: got exception {}",
+          eptr);
+        throw;
+      });
+    });
+}
+
+// Test Parameters:
+// Clients: 8              (statefull)
+// Servers: 32             (loseless)
+// Connections: 100        (Generated between random clients/server)
+// Random Operations: 120 (Generate/Drop Connection, Send Message, Sleep)
+seastar::future<> test_injection(thrash_params_t tp)
+{
+
+  logger().info("test_injection():");
+
+  SyntheticWorkload test_msg(tp.servers, tp.clients, 100,
+                             crimson::net::SocketPolicy::stateful_server(0),
+                             crimson::net::SocketPolicy::lossless_client(0));
+
+  return seastar::do_with(test_msg, [tp]
+    (SyntheticWorkload& test_msg) {
+      return seastar::do_until([&test_msg] {
+        return test_msg.servers_count() == test_msg.servers; },
+      [&test_msg] {
+        entity_addr_t bind_addr = get_server_addr();
+        bind_addr.set_type(entity_addr_t::TYPE_MSGR2);
+       uint64_t server_num = get_nonce();
+        return test_msg.init_server(entity_name_t::OSD(server_num),
+                             "server", server_num , bind_addr);
+      }).then([&test_msg] {
+      return seastar::do_until([&test_msg] {
+        return test_msg.clients_count() == test_msg.clients; },
+      [&test_msg] {
+        return test_msg.init_client(entity_name_t::CLIENT(-1),
+                             "client", get_nonce());
+       });
+      }).then([] {
+        return seastar::when_all_succeed(
+          local_conf().set_val("ms_inject_socket_failures", "30"),
+          local_conf().set_val("ms_inject_internal_delays", "0.1"),
+          local_conf().set_val("ms_inject_delay_probability", "1"),
+         local_conf().set_val("ms_inject_delay_max", "5"));
+      }).then_unpack([] {
+       return seastar::now();
+      }).then([&test_msg, tp] {
+        return test_msg.generate_connections(tp.connections);
+      }).then([&test_msg, tp] {
+        return test_msg.random_op(tp.random_op);
+      }).then([&test_msg] {
+         return test_msg.wait_for_done();
+      }).then([] {
+        logger().info("test_inejction() DONE");
+       return seastar::now();
+      }).then([] {
+        return reset_conf();
+      }).handle_exception([] (auto eptr) {
+        logger().error(
+          "test_injection() failed: got exception {}",
+          eptr);
+        throw;
+      });
+    });
+}
+
+}
+
+seastar::future<int> do_test(seastar::app_template& app)
+{
+  std::vector<const char*> args;
+  std::string cluster;
+  std::string conf_file_list;
+  auto init_params = ceph_argparse_early_args(args,
+                                              CEPH_ENTITY_TYPE_CLIENT,
+                                              &cluster,
+                                              &conf_file_list);
+  return crimson::common::sharded_conf().start(init_params.name, cluster)
+  .then([conf_file_list] {
+    return local_conf().parse_config_files(conf_file_list);
+  }).then([&app] {
+    auto&& config = app.configuration();
+    verbose = config["verbose"].as<bool>();
+      return test_stress(thrash_params_t{8, 32, 50, 120})
+    .then([] {
+      return test_injection(thrash_params_t{16, 32, 50, 120});
+    }).then([] {
+      logger().info("All tests succeeded");
+      // Seastar has bugs to have events undispatched during shutdown,
+      // which will result in memory leak and thus fail LeakSanitizer.
+      return seastar::sleep(100ms);
+    });
+  }).then([] {
+    return crimson::common::sharded_conf().stop();
+  }).then([] {
+    return 0;
+  }).handle_exception([] (auto eptr) {
+    logger().error("Test failed: got exception {}", eptr);
+    return 1;
+  });
+}
+
+int main(int argc, char** argv)
+{
+  seastar::app_template app;
+  app.add_options()
+    ("verbose,v", bpo::value<bool>()->default_value(false),
+     "chatty if true");
+  return app.run(argc, argv, [&app] {
+    return do_test(app);
+  });
+}
index 4ba7c547fa4322efbcbb497617f101360a05c5ff..850dcbd1bdb5687139a1242b9f8573453d4416a6 100644 (file)
@@ -1,6 +1,7 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 
+#include "common/ceph_argparse.h"
 #include <seastar/core/app-template.hh>
 #include <seastar/core/gate.hh>
 #include <seastar/core/sharded.hh>
@@ -13,6 +14,8 @@
 #include "crimson/net/Fwd.h"
 #include "crimson/net/Socket.h"
 
+using crimson::common::local_conf;
+
 namespace {
 
 using namespace std::chrono_literals;
@@ -462,11 +465,21 @@ future<> test_preemptive_down() {
 
 }
 
-int main(int argc, char** argv)
+seastar::future<int> do_test(seastar::app_template& app)
 {
-  seastar::app_template app;
-  return app.run(argc, argv, [] {
-    return seastar::futurize_invoke([] {
+  std::vector<const char*> args;
+  std::string cluster;
+  std::string conf_file_list;
+  auto init_params = ceph_argparse_early_args(args,
+                                              CEPH_ENTITY_TYPE_CLIENT,
+                                              &cluster,
+                                              &conf_file_list);
+  return crimson::common::sharded_conf().start(init_params.name, cluster)
+  .then([conf_file_list] {
+    return local_conf().parse_config_files(conf_file_list);
+  }).then([] {
+      return local_conf().set_val("ms_inject_internal_delays", "0")
+    .then([] {
       return test_refused();
     }).then([] {
       return test_bind_same();
@@ -485,9 +498,21 @@ int main(int argc, char** argv)
       // Seastar has bugs to have events undispatched during shutdown,
       // which will result in memory leak and thus fail LeakSanitizer.
       return seastar::sleep(100ms);
-    }).handle_exception([] (auto eptr) {
-      std::cout << "Test failure" << std::endl;
-      return seastar::make_exception_future<>(eptr);
     });
+  }).then([] {
+    return crimson::common::sharded_conf().stop();
+  }).then([] {
+    return 0;
+  }).handle_exception([] (auto eptr) {
+    logger.error("Test failed: got exception {}", eptr);
+    return 1;
+  });
+}
+
+int main(int argc, char** argv)
+{
+  seastar::app_template app;
+  return app.run(argc, argv, [&app] {
+    return do_test(app);
   });
 }