From ae184215f1254e64e1a0fc656b8cdbf2c4f1daa6 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 10 Dec 2008 11:35:04 -0800 Subject: [PATCH] osd: use new workqueue in osd for ops --- src/osd/OSD.cc | 25 ++++++------------------- src/osd/OSD.h | 31 +++++++++++++++++++++++++++++-- 2 files changed, 35 insertions(+), 21 deletions(-) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index ddafa660a71ff..30fb27c1aade5 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -71,7 +71,6 @@ #include "common/Logger.h" #include "common/LogType.h" #include "common/Timer.h" -#include "common/ThreadPool.h" #include "common/LogClient.h" #include @@ -260,6 +259,7 @@ OSD::OSD(int id, Messenger *m, Messenger *hbm, MonMap *mm, const char *dev) : whoami(id), dev_name(dev), boot_epoch(0), last_active_epoch(0), state(STATE_BOOTING), + op_tp("OSD::op_tp", g_conf.osd_maxthreads), recovery_tp("OSD::recovery_tp", 1), disk_tp("OSD::disk_tp", 2), heartbeat_lock("OSD::heartbeat_lock"), @@ -273,6 +273,7 @@ OSD::OSD(int id, Messenger *m, Messenger *hbm, MonMap *mm, const char *dev) : qlen_calc(3), iat_averager(g_conf.osd_flash_crowd_iat_alpha), finished_lock("OSD::finished_lock"), + op_wq(this, &op_tp), osdmap(NULL), map_lock("OSD::map_lock"), map_cache_lock("OSD::map_cache_lock"), @@ -304,7 +305,6 @@ OSD::OSD(int id, Messenger *m, Messenger *hbm, MonMap *mm, const char *dev) : OSD::~OSD() { - delete threadpool; delete osdmap; delete logger; delete store; @@ -425,15 +425,6 @@ int OSD::init() osd_logtype.add_inc("mapf"); osd_logtype.add_inc("mapfdup"); - // request thread pool - { - char name[80]; - sprintf(name,"osd%d.threadpool", whoami); - threadpool = new ThreadPool(name, g_conf.osd_maxthreads, - static_dequeueop, - this); - } - // i'm ready! messenger->set_dispatcher(this); heartbeat_messenger->set_dispatcher(&heartbeat_dispatcher); @@ -441,11 +432,8 @@ int OSD::init() // announce to monitor i exist and have booted. do_mon_report(); - recovery_tp.add_work_queue(&recovery_wq); + op_tp.start(); recovery_tp.start(); - - disk_tp.add_work_queue(&scrub_wq); - disk_tp.add_work_queue(&snap_trim_wq); disk_tp.start(); // start the heartbeat @@ -489,13 +477,12 @@ int OSD::shutdown() clear_pg_stat_queue(); // stop threads - delete threadpool; - threadpool = 0; - recovery_tp.stop(); dout(10) << "recovery tp stopped" << dendl; disk_tp.stop(); dout(10) << "disk tp stopped" << dendl; + op_tp.stop(); + dout(10) << "op tp stopped" << dendl; // tell pgs we're shutting down for (hash_map::iterator p = pg_map.begin(); @@ -3519,7 +3506,7 @@ void OSD::enqueue_op(PG *pg, Message *op) // add pg to threadpool queue pg->get(); // we're exposing the pointer, here. - threadpool->put_op(pg); + op_wq.queue(pg); } /* diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 3a384be2f6c0a..a8c48f8dfcdb0 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -114,7 +114,7 @@ public: private: - + WorkThreadPool op_tp; WorkThreadPool recovery_tp; WorkThreadPool disk_tp; @@ -269,7 +269,34 @@ private: } // -- op queue -- - class ThreadPool *threadpool; + deque op_queue; + + struct OpWQ : public WorkThreadPool::WorkQueue { + OSD *osd; + OpWQ(OSD *o, WorkThreadPool *tp) : WorkThreadPool::WorkQueue("OSD::OpWQ", tp), osd(o) {} + + bool _enqueue(PG *pg) { + pg->get(); + osd->op_queue.push_back(pg); + return true; + } + void _dequeue(PG *pg) { + assert(0); + } + PG * _dequeue() { + if (osd->op_queue.empty()) + return NULL; + PG *pg = osd->op_queue.front(); + osd->op_queue.pop_front(); + return pg; + } + void _process(PG *pg) { + osd->dequeue_op(pg); + } + void _clear() { + assert(osd->op_queue.empty()); + } + } op_wq; int pending_ops; bool waiting_for_no_ops; -- 2.39.5