From: Yingxin Cheng Date: Thu, 25 Apr 2019 21:50:19 +0000 (+0800) Subject: crimson/net: use optional keepalive_ack to notify writes X-Git-Tag: v15.1.0~2798^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=4091f76a6865ebd706479fa8e361424b97cffd99;p=ceph-ci.git crimson/net: use optional keepalive_ack to notify writes Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index 3e8538fa06e..3131f568324 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -87,12 +87,11 @@ seastar::future<> Protocol::keepalive() return seastar::now(); } -void Protocol::notify_keepalive_ack() +void Protocol::notify_keepalive_ack(utime_t _keepalive_ack) { - if (!need_keepalive_ack) { - need_keepalive_ack = true; - write_event(); - } + logger().debug("{} got keepalive ack {}", conn, _keepalive_ack); + keepalive_ack = _keepalive_ack; + write_event(); } seastar::future Protocol::do_write_dispatch_sweep() @@ -101,26 +100,28 @@ seastar::future Protocol::do_write_dispatch_sweep() case write_state_t::open: { size_t num_msgs = conn.out_q.size(); // we must have something to write... - ceph_assert(num_msgs || need_keepalive || need_keepalive_ack); + ceph_assert(num_msgs || need_keepalive || keepalive_ack.has_value()); Message* msg_ptr = nullptr; if (likely(num_msgs)) { msg_ptr = conn.out_q.front().get(); } // sweep all pending writes with the concrete Protocol return socket->write(do_sweep_messages( - conn.out_q, num_msgs, need_keepalive, need_keepalive_ack)) - .then([this, msg_ptr, num_msgs] { + conn.out_q, num_msgs, need_keepalive, keepalive_ack)) + .then([this, msg_ptr, num_msgs, prv_keepalive_ack=keepalive_ack] { need_keepalive = false; - need_keepalive_ack = false; + if (keepalive_ack == prv_keepalive_ack) { + keepalive_ack = std::nullopt; + } if (likely(num_msgs && msg_ptr == conn.out_q.front().get())) { // we have sent some messages successfully // and the out_q was not reset during socket write conn.out_q.erase(conn.out_q.begin(), conn.out_q.begin()+num_msgs); } - if (conn.out_q.empty()) { + if (conn.out_q.empty() && !keepalive_ack.has_value()) { // good, we have nothing pending to send now. return socket->flush().then([this] { - if (conn.out_q.empty() && !need_keepalive && !need_keepalive_ack) { + if (conn.out_q.empty() && !need_keepalive && !keepalive_ack.has_value()) { // still nothing pending to send after flush, // the dispatching can ONLY stop now ceph_assert(write_dispatching); diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h index 78ffcb6f0df..5da96375350 100644 --- a/src/crimson/net/Protocol.h +++ b/src/crimson/net/Protocol.h @@ -48,7 +48,7 @@ class Protocol { const std::deque& msgs, size_t num_msgs, bool require_keepalive, - bool require_keepalive_ack) = 0; + std::optional keepalive_ack) = 0; public: const proto_t proto_type; @@ -75,7 +75,7 @@ class Protocol { state_changed = seastar::shared_promise<>(); } - void notify_keepalive_ack(); + void notify_keepalive_ack(utime_t keepalive_ack); private: write_state_t write_state = write_state_t::none; @@ -87,7 +87,7 @@ class Protocol { seastar::shared_future<> close_ready; bool need_keepalive = false; - bool need_keepalive_ack = false; + std::optional keepalive_ack = std::nullopt; bool write_dispatching = false; seastar::future do_write_dispatch_sweep(); void write_event(); diff --git a/src/crimson/net/ProtocolV1.cc b/src/crimson/net/ProtocolV1.cc index 5225de1f0fa..7b775d74b2c 100644 --- a/src/crimson/net/ProtocolV1.cc +++ b/src/crimson/net/ProtocolV1.cc @@ -655,7 +655,7 @@ ceph::bufferlist ProtocolV1::do_sweep_messages( const std::deque& msgs, size_t num_msgs, bool require_keepalive, - bool require_keepalive_ack) + std::optional _keepalive_ack) { static const size_t RESERVE_MSG_SIZE = sizeof(CEPH_MSGR_TAG_MSG) + sizeof(ceph_msg_header) + @@ -680,8 +680,9 @@ ceph::bufferlist ProtocolV1::do_sweep_messages( bl.append(create_static(k.req)); } - if (unlikely(require_keepalive_ack)) { - logger().debug("{} write keepalive2 ack {}", conn, k.ack.stamp.tv_sec); + if (unlikely(_keepalive_ack.has_value())) { + logger().debug("{} write keepalive2 ack {}", conn, *_keepalive_ack); + k.ack.stamp = ceph_timespec(*_keepalive_ack); bl.append(create_static(k.ack)); } @@ -736,9 +737,8 @@ seastar::future<> ProtocolV1::handle_keepalive2() { return socket->read_exactly(sizeof(ceph_timespec)) .then([this] (auto buf) { - k.ack.stamp = *reinterpret_cast(buf.get()); - logger().debug("{} got keepalive2 {}", conn, k.ack.stamp.tv_sec); - notify_keepalive_ack(); + utime_t ack{*reinterpret_cast(buf.get())}; + notify_keepalive_ack(ack); }); } diff --git a/src/crimson/net/ProtocolV1.h b/src/crimson/net/ProtocolV1.h index 5f6a75da781..53539ca0f7f 100644 --- a/src/crimson/net/ProtocolV1.h +++ b/src/crimson/net/ProtocolV1.h @@ -30,7 +30,7 @@ class ProtocolV1 final : public Protocol { const std::deque& msgs, size_t num_msgs, bool require_keepalive, - bool require_keepalive_ack) override; + std::optional keepalive_ack) override; private: SocketMessenger &messenger; diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 877a07b6471..315936140bd 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -1346,7 +1346,7 @@ ceph::bufferlist ProtocolV2::do_sweep_messages( const std::deque& msgs, size_t num_msgs, bool require_keepalive, - bool require_keepalive_ack) + std::optional _keepalive_ack) { ceph::bufferlist bl; @@ -1355,8 +1355,8 @@ ceph::bufferlist ProtocolV2::do_sweep_messages( bl.append(keepalive_frame.get_buffer(session_stream_handlers)); } - if (unlikely(require_keepalive_ack)) { - auto keepalive_ack_frame = KeepAliveFrameAck::Encode(last_keepalive_ack_to_send); + if (unlikely(_keepalive_ack.has_value())) { + auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*_keepalive_ack); bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers)); } @@ -1540,11 +1540,8 @@ void ProtocolV2::execute_ready() return read_frame_payload().then([this] { // handle_keepalive2() logic auto keepalive_frame = KeepAliveFrame::Decode(rx_segments_data.back()); - last_keepalive_ack_to_send = keepalive_frame.timestamp(); - logger().debug("{} got KEEPALIVE2 {}", - conn, last_keepalive_ack_to_send); + notify_keepalive_ack(keepalive_frame.timestamp()); conn.set_last_keepalive(seastar::lowres_system_clock::now()); - notify_keepalive_ack(); }); case Tag::KEEPALIVE2_ACK: return read_frame_payload().then([this] { diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index eb1bc65005d..b8681c2199d 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -29,7 +29,7 @@ class ProtocolV2 final : public Protocol { const std::deque& msgs, size_t num_msgs, bool require_keepalive, - bool require_keepalive_ack) override; + std::optional keepalive_ack) override; private: SocketMessenger &messenger; @@ -72,8 +72,6 @@ class ProtocolV2 final : public Protocol { uint64_t peer_global_seq = 0; uint64_t connect_seq = 0; - utime_t last_keepalive_ack_to_send; - // TODO: Frame related implementations, probably to a separate class. private: bool record_io = false;