gate.dispatch_in_background(__func__, *this, [this] {
if (active_con) {
return seastar::when_all_succeed(wait_for_send_log(),
- active_con->get_conn()->keepalive(),
+ active_con->get_conn()->send_keepalive(),
active_con->renew_tickets(),
active_con->renew_rotating_keyring()).discard_result();
} else {
virtual seastar::future<> send(MessageURef msg) = 0;
/**
- * keepalive
+ * send_keepalive
*
* Send a keepalive message over a connection that has completed its
* handshake.
*
* May be invoked from any core.
*/
- virtual seastar::future<> keepalive() = 0;
+ virtual seastar::future<> send_keepalive() = 0;
virtual clock_t::time_point get_last_keepalive() const = 0;
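// A minimal caller-side sketch of the renamed interface, assuming a
// crimson::net::ConnectionRef `conn` whose handshake has completed (the same
// pattern used by the tests further down in this patch):
//
//   return conn->send(std::move(msg)).then([conn] {
//     return conn->send_keepalive();  // callable from any core, resolves once queued
//   });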
Protocol::~Protocol()
{
ceph_assert(gate.is_closed());
- assert(!exit_open);
+ assert(!out_exit_dispatching);
}
void Protocol::close(bool dispatch_reset,
if (conn.socket) {
conn.socket->shutdown();
}
- set_write_state(write_state_t::drop);
+ set_out_state(out_state_t::drop);
assert(!gate.is_closed());
auto gate_closed = gate.close();
});
}
-ceph::bufferlist Protocol::sweep_messages_and_move_to_sent(
+ceph::bufferlist Protocol::sweep_out_pending_msgs_to_sent(
size_t num_msgs,
bool require_keepalive,
- std::optional<utime_t> keepalive_ack,
+ std::optional<utime_t> maybe_keepalive_ack,
bool require_ack)
{
- ceph::bufferlist bl = do_sweep_messages(out_q,
+ ceph::bufferlist bl = do_sweep_messages(out_pending_msgs,
num_msgs,
require_keepalive,
- keepalive_ack,
+ maybe_keepalive_ack,
require_ack);
if (!conn.policy.lossy) {
- sent.insert(sent.end(),
- std::make_move_iterator(out_q.begin()),
- std::make_move_iterator(out_q.end()));
+ out_sent_msgs.insert(
+ out_sent_msgs.end(),
+ std::make_move_iterator(out_pending_msgs.begin()),
+ std::make_move_iterator(out_pending_msgs.end()));
}
- out_q.clear();
+ out_pending_msgs.clear();
return bl;
}
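// Bookkeeping sketch for the sweep above, assuming a lossless policy
// (conn.policy.lossy == false); member names as introduced by this patch:
//
//   before: out_pending_msgs = [m1, m2, m3]   out_sent_msgs = [m0]
//   after : out_pending_msgs = []             out_sent_msgs = [m0, m1, m2, m3]
//
// With a lossy policy the pending messages are only encoded into the returned
// bufferlist and then discarded, since they can never be requeued.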
seastar::future<> Protocol::send(MessageURef msg)
{
- if (write_state != write_state_t::drop) {
- out_q.push_back(std::move(msg));
- write_event();
+ if (out_state != out_state_t::drop) {
+ out_pending_msgs.push_back(std::move(msg));
+ notify_out_dispatch();
}
return seastar::now();
}
-seastar::future<> Protocol::keepalive()
+seastar::future<> Protocol::send_keepalive()
{
if (!need_keepalive) {
need_keepalive = true;
- write_event();
+ notify_out_dispatch();
}
return seastar::now();
}
-void Protocol::notify_keepalive_ack(utime_t _keepalive_ack)
+void Protocol::notify_keepalive_ack(utime_t keepalive_ack)
{
- logger().trace("{} got keepalive ack {}", conn, _keepalive_ack);
- keepalive_ack = _keepalive_ack;
- write_event();
+ logger().trace("{} got keepalive ack {}", conn, keepalive_ack);
+ next_keepalive_ack = keepalive_ack;
+ notify_out_dispatch();
}
void Protocol::notify_ack()
{
if (!conn.policy.lossy) {
++ack_left;
- write_event();
+ notify_out_dispatch();
}
}
-void Protocol::requeue_sent()
+void Protocol::requeue_out_sent()
{
- assert(write_state != write_state_t::open);
- if (sent.empty()) {
+ assert(out_state != out_state_t::open);
+ if (out_sent_msgs.empty()) {
return;
}
- out_seq -= sent.size();
+ out_seq -= out_sent_msgs.size();
logger().debug("{} requeue {} items, revert out_seq to {}",
- conn, sent.size(), out_seq);
- for (MessageURef& msg : sent) {
+ conn, out_sent_msgs.size(), out_seq);
+ for (MessageURef& msg : out_sent_msgs) {
msg->clear_payload();
msg->set_seq(0);
}
- out_q.insert(out_q.begin(),
- std::make_move_iterator(sent.begin()),
- std::make_move_iterator(sent.end()));
- sent.clear();
- write_event();
+ out_pending_msgs.insert(
+ out_pending_msgs.begin(),
+ std::make_move_iterator(out_sent_msgs.begin()),
+ std::make_move_iterator(out_sent_msgs.end()));
+ out_sent_msgs.clear();
+ notify_out_dispatch();
}
-void Protocol::requeue_up_to(seq_num_t seq)
+void Protocol::requeue_out_sent_up_to(seq_num_t seq)
{
- assert(write_state != write_state_t::open);
- if (sent.empty() && out_q.empty()) {
+ assert(out_state != out_state_t::open);
+ if (out_sent_msgs.empty() && out_pending_msgs.empty()) {
logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
conn, out_seq, seq);
out_seq = seq;
return;
}
- logger().debug("{} discarding sent items by seq {} (sent_len={}, out_seq={})",
- conn, seq, sent.size(), out_seq);
- while (!sent.empty()) {
- auto cur_seq = sent.front()->get_seq();
+ logger().debug("{} discarding sent msgs by seq {} (sent_len={}, out_seq={})",
+ conn, seq, out_sent_msgs.size(), out_seq);
+ while (!out_sent_msgs.empty()) {
+ auto cur_seq = out_sent_msgs.front()->get_seq();
if (cur_seq == 0 || cur_seq > seq) {
break;
} else {
- sent.pop_front();
+ out_sent_msgs.pop_front();
}
}
- requeue_sent();
+ requeue_out_sent();
}
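// Worked example of the requeue path (names from this patch): suppose
// out_sent_msgs holds messages with seq 4..7 and the peer's ReconnectOkFrame
// reports msg_seq = 5. requeue_out_sent_up_to(5) drops seq 4 and 5, then
// requeue_out_sent() rewinds out_seq by the two survivors, clears their
// payload and seq, and splices them onto the front of out_pending_msgs so
// do_out_dispatch() re-encodes and re-sends them with freshly assigned seqs.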
-void Protocol::reset_write()
+void Protocol::reset_out()
{
- assert(write_state != write_state_t::open);
+ assert(out_state != out_state_t::open);
out_seq = 0;
- out_q.clear();
- sent.clear();
+ out_pending_msgs.clear();
+ out_sent_msgs.clear();
need_keepalive = false;
- keepalive_ack = std::nullopt;
+ next_keepalive_ack = std::nullopt;
ack_left = 0;
}
-void Protocol::ack_writes(seq_num_t seq)
+void Protocol::ack_out_sent(seq_num_t seq)
{
if (conn.policy.lossy) { // lossy connections don't keep sent messages
return;
}
- while (!sent.empty() && sent.front()->get_seq() <= seq) {
+ while (!out_sent_msgs.empty() &&
+ out_sent_msgs.front()->get_seq() <= seq) {
logger().trace("{} got ack seq {} >= {}, pop {}",
- conn, seq, sent.front()->get_seq(), *sent.front());
- sent.pop_front();
+ conn, seq, out_sent_msgs.front()->get_seq(),
+ *out_sent_msgs.front());
+ out_sent_msgs.pop_front();
}
}
-seastar::future<stop_t> Protocol::try_exit_sweep() {
- assert(!is_queued());
+seastar::future<stop_t> Protocol::try_exit_out_dispatch() {
+ assert(!is_out_queued());
return conn.socket->flush().then([this] {
- if (!is_queued()) {
+ if (!is_out_queued()) {
// still nothing pending to send after flush,
// the dispatching can ONLY stop now
- ceph_assert(write_dispatching);
- write_dispatching = false;
- if (unlikely(exit_open.has_value())) {
- exit_open->set_value();
- exit_open = std::nullopt;
- logger().info("{} write_event: nothing queued at {},"
- " set exit_open",
- conn, write_state);
+ ceph_assert(out_dispatching);
+ out_dispatching = false;
+ if (unlikely(out_exit_dispatching.has_value())) {
+ out_exit_dispatching->set_value();
+ out_exit_dispatching = std::nullopt;
+ logger().info("{} do_out_dispatch: nothing queued at {},"
+ " set out_exit_dispatching",
+ conn, out_state);
}
return seastar::make_ready_future<stop_t>(stop_t::yes);
} else {
});
}
-seastar::future<> Protocol::do_write_dispatch_sweep()
+seastar::future<> Protocol::do_out_dispatch()
{
return seastar::repeat([this] {
- switch (write_state) {
- case write_state_t::open: {
- size_t num_msgs = out_q.size();
- bool still_queued = is_queued();
+ switch (out_state) {
+ case out_state_t::open: {
+ size_t num_msgs = out_pending_msgs.size();
+ bool still_queued = is_out_queued();
if (unlikely(!still_queued)) {
- return try_exit_sweep();
+ return try_exit_out_dispatch();
}
- auto acked = ack_left;
- assert(acked == 0 || in_seq > 0);
- // sweep all pending writes with the concrete Protocol
- return conn.socket->write(sweep_messages_and_move_to_sent(
- num_msgs, need_keepalive, keepalive_ack, acked > 0)
- ).then([this, prv_keepalive_ack=keepalive_ack, acked] {
+ auto to_ack = ack_left;
+ assert(to_ack == 0 || in_seq > 0);
+ // sweep all pending out data with the concrete Protocol
+ return conn.socket->write(
+ sweep_out_pending_msgs_to_sent(
+ num_msgs, need_keepalive, next_keepalive_ack, to_ack > 0)
+ ).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] {
need_keepalive = false;
- if (keepalive_ack == prv_keepalive_ack) {
- keepalive_ack = std::nullopt;
+ if (next_keepalive_ack == prv_keepalive_ack) {
+ next_keepalive_ack = std::nullopt;
}
- assert(ack_left >= acked);
- ack_left -= acked;
- if (!is_queued()) {
- return try_exit_sweep();
+ assert(ack_left >= to_ack);
+ ack_left -= to_ack;
+ if (!is_out_queued()) {
+ return try_exit_out_dispatch();
} else {
// messages were enqueued during socket write
return seastar::make_ready_future<stop_t>(stop_t::no);
}
});
}
- case write_state_t::delay:
- // delay dispatching writes until open
- if (exit_open) {
- exit_open->set_value();
- exit_open = std::nullopt;
- logger().info("{} write_event: delay and set exit_open ...", conn);
+ case out_state_t::delay:
+ // delay out dispatching until open
+ if (out_exit_dispatching) {
+ out_exit_dispatching->set_value();
+ out_exit_dispatching = std::nullopt;
+ logger().info("{} do_out_dispatch: delay and set out_exit_dispatching ...", conn);
} else {
- logger().info("{} write_event: delay ...", conn);
+ logger().info("{} do_out_dispatch: delay ...", conn);
}
- return state_changed.get_shared_future()
- .then([] { return stop_t::no; });
- case write_state_t::drop:
- ceph_assert(write_dispatching);
- write_dispatching = false;
- if (exit_open) {
- exit_open->set_value();
- exit_open = std::nullopt;
- logger().info("{} write_event: dropped and set exit_open", conn);
+ return out_state_changed.get_shared_future(
+ ).then([] { return stop_t::no; });
+ case out_state_t::drop:
+ ceph_assert(out_dispatching);
+ out_dispatching = false;
+ if (out_exit_dispatching) {
+ out_exit_dispatching->set_value();
+ out_exit_dispatching = std::nullopt;
+ logger().info("{} do_out_dispatch: dropped and set out_exit_dispatching", conn);
} else {
- logger().info("{} write_event: dropped", conn);
+ logger().info("{} do_out_dispatch: dropped", conn);
}
return seastar::make_ready_future<stop_t>(stop_t::yes);
default:
if (e.code() != std::errc::broken_pipe &&
e.code() != std::errc::connection_reset &&
e.code() != error::negotiation_failure) {
- logger().error("{} write_event(): unexpected error at {} -- {}",
- conn, write_state, e);
+ logger().error("{} do_out_dispatch(): unexpected error at {} -- {}",
+ conn, out_state, e);
ceph_abort();
}
conn.socket->shutdown();
- if (write_state == write_state_t::open) {
- logger().info("{} write_event(): fault at {}, going to delay -- {}",
- conn, write_state, e);
- write_state = write_state_t::delay;
+ if (out_state == out_state_t::open) {
+ logger().info("{} do_out_dispatch(): fault at {}, going to delay -- {}",
+ conn, out_state, e);
+ out_state = out_state_t::delay;
} else {
- logger().info("{} write_event(): fault at {} -- {}",
- conn, write_state, e);
+ logger().info("{} do_out_dispatch(): fault at {} -- {}",
+ conn, out_state, e);
}
- return do_write_dispatch_sweep();
+ return do_out_dispatch();
});
}
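// Condensed view of the dispatch loop above (one seastar::repeat() iteration
// per case; the exception handler between the hunks feeds faults back in by
// switching open -> delay and recursing into do_out_dispatch()):
//
//   open : sweep pending msgs / keepalive / acks into one bufferlist,
//          conn.socket->write(...), then loop again or try_exit_out_dispatch();
//   delay: park on out_state_changed until the protocol re-opens or drops;
//   drop : clear out_dispatching, wake any out_exit_dispatching waiter, stop.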
-void Protocol::write_event()
+void Protocol::notify_out_dispatch()
{
- notify_write();
- if (write_dispatching) {
+ notify_out();
+ if (out_dispatching) {
// already dispatching
return;
}
- write_dispatching = true;
- switch (write_state) {
- case write_state_t::open:
+ out_dispatching = true;
+ switch (out_state) {
+ case out_state_t::open:
[[fallthrough]];
- case write_state_t::delay:
+ case out_state_t::delay:
assert(!gate.is_closed());
- gate.dispatch_in_background("do_write_dispatch_sweep", *this, [this] {
- return do_write_dispatch_sweep();
+ gate.dispatch_in_background("do_out_dispatch", *this, [this] {
+ return do_out_dispatch();
});
return;
- case write_state_t::drop:
- write_dispatching = false;
+ case out_state_t::drop:
+ out_dispatching = false;
return;
default:
ceph_assert(false);
const std::deque<MessageURef>& msgs,
size_t num_msgs,
bool require_keepalive,
- std::optional<utime_t> keepalive_ack,
+ std::optional<utime_t> maybe_keepalive_ack,
bool require_ack) = 0;
- virtual void notify_write() {};
+ virtual void notify_out() = 0;
- virtual void on_closed() {}
-
- private:
- ceph::bufferlist sweep_messages_and_move_to_sent(
- size_t num_msgs,
- bool require_keepalive,
- std::optional<utime_t> keepalive_ack,
- bool require_ack);
-
- protected:
- ChainedDispatchers& dispatchers;
- SocketConnection &conn;
+ virtual void on_closed() = 0;
private:
bool closed = false;
seastar::future<> send(MessageURef msg);
- seastar::future<> keepalive();
+ seastar::future<> send_keepalive();
clock_t::time_point get_last_keepalive() const {
return last_keepalive;
out << "io_stat("
<< "in_seq=" << in_seq
<< ", out_seq=" << out_seq
- << ", out_q_size=" << out_q.size()
- << ", sent_size=" << sent.size()
+ << ", out_pending_msgs_size=" << out_pending_msgs.size()
+ << ", out_sent_msgs_size=" << out_sent_msgs.size()
<< ", need_ack=" << (ack_left > 0)
<< ", need_keepalive=" << need_keepalive
- << ", need_keepalive_ack=" << bool(keepalive_ack)
+ << ", need_keepalive_ack=" << bool(next_keepalive_ack)
<< ")";
}
// TODO: encapsulate a SessionedSender class
protected:
- // write_state is changed with state atomically, indicating the write
- // behavior of the according state.
- enum class write_state_t : uint8_t {
+ /**
+ * out_state_t
+ *
+ * The out_state is changed atomically with the protocol state, indicating the
+ * output (write) behavior of the corresponding protocol state.
+ */
+ enum class out_state_t : uint8_t {
none,
delay,
open,
drop
};
- friend class fmt::formatter<write_state_t>;
- void set_write_state(const write_state_t& state) {
- if (write_state == write_state_t::open &&
- state != write_state_t::open &&
- write_dispatching) {
- exit_open = seastar::shared_promise<>();
+ friend class fmt::formatter<out_state_t>;
+ void set_out_state(const out_state_t& state) {
+ if (out_state == out_state_t::open &&
+ state != out_state_t::open &&
+ out_dispatching) {
+ out_exit_dispatching = seastar::shared_promise<>();
}
- write_state = state;
- state_changed.set_value();
- state_changed = seastar::shared_promise<>();
+ out_state = state;
+ out_state_changed.set_value();
+ out_state_changed = seastar::shared_promise<>();
}
- seastar::future<> wait_write_exit() {
- if (exit_open) {
- return exit_open->get_shared_future();
+ seastar::future<> wait_out_exit_dispatching() {
+ if (out_exit_dispatching) {
+ return out_exit_dispatching->get_shared_future();
}
return seastar::now();
}
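// Hedged usage sketch, mirroring execute_connecting() and trigger_replacing()
// later in this patch: a continuation that needs exclusive use of the socket
// first leaves the open state, then waits for the writer to drain:
//
//   trigger_state(next_state, out_state_t::delay, false);  // calls set_out_state()
//   if (conn.socket) { conn.socket->shutdown(); }
//   return wait_out_exit_dispatching().then([this] {
//     // the write path is no longer using conn.socket here
//   });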
void notify_ack();
- void requeue_up_to(seq_num_t seq);
+ void requeue_out_sent_up_to(seq_num_t seq);
- void requeue_sent();
+ void requeue_out_sent();
- void reset_write();
+ void reset_out();
- void reset_read() {
+ void reset_in() {
in_seq = 0;
}
- bool is_queued() const {
- return (!out_q.empty() ||
+ bool is_out_queued() const {
+ return (!out_pending_msgs.empty() ||
ack_left > 0 ||
need_keepalive ||
- keepalive_ack.has_value());
+ next_keepalive_ack.has_value());
}
- bool is_queued_or_sent() const {
- return is_queued() || !sent.empty();
+ bool is_out_queued_or_sent() const {
+ return is_out_queued() || !out_sent_msgs.empty();
}
- void ack_writes(seq_num_t seq);
+ void ack_out_sent(seq_num_t seq);
void set_last_keepalive(clock_t::time_point when) {
last_keepalive = when;
in_seq = _in_seq;
}
- seq_num_t increment_out() {
+ seq_num_t increment_out_seq() {
return ++out_seq;
}
crimson::common::Gated gate;
+ ChainedDispatchers& dispatchers;
+
+ SocketConnection &conn;
+
private:
- write_state_t write_state = write_state_t::none;
+ seastar::future<stop_t> try_exit_out_dispatch();
+
+ seastar::future<> do_out_dispatch();
+
+ ceph::bufferlist sweep_out_pending_msgs_to_sent(
+ size_t num_msgs,
+ bool require_keepalive,
+ std::optional<utime_t> maybe_keepalive_ack,
+ bool require_ack);
+
+ void notify_out_dispatch();
+
+ /*
+ * out states for writing
+ */
- // wait until current state changed
- seastar::shared_promise<> state_changed;
+ out_state_t out_state = out_state_t::none;
+
+ // wait until current out_state changed
+ seastar::shared_promise<> out_state_changed;
+
+ bool out_dispatching = false;
+
+ // If another continuation is trying to close or replace the socket while
+ // out_dispatching is true and out_state is open, it needs to wait for
+ // out_exit_dispatching until writing has stopped or failed.
+ std::optional<seastar::shared_promise<>> out_exit_dispatching;
/// the seq num of the last transmitted message
seq_num_t out_seq = 0;
// messages to be resent after connection gets reset
- std::deque<MessageURef> out_q;
+ std::deque<MessageURef> out_pending_msgs;
// messages sent, but not yet acked by peer
- std::deque<MessageURef> sent;
+ std::deque<MessageURef> out_sent_msgs;
bool need_keepalive = false;
- std::optional<utime_t> keepalive_ack = std::nullopt;
+
+ std::optional<utime_t> next_keepalive_ack = std::nullopt;
+
uint64_t ack_left = 0;
- bool write_dispatching = false;
- // If another continuation is trying to close or replace socket when
- // write_dispatching is true and write_state is open,
- // it needs to wait for exit_open until writing is stopped or failed.
- std::optional<seastar::shared_promise<>> exit_open;
+
+ /*
+ * in states for reading
+ */
/// the seq num of the last received message
seq_num_t in_seq = 0;
clock_t::time_point last_keepalive;
clock_t::time_point last_keepalive_ack;
-
- seastar::future<stop_t> try_exit_sweep();
- seastar::future<> do_write_dispatch_sweep();
- void write_event();
};
inline std::ostream& operator<<(std::ostream& out, const Protocol& proto) {
} // namespace crimson::net
template <>
-struct fmt::formatter<crimson::net::Protocol::write_state_t>
+struct fmt::formatter<crimson::net::Protocol::out_state_t>
: fmt::formatter<std::string_view> {
template <typename FormatContext>
- auto format(crimson::net::Protocol::write_state_t state, FormatContext& ctx) {
- using enum crimson::net::Protocol::write_state_t;
+ auto format(crimson::net::Protocol::out_state_t state, FormatContext& ctx) {
+ using enum crimson::net::Protocol::out_state_t;
std::string_view name;
switch (state) {
case none:
}
}
-void ProtocolV2::trigger_state(state_t _state, write_state_t _write_state, bool reentrant)
+void ProtocolV2::trigger_state(state_t _state, out_state_t _out_state, bool reentrant)
{
if (!reentrant && _state == state) {
logger().error("{} is not allowed to re-trigger state {}",
logger().debug("{} TRIGGER {}, was {}",
conn, get_state_name(_state), get_state_name(state));
state = _state;
- set_write_state(_write_state);
+ set_out_state(_out_state);
}
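// For reference, the out_state paired with each ProtocolV2 state by the
// trigger_state() call sites in this patch:
//   ACCEPTING, SERVER_WAIT                      -> out_state_t::none
//   CONNECTING, ESTABLISHING, REPLACING,
//   STANDBY, WAIT                               -> out_state_t::delay
//   READY                                       -> out_state_t::open
//   CLOSING                                     -> out_state_t::drop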
void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr eptr)
conn, func_name, get_state_name(state), eptr);
close(true);
} else if (conn.policy.server ||
- (conn.policy.standby && !is_queued_or_sent())) {
+ (conn.policy.standby && !is_out_queued_or_sent())) {
logger().info("{} {}: fault at {} with nothing to send, going to STANDBY -- {}",
conn, func_name, get_state_name(state), eptr);
execute_standby();
{
server_cookie = 0;
connect_seq = 0;
- reset_read();
+ reset_in();
if (full) {
client_cookie = generate_client_cookie();
peer_global_seq = 0;
- reset_write();
+ reset_out();
dispatchers.ms_handle_remote_reset(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
}
case Tag::SERVER_IDENT:
return read_frame_payload().then([this] {
// handle_server_ident() logic
- requeue_sent();
+ requeue_out_sent();
auto server_ident = ServerIdentFrame::Decode(rx_segments_data.back());
logger().debug("{} GOT ServerIdentFrame:"
" addrs={}, gid={}, gs={},"
auto reconnect_ok = ReconnectOkFrame::Decode(rx_segments_data.back());
logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
conn, reconnect_ok.msg_seq());
- requeue_up_to(reconnect_ok.msg_seq());
+ requeue_out_sent_up_to(reconnect_ok.msg_seq());
return seastar::make_ready_future<next_step_t>(next_step_t::ready);
});
default: {
void ProtocolV2::execute_connecting()
{
- trigger_state(state_t::CONNECTING, write_state_t::delay, false);
+ trigger_state(state_t::CONNECTING, out_state_t::delay, false);
if (conn.socket) {
conn.socket->shutdown();
}
assert(server_cookie == 0);
logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
}
- return wait_write_exit().then([this] {
+ return wait_out_exit_dispatching().then([this] {
if (unlikely(state != state_t::CONNECTING)) {
logger().debug("{} triggered {} before Socket::connect()",
conn, get_state_name(state));
}
if (conn.policy.server ||
- (conn.policy.standby && !is_queued_or_sent())) {
+ (conn.policy.standby && !is_out_queued_or_sent())) {
logger().info("{} execute_connecting(): fault at {} with nothing to send,"
" going to STANDBY -- {}",
conn, get_state_name(state), eptr);
logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
" and lose to existing {}, ask client to wait",
conn, client_cookie, existing_proto->client_cookie, *existing_conn);
- return existing_conn->keepalive().then([this] {
+ return existing_conn->send_keepalive().then([this] {
return send_wait();
});
}
void ProtocolV2::execute_accepting()
{
- trigger_state(state_t::ACCEPTING, write_state_t::none, false);
+ trigger_state(state_t::ACCEPTING, out_state_t::none, false);
gate.dispatch_in_background("execute_accepting", *this, [this] {
return seastar::futurize_invoke([this] {
INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
conn.shared_from_this()));
};
- trigger_state(state_t::ESTABLISHING, write_state_t::delay, false);
+ trigger_state(state_t::ESTABLISHING, out_state_t::delay, false);
if (existing_conn) {
existing_conn->protocol->close(
true /* dispatch_reset */, std::move(accept_me));
logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq);
// this is required for the case when this connection is being replaced
- requeue_up_to(0);
- reset_read();
+ requeue_out_sent_up_to(0);
+ reset_in();
if (!conn.policy.lossy) {
server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
uint64_t new_connect_seq,
uint64_t new_msg_seq)
{
- trigger_state(state_t::REPLACING, write_state_t::delay, false);
+ trigger_state(state_t::REPLACING, out_state_t::delay, false);
if (conn.socket) {
conn.socket->shutdown();
}
new_conn_features, new_peer_supported_features,
new_peer_global_seq,
new_connect_seq, new_msg_seq] () mutable {
- return wait_write_exit().then([this, do_reset] {
+ return wait_out_exit_dispatching().then([this, do_reset] {
if (do_reset) {
reset_session(true);
}
if (reconnect) {
connect_seq = new_connect_seq;
// send_reconnect_ok() logic
- requeue_up_to(new_msg_seq);
+ requeue_out_sent_up_to(new_msg_seq);
auto reconnect_ok = ReconnectOkFrame::Encode(get_in_seq());
logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, get_in_seq());
return write_frame(reconnect_ok);
const std::deque<MessageURef>& msgs,
size_t num_msgs,
bool require_keepalive,
- std::optional<utime_t> _keepalive_ack,
+ std::optional<utime_t> maybe_keepalive_ack,
bool require_ack)
{
ceph::bufferlist bl;
INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2, bp_type_t::WRITE);
}
- if (unlikely(_keepalive_ack.has_value())) {
- auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*_keepalive_ack);
+ if (unlikely(maybe_keepalive_ack.has_value())) {
+ auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack);
bl.append(keepalive_ack_frame.get_buffer(tx_frame_asm));
INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE);
}
msg->encode(conn.features, 0);
ceph_assert(!msg->get_seq() && "message already has seq");
- msg->set_seq(increment_out());
+ msg->set_seq(increment_out_seq());
ceph_msg_header &header = msg->get_header();
ceph_msg_footer &footer = msg->get_footer();
logger().debug("{} <== #{} === {} ({})",
conn, message->get_seq(), *message, message->get_type());
notify_ack();
- ack_writes(current_header.ack_seq);
+ ack_out_sent(current_header.ack_seq);
// TODO: change MessageRef with seastar::shared_ptr
auto msg_ref = MessageRef{message, false};
void ProtocolV2::execute_ready(bool dispatch_connect)
{
assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
- trigger_state(state_t::READY, write_state_t::open, false);
+ trigger_state(state_t::READY, out_state_t::open, false);
if (dispatch_connect) {
dispatchers.ms_handle_connect(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
// handle_message_ack() logic
auto ack = AckFrame::Decode(rx_segments_data.back());
logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq());
- ack_writes(ack.seq());
+ ack_out_sent(ack.seq());
});
case Tag::KEEPALIVE2:
return read_frame_payload().then([this] {
void ProtocolV2::execute_standby()
{
- trigger_state(state_t::STANDBY, write_state_t::delay, false);
+ trigger_state(state_t::STANDBY, out_state_t::delay, false);
if (conn.socket) {
conn.socket->shutdown();
}
}
-void ProtocolV2::notify_write()
+void ProtocolV2::notify_out()
{
if (unlikely(state == state_t::STANDBY && !conn.policy.server)) {
- logger().info("{} notify_write(): at {}, going to CONNECTING",
+ logger().info("{} notify_out(): at {}, going to CONNECTING",
conn, get_state_name(state));
execute_connecting();
}
void ProtocolV2::execute_wait(bool max_backoff)
{
- trigger_state(state_t::WAIT, write_state_t::delay, false);
+ trigger_state(state_t::WAIT, out_state_t::delay, false);
if (conn.socket) {
conn.socket->shutdown();
}
void ProtocolV2::execute_server_wait()
{
- trigger_state(state_t::SERVER_WAIT, write_state_t::none, false);
+ trigger_state(state_t::SERVER_WAIT, out_state_t::none, false);
gated_execute("execute_server_wait", [this] {
return read_exactly(1).then([this] (auto bl) {
logger().warn("{} SERVER_WAIT got read, abort", conn);
}
protocol_timer.cancel();
- trigger_state(state_t::CLOSING, write_state_t::drop, false);
+ trigger_state(state_t::CLOSING, out_state_t::drop, false);
}
void ProtocolV2::on_closed()
std::optional<utime_t> keepalive_ack,
bool require_ack) override;
- void notify_write() override;
+ void notify_out() override;
private:
SocketMessenger &messenger;
return statenames[static_cast<int>(state)];
}
- void trigger_state(state_t state, write_state_t write_state, bool reentrant);
+ void trigger_state(state_t state, out_state_t out_state, bool reentrant);
uint64_t peer_supported_features = 0;
});
}
-seastar::future<> SocketConnection::keepalive()
+seastar::future<> SocketConnection::send_keepalive()
{
return seastar::smp::submit_to(
shard_id(),
[this] {
- return protocol->keepalive();
+ return protocol->send_keepalive();
});
}
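// Note: this cross-core hop is what makes send_keepalive() (and send()) safe
// to invoke from any core, as documented in the Connection interface above;
// seastar::smp::submit_to(shard_id(), ...) forwards the call to the
// connection's home shard before it reaches Protocol::send_keepalive().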
seastar::future<> send(MessageURef msg) override;
- seastar::future<> keepalive() override;
+ seastar::future<> send_keepalive() override;
clock_t::time_point get_last_keepalive() const override;
[this, conn, &count_ping, &count_keepalive] {
return seastar::repeat([this, conn, &count_ping, &count_keepalive] {
if (keepalive_dist(rng)) {
- return conn->keepalive()
+ return conn->send_keepalive()
.then([&count_keepalive] {
count_keepalive += 1;
return seastar::make_ready_future<seastar::stop_iteration>(
seastar::future<> keepalive_peer() {
logger().info("[Test] keepalive_peer()");
ceph_assert(tracked_conn);
- return tracked_conn->keepalive();
+ return tracked_conn->send_keepalive();
}
seastar::future<> try_send_peer() {
seastar::future<> keepalive_peer() {
logger().info("[TestPeer] keepalive_peer()");
ceph_assert(tracked_conn);
- return tracked_conn->keepalive();
+ return tracked_conn->send_keepalive();
}
seastar::future<> markdown() {