#define MAX_THREADS 1000
-template <class T>
+template <class U, class T>
class ThreadPool {
private:
pthread_t *thread;
int num_ops;
int num_threads;
- void (*func)(T*);
+ void (*func)(U*,T*);
+ U *u;
static void *foo(void *arg)
{
{
T* op;
- cout << "Thread ready for action\n";
+ cout << "Thread "<< pthread_self() << " ready for action\n";
while(1) {
q_sem.Get();
op = get_op();
cout << "Thread exiting\n";
pthread_exit(0);
}
- func(op);
+ cout << "Thread "<< pthread_self() << " calling the function\n";
+ func(u, op);
}
}
public:
- ThreadPool(int howmany, void (*f)(T*))
+ ThreadPool(int howmany, void (*f)(U*,T*), U *obj)
{
int status;
+ u = obj;
num_ops = 0;
func = f;
num_threads = howmany;
#include "common/Logger.h"
#include "common/LogType.h"
+#include "common/ThreadPool.h"
+
#include <iostream>
#include <cassert>
#include <errno.h>
LogType osd_logtype;
+
OSD::OSD(int id, Messenger *m)
{
whoami = id;
monitor->get_notify().insert(MSG_ADDR_MDS(0));
-
-
// log
char name[80];
sprintf(name, "osd%02d", whoami);
logger = new Logger(name, (LogType*)&osd_logtype);
+ // Thread pool
+ threadpool = new ThreadPool<OSD, MOSDOp>(10, (void (*)(OSD*, MOSDOp*))doop, this);
}
OSD::~OSD()
// am i the right rg_role?
if (0) {
- repgroup_t rg = op->get_rg();
- if (op->get_rg_role() == 0) {
- // PRIMARY
+ repgroup_t rg = op->get_rg();
+ if (op->get_rg_role() == 0) {
+ // PRIMARY
- // verify that we are primary, or acting primary
- int acting_primary = osdcluster->get_rg_acting_primary( op->get_rg() );
- if (acting_primary != whoami) {
- dout(7) << " acting primary is " << acting_primary << ", forwarding" << endl;
- messenger->send_message(op, MSG_ADDR_OSD(acting_primary), 0);
- logger->inc("fwd");
- return;
- }
- } else {
- // REPLICA
- int my_role = osdcluster->get_rg_role(rg, whoami);
-
- dout(7) << "rg " << rg << " my_role " << my_role << " wants " << op->get_rg_role() << endl;
-
- if (my_role != op->get_rg_role()) {
- assert(0);
- }
- }
+ // verify that we are primary, or acting primary
+ int acting_primary = osdcluster->get_rg_acting_primary( op->get_rg() );
+ if (acting_primary != whoami) {
+ dout(7) << " acting primary is " << acting_primary << ", forwarding" << endl;
+ messenger->send_message(op, MSG_ADDR_OSD(acting_primary), 0);
+ logger->inc("fwd");
+ return;
+ }
+ } else {
+ // REPLICA
+ int my_role = osdcluster->get_rg_role(rg, whoami);
+
+ dout(7) << "rg " << rg << " my_role " << my_role << " wants " << op->get_rg_role() << endl;
+
+ if (my_role != op->get_rg_role()) {
+ assert(0);
+ }
+ }
}
- do_op(op);
+ queue_op(op);
+ // do_op(op);
+}
+
+void OSD::queue_op(MOSDOp *op) {
+ threadpool->put_op(op);
}
void OSD::do_op(MOSDOp *op)
logger->inc("stat");
delete op;
}
+
+void doop(OSD *u, MOSDOp *p) {
+ u->do_op(p);
+}
#include "msg/Dispatcher.h"
#include "common/Mutex.h"
+#include "common/ThreadPool.h"
#include <map>
using namespace std;
class ObjectStore *store;
class HostMonitor *monitor;
class Logger *logger;
- class ThreadPool *threadpool;
+ class ThreadPool<class OSD, class MOSDOp> *threadpool;
list<class MOSDOp*> waiting_for_osdcluster;
// OSDCluster
void update_osd_cluster(__uint64_t ocv, bufferlist& blist);
+ void queue_op(class MOSDOp *m);
void do_op(class MOSDOp *m);
+ static void doop(OSD *o, MOSDOp *op) {
+ o->do_op(op);
+ };
+
// messages
virtual void dispatch(Message *m);
using namespace std;
#include "common/Mutex.h"
-#include "osd/ThreadPool.h"
+#include "common/ThreadPool.h"
// #include <thread.h>
class Op {
}
};
-void foo(Op *o)
-{
- cout << "Thread "<< pthread_self() << ": " << o->get() << "\n";
- usleep(1);
-
- // sched_yield();
-}
+void foop(class TP *t, class Op *o);
+
+class TP {
+public:
-int main(int argc, char *argv)
-{
- ThreadPool<Op> *t = new ThreadPool<Op>(10, foo);
+ void foo(Op *o)
+ {
+ cout << "Thread "<< pthread_self() << ": " << o->get() << "\n";
+ usleep(1);
+
+ // sched_yield();
+ }
- for(int i = 0; i < 100; i++) {
- Op *o = new Op(i);
- t->put_op(o);
+ int main(int argc, char *argv)
+ {
+ ThreadPool<TP,Op> *t = new ThreadPool<TP,Op>(10, (void (*)(TP*, Op*))foop, this);
+
+ for(int i = 0; i < 100; i++) {
+ Op *o = new Op(i);
+ t->put_op(o);
+ }
+
+ sleep(1);
+
+ delete(t);
+
+ return 0;
}
+};
- sleep(1);
+void foop(class TP *t, class Op *o) {
+ t->foo(o);
+}
- delete(t);
+int main(int argc, char *argv) {
+ TP t;
- return 0;
+ t.main(argc,argv);
}