]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
*** empty log message ***
authorsbrandt <sbrandt@29311d96-e01e-0410-9327-a35deaab8ce9>
Sat, 25 Jun 2005 01:10:03 +0000 (01:10 +0000)
committersbrandt <sbrandt@29311d96-e01e-0410-9327-a35deaab8ce9>
Sat, 25 Jun 2005 01:10:03 +0000 (01:10 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@335 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/common/ThreadPool.h
ceph/osd/OSD.cc
ceph/osd/OSD.h
ceph/osd/tp.cc

index a91153929f5d87b79bbe63a9f4410471fa718d81..f1c4820da7dbf2787eb16727bbc2c21d37a9a5ee 100644 (file)
@@ -11,7 +11,7 @@ using namespace std;
  
 #define MAX_THREADS 1000
 
-template <class T>
+template <class U, class T>
 class ThreadPool {
 
  private:
@@ -21,7 +21,8 @@ class ThreadPool {
   pthread_t *thread;
   int num_ops;
   int num_threads;
-  void (*func)(T*);
+  void (*func)(U*,T*);
+  U *u;
 
   static void *foo(void *arg)
   {
@@ -33,7 +34,7 @@ class ThreadPool {
   {
     T* op;
 
-    cout << "Thread ready for action\n";
+    cout << "Thread "<< pthread_self() << " ready for action\n";
     while(1) {
       q_sem.Get();
       op = get_op();
@@ -42,7 +43,8 @@ class ThreadPool {
        cout << "Thread exiting\n";
        pthread_exit(0);
       }
-      func(op);
+      cout << "Thread "<< pthread_self() << " calling the function\n";
+      func(u, op);
     }
   }
 
@@ -62,10 +64,11 @@ class ThreadPool {
 
  public:
 
-  ThreadPool(int howmany, void (*f)(T*))
+  ThreadPool(int howmany, void (*f)(U*,T*), U *obj)
   {
     int status;
 
+    u = obj;
     num_ops = 0;
     func = f;
     num_threads = howmany;
index 7617a5c45ea0a46b0683464c94d5bd986e6e3608..0f67bd31717143088b6ca4ad5b415db006bc348b 100644 (file)
@@ -22,6 +22,8 @@
 #include "common/Logger.h"
 #include "common/LogType.h"
 
+#include "common/ThreadPool.h"
+
 #include <iostream>
 #include <cassert>
 #include <errno.h>
@@ -40,6 +42,7 @@ char *osd_base_path = "./osddata";
 
 LogType osd_logtype;
 
+
 OSD::OSD(int id, Messenger *m) 
 {
   whoami = id;
@@ -70,13 +73,13 @@ OSD::OSD(int id, Messenger *m)
   
   monitor->get_notify().insert(MSG_ADDR_MDS(0));
 
-
-
   // log
   char name[80];
   sprintf(name, "osd%02d", whoami);
   logger = new Logger(name, (LogType*)&osd_logtype);
 
+  // Thread pool
+  threadpool = new ThreadPool<OSD, MOSDOp>(10, (void (*)(OSD*, MOSDOp*))doop, this);
 }
 
 OSD::~OSD()
@@ -241,31 +244,36 @@ void OSD::handle_op(MOSDOp *op)
 
   // am i the right rg_role?
   if (0) {
-  repgroup_t rg = op->get_rg();
-  if (op->get_rg_role() == 0) {
-       // PRIMARY
+    repgroup_t rg = op->get_rg();
+    if (op->get_rg_role() == 0) {
+      // PRIMARY
        
-       // verify that we are primary, or acting primary
-       int acting_primary = osdcluster->get_rg_acting_primary( op->get_rg() );
-       if (acting_primary != whoami) {
-         dout(7) << " acting primary is " << acting_primary << ", forwarding" << endl;
-         messenger->send_message(op, MSG_ADDR_OSD(acting_primary), 0);
-         logger->inc("fwd");
-         return;
-       }
-  } else {
-       // REPLICA
-       int my_role = osdcluster->get_rg_role(rg, whoami);
-       
-       dout(7) << "rg " << rg << " my_role " << my_role << " wants " << op->get_rg_role() << endl;
-
-       if (my_role != op->get_rg_role()) {
-         assert(0); 
-       }
-  }
+      // verify that we are primary, or acting primary
+      int acting_primary = osdcluster->get_rg_acting_primary( op->get_rg() );
+      if (acting_primary != whoami) {
+       dout(7) << " acting primary is " << acting_primary << ", forwarding" << endl;
+       messenger->send_message(op, MSG_ADDR_OSD(acting_primary), 0);
+       logger->inc("fwd");
+       return;
+      }
+    } else {
+      // REPLICA
+      int my_role = osdcluster->get_rg_role(rg, whoami);
+      
+      dout(7) << "rg " << rg << " my_role " << my_role << " wants " << op->get_rg_role() << endl;
+      
+      if (my_role != op->get_rg_role()) {
+       assert(0); 
+      }
+    }
   }
 
-  do_op(op);
+  queue_op(op);
+  // do_op(op);
+}
+
+void OSD::queue_op(MOSDOp *op) {
+  threadpool->put_op(op);
 }
   
 void OSD::do_op(MOSDOp *op) 
@@ -437,3 +445,7 @@ void OSD::op_stat(MOSDOp *op)
   logger->inc("stat");
   delete op;
 }
+
+void doop(OSD *u, MOSDOp *p) {
+  u->do_op(p);
+}
index 23c96a84ca11710a64e803e879f84428c2c88c5c..53fc1783565354a07c117da0a56e5a0782294531 100644 (file)
@@ -5,6 +5,7 @@
 #include "msg/Dispatcher.h"
 
 #include "common/Mutex.h"
+#include "common/ThreadPool.h"
 
 #include <map>
 using namespace std;
@@ -44,7 +45,7 @@ class OSD : public Dispatcher {
   class ObjectStore *store;
   class HostMonitor *monitor;
   class Logger      *logger;
-  class ThreadPool  *threadpool;
+  class ThreadPool<class OSD, class MOSDOp>  *threadpool;
 
   list<class MOSDOp*> waiting_for_osdcluster;
 
@@ -61,7 +62,12 @@ class OSD : public Dispatcher {
   // OSDCluster
   void update_osd_cluster(__uint64_t ocv, bufferlist& blist);
 
+  void queue_op(class MOSDOp *m);
   void do_op(class MOSDOp *m);
+  static void doop(OSD *o, MOSDOp *op) {
+      o->do_op(op);
+    };
+
 
   // messages
   virtual void dispatch(Message *m);
index 7968878e650341f7d347518bbda7814e358409f0..209e5cdd9544d46f0a3af026ba0de2a8ec8870a4 100644 (file)
@@ -5,7 +5,7 @@
 using namespace std;
 
 #include "common/Mutex.h"
-#include "osd/ThreadPool.h"
+#include "common/ThreadPool.h"
 // #include <thread.h>
 
 class Op {
@@ -24,27 +24,43 @@ public:
   }
 };
 
-void foo(Op *o)
-{
-  cout << "Thread "<< pthread_self() << ": " << o->get() << "\n";
-  usleep(1);
-  
-  //  sched_yield();
-}
+void foop(class TP *t, class Op *o);
+
+class TP {
+public:
 
-int main(int argc, char *argv)
-{
-  ThreadPool<Op> *t = new ThreadPool<Op>(10, foo);
+  void foo(Op *o)
+  {
+    cout << "Thread "<< pthread_self() << ": " << o->get() << "\n";
+    usleep(1);
+    
+    //  sched_yield();
+  }
 
-  for(int i = 0; i < 100; i++) {
-    Op *o = new Op(i); 
-    t->put_op(o);
+  int main(int argc, char *argv)
+  {
+    ThreadPool<TP,Op> *t = new ThreadPool<TP,Op>(10, (void (*)(TP*, Op*))foop, this);
+    
+    for(int i = 0; i < 100; i++) {
+      Op *o = new Op(i); 
+      t->put_op(o);
+    }
+    
+    sleep(1);
+    
+    delete(t);
+    
+    return 0;
   }
+};
 
-  sleep(1);
+void foop(class TP *t, class Op *o) {
+  t->foo(o);
+}
 
-  delete(t);
+int main(int argc, char *argv) {
+  TP t;
 
-  return 0;
+  t.main(argc,argv);
 }