#define CEPH_FEATURE_INDEP_PG_MAP (1<<17)
#define CEPH_FEATURE_CRUSH_TUNABLES (1<<18)
#define CEPH_FEATURE_CHUNKY_SCRUB (1<<19)
+#define CEPH_FEATURE_MON_NULLROUTE (1<<20)
/*
* Features supported. Should be everything above.
CEPH_FEATURE_MONENC | \
CEPH_FEATURE_INDEP_PG_MAP | \
CEPH_FEATURE_CRUSH_TUNABLES | \
- CEPH_FEATURE_CHUNKY_SCRUB)
+ CEPH_FEATURE_CHUNKY_SCRUB | \
+ CEPH_FEATURE_MON_NULLROUTE)
#define CEPH_FEATURES_SUPPORTED_DEFAULT CEPH_FEATURES_ALL
bufferlist::iterator p = payload.begin();
::decode(session_mon_tid, p);
::decode(dest, p);
- msg = decode_message(NULL, p);
+ if (!p.end())
+ msg = decode_message(NULL, p);
}
void encode_payload(uint64_t features) {
::encode(session_mon_tid, payload);
::encode(dest, payload);
- encode_message(msg, features, payload);
+ if (msg)
+ encode_message(msg, features, payload);
}
const char *get_type_name() const { return "route"; }
if (msg)
o << "route(" << *msg;
else
- o << "route(???";
+ o << "route(no-reply";
if (session_mon_tid)
o << " tid " << session_mon_tid << ")";
else
session->put();
}
+void Monitor::no_reply(PaxosServiceMessage *req)
+{
+ MonSession *session = (MonSession*)req->get_connection()->get_priv();
+ assert(session);
+ if (session->proxy_con) {
+ if (get_quorum_features() & CEPH_FEATURE_MON_NULLROUTE) {
+ dout(10) << "no_reply to " << req->get_source_inst() << " via mon." << req->session_mon
+ << " for request " << *req << dendl;
+ messenger->send_message(new MRoute(session->proxy_tid, NULL),
+ session->proxy_con);
+ } else {
+ dout(10) << "no_reply no quorum nullroute feature for " << req->get_source_inst() << " via mon." << req->session_mon
+ << " for request " << *req << dendl;
+ }
+ } else {
+ dout(10) << "no_reply to " << req->get_source_inst() << " " << *req << dendl;
+ }
+ session->put();
+}
+
void Monitor::handle_route(MRoute *m)
{
MonSession *session = (MonSession *)m->get_connection()->get_priv();
m->put();
return;
}
- dout(10) << "handle_route " << *m->msg << " to " << m->dest << dendl;
+ if (m->msg)
+ dout(10) << "handle_route " << *m->msg << " to " << m->dest << dendl;
+ else
+ dout(10) << "handle_route null to " << m->dest << dendl;
// look it up
if (m->session_mon_tid) {
RoutedRequest *rr = routed_requests[m->session_mon_tid];
// reset payload, in case encoding is dependent on target features
- m->msg->clear_payload();
-
- messenger->send_message(m->msg, rr->session->inst);
- m->msg = NULL;
+ if (m->msg) {
+ m->msg->clear_payload();
+ messenger->send_message(m->msg, rr->session->inst);
+ m->msg = NULL;
+ }
routed_requests.erase(m->session_mon_tid);
rr->session->routed_request_tids.insert(rr->tid);
delete rr;
}
} else {
dout(10) << " not a routed request, trying to send anyway" << dendl;
- messenger->lazy_send_message(m->msg, m->dest);
- m->msg = NULL;
+ if (m->msg) {
+ messenger->lazy_send_message(m->msg, m->dest);
+ m->msg = NULL;
+ }
}
m->put();
if (session)
void handle_forward(MForward *m);
void try_send_message(Message *m, entity_inst_t to);
void send_reply(PaxosServiceMessage *req, Message *reply);
+ void no_reply(PaxosServiceMessage *req);
void resend_routed_requests();
void remove_session(MonSession *s);