From dbc00a61cb0b6d25c521f56c0ce3a0503d21db7c Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 31 Aug 2009 12:58:25 -0700 Subject: [PATCH] mon: route replies --- src/include/types.h | 2 ++ src/messages/MRoute.h | 66 ++++++++++++++++++++++++++++++++++++++++ src/mon/ClientMonitor.cc | 8 ++++- src/mon/Monitor.cc | 21 +++++++++++-- src/mon/Monitor.h | 2 ++ src/msg/Message.cc | 18 ++++++++++- src/msg/Message.h | 8 ++++- 7 files changed, 120 insertions(+), 5 deletions(-) create mode 100644 src/messages/MRoute.h diff --git a/src/include/types.h b/src/include/types.h index 65041ca0cbe49..7792313a76988 100644 --- a/src/include/types.h +++ b/src/include/types.h @@ -217,6 +217,8 @@ WRITE_RAW_ENCODER(ceph_frag_tree_split) WRITE_RAW_ENCODER(ceph_osd_request_head) WRITE_RAW_ENCODER(ceph_osd_reply_head) WRITE_RAW_ENCODER(ceph_osd_op) +WRITE_RAW_ENCODER(ceph_msg_header) +WRITE_RAW_ENCODER(ceph_msg_footer) WRITE_RAW_ENCODER(ceph_mon_statfs) WRITE_RAW_ENCODER(ceph_mon_statfs_reply) diff --git a/src/messages/MRoute.h b/src/messages/MRoute.h new file mode 100644 index 0000000000000..e4d411fed1eba --- /dev/null +++ b/src/messages/MRoute.h @@ -0,0 +1,66 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + + +#ifndef __MROUTE_H +#define __MROUTE_H + +#include "msg/Message.h" +#include "include/encoding.h" + +struct MRoute : public Message { + Message *msg; + entity_inst_t dest; + + MRoute() : Message(MSG_ROUTE), msg(NULL) {} + MRoute(Message *m, entity_inst_t i) : Message(MSG_ROUTE), msg(m), dest(i) {} + ~MRoute() { + delete msg; + } + + void decode_payload() { + bufferlist::iterator p = payload.begin(); + ::decode(dest, p); + ceph_msg_header h; + ceph_msg_footer f; + bufferlist fr, mi, da; + ::decode(h, p); + ::decode(f, p); + ::decode(fr, p); + ::decode(mi, p); + ::decode(da, p); + msg = decode_message(h, f, fr, mi, da); + } + void encode_payload() { + ::encode(dest, payload); + bufferlist front, middle, data; + msg->encode(); + ::encode(msg->get_header(), payload); + ::encode(msg->get_footer(), payload); + ::encode(msg->get_payload(), payload); + ::encode(msg->get_middle(), payload); + ::encode(msg->get_data(), payload); + } + + const char *get_type_name() { return "route"; } + void print(ostream& o) { + if (msg) + o << "route(" << *msg << " to " << dest << ")"; + else + o << "route(??? to " << dest << ")"; + } +}; + +#endif diff --git a/src/mon/ClientMonitor.cc b/src/mon/ClientMonitor.cc index 8c139877aacb5..c31468ddb8287 100644 --- a/src/mon/ClientMonitor.cc +++ b/src/mon/ClientMonitor.cc @@ -23,6 +23,7 @@ #include "messages/MClientMount.h" #include "messages/MClientMountAck.h" #include "messages/MMonCommand.h" +#include "messages/MRoute.h" #include "common/Timer.h" @@ -239,7 +240,12 @@ void ClientMonitor::_mounted(__s64 client, MClientMount *m) ack->addr = to.addr; mon->monmap->encode(ack->monmap_bl); - mon->messenger->send_message(ack, to); + if (m->get_source().is_mon()) { + // route reply + mon->messenger->send_message(new MRoute(ack, to), m->get_source_inst()); + } else { + mon->messenger->send_message(ack, to); + } // also send latest mds and osd maps //mon->mdsmon()->send_latest(to); diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 625a23eeb223a..5013408735d29 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -33,6 +33,7 @@ #include "messages/MMonPaxos.h" #include "messages/MClass.h" +#include "messages/MRoute.h" #include "messages/MClientMountAck.h" @@ -309,7 +310,10 @@ void Monitor::reply_command(MMonCommand *m, int rc, const string &rs, bufferlist { MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, version); reply->set_data(rdata); - messenger->send_message(reply, m->get_orig_source_inst()); + if (m->get_source().is_mon()) + messenger->send_message(new MRoute(m, m->get_orig_source_inst()), m->get_source_inst()); + else + messenger->send_message(reply, m->get_orig_source_inst()); delete m; } @@ -375,6 +379,10 @@ bool Monitor::dispatch_impl(Message *m) { switch (m->get_type()) { + case MSG_ROUTE: + handle_route((MRoute*)m); + break; + // misc case CEPH_MSG_MON_GET_MAP: handle_mon_get_map((MMonGetMap*)m); @@ -496,7 +504,7 @@ void Monitor::handle_subscribe(MMonSubscribe *m) dout(10) << " ignoring sub for '" << p->first << "'" << dendl; } - messenger->send_message(new MMonSubscribeAck(g_conf.mon_subscribe_interval), + messenger->send_message(new MMonSubscribeAck(g_conf.mon_subscribe_interval * 1000), m->get_source_inst()); delete m; @@ -653,3 +661,12 @@ void Monitor::handle_class(MClass *m) } } + +void Monitor::handle_route(MRoute *m) +{ + dout(10) << "handle_route " << *m->msg << " to " << m->dest << dendl; + + messenger->send_message(m->msg, m->dest); + m->msg = NULL; + delete m; +} diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h index 09dd34efbcd82..e3b134f18d3c9 100644 --- a/src/mon/Monitor.h +++ b/src/mon/Monitor.h @@ -45,6 +45,7 @@ class MMonGetMap; class MMonObserve; class MMonSubscribe; class MClass; +class MRoute; class Monitor : public Dispatcher { public: @@ -133,6 +134,7 @@ public: void handle_command(class MMonCommand *m); void handle_observe(MMonObserve *m); void handle_class(MClass *m); + void handle_route(MRoute *m); void reply_command(MMonCommand *m, int rc, const string &rs, version_t version); void reply_command(MMonCommand *m, int rc, const string &rs, bufferlist& rdata, version_t version); diff --git a/src/msg/Message.cc b/src/msg/Message.cc index e1692675b0a21..918a6c1070488 100644 --- a/src/msg/Message.cc +++ b/src/msg/Message.cc @@ -35,6 +35,8 @@ using namespace std; #include "messages/MPing.h" +#include "messages/MRoute.h" + #include "messages/MOSDBoot.h" #include "messages/MOSDAlive.h" #include "messages/MOSDPGTemp.h" @@ -128,7 +130,18 @@ using namespace std; #define DEBUGLVL 10 // debug level of output - +void Message::encode() +{ + // encode and copy out of *m + if (empty_payload()) + encode_payload(); + calc_front_crc(); + + if (!g_conf.ms_nocrc) + calc_data_crc(); + else + footer.flags = (unsigned)footer.flags | CEPH_MSG_FOOTER_NOCRC; +} Message *decode_message(ceph_msg_header& header, ceph_msg_footer& footer, bufferlist& front, bufferlist& middle, bufferlist& data) @@ -225,6 +238,9 @@ Message *decode_message(ceph_msg_header& header, ceph_msg_footer& footer, case CEPH_MSG_PING: m = new MPing(); break; + case MSG_ROUTE: + m = new MRoute; + break; case CEPH_MSG_MON_MAP: m = new MMonMap; diff --git a/src/msg/Message.h b/src/msg/Message.h index 1d89994e10211..8eef46bf3cf6d 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -17,6 +17,7 @@ /* public message types */ #include "include/types.h" +#include "config.h" // monitor internal #define MSG_MON_ELECTION 60 @@ -38,8 +39,11 @@ #define MSG_POOLOP 49 #define MSG_POOLOPREPLY 48 +#define MSG_ROUTE 47 + #define MSG_PAXOS 40 + // osd internal #define MSG_OSD_PING 70 #define MSG_OSD_BOOT 71 @@ -239,7 +243,9 @@ public: virtual void print(ostream& out) { out << get_type_name(); } - + + void encode(); + }; extern Message *decode_message(ceph_msg_header &header, ceph_msg_footer& footer, -- 2.39.5