count = 0;
}
- void Put() {
+ void Put()
+ {
m.Lock();
count++;
c.Signal();
{
m.Lock();
while(count <= 0) {
- C.Wait(m);
+ c.Wait(m);
}
count--;
m.Unlock();
--- /dev/null
+#ifndef THREADPOOL
+#define THREADPOOL
+
+#include <queue>
+#include <pthread.h>
+#include <common/Mutex.h>
+#include <common/Cond.h>
+#include<common/Semaphore.h>
+
+using namespace std;
+
+#define MAX_THREADS 1000
+
+template <class T>
+class ThreadPool {
+
+ private:
+ queue<T *> q;
+ Mutex q_lock;
+ Semaphore q_sem;
+ pthread_t *thread;
+ int num_ops;
+ int num_threads;
+ void (*func)(T*);
+
+ static void *foo(void *arg)
+ {
+ ThreadPool *t = (ThreadPool *)arg;
+ t->do_ops(arg);
+ }
+
+ void * do_ops(void *nothing)
+ {
+ T* op;
+
+ cout << "Thread ready for action\n";
+ while(1) {
+ q_sem.Get();
+ op = get_op();
+
+ if(op == NULL) {
+ cout << "Thread exiting\n";
+ pthread_exit(0);
+ }
+ func(op);
+ }
+ }
+
+
+ T* get_op()
+ {
+ T* op;
+
+ q_lock.Lock();
+ op = q.front();
+ q.pop();
+ num_ops--;
+ q_lock.Unlock();
+
+ return op;
+ }
+
+ public:
+
+ ThreadPool(int howmany, void (*f)(T*))
+ {
+ int status;
+
+ num_ops = 0;
+ func = f;
+ num_threads = howmany;
+ thread = new pthread_t[num_threads];
+
+ for(int i = 0; i < howmany; i++) {
+ status = pthread_create(&thread[i], NULL, (void*(*)(void *))&ThreadPool::foo, this);
+ }
+ }
+
+ ~ThreadPool()
+ {
+ for(int i = 0; i < num_threads; i++) {
+ cout << "Killing thread " << i << "\n";
+ pthread_cancel(thread[i]);
+ }
+ delete thread;
+ }
+
+ void put_op(T* op)
+ {
+ q_lock.Lock();
+ q.push(op);
+ num_ops++;
+ q_sem.Put();
+ q_lock.Unlock();
+ }
+
+};
+#endif
+++ /dev/null
-#ifndef THREADPOOL
-#define THREADPOOL
-
-#include <queue>
-#include<semaphore.h>
-#include <pthread.h>
-#include <common/Mutex.h>
-
-using namespace std;
-
-#define MAX_THREADS 1000
-
-class Semaphore {
- sem_t sem;
-
- public:
-
- void init(int i) {
- sem_init(&sem, 0, i);
- }
-
- void get() {
- sem_wait(&sem);
- }
-
- void put() {
- sem_post(&sem);
- }
-};
-
-template <class T>
-class ThreadPool {
-
- private:
- queue<T *> q;
- Mutex q_lock;
- Semaphore q_sem;
- pthread_t thread[MAX_THREADS];
- int num_ops;
- int num_threads;
- void (*func)(T*);
-
- static void *foo(void *arg) {
- ThreadPool *t = (ThreadPool *)arg;
- t->do_ops(arg);
- }
-
- void * do_ops(void *nothing) {
- T* op;
-
- cout << "Thread ready for action\n";
- while(1) {
- q_sem.get();
- op = get_op();
-
- if(op == NULL) {
- cout << "Thread exiting\n";
- pthread_exit(0);
- }
- func(op);
- }
- }
-
-
- T* get_op() {
- T* op;
-
- q_lock.Lock();
- op = q.front();
- q.pop();
- num_ops--;
- q_lock.Unlock();
-
- return op;
- }
-
- public:
- ThreadPool(int howmany, void (*f)(T*)) {
- num_ops = 0;
- num_threads = 0;
-
- int status;
-
- func = f;
-
- num_threads = howmany;
-
- q_sem.init(0);
-
- for(int i = 0; i < howmany; i++) {
- status = pthread_create(&thread[i], NULL, (void*(*)(void *))&ThreadPool::foo, this);
- }
- }
-
- ~ThreadPool() {
- q_lock.Lock();
- for(int i = num_ops; i > 0; i--)
- get_op();
-
- for(int i = 0; i < num_threads; i++) {
- put_op((T*)NULL);
- }
- q_lock.Unlock();
-
- for(int i = 0; i < num_threads; i++) {
- cout << "Waiting for thread " << i << " to die\n";
- pthread_join(thread[i], NULL);
- }
-
- }
-
- void put_op(T* op) {
- q_lock.Lock();
- q.push(op);
- num_ops++;
- q_sem.put();
- q_lock.Unlock();
- }
-
-};
-#endif
#include <iostream>
#include <string>
-#include <stdlib.h>
using namespace std;
#include "common/Mutex.h"
#include "osd/ThreadPool.h"
+// #include <thread.h>
class Op {
int i;
public:
- Op(int i) {
+ Op(int i)
+ {
this->i = i;
}
- int get() {
+ int get()
+ {
return i;
}
};
-void foo(Op *o) {
+void foo(Op *o)
+{
cout << "Thread "<< pthread_self() << ": " << o->get() << "\n";
usleep(1);
+
+ // sched_yield();
}
-int main(int argc, char *argv) {
+int main(int argc, char *argv)
+{
ThreadPool<Op> *t = new ThreadPool<Op>(10, foo);
- sleep(1);
-
for(int i = 0; i < 100; i++) {
Op *o = new Op(i);
t->put_op(o);
}
sleep(1);
+
delete(t);
return 0;