]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mon: simplify request forwarding
authorSage Weil <sage@newdream.net>
Fri, 11 Jun 2010 20:34:14 +0000 (13:34 -0700)
committerSage Weil <sage@newdream.net>
Fri, 11 Jun 2010 20:34:14 +0000 (13:34 -0700)
Use the MonSession to track proxy state, NOT the PaxosServiceMessage header
hackery.

src/messages/MForward.h
src/mon/MonClient.cc
src/mon/Monitor.cc
src/mon/Session.h

index 416cc485c80c1e64e5a1a2e98e43b860199a86d8..f84e68267feb14dfff4ce347614be74db04a34d4 100644 (file)
 #include "include/encoding.h"
 
 struct MForward : public Message {
+  uint64_t tid;
   PaxosServiceMessage *msg;
   entity_inst_t client;
   MonCaps client_caps;
   
-  MForward() : Message(MSG_FORWARD), msg(NULL) {}
+  MForward() : Message(MSG_FORWARD), tid(0), msg(NULL) {}
   //the message needs to have caps filled in!
-  MForward(PaxosServiceMessage *m) :
-    Message(MSG_FORWARD), msg(m) {
+  MForward(uint64_t t, PaxosServiceMessage *m) :
+    Message(MSG_FORWARD), tid(t), msg(m) {
     client = m->get_source_inst();
     client_caps = m->get_session()->caps;
   }
-  MForward(PaxosServiceMessage *m, MonCaps caps) :
-    Message(MSG_FORWARD), msg(m), client_caps(caps) {
+  MForward(uint64_t t, PaxosServiceMessage *m, MonCaps caps) :
+    Message(MSG_FORWARD), tid(t), msg(m), client_caps(caps) {
     client = m->get_source_inst();
   }
 private:
@@ -46,6 +47,7 @@ private:
 
 public:
   void encode_payload() {
+    ::encode(tid, payload);
     ::encode(client, payload);
     ::encode(client_caps, payload);
     encode_message(msg, payload);
@@ -53,6 +55,7 @@ public:
 
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
+    ::decode(tid, p);
     ::decode(client, p);
     ::decode(client_caps, p);
     msg = (PaxosServiceMessage *)decode_message(p);
index a83db9618c47ad0a5333bf392649dfe065e77540..8d1dc1a7b27b6db46aa33363b149d48544df5db2 100644 (file)
@@ -366,6 +366,7 @@ void MonClient::_send_mon_message(Message *m, bool force)
 {
   assert(!cur_mon.empty());
   if (force || state == MC_STATE_HAVE_SESSION) {
+    dout(10) << "_send_mon_message to mon." << cur_mon << " at " << monmap.get_inst(cur_mon) << dendl;
     messenger->send_message(m, monmap.get_inst(cur_mon));
   } else {
     waiting_for_session.push_back(m);
index 99136f2b6436dd8a64a30af870ecd0da737e5454..15d9cff796c03b9d738d1bd4f455cc63cc681eef 100644 (file)
@@ -374,24 +374,14 @@ void Monitor::forward_request_leader(PaxosServiceMessage *req)
   } else if (session && !session->closed) {
     RoutedRequest *rr = new RoutedRequest;
     rr->tid = ++routed_request_tid;
-
-    dout(10) << "forward_request " << rr->tid << " request " << *req << dendl;
-
-    dout(10) << " noting that i mon" << rank << " own this requests's session" << dendl;
-    //set forwarding variables; clear payload so it re-encodes properly
-    req->session_mon = rank;
-    req->session_mon_tid = rr->tid;
-    req->clear_payload();
-    //of course, the need to forward does give it an effectively lower priority
-    
-    //encode forward message and insert into routed_requests
     encode_message(req, rr->request_bl);
     rr->session = (MonSession *)session->get();
     routed_requests[rr->tid] = rr;
-
     session->routed_request_tids.insert(rr->tid);
     
-    MForward *forward = new MForward(req);
+    dout(10) << "forward_request " << rr->tid << " request " << *req << dendl;
+
+    MForward *forward = new MForward(rr->tid, req, rr->session->caps);
     forward->set_priority(req->get_priority());
     messenger->send_message(forward, monmap->get_inst(mon));
   } else {
@@ -414,15 +404,21 @@ void Monitor::handle_forward(MForward *m)
     dout(0) << "forward from entity with insufficient caps! " 
            << session->caps << dendl;
   } else {
-
     Connection *c = new Connection;
     MonSession *s = new MonSession(m->msg->get_source_inst(), c);
-    s->caps = m->client_caps;
     c->set_priv(s);
 
+    s->caps = m->client_caps;
+    s->proxy_con = m->get_connection()->get();
+    s->proxy_tid = m->tid;
+
     PaxosServiceMessage *req = m->msg;
     m->msg = NULL;  // so ~MForward doesn't delete it
     req->set_connection(c);
+
+    dout(10) << " got tid " << s->proxy_tid << " " << m << " " << *m
+            << " from " << m->get_orig_source_inst() << dendl;
+
     _ms_dispatch(req);
   }
   session->put();
@@ -447,20 +443,22 @@ void Monitor::try_send_message(Message *m, entity_inst_t to)
 
 void Monitor::send_reply(PaxosServiceMessage *req, Message *reply, entity_inst_t to)
 {
-  if (req->session_mon >= 0) {
-    if (req->session_mon < (int)monmap->size()) {
-      dout(15) << "send_reply routing reply to " << to << " via mon" << req->session_mon
-              << " for request " << *req << dendl;
-      messenger->send_message(new MRoute(req->session_mon_tid, reply, to),
-                             monmap->get_inst(req->session_mon));
-    } else {
-      dout(2) << "send_reply mon" << req->session_mon << " dne, dropping reply " << *reply
-             << " to " << *req << " for " << to << dendl;
-      reply->put();
-    }
+  MonSession *session = (MonSession*)req->get_connection()->get_priv();
+  if (!session) {
+    dout(2) << "send_reply no session, dropping reply " << *reply
+           << " to " << req << " " << *req << " for " << to << dendl;
+    reply->put();
+    return;
+  }
+  if (session->proxy_con) {
+    dout(15) << "send_reply routing reply to " << to << " via mon" << req->session_mon
+            << " for request " << *req << dendl;
+    messenger->send_message(new MRoute(session->proxy_tid, reply, to),
+                           session->proxy_con);    
   } else {
-    messenger->send_message(reply, to);
+    messenger->send_message(reply, session->con);
   }
+  session->put();
 }
 
 void Monitor::handle_route(MRoute *m)
@@ -507,7 +505,7 @@ void Monitor::resend_routed_requests()
     PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(q);
 
     dout(10) << " resend to mon" << mon << " tid " << rr->tid << " " << *req << dendl;
-    MForward *forward = new MForward(req, rr->session->caps);
+    MForward *forward = new MForward(rr->tid, req, rr->session->caps);
     forward->set_priority(req->get_priority());
     messenger->send_message(forward, monmap->get_inst(mon));
   }  
index fda8d3e8576c7741361339d4fe8c21a33fc213f9..b1908f128e14cf87628726b529b9cc8eeeddea7e 100644 (file)
@@ -49,12 +49,18 @@ struct MonSession : public RefCountedObject {
 
   AuthServiceHandler *auth_handler;
 
+  Connection *proxy_con;
+  uint64_t proxy_tid;
+
   MonSession(entity_inst_t i, Connection *c) :
     con(c->get()), inst(i), closed(false), item(this),
-    global_id(0), notified_global_id(0), auth_handler(NULL) {}
+    global_id(0), notified_global_id(0), auth_handler(NULL),
+    proxy_con(NULL), proxy_tid(0) {}
   ~MonSession() {
     if (con)
       con->put();
+    if (proxy_con)
+      proxy_con->put();
     generic_dout(0) << "~MonSession " << this << dendl;
     // we should have been removed before we get destructed; see MonSessionMap::remove_session()
     assert(!item.is_on_list());