]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
workqueue: non-inline worker, control methods; debugging
authorSage Weil <sage@newdream.net>
Wed, 10 Dec 2008 20:15:00 +0000 (12:15 -0800)
committerSage Weil <sage@newdream.net>
Wed, 10 Dec 2008 20:17:25 +0000 (12:17 -0800)
src/Makefile.am
src/common/WorkQueue.cc [new file with mode: 0644]
src/common/WorkQueue.h
src/config.cc
src/config.h
src/include/buffer.h
src/os/FileStore.h
src/osd/OSD.h

index f3bb88331ec8385bafe18568f3fada2ebca3ccad..e7308d1d767a842747300db829ddc185c6ce241f 100644 (file)
@@ -181,6 +181,7 @@ libcommon_a_SOURCES = \
        common/sctp_crc32.c\
        common/assert.cc \
        common/debug.cc \
+       common/WorkQueue.cc \
        mon/MonMap.cc \
        mon/MonClient.cc \
        osd/OSDMap.cc \
diff --git a/src/common/WorkQueue.cc b/src/common/WorkQueue.cc
new file mode 100644 (file)
index 0000000..5ce590b
--- /dev/null
@@ -0,0 +1,120 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#include "WorkQueue.h"
+
+#include "config.h"
+
+#define DOUT_SUBSYS tp
+#undef dout_prefix
+#define dout_prefix *_dout << dbeginl << pthread_self() << " " << name << " "
+
+
+void ThreadPool::worker()
+{
+  _lock.Lock();
+  dout(10) << "worker start" << dendl;
+  while (!_stop) {
+    if (!_pause && work_queues.size()) {
+      _WorkQueue *wq;
+      int tries = work_queues.size();
+      bool did = false;
+      while (tries--) {
+       last_work_queue++;
+       last_work_queue %= work_queues.size();
+       wq = work_queues[last_work_queue];
+       
+       void *item = wq->_void_dequeue();
+       if (item) {
+         processing++;
+         dout(12) << "worker wq " << wq->name << " start processing " << item << dendl;
+         _lock.Unlock();
+         wq->_void_process(item);
+         _lock.Lock();
+         dout(15) << "worker wq " << wq->name << " done processing " << item << dendl;
+         processing--;
+         if (_pause)
+           _wait_cond.Signal();
+         did = true;
+         break;
+       }
+      }
+      if (did)
+       continue;
+    }
+    dout(15) << "worker waiting" << dendl;
+    _cond.Wait(_lock);
+  }
+  dout(0) << "worker finish" << dendl;
+  _lock.Unlock();
+}
+
+void ThreadPool::start()
+{
+  dout(10) << "start" << dendl;
+  for (set<WorkThread*>::iterator p = _threads.begin();
+       p != _threads.end();
+       p++)
+    (*p)->create();
+  dout(15) << "started" << dendl;
+}
+void ThreadPool::stop(bool clear_after)
+{
+  dout(10) << "stop" << dendl;
+  _lock.Lock();
+  _stop = true;
+  _cond.Signal();
+  _lock.Unlock();
+  for (set<WorkThread*>::iterator p = _threads.begin();
+       p != _threads.end();
+       p++)
+    (*p)->join();
+  _lock.Lock();
+  for (unsigned i=0; i<work_queues.size(); i++)
+    work_queues[i]->_clear();
+  _lock.Unlock();    
+  dout(15) << "stopped" << dendl;
+}
+
+
+void ThreadPool::pause()
+{
+  dout(10) << "pause" << dendl;
+  _lock.Lock();
+  assert(!_pause);
+  _pause = true;
+  while (processing)
+    _wait_cond.Wait(_lock);
+  _lock.Unlock();
+  dout(15) << "paused" << dendl;
+}
+
+void ThreadPool::pause_new()
+{
+  dout(10) << "pause_new" << dendl;
+  _lock.Lock();
+  assert(!_pause);
+  _pause = true;
+  _lock.Unlock();
+}
+
+void ThreadPool::unpause()
+{
+  dout(10) << "unpause" << dendl;
+  _lock.Lock();
+  assert(_pause);
+  _pause = false;
+  _cond.Signal();
+  _lock.Unlock();
+}
index d8787c5062d113311a4875b34e65f2668de3da10..e275198e50e13a3d93a5cc8798084480929c7878 100644 (file)
@@ -19,7 +19,8 @@
 #include "Cond.h"
 #include "Thread.h"
 
