return seastar::now();
}
+void Protocol::mark_down()
+{
+ ceph_assert_always(out_state != out_state_t::none);
+ need_dispatch_reset = false;
+ if (out_state == out_state_t::drop) {
+ return;
+ }
+
+ logger().info("{} mark_down() with {}",
+ conn, io_stat_printer{*this});
+ set_out_state(out_state_t::drop);
+ notify_mark_down();
+}
+
+void Protocol::print_io_stat(std::ostream &out) const
+{
+ out << "io_stat("
+ << "out_state=" << fmt::format("{}", out_state)
+ << ", in_seq=" << in_seq
+ << ", out_seq=" << out_seq
+ << ", out_pending_msgs_size=" << out_pending_msgs.size()
+ << ", out_sent_msgs_size=" << out_sent_msgs.size()
+ << ", need_ack=" << (ack_left > 0)
+ << ", need_keepalive=" << need_keepalive
+ << ", need_keepalive_ack=" << bool(next_keepalive_ack)
+ << ")";
+}
+
void Protocol::set_out_state(
const Protocol::out_state_t &new_state,
FrameAssemblerV2Ref fa)
void Protocol::dispatch_accept()
{
+ if (out_state == out_state_t::drop) {
+ return;
+ }
// protocol_is_connected can be from true to true here if the replacing is
// happening to a connected connection.
protocol_is_connected = true;
void Protocol::dispatch_connect()
{
+ if (out_state == out_state_t::drop) {
+ return;
+ }
ceph_assert_always(protocol_is_connected == false);
protocol_is_connected = true;
dispatchers.ms_handle_connect(
void Protocol::dispatch_reset(bool is_replace)
{
+ ceph_assert_always(out_state == out_state_t::drop);
+ if (!need_dispatch_reset) {
+ return;
+ }
+ need_dispatch_reset = false;
dispatchers.ms_handle_reset(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
is_replace);
void Protocol::dispatch_remote_reset()
{
+ if (out_state == out_state_t::drop) {
+ return;
+ }
dispatchers.ms_handle_remote_reset(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
}
Protocol(Protocol&&) = delete;
virtual ~Protocol();
- virtual void close() = 0;
-
virtual seastar::future<> close_clean_yielded() = 0;
#ifdef UNIT_TESTS_BUILT
virtual void notify_out_fault(const char *where, std::exception_ptr) = 0;
+ virtual void notify_mark_down() = 0;
+
// the write state-machine
public:
using clock_t = seastar::lowres_system_clock;
last_keepalive_ack = when;
}
+ void mark_down();
+
struct io_stat_printer {
const Protocol &protocol;
};
- void print_io_stat(std::ostream &out) const {
- out << "io_stat("
- << "in_seq=" << in_seq
- << ", out_seq=" << out_seq
- << ", out_pending_msgs_size=" << out_pending_msgs.size()
- << ", out_sent_msgs_size=" << out_sent_msgs.size()
- << ", need_ack=" << (ack_left > 0)
- << ", need_keepalive=" << need_keepalive
- << ", need_keepalive_ack=" << bool(next_keepalive_ack)
- << ")";
- }
+ void print_io_stat(std::ostream &out) const;
// TODO: encapsulate a SessionedSender class
protected:
seastar::future<> close_out() {
+ ceph_assert_always(out_state == out_state_t::drop);
assert(!gate.is_closed());
return gate.close();
}
bool protocol_is_connected = false;
+ bool need_dispatch_reset = true;
+
/*
* out states for writing
*/