}
}
-// make sure that we agree with the peer about its address
-void validate_peer_addr(const entity_addr_t& addr,
- const entity_addr_t& expected)
-{
- if (addr == expected) {
- return;
- }
- // ok if server bound anonymously, as long as port/nonce match
- if (addr.is_blank_ip() &&
- addr.get_port() == expected.get_port() &&
- addr.get_nonce() == expected.get_nonce()) {
- return;
- } else {
- throw std::system_error(
- make_error_code(ceph::net::error::bad_peer_address));
- }
-}
-
// return a static bufferptr to the given object
template <typename T>
bufferptr create_static(T& obj)
::decode(saddr, p);
::decode(caddr, p);
ceph_assert(p.end());
- validate_peer_addr(saddr, conn.peer_addr);
+ if (saddr != conn.peer_addr) {
+ logger().error("{} my peer_addr {} doesn't match what peer advertized {}",
+ conn, conn.peer_addr, saddr);
+ throw std::system_error(
+ make_error_code(ceph::net::error::bad_peer_address));
+ }
conn.set_ephemeral_port(caddr.get_port(),
SocketConnection::side_t::connector);
- return messenger.learned_addr(caddr);
+ if (unlikely(caddr.is_msgr2())) {
+ logger().warn("{} peer sent a v2 address for me: {}",
+ conn, caddr);
+ throw std::system_error(
+ make_error_code(ceph::net::error::bad_peer_address));
+ }
+ caddr.set_type(entity_addr_t::TYPE_LEGACY);
+ return messenger.learned_addr(caddr, conn);
}).then([this] {
// encode/send client's handshake header
bufferlist bl;
messenger.accept_conn(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
seastar::with_gate(pending_dispatch, [this] {
- // encode/send server's handshake header
- bufferlist bl;
- bl.append(buffer::create_static(banner_size, banner));
- ::encode(messenger.get_myaddr(), bl, 0);
- ::encode(conn.target_addr, bl, 0);
- return socket->write_flush(std::move(bl))
- .then([this] {
+ // stop learning my_addr before sending it out, so it won't change
+ return messenger.learned_addr(messenger.get_myaddr(), conn).then([this] {
+ // encode/send server's handshake header
+ bufferlist bl;
+ bl.append(buffer::create_static(banner_size, banner));
+ ::encode(messenger.get_myaddr(), bl, 0);
+ ::encode(conn.target_addr, bl, 0);
+ return socket->write_flush(std::move(bl));
+ }).then([this] {
// read client's handshake header and connect request
return socket->read(client_header_size);
}).then([this] (bufferlist bl) {
entity_addr_t addr;
::decode(addr, p);
ceph_assert(p.end());
+ if ((addr.is_legacy() || addr.is_any()) &&
+ addr.is_same_host(conn.target_addr)) {
+ // good
+ } else {
+ logger().error("{} peer advertized an invalid peer_addr: {},"
+ " which should be v1 and the same host with {}.",
+ conn, addr, conn.peer_addr);
+ throw std::system_error(
+ make_error_code(ceph::net::error::bad_peer_address));
+ }
conn.peer_addr = addr;
conn.target_addr = conn.peer_addr;
return seastar::repeat([this] {
client_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
}
- // TODO: get socket address and learn(not supported by seastar)
- entity_addr_t a;
- a.u.sa.sa_family = AF_INET;
- a.set_type(entity_addr_t::TYPE_MSGR2);
- return messenger.learned_addr(a).then([this] {
- uint64_t flags = 0;
- if (conn.policy.lossy) {
- flags |= CEPH_MSG_CONNECT_LOSSY;
- }
+ uint64_t flags = 0;
+ if (conn.policy.lossy) {
+ flags |= CEPH_MSG_CONNECT_LOSSY;
+ }
- auto client_ident = ClientIdentFrame::Encode(
- messenger.get_myaddrs(),
- conn.target_addr,
- messenger.get_myname().num(),
- global_seq,
- conn.policy.features_supported,
- conn.policy.features_required | msgr2_required, flags,
- client_cookie);
-
- logger().debug("{} WRITE ClientIdentFrame: addrs={}, target={}, gid={},"
- " gs={}, features_supported={}, features_required={},"
- " flags={}, cookie={}",
- conn, messenger.get_myaddrs(), conn.target_addr,
- messenger.get_myname().num(), global_seq,
- conn.policy.features_supported,
- conn.policy.features_required | msgr2_required,
- flags, client_cookie);
- return write_frame(client_ident);
- }).then([this] {
+ auto client_ident = ClientIdentFrame::Encode(
+ messenger.get_myaddrs(),
+ conn.target_addr,
+ messenger.get_myname().num(),
+ global_seq,
+ conn.policy.features_supported,
+ conn.policy.features_required | msgr2_required, flags,
+ client_cookie);
+
+ logger().debug("{} WRITE ClientIdentFrame: addrs={}, target={}, gid={},"
+ " gs={}, features_supported={}, features_required={},"
+ " flags={}, cookie={}",
+ conn, messenger.get_myaddrs(), conn.target_addr,
+ messenger.get_myname().num(), global_seq,
+ conn.policy.features_supported,
+ conn.policy.features_required | msgr2_required,
+ flags, client_cookie);
+ return write_frame(client_ident).then([this] {
return read_main_preamble();
}).then([this] (Tag tag) {
switch (tag) {
if (!server_ident.addrs().contains(conn.target_addr)) {
logger().warn("{} peer identifies as {}, does not include {}",
conn, server_ident.addrs(), conn.target_addr);
- abort_in_fault();
+ throw std::system_error(
+ make_error_code(ceph::net::error::bad_peer_address));
}
server_cookie = server_ident.cookie();
// TODO: change peer_addr to entity_addrvec_t
- ceph_assert(conn.peer_addr == server_ident.addrs().front());
+ if (server_ident.addrs().front() != conn.peer_addr) {
+ logger().warn("{} peer advertises as {}, does not match {}",
+ conn, server_ident.addrs(), conn.peer_addr);
+ throw std::system_error(
+ make_error_code(ceph::net::error::bad_peer_address));
+ }
conn.set_peer_id(server_ident.gid());
conn.set_features(server_ident.supported_features() &
conn.policy.features_supported);
}
conn.set_ephemeral_port(_my_addr_from_peer.get_port(),
SocketConnection::side_t::connector);
- if (messenger.get_myaddrs().empty() ||
- messenger.get_myaddrs().front().is_blank_ip()) {
- return messenger.learned_addr(_my_addr_from_peer);
- } else {
- return seastar::now();
+ if (unlikely(_my_addr_from_peer.is_legacy())) {
+ logger().warn("{} peer sent a legacy address for me: {}",
+ conn, _my_addr_from_peer);
+ throw std::system_error(
+ make_error_code(ceph::net::error::bad_peer_address));
}
+ _my_addr_from_peer.set_type(entity_addr_t::TYPE_MSGR2);
+ return messenger.learned_addr(_my_addr_from_peer, conn);
}).then([this] {
return client_auth();
}).then([this] {
if (client_ident.addrs().empty() ||
client_ident.addrs().front() == entity_addr_t()) {
logger().warn("{} oops, client_ident.addrs() is empty", conn);
- abort_in_fault();
+ throw std::system_error(
+ make_error_code(ceph::net::error::bad_peer_address));
}
if (!messenger.get_myaddrs().contains(client_ident.target_addr())) {
logger().warn("{} peer is trying to reach {} which is not us ({})",
conn, client_ident.target_addr(), messenger.get_myaddrs());
- abort_in_fault();
+ throw std::system_error(
+ make_error_code(ceph::net::error::bad_peer_address));
}
// TODO: change peer_addr to entity_addrvec_t
entity_addr_t paddr = client_ident.addrs().front();
+ if ((paddr.is_msgr2() || paddr.is_any()) &&
+ paddr.is_same_host(conn.target_addr)) {
+ // good
+ } else {
+ logger().warn("{} peer's address {} is not v2 or not the same host with {}",
+ conn, paddr, conn.target_addr);
+ throw std::system_error(
+ make_error_code(ceph::net::error::bad_peer_address));
+ }
conn.peer_addr = paddr;
logger().debug("{} UPDATE: peer_addr={}", conn, conn.peer_addr);
conn.target_addr = conn.peer_addr;
conn, ceph_entity_type_name(_peer_type),
conn.policy.lossy, conn.policy.server,
conn.policy.standby, conn.policy.resetcheck);
+ if (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() ||
+ messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce()) {
+ logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
+ conn, _my_addr_from_peer, messenger.get_myaddr());
+ throw std::system_error(
+ make_error_code(ceph::net::error::bad_peer_address));
+ }
+ return messenger.learned_addr(_my_addr_from_peer, conn);
+ }).then([this] {
return server_auth();
}).then([this] {
return read_main_preamble();
seastar::future<ceph::net::ConnectionXRef>
SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type)
{
+ // make sure we connect to a valid peer_addr
+ ceph_assert(peer_addr.is_legacy() || peer_addr.is_msgr2());
+ ceph_assert(peer_addr.get_port() > 0);
+
auto shard = locate_shard(peer_addr);
return container().invoke_on(shard, [peer_addr, peer_type](auto& msgr) {
return msgr.do_connect(peer_addr, peer_type);
// start listening if bind() was called
if (listener) {
+ // make sure we have already bound to a valid address
+ ceph_assert(get_myaddr().is_legacy() || get_myaddr().is_msgr2());
+ ceph_assert(get_myaddr().get_port() > 0);
+
seastar::keep_doing([this] {
return Socket::accept(*listener)
.then([this] (SocketFRef socket,
});
}
-seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
+seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me, const SocketConnection& conn)
{
- if (!get_myaddr().is_blank_ip()) {
- // already learned or binded
- return seastar::now();
- }
-
- // Only learn IP address if blank.
- entity_addr_t addr = get_myaddr();
- addr.u = peer_addr_for_me.u;
- addr.set_type(peer_addr_for_me.get_type());
- addr.set_port(get_myaddr().get_port());
- return set_myaddrs(entity_addrvec_t{addr}).then([this, peer_addr_for_me] {
- logger().debug("{} learned myaddr={} from {}",
- *this, get_myaddr(), peer_addr_for_me);
+ // make sure we there's no racing to learn address from peer
+ return container().invoke_on(0, [peer_addr_for_me, &conn] (auto& msgr) {
+ if (!msgr.need_addr) {
+ if ((!msgr.get_myaddr().is_any() &&
+ msgr.get_myaddr().get_type() != peer_addr_for_me.get_type()) ||
+ msgr.get_myaddr().get_family() != peer_addr_for_me.get_family() ||
+ !msgr.get_myaddr().is_same_host(peer_addr_for_me)) {
+ logger().warn("{} peer_addr_for_me {} type/family/IP doesn't match myaddr {}",
+ conn, peer_addr_for_me, msgr.get_myaddr());
+ throw std::system_error(
+ make_error_code(ceph::net::error::bad_peer_address));
+ }
+ return seastar::now();
+ }
+ msgr.need_addr = false;
+
+ if (msgr.get_myaddr().get_type() == entity_addr_t::TYPE_NONE) {
+ // Not bound
+ entity_addr_t addr = peer_addr_for_me;
+ addr.set_type(entity_addr_t::TYPE_ANY);
+ addr.set_port(0);
+ return msgr.set_myaddrs(entity_addrvec_t{addr}
+ ).then([&msgr, &conn, peer_addr_for_me] {
+ logger().info("{} learned myaddr={} (unbound) from {}",
+ conn, msgr.get_myaddr(), peer_addr_for_me);
+ });
+ } else {
+ // Already bound
+ if (!msgr.get_myaddr().is_any() &&
+ msgr.get_myaddr().get_type() != peer_addr_for_me.get_type()) {
+ logger().warn("{} peer_addr_for_me {} type doesn't match myaddr {}",
+ conn, peer_addr_for_me, msgr.get_myaddr());
+ throw std::system_error(
+ make_error_code(ceph::net::error::bad_peer_address));
+ }
+ if (msgr.get_myaddr().get_family() != peer_addr_for_me.get_family()) {
+ logger().warn("{} peer_addr_for_me {} family doesn't match myaddr {}",
+ conn, peer_addr_for_me, msgr.get_myaddr());
+ throw std::system_error(
+ make_error_code(ceph::net::error::bad_peer_address));
+ }
+ if (msgr.get_myaddr().is_blank_ip()) {
+ entity_addr_t addr = peer_addr_for_me;
+ addr.set_type(msgr.get_myaddr().get_type());
+ addr.set_port(msgr.get_myaddr().get_port());
+ return msgr.set_myaddrs(entity_addrvec_t{addr}
+ ).then([&msgr, &conn, peer_addr_for_me] {
+ logger().info("{} learned myaddr={} (blank IP) from {}",
+ conn, msgr.get_myaddr(), peer_addr_for_me);
+ });
+ } else if (!msgr.get_myaddr().is_same_host(peer_addr_for_me)) {
+ logger().warn("{} peer_addr_for_me {} IP doesn't match myaddr {}",
+ conn, peer_addr_for_me, msgr.get_myaddr());
+ throw std::system_error(
+ make_error_code(ceph::net::error::bad_peer_address));
+ } else {
+ return seastar::now();
+ }
+ }
});
}
// Distinguish messengers with meaningful names for debugging
const std::string logic_name;
const uint32_t nonce;
+ // specifying we haven't learned our addr; set false when we find it.
+ bool need_addr = true;
seastar::future<> accept(seastar::connected_socket socket,
seastar::socket_address paddr);
void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) override;
public:
- seastar::future<> learned_addr(const entity_addr_t &peer_addr_for_me);
+ seastar::future<> learned_addr(const entity_addr_t &peer_addr_for_me,
+ const SocketConnection& conn);
SocketConnectionRef lookup_conn(const entity_addr_t& addr);
void accept_conn(SocketConnectionRef);