virtual void start_connect(const entity_addr_t& peer_addr,
const entity_type_t& peer_type) = 0;
- virtual void start_accept(SocketFRef&& socket,
+ virtual void start_accept(SocketRef&& socket,
const entity_addr_t& peer_addr) = 0;
protected:
Dispatcher &dispatcher;
SocketConnection &conn;
- SocketFRef socket;
+ SocketRef socket;
seastar::gate pending_dispatch;
AuthConnectionMetaRef auth_meta;
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
(void) seastar::with_gate(pending_dispatch, [this] {
return Socket::connect(conn.peer_addr)
- .then([this](SocketFRef sock) {
+ .then([this](SocketRef sock) {
socket = std::move(sock);
if (state == state_t::closing) {
return socket->close().then([] {
});
}
-void ProtocolV1::start_accept(SocketFRef&& sock,
+void ProtocolV1::start_accept(SocketRef&& sock,
const entity_addr_t& _peer_addr)
{
ceph_assert(state == state_t::none);
void start_connect(const entity_addr_t& peer_addr,
const entity_type_t& peer_type) override;
- void start_accept(SocketFRef&& socket,
+ void start_accept(SocketRef&& socket,
const entity_addr_t& peer_addr) override;
void trigger_close() override;
#ifdef UNIT_TESTS_BUILT
void intercept(Breakpoint bp, bp_type_t type,
- SocketConnection& conn, SocketFRef& socket) {
+ SocketConnection& conn, SocketRef& socket) {
if (conn.interceptor) {
auto action = conn.interceptor->intercept(conn, Breakpoint(bp));
socket->set_trap(type, action, &conn.interceptor->blocker);
execute_connecting();
}
-void ProtocolV2::start_accept(SocketFRef&& sock,
+void ProtocolV2::start_accept(SocketRef&& sock,
const entity_addr_t& _peer_addr)
{
ceph_assert(state == state_t::NONE);
}
INTERCEPT_N_RW(custom_bp_t::SOCKET_CONNECTING);
return Socket::connect(conn.peer_addr);
- }).then([this](SocketFRef sock) {
+ }).then([this](SocketRef sock) {
logger().debug("{} socket connected", conn);
if (unlikely(state != state_t::CONNECTING)) {
logger().debug("{} triggered {} during Socket::connect()",
void ProtocolV2::trigger_replacing(bool reconnect,
bool do_reset,
- SocketFRef&& new_socket,
+ SocketRef&& new_socket,
AuthConnectionMetaRef&& new_auth_meta,
ceph::crypto::onwire::rxtx_t new_rxtx,
uint64_t new_peer_global_seq,
void start_connect(const entity_addr_t& peer_addr,
const entity_type_t& peer_type) override;
- void start_accept(SocketFRef&& socket,
+ void start_accept(SocketRef&& socket,
const entity_addr_t& peer_addr) override;
void trigger_close() override;
// REPLACING (server)
void trigger_replacing(bool reconnect,
bool do_reset,
- SocketFRef&& new_socket,
+ SocketRef&& new_socket,
AuthConnectionMetaRef&& new_auth_meta,
ceph::crypto::onwire::rxtx_t new_rxtx,
uint64_t new_peer_global_seq,
class Socket;
using SocketRef = std::unique_ptr<Socket>;
-using SocketFRef = seastar::foreign_ptr<SocketRef>;
class Socket
{
Socket(Socket&& o) = delete;
- static seastar::future<SocketFRef>
+ static seastar::future<SocketRef>
connect(const entity_addr_t& peer_addr) {
- return seastar::connect(peer_addr.in4_addr())
- .then([] (seastar::connected_socket socket) {
- return seastar::make_foreign(std::make_unique<Socket>(std::move(socket),
- construct_tag{}));
- });
- }
-
- static seastar::future<SocketFRef, entity_addr_t>
- accept(seastar::server_socket& listener) {
- return listener.accept().then([] (seastar::accept_result accept_result) {
- auto [socket, paddr] = std::move(accept_result);
- entity_addr_t peer_addr;
- peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
- peer_addr.set_type(entity_addr_t::TYPE_ANY);
- return seastar::make_ready_future<SocketFRef, entity_addr_t>(
- seastar::make_foreign(std::make_unique<Socket>(std::move(socket),
- construct_tag{})),
- peer_addr);
- });
+ return seastar::connect(peer_addr.in4_addr()
+ ).then([] (seastar::connected_socket socket) {
+ return std::make_unique<Socket>(std::move(socket), construct_tag{});
+ });
}
/// read the requested number of bytes into a bufferlist
public:
void set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_);
+
#endif
+ friend class FixedCPUServerSocket;
};
class FixedCPUServerSocket
std::ignore = seastar::with_gate(ss.shutdown_gate,
[&ss, fn_accept = std::move(fn_accept)] () mutable {
return seastar::keep_doing([&ss, fn_accept = std::move(fn_accept)] () mutable {
- return Socket::accept(*ss.listener
- ).then([&ss, fn_accept = std::move(fn_accept)]
- (auto socket, entity_addr_t peer_addr) mutable {
+ return ss.listener->accept().then(
+ [&ss, fn_accept = std::move(fn_accept)]
+ (seastar::accept_result accept_result) mutable {
// assert seastar::listen_options::set_fixed_cpu() works
assert(seastar::engine().cpu_id() == ss.cpu);
- SocketRef _socket = socket.release();
+ auto [socket, paddr] = std::move(accept_result);
+ entity_addr_t peer_addr;
+ peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
+ peer_addr.set_type(entity_addr_t::TYPE_ANY);
+ SocketRef _socket = std::make_unique<Socket>(
+ std::move(socket), Socket::construct_tag{});
std::ignore = seastar::with_gate(ss.shutdown_gate,
[socket = std::move(_socket), peer_addr,
&ss, fn_accept = std::move(fn_accept)] () mutable {
}
void
-SocketConnection::start_accept(SocketFRef&& sock,
+SocketConnection::start_accept(SocketRef&& sock,
const entity_addr_t& _peer_addr)
{
protocol->start_accept(std::move(sock), _peer_addr);
const entity_type_t& peer_type);
/// start a handshake from the server's perspective,
/// only call when SocketConnection first construct
- void start_accept(SocketFRef&& socket,
+ void start_accept(SocketRef&& socket,
const entity_addr_t& peer_addr);
bool is_server_side() const {
assert(seastar::engine().cpu_id() == master_sid);
SocketConnectionRef conn = seastar::make_shared<SocketConnection>(
*this, *dispatcher, get_myaddr().is_msgr2());
- // TODO: use SocketRef
- conn->start_accept(seastar::make_foreign(std::move(socket)), peer_addr);
+ conn->start_accept(std::move(socket), peer_addr);
return seastar::now();
});
}
using crimson::net::error;
using crimson::net::FixedCPUServerSocket;
using crimson::net::Socket;
-using crimson::net::SocketFRef;
using crimson::net::SocketRef;
using crimson::net::stop_t;
+using SocketFRef = seastar::foreign_ptr<SocketRef>;
+
static seastar::logger logger{"crimsontest"};
static entity_addr_t server_addr = [] {
entity_addr_t saddr;
logger.debug("socket_connect()...");
return Socket::connect(server_addr).then([] (auto socket) {
logger.debug("socket_connect() connected");
- return socket.release();
+ return socket;
});
}