]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/net: add shutdown interface to Socket
authorYingxin Cheng <yingxincheng@gmail.com>
Thu, 20 Jun 2019 11:26:08 +0000 (19:26 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 10 Jul 2019 08:40:53 +0000 (16:40 +0800)
* Support preemptive shutdown.
* Implement socket unittests to verify all shutdown possibilities.
* Added missing error::broken_pipe.

Signed-off-by: Yingxin Cheng <yingxincheng@gmail.com>
src/crimson/net/Errors.cc
src/crimson/net/Errors.h
src/crimson/net/Socket.cc
src/crimson/net/Socket.h
src/test/crimson/test_socket.cc

index a5748019c841c8a794eddb388a954dacec3a7a85..abd34a809f92475265b45cadb8c4e93b573560c8 100644 (file)
@@ -47,6 +47,8 @@ const std::error_category& net_category()
           return "invalid argument";
         case error::address_in_use:
           return "address in use";
+        case error::broken_pipe:
+          return "broken pipe";
         default:
           return "unknown";
       }
@@ -67,6 +69,8 @@ const std::error_category& net_category()
           return std::errc::invalid_argument;
         case error::address_in_use:
           return std::errc::address_in_use;
+        case error::broken_pipe:
+          return std::errc::broken_pipe;
         default:
           return std::error_condition(ev, *this);
       }
@@ -92,6 +96,9 @@ const std::error_category& net_category()
         case error::address_in_use:
           return cond == std::errc::address_in_use
               || cond == std::error_condition(EADDRINUSE, std::system_category());
+        case error::broken_pipe:
+          return cond == std::errc::broken_pipe
+              || cond == std::error_condition(EPIPE, std::system_category());
         default:
           return false;
       }
@@ -117,6 +124,9 @@ const std::error_category& net_category()
         case error::address_in_use:
           return code == std::errc::address_in_use
               || code == std::error_code(EADDRINUSE, std::system_category());
+        case error::broken_pipe:
+          return code == std::errc::broken_pipe
+              || code == std::error_code(EPIPE, std::system_category());
         default:
           return false;
       }
index aa81adcfe6ea705ca2a8600e00f6453e49fe4014..d8c712bf2d06e45b5c2ac962848c1d7444fb5d22 100644 (file)
@@ -31,6 +31,7 @@ enum class error {
   corrupted_message,
   invalid_argument,
   address_in_use,
+  broken_pipe,
 };
 
 /// net error category
index 3ea65d41a54b5f6a1247b277e6f69339fe973acd..ca4e80db5ddecabb1e4002c309ca75cdd8488824 100644 (file)
@@ -3,12 +3,17 @@
 
 #include "Socket.h"
 
+#include "crimson/common/log.h"
 #include "Errors.h"
 
 namespace ceph::net {
 
 namespace {
 
+seastar::logger& logger() {
+  return ceph::get_logger(ceph_subsys_ms);
+}
+
 // an input_stream consumer that reads buffer segments into a bufferlist up to
 // the given number of remaining bytes
 struct bufferlist_consumer {
@@ -81,4 +86,38 @@ Socket::read_exactly(size_t bytes) {
     });
 }
 
+void Socket::shutdown() {
+#ifndef NDEBUG
+  ceph_assert(!down);
+  down = true;
+#endif
+  socket.shutdown_input();
+  socket.shutdown_output();
+}
+
+static inline seastar::future<> close_and_handle_errors(auto& out) {
+  return out.close().handle_exception_type([] (const std::system_error& e) {
+    if (e.code() != error::broken_pipe &&
+        e.code() != error::connection_reset) {
+      logger().error("Socket::close(): unexpected error {}", e);
+      ceph_abort();
+    }
+    // can happen when out is already shutdown, ignore
+  });
+}
+
+seastar::future<> Socket::close() {
+#ifndef NDEBUG
+  ceph_assert(!closed);
+  closed = true;
+#endif
+  return seastar::when_all_succeed(
+    in.close(),
+    close_and_handle_errors(out)
+  ).handle_exception([] (auto eptr) {
+    logger().error("Socket::close(): unexpected exception {}", eptr);
+    ceph_abort();
+  });
+}
+
 } // namespace ceph::net
