spg_t pgid; // PG to scrub
eversion_t scrub_from; // only scrub log entries after scrub_from
- eversion_t scrub_to; // last_update_applied when message sent
+ eversion_t scrub_to; // last_update_applied when message sent (not used)
epoch_t map_epoch = 0, min_epoch = 0;
bool chunky; // true for chunky scrubs
hobject_t start; // lower bound of scrub, inclusive
ceph_assert(applied_version <= info.last_update);
recovery_state.local_write_applied(applied_version);
- if (is_primary() && m_scrubber->should_requeue_blocked_ops(recovery_state.get_last_update_applied())) {
- osd->queue_scrub_applied_update(this, is_scrub_blocking_ops());
+ if (is_primary() && m_scrubber) {
+ // if there's a scrub operation waiting for the selected chunk to be fully updated -
+ // allow it to continue
+ m_scrubber->on_applied_when_primary(recovery_state.get_last_update_applied());
}
}
dout(15) << __func__ << " last-update: " << e << dendl;
}
+void PgScrubber::on_applied_when_primary(const eversion_t& applied_version)
+{
+ // we are only interested in updates if we are the Primary, and in state
+ // WaitLastUpdate
+ if (m_fsm->is_accepting_updates() && (applied_version >= m_subset_last_update)) {
+ m_osds->queue_scrub_applied_update(m_pg, m_pg->is_scrub_blocking_ops());
+ dout(15) << __func__ << " update: " << applied_version
+ << " vs. required: " << m_subset_last_update << dendl;
+ }
+}
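+// A sketch of the resulting event flow (simplified; the intermediate steps
+// are assumptions based on the handlers shown in this diff):
+//
+//   on_applied_when_primary(v)             // the PG has applied up to 'v'
+//     -> queue_scrub_applied_update()      // re-queued via the OSD scrub queue
+//     -> FSM event 'UpdatesApplied'
+//     -> WaitLastUpdate::on_new_updates()  // map building may now proceed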
+
/*
* setting:
* - m_subset_last_update
// scan objects
while (!pos.done()) {
int r = m_pg->get_pgbackend()->be_scan_list(map, pos);
- dout(10) << __func__ << " be r " << r << dendl;
+ dout(30) << __func__ << " BE returned " << r << dendl;
if (r == -EINPROGRESS) {
dout(20) << __func__ << " in progress" << dendl;
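      // note: -EINPROGRESS is not an error here: the chunk scan is simply not
      // done yet, and (presumably) resumes when this request is requeued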
return r;
// are we still processing a previous scrub-map request, without noticing that
// the interval has changed? We will not detect that here, but rather at the
// reservation stage.
-
if (msg->map_epoch < m_pg->info.history.same_interval_since) {
dout(10) << "replica_scrub_op discarding old replica_scrub from " << msg->map_epoch
<< " < " << m_pg->info.history.same_interval_since << dendl;
* otherwise the interval would have changed.
* Ostensibly we can discard & redo the reservation. But then we
* will be temporarily releasing the OSD resource - and might not be able to grab it
- * again. Thus we simple treat this as a successful new request.
+ * again. Thus, we simply treat this as a successful new request.
*/
if (m_remote_osd_resource.has_value() && m_remote_osd_resource->is_stale()) {
// we are holding a stale reservation from a past epoch
m_remote_osd_resource.reset();
+ dout(10) << __func__ << " stale reservation request" << dendl;
}
if (request_ep < m_pg->get_same_interval_since()) {
void send_sched_replica(epoch_t epoch_queued) final;
void send_replica_pushes_upd(epoch_t epoch_queued) final;
+ /**
+ * The PG has updated its 'applied version'. It might be that we are waiting for this
+ * information: after selecting a range of objects to scrub, we've marked the latest
+ * version of these objects in m_subset_last_update. We will not start the map building
+ * before we know that the PG has reached this version.
+ */
+ void on_applied_when_primary(const eversion_t& applied_version) final;
/**
 * we allow some number of preemptions of the scrub, which means we do
return state_cast<const ReservingReplicas*>();
}
+bool ScrubMachine::is_accepting_updates() const
+{
+ DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases
+ ceph_assert(scrbr->is_primary());
+
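+  // note: state_cast<>() returns a non-null pointer only when the FSM is
+  // currently in the WaitLastUpdate state; it is implicitly converted to bool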
+ return state_cast<const WaitLastUpdate*>();
+}
+
// for the rest of the code in this file - we know what PG we are dealing with:
#undef dout_prefix
#define dout_prefix _prefix(_dout, this->context<ScrubMachine>().m_pg)
post_event(UpdatesApplied{});
}
+/**
+ * Note:
+ * Updates are locally readable immediately. Thus, on the replicas we do not
+ * need to wait for the update notifications before scrubbing. For the Primary
+ * it's a bit different: on EC (and only there) rmw operations have an
+ * additional read roundtrip. That means that on the Primary we need to wait
+ * for last_update_applied (the replica side, even on EC, is still safe,
+ * since the actual transaction will already be readable by commit time).
+ */
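+/*
+ * An illustrative (assumed) EC rmw timeline on the Primary:
+ *   t0: the rmw op reads the shards and computes the updated ones
+ *   t1: the write sub-ops are committed (now readable on the replicas)
+ *   t2: last_update_applied advances on the Primary
+ * A Primary-side scrub read between t1 and t2 might miss the update - hence
+ * the wait for last_update_applied.
+ */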
void WaitLastUpdate::on_new_updates(const UpdatesApplied&)
{
DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases
MEV(ChunkIsBusy)
MEV(ActivePushesUpd) ///< Update to active_pushes. 'active_pushes' represents recovery
///< that is in-flight to the local ObjectStore
-MEV(UpdatesApplied) // external
+MEV(UpdatesApplied) ///< (Primary only) all updates are committed
MEV(InternalAllUpdates) ///< the internal counterpart of UpdatesApplied
MEV(GotReplicas) ///< got a map from a replica
MEV(DigestUpdate) ///< external. called upon success of a MODIFY op. See
///< scrub_snapshot_metadata()
-MEV(AllChunksDone)
-
MEV(StartReplica) ///< initiating replica scrub. replica_scrub_op() -> OSD Q ->
///< replica_scrub()
MEV(StartReplicaNoWait) ///< 'start replica' when there are no pending updates
void my_states() const;
void assert_not_active() const;
[[nodiscard]] bool is_reserving() const;
+ [[nodiscard]] bool is_accepting_updates() const;
};
/**
~ActiveScrubbing();
using reactions = mpl::list<
- // done scrubbing
- sc::transition<AllChunksDone, NotActive>,
-
sc::custom_reaction<InternalError>,
sc::custom_reaction<FullReset>>;
- sc::result react(const AllChunksDone&);
sc::result react(const FullReset&);
sc::result react(const InternalError&);
};
virtual void send_sched_replica(epoch_t epoch_queued) = 0;
+
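+  /// The PG (Primary only) reports that its 'last update applied' has
+  /// advanced. Used to wake a scrub that is waiting for its selected chunk
+  /// to be fully updated (see PgScrubber::on_applied_when_primary()).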
+ virtual void on_applied_when_primary(const eversion_t &applied_version) = 0;
+
// --------------------------------------------------
[[nodiscard]] virtual bool are_callbacks_pending()