return seastar::now();
}
-void Protocol::notify_keepalive_ack()
+void Protocol::notify_keepalive_ack(utime_t _keepalive_ack)
{
- if (!need_keepalive_ack) {
- need_keepalive_ack = true;
- write_event();
- }
+ logger().debug("{} got keepalive ack {}", conn, _keepalive_ack);
+ keepalive_ack = _keepalive_ack;
+ write_event();
}
seastar::future<stop_t> Protocol::do_write_dispatch_sweep()
case write_state_t::open: {
size_t num_msgs = conn.out_q.size();
// we must have something to write...
- ceph_assert(num_msgs || need_keepalive || need_keepalive_ack);
+ ceph_assert(num_msgs || need_keepalive || keepalive_ack.has_value());
Message* msg_ptr = nullptr;
if (likely(num_msgs)) {
msg_ptr = conn.out_q.front().get();
}
// sweep all pending writes with the concrete Protocol
return socket->write(do_sweep_messages(
- conn.out_q, num_msgs, need_keepalive, need_keepalive_ack))
- .then([this, msg_ptr, num_msgs] {
+ conn.out_q, num_msgs, need_keepalive, keepalive_ack))
+ .then([this, msg_ptr, num_msgs, prv_keepalive_ack=keepalive_ack] {
need_keepalive = false;
- need_keepalive_ack = false;
+ if (keepalive_ack == prv_keepalive_ack) {
+ keepalive_ack = std::nullopt;
+ }
if (likely(num_msgs && msg_ptr == conn.out_q.front().get())) {
// we have sent some messages successfully
// and the out_q was not reset during socket write
conn.out_q.erase(conn.out_q.begin(), conn.out_q.begin()+num_msgs);
}
- if (conn.out_q.empty()) {
+ if (conn.out_q.empty() && !keepalive_ack.has_value()) {
// good, we have nothing pending to send now.
return socket->flush().then([this] {
- if (conn.out_q.empty() && !need_keepalive && !need_keepalive_ack) {
+ if (conn.out_q.empty() && !need_keepalive && !keepalive_ack.has_value()) {
// still nothing pending to send after flush,
// the dispatching can ONLY stop now
ceph_assert(write_dispatching);
const std::deque<MessageRef>& msgs,
size_t num_msgs,
bool require_keepalive,
- bool require_keepalive_ack) = 0;
+ std::optional<utime_t> keepalive_ack) = 0;
public:
const proto_t proto_type;
state_changed = seastar::shared_promise<>();
}
- void notify_keepalive_ack();
+ void notify_keepalive_ack(utime_t keepalive_ack);
private:
write_state_t write_state = write_state_t::none;
seastar::shared_future<> close_ready;
bool need_keepalive = false;
- bool need_keepalive_ack = false;
+ std::optional<utime_t> keepalive_ack = std::nullopt;
bool write_dispatching = false;
seastar::future<stop_t> do_write_dispatch_sweep();
void write_event();
const std::deque<MessageRef>& msgs,
size_t num_msgs,
bool require_keepalive,
- bool require_keepalive_ack)
+ std::optional<utime_t> _keepalive_ack)
{
static const size_t RESERVE_MSG_SIZE = sizeof(CEPH_MSGR_TAG_MSG) +
sizeof(ceph_msg_header) +
bl.append(create_static(k.req));
}
- if (unlikely(require_keepalive_ack)) {
- logger().debug("{} write keepalive2 ack {}", conn, k.ack.stamp.tv_sec);
+ if (unlikely(_keepalive_ack.has_value())) {
+ logger().debug("{} write keepalive2 ack {}", conn, *_keepalive_ack);
+ k.ack.stamp = ceph_timespec(*_keepalive_ack);
bl.append(create_static(k.ack));
}
{
return socket->read_exactly(sizeof(ceph_timespec))
.then([this] (auto buf) {
- k.ack.stamp = *reinterpret_cast<const ceph_timespec*>(buf.get());
- logger().debug("{} got keepalive2 {}", conn, k.ack.stamp.tv_sec);
- notify_keepalive_ack();
+ utime_t ack{*reinterpret_cast<const ceph_timespec*>(buf.get())};
+ notify_keepalive_ack(ack);
});
}
const std::deque<MessageRef>& msgs,
size_t num_msgs,
bool require_keepalive,
- bool require_keepalive_ack) override;
+ std::optional<utime_t> keepalive_ack) override;
private:
SocketMessenger &messenger;
const std::deque<MessageRef>& msgs,
size_t num_msgs,
bool require_keepalive,
- bool require_keepalive_ack)
+ std::optional<utime_t> _keepalive_ack)
{
ceph::bufferlist bl;
bl.append(keepalive_frame.get_buffer(session_stream_handlers));
}
- if (unlikely(require_keepalive_ack)) {
- auto keepalive_ack_frame = KeepAliveFrameAck::Encode(last_keepalive_ack_to_send);
+ if (unlikely(_keepalive_ack.has_value())) {
+ auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*_keepalive_ack);
bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
}
return read_frame_payload().then([this] {
// handle_keepalive2() logic
auto keepalive_frame = KeepAliveFrame::Decode(rx_segments_data.back());
- last_keepalive_ack_to_send = keepalive_frame.timestamp();
- logger().debug("{} got KEEPALIVE2 {}",
- conn, last_keepalive_ack_to_send);
+ notify_keepalive_ack(keepalive_frame.timestamp());
conn.set_last_keepalive(seastar::lowres_system_clock::now());
- notify_keepalive_ack();
});
case Tag::KEEPALIVE2_ACK:
return read_frame_payload().then([this] {
const std::deque<MessageRef>& msgs,
size_t num_msgs,
bool require_keepalive,
- bool require_keepalive_ack) override;
+ std::optional<utime_t> keepalive_ack) override;
private:
SocketMessenger &messenger;
uint64_t peer_global_seq = 0;
uint64_t connect_seq = 0;
- utime_t last_keepalive_ack_to_send;
-
// TODO: Frame related implementations, probably to a separate class.
private:
bool record_io = false;