}).handle_exception([this] (std::exception_ptr eptr) {
// TODO: handle fault in the connecting state
logger().warn("{} connecting fault: {}", conn, eptr);
- close(false);
+ close(true);
});
});
}
// will all be performed using v2 protocol.
ceph_abort("lossless policy not supported for v1");
}
- (void) existing->close();
+ existing->protocol->close(true);
return send_connect_reply_ready(reply_tag, std::move(authorizer_reply));
}
logger().warn("{} existing {} proto version is {} not 1, close existing",
conn, *existing,
static_cast<int>(existing->protocol->proto_type));
+ // NOTE: this is following async messenger logic, but we may miss the reset event.
(void) existing->close();
} else {
return handle_connect_with_existing(existing, std::move(authorizer_reply));
.handle_exception_type([this] (const std::system_error& e) {
logger().warn("{} open fault: {}", conn, e);
if (e.code() == error::protocol_aborted ||
- e.code() == std::errc::connection_reset) {
+ e.code() == std::errc::connection_reset ||
+ e.code() == error::read_eof) {
close(true);
return seastar::now();
- } else if (e.code() == error::read_eof) {
- return dispatcher.ms_handle_remote_reset(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()))
- .then([this] {
- close(false);
- });
} else {
throw e;
}
}).handle_exception([this] (std::exception_ptr eptr) {
// TODO: handle fault in the open state
logger().warn("{} open fault: {}", conn, eptr);
- close(false);
+ close(true);
});
});
}
}
}
-seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange()
+seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange(bool is_connect)
{
// 1. prepare and send banner
bufferlist banner_payload;
logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ);
return read(payload_len);
- }).then([this] (bufferlist bl) {
+ }).then([this, is_connect] (bufferlist bl) {
// 4. process peer banner_payload and send HelloFrame
auto p = bl.cbegin();
uint64_t peer_supported_features;
logger().error("{} peer does not support all required features"
" required={} peer_supported={}",
conn, required_features, peer_supported_features);
- abort_in_close(*this, false);
+ abort_in_close(*this, is_connect);
}
if ((supported_features & peer_required_features) != peer_required_features) {
logger().error("{} we do not support all peer required features"
" peer_required={} supported={}",
conn, peer_required_features, supported_features);
- abort_in_close(*this, false);
+ abort_in_close(*this, is_connect);
}
this->peer_required_features = peer_required_features;
if (this->peer_required_features == 0) {
auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
session_stream_handlers = { nullptr, nullptr };
enable_recording();
- return banner_exchange();
+ return banner_exchange(true);
}).then([this] (entity_type_t _peer_type,
entity_addr_t _my_addr_from_peer) {
if (conn.get_peer_type() != _peer_type) {
conn, *existing_conn,
static_cast<int>(existing_conn->protocol->proto_type));
// should unregister the existing from msgr atomically
+ // NOTE: this is following async messenger logic, but we may miss the reset event.
(void) existing_conn->close();
} else {
return handle_existing_connection(existing_conn);
"close existing and reset client.",
conn, *existing_conn,
static_cast<int>(existing_conn->protocol->proto_type));
+ // NOTE: this is following async messenger logic, but we may miss the reset event.
(void) existing_conn->close();
return send_reset(true);
}
auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
session_stream_handlers = { nullptr, nullptr };
enable_recording();
- return banner_exchange();
+ return banner_exchange(false);
}).then([this] (entity_type_t _peer_type,
entity_addr_t _my_addr_from_peer) {
ceph_assert(conn.get_peer_type() == 0);