sprintf(name, "osd%02d", whoami);
logger = new Logger(name, (LogType*)&osd_logtype);
+ threadpool->init(10);
}
OSD::~OSD()
void OSD::dispatch(Message *m)
{
- osd_lock.Lock();
-
switch (m->get_type()) {
// host monitor
case MSG_PING_ACK:
default:
dout(1) << " got unknown message " << m->get_type() << endl;
}
-
- osd_lock.Unlock();
}
void OSD::handle_getcluster_ack(MOSDGetClusterAck *m)
{
+ // SAB
+ osd_lock.Lock();
+
if (!osdcluster) osdcluster = new OSDCluster();
osdcluster->decode(m->get_osdcluster());
dout(7) << "got OSDCluster version " << osdcluster->get_version() << endl;
handle_op(*it);
}
+ // SAB
+ osd_lock.Unlock();
}
void OSD::handle_op(MOSDOp *op)
{
// starting up?
+
if (!osdcluster) {
+ // SAB
+ osd_lock.Lock();
+
dout(7) << "no OSDCluster, starting up" << endl;
if (waiting_for_osdcluster.empty())
messenger->send_message(new MGenericMessage(MSG_OSD_GETCLUSTER),
MSG_ADDR_MDS(0), MDS_PORT_MAIN);
waiting_for_osdcluster.push_back(op);
+
+ // SAB
+ osd_lock.Unlock();
+
return;
}
+
// check cluster version
if (op->get_ocv() > osdcluster->get_version()) {
messenger->send_message(new MGenericMessage(MSG_OSD_GETCLUSTER),
MSG_ADDR_MDS(0), MDS_PORT_MAIN);
assert(0);
+
+ // SAB
+ osd_lock.Lock();
+
waiting_for_osdcluster.push_back(op);
+
+ // SAB
+ osd_lock.Unlock();
+
return;
}
}
}
}
-
+ do_op(op);
+}
+void OSD::do_op(MOSDOp *op)
+{
// do the op
switch (op->get_op()) {
case OSD_OP_READ:
- op_read(op);
- break;
+ op_read(op);
+ break;
case OSD_OP_WRITE:
- op_write(op);
- break;
+ op_write(op);
+ break;
case OSD_OP_MKFS:
- dout(3) << "MKFS" << endl;
- {
- int r = store->mkfs();
- messenger->send_message(new MOSDOpReply(op, r, osdcluster),
- op->get_asker());
- }
- delete op;
- break;
+ op_mkfs(op);
+ break;
case OSD_OP_DELETE:
- {
- int r = store->remove(op->get_oid());
- dout(3) << "delete on " << op->get_oid() << " r = " << r << endl;
-
- // "ack"
- messenger->send_message(new MOSDOpReply(op, r, osdcluster),
- op->get_asker());
-
- logger->inc("rm");
- }
- delete op;
- break;
+ op_delete(op);
+ break;
case OSD_OP_TRUNCATE:
- {
- int r = store->truncate(op->get_oid(), op->get_offset());
- dout(3) << "truncate on " << op->get_oid() << " at " << op->get_offset() << " r = " << r << endl;
-
- // "ack"
- messenger->send_message(new MOSDOpReply(op, r, osdcluster),
- op->get_asker());
- logger->inc("trunc");
- }
- delete op;
- break;
+ op_truncate(op);
+ break;
case OSD_OP_STAT:
- {
- struct stat st;
- memset(&st, sizeof(st), 0);
- int r = store->stat(op->get_oid(), &st);
-
- dout(3) << "stat on " << op->get_oid() << " r = " << r << " size = " << st.st_size << endl;
-
- MOSDOpReply *reply = new MOSDOpReply(op, r, osdcluster);
- reply->set_object_size(st.st_size);
- messenger->send_message(reply, op->get_asker());
-
- logger->inc("stat");
- }
- delete op;
- break;
+ op_stat(op);
+ break;
default:
- assert(0);
+ assert(0);
}
}
-
-
void OSD::op_read(MOSDOp *r)
{
// read into a buffer
delete m;
}
+void OSD::op_mkfs(MOSDOp *op)
+{
+ dout(3) << "MKFS" << endl;
+ {
+ int r = store->mkfs();
+ messenger->send_message(new MOSDOpReply(op, r, osdcluster), op->get_asker());
+ }
+ delete op;
+}
+
+void OSD::op_delete(MOSDOp *op)
+{
+ int r = store->remove(op->get_oid());
+ dout(3) << "delete on " << op->get_oid() << " r = " << r << endl;
+
+ // "ack"
+ messenger->send_message(new MOSDOpReply(op, r, osdcluster), op->get_asker());
+
+ logger->inc("rm");
+ delete op;
+}
+
+void OSD::op_truncate(MOSDOp *op)
+{
+ int r = store->truncate(op->get_oid(), op->get_offset());
+ dout(3) << "truncate on " << op->get_oid() << " at " << op->get_offset() << " r = " << r << endl;
+
+ // "ack"
+ messenger->send_message(new MOSDOpReply(op, r, osdcluster), op->get_asker());
+
+ logger->inc("trunc");
+
+ delete op;
+}
+
+void OSD::op_stat(MOSDOp *op)
+{
+ struct stat st;
+ memset(&st, sizeof(st), 0);
+ int r = store->stat(op->get_oid(), &st);
+
+ dout(3) << "stat on " << op->get_oid() << " r = " << r << " size = " << st.st_size << endl;
+
+ MOSDOpReply *reply = new MOSDOpReply(op, r, osdcluster);
+ reply->set_object_size(st.st_size);
+ messenger->send_message(reply, op->get_asker());
+
+ logger->inc("stat");
+ delete op;
+}
#define RG_DIRTY_REPLICA_MEM 4
#define RG_DIRTY_REPLICA_SYNC 8
-
class ReplicaGroup {
public:
repgroup_t rg;
class ObjectStore *store;
class HostMonitor *monitor;
class Logger *logger;
+ class ThreadPool *threadpool;
list<class MOSDOp*> waiting_for_osdcluster;
// OSDCluster
void update_osd_cluster(__uint64_t ocv, bufferlist& blist);
+ void do_op(class MOSDOp *m);
+
// messages
virtual void dispatch(Message *m);
void handle_op(class MOSDOp *m);
void op_read(class MOSDOp *m);
void op_write(class MOSDOp *m);
+ void op_mkfs(class MOSDOp *m);
+ void op_delete(class MOSDOp *m);
+ void op_truncate(class MOSDOp *m);
+ void op_stat(class MOSDOp *m);
};
#endif
--- /dev/null
+#include "include/types.h"
+
+#include "OSD.h"
+#include "FakeStore.h"
+#include "OSDCluster.h"
+
+#include "mds/MDS.h"
+
+#include "msg/Messenger.h"
+#include "msg/Message.h"
+
+#include "msg/HostMonitor.h"
+
+#include "messages/MGenericMessage.h"
+#include "messages/MPing.h"
+#include "messages/MPingAck.h"
+#include "messages/MOSDOp.h"
+#include "messages/MOSDOpReply.h"
+#include "messages/MOSDGetClusterAck.h"
+
+#include "common/Logger.h"
+#include "common/LogType.h"
+#include "common/Mutex.h"
+
+#include "OSD/ThreadPool.h"
+
+#include <queue>
+
+#include <iostream>
+#include <cassert>
+#include <errno.h>
+#include <sys/stat.h>
+
+void main(int argc, char *argv) {
+ ThreadPool t(10);
+
+}
+
+ThreadPool::Threadpool(int howmany) {
+ num_ops = 0;
+ num_threads = 0;
+
+ int status;
+
+ num_threads = howmany;
+
+ for(int i = 0; i < howmany; i++) {
+ status = pthread_create(thread[i], NULL, do_ops, (void *)&i);
+ }
+}
+
+ThreadPool::~Threadpool() {
+ queue_lock.Lock();
+ for(int i = num_ops; i > 0; i--)
+ get_op();
+
+ for(int i = 0; i < num_threads; i++) {
+ put_op((MOSDOp *)NULL);
+ }
+
+ for(int i = 0; i < num_threads; i++) {
+ cout << "Waiting for thread " << i << " to die";
+ pthread_join(threads[i]);
+ }
+
+ queue_lock.Unlock();
+}
+
+void do_ops(void *whoami) {
+ MOSDOp *op;
+
+ cout << "Thread " << (int)i << " ready for action\n";
+ while(1) {
+ op = get_op();
+
+ if(op == NULL) {
+ cout << "Thread " << (int)i << " dying";
+ pthread_exit(0);
+ }
+
+ OSD.do_op(op);
+ }
+}
+
+MOSDOp *get_op() {
+ MOSDOp *op;
+ queue_lock.Lock();
+ op = op_queue.pop();
+ num_ops--;
+ queue_lock.Unlock();
+}
+
+void put_op(MOSDOp *op) {
+ queue_lock.Lock();
+ opqueue.push(op);
+ num_ops++;
+ queue_lock.Unlock();
+}
+