From: Samuel Just Date: Tue, 2 Apr 2019 23:10:20 +0000 (-0700) Subject: osd/: move fulfill_info etc X-Git-Tag: v15.1.0~2774^2~42 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d9f4e8b3892a8e91469dabc2b63ec23a76edbd2f;p=ceph.git osd/: move fulfill_info etc Signed-off-by: Samuel Just --- diff --git a/src/osd/PG.cc b/src/osd/PG.cc index eb1e9e3466be..a5816ef6893f 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -1053,9 +1053,19 @@ void PG::set_probe_targets(const set &probe_set) } } -void PG::send_cluster_message(int target, Message *m, epoch_t epoch) +void PG::send_cluster_message( + int target, Message *m, + epoch_t epoch, bool share_map_update=false) { - osd->send_message_osd_cluster(target, m, epoch); + ConnectionRef con = osd->get_con_osd_cluster( + target, get_osdmap_epoch()); + if (!con) + return; + + if (share_map_update) { + osd->share_map_peer(target, con.get(), get_osdmap()); + } + osd->send_message_osd_cluster(m, con.get()); } void PG::clear_probe_targets() @@ -4209,77 +4219,6 @@ void PG::merge_new_log_entries( } } -void PG::fulfill_info( - pg_shard_t from, const pg_query_t &query, - pair ¬ify_info) -{ - ceph_assert(from == primary); - ceph_assert(query.type == pg_query_t::INFO); - - // info - dout(10) << "sending info" << dendl; - notify_info = make_pair(from, info); -} - -void PG::fulfill_log( - pg_shard_t from, const pg_query_t &query, epoch_t query_epoch) -{ - dout(10) << "log request from " << from << dendl; - ceph_assert(from == primary); - ceph_assert(query.type != pg_query_t::INFO); - ConnectionRef con = osd->get_con_osd_cluster( - from.osd, get_osdmap_epoch()); - if (!con) return; - - MOSDPGLog *mlog = new MOSDPGLog( - from.shard, pg_whoami.shard, - get_osdmap_epoch(), - info, query_epoch); - mlog->missing = pg_log.get_missing(); - - // primary -> other, when building master log - if (query.type == pg_query_t::LOG) { - dout(10) << " sending info+missing+log since " << query.since - << dendl; - if (query.since != eversion_t() && query.since < pg_log.get_tail()) { - osd->clog->error() << info.pgid << " got broken pg_query_t::LOG since " << query.since - << " when my log.tail is " << pg_log.get_tail() - << ", sending full log instead"; - mlog->log = pg_log.get_log(); // primary should not have requested this!! - } else - mlog->log.copy_after(pg_log.get_log(), query.since); - } - else if (query.type == pg_query_t::FULLLOG) { - dout(10) << " sending info+missing+full log" << dendl; - mlog->log = pg_log.get_log(); - } - - dout(10) << " sending " << mlog->log << " " << mlog->missing << dendl; - - osd->share_map_peer(from.osd, con.get(), get_osdmap()); - osd->send_message_osd_cluster(mlog, con.get()); -} - -void PG::fulfill_query(const MQuery& query, PeeringCtx *rctx) -{ - if (query.query.type == pg_query_t::INFO) { - pair notify_info; - update_history(query.query.history); - fulfill_info(query.from, query.query, notify_info); - rctx->send_notify( - notify_info.first, - pg_notify_t( - notify_info.first.shard, pg_whoami.shard, - query.query_epoch, - get_osdmap_epoch(), - notify_info.second), - past_intervals); - } else { - update_history(query.query.history); - fulfill_log(query.from, query.query, query.query_epoch); - } -} - bool PG::old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch) { if (last_peering_reset > reply_epoch || diff --git a/src/osd/PG.h b/src/osd/PG.h index 70a5f8a49aac..a4b57ef4586a 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -682,7 +682,8 @@ protected: public: bool dne() { return info.dne(); } - virtual void send_cluster_message(int osd, Message *m, epoch_t epoch); + virtual void send_cluster_message( + int osd, Message *m, epoch_t epoch, bool share_map_update) override; protected: epoch_t get_last_peering_reset() const { @@ -1540,10 +1541,6 @@ protected: void update_history(const pg_history_t& history) { recovery_state.update_history(history); } - void fulfill_info(pg_shard_t from, const pg_query_t &query, - pair ¬ify_info); - void fulfill_log(pg_shard_t from, const pg_query_t &query, epoch_t query_epoch); - void fulfill_query(const MQuery& q, PeeringCtx *rctx); // OpRequest queueing bool can_discard_op(OpRequestRef& op); diff --git a/src/osd/PeeringState.cc b/src/osd/PeeringState.cc index 4c3e88e5c0de..f6bae3585a21 100644 --- a/src/osd/PeeringState.cc +++ b/src/osd/PeeringState.cc @@ -2465,6 +2465,73 @@ void PeeringState::proc_replica_log( peer_missing[from].claim(omissing); } +void PeeringState::fulfill_info( + pg_shard_t from, const pg_query_t &query, + pair ¬ify_info) +{ + ceph_assert(from == primary); + ceph_assert(query.type == pg_query_t::INFO); + + // info + psdout(10) << "sending info" << dendl; + notify_info = make_pair(from, info); +} + +void PeeringState::fulfill_log( + pg_shard_t from, const pg_query_t &query, epoch_t query_epoch) +{ + psdout(10) << "log request from " << from << dendl; + ceph_assert(from == primary); + ceph_assert(query.type != pg_query_t::INFO); + + MOSDPGLog *mlog = new MOSDPGLog( + from.shard, pg_whoami.shard, + get_osdmap_epoch(), + info, query_epoch); + mlog->missing = pg_log.get_missing(); + + // primary -> other, when building master log + if (query.type == pg_query_t::LOG) { + psdout(10) << " sending info+missing+log since " << query.since + << dendl; + if (query.since != eversion_t() && query.since < pg_log.get_tail()) { + pl->get_clog().error() << info.pgid << " got broken pg_query_t::LOG since " + << query.since + << " when my log.tail is " << pg_log.get_tail() + << ", sending full log instead"; + mlog->log = pg_log.get_log(); // primary should not have requested this!! + } else + mlog->log.copy_after(pg_log.get_log(), query.since); + } + else if (query.type == pg_query_t::FULLLOG) { + psdout(10) << " sending info+missing+full log" << dendl; + mlog->log = pg_log.get_log(); + } + + psdout(10) << " sending " << mlog->log << " " << mlog->missing << dendl; + + pl->send_cluster_message(from.osd, mlog, get_osdmap_epoch(), true); +} + +void PeeringState::fulfill_query(const MQuery& query, PeeringCtx *rctx) +{ + if (query.query.type == pg_query_t::INFO) { + pair notify_info; + update_history(query.query.history); + fulfill_info(query.from, query.query, notify_info); + rctx->send_notify( + notify_info.first, + pg_notify_t( + notify_info.first.shard, pg_whoami.shard, + query.query_epoch, + get_osdmap_epoch(), + notify_info.second), + past_intervals); + } else { + update_history(query.query.history); + fulfill_log(query.from, query.query, query.query_epoch); + } +} /*------------ Peering State Machine----------------*/ #undef dout_prefix @@ -4229,7 +4296,7 @@ boost::statechart::result PeeringState::ReplicaActive::react( const MQuery& query) { DECLARE_LOCALS - pg->fulfill_query(query, context().get_recovery_ctx()); + ps->fulfill_query(query, context().get_recovery_ctx()); return discard_event(); } @@ -4325,7 +4392,7 @@ boost::statechart::result PeeringState::Stray::react(const MInfoRec& infoevt) boost::statechart::result PeeringState::Stray::react(const MQuery& query) { DECLARE_LOCALS - pg->fulfill_query(query, context().get_recovery_ctx()); + ps->fulfill_query(query, context().get_recovery_ctx()); return discard_event(); } diff --git a/src/osd/PeeringState.h b/src/osd/PeeringState.h index 02efbb0cea0f..a8e637c94ea2 100644 --- a/src/osd/PeeringState.h +++ b/src/osd/PeeringState.h @@ -69,7 +69,8 @@ public: virtual void on_info_history_change() = 0; virtual void scrub_requested(bool deep, bool repair) = 0; - virtual void send_cluster_message(int osd, Message *m, epoch_t epoch) = 0; + virtual void send_cluster_message( + int osd, Message *m, epoch_t epoch, bool share_map_update=false) = 0; // Flush state virtual bool try_flush_or_schedule_async() = 0; @@ -1368,6 +1369,13 @@ public: void proc_replica_log(pg_info_t &oinfo, const pg_log_t &olog, pg_missing_t& omissing, pg_shard_t from); + void fulfill_info( + pg_shard_t from, const pg_query_t &query, + pair ¬ify_info); + void fulfill_log( + pg_shard_t from, const pg_query_t &query, epoch_t query_epoch); + void fulfill_query(const MQuery& q, PeeringCtx *rctx); + public: PeeringState( CephContext *cct,