*/
int FileJournal::write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq)
{
- Mutex::Locker locker(aio_lock);
align_bl(pos, bl);
dout(20) << "write_aio_bl " << pos << "~" << bl.length() << " seq " << seq << dendl;
bufferlist tbl;
bl.splice(0, len, &tbl); // move bytes from bl -> tbl
+ // lock only aio_queue, current aio, aio_num, aio_bytes, which may be
+ // modified in check_aio_completion
+ aio_lock.Lock();
aio_queue.push_back(aio_info(tbl, pos, bl.length() > 0 ? 0 : seq));
aio_info& aio = aio_queue.back();
aio.iov = iov;
aio_num++;
aio_bytes += aio.len;
+
+ // need to save current aio len to update write_pos later because current
+ // aio could be ereased from aio_queue once it is done
+ uint64_t cur_len = aio.len;
+ // unlock aio_lock because following io_submit might take time to return
+ aio_lock.Unlock();
iocb *piocb = &aio.iocb;
int attempts = 10;
do {
int r = io_submit(aio_ctx, 1, &piocb);
+ dout(20) << "write_aio_bl io_submit return value: " << r << dendl;
if (r < 0) {
- derr << "io_submit to " << aio.off << "~" << aio.len
+ derr << "io_submit to " << aio.off << "~" << cur_len
<< " got " << cpp_strerror(r) << dendl;
if (r == -EAGAIN && attempts-- > 0) {
usleep(500);
break;
}
} while (true);
- pos += aio.len;
+ pos += cur_len;
}
+ aio_lock.Lock();
write_finish_cond.Signal();
+ aio_lock.Unlock();
return 0;
}
#endif