-class WorkThreadPool {
+class ThreadPool {
+  string name;
   Mutex _lock;
   Cond _cond;
   bool _stop, _pause;
@@ -28,14 +29,15 @@ class WorkThreadPool {
   struct _WorkQueue {
     string name;
     _WorkQueue(string n) : name(n) {}
-    virtual bool _try_process() = 0;
     virtual void _clear() = 0;
+    virtual void *_void_dequeue() = 0;
+    virtual void _void_process(void *) = 0;
   };  
 
 public:
   template<class T>
   class WorkQueue : public _WorkQueue {
-    WorkThreadPool *pool;
+    ThreadPool *pool;
     
     virtual bool _enqueue(T *) = 0;
     virtual void _dequeue(T *) = 0;
@@ -43,8 +45,15 @@ public:
     virtual void _process(T *) = 0;
     virtual void _clear() = 0;
     
+    void *_void_dequeue() {
+      return (void *)_dequeue();
+    }
+    void _void_process(void *p) {
+      _process((T *)p);
+    }
+
   public:
-    WorkQueue(string n, WorkThreadPool *p) : _WorkQueue(n), pool(p) {
+    WorkQueue(string n, ThreadPool *p) : _WorkQueue(n), pool(p) {
       pool->add_work_queue(this);
     }
     ~WorkQueue() {
@@ -69,17 +78,6 @@ public:
       pool->_lock.Unlock();
     }
 
-    bool _try_process() {
-      T *item = _dequeue();
-      if (item) {
-       pool->_lock.Unlock();
-       _process(item);
-       pool->_lock.Lock();
-       return true;
-      }
-      return false;
-    }
-
     void lock() {
       pool->lock();
     }
@@ -99,10 +97,10 @@ private:
 
   // threads
   struct WorkThread : public Thread {
-    WorkThreadPool *pool;
-    WorkThread(WorkThreadPool *p) : pool(p) {}
+    ThreadPool *pool;
+    WorkThread(ThreadPool *p) : pool(p) {}
     void *entry() {
-      pool->entry();
+      pool->worker();
       return 0;
     }
   };
@@ -110,42 +108,11 @@ private:
   set<WorkThread*> _threads;
   int processing;
 
-
-  void entry() {
-    _lock.Lock();
-    //generic_dout(0) << "entry start" << dendl;
-    while (!_stop) {
-      if (!_pause && work_queues.size()) {
-       _WorkQueue *wq;
-       int tries = work_queues.size();
-       bool did = false;
-       while (tries--) {
-         last_work_queue++;
-         last_work_queue %= work_queues.size();
-         wq = work_queues[last_work_queue];
-         
-         processing++;
-         //generic_dout(0) << "entry trying wq " << wq->name << dendl;
-         did = wq->_try_process();
-         processing--;
-         //if (did) generic_dout(0) << "entry did wq " << wq->name << dendl;
-         if (did && _pause)
-           _wait_cond.Signal();
-         if (did)
-           break;
-       }
-       if (did)
-         continue;
-      }
-      //generic_dout(0) << "entry waiting" << dendl;
-      _cond.Wait(_lock);
-    }
-    //generic_dout(0) << "entry finish" << dendl;
-    _lock.Unlock();
-  }
+  void worker();
 
 public:
-  WorkThreadPool(string name, int n=1) :
+  ThreadPool(string nm, int n=1) :
+    name(nm),
     _lock((new string(name + "::lock"))->c_str()),  // deliberately leak this
     _stop(false),
     _pause(false),
@@ -153,7 +120,7 @@ public:
     processing(0) {
     set_num_threads(n);
   }
-  ~WorkThreadPool() {
+  ~ThreadPool() {
     for (set<WorkThread*>::iterator p = _threads.begin();
         p != _threads.end();
         p++)
@@ -180,26 +147,6 @@ public:
     }
   }
 
-  void start() {
-    for (set<WorkThread*>::iterator p = _threads.begin();
-        p != _threads.end();
-        p++)
-      (*p)->create();
-  }
-  void stop(bool clear_after=true) {
-    _lock.Lock();
-    _stop = true;
-    _cond.Signal();
-    _lock.Unlock();
-    for (set<WorkThread*>::iterator p = _threads.begin();
-        p != _threads.end();
-        p++)
-      (*p)->join();
-    _lock.Lock();
-    for (unsigned i=0; i<work_queues.size(); i++)
-      work_queues[i]->_clear();
-    _lock.Unlock();    
-  }
   void kick() {
     _lock.Lock();
     _cond.Signal();
@@ -216,29 +163,11 @@ public:
     _lock.Unlock();
   }
 
-  void pause() {
-    _lock.Lock();
-    assert(!_pause);
-    _pause = true;
-    while (processing)
-      _wait_cond.Wait(_lock);
-    _lock.Unlock();
-  }
-  void pause_new() {
-    _lock.Lock();
-    assert(!_pause);
-    _pause = true;
-    _lock.Unlock();
-  }
-
-  void unpause() {
-    _lock.Lock();
-    assert(_pause);
-    _pause = false;
-    _cond.Signal();
-    _lock.Unlock();
-  }
-
+  void start();
+  void stop(bool clear_after=true);
+  void pause();
+  void pause_new();
+  void unpause();
 };
 
 
index 673000fc854726e8be013e1af42b311c44de5fe9..9ef1d9d6cc29123b76267b91ee1e33b96a6cc497 100644 (file)
@@ -241,6 +241,7 @@ md_config_t g_conf = {
   debug_ms: 0,
   debug_mon: 1,
   debug_paxos: 0,
+  debug_tp: 0,
   
   debug_after: 0,
   
@@ -808,6 +809,11 @@ void parse_config_options(std::vector<const char*>& args, bool open)
         g_conf.debug_paxos = atoi(args[++i]);
       else 
         g_debug_after_conf.debug_paxos = atoi(args[++i]);
+    else if (strcmp(args[i], "--debug_tp") == 0) 
+      if (!g_conf.debug_after) 
+        g_conf.debug_tp = atoi(args[++i]);
+      else 
+        g_debug_after_conf.debug_tp = atoi(args[++i]);
 
     else if (strcmp(args[i], "--debug_after") == 0) {
       g_conf.debug_after = atoi(args[++i]);
index d51a70e054a65763af6d2d3566c0bba2819b7e0d..992a44afced1b40e2e9973ed578eabc4287d6b9f 100644 (file)
@@ -104,6 +104,7 @@ struct md_config_t {
   int debug_ms;
   int debug_mon;
   int debug_paxos;
+  int debug_tp;
 
   int debug_after;
 
index 5319ad83cb6d884e634cb1888169f92d434b6ef9..7beba30eacf2863efb5328ce792ede873aba0495 100644 (file)
@@ -41,8 +41,13 @@ void *valloc(size_t);
 #endif
 
 #include <iostream>
+#include <istream>
 #include <iomanip>
 #include <list>
+#include <string>
+
+using std::istream;
+using std::string;
 
 #include "atomic.h"
 #include "page.h"
index 27cacff94141eb5424af6e18d3dd9f4682d88340..9f6fabc1369029ca1614e833de61b56b55961604 100644 (file)
@@ -18,7 +18,6 @@
 
 #include "ObjectStore.h"
 #include "JournalingObjectStore.h"
-#include "common/ThreadPool.h"
 #include "common/Mutex.h"
 
 #include "Fake.h"
index a8c48f8dfcdb00ed1e13c24453387f9b4f039992..33c557523fd9631eb5374ec2bca7bdf58f128953 100644 (file)
@@ -19,7 +19,6 @@
 
 #include "common/Mutex.h"
 #include "common/RWLock.h"
-#include "common/ThreadPool.h"
 #include "common/Timer.h"
 #include "common/WorkQueue.h"
 #include "common/LogClient.h"
@@ -114,9 +113,9 @@ public:
 
 private:
 
-  WorkThreadPool op_tp;
-  WorkThreadPool recovery_tp;
-  WorkThreadPool disk_tp;
+  ThreadPool op_tp;
+  ThreadPool recovery_tp;
+  ThreadPool disk_tp;
 
 
 
@@ -271,9 +270,9 @@ private:
   // -- op queue --
   deque<PG*> op_queue;
   
-  struct OpWQ : public WorkThreadPool::WorkQueue<PG> {
+  struct OpWQ : public ThreadPool::WorkQueue<PG> {
     OSD *osd;
-    OpWQ(OSD *o, WorkThreadPool *tp) : WorkThreadPool::WorkQueue<PG>("OSD::OpWQ", tp), osd(o) {}
+    OpWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::OpWQ", tp), osd(o) {}
 
     bool _enqueue(PG *pg) {
       pg->get();
@@ -502,9 +501,9 @@ private:
   utime_t defer_recovery_until;
   int recovery_ops_active;
 
-  struct RecoveryWQ : public WorkThreadPool::WorkQueue<PG> {
+  struct RecoveryWQ : public ThreadPool::WorkQueue<PG> {
     OSD *osd;
-    RecoveryWQ(OSD *o, WorkThreadPool *tp) : WorkThreadPool::WorkQueue<PG>("OSD::RecoveryWQ", tp), osd(o) {}
+    RecoveryWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::RecoveryWQ", tp), osd(o) {}
 
     bool _enqueue(PG *pg) {
       if (!pg->recovery_item.get_xlist()) {
@@ -578,9 +577,9 @@ private:
   // -- snap trimming --
   xlist<PG*> snap_trim_queue;
   
-  struct SnapTrimWQ : public WorkThreadPool::WorkQueue<PG> {
+  struct SnapTrimWQ : public ThreadPool::WorkQueue<PG> {
     OSD *osd;
-    SnapTrimWQ(OSD *o, WorkThreadPool *tp) : WorkThreadPool::WorkQueue<PG>("OSD::SnapTrimWQ", tp), osd(o) {}
+    SnapTrimWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::SnapTrimWQ", tp), osd(o) {}
 
     bool _enqueue(PG *pg) {
       if (pg->snap_trim_item.is_on_xlist())
@@ -610,9 +609,9 @@ private:
   // -- scrubbing --
   xlist<PG*> scrub_queue;
 
-  struct ScrubWQ : public WorkThreadPool::WorkQueue<PG> {
+  struct ScrubWQ : public ThreadPool::WorkQueue<PG> {
     OSD *osd;
-    ScrubWQ(OSD *o, WorkThreadPool *tp) : WorkThreadPool::WorkQueue<PG>("OSD::ScrubWQ", tp), osd(o) {}
+    ScrubWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::ScrubWQ", tp), osd(o) {}
 
     bool _enqueue(PG *pg) {
       if (pg->scrub_item.is_on_xlist())