common/sctp_crc32.c\
common/assert.cc \
common/debug.cc \
+ common/WorkQueue.cc \
mon/MonMap.cc \
mon/MonClient.cc \
osd/OSDMap.cc \
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "WorkQueue.h"
+
+#include "config.h"
+
+#define DOUT_SUBSYS tp
+#undef dout_prefix
+#define dout_prefix *_dout << dbeginl << pthread_self() << " " << name << " "
+
+
+void ThreadPool::worker()
+{
+ _lock.Lock();
+ dout(10) << "worker start" << dendl;
+ while (!_stop) {
+ if (!_pause && work_queues.size()) {
+ _WorkQueue *wq;
+ int tries = work_queues.size();
+ bool did = false;
+ while (tries--) {
+ last_work_queue++;
+ last_work_queue %= work_queues.size();
+ wq = work_queues[last_work_queue];
+
+ void *item = wq->_void_dequeue();
+ if (item) {
+ processing++;
+ dout(12) << "worker wq " << wq->name << " start processing " << item << dendl;
+ _lock.Unlock();
+ wq->_void_process(item);
+ _lock.Lock();
+ dout(15) << "worker wq " << wq->name << " done processing " << item << dendl;
+ processing--;
+ if (_pause)
+ _wait_cond.Signal();
+ did = true;
+ break;
+ }
+ }
+ if (did)
+ continue;
+ }
+ dout(15) << "worker waiting" << dendl;
+ _cond.Wait(_lock);
+ }
+ dout(0) << "worker finish" << dendl;
+ _lock.Unlock();
+}
+
+void ThreadPool::start()
+{
+ dout(10) << "start" << dendl;
+ for (set<WorkThread*>::iterator p = _threads.begin();
+ p != _threads.end();
+ p++)
+ (*p)->create();
+ dout(15) << "started" << dendl;
+}
+void ThreadPool::stop(bool clear_after)
+{
+ dout(10) << "stop" << dendl;
+ _lock.Lock();
+ _stop = true;
+ _cond.Signal();
+ _lock.Unlock();
+ for (set<WorkThread*>::iterator p = _threads.begin();
+ p != _threads.end();
+ p++)
+ (*p)->join();
+ _lock.Lock();
+ for (unsigned i=0; i<work_queues.size(); i++)
+ work_queues[i]->_clear();
+ _lock.Unlock();
+ dout(15) << "stopped" << dendl;
+}
+
+
+void ThreadPool::pause()
+{
+ dout(10) << "pause" << dendl;
+ _lock.Lock();
+ assert(!_pause);
+ _pause = true;
+ while (processing)
+ _wait_cond.Wait(_lock);
+ _lock.Unlock();
+ dout(15) << "paused" << dendl;
+}
+
+void ThreadPool::pause_new()
+{
+ dout(10) << "pause_new" << dendl;
+ _lock.Lock();
+ assert(!_pause);
+ _pause = true;
+ _lock.Unlock();
+}
+
+void ThreadPool::unpause()
+{
+ dout(10) << "unpause" << dendl;
+ _lock.Lock();
+ assert(_pause);
+ _pause = false;
+ _cond.Signal();
+ _lock.Unlock();
+}
#include "Cond.h"
#include "Thread.h"
-class WorkThreadPool {
+class ThreadPool {
+ string name;
Mutex _lock;
Cond _cond;
bool _stop, _pause;
struct _WorkQueue {
string name;
_WorkQueue(string n) : name(n) {}
- virtual bool _try_process() = 0;
virtual void _clear() = 0;
+ virtual void *_void_dequeue() = 0;
+ virtual void _void_process(void *) = 0;
};
public:
template<class T>
class WorkQueue : public _WorkQueue {
- WorkThreadPool *pool;
+ ThreadPool *pool;
virtual bool _enqueue(T *) = 0;
virtual void _dequeue(T *) = 0;
virtual void _process(T *) = 0;
virtual void _clear() = 0;
+ void *_void_dequeue() {
+ return (void *)_dequeue();
+ }
+ void _void_process(void *p) {
+ _process((T *)p);
+ }
+
public:
- WorkQueue(string n, WorkThreadPool *p) : _WorkQueue(n), pool(p) {
+ WorkQueue(string n, ThreadPool *p) : _WorkQueue(n), pool(p) {
pool->add_work_queue(this);
}
~WorkQueue() {
pool->_lock.Unlock();
}
- bool _try_process() {
- T *item = _dequeue();
- if (item) {
- pool->_lock.Unlock();
- _process(item);
- pool->_lock.Lock();
- return true;
- }
- return false;
- }
-
void lock() {
pool->lock();
}
// threads
struct WorkThread : public Thread {
- WorkThreadPool *pool;
- WorkThread(WorkThreadPool *p) : pool(p) {}
+ ThreadPool *pool;
+ WorkThread(ThreadPool *p) : pool(p) {}
void *entry() {
- pool->entry();
+ pool->worker();
return 0;
}
};
set<WorkThread*> _threads;
int processing;
-
- void entry() {
- _lock.Lock();
- //generic_dout(0) << "entry start" << dendl;
- while (!_stop) {
- if (!_pause && work_queues.size()) {
- _WorkQueue *wq;
- int tries = work_queues.size();
- bool did = false;
- while (tries--) {
- last_work_queue++;
- last_work_queue %= work_queues.size();
- wq = work_queues[last_work_queue];
-
- processing++;
- //generic_dout(0) << "entry trying wq " << wq->name << dendl;
- did = wq->_try_process();
- processing--;
- //if (did) generic_dout(0) << "entry did wq " << wq->name << dendl;
- if (did && _pause)
- _wait_cond.Signal();
- if (did)
- break;
- }
- if (did)
- continue;
- }
- //generic_dout(0) << "entry waiting" << dendl;
- _cond.Wait(_lock);
- }
- //generic_dout(0) << "entry finish" << dendl;
- _lock.Unlock();
- }
+ void worker();
public:
- WorkThreadPool(string name, int n=1) :
+ ThreadPool(string nm, int n=1) :
+ name(nm),
_lock((new string(name + "::lock"))->c_str()), // deliberately leak this
_stop(false),
_pause(false),
processing(0) {
set_num_threads(n);
}
- ~WorkThreadPool() {
+ ~ThreadPool() {
for (set<WorkThread*>::iterator p = _threads.begin();
p != _threads.end();
p++)
}
}
- void start() {
- for (set<WorkThread*>::iterator p = _threads.begin();
- p != _threads.end();
- p++)
- (*p)->create();
- }
- void stop(bool clear_after=true) {
- _lock.Lock();
- _stop = true;
- _cond.Signal();
- _lock.Unlock();
- for (set<WorkThread*>::iterator p = _threads.begin();
- p != _threads.end();
- p++)
- (*p)->join();
- _lock.Lock();
- for (unsigned i=0; i<work_queues.size(); i++)
- work_queues[i]->_clear();
- _lock.Unlock();
- }
void kick() {
_lock.Lock();
_cond.Signal();
_lock.Unlock();
}
- void pause() {
- _lock.Lock();
- assert(!_pause);
- _pause = true;
- while (processing)
- _wait_cond.Wait(_lock);
- _lock.Unlock();
- }
- void pause_new() {
- _lock.Lock();
- assert(!_pause);
- _pause = true;
- _lock.Unlock();
- }
-
- void unpause() {
- _lock.Lock();
- assert(_pause);
- _pause = false;
- _cond.Signal();
- _lock.Unlock();
- }
-
+ void start();
+ void stop(bool clear_after=true);
+ void pause();
+ void pause_new();
+ void unpause();
};
debug_ms: 0,
debug_mon: 1,
debug_paxos: 0,
+ debug_tp: 0,
debug_after: 0,
g_conf.debug_paxos = atoi(args[++i]);
else
g_debug_after_conf.debug_paxos = atoi(args[++i]);
+ else if (strcmp(args[i], "--debug_tp") == 0)
+ if (!g_conf.debug_after)
+ g_conf.debug_tp = atoi(args[++i]);
+ else
+ g_debug_after_conf.debug_tp = atoi(args[++i]);
else if (strcmp(args[i], "--debug_after") == 0) {
g_conf.debug_after = atoi(args[++i]);
int debug_ms;
int debug_mon;
int debug_paxos;
+ int debug_tp;
int debug_after;
#endif
#include <iostream>
+#include <istream>
#include <iomanip>
#include <list>
+#include <string>
+
+using std::istream;
+using std::string;
#include "atomic.h"
#include "page.h"
#include "ObjectStore.h"
#include "JournalingObjectStore.h"
-#include "common/ThreadPool.h"
#include "common/Mutex.h"
#include "Fake.h"
#include "common/Mutex.h"
#include "common/RWLock.h"
-#include "common/ThreadPool.h"
#include "common/Timer.h"
#include "common/WorkQueue.h"
#include "common/LogClient.h"
private:
- WorkThreadPool op_tp;
- WorkThreadPool recovery_tp;
- WorkThreadPool disk_tp;
+ ThreadPool op_tp;
+ ThreadPool recovery_tp;
+ ThreadPool disk_tp;
// -- op queue --
deque<PG*> op_queue;
- struct OpWQ : public WorkThreadPool::WorkQueue<PG> {
+ struct OpWQ : public ThreadPool::WorkQueue<PG> {
OSD *osd;
- OpWQ(OSD *o, WorkThreadPool *tp) : WorkThreadPool::WorkQueue<PG>("OSD::OpWQ", tp), osd(o) {}
+ OpWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::OpWQ", tp), osd(o) {}
bool _enqueue(PG *pg) {
pg->get();
utime_t defer_recovery_until;
int recovery_ops_active;
- struct RecoveryWQ : public WorkThreadPool::WorkQueue<PG> {
+ struct RecoveryWQ : public ThreadPool::WorkQueue<PG> {
OSD *osd;
- RecoveryWQ(OSD *o, WorkThreadPool *tp) : WorkThreadPool::WorkQueue<PG>("OSD::RecoveryWQ", tp), osd(o) {}
+ RecoveryWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::RecoveryWQ", tp), osd(o) {}
bool _enqueue(PG *pg) {
if (!pg->recovery_item.get_xlist()) {
// -- snap trimming --
xlist<PG*> snap_trim_queue;
- struct SnapTrimWQ : public WorkThreadPool::WorkQueue<PG> {
+ struct SnapTrimWQ : public ThreadPool::WorkQueue<PG> {
OSD *osd;
- SnapTrimWQ(OSD *o, WorkThreadPool *tp) : WorkThreadPool::WorkQueue<PG>("OSD::SnapTrimWQ", tp), osd(o) {}
+ SnapTrimWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::SnapTrimWQ", tp), osd(o) {}
bool _enqueue(PG *pg) {
if (pg->snap_trim_item.is_on_xlist())
// -- scrubbing --
xlist<PG*> scrub_queue;
- struct ScrubWQ : public WorkThreadPool::WorkQueue<PG> {
+ struct ScrubWQ : public ThreadPool::WorkQueue<PG> {
OSD *osd;
- ScrubWQ(OSD *o, WorkThreadPool *tp) : WorkThreadPool::WorkQueue<PG>("OSD::ScrubWQ", tp), osd(o) {}
+ ScrubWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::ScrubWQ", tp), osd(o) {}
bool _enqueue(PG *pg) {
if (pg->scrub_item.is_on_xlist())