write_event();
}
+void Protocol::requeue_sent()
+{
+ assert(write_state != write_state_t::open);
+ if (conn.sent.empty()) {
+ return;
+ }
+
+ conn.out_seq -= conn.sent.size();
+ logger().debug("{} requeue {} items, revert out_seq to {}",
+ conn, conn.sent.size(), conn.out_seq);
+ for (MessageRef& msg : conn.sent) {
+ msg->clear_payload();
+ msg->set_seq(0);
+ }
+ conn.out_q.insert(conn.out_q.begin(),
+ std::make_move_iterator(conn.sent.begin()),
+ std::make_move_iterator(conn.sent.end()));
+ conn.sent.clear();
+}
+
+void Protocol::requeue_up_to(seq_num_t seq)
+{
+ assert(write_state != write_state_t::open);
+ if (conn.sent.empty() && conn.out_q.empty()) {
+ logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
+ conn, conn.out_seq, seq);
+ conn.out_seq = seq;
+ return;
+ }
+ logger().debug("{} discarding sent items by seq {} (sent_len={}, out_seq={})",
+ conn, seq, conn.sent.size(), conn.out_seq);
+ while (!conn.sent.empty()) {
+ auto cur_seq = conn.sent.front()->get_seq();
+ if (cur_seq == 0 || cur_seq > seq) {
+ break;
+ } else {
+ conn.sent.pop_front();
+ }
+ }
+ requeue_sent();
+}
+
void Protocol::reset_write()
{
assert(write_state != write_state_t::open);
keepalive_ack = std::nullopt;
}
+void Protocol::ack_writes(seq_num_t seq)
+{
+ if (conn.policy.lossy) { // lossy connections don't keep sent messages
+ return;
+ }
+ while (!conn.sent.empty() && conn.sent.front()->get_seq() <= seq) {
+ logger().trace("{} got ack seq {} >= {}, pop {}",
+ conn, seq, conn.sent.front()->get_seq(), conn.sent.front());
+ conn.sent.pop_front();
+ }
+}
+
seastar::future<stop_t> Protocol::do_write_dispatch_sweep()
{
switch (write_state) {
conn.pending_q.clear();
conn.pending_q.swap(conn.out_q);
+ if (!conn.policy.lossy) {
+ conn.sent.insert(conn.sent.end(),
+ conn.pending_q.begin(),
+ conn.pending_q.end());
+ }
// sweep all pending writes with the concrete Protocol
return socket->write(do_sweep_messages(
conn.pending_q, num_msgs, need_keepalive, keepalive_ack))
void notify_keepalive_ack(utime_t keepalive_ack);
+ void requeue_up_to(seq_num_t seq);
+
+ void requeue_sent();
+
void reset_write();
bool is_queued() const {
keepalive_ack.has_value());
}
+ void ack_writes(seq_num_t seq);
+
private:
write_state_t write_state = write_state_t::none;
// wait until current state changed
reply_tag = CEPH_MSGR_TAG_READY;
}
if (!existing->is_lossy()) {
- // reset the in_seq if this is a hard reset from peer,
- // otherwise we respect our original connection's value
- conn.in_seq = is_reset_from_peer ? 0 : existing->rx_seq_num();
- // steal outgoing queue and out_seq
- existing->requeue_sent();
- std::tie(conn.out_seq, conn.out_q) = existing->get_out_queue();
+ // XXX: we decided not to support lossless connection in v1. as the
+ // client's default policy is
+ // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is
+ // lossy. And by the time
+ // will all be performed using v2 protocol.
+ ceph_abort("lossless policy not supported for v1");
}
seastar::do_with(
std::move(existing),
case Tag::SERVER_IDENT:
return read_frame_payload().then([this] {
// handle_server_ident() logic
+ requeue_sent();
auto server_ident = ServerIdentFrame::Decode(rx_segments_data.back());
logger().debug("{} GOT ServerIdentFrame:"
" addrs={}, gid={}, gs={},"
auto reconnect_ok = ReconnectOkFrame::Decode(rx_segments_data.back());
logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
conn, reconnect_ok.msg_seq());
+ requeue_up_to(reconnect_ok.msg_seq());
// TODO
- // discard_requeued_up_to()
// backoff = utime_t();
return dispatcher.ms_handle_connect(
seastar::static_pointer_cast<SocketConnection>(
logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq);
// this is required for the case when this connection is being replaced
- // TODO
- // out_seq = discard_requeued_up_to(out_seq, 0);
+ requeue_up_to(0);
conn.in_seq = 0;
if (!conn.policy.lossy) {
return bl;
}
-void ProtocolV2::handle_message_ack(seq_num_t seq) {
- if (conn.policy.lossy) { // lossy connections don't keep sent messages
- return;
- }
-
- // TODO: lossless policy
-}
-
seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
{
return read_frame_payload()
if (!conn.policy.lossy) {
// ++ack_left;
}
- handle_message_ack(current_header.ack_seq);
+ ack_writes(current_header.ack_seq);
// TODO: change MessageRef with seastar::shared_ptr
auto msg_ref = MessageRef{message, false};
// handle_message_ack() logic
auto ack = AckFrame::Decode(rx_segments_data.back());
logger().debug("{} GOT AckFrame: seq={}", ack.seq());
- handle_message_ack(ack.seq());
+ ack_writes(ack.seq());
});
case Tag::KEEPALIVE2:
return read_frame_payload().then([this] {
// READY
seastar::future<> read_message(utime_t throttle_stamp);
- void handle_message_ack(seq_num_t seq);
void execute_ready();
// STANDBY
});
}
-void SocketConnection::requeue_sent()
-{
- out_seq -= sent.size();
- while (!sent.empty()) {
- auto m = sent.front();
- sent.pop_front();
- out_q.push_back(std::move(m));
- }
-}
-
bool SocketConnection::update_rx_seq(seq_num_t seq)
{
if (seq <= in_seq) {
void start_accept(SocketFRef&& socket,
const entity_addr_t& peer_addr);
- seq_num_t rx_seq_num() const {
- return in_seq;
- }
-
bool is_server_side() const {
return policy.server;
}
return policy.lossy;
}
- /// move all messages in the sent list back into the queue
- void requeue_sent();
-
- std::tuple<seq_num_t, std::deque<MessageRef>> get_out_queue() {
- return {out_seq, std::move(out_q)};
- }
-
friend class Protocol;
friend class ProtocolV1;
friend class ProtocolV2;