From c8f27eca73930400720cd7bb0d19cf61d3614e76 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 2 Feb 2010 13:50:26 -0800 Subject: [PATCH] journal: throttle journal --- src/Makefile.am | 1 + src/common/Throttle.h | 68 +++++++++++++++++++++++++++++++++++++++++++ src/config.cc | 2 ++ src/config.h | 2 ++ src/os/FileJournal.cc | 37 ++++++++++++++++++++--- src/os/FileJournal.h | 11 +++++-- src/os/FileStore.cc | 3 ++ src/os/Journal.h | 1 + 8 files changed, 118 insertions(+), 7 deletions(-) create mode 100644 src/common/Throttle.h diff --git a/src/Makefile.am b/src/Makefile.am index 214e0fedc0530..fb150ffc53eba 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -469,6 +469,7 @@ noinst_HEADERS = \ common/Semaphore.h\ common/Spinlock.h\ common/Thread.h\ + common/Throttle.h\ common/Timer.h\ common/tls.h\ common/WorkQueue.h\ diff --git a/src/common/Throttle.h b/src/common/Throttle.h new file mode 100644 index 0000000000000..69e6aa3276629 --- /dev/null +++ b/src/common/Throttle.h @@ -0,0 +1,68 @@ +#ifndef _CEPH_THROTTLE_H +#define _CEPH_THROTTLE_H + +#include "Mutex.h" +#include "Cond.h" + +class Throttle { + __u64 count, want, max; + Mutex lock; + Cond cond; + +public: + Throttle(__u64 m = 0) : count(0), max(m), + lock("Throttle::lock") {} + +private: + void _reset_max(__u64 m) { + if (m) { + if (m < max) + cond.SignalAll(); + max = m; + } + } + bool _wait(__u64 c) { + bool waited = false; + while (max && count + c > max) { + waited = true; + cond.Wait(lock); + } + return waited; + } + +public: + __u64 get_current() { + Mutex::Locker l(lock); + return count; + } + + bool wait(__u64 m = 0) { + Mutex::Locker l(lock); + _reset_max(m); + return _wait(0); + } + + __u64 take(__u64 c = 1) { + Mutex::Locker l(lock); + count += c; + return count; + } + + bool get(__u64 c = 1, __u64 m = 0) { + Mutex::Locker l(lock); + _reset_max(m); + bool waited = _wait(c); + count += c; + return waited; + } + + __u64 put(__u64 c = 1) { + Mutex::Locker l(lock); + cond.SignalAll(); + count -= c; + return count; + } +}; + + +#endif diff --git a/src/config.cc b/src/config.cc index 94660a169c88c..c49af9e2b8680 100644 --- a/src/config.cc +++ b/src/config.cc @@ -548,6 +548,8 @@ static struct config_option config_optionsp[] = { OPTION(journal_block_align, 0, OPT_BOOL, true), OPTION(journal_max_write_bytes, 0, OPT_INT, 0), OPTION(journal_max_write_entries, 0, OPT_INT, 100), + OPTION(journal_queue_max_ops, 0, OPT_INT, 500), + OPTION(journal_queue_max_bytes, 0, OPT_INT, 100 << 20), OPTION(bdev_lock, 0, OPT_BOOL, true), OPTION(bdev_iothreads, 0, OPT_INT, 1), // number of ios to queue with kernel OPTION(bdev_idle_kick_after_ms, 0, OPT_INT, 100), // ms diff --git a/src/config.h b/src/config.h index 874a36512d0fb..7a3dc51441343 100644 --- a/src/config.h +++ b/src/config.h @@ -363,6 +363,8 @@ struct md_config_t { bool journal_block_align; int journal_max_write_bytes; int journal_max_write_entries; + int journal_queue_max_ops; + int journal_queue_max_bytes; // block device bool bdev_lock; diff --git a/src/os/FileJournal.cc b/src/os/FileJournal.cc index 73bc28d4e09a5..6098be16f6a2f 100644 --- a/src/os/FileJournal.cc +++ b/src/os/FileJournal.cc @@ -356,6 +356,8 @@ bool FileJournal::check_for_full(__u64 seq, off64_t pos, off64_t size) writing_seq.push_back(writeq.front().seq); writing_fin.push_back(writeq.front().fin); } + throttle_ops.put(1); + throttle_bytes.put(writeq.front().bl.length()); writeq.pop_front(); } print_header(); @@ -363,7 +365,7 @@ bool FileJournal::check_for_full(__u64 seq, off64_t pos, off64_t size) } -void FileJournal::prepare_multi_write(bufferlist& bl) +void FileJournal::prepare_multi_write(bufferlist& bl, __u64& orig_ops, __u64& orig_bytes) { // gather queued writes off64_t queue_pos = write_pos; @@ -375,10 +377,12 @@ void FileJournal::prepare_multi_write(bufferlist& bl) return; while (!writeq.empty()) { - bool r = prepare_single_write(bl, queue_pos); + bool r = prepare_single_write(bl, queue_pos, orig_bytes); if (!r) break; + orig_ops++; + if (eleft) { if (--eleft == 0) { dout(20) << "prepare_multi_write hit max events per write " << g_conf.journal_max_write_entries << dendl; @@ -396,7 +400,7 @@ void FileJournal::prepare_multi_write(bufferlist& bl) //assert(write_pos + bl.length() == queue_pos); } -bool FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos) +bool FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, __u64& orig_size) { // grab next item __u64 seq = writeq.front().seq; @@ -407,6 +411,8 @@ bool FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos) if (!check_for_full(seq, queue_pos, size)) return false; + orig_size += ebl.length(); + // add to write buffer dout(15) << "prepare_single_write will write " << queue_pos << " : seq " << seq << " len " << ebl.length() << " -> " << size @@ -598,9 +604,19 @@ void FileJournal::write_thread_entry() continue; } + __u64 orig_ops = 0; + __u64 orig_bytes = 0; + bufferlist bl; - prepare_multi_write(bl); + prepare_multi_write(bl, orig_ops, orig_bytes); do_write(bl); + + __u64 new_ops = throttle_ops.put(orig_ops); + __u64 new_bytes = throttle_bytes.put(orig_bytes); + dout(10) << "write_thread throttle finished " << orig_ops << " ops and " + << orig_bytes << " bytes, now " + << new_ops << " ops and " << new_bytes << " bytes" + << dendl; } write_empty_cond.Signal(); write_lock.Unlock(); @@ -616,6 +632,9 @@ void FileJournal::submit_entry(__u64 seq, bufferlist& e, Context *oncommit) dout(10) << "submit_entry seq " << seq << " len " << e.length() << " (" << oncommit << ")" << dendl; + + throttle_ops.take(1); + throttle_bytes.take(e.length()); if (!full_commit_seq && full_restart_seq && seq >= full_restart_seq) { @@ -690,6 +709,8 @@ void FileJournal::committed_thru(__u64 seq) << dendl; if (writeq.front().fin) finisher->queue(writeq.front().fin); + throttle_ops.put(1); + throttle_bytes.put(writeq.front().bl.length()); writeq.pop_front(); } @@ -804,3 +825,11 @@ bool FileJournal::read_entry(bufferlist& bl, __u64& seq) return true; } + +void FileJournal::throttle() +{ + if (throttle_ops.wait(g_conf.journal_queue_max_ops)) + dout(1) << "throttle: waited for ops" << dendl; + if (throttle_bytes.wait(g_conf.journal_queue_max_bytes)) + dout(1) << "throttle: waited for bytes" << dendl; +} diff --git a/src/os/FileJournal.h b/src/os/FileJournal.h index b1bd6342e2ca7..1674a02a90635 100644 --- a/src/os/FileJournal.h +++ b/src/os/FileJournal.h @@ -23,6 +23,7 @@ using std::deque; #include "common/Cond.h" #include "common/Mutex.h" #include "common/Thread.h" +#include "common/Throttle.h" class FileJournal : public Journal { public: @@ -101,6 +102,9 @@ private: }; deque writeq; + // throttle + Throttle throttle_ops, throttle_bytes; + // write thread Mutex write_lock; Cond write_cond, write_empty_cond; @@ -117,9 +121,8 @@ private: void write_thread_entry(); bool check_for_full(__u64 seq, off64_t pos, off64_t size); - void prepare_multi_write(bufferlist& bl); - bool prepare_single_write(bufferlist& bl, off64_t& queue_pos); - bool prepare_single_dio_write(bufferlist& bl, off64_t& queue_pos); + void prepare_multi_write(bufferlist& bl, __u64& orig_ops, __u64& orig_bytse); + bool prepare_single_write(bufferlist& bl, off64_t& queue_pos, __u64& orig_bytes); void do_write(bufferlist& bl); void write_bl(off64_t& pos, bufferlist& bl); @@ -163,6 +166,8 @@ private: void flush(); + void throttle(); + bool is_writeable() { return read_pos == 0; } diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index c1fe16e1e49cc..221c4cee17679 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -633,6 +633,9 @@ int FileStore::queue_transactions(list &tls, { if (journal && journal->is_writeable()) { if (g_conf.filestore_journal_parallel) { + + journal->throttle(); + __u64 op = op_journal_start(0); dout(10) << "queue_transactions (parallel) " << op << " " << tls << dendl; diff --git a/src/os/Journal.h b/src/os/Journal.h index 0584e66a2ba51..4b2051303c71a 100644 --- a/src/os/Journal.h +++ b/src/os/Journal.h @@ -38,6 +38,7 @@ public: virtual void close() = 0; virtual void flush() = 0; + virtual void throttle() = 0; void set_wait_on_full(bool b) { wait_on_full = b; } -- 2.39.5