#include <queue>
#include<semaphore.h>
#include <pthread.h>
+#include <common/Mutex.h>
using namespace std;
public:
- Semaphore(int i) {
+ void init(int i) {
sem_init(&sem, 0, i);
}
private:
queue<T *> q;
- Semaphore q_lock(1);
- Semaphore q_sem(0);
+ Mutex q_lock;
+ Semaphore q_sem;
pthread_t thread[MAX_THREADS];
int num_ops;
int num_threads;
T* get_op() {
T* op;
- q_lock.get();
+ q_lock.Lock();
op = q.front();
q.pop();
num_ops--;
- q_lock.put();
+ q_lock.Unlock();
return op;
}
num_threads = howmany;
+ q_sem.init(0);
+
for(int i = 0; i < howmany; i++) {
status = pthread_create(&thread[i], NULL, (void*(*)(void *))&ThreadPool::foo, this);
}
}
~ThreadPool() {
- q_lock.get();
+ q_lock.Lock();
for(int i = num_ops; i > 0; i--)
get_op();
for(int i = 0; i < num_threads; i++) {
put_op((T*)NULL);
}
- q_lock.put();
+ q_lock.Unlock();
for(int i = 0; i < num_threads; i++) {
cout << "Waiting for thread " << i << " to die\n";
}
void put_op(T* op) {
- q_lock.get();
+ q_lock.Lock();
q.push(op);
num_ops++;
q_sem.put();
- q_lock.put();
+ q_lock.Unlock();
}
};
#include "common/Mutex.h"
#include "osd/ThreadPool.h"
-#include <thread.h>
class Op {
int i;
};
void foo(Op *o) {
- cout << "Thread "<< thr_self() << ": " << o->get() << "\n";
+ cout << "Thread "<< pthread_self() << ": " << o->get() << "\n";
usleep(1);
}