MMonElection *m =
new MMonElection(MMonElection::OP_PROPOSE, epoch, mon->monmap);
m->mon_features = ceph::features::mon::get_supported();
- mon->messenger->send_message(m, mon->monmap->get_inst(i));
+ mon->send_mon_message(m, i);
}
reset_timer();
m->mon_features = ceph::features::mon::get_supported();
mon->collect_metadata(&m->metadata);
- mon->messenger->send_message(m, mon->monmap->get_inst(who));
+ mon->send_mon_message(m, who);
// set a timer
reset_timer(1.0); // give the leader some extra time to declare victory
m->quorum_features = cluster_features;
m->mon_features = mon_features;
m->sharing_bl = mon->get_local_commands_bl(mon_features);
- mon->messenger->send_message(m, mon->monmap->get_inst(*p));
+ mon->send_mon_message(m, *p);
}
// tell monitor
dout(10) << "probing other monitors" << dendl;
for (unsigned i = 0; i < monmap->size(); i++) {
if ((int)i != rank)
- messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined),
- monmap->get_inst(i));
+ send_mon_message(
+ new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined),
+ i);
}
for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
p != extra_probe_peers.end();
entity_inst_t i;
i.name = entity_name_t::MON(-1);
i.addr = *p;
- messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), i);
+ messenger->send_message(
+ new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined),
+ i);
}
}
}
start_election();
} else {
dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl;
- messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
- monmap->get_inst(*m->quorum.begin()));
+ send_mon_message(
+ new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
+ *m->quorum.begin());
}
} else {
if (monmap->contains(m->name)) {
string cur_name = monmap->get_name(messenger->get_myaddr());
if (cur_name != name) {
dout(10) << " renaming myself from " << cur_name << " -> " << name << dendl;
- messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
- monmap->get_inst(*quorum.begin()));
+ send_mon_message(
+ new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
+ *quorum.begin());
}
}
} else if (req->get_source().is_mon()) {
forward->entity_name.set_type(CEPH_ENTITY_TYPE_MON);
}
- messenger->send_message(forward, monmap->get_inst(mon));
+ send_mon_message(forward, mon);
op->mark_forwarded();
assert(op->get_req()->get_type() != 0);
} else {
for (int i=0; i<(int)monmap->size(); i++) {
if (i != rank)
- messenger->send_message(new MRoute(bl, to), monmap->get_inst(i));
+ send_mon_message(new MRoute(bl, to), i);
}
}
req->put(); // forward takes its own ref; drop ours.
forward->client = rr->client_inst;
forward->set_priority(req->get_priority());
- messenger->send_message(forward, monmap->get_inst(mon));
+ send_mon_message(forward, mon);
}
}
if (mon == rank) {
try_send_message(c, inst);
}
+void Monitor::send_mon_message(Message *m, int rank)
+{
+ messenger->send_message(m, monmap->get_inst(rank));
+}
+
void Monitor::waitlist_or_zap_client(MonOpRequestRef op)
{
/**
MPing *m = static_cast<MPing*>(op->get_req());
dout(10) << __func__ << " " << *m << dendl;
MPing *reply = new MPing;
- entity_inst_t inst = m->get_source_inst();
bufferlist payload;
boost::scoped_ptr<Formatter> f(new JSONFormatter(true));
f->open_object_section("pong");
encode(ss.str(), payload);
reply->set_payload(payload);
dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl;
- messenger->send_message(reply, inst);
+ m->get_connection()->send_message(reply);
}
void Monitor::timecheck_start()
}
do_output = false;
dout(10) << __func__ << " send report to mon." << *q << dendl;
- entity_inst_t inst = monmap->get_inst(*q);
- messenger->send_message(m, inst);
+ send_mon_message(m, *q);
}
}
if (monmap->get_name(*it) == name)
continue;
- entity_inst_t inst = monmap->get_inst(*it);
utime_t curr_time = ceph_clock_now();
timecheck_waiting[*it] = curr_time;
MTimeCheck2 *m = new MTimeCheck2(MTimeCheck2::OP_PING);
m->epoch = get_epoch();
m->round = timecheck_round;
dout(10) << __func__ << " send " << *m << " to mon." << *it << dendl;
- messenger->send_message(m, inst);
+ send_mon_message(m, *it);
}
}
MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version,
num_keys);
r->key = scrub_state->last_key;
- messenger->send_message(r, monmap->get_inst(*p));
+ send_mon_message(r, *p);
}
// scrub my keys
collect->last_committed = last_committed;
collect->first_committed = first_committed;
collect->pn = accepted_pn;
- mon->messenger->send_message(collect, mon->monmap->get_inst(*p));
+ mon->send_mon_message(collect, *p);
}
// set timeout event
MMonPaxos::OP_COMMIT,
ceph_clock_now());
share_state(commit, peer_first_committed[p->first], p->second);
- mon->messenger->send_message(commit, mon->monmap->get_inst(p->first));
+ mon->send_mon_message(commit, p->first);
}
}
begin->last_committed = last_committed;
begin->pn = accepted_pn;
- mon->messenger->send_message(begin, mon->monmap->get_inst(*p));
+ mon->send_mon_message(begin, *p);
}
// set timeout event
commit->pn = accepted_pn;
commit->last_committed = last_committed;
- mon->messenger->send_message(commit, mon->monmap->get_inst(*p));
+ mon->send_mon_message(commit, *p);
}
assert(g_conf->paxos_kill_at != 9);
lease->last_committed = last_committed;
lease->lease_timestamp = lease_expire;
lease->first_committed = first_committed;
- mon->messenger->send_message(lease, mon->monmap->get_inst(*p));
+ mon->send_mon_message(lease, *p);
}
// set timeout event.