]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
First version. Untested.
authorsbrandt <sbrandt@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 21 Jun 2005 04:20:51 +0000 (04:20 +0000)
committersbrandt <sbrandt@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 21 Jun 2005 04:20:51 +0000 (04:20 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@328 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/osd/OSD.cc
ceph/osd/OSD.h
ceph/osd/OSDMap.h
ceph/osd/ThreadPool.cc [new file with mode: 0644]
ceph/osd/ThreadPool.h [new file with mode: 0644]

index 557e9997aa07209edd3797d47a6ceab2f408ea51..f52992353084274a3d9d48b372298517a7f639d0 100644 (file)
@@ -77,6 +77,7 @@ OSD::OSD(int id, Messenger *m)
   sprintf(name, "osd%02d", whoami);
   logger = new Logger(name, (LogType*)&osd_logtype);
 
+  threadpool->init(10);
 }
 
 OSD::~OSD()
@@ -114,8 +115,6 @@ int OSD::shutdown()
 
 void OSD::dispatch(Message *m) 
 {
-  osd_lock.Lock();
-
   switch (m->get_type()) {
        // host monitor
   case MSG_PING_ACK:
@@ -148,8 +147,6 @@ void OSD::dispatch(Message *m)
   default:
        dout(1) << " got unknown message " << m->get_type() << endl;
   }
-
-  osd_lock.Unlock();
 }
 
 
@@ -171,6 +168,9 @@ void OSD::handle_ping(MPing *m)
 
 void OSD::handle_getcluster_ack(MOSDGetClusterAck *m)
 {
+  // SAB
+  osd_lock.Lock();
+
   if (!osdcluster) osdcluster = new OSDCluster();
   osdcluster->decode(m->get_osdcluster());
   dout(7) << "got OSDCluster version " << osdcluster->get_version() << endl;
@@ -186,19 +186,30 @@ void OSD::handle_getcluster_ack(MOSDGetClusterAck *m)
        handle_op(*it);
   }
 
+  // SAB
+  osd_lock.Unlock();
 }
 
 void OSD::handle_op(MOSDOp *op)
 {
   // starting up?
+
   if (!osdcluster) {
+    // SAB
+    osd_lock.Lock();
+
        dout(7) << "no OSDCluster, starting up" << endl;
        if (waiting_for_osdcluster.empty()) 
          messenger->send_message(new MGenericMessage(MSG_OSD_GETCLUSTER), 
                                                          MSG_ADDR_MDS(0), MDS_PORT_MAIN);
        waiting_for_osdcluster.push_back(op);
+
+       // SAB
+       osd_lock.Unlock();
+
        return;
   }
+  
 
   // check cluster version
   if (op->get_ocv() > osdcluster->get_version()) {
@@ -210,7 +221,15 @@ void OSD::handle_op(MOSDOp *op)
        messenger->send_message(new MGenericMessage(MSG_OSD_GETCLUSTER), 
                                                        MSG_ADDR_MDS(0), MDS_PORT_MAIN);
        assert(0);
+
+       // SAB
+       osd_lock.Lock();
+
        waiting_for_osdcluster.push_back(op);
+
+       // SAB
+       osd_lock.Unlock();
+
        return;
   }
 
@@ -246,82 +265,45 @@ void OSD::handle_op(MOSDOp *op)
        }
   }
   }
-  
 
+  do_op(op);
+}
   
