From 92731357dfa2cb680e37df85af48640a7fcc8b62 Mon Sep 17 00:00:00 2001 From: sbrandt Date: Wed, 22 Jun 2005 05:43:36 +0000 Subject: [PATCH] *** empty log message *** git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@333 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/common/Semaphore.h | 5 ++- ceph/{osd => common}/ThreadPool.h | 67 ++++++++++--------------------- ceph/osd/tp.cc | 19 +++++---- 3 files changed, 37 insertions(+), 54 deletions(-) rename ceph/{osd => common}/ThreadPool.h (58%) diff --git a/ceph/common/Semaphore.h b/ceph/common/Semaphore.h index f8821f7c686ae..dc407183d13f7 100644 --- a/ceph/common/Semaphore.h +++ b/ceph/common/Semaphore.h @@ -16,7 +16,8 @@ class Semaphore count = 0; } - void Put() { + void Put() + { m.Lock(); count++; c.Signal(); @@ -27,7 +28,7 @@ class Semaphore { m.Lock(); while(count <= 0) { - C.Wait(m); + c.Wait(m); } count--; m.Unlock(); diff --git a/ceph/osd/ThreadPool.h b/ceph/common/ThreadPool.h similarity index 58% rename from ceph/osd/ThreadPool.h rename to ceph/common/ThreadPool.h index 70e7610b6ca15..a91153929f5d8 100644 --- a/ceph/osd/ThreadPool.h +++ b/ceph/common/ThreadPool.h @@ -2,32 +2,15 @@ #define THREADPOOL #include -#include #include #include +#include +#include using namespace std; #define MAX_THREADS 1000 -class Semaphore { - sem_t sem; - - public: - - void init(int i) { - sem_init(&sem, 0, i); - } - - void get() { - sem_wait(&sem); - } - - void put() { - sem_post(&sem); - } -}; - template class ThreadPool { @@ -35,22 +18,24 @@ class ThreadPool { queue q; Mutex q_lock; Semaphore q_sem; - pthread_t thread[MAX_THREADS]; + pthread_t *thread; int num_ops; int num_threads; void (*func)(T*); - static void *foo(void *arg) { + static void *foo(void *arg) + { ThreadPool *t = (ThreadPool *)arg; t->do_ops(arg); } - void * do_ops(void *nothing) { + void * do_ops(void *nothing) + { T* op; cout << "Thread ready for action\n"; while(1) { - q_sem.get(); + q_sem.Get(); op = get_op(); if(op == NULL) { @@ -62,7 +47,8 @@ class ThreadPool { } - T* get_op() { + T* get_op() + { T* op; q_lock.Lock(); @@ -75,45 +61,36 @@ class ThreadPool { } public: - ThreadPool(int howmany, void (*f)(T*)) { - num_ops = 0; - num_threads = 0; + ThreadPool(int howmany, void (*f)(T*)) + { int status; + num_ops = 0; func = f; - num_threads = howmany; - - q_sem.init(0); + thread = new pthread_t[num_threads]; for(int i = 0; i < howmany; i++) { status = pthread_create(&thread[i], NULL, (void*(*)(void *))&ThreadPool::foo, this); } } - ~ThreadPool() { - q_lock.Lock(); - for(int i = num_ops; i > 0; i--) - get_op(); - + ~ThreadPool() + { for(int i = 0; i < num_threads; i++) { - put_op((T*)NULL); + cout << "Killing thread " << i << "\n"; + pthread_cancel(thread[i]); } - q_lock.Unlock(); - - for(int i = 0; i < num_threads; i++) { - cout << "Waiting for thread " << i << " to die\n"; - pthread_join(thread[i], NULL); - } - + delete thread; } - void put_op(T* op) { + void put_op(T* op) + { q_lock.Lock(); q.push(op); num_ops++; - q_sem.put(); + q_sem.Put(); q_lock.Unlock(); } diff --git a/ceph/osd/tp.cc b/ceph/osd/tp.cc index e817e26ed1cd2..7968878e65034 100644 --- a/ceph/osd/tp.cc +++ b/ceph/osd/tp.cc @@ -1,43 +1,48 @@ #include #include -#include using namespace std; #include "common/Mutex.h" #include "osd/ThreadPool.h" +// #include class Op { int i; public: - Op(int i) { + Op(int i) + { this->i = i; } - int get() { + int get() + { return i; } }; -void foo(Op *o) { +void foo(Op *o) +{ cout << "Thread "<< pthread_self() << ": " << o->get() << "\n"; usleep(1); + + // sched_yield(); } -int main(int argc, char *argv) { +int main(int argc, char *argv) +{ ThreadPool *t = new ThreadPool(10, foo); - sleep(1); - for(int i = 0; i < 100; i++) { Op *o = new Op(i); t->put_op(o); } sleep(1); + delete(t); return 0; -- 2.39.5