]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: shared threadpool for multiple work queues
authorSage Weil <sage@newdream.net>
Wed, 10 Dec 2008 19:13:00 +0000 (11:13 -0800)
committerSage Weil <sage@newdream.net>
Wed, 10 Dec 2008 20:17:22 +0000 (12:17 -0800)
src/common/WorkQueue.h
src/osd/OSD.cc
src/osd/OSD.h

index c14d3af86dd2d103e5baa04d6bf123d790bea65e..d8787c5062d113311a4875b34e65f2668de3da10 100644 (file)
 #include "Cond.h"
 #include "Thread.h"
 
-template<class T>
-class WorkQueue {
-  
+class WorkThreadPool {
   Mutex _lock;
-  Cond cond;
+  Cond _cond;
   bool _stop, _pause;
+  Cond _wait_cond;
+
+  struct _WorkQueue {
+    string name;
+    _WorkQueue(string n) : name(n) {}
+    virtual bool _try_process() = 0;
+    virtual void _clear() = 0;
+  };  
+
+public:
+  template<class T>
+  class WorkQueue : public _WorkQueue {
+    WorkThreadPool *pool;
+    
+    virtual bool _enqueue(T *) = 0;
+    virtual void _dequeue(T *) = 0;
+    virtual T *_dequeue() = 0;
+    virtual void _process(T *) = 0;
+    virtual void _clear() = 0;
+    
+  public:
+    WorkQueue(string n, WorkThreadPool *p) : _WorkQueue(n), pool(p) {
+      pool->add_work_queue(this);
+    }
+    ~WorkQueue() {
+      pool->remove_work_queue(this);
+    }
+    
+    bool queue(T *item) {
+      pool->_lock.Lock();
+      bool r = _enqueue(item);
+      pool->_cond.SignalOne();
+      pool->_lock.Unlock();
+      return r;
+    }
+    void dequeue(T *item) {
+      pool->_lock.Lock();
+      _dequeue(item);
+      pool->_lock.Unlock();
+    }
+    void clear() {
+      pool->_lock.Lock();
+      _clear();
+      pool->_lock.Unlock();
+    }
+
+    bool _try_process() {
+      T *item = _dequeue();
+      if (item) {
+       pool->_lock.Unlock();
+       _process(item);
+       pool->_lock.Lock();
+       return true;
+      }
+      return false;
+    }
+
+    void lock() {
+      pool->lock();
+    }
+    void unlock() {
+      pool->unlock();
+    }
+    void _kick() {
+      pool->_kick();
+    }
+
+  };
+
+private:
+  vector<_WorkQueue*> work_queues;
+  int last_work_queue;
+
+  // threads
+  struct WorkThread : public Thread {
+    WorkThreadPool *pool;
+    WorkThread(WorkThreadPool *p) : pool(p) {}
+    void *entry() {
+      pool->entry();
+      return 0;
+    }
+  };
+  
+  set<WorkThread*> _threads;
   int processing;
-  Cond wait_cond;
+
 
   void entry() {
     _lock.Lock();
+    //generic_dout(0) << "entry start" << dendl;
     while (!_stop) {
-      if (!_pause) {
-       T *item = _dequeue();
-       if (item) {
+      if (!_pause && work_queues.size()) {
+       _WorkQueue *wq;
+       int tries = work_queues.size();
+       bool did = false;
+       while (tries--) {
+         last_work_queue++;
+         last_work_queue %= work_queues.size();
+         wq = work_queues[last_work_queue];
+         
          processing++;
-         _lock.Unlock();
-         _process(item);
-         _lock.Lock();
+         //generic_dout(0) << "entry trying wq " << wq->name << dendl;
+         did = wq->_try_process();
          processing--;
-         if (_pause)
-           wait_cond.Signal();
-         continue;
+         //if (did) generic_dout(0) << "entry did wq " << wq->name << dendl;
+         if (did && _pause)
+           _wait_cond.Signal();
+         if (did)
+           break;
        }
+       if (did)
+         continue;
       }
-      cond.Wait(_lock);
+      //generic_dout(0) << "entry waiting" << dendl;
+      _cond.Wait(_lock);
     }
+    //generic_dout(0) << "entry finish" << dendl;
     _lock.Unlock();
   }
 
-  struct WorkThread : public Thread {
-    WorkQueue *wq;
-    WorkThread(WorkQueue *q) : wq(q) {}
-    void *entry() {
-      wq->entry();
-      return 0;
-    }
-  } thread;
-
 public:
-  WorkQueue(string name) :
+  WorkThreadPool(string name, int n=1) :
     _lock((new string(name + "::lock"))->c_str()),  // deliberately leak this
-    _stop(false), _pause(false),
-    processing(0),
-    thread(this) {}
+    _stop(false),
+    _pause(false),
+    last_work_queue(0),
+    processing(0) {
+    set_num_threads(n);
+  }
+  ~WorkThreadPool() {
+    for (set<WorkThread*>::iterator p = _threads.begin();
+        p != _threads.end();
+        p++)
+      delete *p;
+  }
+  
+  void add_work_queue(_WorkQueue *wq) {
+    work_queues.push_back(wq);
+  }
+  void remove_work_queue(_WorkQueue *wq) {
+    unsigned i = 0;
+    while (work_queues[i] != wq)
+      i++;
+    for (i++; i < work_queues.size(); i++) 
+      work_queues[i-1] = work_queues[i];
+    assert(i == work_queues.size());
+    work_queues.resize(i-1);
+  }
 
-  virtual bool _enqueue(T *) = 0;
-  virtual void _dequeue(T *) = 0;
-  virtual T *_dequeue() = 0;
-  virtual void _process(T *) = 0;
-  virtual void _clear() = 0;
+  void set_num_threads(unsigned n) {
+    while (_threads.size() < n) {
+      WorkThread *t = new WorkThread(this);
+      _threads.insert(t);
+    }
+  }
 
   void start() {
-    thread.create();
+    for (set<WorkThread*>::iterator p = _threads.begin();
+        p != _threads.end();
+        p++)
+      (*p)->create();
   }
   void stop(bool clear_after=true) {
     _lock.Lock();
     _stop = true;
-    cond.Signal();
+    _cond.Signal();
     _lock.Unlock();
-    thread.join();
-    if (clear_after)
-      clear();      
+    for (set<WorkThread*>::iterator p = _threads.begin();
+        p != _threads.end();
+        p++)
+      (*p)->join();
+    _lock.Lock();
+    for (unsigned i=0; i<work_queues.size(); i++)
+      work_queues[i]->_clear();
+    _lock.Unlock();    
   }
   void kick() {
     _lock.Lock();
-    cond.Signal();
+    _cond.Signal();
     _lock.Unlock();
   }
   void _kick() {
     assert(_lock.is_locked());
-    cond.Signal();
+    _cond.Signal();
   }
   void lock() {
     _lock.Lock();
@@ -104,7 +221,7 @@ public:
     assert(!_pause);
     _pause = true;
     while (processing)
-      wait_cond.Wait(_lock);
+      _wait_cond.Wait(_lock);
     _lock.Unlock();
   }
   void pause_new() {
@@ -118,29 +235,12 @@ public:
     _lock.Lock();
     assert(_pause);
     _pause = false;
-    cond.Signal();
-    _lock.Unlock();
-  }
-
-  bool queue(T *item) {
-    _lock.Lock();
-    bool r = _enqueue(item);
-    cond.Signal();
-    _lock.Unlock();
-    return r;
-  }
-  void dequeue(T *item) {
-    _lock.Lock();
-    _dequeue(item);
-    _lock.Unlock();
-  }
-  void clear() {
-    _lock.Lock();
-    _clear();
+    _cond.Signal();
     _lock.Unlock();
   }
 
 };
 
 
+
 #endif
index 631c98385cdf4c82d98e5085e8a238fb4cac021c..ddafa660a71ff70b9404684c4240a3221cda835d 100644 (file)
@@ -260,6 +260,8 @@ OSD::OSD(int id, Messenger *m, Messenger *hbm, MonMap *mm, const char *dev) :
   whoami(id), dev_name(dev),
   boot_epoch(0), last_active_epoch(0),
   state(STATE_BOOTING),
+  recovery_tp("OSD::recovery_tp", 1),
+  disk_tp("OSD::disk_tp", 2),
   heartbeat_lock("OSD::heartbeat_lock"),
   heartbeat_stop(false), heartbeat_epoch(0),
   heartbeat_messenger(hbm),
@@ -280,10 +282,10 @@ OSD::OSD(int id, Messenger *m, Messenger *hbm, MonMap *mm, const char *dev) :
   tid_lock("OSD::tid_lock"),
   num_pulling(0),
   recovery_ops_active(0),
-  recovery_wq(this),
+  recovery_wq(this, &recovery_tp),
   remove_list_lock("OSD::remove_list_lock"),
-  snap_trim_wq(this),
-  scrub_wq(this)
+  snap_trim_wq(this, &disk_tp),
+  scrub_wq(this, &disk_tp)
 {
   osdmap = 0;
 
@@ -439,9 +441,12 @@ int OSD::init()
   // announce to monitor i exist and have booted.
   do_mon_report();
   
-  recovery_wq.start();
-  scrub_wq.start();
-  snap_trim_wq.start();
+  recovery_tp.add_work_queue(&recovery_wq);
+  recovery_tp.start();
+
+  disk_tp.add_work_queue(&scrub_wq);
+  disk_tp.add_work_queue(&snap_trim_wq);
+  disk_tp.start();
 
   // start the heartbeat
   heartbeat_thread.create();
@@ -487,12 +492,10 @@ int OSD::shutdown()
   delete threadpool;
   threadpool = 0;
 
-  recovery_wq.stop();
-  dout(10) << "recovery wq stopped" << dendl;
-  scrub_wq.stop();
-  dout(10) << "scrub wq stopped" << dendl;
-  snap_trim_wq.stop();
-  dout(10) << "snap trim wq stopped" << dendl;
+  recovery_tp.stop();
+  dout(10) << "recovery tp stopped" << dendl;
+  disk_tp.stop();
+  dout(10) << "disk tp stopped" << dendl;
 
   // tell pgs we're shutting down
   for (hash_map<pg_t, PG*>::iterator p = pg_map.begin();
@@ -1161,7 +1164,7 @@ void OSD::tick()
   }
 
   // periodically kick recovery work queue
-  recovery_wq.kick();
+  recovery_tp.kick();
   
   map_lock.get_read();
 
@@ -1642,8 +1645,10 @@ void OSD::handle_scrub(MOSDScrub *m)
       if (pg->is_primary()) {
        if (m->repair)
          pg->state_set(PG_STATE_REPAIR);
-       if (!pg->is_scrubbing())
+       if (!pg->is_scrubbing()) {
+         dout(10) << "queueing " << *pg << " for scrub" << dendl;
          scrub_wq.queue(pg);
+       }
       }
     }
   } else {
@@ -1655,8 +1660,10 @@ void OSD::handle_scrub(MOSDScrub *m)
        if (pg->is_primary()) {
          if (m->repair)
            pg->state_set(PG_STATE_REPAIR);
-         if (!pg->is_scrubbing())
+         if (!pg->is_scrubbing()) {
+           dout(10) << "queueing " << *pg << " for scrub" << dendl;
            scrub_wq.queue(pg);
+         }
        }
       }
   }
@@ -1724,9 +1731,8 @@ void OSD::handle_osd_map(MOSDMap *m)
   state = STATE_ACTIVE;
 
   wait_for_no_ops();
-  recovery_wq.pause();
-  scrub_wq.pause_new();   // _process() may be waiting for a replica message
-  snap_trim_wq.pause();
+  recovery_tp.pause();
+  disk_tp.pause_new();   // _process() may be waiting for a replica message
   map_lock.get_write();
 
   assert(osd_lock.is_locked());
@@ -1920,9 +1926,8 @@ void OSD::handle_osd_map(MOSDMap *m)
 
   map_lock.put_write();
 
-  recovery_wq.unpause();
-  scrub_wq.unpause();
-  snap_trim_wq.unpause();
+  recovery_tp.unpause();
+  disk_tp.unpause();
 
   //if (osdmap->get_epoch() == 1) store->sync();     // in case of early death, blah
 
index fe6ad22b714bbe5c75c20fade1ce4bf5d5808651..3a384be2f6c0a868f998954fe2a305218eb497ea 100644 (file)
@@ -114,6 +114,12 @@ public:
 
 private:
 
+
+  WorkThreadPool recovery_tp;
+  WorkThreadPool disk_tp;
+
+
+
   // -- heartbeat --
   Mutex heartbeat_lock;
   Cond heartbeat_cond;
@@ -462,14 +468,16 @@ private:
                        map<int, MOSDPGInfo*>* info_map,
                        int& created);
 