+void OSD::do_op(MOSDOp *op) 
+{
   // do the op
   switch (op->get_op()) {
 
   case OSD_OP_READ:
-       op_read(op);
-       break;
+    op_read(op);
+    break;
 
   case OSD_OP_WRITE:
-       op_write(op);
-       break;
+    op_write(op);
+    break;
 
   case OSD_OP_MKFS:
-       dout(3) << "MKFS" << endl;
-       {
-         int r = store->mkfs();        
-         messenger->send_message(new MOSDOpReply(op, r, osdcluster), 
-                                                         op->get_asker());
-       }
-       delete op;
-       break;
+    op_mkfs(op);
+    break;
 
   case OSD_OP_DELETE:
-       {
-         int r = store->remove(op->get_oid());
-         dout(3) << "delete on " << op->get_oid() << " r = " << r << endl;
-         
-         // "ack"
-         messenger->send_message(new MOSDOpReply(op, r, osdcluster), 
-                                                         op->get_asker());
-
-         logger->inc("rm");
-       }
-       delete op;
-       break;
+    op_delete(op);
+    break;
 
   case OSD_OP_TRUNCATE:
-       {
-         int r = store->truncate(op->get_oid(), op->get_offset());
-         dout(3) << "truncate on " << op->get_oid() << " at " << op->get_offset() << " r = " << r << endl;
-         
-         // "ack"
-         messenger->send_message(new MOSDOpReply(op, r, osdcluster), 
-                                                         op->get_asker());
-         logger->inc("trunc");
-       }
-       delete op;
-       break;
+    op_truncate(op);
+    break;
 
   case OSD_OP_STAT:
-       {
-         struct stat st;
-         memset(&st, sizeof(st), 0);
-         int r = store->stat(op->get_oid(), &st);
-  
-         dout(3) << "stat on " << op->get_oid() << " r = " << r << " size = " << st.st_size << endl;
-         
-         MOSDOpReply *reply = new MOSDOpReply(op, r, osdcluster);
-         reply->set_object_size(st.st_size);
-         messenger->send_message(reply, op->get_asker());
-         
-         logger->inc("stat");
-       }
-       delete op;
-       break;
+    op_stat(op);
+    break;
        
   default:
-       assert(0);
+    assert(0);
   }
 }
 
 
-
-
 void OSD::op_read(MOSDOp *r)
 {
   // read into a buffer
@@ -406,3 +388,53 @@ void OSD::op_write(MOSDOp *m)
   delete m;
 }
 
+void OSD::op_mkfs(MOSDOp *op)
+{
+  dout(3) << "MKFS" << endl;
+  {
+    int r = store->mkfs();     
+    messenger->send_message(new MOSDOpReply(op, r, osdcluster), op->get_asker());
+  }
+  delete op;
+}
+
+void OSD::op_delete(MOSDOp *op)
+{
+  int r = store->remove(op->get_oid());
+  dout(3) << "delete on " << op->get_oid() << " r = " << r << endl;
+  
+  // "ack"
+  messenger->send_message(new MOSDOpReply(op, r, osdcluster), op->get_asker());
+  
+  logger->inc("rm");
+  delete op;
+}
+
+void OSD::op_truncate(MOSDOp *op)
+{
+  int r = store->truncate(op->get_oid(), op->get_offset());
+  dout(3) << "truncate on " << op->get_oid() << " at " << op->get_offset() << " r = " << r << endl;
+  
+  // "ack"
+  messenger->send_message(new MOSDOpReply(op, r, osdcluster), op->get_asker());
+  
+  logger->inc("trunc");
+
+  delete op;
+}
+
+void OSD::op_stat(MOSDOp *op)
+{
+  struct stat st;
+  memset(&st, sizeof(st), 0);
+  int r = store->stat(op->get_oid(), &st);
+  
+  dout(3) << "stat on " << op->get_oid() << " r = " << r << " size = " << st.st_size << endl;
+         
+  MOSDOpReply *reply = new MOSDOpReply(op, r, osdcluster);
+  reply->set_object_size(st.st_size);
+  messenger->send_message(reply, op->get_asker());
+         
+  logger->inc("stat");
+  delete op;
+}
index 35fbc52e1b042080583e192058c369142a2c1b9b..23c96a84ca11710a64e803e879f84428c2c88c5c 100644 (file)
@@ -20,7 +20,6 @@ class Message;
 #define RG_DIRTY_REPLICA_MEM   4
 #define RG_DIRTY_REPLICA_SYNC  8
 
