git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
os/newstore: throttle wal work
author Sage Weil <sage@redhat.com>
Thu, 16 Apr 2015 23:30:31 +0000 (16:30 -0700)
committer Sage Weil <sage@redhat.com>
Tue, 1 Sep 2015 17:39:39 +0000 (13:39 -0400)
Signed-off-by: Sage Weil <sage@redhat.com>
src/common/WorkQueue.h
src/common/config_opts.h
src/os/newstore/NewStore.cc
src/os/newstore/NewStore.h
src/os/newstore/newstore_types.h

index cf78b2dc30294371b437d9371c4bfb39ee5d7171..f0754de8e1992bb4974698ea7dbdc7ec02a31ff2 100644 (file)
@@ -322,6 +322,10 @@ public:
       pool->_lock.Unlock();
     }
 
+    Mutex &get_lock() {  // hands out a reference to the pool's _lock so a caller can hold it or wait on a Cond with it
+      return pool->_lock;
+    }
+
     void lock() {
       pool->lock();
     }
index d9198b92a71702258da66ac47bca3cf282a6b5c4..bfe7d0df6650cefd53f5cf09f80dc3c984131c84 100644 (file)
@@ -798,9 +798,11 @@ OPTION(newstore_sync_queue_transaction, OPT_BOOL, false)  // perform write synch
 OPTION(newstore_fsync_threads, OPT_INT, 16)  // num threads calling fsync
 OPTION(newstore_fsync_thread_timeout, OPT_INT, 30) // thread timeout value
 OPTION(newstore_fsync_thread_suicide_timeout, OPT_INT, 120) // suicide timeout value
-OPTION(newstore_wal_threads, OPT_INT, 2)
+OPTION(newstore_wal_threads, OPT_INT, 4)
 OPTION(newstore_wal_thread_timeout, OPT_INT, 30)
 OPTION(newstore_wal_thread_suicide_timeout, OPT_INT, 120)
+OPTION(newstore_wal_max_ops, OPT_U64, 64)  // queue_transactions waits while the number of queued wal ops exceeds this
+OPTION(newstore_wal_max_bytes, OPT_U64, 64*1024*1024)  // queue_transactions waits while queued wal bytes exceed this
 OPTION(newstore_fid_prealloc, OPT_INT, 1024)
 OPTION(newstore_nid_prealloc, OPT_INT, 1024)
 OPTION(newstore_overlay_max_length, OPT_INT, 65536)
index a3a873d8624e5b108b89b54362af8ba7c6ddb61d..d4691f8b7f705c94944fc6b54d974bdfc9aca912 100644 (file)
@@ -2419,6 +2419,10 @@ int NewStore::queue_transactions(
     tls, &onreadable, &ondisk, &onreadable_sync);
   int r;
 
+  // throttle wal work
+  wal_wq.throttle(g_conf->newstore_wal_max_ops,
+                 g_conf->newstore_wal_max_bytes);
+
   // set up the sequencer
   OpSequencer *osr;
   if (!posr)
index d95452f37670c9fa10671c109c61bafeed8951bd..2563ba3d5bf4a9fdd62f42b5aeced8b95e0082c8 100644 (file)
@@ -367,6 +367,8 @@ public:
   private:
     NewStore *store;
     wal_osr_queue_t wal_queue;
+    uint64_t ops, bytes;
+    Cond throttle_cond;
 
   public:
     WALWQ(NewStore *s, time_t ti, time_t sti, ThreadPool *tp)
@@ -381,6 +383,8 @@ public:
        wal_queue.push_back(*i->osr);
       }
       i->osr->wal_q.push_back(*i);
+      ++ops;
+      bytes += i->wal_txn->get_bytes();
       return true;
     }
     void _dequeue(TransContext *p) {
@@ -397,12 +401,18 @@ public:
        // requeue at the end to minimize contention
        wal_queue.push_back(*i->osr);
       }
+      --ops;
+      bytes -= i->wal_txn->get_bytes();
+      throttle_cond.Signal();
+
+      // preserve wal ordering for this sequencer by taking the lock
+      // while still holding the queue lock
+      i->osr->wal_apply_lock.Lock();
       return i;
     }
     void _process(TransContext *i, ThreadPool::TPHandle &handle) {  // applies one wal transaction; the sequencer's wal_apply_lock is already held on entry
-      // preserve wal ordering for this sequencer
-      Mutex::Locker l(i->osr->wal_apply_lock);
       store->_apply_wal_transaction(i);
+      i->osr->wal_apply_lock.Unlock();  // pairs with the Lock() taken just before this item was returned from the queue
     }
     void _clear() {
       assert(wal_queue.empty());
@@ -416,6 +426,14 @@ public:
       unlock();
       drain();
     }
+
+    void throttle(uint64_t max_ops, uint64_t max_bytes) {  // blocks the caller until queued ops and bytes are both within the given limits
+      Mutex& lock = get_lock();
+      Mutex::Locker l(lock);  // the ops/bytes counters are read under the pool lock
+      while (ops > max_ops || bytes > max_bytes) {  // NOTE(review): no initializer for ops/bytes is visible in this diff -- confirm the constructor zeroes them
+       throttle_cond.Wait(lock);  // woken by throttle_cond.Signal() after the counters are decremented
+      }
+    }
   };
 
   struct KVSyncThread : public Thread {
index 636ef65a96c8d03c03653d999177b2bb32d2854e..286fc773e6791866535bd4a4b96f29e55006d8de 100644 (file)
@@ -165,6 +165,20 @@ struct wal_transaction_t {
   uint64_t seq;
   list<wal_op_t> ops;
 
+  int64_t _bytes;  ///< cached byte count
+
+  wal_transaction_t() : _bytes(-1) {}
+
+  uint64_t get_bytes() {  // sum of the length field over all ops; computed on first call and cached in _bytes
+    if (_bytes < 0) {  // negative means not yet computed (the constructor sets -1)
+      _bytes = 0;
+      for (list<wal_op_t>::iterator p = ops.begin(); p != ops.end(); ++p) {
+       _bytes += p->length;
+      }
+    }
+    return _bytes;  // NOTE(review): the cached value is not refreshed if ops changes after the first call
+  }
+
   void encode(bufferlist& bl) const;
   void decode(bufferlist::iterator& p);
   void dump(Formatter *f) const;