recovery_wq(osd->recovery_wq),
snap_trim_wq(osd->snap_trim_wq),
scrub_wq(osd->scrub_wq),
- rep_scrub_wq(osd->rep_scrub_wq),
recovery_gen_wq("recovery_gen_wq", cct->_conf->osd_recovery_thread_timeout,
&osd->recovery_tp),
op_gen_wq("op_gen_wq", cct->_conf->osd_recovery_thread_timeout, &osd->osd_tp),
cct->_conf->osd_scrub_thread_timeout,
cct->_conf->osd_scrub_thread_suicide_timeout,
&disk_tp),
- rep_scrub_wq(
- this,
- cct->_conf->osd_scrub_thread_timeout,
- cct->_conf->osd_scrub_thread_suicide_timeout,
- &disk_tp),
remove_wq(
store,
cct->_conf->osd_remove_thread_timeout,
case MSG_OSD_PG_CREATE:
handle_pg_create(op);
break;
-
case MSG_OSD_PG_NOTIFY:
handle_pg_notify(op);
break;
case MSG_OSD_EC_READ_REPLY:
handle_replica_op<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op, osdmap);
break;
+ case MSG_OSD_REP_SCRUB:
+ handle_replica_op<MOSDRepScrub, MSG_OSD_REP_SCRUB>(op, osdmap);
default:
assert(0);
}
handle_scrub(static_cast<MOSDScrub*>(m));
break;
- case MSG_OSD_REP_SCRUB:
- handle_rep_scrub(static_cast<MOSDRepScrub*>(m));
- break;
-
// -- need OSDMap --
default:
}
-void OSD::handle_rep_scrub(MOSDRepScrub *m)
-{
- dout(10) << __func__ << " " << *m << dendl;
- if (!require_self_aliveness(m, m->map_epoch)) {
- m->put();
- return;
- }
- if (!require_osd_peer(m)) {
- m->put();
- return;
- }
- if (osdmap->get_epoch() >= m->map_epoch &&
- !require_same_peer_instance(m, osdmap, true)) {
- m->put();
- return;
- }
-
- rep_scrub_wq.queue(m);
-}
-
void OSD::handle_scrub(MOSDScrub *m)
{
dout(10) << "handle_scrub " << *m << dendl;
ThreadPool::WorkQueue<PG> &recovery_wq;
ThreadPool::WorkQueue<PG> &snap_trim_wq;
ThreadPool::WorkQueue<PG> &scrub_wq;
- ThreadPool::WorkQueue<MOSDRepScrub> &rep_scrub_wq;
GenContextWQ recovery_gen_wq;
GenContextWQ op_gen_wq;
ClassHandler *&class_handler;
}
} scrub_wq;
- struct RepScrubWQ : public ThreadPool::WorkQueue<MOSDRepScrub> {
- private:
- OSD *osd;
- list<MOSDRepScrub*> rep_scrub_queue;
-
- public:
- RepScrubWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
- : ThreadPool::WorkQueue<MOSDRepScrub>("OSD::RepScrubWQ", ti, si, tp), osd(o) {}
-
- bool _empty() {
- return rep_scrub_queue.empty();
- }
- bool _enqueue(MOSDRepScrub *msg) {
- rep_scrub_queue.push_back(msg);
- return true;
- }
- void _dequeue(MOSDRepScrub *msg) {
- assert(0); // Not applicable for this wq
- return;
- }
- MOSDRepScrub *_dequeue() {
- if (rep_scrub_queue.empty())
- return NULL;
- MOSDRepScrub *msg = rep_scrub_queue.front();
- rep_scrub_queue.pop_front();
- return msg;
- }
- void _process(
- MOSDRepScrub *msg,
- ThreadPool::TPHandle &handle) {
- PG *pg = NULL;
- {
- Mutex::Locker lock(osd->osd_lock);
- if (osd->is_stopping() ||
- !osd->_have_pg(msg->pgid)) {
- msg->put();
- return;
- }
- pg = osd->_lookup_lock_pg(msg->pgid);
- }
- assert(pg);
- pg->replica_scrub(msg, handle);
- msg->put();
- pg->unlock();
- }
- void _clear() {
- while (!rep_scrub_queue.empty()) {
- MOSDRepScrub *msg = rep_scrub_queue.front();
- rep_scrub_queue.pop_front();
- msg->put();
- }
- }
- } rep_scrub_wq;
-
// -- removing --
struct RemoveWQ :
public ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> > {
case MSG_OSD_EC_WRITE_REPLY:
case MSG_OSD_EC_READ:
case MSG_OSD_EC_READ_REPLY:
+ case MSG_OSD_REP_SCRUB:
return true;
default:
return false;
static int write_meta(ObjectStore *store,
uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami);
- void handle_rep_scrub(MOSDRepScrub *m);
void handle_scrub(struct MOSDScrub *m);
void handle_osd_ping(class MOSDPing *m);
void handle_op(OpRequestRef& op, OSDMapRef& osdmap);
spg_t(info.pgid.pgid, replica.shard), version,
get_osdmap()->get_epoch(),
start, end, deep, seed);
+ // default priority, we want the rep scrub processed prior to any recovery
+ // or client io messages (we are holding a lock!)
osd->send_message_osd_cluster(
replica.osd, repscrubop, get_osdmap()->get_epoch());
}
* scrubmap of objects that are in the range [msg->start, msg->end).
*/
void PG::replica_scrub(
- MOSDRepScrub *msg,
+ OpRequestRef op,
ThreadPool::TPHandle &handle)
{
+ MOSDRepScrub *msg = static_cast<MOSDRepScrub *>(op->get_req());
assert(!scrubber.active_rep_scrub);
dout(7) << "replica_scrub" << dendl;
assert(msg->chunky);
if (last_update_applied < msg->scrub_to) {
dout(10) << "waiting for last_update_applied to catch up" << dendl;
- scrubber.active_rep_scrub = msg;
+ scrubber.active_rep_scrub = op;
msg->get();
return;
}
if (active_pushes > 0) {
dout(10) << "waiting for active pushes to finish" << dendl;
- scrubber.active_rep_scrub = msg;
- msg->get();
+ scrubber.active_rep_scrub = op;
return;
}
epoch_start(0),
active(false), queue_snap_trim(false),
waiting_on(0), shallow_errors(0), deep_errors(0), fixed(0),
- active_rep_scrub(0),
must_scrub(false), must_deep_scrub(false), must_repair(false),
num_digest_updates_pending(0),
state(INACTIVE),
int fixed;
ScrubMap primary_scrubmap;
map<pg_shard_t, ScrubMap> received_maps;
- MOSDRepScrub *active_rep_scrub;
+ OpRequestRef active_rep_scrub;
utime_t scrub_reg_stamp; // stamp we registered for
// flags to indicate explicitly requested scrubs (by admin)
waiting_on = 0;
waiting_on_whom.clear();
if (active_rep_scrub) {
- active_rep_scrub->put();
- active_rep_scrub = NULL;
+ active_rep_scrub = OpRequestRef();
}
received_maps.clear();
void unreg_next_scrub();
void replica_scrub(
- struct MOSDRepScrub *op,
+ OpRequestRef op,
ThreadPool::TPHandle &handle);
void sub_op_scrub_map(OpRequestRef op);
void sub_op_scrub_reserve(OpRequestRef op);
do_backfill(op);
break;
+ case MSG_OSD_REP_SCRUB:
+ replica_scrub(op, handle);
+ break;
+
default:
assert(0 == "bad message type in do_request");
}
}
} else {
if (scrubber.active_rep_scrub) {
- if (last_update_applied == scrubber.active_rep_scrub->scrub_to) {
- osd->rep_scrub_wq.queue(scrubber.active_rep_scrub);
- scrubber.active_rep_scrub = 0;
+ if (last_update_applied == static_cast<MOSDRepScrub*>(
+ scrubber.active_rep_scrub->get_req())->scrub_to) {
+ osd->op_wq.queue(
+ make_pair(
+ this,
+ scrubber.active_rep_scrub));
+ scrubber.active_rep_scrub = OpRequestRef();
}
}
}
// requeue an active chunky scrub waiting on recovery ops
if (!deleting && active_pushes == 0 &&
- scrubber.active_rep_scrub && scrubber.active_rep_scrub->chunky) {
- osd->rep_scrub_wq.queue(scrubber.active_rep_scrub);
- scrubber.active_rep_scrub = 0;
+ scrubber.active_rep_scrub && static_cast<MOSDRepScrub*>(
+ scrubber.active_rep_scrub->get_req())->chunky) {
+ osd->op_wq.queue(
+ make_pair(
+ this,
+ scrubber.active_rep_scrub));
+ scrubber.active_rep_scrub = OpRequestRef();
}
unlock();