lock.Lock();
Pipe *pipe = _lookup_pipe(dest.addr);
submit_message(m, (pipe ? pipe->connection_state.get() : NULL),
- dest.addr, dest.name.type(), lazy);
+ dest.addr, dest.name.type(), lazy, true);
lock.Unlock();
return 0;
}
<< " " << m << " con " << con
<< dendl;
- lock.Lock();
- submit_message(m, con, con->get_peer_addr(), con->get_peer_type(), lazy);
- lock.Unlock();
+ submit_message(m, con, con->get_peer_addr(), con->get_peer_type(), lazy, false);
return 0;
}
}
void SimpleMessenger::submit_message(Message *m, Connection *con,
- const entity_addr_t& dest_addr, int dest_type, bool lazy)
+ const entity_addr_t& dest_addr, int dest_type,
+ bool lazy, bool already_locked)
{
-
if (cct->_conf->ms_dump_on_send) {
m->encode(-1, true);
ldout(cct, 0) << "submit_message " << *m << "\n";
m->put();
return;
}
- if (pipe) {
- pipe->pipe_lock.Lock();
+ while (pipe && ok) {
+ // we loop in case of a racing reconnect, either from us or them
+ pipe->pipe_lock.Lock(); // can't use a Locker because of the Pipe ref
if (pipe->state != Pipe::STATE_CLOSED) {
ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", have pipe." << dendl;
pipe->_send(m);
pipe->put();
return;
}
+ Pipe *current_pipe;
+ ok = con->try_get_pipe((RefCountedObject**)¤t_pipe);
pipe->pipe_lock.Unlock();
- pipe->put();
- ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr
- << ", had pipe " << pipe << ", but it closed." << dendl;
- m->put();
- return;
+ if (current_pipe == pipe) {
+ ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr
+ << ", had pipe " << pipe << ", but it closed." << dendl;
+ pipe->put();
+ current_pipe->put();
+ m->put();
+ return;
+ } else {
+ pipe->put();
+ pipe = current_pipe;
+ }
}
}
m->put();
} else {
ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", new pipe." << dendl;
- connect_rank(dest_addr, dest_type, con, m);
+ if (!already_locked) {
+ /** We couldn't handle the Message without reference to global data, so
+ * grab the lock and do it again. If we got here, we know it's a non-lossy
+ * Connection, so we can use our existing pointer without doing another lookup. */
+ Mutex::Locker l(lock);
+ submit_message(m, con, dest_addr, dest_type, lazy, true);
+ } else {
+ connect_rank(dest_addr, dest_type, con, m);
+ }
}
}
* @param dest_type The peer type of the address we're sending to
* @param lazy If true, do not establish or fix a Connection to send the Message;
* just drop silently under failure.
+ * @param already_locked If false, submit_message() will acquire the
+ * SimpleMessenger lock before accessing shared data structures; otherwise
+ * it will assume the lock is held. NOTE: if you are making a request
+ * without locking, you MUST have filled in the con with a valid pointer.
*/
void submit_message(Message *m, Connection *con,
- const entity_addr_t& addr, int dest_type, bool lazy);
+ const entity_addr_t& addr, int dest_type,
+ bool lazy, bool already_locked);
/**
* Look through the pipes in the pipe_reap_queue and tear them down.
*/