+++ /dev/null
-#include "include/types.h"
-
-#include "OSD.h"
-#include "FakeStore.h"
-#include "OSDCluster.h"
-
-#include "mds/MDS.h"
-
-#include "msg/Messenger.h"
-#include "msg/Message.h"
-
-#include "msg/HostMonitor.h"
-
-#include "messages/MGenericMessage.h"
-#include "messages/MPing.h"
-#include "messages/MPingAck.h"
-#include "messages/MOSDOp.h"
-#include "messages/MOSDOpReply.h"
-#include "messages/MOSDGetClusterAck.h"
-
-#include "common/Logger.h"
-#include "common/LogType.h"
-#include "common/Mutex.h"
-
-#include "OSD/ThreadPool.h"
-
-#include <queue>
-
-#include <iostream>
-#include <cassert>
-#include <errno.h>
-#include <sys/stat.h>
-
-void main(int argc, char *argv) {
- ThreadPool t(10);
-
-}
-
-ThreadPool::Threadpool(int howmany) {
- num_ops = 0;
- num_threads = 0;
-
- int status;
-
- num_threads = howmany;
-
- for(int i = 0; i < howmany; i++) {
- status = pthread_create(thread[i], NULL, do_ops, (void *)&i);
- }
-}
-
-ThreadPool::~Threadpool() {
- queue_lock.Lock();
- for(int i = num_ops; i > 0; i--)
- get_op();
-
- for(int i = 0; i < num_threads; i++) {
- put_op((MOSDOp *)NULL);
- }
-
- for(int i = 0; i < num_threads; i++) {
- cout << "Waiting for thread " << i << " to die";
- pthread_join(threads[i]);
- }
-
- queue_lock.Unlock();
-}
-
-void do_ops(void *whoami) {
- MOSDOp *op;
-
- cout << "Thread " << (int)i << " ready for action\n";
- while(1) {
- op = get_op();
-
- if(op == NULL) {
- cout << "Thread " << (int)i << " dying";
- pthread_exit(0);
- }
-
- OSD.do_op(op);
- }
-}
-
-MOSDOp *get_op() {
- MOSDOp *op;
- queue_lock.Lock();
- op = op_queue.pop();
- num_ops--;
- queue_lock.Unlock();
-}
-
-void put_op(MOSDOp *op) {
- queue_lock.Lock();
- opqueue.push(op);
- num_ops++;
- queue_lock.Unlock();
-}
-
+#ifndef THREADPOOL
+#define THREADPOOL
+
+#include <queue>
+#include<semaphore.h>
+#include <pthread.h>
+
+using namespace std;
+
#define MAX_THREADS 1000
+class Semaphore {
+ sem_t sem;
+
+ public:
+
+ Semaphore(int i) {
+ sem_init(&sem, 0, i);
+ }
+
+ void get() {
+ sem_wait(&sem);
+ }
+
+ void put() {
+ sem_post(&sem);
+ }
+};
+
+template <class T>
class ThreadPool {
- queue<MOSDOp *> op_queue;
- Mutex queue_lock;
+
+ private:
+ queue<T *> q;
+ Semaphore q_lock(1);
+ Semaphore q_sem(0);
pthread_t thread[MAX_THREADS];
int num_ops;
int num_threads;
+ void (*func)(T*);
+
+ static void *foo(void *arg) {
+ ThreadPool *t = (ThreadPool *)arg;
+ t->do_ops(arg);
+ }
+
+ void * do_ops(void *nothing) {
+ T* op;
+
+ cout << "Thread ready for action\n";
+ while(1) {
+ q_sem.get();
+ op = get_op();
+
+ if(op == NULL) {
+ cout << "Thread exiting\n";
+ pthread_exit(0);
+ }
+ func(op);
+ }
+ }
+
+
+ T* get_op() {
+ T* op;
+
+ q_lock.get();
+ op = q.front();
+ q.pop();
+ num_ops--;
+ q_lock.put();
+
+ return op;
+ }
+
+ public:
+ ThreadPool(int howmany, void (*f)(T*)) {
+ num_ops = 0;
+ num_threads = 0;
+
+ int status;
+
+ func = f;
+
+ num_threads = howmany;
+
+ for(int i = 0; i < howmany; i++) {
+ status = pthread_create(&thread[i], NULL, (void*(*)(void *))&ThreadPool::foo, this);
+ }
+ }
+
+ ~ThreadPool() {
+ q_lock.get();
+ for(int i = num_ops; i > 0; i--)
+ get_op();
- ThreadPool::Threadpool(int howmany);
+ for(int i = 0; i < num_threads; i++) {
+ put_op((T*)NULL);
+ }
+ q_lock.put();
- ThreadPool::~Threadpool();
+ for(int i = 0; i < num_threads; i++) {
+ cout << "Waiting for thread " << i << " to die\n";
+ pthread_join(thread[i], NULL);
+ }
- void put_op(MOSDOp *op);
+ }
- void do_ops(void *whoami);
+ void put_op(T* op) {
+ q_lock.get();
+ q.push(op);
+ num_ops++;
+ q_sem.put();
+ q_lock.put();
+ }
- MOSDOp *get_op();
-}
+};
+#endif
--- /dev/null
+
+#include <iostream>
+#include <string>
+#include <stdlib.h>
+
+using namespace std;
+
+#include "common/Mutex.h"
+#include "osd/ThreadPool.h"
+#include <thread.h>
+
+class Op {
+ int i;
+
+public:
+
+ Op(int i) {
+ this->i = i;
+ }
+
+ int get() {
+ return i;
+ }
+};
+
+void foo(Op *o) {
+ cout << "Thread "<< thr_self() << ": " << o->get() << "\n";
+ usleep(1);
+}
+
+int main(int argc, char *argv) {
+ ThreadPool<Op> *t = new ThreadPool<Op>(10, foo);
+
+ sleep(1);
+
+ for(int i = 0; i < 100; i++) {
+ Op *o = new Op(i);
+ t->put_op(o);
+ }
+
+ sleep(1);
+ delete(t);
+
+ return 0;
+}
+