+
+
   // -- pg recovery --
   xlist<PG*> recovery_queue;
   utime_t defer_recovery_until;
   int recovery_ops_active;
 
-  struct RecoveryWQ : public WorkQueue<PG> {
+  struct RecoveryWQ : public WorkThreadPool::WorkQueue<PG> {
     OSD *osd;
-    RecoveryWQ(OSD *o) : WorkQueue<PG>("OSD::RecoveryWQ"), osd(o) {}
+    RecoveryWQ(OSD *o, WorkThreadPool *tp) : WorkThreadPool::WorkQueue<PG>("OSD::RecoveryWQ", tp), osd(o) {}
 
     bool _enqueue(PG *pg) {
       if (!pg->recovery_item.get_xlist()) {
@@ -543,9 +551,9 @@ private:
   // -- snap trimming --
   xlist<PG*> snap_trim_queue;
   
-  struct SnapTrimWQ : public WorkQueue<PG> {
+  struct SnapTrimWQ : public WorkThreadPool::WorkQueue<PG> {
     OSD *osd;
-    SnapTrimWQ(OSD *o) : WorkQueue<PG>("OSD::SnapTrimWQ"), osd(o) {}
+    SnapTrimWQ(OSD *o, WorkThreadPool *tp) : WorkThreadPool::WorkQueue<PG>("OSD::SnapTrimWQ", tp), osd(o) {}
 
     bool _enqueue(PG *pg) {
       if (pg->snap_trim_item.is_on_xlist())
@@ -575,9 +583,9 @@ private:
   // -- scrubbing --
   xlist<PG*> scrub_queue;
 
-  struct ScrubWQ : public WorkQueue<PG> {
+  struct ScrubWQ : public WorkThreadPool::WorkQueue<PG> {
     OSD *osd;
-    ScrubWQ(OSD *o) : WorkQueue<PG>("OSD::ScrubWQ"), osd(o) {}
+    ScrubWQ(OSD *o, WorkThreadPool *tp) : WorkThreadPool::WorkQueue<PG>("OSD::ScrubWQ", tp), osd(o) {}
 
     bool _enqueue(PG *pg) {
       if (pg->scrub_item.is_on_xlist())