]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
*** empty log message ***
authorsbrandt <sbrandt@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 22 Jun 2005 05:43:36 +0000 (05:43 +0000)
committersbrandt <sbrandt@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 22 Jun 2005 05:43:36 +0000 (05:43 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@333 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/common/Semaphore.h
ceph/common/ThreadPool.h [new file with mode: 0644]
ceph/osd/ThreadPool.h [deleted file]
ceph/osd/tp.cc

index f8821f7c686ae152566de84cb523d76442bd8c46..dc407183d13f7adf99275cebe036beb6212d9520 100644 (file)
@@ -16,7 +16,8 @@ class Semaphore
     count = 0;
   }
 
-  void Put()  { 
+  void Put()
+  { 
     m.Lock();
     count++;
     c.Signal();
@@ -27,7 +28,7 @@ class Semaphore
   { 
     m.Lock();
     while(count <= 0) {
-      C.Wait(m);
+      c.Wait(m);
     }
     count--;
     m.Unlock();
diff --git a/ceph/common/ThreadPool.h b/ceph/common/ThreadPool.h
new file mode 100644 (file)
index 0000000..a911539
--- /dev/null
@@ -0,0 +1,98 @@
+#ifndef THREADPOOL
+#define THREADPOOL
+
+#include <queue>
+#include <pthread.h>
+#include <common/Mutex.h>
+#include <common/Cond.h>
+#include<common/Semaphore.h>
+
+using namespace std;
+#define MAX_THREADS 1000
+
+template <class T>
+class ThreadPool {
+
+ private:
+  queue<T *> q;
+  Mutex q_lock;
+  Semaphore q_sem;
+  pthread_t *thread;
+  int num_ops;
+  int num_threads;
+  void (*func)(T*);
+
+  static void *foo(void *arg)
+  {
+    ThreadPool *t = (ThreadPool *)arg;
+    t->do_ops(arg);
+  }
+
+  void * do_ops(void *nothing)
+  {
+    T* op;
+
+    cout << "Thread ready for action\n";
+    while(1) {
+      q_sem.Get();
+      op = get_op();
+
+      if(op == NULL) {
+       cout << "Thread exiting\n";
+       pthread_exit(0);
+      }
+      func(op);
+    }
+  }
+
+
+  T* get_op()
+  {
+    T* op;
+
+    q_lock.Lock();
+    op = q.front();
+    q.pop();
+    num_ops--;
+    q_lock.Unlock();
+
+    return op;
+  }
+
+ public:
+
+  ThreadPool(int howmany, void (*f)(T*))
+  {
+    int status;
+
+    num_ops = 0;
+    func = f;
+    num_threads = howmany;
+    thread = new pthread_t[num_threads];
+
+    for(int i = 0; i < howmany; i++) {
+      status = pthread_create(&thread[i], NULL, (void*(*)(void *))&ThreadPool::foo, this);
+    }
+  }
+
+  ~ThreadPool()
+  {
+    for(int i = 0; i < num_threads; i++) {
+      cout << "Killing thread " << i << "\n";
+      pthread_cancel(thread[i]);
+    }
+    delete thread;
+  }
+
+  void put_op(T* op)
+  {
+    q_lock.Lock();
+    q.push(op);
+    num_ops++;
+    q_sem.Put();
+    q_lock.Unlock();
+  }
+
+};
+#endif
diff --git a/ceph/osd/ThreadPool.h b/ceph/osd/ThreadPool.h
deleted file mode 100644 (file)
index 70e7610..0000000
+++ /dev/null
@@ -1,121 +0,0 @@
-#ifndef THREADPOOL
-#define THREADPOOL
-
-#include <queue>
-#include<semaphore.h>
-#include <pthread.h>
-#include <common/Mutex.h>
-
-using namespace std;
-#define MAX_THREADS 1000
-
-class Semaphore {
-  sem_t sem;
-
- public:
-
-  void init(int i) {
-    sem_init(&sem, 0, i);
-  }
-
-  void get() {
-    sem_wait(&sem);
-  }
-
-  void put() {
-    sem_post(&sem);
-  }
-};
-
-template <class T>
-class ThreadPool {
-
- private:
-  queue<T *> q;
-  Mutex q_lock;
-  Semaphore q_sem;
-  pthread_t thread[MAX_THREADS];
-  int num_ops;
-  int num_threads;
-  void (*func)(T*);
-
-  static void *foo(void *arg) {
-    ThreadPool *t = (ThreadPool *)arg;
-    t->do_ops(arg);
-  }
-
-  void * do_ops(void *nothing) {
-    T* op;
-
-    cout << "Thread ready for action\n";
-    while(1) {
-      q_sem.get();
-      op = get_op();
-
-      if(op == NULL) {
-       cout << "Thread exiting\n";
-       pthread_exit(0);
-      }
-      func(op);
-    }
-  }
-
-
-  T* get_op() {
-    T* op;
-
-    q_lock.Lock();
-    op = q.front();
-    q.pop();
-    num_ops--;
-    q_lock.Unlock();
-
-    return op;
-  }
-
- public:
-  ThreadPool(int howmany, void (*f)(T*)) {
-    num_ops = 0;
-    num_threads = 0;
-
-    int status;
-
-    func = f;
-
-    num_threads = howmany;
-
-    q_sem.init(0);
-
-    for(int i = 0; i < howmany; i++) {
-      status = pthread_create(&thread[i], NULL, (void*(*)(void *))&ThreadPool::foo, this);
-    }
-  }
-
-  ~ThreadPool() {
-    q_lock.Lock();
-    for(int i = num_ops; i > 0; i--) 
-      get_op();
-
-    for(int i = 0; i < num_threads; i++) {
-      put_op((T*)NULL);
-    }
-    q_lock.Unlock();
-
-    for(int i = 0; i < num_threads; i++) {
-      cout << "Waiting for thread " << i << " to die\n";
-      pthread_join(thread[i], NULL);
-    }
-
-  }
-
-  void put_op(T* op) {
-    q_lock.Lock();
-    q.push(op);
-    num_ops++;
-    q_sem.put();
-    q_lock.Unlock();
-  }
-
-};
-#endif
index e817e26ed1cd2c269eba55f39257fa14eab9f397..7968878e650341f7d347518bbda7814e358409f0 100644 (file)
@@ -1,43 +1,48 @@
 
 #include <iostream>
 #include <string>
-#include <stdlib.h>
 
 using namespace std;
 
 #include "common/Mutex.h"
 #include "osd/ThreadPool.h"
+// #include <thread.h>
 
 class Op {
   int i;
 
 public:
  
-  Op(int i) {
+  Op(int i)
+  {
     this->i = i;
   }
 
-  int get() {
+  int get()
+  {
     return i;
   }
 };
 
-void foo(Op *o) {
+void foo(Op *o)
+{
   cout << "Thread "<< pthread_self() << ": " << o->get() << "\n";
   usleep(1);
+  
+  //  sched_yield();
 }
 
-int main(int argc, char *argv) {
+int main(int argc, char *argv)
+{
   ThreadPool<Op> *t = new ThreadPool<Op>(10, foo);
 
-  sleep(1);
-
   for(int i = 0; i < 100; i++) {
     Op *o = new Op(i); 
     t->put_op(o);
   }
 
   sleep(1);
+
   delete(t);
 
   return 0;