In order to introduce the cross-core IOHandler class.
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
std::optional<utime_t> keepalive_ack,
bool require_ack)
{
- ceph::bufferlist bl = do_sweep_messages(conn.out_q,
- num_msgs,
- require_keepalive,
- keepalive_ack,
+ ceph::bufferlist bl = do_sweep_messages(out_q,
+ num_msgs,
+ require_keepalive,
+ keepalive_ack,
require_ack);
if (!conn.policy.lossy) {
- conn.sent.insert(conn.sent.end(),
- std::make_move_iterator(conn.out_q.begin()),
- std::make_move_iterator(conn.out_q.end()));
+ sent.insert(sent.end(),
+ std::make_move_iterator(out_q.begin()),
+ std::make_move_iterator(out_q.end()));
}
- conn.out_q.clear();
+ out_q.clear();
return bl;
}
seastar::future<> Protocol::send(MessageURef msg)
{
if (write_state != write_state_t::drop) {
- conn.out_q.push_back(std::move(msg));
+ out_q.push_back(std::move(msg));
write_event();
}
return seastar::now();
void Protocol::requeue_sent()
{
assert(write_state != write_state_t::open);
- if (conn.sent.empty()) {
+ if (sent.empty()) {
return;
}
- conn.out_seq -= conn.sent.size();
+ out_seq -= sent.size();
logger().debug("{} requeue {} items, revert out_seq to {}",
- conn, conn.sent.size(), conn.out_seq);
- for (MessageURef& msg : conn.sent) {
+ conn, sent.size(), out_seq);
+ for (MessageURef& msg : sent) {
msg->clear_payload();
msg->set_seq(0);
}
- conn.out_q.insert(conn.out_q.begin(),
- std::make_move_iterator(conn.sent.begin()),
- std::make_move_iterator(conn.sent.end()));
- conn.sent.clear();
+ out_q.insert(out_q.begin(),
+ std::make_move_iterator(sent.begin()),
+ std::make_move_iterator(sent.end()));
+ sent.clear();
write_event();
}
void Protocol::requeue_up_to(seq_num_t seq)
{
assert(write_state != write_state_t::open);
- if (conn.sent.empty() && conn.out_q.empty()) {
+ if (sent.empty() && out_q.empty()) {
logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
- conn, conn.out_seq, seq);
- conn.out_seq = seq;
+ conn, out_seq, seq);
+ out_seq = seq;
return;
}
logger().debug("{} discarding sent items by seq {} (sent_len={}, out_seq={})",
- conn, seq, conn.sent.size(), conn.out_seq);
- while (!conn.sent.empty()) {
- auto cur_seq = conn.sent.front()->get_seq();
+ conn, seq, sent.size(), out_seq);
+ while (!sent.empty()) {
+ auto cur_seq = sent.front()->get_seq();
if (cur_seq == 0 || cur_seq > seq) {
break;
} else {
- conn.sent.pop_front();
+ sent.pop_front();
}
}
requeue_sent();
void Protocol::reset_write()
{
assert(write_state != write_state_t::open);
- conn.out_seq = 0;
- conn.out_q.clear();
- conn.sent.clear();
+ out_seq = 0;
+ out_q.clear();
+ sent.clear();
need_keepalive = false;
keepalive_ack = std::nullopt;
ack_left = 0;
if (conn.policy.lossy) { // lossy connections don't keep sent messages
return;
}
- while (!conn.sent.empty() && conn.sent.front()->get_seq() <= seq) {
+ while (!sent.empty() && sent.front()->get_seq() <= seq) {
logger().trace("{} got ack seq {} >= {}, pop {}",
- conn, seq, conn.sent.front()->get_seq(), *conn.sent.front());
- conn.sent.pop_front();
+ conn, seq, sent.front()->get_seq(), *sent.front());
+ sent.pop_front();
}
}
return seastar::repeat([this] {
switch (write_state) {
case write_state_t::open: {
- size_t num_msgs = conn.out_q.size();
+ size_t num_msgs = out_q.size();
bool still_queued = is_queued();
if (unlikely(!still_queued)) {
return try_exit_sweep();
}
auto acked = ack_left;
- assert(acked == 0 || conn.in_seq > 0);
+ assert(acked == 0 || in_seq > 0);
// sweep all pending writes with the concrete Protocol
return conn.socket->write(sweep_messages_and_move_to_sent(
num_msgs, need_keepalive, keepalive_ack, acked > 0)
virtual void start_accept(SocketRef&& socket,
const entity_addr_t& peer_addr) = 0;
- virtual void print(std::ostream&) const = 0;
+ virtual void print_conn(std::ostream&) const = 0;
+
protected:
Protocol(ChainedDispatchers& dispatchers,
SocketConnection& conn);
// the write state-machine
public:
+ using clock_t = seastar::lowres_system_clock;
+
seastar::future<> send(MessageURef msg);
+
seastar::future<> keepalive();
+ clock_t::time_point get_last_keepalive() const {
+ return last_keepalive;
+ }
+
+ clock_t::time_point get_last_keepalive_ack() const {
+ return last_keepalive_ack;
+ }
+
+ void set_last_keepalive_ack(clock_t::time_point when) {
+ last_keepalive_ack = when;
+ }
+
+ struct io_stat_printer {
+ const Protocol &protocol;
+ };
+ void print_io_stat(std::ostream &out) const {
+ out << "io_stat("
+ << "in_seq=" << in_seq
+ << ", out_seq=" << out_seq
+ << ", out_q_size=" << out_q.size()
+ << ", sent_size=" << sent.size()
+ << ", need_ack=" << (ack_left > 0)
+ << ", need_keepalive=" << need_keepalive
+ << ", need_keepalive_ack=" << bool(keepalive_ack)
+ << ")";
+ }
+
// TODO: encapsulate a SessionedSender class
protected:
// write_state is changed with state atomically, indicating the write
void reset_write();
+ void reset_read() {
+ in_seq = 0;
+ }
+
bool is_queued() const {
- return (!conn.out_q.empty() ||
+ return (!out_q.empty() ||
ack_left > 0 ||
need_keepalive ||
keepalive_ack.has_value());
}
+ bool is_queued_or_sent() const {
+ return is_queued() || !sent.empty();
+ }
+
void ack_writes(seq_num_t seq);
+
+ void set_last_keepalive(clock_t::time_point when) {
+ last_keepalive = when;
+ }
+
+ seq_num_t get_in_seq() const {
+ return in_seq;
+ }
+
+ void set_in_seq(seq_num_t _in_seq) {
+ in_seq = _in_seq;
+ }
+
+ seq_num_t increment_out() {
+ return ++out_seq;
+ }
+
crimson::common::Gated gate;
private:
write_state_t write_state = write_state_t::none;
+
// wait until current state changed
seastar::shared_promise<> state_changed;
+ /// the seq num of the last transmitted message
+ seq_num_t out_seq = 0;
+
+ // messages to be resent after connection gets reset
+ std::deque<MessageURef> out_q;
+
+ // messages sent, but not yet acked by peer
+ std::deque<MessageURef> sent;
+
bool need_keepalive = false;
std::optional<utime_t> keepalive_ack = std::nullopt;
uint64_t ack_left = 0;
// it needs to wait for exit_open until writing is stopped or failed.
std::optional<seastar::shared_promise<>> exit_open;
+ /// the seq num of the last received message
+ seq_num_t in_seq = 0;
+
+ clock_t::time_point last_keepalive;
+
+ clock_t::time_point last_keepalive_ack;
+
seastar::future<stop_t> try_exit_sweep();
seastar::future<> do_write_dispatch_sweep();
void write_event();
};
inline std::ostream& operator<<(std::ostream& out, const Protocol& proto) {
- proto.print(out);
+ proto.print_conn(out);
return out;
}
+inline std::ostream& operator<<(
+ std::ostream& out, Protocol::io_stat_printer stat) {
+ stat.protocol.print_io_stat(out);
+ return out;
+}
} // namespace crimson::net
conn, func_name, get_state_name(state), eptr);
close(true);
} else if (conn.policy.server ||
- (conn.policy.standby &&
- (!is_queued() && conn.sent.empty()))) {
+ (conn.policy.standby && !is_queued_or_sent())) {
logger().info("{} {}: fault at {} with nothing to send, going to STANDBY -- {}",
conn, func_name, get_state_name(state), eptr);
execute_standby();
{
server_cookie = 0;
connect_seq = 0;
- conn.in_seq = 0;
+ reset_read();
if (full) {
client_cookie = generate_client_cookie();
peer_global_seq = 0;
server_cookie,
global_seq,
connect_seq,
- conn.in_seq);
+ get_in_seq());
logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={},"
- " server_cookie={}, gs={}, cs={}, msg_seq={}",
+ " server_cookie={}, gs={}, cs={}, in_seq={}",
conn, messenger.get_myaddrs(),
client_cookie, server_cookie,
- global_seq, connect_seq, conn.in_seq);
+ global_seq, connect_seq, get_in_seq());
return write_frame(reconnect).then([this] {
return read_main_preamble();
}).then([this] (Tag tag) {
}
switch (next) {
case next_step_t::ready: {
- logger().info("{} connected:"
- " gs={}, pgs={}, cs={}, client_cookie={},"
- " server_cookie={}, in_seq={}, out_seq={}, out_q={}",
+ logger().info("{} connected: gs={}, pgs={}, cs={}, "
+ "client_cookie={}, server_cookie={}, {}",
conn, global_seq, peer_global_seq, connect_seq,
- client_cookie, server_cookie, conn.in_seq,
- conn.out_seq, conn.out_q.size());
+ client_cookie, server_cookie,
+ io_stat_printer{*this});
execute_ready(true);
break;
}
}
if (conn.policy.server ||
- (conn.policy.standby &&
- (!is_queued() && conn.sent.empty()))) {
+ (conn.policy.standby && !is_queued_or_sent())) {
logger().info("{} execute_connecting(): fault at {} with nothing to send,"
" going to STANDBY -- {}",
conn, get_state_name(state), eptr);
conn, get_state_name(state));
abort_protocol();
}
- logger().info("{} established: gs={}, pgs={}, cs={}, client_cookie={},"
- " server_cookie={}, in_seq={}, out_seq={}, out_q={}",
+ logger().info("{} established: gs={}, pgs={}, cs={}, "
+ "client_cookie={}, server_cookie={}, {}",
conn, global_seq, peer_global_seq, connect_seq,
- client_cookie, server_cookie, conn.in_seq,
- conn.out_seq, conn.out_q.size());
+ client_cookie, server_cookie,
+ io_stat_printer{*this});
execute_ready(false);
}).handle_exception([this] (std::exception_ptr eptr) {
if (state != state_t::ESTABLISHING) {
// this is required for the case when this connection is being replaced
requeue_up_to(0);
- conn.in_seq = 0;
+ reset_read();
if (!conn.policy.lossy) {
server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
connect_seq = new_connect_seq;
// send_reconnect_ok() logic
requeue_up_to(new_msg_seq);
- auto reconnect_ok = ReconnectOkFrame::Encode(conn.in_seq);
- logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, conn.in_seq);
+ auto reconnect_ok = ReconnectOkFrame::Encode(get_in_seq());
+ logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, get_in_seq());
return write_frame(reconnect_ok);
} else {
client_cookie = new_client_cookie;
conn, get_state_name(state));
abort_protocol();
}
- logger().info("{} replaced ({}):"
- " gs={}, pgs={}, cs={}, client_cookie={}, server_cookie={},"
- " in_seq={}, out_seq={}, out_q={}",
+ logger().info("{} replaced ({}): gs={}, pgs={}, cs={}, "
+ "client_cookie={}, server_cookie={}, {}",
conn, reconnect ? "reconnected" : "connected",
- global_seq, peer_global_seq, connect_seq, client_cookie,
- server_cookie, conn.in_seq, conn.out_seq, conn.out_q.size());
+ global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie,
+ io_stat_printer{*this});
execute_ready(false);
}).handle_exception([this] (std::exception_ptr eptr) {
if (state != state_t::REPLACING) {
}
if (require_ack && num_msgs == 0u) {
- auto ack_frame = AckFrame::Encode(conn.in_seq);
+ auto ack_frame = AckFrame::Encode(get_in_seq());
bl.append(ack_frame.get_buffer(tx_frame_asm));
INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE);
}
msg->encode(conn.features, 0);
ceph_assert(!msg->get_seq() && "message already has seq");
- msg->set_seq(++conn.out_seq);
+ msg->set_seq(increment_out());
ceph_msg_header &header = msg->get_header();
ceph_msg_footer &footer = msg->get_footer();
header.type, header.priority,
header.version,
ceph_le32(0), header.data_off,
- ceph_le64(conn.in_seq),
+ ceph_le64(get_in_seq()),
footer.flags, header.compat_version,
header.reserved};
// client side queueing because messages can't be renumbered, but the (kernel)
// client will occasionally pull a message out of the sent queue to send
// elsewhere. in that case it doesn't matter if we "got" it or not.
- uint64_t cur_seq = conn.in_seq;
+ uint64_t cur_seq = get_in_seq();
if (message->get_seq() <= cur_seq) {
logger().error("{} got old message {} <= {} {}, discarding",
conn, message->get_seq(), cur_seq, *message);
}
// note last received message.
- conn.in_seq = message->get_seq();
+ set_in_seq(message->get_seq());
logger().debug("{} <== #{} === {} ({})",
conn, message->get_seq(), *message, message->get_type());
notify_ack();
logger().debug("{} GOT KeepAliveFrame: timestamp={}",
conn, keepalive_frame.timestamp());
notify_keepalive_ack(keepalive_frame.timestamp());
- conn.set_last_keepalive(seastar::lowres_system_clock::now());
+ set_last_keepalive(seastar::lowres_system_clock::now());
});
case Tag::KEEPALIVE2_ACK:
return read_frame_payload().then([this] {
// handle_keepalive2_ack() logic
auto keepalive_ack_frame = KeepAliveFrameAck::Decode(rx_segments_data.back());
- conn.set_last_keepalive_ack(
- seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()});
+ auto _last_keepalive_ack =
+ seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()};
+ set_last_keepalive_ack(_last_keepalive_ack);
logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
- conn, conn.last_keepalive_ack);
+ conn, _last_keepalive_ack);
});
default: {
unexpected_tag(tag, conn, "execute_ready");
conn.shared_from_this()));
}
-void ProtocolV2::print(std::ostream& out) const
+void ProtocolV2::print_conn(std::ostream& out) const
{
out << conn;
}
SocketConnection& conn,
SocketMessenger& messenger);
~ProtocolV2() override;
- void print(std::ostream&) const final;
+ void print_conn(std::ostream&) const final;
+
private:
void on_closed() override;
bool is_connected() const override;
private:
void fault(bool backoff, const char* func_name, std::exception_ptr eptr);
- void reset_session(bool full);
+ void reset_session(bool is_full);
seastar::future<std::tuple<entity_type_t, entity_addr_t>>
banner_exchange(bool is_connect);
});
}
-void SocketConnection::mark_down()
+SocketConnection::clock_t::time_point
+SocketConnection::get_last_keepalive() const
{
- assert(seastar::this_shard_id() == shard_id());
- protocol->close(false);
+ return protocol->get_last_keepalive();
}
-bool SocketConnection::update_rx_seq(seq_num_t seq)
+SocketConnection::clock_t::time_point
+SocketConnection::get_last_keepalive_ack() const
{
- if (seq <= in_seq) {
- if (HAVE_FEATURE(features, RECONNECT_SEQ) &&
- local_conf()->ms_die_on_old_message) {
- ceph_abort_msg("old msgs despite reconnect_seq feature");
- }
- return false;
- } else if (seq > in_seq + 1) {
- if (local_conf()->ms_die_on_skipped_message) {
- ceph_abort_msg("skipped incoming seq");
- }
- return false;
- } else {
- in_seq = seq;
- return true;
- }
+ return protocol->get_last_keepalive_ack();
+}
+
+void SocketConnection::set_last_keepalive_ack(clock_t::time_point when)
+{
+ protocol->set_last_keepalive_ack(when);
+}
+
+void SocketConnection::mark_down()
+{
+ assert(seastar::this_shard_id() == shard_id());
+ protocol->close(false);
}
void
// or should reconnect to (as peer)
entity_addr_t target_addr;
- clock_t::time_point last_keepalive;
-
- clock_t::time_point last_keepalive_ack;
-
uint64_t features = 0;
ceph::net::Policy<crimson::common::Throttle> policy;
- /// the seq num of the last transmitted message
- seq_num_t out_seq = 0;
- /// the seq num of the last received message
- seq_num_t in_seq = 0;
-
- // messages to be resent after connection gets reset
- std::deque<MessageURef> out_q;
- // messages sent, but not yet acked by peer
- std::deque<MessageURef> sent;
-
uint64_t peer_global_id = 0;
std::unique_ptr<user_private_t> user_private;
seastar::future<> keepalive() override;
- clock_t::time_point get_last_keepalive() const override {
- return last_keepalive;
- }
+ clock_t::time_point get_last_keepalive() const override;
- clock_t::time_point get_last_keepalive_ack() const override {
- return last_keepalive_ack;
- }
+ clock_t::time_point get_last_keepalive_ack() const override;
- void set_last_keepalive_ack(clock_t::time_point when) override {
- last_keepalive_ack = when;
- }
+ void set_last_keepalive_ack(clock_t::time_point when) override;
void mark_down() override;
private:
seastar::shard_id shard_id() const;
- /// update the seq num of last received message
- /// @returns true if the @c seq is valid, and @c in_seq is updated,
- /// false otherwise.
- bool update_rx_seq(seq_num_t seq);
-
void set_peer_type(entity_type_t peer_type) {
// it is not allowed to assign an unknown value when the current
// value is known
set_peer_id(name.num());
}
- void set_last_keepalive(clock_t::time_point when) {
- last_keepalive = when;
- }
-
void set_features(uint64_t f) {
features = f;
}