virtual void mark_down(const entity_addr_t& a) = 0;
+ virtual Connection *get_connection(const entity_inst_t& dest) = 0;
+
protected:
//destruction should be handled via destroy()
virtual ~Messenger() {
lock.Unlock();
}
+Connection *SimpleMessenger::get_connection(const entity_inst_t& dest)
+{
+ Mutex::Locker l(lock);
+ Pipe *pipe = NULL;
+ if (ms_addr == dest.addr) {
+ // local
+ pipe = dispatch_queue.local_pipe;
+ } else {
+ // remote
+ hash_map<entity_addr_t, Pipe*>::iterator p = rank_pipe.find(dest.addr);
+ if (p != rank_pipe.end()) {
+ pipe = p->second;
+ pipe->pipe_lock.Lock();
+ if (pipe->state == Pipe::STATE_CLOSED) {
+ pipe->unregister_pipe();
+ pipe->pipe_lock.Unlock();
+ pipe = 0;
+ } else {
+ pipe->pipe_lock.Unlock();
+ }
+ }
+ if (!pipe) {
+ Policy& policy = get_policy(dest.name.type());
+ if (policy.lossy && policy.server)
+ pipe = NULL;
+ else
+ pipe = connect_rank(dest.addr, dest.name.type());
+ }
+ }
+ if (pipe)
+ return (Connection *)pipe->connection_state->get();
+ return NULL;
+}
+
+
void SimpleMessenger::submit_message(Message *m, const entity_addr_t& dest_addr, int dest_type, bool lazy)
{
// this is just to make sure that a changeset is working properly;
void prepare_dest(const entity_inst_t& inst);
int send_message(Message *m, const entity_inst_t& dest);
int send_message(Message *m, Connection *con);
+ Connection *get_connection(const entity_inst_t& dest);
int lazy_send_message(Message *m, const entity_inst_t& dest);
int lazy_send_message(Message *m, Connection *con) {
return send_message(m, con);