{
dout(10) << "resend_routed_requests" << dendl;
int mon = get_leader();
+ list<Context*> retry;
for (map<uint64_t, RoutedRequest*>::iterator p = routed_requests.begin();
p != routed_requests.end();
++p) {
bufferlist::iterator q = rr->request_bl.begin();
PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, q);
- dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req << dendl;
- MForward *forward = new MForward(rr->tid, req, rr->session->caps);
- forward->client = rr->client_inst;
- forward->set_priority(req->get_priority());
- messenger->send_message(forward, monmap->get_inst(mon));
- }
+ if (mon == rank) {
+ dout(10) << " requeue for self tid " << rr->tid << " " << *req << dendl;
+ req->set_connection(rr->con);
+ rr->con->get();
+ retry.push_back(new C_RetryMessage(this, req));
+ delete rr;
+ } else {
+ dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req << dendl;
+ MForward *forward = new MForward(rr->tid, req, rr->session->caps);
+ forward->client = rr->client_inst;
+ forward->set_priority(req->get_priority());
+ messenger->send_message(forward, monmap->get_inst(mon));
+ }
+ }
+ if (mon == rank) {
+ routed_requests.clear();
+ finish_contexts(g_ceph_context, retry);
+ }
}
void Monitor::remove_session(MonSession *s)