#include "include/encoding.h"
struct MForward : public Message {
+ uint64_t tid;
PaxosServiceMessage *msg;
entity_inst_t client;
MonCaps client_caps;
- MForward() : Message(MSG_FORWARD), msg(NULL) {}
+ MForward() : Message(MSG_FORWARD), tid(0), msg(NULL) {}
//the message needs to have caps filled in!
- MForward(PaxosServiceMessage *m) :
- Message(MSG_FORWARD), msg(m) {
+ MForward(uint64_t t, PaxosServiceMessage *m) :
+ Message(MSG_FORWARD), tid(t), msg(m) {
client = m->get_source_inst();
client_caps = m->get_session()->caps;
}
- MForward(PaxosServiceMessage *m, MonCaps caps) :
- Message(MSG_FORWARD), msg(m), client_caps(caps) {
+ MForward(uint64_t t, PaxosServiceMessage *m, MonCaps caps) :
+ Message(MSG_FORWARD), tid(t), msg(m), client_caps(caps) {
client = m->get_source_inst();
}
private:
public:
void encode_payload() {
+ ::encode(tid, payload);
::encode(client, payload);
::encode(client_caps, payload);
encode_message(msg, payload);
void decode_payload() {
bufferlist::iterator p = payload.begin();
+ ::decode(tid, p);
::decode(client, p);
::decode(client_caps, p);
msg = (PaxosServiceMessage *)decode_message(p);
} else if (session && !session->closed) {
RoutedRequest *rr = new RoutedRequest;
rr->tid = ++routed_request_tid;
-
- dout(10) << "forward_request " << rr->tid << " request " << *req << dendl;
-
- dout(10) << " noting that i mon" << rank << " own this requests's session" << dendl;
- //set forwarding variables; clear payload so it re-encodes properly
- req->session_mon = rank;
- req->session_mon_tid = rr->tid;
- req->clear_payload();
- //of course, the need to forward does give it an effectively lower priority
-
- //encode forward message and insert into routed_requests
encode_message(req, rr->request_bl);
rr->session = (MonSession *)session->get();
routed_requests[rr->tid] = rr;
-
session->routed_request_tids.insert(rr->tid);
- MForward *forward = new MForward(req);
+ dout(10) << "forward_request " << rr->tid << " request " << *req << dendl;
+
+ MForward *forward = new MForward(rr->tid, req, rr->session->caps);
forward->set_priority(req->get_priority());
messenger->send_message(forward, monmap->get_inst(mon));
} else {
dout(0) << "forward from entity with insufficient caps! "
<< session->caps << dendl;
} else {
-
Connection *c = new Connection;
MonSession *s = new MonSession(m->msg->get_source_inst(), c);
- s->caps = m->client_caps;
c->set_priv(s);
+ s->caps = m->client_caps;
+ s->proxy_con = m->get_connection()->get();
+ s->proxy_tid = m->tid;
+
PaxosServiceMessage *req = m->msg;
m->msg = NULL; // so ~MForward doesn't delete it
req->set_connection(c);
+
+ dout(10) << " got tid " << s->proxy_tid << " " << m << " " << *m
+ << " from " << m->get_orig_source_inst() << dendl;
+
_ms_dispatch(req);
}
session->put();
void Monitor::send_reply(PaxosServiceMessage *req, Message *reply, entity_inst_t to)
{
- if (req->session_mon >= 0) {
- if (req->session_mon < (int)monmap->size()) {
- dout(15) << "send_reply routing reply to " << to << " via mon" << req->session_mon
- << " for request " << *req << dendl;
- messenger->send_message(new MRoute(req->session_mon_tid, reply, to),
- monmap->get_inst(req->session_mon));
- } else {
- dout(2) << "send_reply mon" << req->session_mon << " dne, dropping reply " << *reply
- << " to " << *req << " for " << to << dendl;
- reply->put();
- }
+ MonSession *session = (MonSession*)req->get_connection()->get_priv();
+ if (!session) {
+ dout(2) << "send_reply no session, dropping reply " << *reply
+ << " to " << req << " " << *req << " for " << to << dendl;
+ reply->put();
+ return;
+ }
+ if (session->proxy_con) {
+ dout(15) << "send_reply routing reply to " << to << " via mon" << req->session_mon
+ << " for request " << *req << dendl;
+ messenger->send_message(new MRoute(session->proxy_tid, reply, to),
+ session->proxy_con);
} else {
- messenger->send_message(reply, to);
+ messenger->send_message(reply, session->con);
}
+ session->put();
}
void Monitor::handle_route(MRoute *m)
PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(q);
dout(10) << " resend to mon" << mon << " tid " << rr->tid << " " << *req << dendl;
- MForward *forward = new MForward(req, rr->session->caps);
+ MForward *forward = new MForward(rr->tid, req, rr->session->caps);
forward->set_priority(req->get_priority());
messenger->send_message(forward, monmap->get_inst(mon));
}
AuthServiceHandler *auth_handler;
+ Connection *proxy_con;
+ uint64_t proxy_tid;
+
MonSession(entity_inst_t i, Connection *c) :
con(c->get()), inst(i), closed(false), item(this),
- global_id(0), notified_global_id(0), auth_handler(NULL) {}
+ global_id(0), notified_global_id(0), auth_handler(NULL),
+ proxy_con(NULL), proxy_tid(0) {}
~MonSession() {
if (con)
con->put();
+ if (proxy_con)
+ proxy_con->put();
generic_dout(0) << "~MonSession " << this << dendl;
// we should have been removed before we get destructed; see MonSessionMap::remove_session()
assert(!item.is_on_list());