]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: use new workqueue in osd for ops
authorSage Weil <sage@newdream.net>
Wed, 10 Dec 2008 19:35:04 +0000 (11:35 -0800)
committerSage Weil <sage@newdream.net>
Wed, 10 Dec 2008 20:17:24 +0000 (12:17 -0800)
src/osd/OSD.cc
src/osd/OSD.h

index ddafa660a71ff70b9404684c4240a3221cda835d..30fb27c1aade565107b1a659da0f08d75f31337b 100644 (file)
@@ -71,7 +71,6 @@
 #include "common/Logger.h"
 #include "common/LogType.h"
 #include "common/Timer.h"
-#include "common/ThreadPool.h"
 #include "common/LogClient.h"
 
 #include <iostream>
@@ -260,6 +259,7 @@ OSD::OSD(int id, Messenger *m, Messenger *hbm, MonMap *mm, const char *dev) :
   whoami(id), dev_name(dev),
   boot_epoch(0), last_active_epoch(0),
   state(STATE_BOOTING),
+  op_tp("OSD::op_tp", g_conf.osd_maxthreads),
   recovery_tp("OSD::recovery_tp", 1),
   disk_tp("OSD::disk_tp", 2),
   heartbeat_lock("OSD::heartbeat_lock"),
@@ -273,6 +273,7 @@ OSD::OSD(int id, Messenger *m, Messenger *hbm, MonMap *mm, const char *dev) :
   qlen_calc(3),
   iat_averager(g_conf.osd_flash_crowd_iat_alpha),
   finished_lock("OSD::finished_lock"),
+  op_wq(this, &op_tp),
   osdmap(NULL),
   map_lock("OSD::map_lock"),
   map_cache_lock("OSD::map_cache_lock"),
@@ -304,7 +305,6 @@ OSD::OSD(int id, Messenger *m, Messenger *hbm, MonMap *mm, const char *dev) :
 
 OSD::~OSD()
 {
-  delete threadpool;
   delete osdmap;
   delete logger;
   delete store;
@@ -425,15 +425,6 @@ int OSD::init()
   osd_logtype.add_inc("mapf");
   osd_logtype.add_inc("mapfdup");
   
-  // request thread pool
-  {
-    char name[80];
-    sprintf(name,"osd%d.threadpool", whoami);
-    threadpool = new ThreadPool<OSD*, PG*>(name, g_conf.osd_maxthreads, 
-                                          static_dequeueop,
-                                          this);
-  }
-  
   // i'm ready!
   messenger->set_dispatcher(this);
   heartbeat_messenger->set_dispatcher(&heartbeat_dispatcher);
@@ -441,11 +432,8 @@ int OSD::init()
   // announce to monitor i exist and have booted.
   do_mon_report();
   
-  recovery_tp.add_work_queue(&recovery_wq);
+  op_tp.start();
   recovery_tp.start();
-
-  disk_tp.add_work_queue(&scrub_wq);
-  disk_tp.add_work_queue(&snap_trim_wq);
   disk_tp.start();
 
   // start the heartbeat
@@ -489,13 +477,12 @@ int OSD::shutdown()
   clear_pg_stat_queue();
 
   // stop threads
-  delete threadpool;
-  threadpool = 0;
-
   recovery_tp.stop();
   dout(10) << "recovery tp stopped" << dendl;
   disk_tp.stop();
   dout(10) << "disk tp stopped" << dendl;
+  op_tp.stop();
+  dout(10) << "op tp stopped" << dendl;
 
   // tell pgs we're shutting down
   for (hash_map<pg_t, PG*>::iterator p = pg_map.begin();
@@ -3519,7 +3506,7 @@ void OSD::enqueue_op(PG *pg, Message *op)
   
   // add pg to threadpool queue
   pg->get();   // we're exposing the pointer, here.
-  threadpool->put_op(pg);
+  op_wq.queue(pg);
 }
 
 /*
index 3a384be2f6c0a868f998954fe2a305218eb497ea..a8c48f8dfcdb00ed1e13c24453387f9b4f039992 100644 (file)
@@ -114,7 +114,7 @@ public:
 
 private:
 
-
+  WorkThreadPool op_tp;
   WorkThreadPool recovery_tp;
   WorkThreadPool disk_tp;
 
@@ -269,7 +269,34 @@ private:
   }
   
   // -- op queue --
-  class ThreadPool<class OSD*, PG*>   *threadpool;
+  deque<PG*> op_queue;
+  
+  struct OpWQ : public WorkThreadPool::WorkQueue<PG> {
+    OSD *osd;
+    OpWQ(OSD *o, WorkThreadPool *tp) : WorkThreadPool::WorkQueue<PG>("OSD::OpWQ", tp), osd(o) {}
+
+    bool _enqueue(PG *pg) {
+      pg->get();
+      osd->op_queue.push_back(pg);
+      return true;
+    }
+    void _dequeue(PG *pg) {
+      assert(0);
+    }
+    PG * _dequeue() {
+      if (osd->op_queue.empty())
+       return NULL;
+      PG *pg = osd->op_queue.front();
+      osd->op_queue.pop_front();
+      return pg;
+    }
+    void _process(PG *pg) {
+      osd->dequeue_op(pg);
+    }
+    void _clear() {
+      assert(osd->op_queue.empty());
+    }
+  } op_wq;
 
   int   pending_ops;
   bool  waiting_for_no_ops;