plugin::Api<I>& plugin_api)
: m_builder(builder),
m_write_log_guard(image_ctx.cct),
+ m_flush_guard(image_ctx.cct),
+ m_flush_guard_lock(ceph::make_mutex(pwl::unique_lock_name(
+ "librbd::cache::pwl::AbstractWriteLog::m_flush_guard_lock", this))),
m_deferred_dispatch_lock(ceph::make_mutex(pwl::unique_lock_name(
"librbd::cache::pwl::AbstractWriteLog::m_deferred_dispatch_lock", this))),
m_blockguard_lock(ceph::make_mutex(pwl::unique_lock_name(
}
template <typename I>
-Context* AbstractWriteLog<I>::construct_flush_entry(std::shared_ptr<GenericLogEntry> log_entry,
- bool invalidating) {
- CephContext *cct = m_image_ctx.cct;
+void AbstractWriteLog<I>::detain_flush_guard_request(std::shared_ptr<GenericLogEntry> log_entry,
+ GuardedRequestFunctionContext *guarded_ctx) {
+ ldout(m_image_ctx.cct, 20) << dendl;
- ldout(cct, 20) << "" << dendl;
- ceph_assert(m_entry_reader_lock.is_locked());
- ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
- if (!m_flush_ops_in_flight ||
- (log_entry->ram_entry.sync_gen_number < m_lowest_flushing_sync_gen)) {
- m_lowest_flushing_sync_gen = log_entry->ram_entry.sync_gen_number;
+ BlockExtent extent;
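+ /* A sync point must order against every in-flight flush, so it guards
+  * the whole volume; any other entry guards only its own block extent. */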
+ if (log_entry->is_sync_point()) {
+ extent = block_extent(whole_volume_extent());
+ } else {
+ extent = log_entry->ram_entry.block_extent();
}
- m_flush_ops_in_flight += 1;
- m_flush_ops_will_send += 1;
- /* For write same this is the bytes affected by the flush op, not the bytes transferred */
- m_flush_bytes_in_flight += log_entry->ram_entry.write_bytes;
+
+ auto req = GuardedRequest(extent, guarded_ctx, false);
+ BlockGuardCell *cell = nullptr;
+
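+ /* detain() hands back a cell when the extent is not already guarded;
+  * otherwise the request is queued on the owning cell and dispatched
+  * when that cell is released. */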
+ {
+ std::lock_guard locker(m_flush_guard_lock);
+ m_flush_guard.detain(req.block_extent, &req, &cell);
+ }
+ if (cell) {
+ req.guard_ctx->cell = cell;
+ m_image_ctx.op_work_queue->queue(req.guard_ctx, 0);
+ }
+}
+
+template <typename I>
+Context* AbstractWriteLog<I>::construct_flush_entry(std::shared_ptr<GenericLogEntry> log_entry,
+ bool invalidating) {
+ ldout(m_image_ctx.cct, 20) << dendl;
/* Flush write completion action */
utime_t writeback_start_time = ceph_clock_now();
});
/* Flush through lower cache before completing */
ctx = new LambdaContext(
- [this, ctx](int r) {
+ [this, ctx, log_entry](int r) {
+ {
+ WriteLogGuard::BlockOperations block_reqs;
+ BlockGuardCell *detained_cell = nullptr;
+
+ std::lock_guard locker{m_flush_guard_lock};
+ m_flush_guard.release(log_entry->m_cell, &block_reqs);
+
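+ /* Releasing this cell unblocks the requests that overlapped it;
+  * re-detain each one so the overlap check runs again, and dispatch
+  * those that now own a cell. */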
+ for (auto &req : block_reqs) {
+ m_flush_guard.detain(req.block_extent, &req, &detained_cell);
+ if (detained_cell) {
+ req.guard_ctx->cell = detained_cell;
+ m_image_ctx.op_work_queue->queue(req.guard_ctx, 0);
+ }
+ }
+ }
+
if (r < 0) {
lderr(m_image_ctx.cct) << "failed to flush log entry"
<< cpp_strerror(r) << dendl;
if (!has_write_entry)
has_write_entry = candidate->is_write_entry();
m_dirty_log_entries.pop_front();
+
+ // In-flight accounting moved here from construct_flush_entry(), so
+ // the op is tracked as soon as the candidate is dequeued.
+ {
+ if (!m_flush_ops_in_flight ||
+ (candidate->ram_entry.sync_gen_number < m_lowest_flushing_sync_gen)) {
+ m_lowest_flushing_sync_gen = candidate->ram_entry.sync_gen_number;
+ }
+ m_flush_ops_in_flight += 1;
+ m_flush_ops_will_send += 1;
+ /* For write same this is the bytes affected by the flush op, not the bytes transferred */
+ m_flush_bytes_in_flight += candidate->ram_entry.write_bytes;
+ }
} else {
ldout(cct, 20) << "Next dirty entry isn't flushable yet" << dendl;
break;
bool m_persist_on_write_until_flush = true;
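+ /* Tracks in-flight flush/writeback extents so overlapping log entries
+  * are written back to the image in log order; protected by
+  * m_flush_guard_lock. */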
+ pwl::WriteLogGuard m_flush_guard;
+ mutable ceph::mutex m_flush_guard_lock;
+
/* Debug counters for the places m_async_op_tracker is used */
std::atomic<int> m_async_complete_ops = {0};
std::atomic<int> m_async_null_flush_finish = {0};
void detain_guarded_request(C_BlockIORequestT *request,
pwl::GuardedRequestFunctionContext *guarded_ctx,
bool is_barrier);
-
void perf_start(const std::string name);
void perf_stop();
void log_perf();
std::shared_ptr<pwl::GenericLogEntry> log_entry) = 0;
Context *construct_flush_entry(
const std::shared_ptr<pwl::GenericLogEntry> log_entry, bool invalidating);
+ void detain_flush_guard_request(std::shared_ptr<GenericLogEntry> log_entry,
+ GuardedRequestFunctionContext *guarded_ctx);
void process_writeback_dirty_entries();
bool can_retire_entry(const std::shared_ptr<pwl::GenericLogEntry> log_entry);
WriteLogCacheEntry *cache_entry = nullptr;
uint64_t log_entry_index = 0;
bool completed = false;
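+ /* Flush guard cell held while this entry is written back; released
+  * once the writeback completes. */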
+ BlockGuardCell* m_cell = nullptr;
GenericLogEntry(uint64_t image_offset_bytes = 0, uint64_t write_bytes = 0)
: ram_entry(image_offset_bytes, write_bytes) {
};
bool invalidating = this->m_invalidating; // snapshot so we behave consistently
for (auto &log_entry : entries_to_flush) {
- Context *ctx = this->construct_flush_entry(log_entry, invalidating);
-
- if (!invalidating) {
- ctx = new LambdaContext(
- [this, log_entry, ctx](int r) {
- m_image_ctx.op_work_queue->queue(new LambdaContext(
- [this, log_entry, ctx](int r) {
- ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
- << " " << *log_entry << dendl;
- log_entry->writeback(this->m_image_writeback, ctx);
- this->m_flush_ops_will_send -= 1;
- }), 0);
- });
- }
- post_unlock.add(ctx);
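+ /* The guarded context runs once the flush guard grants this entry a
+  * cell; record the cell on the entry so construct_flush_entry()'s
+  * completion can release it and re-dispatch any blocked flushes. */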
+ GuardedRequestFunctionContext *guarded_ctx =
+ new GuardedRequestFunctionContext([this, log_entry, invalidating]
+ (GuardedRequestFunctionContext &guard_ctx) {
+ log_entry->m_cell = guard_ctx.cell;
+ Context *ctx = this->construct_flush_entry(log_entry, invalidating);
+
+ if (!invalidating) {
+ ctx = new LambdaContext(
+ [this, log_entry, ctx](int r) {
+ m_image_ctx.op_work_queue->queue(new LambdaContext(
+ [this, log_entry, ctx](int r) {
+ ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+ << " " << *log_entry << dendl;
+ log_entry->writeback(this->m_image_writeback, ctx);
+ this->m_flush_ops_will_send -= 1;
+ }), 0);
+ });
+ }
+
+ ctx->complete(0);
+ });
+ this->detain_flush_guard_request(log_entry, guarded_ctx);
}
}
if (invalidating || !has_write_entry) {
for (auto &log_entry : entries_to_flush) {
- Context *ctx = this->construct_flush_entry(log_entry, invalidating);
-
- if (!invalidating) {
- ctx = new LambdaContext(
- [this, log_entry, ctx](int r) {
- m_image_ctx.op_work_queue->queue(new LambdaContext(
- [this, log_entry, ctx](int r) {
- ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
- << " " << *log_entry << dendl;
- log_entry->writeback(this->m_image_writeback, ctx);
- this->m_flush_ops_will_send -= 1;
- }), 0);
- });
- }
- post_unlock.add(ctx);
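+ /* As above: queue the writeback only once the flush guard grants a
+  * cell for this entry's extent. */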
+ GuardedRequestFunctionContext *guarded_ctx =
+ new GuardedRequestFunctionContext([this, log_entry, invalidating]
+ (GuardedRequestFunctionContext &guard_ctx) {
+ log_entry->m_cell = guard_ctx.cell;
+ Context *ctx = this->construct_flush_entry(log_entry, invalidating);
+
+ if (!invalidating) {
+ ctx = new LambdaContext([this, log_entry, ctx](int r) {
+ m_image_ctx.op_work_queue->queue(new LambdaContext(
+ [this, log_entry, ctx](int r) {
+ ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+ << " " << *log_entry << dendl;
+ log_entry->writeback(this->m_image_writeback, ctx);
+ this->m_flush_ops_will_send -= 1;
+ }), 0);
+ });
+ }
+ ctx->complete(0);
+ });
+ this->detain_flush_guard_request(log_entry, guarded_ctx);
}
} else {
int count = entries_to_flush.size();
- std::vector<std::shared_ptr<GenericWriteLogEntry>> log_entries;
+ std::vector<std::shared_ptr<GenericWriteLogEntry>> write_entries;
std::vector<bufferlist *> read_bls;
- std::vector<Context *> contexts;
- log_entries.reserve(count);
+ write_entries.reserve(count);
read_bls.reserve(count);
- contexts.reserve(count);
for (auto &log_entry : entries_to_flush) {
- // log_entry already removed from m_dirty_log_entries and
- // in construct_flush_entry() it will inc(m_flush_ops_in_flight).
- // We call this func here to make ops can track.
- Context *ctx = this->construct_flush_entry(log_entry, invalidating);
if (log_entry->is_write_entry()) {
bufferlist *bl = new bufferlist;
auto write_entry = static_pointer_cast<WriteLogEntry>(log_entry);
write_entry->inc_bl_refs();
- log_entries.push_back(write_entry);
+ write_entries.push_back(write_entry);
read_bls.push_back(bl);
}
- contexts.push_back(ctx);
}
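+ /* Read each write entry's data back from the cache first; once all
+  * reads complete, detain each entry in the flush guard and queue its
+  * writeback. */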
Context *ctx = new LambdaContext(
- [this, entries_to_flush, read_bls, contexts](int r) {
- int i = 0, j = 0;
+ [this, entries_to_flush, read_bls](int r) {
+ int i = 0;
+ GuardedRequestFunctionContext *guarded_ctx = nullptr;
for (auto &log_entry : entries_to_flush) {
- Context *ctx = contexts[j++];
-
if (log_entry->is_write_entry()) {
bufferlist captured_entry_bl;
-
captured_entry_bl.claim_append(*read_bls[i]);
delete read_bls[i++];
- m_image_ctx.op_work_queue->queue(new LambdaContext(
- [this, log_entry, entry_bl=std::move(captured_entry_bl), ctx](int r) {
- auto captured_entry_bl = std::move(entry_bl);
- ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
- << " " << *log_entry << dendl;
- log_entry->writeback_bl(this->m_image_writeback, ctx,
- std::move(captured_entry_bl));
- this->m_flush_ops_will_send -= 1;
- }), 0);
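+ /* Note: captured_entry_bl is captured by value and the closure is not
+  * mutable, so the inner std::move degrades to a bufferlist copy; this
+  * is cheap, as the underlying buffers are shared by reference. */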
+ guarded_ctx = new GuardedRequestFunctionContext([this, log_entry, captured_entry_bl]
+ (GuardedRequestFunctionContext &guard_ctx) {
+ log_entry->m_cell = guard_ctx.cell;
+ Context *ctx = this->construct_flush_entry(log_entry, false);
+
+ m_image_ctx.op_work_queue->queue(new LambdaContext(
+ [this, log_entry, entry_bl=std::move(captured_entry_bl), ctx](int r) {
+ auto captured_entry_bl = std::move(entry_bl);
+ ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+ << " " << *log_entry << dendl;
+ log_entry->writeback_bl(this->m_image_writeback, ctx,
+ std::move(captured_entry_bl));
+ this->m_flush_ops_will_send -= 1;
+ }), 0);
+ });
} else {
- m_image_ctx.op_work_queue->queue(new LambdaContext(
- [this, log_entry, ctx](int r) {
- ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
- << " " << *log_entry << dendl;
- log_entry->writeback(this->m_image_writeback, ctx);
- this->m_flush_ops_will_send -= 1;
- }), 0);
+ guarded_ctx = new GuardedRequestFunctionContext([this, log_entry]
+ (GuardedRequestFunctionContext &guard_ctx) {
+ log_entry->m_cell = guard_ctx.cell;
+ Context *ctx = this->construct_flush_entry(log_entry, false);
+ m_image_ctx.op_work_queue->queue(new LambdaContext(
+ [this, log_entry, ctx](int r) {
+ ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+ << " " << *log_entry << dendl;
+ log_entry->writeback(this->m_image_writeback, ctx);
+ this->m_flush_ops_will_send -= 1;
+ }), 0);
+ });
}
+ this->detain_flush_guard_request(log_entry, guarded_ctx);
}
});
- aio_read_data_blocks(log_entries, read_bls, ctx);
+ aio_read_data_blocks(write_entries, read_bls, ctx);
}
}