common/Semaphore.h\
common/Spinlock.h\
common/Thread.h\
+ common/Throttle.h\
common/Timer.h\
common/tls.h\
common/WorkQueue.h\
--- /dev/null
+#ifndef _CEPH_THROTTLE_H
+#define _CEPH_THROTTLE_H
+
+#include "Mutex.h"
+#include "Cond.h"
+
+class Throttle {
+ __u64 count, want, max;
+ Mutex lock;
+ Cond cond;
+
+public:
+ Throttle(__u64 m = 0) : count(0), max(m),
+ lock("Throttle::lock") {}
+
+private:
+ void _reset_max(__u64 m) {
+ if (m) {
+ if (m < max)
+ cond.SignalAll();
+ max = m;
+ }
+ }
+ bool _wait(__u64 c) {
+ bool waited = false;
+ while (max && count + c > max) {
+ waited = true;
+ cond.Wait(lock);
+ }
+ return waited;
+ }
+
+public:
+ __u64 get_current() {
+ Mutex::Locker l(lock);
+ return count;
+ }
+
+ bool wait(__u64 m = 0) {
+ Mutex::Locker l(lock);
+ _reset_max(m);
+ return _wait(0);
+ }
+
+ __u64 take(__u64 c = 1) {
+ Mutex::Locker l(lock);
+ count += c;
+ return count;
+ }
+
+ bool get(__u64 c = 1, __u64 m = 0) {
+ Mutex::Locker l(lock);
+ _reset_max(m);
+ bool waited = _wait(c);
+ count += c;
+ return waited;
+ }
+
+ __u64 put(__u64 c = 1) {
+ Mutex::Locker l(lock);
+ cond.SignalAll();
+ count -= c;
+ return count;
+ }
+};
+
+
+#endif
OPTION(journal_block_align, 0, OPT_BOOL, true),
OPTION(journal_max_write_bytes, 0, OPT_INT, 0),
OPTION(journal_max_write_entries, 0, OPT_INT, 100),
+ OPTION(journal_queue_max_ops, 0, OPT_INT, 500),
+ OPTION(journal_queue_max_bytes, 0, OPT_INT, 100 << 20),
OPTION(bdev_lock, 0, OPT_BOOL, true),
OPTION(bdev_iothreads, 0, OPT_INT, 1), // number of ios to queue with kernel
OPTION(bdev_idle_kick_after_ms, 0, OPT_INT, 100), // ms
bool journal_block_align;
int journal_max_write_bytes;
int journal_max_write_entries;
+ int journal_queue_max_ops;
+ int journal_queue_max_bytes;
// block device
bool bdev_lock;
writing_seq.push_back(writeq.front().seq);
writing_fin.push_back(writeq.front().fin);
}
+ throttle_ops.put(1);
+ throttle_bytes.put(writeq.front().bl.length());
writeq.pop_front();
}
print_header();
}
-void FileJournal::prepare_multi_write(bufferlist& bl)
+void FileJournal::prepare_multi_write(bufferlist& bl, __u64& orig_ops, __u64& orig_bytes)
{
// gather queued writes
off64_t queue_pos = write_pos;
return;
while (!writeq.empty()) {
- bool r = prepare_single_write(bl, queue_pos);
+ bool r = prepare_single_write(bl, queue_pos, orig_bytes);
if (!r)
break;
+ orig_ops++;
+
if (eleft) {
if (--eleft == 0) {
dout(20) << "prepare_multi_write hit max events per write " << g_conf.journal_max_write_entries << dendl;
//assert(write_pos + bl.length() == queue_pos);
}
-bool FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos)
+bool FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, __u64& orig_size)
{
// grab next item
__u64 seq = writeq.front().seq;
if (!check_for_full(seq, queue_pos, size))
return false;
+ orig_size += ebl.length();
+
// add to write buffer
dout(15) << "prepare_single_write will write " << queue_pos << " : seq " << seq
<< " len " << ebl.length() << " -> " << size
continue;
}
+ __u64 orig_ops = 0;
+ __u64 orig_bytes = 0;
+
bufferlist bl;
- prepare_multi_write(bl);
+ prepare_multi_write(bl, orig_ops, orig_bytes);
do_write(bl);
+
+ __u64 new_ops = throttle_ops.put(orig_ops);
+ __u64 new_bytes = throttle_bytes.put(orig_bytes);
+ dout(10) << "write_thread throttle finished " << orig_ops << " ops and "
+ << orig_bytes << " bytes, now "
+ << new_ops << " ops and " << new_bytes << " bytes"
+ << dendl;
}
write_empty_cond.Signal();
write_lock.Unlock();
dout(10) << "submit_entry seq " << seq
<< " len " << e.length()
<< " (" << oncommit << ")" << dendl;
+
+ throttle_ops.take(1);
+ throttle_bytes.take(e.length());
if (!full_commit_seq && full_restart_seq &&
seq >= full_restart_seq) {
<< dendl;
if (writeq.front().fin)
finisher->queue(writeq.front().fin);
+ throttle_ops.put(1);
+ throttle_bytes.put(writeq.front().bl.length());
writeq.pop_front();
}
return true;
}
+
+void FileJournal::throttle()
+{
+ if (throttle_ops.wait(g_conf.journal_queue_max_ops))
+ dout(1) << "throttle: waited for ops" << dendl;
+ if (throttle_bytes.wait(g_conf.journal_queue_max_bytes))
+ dout(1) << "throttle: waited for bytes" << dendl;
+}
#include "common/Cond.h"
#include "common/Mutex.h"
#include "common/Thread.h"
+#include "common/Throttle.h"
class FileJournal : public Journal {
public:
};
deque<write_item> writeq;
+ // throttle
+ Throttle throttle_ops, throttle_bytes;
+
// write thread
Mutex write_lock;
Cond write_cond, write_empty_cond;
void write_thread_entry();
bool check_for_full(__u64 seq, off64_t pos, off64_t size);
- void prepare_multi_write(bufferlist& bl);
- bool prepare_single_write(bufferlist& bl, off64_t& queue_pos);
- bool prepare_single_dio_write(bufferlist& bl, off64_t& queue_pos);
+ void prepare_multi_write(bufferlist& bl, __u64& orig_ops, __u64& orig_bytse);
+ bool prepare_single_write(bufferlist& bl, off64_t& queue_pos, __u64& orig_bytes);
void do_write(bufferlist& bl);
void write_bl(off64_t& pos, bufferlist& bl);
void flush();
+ void throttle();
+
bool is_writeable() {
return read_pos == 0;
}
{
if (journal && journal->is_writeable()) {
if (g_conf.filestore_journal_parallel) {
+
+ journal->throttle();
+
__u64 op = op_journal_start(0);
dout(10) << "queue_transactions (parallel) " << op << " " << tls << dendl;
virtual void close() = 0;
virtual void flush() = 0;
+ virtual void throttle() = 0;
void set_wait_on_full(bool b) { wait_on_full = b; }