From fd8cec434f65044461cba1a73297b5de92d89a39 Mon Sep 17 00:00:00 2001 From: Yingxin Date: Fri, 28 Dec 2018 10:59:45 +0800 Subject: [PATCH] crimson/net: simplify logics and centralize fault handling in execute_open() Signed-off-by: Yingxin --- src/crimson/net/SocketConnection.cc | 100 +++++++++++----------------- src/crimson/net/SocketConnection.h | 13 +--- 2 files changed, 43 insertions(+), 70 deletions(-) diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 23765345369..0d3d1181fa0 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -92,50 +92,40 @@ seastar::future<> SocketConnection::keepalive() }); } -void SocketConnection::read_tags_until_next_message() +seastar::future<> SocketConnection::handle_tags() { - seastar::repeat([this] { + return seastar::keep_doing([this] { // read the next tag return socket->read_exactly(1) .then([this] (auto buf) { switch (buf[0]) { case CEPH_MSGR_TAG_MSG: - // stop looping and notify read_header() - return seastar::make_ready_future(stop_t::yes); + return read_message(); case CEPH_MSGR_TAG_ACK: return handle_ack(); case CEPH_MSGR_TAG_KEEPALIVE: - break; + return seastar::now(); case CEPH_MSGR_TAG_KEEPALIVE2: - return handle_keepalive2() - .then([this] { return stop_t::no; }); + return handle_keepalive2(); case CEPH_MSGR_TAG_KEEPALIVE2_ACK: - return handle_keepalive2_ack() - .then([this] { return stop_t::no; }); + return handle_keepalive2_ack(); case CEPH_MSGR_TAG_CLOSE: logger().info("{} got tag close", *this); - break; + throw std::system_error(make_error_code(error::connection_aborted)); + default: + logger().error("{} got unknown msgr tag {}", *this, static_cast(buf[0])); + throw std::system_error(make_error_code(error::read_eof)); } - return seastar::make_ready_future(stop_t::no); }); - }).handle_exception_type([this] (const std::system_error& e) { - if (e.code() == error::read_eof) { - close(); - } - throw e; - }).then_wrapped([this] (auto fut) { - // satisfy the message promise - fut.forward_to(std::move(on_message)); }); } -seastar::future SocketConnection::handle_ack() +seastar::future<> SocketConnection::handle_ack() { return socket->read_exactly(sizeof(ceph_le64)) .then([this] (auto buf) { auto seq = reinterpret_cast(buf.get()); discard_up_to(&sent, *seq); - return stop_t::no; }); } @@ -169,14 +159,10 @@ seastar::future<> SocketConnection::maybe_throttle() return policy.throttler_bytes->get(to_read); } -seastar::future SocketConnection::do_read_message() +seastar::future<> SocketConnection::read_message() { - return on_message.get_future() - .then([this] { - on_message = seastar::promise<>{}; - // read header - return socket->read(sizeof(m.header)); - }).then([this] (bufferlist bl) { + return socket->read(sizeof(m.header)) + .then([this] (bufferlist bl) { // throttle the traffic, maybe auto p = bl.cbegin(); ::decode(m.header, p); @@ -197,30 +183,27 @@ seastar::future SocketConnection::do_read_message() // read footer return socket->read(sizeof(m.footer)); }).then([this] (bufferlist bl) { - // resume background processing of tags - read_tags_until_next_message(); - auto p = bl.cbegin(); ::decode(m.footer, p); auto msg = ::decode_message(nullptr, 0, m.header, m.footer, m.front, m.middle, m.data, nullptr); // TODO: set time stamps msg->set_byte_throttler(policy.throttler_bytes); - constexpr bool add_ref = false; // Message starts with 1 ref - return MessageRef{msg, add_ref}; - }); -} -seastar::future SocketConnection::read_message() -{ - return seastar::repeat_until_value([this] { - return do_read_message() - .then([this] (MessageRef msg) -> std::optional { - if (!update_rx_seq(msg->get_seq())) { - // skip this request and read the next - return {}; - } - return msg; + if (!update_rx_seq(msg->get_seq())) { + // skip this message + return; + } + + constexpr bool add_ref = false; // Message starts with 1 ref + auto msg_ref = MessageRef{msg, add_ref}; + // start dispatch, ignoring exceptions from the application layer + seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] { + return dispatcher.ms_dispatch(this, std::move(msg)) + .handle_exception([this] (std::exception_ptr eptr) { + logger().error("{} ms_dispatch caught exception: {}", *this, eptr); + ceph_assert(false); + }); }); }); } @@ -914,30 +897,27 @@ SocketConnection::execute_open() h.promise.set_value(); seastar::with_gate(pending_dispatch, [this] { // start background processing of tags - read_tags_until_next_message(); - return seastar::keep_doing([this] { - return read_message() - .then([this] (MessageRef msg) { - // start dispatch, ignoring exceptions from the application layer - seastar::with_gate(pending_dispatch, [this, msg = std::move(msg)] { - return dispatcher.ms_dispatch(this, std::move(msg)) - .handle_exception([] (std::exception_ptr eptr) {}); - }); - // return immediately to start on the next message - return seastar::now(); - }); - }).handle_exception_type([this] (const std::system_error& e) { + return handle_tags() + .handle_exception_type([this] (const std::system_error& e) { + logger().warn("{} open fault: {}", *this, e); if (e.code() == error::connection_aborted || e.code() == error::connection_reset) { - return dispatcher.ms_handle_reset(this); + return dispatcher.ms_handle_reset(this) + .then([this] { + close(); + }); } else if (e.code() == error::read_eof) { - return dispatcher.ms_handle_remote_reset(this); + return dispatcher.ms_handle_remote_reset(this) + .then([this] { + close(); + }); } else { throw e; } }).handle_exception([this] (std::exception_ptr eptr) { // TODO: handle fault in the open state logger().warn("{} open fault: {}", *this, eptr); + close(); }); }); } diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index ecb2a9df595..8cbe575f69a 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -108,13 +108,9 @@ class SocketConnection : public Connection { bufferlist data; } m; - /// satisfied when a CEPH_MSGR_TAG_MSG is read, indicating that a message - /// header will follow - seastar::promise<> on_message; - seastar::future<> maybe_throttle(); - void read_tags_until_next_message(); - seastar::future handle_ack(); + seastar::future<> handle_tags(); + seastar::future<> handle_ack(); /// becomes available when handshake completes, and when all previous messages /// have been sent to the output stream. send() chains new messages as @@ -139,7 +135,7 @@ class SocketConnection : public Connection { /// false otherwise. bool update_rx_seq(seq_num_t seq); - seastar::future do_read_message(); + seastar::future<> read_message(); std::unique_ptr session_security; @@ -199,9 +195,6 @@ class SocketConnection : public Connection { void start_accept(seastar::connected_socket&& socket, const entity_addr_t& peer_addr); - /// read a message from a connection that has completed its handshake - seastar::future read_message(); - /// the number of connections initiated in this session, increment when a /// new connection is established uint32_t connect_seq() const { -- 2.39.5