OPTION(osd_recovery_thread_suicide_timeout, OPT_INT, 300)
OPTION(osd_recovery_sleep, OPT_FLOAT, 0) // seconds to sleep between recovery ops
OPTION(osd_snap_trim_sleep, OPT_FLOAT, 0)
-OPTION(osd_scrub_thread_timeout, OPT_INT, 60)
-OPTION(osd_scrub_thread_suicide_timeout, OPT_INT, 60)
-OPTION(osd_scrub_finalize_thread_timeout, OPT_INT, 60*10)
OPTION(osd_scrub_invalid_stats, OPT_BOOL, true)
OPTION(osd_remove_thread_timeout, OPT_INT, 60*60)
OPTION(osd_remove_thread_suicide_timeout, OPT_INT, 10*60*60)
OPTION(osd_snap_trim_priority, OPT_U32, 5)
OPTION(osd_snap_trim_cost, OPT_U32, 1<<20) // set default cost equal to 1MB io
+OPTION(osd_scrub_priority, OPT_U32, 5)
+OPTION(osd_scrub_cost, OPT_U32, 50<<20) // set default cost equal to 50MB io
+
/**
 * osd_recovery_op_warn_multiple scales the normal warning threshold,
* osd_op_complaint_time, so that slow recovery ops won't cause noise
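For context on the two new options: when a scrub is queued it is wrapped in a PGQueueable carrying osd_scrub_cost and osd_scrub_priority (see queue_for_scrub below); the op queue orders work by priority and uses cost to account for each item's relative weight. A minimal standalone sketch of that idea, with invented types (Item, SketchQueue) rather than Ceph's actual PrioritizedQueue internals:

    // Sketch only: priority picks the bucket, cost is charged per dequeue so
    // one 50MB scrub chunk weighs as much as many small client ops.
    #include <cstdint>
    #include <deque>
    #include <iostream>
    #include <iterator>
    #include <map>

    struct Item { const char *what; uint64_t cost; };

    class SketchQueue {
      std::map<unsigned, std::deque<Item>> q;  // priority -> FIFO
      std::map<unsigned, uint64_t> charged;    // cost drained per priority
    public:
      void enqueue(unsigned prio, Item it) { q[prio].push_back(it); }
      bool dequeue(Item *out) {
        if (q.empty()) return false;
        auto hi = std::prev(q.end());          // highest-priority bucket
        *out = hi->second.front();
        hi->second.pop_front();
        charged[hi->first] += out->cost;       // a fair queue would use this
        if (hi->second.empty()) q.erase(hi);   // to rotate between classes
        return true;
      }
    };

    int main() {
      SketchQueue wq;
      wq.enqueue(63, {"client op", 4096});
      wq.enqueue(5,  {"scrub chunk", 50u << 20});  // osd_scrub_{priority,cost}
      Item it;
      while (wq.dequeue(&it))
        std::cout << it.what << " (cost " << it.cost << ")\n";
    }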
return pg->snap_trimmer(op.epoch_queued);
}
+void PGQueueable::RunVis::operator()(PGScrub &op) {
+ return pg->scrub(op.epoch_queued, handle);
+}
+
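PGQueueable::RunVis dispatches on a boost::variant: one operator() per queued event type, applied via boost::apply_visitor. A self-contained illustration of the pattern, with invented event types (ClientOp, ScrubEvent) in place of the real OpRequestRef/PGSnapTrim/PGScrub:

    #include <boost/variant.hpp>
    #include <iostream>

    struct ClientOp {};
    struct ScrubEvent { unsigned epoch; };

    struct RunVis : public boost::static_visitor<void> {
      void operator()(ClientOp &) const { std::cout << "run client op\n"; }
      void operator()(ScrubEvent &e) const {
        std::cout << "run scrub queued at epoch " << e.epoch << "\n";
      }
    };

    int main() {
      boost::variant<ClientOp, ScrubEvent> ev = ScrubEvent{42};
      boost::apply_visitor(RunVis(), ev);  // prints the scrub line
    }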
// Initial features in new superblock.
// Features here are also automatically upgraded
CompatSet OSD::get_osd_initial_compat_set() {
op_wq(osd->op_shardedwq),
peering_wq(osd->peering_wq),
recovery_wq(osd->recovery_wq),
- scrub_wq(osd->scrub_wq),
recovery_gen_wq("recovery_gen_wq", cct->_conf->osd_recovery_thread_timeout,
&osd->recovery_tp),
op_gen_wq("op_gen_wq", cct->_conf->osd_recovery_thread_timeout, &osd->osd_tp),
cct->_conf->osd_recovery_thread_suicide_timeout,
&recovery_tp),
replay_queue_lock("OSD::replay_queue_lock"),
- scrub_wq(
- this,
- cct->_conf->osd_scrub_thread_timeout,
- cct->_conf->osd_scrub_thread_suicide_timeout,
- &disk_tp),
remove_wq(
store,
cct->_conf->osd_remove_thread_timeout,
return replica_op_required_epoch<MOSDECSubOpRead, MSG_OSD_EC_READ>(op);
case MSG_OSD_EC_READ_REPLY:
return replica_op_required_epoch<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op);
+ case MSG_OSD_REP_SCRUB:
+ return replica_op_required_epoch<MOSDRepScrub, MSG_OSD_REP_SCRUB>(op);
default:
assert(0);
return 0;
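The helper these new case arms instantiate has roughly the following shape (a simplified sketch, not the verbatim OSD.cc implementation): it downcasts to the concrete message type and reports the map epoch the replica op was sent against.

    template <typename T, int MSGTYPE>
    epoch_t replica_op_required_epoch(OpRequestRef op)
    {
      // The switch above guarantees the runtime type matches MSGTYPE.
      T *m = static_cast<T*>(op->get_req());
      assert(m->get_type() == MSGTYPE);
      return m->map_epoch;
    }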
break;
case MSG_OSD_REP_SCRUB:
handle_replica_op<MOSDRepScrub, MSG_OSD_REP_SCRUB>(op, osdmap);
+ break;
default:
assert(0);
}
class OSD;
+struct PGScrub {
+  epoch_t epoch_queued;
+  explicit PGScrub(epoch_t e) : epoch_queued(e) {}
+  friend ostream &operator<<(ostream &lhs, const PGScrub &rhs) {
+    return lhs << "PGScrub";
+  }
+};
+
struct PGSnapTrim {
epoch_t epoch_queued;
PGSnapTrim(epoch_t e) : epoch_queued(e) {}
-  ostream &operator<<(ostream &rhs) {
-    return rhs << "SnapTrim";
+  friend ostream &operator<<(ostream &lhs, const PGSnapTrim &rhs) {
+    return lhs << "PGSnapTrim";
}
};
class PGQueueable {
typedef boost::variant<
OpRequestRef,
- PGSnapTrim
+ PGSnapTrim,
+ PGScrub
> QVariant;
QVariant qvariant;
int cost;
: osd(osd), pg(pg), handle(handle) {}
void operator()(OpRequestRef &op);
void operator()(PGSnapTrim &op);
+ void operator()(PGScrub &op);
};
public:
PGQueueable(OpRequestRef op)
const entity_inst_t &owner)
: qvariant(op), cost(cost), priority(priority), start_time(start_time),
owner(owner) {}
+ PGQueueable(
+ const PGScrub &op, int cost, unsigned priority, utime_t start_time,
+ const entity_inst_t &owner)
+ : qvariant(op), cost(cost), priority(priority), start_time(start_time),
+ owner(owner) {}
boost::optional<OpRequestRef> maybe_get_op() {
OpRequestRef *op = boost::get<OpRequestRef>(&qvariant);
return op ? *op : boost::optional<OpRequestRef>();
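maybe_get_op relies on the pointer form of boost::get, which returns nullptr when the variant currently holds a PGSnapTrim or PGScrub rather than a client op. A standalone illustration of that idiom:

    #include <boost/optional.hpp>
    #include <boost/variant.hpp>
    #include <cassert>
    #include <string>

    int main() {
      boost::variant<int, std::string> v = std::string("scrub");
      assert(boost::get<int>(&v) == nullptr);        // wrong alternative
      std::string *s = boost::get<std::string>(&v);  // matching alternative
      boost::optional<std::string> maybe = s ? *s : boost::optional<std::string>();
      assert(maybe && *maybe == "scrub");
    }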
ShardedThreadPool::ShardedWQ < pair <PGRef, PGQueueable> > &op_wq;
ThreadPool::BatchWorkQueue<PG> &peering_wq;
ThreadPool::WorkQueue<PG> &recovery_wq;
- ThreadPool::WorkQueue<PG> &scrub_wq;
GenContextWQ recovery_gen_wq;
GenContextWQ op_gen_wq;
ClassHandler *&class_handler;
ceph_clock_now(cct),
entity_inst_t())));
}
- bool queue_for_scrub(PG *pg) {
- return scrub_wq.queue(pg);
+ void queue_for_scrub(PG *pg) {
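+    // Scrub now rides the unified sharded op queue instead of the dedicated
+    // ScrubWQ/disk_tp thread pool; osd_scrub_cost and osd_scrub_priority
+    // determine its weight relative to client ops.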
+ op_wq.queue(
+ make_pair(
+ pg,
+ PGQueueable(
+ PGScrub(pg->get_osdmap()->get_epoch()),
+ cct->_conf->osd_scrub_cost,
+ cct->_conf->osd_scrub_priority,
+ ceph_clock_now(cct),
+ entity_inst_t())));
}
// osd map cache (past osd maps)
bool scrub_should_schedule();
bool scrub_time_permit(utime_t now);
- xlist<PG*> scrub_queue;
-
- struct ScrubWQ : public ThreadPool::WorkQueue<PG> {
- OSD *osd;
- ScrubWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
- : ThreadPool::WorkQueue<PG>("OSD::ScrubWQ", ti, si, tp), osd(o) {}
-
- bool _empty() {
- return osd->scrub_queue.empty();
- }
- bool _enqueue(PG *pg) {
- if (pg->scrub_item.is_on_list()) {
- return false;
- }
- pg->get("ScrubWQ");
- osd->scrub_queue.push_back(&pg->scrub_item);
- return true;
- }
- void _dequeue(PG *pg) {
- if (pg->scrub_item.remove_myself()) {
- pg->put("ScrubWQ");
- }
- }
- PG *_dequeue() {
- if (osd->scrub_queue.empty())
- return NULL;
- PG *pg = osd->scrub_queue.front();
- osd->scrub_queue.pop_front();
- return pg;
- }
- void _process(
- PG *pg,
- ThreadPool::TPHandle &handle) {
- pg->scrub(handle);
- pg->put("ScrubWQ");
- }
- void _clear() {
- while (!osd->scrub_queue.empty()) {
- PG *pg = osd->scrub_queue.front();
- osd->scrub_queue.pop_front();
- pg->put("ScrubWQ");
- }
- }
- } scrub_wq;
-
// -- removing --
struct RemoveWQ :
public ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> > {
coll(p), pg_log(cct),
pgmeta_oid(p.make_pgmeta_oid()),
missing_loc(this),
- recovery_item(this), scrub_item(this), stat_queue_item(this),
+ recovery_item(this), stat_queue_item(this),
snap_trim_queued(false),
+ scrub_queued(false),
recovery_ops_active(0),
role(0),
state(0),
}
}
+bool PG::requeue_scrub()
+{
+ assert(is_locked());
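+  // scrub_queued prevents queueing the same PG twice; PG::scrub() clears it
+  // when the event is processed, and a PG reset clears it as well. Caller
+  // must hold the PG lock, hence the assert above.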
+ if (scrub_queued) {
+ dout(10) << __func__ << ": already queued" << dendl;
+ return false;
+ } else {
+ dout(10) << __func__ << ": queueing" << dendl;
+ scrub_queued = true;
+ osd->queue_for_scrub(this);
+ return true;
+ }
+}
+
bool PG::queue_scrub()
{
assert(_lock.is_locked());
state_set(PG_STATE_REPAIR);
scrubber.must_repair = false;
}
- osd->queue_for_scrub(this);
+ requeue_scrub();
return true;
}
scrubber.waiting_on_whom.erase(m->from);
if (scrubber.waiting_on == 0) {
- osd->scrub_wq.queue(this);
+ requeue_scrub();
}
}
void PG::clear_scrub_reserved()
{
- osd->scrub_wq.dequeue(this);
scrubber.reserved_peers.clear();
scrubber.reserve_failed = false;
* scrub will be chunky if all OSDs in PG support chunky scrub
* scrub will fail if OSDs are too old.
*/
-void PG::scrub(ThreadPool::TPHandle &handle)
+void PG::scrub(epoch_t queued, ThreadPool::TPHandle &handle)
{
- lock();
- if (deleting) {
- unlock();
- return;
- }
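+  // Now invoked from the op queue with the PG lock already held, so the old
+  // internal lock()/unlock() pairs are gone; the osd_scrub_sleep block below
+  // still drops and retakes the lock around its sleep.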
if (g_conf->osd_scrub_sleep > 0 &&
(scrubber.state == PG::Scrubber::NEW_CHUNK ||
scrubber.state == PG::Scrubber::INACTIVE)) {
lock();
dout(20) << __func__ << " slept for " << t << dendl;
}
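+  // Drop stale events: if the PG is being deleted or was reset after this
+  // scrub was queued, the reset path has already cleared scrub_queued.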
+ if (deleting || pg_has_reset_since(queued)) {
+ return;
+ }
+ assert(scrub_queued);
+ scrub_queued = false;
if (!is_primary() || !is_active() || !is_clean() || !is_scrubbing()) {
dout(10) << "scrub -- not primary or active or not clean" << dendl;
state_clear(PG_STATE_REPAIR);
state_clear(PG_STATE_DEEP_SCRUB);
publish_stats_to_osd();
- unlock();
return;
}
}
chunky_scrub(handle);
-
- unlock();
}
/*
if (scrubber.end < hobject_t::get_max()) {
scrubber.state = PG::Scrubber::NEW_CHUNK;
- osd->scrub_wq.queue(this);
+ requeue_scrub();
done = true;
} else {
scrubber.state = PG::Scrubber::FINISH;
peer_purged.clear();
actingbackfill.clear();
snap_trim_queued = false;
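+  // a PGScrub event still sitting in the op queue will notice
+  // pg_has_reset_since() and drop itself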
+ scrub_queued = false;
// reset primary state?
if (was_old_primary || is_primary()) {
return can_discard_replica_op<MOSDECSubOpRead, MSG_OSD_EC_READ>(op);
case MSG_OSD_EC_READ_REPLY:
return can_discard_replica_op<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op);
+ case MSG_OSD_REP_SCRUB:
+ return can_discard_replica_op<MOSDRepScrub, MSG_OSD_REP_SCRUB>(op);
case MSG_OSD_PG_SCAN:
return can_discard_scan(op);
return !have_same_or_newer_map(
cur_epoch,
static_cast<MOSDECSubOpReadReply*>(op->get_req())->map_epoch);
+
+ case MSG_OSD_REP_SCRUB:
+ return !have_same_or_newer_map(
+ cur_epoch,
+ static_cast<MOSDRepScrub*>(op->get_req())->map_epoch);
}
assert(0);
return false;
/* You should not use these items without taking their respective queue locks
* (if they have one) */
- xlist<PG*>::item recovery_item, scrub_item, stat_queue_item;
+ xlist<PG*>::item recovery_item, stat_queue_item;
bool snap_trim_queued;
+ bool scrub_queued;
int recovery_ops_active;
set<pg_shard_t> waiting_on_backfill;
const hobject_t& soid, list<pair<ScrubMap::object, pg_shard_t> > *ok_peers,
pg_shard_t bad_peer);
- void scrub(ThreadPool::TPHandle &handle);
+ void scrub(epoch_t queued, ThreadPool::TPHandle &handle);
void chunky_scrub(ThreadPool::TPHandle &handle);
void scrub_compare_maps();
void scrub_process_inconsistent();
void log_weirdness();
void queue_snap_trim();
+ bool requeue_scrub();
bool queue_scrub();
/// share pg info after a pg is active
if (is_primary()) {
if (scrubber.active) {
if (last_update_applied == scrubber.subset_last_update) {
- osd->scrub_wq.queue(this);
+ requeue_scrub();
}
} else {
assert(scrubber.start == scrubber.end);
waiting_for_blocked_object.erase(p);
if (obc->requeue_scrub_on_unblock)
- osd->queue_for_scrub(this);
+ requeue_scrub();
}
SnapSetContext *ReplicatedPG::create_snapset_context(const hobject_t& oid)
// requeue an active chunky scrub waiting on recovery ops
if (!deleting && active_pushes == 0
&& scrubber.is_chunky_scrub_active()) {
- osd->scrub_wq.queue(this);
+ requeue_scrub();
}
unlock();
// remove from queues
osd->recovery_wq.dequeue(this);
- osd->scrub_wq.dequeue(this);
osd->pg_stat_queue_dequeue(this);
osd->dequeue_pg(this, 0);
osd->peering_wq.dequeue(this);
{
dout(20) << __func__ << dendl;
if (--scrubber.num_digest_updates_pending == 0) {
- osd->scrub_wq.queue(this);
+ requeue_scrub();
}
}