]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mon: route replies
authorSage Weil <sage@newdream.net>
Mon, 31 Aug 2009 19:58:25 +0000 (12:58 -0700)
committerSage Weil <sage@newdream.net>
Mon, 31 Aug 2009 19:58:25 +0000 (12:58 -0700)
src/include/types.h
src/messages/MRoute.h [new file with mode: 0644]
src/mon/ClientMonitor.cc
src/mon/Monitor.cc
src/mon/Monitor.h
src/msg/Message.cc
src/msg/Message.h

index 65041ca0cbe490c5603986c9666bd0eba0c92042..7792313a76988fc9141ebf82ff19b39814fa8090 100644 (file)
@@ -217,6 +217,8 @@ WRITE_RAW_ENCODER(ceph_frag_tree_split)
 WRITE_RAW_ENCODER(ceph_osd_request_head)
 WRITE_RAW_ENCODER(ceph_osd_reply_head)
 WRITE_RAW_ENCODER(ceph_osd_op)
+WRITE_RAW_ENCODER(ceph_msg_header)
+WRITE_RAW_ENCODER(ceph_msg_footer)
 
 WRITE_RAW_ENCODER(ceph_mon_statfs)
 WRITE_RAW_ENCODER(ceph_mon_statfs_reply)
diff --git a/src/messages/MRoute.h b/src/messages/MRoute.h
new file mode 100644 (file)
index 0000000..e4d411f
--- /dev/null
@@ -0,0 +1,66 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+
+#ifndef __MROUTE_H
+#define __MROUTE_H
+
+#include "msg/Message.h"
+#include "include/encoding.h"
+
+struct MRoute : public Message {
+  Message *msg;
+  entity_inst_t dest;
+  
+  MRoute() : Message(MSG_ROUTE), msg(NULL) {}
+  MRoute(Message *m, entity_inst_t i) : Message(MSG_ROUTE), msg(m), dest(i) {}
+  ~MRoute() {
+    delete msg;
+  }
+
+  void decode_payload() {
+    bufferlist::iterator p = payload.begin();
+    ::decode(dest, p);
+    ceph_msg_header h;
+    ceph_msg_footer f;
+    bufferlist fr, mi, da;
+    ::decode(h, p);
+    ::decode(f, p);
+    ::decode(fr, p);
+    ::decode(mi, p);
+    ::decode(da, p);
+    msg = decode_message(h, f, fr, mi, da);
+  }
+  void encode_payload() {
+    ::encode(dest, payload);
+    bufferlist front, middle, data;
+    msg->encode();
+    ::encode(msg->get_header(), payload);
+    ::encode(msg->get_footer(), payload);
+    ::encode(msg->get_payload(), payload);
+    ::encode(msg->get_middle(), payload);
+    ::encode(msg->get_data(), payload);
+  }
+
+  const char *get_type_name() { return "route"; }
+  void print(ostream& o) {
+    if (msg)
+      o << "route(" << *msg << " to " << dest << ")";
+    else
+      o << "route(??? to " << dest << ")";
+  }
+};
+
+#endif
index 8c139877aacb5a28818deb103e74408d6ed86502..c31468ddb8287b7501baf3e6ee8a629bdc11d12a 100644 (file)
@@ -23,6 +23,7 @@
 #include "messages/MClientMount.h"
 #include "messages/MClientMountAck.h"
 #include "messages/MMonCommand.h"
+#include "messages/MRoute.h"
 
 #include "common/Timer.h"
 
@@ -239,7 +240,12 @@ void ClientMonitor::_mounted(__s64 client, MClientMount *m)
   ack->addr = to.addr;
   mon->monmap->encode(ack->monmap_bl);
 
-  mon->messenger->send_message(ack, to);
+  if (m->get_source().is_mon()) {
+    // route reply
+    mon->messenger->send_message(new MRoute(ack, to), m->get_source_inst());
+  } else {
+    mon->messenger->send_message(ack, to);
+  }
 
   // also send latest mds and osd maps
   //mon->mdsmon()->send_latest(to);
index 625a23eeb223a226bd261bda52183f99b142d419..5013408735d29af4dc01bbe9166e336efc1b22d1 100644 (file)
@@ -33,6 +33,7 @@
 
 #include "messages/MMonPaxos.h"
 #include "messages/MClass.h"
+#include "messages/MRoute.h"
 
 #include "messages/MClientMountAck.h"
 
