}
void ObjectRecorder::send_appends_aio() {
- AppendBuffers *append_buffers;
- uint64_t append_tid;
+ librados::ObjectWriteOperation op;
+ client::guard_append(&op, m_soft_max_size);
+ C_Gather *gather_ctx;
{
Mutex::Locker locker(*m_lock);
- append_tid = m_append_tid++;
+ uint64_t append_tid = m_append_tid++;
m_in_flight_tids.insert(append_tid);
- // safe to hold pointer outside lock until op is submitted
- append_buffers = &m_in_flight_appends[append_tid];
- append_buffers->swap(m_pending_buffers);
- }
-
- ldout(m_cct, 10) << __func__ << ": " << m_oid << " flushing journal tid="
- << append_tid << dendl;
- C_AppendFlush *append_flush = new C_AppendFlush(this, append_tid);
- C_Gather *gather_ctx = new C_Gather(m_cct, append_flush);
-
- librados::ObjectWriteOperation op;
- client::guard_append(&op, m_soft_max_size);
- for (AppendBuffers::iterator it = append_buffers->begin();
- it != append_buffers->end(); ++it) {
- ldout(m_cct, 20) << __func__ << ": flushing " << *it->first
- << dendl;
- op.append(it->second);
- op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+ ldout(m_cct, 10) << __func__ << ": " << m_oid << " flushing journal tid="
+ << append_tid << dendl;
+
+ gather_ctx = new C_Gather(m_cct, new C_AppendFlush(this, append_tid));
+ auto append_buffers = &m_in_flight_appends[append_tid];
+
+ for (auto it = m_pending_buffers.begin(); it != m_pending_buffers.end(); ) {
+ ldout(m_cct, 20) << __func__ << ": flushing " << *it->first << dendl;
+ op.append(it->second);
+ op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+ m_aio_sent_size += it->second.length();
+ append_buffers->push_back(*it);
+ it = m_pending_buffers.erase(it);
+ if (m_aio_sent_size >= m_soft_max_size) {
+ break;
+ }
+ }
}
librados::AioCompletion *rados_completion =