*/
Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr,
int type,
- Connection *con)
+ Connection *con,
+ Message *first)
{
assert(lock.is_locked());
assert(addr != my_inst.addr);
pipe->set_peer_addr(addr);
pipe->policy = get_policy(type);
pipe->start_writer();
+ if (first)
+ pipe->_send(first);
pipe->pipe_lock.Unlock();
pipe->register_pipe();
pipes.insert(pipe);
pipe = p->second;
ldout(cct, 10) << "get_connection " << dest << " existing " << pipe << dendl;
} else {
- pipe = connect_rank(dest.addr, dest.name.type(), NULL);
+ pipe = connect_rank(dest.addr, dest.name.type(), NULL, NULL);
ldout(cct, 10) << "get_connection " << dest << " new " << pipe << dendl;
}
Mutex::Locker l(pipe->pipe_lock);
m->put();
} else {
ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", new pipe." << dendl;
- // not connected.
- Pipe *pipe = connect_rank(dest_addr, dest_type, con);
- pipe->send(m);
+ connect_rank(dest_addr, dest_type, con, m);
}
}
* @param type The peer type of the entity at the address.
* @param con An existing Connection to associate with the new Pipe. If
* NULL, it creates a new Connection.
+ * @param msg an initial message to queue on the new pipe
*
* @return a pointer to the newly-created Pipe. Caller does not own a
* reference; take one if you need it.
*/
- Pipe *connect_rank(const entity_addr_t& addr, int type, Connection *con);
+ Pipe *connect_rank(const entity_addr_t& addr, int type, Connection *con, Message *first);
/**
* Send a message, lazily or not.
* This just glues [lazy_]send_message together and passes