From a75a9a8178f42ba7363de4929af5da140b64c668 Mon Sep 17 00:00:00 2001
From: Xinze Chi
Date: Fri, 11 Sep 2015 17:32:41 +0800
Subject: [PATCH] FileJournal: batch pop and unpop from writeq and completions

Swap the whole writeq / completions queue into a local list under a
single lock acquisition, work through the local list, and splice
whatever was not consumed back onto the front of the shared queue,
rather than peeking and popping one entry at a time.

writeq and completions change from deque to list so that the leftover
entries can be spliced back. prepare_single_write() now receives the
write_item to process instead of peeking it from writeq itself, and no
longer pops it.

Signed-off-by: Xinze Chi
---
Notes (not part of the commit message):
 - Line breaks and indentation in this file were rebuilt from a copy that
   had lost them. Hunk line counts match the @@ headers and the diffstat,
   but re-check context-line whitespace against the tree before applying
   (or apply with `git apply --ignore-whitespace`).
 - Container template arguments (list<write_item>, list<completion_item>,
   deque<...>, pair<uint64_t,off64_t>) were restored from how the values
   are used in the surrounding lines.

 src/os/FileJournal.cc | 109 ++++++++++++++++++++++++++----------------
 src/os/FileJournal.h  |  17 +++++--
 2 files changed, 82 insertions(+), 44 deletions(-)

