}
void SimpleMessenger::submit_message(Message *m, Pipe **ppipe,
- const entity_addr_t& dest_addr
+ const entity_addr_t& dest_addr,
int dest_type, bool lazy)
{
assert(m->nref.test() == 1); //this is just to make sure that a changeset
m->put();
}
}
- else {
- // remote.
- Pipe *pipe = 0;
- if (rank_pipe.count( dest_addr )) {
+ else { // remote pipe.
+ Pipe *pipe = NULL;
+ if (ppipe) pipe = *ppipe;
+ if (!pipe && !rank_pipe.count(dest_addr)) goto no_pipe;
+ else {
// connected?
- pipe = rank_pipe[ dest_addr ];
+ if (!pipe) pipe = rank_pipe[ dest_addr ];
pipe->pipe_lock.Lock();
if (pipe->state == Pipe::STATE_CLOSED) {
dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", ignoring old closed pipe." << dendl;
pipe->pipe_lock.Unlock();
}
}
+ no_pipe:
if (!pipe) {
if (lazy) {
dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", lazy, dropping." << dendl;