]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mon: Encapsulate forwarded PaxosServiceMessages into MForward with their caps
authorGreg Farnum <gregf@hq.newdream.net>
Tue, 16 Mar 2010 21:48:31 +0000 (14:48 -0700)
committerGreg Farnum <gregf@hq.newdream.net>
Wed, 17 Mar 2010 21:57:12 +0000 (14:57 -0700)
src/mon/Monitor.cc
src/mon/Monitor.h

index 71407360b98a633de7265fa32fdd72f9a71cc59f..23ca670cf3c79df073d5bbe61e8cc1a2b3c788a3 100644 (file)
@@ -34,6 +34,7 @@
 #include "messages/MMonPaxos.h"
 #include "messages/MClass.h"
 #include "messages/MRoute.h"
+#include "messages/MForward.h"
 
 #include "messages/MMonSubscribe.h"
 #include "messages/MMonSubscribeAck.h"
@@ -346,19 +347,24 @@ void Monitor::forward_request_leader(PaxosServiceMessage *req)
     rr->tid = ++routed_request_tid;
 
     dout(10) << "forward_request " << rr->tid << " request " << *req << dendl;
+    MForward* forward = new MForward(req);
 
-    encode_message(req, rr->request_bl);
-    rr->session = (Session *)session->get();
-    routed_requests[rr->tid] = rr;
-
-    session->routed_request_tids.insert(rr->tid);
-    
     dout(10) << " noting that i mon" << whoami << " own this requests's session" << dendl;
+    //set forwarding variables; clear payload so it re-encodes properly
     req->session_mon = whoami;
     req->session_mon_tid = rr->tid;
     req->clear_payload();
+    forward->set_priority(req->get_priority());
+    //of course, the need to forward does give it an effectively lower priority
     
-    messenger->forward_message(req, monmap->get_inst(mon));
+    //encode forward message and insert into routed_requests
+    encode_message(forward, rr->request_bl);
+    rr->session = (Session *)session->get();
+    routed_requests[rr->tid] = rr;
+
+    session->routed_request_tids.insert(rr->tid);
+    
+    messenger->forward_message(forward, monmap->get_inst(mon));
   } else {
     dout(10) << "forward_request no session for request " << *req << dendl;
     delete req;
@@ -367,6 +373,17 @@ void Monitor::forward_request_leader(PaxosServiceMessage *req)
     session->put();
 }
 
+//extract the original message and put it into the regular dispatch function
+void Monitor::handle_forward(MForward *m)
+{
+  dout(10) << "received forwarded message from " << m->msg->get_source_inst()
+          << " via " << m->get_source_inst() << dendl;
+  PaxosServiceMessage *req = m->msg;
+  ms_dispatch(req);
+  m->msg = NULL;
+  delete m;
+}
+
 void Monitor::try_send_message(Message *m, entity_inst_t to)
 {
   dout(10) << "try_send_message " << *m << " to " << to << dendl;
@@ -693,6 +710,11 @@ do { \
       handle_class((MClass *)m);
       break;
 
+    case MSG_FORWARD:
+      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
+      handle_forward((MForward *)m);
+      break;
+
     default:
       ret = false;
     }
index d1201f769ff535b889d61c2b1ef69970ff467f7b..54f74aa2ef903e0e30191dcc7236bb2884e6fb5c 100644 (file)
@@ -50,6 +50,7 @@ class MMonSubscribe;
 class MClass;
 class MAuthRotating;
 class MRoute;
+class MForward;
 
 #define COMPAT_SET_LOC "feature_set"
 
@@ -173,6 +174,7 @@ public:
   map<__u64, RoutedRequest*> routed_requests;
   
   void forward_request_leader(PaxosServiceMessage *req);
+  void handle_forward(MForward *m);
   void try_send_message(Message *m, entity_inst_t to);
   void send_reply(PaxosServiceMessage *req, Message *reply, entity_inst_t to);
   void send_reply(PaxosServiceMessage *req, Message *reply) {