bool dispatch_in = false;
if (new_state == out_state_t::open) {
// to open
+ ceph_assert_always(protocol_is_connected == true);
assert(fa != nullptr);
ceph_assert_always(frame_assembler == nullptr);
frame_assembler = std::move(fa);
#endif
} else if (out_state == out_state_t::open) {
// from open
+ ceph_assert_always(protocol_is_connected == true);
+ protocol_is_connected = false;
assert(fa == nullptr);
ceph_assert_always(frame_assembler->is_socket_valid());
frame_assembler->shutdown_socket();
void Protocol::dispatch_accept()
{
+ // protocol_is_connected can be from true to true here if the replacing is
+ // happening to a connected connection.
+ protocol_is_connected = true;
dispatchers.ms_handle_accept(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
}
void Protocol::dispatch_connect()
{
+ ceph_assert_always(protocol_is_connected == false);
+ protocol_is_connected = true;
dispatchers.ms_handle_connect(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
}
Protocol(Protocol&&) = delete;
virtual ~Protocol();
- virtual bool is_connected() const = 0;
-
virtual void close() = 0;
virtual seastar::future<> close_clean_yielded() = 0;
public:
using clock_t = seastar::lowres_system_clock;
+ bool is_connected() const {
+ return protocol_is_connected;
+ }
+
seastar::future<> send(MessageURef msg);
seastar::future<> send_keepalive();
FrameAssemblerV2Ref frame_assembler;
+ bool protocol_is_connected = false;
+
/*
* out states for writing
*/
ProtocolV2::~ProtocolV2() {}
-bool ProtocolV2::is_connected() const {
- return state == state_t::READY ||
- state == state_t::ESTABLISHING ||
- state == state_t::REPLACING;
-}
-
void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
const entity_name_t& _peer_name)
{
// public to SocketConnection, but private to the others
private:
- bool is_connected() const override;
-
void close() override;
seastar::future<> close_clean_yielded() override;
racing_detected = true;
logger().warn("Heartbeat::Connection::replaced(): {} racing", *this);
assert(conn != replaced_conn);
- assert(conn->is_connected());
}
void Heartbeat::Connection::reset()