From: sbrandt Date: Wed, 22 Jun 2005 05:43:36 +0000 (+0000) Subject: *** empty log message *** X-Git-Tag: v0.1~2048 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=92731357dfa2cb680e37df85af48640a7fcc8b62;p=ceph.git *** empty log message *** git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@333 29311d96-e01e-0410-9327-a35deaab8ce9 --- diff --git a/ceph/common/Semaphore.h b/ceph/common/Semaphore.h index f8821f7c686a..dc407183d13f 100644 --- a/ceph/common/Semaphore.h +++ b/ceph/common/Semaphore.h @@ -16,7 +16,8 @@ class Semaphore count = 0; } - void Put() { + void Put() + { m.Lock(); count++; c.Signal(); @@ -27,7 +28,7 @@ class Semaphore { m.Lock(); while(count <= 0) { - C.Wait(m); + c.Wait(m); } count--; m.Unlock(); diff --git a/ceph/common/ThreadPool.h b/ceph/common/ThreadPool.h new file mode 100644 index 000000000000..a91153929f5d --- /dev/null +++ b/ceph/common/ThreadPool.h @@ -0,0 +1,98 @@ +#ifndef THREADPOOL +#define THREADPOOL + +#include +#include +#include +#include +#include + +using namespace std; + +#define MAX_THREADS 1000 + +template +class ThreadPool { + + private: + queue q; + Mutex q_lock; + Semaphore q_sem; + pthread_t *thread; + int num_ops; + int num_threads; + void (*func)(T*); + + static void *foo(void *arg) + { + ThreadPool *t = (ThreadPool *)arg; + t->do_ops(arg); + } + + void * do_ops(void *nothing) + { + T* op; + + cout << "Thread ready for action\n"; + while(1) { + q_sem.Get(); + op = get_op(); + + if(op == NULL) { + cout << "Thread exiting\n"; + pthread_exit(0); + } + func(op); + } + } + + + T* get_op() + { + T* op; + + q_lock.Lock(); + op = q.front(); + q.pop(); + num_ops--; + q_lock.Unlock(); + + return op; + } + + public: + + ThreadPool(int howmany, void (*f)(T*)) + { + int status; + + num_ops = 0; + func = f; + num_threads = howmany; + thread = new pthread_t[num_threads]; + + for(int i = 0; i < howmany; i++) { + status = pthread_create(&thread[i], NULL, (void*(*)(void *))&ThreadPool::foo, this); + } + } + + ~ThreadPool() + { + for(int i = 0; i < num_threads; i++) { + cout << "Killing thread " << i << "\n"; + pthread_cancel(thread[i]); + } + delete thread; + } + + void put_op(T* op) + { + q_lock.Lock(); + q.push(op); + num_ops++; + q_sem.Put(); + q_lock.Unlock(); + } + +}; +#endif diff --git a/ceph/osd/ThreadPool.h b/ceph/osd/ThreadPool.h deleted file mode 100644 index 70e7610b6ca1..000000000000 --- a/ceph/osd/ThreadPool.h +++ /dev/null @@ -1,121 +0,0 @@ -#ifndef THREADPOOL -#define THREADPOOL - -#include -#include -#include -#include - -using namespace std; - -#define MAX_THREADS 1000 - -class Semaphore { - sem_t sem; - - public: - - void init(int i) { - sem_init(&sem, 0, i); - } - - void get() { - sem_wait(&sem); - } - - void put() { - sem_post(&sem); - } -}; - -template -class ThreadPool { - - private: - queue q; - Mutex q_lock; - Semaphore q_sem; - pthread_t thread[MAX_THREADS]; - int num_ops; - int num_threads; - void (*func)(T*); - - static void *foo(void *arg) { - ThreadPool *t = (ThreadPool *)arg; - t->do_ops(arg); - } - - void * do_ops(void *nothing) { - T* op; - - cout << "Thread ready for action\n"; - while(1) { - q_sem.get(); - op = get_op(); - - if(op == NULL) { - cout << "Thread exiting\n"; - pthread_exit(0); - } - func(op); - } - } - - - T* get_op() { - T* op; - - q_lock.Lock(); - op = q.front(); - q.pop(); - num_ops--; - q_lock.Unlock(); - - return op; - } - - public: - ThreadPool(int howmany, void (*f)(T*)) { - num_ops = 0; - num_threads = 0; - - int status; - - func = f; - - num_threads = howmany; - - q_sem.init(0); - - for(int i = 0; i < howmany; i++) { - status = pthread_create(&thread[i], NULL, (void*(*)(void *))&ThreadPool::foo, this); - } - } - - ~ThreadPool() { - q_lock.Lock(); - for(int i = num_ops; i > 0; i--) - get_op(); - - for(int i = 0; i < num_threads; i++) { - put_op((T*)NULL); - } - q_lock.Unlock(); - - for(int i = 0; i < num_threads; i++) { - cout << "Waiting for thread " << i << " to die\n"; - pthread_join(thread[i], NULL); - } - - } - - void put_op(T* op) { - q_lock.Lock(); - q.push(op); - num_ops++; - q_sem.put(); - q_lock.Unlock(); - } - -}; -#endif diff --git a/ceph/osd/tp.cc b/ceph/osd/tp.cc index e817e26ed1cd..7968878e6503 100644 --- a/ceph/osd/tp.cc +++ b/ceph/osd/tp.cc @@ -1,43 +1,48 @@ #include #include -#include using namespace std; #include "common/Mutex.h" #include "osd/ThreadPool.h" +// #include class Op { int i; public: - Op(int i) { + Op(int i) + { this->i = i; } - int get() { + int get() + { return i; } }; -void foo(Op *o) { +void foo(Op *o) +{ cout << "Thread "<< pthread_self() << ": " << o->get() << "\n"; usleep(1); + + // sched_yield(); } -int main(int argc, char *argv) { +int main(int argc, char *argv) +{ ThreadPool *t = new ThreadPool(10, foo); - sleep(1); - for(int i = 0; i < 100; i++) { Op *o = new Op(i); t->put_op(o); } sleep(1); + delete(t); return 0;