From f5ae8145fa0fbe79c9fc196d50bb34f89fbde838 Mon Sep 17 00:00:00 2001 From: sbrandt Date: Sat, 25 Jun 2005 01:10:03 +0000 Subject: [PATCH] *** empty log message *** git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@335 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/common/ThreadPool.h | 13 +++++---- ceph/osd/OSD.cc | 60 ++++++++++++++++++++++++---------------- ceph/osd/OSD.h | 8 +++++- ceph/osd/tp.cc | 50 +++++++++++++++++++++------------ 4 files changed, 84 insertions(+), 47 deletions(-) diff --git a/ceph/common/ThreadPool.h b/ceph/common/ThreadPool.h index a91153929f5d8..f1c4820da7dbf 100644 --- a/ceph/common/ThreadPool.h +++ b/ceph/common/ThreadPool.h @@ -11,7 +11,7 @@ using namespace std; #define MAX_THREADS 1000 -template +template class ThreadPool { private: @@ -21,7 +21,8 @@ class ThreadPool { pthread_t *thread; int num_ops; int num_threads; - void (*func)(T*); + void (*func)(U*,T*); + U *u; static void *foo(void *arg) { @@ -33,7 +34,7 @@ class ThreadPool { { T* op; - cout << "Thread ready for action\n"; + cout << "Thread "<< pthread_self() << " ready for action\n"; while(1) { q_sem.Get(); op = get_op(); @@ -42,7 +43,8 @@ class ThreadPool { cout << "Thread exiting\n"; pthread_exit(0); } - func(op); + cout << "Thread "<< pthread_self() << " calling the function\n"; + func(u, op); } } @@ -62,10 +64,11 @@ class ThreadPool { public: - ThreadPool(int howmany, void (*f)(T*)) + ThreadPool(int howmany, void (*f)(U*,T*), U *obj) { int status; + u = obj; num_ops = 0; func = f; num_threads = howmany; diff --git a/ceph/osd/OSD.cc b/ceph/osd/OSD.cc index 7617a5c45ea0a..0f67bd3171714 100644 --- a/ceph/osd/OSD.cc +++ b/ceph/osd/OSD.cc @@ -22,6 +22,8 @@ #include "common/Logger.h" #include "common/LogType.h" +#include "common/ThreadPool.h" + #include #include #include @@ -40,6 +42,7 @@ char *osd_base_path = "./osddata"; LogType osd_logtype; + OSD::OSD(int id, Messenger *m) { whoami = id; @@ -70,13 +73,13 @@ OSD::OSD(int id, Messenger *m) monitor->get_notify().insert(MSG_ADDR_MDS(0)); - - // log char name[80]; sprintf(name, "osd%02d", whoami); logger = new Logger(name, (LogType*)&osd_logtype); + // Thread pool + threadpool = new ThreadPool(10, (void (*)(OSD*, MOSDOp*))doop, this); } OSD::~OSD() @@ -241,31 +244,36 @@ void OSD::handle_op(MOSDOp *op) // am i the right rg_role? if (0) { - repgroup_t rg = op->get_rg(); - if (op->get_rg_role() == 0) { - // PRIMARY + repgroup_t rg = op->get_rg(); + if (op->get_rg_role() == 0) { + // PRIMARY - // verify that we are primary, or acting primary - int acting_primary = osdcluster->get_rg_acting_primary( op->get_rg() ); - if (acting_primary != whoami) { - dout(7) << " acting primary is " << acting_primary << ", forwarding" << endl; - messenger->send_message(op, MSG_ADDR_OSD(acting_primary), 0); - logger->inc("fwd"); - return; - } - } else { - // REPLICA - int my_role = osdcluster->get_rg_role(rg, whoami); - - dout(7) << "rg " << rg << " my_role " << my_role << " wants " << op->get_rg_role() << endl; - - if (my_role != op->get_rg_role()) { - assert(0); - } - } + // verify that we are primary, or acting primary + int acting_primary = osdcluster->get_rg_acting_primary( op->get_rg() ); + if (acting_primary != whoami) { + dout(7) << " acting primary is " << acting_primary << ", forwarding" << endl; + messenger->send_message(op, MSG_ADDR_OSD(acting_primary), 0); + logger->inc("fwd"); + return; + } + } else { + // REPLICA + int my_role = osdcluster->get_rg_role(rg, whoami); + + dout(7) << "rg " << rg << " my_role " << my_role << " wants " << op->get_rg_role() << endl; + + if (my_role != op->get_rg_role()) { + assert(0); + } + } } - do_op(op); + queue_op(op); + // do_op(op); +} + +void OSD::queue_op(MOSDOp *op) { + threadpool->put_op(op); } void OSD::do_op(MOSDOp *op) @@ -437,3 +445,7 @@ void OSD::op_stat(MOSDOp *op) logger->inc("stat"); delete op; } + +void doop(OSD *u, MOSDOp *p) { + u->do_op(p); +} diff --git a/ceph/osd/OSD.h b/ceph/osd/OSD.h index 23c96a84ca117..53fc178356535 100644 --- a/ceph/osd/OSD.h +++ b/ceph/osd/OSD.h @@ -5,6 +5,7 @@ #include "msg/Dispatcher.h" #include "common/Mutex.h" +#include "common/ThreadPool.h" #include using namespace std; @@ -44,7 +45,7 @@ class OSD : public Dispatcher { class ObjectStore *store; class HostMonitor *monitor; class Logger *logger; - class ThreadPool *threadpool; + class ThreadPool *threadpool; list waiting_for_osdcluster; @@ -61,7 +62,12 @@ class OSD : public Dispatcher { // OSDCluster void update_osd_cluster(__uint64_t ocv, bufferlist& blist); + void queue_op(class MOSDOp *m); void do_op(class MOSDOp *m); + static void doop(OSD *o, MOSDOp *op) { + o->do_op(op); + }; + // messages virtual void dispatch(Message *m); diff --git a/ceph/osd/tp.cc b/ceph/osd/tp.cc index 7968878e65034..209e5cdd9544d 100644 --- a/ceph/osd/tp.cc +++ b/ceph/osd/tp.cc @@ -5,7 +5,7 @@ using namespace std; #include "common/Mutex.h" -#include "osd/ThreadPool.h" +#include "common/ThreadPool.h" // #include class Op { @@ -24,27 +24,43 @@ public: } }; -void foo(Op *o) -{ - cout << "Thread "<< pthread_self() << ": " << o->get() << "\n"; - usleep(1); - - // sched_yield(); -} +void foop(class TP *t, class Op *o); + +class TP { +public: -int main(int argc, char *argv) -{ - ThreadPool *t = new ThreadPool(10, foo); + void foo(Op *o) + { + cout << "Thread "<< pthread_self() << ": " << o->get() << "\n"; + usleep(1); + + // sched_yield(); + } - for(int i = 0; i < 100; i++) { - Op *o = new Op(i); - t->put_op(o); + int main(int argc, char *argv) + { + ThreadPool *t = new ThreadPool(10, (void (*)(TP*, Op*))foop, this); + + for(int i = 0; i < 100; i++) { + Op *o = new Op(i); + t->put_op(o); + } + + sleep(1); + + delete(t); + + return 0; } +}; - sleep(1); +void foop(class TP *t, class Op *o) { + t->foo(o); +} - delete(t); +int main(int argc, char *argv) { + TP t; - return 0; + t.main(argc,argv); } -- 2.39.5