]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
*** empty log message ***
authorsbrandt <sbrandt@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 22 Jun 2005 03:13:02 +0000 (03:13 +0000)
committersbrandt <sbrandt@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 22 Jun 2005 03:13:02 +0000 (03:13 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@329 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/osd/ThreadPool.cc [deleted file]
ceph/osd/ThreadPool.h
ceph/osd/tp.cc [new file with mode: 0644]

diff --git a/ceph/osd/ThreadPool.cc b/ceph/osd/ThreadPool.cc
deleted file mode 100644 (file)
index 19523df..0000000
+++ /dev/null
@@ -1,99 +0,0 @@
-#include "include/types.h"
-
-#include "OSD.h"
-#include "FakeStore.h"
-#include "OSDCluster.h"
-
-#include "mds/MDS.h"
-
-#include "msg/Messenger.h"
-#include "msg/Message.h"
-
-#include "msg/HostMonitor.h"
-
-#include "messages/MGenericMessage.h"
-#include "messages/MPing.h"
-#include "messages/MPingAck.h"
-#include "messages/MOSDOp.h"
-#include "messages/MOSDOpReply.h"
-#include "messages/MOSDGetClusterAck.h"
-
-#include "common/Logger.h"
-#include "common/LogType.h"
-#include "common/Mutex.h"
-
-#include "OSD/ThreadPool.h"
-
-#include <queue>
-
-#include <iostream>
-#include <cassert>
-#include <errno.h>
-#include <sys/stat.h>
-
-void main(int argc, char *argv) {
-  ThreadPool t(10);
-
-}
-
-ThreadPool::Threadpool(int howmany) {
-  num_ops = 0;
-  num_threads = 0;
-
-  int status;
-
-  num_threads = howmany;
-
-  for(int i = 0; i < howmany; i++) {
-    status = pthread_create(thread[i], NULL, do_ops, (void *)&i);
-  }
-}
-
-ThreadPool::~Threadpool() {
-  queue_lock.Lock();
-  for(int i = num_ops; i > 0; i--) 
-    get_op();
-
-  for(int i = 0; i < num_threads; i++) {
-    put_op((MOSDOp *)NULL);
-  }
-
-  for(int i = 0; i < num_threads; i++) {
-    cout << "Waiting for thread " << i << " to die";
-    pthread_join(threads[i]);
-  }
-
-  queue_lock.Unlock();
-}
-
-void do_ops(void *whoami) {
-  MOSDOp *op;
-
-  cout << "Thread " << (int)i << " ready for action\n";
-  while(1) {
-    op = get_op();
-
-    if(op == NULL) {
-      cout << "Thread " << (int)i << " dying";
-      pthread_exit(0);
-    }
-      
-    OSD.do_op(op);
-  }
-}
-
-MOSDOp *get_op() {
-  MOSDOp *op;
-  queue_lock.Lock();
-  op = op_queue.pop();
-  num_ops--;
-  queue_lock.Unlock();
-}
-
-void put_op(MOSDOp *op) {
-  queue_lock.Lock();
-  opqueue.push(op);
-  num_ops++;
-  queue_lock.Unlock();
-}
-
index 135c5deae9da01ce826401c4a44d804db0e70bba..99c1bc48d99917408738cb0ed2d349f8f19e8041 100644 (file)
+#ifndef THREADPOOL
+#define THREADPOOL
+
+#include <queue>
+#include<semaphore.h>
+#include <pthread.h>
+
+using namespace std;
 #define MAX_THREADS 1000
 
+class Semaphore {
+  sem_t sem;
+
+ public:
+
+  Semaphore(int i) {
+    sem_init(&sem, 0, i);
+  }
+
+  void get() {
+    sem_wait(&sem);
+  }
+
+  void put() {
+    sem_post(&sem);
+  }
+};
+
+template <class T>
 class ThreadPool {
-  queue<MOSDOp *> op_queue;
-  Mutex queue_lock;
+
+ private:
+  queue<T *> q;
+  Semaphore q_lock(1);
+  Semaphore q_sem(0);
   pthread_t thread[MAX_THREADS];
   int num_ops;
   int num_threads;
+  void (*func)(T*);
+
+  static void *foo(void *arg) {
+    ThreadPool *t = (ThreadPool *)arg;
+    t->do_ops(arg);
+  }
+
+  void * do_ops(void *nothing) {
+    T* op;
+
+    cout << "Thread ready for action\n";
+    while(1) {
+      q_sem.get();
+      op = get_op();
+
+      if(op == NULL) {
+       cout << "Thread exiting\n";
+       pthread_exit(0);
+      }
+      func(op);
+    }
+  }
+
+
+  T* get_op() {
+    T* op;
+
+    q_lock.get();
+    op = q.front();
+    q.pop();
+    num_ops--;
+    q_lock.put();
+
+    return op;
+  }
+
+ public:
+  ThreadPool(int howmany, void (*f)(T*)) {
+    num_ops = 0;
+    num_threads = 0;
+
+    int status;
+
+    func = f;
+
+    num_threads = howmany;
+
+    for(int i = 0; i < howmany; i++) {
+      status = pthread_create(&thread[i], NULL, (void*(*)(void *))&ThreadPool::foo, this);
+    }
+  }
+
+  ~ThreadPool() {
+    q_lock.get();
+    for(int i = num_ops; i > 0; i--) 
+      get_op();
 
-  ThreadPool::Threadpool(int howmany);
+    for(int i = 0; i < num_threads; i++) {
+      put_op((T*)NULL);
+    }
+    q_lock.put();
 
-  ThreadPool::~Threadpool();
+    for(int i = 0; i < num_threads; i++) {
+      cout << "Waiting for thread " << i << " to die\n";
+      pthread_join(thread[i], NULL);
+    }
 
-  void put_op(MOSDOp *op);
+  }
 
-  void do_ops(void *whoami);
+  void put_op(T* op) {
+    q_lock.get();
+    q.push(op);
+    num_ops++;
+    q_sem.put();
+    q_lock.put();
+  }
 
-  MOSDOp *get_op();
-}
+};
+#endif
diff --git a/ceph/osd/tp.cc b/ceph/osd/tp.cc
new file mode 100644 (file)
index 0000000..c95a646
--- /dev/null
@@ -0,0 +1,46 @@
+
+#include <iostream>
+#include <string>
+#include <stdlib.h>
+
+using namespace std;
+
+#include "common/Mutex.h"
+#include "osd/ThreadPool.h"
+#include <thread.h>
+
+class Op {
+  int i;
+
+public:
+  Op(int i) {
+    this->i = i;
+  }
+
+  int get() {
+    return i;
+  }
+};
+
+void foo(Op *o) {
+  cout << "Thread "<< thr_self() << ": " << o->get() << "\n";
+  usleep(1);
+}
+
+int main(int argc, char *argv) {
+  ThreadPool<Op> *t = new ThreadPool<Op>(10, foo);
+
+  sleep(1);
+
+  for(int i = 0; i < 100; i++) {
+    Op *o = new Op(i); 
+    t->put_op(o);
+  }
+
+  sleep(1);
+  delete(t);
+
+  return 0;
+}
+