From a62d778e06ba8ae69ad04c6103e8d6e685223d7e Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Mon, 1 Oct 2012 16:11:40 -0700 Subject: [PATCH] OSD: use PrioritizedQueue for OpWQ The OpWQ PrioritizedQueue replaces OSD::op_queue, PG::op_queue, and PG::qlock. The synchronization is now done as part of the usual WorkQueue synchronization pattern. Signed-off-by: Samuel Just --- src/osd/OSD.cc | 122 ++++++++++++++++++++++++++----------------------- src/osd/OSD.h | 59 ++++++++++++------------ src/osd/PG.cc | 21 +++------ src/osd/PG.h | 12 +---- 4 files changed, 102 insertions(+), 112 deletions(-) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 66a55863a5298..dc64e08c8b213 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -728,7 +728,6 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger, finished_lock("OSD::finished_lock"), admin_ops_hook(NULL), historic_ops_hook(NULL), - op_queue_len(0), op_wq(this, g_conf->osd_op_thread_timeout, &op_tp), peering_wq(this, g_conf->osd_op_thread_timeout, &op_tp, 200), map_lock("OSD::map_lock"), @@ -5609,37 +5608,84 @@ bool OSD::op_is_discardable(MOSDOp *op) void OSD::enqueue_op(PG *pg, OpRequestRef op) { dout(15) << "enqueue_op " << op << " " << *(op->request) << dendl; - pg->queue_op(op); + op_wq.queue(make_pair(PGRef(pg), op)); } -bool OSD::OpWQ::_enqueue(PG *pg) +void OSD::OpWQ::_enqueue(pair item) { - pg->get(); - osd->op_queue.push_back(pg); - osd->op_queue_len++; - osd->logger->set(l_osd_opq, osd->op_queue_len); - return true; + pqueue.enqueue(item.second->request->get_source_inst(), + 1, 1, item); + osd->logger->set(l_osd_opq, pqueue.length()); } -PG *OSD::OpWQ::_dequeue() +void OSD::OpWQ::_enqueue_front(pair item) { - if (osd->op_queue.empty()) - return NULL; - PG *pg = osd->op_queue.front(); - osd->op_queue.pop_front(); - osd->op_queue_len--; - osd->logger->set(l_osd_opq, osd->op_queue_len); + { + Mutex::Locker l(qlock); + if (pg_for_processing.count(&*(item.first))) { + 
pg_for_processing[&*(item.first)].push_front(item.second); + item.second = pg_for_processing[&*(item.first)].back(); + pg_for_processing[&*(item.first)].pop_back(); + } + } + pqueue.enqueue_front(item.second->request->get_source_inst(), + 1, 1, item); + osd->logger->set(l_osd_opq, pqueue.length()); +} + +PGRef OSD::OpWQ::_dequeue() +{ + assert(!pqueue.empty()); + PGRef pg; + { + Mutex::Locker l(qlock); + pair ret = pqueue.dequeue(); + pg = ret.first; + pg_for_processing[&*pg].push_back(ret.second); + } + osd->logger->set(l_osd_opq, pqueue.length()); return pg; } -void OSDService::queue_for_peering(PG *pg) +void OSD::OpWQ::_process(PGRef pg) { - peering_wq.queue(pg); + pg->lock(); + OpRequestRef op; + { + Mutex::Locker l(qlock); + assert(pg_for_processing.count(&*pg)); + assert(pg_for_processing[&*pg].size()); + op = pg_for_processing[&*pg].front(); + pg_for_processing[&*pg].pop_front(); + if (!(pg_for_processing[&*pg].size())) + pg_for_processing.erase(&*pg); + } + osd->dequeue_op(pg, op); + pg->unlock(); +} + +/* + * NOTE: dequeue called in worker thread, with pg lock + */ +void OSD::dequeue_op(PGRef pg, OpRequestRef op) +{ + dout(10) << "dequeue_op " << op << " " << *(op->request) + << " pg " << *pg << dendl; + if (pg->deleting) + return; + + op->mark_reached_pg(); + + pg->do_request(op); + + // finish + dout(10) << "dequeue_op " << op << " finish" << dendl; } -void OSDService::queue_for_op(PG *pg) + +void OSDService::queue_for_peering(PG *pg) { - op_wq.queue(pg); + peering_wq.queue(pg); } void OSD::process_peering_events(const list &pgs) @@ -5678,44 +5724,6 @@ void OSD::process_peering_events(const list &pgs) service.send_pg_temp(); } -/* - * NOTE: dequeue called in worker thread, without osd_lock - */ -void OSD::dequeue_op(PG *pg) -{ - OpRequestRef op; - - pg->lock(); - if (pg->deleting) { - pg->unlock(); - pg->put(); - return; - } - - pg->lockq(); - assert(!pg->op_queue.empty()); - op = pg->op_queue.front(); - pg->op_queue.pop_front(); - pg->unlockq(); - - 
dout(10) << "dequeue_op " << op << " " << *op->request << " pg " << *pg << dendl; - - op->mark_reached_pg(); - - pg->do_request(op); - - // unlock and put pg - pg->unlock(); - pg->put(); - - //#warning foo - //scrub_wq.queue(pg); - - // finish - dout(10) << "dequeue_op " << op << " finish" << dendl; -} - - // -------------------------------- int OSD::init_op_flags(MOSDOp *op) diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 23a8729202f79..4b018d7f5d1f5 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -53,6 +53,7 @@ using namespace __gnu_cxx; #include "common/shared_cache.hpp" #include "common/simple_cache.hpp" #include "common/sharedptr_registry.hpp" +#include "common/PrioritizedQueue.h" #define CEPH_OSD_PROTOCOL 10 /* cluster internal */ @@ -172,7 +173,7 @@ public: Messenger *&client_messenger; PerfCounters *&logger; MonClient *&monc; - ThreadPool::WorkQueue &op_wq; + ThreadPool::WorkQueueVal, PGRef> &op_wq; ThreadPool::BatchWorkQueue &peering_wq; ThreadPool::WorkQueue &recovery_wq; ThreadPool::WorkQueue &snap_trim_wq; @@ -279,7 +280,6 @@ public: void send_pg_temp(); void queue_for_peering(PG *pg); - void queue_for_op(PG *pg); bool queue_for_recovery(PG *pg); bool queue_for_snap_trim(PG *pg) { return snap_trim_wq.queue(pg); @@ -569,44 +569,43 @@ private: HistoricOpsSocketHook *historic_ops_hook; // -- op queue -- - list op_queue; - int op_queue_len; - struct OpWQ : public ThreadPool::WorkQueue { + struct OpWQ: public ThreadPool::WorkQueueVal, + PGRef > { + Mutex qlock; + map > pg_for_processing; OSD *osd; + PrioritizedQueue, entity_inst_t > pqueue; OpWQ(OSD *o, time_t ti, ThreadPool *tp) - : ThreadPool::WorkQueue("OSD::OpWQ", ti, ti*10, tp), osd(o) {} - - bool _enqueue(PG *pg); - void _dequeue(PG *pg) { - for (list::iterator i = osd->op_queue.begin(); - i != osd->op_queue.end(); - ) { - if (*i == pg) { - osd->op_queue.erase(i++); - pg->put(); - } else { - ++i; - } + : ThreadPool::WorkQueueVal, PGRef >( + "OSD::OpWQ", ti, ti*10, tp), + qlock("OpWQ::qlock"), + 
osd(o) {} + + void _enqueue_front(pair item); + void _enqueue(pair item); + PGRef _dequeue(); + + struct Pred { + PG *pg; + Pred(PG *pg) : pg(pg) {} + bool operator()(const pair &op) { + return op.first == pg; } + }; + void dequeue(PG *pg) { + lock(); + pqueue.remove_by_filter(Pred(pg)); + unlock(); } bool _empty() { - return osd->op_queue.empty(); - } - PG *_dequeue(); - void _process(PG *pg) { - osd->dequeue_op(pg); - } - void _clear() { - assert(osd->op_queue.empty()); + return pqueue.empty(); } + void _process(PGRef pg); } op_wq; void enqueue_op(PG *pg, OpRequestRef op); - void dequeue_op(PG *pg); - static void static_dequeueop(OSD *o, PG *pg) { - o->dequeue_op(pg); - }; + void dequeue_op(PGRef pg, OpRequestRef op); // -- peering queue -- struct PeeringWQ : public ThreadPool::BatchWorkQueue { diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 87fd4621bb14a..a1e232fbd97e4 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -63,7 +63,6 @@ PG::PG(OSDService *o, OSDMapRef curmap, const hobject_t& ioid) : osd(o), osdmap_ref(curmap), pool(_pool), _lock("PG::_lock"), - _qlock("PG::_qlock"), ref(0), deleting(false), dirty_info(false), dirty_log(false), info(p), coll(p), log_oid(loid), biginfo_oid(ioid), recovery_item(this), scrub_item(this), scrub_finalize_item(this), snap_trim_item(this), stat_queue_item(this), @@ -2768,12 +2767,12 @@ void PG::requeue_object_waiters(map >& m) void PG::requeue_ops(list &ls) { dout(15) << " requeue_ops " << ls << dendl; - lockq(); - assert(&ls != &op_queue); - size_t requeue_size = ls.size(); - op_queue.splice(op_queue.begin(), ls, ls.begin(), ls.end()); - for (size_t i = 0; i < requeue_size; ++i) osd->queue_for_op(this); - unlockq(); + for (list::reverse_iterator i = ls.rbegin(); + i != ls.rend(); + ++i) { + osd->op_wq.queue_front(make_pair(PGRef(this), *i)); + } + ls.clear(); } @@ -4688,14 +4687,6 @@ bool PG::must_delay_request(OpRequestRef op) return false; } -void PG::queue_op(OpRequestRef op) -{ - _qlock.Lock(); - 
op_queue.push_back(op); - osd->queue_for_op(this); - _qlock.Unlock(); -} - void PG::take_waiters() { dout(10) << "take_waiters" << dendl; diff --git a/src/osd/PG.h b/src/osd/PG.h index 13d529f4d6ebb..151b6705e9f38 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -348,7 +348,6 @@ protected: * put_unlock() when done with the current pointer (_most common_). */ Mutex _lock; - Mutex _qlock; Cond _cond; atomic_t ref; @@ -356,18 +355,12 @@ public: bool deleting; // true while RemoveWQ should be chewing on us void lock(bool no_lockdep = false); - void lockq(bool no_lockdep = false) { - _qlock.Lock(no_lockdep); - } void unlock() { //generic_dout(0) << this << " " << info.pgid << " unlock" << dendl; assert(!dirty_info); assert(!dirty_log); _lock.Unlock(); } - void unlockq() { - _qlock.Unlock(); - } /* During handle_osd_map, the osd holds a write lock to the osdmap. * *_with_map_lock_held assume that the map_lock is already held */ @@ -403,8 +396,6 @@ public: } - list op_queue; // op queue - bool dirty_info, dirty_log; public: @@ -1732,7 +1723,6 @@ public: bool can_discard_request(OpRequestRef op); bool must_delay_request(OpRequestRef op); - void queue_op(OpRequestRef op); bool old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch); bool old_peering_evt(CephPeeringEvtRef evt) { @@ -1810,4 +1800,6 @@ ostream& operator<<(ostream& out, const PG& pg); void intrusive_ptr_add_ref(PG *pg); void intrusive_ptr_release(PG *pg); +typedef boost::intrusive_ptr PGRef; + #endif -- 2.39.5