From c736e81d2712131ce017f7470c76fc21e1d8a9ef Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 2 Feb 2010 13:29:56 -0800 Subject: [PATCH] filestore: throttle op queue --- src/config.cc | 2 ++ src/config.h | 2 ++ src/os/FileStore.cc | 35 +++++++++++++++++++++++++++++++---- src/os/FileStore.h | 6 ++++-- 4 files changed, 39 insertions(+), 6 deletions(-) diff --git a/src/config.cc b/src/config.cc index a726d9e18fe8e..94660a169c88c 100644 --- a/src/config.cc +++ b/src/config.cc @@ -531,6 +531,8 @@ static struct config_option config_optionsp[] = { OPTION(filestore_sync_flush, 0, OPT_BOOL, false), OPTION(filestore_journal_parallel, 0, OPT_BOOL, false), OPTION(filestore_journal_writeahead, 0, OPT_BOOL, false), + OPTION(filestore_queue_max_ops, 0, OPT_INT, 500), + OPTION(filestore_queue_max_bytes, 0, OPT_INT, 100 << 20), OPTION(ebofs, 0, OPT_BOOL, false), OPTION(ebofs_cloneable, 0, OPT_BOOL, true), OPTION(ebofs_verify, 0, OPT_BOOL, false), diff --git a/src/config.h b/src/config.h index 2570afacdd568..874a36512d0fb 100644 --- a/src/config.h +++ b/src/config.h @@ -342,6 +342,8 @@ struct md_config_t { bool filestore_sync_flush; bool filestore_journal_parallel; bool filestore_journal_writeahead; + int filestore_queue_max_ops; + int filestore_queue_max_bytes; // ebofs bool ebofs; diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index e96a56cbcaf0e..c1fe16e1e49cc 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -534,13 +534,35 @@ int FileStore::umount() void FileStore::queue_op(__u64 op_seq, list& tls, Context *onreadable, Context *oncommit) { - op_lock.Lock(); Op *o = new Op; - dout(10) << "queue_op " << o << " " << op_seq << dendl; + + __u64 bytes = 0, ops = 0; + for (list::iterator p = tls.begin(); + p != tls.end(); + p++) { + bytes += (*p)->get_num_bytes(); + ops += (*p)->get_num_ops(); + } + + op_lock.Lock(); + + while ((g_conf.filestore_queue_max_ops && op_queue_len >= (unsigned)g_conf.filestore_queue_max_ops) || + (g_conf.filestore_queue_max_bytes && op_queue_bytes >= (unsigned)g_conf.filestore_queue_max_bytes)) { + dout(2) << "queue_op " << o << " throttle: " + << op_queue_len << " > " << g_conf.filestore_queue_max_ops << " ops || " + << op_queue_bytes << " > " << g_conf.filestore_queue_max_bytes << dendl; + op_throttle_cond.Wait(op_lock); + } + op_queue_len++; + op_queue_bytes += bytes; + + dout(10) << "queue_op " << o << " seq " << op_seq << " " << bytes << " bytes" << dendl; o->op = op_seq; o->tls.swap(tls); o->onreadable = onreadable; o->oncommit = oncommit; + o->ops = ops; + o->bytes = bytes; op_queue.push_back(o); op_cond.Signal(); op_lock.Unlock(); @@ -564,9 +586,14 @@ void FileStore::op_entry() op_finisher.queue(o->onreadable, r); - delete o; - op_lock.Lock(); + + op_queue_len--; + op_queue_bytes -= o->bytes; + op_throttle_cond.Signal(); + dout(10) << "op_entry finished " << o->bytes << " bytes, queue now " + << op_queue_len << " ops, " << op_queue_bytes << " bytes" << dendl; + delete o; } op_empty_cond.Signal(); if (stop) diff --git a/src/os/FileStore.h b/src/os/FileStore.h index fcb8d2bbb7166..a467450c424c7 100644 --- a/src/os/FileStore.h +++ b/src/os/FileStore.h @@ -85,12 +85,14 @@ class FileStore : public JournalingObjectStore { __u64 op; list tls; Context *onreadable, *oncommit; + __u64 ops, bytes; }; Finisher op_finisher; Mutex op_lock; - Cond op_cond, op_empty_cond; + Cond op_cond, op_empty_cond, op_throttle_cond; list op_queue; + __u64 op_queue_len, op_queue_bytes; void op_entry(); struct OpThread : public Thread { FileStore *fs; @@ -130,7 +132,7 @@ class FileStore : public JournalingObjectStore { collections(this), fake_collections(false), lock("FileStore::lock"), sync_epoch(0), stop(false), sync_thread(this), - op_lock("FileStore::op_lock"), op_thread(this), + op_lock("FileStore::op_lock"), op_queue_len(0), op_queue_bytes(0), op_thread(this), flusher_queue_len(0), flusher_thread(this) { } int mount(); -- 2.39.5