<< " on " << it->second.size() << " PGs" << dendl;
MOSDPGNotify *m = new MOSDPGNotify(curmap->get_epoch(),
it->second);
- cluster_messenger->send_message(m, curmap->get_cluster_inst(it->first));
+ service.send_message_osd_cluster(it->first, m, curmap->get_epoch());
} else {
dout(7) << "do_notify osd." << it->first
<< " sending seperate messages" << dendl;
list[0] = *i;
MOSDPGNotify *m = new MOSDPGNotify(i->first.epoch_sent,
list);
- cluster_messenger->send_message(m, curmap->get_cluster_inst(it->first));
+ service.send_message_osd_cluster(it->first, m, curmap->get_epoch());
}
}
}
dout(7) << "do_queries querying osd." << who
<< " on " << pit->second.size() << " PGs" << dendl;
MOSDPGQuery *m = new MOSDPGQuery(curmap->get_epoch(), pit->second);
- cluster_messenger->send_message(m, curmap->get_cluster_inst(who));
+ service.send_message_osd_cluster(who, m, curmap->get_epoch());
} else {
dout(7) << "do_queries querying osd." << who
<< " sending seperate messages "
map<pg_t, pg_query_t> to_send;
to_send.insert(*i);
MOSDPGQuery *m = new MOSDPGQuery(i->second.epoch_sent, to_send);
- cluster_messenger->send_message(m, curmap->get_cluster_inst(who));
+ service.send_message_osd_cluster(who, m, curmap->get_epoch());
}
}
}
if ((con->features & CEPH_FEATURE_INDEP_PG_MAP)) {
MOSDPGInfo *m = new MOSDPGInfo(curmap->get_epoch());
m->pg_list = p->second;
- cluster_messenger->send_message(m, curmap->get_cluster_inst(p->first));
+ service.send_message_osd_cluster(p->first, m, curmap->get_epoch());
} else {
for (vector<pair<pg_notify_t, pg_interval_map_t> >::iterator i =
p->second.begin();
to_send[0] = *i;
MOSDPGInfo *m = new MOSDPGInfo(i->first.epoch_sent);
m->pg_list = to_send;
- cluster_messenger->send_message(m, curmap->get_cluster_inst(p->first));
+ service.send_message_osd_cluster(p->first, m, curmap->get_epoch());
}
}
}
MOSDPGLog *mlog = new MOSDPGLog(osdmap->get_epoch(), empty,
it->second.epoch_sent);
_share_map_outgoing(osdmap->get_cluster_inst(from));
- cluster_messenger->send_message(mlog,
- osdmap->get_cluster_inst(from));
+ service.send_message_osd_cluster(from, mlog, osdmap->get_epoch());
} else {
notify_list[from].push_back(make_pair(pg_notify_t(it->second.epoch_sent,
osdmap->get_epoch(),
if (m) {
dout(10) << "activate peer osd." << peer << " sending " << m->log << dendl;
//m->log.print(cout);
- osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(peer));
+ osd->send_message_osd_cluster(peer, m, get_osdmap()->get_epoch());
}
// peer now has
info);
i.info.history.last_epoch_started = e;
m->pg_list.push_back(make_pair(i, pg_interval_map_t()));
- osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(acting[0]));
+ osd->send_message_osd_cluster(acting[0], m, get_osdmap()->get_epoch());
}
if (dirty_info) {
MOSDPGRemove *m = new MOSDPGRemove(
get_osdmap()->get_epoch(),
to_remove);
- osd->cluster_messenger->send_message(
- m, get_osdmap()->get_cluster_inst(*p));
+ osd->send_message_osd_cluster(*p, m, get_osdmap()->get_epoch());
stray_purged.insert(*p);
} else {
dout(10) << "not sending PGRemove to down osd." << *p << dendl;
dout(10) << "trim_peers " << pg_trim_to << dendl;
if (pg_trim_to != eversion_t()) {
for (unsigned i=1; i<acting.size(); i++)
- osd->cluster_messenger->send_message(new MOSDPGTrim(get_osdmap()->get_epoch(), info.pgid,
- pg_trim_to),
- get_osdmap()->get_cluster_inst(acting[i]));
+ osd->send_message_osd_cluster(acting[i],
+ new MOSDPGTrim(get_osdmap()->get_epoch(), info.pgid,
+ pg_trim_to),
+ get_osdmap()->get_epoch());
}
}
MOSDRepScrub *repscrubop = new MOSDRepScrub(info.pgid, version,
last_update_applied,
get_osdmap()->get_epoch());
- osd->cluster_messenger->send_message(repscrubop,
- get_osdmap()->get_cluster_inst(replica));
+ osd->send_message_osd_cluster(replica, repscrubop, get_osdmap()->get_epoch());
}
// send scrub v3 messages (chunky scrub)
MOSDRepScrub *repscrubop = new MOSDRepScrub(info.pgid, version,
get_osdmap()->get_epoch(),
start, end, deep);
- osd->cluster_messenger->send_message(repscrubop,
- get_osdmap()->get_cluster_inst(replica));
+ osd->send_message_osd_cluster(replica, repscrubop, get_osdmap()->get_epoch());
}
void PG::sub_op_scrub_reserve(OpRequestRef op)
MOSDSubOp *subop = new MOSDSubOp(reqid, info.pgid, poid, false, 0,
get_osdmap()->get_epoch(), osd->get_tid(), v);
subop->ops = scrub;
- osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(acting[i]));
+ osd->send_message_osd_cluster(acting[i], subop, get_osdmap()->get_epoch());
}
}
MOSDSubOp *subop = new MOSDSubOp(reqid, info.pgid, poid, false, 0,
get_osdmap()->get_epoch(), osd->get_tid(), v);
subop->ops = scrub;
- osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(acting[i]));
+ osd->send_message_osd_cluster(acting[i], subop, get_osdmap()->get_epoch());
}
}
get_osdmap()->get_epoch(),
info),
pg_interval_map_t()));
- osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(peer));
+ osd->send_message_osd_cluster(peer, m, get_osdmap()->get_epoch());
}
}
}
pinfo.last_update = m->log.head;
- osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(peer));
+ osd->send_message_osd_cluster(peer, m, get_osdmap()->get_epoch());
}
}
osd->osd->_share_map_outgoing(get_osdmap()->get_cluster_inst(from),
get_osdmap());
- osd->cluster_messenger->send_message(mlog,
- get_osdmap()->get_cluster_inst(from));
+ osd->send_message_osd_cluster(from, mlog, get_osdmap()->get_epoch());
}
pg->osd->cluster_messenger->get_connection(
pg->get_osdmap()->get_cluster_inst(pg->backfill_target));
if ((con->features & CEPH_FEATURE_BACKFILL_RESERVATION)) {
- pg->osd->cluster_messenger->send_message(
+ pg->osd->send_message_osd_cluster(
+ pg->backfill_target,
new MBackfillReserve(
MBackfillReserve::REQUEST,
pg->info.pgid,
pg->get_osdmap()->get_epoch()),
- pg->get_osdmap()->get_cluster_inst(pg->backfill_target));
+ pg->get_osdmap()->get_epoch());
} else {
post_event(RemoteBackfillReserved());
}
PG::RecoveryState::RepWaitRecoveryReserved::react(const RemoteRecoveryReserved &evt)
{
PG *pg = context< RecoveryMachine >().pg;
- pg->osd->cluster_messenger->send_message(
+ pg->osd->send_message_osd_cluster(
+ pg->acting[0],
new MRecoveryReserve(
MRecoveryReserve::GRANT,
pg->info.pgid,
pg->get_osdmap()->get_epoch()),
- pg->get_osdmap()->get_cluster_inst(pg->acting[0]));
+ pg->get_osdmap()->get_epoch());
return transit<RepRecovering>();
}
PG::RecoveryState::RepWaitBackfillReserved::react(const RemoteBackfillReserved &evt)
{
PG *pg = context< RecoveryMachine >().pg;
- pg->osd->cluster_messenger->send_message(
+ pg->osd->send_message_osd_cluster(
+ pg->acting[0],
new MBackfillReserve(
MBackfillReserve::GRANT,
pg->info.pgid,
pg->get_osdmap()->get_epoch()),
- pg->get_osdmap()->get_cluster_inst(pg->acting[0]));
+ pg->get_osdmap()->get_epoch());
return transit<RepRecovering>();
}
PG::RecoveryState::RepWaitBackfillReserved::react(const RemoteReservationRejected &evt)
{
PG *pg = context< RecoveryMachine >().pg;
- pg->osd->cluster_messenger->send_message(
+ pg->osd->send_message_osd_cluster(
+ pg->acting[0],
new MBackfillReserve(
MBackfillReserve::REJECT,
pg->info.pgid,
pg->get_osdmap()->get_epoch()),
- pg->get_osdmap()->get_cluster_inst(pg->acting[0]));
+ pg->get_osdmap()->get_epoch());
return transit<RepNotRecovering>();
}
pg->osd->cluster_messenger->get_connection(
pg->get_osdmap()->get_cluster_inst(*acting_osd_it));
if ((con->features & CEPH_FEATURE_RECOVERY_RESERVATION)) {
- pg->osd->cluster_messenger->send_message(
+ pg->osd->send_message_osd_cluster(
+ *acting_osd_it,
new MRecoveryReserve(
MRecoveryReserve::REQUEST,
pg->info.pgid,
pg->get_osdmap()->get_epoch()),
- pg->get_osdmap()->get_cluster_inst(*acting_osd_it));
+ pg->get_osdmap()->get_epoch());
} else {
post_event(RemoteRecoveryReserved());
}
pg->osd->cluster_messenger->get_connection(
pg->get_osdmap()->get_cluster_inst(*i));
if ((con->features & CEPH_FEATURE_RECOVERY_RESERVATION)) {
- pg->osd->cluster_messenger->send_message(
+ pg->osd->send_message_osd_cluster(
+ *i,
new MRecoveryReserve(
MRecoveryReserve::RELEASE,
pg->info.pgid,
pg->get_osdmap()->get_epoch()),
- pg->get_osdmap()->get_cluster_inst(*i));
+ pg->get_osdmap()->get_epoch());
}
}
}
}
wr->pg_trim_to = pg_trim_to;
- osd->cluster_messenger->send_message(wr, get_osdmap()->get_cluster_inst(peer));
+ osd->send_message_osd_cluster(peer, wr, get_osdmap()->get_epoch());
// keep peer_info up to date
if (pinfo.last_complete == pinfo.last_update)
// send ack to acker only if we haven't sent a commit already
MOSDSubOpReply *ack = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
- osd->cluster_messenger->send_message(ack, get_osdmap()->get_cluster_inst(rm->ackerosd));
+ osd->send_message_osd_cluster(rm->ackerosd, ack, get_osdmap()->get_epoch());
}
rm->applied = true;
MOSDSubOpReply *commit = new MOSDSubOpReply((MOSDSubOp*)rm->op->request, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
commit->set_last_complete_ondisk(rm->last_complete);
commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
- osd->cluster_messenger->send_message(commit, get_osdmap()->get_cluster_inst(rm->ackerosd));
+ osd->send_message_osd_cluster(rm->ackerosd, commit, get_osdmap()->get_epoch());
}
rm->committed = true;
subop->ops = vector<OSDOp>(1);
subop->ops[0].op.op = CEPH_OSD_OP_DELETE;
- osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(peer));
+ osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch());
}
/*
subop->recovery_info = recovery_info;
subop->recovery_progress = progress;
- osd->cluster_messenger->send_message(subop,
- get_osdmap()->get_cluster_inst(peer));
+ osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch());
osd->logger->inc(l_osd_pull);
return 0;
subop->recovery_info = recovery_info;
subop->recovery_progress = new_progress;
subop->current_progress = progress;
- osd->cluster_messenger->
- send_message(subop, get_osdmap()->get_cluster_inst(peer));
+ osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch());
if (out_progress)
*out_progress = new_progress;
return 0;
subop->ops[0].op.op = CEPH_OSD_OP_PUSH;
subop->first = false;
subop->complete = false;
- osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(peer));
+ osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch());
}
void ReplicatedPG::sub_op_push_reply(OpRequestRef op)
if (last_complete_ondisk == info.last_update) {
if (is_replica()) {
// we are fully up to date. tell the primary!
- osd->cluster_messenger->
- send_message(new MOSDPGTrim(get_osdmap()->get_epoch(), info.pgid,
- last_complete_ondisk),
- get_osdmap()->get_cluster_inst(get_primary()));
+ osd->send_message_osd_cluster(get_primary(),
+ new MOSDPGTrim(get_osdmap()->get_epoch(), info.pgid,
+ last_complete_ondisk),
+ get_osdmap()->get_epoch());
// adjust local snaps!
adjust_local_snaps();
epoch_t e = get_osdmap()->get_epoch();
MOSDPGScan *m = new MOSDPGScan(MOSDPGScan::OP_SCAN_GET_DIGEST, e, e, info.pgid,
pbi.end, hobject_t());
- osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(backfill_target));
+ osd->send_message_osd_cluster(backfill_target, m, get_osdmap()->get_epoch());
waiting_on_backfill = true;
start_recovery_op(pbi.end);
ops++;
}
m->last_backfill = bound;
m->stats = pinfo.stats.stats;
- osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(backfill_target));
+ osd->send_message_osd_cluster(backfill_target, m, get_osdmap()->get_epoch());
}
dout(10) << " peer num_objects now " << pinfo.stats.stats.sum.num_objects