ostream& Pipe::_pipe_prefix(std::ostream *_dout) {
return *_dout << "-- " << msgr->get_myinst().addr << " >> " << peer_addr << " pipe(" << this
<< " sd=" << sd << " :" << port
+ << " s=" << state
<< " pgs=" << peer_global_seq
<< " cs=" << connect_seq
<< " l=" << policy.lossy
// existing?
- if (msgr->rank_pipe.count(peer_addr)) {
- existing = msgr->rank_pipe[peer_addr];
+ existing = msgr->_lookup_pipe(peer_addr);
+ if (existing) {
existing->pipe_lock.Lock();
if (connect.global_seq < existing->peer_global_seq) {
rc = tcp_write((char*)&reply, sizeof(reply));
if (rc < 0) {
- goto fail_unlocked;
+ goto fail_registered;
}
if (reply.authorizer_len) {
rc = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
if (rc < 0) {
- goto fail_unlocked;
+ goto fail_registered;
}
}
uint64_t newly_acked_seq = 0;
if(tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) {
ldout(msgr->cct,2) << "accept write error on in_seq" << dendl;
- goto fail_unlocked;
+ goto fail_registered;
}
if (tcp_read((char*)&newly_acked_seq, sizeof(newly_acked_seq)) < 0) {
ldout(msgr->cct,2) << "accept read error on newly_acked_seq" << dendl;
- goto fail_unlocked;
+ goto fail_registered;
}
requeue_sent(newly_acked_seq);
}
return 0; // success.
+ fail_registered:
+ ldout(msgr->cct, 10) << "accept fault after register" << dendl;
fail_unlocked:
pipe_lock.Lock();
if (state != STATE_CLOSED) {
{
ldout(msgr->cct,10) << "register_pipe" << dendl;
assert(msgr->lock.is_locked());
- assert(msgr->rank_pipe.count(peer_addr) == 0);
+ Pipe *existing = msgr->_lookup_pipe(peer_addr);
+ assert(existing == NULL);
msgr->rank_pipe[peer_addr] = this;
}
void Pipe::unregister_pipe()
{
assert(msgr->lock.is_locked());
- if (msgr->rank_pipe.count(peer_addr) &&
- msgr->rank_pipe[peer_addr] == this) {
+ hash_map<entity_addr_t,Pipe*>::iterator p = msgr->rank_pipe.find(peer_addr);
+ if (p != msgr->rank_pipe.end() && p->second == this) {
ldout(msgr->cct,10) << "unregister_pipe" << dendl;
- msgr->rank_pipe.erase(peer_addr);
+ msgr->rank_pipe.erase(p);
} else {
ldout(msgr->cct,10) << "unregister_pipe - not registered" << dendl;
}
}
lock.Lock();
- Pipe *pipe = rank_pipe.count(dest.addr) ? rank_pipe[ dest.addr ] : NULL;
+ Pipe *pipe = _lookup_pipe(dest.addr);
submit_message(m, (pipe ? pipe->connection_state : NULL),
dest.addr, dest.name.type(), lazy);
lock.Unlock();
// remote
while (true) {
- Pipe *pipe = NULL;
- hash_map<entity_addr_t, Pipe*>::iterator p = rank_pipe.find(dest.addr);
- if (p != rank_pipe.end()) {
- pipe = p->second;
+ Pipe *pipe = _lookup_pipe(dest.addr);
+ if (pipe) {
ldout(cct, 10) << "get_connection " << dest << " existing " << pipe << dendl;
} else {
pipe = connect_rank(dest.addr, dest.name.type(), NULL, NULL);
// local?
if (my_inst.addr != dest_addr) {
// remote.
- Pipe *pipe = 0;
- if (rank_pipe.count( dest_proc_addr )) {
+ Pipe *pipe = _lookup_pipe(dest_proc_addr);
+ if (pipe) {
// connected?
- pipe = rank_pipe[ dest_proc_addr ];
pipe->pipe_lock.Lock();
ldout(cct,20) << "send_keepalive remote, " << dest_addr << ", have pipe." << dendl;
pipe->_send_keepalive();
void SimpleMessenger::mark_down(const entity_addr_t& addr)
{
lock.Lock();
- if (rank_pipe.count(addr)) {
- Pipe *p = rank_pipe[addr];
+ Pipe *p = _lookup_pipe(addr);
+ if (p) {
ldout(cct,1) << "mark_down " << addr << " -- " << p << dendl;
p->unregister_pipe();
p->pipe_lock.Lock();
/// lock to protect the global_seq
pthread_spinlock_t global_seq_lock;
- /// hash map of addresses to Pipes
+ /**
+ * hash map of addresses to Pipes
+ *
+ * NOTE: a Pipe* with state CLOSED may still be in the map but is considered
+ * invalid and can be replaced by anyone holding the msgr lock
+ */
hash_map<entity_addr_t, Pipe*> rank_pipe;
/// a set of all the Pipes we have which are somehow active
set<Pipe*> pipes;
friend class Pipe;
+ Pipe *_lookup_pipe(const entity_addr_t& k) {
+ hash_map<entity_addr_t, Pipe*>::iterator p = rank_pipe.find(k);
+ if (p == rank_pipe.end())
+ return NULL;
+ // see lock cribbing in Pipe::fault()
+ if (p->second->state == Pipe::STATE_CLOSED)
+ return NULL;
+ return p->second;
+ }
+
public:
int timeout;