bool FrameAssemblerV2::has_socket() const
{
assert((socket && conn.socket) || (!socket && !conn.socket));
- return socket != nullptr;
+ return bool(socket);
}
bool FrameAssemblerV2::is_socket_valid() const
return has_socket() && !socket->is_shutdown();
}
-SocketRef FrameAssemblerV2::move_socket()
+SocketFRef FrameAssemblerV2::move_socket()
{
assert(has_socket());
conn.set_socket(nullptr);
return std::move(socket);
}
-void FrameAssemblerV2::set_socket(SocketRef &&new_socket)
+void FrameAssemblerV2::set_socket(SocketFRef &&new_socket)
{
assert(!has_socket());
+ assert(new_socket);
socket = std::move(new_socket);
conn.set_socket(socket.get());
assert(is_socket_valid());
socket->shutdown();
}
-seastar::future<> FrameAssemblerV2::replace_shutdown_socket(SocketRef &&new_socket)
+seastar::future<> FrameAssemblerV2::replace_shutdown_socket(SocketFRef &&new_socket)
{
assert(has_socket());
assert(socket->is_shutdown());
*/
struct mover_t {
- SocketRef socket;
+ SocketFRef socket;
ceph::crypto::onwire::rxtx_t session_stream_handlers;
ceph::compression::onwire::rxtx_t session_comp_handlers;
};
// the socket exists and not shutdown
bool is_socket_valid() const;
- void set_socket(SocketRef &&);
+ void set_socket(SocketFRef &&);
void learn_socket_ephemeral_port_as_connector(uint16_t port);
void shutdown_socket();
- seastar::future<> replace_shutdown_socket(SocketRef &&);
+ seastar::future<> replace_shutdown_socket(SocketFRef &&);
seastar::future<> close_shutdown_socket();
private:
bool has_socket() const;
- SocketRef move_socket();
+ SocketFRef move_socket();
void log_main_preamble(const ceph::bufferlist &bl);
SocketConnection &conn;
- SocketRef socket;
+ SocketFRef socket;
/*
* auth signature
execute_connecting();
}
-void ProtocolV2::start_accept(SocketRef&& new_socket,
+void ProtocolV2::start_accept(SocketFRef&& new_socket,
const entity_addr_t& _peer_addr)
{
ceph_assert(state == state_t::NONE);
abort_protocol();
}
return Socket::connect(conn.peer_addr);
- }).then([this](SocketRef new_socket) {
+ }).then([this](SocketRef _new_socket) {
logger().debug("{} socket connected", conn);
if (unlikely(state != state_t::CONNECTING)) {
logger().debug("{} triggered {} during Socket::connect()",
conn, get_state_name(state));
- return new_socket->close().then([sock=std::move(new_socket)] {
+ return _new_socket->close().then([sock=std::move(_new_socket)] {
abort_protocol();
});
}
+ SocketFRef new_socket = seastar::make_foreign(std::move(_new_socket));
if (!has_socket) {
frame_assembler->set_socket(std::move(new_socket));
has_socket = true;
void start_connect(const entity_addr_t& peer_addr,
const entity_name_t& peer_name);
- void start_accept(SocketRef&& socket,
+ void start_accept(SocketFRef&& socket,
const entity_addr_t& peer_addr);
seastar::future<> close_clean_yielded();
class Socket;
using SocketRef = std::unique_ptr<Socket>;
+using SocketFRef = seastar::foreign_ptr<SocketRef>;
class Socket {
struct construct_tag {};
}
void
-SocketConnection::start_accept(SocketRef&& sock,
+SocketConnection::start_accept(SocketFRef&& sock,
const entity_addr_t& _peer_addr)
{
assert(seastar::this_shard_id() == msgr_sid);
/// start a handshake from the server's perspective,
/// only call when SocketConnection first construct
- void start_accept(SocketRef&& socket,
+ void start_accept(SocketFRef&& socket,
const entity_addr_t& peer_addr);
seastar::future<> close_clean_yielded();
});
}
+seastar::future<> SocketMessenger::accept(
+ SocketFRef &&socket, const entity_addr_t &peer_addr)
+{
+ assert(seastar::this_shard_id() == sid);
+ SocketConnectionRef conn =
+ seastar::make_shared<SocketConnection>(*this, dispatchers);
+ conn->start_accept(std::move(socket), peer_addr);
+ return seastar::now();
+}
+
seastar::future<> SocketMessenger::start(
const dispatchers_t& _dispatchers) {
assert(seastar::this_shard_id() == sid);
ceph_assert(get_myaddr().is_msgr2());
ceph_assert(get_myaddr().get_port() > 0);
- return listener->accept([this](SocketRef socket, entity_addr_t peer_addr) {
- assert(listener->is_fixed());
- assert(seastar::this_shard_id() == sid);
+ return listener->accept([this](SocketRef _socket, entity_addr_t peer_addr) {
assert(get_myaddr().is_msgr2());
- SocketConnectionRef conn =
- seastar::make_shared<SocketConnection>(*this, dispatchers);
- conn->start_accept(std::move(socket), peer_addr);
- return seastar::now();
+ SocketFRef socket = seastar::make_foreign(std::move(_socket));
+ if (listener->is_fixed()) {
+ return accept(std::move(socket), peer_addr);
+ } else {
+ return seastar::smp::submit_to(sid,
+ [this, peer_addr, socket = std::move(socket)]() mutable {
+ return accept(std::move(socket), peer_addr);
+ });
+ }
});
}
return seastar::now();
#endif
private:
+ seastar::future<> accept(SocketFRef &&, const entity_addr_t &);
+
listen_ertr::future<> do_listen(const entity_addrvec_t& addr);
/// try to bind to the first unused port of given address
ShardedServerSocket *pss = nullptr;
seastar::shard_id server_socket_CPU;
- SocketRef server_socket;
+ SocketFRef server_socket;
public:
template <typename FuncC, typename FuncS>
});
}),
seastar::smp::submit_to(SERVER_CPU, [psf] {
- return psf->pss->accept([psf](auto socket, auto paddr) {
+ return psf->pss->accept([psf](auto _socket, auto paddr) {
logger().info("dispatch_sockets(): accepted at shard {}",
seastar::this_shard_id());
psf->server_socket_CPU = seastar::this_shard_id();
if (psf->pss->is_fixed()) {
ceph_assert_always(SERVER_CPU == seastar::this_shard_id());
}
+ SocketFRef socket = seastar::make_foreign(std::move(_socket));
psf->server_socket = std::move(socket);
return seastar::smp::submit_to(CLIENT_CPU, [psf] {
psf->server_connected.set_value();