/// handshake
virtual seastar::future<> keepalive() = 0;
- /// close the connection and cancel any any pending futures from read/send
+ // close the connection and cancel any any pending futures from read/send
+ // Note it's OK to discard the returned future because Messenger::shutdown()
+ // will wait for all connections closed
virtual seastar::future<> close() = 0;
/// which shard id the connection lives
}).handle_exception([this] (std::exception_ptr eptr) {
// TODO: handle fault in the connecting state
logger().warn("{} connecting fault: {}", conn, eptr);
- close();
+ (void) close();
});
});
}
// will all be performed using v2 protocol.
ceph_abort("lossless policy not supported for v1");
}
- seastar::do_with(
- std::move(existing),
- [](auto existing) {
- return existing->close();
- });
+ (void) existing->close();
return send_connect_reply_ready(reply_tag, std::move(authorizer_reply));
}
logger().warn("{} existing {} proto version is {} not 1, close existing",
conn, *existing,
static_cast<int>(existing->protocol->proto_type));
- existing->close();
+ (void) existing->close();
} else {
return handle_connect_with_existing(existing, std::move(authorizer_reply));
}
}).handle_exception([this] (std::exception_ptr eptr) {
// TODO: handle fault in the accepting state
logger().warn("{} accepting fault: {}", conn, eptr);
- close();
+ (void) close();
});
});
}
return dispatcher.ms_handle_reset(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()))
.then([this] {
- close();
+ (void) close();
});
} else if (e.code() == error::read_eof) {
return dispatcher.ms_handle_remote_reset(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()))
.then([this] {
- close();
+ (void) close();
});
} else {
throw e;
}).handle_exception([this] (std::exception_ptr eptr) {
// TODO: handle fault in the open state
logger().warn("{} open fault: {}", conn, eptr);
- close();
+ (void) close();
});
});
}
}
void abort_in_close(ceph::net::ProtocolV2& proto) {
- proto.close();
+ (void) proto.close();
abort_protocol();
}
logger().info("{} {}: fault at {} on lossy channel, going to CLOSING -- {}",
conn, func_name, get_state_name(state), eptr);
dispatch_reset();
- close();
+ (void) close();
} else if (conn.policy.server ||
(conn.policy.standby &&
(!is_queued() && conn.sent.empty()))) {
" existing connection {} is a lossy channel. Close existing in favor of"
" this connection", conn, *existing_conn);
existing_proto->dispatch_reset();
- existing_proto->close();
+ (void) existing_proto->close();
if (unlikely(state != state_t::ACCEPTING)) {
logger().debug("{} triggered {} in execute_accepting()",
conn, *existing_conn,
static_cast<int>(existing_conn->protocol->proto_type));
// should unregister the existing from msgr atomically
- existing_conn->close();
+ (void) existing_conn->close();
} else {
return handle_existing_connection(existing_conn);
}
"close existing and reset client.",
conn, *existing_conn,
static_cast<int>(existing_conn->protocol->proto_type));
- existing_conn->close();
+ (void) existing_conn->close();
return send_reset(true);
}
}).handle_exception([this] (std::exception_ptr eptr) {
logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
conn, get_state_name(state), eptr);
- close();
+ (void) close();
});
});
}
}).handle_exception([this] (std::exception_ptr eptr) {
logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}",
conn, get_state_name(state), eptr);
- close();
+ (void) close();
});
});
}
seastar::future<> SocketConnection::close()
{
- return seastar::smp::submit_to(shard_id(), [this] {
- return protocol->close();
- });
+ ceph_assert(seastar::engine().cpu_id() == shard_id());
+ return protocol->close();
}
bool SocketConnection::update_rx_seq(seq_num_t seq)