}
}
-void PG::send_cluster_message(int target, Message *m, epoch_t epoch)
+void PG::send_cluster_message(
+ int target, Message *m,
+ epoch_t epoch, bool share_map_update=false)
{
- osd->send_message_osd_cluster(target, m, epoch);
+ ConnectionRef con = osd->get_con_osd_cluster(
+ target, get_osdmap_epoch());
+ if (!con)
+ return;
+
+ if (share_map_update) {
+ osd->share_map_peer(target, con.get(), get_osdmap());
+ }
+ osd->send_message_osd_cluster(m, con.get());
}
void PG::clear_probe_targets()
}
}
-void PG::fulfill_info(
- pg_shard_t from, const pg_query_t &query,
- pair<pg_shard_t, pg_info_t> ¬ify_info)
-{
- ceph_assert(from == primary);
- ceph_assert(query.type == pg_query_t::INFO);
-
- // info
- dout(10) << "sending info" << dendl;
- notify_info = make_pair(from, info);
-}
-
-void PG::fulfill_log(
- pg_shard_t from, const pg_query_t &query, epoch_t query_epoch)
-{
- dout(10) << "log request from " << from << dendl;
- ceph_assert(from == primary);
- ceph_assert(query.type != pg_query_t::INFO);
- ConnectionRef con = osd->get_con_osd_cluster(
- from.osd, get_osdmap_epoch());
- if (!con) return;
-
- MOSDPGLog *mlog = new MOSDPGLog(
- from.shard, pg_whoami.shard,
- get_osdmap_epoch(),
- info, query_epoch);
- mlog->missing = pg_log.get_missing();
-
- // primary -> other, when building master log
- if (query.type == pg_query_t::LOG) {
- dout(10) << " sending info+missing+log since " << query.since
- << dendl;
- if (query.since != eversion_t() && query.since < pg_log.get_tail()) {
- osd->clog->error() << info.pgid << " got broken pg_query_t::LOG since " << query.since
- << " when my log.tail is " << pg_log.get_tail()
- << ", sending full log instead";
- mlog->log = pg_log.get_log(); // primary should not have requested this!!
- } else
- mlog->log.copy_after(pg_log.get_log(), query.since);
- }
- else if (query.type == pg_query_t::FULLLOG) {
- dout(10) << " sending info+missing+full log" << dendl;
- mlog->log = pg_log.get_log();
- }
-
- dout(10) << " sending " << mlog->log << " " << mlog->missing << dendl;
-
- osd->share_map_peer(from.osd, con.get(), get_osdmap());
- osd->send_message_osd_cluster(mlog, con.get());
-}
-
-void PG::fulfill_query(const MQuery& query, PeeringCtx *rctx)
-{
- if (query.query.type == pg_query_t::INFO) {
- pair<pg_shard_t, pg_info_t> notify_info;
- update_history(query.query.history);
- fulfill_info(query.from, query.query, notify_info);
- rctx->send_notify(
- notify_info.first,
- pg_notify_t(
- notify_info.first.shard, pg_whoami.shard,
- query.query_epoch,
- get_osdmap_epoch(),
- notify_info.second),
- past_intervals);
- } else {
- update_history(query.query.history);
- fulfill_log(query.from, query.query, query.query_epoch);
- }
-}
-
bool PG::old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch)
{
if (last_peering_reset > reply_epoch ||
public:
bool dne() { return info.dne(); }
- virtual void send_cluster_message(int osd, Message *m, epoch_t epoch);
+ virtual void send_cluster_message(
+ int osd, Message *m, epoch_t epoch, bool share_map_update) override;
protected:
epoch_t get_last_peering_reset() const {
void update_history(const pg_history_t& history) {
recovery_state.update_history(history);
}
- void fulfill_info(pg_shard_t from, const pg_query_t &query,
- pair<pg_shard_t, pg_info_t> ¬ify_info);
- void fulfill_log(pg_shard_t from, const pg_query_t &query, epoch_t query_epoch);
- void fulfill_query(const MQuery& q, PeeringCtx *rctx);
// OpRequest queueing
bool can_discard_op(OpRequestRef& op);
peer_missing[from].claim(omissing);
}
+void PeeringState::fulfill_info(
+ pg_shard_t from, const pg_query_t &query,
+ pair<pg_shard_t, pg_info_t> ¬ify_info)
+{
+ ceph_assert(from == primary);
+ ceph_assert(query.type == pg_query_t::INFO);
+
+ // info
+ psdout(10) << "sending info" << dendl;
+ notify_info = make_pair(from, info);
+}
+
+void PeeringState::fulfill_log(
+ pg_shard_t from, const pg_query_t &query, epoch_t query_epoch)
+{
+ psdout(10) << "log request from " << from << dendl;
+ ceph_assert(from == primary);
+ ceph_assert(query.type != pg_query_t::INFO);
+
+ MOSDPGLog *mlog = new MOSDPGLog(
+ from.shard, pg_whoami.shard,
+ get_osdmap_epoch(),
+ info, query_epoch);
+ mlog->missing = pg_log.get_missing();
+
+ // primary -> other, when building master log
+ if (query.type == pg_query_t::LOG) {
+ psdout(10) << " sending info+missing+log since " << query.since
+ << dendl;
+ if (query.since != eversion_t() && query.since < pg_log.get_tail()) {
+ pl->get_clog().error() << info.pgid << " got broken pg_query_t::LOG since "
+ << query.since
+ << " when my log.tail is " << pg_log.get_tail()
+ << ", sending full log instead";
+ mlog->log = pg_log.get_log(); // primary should not have requested this!!
+ } else
+ mlog->log.copy_after(pg_log.get_log(), query.since);
+ }
+ else if (query.type == pg_query_t::FULLLOG) {
+ psdout(10) << " sending info+missing+full log" << dendl;
+ mlog->log = pg_log.get_log();
+ }
+
+ psdout(10) << " sending " << mlog->log << " " << mlog->missing << dendl;
+
+ pl->send_cluster_message(from.osd, mlog, get_osdmap_epoch(), true);
+}
+
+void PeeringState::fulfill_query(const MQuery& query, PeeringCtx *rctx)
+{
+ if (query.query.type == pg_query_t::INFO) {
+ pair<pg_shard_t, pg_info_t> notify_info;
+ update_history(query.query.history);
+ fulfill_info(query.from, query.query, notify_info);
+ rctx->send_notify(
+ notify_info.first,
+ pg_notify_t(
+ notify_info.first.shard, pg_whoami.shard,
+ query.query_epoch,
+ get_osdmap_epoch(),
+ notify_info.second),
+ past_intervals);
+ } else {
+ update_history(query.query.history);
+ fulfill_log(query.from, query.query, query.query_epoch);
+ }
+}
/*------------ Peering State Machine----------------*/
#undef dout_prefix
const MQuery& query)
{
DECLARE_LOCALS
- pg->fulfill_query(query, context<PeeringMachine>().get_recovery_ctx());
+ ps->fulfill_query(query, context<PeeringMachine>().get_recovery_ctx());
return discard_event();
}
boost::statechart::result PeeringState::Stray::react(const MQuery& query)
{
DECLARE_LOCALS
- pg->fulfill_query(query, context<PeeringMachine>().get_recovery_ctx());
+ ps->fulfill_query(query, context<PeeringMachine>().get_recovery_ctx());
return discard_event();
}
virtual void on_info_history_change() = 0;
virtual void scrub_requested(bool deep, bool repair) = 0;
- virtual void send_cluster_message(int osd, Message *m, epoch_t epoch) = 0;
+ virtual void send_cluster_message(
+ int osd, Message *m, epoch_t epoch, bool share_map_update=false) = 0;
// Flush state
virtual bool try_flush_or_schedule_async() = 0;
void proc_replica_log(pg_info_t &oinfo, const pg_log_t &olog,
pg_missing_t& omissing, pg_shard_t from);
+ void fulfill_info(
+ pg_shard_t from, const pg_query_t &query,
+ pair<pg_shard_t, pg_info_t> ¬ify_info);
+ void fulfill_log(
+ pg_shard_t from, const pg_query_t &query, epoch_t query_epoch);
+ void fulfill_query(const MQuery& q, PeeringCtx *rctx);
+
public:
PeeringState(
CephContext *cct,