From: Sage Weil Date: Fri, 11 Jun 2010 20:34:14 +0000 (-0700) Subject: mon: simplify request forwarding X-Git-Tag: v0.22~368 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ddf739f6162a83a8b6d565a82970d433b9efdaf7;p=ceph.git mon: simplify request forwarding Use the MonSession to track proxy state, NOT the PaxosServiceMessage header hackery. --- diff --git a/src/messages/MForward.h b/src/messages/MForward.h index 416cc485c80..f84e68267fe 100644 --- a/src/messages/MForward.h +++ b/src/messages/MForward.h @@ -24,19 +24,20 @@ #include "include/encoding.h" struct MForward : public Message { + uint64_t tid; PaxosServiceMessage *msg; entity_inst_t client; MonCaps client_caps; - MForward() : Message(MSG_FORWARD), msg(NULL) {} + MForward() : Message(MSG_FORWARD), tid(0), msg(NULL) {} //the message needs to have caps filled in! - MForward(PaxosServiceMessage *m) : - Message(MSG_FORWARD), msg(m) { + MForward(uint64_t t, PaxosServiceMessage *m) : + Message(MSG_FORWARD), tid(t), msg(m) { client = m->get_source_inst(); client_caps = m->get_session()->caps; } - MForward(PaxosServiceMessage *m, MonCaps caps) : - Message(MSG_FORWARD), msg(m), client_caps(caps) { + MForward(uint64_t t, PaxosServiceMessage *m, MonCaps caps) : + Message(MSG_FORWARD), tid(t), msg(m), client_caps(caps) { client = m->get_source_inst(); } private: @@ -46,6 +47,7 @@ private: public: void encode_payload() { + ::encode(tid, payload); ::encode(client, payload); ::encode(client_caps, payload); encode_message(msg, payload); @@ -53,6 +55,7 @@ public: void decode_payload() { bufferlist::iterator p = payload.begin(); + ::decode(tid, p); ::decode(client, p); ::decode(client_caps, p); msg = (PaxosServiceMessage *)decode_message(p); diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc index a83db9618c4..8d1dc1a7b27 100644 --- a/src/mon/MonClient.cc +++ b/src/mon/MonClient.cc @@ -366,6 +366,7 @@ void MonClient::_send_mon_message(Message *m, bool force) { assert(!cur_mon.empty()); if (force || state == MC_STATE_HAVE_SESSION) { + dout(10) << "_send_mon_message to mon." << cur_mon << " at " << monmap.get_inst(cur_mon) << dendl; messenger->send_message(m, monmap.get_inst(cur_mon)); } else { waiting_for_session.push_back(m); diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 99136f2b643..15d9cff796c 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -374,24 +374,14 @@ void Monitor::forward_request_leader(PaxosServiceMessage *req) } else if (session && !session->closed) { RoutedRequest *rr = new RoutedRequest; rr->tid = ++routed_request_tid; - - dout(10) << "forward_request " << rr->tid << " request " << *req << dendl; - - dout(10) << " noting that i mon" << rank << " own this requests's session" << dendl; - //set forwarding variables; clear payload so it re-encodes properly - req->session_mon = rank; - req->session_mon_tid = rr->tid; - req->clear_payload(); - //of course, the need to forward does give it an effectively lower priority - - //encode forward message and insert into routed_requests encode_message(req, rr->request_bl); rr->session = (MonSession *)session->get(); routed_requests[rr->tid] = rr; - session->routed_request_tids.insert(rr->tid); - MForward *forward = new MForward(req); + dout(10) << "forward_request " << rr->tid << " request " << *req << dendl; + + MForward *forward = new MForward(rr->tid, req, rr->session->caps); forward->set_priority(req->get_priority()); messenger->send_message(forward, monmap->get_inst(mon)); } else { @@ -414,15 +404,21 @@ void Monitor::handle_forward(MForward *m) dout(0) << "forward from entity with insufficient caps! " << session->caps << dendl; } else { - Connection *c = new Connection; MonSession *s = new MonSession(m->msg->get_source_inst(), c); - s->caps = m->client_caps; c->set_priv(s); + s->caps = m->client_caps; + s->proxy_con = m->get_connection()->get(); + s->proxy_tid = m->tid; + PaxosServiceMessage *req = m->msg; m->msg = NULL; // so ~MForward doesn't delete it req->set_connection(c); + + dout(10) << " got tid " << s->proxy_tid << " " << m << " " << *m + << " from " << m->get_orig_source_inst() << dendl; + _ms_dispatch(req); } session->put(); @@ -447,20 +443,22 @@ void Monitor::try_send_message(Message *m, entity_inst_t to) void Monitor::send_reply(PaxosServiceMessage *req, Message *reply, entity_inst_t to) { - if (req->session_mon >= 0) { - if (req->session_mon < (int)monmap->size()) { - dout(15) << "send_reply routing reply to " << to << " via mon" << req->session_mon - << " for request " << *req << dendl; - messenger->send_message(new MRoute(req->session_mon_tid, reply, to), - monmap->get_inst(req->session_mon)); - } else { - dout(2) << "send_reply mon" << req->session_mon << " dne, dropping reply " << *reply - << " to " << *req << " for " << to << dendl; - reply->put(); - } + MonSession *session = (MonSession*)req->get_connection()->get_priv(); + if (!session) { + dout(2) << "send_reply no session, dropping reply " << *reply + << " to " << req << " " << *req << " for " << to << dendl; + reply->put(); + return; + } + if (session->proxy_con) { + dout(15) << "send_reply routing reply to " << to << " via mon" << req->session_mon + << " for request " << *req << dendl; + messenger->send_message(new MRoute(session->proxy_tid, reply, to), + session->proxy_con); } else { - messenger->send_message(reply, to); + messenger->send_message(reply, session->con); } + session->put(); } void Monitor::handle_route(MRoute *m) @@ -507,7 +505,7 @@ void Monitor::resend_routed_requests() PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(q); dout(10) << " resend to mon" << mon << " tid " << rr->tid << " " << *req << dendl; - MForward *forward = new MForward(req, rr->session->caps); + MForward *forward = new MForward(rr->tid, req, rr->session->caps); forward->set_priority(req->get_priority()); messenger->send_message(forward, monmap->get_inst(mon)); } diff --git a/src/mon/Session.h b/src/mon/Session.h index fda8d3e8576..b1908f128e1 100644 --- a/src/mon/Session.h +++ b/src/mon/Session.h @@ -49,12 +49,18 @@ struct MonSession : public RefCountedObject { AuthServiceHandler *auth_handler; + Connection *proxy_con; + uint64_t proxy_tid; + MonSession(entity_inst_t i, Connection *c) : con(c->get()), inst(i), closed(false), item(this), - global_id(0), notified_global_id(0), auth_handler(NULL) {} + global_id(0), notified_global_id(0), auth_handler(NULL), + proxy_con(NULL), proxy_tid(0) {} ~MonSession() { if (con) con->put(); + if (proxy_con) + proxy_con->put(); generic_dout(0) << "~MonSession " << this << dendl; // we should have been removed before we get destructed; see MonSessionMap::remove_session() assert(!item.is_on_list());