public:
const proto_t proto_type;
+ SocketRef socket;
protected:
template <typename Func>
Dispatcher &dispatcher;
SocketConnection &conn;
- SocketRef socket;
AuthConnectionMetaRef auth_meta;
private:
state = state_t::connecting;
set_write_state(write_state_t::delay);
- // we don't know my ephemeral port yet
- conn.set_ephemeral_port(0, SocketConnection::side_t::none);
ceph_assert(!socket);
conn.peer_addr = _peer_addr;
conn.target_addr = _peer_addr;
throw std::system_error(
make_error_code(crimson::net::error::bad_peer_address));
}
- conn.set_ephemeral_port(caddr.get_port(),
- SocketConnection::side_t::connector);
+ if (state != state_t::connecting) {
+ throw std::system_error(make_error_code(error::protocol_aborted));
+ }
+ socket->learn_ephemeral_port_as_connector(caddr.get_port());
if (unlikely(caddr.is_msgr2())) {
logger().warn("{} peer sent a v2 address for me: {}",
conn, caddr);
ceph_assert(!socket);
// until we know better
conn.target_addr = _peer_addr;
- conn.set_ephemeral_port(_peer_addr.get_port(),
- SocketConnection::side_t::acceptor);
socket = std::move(sock);
messenger.accept_conn(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
ceph_assert(!socket);
// until we know better
conn.target_addr = _peer_addr;
- conn.set_ephemeral_port(_peer_addr.get_port(),
- SocketConnection::side_t::acceptor);
socket = std::move(sock);
logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr);
messenger.accept_conn(
socket->shutdown();
}
gated_execute("execute_connecting", [this] {
- // we don't know my socket_port yet
- conn.set_ephemeral_port(0, SocketConnection::side_t::none);
return messenger.get_global_seq().then([this] (auto gs) {
global_seq = gs;
assert(client_cookie != 0);
ceph_entity_type_name(_peer_type));
abort_in_close(*this, true);
}
- conn.set_ephemeral_port(_my_addr_from_peer.get_port(),
- SocketConnection::side_t::connector);
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} during banner_exchange(), abort",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ socket->learn_ephemeral_port_as_connector(_my_addr_from_peer.get_port());
if (unlikely(_my_addr_from_peer.is_legacy())) {
logger().warn("{} peer sent a legacy address for me: {}",
conn, _my_addr_from_peer);
class Socket
{
- const seastar::shard_id sid;
- seastar::connected_socket socket;
- seastar::input_stream<char> in;
- seastar::output_stream<char> out;
-
-#ifndef NDEBUG
- bool closed = false;
-#endif
-
- /// buffer state for read()
- struct {
- bufferlist buffer;
- size_t remaining;
- } r;
-
struct construct_tag {};
public:
- Socket(seastar::connected_socket&& _socket, construct_tag)
+ // if acceptor side, peer is using a different port (ephemeral_port)
+ // if connector side, I'm using a different port (ephemeral_port)
+ enum class side_t {
+ acceptor,
+ connector
+ };
+
+ Socket(seastar::connected_socket&& _socket, side_t _side, uint16_t e_port, construct_tag)
: sid{seastar::engine().cpu_id()},
socket(std::move(_socket)),
in(socket.input()),
// the default buffer size 8192 is too small that may impact our write
// performance. see seastar::net::connected_socket::output()
- out(socket.output(65536)) {}
+ out(socket.output(65536)),
+ side(_side),
+ ephemeral_port(e_port) {}
~Socket() {
#ifndef NDEBUG
connect(const entity_addr_t& peer_addr) {
return seastar::connect(peer_addr.in4_addr()
).then([] (seastar::connected_socket socket) {
- return std::make_unique<Socket>(std::move(socket), construct_tag{});
+ return std::make_unique<Socket>(
+ std::move(socket), side_t::connector, 0, construct_tag{});
});
}
socket.shutdown_output();
}
+ side_t get_side() const {
+ return side;
+ }
+
+ uint16_t get_ephemeral_port() const {
+ return ephemeral_port;
+ }
+
+ // learn my ephemeral_port as connector.
+ // unfortunately, there's no way to identify which port I'm using as
+ // connector with current seastar interface.
+ void learn_ephemeral_port_as_connector(uint16_t port) {
+ assert(side == side_t::connector &&
+ (ephemeral_port == 0 || ephemeral_port == port));
+ ephemeral_port = port;
+ }
+
+ private:
+ const seastar::shard_id sid;
+ seastar::connected_socket socket;
+ seastar::input_stream<char> in;
+ seastar::output_stream<char> out;
+ side_t side;
+ uint16_t ephemeral_port;
+
+#ifndef NDEBUG
+ bool closed = false;
+#endif
+
+ /// buffer state for read()
+ struct {
+ bufferlist buffer;
+ size_t remaining;
+ } r;
+
#ifdef UNIT_TESTS_BUILT
+ public:
+ void set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_);
+
private:
bp_action_t next_trap_read = bp_action_t::CONTINUE;
bp_action_t next_trap_write = bp_action_t::CONTINUE;
seastar::future<> try_trap_pre(bp_action_t& trap);
seastar::future<> try_trap_post(bp_action_t& trap);
- public:
- void set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_);
-
#endif
friend class FixedCPUServerSocket;
};
peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
peer_addr.set_type(entity_addr_t::TYPE_ANY);
SocketRef _socket = std::make_unique<Socket>(
- std::move(socket), Socket::construct_tag{});
+ std::move(socket), Socket::side_t::acceptor,
+ peer_addr.get_port(), Socket::construct_tag{});
std::ignore = seastar::with_gate(ss.shutdown_gate,
[socket = std::move(_socket), peer_addr,
&ss, fn_accept = std::move(fn_accept)] () mutable {
void SocketConnection::print(ostream& out) const {
messenger.print(out);
- if (side == side_t::none) {
+ if (!protocol->socket) {
out << " >> " << get_peer_name() << " " << peer_addr;
- } else if (side == side_t::acceptor) {
+ } else if (protocol->socket->get_side() == Socket::side_t::acceptor) {
out << " >> " << get_peer_name() << " " << peer_addr
- << "@" << ephemeral_port;
- } else { // side == side_t::connector
- out << "@" << ephemeral_port
+ << "@" << protocol->socket->get_ephemeral_port();
+ } else { // protocol->socket->get_side() == Socket::side_t::connector
+ out << "@" << protocol->socket->get_ephemeral_port()
<< " >> " << get_peer_name() << " " << peer_addr;
}
}
SocketMessenger& messenger;
std::unique_ptr<Protocol> protocol;
- // if acceptor side, ephemeral_port is different from peer_addr.get_port();
- // if connector side, ephemeral_port is different from my_addr.get_port().
- enum class side_t {
- none,
- acceptor,
- connector
- };
- side_t side = side_t::none;
- uint16_t ephemeral_port = 0;
- void set_ephemeral_port(uint16_t port, side_t _side) {
- ephemeral_port = port;
- side = _side;
- }
-
ceph::net::Policy<crimson::thread::Throttle> policy;
/// the seq num of the last transmitted message