entity_inst_t dest;
MRoute() : Message(MSG_ROUTE), msg(NULL) {}
- MRoute(uint64_t t, Message *m, entity_inst_t i) :
- Message(MSG_ROUTE), session_mon_tid(t), msg(m), dest(i) {}
- MRoute(uint64_t t, bufferlist bl, entity_inst_t i) :
- Message(MSG_ROUTE), session_mon_tid(t), dest(i) {
+ MRoute(uint64_t t, Message *m) :
+ Message(MSG_ROUTE), session_mon_tid(t), msg(m) {}
+ MRoute(bufferlist bl, entity_inst_t i) :
+ Message(MSG_ROUTE), session_mon_tid(0), dest(i) {
bufferlist::iterator p = bl.begin();
msg = decode_message(p);
}
const char *get_type_name() { return "route"; }
void print(ostream& o) {
if (msg)
- o << "route(" << *msg << " to " << dest << ")";
+ o << "route(" << *msg;
else
- o << "route(??? to " << dest << ")";
+ o << "route(???";
+ if (session_mon_tid)
+ o << " tid " << session_mon_tid << ")";
+ else
+ o << " to " << dest << ")";
}
};
for (int i=0; i<(int)monmap->size(); i++) {
if (i != rank)
- messenger->send_message(new MRoute(0, bl, to),
- monmap->get_inst(i));
+ messenger->send_message(new MRoute(bl, to), monmap->get_inst(i));
}
}
-void Monitor::send_reply(PaxosServiceMessage *req, Message *reply, entity_inst_t to)
+void Monitor::send_reply(PaxosServiceMessage *req, Message *reply)
{
MonSession *session = (MonSession*)req->get_connection()->get_priv();
if (!session) {
dout(2) << "send_reply no session, dropping reply " << *reply
- << " to " << req << " " << *req << " for " << to << dendl;
+ << " to " << req << " " << *req << dendl;
reply->put();
return;
}
if (session->proxy_con) {
- dout(15) << "send_reply routing reply to " << to << " via mon" << req->session_mon
+ dout(15) << "send_reply routing reply to " << req->get_connection()->get_peer_addr()
+ << " via mon" << req->session_mon
<< " for request " << *req << dendl;
- messenger->send_message(new MRoute(session->proxy_tid, reply, to),
+ messenger->send_message(new MRoute(session->proxy_tid, reply),
session->proxy_con);
} else {
messenger->send_message(reply, session->con);
dout(10) << "handle_route " << *m->msg << " to " << m->dest << dendl;
// look it up
- if (routed_requests.count(m->session_mon_tid)) {
- RoutedRequest *rr = routed_requests[m->session_mon_tid];
- messenger->send_message(m->msg, rr->session->inst);
- m->msg = NULL;
- routed_requests.erase(m->session_mon_tid);
- rr->session->routed_request_tids.insert(rr->tid);
- delete rr;
+ if (m->session_mon_tid) {
+ if (routed_requests.count(m->session_mon_tid)) {
+ RoutedRequest *rr = routed_requests[m->session_mon_tid];
+ messenger->send_message(m->msg, rr->session->inst);
+ m->msg = NULL;
+ routed_requests.erase(m->session_mon_tid);
+ rr->session->routed_request_tids.insert(rr->tid);
+ delete rr;
+ } else {
+ dout(10) << " don't have routed request tid " << m->session_mon_tid << dendl;
+ }
} else {
- dout(10) << " don't have routed request tid " << m->session_mon_tid
- << ", trying to send anyway" << dendl;
+ dout(10) << " not a routed request, trying to send anyway" << dendl;
messenger->lazy_send_message(m->msg, m->dest);
m->msg = NULL;
}
m->put();
- if (session) session->put();
+ if (session)
+ session->put();
}
void Monitor::resend_routed_requests()
void forward_request_leader(PaxosServiceMessage *req);
void handle_forward(MForward *m);
void try_send_message(Message *m, entity_inst_t to);
- void send_reply(PaxosServiceMessage *req, Message *reply, entity_inst_t to);
- void send_reply(PaxosServiceMessage *req, Message *reply) {
- send_reply(req, reply, req->get_orig_source_inst());
- }
+ void send_reply(PaxosServiceMessage *req, Message *reply);
void resend_routed_requests();
void remove_session(MonSession *s);