diff --git a/src/os/FileJournal.cc b/src/os/FileJournal.cc
index 86d269b33f0b9..70591974fed9b 100644
--- a/src/os/FileJournal.cc
+++ b/src/os/FileJournal.cc
@@ -878,45 +878,57 @@ int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_
     return -ENOSPC;
 
   while (!writeq_empty()) {
-    int r = prepare_single_write(bl, queue_pos, orig_ops, orig_bytes);
-    if (r == -ENOSPC) {
-      if (orig_ops)
-        break; // commit what we have
-
-      if (logger)
-        logger->inc(l_os_j_full);
-
-      if (wait_on_full) {
-        dout(20) << "prepare_multi_write full on first entry, need to wait" << dendl;
-      } else {
-        dout(20) << "prepare_multi_write full on first entry, restarting journal" << dendl;
-
-        // throw out what we have so far
-        full_state = FULL_FULL;
-        while (!writeq_empty()) {
-          put_throttle(1, peek_write().orig_len);
-          pop_write();
-        }
-        print_header(header);
+    list<write_item> items;
+    batch_pop_write(items);
+    list<write_item>::iterator it = items.begin();
+    while (it != items.end()) {
+      int r = prepare_single_write(*it, bl, queue_pos, orig_ops, orig_bytes);
+      if (r == 0) { // prepare ok, delete it
+        items.erase(it++);
       }
-
-      return -ENOSPC; // hrm, full on first op
-    }
-
-    if (eleft) {
-      if (--eleft == 0) {
-        dout(20) << "prepare_multi_write hit max events per write " << g_conf->journal_max_write_entries << dendl;
-        break;
+      if (r == -ENOSPC) {
+        // the journal may be full; put the remaining items back on writeq
+        batch_unpop_write(items);
+        if (orig_ops)
+          goto out; // commit what we have
+
+        if (logger)
+          logger->inc(l_os_j_full);
+
+        if (wait_on_full) {
+          dout(20) << "prepare_multi_write full on first entry, need to wait" << dendl;
+        } else {
+          dout(20) << "prepare_multi_write full on first entry, restarting journal" << dendl;
+
+          // throw out what we have so far
+          full_state = FULL_FULL;
+          while (!writeq_empty()) {
+            put_throttle(1, peek_write().orig_len);
+            pop_write();
+          }
+          print_header(header);
+        }
+
+        return -ENOSPC; // hrm, full on first op
       }
-    }
-    if (bmax) {
-      if (bl.length() >= bmax) {
-        dout(20) << "prepare_multi_write hit max write size " << g_conf->journal_max_write_bytes << dendl;
-        break;
+      if (eleft) {
+        if (--eleft == 0) {
+          dout(20) << "prepare_multi_write hit max events per write " << g_conf->journal_max_write_entries << dendl;
+          batch_unpop_write(items);
+          goto out;
+        }
+      }
+      if (bmax) {
+        if (bl.length() >= bmax) {
+          dout(20) << "prepare_multi_write hit max write size " << g_conf->journal_max_write_bytes << dendl;
+          batch_unpop_write(items);
+          goto out;
+        }
       }
     }
   }
 
+out:
   dout(20) << "prepare_multi_write queue_pos now " << queue_pos << dendl;
   assert((write_pos + bl.length() == queue_pos) ||
          (write_pos + bl.length() - header.max_size + get_top() == queue_pos));
@@ -946,11 +958,13 @@ void FileJournal::queue_completions_thru(uint64_t seq)
 {
   assert(finisher_lock.is_locked());
   utime_t now = ceph_clock_now(g_ceph_context);
-  while (!completions_empty()) {
-    completion_item next = completion_peek_front();
+  list<completion_item> items;
+  batch_pop_completions(items);
+  list<completion_item>::iterator it = items.begin();
+  while (it != items.end()) {
+    completion_item& next = *it;
     if (next.seq > seq)
       break;
-    completion_pop_front();
     utime_t lat = now;
     lat -= next.start;
     dout(10) << "queue_completions_thru seq " << seq
@@ -964,14 +978,15 @@ void FileJournal::queue_completions_thru(uint64_t seq)
       finisher->queue(next.finish);
     if (next.tracked_op)
       next.tracked_op->mark_event("journaled_completion_queued");
+    items.erase(it++);
   }
+  batch_unpop_completions(items);
   finisher_cond.Signal();
 }
 
-int FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes)
+
+int FileJournal::prepare_single_write(write_item &next_write, bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes)
 {
-  // grab next item
-  write_item &next_write = peek_write();
   uint64_t seq = next_write.seq;
   bufferlist &ebl = next_write.bl;
   off64_t size = ebl.length();
@@ -1010,8 +1025,6 @@ int FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, uint64
   if (next_write.tracked_op)
     next_write.tracked_op->mark_event("write_thread_in_journal_buffer");
 
-  // pop from writeq
-  pop_write();
   journalq.push_back(pair<uint64_t,off64_t>(seq, queue_pos));
   writing_seq = seq;
 
@@ -1666,6 +1679,20 @@ void FileJournal::pop_write()
   writeq.pop_front();
 }
 
+void FileJournal::batch_pop_write(list<write_item> &items)
+{
+  assert(write_lock.is_locked());
+  Mutex::Locker locker(writeq_lock);
+  writeq.swap(items);
+}
+
+void FileJournal::batch_unpop_write(list<write_item> &items)
+{
+  assert(write_lock.is_locked());
+  Mutex::Locker locker(writeq_lock);
+  writeq.splice(writeq.begin(), items);
+}
+
 void FileJournal::commit_start(uint64_t seq)
 {
   dout(10) << "commit_start" << dendl;
diff --git a/src/os/FileJournal.h b/src/os/FileJournal.h
index 50bc8109b9e93..602a034805754 100644
--- a/src/os/FileJournal.h
+++ b/src/os/FileJournal.h
@@ -66,17 +66,27 @@ public:
 
   Mutex writeq_lock;
   Cond writeq_cond;
-  deque<write_item> writeq;
+  list<write_item> writeq;
   bool writeq_empty();
   write_item &peek_write();
   void pop_write();
+  void batch_pop_write(list<write_item> &items);
+  void batch_unpop_write(list<write_item> &items);
 
   Mutex completions_lock;
-  deque<completion_item> completions;
+  list<completion_item> completions;
   bool completions_empty() {
     Mutex::Locker l(completions_lock);
     return completions.empty();
   }
+  void batch_pop_completions(list<completion_item> &items) {
+    Mutex::Locker l(completions_lock);
+    completions.swap(items);
+  }
+  void batch_unpop_completions(list<completion_item> &items) {
+    Mutex::Locker l(completions_lock);
+    completions.splice(completions.begin(), items);
+  }
   completion_item completion_peek_front() {
     Mutex::Locker l(completions_lock);
     assert(!completions.empty());
@@ -311,7 +321,8 @@ private:
 
   int check_for_full(uint64_t seq, off64_t pos, off64_t size);
   int prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_t& orig_bytee);
-  int prepare_single_write(bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes);
+  int prepare_single_write(write_item &next_write, bufferlist& bl, off64_t& queue_pos,
+                           uint64_t& orig_ops, uint64_t& orig_bytes);
   void do_write(bufferlist& bl);
 
   void write_finish_thread_entry();
-- 
2.39.5