From 2bea2d519286332d786e0cf2f7c2b454cd7c6807 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 18 Nov 2008 16:46:52 -0800 Subject: [PATCH] osd: add scrub wq --- src/Makefile.am | 1 + src/common/WorkQueue.h | 104 ++++++++++++++++++++++++++++++++++++++++ src/osd/OSD.cc | 11 ++++- src/osd/OSD.h | 27 +++++++++++ src/osd/PG.h | 6 ++- src/osd/ReplicatedPG.cc | 28 +++++++++++ 6 files changed, 174 insertions(+), 3 deletions(-) create mode 100644 src/common/WorkQueue.h diff --git a/src/Makefile.am b/src/Makefile.am index e934332120274..31bd0bcd62c37 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -274,6 +274,7 @@ noinst_HEADERS = \ common/Thread.h\ common/ThreadPool.h\ common/Timer.h\ + common/WorkQueue.h\ config.h\ crush/CrushWrapper.h\ crush/CrushWrapper.i\ diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h new file mode 100644 index 0000000000000..7901725ba3674 --- /dev/null +++ b/src/common/WorkQueue.h @@ -0,0 +1,104 @@ +#ifndef __CEPH_WORKQUEUE +#define __CEPH_WORKQUEUE + +#include "Mutex.h" +#include "Cond.h" +#include "Thread.h" + +template +class WorkQueue { + + Mutex lock; + Cond cond; + Mutex queue_lock; + bool _stop, _pause; + int processing; + Cond wait_cond; + + void entry() { + lock.Lock(); + while (!_stop) { + if (!_pause) { + queue_lock.Lock(); + T *item = _dequeue(); + queue_lock.Unlock(); + if (item) { + processing++; + lock.Unlock(); + _process(item); + lock.Lock(); + processing--; + if (_pause) + wait_cond.Signal(); + continue; + } + } + cond.Wait(lock); + } + lock.Unlock(); + } + + struct WorkThread : public Thread { + WorkQueue *wq; + WorkThread(WorkQueue *q) : wq(q) {} + void *entry() { + wq->entry(); + return 0; + } + } thread; + +public: + WorkQueue() : lock("WorkQueue::lock"), + queue_lock("WorkQueue::queue_lock"), + _stop(false), _pause(false), + processing(0), + thread(this) {} + + virtual void _enqueue(T *) = 0; + virtual void _dequeue(T *) = 0; + virtual T *_dequeue() = 0; + virtual void _process(T *) = 0; + + void start() { + thread.create(); + } + void stop() { + lock.Lock(); + _stop = true; + cond.Signal(); + lock.Unlock(); + thread.join(); + } + + void pause() { + lock.Lock(); + assert(!_pause); + _pause = true; + while (processing) + wait_cond.Wait(lock); + lock.Unlock(); + } + + void unpause() { + lock.Lock(); + assert(pause); + pause = false; + cond.Signal(); + lock.Unlock(); + } + + void queue(T *item) { + queue_lock.Lock(); + _enqueue(item); + queue_lock.Unlock(); + } + void unqueue(T *item) { + queue_lock.Lock(); + _dequeue(item); + queue_lock.Unlock(); + } + +}; + + +#endif diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index c1d10d06edadf..631c41e9fb7d0 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -280,7 +280,8 @@ OSD::OSD(int id, Messenger *m, MonMap *mm, const char *dev) : recovery_lock("OSD::recovery_lock"), recovery_ops_active(0), recovery_stop(false), recovery_pause(false), remove_list_lock("OSD::remove_list_lock"), - recovery_thread(this) + recovery_thread(this), + scrub_wq(this) { messenger = m; monmap = mm; @@ -438,6 +439,7 @@ int OSD::init() do_mon_report(); // start mon report timer recovery_thread.create(); + scrub_wq.start(); // start the heartbeat timer.add_event_after(g_conf.osd_heartbeat_interval, new C_Heartbeat(this)); @@ -475,6 +477,9 @@ int OSD::shutdown() stop_recovery_thread(); dout(10) << "recovery thread stopped" << dendl; + scrub_wq.stop(); + dout(10) << "scrub thread stopped" << dendl; + // zap waiters (bleh, this is messy) finished_lock.Lock(); finished.clear(); @@ -1503,6 +1508,7 @@ void OSD::handle_osd_map(MOSDMap *m) wait_for_no_ops(); pause_recovery_thread(); + scrub_wq.pause(); map_lock.get_write(); assert(osd_lock.is_locked()); @@ -3215,6 +3221,9 @@ void OSD::handle_op(MOSDOp *op) } pg->unlock(); + +#warning hack + scrub_wq.queue(pg); } diff --git a/src/osd/OSD.h b/src/osd/OSD.h index cb3ba10938803..0442480e64554 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -21,6 +21,7 @@ #include "common/RWLock.h" #include "common/ThreadPool.h" #include "common/Timer.h" +#include "common/WorkQueue.h" #include "mon/MonMap.h" @@ -512,6 +513,32 @@ private: }; + // -- scrubbing -- + xlist scrub_queue; + + struct ScrubWQ : public WorkQueue { + OSD *osd; + ScrubWQ(OSD *o) : osd(o) {} + + void _enqueue(PG *pg) { + osd->scrub_queue.push_back(&pg->scrub_item); + } + void _dequeue(PG *pg) { + pg->scrub_item.remove_myself(); + } + PG * _dequeue() { + if (osd->scrub_queue.empty()) + return NULL; + PG *pg = osd->scrub_queue.front(); + osd->scrub_queue.pop_front(); + return pg; + } + void _process(PG *pg) { + pg->scrub(); + } + } scrub_wq; + + public: OSD(int id, Messenger *m, MonMap *mm, const char *dev = 0); ~OSD(); diff --git a/src/osd/PG.h b/src/osd/PG.h index 37550b12303c2..db3281a693583 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -556,7 +556,7 @@ public: set snap_collections; map past_intervals; - xlist::item recovery_item; + xlist::item recovery_item, scrub_item; int recovery_ops_active; protected: @@ -683,7 +683,8 @@ public: _lock("PG::_lock"), ref(0), deleted(false), info(p), - recovery_item(this), recovery_ops_active(0), + recovery_item(this), scrub_item(this), + recovery_ops_active(0), role(0), state(0), pending_snap_removal_item(this), @@ -768,6 +769,7 @@ public: virtual void do_sub_op(MOSDSubOp *op) = 0; virtual void do_sub_op_reply(MOSDSubOpReply *op) = 0; virtual bool snap_trimmer() = 0; + virtual void scrub() { }; virtual bool same_for_read_since(epoch_t e) = 0; virtual bool same_for_modify_since(epoch_t e) = 0; diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 4a309f82e28bb..4dd8e0e8b7922 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -2785,12 +2785,17 @@ void ReplicatedPG::scrub() unsigned curclone = 0; int r; + pg_stat_t stat; + bufferlist last_data; for (vector::reverse_iterator p = ls.rbegin(); p != ls.rend(); p++) { pobject_t poid = *p; + stat.num_objects++; + if (poid.oid.snap != CEPH_NOSNAP) + stat.num_object_clones++; // basic checks. eversion_t v; @@ -2801,6 +2806,9 @@ void ReplicatedPG::scrub() dout(20) << "scrub " << poid << " v " << v << " size " << st.st_size << dendl; + stat.num_bytes += st.st_size; + stat.num_kb += SHIFT_ROUND_UP(st.st_size, 10); + bufferlist data; osd->store->read(c, poid, 0, 0, data); assert(data.length() == st.st_size); @@ -2827,6 +2835,19 @@ void ReplicatedPG::scrub() head = pobject_t(); // no clones. else curclone = snapset.clones.size()-1; + + // subtract off any clone overlap + for (map >::iterator q = snapset.clone_overlap.begin(); + q != snapset.clone_overlap.end(); + q++) { + for (map<__u64,__u64>::iterator r = q->second.m.begin(); + r != q->second.m.end(); + r++) { + stat.num_bytes -= r->second; + stat.num_kb -= SHIFT_ROUND_UP(r->first+r->second, 10) - (r->first >> 10); + } + } + } else if (poid.oid.snap) { // it's a clone assert(head != pobject_t()); @@ -2857,6 +2878,13 @@ void ReplicatedPG::scrub() } } + dout(10) << "scrub got " + << stat.num_objects << "/" << pg_stats.num_objects << " objects, " + << stat.num_object_clones << "/" << pg_stats.num_object_clones << " clones, " + << stat.num_bytes << "/" << pg_stats.num_bytes << " bytes, " + << stat.num_kb << "/" << pg_stats.num_kb << " kb." + << dendl; + dout(10) << "scrub finish" << dendl; unlock(); } -- 2.39.5