scrubs_pending(0),
scrubs_active(0),
scrub_wq(this, &disk_tp),
+ rep_scrub_wq(this, &disk_tp),
remove_wq(this, &disk_tp)
{
monc->set_messenger(client_messenger);
#include "include/CompatSet.h"
#include "auth/KeyRing.h"
+#include "messages/MOSDRepScrub.h"
#include <map>
#include <memory>
}
} scrub_wq;
+ struct RepScrubWQ : public ThreadPool::WorkQueue<MOSDRepScrub> {
+ private:
+ OSD *osd;
+ list<MOSDRepScrub*> rep_scrub_queue;
+
+ public:
+ RepScrubWQ(OSD *o, ThreadPool *tp) :
+ ThreadPool::WorkQueue<MOSDRepScrub>("OSD::RepScrubWQ", tp), osd(o) {}
+
+ bool _empty() {
+ return rep_scrub_queue.empty();
+ }
+ bool _enqueue(MOSDRepScrub *msg) {
+ rep_scrub_queue.push_back(msg);
+ return true;
+ }
+ void _dequeue(MOSDRepScrub *msg) {
+ assert(0); // Not applicable for this wq
+ return;
+ }
+ MOSDRepScrub *_dequeue() {
+ if (rep_scrub_queue.empty())
+ return NULL;
+ MOSDRepScrub *msg = rep_scrub_queue.front();
+ rep_scrub_queue.pop_front();
+ return msg;
+ }
+ void _process(MOSDRepScrub *msg) {
+ osd->osd_lock.Lock();
+ if (osd->_have_pg(msg->pgid)) {
+ PG *pg = osd->_lookup_lock_pg(msg->pgid);
+ osd->osd_lock.Unlock();
+ pg->replica_scrub(msg);
+ pg->unlock();
+ } else {
+ msg->put();
+ osd->osd_lock.Unlock();
+ }
+ }
+ void _clear() {
+ while (!rep_scrub_queue.empty()) {
+ MOSDRepScrub *msg = rep_scrub_queue.front();
+ rep_scrub_queue.pop_front();
+ msg->put();
+ }
+ }
+ } rep_scrub_wq;
+
// -- removing --
xlist<PG*> remove_queue;