index a43a7f5a0bb2b874b4f80c838adb1b5fe4cd9eab..17ffb4c7b262536b548e4326e6988aa3d6a0c518 100644 (file)
@@ -22,6 +22,11 @@ class Socket
   seastar::input_stream<char> in;
   seastar::output_stream<char> out;
 
+#ifndef NDEBUG
+  bool down = false;
+  bool closed = false;
+#endif
+
   /// buffer state for read()
   struct {
     bufferlist buffer;
@@ -39,6 +44,12 @@ class Socket
       // performance. see seastar::net::connected_socket::output()
       out(socket.output(65536)) {}
 
+  ~Socket() {
+#ifndef NDEBUG
+    assert(closed);
+#endif
+  }
+
   Socket(Socket&& o) = delete;
 
   static seastar::future<SocketFRef>
@@ -79,12 +90,20 @@ class Socket
     return out.write(std::move(buf)).then([this] { return out.flush(); });
   }
 
+  // preemptively disable further reads or writes, can only be shutdown once.
+  void shutdown();
+
   /// Socket can only be closed once.
-  seastar::future<> close() {
-    return seastar::smp::submit_to(sid, [this] {
-        return seastar::when_all(
-          in.close(), out.close()).discard_result();
-      });
+  seastar::future<> close();
+
+  // shutdown input_stream only, for tests
+  void force_shutdown_in() {
+    socket.shutdown_input();
+  }
+
+  // shutdown output_stream only, for tests
+  void force_shutdown_out() {
+    socket.shutdown_output();
   }
 };
 
index b73974561b18cb5a4799b4408df744176c2c6219..cbdb4ec20d1aaf63629b4d3aec15b2185c849fcf 100644 (file)
@@ -17,6 +17,7 @@ using seastar::future;
 using ceph::net::error;
 using ceph::net::Socket;
 using ceph::net::SocketFRef;
+using ceph::net::stop_t;
 
 static seastar::logger logger{"test"};
 
@@ -180,6 +181,276 @@ future<> test_accept() {
   });
 }
 
