assert(!exit_open);
}
-bool Protocol::is_connected() const
-{
- return write_state == write_state_t::open;
-}
-
void Protocol::close(bool dispatch_reset,
std::optional<std::function<void()>> f_accept_new)
{
Protocol(Protocol&&) = delete;
virtual ~Protocol();
- bool is_connected() const;
+ virtual bool is_connected() const = 0;
#ifdef UNIT_TESTS_BUILT
bool is_closed_clean = false;
ProtocolV1::~ProtocolV1() {}
+bool ProtocolV1::is_connected() const
+{
+ return state == state_t::open;
+}
+
// connecting state
void ProtocolV1::reset_session()
return repeat_connect();
});
}).then([this] {
- // notify the dispatcher and allow them to reject the connection
- return dispatcher.ms_handle_connect(
- seastar::static_pointer_cast<SocketConnection>(
- conn.shared_from_this()));
- }).then([this] {
- execute_open();
+ execute_open(open_t::connected);
}).handle_exception([this] (std::exception_ptr eptr) {
// TODO: handle fault in the connecting state
logger().warn("{} connecting fault: {}", conn, eptr);
return seastar::repeat([this] {
return repeat_handle_connect();
});
- }).then([this] {
- // notify the dispatcher and allow them to reject the connection
- return dispatcher.ms_handle_accept(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
}).then([this] {
messenger.register_conn(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
messenger.unaccept_conn(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
- execute_open();
+ execute_open(open_t::accepted);
}).handle_exception([this] (std::exception_ptr eptr) {
// TODO: handle fault in the accepting state
logger().warn("{} accepting fault: {}", conn, eptr);
});
}
-void ProtocolV1::execute_open()
+void ProtocolV1::execute_open(open_t type)
{
logger().trace("{} trigger open, was {}", conn, static_cast<int>(state));
state = state_t::open;
set_write_state(write_state_t::open);
+ if (type == open_t::connected) {
+ gated_dispatch("ms_handle_connect", [this] {
+ return dispatcher.ms_handle_connect(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ });
+ } else { // type == open_t::accepted
+ gated_dispatch("ms_handle_accept", [this] {
+ return dispatcher.ms_handle_accept(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ });
+ }
+
gated_dispatch("execute_open", [this] {
// start background processing of tags
return handle_tags()
~ProtocolV1() override;
private:
+ bool is_connected() const override;
+
void start_connect(const entity_addr_t& peer_addr,
const entity_name_t& peer_name) override;
seastar::future<> maybe_throttle();
seastar::future<> read_message();
seastar::future<> handle_tags();
- void execute_open();
+
+ enum class open_t {
+ connected,
+ accepted
+ };
+ void execute_open(open_t type);
// replacing
// the number of connections initiated in this session, increment when a
ProtocolV2::~ProtocolV2() {}
+bool ProtocolV2::is_connected() const {
+ return state == state_t::READY ||
+ state == state_t::ESTABLISHING ||
+ state == state_t::REPLACING;
+}
+
void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
const entity_name_t& _peer_name)
{
}
switch (next) {
case next_step_t::ready: {
- gated_dispatch("ms_handle_connect", [this] {
- return dispatcher.ms_handle_connect(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
- });
logger().info("{} connected:"
" gs={}, pgs={}, cs={}, client_cookie={},"
" server_cookie={}, in_seq={}, out_seq={}, out_q={}",
conn, global_seq, peer_global_seq, connect_seq,
client_cookie, server_cookie, conn.in_seq,
conn.out_seq, conn.out_q.size());
- execute_ready();
+ execute_ready(true);
break;
}
case next_step_t::wait: {
conn, global_seq, peer_global_seq, connect_seq,
client_cookie, server_cookie, conn.in_seq,
conn.out_seq, conn.out_q.size());
- execute_ready();
+ execute_ready(false);
}).handle_exception([this] (std::exception_ptr eptr) {
if (state != state_t::ESTABLISHING) {
logger().info("{} execute_establishing() protocol aborted at {} -- {}",
conn, reconnect ? "reconnected" : "connected",
global_seq, peer_global_seq, connect_seq, client_cookie,
server_cookie, conn.in_seq, conn.out_seq, conn.out_q.size());
- execute_ready();
+ execute_ready(false);
}).handle_exception([this] (std::exception_ptr eptr) {
if (state != state_t::REPLACING) {
logger().info("{} trigger_replacing(): protocol aborted at {} -- {}",
});
}
-void ProtocolV2::execute_ready()
+void ProtocolV2::execute_ready(bool dispatch_connect)
{
assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
trigger_state(state_t::READY, write_state_t::open, false);
+ if (dispatch_connect) {
+ gated_dispatch("ms_handle_connect", [this] {
+ return dispatcher.ms_handle_connect(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ });
+ }
#ifdef UNIT_TESTS_BUILT
if (conn.interceptor) {
conn.interceptor->register_conn_ready(conn);
~ProtocolV2() override;
private:
+ bool is_connected() const override;
+
void start_connect(const entity_addr_t& peer_addr,
const entity_name_t& peer_name) override;
// READY
seastar::future<> read_message(utime_t throttle_stamp);
- void execute_ready();
+ void execute_ready(bool dispatch_connect);
// STANDBY
void execute_standby();