WRITE_RAW_ENCODER(ceph_osd_request_head)
WRITE_RAW_ENCODER(ceph_osd_reply_head)
WRITE_RAW_ENCODER(ceph_osd_op)
+WRITE_RAW_ENCODER(ceph_msg_header)
+WRITE_RAW_ENCODER(ceph_msg_footer)
WRITE_RAW_ENCODER(ceph_mon_statfs)
WRITE_RAW_ENCODER(ceph_mon_statfs_reply)
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+
+#ifndef __MROUTE_H
+#define __MROUTE_H
+
+#include "msg/Message.h"
+#include "include/encoding.h"
+
+struct MRoute : public Message {
+ Message *msg;
+ entity_inst_t dest;
+
+ MRoute() : Message(MSG_ROUTE), msg(NULL) {}
+ MRoute(Message *m, entity_inst_t i) : Message(MSG_ROUTE), msg(m), dest(i) {}
+ ~MRoute() {
+ delete msg;
+ }
+
+ void decode_payload() {
+ bufferlist::iterator p = payload.begin();
+ ::decode(dest, p);
+ ceph_msg_header h;
+ ceph_msg_footer f;
+ bufferlist fr, mi, da;
+ ::decode(h, p);
+ ::decode(f, p);
+ ::decode(fr, p);
+ ::decode(mi, p);
+ ::decode(da, p);
+ msg = decode_message(h, f, fr, mi, da);
+ }
+ void encode_payload() {
+ ::encode(dest, payload);
+ bufferlist front, middle, data;
+ msg->encode();
+ ::encode(msg->get_header(), payload);
+ ::encode(msg->get_footer(), payload);
+ ::encode(msg->get_payload(), payload);
+ ::encode(msg->get_middle(), payload);
+ ::encode(msg->get_data(), payload);
+ }
+
+ const char *get_type_name() { return "route"; }
+ void print(ostream& o) {
+ if (msg)
+ o << "route(" << *msg << " to " << dest << ")";
+ else
+ o << "route(??? to " << dest << ")";
+ }
+};
+
+#endif
#include "messages/MClientMount.h"
#include "messages/MClientMountAck.h"
#include "messages/MMonCommand.h"
+#include "messages/MRoute.h"
#include "common/Timer.h"
ack->addr = to.addr;
mon->monmap->encode(ack->monmap_bl);
- mon->messenger->send_message(ack, to);
+ if (m->get_source().is_mon()) {
+ // route reply
+ mon->messenger->send_message(new MRoute(ack, to), m->get_source_inst());
+ } else {
+ mon->messenger->send_message(ack, to);
+ }
// also send latest mds and osd maps
//mon->mdsmon()->send_latest(to);
#include "messages/MMonPaxos.h"
#include "messages/MClass.h"
+#include "messages/MRoute.h"
#include "messages/MClientMountAck.h"
{
MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, version);
reply->set_data(rdata);
- messenger->send_message(reply, m->get_orig_source_inst());
+ if (m->get_source().is_mon())
+ messenger->send_message(new MRoute(m, m->get_orig_source_inst()), m->get_source_inst());
+ else
+ messenger->send_message(reply, m->get_orig_source_inst());
delete m;
}
{
switch (m->get_type()) {
+ case MSG_ROUTE:
+ handle_route((MRoute*)m);
+ break;
+
// misc
case CEPH_MSG_MON_GET_MAP:
handle_mon_get_map((MMonGetMap*)m);
dout(10) << " ignoring sub for '" << p->first << "'" << dendl;
}
- messenger->send_message(new MMonSubscribeAck(g_conf.mon_subscribe_interval),
+ messenger->send_message(new MMonSubscribeAck(g_conf.mon_subscribe_interval * 1000),
m->get_source_inst());
delete m;
}
}
+
+void Monitor::handle_route(MRoute *m)
+{
+ dout(10) << "handle_route " << *m->msg << " to " << m->dest << dendl;
+
+ messenger->send_message(m->msg, m->dest);
+ m->msg = NULL;
+ delete m;
+}
class MMonObserve;
class MMonSubscribe;
class MClass;
+class MRoute;
class Monitor : public Dispatcher {
public:
void handle_command(class MMonCommand *m);
void handle_observe(MMonObserve *m);
void handle_class(MClass *m);
+ void handle_route(MRoute *m);
void reply_command(MMonCommand *m, int rc, const string &rs, version_t version);
void reply_command(MMonCommand *m, int rc, const string &rs, bufferlist& rdata, version_t version);
#include "messages/MPing.h"
+#include "messages/MRoute.h"
+
#include "messages/MOSDBoot.h"
#include "messages/MOSDAlive.h"
#include "messages/MOSDPGTemp.h"
#define DEBUGLVL 10 // debug level of output
-
+void Message::encode()
+{
+ // encode and copy out of *m
+ if (empty_payload())
+ encode_payload();
+ calc_front_crc();
+
+ if (!g_conf.ms_nocrc)
+ calc_data_crc();
+ else
+ footer.flags = (unsigned)footer.flags | CEPH_MSG_FOOTER_NOCRC;
+}
Message *decode_message(ceph_msg_header& header, ceph_msg_footer& footer,
bufferlist& front, bufferlist& middle, bufferlist& data)
case CEPH_MSG_PING:
m = new MPing();
break;
+ case MSG_ROUTE:
+ m = new MRoute;
+ break;
case CEPH_MSG_MON_MAP:
m = new MMonMap;
/* public message types */
#include "include/types.h"
+#include "config.h"
// monitor internal
#define MSG_MON_ELECTION 60
#define MSG_POOLOP 49
#define MSG_POOLOPREPLY 48
+#define MSG_ROUTE 47
+
#define MSG_PAXOS 40
+
// osd internal
#define MSG_OSD_PING 70
#define MSG_OSD_BOOT 71
virtual void print(ostream& out) {
out << get_type_name();
}
-
+
+ void encode();
+
};
extern Message *decode_message(ceph_msg_header &header, ceph_msg_footer& footer,