@@ -309,7 +310,10 @@ void Monitor::reply_command(MMonCommand *m, int rc, const string &rs, bufferlist
 {
   MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, version);
   reply->set_data(rdata);
-  messenger->send_message(reply, m->get_orig_source_inst());
+  if (m->get_source().is_mon())
+    messenger->send_message(new MRoute(m, m->get_orig_source_inst()), m->get_source_inst());
+  else 
+    messenger->send_message(reply, m->get_orig_source_inst());
   delete m;
 }
 
@@ -375,6 +379,10 @@ bool Monitor::dispatch_impl(Message *m)
   {
     switch (m->get_type()) {
       
+    case MSG_ROUTE:
+      handle_route((MRoute*)m);
+      break;
+
       // misc
     case CEPH_MSG_MON_GET_MAP:
       handle_mon_get_map((MMonGetMap*)m);
@@ -496,7 +504,7 @@ void Monitor::handle_subscribe(MMonSubscribe *m)
       dout(10) << " ignoring sub for '" << p->first << "'" << dendl;
   }
 
-  messenger->send_message(new MMonSubscribeAck(g_conf.mon_subscribe_interval),
+  messenger->send_message(new MMonSubscribeAck(g_conf.mon_subscribe_interval * 1000),
                          m->get_source_inst());
 
   delete m;
@@ -653,3 +661,12 @@ void Monitor::handle_class(MClass *m)
   }
 }
 
+
+void Monitor::handle_route(MRoute *m)
+{
+  dout(10) << "handle_route " << *m->msg << " to " << m->dest << dendl;
+  
+  messenger->send_message(m->msg, m->dest);
+  m->msg = NULL;
+  delete m;
+}
index 09dd34efbcd8255ef3f0cf1173430ca14e76c1ba..e3b134f18d3c97ef6d7de83f135290f18f6e3a4a 100644 (file)
@@ -45,6 +45,7 @@ class MMonGetMap;
 class MMonObserve;
 class MMonSubscribe;
 class MClass;
+class MRoute;
 
 class Monitor : public Dispatcher {
 public:
@@ -133,6 +134,7 @@ public:
   void handle_command(class MMonCommand *m);
   void handle_observe(MMonObserve *m);
   void handle_class(MClass *m);
+  void handle_route(MRoute *m);
 
   void reply_command(MMonCommand *m, int rc, const string &rs, version_t version);
   void reply_command(MMonCommand *m, int rc, const string &rs, bufferlist& rdata, version_t version);
index e1692675b0a2162a1ec1540b06528798e20db874..918a6c1070488b5f4659d5ac5b33815d05601e5a 100644 (file)
@@ -35,6 +35,8 @@ using namespace std;
 
 #include "messages/MPing.h"
 
+#include "messages/MRoute.h"
+
 #include "messages/MOSDBoot.h"
 #include "messages/MOSDAlive.h"
 #include "messages/MOSDPGTemp.h"
@@ -128,7 +130,18 @@ using namespace std;
 #define DEBUGLVL  10    // debug level of output
 
 
-
+void Message::encode()
+{
+  // encode and copy out of *m
+  if (empty_payload())
+    encode_payload();
+  calc_front_crc();
+  
+  if (!g_conf.ms_nocrc)
+    calc_data_crc();
+  else
+    footer.flags = (unsigned)footer.flags | CEPH_MSG_FOOTER_NOCRC;
+}
 
 Message *decode_message(ceph_msg_header& header, ceph_msg_footer& footer,
                        bufferlist& front, bufferlist& middle, bufferlist& data)
@@ -225,6 +238,9 @@ Message *decode_message(ceph_msg_header& header, ceph_msg_footer& footer,
   case CEPH_MSG_PING:
     m = new MPing();
     break;
+  case MSG_ROUTE:
+    m = new MRoute;
+    break;
     
   case CEPH_MSG_MON_MAP:
     m = new MMonMap;
index 1d89994e102115bf6e94b54db3ded90e14bf547c..8eef46bf3cf6d51be5c8f9461499c4a3e5132bda 100644 (file)
@@ -17,6 +17,7 @@
  
 /* public message types */
 #include "include/types.h"
+#include "config.h"
 
 // monitor internal
 #define MSG_MON_ELECTION           60
 #define MSG_POOLOP                 49
 #define MSG_POOLOPREPLY            48
 
+#define MSG_ROUTE                  47
+
 #define MSG_PAXOS                  40
 
+
 // osd internal
 #define MSG_OSD_PING         70
 #define MSG_OSD_BOOT         71
@@ -239,7 +243,9 @@ public:
   virtual void print(ostream& out) {
     out << get_type_name();
   }
-  
+
+  void encode();
+
 };
 
 extern Message *decode_message(ceph_msg_header &header, ceph_msg_footer& footer,