<< " " << m
<< dendl;
- submit_message(m, dest);
-
+ submit_message(m, dest.addr, dest.name.type(), false);
return 0;
}
if (!m->get_priority()) m->set_priority(get_default_send_priority());
-
dout(1) << "--> " << con->get_peer_addr() << " -- " << *m
<< " -- ?+" << m->get_data().length()
<< " " << m
<< dendl;
- submit_message(m, (Pipe **)&con->pipe, con->get_peer_addr(),
- con->get_peer_type());
+ submit_message(m, (SimpleMessenger::Pipe *)con->pipe);
return 0;
}
<< " " << m
<< dendl;
- submit_message(m, dest, true);
-
- return 0;
-}
-
-int SimpleMessenger::lazy_send_message(Message *m, Connection *con)
-{
- //set envelope
- m->get_header().src = get_myname();
-
- if (!m->get_priority()) m->set_priority(get_default_send_priority());
-
-
- dout(1) << "lazy "
- << "--> " << con->get_peer_addr() << " -- " << *m
- << " -- ?+" << m->get_data().length()
- << " " << m
- << dendl;
-
- submit_message(m, (Pipe **)&con->pipe, con->get_peer_addr(),
- con->get_peer_type(), true);
+ submit_message(m, dest.addr, dest.name.type(), true);
return 0;
}
return true;
}
-void SimpleMessenger::submit_message(Message *m, Pipe **ppipe,
- const entity_addr_t& dest_addr,
- int dest_type, bool lazy)
+void SimpleMessenger::submit_message(Message *m, Pipe *pipe)
+{
+ lock.Lock();
+ {
+ pipe->pipe_lock.Lock();
+ if (pipe->state == Pipe::STATE_CLOSED) {
+ dout(20) << "submit_message " << *m << " ignoring closed pipe " << pipe->peer_addr << dendl;
+ pipe->unregister_pipe();
+ pipe->pipe_lock.Unlock();
+ m->put();
+ } else {
+ dout(20) << "submit_message " << *m << " remote " << pipe->peer_addr << dendl;
+ pipe->_send(m);
+ pipe->pipe_lock.Unlock();
+ }
+ }
+ lock.Unlock();
+}
+
+void SimpleMessenger::submit_message(Message *m, const entity_addr_t& dest_addr, int dest_type, bool lazy)
{
assert(m->nref.test() == 1); //this is just to make sure that a changeset
//is working properly; if you start using the refcounting more and have multiple
//people hanging on to a message, ditch the assert!
-
lock.Lock();
{
// local?
assert(0); // hmpf, this is probably mds->mon beacon from newsyn.
m->put();
}
- }
- else { // remote pipe.
- Pipe *pipe = NULL;
- if (ppipe) pipe = *ppipe;
- if (!pipe && !rank_pipe.count(dest_addr)) goto no_pipe;
- else {
- // connected?
- if (!pipe) pipe = rank_pipe[ dest_addr ];
+ } else {
+ // remote pipe.
+ Pipe *pipe = 0;
+ if (rank_pipe.count(dest_addr)) {
+ pipe = rank_pipe[ dest_addr ];
pipe->pipe_lock.Lock();
if (pipe->state == Pipe::STATE_CLOSED) {
- dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", ignoring old closed pipe." << dendl;
+ dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", ignoring closed pipe." << dendl;
pipe->unregister_pipe();
pipe->pipe_lock.Unlock();
pipe = 0;
} else {
dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", have pipe." << dendl;
-
+
pipe->_send(m);
pipe->pipe_lock.Unlock();
}
}
- no_pipe:
if (!pipe) {
if (lazy) {
dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", lazy, dropping." << dendl;
int send_message(Message *m, const entity_inst_t& dest);
int send_message(Message *m, Connection *con);
int lazy_send_message(Message *m, const entity_inst_t& dest);
- int lazy_send_message(Message *m, Connection *con);
+ int lazy_send_message(Message *m, Connection *con) {
+ return send_message(m, con);
+ }
+
/***********************/
private:
bool register_entity(entity_name_t addr);
- void submit_message(Message *m, const entity_inst_t& addr, bool lazy=false) {
- submit_message(m, NULL, addr.addr, addr.name.type(), lazy);
- }
- void submit_message(Message *m, Pipe **ppipe, const entity_addr_t& dest_addr,
- int dest_type, bool lazy=false);
+ void submit_message(Message *m, const entity_addr_t& addr, int dest_type, bool lazy);
+ void submit_message(Message *m, Pipe *pipe);
int send_keepalive(const entity_inst_t& addr);