if (!session->auth_caps.allow_all()) {
dout(1) << __func__
<< ": received command from client without `tell` capability: "
- << m->get_connection()->peer_addrs << dendl;
+ << *m->get_connection()->peer_addrs << dendl;
ss << "permission denied";
r = -EPERM;
#include "include/ceph_assert.h" // Because intusive_ptr clobbers our assert...
#include "include/buffer.h"
#include "include/types.h"
+#include "common/item_history.h"
#include "msg/MessageRef.h"
Messenger *msgr;
RefCountedPtr priv;
int peer_type;
- entity_addrvec_t peer_addrs;
+ safe_item_history<entity_addrvec_t> peer_addrs;
utime_t last_keepalive, last_keepalive_ack;
private:
uint64_t features;
virtual entity_addr_t get_peer_socket_addr() const = 0;
entity_addr_t get_peer_addr() const {
- return peer_addrs.front();
+ return peer_addrs->front();
}
const entity_addrvec_t& get_peer_addrs() const {
- return peer_addrs;
+ return *peer_addrs;
}
void set_peer_addr(const entity_addr_t& a) {
peer_addrs = entity_addrvec_t(a);
#define dout_prefix _conn_prefix(_dout)
ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
return *_dout << "-- " << async_msgr->get_myaddrs() << " >> "
- << peer_addrs << " conn(" << this
+ << *peer_addrs << " conn(" << this
<< (msgr2 ? " msgr2" : " legacy")
<< " :" << port
<< " s=" << get_state_name(state)
conn->connect(addrs, type, target);
ceph_assert(!conns.count(addrs));
ldout(cct, 10) << __func__ << " " << conn << " " << addrs << " "
- << conn->peer_addrs << dendl;
+ << *conn->peer_addrs << dendl;
conns[addrs] = conn;
w->get_perf_counter()->inc(l_msgr_active_connections);
int AsyncMessenger::accept_conn(AsyncConnectionRef conn)
{
Mutex::Locker l(lock);
- auto it = conns.find(conn->peer_addrs);
+ auto it = conns.find(*conn->peer_addrs);
if (it != conns.end()) {
AsyncConnectionRef existing = it->second;
return -1;
}
}
- ldout(cct, 10) << __func__ << " " << conn << " " << conn->peer_addrs << dendl;
- conns[conn->peer_addrs] = conn;
+ ldout(cct, 10) << __func__ << " " << conn << " " << *conn->peer_addrs << dendl;
+ conns[*conn->peer_addrs] = conn;
conn->get_perf_counter()->inc(l_msgr_active_connections);
accepting_conns.erase(conn);
return 0;
auto it = deleted_conns.begin();
AsyncConnectionRef p = *it;
ldout(cct, 5) << __func__ << " delete " << p << dendl;
- auto conns_it = conns.find(p->peer_addrs);
+ auto conns_it = conns.find(*p->peer_addrs);
if (conns_it != conns.end() && conns_it->second == p)
conns.erase(conns_it);
accepting_conns.erase(p);
#define dout_prefix _conn_prefix(_dout)
ostream &ProtocolV1::_conn_prefix(std::ostream *_dout) {
return *_dout << "--1- " << messenger->get_myaddrs().legacy_addr() << " >> "
- << connection->peer_addrs.legacy_addr() << " conn("
+ << connection->peer_addrs->legacy_addr() << " conn("
<< connection << (connection->msgr2 ? " msgr2" : " legacy")
<< " :" << connection->port << " s=" << get_state_name(state)
<< " pgs=" << peer_global_seq << " cs=" << connect_seq
ldout(cct, 20) << __func__ << " connect read peer addr " << paddr
<< " on socket " << connection->cs.fd() << dendl;
- entity_addr_t peer_addr = connection->peer_addrs.legacy_addr();
+ entity_addr_t peer_addr = connection->peer_addrs->legacy_addr();
if (peer_addr != paddr) {
if (paddr.is_blank_ip() && peer_addr.get_port() == paddr.get_port() &&
peer_addr.get_nonce() == paddr.get_nonce()) {
ldout(cct, 10) << __func__ << " accept setting up session_security." << dendl;
// existing?
- AsyncConnectionRef existing = messenger->lookup_conn(connection->peer_addrs);
+ AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
connection->inject_delay();
}
// connection race?
- if (connection->peer_addrs.legacy_addr() < messenger->get_myaddr() ||
+ if (connection->peer_addrs->legacy_addr() < messenger->get_myaddr() ||
existing->policy.server) {
// incoming wins
ldout(cct, 10) << __func__ << " accept connection race, existing "
<< __func__ << " accept connection race, existing " << existing
<< ".cseq " << exproto->connect_seq
<< " == " << connect_msg.connect_seq << ", sending WAIT" << dendl;
- ceph_assert(connection->peer_addrs.legacy_addr() >
+ ceph_assert(connection->peer_addrs->legacy_addr() >
messenger->get_myaddr());
existing->lock.unlock();
return send_connect_message_reply(CEPH_MSGR_TAG_WAIT, reply,
replacing = false;
if (r < 0) {
ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
- << connection->peer_addrs.legacy_addr()
+ << connection->peer_addrs->legacy_addr()
<< " just fail later one(this)" << dendl;
ldout(cct, 10) << "accept fault after register" << dendl;
connection->inject_delay();
#define dout_prefix _conn_prefix(_dout)
ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) {
return *_dout << "--2- " << messenger->get_myaddrs() << " >> "
- << connection->peer_addrs << " conn("
+ << *connection->peer_addrs << " conn("
<< connection << (connection->msgr2 ? " msgr2" : " legacy")
<< " :" << connection->port << " s=" << get_state_name(state)
<< " pgs=" << peer_global_seq << " cs=" << connect_seq
// may be trying to connect to a v2 addr, and the remote may
// identify themselves by several other addrs as well. This happens
// with mon discovery.
- if (!connection->peer_addrs.contains(peer_addr)) {
+ if (!connection->peer_addrs->contains(peer_addr)) {
ldout(cct, 10) << __func__ << " server claims to be " << peer_addr
<< " (of " << paddrs << "), but we are trying to reach "
- << connection->peer_addrs << dendl;
+ << *connection->peer_addrs << dendl;
return _fault();
}
ldout(cct, 10) << __func__ << " accept setting up session_security." << dendl;
// existing?
- AsyncConnectionRef existing = messenger->lookup_conn(connection->peer_addrs);
+ AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
ldout(cct, 10) << __func__ << " existing " << existing
- << " on " << connection->peer_addrs << dendl;
+ << " on " << *connection->peer_addrs << dendl;
connection->inject_delay();
connection->lock.lock();
}
// connection race?
- if (connection->peer_addrs.legacy_addr() < messenger->get_myaddr() ||
+ if (connection->peer_addrs->legacy_addr() < messenger->get_myaddr() ||
existing->policy.server) {
// incoming wins
ldout(cct, 10) << __func__ << " accept connection race, existing "
<< __func__ << " accept connection race, existing " << existing
<< ".cseq " << exproto->connect_seq
<< " == " << connect_msg.connect_seq << ", sending WAIT" << dendl;
- ceph_assert(connection->peer_addrs.legacy_addr() >
+ ceph_assert(connection->peer_addrs->legacy_addr() >
messenger->get_myaddr());
existing->lock.unlock();
return send_connect_message_reply(CEPH_MSGR_TAG_WAIT, reply,
replacing = false;
if (r < 0) {
ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
- << connection->peer_addrs.legacy_addr()
+ << connection->peer_addrs->legacy_addr()
<< " just fail later one(this)" << dendl;
ldout(cct, 10) << "accept fault after register" << dendl;
connection->inject_delay();
void mark_disposable() override;
entity_addr_t get_peer_socket_addr() const override {
- return peer_addrs.front();
+ return peer_addrs->front();
}
}; /* PipeConnection */