batch_pop_write(items);
list<write_item>::iterator it = items.begin();
while (it != items.end()) {
+ uint64_t bytes = it->bl.length();
int r = prepare_single_write(*it, bl, queue_pos, orig_ops, orig_bytes);
if (r == 0) { // prepare ok, delete it
- items.erase(it++);
+ items.erase(it++);
+#ifdef HAVE_LIBAIO
+ {
+ Mutex::Locker locker(aio_lock);
+ assert(aio_write_queue_ops > 0);
+ aio_write_queue_ops--;
+ assert(aio_write_queue_bytes >= bytes);
+ aio_write_queue_bytes -= bytes;
+ }
+#endif
}
if (r == -ENOSPC) {
// the journal maybe full, insert the left item to writeq
while (aio_num > 0) {
int exp = MIN(aio_num * 2, 24);
long unsigned min_new = 1ull << exp;
- long unsigned cur = throttle_bytes.get_current();
+ uint64_t cur = aio_write_queue_bytes;
dout(20) << "write_thread_entry aio throttle: aio num " << aio_num << " bytes " << aio_bytes
<< " ... exp " << exp << " min_new " << min_new
<< " ... pending " << cur << dendl;
}
{
- Mutex::Locker l1(writeq_lock); // ** lock **
- Mutex::Locker l2(completions_lock); // ** lock **
+ Mutex::Locker l1(writeq_lock);
+#ifdef HAVE_LIBAIO
+ Mutex::Locker l2(aio_lock);
+#endif
+ Mutex::Locker l3(completions_lock);
+
+#ifdef HAVE_LIBAIO
+ aio_write_queue_ops++;
+ aio_write_queue_bytes += e.length();
+ aio_cond.Signal();
+#endif
+
completions.push_back(
completion_item(
seq, oncommit, ceph_clock_now(g_ceph_context), osd_op));
/**
* Implements journaling on top of block device or file.
*
- * Lock ordering is write_lock > aio_lock > finisher_lock
+ * Lock ordering is write_lock > aio_lock > (completions_lock | finisher_lock)
*/
class FileJournal : public Journal {
public:
io_context_t aio_ctx;
list<aio_info> aio_queue;
int aio_num, aio_bytes;
+ uint64_t aio_write_queue_ops;
+ uint64_t aio_write_queue_bytes;
/// End protected by aio_lock
#endif
aio_lock("FileJournal::aio_lock"),
aio_ctx(0),
aio_num(0), aio_bytes(0),
+ aio_write_queue_ops(0),
+ aio_write_queue_bytes(0),
#endif
last_committed_seq(0),
journaled_since_start(0),