.set_default(5)
.set_description(""),
+ Option("osd_scrub_max_preemptions", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+ .set_default(5)
+ .set_description("Set the maximum number of times we will preempt a deep scrub due to a client operation before blocking client IO to complete the scrub"),
+
Option("osd_deep_scrub_interval", Option::TYPE_FLOAT, Option::LEVEL_ADVANCED)
.set_default(7_day)
.set_description(""),
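
The new option caps how many times a single scrub can give way to client writes before it starts blocking them. Illustrative only, not part of the patch: a minimal standalone sketch of the bookkeeping the scrub state machine below does with preempt_left and preempt_divisor, using hypothetical names and assuming the divisor is used elsewhere to shrink subsequent chunk sizes.

    #include <cstdint>
    #include <iostream>

    struct PreemptBudget {
      uint64_t left = 5;     // osd_scrub_max_preemptions default
      uint64_t divisor = 1;  // doubled on every preemption

      bool can_preempt() const { return left > 0; }
      void note_preempted() { --left; divisor *= 2; }
    };

    int main() {
      PreemptBudget b;
      while (b.can_preempt()) {
        b.note_preempted();  // a client write landed in the chunk being scrubbed
        std::cout << "preempted; " << b.left << " left, divisor "
                  << b.divisor << "\n";
      }
      std::cout << "budget exhausted: further writes to the chunk now block\n";
    }
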
struct MOSDRepScrub : public MOSDFastDispatchOp {
- static const int HEAD_VERSION = 7;
+ static const int HEAD_VERSION = 8;
static const int COMPAT_VERSION = 6;
spg_t pgid; // PG to scrub
hobject_t end; // upper bound of scrub, exclusive
bool deep; // true if scrub should be deep
uint32_t seed; // seed value for digest calculation
+ bool allow_preemption = false;  // primary allows client writes to preempt this scrub chunk
epoch_t get_map_epoch() const override {
return map_epoch;
seed(0) { }
MOSDRepScrub(spg_t pgid, eversion_t scrub_to, epoch_t map_epoch, epoch_t min_epoch,
- hobject_t start, hobject_t end, bool deep, uint32_t seed)
+ hobject_t start, hobject_t end, bool deep, uint32_t seed,
+ bool preemption)
: MOSDFastDispatchOp(MSG_OSD_REP_SCRUB, HEAD_VERSION, COMPAT_VERSION),
pgid(pgid),
scrub_to(scrub_to),
start(start),
end(end),
deep(deep),
- seed(seed) { }
+ seed(seed),
+ allow_preemption(preemption) { }
private:
public:
const char *get_type_name() const override { return "replica scrub"; }
void print(ostream& out) const override {
- out << "replica scrub(pg: ";
- out << pgid << ",from:" << scrub_from << ",to:" << scrub_to
+ out << "replica_scrub(pg: " << pgid
+ << ",from:" << scrub_from
+ << ",to:" << scrub_to
<< ",epoch:" << map_epoch << "/" << min_epoch
<< ",start:" << start << ",end:" << end
<< ",chunky:" << chunky
<< ",deep:" << deep
<< ",seed:" << seed
- << ",version:" << header.version;
- out << ")";
+ << ",version:" << header.version
+ << ",allow_preemption:" << (int)allow_preemption
+ << ")";
}
void encode_payload(uint64_t features) override {
encode(pgid.shard, payload);
encode(seed, payload);
encode(min_epoch, payload);
+ encode(allow_preemption, payload);
}
void decode_payload() override {
bufferlist::iterator p = payload.begin();
} else {
min_epoch = map_epoch;
}
+ if (header.version >= 8) {
+ decode(allow_preemption, p);
+ }
}
};
struct MOSDRepScrubMap : public MOSDFastDispatchOp {
- static const int HEAD_VERSION = 1;
+ static const int HEAD_VERSION = 2;
static const int COMPAT_VERSION = 1;
spg_t pgid; // primary spg_t
epoch_t map_epoch = 0;
pg_shard_t from; // whose scrubmap this is
bufferlist scrub_map_bl;
+ bool preempted = false;  // true if this scrub chunk was preempted by a client write
epoch_t get_map_epoch() const override {
return map_epoch;
const char *get_type_name() const override { return "rep_scrubmap"; }
void print(ostream& out) const override {
out << "rep_scrubmap(" << pgid << " e" << map_epoch
- << " from shard " << from << ")";
+ << " from shard " << from
+ << (preempted ? " PREEMPTED":"") << ")";
}
void encode_payload(uint64_t features) override {
encode(pgid, payload);
encode(map_epoch, payload);
encode(from, payload);
+ encode(preempted, payload);
}
void decode_payload() override {
bufferlist::iterator p = payload.begin();
decode(pgid, p);
decode(map_epoch, p);
decode(from, p);
+ if (header.version >= 2) {
+ decode(preempted, p);
+ }
}
};
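
Both messages follow the usual compatibility pattern: the HEAD_VERSION bump advertises the new field, the field is appended at the end of the payload, and decode only reads it when the sender's header.version is new enough, so a message from an older peer leaves it at its default. Illustrative only, not part of the patch, with hypothetical stand-in types rather than Ceph's bufferlist and Message API:

    #include <cassert>
    #include <cstdint>
    #include <vector>

    struct Wire {                      // toy byte stream standing in for bufferlist
      std::vector<uint8_t> buf;
      size_t off = 0;
      void put(uint8_t v) { buf.push_back(v); }
      uint8_t get() { return buf.at(off++); }
    };

    struct RepScrubMapLike {
      int header_version = 2;          // what the *sender* encoded with
      uint8_t from = 0;
      bool preempted = false;          // new in version 2

      void encode(Wire& w) const {
        w.put(from);
        w.put(preempted ? 1 : 0);      // appended last, like the real patch
      }
      void decode(Wire& w) {
        from = w.get();
        if (header_version >= 2)       // old senders never wrote the field
          preempted = w.get() != 0;
      }
    };

    int main() {
      Wire w;
      RepScrubMapLike tx;
      tx.from = 3;
      tx.preempted = true;
      tx.encode(w);

      RepScrubMapLike rx;
      rx.decode(w);
      assert(rx.from == 3 && rx.preempted);  // v2 -> v2 round-trips the new field

      // Had header_version been 1, the guard would leave preempted at its
      // default (false) instead of reading past the sender's payload.
    }
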
// not conflict with ECSubWrite's operator<<.
MOSDECSubOpWrite *op = static_cast<MOSDECSubOpWrite*>(
_op->get_nonconst_req());
+ parent->maybe_preempt_replica_scrub(op->op.soid);
handle_sub_write(op->op.from, _op, op->op, _op->pg_trace);
return true;
}
<< dendl;
assert(scrubber.waiting_on_whom.count(m->from));
scrubber.waiting_on_whom.erase(m->from);
+ if (m->preempted) {
+ dout(10) << __func__ << " replica was preempted, setting flag" << dendl;
+ scrub_preempted = true;
+ }
if (scrubber.waiting_on_whom.empty()) {
if (ops_blocked_by_scrub()) {
requeue_scrub(true);
void PG::_request_scrub_map(
pg_shard_t replica, eversion_t version,
hobject_t start, hobject_t end,
- bool deep, uint32_t seed)
+ bool deep, uint32_t seed,
+ bool allow_preemption)
{
assert(replica != pg_whoami);
dout(10) << "scrub requesting scrubmap from osd." << replica
spg_t(info.pgid.pgid, replica.shard), version,
get_osdmap()->get_epoch(),
get_last_peering_reset(),
- start, end, deep, seed);
+ start, end, deep, seed,
+ allow_preemption);
// default priority, we want the rep scrub processed prior to any recovery
// or client io messages (we are holding a lock!)
osd->send_message_osd_cluster(
scrubber.deep = msg->deep;
scrubber.epoch_start = info.history.same_interval_since;
+ scrub_can_preempt = msg->allow_preemption;
+ scrub_preempted = false;
scrubber.replica_scrubmap_pos.reset();
requeue_scrub(false);
}
scrubber.seed = -1;
-
+ scrubber.preempt_left = cct->_conf->get_val<uint64_t>(
+ "osd_scrub_max_preemptions");
+ scrubber.preempt_divisor = 1;
break;
case PG::Scrubber::NEW_CHUNK:
scrubber.primary_scrubmap = ScrubMap();
scrubber.received_maps.clear();
+ // begin (possible) preemption window
+ if (scrub_preempted) {
+ scrubber.preempt_left--;
+ scrubber.preempt_divisor *= 2;
+ dout(10) << __func__ << " preempted, " << scrubber.preempt_left
+ << " left" << dendl;
+ scrubber.state = PG::Scrubber::NEW_CHUNK;
+ }
+ scrub_preempted = false;
+ scrub_can_preempt = scrubber.preempt_left > 0;
+
{
/* get the start and end of our scrub chunk
*
if (*i == pg_whoami) continue;
_request_scrub_map(*i, scrubber.subset_last_update,
scrubber.start, scrubber.end, scrubber.deep,
- scrubber.seed);
+ scrubber.seed,
+ scrubber.preempt_left > 0);
scrubber.waiting_on_whom.insert(*i);
}
dout(10) << __func__ << " waiting_on_whom " << scrubber.waiting_on_whom
<< dendl;
scrubber.state = PG::Scrubber::WAIT_PUSHES;
-
break;
case PG::Scrubber::WAIT_PUSHES:
assert(last_update_applied >= scrubber.subset_last_update);
// build my own scrub map
+ if (scrub_preempted) {
+ dout(10) << __func__ << " preempted" << dendl;
+ scrubber.state = PG::Scrubber::BUILD_MAP_DONE;
+ break;
+ }
ret = build_scrub_map_chunk(
scrubber.primary_scrubmap,
scrubber.primary_scrubmap_pos,
// will be requeued by sub_op_scrub_map
dout(10) << "wait for replicas to build scrub map" << dendl;
done = true;
- } else {
+ break;
+ }
+ // end (possible) preemption window
+ scrub_can_preempt = false;
+ if (scrub_preempted) {
+ dout(10) << __func__ << " preempted, restarting chunk" << dendl;
+ scrubber.state = PG::Scrubber::NEW_CHUNK;
+ } else {
scrubber.state = PG::Scrubber::COMPARE_MAPS;
}
break;
break;
}
+ scrubber.preempt_left = cct->_conf->get_val<uint64_t>(
+ "osd_scrub_max_preemptions");
+ scrubber.preempt_divisor = 1;
+
if (!(scrubber.end.is_max())) {
- scrubber.state = PG::Scrubber::NEW_CHUNK;
+ scrubber.state = PG::Scrubber::NEW_CHUNK;
requeue_scrub();
done = true;
} else {
case PG::Scrubber::BUILD_MAP_REPLICA:
// build my own scrub map
+ if (scrub_preempted) {
+ dout(10) << __func__ << " preempted" << dendl;
+ ret = 0;
+ } else {
ret = build_scrub_map_chunk(
scrubber.replica_scrubmap,
scrubber.replica_scrubmap_pos,
scrubber.start, scrubber.end,
scrubber.deep, scrubber.seed,
handle);
+ }
if (ret == -EINPROGRESS) {
requeue_scrub();
done = true;
spg_t(info.pgid.pgid, get_primary().shard),
scrubber.replica_scrub_start,
pg_whoami);
+ reply->preempted = scrub_preempted;
::encode(scrubber.replica_scrubmap, reply->get_data());
osd->send_message_osd_cluster(
get_primary().osd, reply,
scrubber.replica_scrub_start);
}
+ scrub_preempted = false;
+ scrub_can_preempt = false;
scrubber.state = PG::Scrubber::INACTIVE;
scrubber.replica_scrubmap = ScrubMap();
scrubber.replica_scrubmap_pos = ScrubMapBuilder();
<< " [" << scrubber.start << "," << scrubber.end << ")" << dendl;
}
+bool PG::write_blocked_by_scrub(const hobject_t& soid)
+{
+ if (soid < scrubber.start || soid >= scrubber.end) {
+ return false;
+ }
+ if (scrub_can_preempt) {
+ if (!scrub_preempted) {
+ dout(10) << __func__ << " " << soid << " preempted" << dendl;
+ scrub_preempted = true;
+ } else {
+ dout(10) << __func__ << " " << soid << " already preempted" << dendl;
+ }
+ return false;
+ }
+ return true;
+}
+
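Illustrative only, not part of the patch: a standalone model of the three outcomes of PG::write_blocked_by_scrub above, with the scrub chunk reduced to a simple [start, end) string range and hypothetical names.

    #include <cassert>
    #include <string>

    struct ScrubState {
      std::string start, end;      // current chunk, [start, end)
      bool can_preempt = false;    // preemption budget not yet exhausted
      bool preempted = false;      // set when a write preempts the chunk
    };

    bool write_blocked(ScrubState& s, const std::string& oid) {
      if (oid < s.start || oid >= s.end)
        return false;              // outside the chunk: never blocked
      if (s.can_preempt) {
        s.preempted = true;        // scrub will notice and restart the chunk
        return false;              // the client write proceeds immediately
      }
      return true;                 // budget spent: the write waits for the chunk
    }

    int main() {
      ScrubState s;
      s.start = "b";
      s.end = "m";
      s.can_preempt = true;
      assert(!write_blocked(s, "z"));                 // outside the range
      assert(!write_blocked(s, "c") && s.preempted);  // inside: preempts the scrub
      s.can_preempt = false;
      assert(write_blocked(s, "c"));                  // budget gone: now it blocks
    }
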
void PG::scrub_clear_state()
{
assert(is_locked());
// deep scrub
bool deep;
uint32_t seed;
+ int preempt_left;     // preemptions still allowed before we block writes
+ int preempt_divisor;  // doubled on each preemption
list<Context*> callbacks;
void add_callback(Context *context) {
bool is_chunky_scrub_active() const { return state != INACTIVE; }
- // classic (non chunk) scrubs block all writes
- // chunky scrubs only block writes to a range
- bool write_blocked_by_scrub(const hobject_t &soid) {
- return (soid >= start && soid < end);
- }
-
// clear all state
void reset() {
active = false;
int active_pushes;
+ bool scrub_can_preempt = false;  // a client write may preempt the current scrub chunk
+ bool scrub_preempted = false;    // the current scrub chunk was preempted
+
+ // we allow some number of preemptions of the scrub, which means we do
+ // not block.  once those are used up we start to block, and once we
+ // start blocking we do not stop until the scrub range is completed.
+ bool write_blocked_by_scrub(const hobject_t &soid);
+
void repair_object(
const hobject_t& soid, list<pair<ScrubMap::object, pg_shard_t> > *ok_peers,
pg_shard_t bad_peer);
ThreadPool::TPHandle &handle);
void _request_scrub_map(pg_shard_t replica, eversion_t version,
hobject_t start, hobject_t end, bool deep,
- uint32_t seed);
+ uint32_t seed, bool allow_preemption);
int build_scrub_map_chunk(
ScrubMap &map,
ScrubMapBuilder &pos,
virtual bool check_osdmap_full(const set<pg_shard_t> &missing_on) = 0;
+ virtual bool maybe_preempt_replica_scrub(const hobject_t& oid) = 0;
virtual ~Listener() {}
};
Listener *parent;
}
if (write_ordered && scrubber.is_chunky_scrub_active() &&
- scrubber.write_blocked_by_scrub(head)) {
+ write_blocked_by_scrub(head)) {
dout(20) << __func__ << ": waiting for scrub" << dendl;
waiting_for_scrub.push_back(op);
op->mark_delayed("waiting for scrub");
return cache_result_t::BLOCKED_RECOVERY;
}
- if (scrubber.write_blocked_by_scrub(head)) {
+ if (write_blocked_by_scrub(head)) {
dout(20) << __func__ << ": waiting for scrub" << dendl;
waiting_for_scrub.push_back(op);
op->mark_delayed("waiting for scrub");
{
hobject_t hoid = obc ? obc->obs.oi.soid : missing_oid;
assert(hoid != hobject_t());
- if (scrubber.write_blocked_by_scrub(hoid)) {
+ if (write_blocked_by_scrub(hoid)) {
dout(10) << __func__ << " " << hoid
<< " blocked by scrub" << dendl;
if (op) {
}
if (!fop->blocking &&
- scrubber.write_blocked_by_scrub(oid)) {
+ write_blocked_by_scrub(oid)) {
if (fop->op) {
dout(10) << __func__ << " blocked by scrub" << dendl;
requeue_op(fop->op);
return;
}
- if (scrubber.write_blocked_by_scrub(obc->obs.oi.soid)) {
+ if (write_blocked_by_scrub(obc->obs.oi.soid)) {
dout(10) << "handle_watch_timeout waiting for scrub on obj "
<< obc->obs.oi.soid
<< dendl;
// Once we hit a degraded object just skip
if (is_degraded_or_backfilling_object(aoid))
return;
- if (scrubber.write_blocked_by_scrub(aoid))
+ if (write_blocked_by_scrub(aoid))
return;
}
// Once we hit a degraded object just skip further trim
if (is_degraded_or_backfilling_object(aoid))
return;
- if (scrubber.write_blocked_by_scrub(aoid))
+ if (write_blocked_by_scrub(aoid))
return;
}
new_hset.using_gmt);
// If the current object is degraded we skip this persist request
- if (scrubber.write_blocked_by_scrub(oid))
+ if (write_blocked_by_scrub(oid))
return;
hit_set->seal();
osd->logger->inc(l_osd_agent_skip);
continue;
}
- if (scrubber.write_blocked_by_scrub(obc->obs.oi.soid)) {
+ if (write_blocked_by_scrub(obc->obs.oi.soid)) {
dout(20) << __func__ << " skip (scrubbing) " << obc->obs.oi << dendl;
osd->logger->inc(l_osd_agent_skip);
continue;
if (!to_req.empty()) {
// requeue at front of scrub blocking queue if we are blocked by scrub
for (auto &&p: to_req) {
- if (scrubber.write_blocked_by_scrub(p.first.get_head())) {
+ if (write_blocked_by_scrub(p.first.get_head())) {
for (auto& op : p.second) {
op->mark_delayed("waiting for scrub");
}
void on_shutdown();
bool check_failsafe_full(ostream &ss) override;
bool check_osdmap_full(const set<pg_shard_t> &missing_on) override;
+ bool maybe_preempt_replica_scrub(const hobject_t& oid) override {
+ return write_blocked_by_scrub(oid);
+ }
int rep_repair_primary_object(const hobject_t& soid, OpRequestRef op);
// attr cache handling
// we better not be missing this.
assert(!parent->get_log().get_missing().is_missing(soid));
+ parent->maybe_preempt_replica_scrub(soid);
+
int ackerosd = m->get_source().num();
op->mark_started();