#ifdef HAVE_LIBAIO
Mutex::Locker q(aio_lock);
#endif
- Mutex::Locker p(queue_lock);
+ Mutex::Locker p(writeq_lock);
write_stop = true;
write_cond.Signal();
- queue_cond.Signal();
+ writeq_cond.Signal();
#ifdef HAVE_LIBAIO
aio_cond.Signal();
write_finish_cond.Signal();
{
assert(queue_lock.is_locked());
utime_t now = ceph_clock_now(g_ceph_context);
- while (!completions.empty() &&
- completions.front().seq <= seq) {
+ while (!completions_empty()) {
+ completion_item next = completion_peek_front();
+ if (next.seq > seq)
+ break;
+ completion_pop_front();
utime_t lat = now;
- lat -= completions.front().start;
+ lat -= next.start;
dout(10) << "queue_completions_thru seq " << seq
- << " queueing seq " << completions.front().seq
- << " " << completions.front().finish
+ << " queueing seq " << next.seq
+ << " " << next.finish
<< " lat " << lat << dendl;
if (logger) {
logger->finc(l_os_j_lat, lat);
}
- if (completions.front().finish)
- finisher->queue(completions.front().finish);
- if (completions.front().tracked_op)
- completions.front().tracked_op->mark_event("journaled_completion_queued");
- completions.pop_front();
+ if (next.finish)
+ finisher->queue(next.finish);
+ if (next.tracked_op)
+ next.tracked_op->mark_event("journaled_completion_queued");
}
queue_cond.Signal();
}
dout(5) << "waiting for completions to empty" << dendl;
{
Mutex::Locker l(queue_lock);
- while (!completions.empty())
+ while (!completions_empty())
queue_cond.Wait(queue_lock);
}
dout(5) << "flush waiting for finisher" << dendl;
dout(10) << "write_thread_entry start" << dendl;
while (1) {
{
- Mutex::Locker locker(queue_lock);
+ Mutex::Locker locker(writeq_lock);
if (writeq.empty()) {
if (write_stop)
break;
dout(20) << "write_thread_entry going to sleep" << dendl;
- {
- if (writeq.empty()) {
- queue_cond.Wait(queue_lock);
- }
- }
+ writeq_cond.Wait(writeq_lock);
dout(20) << "write_thread_entry woke up" << dendl;
continue;
}
void FileJournal::submit_entry(uint64_t seq, bufferlist& e, int alignment,
Context *oncommit, TrackedOpRef osd_op)
{
- Mutex::Locker locker(queue_lock); // ** lock **
-
// dump on queue
dout(5) << "submit_entry seq " << seq
- << " len " << e.length()
- << " (" << oncommit << ")" << dendl;
+ << " len " << e.length()
+ << " (" << oncommit << ")" << dendl;
assert(e.length() > 0);
- completions.push_back(
- completion_item(
- seq, oncommit, ceph_clock_now(g_ceph_context), osd_op));
-
- if (full_state == FULL_NOTFULL) {
- if (osd_op)
- osd_op->mark_event("commit_queued_for_journal_write");
- // queue and kick writer thread
- dout(30) << "XXX throttle take " << e.length() << dendl;
- throttle_ops.take(1);
- throttle_bytes.take(e.length());
-
- if (logger) {
- logger->set(l_os_jq_max_ops, throttle_ops.get_max());
- logger->set(l_os_jq_max_bytes, throttle_bytes.get_max());
- logger->set(l_os_jq_ops, throttle_ops.get_current());
- logger->set(l_os_jq_bytes, throttle_bytes.get_current());
- }
+ dout(30) << "XXX throttle take " << e.length() << dendl;
+ throttle_ops.take(1);
+ throttle_bytes.take(e.length());
+ if (osd_op)
+ osd_op->mark_event("commit_queued_for_journal_write");
+ if (logger) {
+ logger->set(l_os_jq_max_ops, throttle_ops.get_max());
+ logger->set(l_os_jq_max_bytes, throttle_bytes.get_max());
+ logger->set(l_os_jq_ops, throttle_ops.get_current());
+ logger->set(l_os_jq_bytes, throttle_bytes.get_current());
+ }
+ {
+ Mutex::Locker l1(writeq_lock); // ** lock **
+ Mutex::Locker l2(completions_lock); // ** lock **
+ completions.push_back(
+ completion_item(
+ seq, oncommit, ceph_clock_now(g_ceph_context), osd_op));
writeq.push_back(write_item(seq, e, alignment, osd_op));
- queue_cond.Signal();
- } else {
- if (osd_op)
- osd_op->mark_event("commit_blocked_by_journal_full");
- // not journaling this. restart writing no sooner than seq + 1.
- dout(10) << " journal is/was full" << dendl;
+ writeq_cond.Signal();
}
}
bool FileJournal::writeq_empty()
{
- Mutex::Locker locker(queue_lock);
+ Mutex::Locker locker(writeq_lock);
return writeq.empty();
}
FileJournal::write_item &FileJournal::peek_write()
{
assert(write_lock.is_locked());
- Mutex::Locker locker(queue_lock);
+ Mutex::Locker locker(writeq_lock);
return writeq.front();
}
void FileJournal::pop_write()
{
assert(write_lock.is_locked());
- Mutex::Locker locker(queue_lock);
+ Mutex::Locker locker(writeq_lock);
writeq.pop_front();
}
}
write_item() : seq(0), alignment(0) {}
};
+
Mutex queue_lock;
Cond queue_cond;
uint64_t journaled_seq;
bool plug_journal_completions;
+
+ Mutex writeq_lock;
+ Cond writeq_cond;
deque<write_item> writeq;
- deque<completion_item> completions;
bool writeq_empty();
write_item &peek_write();
void pop_write();
+
+ Mutex completions_lock;
+ deque<completion_item> completions;
+ bool completions_empty() {
+ Mutex::Locker l(completions_lock);
+ return completions.empty();
+ }
+ completion_item completion_peek_front() {
+ Mutex::Locker l(completions_lock);
+ assert(!completions.empty());
+ return completions.front();
+ }
+ void completion_pop_front() {
+ Mutex::Locker l(completions_lock);
+ assert(!completions.empty());
+ completions.pop_front();
+ }
+
void submit_entry(uint64_t seq, bufferlist& bl, int alignment,
Context *oncommit,
TrackedOpRef osd_op = TrackedOpRef());
queue_lock("FileJournal::queue_lock", false, true, false, g_ceph_context),
journaled_seq(0),
plug_journal_completions(false),
+ writeq_lock("FileJournal::writeq_lock", false, true, false, g_ceph_context),
+ completions_lock(
+ "FileJournal::completions_lock", false, true, false, g_ceph_context),
fn(f),
zero_buf(NULL),
max_size(0), block_size(0),