#include "common/Logger.h"
#include "common/LogType.h"
#include "common/Timer.h"
-#include "common/ThreadPool.h"
#include "common/LogClient.h"
#include <iostream>
whoami(id), dev_name(dev),
boot_epoch(0), last_active_epoch(0),
state(STATE_BOOTING),
+ op_tp("OSD::op_tp", g_conf.osd_maxthreads),
recovery_tp("OSD::recovery_tp", 1),
disk_tp("OSD::disk_tp", 2),
heartbeat_lock("OSD::heartbeat_lock"),
qlen_calc(3),
iat_averager(g_conf.osd_flash_crowd_iat_alpha),
finished_lock("OSD::finished_lock"),
+ op_wq(this, &op_tp),
osdmap(NULL),
map_lock("OSD::map_lock"),
map_cache_lock("OSD::map_cache_lock"),
OSD::~OSD()
{
- delete threadpool;
delete osdmap;
delete logger;
delete store;
osd_logtype.add_inc("mapf");
osd_logtype.add_inc("mapfdup");
- // request thread pool
- {
- char name[80];
- sprintf(name,"osd%d.threadpool", whoami);
- threadpool = new ThreadPool<OSD*, PG*>(name, g_conf.osd_maxthreads,
- static_dequeueop,
- this);
- }
-
// i'm ready!
messenger->set_dispatcher(this);
heartbeat_messenger->set_dispatcher(&heartbeat_dispatcher);
// announce to monitor i exist and have booted.
do_mon_report();
- recovery_tp.add_work_queue(&recovery_wq);
+ op_tp.start();
recovery_tp.start();
-
- disk_tp.add_work_queue(&scrub_wq);
- disk_tp.add_work_queue(&snap_trim_wq);
disk_tp.start();
// start the heartbeat
clear_pg_stat_queue();
// stop threads
- delete threadpool;
- threadpool = 0;
-
recovery_tp.stop();
dout(10) << "recovery tp stopped" << dendl;
disk_tp.stop();
dout(10) << "disk tp stopped" << dendl;
+ op_tp.stop();
+ dout(10) << "op tp stopped" << dendl;
// tell pgs we're shutting down
for (hash_map<pg_t, PG*>::iterator p = pg_map.begin();
// add pg to threadpool queue
pg->get(); // we're exposing the pointer, here.
- threadpool->put_op(pg);
+ op_wq.queue(pg);
}
/*
private:
-
+ WorkThreadPool op_tp;
WorkThreadPool recovery_tp;
WorkThreadPool disk_tp;
}
// -- op queue --
- class ThreadPool<class OSD*, PG*> *threadpool;
+ deque<PG*> op_queue;
+
+ struct OpWQ : public WorkThreadPool::WorkQueue<PG> {
+ OSD *osd;
+ OpWQ(OSD *o, WorkThreadPool *tp) : WorkThreadPool::WorkQueue<PG>("OSD::OpWQ", tp), osd(o) {}
+
+ bool _enqueue(PG *pg) {
+ pg->get();
+ osd->op_queue.push_back(pg);
+ return true;
+ }
+ void _dequeue(PG *pg) {
+ assert(0);
+ }
+ PG * _dequeue() {
+ if (osd->op_queue.empty())
+ return NULL;
+ PG *pg = osd->op_queue.front();
+ osd->op_queue.pop_front();
+ return pg;
+ }
+ void _process(PG *pg) {
+ osd->dequeue_op(pg);
+ }
+ void _clear() {
+ assert(osd->op_queue.empty());
+ }
+ } op_wq;
int pending_ops;
bool waiting_for_no_ops;