}
center->delete_file_event(cs.fd(), EVENT_WRITABLE);
-
- update_socket_addr(target_addr.get_type());
- lock.unlock();
- async_msgr->learned_addr(socket_addr);
- lock.lock();
ldout(async_msgr->cct, 10)
<< __func__ << " connect successfully, ready to send banner" << dendl;
state = STATE_CONNECTION_ESTABLISHED;
return protocol->is_connected();
}
-bool AsyncConnection::update_socket_addr(uint32_t type) {
- sockaddr_storage ss;
- socklen_t slen = sizeof(ss);
- int r = getsockname(cs.fd(), (sockaddr *)&ss, &slen);
- if (r == 0) {
- entity_addr_t bind_addr;
- bind_addr.set_type(type);
- bind_addr.set_sockaddr((sockaddr*)&ss);
- for (auto a : async_msgr->get_myaddrs().v) {
- if (a.is_same_host(bind_addr) && a.get_port() == bind_addr.get_port()) {
- socket_addr = a;
- break;
- }
- }
- if (socket_addr.is_blank_ip()) {
- socket_addr = bind_addr;
- }
- return true;
- }
- return false;
-}
-
void AsyncConnection::connect(const entity_addrvec_t &addrs, int type,
entity_addr_t &target) {
ceph_assert(socket.fd() >= 0);
std::lock_guard<std::mutex> l(lock);
-
cs = std::move(socket);
- if (!update_socket_addr(listen_addr.get_type())) {
- socket_addr = listen_addr;
- }
+ socket_addr = listen_addr;
target_addr = peer_addr; // until we know better
-
state = STATE_ACCEPTING;
protocol->accept();
// rescheduler connection in order to avoid lock dep
};
struct ServerIdentFrame
- : public SignedEncryptedFrame<ServerIdentFrame, entity_addrvec_t, int64_t,
- uint64_t, uint64_t, uint64_t, uint64_t,
- uint64_t> {
+ : public SignedEncryptedFrame<ServerIdentFrame, entity_addrvec_t,
+ entity_addr_t, int64_t, uint64_t, uint64_t,
+ uint64_t, uint64_t, uint64_t> {
const ProtocolV2::Tag tag = ProtocolV2::Tag::IDENT;
using SignedEncryptedFrame::SignedEncryptedFrame;
inline entity_addrvec_t &addrs() { return get_val<0>(); }
- inline int64_t &gid() { return get_val<1>(); }
- inline uint64_t &global_seq() { return get_val<2>(); }
- inline uint64_t &supported_features() { return get_val<3>(); }
- inline uint64_t &required_features() { return get_val<4>(); }
- inline uint64_t &flags() { return get_val<5>(); }
- inline uint64_t &cookie() { return get_val<6>(); }
+ inline entity_addr_t &peer_addr() { return get_val<1>(); }
+ inline int64_t &gid() { return get_val<2>(); }
+ inline uint64_t &global_seq() { return get_val<3>(); }
+ inline uint64_t &supported_features() { return get_val<4>(); }
+ inline uint64_t &required_features() { return get_val<5>(); }
+ inline uint64_t &flags() { return get_val<6>(); }
+ inline uint64_t &cookie() { return get_val<7>(); }
};
struct ReconnectFrame
flags |= CEPH_MSG_CONNECT_LOSSY;
}
- entity_addrvec_t maddrs = messenger->get_myaddrs();
- if (!messenger->get_myaddrs().front().is_msgr2()) {
- entity_addr_t a = messenger->get_myaddrs().front();
- a.set_type(entity_addr_t::TYPE_MSGR2);
- ldout(cct, 20) << "encoding addr " << a << " instead of non-v2 myaddrs "
- << messenger->get_myaddrs() << dendl;
- maddrs.v.push_back(a);
- }
-
- ClientIdentFrame client_ident(this, maddrs, messenger->get_myname().num(),
- global_seq,
+ ClientIdentFrame client_ident(this, messenger->get_myaddrs(),
+ messenger->get_myname().num(), global_seq,
connection->policy.features_supported,
connection->policy.features_required, flags);
ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
ServerIdentFrame server_ident(this, payload, length);
- ldout(cct, 5) << __func__ << " received server identification: "
- << "addrs=" << server_ident.addrs()
+ ldout(cct, 5) << __func__ << " received server identification:"
+ << " addrs=" << server_ident.addrs()
+ << " my_addr=" << server_ident.peer_addr()
<< " gid=" << server_ident.gid()
<< " global_seq=" << server_ident.global_seq()
<< " features_supported=" << std::hex
<< " flags=" << server_ident.flags() << " cookie=" << std::dec
<< server_ident.cookie() << dendl;
+ connection->lock.unlock();
+ messenger->learned_addr(server_ident.peer_addr());
+ if (cct->_conf->ms_inject_internal_delays &&
+ cct->_conf->ms_inject_socket_failures) {
+ if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
+ ldout(cct, 10) << __func__ << " sleep for "
+ << cct->_conf->ms_inject_internal_delays << dendl;
+ utime_t t;
+ t.set_from_double(cct->_conf->ms_inject_internal_delays);
+ t.sleep();
+ }
+ }
+ connection->lock.lock();
+ if (state != CONNECTING) {
+ ldout(cct, 1) << __func__
+ << " state changed while learned_addr, mark_down or "
+ << " replacing must be happened just now" << dendl;
+ return nullptr;
+ }
+
cookie = server_ident.cookie();
connection->set_peer_addrs(server_ident.addrs());
<< " features_required=" << client_ident.required_features()
<< " flags=" << client_ident.flags() << std::dec << dendl;
+ connection->target_addr.set_type(entity_addr_t::TYPE_MSGR2);
+
if (client_ident.addrs().empty()) {
connection->set_peer_addr(connection->target_addr);
} else {
- // Should we check if one of the ident.addrs match connection->target_addr
- // as we do in ProtocolV1?
- connection->set_peer_addrs(client_ident.addrs());
- connection->target_addr = client_ident.addrs().msgr2_addr();
+ entity_addr_t peer_addr = client_ident.addrs().msgr2_addr();
+
+ ldout(cct, 10) << __func__ << " peer addr is " << peer_addr << dendl;
+ if (peer_addr.type == entity_addr_t::TYPE_NONE) {
+ // no address is known
+ peer_addr = client_ident.addrs().legacy_addr();
+ peer_addr.set_type(entity_addr_t::TYPE_MSGR2);
+ } else if (peer_addr.is_blank_ip()) {
+ // peer apparently doesn't know what ip they have; figure it out for them.
+ int port = peer_addr.get_port();
+ peer_addr.u = connection->target_addr.u;
+ peer_addr.set_port(port);
+ peer_addr.set_type(entity_addr_t::TYPE_MSGR2);
+ }
+ ldout(cct, 0) << __func__ << " peer addr is really " << peer_addr
+ << " (socket is " << connection->target_addr << ")"
+ << dendl;
+ connection->target_addr = peer_addr;
+ entity_addrvec_t addrs;
+ addrs.v.push_back(peer_addr);
+ for (const auto &addr : client_ident.addrs().v) {
+ if (addr.type != entity_addr_t::TYPE_MSGR2) {
+ addrs.v.push_back(addr);
+ }
+ }
+ connection->set_peer_addrs(addrs);
}
peer_name = entity_name_t(connection->get_peer_type(), client_ident.gid());
uint64_t gs = messenger->get_global_seq();
ServerIdentFrame server_ident(
- this, messenger->get_myaddrs(), messenger->get_myname().num(), gs,
- connection->policy.features_supported,
+ this, messenger->get_myaddrs(), connection->target_addr,
+ messenger->get_myname().num(), gs, connection->policy.features_supported,
connection->policy.features_required, flags, cookie);
- ldout(cct, 5) << __func__ << " sending identification: "
- << "addrs=" << messenger->get_myaddrs()
+ ldout(cct, 5) << __func__ << " sending identification:"
+ << " addrs=" << messenger->get_myaddrs()
+ << " peer_addr=" << connection->target_addr
<< " gid=" << messenger->get_myname().num()
<< " global_seq=" << gs << " features_supported=" << std::hex
<< connection->policy.features_supported