+class SocketFactory final
+    : public SocketFactoryBase<SocketFactory> {
+  const seastar::shard_id target_shard;
+  seastar::promise<SocketFRef> socket_promise;
+
+  future<> bind_accept() override {
+    return SocketFactoryBase<SocketFactory>::bind_accept();
+  }
+
+  future<SocketFRef> get_accepted() {
+    return socket_promise.get_future();
+  }
+
+ public:
+  SocketFactory(seastar::shard_id shard) : target_shard{shard} {}
+
+  future<> handle_server_socket(SocketFRef&& socket) override {
+    return container().invoke_on(target_shard,
+        [socket = std::move(socket)] (auto& factory) mutable {
+      factory.socket_promise.set_value(std::move(socket));
+    });
+  }
+
+  static future<SocketFRef, SocketFRef> get_sockets() {
+    return ceph::net::create_sharded<SocketFactory>(seastar::engine().cpu_id()
+    ).then([] (SocketFactory* factory) {
+      return factory->bind_accept().then([factory] {
+        return connect();
+      }).then([factory] (auto fp_client_socket) {
+        return factory->get_accepted(
+        ).then([fp_client_socket = std::move(fp_client_socket)]
+              (auto fp_server_socket) mutable {
+          return seastar::make_ready_future<SocketFRef, SocketFRef>(
+              std::move(fp_client_socket), std::move(fp_server_socket));
+        });
+      }).finally([factory] {
+        return factory->shutdown();
+      });
+    });
+  }
+};
+
+class Connection {
+  static const uint64_t DATA_TAIL = 5327;
+  static const unsigned DATA_SIZE = 4096;
+  std::array<uint64_t, DATA_SIZE> data = {0};
+
+  void verify_data_read(const uint64_t read_data[]) {
+    ceph_assert(read_data[0] == read_count);
+    ceph_assert(data[DATA_SIZE - 1] = DATA_TAIL);
+  }
+
+  SocketFRef socket;
+  uint64_t write_count = 0;
+  uint64_t read_count = 0;
+
+  Connection(SocketFRef&& socket) : socket{std::move(socket)} {
+    data[DATA_SIZE - 1] = DATA_TAIL;
+  }
+
+  future<> dispatch_write(unsigned round = 0, bool force_shut = false) {
+    return seastar::repeat([this, round, force_shut] {
+      if (round != 0 && round <= write_count) {
+        return seastar::futurize_apply([this, force_shut] {
+          if (force_shut) {
+            socket->force_shutdown_out();
+          }
+        }).then([] {
+          return seastar::make_ready_future<stop_t>(stop_t::yes);
+        });
+      } else {
+        data[0] = write_count;
+        return socket->write(seastar::net::packet(
+            reinterpret_cast<const char*>(&data), sizeof(data))
+        ).then([this] {
+          return socket->flush();
+        }).then([this] {
+          write_count += 1;
+          return seastar::make_ready_future<stop_t>(stop_t::no);
+        });
+      }
+    });
+  }
+
+  future<> dispatch_write_unbounded() {
+    return dispatch_write(
+    ).then([] {
+      ceph_abort();
+    }).handle_exception_type([] (const std::system_error& e) {
+      if (e.code() != error::broken_pipe &&
+          e.code() != error::connection_reset) {
+        logger.error("dispatch_write_unbounded(): "
+                     "unexpected error {}", e);
+        throw;
+      }
+      // successful
+      logger.debug("dispatch_write_unbounded(): "
+                   "expected error {}", e);
+    });
+  }
+
+  future<> dispatch_read(unsigned round = 0, bool force_shut = false) {
+    return seastar::repeat([this, round, force_shut] {
+      if (round != 0 && round <= read_count) {
+        return seastar::futurize_apply([this, force_shut] {
+          if (force_shut) {
+            socket->force_shutdown_in();
+          }
+        }).then([] {
+          return seastar::make_ready_future<stop_t>(stop_t::yes);
+        });
+      } else {
+        return seastar::futurize_apply([this] {
+          // we want to test both Socket::read() and Socket::read_exactly()
+          if (read_count % 2) {
+            return socket->read(DATA_SIZE * sizeof(uint64_t)
+            ).then([this] (ceph::bufferlist bl) {
+              uint64_t read_data[DATA_SIZE];
+              auto p = bl.cbegin();
+              ::ceph::decode_raw(read_data, p);
+              verify_data_read(read_data);
+            });
+          } else {
+            return socket->read_exactly(DATA_SIZE * sizeof(uint64_t)
+            ).then([this] (auto buf) {
+              auto read_data = reinterpret_cast<const uint64_t*>(buf.get());
+              verify_data_read(read_data);
+            });
+          }
+        }).then([this] {
+          ++read_count;
+          return seastar::make_ready_future<stop_t>(stop_t::no);
+        });
+      }
+    });
+  }
+
+  future<> dispatch_read_unbounded() {
+    return dispatch_read(
+    ).then([] {
+      ceph_abort();
+    }).handle_exception_type([] (const std::system_error& e) {
+      if (e.code() != error::read_eof
+       && e.code() != error::connection_reset) {
+        logger.error("dispatch_read_unbounded(): "
+                     "unexpected error {}", e);
+        throw;
+      }
+      // successful
+      logger.debug("dispatch_read_unbounded(): "
+                   "expected error {}", e);
+    });
+  }
+
+  void shutdown() {
+    socket->shutdown();
+  }
+
+  future<> close() {
+    return socket->close();
+  }
+
+ public:
+  static future<> dispatch_rw_bounded(SocketFRef&& socket, bool is_client,
+                                      unsigned round, bool force_shut = false) {
+    return seastar::smp::submit_to(is_client ? 0 : 1,
+        [socket = std::move(socket), round, force_shut] () mutable {
+      return seastar::do_with(Connection{std::move(socket)},
+                              [round, force_shut] (auto& conn) {
+        ceph_assert(round != 0);
+        return seastar::when_all_succeed(
+          conn.dispatch_write(round, force_shut),
+          conn.dispatch_read(round, force_shut)
+        ).finally([&conn] {
+          return conn.close();
+        });
+      });
+    }).handle_exception([is_client] (auto eptr) {
+      logger.error("dispatch_rw_bounded(): {} got unexpected exception {}",
+                   is_client ? "client" : "server", eptr);
+      ceph_abort();
+    });
+  }
+
+  static future<> dispatch_rw_unbounded(SocketFRef&& socket, bool is_client,
+                                        bool preemptive_shut = false) {
+    return seastar::smp::submit_to(is_client ? 0 : 1,
+        [socket = std::move(socket), preemptive_shut, is_client] () mutable {
+      return seastar::do_with(Connection{std::move(socket)},
+                              [preemptive_shut, is_client] (auto& conn) {
+        return seastar::when_all_succeed(
+          conn.dispatch_write_unbounded(),
+          conn.dispatch_read_unbounded(),
+          seastar::futurize_apply([&conn, preemptive_shut] {
+            if (preemptive_shut) {
+              return seastar::sleep(100ms).then([&conn] { conn.shutdown(); });
+            } else {
+              return seastar::now();
+            }
+          })
+        ).finally([&conn] {
+          return conn.close();
+        });
+      });
+    }).handle_exception([is_client] (auto eptr) {
+      logger.error("dispatch_rw_unbounded(): {} got unexpected exception {}",
+                   is_client ? "client" : "server", eptr);
+      ceph_abort();
+    });
+  }
+};
+
+future<> test_read_write() {
+  logger.info("test_read_write()...");
+  return SocketFactory::get_sockets(
+  ).then([] (auto client_socket, auto server_socket) {
+    return seastar::when_all_succeed(
+      Connection::dispatch_rw_bounded(std::move(client_socket), true, 128),
+      Connection::dispatch_rw_bounded(std::move(server_socket), false, 128)
+    );
+  }).handle_exception([] (auto eptr) {
+    logger.error("test_read_write() got unexpeted exception {}", eptr);
+    ceph_abort();
+  });
+}
+
+future<> test_unexpected_down() {
+  logger.info("test_unexpected_down()...");
+  return SocketFactory::get_sockets(
+  ).then([] (auto client_socket, auto server_socket) {
+    return seastar::when_all_succeed(
+      Connection::dispatch_rw_bounded(std::move(client_socket), true, 128, true),
+      Connection::dispatch_rw_unbounded(std::move(server_socket), false)
+    );
+  }).handle_exception([] (auto eptr) {
+    logger.error("test_unexpected_down() got unexpeted exception {}", eptr);
+    ceph_abort();
+  });
+}
+
+future<> test_shutdown_propagated() {
+  logger.info("test_shutdown_propagated()...");
+  return SocketFactory::get_sockets(
+  ).then([] (auto client_socket, auto server_socket) {
+    client_socket->shutdown();
+    return Connection::dispatch_rw_unbounded(std::move(server_socket), false
+    ).finally([client_socket = std::move(client_socket)] () mutable {
+      return client_socket->close(
+      ).finally([cleanup = std::move(client_socket)] {});
+    });
+  }).handle_exception([] (auto eptr) {
+    logger.error("test_shutdown_propagated() got unexpeted exception {}", eptr);
+    ceph_abort();
+  });
+}
+
+future<> test_preemptive_down() {
+  logger.info("test_preemptive_down()...");
+  return SocketFactory::get_sockets(
+  ).then([] (auto client_socket, auto server_socket) {
+    return seastar::when_all_succeed(
+      Connection::dispatch_rw_unbounded(std::move(client_socket), true, true),
+      Connection::dispatch_rw_unbounded(std::move(server_socket), false)
+    );
+  }).handle_exception([] (auto eptr) {
+    logger.error("test_preemptive_down() got unexpeted exception {}", eptr);
+    ceph_abort();
+  });
+}
+
 }
 
 int main(int argc, char** argv)
@@ -190,6 +461,14 @@ int main(int argc, char** argv)
       return test_bind_same();
     }).then([] {
       return test_accept();
+    }).then([] {
+      return test_read_write();
+    }).then([] {
+      return test_unexpected_down();
+    }).then([] {
+      return test_shutdown_propagated();
+    }).then([] {
+      return test_preemptive_down();
     }).then([] {
       logger.info("All tests succeeded");
     }).handle_exception([] (auto eptr) {