} else if (prefix == "config-key put") {
if (!mon->is_leader()) {
- mon->forward_request_leader(cmd);
+ mon->forward_request_leader(op);
// we forward the message; so return now.
return true;
}
} else if (prefix == "config-key del") {
if (!mon->is_leader()) {
- mon->forward_request_leader(cmd);
+ mon->forward_request_leader(op);
return true;
}
void mark_zap() {
mark_event("monitor_zap");
}
+ void mark_forwarded() {
+ mark_event("forwarded");
+ forwarded_to_leader = true;
+ }
void mark_svc_event(const string &service, const string &event) {
string s = service;
utime_t dequeued_time;
MonSession *session;
ConnectionRef con;
+ bool forwarded_to_leader;
MonOpRequest(Message *req, OpTracker *tracker) :
TrackedOp(tracker, req->get_recv_stamp()),
request(req->get()),
session(NULL),
- con(NULL)
+ con(NULL),
+ forwarded_to_leader(false)
{
tracker->mark_event(this, "header_read", request->get_recv_stamp());
tracker->mark_event(this, "throttled", request->get_throttle_stamp());
Message *get_req() const { return get_req<Message>(); }
+ int get_req_type() const {
+ if (!request)
+ return 0;
+ return request->get_type();
+ }
+
ConnectionRef get_connection() { return con; }
void set_session(MonSession *s) {
f->dump_int("seq", seq);
f->dump_bool("src_is_mon", is_src_mon());
f->dump_stream("source") << request->get_source_inst();
+ f->dump_bool("forwarded_to_leader", forwarded_to_leader);
f->close_section();
}
}
}
dout(10) << "Command not locally supported, forwarding request "
<< m << dendl;
- forward_request_leader(m);
+ forward_request_leader(op);
return;
} else if (!mon_cmd->is_compat(leader_cmd)) {
if (mon_cmd->is_noforward()) {
}
dout(10) << "Command not compatible with leader, forwarding request "
<< m << dendl;
- forward_request_leader(m);
+ forward_request_leader(op);
return;
}
}
int r = scrub_start();
reply_command(op, r, "", rdata, 0);
} else if (is_peon()) {
- forward_request_leader(m);
+ forward_request_leader(op);
} else {
reply_command(op, -EAGAIN, "no quorum", rdata, 0);
}
// back via the correct monitor and back to them. (the monitor will not
// initiate any connections.)
-void Monitor::forward_request_leader(PaxosServiceMessage *req)
+void Monitor::forward_request_leader(MonOpRequestRef op)
{
+ op->mark_event(__func__);
+
int mon = get_leader();
- MonSession *session = 0;
- if (req->get_connection())
- session = static_cast<MonSession *>(req->get_connection()->get_priv());
+ MonSession *session = op->get_session();
+ PaxosServiceMessage *req = op->get_req<PaxosServiceMessage>();
+
if (req->get_source().is_mon() && req->get_source_addr() != messenger->get_myaddr()) {
dout(10) << "forward_request won't forward (non-local) mon request " << *req << dendl;
- req->put();
} else if (session && session->proxy_con) {
dout(10) << "forward_request won't double fwd request " << *req << dendl;
- req->put();
} else if (session && !session->closed) {
RoutedRequest *rr = new RoutedRequest;
rr->tid = ++routed_request_tid;
forward->entity_name.set_type(CEPH_ENTITY_TYPE_MON);
}
messenger->send_message(forward, monmap->get_inst(mon));
+ op->mark_forwarded();
} else {
dout(10) << "forward_request no session for request " << *req << dendl;
- req->put();
}
if (session)
session->put();
uint64_t routed_request_tid;
map<uint64_t, RoutedRequest*> routed_requests;
- void forward_request_leader(PaxosServiceMessage *req);
+ void forward_request_leader(MonOpRequestRef op);
void handle_forward(MonOpRequestRef op);
void try_send_message(Message *m, const entity_inst_t& to);
void send_reply(PaxosServiceMessage *req, Message *reply);
// leader?
if (!mon->is_leader()) {
- mon->forward_request_leader(m);
+ mon->forward_request_leader(op);
return true;
}