From: Yingxin Cheng Date: Thu, 3 Nov 2022 02:48:56 +0000 (+0800) Subject: crimson/net: rename in/out related members and methods X-Git-Tag: v18.1.0~375^2~31 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=1ce6acbf498d7c2f12f906525fa66b1128044805;p=ceph.git crimson/net: rename in/out related members and methods Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/mon/MonClient.cc b/src/crimson/mon/MonClient.cc index 760a97579291d..7be09915a9461 100644 --- a/src/crimson/mon/MonClient.cc +++ b/src/crimson/mon/MonClient.cc @@ -465,7 +465,7 @@ void Client::tick() gate.dispatch_in_background(__func__, *this, [this] { if (active_con) { return seastar::when_all_succeed(wait_for_send_log(), - active_con->get_conn()->keepalive(), + active_con->get_conn()->send_keepalive(), active_con->renew_tickets(), active_con->renew_rotating_keyring()).discard_result(); } else { diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h index 7cb78438dc3f8..4c90f6e6852c2 100644 --- a/src/crimson/net/Connection.h +++ b/src/crimson/net/Connection.h @@ -76,14 +76,14 @@ class Connection : public seastar::enable_shared_from_this { virtual seastar::future<> send(MessageURef msg) = 0; /** - * keepalive + * send_keepalive * * Send a keepalive message over a connection that has completed its * handshake. * * May be invoked from any core. */ - virtual seastar::future<> keepalive() = 0; + virtual seastar::future<> send_keepalive() = 0; virtual clock_t::time_point get_last_keepalive() const = 0; diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index 1adfe895455ff..27fd98f54aeb1 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -28,7 +28,7 @@ Protocol::Protocol(ChainedDispatchers& dispatchers, Protocol::~Protocol() { ceph_assert(gate.is_closed()); - assert(!exit_open); + assert(!out_exit_dispatching); } void Protocol::close(bool dispatch_reset, @@ -53,7 +53,7 @@ void Protocol::close(bool dispatch_reset, if (conn.socket) { conn.socket->shutdown(); } - set_write_state(write_state_t::drop); + set_out_state(out_state_t::drop); assert(!gate.is_closed()); auto gate_closed = gate.close(); @@ -86,139 +86,143 @@ void Protocol::close(bool dispatch_reset, }); } -ceph::bufferlist Protocol::sweep_messages_and_move_to_sent( +ceph::bufferlist Protocol::sweep_out_pending_msgs_to_sent( size_t num_msgs, bool require_keepalive, - std::optional keepalive_ack, + std::optional maybe_keepalive_ack, bool require_ack) { - ceph::bufferlist bl = do_sweep_messages(out_q, + ceph::bufferlist bl = do_sweep_messages(out_pending_msgs, num_msgs, require_keepalive, - keepalive_ack, + maybe_keepalive_ack, require_ack); if (!conn.policy.lossy) { - sent.insert(sent.end(), - std::make_move_iterator(out_q.begin()), - std::make_move_iterator(out_q.end())); + out_sent_msgs.insert( + out_sent_msgs.end(), + std::make_move_iterator(out_pending_msgs.begin()), + std::make_move_iterator(out_pending_msgs.end())); } - out_q.clear(); + out_pending_msgs.clear(); return bl; } seastar::future<> Protocol::send(MessageURef msg) { - if (write_state != write_state_t::drop) { - out_q.push_back(std::move(msg)); - write_event(); + if (out_state != out_state_t::drop) { + out_pending_msgs.push_back(std::move(msg)); + notify_out_dispatch(); } return seastar::now(); } -seastar::future<> Protocol::keepalive() +seastar::future<> Protocol::send_keepalive() { if (!need_keepalive) { need_keepalive = true; - write_event(); + notify_out_dispatch(); } return seastar::now(); } -void Protocol::notify_keepalive_ack(utime_t _keepalive_ack) +void Protocol::notify_keepalive_ack(utime_t keepalive_ack) { - logger().trace("{} got keepalive ack {}", conn, _keepalive_ack); - keepalive_ack = _keepalive_ack; - write_event(); + logger().trace("{} got keepalive ack {}", conn, keepalive_ack); + next_keepalive_ack = keepalive_ack; + notify_out_dispatch(); } void Protocol::notify_ack() { if (!conn.policy.lossy) { ++ack_left; - write_event(); + notify_out_dispatch(); } } -void Protocol::requeue_sent() +void Protocol::requeue_out_sent() { - assert(write_state != write_state_t::open); - if (sent.empty()) { + assert(out_state != out_state_t::open); + if (out_sent_msgs.empty()) { return; } - out_seq -= sent.size(); + out_seq -= out_sent_msgs.size(); logger().debug("{} requeue {} items, revert out_seq to {}", - conn, sent.size(), out_seq); - for (MessageURef& msg : sent) { + conn, out_sent_msgs.size(), out_seq); + for (MessageURef& msg : out_sent_msgs) { msg->clear_payload(); msg->set_seq(0); } - out_q.insert(out_q.begin(), - std::make_move_iterator(sent.begin()), - std::make_move_iterator(sent.end())); - sent.clear(); - write_event(); + out_pending_msgs.insert( + out_pending_msgs.begin(), + std::make_move_iterator(out_sent_msgs.begin()), + std::make_move_iterator(out_sent_msgs.end())); + out_sent_msgs.clear(); + notify_out_dispatch(); } -void Protocol::requeue_up_to(seq_num_t seq) +void Protocol::requeue_out_sent_up_to(seq_num_t seq) { - assert(write_state != write_state_t::open); - if (sent.empty() && out_q.empty()) { + assert(out_state != out_state_t::open); + if (out_sent_msgs.empty() && out_pending_msgs.empty()) { logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}", conn, out_seq, seq); out_seq = seq; return; } - logger().debug("{} discarding sent items by seq {} (sent_len={}, out_seq={})", - conn, seq, sent.size(), out_seq); - while (!sent.empty()) { - auto cur_seq = sent.front()->get_seq(); + logger().debug("{} discarding sent msgs by seq {} (sent_len={}, out_seq={})", + conn, seq, out_sent_msgs.size(), out_seq); + while (!out_sent_msgs.empty()) { + auto cur_seq = out_sent_msgs.front()->get_seq(); if (cur_seq == 0 || cur_seq > seq) { break; } else { - sent.pop_front(); + out_sent_msgs.pop_front(); } } - requeue_sent(); + requeue_out_sent(); } -void Protocol::reset_write() +void Protocol::reset_out() { - assert(write_state != write_state_t::open); + assert(out_state != out_state_t::open); out_seq = 0; - out_q.clear(); - sent.clear(); + out_pending_msgs.clear(); + out_sent_msgs.clear(); need_keepalive = false; - keepalive_ack = std::nullopt; + next_keepalive_ack = std::nullopt; ack_left = 0; } -void Protocol::ack_writes(seq_num_t seq) +void Protocol::ack_out_sent(seq_num_t seq) { if (conn.policy.lossy) { // lossy connections don't keep sent messages return; } - while (!sent.empty() && sent.front()->get_seq() <= seq) { + while (!out_sent_msgs.empty() && + out_sent_msgs.front()->get_seq() <= seq) { logger().trace("{} got ack seq {} >= {}, pop {}", - conn, seq, sent.front()->get_seq(), *sent.front()); - sent.pop_front(); + conn, seq, out_sent_msgs.front()->get_seq(), + *out_sent_msgs.front()); + out_sent_msgs.pop_front(); } } -seastar::future Protocol::try_exit_sweep() { - assert(!is_queued()); +seastar::future Protocol::try_exit_out_dispatch() { + assert(!is_out_queued()); return conn.socket->flush().then([this] { - if (!is_queued()) { + if (!is_out_queued()) { // still nothing pending to send after flush, // the dispatching can ONLY stop now - ceph_assert(write_dispatching); - write_dispatching = false; - if (unlikely(exit_open.has_value())) { - exit_open->set_value(); - exit_open = std::nullopt; - logger().info("{} write_event: nothing queued at {}," - " set exit_open", - conn, write_state); + ceph_assert(out_dispatching); + out_dispatching = false; + if (unlikely(out_exit_dispatching.has_value())) { + out_exit_dispatching->set_value(); + out_exit_dispatching = std::nullopt; + logger().info("{} do_out_dispatch: nothing queued at {}," + " set out_exit_dispatching", + conn, out_state); } return seastar::make_ready_future(stop_t::yes); } else { @@ -228,56 +232,57 @@ seastar::future Protocol::try_exit_sweep() { }); } -seastar::future<> Protocol::do_write_dispatch_sweep() +seastar::future<> Protocol::do_out_dispatch() { return seastar::repeat([this] { - switch (write_state) { - case write_state_t::open: { - size_t num_msgs = out_q.size(); - bool still_queued = is_queued(); + switch (out_state) { + case out_state_t::open: { + size_t num_msgs = out_pending_msgs.size(); + bool still_queued = is_out_queued(); if (unlikely(!still_queued)) { - return try_exit_sweep(); + return try_exit_out_dispatch(); } - auto acked = ack_left; - assert(acked == 0 || in_seq > 0); - // sweep all pending writes with the concrete Protocol - return conn.socket->write(sweep_messages_and_move_to_sent( - num_msgs, need_keepalive, keepalive_ack, acked > 0) - ).then([this, prv_keepalive_ack=keepalive_ack, acked] { + auto to_ack = ack_left; + assert(to_ack == 0 || in_seq > 0); + // sweep all pending out with the concrete Protocol + return conn.socket->write( + sweep_out_pending_msgs_to_sent( + num_msgs, need_keepalive, next_keepalive_ack, to_ack > 0) + ).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] { need_keepalive = false; - if (keepalive_ack == prv_keepalive_ack) { - keepalive_ack = std::nullopt; + if (next_keepalive_ack == prv_keepalive_ack) { + next_keepalive_ack = std::nullopt; } - assert(ack_left >= acked); - ack_left -= acked; - if (!is_queued()) { - return try_exit_sweep(); + assert(ack_left >= to_ack); + ack_left -= to_ack; + if (!is_out_queued()) { + return try_exit_out_dispatch(); } else { // messages were enqueued during socket write return seastar::make_ready_future(stop_t::no); } }); } - case write_state_t::delay: - // delay dispatching writes until open - if (exit_open) { - exit_open->set_value(); - exit_open = std::nullopt; - logger().info("{} write_event: delay and set exit_open ...", conn); + case out_state_t::delay: + // delay out dispatching until open + if (out_exit_dispatching) { + out_exit_dispatching->set_value(); + out_exit_dispatching = std::nullopt; + logger().info("{} do_out_dispatch: delay and set out_exit_dispatching ...", conn); } else { - logger().info("{} write_event: delay ...", conn); + logger().info("{} do_out_dispatch: delay ...", conn); } - return state_changed.get_shared_future() - .then([] { return stop_t::no; }); - case write_state_t::drop: - ceph_assert(write_dispatching); - write_dispatching = false; - if (exit_open) { - exit_open->set_value(); - exit_open = std::nullopt; - logger().info("{} write_event: dropped and set exit_open", conn); + return out_state_changed.get_shared_future( + ).then([] { return stop_t::no; }); + case out_state_t::drop: + ceph_assert(out_dispatching); + out_dispatching = false; + if (out_exit_dispatching) { + out_exit_dispatching->set_value(); + out_exit_dispatching = std::nullopt; + logger().info("{} do_out_dispatch: dropped and set out_exit_dispatching", conn); } else { - logger().info("{} write_event: dropped", conn); + logger().info("{} do_out_dispatch: dropped", conn); } return seastar::make_ready_future(stop_t::yes); default: @@ -287,42 +292,42 @@ seastar::future<> Protocol::do_write_dispatch_sweep() if (e.code() != std::errc::broken_pipe && e.code() != std::errc::connection_reset && e.code() != error::negotiation_failure) { - logger().error("{} write_event(): unexpected error at {} -- {}", - conn, write_state, e); + logger().error("{} do_out_dispatch(): unexpected error at {} -- {}", + conn, out_state, e); ceph_abort(); } conn.socket->shutdown(); - if (write_state == write_state_t::open) { - logger().info("{} write_event(): fault at {}, going to delay -- {}", - conn, write_state, e); - write_state = write_state_t::delay; + if (out_state == out_state_t::open) { + logger().info("{} do_out_dispatch(): fault at {}, going to delay -- {}", + conn, out_state, e); + out_state = out_state_t::delay; } else { - logger().info("{} write_event(): fault at {} -- {}", - conn, write_state, e); + logger().info("{} do_out_dispatch(): fault at {} -- {}", + conn, out_state, e); } - return do_write_dispatch_sweep(); + return do_out_dispatch(); }); } -void Protocol::write_event() +void Protocol::notify_out_dispatch() { - notify_write(); - if (write_dispatching) { + notify_out(); + if (out_dispatching) { // already dispatching return; } - write_dispatching = true; - switch (write_state) { - case write_state_t::open: + out_dispatching = true; + switch (out_state) { + case out_state_t::open: [[fallthrough]]; - case write_state_t::delay: + case out_state_t::delay: assert(!gate.is_closed()); - gate.dispatch_in_background("do_write_dispatch_sweep", *this, [this] { - return do_write_dispatch_sweep(); + gate.dispatch_in_background("do_out_dispatch", *this, [this] { + return do_out_dispatch(); }); return; - case write_state_t::drop: - write_dispatching = false; + case out_state_t::drop: + out_dispatching = false; return; default: ceph_assert(false); diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h index 268ffc9968204..c71b37f07c4d1 100644 --- a/src/crimson/net/Protocol.h +++ b/src/crimson/net/Protocol.h @@ -62,23 +62,12 @@ class Protocol { const std::deque& msgs, size_t num_msgs, bool require_keepalive, - std::optional keepalive_ack, + std::optional maybe_keepalive_ack, bool require_ack) = 0; - virtual void notify_write() {}; + virtual void notify_out() = 0; - virtual void on_closed() {} - - private: - ceph::bufferlist sweep_messages_and_move_to_sent( - size_t num_msgs, - bool require_keepalive, - std::optional keepalive_ack, - bool require_ack); - - protected: - ChainedDispatchers& dispatchers; - SocketConnection &conn; + virtual void on_closed() = 0; private: bool closed = false; @@ -91,7 +80,7 @@ class Protocol { seastar::future<> send(MessageURef msg); - seastar::future<> keepalive(); + seastar::future<> send_keepalive(); clock_t::time_point get_last_keepalive() const { return last_keepalive; @@ -112,40 +101,44 @@ class Protocol { out << "io_stat(" << "in_seq=" << in_seq << ", out_seq=" << out_seq - << ", out_q_size=" << out_q.size() - << ", sent_size=" << sent.size() + << ", out_pending_msgs_size=" << out_pending_msgs.size() + << ", out_sent_msgs_size=" << out_sent_msgs.size() << ", need_ack=" << (ack_left > 0) << ", need_keepalive=" << need_keepalive - << ", need_keepalive_ack=" << bool(keepalive_ack) + << ", need_keepalive_ack=" << bool(next_keepalive_ack) << ")"; } // TODO: encapsulate a SessionedSender class protected: - // write_state is changed with state atomically, indicating the write - // behavior of the according state. - enum class write_state_t : uint8_t { + /** + * out_state_t + * + * The out_state is changed with protocol state atomically, indicating the + * out behavior of the according protocol state. + */ + enum class out_state_t : uint8_t { none, delay, open, drop }; - friend class fmt::formatter; - void set_write_state(const write_state_t& state) { - if (write_state == write_state_t::open && - state != write_state_t::open && - write_dispatching) { - exit_open = seastar::shared_promise<>(); + friend class fmt::formatter; + void set_out_state(const out_state_t& state) { + if (out_state == out_state_t::open && + state != out_state_t::open && + out_dispatching) { + out_exit_dispatching = seastar::shared_promise<>(); } - write_state = state; - state_changed.set_value(); - state_changed = seastar::shared_promise<>(); + out_state = state; + out_state_changed.set_value(); + out_state_changed = seastar::shared_promise<>(); } - seastar::future<> wait_write_exit() { - if (exit_open) { - return exit_open->get_shared_future(); + seastar::future<> wait_out_exit_dispatching() { + if (out_exit_dispatching) { + return out_exit_dispatching->get_shared_future(); } return seastar::now(); } @@ -154,28 +147,28 @@ class Protocol { void notify_ack(); - void requeue_up_to(seq_num_t seq); + void requeue_out_sent_up_to(seq_num_t seq); - void requeue_sent(); + void requeue_out_sent(); - void reset_write(); + void reset_out(); - void reset_read() { + void reset_in() { in_seq = 0; } - bool is_queued() const { - return (!out_q.empty() || + bool is_out_queued() const { + return (!out_pending_msgs.empty() || ack_left > 0 || need_keepalive || - keepalive_ack.has_value()); + next_keepalive_ack.has_value()); } - bool is_queued_or_sent() const { - return is_queued() || !sent.empty(); + bool is_out_queued_or_sent() const { + return is_out_queued() || !out_sent_msgs.empty(); } - void ack_writes(seq_num_t seq); + void ack_out_sent(seq_num_t seq); void set_last_keepalive(clock_t::time_point when) { last_keepalive = when; @@ -189,35 +182,63 @@ class Protocol { in_seq = _in_seq; } - seq_num_t increment_out() { + seq_num_t increment_out_seq() { return ++out_seq; } crimson::common::Gated gate; + ChainedDispatchers& dispatchers; + + SocketConnection &conn; + private: - write_state_t write_state = write_state_t::none; + seastar::future try_exit_out_dispatch(); + + seastar::future<> do_out_dispatch(); + + ceph::bufferlist sweep_out_pending_msgs_to_sent( + size_t num_msgs, + bool require_keepalive, + std::optional maybe_keepalive_ack, + bool require_ack); + + void notify_out_dispatch(); + + /* + * out states for writing + */ - // wait until current state changed - seastar::shared_promise<> state_changed; + out_state_t out_state = out_state_t::none; + + // wait until current out_state changed + seastar::shared_promise<> out_state_changed; + + bool out_dispatching = false; + + // If another continuation is trying to close or replace socket when + // out_dispatching is true and out_state is open, it needs to wait for + // out_exit_dispatching until writing is stopped or failed. + std::optional> out_exit_dispatching; /// the seq num of the last transmitted message seq_num_t out_seq = 0; // messages to be resent after connection gets reset - std::deque out_q; + std::deque out_pending_msgs; // messages sent, but not yet acked by peer - std::deque sent; + std::deque out_sent_msgs; bool need_keepalive = false; - std::optional keepalive_ack = std::nullopt; + + std::optional next_keepalive_ack = std::nullopt; + uint64_t ack_left = 0; - bool write_dispatching = false; - // If another continuation is trying to close or replace socket when - // write_dispatching is true and write_state is open, - // it needs to wait for exit_open until writing is stopped or failed. - std::optional> exit_open; + + /* + * in states for reading + */ /// the seq num of the last received message seq_num_t in_seq = 0; @@ -225,10 +246,6 @@ class Protocol { clock_t::time_point last_keepalive; clock_t::time_point last_keepalive_ack; - - seastar::future try_exit_sweep(); - seastar::future<> do_write_dispatch_sweep(); - void write_event(); }; inline std::ostream& operator<<(std::ostream& out, const Protocol& proto) { @@ -245,11 +262,11 @@ inline std::ostream& operator<<( } // namespace crimson::net template <> -struct fmt::formatter +struct fmt::formatter : fmt::formatter { template - auto format(crimson::net::Protocol::write_state_t state, FormatContext& ctx) { - using enum crimson::net::Protocol::write_state_t; + auto format(crimson::net::Protocol::out_state_t state, FormatContext& ctx) { + using enum crimson::net::Protocol::out_state_t; std::string_view name; switch (state) { case none: diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 0e74ce827a594..0c911afdc2729 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -359,7 +359,7 @@ seastar::future<> ProtocolV2::write_frame(F &frame, bool flush) } } -void ProtocolV2::trigger_state(state_t _state, write_state_t _write_state, bool reentrant) +void ProtocolV2::trigger_state(state_t _state, out_state_t _out_state, bool reentrant) { if (!reentrant && _state == state) { logger().error("{} is not allowed to re-trigger state {}", @@ -369,7 +369,7 @@ void ProtocolV2::trigger_state(state_t _state, write_state_t _write_state, bool logger().debug("{} TRIGGER {}, was {}", conn, get_state_name(_state), get_state_name(state)); state = _state; - set_write_state(_write_state); + set_out_state(_out_state); } void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr eptr) @@ -379,7 +379,7 @@ void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr e conn, func_name, get_state_name(state), eptr); close(true); } else if (conn.policy.server || - (conn.policy.standby && !is_queued_or_sent())) { + (conn.policy.standby && !is_out_queued_or_sent())) { logger().info("{} {}: fault at {} with nothing to send, going to STANDBY -- {}", conn, func_name, get_state_name(state), eptr); execute_standby(); @@ -398,11 +398,11 @@ void ProtocolV2::reset_session(bool full) { server_cookie = 0; connect_seq = 0; - reset_read(); + reset_in(); if (full) { client_cookie = generate_client_cookie(); peer_global_seq = 0; - reset_write(); + reset_out(); dispatchers.ms_handle_remote_reset( seastar::static_pointer_cast(conn.shared_from_this())); } @@ -678,7 +678,7 @@ ProtocolV2::client_connect() case Tag::SERVER_IDENT: return read_frame_payload().then([this] { // handle_server_ident() logic - requeue_sent(); + requeue_out_sent(); auto server_ident = ServerIdentFrame::Decode(rx_segments_data.back()); logger().debug("{} GOT ServerIdentFrame:" " addrs={}, gid={}, gs={}," @@ -800,7 +800,7 @@ ProtocolV2::client_reconnect() auto reconnect_ok = ReconnectOkFrame::Decode(rx_segments_data.back()); logger().debug("{} GOT ReconnectOkFrame: msg_seq={}", conn, reconnect_ok.msg_seq()); - requeue_up_to(reconnect_ok.msg_seq()); + requeue_out_sent_up_to(reconnect_ok.msg_seq()); return seastar::make_ready_future(next_step_t::ready); }); default: { @@ -813,7 +813,7 @@ ProtocolV2::client_reconnect() void ProtocolV2::execute_connecting() { - trigger_state(state_t::CONNECTING, write_state_t::delay, false); + trigger_state(state_t::CONNECTING, out_state_t::delay, false); if (conn.socket) { conn.socket->shutdown(); } @@ -829,7 +829,7 @@ void ProtocolV2::execute_connecting() assert(server_cookie == 0); logger().debug("{} UPDATE: gs={} for connect", conn, global_seq); } - return wait_write_exit().then([this] { + return wait_out_exit_dispatching().then([this] { if (unlikely(state != state_t::CONNECTING)) { logger().debug("{} triggered {} before Socket::connect()", conn, get_state_name(state)); @@ -925,7 +925,7 @@ void ProtocolV2::execute_connecting() } if (conn.policy.server || - (conn.policy.standby && !is_queued_or_sent())) { + (conn.policy.standby && !is_out_queued_or_sent())) { logger().info("{} execute_connecting(): fault at {} with nothing to send," " going to STANDBY -- {}", conn, get_state_name(state), eptr); @@ -1178,7 +1178,7 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn) logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)" " and lose to existing {}, ask client to wait", conn, client_cookie, existing_proto->client_cookie, *existing_conn); - return existing_conn->keepalive().then([this] { + return existing_conn->send_keepalive().then([this] { return send_wait(); }); } @@ -1457,7 +1457,7 @@ ProtocolV2::server_reconnect() void ProtocolV2::execute_accepting() { - trigger_state(state_t::ACCEPTING, write_state_t::none, false); + trigger_state(state_t::ACCEPTING, out_state_t::none, false); gate.dispatch_in_background("execute_accepting", *this, [this] { return seastar::futurize_invoke([this] { INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED); @@ -1578,7 +1578,7 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) { conn.shared_from_this())); }; - trigger_state(state_t::ESTABLISHING, write_state_t::delay, false); + trigger_state(state_t::ESTABLISHING, out_state_t::delay, false); if (existing_conn) { existing_conn->protocol->close( true /* dispatch_reset */, std::move(accept_me)); @@ -1635,8 +1635,8 @@ ProtocolV2::send_server_ident() logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq); // this is required for the case when this connection is being replaced - requeue_up_to(0); - reset_read(); + requeue_out_sent_up_to(0); + reset_in(); if (!conn.policy.lossy) { server_cookie = ceph::util::generate_random_number(1, -1ll); @@ -1682,7 +1682,7 @@ void ProtocolV2::trigger_replacing(bool reconnect, uint64_t new_connect_seq, uint64_t new_msg_seq) { - trigger_state(state_t::REPLACING, write_state_t::delay, false); + trigger_state(state_t::REPLACING, out_state_t::delay, false); if (conn.socket) { conn.socket->shutdown(); } @@ -1699,7 +1699,7 @@ void ProtocolV2::trigger_replacing(bool reconnect, new_conn_features, new_peer_supported_features, new_peer_global_seq, new_connect_seq, new_msg_seq] () mutable { - return wait_write_exit().then([this, do_reset] { + return wait_out_exit_dispatching().then([this, do_reset] { if (do_reset) { reset_session(true); } @@ -1735,7 +1735,7 @@ void ProtocolV2::trigger_replacing(bool reconnect, if (reconnect) { connect_seq = new_connect_seq; // send_reconnect_ok() logic - requeue_up_to(new_msg_seq); + requeue_out_sent_up_to(new_msg_seq); auto reconnect_ok = ReconnectOkFrame::Encode(get_in_seq()); logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, get_in_seq()); return write_frame(reconnect_ok); @@ -1783,7 +1783,7 @@ ceph::bufferlist ProtocolV2::do_sweep_messages( const std::deque& msgs, size_t num_msgs, bool require_keepalive, - std::optional _keepalive_ack, + std::optional maybe_keepalive_ack, bool require_ack) { ceph::bufferlist bl; @@ -1794,8 +1794,8 @@ ceph::bufferlist ProtocolV2::do_sweep_messages( INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2, bp_type_t::WRITE); } - if (unlikely(_keepalive_ack.has_value())) { - auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*_keepalive_ack); + if (unlikely(maybe_keepalive_ack.has_value())) { + auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack); bl.append(keepalive_ack_frame.get_buffer(tx_frame_asm)); INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE); } @@ -1814,7 +1814,7 @@ ceph::bufferlist ProtocolV2::do_sweep_messages( msg->encode(conn.features, 0); ceph_assert(!msg->get_seq() && "message already has seq"); - msg->set_seq(increment_out()); + msg->set_seq(increment_out_seq()); ceph_msg_header &header = msg->get_header(); ceph_msg_footer &footer = msg->get_footer(); @@ -1916,7 +1916,7 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp) logger().debug("{} <== #{} === {} ({})", conn, message->get_seq(), *message, message->get_type()); notify_ack(); - ack_writes(current_header.ack_seq); + ack_out_sent(current_header.ack_seq); // TODO: change MessageRef with seastar::shared_ptr auto msg_ref = MessageRef{message, false}; @@ -1928,7 +1928,7 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp) void ProtocolV2::execute_ready(bool dispatch_connect) { assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0)); - trigger_state(state_t::READY, write_state_t::open, false); + trigger_state(state_t::READY, out_state_t::open, false); if (dispatch_connect) { dispatchers.ms_handle_connect( seastar::static_pointer_cast(conn.shared_from_this())); @@ -1978,7 +1978,7 @@ void ProtocolV2::execute_ready(bool dispatch_connect) // handle_message_ack() logic auto ack = AckFrame::Decode(rx_segments_data.back()); logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq()); - ack_writes(ack.seq()); + ack_out_sent(ack.seq()); }); case Tag::KEEPALIVE2: return read_frame_payload().then([this] { @@ -2022,16 +2022,16 @@ void ProtocolV2::execute_ready(bool dispatch_connect) void ProtocolV2::execute_standby() { - trigger_state(state_t::STANDBY, write_state_t::delay, false); + trigger_state(state_t::STANDBY, out_state_t::delay, false); if (conn.socket) { conn.socket->shutdown(); } } -void ProtocolV2::notify_write() +void ProtocolV2::notify_out() { if (unlikely(state == state_t::STANDBY && !conn.policy.server)) { - logger().info("{} notify_write(): at {}, going to CONNECTING", + logger().info("{} notify_out(): at {}, going to CONNECTING", conn, get_state_name(state)); execute_connecting(); } @@ -2041,7 +2041,7 @@ void ProtocolV2::notify_write() void ProtocolV2::execute_wait(bool max_backoff) { - trigger_state(state_t::WAIT, write_state_t::delay, false); + trigger_state(state_t::WAIT, out_state_t::delay, false); if (conn.socket) { conn.socket->shutdown(); } @@ -2075,7 +2075,7 @@ void ProtocolV2::execute_wait(bool max_backoff) void ProtocolV2::execute_server_wait() { - trigger_state(state_t::SERVER_WAIT, write_state_t::none, false); + trigger_state(state_t::SERVER_WAIT, out_state_t::none, false); gated_execute("execute_server_wait", [this] { return read_exactly(1).then([this] (auto bl) { logger().warn("{} SERVER_WAIT got read, abort", conn); @@ -2110,7 +2110,7 @@ void ProtocolV2::trigger_close() } protocol_timer.cancel(); - trigger_state(state_t::CLOSING, write_state_t::drop, false); + trigger_state(state_t::CLOSING, out_state_t::drop, false); } void ProtocolV2::on_closed() diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index b580110834f9d..cfd5781ff0435 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -41,7 +41,7 @@ class ProtocolV2 final : public Protocol { std::optional keepalive_ack, bool require_ack) override; - void notify_write() override; + void notify_out() override; private: SocketMessenger &messenger; @@ -76,7 +76,7 @@ class ProtocolV2 final : public Protocol { return statenames[static_cast(state)]; } - void trigger_state(state_t state, write_state_t write_state, bool reentrant); + void trigger_state(state_t state, out_state_t out_state, bool reentrant); uint64_t peer_supported_features = 0; diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index e5419125ae971..bd7259c6e7c94 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -75,12 +75,12 @@ seastar::future<> SocketConnection::send(MessageURef msg) }); } -seastar::future<> SocketConnection::keepalive() +seastar::future<> SocketConnection::send_keepalive() { return seastar::smp::submit_to( shard_id(), [this] { - return protocol->keepalive(); + return protocol->send_keepalive(); }); } diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index 5e928de79db59..ea18407e4591e 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -82,7 +82,7 @@ class SocketConnection : public Connection { seastar::future<> send(MessageURef msg) override; - seastar::future<> keepalive() override; + seastar::future<> send_keepalive() override; clock_t::time_point get_last_keepalive() const override; diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index a0b982645e8cf..e5215a0ac55bc 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -197,7 +197,7 @@ static seastar::future<> test_echo(unsigned rounds, [this, conn, &count_ping, &count_keepalive] { return seastar::repeat([this, conn, &count_ping, &count_keepalive] { if (keepalive_dist(rng)) { - return conn->keepalive() + return conn->send_keepalive() .then([&count_keepalive] { count_keepalive += 1; return seastar::make_ready_future( @@ -1155,7 +1155,7 @@ class FailoverSuite : public Dispatcher { seastar::future<> keepalive_peer() { logger().info("[Test] keepalive_peer()"); ceph_assert(tracked_conn); - return tracked_conn->keepalive(); + return tracked_conn->send_keepalive(); } seastar::future<> try_send_peer() { @@ -1524,7 +1524,7 @@ class FailoverSuitePeer : public Dispatcher { seastar::future<> keepalive_peer() { logger().info("[TestPeer] keepalive_peer()"); ceph_assert(tracked_conn); - return tracked_conn->keepalive(); + return tracked_conn->send_keepalive(); } seastar::future<> markdown() {