CephContext *cct = m_image_ctx.cct;
bool all_clean = false;
int flushed = 0;
+ bool has_write_entry = false;
ldout(cct, 20) << "Look for dirty entries" << dendl;
{
DeferredContexts post_unlock;
+ GenericLogEntries entries_to_flush;
+
std::shared_lock entry_reader_locker(m_entry_reader_lock);
+ std::lock_guard locker(m_lock);
while (flushed < IN_FLIGHT_FLUSH_WRITE_LIMIT) {
- std::lock_guard locker(m_lock);
if (m_shutting_down) {
ldout(cct, 5) << "Flush during shutdown supressed" << dendl;
/* Do flush complete only when all flush ops are finished */
auto candidate = m_dirty_log_entries.front();
bool flushable = can_flush_entry(candidate);
if (flushable) {
- post_unlock.add(construct_flush_entry_ctx(candidate));
+ entries_to_flush.push_back(candidate);
flushed++;
+ if (!has_write_entry)
+ has_write_entry = candidate->is_write_entry();
m_dirty_log_entries.pop_front();
} else {
ldout(cct, 20) << "Next dirty entry isn't flushable yet" << dendl;
break;
}
}
+
+ construct_flush_entries(entries_to_flush, post_unlock, has_write_entry);
}
if (all_clean) {
}
template <typename I>
-Context* WriteLog<I>::construct_flush_entry_ctx(
- std::shared_ptr<GenericLogEntry> log_entry) {
+void WriteLog<I>::construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
+ DeferredContexts &post_unlock,
+ bool has_write_entry) {
bool invalidating = this->m_invalidating; // snapshot so we behave consistently
- Context *ctx = this->construct_flush_entry(log_entry, invalidating);
- if (invalidating) {
- return ctx;
- }
- return new LambdaContext(
- [this, log_entry, ctx](int r) {
- m_image_ctx.op_work_queue->queue(new LambdaContext(
+ for (auto &log_entry : entries_to_flush) {
+ Context *ctx = this->construct_flush_entry(log_entry, invalidating);
+
+ if (!invalidating) {
+ ctx = new LambdaContext(
[this, log_entry, ctx](int r) {
- ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
- << " " << *log_entry << dendl;
- log_entry->writeback(this->m_image_writeback, ctx);
- }), 0);
- });
+ m_image_ctx.op_work_queue->queue(new LambdaContext(
+ [this, log_entry, ctx](int r) {
+ ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+ << " " << *log_entry << dendl;
+ log_entry->writeback(this->m_image_writeback, ctx);
+ }), 0);
+ });
+ }
+ post_unlock.add(ctx);
+ }
}
const unsigned long int ops_flushed_together = 4;
}
template <typename I>
-Context* WriteLog<I>::construct_flush_entry_ctx(
- std::shared_ptr<GenericLogEntry> log_entry) {
+void WriteLog<I>::construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
+ DeferredContexts &post_unlock,
+ bool has_write_entry) {
// snapshot so we behave consistently
bool invalidating = this->m_invalidating;
- Context *ctx = this->construct_flush_entry(log_entry, invalidating);
-
- if (invalidating) {
- return ctx;
- }
- if (log_entry->is_write_entry()) {
- bufferlist *read_bl_ptr = new bufferlist;
- ctx = new LambdaContext(
- [this, log_entry, read_bl_ptr, ctx](int r) {
- bufferlist captured_entry_bl;
- captured_entry_bl.claim_append(*read_bl_ptr);
- delete read_bl_ptr;
- m_image_ctx.op_work_queue->queue(new LambdaContext(
- [this, log_entry, entry_bl=move(captured_entry_bl), ctx](int r) {
- auto captured_entry_bl = std::move(entry_bl);
- ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
- << " " << *log_entry << dendl;
- log_entry->writeback_bl(this->m_image_writeback, ctx,
- std::move(captured_entry_bl));
- }), 0);
- });
- ctx = new LambdaContext(
- [this, log_entry, read_bl_ptr, ctx](int r) {
- auto write_entry = static_pointer_cast<WriteLogEntry>(log_entry);
- write_entry->inc_bl_refs();
- aio_read_data_block(std::move(write_entry), read_bl_ptr, ctx);
- });
- return ctx;
+ if (invalidating || !has_write_entry) {
+ for (auto &log_entry : entries_to_flush) {
+ Context *ctx = this->construct_flush_entry(log_entry, invalidating);
+
+ if (!invalidating) {
+ ctx = new LambdaContext(
+ [this, log_entry, ctx](int r) {
+ m_image_ctx.op_work_queue->queue(new LambdaContext(
+ [this, log_entry, ctx](int r) {
+ ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+ << " " << *log_entry << dendl;
+ log_entry->writeback(this->m_image_writeback, ctx);
+ }), 0);
+ });
+ }
+ post_unlock.add(ctx);
+ }
} else {
- return new LambdaContext(
- [this, log_entry, ctx](int r) {
- m_image_ctx.op_work_queue->queue(new LambdaContext(
- [this, log_entry, ctx](int r) {
- ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
- << " " << *log_entry << dendl;
- log_entry->writeback(this->m_image_writeback, ctx);
- }), 0);
+ int count = entries_to_flush.size();
+ std::vector<std::shared_ptr<GenericWriteLogEntry>> log_entries;
+ std::vector<bufferlist *> read_bls;
+ std::vector<Context *> contexts;
+
+ log_entries.reserve(count);
+ read_bls.reserve(count);
+ contexts.reserve(count);
+
+ for (auto &log_entry : entries_to_flush) {
+ // log_entry already removed from m_dirty_log_entries and
+ // in construct_flush_entry() it will inc(m_flush_ops_in_flight).
+ // We call this func here to make ops can track.
+ Context *ctx = this->construct_flush_entry(log_entry, invalidating);
+ if (log_entry->is_write_entry()) {
+ bufferlist *bl = new bufferlist;
+ auto write_entry = static_pointer_cast<WriteLogEntry>(log_entry);
+ write_entry->inc_bl_refs();
+ log_entries.push_back(write_entry);
+ read_bls.push_back(bl);
+ }
+ contexts.push_back(ctx);
+ }
+
+ Context *ctx = new LambdaContext(
+ [this, entries_to_flush, read_bls, contexts](int r) {
+ int i = 0, j = 0;
+
+ for (auto &log_entry : entries_to_flush) {
+ Context *ctx = contexts[j++];
+
+ if (log_entry->is_write_entry()) {
+ bufferlist captured_entry_bl;
+
+ captured_entry_bl.claim_append(*read_bls[i]);
+ delete read_bls[i++];
+
+ m_image_ctx.op_work_queue->queue(new LambdaContext(
+ [this, log_entry, entry_bl=std::move(captured_entry_bl), ctx](int r) {
+ auto captured_entry_bl = std::move(entry_bl);
+ ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+ << " " << *log_entry << dendl;
+ log_entry->writeback_bl(this->m_image_writeback, ctx,
+ std::move(captured_entry_bl));
+ }), 0);
+ } else {
+ m_image_ctx.op_work_queue->queue(new LambdaContext(
+ [this, log_entry, ctx](int r) {
+ ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+ << " " << *log_entry << dendl;
+ log_entry->writeback(this->m_image_writeback, ctx);
+ }), 0);
+ }
+ }
});
+
+ aio_read_data_blocks(log_entries, read_bls, ctx);
}
}