From 55a76b48643cad93e842a7a0e14259ad91c17ca0 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Fri, 25 Jan 2019 16:36:27 +0800 Subject: [PATCH] crimson/net: centralized write_event() * introduce write_state_t to decouple write behaviors from states. * replace `h.promise` with `state_changed`, with a more general way to change write behaviors according to state switches. * centralize write_event() to dispatch writes in the open state. * friendly interface for v1/v2 protocol abstraction. Signed-off-by: Yingxin Cheng --- src/crimson/net/SocketConnection.cc | 168 +++++++++++++++++----------- src/crimson/net/SocketConnection.h | 21 +++- 2 files changed, 122 insertions(+), 67 deletions(-) diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index a0268ef949f..a3045b1dedc 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -46,8 +46,7 @@ namespace { SocketConnection::SocketConnection(SocketMessenger& messenger, Dispatcher& dispatcher) : messenger(messenger), - dispatcher(dispatcher), - send_ready(h.promise.get_future()) + dispatcher(dispatcher) { ceph_assert(&messenger.container().local() == &messenger); } @@ -65,39 +64,85 @@ SocketConnection::get_messenger() const { seastar::future SocketConnection::is_connected() { return seastar::smp::submit_to(shard_id(), [this] { - return !send_ready.failed(); + return write_state == write_state_t::open; }); } +//TODO(performance): batch messages in out_q instead of chaining individual write events +//TODO: should discard all the pending messages when reset +seastar::future<> SocketConnection::write_event(MessageRef msg) +{ + switch (write_state) { + case write_state_t::open: + case write_state_t::delay: + return seastar::with_gate(pending_dispatch, [this, msg=std::move(msg)] { + seastar::shared_future<> f = send_ready.then([this, msg = std::move(msg)] { + return seastar::repeat([this, msg=std::move(msg)] { + switch (write_state) { + case write_state_t::open: + return seastar::futurize_apply([this] { + if (m_keepalive) { + return do_keepalive() + .then([this] { m_keepalive = false; }); + } + return seastar::now(); + }).then([this] { + if (m_keepalive_ack) { + return do_keepalive_ack() + .then([this] { m_keepalive_ack = false; }); + } + return seastar::now(); + }).then([this, msg] { + if (msg) { + return write_message(msg); + } + return seastar::now(); + }).then([this] { + return socket->flush(); + }).then([] { + return stop_t::yes; + }).handle_exception([this] (std::exception_ptr eptr) { + logger().warn("{} write_event fault: {}", *this, eptr); + close(); + return stop_t::no; + }); + case write_state_t::delay: + // delay all the writes until open + return state_changed.get_shared_future() + .then([] { return stop_t::no; }); + case write_state_t::drop: + return seastar::make_ready_future(stop_t::yes); + default: + ceph_assert(false); + } + }); + }); + send_ready = f.get_future(); + return f.get_future(); + }); + case write_state_t::drop: + return seastar::now(); + default: + ceph_assert(false); + } +} + seastar::future<> SocketConnection::send(MessageRef msg) { logger().debug("{} --> {} === {}", messenger, get_peer_addr(), *msg); return seastar::smp::submit_to(shard_id(), [this, msg=std::move(msg)] { - if (state == state_t::closing) - return seastar::now(); - return seastar::with_gate(pending_dispatch, [this, msg=std::move(msg)] { - return do_send(std::move(msg)) - .handle_exception([this] (std::exception_ptr eptr) { - logger().warn("{} send fault: {}", *this, eptr); - close(); - }); - }); - }); + return write_event(msg); + }); } seastar::future<> SocketConnection::keepalive() { return seastar::smp::submit_to(shard_id(), [this] { - if (state == state_t::closing) - return seastar::now(); - return seastar::with_gate(pending_dispatch, [this] { - return do_keepalive() - .handle_exception([this] (std::exception_ptr eptr) { - logger().warn("{} keepalive fault: {}", *this, eptr); - close(); - }); - }); - }); + if (!m_keepalive) { + m_keepalive = true; + write_event(); + } + }); } seastar::future<> SocketConnection::close() @@ -279,7 +324,7 @@ seastar::future<> SocketConnection::write_message(MessageRef msg) bl.append((const char*)&old_footer, sizeof(old_footer)); } // write as a seastar::net::packet - return socket->write_flush(std::move(bl)); + return socket->write(std::move(bl)); // TODO: lossless policy // .then([this, msg = std::move(msg)] { // if (!policy.lossy) { @@ -288,35 +333,18 @@ seastar::future<> SocketConnection::write_message(MessageRef msg) // }); } -seastar::future<> SocketConnection::do_send(MessageRef msg) +seastar::future<> SocketConnection::do_keepalive() { - // chain the message after the last message is sent - // TODO: retry send for lossless connection - seastar::shared_future<> f = send_ready.then( - [this, msg = std::move(msg)] { - if (state == state_t::closing) - return seastar::now(); - return write_message(std::move(msg)); - }); - - // chain any later messages after this one completes - send_ready = f.get_future(); - // allow the caller to wait on the same future - return f.get_future(); + k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec( + ceph::coarse_real_clock::now()); + logger().debug("{} write keepalive2 {}", *this, k.req.stamp.tv_sec); + return socket->write(make_static_packet(k.req)); } -seastar::future<> SocketConnection::do_keepalive() +seastar::future<> SocketConnection::do_keepalive_ack() { - // TODO: retry keepalive for lossless connection - seastar::shared_future<> f = send_ready.then([this] { - if (state == state_t::closing) - return seastar::now(); - k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec( - ceph::coarse_real_clock::now()); - return socket->write_flush(make_static_packet(k.req)); - }); - send_ready = f.get_future(); - return f.get_future(); + logger().debug("{} write keepalive2 ack {}", *this, k.ack.stamp.tv_sec); + return socket->write(make_static_packet(k.ack)); } seastar::future<> SocketConnection::do_close() @@ -353,8 +381,13 @@ seastar::future<> SocketConnection::do_close() ceph_assert(state == state_t::connecting); close_ready = pending_dispatch.close().finally(std::move(cleanup)); } + logger().debug("{} trigger closing, was {}", *this, static_cast(state)); state = state_t::closing; + write_state = write_state_t::drop; + state_changed.set_value(); + state_changed = seastar::shared_promise<>(); + return close_ready.get_future(); } @@ -575,12 +608,11 @@ SocketConnection::handle_keepalive2() return socket->read_exactly(sizeof(ceph_timespec)) .then([this] (auto buf) { k.ack.stamp = *reinterpret_cast(buf.get()); - seastar::shared_future<> f = send_ready.then([this] { - logger().debug("{} keepalive2 {}", *this, k.ack.stamp.tv_sec); - return socket->write_flush(make_static_packet(k.ack)); - }); - send_ready = f.get_future(); - return f.get_future(); + logger().debug("{} got keepalive2 {}", *this, k.ack.stamp.tv_sec); + if (!m_keepalive_ack) { + m_keepalive_ack = true; + write_event(); + } }); } @@ -591,7 +623,7 @@ SocketConnection::handle_keepalive2_ack() .then([this] (auto buf) { auto t = reinterpret_cast(buf.get()); k.ack_stamp = *t; - logger().debug("{} keepalive2 ack {}", *this, t->tv_sec); + logger().debug("{} got keepalive2 ack {}", *this, t->tv_sec); }); } @@ -799,12 +831,16 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr, const entity_type_t& _peer_type) { ceph_assert(state == state_t::none); + logger().debug("{} trigger connecting, was {}", *this, static_cast(state)); + state = state_t::connecting; + write_state = write_state_t::delay; + state_changed.set_value(); + state_changed = seastar::shared_promise<>(); + ceph_assert(!socket); peer_addr = _peer_addr; peer_type = _peer_type; messenger.register_conn(seastar::static_pointer_cast(shared_from_this())); - logger().debug("{} trigger connecting, was {}", *this, static_cast(state)); - state = state_t::connecting; seastar::with_gate(pending_dispatch, [this] { return seastar::connect(peer_addr.in4_addr()) .then([this](seastar::connected_socket fd) { @@ -847,7 +883,6 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr, }).handle_exception([this] (std::exception_ptr eptr) { // TODO: handle fault in the connecting state logger().warn("{} connecting fault: {}", *this, eptr); - h.promise.set_value(); close(); }); }); @@ -858,6 +893,12 @@ SocketConnection::start_accept(seastar::foreign_ptr>&& s const entity_addr_t& _peer_addr) { ceph_assert(state == state_t::none); + logger().debug("{} trigger accepting, was {}", *this, static_cast(state)); + state = state_t::accepting; + write_state = write_state_t::delay; + state_changed.set_value(); + state_changed = seastar::shared_promise<>(); + ceph_assert(!socket); peer_addr.u = _peer_addr.u; peer_addr.set_port(0); @@ -865,8 +906,6 @@ SocketConnection::start_accept(seastar::foreign_ptr>&& s socket_port = _peer_addr.get_port(); socket = std::move(sock); messenger.accept_conn(seastar::static_pointer_cast(shared_from_this())); - logger().debug("{} trigger accepting, was {}", *this, static_cast(state)); - state = state_t::accepting; seastar::with_gate(pending_dispatch, [this, _peer_addr] { // encode/send server's handshake header bufferlist bl; @@ -899,7 +938,6 @@ SocketConnection::start_accept(seastar::foreign_ptr>&& s }).handle_exception([this] (std::exception_ptr eptr) { // TODO: handle fault in the accepting state logger().warn("{} accepting fault: {}", *this, eptr); - h.promise.set_value(); close(); }); }); @@ -910,8 +948,10 @@ SocketConnection::execute_open() { logger().debug("{} trigger open, was {}", *this, static_cast(state)); state = state_t::open; - // satisfy the handshake's promise - h.promise.set_value(); + write_state = write_state_t::open; + state_changed.set_value(); + state_changed = seastar::shared_promise<>(); + seastar::with_gate(pending_dispatch, [this] { // start background processing of tags return handle_tags() diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index 9fd3bdf2943..cc8ab2db49c 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -61,6 +61,18 @@ class SocketConnection : public Connection { closing }; state_t state = state_t::none; + // wait until current state changed + seastar::shared_promise<> state_changed; + + // write_state is changed with state atomically, indicating the write + // behavior of the according state. + enum class write_state_t { + none, + delay, + open, + drop + }; + write_state_t write_state = write_state_t::none; /// become valid only when state is state_t::closing seastar::shared_future<> close_ready; @@ -74,7 +86,6 @@ class SocketConnection : public Connection { uint32_t connect_seq = 0; uint32_t peer_global_seq = 0; uint32_t global_seq; - seastar::promise<> promise; } h; /// server side of handshake negotiation @@ -112,10 +123,12 @@ class SocketConnection : public Connection { seastar::future<> handle_tags(); seastar::future<> handle_ack(); + seastar::future<> write_event(MessageRef msg=nullptr); + /// becomes available when handshake completes, and when all previous messages /// have been sent to the output stream. send() chains new messages as /// continuations to this future to act as a queue - seastar::future<> send_ready; + seastar::future<> send_ready = seastar::now(); /// encode/write a message seastar::future<> write_message(MessageRef msg); @@ -156,13 +169,15 @@ class SocketConnection : public Connection { } __attribute__((packed)) ack; ceph_timespec ack_stamp; } k; + bool m_keepalive = false; + bool m_keepalive_ack = false; seastar::future<> fault(); void execute_open(); - seastar::future<> do_send(MessageRef msg); seastar::future<> do_keepalive(); + seastar::future<> do_keepalive_ack(); seastar::future<> do_close(); public: -- 2.39.5