<< m_log_pool_name << dendl;
if (remove(m_log_pool_name.c_str()) != 0) {
lderr(m_image_ctx.cct) << "failed to remove empty pool \""
- << m_log_pool_name << "\": "
+ << m_log_pool_name << "\": "
<< pmemobj_errormsg() << dendl;
}
}
template <typename I>
void ReplicatedWriteLog<I>::aio_discard(uint64_t offset, uint64_t length,
uint32_t discard_granularity_bytes,
- Context *on_finish) {
+ Context *on_finish) {
}
template <typename I>
void ReplicatedWriteLog<I>::flush(Context *on_finish) {
+ internal_flush(on_finish);
}
template <typename I>
void ReplicatedWriteLog<I>::detain_guarded_request(
C_BlockIORequestT *request, GuardedRequestFunctionContext *guarded_ctx)
{
- //TODO: add is_barrier for flush request in later PRs
- auto req = GuardedRequest(request->image_extents_summary.block_extent(), guarded_ctx);
+ BlockExtent extent;
+ bool is_barrier = false;
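+ /* A null request is an internal barrier covering the whole volume */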
+ if (request) {
+ extent = request->image_extents_summary.block_extent();
+ } else {
+ extent = block_extent(whole_volume_extent());
+ is_barrier = true;
+ }
+ auto req = GuardedRequest(extent, guarded_ctx, is_barrier);
BlockGuardCell *cell = nullptr;
ldout(m_image_ctx.cct, 20) << dendl;
ops.splice(ops.end(), m_ops_to_append, m_ops_to_append.begin(), last_in_batch);
ops_remain = true; /* Always check again before leaving */
ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", "
- << m_ops_to_append.size() << " remain" << dendl;
+ << m_ops_to_append.size() << " remain" << dendl;
} else {
ops_remain = false;
if (appending) {
ops.splice(ops.end(), m_ops_to_flush, m_ops_to_flush.begin(), last_in_batch);
ops_remain = !m_ops_to_flush.empty();
ldout(m_image_ctx.cct, 20) << "flushing " << ops.size() << ", "
- << m_ops_to_flush.size() << " remain" << dendl;
+ << m_ops_to_flush.size() << " remain" << dendl;
} else {
ops_remain = false;
}
ldout(m_image_ctx.cct, 20) << "entry count=" << ops.size() << " "
<< "start address="
- << ops.front()->get_log_entry()->pmem_entry << " "
+ << ops.front()->get_log_entry()->pmem_entry << " "
<< "bytes="
- << ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry))
+ << ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry))
<< dendl;
pmemobj_flush(m_log_pool,
ops.front()->get_log_entry()->pmem_entry,
ldout(m_image_ctx.cct, 20) << "APPENDING: index="
<< operation->get_log_entry()->log_entry_index << " "
<< "pmem_entry=[" << *operation->get_log_entry()->pmem_entry
- << "]" << dendl;
+ << "]" << dendl;
entries_to_flush.push_back(operation);
}
flush_op_log_entries(entries_to_flush);
}
op->complete(result);
m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_app_t,
- op->log_append_time - op->dispatch_time);
+ op->log_append_time - op->dispatch_time);
m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_cmp_t, now - op->dispatch_time);
m_perfcounter->hinc(l_librbd_rwl_log_op_dis_to_cmp_t_hist,
- utime_t(now - op->dispatch_time).to_nsec(),
+ utime_t(now - op->dispatch_time).to_nsec(),
log_entry->ram_entry.write_bytes);
utime_t app_lat = op->log_append_comp_time - op->log_append_time;
m_perfcounter->tinc(l_librbd_rwl_log_op_app_to_appc_t, app_lat);
if (!req->has_io_waited_for_buffers()) {
req->set_io_waited_for_buffers(true);
ldout(m_image_ctx.cct, 1) << "Waiting for allocation cap (cap="
- << m_bytes_allocated_cap
+ << m_bytes_allocated_cap
<< ", allocated=" << m_bytes_allocated
<< ") in write [" << *req << "]" << dendl;
}
ldout(cct, 20) << "" << dendl;
ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
- //TODO handle invalidate
-
/* For OWB we can flush entries with the same sync gen number (write between
* aio_flush() calls) concurrently. Here we'll consider an entry flushable if
* its sync gen number is <= the lowest sync gen number carried by all the
}
if (all_clean) {
- // TODO: all flusing complete
+ /* All flushing complete, drain outside lock */
+ Contexts flush_contexts;
+ {
+ std::lock_guard locker(m_lock);
+ flush_contexts.swap(m_flush_complete_contexts);
+ }
+ finish_contexts(m_image_ctx.cct, flush_contexts, 0);
}
}
-/* Update/persist the last flushed sync point in the log */
+/**
+ * Update/persist the last flushed sync point in the log
+ */
template <typename I>
void ReplicatedWriteLog<I>::persist_last_flushed_sync_gen()
{
}
/* Make a new sync point and flush the previous during initialization, when there may or may
-* not be a previous sync point */
+ * not be a previous sync point */
template <typename I>
void ReplicatedWriteLog<I>::init_flush_new_sync_point(DeferredContexts &later) {
ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
}
/**
-* Begin a new sync point
-*/
+ * Begin a new sync point
+ */
template <typename I>
void ReplicatedWriteLog<I>::new_sync_point(DeferredContexts &later) {
CephContext *cct = m_image_ctx.cct;
}
template <typename I>
-void ReplicatedWriteLog<I>::flush_new_sync_point(C_FlushRequestT *flush_req, DeferredContexts &later) {
+void ReplicatedWriteLog<I>::flush_new_sync_point(C_FlushRequestT *flush_req,
+ DeferredContexts &later) {
ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
if (!flush_req) {
to_append->add_in_on_persisted_ctxs(flush_req);
}
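+
+/* Complete flush_req against the current sync point, creating and flushing a
+ * new sync point only if writes have arrived since the last one. */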
+template <typename I>
+void ReplicatedWriteLog<I>::flush_new_sync_point_if_needed(C_FlushRequestT *flush_req,
+ DeferredContexts &later) {
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+ /* If there have been writes since the last sync point ... */
+ if (m_current_sync_point->log_entry->writes) {
+ flush_new_sync_point(flush_req, later);
+ } else {
+ /* There have been no writes to the current sync point. */
+ if (m_current_sync_point->earlier_sync_point) {
+ /* If previous sync point hasn't completed, complete this flush
+ * with the earlier sync point. No alloc or dispatch needed. */
+ m_current_sync_point->earlier_sync_point->on_sync_point_persisted.push_back(flush_req);
+ } else {
+ /* The previous sync point has already completed and been
+ * appended. The current sync point has no writes, so this flush
+ * has nothing to wait for. This flush completes now. */
+ later.add(flush_req);
+ }
+ }
+}
+
+/*
+ * RWL internal flush - will actually flush the RWL.
+ *
+ * User flushes should arrive at aio_flush(), and only flush prior
+ * writes to all log replicas.
+ *
+ * Librbd internal flushes will arrive at flush(invalidate=false,
+ * discard=false), and traverse the block guard to ensure in-flight writes are
+ * flushed.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::flush_dirty_entries(Context *on_finish) {
+ CephContext *cct = m_image_ctx.cct;
+ bool all_clean;
+ bool flushing;
+ bool stop_flushing;
+
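+ /* Sample the flush state under m_lock, then decide outside the lock */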
+ {
+ std::lock_guard locker(m_lock);
+ flushing = (0 != m_flush_ops_in_flight);
+ all_clean = m_dirty_log_entries.empty();
+ stop_flushing = (m_shutting_down);
+ }
+
+ if (!flushing && (all_clean || stop_flushing)) {
+ /* Complete without holding m_lock */
+ if (all_clean) {
+ ldout(cct, 20) << "no dirty entries" << dendl;
+ } else {
+ ldout(cct, 5) << "flush during shutdown suppressed" << dendl;
+ }
+ on_finish->complete(0);
+ } else {
+ if (all_clean) {
+ ldout(cct, 5) << "flush ops still in progress" << dendl;
+ } else {
+ ldout(cct, 20) << "dirty entries remain" << dendl;
+ }
+ std::lock_guard locker(m_lock);
+ /* on_finish can't be completed yet */
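+ /* Re-invoke flush_dirty_entries() once the in-flight flush work completes
+ * and m_flush_complete_contexts is drained */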
+ m_flush_complete_contexts.push_back(new LambdaContext(
+ [this, on_finish](int r) {
+ flush_dirty_entries(on_finish);
+ }));
+ wake_up();
+ }
+}
+
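+/* Flush all dirty log entries to the image behind a whole-volume barrier,
+ * then flush the image writeback before completing on_finish. */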
+template <typename I>
+void ReplicatedWriteLog<I>::internal_flush(Context *on_finish) {
+
+ /* May be called even if initialization fails */
+ if (!m_initialized) {
+ ldout(m_image_ctx.cct, 5) << "never initialized" << dendl;
+ /* Deadlock if completed here */
+ m_image_ctx.op_work_queue->queue(on_finish, 0);
+ return;
+ }
+
+ /* Flush/invalidate must pass through block guard to ensure all layers of
+ * cache are consistently flushed/invalidated. This ensures no in-flight write leaves
+ * some layers with valid regions, which may later produce inconsistent read
+ * results. */
+ GuardedRequestFunctionContext *guarded_ctx =
+ new GuardedRequestFunctionContext(
+ [this, on_finish](GuardedRequestFunctionContext &guard_ctx) {
+ DeferredContexts on_exit;
+ ldout(m_image_ctx.cct, 20) << "cell=" << guard_ctx.cell << dendl;
+ ceph_assert(guard_ctx.cell);
+
+ Context *ctx = new LambdaContext(
+ [this, cell=guard_ctx.cell, on_finish](int r) {
+ std::lock_guard locker(m_lock);
+ ldout(m_image_ctx.cct, 6) << "Done flush" << dendl;
+ if (m_log_entries.size()) {
+ ldout(m_image_ctx.cct, 1) << "m_log_entries.size()="
+ << m_log_entries.size() << ", "
+ << "front()=" << *m_log_entries.front()
+ << dendl;
+ }
+ ceph_assert(m_dirty_log_entries.size() == 0);
+ m_image_ctx.op_work_queue->queue(on_finish, r);
+ release_guarded_request(cell);
+ });
+ ctx = new LambdaContext(
+ [this, ctx](int r) {
+ Context *next_ctx = ctx;
+ if (r < 0) {
+ /* Override on_finish status with this error */
+ next_ctx = new LambdaContext([r, ctx](int _r) {
+ ctx->complete(r);
+ });
+ }
+ {
+ std::lock_guard locker(m_lock);
+ ceph_assert(m_dirty_log_entries.size() == 0);
+ }
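+ /* The log is clean; flush the image writeback below the cache */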
+ m_image_writeback.aio_flush(next_ctx);
+ });
+ ctx = new LambdaContext(
+ [this, ctx](int r) {
+ flush_dirty_entries(ctx);
+ });
+ std::lock_guard locker(m_lock);
+ /* Even if we're throwing everything away, we want the last entry to
+ * be a sync point so we can cleanly resume.
+ *
+ * Also, the blockguard only guarantees the replication of this op
+ * can't overlap with prior ops. It doesn't guarantee those are all
+ * completed and eligible for flush & retire, which we require here.
+ */
+ auto flush_req = make_flush_req(ctx);
+ flush_new_sync_point_if_needed(flush_req, on_exit);
+ });
+ detain_guarded_request(nullptr, guarded_ctx);
+}
+
} // namespace cache
} // namespace librbd