);
}
+void BufferedRecoveryMessages::send_info(
+ int to,
+ spg_t to_spgid,
+ epoch_t min_epoch,
+ epoch_t cur_epoch,
+ const pg_info_t &info)
+{
+ MOSDPGInfo *m = new MOSDPGInfo(cur_epoch);
+ m->pg_list.push_back(
+ pg_notify_t(
+ to_spgid.shard,
+ info.pgid.shard,
+ min_epoch, cur_epoch,
+ info, PastIntervals()));
+ message_map[to].push_back(m);
+}
+
void PGPool::update(CephContext *cct, OSDMapRef map)
{
const pg_pool_t *pi = map->get_pg_pool(id);
oinfo.last_update != eversion_t()) {
pg_info_t tinfo(oinfo);
tinfo.pgid.shard = pg_whoami.shard;
- ctx.info_map[from.osd].emplace_back(
- pg_notify_t(
- from.shard, pg_whoami.shard,
- get_osdmap_epoch(),
- get_osdmap_epoch(),
- tinfo, past_intervals));
+ ctx.send_info(
+ from.osd,
+ spg_t(info.pgid.pgid, from.shard),
+ get_osdmap_epoch(), // fixme: use lower epoch?
+ get_osdmap_epoch(),
+ tinfo);
}
return found_missing;
}
void PeeringState::activate(
ObjectStore::Transaction& t,
epoch_t activation_epoch,
- map<int,vector<pg_notify_t>> *activator_map,
PeeringCtxWrapper &ctx)
{
ceph_assert(!is_peered());
<< " from (" << pi.log_tail << "," << pi.last_update
<< "] " << pi.last_backfill
<< " to " << info.last_update;
- if (!pi.is_empty() && activator_map) {
+ if (!pi.is_empty()) {
psdout(10) << "activate peer osd." << peer
<< " is up to date, queueing in pending_activators" << dendl;
- (*activator_map)[peer.osd].emplace_back(
- pg_notify_t(
- peer.shard, pg_whoami.shard,
- get_osdmap_epoch(),
- get_osdmap_epoch(),
- info,
- past_intervals));
+ ctx.send_info(
+ peer.osd,
+ spg_t(info.pgid.pgid, peer.shard),
+ get_osdmap_epoch(), // fixme: use lower epoch?
+ get_osdmap_epoch(),
+ info);
} else {
psdout(10) << "activate peer osd." << peer
<< " is up to date, but sending pg_log anyway" << dendl;
ps->start_flush(context< PeeringMachine >().get_cur_transaction());
ps->activate(context< PeeringMachine >().get_cur_transaction(),
ps->get_osdmap_epoch(),
- &context< PeeringMachine >().get_info_map(),
context< PeeringMachine >().get_recovery_ctx());
// everyone has to commit/ack before we are truly active
ps->activate(
context< PeeringMachine >().get_cur_transaction(),
actevt.activation_epoch,
- NULL,
context< PeeringMachine >().get_recovery_ctx());
psdout(10) << "Activate Finished" << dendl;
return discard_event();
}
void send_notify(int to, const pg_notify_t &n);
void send_query(int to, spg_t spgid, const pg_query_t &q);
+ void send_info(int to, spg_t to_spgid,
+ epoch_t min_epoch, epoch_t cur_epoch,
+ const pg_info_t &info);
};
struct HeartbeatStamps : public RefCountedObject {
void send_query(int to, spg_t spgid, const pg_query_t &q) {
msgs.send_query(to, spgid, q);
}
+ void send_info(int to, spg_t to_spgid,
+ epoch_t min_epoch, epoch_t cur_epoch,
+ const pg_info_t &info) {
+ msgs.send_info(to, to_spgid, min_epoch, cur_epoch, info);
+ }
};
/* Encapsulates PG recovery process */
void activate(
ObjectStore::Transaction& t,
epoch_t activation_epoch,
- map<int, vector<pg_notify_t>> *activator_map,
PeeringCtxWrapper &ctx);
void rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead);