messenger.register_conn(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
seastar::with_gate(pending_dispatch, [this] {
- return seastar::connect(conn.peer_addr.in4_addr())
- .then([this](seastar::connected_socket fd) {
+ return Socket::connect(conn.peer_addr)
+ .then([this](SocketFRef sock) {
+ socket = std::move(sock);
if (state == state_t::closing) {
- fd.shutdown_input();
- fd.shutdown_output();
- throw std::system_error(make_error_code(error::connection_aborted));
+ return socket->close().then([] {
+ throw std::system_error(make_error_code(error::connection_aborted));
+ });
}
- socket = seastar::make_foreign(std::make_unique<Socket>(std::move(fd)));
+ return seastar::now();
+ }).then([this] {
// read server's handshake header
return socket->read(server_header_size);
}).then([this] (bufferlist headerbl) {
#include <seastar/net/packet.hh>
#include "include/buffer.h"
+#include "msg/msg_types.h"
namespace ceph::net {
size_t remaining;
} r;
+ struct construct_tag {};
+
public:
- explicit Socket(seastar::connected_socket&& _socket)
+ Socket(seastar::connected_socket&& _socket, construct_tag)
: sid{seastar::engine().cpu_id()},
socket(std::move(_socket)),
in(socket.input()),
out(socket.output()) {}
+
Socket(Socket&& o) = delete;
+ static seastar::future<SocketFRef>
+ connect(const entity_addr_t& peer_addr) {
+ return seastar::connect(peer_addr.in4_addr())
+ .then([] (seastar::connected_socket socket) {
+ return seastar::make_foreign(std::make_unique<Socket>(std::move(socket),
+ construct_tag{}));
+ });
+ }
+
+ static seastar::future<SocketFRef, entity_addr_t>
+ accept(seastar::server_socket& listener) {
+ return listener.accept().then([] (seastar::connected_socket socket,
+ seastar::socket_address paddr) {
+ entity_addr_t peer_addr;
+ peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
+ return seastar::make_ready_future<SocketFRef, entity_addr_t>(
+ seastar::make_foreign(std::make_unique<Socket>(std::move(socket),
+ construct_tag{})),
+ peer_addr);
+ });
+ }
+
/// read the requested number of bytes into a bufferlist
seastar::future<bufferlist> read(size_t bytes);
using tmp_buf = seastar::temporary_buffer<char>;
// start listening if bind() was called
if (listener) {
seastar::keep_doing([this] {
- return listener->accept()
- .then([this] (seastar::connected_socket socket,
- seastar::socket_address paddr) {
- // allocate the connection
- entity_addr_t peer_addr;
- peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
+ return Socket::accept(*listener)
+ .then([this] (SocketFRef socket,
+ entity_addr_t peer_addr) {
auto shard = locate_shard(peer_addr);
+ // don't wait before accepting another
#warning fixme
// we currently do dangerous i/o from a Connection core, different from the Socket core.
- auto sock = seastar::make_foreign(std::make_unique<Socket>(std::move(socket)));
- // don't wait before accepting another
- container().invoke_on(shard, [sock = std::move(sock), peer_addr, this](auto& msgr) mutable {
+ container().invoke_on(shard, [sock = std::move(socket), peer_addr, this](auto& msgr) mutable {
SocketConnectionRef conn = seastar::make_shared<SocketConnection>(msgr, *msgr.dispatcher);
conn->start_accept(std::move(sock), peer_addr);
});