m_periodic_stats_enabled = m_cache_state->log_periodic_stats;
/* Do these after we drop lock */
later.add(new LambdaContext([this](int r) {
- if (m_periodic_stats_enabled) {
- /* Log stats for the first time */
- periodic_stats();
- /* Arm periodic stats logging for the first time */
- std::lock_guard timer_locker(*m_timer_lock);
- arm_periodic_stats();
- }
- }));
+ if (m_periodic_stats_enabled) {
+ /* Log stats for the first time */
+ periodic_stats();
+ /* Arm periodic stats logging for the first time */
+ std::lock_guard timer_locker(*m_timer_lock);
+ arm_periodic_stats();
+ }
+ }));
m_image_ctx.op_work_queue->queue(on_finish, 0);
}
template <typename I>
void ReplicatedWriteLog<I>::shut_down(Context *on_finish) {
- // Here we only close pmem pool file and remove the pool file.
- // TODO: We'll continue to update this part in later PRs.
- if (m_log_pool) {
- ldout(m_image_ctx.cct, 6) << "closing pmem pool" << dendl;
- pmemobj_close(m_log_pool);
- }
- if (m_log_is_poolset) {
- ldout(m_image_ctx.cct, 5) << "Not removing poolset " << m_log_pool_name << dendl;
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << dendl;
+
+ ldout(cct, 5) << "image name: " << m_image_ctx.name << " id: " << m_image_ctx.id << dendl;
+
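+ /* The shutdown sequence is a chain of contexts built in reverse order of
+ execution: the context constructed first below runs last */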
+ Context *ctx = new LambdaContext(
+ [this, on_finish](int r) {
+ ldout(m_image_ctx.cct, 6) << "shutdown complete" << dendl;
+ m_image_ctx.op_work_queue->queue(on_finish, r);
+ });
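+ /* Final cleanup: log stats one last time, release the log entries, close
+ (and, if clean, remove) the pmem pool, then persist the cache state */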
+ ctx = new LambdaContext(
+ [this, ctx](int r) {
+ Context *next_ctx = override_ctx(r, ctx);
+ bool periodic_stats_enabled = m_periodic_stats_enabled;
+ m_periodic_stats_enabled = false;
+
+ if (periodic_stats_enabled) {
+ /* Log stats one last time if they were enabled */
+ periodic_stats();
+ }
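+ /* All dirty entries must have been written back before this point */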
+ {
+ std::lock_guard locker(m_lock);
+ ceph_assert(m_dirty_log_entries.size() == 0);
+ m_cache_state->clean = true;
+ m_log_entries.clear();
+ if (m_log_pool) {
+ ldout(m_image_ctx.cct, 6) << "closing pmem pool" << dendl;
+ pmemobj_close(m_log_pool);
+ }
+ if (m_cache_state->clean) {
+ if (m_log_is_poolset) {
+ ldout(m_image_ctx.cct, 5) << "Not removing poolset " << m_log_pool_name << dendl;
+ } else {
+ ldout(m_image_ctx.cct, 5) << "Removing empty pool file: " << m_log_pool_name << dendl;
+ if (remove(m_log_pool_name.c_str()) != 0) {
+ lderr(m_image_ctx.cct) << "failed to remove empty pool \"" << m_log_pool_name << "\": "
+ << pmemobj_errormsg() << dendl;
+ } else {
+ m_cache_state->clean = true;
+ m_cache_state->empty = true;
+ m_cache_state->present = false;
+ }
+ }
+ } else {
+ if (m_log_is_poolset) {
+ ldout(m_image_ctx.cct, 5) << "Not removing poolset " << m_log_pool_name << dendl;
+ } else {
+ ldout(m_image_ctx.cct, 5) << "Not removing pool file: " << m_log_pool_name << dendl;
+ }
+ }
+ if (m_perfcounter) {
+ perf_stop();
+ }
+ }
+ update_image_cache_state(next_ctx);
+ });
+ /* If the log is empty there is nothing to flush; proceed straight to cleanup */
+ if (m_first_free_entry == m_first_valid_entry) {
+ m_image_ctx.op_work_queue->queue(ctx, 0);
} else {
- ldout(m_image_ctx.cct, 5) << "Removing empty pool file: "
- << m_log_pool_name << dendl;
- if (remove(m_log_pool_name.c_str()) != 0) {
- lderr(m_image_ctx.cct) << "failed to remove empty pool \""
- << m_log_pool_name << "\": "
- << pmemobj_errormsg() << dendl;
- }
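+ /* The log still contains entries: flush dirty entries and drain in-flight
+ operations before the cleanup contexts above run */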
+ ctx = new LambdaContext(
+ [this, ctx](int r) {
+ Context *next_ctx = override_ctx(r, ctx);
+ {
+ /* Sync with process_writeback_dirty_entries() */
+ RWLock::WLocker entry_reader_wlocker(m_entry_reader_lock);
+ m_shutting_down = true;
+ /* Flush all writes to OSDs (unless disabled) and wait for all
+ in-progress flush writes to complete */
+ ldout(m_image_ctx.cct, 6) << "flushing" << dendl;
+ if (m_periodic_stats_enabled) {
+ periodic_stats();
+ }
+ }
+ flush_dirty_entries(next_ctx);
+ });
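+ /* Runs before the flush step above: wait for in-flight operations to drain first */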
+ ctx = new LambdaContext(
+ [this, ctx](int r) {
+ Context *next_ctx = override_ctx(r, ctx);
+ ldout(m_image_ctx.cct, 6) << "waiting for in-flight operations" << dendl;
+ // Wait for in-progress IOs to complete
+ next_ctx = util::create_async_context_callback(m_image_ctx, next_ctx);
+ m_async_op_tracker.wait_for_ops(next_ctx);
+ });
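+ /* Head of the chain: hop back to the work queue once internal_flush() completes */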
+ ctx = new LambdaContext(
+ [this, ctx](int r) {
+ ldout(m_image_ctx.cct, 6) << "Done internal_flush in shutdown" << dendl;
+ m_work_queue.queue(ctx, r);
+ });
+ /* Complete all in-flight writes before shutting down */
+ ldout(m_image_ctx.cct, 6) << "internal_flush in shutdown" << dendl;
+ internal_flush(ctx);
}
- on_finish->complete(0);
}
template <typename I>
alloc_and_dispatch_io_req(write_req);
});
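+ /* Plain writes are not barriers; the block guard orders them only against
+ overlapping extents */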
- detain_guarded_request(write_req, guarded_ctx);
+ detain_guarded_request(write_req, guarded_ctx, false);
}
template <typename I>
Context *on_finish) {
}
+/**
+ * aio_flush() completes when all previously completed writes have been
+ * flushed to the persistent cache. We make a best-effort attempt to
+ * also defer until all in-progress writes complete, but we may not
+ * know about all of the writes the application considers in-progress
+ * yet, due to uncertainty in the IO submission work queue (multiple
+ * WQ threads may allow out-of-order submission).
+ *
+ * This flush operation will not wait for writes deferred for overlap
+ * in the block guard.
+ */
template <typename I>
-void ReplicatedWriteLog<I>::aio_flush(Context *on_finish) {
+void ReplicatedWriteLog<I>::aio_flush(io::FlushSource flush_source, Context *on_finish) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << "on_finish=" << on_finish << " flush_source=" << flush_source << dendl;
+
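+ /* Shutdown and internal flushes must flush the log itself, not just order
+ user writes; route them through internal_flush() */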
+ if (io::FLUSH_SOURCE_SHUTDOWN == flush_source || io::FLUSH_SOURCE_INTERNAL == flush_source) {
+ internal_flush(on_finish);
+ return;
+ }
+ m_perfcounter->inc(l_librbd_rwl_aio_flush, 1);
+
+ /* May be called even if initialization fails */
+ if (!m_initialized) {
+ ldout(cct, 5) << "never initialized" << dendl;
+ /* Completing on_finish synchronously here could deadlock; defer to the work queue */
+ m_image_ctx.op_work_queue->queue(on_finish, 0);
+ return;
+ }
+
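+ /* Flushes are rejected on snapshots and read-only images */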
+ {
+ std::shared_lock image_locker(m_image_ctx.image_lock);
+ if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) {
+ on_finish->complete(-EROFS);
+ return;
+ }
+ }
+
+ auto flush_req = make_flush_req(on_finish);
+
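+ /* Once the block guard admits the flush: latch persist-on-flush mode if
+ armed, and start a new sync point if any writes have arrived since the
+ last one */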
+ GuardedRequestFunctionContext *guarded_ctx =
+ new GuardedRequestFunctionContext([this, flush_req](GuardedRequestFunctionContext &guard_ctx) {
+ ldout(m_image_ctx.cct, 20) << "flush_req=" << flush_req << " cell=" << guard_ctx.cell << dendl;
+ ceph_assert(guard_ctx.cell);
+ flush_req->detained = guard_ctx.state.detained;
+ /* We don't call flush_req->set_cell(), because the block guard will be released here */
+ {
+ DeferredContexts post_unlock; /* Do these when the lock below is released */
+ std::lock_guard locker(m_lock);
+
+ if (!m_persist_on_flush && m_persist_on_write_until_flush) {
+ m_persist_on_flush = true;
+ ldout(m_image_ctx.cct, 5) << "now persisting on flush" << dendl;
+ }
+
+ /*
+ * Create a new sync point if there have been writes since the last
+ * one.
+ *
+ * We do not flush the caches below the RWL here.
+ */
+ flush_new_sync_point_if_needed(flush_req, post_unlock);
+ }
+
+ release_guarded_request(guard_ctx.cell);
+ });
+
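+ /* Flush requests are detained in the block guard as barriers */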
+ detain_guarded_request(flush_req, guarded_ctx, true);
}
template <typename I>
template <typename I>
void ReplicatedWriteLog<I>::detain_guarded_request(
- C_BlockIORequestT *request, GuardedRequestFunctionContext *guarded_ctx)
+ C_BlockIORequestT *request,
+ GuardedRequestFunctionContext *guarded_ctx,
+ bool is_barrier)
{
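+ /* The caller decides barrier status: flush requests and whole-volume
+ requests (request == nullptr) pass is_barrier = true */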
BlockExtent extent;
- bool is_barrier = false;
if (request) {
extent = request->image_extents_summary.block_extent();
} else {
extent = block_extent(whole_volume_extent());
- is_barrier = true;
}
auto req = GuardedRequest(extent, guarded_ctx, is_barrier);
BlockGuardCell *cell = nullptr;
<< cpp_strerror(r) << dendl;
ctx->complete(r);
} else {
- m_image_writeback.aio_flush(ctx);
+ m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, ctx);
}
});
std::lock_guard locker(m_lock);
ceph_assert(m_dirty_log_entries.size() == 0);
}
- m_image_writeback.aio_flush(next_ctx);
+ m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, next_ctx);
});
ctx = new LambdaContext(
[this, ctx](int r) {
auto flush_req = make_flush_req(ctx);
flush_new_sync_point_if_needed(flush_req, on_exit);
});
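+ /* A null request covers the whole volume and is detained as a barrier */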
- detain_guarded_request(nullptr, guarded_ctx);
+ detain_guarded_request(nullptr, guarded_ctx, true);
}
} // namespace cache