-
 class ReplicaGroup {
  public:
   repgroup_t rg;
@@ -45,6 +44,7 @@ class OSD : public Dispatcher {
   class ObjectStore *store;
   class HostMonitor *monitor;
   class Logger      *logger;
+  class ThreadPool  *threadpool;
 
   list<class MOSDOp*> waiting_for_osdcluster;
 
@@ -61,6 +61,8 @@ class OSD : public Dispatcher {
   // OSDCluster
   void update_osd_cluster(__uint64_t ocv, bufferlist& blist);
 
+  void do_op(class MOSDOp *m);
+
   // messages
   virtual void dispatch(Message *m);
 
@@ -69,6 +71,10 @@ class OSD : public Dispatcher {
   void handle_op(class MOSDOp *m);
   void op_read(class MOSDOp *m);
   void op_write(class MOSDOp *m);
+  void op_mkfs(class MOSDOp *m);
+  void op_delete(class MOSDOp *m);
+  void op_truncate(class MOSDOp *m);
+  void op_stat(class MOSDOp *m);
 };
 
 #endif
index 678963b6d8009f9c445d8fbfaf84830562b68730..bf9912ceb290943992b357341914ac7e30f243da 100644 (file)
@@ -71,14 +71,24 @@ class OSDCluster {
 
   Rush             *rush;       // rush implementation
 
+  Mutex  osd_cluster_lock;
+
   void init_rush() {
+
+    // SAB
+    osd_cluster_lock.Lock();
+
        if (rush) delete rush;
        rush = new Rush();
        
        int ngroups = osd_groups.size();
-       for (int i=0; i<ngroups; i++) 
+       for (int i=0; i<ngroups; i++) {
          rush->AddCluster(osd_groups[i].num_osds,
                                           osd_groups[i].weight);
+       }
+
+    // SAB
+    osd_cluster_lock.Unlock();
   }
 
 
diff --git a/ceph/osd/ThreadPool.cc b/ceph/osd/ThreadPool.cc
new file mode 100644 (file)
index 0000000..19523df
--- /dev/null
@@ -0,0 +1,99 @@
+#include "include/types.h"
+
+#include "OSD.h"
+#include "FakeStore.h"
+#include "OSDCluster.h"
+
+#include "mds/MDS.h"
+
+#include "msg/Messenger.h"
+#include "msg/Message.h"
+
+#include "msg/HostMonitor.h"
+
+#include "messages/MGenericMessage.h"
+#include "messages/MPing.h"
+#include "messages/MPingAck.h"
+#include "messages/MOSDOp.h"
+#include "messages/MOSDOpReply.h"
+#include "messages/MOSDGetClusterAck.h"
+
+#include "common/Logger.h"
+#include "common/LogType.h"
+#include "common/Mutex.h"
+
+#include "OSD/ThreadPool.h"
+
+#include <queue>
+
+#include <iostream>
+#include <cassert>
+#include <errno.h>
+#include <sys/stat.h>
+
+void main(int argc, char *argv) {
+  ThreadPool t(10);
+
+}
+
+ThreadPool::Threadpool(int howmany) {
+  num_ops = 0;
+  num_threads = 0;
+
+  int status;
+
+  num_threads = howmany;
+
+  for(int i = 0; i < howmany; i++) {
+    status = pthread_create(thread[i], NULL, do_ops, (void *)&i);
+  }
+}
+
+ThreadPool::~Threadpool() {
+  queue_lock.Lock();
+  for(int i = num_ops; i > 0; i--) 
+    get_op();
+
+  for(int i = 0; i < num_threads; i++) {
+    put_op((MOSDOp *)NULL);
+  }
+
+  for(int i = 0; i < num_threads; i++) {
+    cout << "Waiting for thread " << i << " to die";
+    pthread_join(threads[i]);
+  }
+
+  queue_lock.Unlock();
+}
+
+void do_ops(void *whoami) {
+  MOSDOp *op;
+
+  cout << "Thread " << (int)i << " ready for action\n";
+  while(1) {
+    op = get_op();
+
+    if(op == NULL) {
+      cout << "Thread " << (int)i << " dying";
+      pthread_exit(0);
+    }
+      
+    OSD.do_op(op);
+  }
+}
+
+MOSDOp *get_op() {
+  MOSDOp *op;
+  queue_lock.Lock();
+  op = op_queue.pop();
+  num_ops--;
+  queue_lock.Unlock();
+}
+
+void put_op(MOSDOp *op) {
+  queue_lock.Lock();
+  opqueue.push(op);
+  num_ops++;
+  queue_lock.Unlock();
+}
+
diff --git a/ceph/osd/ThreadPool.h b/ceph/osd/ThreadPool.h
new file mode 100644 (file)
index 0000000..135c5de
--- /dev/null
@@ -0,0 +1,19 @@
+#define MAX_THREADS 1000
+
+class ThreadPool {
+  queue<MOSDOp *> op_queue;
+  Mutex queue_lock;
+  pthread_t thread[MAX_THREADS];
+  int num_ops;
+  int num_threads;
+
+  ThreadPool::Threadpool(int howmany);
+
+  ThreadPool::~Threadpool();
+
+  void put_op(MOSDOp *op);
+
+  void do_ops(void *whoami);
+
+  MOSDOp *get_op();
+}