});
/* Complete all in-flight writes before shutting down */
ldout(m_image_ctx.cct, 6) << "internal_flush in shutdown" << dendl;
- internal_flush(ctx);
+ internal_flush(false, ctx);
}
}
ldout(cct, 20) << "on_finish=" << on_finish << " flush_source=" << flush_source << dendl;
if (io::FLUSH_SOURCE_SHUTDOWN == flush_source || io::FLUSH_SOURCE_INTERNAL == flush_source) {
- internal_flush(on_finish);
+ internal_flush(false, on_finish);
return;
}
m_perfcounter->inc(l_librbd_rwl_aio_flush, 1);
template <typename I>
void ReplicatedWriteLog<I>::flush(Context *on_finish) {
- internal_flush(on_finish);
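+ /* false: write dirty entries back to the image, keep the log contents */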
+ internal_flush(false, on_finish);
}
template <typename I>
void ReplicatedWriteLog<I>::invalidate(Context *on_finish) {
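+ /* true: flush first, then discard every log entry from the cache */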
+ internal_flush(true, on_finish);
}
template <typename I>
std::lock_guard locker(m_lock);
m_wake_up_requested = false;
}
- if (m_alloc_failed_since_retire ||
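+ /* While invalidating, retire regardless of the water marks so the log empties quickly */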
+ if (m_alloc_failed_since_retire || m_invalidating ||
m_bytes_allocated > high_water_bytes ||
(m_log_entries.size() > high_water_entries)) {
int retired = 0;
<< ", allocated_entries > high_water="
<< (m_log_entries.size() > high_water_entries)
<< dendl;
- while (m_alloc_failed_since_retire ||
+ while (m_alloc_failed_since_retire || m_invalidating ||
(m_bytes_allocated > high_water_bytes) ||
(m_log_entries.size() > high_water_entries) ||
(((m_bytes_allocated > low_water_bytes) || (m_log_entries.size() > low_water_entries)) &&
(utime_t(ceph_clock_now() - started).to_msec() < RETIRE_BATCH_TIME_LIMIT_MS))) {
- if (!retire_entries((m_shutting_down ||
+ if (!retire_entries((m_shutting_down || m_invalidating ||
(m_bytes_allocated > aggressive_high_water_bytes) ||
(m_log_entries.size() > aggressive_high_water_entries))
? MAX_ALLOC_PER_TRANSACTION
ldout(cct, 20) << "" << dendl;
ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
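+ /* While invalidating, any entry can be flushed immediately; its flush context
+  * marks it clean without writing it back to the image */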
+ if (m_invalidating) {
+ return true;
+ }
+
/* For OWB we can flush entries with the same sync gen number (write between
* aio_flush() calls) concurrently. Here we'll consider an entry flushable if
* its sync gen number is <= the lowest sync gen number carried by all the
template <typename I>
Context* ReplicatedWriteLog<I>::construct_flush_entry_ctx(std::shared_ptr<GenericLogEntry> log_entry) {
- //TODO handle invalidate in later PRs
CephContext *cct = m_image_ctx.cct;
+ bool invalidating = m_invalidating; // snapshot so we behave consistently
ldout(cct, 20) << "" << dendl;
ceph_assert(m_entry_reader_lock.is_locked());
/* Flush write completion action */
Context *ctx = new LambdaContext(
- [this, log_entry](int r) {
+ [this, log_entry, invalidating](int r) {
{
std::lock_guard locker(m_lock);
if (r < 0) {
m_bytes_dirty -= log_entry->bytes_dirty();
sync_point_writer_flushed(log_entry->get_sync_point_entry());
ldout(m_image_ctx.cct, 20) << "flushed: " << log_entry
+ << " invalidating=" << invalidating
<< dendl;
}
m_flush_ops_in_flight -= 1;
}
});
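+ /* When invalidating, return only the completion action above; the wrapper below
+  * that queues the actual write-back to the image is skipped */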
+ if (invalidating) {
+ return ctx;
+ }
return new LambdaContext(
[this, log_entry, ctx](int r) {
m_image_ctx.op_work_queue->queue(new LambdaContext(
}
template <typename I>
-void ReplicatedWriteLog<I>::internal_flush(Context *on_finish) {
+void ReplicatedWriteLog<I>::internal_flush(bool invalidate, Context *on_finish) {
+ ldout(m_image_ctx.cct, 20) << "invalidate=" << invalidate << dendl;
+
+ if (m_perfcounter) {
+ if (invalidate) {
+ m_perfcounter->inc(l_librbd_rwl_invalidate_cache, 1);
+ } else {
+ m_perfcounter->inc(l_librbd_rwl_flush, 1);
+ }
+ }
/* May be called even if initialization fails */
if (!m_initialized) {
* results. */
GuardedRequestFunctionContext *guarded_ctx =
new GuardedRequestFunctionContext(
- [this, on_finish](GuardedRequestFunctionContext &guard_ctx) {
+ [this, on_finish, invalidate](GuardedRequestFunctionContext &guard_ctx) {
DeferredContexts on_exit;
ldout(m_image_ctx.cct, 20) << "cell=" << guard_ctx.cell << dendl;
ceph_assert(guard_ctx.cell);
Context *ctx = new LambdaContext(
- [this, cell=guard_ctx.cell, on_finish](int r) {
+ [this, cell=guard_ctx.cell, invalidate, on_finish](int r) {
std::lock_guard locker(m_lock);
- ldout(m_image_ctx.cct, 6) << "Done flush" << dendl;
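+ /* Flush/invalidate is finished; clear the flag under m_lock before completing on_finish */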
+ m_invalidating = false;
+ ldout(m_image_ctx.cct, 6) << "Done flush/invalidate (invalidate="
+ << invalidate << ")" << dendl;
if (m_log_entries.size()) {
ldout(m_image_ctx.cct, 1) << "m_log_entries.size()="
<< m_log_entries.size() << ", "
<< "front()=" << *m_log_entries.front()
<< dendl;
}
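+ /* A successful invalidate must leave the log completely empty */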
+ if (invalidate) {
+ ceph_assert(m_log_entries.size() == 0);
+ }
ceph_assert(m_dirty_log_entries.size() == 0);
m_image_ctx.op_work_queue->queue(on_finish, r);
release_guarded_request(cell);
});
ctx = new LambdaContext(
- [this, ctx](int r) {
+ [this, ctx, invalidate](int r) {
Context *next_ctx = ctx;
if (r < 0) {
/* Override on_finish status with this error */
ctx->complete(r);
});
}
- {
- std::lock_guard locker(m_lock);
- ceph_assert(m_dirty_log_entries.size() == 0);
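+ /* The dirty list is already empty here (asserted below). For invalidate, set
+  * m_invalidating and retire every remaining log entry before completing; for a
+  * plain flush, flush the write-back layer below instead. */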
+ if (invalidate) {
+ {
+ std::lock_guard locker(m_lock);
+ ceph_assert(m_dirty_log_entries.size() == 0);
+ ceph_assert(!m_invalidating);
+ ldout(m_image_ctx.cct, 6) << "Invalidating" << dendl;
+ m_invalidating = true;
+ }
+ /* Discards all RWL entries */
+ while (retire_entries(MAX_ALLOC_PER_TRANSACTION)) { }
+ next_ctx->complete(0);
+ } else {
+ {
+ std::lock_guard locker(m_lock);
+ ceph_assert(m_dirty_log_entries.size() == 0);
+ ceph_assert(!m_invalidating);
+ }
+ m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, next_ctx);
}
- m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, next_ctx);
});
ctx = new LambdaContext(
[this, ctx](int r) {