OPTION(filestore_sync_flush, 0, OPT_BOOL, false),
OPTION(filestore_journal_parallel, 0, OPT_BOOL, false),
OPTION(filestore_journal_writeahead, 0, OPT_BOOL, false),
+ OPTION(filestore_queue_max_ops, 0, OPT_INT, 500),
+ OPTION(filestore_queue_max_bytes, 0, OPT_INT, 100 << 20),
OPTION(ebofs, 0, OPT_BOOL, false),
OPTION(ebofs_cloneable, 0, OPT_BOOL, true),
OPTION(ebofs_verify, 0, OPT_BOOL, false),
bool filestore_sync_flush;
bool filestore_journal_parallel;
bool filestore_journal_writeahead;
+ int filestore_queue_max_ops;
+ int filestore_queue_max_bytes;
// ebofs
bool ebofs;
void FileStore::queue_op(__u64 op_seq, list<Transaction*>& tls, Context *onreadable,
Context *oncommit)
{
- op_lock.Lock();
Op *o = new Op;
- dout(10) << "queue_op " << o << " " << op_seq << dendl;
+
+ __u64 bytes = 0, ops = 0;
+ for (list<Transaction*>::iterator p = tls.begin();
+ p != tls.end();
+ p++) {
+ bytes += (*p)->get_num_bytes();
+ ops += (*p)->get_num_ops();
+ }
+
+ op_lock.Lock();
+
+ while ((g_conf.filestore_queue_max_ops && op_queue_len >= (unsigned)g_conf.filestore_queue_max_ops) ||
+ (g_conf.filestore_queue_max_bytes && op_queue_bytes >= (unsigned)g_conf.filestore_queue_max_bytes)) {
+ dout(2) << "queue_op " << o << " throttle: "
+ << op_queue_len << " > " << g_conf.filestore_queue_max_ops << " ops || "
+ << op_queue_bytes << " > " << g_conf.filestore_queue_max_bytes << dendl;
+ op_throttle_cond.Wait(op_lock);
+ }
+ op_queue_len++;
+ op_queue_bytes += bytes;
+
+ dout(10) << "queue_op " << o << " seq " << op_seq << " " << bytes << " bytes" << dendl;
o->op = op_seq;
o->tls.swap(tls);
o->onreadable = onreadable;
o->oncommit = oncommit;
+ o->ops = ops;
+ o->bytes = bytes;
op_queue.push_back(o);
op_cond.Signal();
op_lock.Unlock();
op_finisher.queue(o->onreadable, r);
- delete o;
-
op_lock.Lock();
+
+ op_queue_len--;
+ op_queue_bytes -= o->bytes;
+ op_throttle_cond.Signal();
+ dout(10) << "op_entry finished " << o->bytes << " bytes, queue now "
+ << op_queue_len << " ops, " << op_queue_bytes << " bytes" << dendl;
+ delete o;
}
op_empty_cond.Signal();
if (stop)
__u64 op;
list<Transaction*> tls;
Context *onreadable, *oncommit;
+ __u64 ops, bytes;
};
Finisher op_finisher;
Mutex op_lock;
- Cond op_cond, op_empty_cond;
+ Cond op_cond, op_empty_cond, op_throttle_cond;
list<Op*> op_queue;
+ __u64 op_queue_len, op_queue_bytes;
void op_entry();
struct OpThread : public Thread {
FileStore *fs;
collections(this), fake_collections(false),
lock("FileStore::lock"),
sync_epoch(0), stop(false), sync_thread(this),
- op_lock("FileStore::op_lock"), op_thread(this),
+ op_lock("FileStore::op_lock"), op_queue_len(0), op_queue_bytes(0), op_thread(this),
flusher_queue_len(0), flusher_thread(this) { }
int mount();