From 4434bcad244ac3dec368ff5c1b4b5ad195c281ce Mon Sep 17 00:00:00 2001 From: sbrandt Date: Tue, 21 Jun 2005 04:20:51 +0000 Subject: [PATCH] First version. Untested. git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@328 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/osd/OSD.cc | 148 +++++++++++++++++++++++++---------------- ceph/osd/OSD.h | 8 ++- ceph/osd/OSDMap.h | 12 +++- ceph/osd/ThreadPool.cc | 99 +++++++++++++++++++++++++++ ceph/osd/ThreadPool.h | 19 ++++++ 5 files changed, 226 insertions(+), 60 deletions(-) create mode 100644 ceph/osd/ThreadPool.cc create mode 100644 ceph/osd/ThreadPool.h diff --git a/ceph/osd/OSD.cc b/ceph/osd/OSD.cc index 557e9997aa072..f529923530842 100644 --- a/ceph/osd/OSD.cc +++ b/ceph/osd/OSD.cc @@ -77,6 +77,7 @@ OSD::OSD(int id, Messenger *m) sprintf(name, "osd%02d", whoami); logger = new Logger(name, (LogType*)&osd_logtype); + threadpool->init(10); } OSD::~OSD() @@ -114,8 +115,6 @@ int OSD::shutdown() void OSD::dispatch(Message *m) { - osd_lock.Lock(); - switch (m->get_type()) { // host monitor case MSG_PING_ACK: @@ -148,8 +147,6 @@ void OSD::dispatch(Message *m) default: dout(1) << " got unknown message " << m->get_type() << endl; } - - osd_lock.Unlock(); } @@ -171,6 +168,9 @@ void OSD::handle_ping(MPing *m) void OSD::handle_getcluster_ack(MOSDGetClusterAck *m) { + // SAB + osd_lock.Lock(); + if (!osdcluster) osdcluster = new OSDCluster(); osdcluster->decode(m->get_osdcluster()); dout(7) << "got OSDCluster version " << osdcluster->get_version() << endl; @@ -186,19 +186,30 @@ void OSD::handle_getcluster_ack(MOSDGetClusterAck *m) handle_op(*it); } + // SAB + osd_lock.Unlock(); } void OSD::handle_op(MOSDOp *op) { // starting up? + if (!osdcluster) { + // SAB + osd_lock.Lock(); + dout(7) << "no OSDCluster, starting up" << endl; if (waiting_for_osdcluster.empty()) messenger->send_message(new MGenericMessage(MSG_OSD_GETCLUSTER), MSG_ADDR_MDS(0), MDS_PORT_MAIN); waiting_for_osdcluster.push_back(op); + + // SAB + osd_lock.Unlock(); + return; } + // check cluster version if (op->get_ocv() > osdcluster->get_version()) { @@ -210,7 +221,15 @@ void OSD::handle_op(MOSDOp *op) messenger->send_message(new MGenericMessage(MSG_OSD_GETCLUSTER), MSG_ADDR_MDS(0), MDS_PORT_MAIN); assert(0); + + // SAB + osd_lock.Lock(); + waiting_for_osdcluster.push_back(op); + + // SAB + osd_lock.Unlock(); + return; } @@ -246,82 +265,45 @@ void OSD::handle_op(MOSDOp *op) } } } - + do_op(op); +} +void OSD::do_op(MOSDOp *op) +{ // do the op switch (op->get_op()) { case OSD_OP_READ: - op_read(op); - break; + op_read(op); + break; case OSD_OP_WRITE: - op_write(op); - break; + op_write(op); + break; case OSD_OP_MKFS: - dout(3) << "MKFS" << endl; - { - int r = store->mkfs(); - messenger->send_message(new MOSDOpReply(op, r, osdcluster), - op->get_asker()); - } - delete op; - break; + op_mkfs(op); + break; case OSD_OP_DELETE: - { - int r = store->remove(op->get_oid()); - dout(3) << "delete on " << op->get_oid() << " r = " << r << endl; - - // "ack" - messenger->send_message(new MOSDOpReply(op, r, osdcluster), - op->get_asker()); - - logger->inc("rm"); - } - delete op; - break; + op_delete(op); + break; case OSD_OP_TRUNCATE: - { - int r = store->truncate(op->get_oid(), op->get_offset()); - dout(3) << "truncate on " << op->get_oid() << " at " << op->get_offset() << " r = " << r << endl; - - // "ack" - messenger->send_message(new MOSDOpReply(op, r, osdcluster), - op->get_asker()); - logger->inc("trunc"); - } - delete op; - break; + op_truncate(op); + break; case OSD_OP_STAT: - { - struct stat st; - memset(&st, sizeof(st), 0); - int r = store->stat(op->get_oid(), &st); - - dout(3) << "stat on " << op->get_oid() << " r = " << r << " size = " << st.st_size << endl; - - MOSDOpReply *reply = new MOSDOpReply(op, r, osdcluster); - reply->set_object_size(st.st_size); - messenger->send_message(reply, op->get_asker()); - - logger->inc("stat"); - } - delete op; - break; + op_stat(op); + break; default: - assert(0); + assert(0); } } - - void OSD::op_read(MOSDOp *r) { // read into a buffer @@ -406,3 +388,53 @@ void OSD::op_write(MOSDOp *m) delete m; } +void OSD::op_mkfs(MOSDOp *op) +{ + dout(3) << "MKFS" << endl; + { + int r = store->mkfs(); + messenger->send_message(new MOSDOpReply(op, r, osdcluster), op->get_asker()); + } + delete op; +} + +void OSD::op_delete(MOSDOp *op) +{ + int r = store->remove(op->get_oid()); + dout(3) << "delete on " << op->get_oid() << " r = " << r << endl; + + // "ack" + messenger->send_message(new MOSDOpReply(op, r, osdcluster), op->get_asker()); + + logger->inc("rm"); + delete op; +} + +void OSD::op_truncate(MOSDOp *op) +{ + int r = store->truncate(op->get_oid(), op->get_offset()); + dout(3) << "truncate on " << op->get_oid() << " at " << op->get_offset() << " r = " << r << endl; + + // "ack" + messenger->send_message(new MOSDOpReply(op, r, osdcluster), op->get_asker()); + + logger->inc("trunc"); + + delete op; +} + +void OSD::op_stat(MOSDOp *op) +{ + struct stat st; + memset(&st, sizeof(st), 0); + int r = store->stat(op->get_oid(), &st); + + dout(3) << "stat on " << op->get_oid() << " r = " << r << " size = " << st.st_size << endl; + + MOSDOpReply *reply = new MOSDOpReply(op, r, osdcluster); + reply->set_object_size(st.st_size); + messenger->send_message(reply, op->get_asker()); + + logger->inc("stat"); + delete op; +} diff --git a/ceph/osd/OSD.h b/ceph/osd/OSD.h index 35fbc52e1b042..23c96a84ca117 100644 --- a/ceph/osd/OSD.h +++ b/ceph/osd/OSD.h @@ -20,7 +20,6 @@ class Message; #define RG_DIRTY_REPLICA_MEM 4 #define RG_DIRTY_REPLICA_SYNC 8 - class ReplicaGroup { public: repgroup_t rg; @@ -45,6 +44,7 @@ class OSD : public Dispatcher { class ObjectStore *store; class HostMonitor *monitor; class Logger *logger; + class ThreadPool *threadpool; list waiting_for_osdcluster; @@ -61,6 +61,8 @@ class OSD : public Dispatcher { // OSDCluster void update_osd_cluster(__uint64_t ocv, bufferlist& blist); + void do_op(class MOSDOp *m); + // messages virtual void dispatch(Message *m); @@ -69,6 +71,10 @@ class OSD : public Dispatcher { void handle_op(class MOSDOp *m); void op_read(class MOSDOp *m); void op_write(class MOSDOp *m); + void op_mkfs(class MOSDOp *m); + void op_delete(class MOSDOp *m); + void op_truncate(class MOSDOp *m); + void op_stat(class MOSDOp *m); }; #endif diff --git a/ceph/osd/OSDMap.h b/ceph/osd/OSDMap.h index 678963b6d8009..bf9912ceb2909 100644 --- a/ceph/osd/OSDMap.h +++ b/ceph/osd/OSDMap.h @@ -71,14 +71,24 @@ class OSDCluster { Rush *rush; // rush implementation + Mutex osd_cluster_lock; + void init_rush() { + + // SAB + osd_cluster_lock.Lock(); + if (rush) delete rush; rush = new Rush(); int ngroups = osd_groups.size(); - for (int i=0; iAddCluster(osd_groups[i].num_osds, osd_groups[i].weight); + } + + // SAB + osd_cluster_lock.Unlock(); } diff --git a/ceph/osd/ThreadPool.cc b/ceph/osd/ThreadPool.cc new file mode 100644 index 0000000000000..19523df535bcb --- /dev/null +++ b/ceph/osd/ThreadPool.cc @@ -0,0 +1,99 @@ +#include "include/types.h" + +#include "OSD.h" +#include "FakeStore.h" +#include "OSDCluster.h" + +#include "mds/MDS.h" + +#include "msg/Messenger.h" +#include "msg/Message.h" + +#include "msg/HostMonitor.h" + +#include "messages/MGenericMessage.h" +#include "messages/MPing.h" +#include "messages/MPingAck.h" +#include "messages/MOSDOp.h" +#include "messages/MOSDOpReply.h" +#include "messages/MOSDGetClusterAck.h" + +#include "common/Logger.h" +#include "common/LogType.h" +#include "common/Mutex.h" + +#include "OSD/ThreadPool.h" + +#include + +#include +#include +#include +#include + +void main(int argc, char *argv) { + ThreadPool t(10); + +} + +ThreadPool::Threadpool(int howmany) { + num_ops = 0; + num_threads = 0; + + int status; + + num_threads = howmany; + + for(int i = 0; i < howmany; i++) { + status = pthread_create(thread[i], NULL, do_ops, (void *)&i); + } +} + +ThreadPool::~Threadpool() { + queue_lock.Lock(); + for(int i = num_ops; i > 0; i--) + get_op(); + + for(int i = 0; i < num_threads; i++) { + put_op((MOSDOp *)NULL); + } + + for(int i = 0; i < num_threads; i++) { + cout << "Waiting for thread " << i << " to die"; + pthread_join(threads[i]); + } + + queue_lock.Unlock(); +} + +void do_ops(void *whoami) { + MOSDOp *op; + + cout << "Thread " << (int)i << " ready for action\n"; + while(1) { + op = get_op(); + + if(op == NULL) { + cout << "Thread " << (int)i << " dying"; + pthread_exit(0); + } + + OSD.do_op(op); + } +} + +MOSDOp *get_op() { + MOSDOp *op; + queue_lock.Lock(); + op = op_queue.pop(); + num_ops--; + queue_lock.Unlock(); +} + +void put_op(MOSDOp *op) { + queue_lock.Lock(); + opqueue.push(op); + num_ops++; + queue_lock.Unlock(); +} + diff --git a/ceph/osd/ThreadPool.h b/ceph/osd/ThreadPool.h new file mode 100644 index 0000000000000..135c5deae9da0 --- /dev/null +++ b/ceph/osd/ThreadPool.h @@ -0,0 +1,19 @@ +#define MAX_THREADS 1000 + +class ThreadPool { + queue op_queue; + Mutex queue_lock; + pthread_t thread[MAX_THREADS]; + int num_ops; + int num_threads; + + ThreadPool::Threadpool(int howmany); + + ThreadPool::~Threadpool(); + + void put_op(MOSDOp *op); + + void do_ops(void *whoami); + + MOSDOp *get_op(); +} -- 2.39.5