/* set source */
msg->hdr.src = msgr->inst;
+ msg->hdr.orig_src = msgr->inst;
/* do we have the connection? */
spin_lock(&msgr->con_lock);
int FakeMessenger::send_message(Message *m, entity_inst_t inst)
{
- entity_name_t dest = inst.name;
-
- m->set_source(get_myname());
- m->set_source_addr(get_myaddr());
+ m->set_source_inst(_myinst);
+ m->set_orig_source_inst(_myinst);
+ m->set_dest_inst(inst);
+ return submit_message(m, inst);
+}
+int FakeMessenger::forward_message(Message *m, entity_inst_t inst)
+{
+ m->set_source_inst(_myinst);
m->set_dest_inst(inst);
+ return submit_message(m, inst);
+}
+
+int FakeMessenger::submit_message(Message *m, entity_inst_t inst)
+{
+ entity_name_t dest = inst.name;
lock.Lock();
// msg interface
int send_message(Message *m, entity_inst_t dest);
+ int forward_message(Message *m, entity_inst_t dest);
+ int submit_message(Message *m, entity_inst_t dest);
int get_dispatch_queue_len() { return qlen; }
// source/dest
entity_inst_t get_dest_inst() { return entity_inst_t(env.dst); }
+ entity_name_t get_dest() { return entity_name_t(env.dst.name); }
void set_dest_inst(entity_inst_t& inst) { env.dst = inst; }
entity_inst_t get_source_inst() { return entity_inst_t(env.src); }
- void set_source_inst(entity_inst_t& inst) { env.src = inst; }
-
- entity_name_t get_dest() { return entity_name_t(env.dst.name); }
- void set_dest(entity_name_t a) { env.dst.name = a; }
-
entity_name_t get_source() { return entity_name_t(env.src.name); }
- void set_source(entity_name_t a) { env.src.name = a; }
-
entity_addr_t get_source_addr() { return entity_addr_t(env.src.addr); }
- void set_source_addr(const entity_addr_t &i) { env.src.addr = i; }
+ void set_source_inst(entity_inst_t& inst) { env.src = inst; }
+
+ entity_inst_t get_orig_source_inst() { return entity_inst_t(env.orig_src); }
+ entity_name_t get_orig_source() { return entity_name_t(env.orig_src.name); }
+ entity_addr_t get_orig_source_addr() { return entity_addr_t(env.orig_src.addr); }
+ void set_orig_source_inst(entity_inst_t &i) { env.orig_src = i; }
// virtual bits
virtual void decode_payload() = 0;
// send message
virtual void prepare_dest(const entity_inst_t& inst) {}
virtual int send_message(Message *m, entity_inst_t dest) = 0;
+ virtual int forward_message(Message *m, entity_inst_t dest) = 0;
virtual int lazy_send_message(Message *m, entity_inst_t dest) {
return send_message(m, dest);
}
{
// set envelope
m->set_source_inst(_myinst);
+ m->set_orig_source_inst(_myinst);
m->set_dest_inst(dest);
dout(1) << m->get_source()
return 0;
}
+int Rank::EntityMessenger::forward_message(Message *m, entity_inst_t dest)
+{
+ // set envelope
+ m->set_source_inst(_myinst);
+ m->set_dest_inst(dest);
+
+ dout(1) << m->get_source()
+ << " **> " << dest.name << " " << dest.addr
+ << " -- " << *m
+ << " -- " << m
+ << dendl;
+
+ rank.submit_message(m, dest.addr);
+
+ return 0;
+}
+
+
+
int Rank::EntityMessenger::lazy_send_message(Message *m, entity_inst_t dest)
{
// set envelope
m->set_source_inst(_myinst);
+ m->set_orig_source_inst(_myinst);
m->set_dest_inst(dest);
dout(1) << "lazy " << m->get_source()
if (env.src.addr.ipaddr.sin_addr.s_addr == htonl(INADDR_ANY)) {
dout(10) << "reader munging src addr " << env.src << " to be " << peer_addr << dendl;
- env.src.addr.ipaddr = peer_addr.ipaddr;
+ env.orig_src.addr.ipaddr = env.src.addr.ipaddr = peer_addr.ipaddr;
}
// read front
void suicide();
void prepare_dest(const entity_inst_t& inst);
int send_message(Message *m, entity_inst_t dest);
+ int forward_message(Message *m, entity_inst_t dest);
int lazy_send_message(Message *m, entity_inst_t dest);
void mark_down(entity_addr_t a);