if (my_inst.addr == dest.addr) {
// local
return (Connection *)local_connection->get();
- } else {
- // remote
+ }
+
+ // remote
+ while (true) {
Pipe *pipe = NULL;
hash_map<entity_addr_t, Pipe*>::iterator p = rank_pipe.find(dest.addr);
if (p != rank_pipe.end()) {
pipe = p->second;
- }
- if (!pipe) {
+ ldout(cct, 10) << "get_connection " << dest << " existing " << pipe << dendl;
+ } else {
pipe = connect_rank(dest.addr, dest.name.type(), NULL);
+ ldout(cct, 10) << "get_connection " << dest << " new " << pipe << dendl;
}
- return (Connection *)pipe->connection_state->get();
+ Mutex::Locker l(pipe->pipe_lock);
+ if (pipe->connection_state)
+ return (Connection *)pipe->connection_state->get();
+ // we failed too quickly! retry. FIXME.
}
}
-void SimpleMessenger::submit_message(Message *m, Connection *con, const entity_addr_t& dest_addr, int dest_type, bool lazy)
+void SimpleMessenger::submit_message(Message *m, Connection *con,
+ const entity_addr_t& dest_addr, int dest_type, bool lazy)
{
// existing connection?
if (con) {