From c1ad6dcf872af7d2c4acb0eb1298fd6ea69d61c8 Mon Sep 17 00:00:00 2001 From: Yuan Lu Date: Thu, 21 Nov 2019 17:27:48 +0800 Subject: [PATCH] librbd: add aio_write tests Signed-off-by: Peterson, Scott Signed-off-by: Li, Xiaoyan Signed-off-by: Lu, Yuan Signed-off-by: Chamarthy, Mahati --- src/common/subsys.h | 1 + src/librbd/cache/ReplicatedWriteLog.cc | 495 ++++++++-------- src/librbd/cache/ReplicatedWriteLog.h | 105 ++-- src/librbd/cache/rwl/ImageCacheState.cc | 2 +- src/librbd/cache/rwl/LogEntry.cc | 59 +- src/librbd/cache/rwl/LogEntry.h | 87 ++- src/librbd/cache/rwl/LogOperation.cc | 230 ++++---- src/librbd/cache/rwl/LogOperation.h | 183 +++--- src/librbd/cache/rwl/Request.cc | 533 ++++++------------ src/librbd/cache/rwl/Request.h | 132 +++-- src/librbd/cache/rwl/SyncPoint.cc | 44 +- src/librbd/cache/rwl/SyncPoint.h | 20 +- src/librbd/cache/rwl/Types.cc | 43 +- src/librbd/cache/rwl/Types.h | 45 +- .../cache/test_mock_ReplicatedWriteLog.cc | 33 +- 15 files changed, 912 insertions(+), 1100 deletions(-) diff --git a/src/common/subsys.h b/src/common/subsys.h index 2e134c38bd6..e61b478bfae 100644 --- a/src/common/subsys.h +++ b/src/common/subsys.h @@ -37,6 +37,7 @@ SUBSYS(rados, 0, 5) SUBSYS(rbd, 0, 5) SUBSYS(rbd_mirror, 0, 5) SUBSYS(rbd_replay, 0, 5) +SUBSYS(rbd_rwl, 0, 5) SUBSYS(journaler, 0, 5) SUBSYS(objectcacher, 0, 5) SUBSYS(immutable_obj_cache, 0, 5) diff --git a/src/librbd/cache/ReplicatedWriteLog.cc b/src/librbd/cache/ReplicatedWriteLog.cc index 343790a41c4..51d789e31f1 100644 --- a/src/librbd/cache/ReplicatedWriteLog.cc +++ b/src/librbd/cache/ReplicatedWriteLog.cc @@ -20,7 +20,7 @@ #include #include -#define dout_subsys ceph_subsys_rbd +#define dout_subsys ceph_subsys_rbd_rwl #undef dout_prefix #define dout_prefix *_dout << "librbd::cache::ReplicatedWriteLog: " << this << " " \ << __func__ << ": " @@ -33,7 +33,7 @@ using namespace librbd::cache::rwl; typedef ReplicatedWriteLog::Extent Extent; typedef ReplicatedWriteLog::Extents Extents; -const unsigned long int ops_appended_together = MAX_ALLOC_PER_TRANSACTION; +const unsigned long int OPS_APPENDED_TOGETHER = MAX_ALLOC_PER_TRANSACTION; template ReplicatedWriteLog::ReplicatedWriteLog(I &image_ctx, librbd::cache::rwl::ImageCacheState* cache_state) @@ -50,8 +50,6 @@ ReplicatedWriteLog::ReplicatedWriteLog(I &image_ctx, librbd::cache::rwl::Imag "librbd::cache::ReplicatedWriteLog::m_lock", this))), m_blockguard_lock(ceph::make_mutex(util::unique_lock_name( "librbd::cache::ReplicatedWriteLog::m_blockguard_lock", this))), - m_entry_bl_lock(ceph::make_mutex(util::unique_lock_name( - "librbd::cache::ReplicatedWriteLog::m_entry_bl_lock", this))), m_thread_pool(image_ctx.cct, "librbd::cache::ReplicatedWriteLog::thread_pool", "tp_rwl", 4, ""), @@ -132,43 +130,43 @@ void ReplicatedWriteLog::perf_start(std::string name) { plb.add_u64_avg(l_librbd_rwl_log_op_bytes, "log_op_bytes", "Average log append bytes"); plb.add_time_avg( - l_librbd_rwl_req_arr_to_all_t, "req_arr_to_all_t", - "Average arrival to allocation time (time deferred for overlap)"); + l_librbd_rwl_req_arr_to_all_t, "req_arr_to_all_t", + "Average arrival to allocation time (time deferred for overlap)"); plb.add_time_avg( - l_librbd_rwl_req_arr_to_dis_t, "req_arr_to_dis_t", - "Average arrival to dispatch time (includes time deferred for overlaps and allocation)"); + l_librbd_rwl_req_arr_to_dis_t, "req_arr_to_dis_t", + "Average arrival to dispatch time (includes time deferred for overlaps and allocation)"); plb.add_time_avg( - l_librbd_rwl_req_all_to_dis_t, "req_all_to_dis_t", - "Average 
allocation to dispatch time (time deferred for log resources)"); + l_librbd_rwl_req_all_to_dis_t, "req_all_to_dis_t", + "Average allocation to dispatch time (time deferred for log resources)"); plb.add_time_avg( - l_librbd_rwl_wr_latency, "wr_latency", - "Latency of writes (persistent completion)"); + l_librbd_rwl_wr_latency, "wr_latency", + "Latency of writes (persistent completion)"); plb.add_u64_counter_histogram( l_librbd_rwl_wr_latency_hist, "wr_latency_bytes_histogram", op_hist_x_axis_config, op_hist_y_axis_config, "Histogram of write request latency (nanoseconds) vs. bytes written"); plb.add_time_avg( - l_librbd_rwl_wr_caller_latency, "caller_wr_latency", - "Latency of write completion to caller"); + l_librbd_rwl_wr_caller_latency, "caller_wr_latency", + "Latency of write completion to caller"); plb.add_time_avg( - l_librbd_rwl_nowait_req_arr_to_all_t, "req_arr_to_all_nw_t", - "Average arrival to allocation time (time deferred for overlap)"); + l_librbd_rwl_nowait_req_arr_to_all_t, "req_arr_to_all_nw_t", + "Average arrival to allocation time (time deferred for overlap)"); plb.add_time_avg( - l_librbd_rwl_nowait_req_arr_to_dis_t, "req_arr_to_dis_nw_t", - "Average arrival to dispatch time (includes time deferred for overlaps and allocation)"); + l_librbd_rwl_nowait_req_arr_to_dis_t, "req_arr_to_dis_nw_t", + "Average arrival to dispatch time (includes time deferred for overlaps and allocation)"); plb.add_time_avg( - l_librbd_rwl_nowait_req_all_to_dis_t, "req_all_to_dis_nw_t", - "Average allocation to dispatch time (time deferred for log resources)"); + l_librbd_rwl_nowait_req_all_to_dis_t, "req_all_to_dis_nw_t", + "Average allocation to dispatch time (time deferred for log resources)"); plb.add_time_avg( - l_librbd_rwl_nowait_wr_latency, "wr_latency_nw", - "Latency of writes (persistent completion) not deferred for free space"); + l_librbd_rwl_nowait_wr_latency, "wr_latency_nw", + "Latency of writes (persistent completion) not deferred for free space"); plb.add_u64_counter_histogram( l_librbd_rwl_nowait_wr_latency_hist, "wr_latency_nw_bytes_histogram", op_hist_x_axis_config, op_hist_y_axis_config, "Histogram of write request latency (nanoseconds) vs. bytes written for writes not deferred for free space"); plb.add_time_avg( - l_librbd_rwl_nowait_wr_caller_latency, "caller_wr_latency_nw", - "Latency of write completion to callerfor writes not deferred for free space"); + l_librbd_rwl_nowait_wr_caller_latency, "caller_wr_latency_nw", + "Latency of write completion to callerfor writes not deferred for free space"); plb.add_time_avg(l_librbd_rwl_log_op_alloc_t, "op_alloc_t", "Average buffer pmemobj_reserve() time"); plb.add_u64_counter_histogram( l_librbd_rwl_log_op_alloc_t_hist, "op_alloc_t_bytes_histogram", @@ -183,21 +181,21 @@ void ReplicatedWriteLog::perf_start(std::string name) { "Histogram of op dispatch to persist complete time (nanoseconds) vs. 
bytes written"); plb.add_time_avg( - l_librbd_rwl_log_op_buf_to_app_t, "op_buf_to_app_t", - "Average buffer persist to log append time (write data persist/replicate + wait for append time)"); + l_librbd_rwl_log_op_buf_to_app_t, "op_buf_to_app_t", + "Average buffer persist to log append time (write data persist/replicate + wait for append time)"); plb.add_time_avg( - l_librbd_rwl_log_op_buf_to_bufc_t, "op_buf_to_bufc_t", - "Average buffer persist time (write data persist/replicate time)"); + l_librbd_rwl_log_op_buf_to_bufc_t, "op_buf_to_bufc_t", + "Average buffer persist time (write data persist/replicate time)"); plb.add_u64_counter_histogram( l_librbd_rwl_log_op_buf_to_bufc_t_hist, "op_buf_to_bufc_t_bytes_histogram", op_hist_x_axis_config, op_hist_y_axis_config, "Histogram of write buffer persist time (nanoseconds) vs. bytes written"); plb.add_time_avg( - l_librbd_rwl_log_op_app_to_cmp_t, "op_app_to_cmp_t", - "Average log append to persist complete time (log entry append/replicate + wait for complete time)"); + l_librbd_rwl_log_op_app_to_cmp_t, "op_app_to_cmp_t", + "Average log append to persist complete time (log entry append/replicate + wait for complete time)"); plb.add_time_avg( - l_librbd_rwl_log_op_app_to_appc_t, "op_app_to_appc_t", - "Average log append to persist complete time (log entry append/replicate time)"); + l_librbd_rwl_log_op_app_to_appc_t, "op_app_to_appc_t", + "Average log append to persist complete time (log entry append/replicate time)"); plb.add_u64_counter_histogram( l_librbd_rwl_log_op_app_to_appc_t_hist, "op_app_to_appc_t_bytes_histogram", op_hist_x_axis_config, op_hist_y_axis_config, @@ -409,7 +407,7 @@ void ReplicatedWriteLog::rwl_init(Context *on_finish, DeferredContexts &later << " first_valid=" << m_first_valid_entry << ", first_free=" << m_first_free_entry << ", flushed_sync_gen=" << m_flushed_sync_gen - << ", current_sync_gen=" << m_current_sync_gen << dendl; + << ", m_current_sync_gen=" << m_current_sync_gen << dendl; if (m_first_free_entry == m_first_valid_entry) { ldout(cct,1) << "write log is empty" << dendl; m_cache_state->empty = true; @@ -418,15 +416,15 @@ void ReplicatedWriteLog::rwl_init(Context *on_finish, DeferredContexts &later // TODO: Will init sync point, this will be covered in later PR. // init_flush_new_sync_point(later); ++m_current_sync_gen; - auto new_sync_point = std::make_shared(*this, m_current_sync_gen); - m_current_sync_point = new_sync_point; + m_current_sync_point = std::make_shared(m_current_sync_gen, + this->m_image_ctx.cct); m_initialized = true; // Start the thread m_thread_pool.start(); m_periodic_stats_enabled = m_cache_state->log_periodic_stats; - /* Do these after we drop m_lock */ + /* Do these after we drop lock */ later.add(new LambdaContext([this](int r) { if (m_periodic_stats_enabled) { /* Log stats for the first time */ @@ -467,7 +465,23 @@ void ReplicatedWriteLog::init(Context *on_finish) { template void ReplicatedWriteLog::shut_down(Context *on_finish) { - // TODO: This is cover in later PR. + // Here we only close pmem pool file and remove the pool file. + // TODO: We'll continue to update this part in later PRs. 
+ if (m_log_pool) { + ldout(m_image_ctx.cct, 6) << "closing pmem pool" << dendl; + pmemobj_close(m_log_pool); + } + if (m_log_is_poolset) { + ldout(m_image_ctx.cct, 5) << "Not removing poolset " << m_log_pool_name << dendl; + } else { + ldout(m_image_ctx.cct, 5) << "Removing empty pool file: " + << m_log_pool_name << dendl; + if (remove(m_log_pool_name.c_str()) != 0) { + lderr(m_image_ctx.cct) << "failed to remove empty pool \"" + << m_log_pool_name << "\": " + << pmemobj_errormsg() << dendl; + } + } on_finish->complete(0); } @@ -483,28 +497,17 @@ void ReplicatedWriteLog::aio_write(Extents &&image_extents, int fadvise_flags, Context *on_finish) { CephContext *cct = m_image_ctx.cct; - if (RWL_VERBOSE_LOGGING) { - ldout(cct, 20) << "aio_write" << dendl; - } + + ldout(cct, 20) << "aio_write" << dendl; + utime_t now = ceph_clock_now(); m_perfcounter->inc(l_librbd_rwl_wr_req, 1); ceph_assert(m_initialized); - { - std::shared_lock image_locker(m_image_ctx.image_lock); - if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) { - on_finish->complete(-EROFS); - return; - } - } - - if (ExtentsSummary(image_extents).total_bytes == 0) { - on_finish->complete(0); - return; - } auto *write_req = - new C_WriteRequestT(*this, now, std::move(image_extents), std::move(bl), fadvise_flags, on_finish); + new C_WriteRequestT(*this, now, std::move(image_extents), std::move(bl), fadvise_flags, + m_lock, m_perfcounter, on_finish); m_perfcounter->inc(l_librbd_rwl_wr_bytes, write_req->image_extents_summary.total_bytes); /* The lambda below will be called when the block guard for all @@ -515,13 +518,13 @@ void ReplicatedWriteLog::aio_write(Extents &&image_extents, alloc_and_dispatch_io_req(write_req); }); - detain_guarded_request(GuardedRequest(write_req->image_extents_summary.block_extent(), - guarded_ctx)); + detain_guarded_request(write_req, guarded_ctx); } template void ReplicatedWriteLog::aio_discard(uint64_t offset, uint64_t length, - uint32_t discard_granularity_bytes, Context *on_finish) { + uint32_t discard_granularity_bytes, + Context *on_finish) { } template @@ -556,6 +559,11 @@ template void ReplicatedWriteLog::invalidate(Context *on_finish) { } +template +CephContext *ReplicatedWriteLog::get_context() { + return m_image_ctx.cct; +} + template BlockGuardCell* ReplicatedWriteLog::detain_guarded_request_helper(GuardedRequest &req) { @@ -563,44 +571,37 @@ BlockGuardCell* ReplicatedWriteLog::detain_guarded_request_helper(GuardedRequ BlockGuardCell *cell; ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock)); - if (RWL_VERBOSE_LOGGING) { - ldout(cct, 20) << dendl; - } + ldout(cct, 20) << dendl; int r = m_write_log_guard.detain(req.block_extent, &req, &cell); ceph_assert(r>=0); if (r > 0) { - if (RWL_VERBOSE_LOGGING) { - ldout(cct, 20) << "detaining guarded request due to in-flight requests: " - << "req=" << req << dendl; - } + ldout(cct, 20) << "detaining guarded request due to in-flight requests: " + << "req=" << req << dendl; return nullptr; } - if (RWL_VERBOSE_LOGGING) { - ldout(cct, 20) << "in-flight request cell: " << cell << dendl; - } + ldout(cct, 20) << "in-flight request cell: " << cell << dendl; return cell; } template -BlockGuardCell* ReplicatedWriteLog::detain_guarded_request_barrier_helper(GuardedRequest &req) +BlockGuardCell* ReplicatedWriteLog::detain_guarded_request_barrier_helper( + GuardedRequest &req) { BlockGuardCell *cell = nullptr; ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock)); - if (RWL_VERBOSE_LOGGING) { - ldout(m_image_ctx.cct, 20) << dendl; - } + 
ldout(m_image_ctx.cct, 20) << dendl; if (m_barrier_in_progress) { - req.guard_ctx->m_state.queued = true; + req.guard_ctx->state.queued = true; m_awaiting_barrier.push_back(req); } else { - bool barrier = req.guard_ctx->m_state.barrier; + bool barrier = req.guard_ctx->state.barrier; if (barrier) { m_barrier_in_progress = true; - req.guard_ctx->m_state.current_barrier = true; + req.guard_ctx->state.current_barrier = true; } cell = detain_guarded_request_helper(req); if (barrier) { @@ -613,19 +614,20 @@ BlockGuardCell* ReplicatedWriteLog::detain_guarded_request_barrier_helper(Gua } template -void ReplicatedWriteLog::detain_guarded_request(GuardedRequest &&req) +void ReplicatedWriteLog::detain_guarded_request( + C_BlockIORequestT *request, GuardedRequestFunctionContext *guarded_ctx) { + //TODO: add is_barrier for flush request in later PRs + auto req = GuardedRequest(request->image_extents_summary.block_extent(), guarded_ctx); BlockGuardCell *cell = nullptr; - if (RWL_VERBOSE_LOGGING) { - ldout(m_image_ctx.cct, 20) << dendl; - } + ldout(m_image_ctx.cct, 20) << dendl; { std::lock_guard locker(m_blockguard_lock); cell = detain_guarded_request_barrier_helper(req); } if (cell) { - req.guard_ctx->m_cell = cell; + req.guard_ctx->cell = cell; req.guard_ctx->complete(0); } } @@ -635,47 +637,39 @@ void ReplicatedWriteLog::release_guarded_request(BlockGuardCell *released_cel { CephContext *cct = m_image_ctx.cct; WriteLogGuard::BlockOperations block_reqs; - if (RWL_VERBOSE_LOGGING) { - ldout(cct, 20) << "released_cell=" << released_cell << dendl; - } + ldout(cct, 20) << "released_cell=" << released_cell << dendl; { std::lock_guard locker(m_blockguard_lock); m_write_log_guard.release(released_cell, &block_reqs); for (auto &req : block_reqs) { - req.guard_ctx->m_state.detained = true; + req.guard_ctx->state.detained = true; BlockGuardCell *detained_cell = detain_guarded_request_helper(req); if (detained_cell) { - if (req.guard_ctx->m_state.current_barrier) { + if (req.guard_ctx->state.current_barrier) { /* The current barrier is acquiring the block guard, so now we know its cell */ m_barrier_cell = detained_cell; /* detained_cell could be == released_cell here */ - if (RWL_VERBOSE_LOGGING) { - ldout(cct, 20) << "current barrier cell=" << detained_cell << " req=" << req << dendl; - } + ldout(cct, 20) << "current barrier cell=" << detained_cell << " req=" << req << dendl; } - req.guard_ctx->m_cell = detained_cell; + req.guard_ctx->cell = detained_cell; m_work_queue.queue(req.guard_ctx); } } if (m_barrier_in_progress && (released_cell == m_barrier_cell)) { - if (RWL_VERBOSE_LOGGING) { - ldout(cct, 20) << "current barrier released cell=" << released_cell << dendl; - } + ldout(cct, 20) << "current barrier released cell=" << released_cell << dendl; /* The released cell is the current barrier request */ m_barrier_in_progress = false; m_barrier_cell = nullptr; /* Move waiting requests into the blockguard. 
Stop if there's another barrier */ while (!m_barrier_in_progress && !m_awaiting_barrier.empty()) { auto &req = m_awaiting_barrier.front(); - if (RWL_VERBOSE_LOGGING) { - ldout(cct, 20) << "submitting queued request to blockguard: " << req << dendl; - } + ldout(cct, 20) << "submitting queued request to blockguard: " << req << dendl; BlockGuardCell *detained_cell = detain_guarded_request_barrier_helper(req); if (detained_cell) { - req.guard_ctx->m_cell = detained_cell; + req.guard_ctx->cell = detained_cell; m_work_queue.queue(req.guard_ctx); } m_awaiting_barrier.pop_front(); @@ -683,9 +677,7 @@ void ReplicatedWriteLog::release_guarded_request(BlockGuardCell *released_cel } } - if (RWL_VERBOSE_LOGGING) { - ldout(cct, 20) << "exit" << dendl; - } + ldout(cct, 20) << "exit" << dendl; } /* @@ -695,13 +687,11 @@ void ReplicatedWriteLog::release_guarded_request(BlockGuardCell *released_cel template void ReplicatedWriteLog::append_scheduled_ops(void) { - GenericLogOperationsT ops; + GenericLogOperations ops; int append_result = 0; bool ops_remain = false; bool appending = false; /* true if we set m_appending */ - if (RWL_VERBOSE_LOGGING) { - ldout(m_image_ctx.cct, 20) << dendl; - } + ldout(m_image_ctx.cct, 20) << dendl; do { ops.clear(); @@ -709,9 +699,7 @@ void ReplicatedWriteLog::append_scheduled_ops(void) std::lock_guard locker(m_lock); if (!appending && m_appending) { /* Another thread is appending */ - if (RWL_VERBOSE_LOGGING) { - ldout(m_image_ctx.cct, 15) << "Another thread is appending" << dendl; - } + ldout(m_image_ctx.cct, 15) << "Another thread is appending" << dendl; return; } if (m_ops_to_append.size()) { @@ -719,15 +707,14 @@ void ReplicatedWriteLog::append_scheduled_ops(void) m_appending = true; auto last_in_batch = m_ops_to_append.begin(); unsigned int ops_to_append = m_ops_to_append.size(); - if (ops_to_append > ops_appended_together) { - ops_to_append = ops_appended_together; + if (ops_to_append > OPS_APPENDED_TOGETHER) { + ops_to_append = OPS_APPENDED_TOGETHER; } std::advance(last_in_batch, ops_to_append); ops.splice(ops.end(), m_ops_to_append, m_ops_to_append.begin(), last_in_batch); ops_remain = true; /* Always check again before leaving */ - if (RWL_VERBOSE_LOGGING) { - ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", " << m_ops_to_append.size() << " remain" << dendl; - } + ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", " + << m_ops_to_append.size() << " remain" << dendl; } else { ops_remain = false; if (appending) { @@ -770,10 +757,10 @@ void ReplicatedWriteLog::enlist_op_appender() * all prior log entries are persisted everywhere. 
*/ template -void ReplicatedWriteLog::schedule_append(GenericLogOperationsT &ops) +void ReplicatedWriteLog::schedule_append(GenericLogOperations &ops) { bool need_finisher; - GenericLogOperationsVectorT appending; + GenericLogOperationsVector appending; std::copy(std::begin(ops), std::end(ops), std::back_inserter(appending)); { @@ -793,9 +780,9 @@ void ReplicatedWriteLog::schedule_append(GenericLogOperationsT &ops) } template -void ReplicatedWriteLog::schedule_append(GenericLogOperationsVectorT &ops) +void ReplicatedWriteLog::schedule_append(GenericLogOperationsVector &ops) { - GenericLogOperationsT to_append(ops.begin(), ops.end()); + GenericLogOperations to_append(ops.begin(), ops.end()); schedule_append(to_append); } @@ -808,11 +795,9 @@ const unsigned long int ops_flushed_together = 4; template void ReplicatedWriteLog::flush_then_append_scheduled_ops(void) { - GenericLogOperationsT ops; + GenericLogOperations ops; bool ops_remain = false; - if (RWL_VERBOSE_LOGGING) { - ldout(m_image_ctx.cct, 20) << dendl; - } + ldout(m_image_ctx.cct, 20) << dendl; do { { ops.clear(); @@ -823,15 +808,12 @@ void ReplicatedWriteLog::flush_then_append_scheduled_ops(void) if (ops_to_flush > ops_flushed_together) { ops_to_flush = ops_flushed_together; } - if (RWL_VERBOSE_LOGGING) { - ldout(m_image_ctx.cct, 20) << "should flush " << ops_to_flush << dendl; - } + ldout(m_image_ctx.cct, 20) << "should flush " << ops_to_flush << dendl; std::advance(last_in_batch, ops_to_flush); ops.splice(ops.end(), m_ops_to_flush, m_ops_to_flush.begin(), last_in_batch); ops_remain = !m_ops_to_flush.empty(); - if (RWL_VERBOSE_LOGGING) { - ldout(m_image_ctx.cct, 20) << "flushing " << ops.size() << ", " << m_ops_to_flush.size() << " remain" << dendl; - } + ldout(m_image_ctx.cct, 20) << "flushing " << ops.size() << ", " + << m_ops_to_flush.size() << " remain" << dendl; } else { ops_remain = false; } @@ -869,13 +851,11 @@ void ReplicatedWriteLog::enlist_op_flusher() * then get their log entries appended. */ template -void ReplicatedWriteLog::schedule_flush_and_append(GenericLogOperationsVectorT &ops) +void ReplicatedWriteLog::schedule_flush_and_append(GenericLogOperationsVector &ops) { - GenericLogOperationsT to_flush(ops.begin(), ops.end()); + GenericLogOperations to_flush(ops.begin(), ops.end()); bool need_finisher; - if (RWL_VERBOSE_LOGGING) { - ldout(m_image_ctx.cct, 20) << dendl; - } + ldout(m_image_ctx.cct, 20) << dendl; { std::lock_guard locker(m_lock); @@ -898,12 +878,7 @@ template void ReplicatedWriteLog::flush_pmem_buffer(V& ops) { for (auto &operation : ops) { - if (operation->is_write() || operation->is_writesame()) { - operation->buf_persist_time = ceph_clock_now(); - auto write_entry = operation->get_write_log_entry(); - - pmemobj_flush(m_log_pool, write_entry->pmem_buffer, write_entry->write_bytes()); - } + operation->flush_pmem_buf_to_cache(m_log_pool); } /* Drain once for all */ @@ -911,12 +886,10 @@ void ReplicatedWriteLog::flush_pmem_buffer(V& ops) utime_t now = ceph_clock_now(); for (auto &operation : ops) { - if (operation->is_write() || operation->is_writesame()) { + if (operation->reserved_allocated()) { operation->buf_persist_comp_time = now; } else { - if (RWL_VERBOSE_LOGGING) { - ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl; - } + ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl; } } } @@ -925,10 +898,10 @@ void ReplicatedWriteLog::flush_pmem_buffer(V& ops) * Allocate the (already reserved) write log entries for a set of operations. 
* * Locking: - * Acquires m_lock + * Acquires lock */ template -void ReplicatedWriteLog::alloc_op_log_entries(GenericLogOperationsT &ops) +void ReplicatedWriteLog::alloc_op_log_entries(GenericLogOperations &ops) { TOID(struct WriteLogPoolRoot) pool_root; pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot); @@ -948,9 +921,7 @@ void ReplicatedWriteLog::alloc_op_log_entries(GenericLogOperationsT &ops) log_entry->pmem_entry = &pmem_log_entries[entry_index]; log_entry->ram_entry.entry_valid = 1; m_log_entries.push_back(log_entry); - if (RWL_VERBOSE_LOGGING) { - ldout(m_image_ctx.cct, 20) << "operation=[" << *operation << "]" << dendl; - } + ldout(m_image_ctx.cct, 20) << "operation=[" << *operation << "]" << dendl; } } @@ -959,7 +930,7 @@ void ReplicatedWriteLog::alloc_op_log_entries(GenericLogOperationsT &ops) * be contiguous in persistent memory. */ template -void ReplicatedWriteLog::flush_op_log_entries(GenericLogOperationsVectorT &ops) +void ReplicatedWriteLog::flush_op_log_entries(GenericLogOperationsVector &ops) { if (ops.empty()) { return; @@ -969,12 +940,12 @@ void ReplicatedWriteLog::flush_op_log_entries(GenericLogOperationsVectorT &op ceph_assert(ops.front()->get_log_entry()->pmem_entry < ops.back()->get_log_entry()->pmem_entry); } - if (RWL_VERBOSE_LOGGING) { - ldout(m_image_ctx.cct, 20) << "entry count=" << ops.size() << " " - << "start address=" << ops.front()->get_log_entry()->pmem_entry << " " - << "bytes=" << ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry)) - << dendl; - } + ldout(m_image_ctx.cct, 20) << "entry count=" << ops.size() << " " + << "start address=" + << ops.front()->get_log_entry()->pmem_entry << " " + << "bytes=" + << ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry)) + << dendl; pmemobj_flush(m_log_pool, ops.front()->get_log_entry()->pmem_entry, ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry))); @@ -986,10 +957,10 @@ void ReplicatedWriteLog::flush_op_log_entries(GenericLogOperationsVectorT &op * of these must already have been persisted to its reserved area. 
*/ template -int ReplicatedWriteLog::append_op_log_entries(GenericLogOperationsT &ops) +int ReplicatedWriteLog::append_op_log_entries(GenericLogOperations &ops) { CephContext *cct = m_image_ctx.cct; - GenericLogOperationsVectorT entries_to_flush; + GenericLogOperationsVector entries_to_flush; TOID(struct WriteLogPoolRoot) pool_root; pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot); int ret = 0; @@ -999,7 +970,7 @@ int ReplicatedWriteLog::append_op_log_entries(GenericLogOperationsT &ops) if (ops.empty()) { return 0; } - entries_to_flush.reserve(ops_appended_together); + entries_to_flush.reserve(OPS_APPENDED_TOGETHER); /* Write log entries to ring and persist */ utime_t now = ceph_clock_now(); @@ -1009,34 +980,27 @@ int ReplicatedWriteLog::append_op_log_entries(GenericLogOperationsT &ops) * tail of the ring */ if (entries_to_flush.back()->get_log_entry()->log_entry_index > operation->get_log_entry()->log_entry_index) { - if (RWL_VERBOSE_LOGGING) { - ldout(m_image_ctx.cct, 20) << "entries to flush wrap around the end of the ring at " - << "operation=[" << *operation << "]" << dendl; - } + ldout(m_image_ctx.cct, 20) << "entries to flush wrap around the end of the ring at " + << "operation=[" << *operation << "]" << dendl; flush_op_log_entries(entries_to_flush); entries_to_flush.clear(); now = ceph_clock_now(); } } - if (RWL_VERBOSE_LOGGING) { - ldout(m_image_ctx.cct, 20) << "Copying entry for operation at index=" - << operation->get_log_entry()->log_entry_index << " " - << "from " << &operation->get_log_entry()->ram_entry << " " - << "to " << operation->get_log_entry()->pmem_entry << " " - << "operation=[" << *operation << "]" << dendl; - } - if (RWL_VERBOSE_LOGGING) { - ldout(m_image_ctx.cct, 05) << "APPENDING: index=" - << operation->get_log_entry()->log_entry_index << " " - << "operation=[" << *operation << "]" << dendl; - } + ldout(m_image_ctx.cct, 20) << "Copying entry for operation at index=" + << operation->get_log_entry()->log_entry_index << " " + << "from " << &operation->get_log_entry()->ram_entry << " " + << "to " << operation->get_log_entry()->pmem_entry << " " + << "operation=[" << *operation << "]" << dendl; + ldout(m_image_ctx.cct, 05) << "APPENDING: index=" + << operation->get_log_entry()->log_entry_index << " " + << "operation=[" << *operation << "]" << dendl; operation->log_append_time = now; *operation->get_log_entry()->pmem_entry = operation->get_log_entry()->ram_entry; - if (RWL_VERBOSE_LOGGING) { - ldout(m_image_ctx.cct, 20) << "APPENDING: index=" - << operation->get_log_entry()->log_entry_index << " " - << "pmem_entry=[" << *operation->get_log_entry()->pmem_entry << "]" << dendl; - } + ldout(m_image_ctx.cct, 20) << "APPENDING: index=" + << operation->get_log_entry()->log_entry_index << " " + << "pmem_entry=[" << *operation->get_log_entry()->pmem_entry + << "]" << dendl; entries_to_flush.push_back(operation); } flush_op_log_entries(entries_to_flush); @@ -1052,18 +1016,17 @@ int ReplicatedWriteLog::append_op_log_entries(GenericLogOperationsT &ops) TX_BEGIN(m_log_pool) { D_RW(pool_root)->first_free_entry = m_first_free_entry; for (auto &operation : ops) { - if (operation->is_write() || operation->is_writesame()) { - auto write_op = (std::shared_ptr&) operation; + if (operation->reserved_allocated()) { + auto write_op = (std::shared_ptr&) operation; pmemobj_tx_publish(&write_op->buffer_alloc->buffer_alloc_action, 1); } else { - if (RWL_VERBOSE_LOGGING) { - ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl; - } + ldout(m_image_ctx.cct, 
20) << "skipping non-write op: " << *operation << dendl; } } } TX_ONCOMMIT { } TX_ONABORT { - lderr(cct) << "failed to commit " << ops.size() << " log entries (" << m_log_pool_name << ")" << dendl; + lderr(cct) << "failed to commit " << ops.size() + << " log entries (" << m_log_pool_name << ")" << dendl; ceph_assert(false); ret = -EIO; } TX_FINALLY { @@ -1071,7 +1034,8 @@ int ReplicatedWriteLog::append_op_log_entries(GenericLogOperationsT &ops) utime_t tx_end = ceph_clock_now(); m_perfcounter->tinc(l_librbd_rwl_append_tx_t, tx_end - tx_start); - m_perfcounter->hinc(l_librbd_rwl_append_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(), ops.size()); + m_perfcounter->hinc( + l_librbd_rwl_append_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(), ops.size()); for (auto &operation : ops) { operation->log_append_comp_time = tx_end; } @@ -1083,48 +1047,32 @@ int ReplicatedWriteLog::append_op_log_entries(GenericLogOperationsT &ops) * Complete a set of write ops with the result of append_op_entries. */ template -void ReplicatedWriteLog::complete_op_log_entries(GenericLogOperationsT &&ops, const int result) +void ReplicatedWriteLog::complete_op_log_entries(GenericLogOperations &&ops, + const int result) { GenericLogEntries dirty_entries; int published_reserves = 0; - if (RWL_VERBOSE_LOGGING) { - ldout(m_image_ctx.cct, 20) << __func__ << ": completing" << dendl; - } + ldout(m_image_ctx.cct, 20) << __func__ << ": completing" << dendl; for (auto &op : ops) { utime_t now = ceph_clock_now(); auto log_entry = op->get_log_entry(); log_entry->completed = true; - if (op->is_writing_op()) { - op->get_gen_write_op()->sync_point->log_entry->writes_completed++; + if (op->reserved_allocated()) { + op->mark_log_entry_completed(); dirty_entries.push_back(log_entry); - } - if (op->is_write() || op->is_writesame()) { published_reserves++; } - if (op->is_discard()) { - if (RWL_VERBOSE_LOGGING) { - ldout(m_image_ctx.cct, 20) << __func__ << ": completing discard" << dendl; - } - } op->complete(result); - if (op->is_write()) { - m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_buf_t, op->buf_persist_time - op->dispatch_time); - } - m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_app_t, op->log_append_time - op->dispatch_time); + m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_app_t, + op->log_append_time - op->dispatch_time); m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_cmp_t, now - op->dispatch_time); - m_perfcounter->hinc(l_librbd_rwl_log_op_dis_to_cmp_t_hist, utime_t(now - op->dispatch_time).to_nsec(), + m_perfcounter->hinc(l_librbd_rwl_log_op_dis_to_cmp_t_hist, + utime_t(now - op->dispatch_time).to_nsec(), log_entry->ram_entry.write_bytes); - if (op->is_write()) { - utime_t buf_lat = op->buf_persist_comp_time - op->buf_persist_time; - m_perfcounter->tinc(l_librbd_rwl_log_op_buf_to_bufc_t, buf_lat); - m_perfcounter->hinc(l_librbd_rwl_log_op_buf_to_bufc_t_hist, buf_lat.to_nsec(), - log_entry->ram_entry.write_bytes); - m_perfcounter->tinc(l_librbd_rwl_log_op_buf_to_app_t, op->log_append_time - op->buf_persist_time); - } utime_t app_lat = op->log_append_comp_time - op->log_append_time; m_perfcounter->tinc(l_librbd_rwl_log_op_app_to_appc_t, app_lat); m_perfcounter->hinc(l_librbd_rwl_log_op_app_to_appc_t_hist, app_lat.to_nsec(), - log_entry->ram_entry.write_bytes); + log_entry->ram_entry.write_bytes); m_perfcounter->tinc(l_librbd_rwl_log_op_app_to_cmp_t, now - op->log_append_time); } @@ -1167,7 +1115,7 @@ void ReplicatedWriteLog::dispatch_deferred_writes(void) std::lock_guard locker(m_lock); 
ceph_assert(m_dispatching_deferred_ops); if (allocated) { - /* On the 2..n-1 th time we get m_lock, front_req->alloc_resources() will + /* On the 2..n-1 th time we get lock, front_req->alloc_resources() will * have succeeded, and we'll need to pop it off the deferred ops list * here. */ ceph_assert(front_req); @@ -1229,13 +1177,11 @@ void ReplicatedWriteLog::dispatch_deferred_writes(void) * deferred write */ template -void ReplicatedWriteLog::release_write_lanes(C_WriteRequestT *write_req) +void ReplicatedWriteLog::release_write_lanes(C_BlockIORequestT *req) { { std::lock_guard locker(m_lock); - ceph_assert(write_req->resources.allocated); - m_free_lanes += write_req->image_extents.size(); - write_req->resources.allocated = false; + m_free_lanes += req->image_extents.size(); } dispatch_deferred_writes(); } @@ -1259,9 +1205,7 @@ void ReplicatedWriteLog::alloc_and_dispatch_io_req(C_BlockIORequestT *req) dispatch_here = req->alloc_resources(); } if (dispatch_here) { - if (RWL_VERBOSE_LOGGING) { - ldout(m_image_ctx.cct, 20) << "dispatching" << dendl; - } + ldout(m_image_ctx.cct, 20) << "dispatching" << dendl; req->dispatch(); } else { req->deferred(); @@ -1269,17 +1213,128 @@ void ReplicatedWriteLog::alloc_and_dispatch_io_req(C_BlockIORequestT *req) std::lock_guard locker(m_lock); m_deferred_ios.push_back(req); } - if (RWL_VERBOSE_LOGGING) { - ldout(m_image_ctx.cct, 20) << "deferred IOs: " << m_deferred_ios.size() << dendl; - } + ldout(m_image_ctx.cct, 20) << "deferred IOs: " << m_deferred_ios.size() << dendl; dispatch_deferred_writes(); } } } + +template +bool ReplicatedWriteLog::alloc_resources(C_BlockIORequestT *req) { + bool alloc_succeeds = true; + bool no_space = false; + uint64_t bytes_allocated = 0; + uint64_t bytes_cached = 0; + uint64_t bytes_dirtied = 0; + uint64_t num_lanes = 0; + uint64_t num_unpublished_reserves = 0; + uint64_t num_log_entries = 0; + + // Setup buffer, and get all the number of required resources + req->setup_buffer_resources(bytes_cached, bytes_dirtied, bytes_allocated, + num_lanes, num_log_entries, num_unpublished_reserves); + + { + std::lock_guard locker(m_lock); + if (m_free_lanes < num_lanes) { + req->set_io_waited_for_lanes(true); + ldout(m_image_ctx.cct, 20) << "not enough free lanes (need " + << num_lanes + << ", have " << m_free_lanes << ") " + << *req << dendl; + alloc_succeeds = false; + /* This isn't considered a "no space" alloc fail. Lanes are a throttling mechanism. 
*/ + } + if (m_free_log_entries < num_log_entries) { + req->set_io_waited_for_entries(true); + ldout(m_image_ctx.cct, 20) << "not enough free entries (need " + << num_log_entries + << ", have " << m_free_log_entries << ") " + << *req << dendl; + alloc_succeeds = false; + no_space = true; /* Entries must be retired */ + } + /* Don't attempt buffer allocate if we've exceeded the "full" threshold */ + if (m_bytes_allocated + bytes_allocated > m_bytes_allocated_cap) { + if (!req->has_io_waited_for_buffers()) { + req->set_io_waited_for_entries(true); + ldout(m_image_ctx.cct, 1) << "Waiting for allocation cap (cap=" + << m_bytes_allocated_cap + << ", allocated=" << m_bytes_allocated + << ") in write [" << *req << "]" << dendl; + } + alloc_succeeds = false; + no_space = true; /* Entries must be retired */ + } + } + + std::vector& buffers = req->get_resources_buffers(); + if (alloc_succeeds) { + for (auto &buffer : buffers) { + utime_t before_reserve = ceph_clock_now(); + buffer.buffer_oid = pmemobj_reserve(m_log_pool, + &buffer.buffer_alloc_action, + buffer.allocation_size, + 0 /* Object type */); + buffer.allocation_lat = ceph_clock_now() - before_reserve; + if (TOID_IS_NULL(buffer.buffer_oid)) { + if (!req->has_io_waited_for_buffers()) { + req->set_io_waited_for_entries(true); + } + ldout(m_image_ctx.cct, 5) << "can't allocate all data buffers: " + << pmemobj_errormsg() << ". " + << *req << dendl; + alloc_succeeds = false; + no_space = true; /* Entries need to be retired */ + break; + } else { + buffer.allocated = true; + } + ldout(m_image_ctx.cct, 20) << "Allocated " << buffer.buffer_oid.oid.pool_uuid_lo + << "." << buffer.buffer_oid.oid.off + << ", size=" << buffer.allocation_size << dendl; + } + } + + if (alloc_succeeds) { + std::lock_guard locker(m_lock); + /* We need one free log entry per extent (each is a separate entry), and + * one free "lane" for remote replication. 
*/ + if ((m_free_lanes >= num_lanes) && + (m_free_log_entries >= num_log_entries)) { + m_free_lanes -= num_lanes; + m_free_log_entries -= num_log_entries; + m_unpublished_reserves += num_unpublished_reserves; + m_bytes_allocated += bytes_allocated; + m_bytes_cached += bytes_cached; + m_bytes_dirty += bytes_dirtied; + } else { + alloc_succeeds = false; + } + } + + if (!alloc_succeeds) { + /* On alloc failure, free any buffers we did allocate */ + for (auto &buffer : buffers) { + if (buffer.allocated) { + pmemobj_cancel(m_log_pool, &buffer.buffer_alloc_action, 1); + } + } + if (no_space) { + /* Expedite flushing and/or retiring */ + std::lock_guard locker(m_lock); + m_alloc_failed_since_retire = true; + m_last_alloc_fail = ceph_clock_now(); + } + } + + req->set_allocated(alloc_succeeds); + + return alloc_succeeds; +} + } // namespace cache } // namespace librbd -#ifndef TEST_F template class librbd::cache::ReplicatedWriteLog; template class librbd::cache::ImageCache; -#endif diff --git a/src/librbd/cache/ReplicatedWriteLog.h b/src/librbd/cache/ReplicatedWriteLog.h index adbdddca9bd..dea992f77cb 100644 --- a/src/librbd/cache/ReplicatedWriteLog.h +++ b/src/librbd/cache/ReplicatedWriteLog.h @@ -29,11 +29,10 @@ namespace cache { namespace rwl { class SyncPointLogEntry; -class GeneralWriteLogEntry; +class GenericWriteLogEntry; class WriteLogEntry; class GenericLogEntry; -typedef std::list> GeneralWriteLogEntries; typedef std::list> WriteLogEntries; typedef std::list> GenericLogEntries; @@ -41,9 +40,6 @@ typedef std::list> GenericLogEntries; typedef librbd::BlockGuard WriteLogGuard; -template -struct C_GuardedBlockIORequest; - class DeferredContexts; template class ImageCacheState; @@ -53,8 +49,7 @@ struct C_BlockIORequest; template struct C_WriteRequest; -template -using GenericLogOperations = std::list>; +using GenericLogOperations = std::list; } // namespace rwl @@ -93,35 +88,46 @@ public: void invalidate(Context *on_finish); void flush(Context *on_finish) override; -private: using This = ReplicatedWriteLog; - using SyncPointT = rwl::SyncPoint; - using GenericLogOperationT = rwl::GenericLogOperation; - using GenericLogOperationSharedPtrT = rwl::GenericLogOperationSharedPtr; - using WriteLogOperationT = rwl::WriteLogOperation; - using WriteLogOperationSetT = rwl::WriteLogOperationSet; - using SyncPointLogOperationT = rwl::SyncPointLogOperation; - using GenericLogOperationsT = rwl::GenericLogOperations; - using GenericLogOperationsVectorT = rwl::GenericLogOperationsVector; - using C_BlockIORequestT = rwl::C_BlockIORequest; using C_WriteRequestT = rwl::C_WriteRequest; - - friend class rwl::SyncPoint; - friend class rwl::GenericLogOperation; - friend class rwl::GeneralWriteLogOperation; - friend class rwl::WriteLogOperation; - friend class rwl::WriteLogOperationSet; - friend class rwl::SyncPointLogOperation; - friend struct rwl::C_GuardedBlockIORequest; - friend struct rwl::C_BlockIORequest; - friend struct rwl::C_WriteRequest; + using C_BlockIORequestT = rwl::C_BlockIORequest; + CephContext * get_context(); + void release_guarded_request(BlockGuardCell *cell); + void release_write_lanes(C_BlockIORequestT *req); + bool alloc_resources(C_BlockIORequestT *req); + template + void flush_pmem_buffer(V& ops); + void schedule_append(rwl::GenericLogOperationsVector &ops); + void schedule_flush_and_append(rwl::GenericLogOperationsVector &ops); + std::shared_ptr get_current_sync_point() { + return m_current_sync_point; + } + bool get_persist_on_flush() { + return m_persist_on_flush; + } + void 
inc_last_op_sequence_num() { + m_perfcounter->inc(l_librbd_rwl_log_ops, 1); + ++m_last_op_sequence_num; + } + uint64_t get_last_op_sequence_num() { + return m_last_op_sequence_num; + } + uint64_t get_current_sync_gen() { + return m_current_sync_gen; + } + unsigned int get_free_lanes() { + return m_free_lanes; + } + uint32_t get_free_log_entries() { + return m_free_log_entries; + } +private: typedef std::list *> C_WriteRequests; typedef std::list *> C_BlockIORequests; BlockGuardCell* detain_guarded_request_helper(rwl::GuardedRequest &req); BlockGuardCell* detain_guarded_request_barrier_helper(rwl::GuardedRequest &req); - void detain_guarded_request(rwl::GuardedRequest &&req); - void release_guarded_request(BlockGuardCell *cell); + void detain_guarded_request(C_BlockIORequestT *request, rwl::GuardedRequestFunctionContext *guarded_ctx); librbd::cache::rwl::ImageCacheState* m_cache_state = nullptr; @@ -159,7 +165,6 @@ private: /* Starts at 0 for a new write log. Incremented on every flush. */ uint64_t m_current_sync_gen = 0; - std::shared_ptr m_current_sync_point = nullptr; /* Starts at 0 on each sync gen increase. Incremented before applied to an operation */ uint64_t m_last_op_sequence_num = 0; @@ -167,12 +172,6 @@ private: uint64_t m_flushed_sync_gen = 0; bool m_persist_on_write_until_flush = true; - /* True if it's safe to complete a user request in persist-on-flush - * mode before the write is persisted. This is only true if there is - * a local copy of the write data, or if local write failure always - * causes local node failure. */ - bool m_persist_on_flush_early_user_comp = false; /* Assume local write failure does not cause node failure */ - bool m_persist_on_flush = false; /* If false, persist each write before completion */ bool m_flush_seen = false; AsyncOpTracker m_async_op_tracker; @@ -187,13 +186,11 @@ private: mutable ceph::mutex m_deferred_dispatch_lock; /* Hold m_log_append_lock while appending or retiring log entries. */ mutable ceph::mutex m_log_append_lock; - /* Used for most synchronization */ mutable ceph::mutex m_lock; + /* Used in release/detain to make BlockGuard preserve submission order */ mutable ceph::mutex m_blockguard_lock; - /* Used in WriteLogEntry::get_pmem_bl() to syncronize between threads making entries readable */ - mutable ceph::mutex m_entry_bl_lock; /* Use m_blockguard_lock for the following 3 things */ rwl::WriteLogGuard::BlockOperations m_awaiting_barrier; @@ -203,19 +200,23 @@ private: bool m_appending = false; bool m_dispatching_deferred_ops = false; - GenericLogOperationsT m_ops_to_flush; /* Write ops needing flush in local log */ - GenericLogOperationsT m_ops_to_append; /* Write ops needing event append in local log */ + rwl::GenericLogOperations m_ops_to_flush; /* Write ops needing flush in local log */ + rwl::GenericLogOperations m_ops_to_append; /* Write ops needing event append in local log */ /* New entries are at the back. 
Oldest at the front */ rwl::GenericLogEntries m_log_entries; rwl::GenericLogEntries m_dirty_log_entries; + PerfCounters *m_perfcounter = nullptr; + + std::shared_ptr m_current_sync_point = nullptr; + bool m_persist_on_flush = false; /* If false, persist each write before completion */ + /* Writes that have left the block guard, but are waiting for resources */ C_BlockIORequests m_deferred_ios; /* Throttle writes concurrently allocating & replicating */ - unsigned int m_free_lanes = MAX_CONCURRENT_WRITES; + unsigned int m_free_lanes = rwl::MAX_CONCURRENT_WRITES; unsigned int m_unpublished_reserves = 0; - PerfCounters *m_perfcounter = nullptr; /* Initialized from config, then set false during shutdown */ std::atomic m_periodic_stats_enabled = {false}; @@ -234,26 +235,20 @@ private: void rwl_init(Context *on_finish, rwl::DeferredContexts &later); void update_image_cache_state(Context *on_finish); - void start_workers(); void wake_up(); void dispatch_deferred_writes(void); - void release_write_lanes(C_WriteRequestT *write_req); void alloc_and_dispatch_io_req(C_BlockIORequestT *write_req); void append_scheduled_ops(void); void enlist_op_appender(); - void schedule_append(GenericLogOperationsVectorT &ops); - void schedule_append(GenericLogOperationsT &ops); + void schedule_append(rwl::GenericLogOperations &ops); void flush_then_append_scheduled_ops(void); void enlist_op_flusher(); - void schedule_flush_and_append(GenericLogOperationsVectorT &ops); - template - void flush_pmem_buffer(V& ops); - void alloc_op_log_entries(GenericLogOperationsT &ops); - void flush_op_log_entries(GenericLogOperationsVectorT &ops); - int append_op_log_entries(GenericLogOperationsT &ops); - void complete_op_log_entries(GenericLogOperationsT &&ops, const int r); - void schedule_complete_op_log_entries(GenericLogOperationsT &&ops, const int r); + void alloc_op_log_entries(rwl::GenericLogOperations &ops); + void flush_op_log_entries(rwl::GenericLogOperationsVector &ops); + int append_op_log_entries(rwl::GenericLogOperations &ops); + void complete_op_log_entries(rwl::GenericLogOperations &&ops, const int r); + void schedule_complete_op_log_entries(rwl::GenericLogOperations &&ops, const int r); }; } // namespace cache diff --git a/src/librbd/cache/rwl/ImageCacheState.cc b/src/librbd/cache/rwl/ImageCacheState.cc index cc5819a34f2..dcf1d7e7901 100644 --- a/src/librbd/cache/rwl/ImageCacheState.cc +++ b/src/librbd/cache/rwl/ImageCacheState.cc @@ -9,7 +9,7 @@ #include "common/config_proxy.h" #include "common/ceph_json.h" -#define dout_subsys ceph_subsys_rbd +#define dout_subsys ceph_subsys_rbd_rwl #undef dout_prefix #define dout_prefix *_dout << "librbd::cache::rwl::ImageCacheState: " << this << " " \ << __func__ << ": " diff --git a/src/librbd/cache/rwl/LogEntry.cc b/src/librbd/cache/rwl/LogEntry.cc index 50bf5a9a1c0..16858b50537 100644 --- a/src/librbd/cache/rwl/LogEntry.cc +++ b/src/librbd/cache/rwl/LogEntry.cc @@ -4,7 +4,7 @@ #include #include "LogEntry.h" -#define dout_subsys ceph_subsys_rbd +#define dout_subsys ceph_subsys_rbd_rwl #undef dout_prefix #define dout_prefix *_dout << "librbd::cache::rwl::LogEntry: " << this << " " \ << __func__ << ": " @@ -15,26 +15,6 @@ namespace cache { namespace rwl { -bool GenericLogEntry::is_sync_point() { - return ram_entry.is_sync_point(); -} - -bool GenericLogEntry::is_discard() { - return ram_entry.is_discard(); -} - -bool GenericLogEntry::is_writesame() { - return ram_entry.is_writesame(); -} - -bool GenericLogEntry::is_write() { - return ram_entry.is_write(); -} - -bool 
GenericLogEntry::is_writer() { - return ram_entry.is_writer(); -} - std::ostream& GenericLogEntry::format(std::ostream &os) const { os << "ram_entry=[" << ram_entry << "], " << "pmem_entry=" << (void*)pmem_entry << ", " @@ -66,7 +46,7 @@ std::ostream &operator<<(std::ostream &os, return entry.format(os); } -std::ostream& GeneralWriteLogEntry::format(std::ostream &os) const { +std::ostream& GenericWriteLogEntry::format(std::ostream &os) const { GenericLogEntry::format(os); os << ", " << "sync_point_entry=["; @@ -83,19 +63,38 @@ std::ostream& GeneralWriteLogEntry::format(std::ostream &os) const { }; std::ostream &operator<<(std::ostream &os, - const GeneralWriteLogEntry &entry) { + const GenericWriteLogEntry &entry) { return entry.format(os); } +void WriteLogEntry::init(bool has_data, std::vector::iterator allocation, + uint64_t current_sync_gen, uint64_t last_op_sequence_num, bool persist_on_flush) { + ram_entry.has_data = 1; + ram_entry.write_data = allocation->buffer_oid; + ceph_assert(!TOID_IS_NULL(ram_entry.write_data)); + pmem_buffer = D_RW(ram_entry.write_data); + ram_entry.sync_gen_number = current_sync_gen; + if (persist_on_flush) { + /* Persist on flush. Sequence #0 is never used. */ + ram_entry.write_sequence_number = 0; + } else { + /* Persist on write */ + ram_entry.write_sequence_number = last_op_sequence_num; + ram_entry.sequenced = 1; + } + ram_entry.sync_point = 0; + ram_entry.discard = 0; +} + void WriteLogEntry::init_pmem_bp() { - assert(!pmem_bp.have_raw()); + ceph_assert(!pmem_bp.have_raw()); pmem_bp = buffer::ptr(buffer::create_static(this->write_bytes(), (char*)pmem_buffer)); } void WriteLogEntry::init_pmem_bl() { pmem_bl.clear(); init_pmem_bp(); - assert(pmem_bp.have_raw()); + ceph_assert(pmem_bp.have_raw()); int before_bl = pmem_bp.raw_nref(); this->init_bl(pmem_bp, pmem_bl); int after_bl = pmem_bp.raw_nref(); @@ -111,9 +110,9 @@ unsigned int WriteLogEntry::reader_count() { } /* Returns a ref to a bl containing bufferptrs to the entry pmem buffer */ -buffer::list& WriteLogEntry::get_pmem_bl(ceph::mutex &entry_bl_lock) { +buffer::list& WriteLogEntry::get_pmem_bl() { if (0 == bl_refs) { - std::lock_guard locker(entry_bl_lock); + std::lock_guard locker(m_entry_bl_lock); if (0 == bl_refs) { init_pmem_bl(); } @@ -123,8 +122,8 @@ buffer::list& WriteLogEntry::get_pmem_bl(ceph::mutex &entry_bl_lock) { }; /* Constructs a new bl containing copies of pmem_bp */ -void WriteLogEntry::copy_pmem_bl(ceph::mutex &entry_bl_lock, bufferlist *out_bl) { - this->get_pmem_bl(entry_bl_lock); +void WriteLogEntry::copy_pmem_bl(bufferlist *out_bl) { + this->get_pmem_bl(); /* pmem_bp is now initialized */ buffer::ptr cloned_bp(pmem_bp.clone()); out_bl->clear(); @@ -133,7 +132,7 @@ void WriteLogEntry::copy_pmem_bl(ceph::mutex &entry_bl_lock, bufferlist *out_bl) std::ostream& WriteLogEntry::format(std::ostream &os) const { os << "(Write) "; - GeneralWriteLogEntry::format(os); + GenericWriteLogEntry::format(os); os << ", " << "pmem_buffer=" << (void*)pmem_buffer << ", "; os << "pmem_bp=" << pmem_bp << ", "; diff --git a/src/librbd/cache/rwl/LogEntry.h b/src/librbd/cache/rwl/LogEntry.h index b6d73b3f535..b26fddb366f 100644 --- a/src/librbd/cache/rwl/LogEntry.h +++ b/src/librbd/cache/rwl/LogEntry.h @@ -4,6 +4,7 @@ #ifndef CEPH_LIBRBD_CACHE_RWL_LOG_ENTRY_H #define CEPH_LIBRBD_CACHE_RWL_LOG_ENTRY_H +#include "common/ceph_mutex.h" #include "librbd/Utils.h" #include "librbd/cache/rwl/Types.h" #include @@ -14,7 +15,7 @@ namespace cache { namespace rwl { class SyncPointLogEntry; -class 
GeneralWriteLogEntry; +class GenericWriteLogEntry; class WriteLogEntry; class GenericLogEntry { @@ -29,22 +30,6 @@ public: virtual ~GenericLogEntry() { }; GenericLogEntry(const GenericLogEntry&) = delete; GenericLogEntry &operator=(const GenericLogEntry&) = delete; - virtual unsigned int write_bytes() = 0; - bool is_sync_point(); - bool is_discard(); - bool is_writesame(); - bool is_write(); - bool is_writer(); - virtual const GenericLogEntry* get_log_entry() = 0; - virtual const SyncPointLogEntry* get_sync_point_log_entry() { - return nullptr; - } - virtual const GeneralWriteLogEntry* get_gen_write_log_entry() { - return nullptr; - } - virtual const WriteLogEntry* get_write_log_entry() { - return nullptr; - } virtual std::ostream& format(std::ostream &os) const; friend std::ostream &operator<<(std::ostream &os, const GenericLogEntry &entry); @@ -67,36 +52,29 @@ public: ram_entry.sync_gen_number = sync_gen_number; ram_entry.sync_point = 1; }; + ~SyncPointLogEntry() override {}; SyncPointLogEntry(const SyncPointLogEntry&) = delete; SyncPointLogEntry &operator=(const SyncPointLogEntry&) = delete; - virtual inline unsigned int write_bytes() { - return 0; - } - const GenericLogEntry* get_log_entry() override { - return get_sync_point_log_entry(); - } - const SyncPointLogEntry* get_sync_point_log_entry() override { - return this; - } std::ostream& format(std::ostream &os) const; friend std::ostream &operator<<(std::ostream &os, const SyncPointLogEntry &entry); }; -class GeneralWriteLogEntry : public GenericLogEntry { +class GenericWriteLogEntry : public GenericLogEntry { public: uint32_t referring_map_entries = 0; bool flushing = false; bool flushed = false; /* or invalidated */ std::shared_ptr sync_point_entry; - GeneralWriteLogEntry(std::shared_ptr sync_point_entry, + GenericWriteLogEntry(std::shared_ptr sync_point_entry, const uint64_t image_offset_bytes, const uint64_t write_bytes) : GenericLogEntry(image_offset_bytes, write_bytes), sync_point_entry(sync_point_entry) { } - GeneralWriteLogEntry(const uint64_t image_offset_bytes, const uint64_t write_bytes) + GenericWriteLogEntry(const uint64_t image_offset_bytes, const uint64_t write_bytes) : GenericLogEntry(image_offset_bytes, write_bytes), sync_point_entry(nullptr) { } - GeneralWriteLogEntry(const GeneralWriteLogEntry&) = delete; - GeneralWriteLogEntry &operator=(const GeneralWriteLogEntry&) = delete; - virtual inline unsigned int write_bytes() { + ~GenericWriteLogEntry() override {}; + GenericWriteLogEntry(const GenericWriteLogEntry&) = delete; + GenericWriteLogEntry &operator=(const GenericWriteLogEntry&) = delete; + inline unsigned int write_bytes() { /* The valid bytes in this ops data buffer. Discard and WS override. */ return ram_entry.write_bytes; }; @@ -104,15 +82,9 @@ public: /* The bytes in the image this op makes dirty. Discard and WS override. 
*/ return write_bytes(); }; - const BlockExtent block_extent() { + BlockExtent block_extent() { return ram_entry.block_extent(); } - const GenericLogEntry* get_log_entry() override { - return get_gen_write_log_entry(); - } - const GeneralWriteLogEntry* get_gen_write_log_entry() override { - return this; - } uint32_t get_map_ref() { return(referring_map_entries); } @@ -120,14 +92,16 @@ public: void dec_map_ref() { referring_map_entries--; } std::ostream &format(std::ostream &os) const; friend std::ostream &operator<<(std::ostream &os, - const GeneralWriteLogEntry &entry); + const GenericWriteLogEntry &entry); }; -class WriteLogEntry : public GeneralWriteLogEntry { +class WriteLogEntry : public GenericWriteLogEntry { protected: buffer::ptr pmem_bp; buffer::list pmem_bl; std::atomic bl_refs = {0}; /* The refs held on pmem_bp by pmem_bl */ + /* Used in WriteLogEntry::get_pmem_bl() to syncronize between threads making entries readable */ + mutable ceph::mutex m_entry_bl_lock; void init_pmem_bp(); @@ -142,30 +116,33 @@ public: uint8_t *pmem_buffer = nullptr; WriteLogEntry(std::shared_ptr sync_point_entry, const uint64_t image_offset_bytes, const uint64_t write_bytes) - : GeneralWriteLogEntry(sync_point_entry, image_offset_bytes, write_bytes) { } + : GenericWriteLogEntry(sync_point_entry, image_offset_bytes, write_bytes), + m_entry_bl_lock(ceph::make_mutex(util::unique_lock_name( + "librbd::cache::rwl::WriteLogEntry::m_entry_bl_lock", this))) + { } WriteLogEntry(const uint64_t image_offset_bytes, const uint64_t write_bytes) - : GeneralWriteLogEntry(nullptr, image_offset_bytes, write_bytes) { } + : GenericWriteLogEntry(nullptr, image_offset_bytes, write_bytes), + m_entry_bl_lock(ceph::make_mutex(util::unique_lock_name( + "librbd::cache::rwl::WriteLogEntry::m_entry_bl_lock", this))) + { } + ~WriteLogEntry() override {}; WriteLogEntry(const WriteLogEntry&) = delete; WriteLogEntry &operator=(const WriteLogEntry&) = delete; - const BlockExtent block_extent(); + void init(bool has_data, std::vector::iterator allocation, + uint64_t current_sync_gen, uint64_t last_op_sequence_num, bool persist_on_flush); + BlockExtent block_extent(); unsigned int reader_count(); /* Returns a ref to a bl containing bufferptrs to the entry pmem buffer */ - buffer::list &get_pmem_bl(ceph::mutex &entry_bl_lock); + buffer::list &get_pmem_bl(); /* Constructs a new bl containing copies of pmem_bp */ - void copy_pmem_bl(ceph::mutex &entry_bl_lock, bufferlist *out_bl); - virtual const GenericLogEntry* get_log_entry() override { - return get_write_log_entry(); - } - const WriteLogEntry* get_write_log_entry() override { - return this; - } + void copy_pmem_bl(bufferlist *out_bl); std::ostream &format(std::ostream &os) const; friend std::ostream &operator<<(std::ostream &os, const WriteLogEntry &entry); }; -} // namespace rwl -} // namespace cache -} // namespace librbd +} // namespace rwl +} // namespace cache +} // namespace librbd #endif // CEPH_LIBRBD_CACHE_RWL_LOG_ENTRY_H diff --git a/src/librbd/cache/rwl/LogOperation.cc b/src/librbd/cache/rwl/LogOperation.cc index a7cb581b536..c0c50a50b93 100644 --- a/src/librbd/cache/rwl/LogOperation.cc +++ b/src/librbd/cache/rwl/LogOperation.cc @@ -5,7 +5,7 @@ #include "LogOperation.h" #include "librbd/cache/rwl/Types.h" -#define dout_subsys ceph_subsys_rbd +#define dout_subsys ceph_subsys_rbd_rwl #undef dout_prefix #define dout_prefix *_dout << "librbd::cache::rwl::LogOperation: " << this << " " \ << __func__ << ": " @@ -16,13 +16,11 @@ namespace cache { namespace rwl { -template 
-GenericLogOperation<T>::GenericLogOperation(T &rwl, const utime_t dispatch_time)
-  : rwl(rwl), dispatch_time(dispatch_time) {
+GenericLogOperation::GenericLogOperation(const utime_t dispatch_time, PerfCounters *perfcounter)
+  : m_perfcounter(perfcounter), dispatch_time(dispatch_time) {
 }
 
-template <typename T>
-std::ostream& GenericLogOperation<T>::format(std::ostream &os) const {
+std::ostream& GenericLogOperation::format(std::ostream &os) const {
   os << "dispatch_time=[" << dispatch_time << "], "
      << "buf_persist_time=[" << buf_persist_time << "], "
      << "buf_persist_comp_time=[" << buf_persist_comp_time << "], "
@@ -31,161 +29,144 @@ std::ostream& GenericLogOperation<T>::format(std::ostream &os) const {
   return os;
 };
 
-template <typename T>
 std::ostream &operator<<(std::ostream &os,
-                         const GenericLogOperation<T> &op) {
+                         const GenericLogOperation &op) {
   return op.format(os);
 }
 
-template <typename T>
-SyncPointLogOperation<T>::SyncPointLogOperation(T &rwl,
-                                                std::shared_ptr<SyncPoint<T>> sync_point,
-                                                const utime_t dispatch_time)
-  : GenericLogOperation<T>(rwl, dispatch_time), sync_point(sync_point) {
+SyncPointLogOperation::SyncPointLogOperation(ceph::mutex &lock,
+                                             std::shared_ptr<SyncPoint> sync_point,
+                                             const utime_t dispatch_time,
+                                             PerfCounters *perfcounter,
+                                             CephContext *cct)
+  : GenericLogOperation(dispatch_time, perfcounter), m_cct(cct), m_lock(lock), sync_point(sync_point) {
 }
 
-template <typename T>
-SyncPointLogOperation<T>::~SyncPointLogOperation() { }
+SyncPointLogOperation::~SyncPointLogOperation() { }
 
-template <typename T>
-std::ostream &SyncPointLogOperation<T>::format(std::ostream &os) const {
+std::ostream &SyncPointLogOperation::format(std::ostream &os) const {
   os << "(Sync Point) ";
-  GenericLogOperation<T>::format(os);
+  GenericLogOperation::format(os);
   os << ", "
      << "sync_point=[" << *sync_point << "]";
   return os;
 };
 
-template <typename T>
 std::ostream &operator<<(std::ostream &os,
-                         const SyncPointLogOperation<T> &op) {
+                         const SyncPointLogOperation &op) {
   return op.format(os);
 }
 
-template <typename T>
-void SyncPointLogOperation<T>::appending() {
+std::vector<Context*> SyncPointLogOperation::append_sync_point() {
   std::vector<Context*> appending_contexts;
+  std::lock_guard locker(m_lock);
+  if (!sync_point->appending) {
+    sync_point->appending = true;
+  }
+  appending_contexts.swap(sync_point->on_sync_point_appending);
+  return appending_contexts;
+}
+
+void SyncPointLogOperation::clear_earlier_sync_point() {
+  std::lock_guard locker(m_lock);
+  ceph_assert(sync_point->later_sync_point);
+  ceph_assert(sync_point->later_sync_point->earlier_sync_point ==
+              sync_point);
+  sync_point->later_sync_point->earlier_sync_point = nullptr;
+}
+
+std::vector<Context*> SyncPointLogOperation::swap_on_sync_point_persisted() {
+  std::lock_guard locker(m_lock);
+  std::vector<Context*> persisted_contexts;
+  persisted_contexts.swap(sync_point->on_sync_point_persisted);
+  return persisted_contexts;
+}
+
+void SyncPointLogOperation::appending() {
   ceph_assert(sync_point);
-  {
-    std::lock_guard locker(rwl.m_lock);
-    if (!sync_point->m_appending) {
-      ldout(rwl.m_image_ctx.cct, 20) << "Sync point op=[" << *this
-                                     << "] appending" << dendl;
-      sync_point->m_appending = true;
-    }
-    appending_contexts.swap(sync_point->m_on_sync_point_appending);
-  }
+  ldout(m_cct, 20) << "Sync point op=[" << *this
+                   << "] appending" << dendl;
+  auto appending_contexts = append_sync_point();
   for (auto &ctx : appending_contexts) {
     ctx->complete(0);
   }
 }
 
-template <typename T>
-void SyncPointLogOperation<T>::complete(int result) {
-  std::vector<Context*> persisted_contexts;
-
+void SyncPointLogOperation::complete(int result) {
   ceph_assert(sync_point);
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(rwl.m_image_ctx.cct, 20) << "Sync point op =[" << *this
-                                   << "] completed" << dendl;
-  }
-  {
-    std::lock_guard locker(rwl.m_lock);
-    /* Remove link from next sync point */
-    ceph_assert(sync_point->later_sync_point);
-    ceph_assert(sync_point->later_sync_point->earlier_sync_point ==
-                sync_point);
-    sync_point->later_sync_point->earlier_sync_point = nullptr;
-  }
+  ldout(m_cct, 20) << "Sync point op =[" << *this
+                   << "] completed" << dendl;
+  clear_earlier_sync_point();
 
   /* Do append now in case completion occurred before the
    * normal append callback executed, and to handle
    * on_append work that was queued after the sync point
    * entered the appending state. */
   appending();
-
-  {
-    std::lock_guard locker(rwl.m_lock);
-    /* The flush request that scheduled this op will be one of these
-     * contexts */
-    persisted_contexts.swap(sync_point->m_on_sync_point_persisted);
-    // TODO handle flushed sync point in later PRs
-  }
+  auto persisted_contexts = swap_on_sync_point_persisted();
   for (auto &ctx : persisted_contexts) {
     ctx->complete(result);
   }
 }
 
-template <typename T>
-GeneralWriteLogOperation<T>::GeneralWriteLogOperation(T &rwl,
-                                                      std::shared_ptr<SyncPoint<T>> sync_point,
-                                                      const utime_t dispatch_time)
-  : GenericLogOperation<T>(rwl, dispatch_time),
+GenericWriteLogOperation::GenericWriteLogOperation(std::shared_ptr<SyncPoint> sync_point,
+                                                   const utime_t dispatch_time,
+                                                   PerfCounters *perfcounter,
+                                                   CephContext *cct)
+  : GenericLogOperation(dispatch_time, perfcounter),
     m_lock(ceph::make_mutex(util::unique_lock_name(
-      "librbd::cache::rwl::GenericWriteLogOperation::m_lock", this))), sync_point(sync_point) {
+      "librbd::cache::rwl::GenericWriteLogOperation::m_lock", this))),
+    m_cct(cct),
+    sync_point(sync_point) {
 }
 
-template <typename T>
-GeneralWriteLogOperation<T>::~GeneralWriteLogOperation() { }
+GenericWriteLogOperation::~GenericWriteLogOperation() { }
 
-template <typename T>
-std::ostream &GeneralWriteLogOperation<T>::format(std::ostream &os) const {
-  GenericLogOperation<T>::format(os);
+std::ostream &GenericWriteLogOperation::format(std::ostream &os) const {
+  GenericLogOperation::format(os);
   return os;
 };
 
-template <typename T>
 std::ostream &operator<<(std::ostream &os,
-                         const GeneralWriteLogOperation<T> &op) {
+                         const GenericWriteLogOperation &op) {
   return op.format(os);
 }
 
 /* Called when the write log operation is appending and its log position is guaranteed */
-template <typename T>
-void GeneralWriteLogOperation<T>::appending() {
+void GenericWriteLogOperation::appending() {
   Context *on_append = nullptr;
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(rwl.m_image_ctx.cct, 20) << __func__ << " " << this << dendl;
-  }
+  ldout(m_cct, 20) << __func__ << " " << this << dendl;
   {
     std::lock_guard locker(m_lock);
     on_append = on_write_append;
     on_write_append = nullptr;
   }
   if (on_append) {
-    if (RWL_VERBOSE_LOGGING) {
-      ldout(rwl.m_image_ctx.cct, 20) << __func__ << " " << this << " on_append=" << on_append << dendl;
-    }
+    ldout(m_cct, 20) << __func__ << " " << this << " on_append=" << on_append << dendl;
     on_append->complete(0);
   }
 }
 
 /* Called when the write log operation is completed in all log replicas */
-template <typename T>
-void GeneralWriteLogOperation<T>::complete(int result) {
+void GenericWriteLogOperation::complete(int result) {
   appending();
   Context *on_persist = nullptr;
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(rwl.m_image_ctx.cct, 20) << __func__ << " " << this << dendl;
-  }
+  ldout(m_cct, 20) << __func__ << " " << this << dendl;
   {
     std::lock_guard locker(m_lock);
     on_persist = on_write_persist;
     on_write_persist = nullptr;
   }
   if (on_persist) {
-    if (RWL_VERBOSE_LOGGING) {
-      ldout(rwl.m_image_ctx.cct, 20) << __func__ << " " << this << " on_persist=" << on_persist << dendl;
-    }
+    ldout(m_cct, 20) << __func__ << " " << this << " on_persist=" << on_persist << dendl;
     on_persist->complete(result);
   }
 }
 
-template <typename T>
-WriteLogOperation<T>::WriteLogOperation(WriteLogOperationSet<T> &set,
-                                        uint64_t image_offset_bytes, uint64_t write_bytes)
-  : GeneralWriteLogOperation<T>(set.rwl, set.sync_point, set.dispatch_time),
+WriteLogOperation::WriteLogOperation(WriteLogOperationSet &set,
+                                     uint64_t image_offset_bytes, uint64_t write_bytes,
+                                     CephContext *cct)
+  : GenericWriteLogOperation(set.sync_point, set.dispatch_time, set.perfcounter, cct),
     log_entry(std::make_shared<WriteLogEntry>(set.sync_point->log_entry, image_offset_bytes, write_bytes)) {
   on_write_append = set.extent_ops_appending->new_sub();
   on_write_persist = set.extent_ops_persist->new_sub();
@@ -193,13 +174,20 @@ WriteLogOperation<T>::WriteLogOperation(WriteLogOperationSet<T> &set,
   log_entry->sync_point_entry->bytes += write_bytes;
 }
 
-template <typename T>
-WriteLogOperation<T>::~WriteLogOperation() { }
+WriteLogOperation::~WriteLogOperation() { }
 
-template <typename T>
-std::ostream &WriteLogOperation<T>::format(std::ostream &os) const {
+void WriteLogOperation::init(bool has_data, std::vector<WriteBufferAllocation>::iterator allocation, uint64_t current_sync_gen,
+                             uint64_t last_op_sequence_num, bufferlist &write_req_bl, uint64_t buffer_offset,
+                             bool persist_on_flush) {
+  log_entry->init(has_data, allocation, current_sync_gen, last_op_sequence_num, persist_on_flush);
+  buffer_alloc = &(*allocation);
+  bl.substr_of(write_req_bl, buffer_offset,
+               log_entry->write_bytes());
+}
+
+std::ostream &WriteLogOperation::format(std::ostream &os) const {
   os << "(Write) ";
-  GeneralWriteLogOperation<T>::format(os);
+  GenericWriteLogOperation::format(os);
   os << ", ";
   if (log_entry) {
     os << "log_entry=[" << *log_entry << "], ";
@@ -211,26 +199,48 @@ std::ostream &WriteLogOperation<T>::format(std::ostream &os) const {
   return os;
 };
 
-template <typename T>
 std::ostream &operator<<(std::ostream &os,
-                         const WriteLogOperation<T> &op) {
+                         const WriteLogOperation &op) {
   return op.format(os);
 }
 
-template <typename T>
-WriteLogOperationSet<T>::WriteLogOperationSet(T &rwl, utime_t dispatched, std::shared_ptr<SyncPoint<T>> sync_point,
-                                              bool persist_on_flush, Context *on_finish)
-  : m_on_finish(on_finish), rwl(rwl),
-    persist_on_flush(persist_on_flush), dispatch_time(dispatched), sync_point(sync_point) {
+void WriteLogOperation::complete(int result) {
+  GenericWriteLogOperation::complete(result);
+  m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_buf_t, buf_persist_time - dispatch_time);
+  utime_t buf_lat = buf_persist_comp_time - buf_persist_time;
+  m_perfcounter->tinc(l_librbd_rwl_log_op_buf_to_bufc_t, buf_lat);
+  m_perfcounter->hinc(l_librbd_rwl_log_op_buf_to_bufc_t_hist, buf_lat.to_nsec(),
+                      log_entry->ram_entry.write_bytes);
+  m_perfcounter->tinc(l_librbd_rwl_log_op_buf_to_app_t, log_append_time - buf_persist_time);
+}
+
+void WriteLogOperation::copy_bl_to_pmem_buffer() {
+  /* The operation (and thus bl) must stay in scope while this copy runs */
+  bufferlist::iterator i(&bl);
+  m_perfcounter->inc(l_librbd_rwl_log_op_bytes, log_entry->write_bytes());
+  ldout(m_cct, 20) << bl << dendl;
+  i.copy((unsigned)log_entry->write_bytes(), (char*)log_entry->pmem_buffer);
+}
+
+void WriteLogOperation::flush_pmem_buf_to_cache(PMEMobjpool *log_pool) {
+  buf_persist_time = ceph_clock_now();
+  pmemobj_flush(log_pool, log_entry->pmem_buffer, log_entry->write_bytes());
+}
+
+WriteLogOperationSet::WriteLogOperationSet(utime_t dispatched, PerfCounters *perfcounter, std::shared_ptr<SyncPoint> sync_point,
+                                           bool persist_on_flush, CephContext *cct, Context *on_finish)
+  : m_cct(cct), m_on_finish(on_finish),
+    persist_on_flush(persist_on_flush),
+    dispatch_time(dispatched),
+    perfcounter(perfcounter),
+    sync_point(sync_point) {
   on_ops_appending = sync_point->prior_log_entries_persisted->new_sub();
   on_ops_persist = nullptr;
   extent_ops_persist =
-    new C_Gather(rwl.m_image_ctx.cct,
+    new C_Gather(m_cct,
                  new LambdaContext(
                    [this](int r) {
-                     if (RWL_VERBOSE_LOGGING) {
-                       ldout(this->rwl.m_image_ctx.cct,20) << __func__ << " " << this << " m_extent_ops_persist completed" << dendl;
-                     }
+                     ldout(this->m_cct, 20) << __func__ << " " << this << " extent_ops_persist completed" << dendl;
                      if (on_ops_persist) {
                        on_ops_persist->complete(r);
                      }
@@ -238,22 +248,18 @@ WriteLogOperationSet<T>::WriteLogOperationSet(T &rwl, utime_t dispatched, std::s
                    }));
   auto appending_persist_sub = extent_ops_persist->new_sub();
   extent_ops_appending =
-    new C_Gather(rwl.m_image_ctx.cct,
+    new C_Gather(m_cct,
                  new LambdaContext(
                    [this, appending_persist_sub](int r) {
-                     if (RWL_VERBOSE_LOGGING) {
-                       ldout(this->rwl.m_image_ctx.cct, 20) << __func__ << " " << this << " m_extent_ops_appending completed" << dendl;
-                     }
+                     ldout(this->m_cct, 20) << __func__ << " " << this << " extent_ops_appending completed" << dendl;
                      on_ops_appending->complete(r);
                      appending_persist_sub->complete(r);
                    }));
 }
 
-template <typename T>
-WriteLogOperationSet<T>::~WriteLogOperationSet() { }
+WriteLogOperationSet::~WriteLogOperationSet() { }
 
-template <typename T>
 std::ostream &operator<<(std::ostream &os,
-                         const WriteLogOperationSet<T> &s) {
+                         const WriteLogOperationSet &s) {
   os << "cell=" << (void*)s.cell << ", "
      << "extent_ops_appending=[" << s.extent_ops_appending << ", "
      << "extent_ops_persist=[" << s.extent_ops_persist << "]";
diff --git a/src/librbd/cache/rwl/LogOperation.h b/src/librbd/cache/rwl/LogOperation.h
index dc0dd547356..1197e7be798 100644
--- a/src/librbd/cache/rwl/LogOperation.h
+++ b/src/librbd/cache/rwl/LogOperation.h
@@ -13,200 +13,161 @@ namespace cache {
 namespace rwl {
 
 struct WriteBufferAllocation;
 
-template <typename T>
 class WriteLogOperationSet;
 
-template <typename T>
 class WriteLogOperation;
 
-template <typename T>
-class GeneralWriteLogOperation;
+class GenericWriteLogOperation;
 
-template <typename T>
 class SyncPointLogOperation;
 
-template <typename T>
 class GenericLogOperation;
 
-template <typename T>
-using GenericLogOperationSharedPtr = std::shared_ptr<GenericLogOperation<T>>;
+using GenericLogOperationSharedPtr = std::shared_ptr<GenericLogOperation>;
 
-template <typename T>
-using GenericLogOperationsVector = std::vector<GenericLogOperationSharedPtr<T>>;
+using GenericLogOperationsVector = std::vector<GenericLogOperationSharedPtr>;
 
-template <typename T>
 class GenericLogOperation {
+protected:
+  PerfCounters *m_perfcounter = nullptr;
 public:
-  T &rwl;
   utime_t dispatch_time;         // When op created
   utime_t buf_persist_time;      // When buffer persist begins
   utime_t buf_persist_comp_time; // When buffer persist completes
   utime_t log_append_time;       // When log append begins
   utime_t log_append_comp_time;  // When log append completes
-  GenericLogOperation(T &rwl, const utime_t dispatch_time);
+  GenericLogOperation(const utime_t dispatch_time, PerfCounters *perfcounter);
   virtual ~GenericLogOperation() { };
   GenericLogOperation(const GenericLogOperation&) = delete;
   GenericLogOperation &operator=(const GenericLogOperation&) = delete;
   virtual std::ostream &format(std::ostream &os) const;
-  template <typename U>
   friend std::ostream &operator<<(std::ostream &os,
-                                  const GenericLogOperation<U> &op);
+                                  const GenericLogOperation &op);
   virtual const std::shared_ptr<GenericLogEntry> get_log_entry() = 0;
-  virtual const std::shared_ptr<SyncPointLogEntry> get_sync_point_log_entry() {
-    return nullptr;
-  }
-  virtual const std::shared_ptr<GeneralWriteLogEntry> get_gen_write_log_entry() {
-    return nullptr;
-  }
-  virtual const std::shared_ptr<WriteLogEntry> get_write_log_entry() {
-    return nullptr;
-  }
   virtual void appending() = 0;
   virtual void complete(int r) = 0;
-  virtual bool is_write() {
-    return false;
-  }
-  virtual bool is_sync_point() {
+  virtual void mark_log_entry_completed() {};
+  virtual bool reserved_allocated() {
     return false;
   }
-  virtual bool is_discard() {
-    return false;
-  }
-  virtual bool is_writesame() {
-    return false;
-  }
-  virtual bool is_writing_op() {
-    return false;
-  }
-  virtual GeneralWriteLogOperation<T> *get_gen_write_op() {
-    return nullptr;
-  };
-  virtual WriteLogOperation<T> *get_write_op() {
-    return nullptr;
-  };
+  virtual void copy_bl_to_pmem_buffer() {};
+  virtual void flush_pmem_buf_to_cache(PMEMobjpool *log_pool) {};
 };
 
-template <typename T>
-class SyncPointLogOperation : public GenericLogOperation<T> {
+class SyncPointLogOperation : public GenericLogOperation {
+private:
+  CephContext *m_cct;
+  ceph::mutex &m_lock;
+  std::vector<Context*> append_sync_point();
+  void clear_earlier_sync_point();
+  std::vector<Context*> swap_on_sync_point_persisted();
 public:
-  using GenericLogOperation<T>::rwl;
-  std::shared_ptr<SyncPoint<T>> sync_point;
-  SyncPointLogOperation(T &rwl,
-                        std::shared_ptr<SyncPoint<T>> sync_point,
-                        const utime_t dispatch_time);
-  ~SyncPointLogOperation();
+  std::shared_ptr<SyncPoint> sync_point;
+  SyncPointLogOperation(ceph::mutex &lock,
+                        std::shared_ptr<SyncPoint> sync_point,
+                        const utime_t dispatch_time,
+                        PerfCounters *perfcounter,
+                        CephContext *cct);
+  ~SyncPointLogOperation() override;
   SyncPointLogOperation(const SyncPointLogOperation&) = delete;
   SyncPointLogOperation &operator=(const SyncPointLogOperation&) = delete;
   std::ostream &format(std::ostream &os) const;
-  template <typename U>
   friend std::ostream &operator<<(std::ostream &os,
-                                  const SyncPointLogOperation<U> &op);
-  const std::shared_ptr<GenericLogEntry> get_log_entry() {
-    return get_sync_point_log_entry();
-  }
-  const std::shared_ptr<SyncPointLogEntry> get_sync_point_log_entry() {
+                                  const SyncPointLogOperation &op);
+  const std::shared_ptr<GenericLogEntry> get_log_entry() override {
    return sync_point->log_entry;
   }
-  bool is_sync_point() {
-    return true;
-  }
-  void appending();
-  void complete(int r);
+  void appending() override;
+  void complete(int r) override;
 };
 
-template <typename T>
-class GeneralWriteLogOperation : public GenericLogOperation<T> {
+class GenericWriteLogOperation : public GenericLogOperation {
 protected:
   ceph::mutex m_lock;
+  CephContext *m_cct;
 public:
-  using GenericLogOperation<T>::rwl;
-  std::shared_ptr<SyncPoint<T>> sync_point;
+  std::shared_ptr<SyncPoint> sync_point;
   Context *on_write_append = nullptr;  /* Completion for things waiting on this
                                         * write's position in the log to be
                                         * guaranteed */
   Context *on_write_persist = nullptr; /* Completion for things waiting on this
                                         * write to persist */
-  GeneralWriteLogOperation(T &rwl,
-                           std::shared_ptr<SyncPoint<T>> sync_point,
-                           const utime_t dispatch_time);
-  ~GeneralWriteLogOperation();
-  GeneralWriteLogOperation(const GeneralWriteLogOperation&) = delete;
-  GeneralWriteLogOperation &operator=(const GeneralWriteLogOperation&) = delete;
+  GenericWriteLogOperation(std::shared_ptr<SyncPoint> sync_point,
+                           const utime_t dispatch_time,
+                           PerfCounters *perfcounter,
+                           CephContext *cct);
+  ~GenericWriteLogOperation() override;
+  GenericWriteLogOperation(const GenericWriteLogOperation&) = delete;
+  GenericWriteLogOperation &operator=(const GenericWriteLogOperation&) = delete;
   std::ostream &format(std::ostream &os) const;
-  template <typename U>
   friend std::ostream &operator<<(std::ostream &os,
-                                  const GeneralWriteLogOperation<U> &op);
-  GeneralWriteLogOperation<T> *get_gen_write_op() {
-    return this;
+                                  const GenericWriteLogOperation &op);
+  void mark_log_entry_completed() override {
    sync_point->log_entry->writes_completed++;
  }
-  bool is_writing_op() {
+  bool reserved_allocated() override {
    return true;
  }
-  void appending();
-  void complete(int r);
+  void appending() override;
+  void complete(int r) override;
 };
 
-template <typename T>
-class WriteLogOperation : public GeneralWriteLogOperation<T> {
+class WriteLogOperation : public GenericWriteLogOperation {
 public:
-  using GenericLogOperation<T>::rwl;
-  using GeneralWriteLogOperation<T>::m_lock;
-  using GeneralWriteLogOperation<T>::sync_point;
-  using GeneralWriteLogOperation<T>::on_write_append;
-  using GeneralWriteLogOperation<T>::on_write_persist;
+  using GenericWriteLogOperation::m_lock;
+  using GenericWriteLogOperation::sync_point;
+  using GenericWriteLogOperation::on_write_append;
+  using GenericWriteLogOperation::on_write_persist;
   std::shared_ptr<WriteLogEntry> log_entry;
   bufferlist bl;
   WriteBufferAllocation *buffer_alloc = nullptr;
-  WriteLogOperation(WriteLogOperationSet<T> &set, const uint64_t image_offset_bytes, const uint64_t write_bytes);
-  ~WriteLogOperation();
+  WriteLogOperation(WriteLogOperationSet &set, const uint64_t image_offset_bytes,
+                    const uint64_t write_bytes, CephContext *cct);
+  ~WriteLogOperation() override;
   WriteLogOperation(const WriteLogOperation&) = delete;
   WriteLogOperation &operator=(const WriteLogOperation&) = delete;
+  void init(bool has_data, std::vector<WriteBufferAllocation>::iterator allocation, uint64_t current_sync_gen,
+            uint64_t last_op_sequence_num, bufferlist &write_req_bl, uint64_t buffer_offset,
+            bool persist_on_flush);
   std::ostream &format(std::ostream &os) const;
-  template <typename U>
   friend std::ostream &operator<<(std::ostream &os,
-                                  const WriteLogOperation<U> &op);
-  const std::shared_ptr<GenericLogEntry> get_log_entry() {
-    return get_write_log_entry();
-  }
-  const std::shared_ptr<WriteLogEntry> get_write_log_entry() {
+                                  const WriteLogOperation &op);
+  const std::shared_ptr<GenericLogEntry> get_log_entry() override {
    return log_entry;
  }
-  WriteLogOperation<T> *get_write_op() override {
-    return this;
-  }
-  bool is_write() {
-    return true;
-  }
+
+  void complete(int r) override;
+  void copy_bl_to_pmem_buffer() override;
+  void flush_pmem_buf_to_cache(PMEMobjpool *log_pool) override;
 };
 
-template <typename T>
 class WriteLogOperationSet {
 private:
+  CephContext *m_cct;
   Context *m_on_finish;
 public:
-  T &rwl;
   bool persist_on_flush;
   BlockGuardCell *cell;
   C_Gather *extent_ops_appending;
   Context *on_ops_appending;
   C_Gather *extent_ops_persist;
   Context *on_ops_persist;
-  GenericLogOperationsVector<T> operations;
+  GenericLogOperationsVector operations;
   utime_t dispatch_time; /* When set created */
-  std::shared_ptr<SyncPoint<T>> sync_point;
-  WriteLogOperationSet(T &rwl, const utime_t dispatched, std::shared_ptr<SyncPoint<T>> sync_point,
-                       const bool persist_on_flush, Context *on_finish);
+  PerfCounters *perfcounter = nullptr;
+  std::shared_ptr<SyncPoint> sync_point;
+  WriteLogOperationSet(const utime_t dispatched, PerfCounters *perfcounter, std::shared_ptr<SyncPoint> sync_point,
+                       const bool persist_on_flush, CephContext *cct, Context *on_finish);
   ~WriteLogOperationSet();
   WriteLogOperationSet(const WriteLogOperationSet&) = delete;
   WriteLogOperationSet &operator=(const WriteLogOperationSet&) = delete;
-  template <typename U>
   friend std::ostream &operator<<(std::ostream &os,
-                                  const WriteLogOperationSet<U> &s);
+                                  const WriteLogOperationSet &s);
 };
 
-} // namespace rwl
-} // namespace cache
-} // namespace librbd
+} // namespace rwl
+} // namespace cache
+} // namespace librbd
 
 #endif // CEPH_LIBRBD_CACHE_RWL_LOG_OPERATION_H
diff --git a/src/librbd/cache/rwl/Request.cc b/src/librbd/cache/rwl/Request.cc
index c16f3363113..6dd46fe8e07 100644
--- a/src/librbd/cache/rwl/Request.cc
+++ 
b/src/librbd/cache/rwl/Request.cc @@ -5,7 +5,7 @@ #include "librbd/BlockGuard.h" #include "librbd/cache/rwl/LogEntry.h" -#define dout_subsys ceph_subsys_rbd +#define dout_subsys ceph_subsys_rbd_rwl #undef dout_prefix #define dout_prefix *_dout << "librbd::cache::rwl::Request: " << this << " " \ << __func__ << ": " @@ -14,129 +14,90 @@ namespace librbd { namespace cache { namespace rwl { -typedef std::list> GeneralWriteLogEntries; +typedef std::list> GenericWriteLogEntries; template -C_GuardedBlockIORequest::C_GuardedBlockIORequest(T &rwl) - : rwl(rwl) { - if (RWL_VERBOSE_LOGGING) { - ldout(rwl.m_image_ctx.cct, 99) << this << dendl; - } +C_BlockIORequest::C_BlockIORequest(T &rwl, const utime_t arrived, io::Extents &&extents, + bufferlist&& bl, const int fadvise_flags, Context *user_req) + : rwl(rwl), image_extents(std::move(extents)), + bl(std::move(bl)), fadvise_flags(fadvise_flags), + user_req(user_req), image_extents_summary(image_extents), m_arrived_time(arrived) { + ldout(rwl.get_context(), 99) << this << dendl; } template -C_GuardedBlockIORequest::~C_GuardedBlockIORequest() { - if (RWL_VERBOSE_LOGGING) { - ldout(rwl.m_image_ctx.cct, 99) << this << dendl; - } +C_BlockIORequest::~C_BlockIORequest() { + ldout(rwl.get_context(), 99) << this << dendl; ceph_assert(m_cell_released || !m_cell); } template -void C_GuardedBlockIORequest::set_cell(BlockGuardCell *cell) { - if (RWL_VERBOSE_LOGGING) { - ldout(rwl.m_image_ctx.cct, 20) << this << " cell=" << cell << dendl; - } +std::ostream &operator<<(std::ostream &os, + const C_BlockIORequest &req) { + os << "image_extents=[" << req.image_extents << "], " + << "image_extents_summary=[" << req.image_extents_summary << "], " + << "bl=" << req.bl << ", " + << "user_req=" << req.user_req << ", " + << "m_user_req_completed=" << req.m_user_req_completed << ", " + << "m_deferred=" << req.m_deferred << ", " + << "detained=" << req.detained << ", " + << "waited_lanes=" << req.waited_lanes << ", " + << "waited_entries=" << req.waited_entries << ", " + << "waited_buffers=" << req.waited_buffers << ""; + return os; +} + +template +void C_BlockIORequest::set_cell(BlockGuardCell *cell) { + ldout(rwl.get_context(), 20) << this << " cell=" << cell << dendl; ceph_assert(cell); ceph_assert(!m_cell); m_cell = cell; } template -BlockGuardCell *C_GuardedBlockIORequest::get_cell(void) { - if (RWL_VERBOSE_LOGGING) { - ldout(rwl.m_image_ctx.cct, 20) << this << " cell=" << m_cell << dendl; - } +BlockGuardCell *C_BlockIORequest::get_cell(void) { + ldout(rwl.get_context(), 20) << this << " cell=" << m_cell << dendl; return m_cell; } template -void C_GuardedBlockIORequest::release_cell() { - if (RWL_VERBOSE_LOGGING) { - ldout(rwl.m_image_ctx.cct, 20) << this << " cell=" << m_cell << dendl; - } +void C_BlockIORequest::release_cell() { + ldout(rwl.get_context(), 20) << this << " cell=" << m_cell << dendl; ceph_assert(m_cell); bool initial = false; if (m_cell_released.compare_exchange_strong(initial, true)) { rwl.release_guarded_request(m_cell); } else { - ldout(rwl.m_image_ctx.cct, 5) << "cell " << m_cell << " already released for " << this << dendl; - } -} - -template -C_BlockIORequest::C_BlockIORequest(T &rwl, const utime_t arrived, io::Extents &&extents, - bufferlist&& bl, const int fadvise_flags, Context *user_req) - : C_GuardedBlockIORequest(rwl), image_extents(std::move(extents)), - bl(std::move(bl)), fadvise_flags(fadvise_flags), - user_req(user_req), image_extents_summary(image_extents), m_arrived_time(arrived) { - if (RWL_VERBOSE_LOGGING) { - 
ldout(rwl.m_image_ctx.cct, 99) << this << dendl; - } - /* Remove zero length image extents from input */ - for (auto it = image_extents.begin(); it != image_extents.end(); ) { - if (0 == it->second) { - it = image_extents.erase(it); - continue; - } - ++it; - } -} - -template -C_BlockIORequest::~C_BlockIORequest() { - if (RWL_VERBOSE_LOGGING) { - ldout(rwl.m_image_ctx.cct, 99) << this << dendl; + ldout(rwl.get_context(), 5) << "cell " << m_cell << " already released for " << this << dendl; } } -template -std::ostream &operator<<(std::ostream &os, - const C_BlockIORequest &req) { - os << "image_extents=[" << req.image_extents << "], " - << "image_extents_summary=[" << req.image_extents_summary << "], " - << "bl=" << req.bl << ", " - << "user_req=" << req.user_req << ", " - << "m_user_req_completed=" << req.m_user_req_completed << ", " - << "m_deferred=" << req.m_deferred << ", " - << "detained=" << req.detained << ", " - << "m_waited_lanes=" << req.m_waited_lanes << ", " - << "m_waited_entries=" << req.m_waited_entries << ", " - << "m_waited_buffers=" << req.m_waited_buffers << ""; - return os; - } - template void C_BlockIORequest::complete_user_request(int r) { bool initial = false; if (m_user_req_completed.compare_exchange_strong(initial, true)) { - if (RWL_VERBOSE_LOGGING) { - ldout(rwl.m_image_ctx.cct, 15) << this << " completing user req" << dendl; - } + ldout(rwl.get_context(), 15) << this << " completing user req" << dendl; m_user_req_completed_time = ceph_clock_now(); user_req->complete(r); // Set user_req as null as it is deleted user_req = nullptr; } else { - if (RWL_VERBOSE_LOGGING) { - ldout(rwl.m_image_ctx.cct, 20) << this << " user req already completed" << dendl; - } + ldout(rwl.get_context(), 20) << this << " user req already completed" << dendl; } } template void C_BlockIORequest::finish(int r) { - ldout(rwl.m_image_ctx.cct, 20) << this << dendl; + ldout(rwl.get_context(), 20) << this << dendl; complete_user_request(r); bool initial = false; if (m_finish_called.compare_exchange_strong(initial, true)) { - if (RWL_VERBOSE_LOGGING) { - ldout(rwl.m_image_ctx.cct, 15) << this << " finishing" << dendl; - } + ldout(rwl.get_context(), 15) << this << " finishing" << dendl; finish_req(0); } else { - ldout(rwl.m_image_ctx.cct, 20) << this << " already finished" << dendl; + ldout(rwl.get_context(), 20) << this << " already finished" << dendl; ceph_assert(0); } } @@ -151,93 +112,154 @@ void C_BlockIORequest::deferred() { template C_WriteRequest::C_WriteRequest(T &rwl, const utime_t arrived, io::Extents &&image_extents, - bufferlist&& bl, const int fadvise_flags, Context *user_req) - : C_BlockIORequest(rwl, arrived, std::move(image_extents), std::move(bl), fadvise_flags, user_req) { - if (RWL_VERBOSE_LOGGING) { - ldout(rwl.m_image_ctx.cct, 99) << this << dendl; - } + bufferlist&& bl, const int fadvise_flags, ceph::mutex &lock, + PerfCounters *perfcounter, Context *user_req) + : C_BlockIORequest(rwl, arrived, std::move(image_extents), std::move(bl), fadvise_flags, user_req), + m_lock(lock), m_perfcounter(perfcounter) { + ldout(rwl.get_context(), 99) << this << dendl; } template C_WriteRequest::~C_WriteRequest() { - if (RWL_VERBOSE_LOGGING) { - ldout(rwl.m_image_ctx.cct, 99) << this << dendl; - } + ldout(rwl.get_context(), 99) << this << dendl; } template std::ostream &operator<<(std::ostream &os, - const C_WriteRequest &req) { + const C_WriteRequest &req) { os << (C_BlockIORequest&)req - << " m_resources.allocated=" << req.resources.allocated; - if (req.m_op_set) { - os << "m_op_set=" << 
*req.m_op_set; + << " m_resources.allocated=" << req.m_resources.allocated; + if (req.op_set) { + os << "op_set=" << *req.op_set; } return os; }; template void C_WriteRequest::blockguard_acquired(GuardedRequestFunctionContext &guard_ctx) { - if (RWL_VERBOSE_LOGGING) { - ldout(rwl.m_image_ctx.cct, 20) << __func__ << " write_req=" << this << " cell=" << guard_ctx.m_cell << dendl; - } + ldout(rwl.get_context(), 20) << __func__ << " write_req=" << this << " cell=" << guard_ctx.cell << dendl; - ceph_assert(guard_ctx.m_cell); - this->detained = guard_ctx.m_state.detained; /* overlapped */ - this->m_queued = guard_ctx.m_state.queued; /* queued behind at least one barrier */ - this->set_cell(guard_ctx.m_cell); + ceph_assert(guard_ctx.cell); + this->detained = guard_ctx.state.detained; /* overlapped */ + this->m_queued = guard_ctx.state.queued; /* queued behind at least one barrier */ + this->set_cell(guard_ctx.cell); } template void C_WriteRequest::finish_req(int r) { - if (RWL_VERBOSE_LOGGING) { - ldout(rwl.m_image_ctx.cct, 15) << "write_req=" << this << " cell=" << this->get_cell() << dendl; - } + ldout(rwl.get_context(), 15) << "write_req=" << this << " cell=" << this->get_cell() << dendl; /* Completed to caller by here (in finish(), which calls this) */ utime_t now = ceph_clock_now(); rwl.release_write_lanes(this); + ceph_assert(m_resources.allocated); + m_resources.allocated = false; this->release_cell(); /* TODO: Consider doing this in appending state */ update_req_stats(now); } template -void C_WriteRequest::setup_buffer_resources(uint64_t &bytes_cached, uint64_t &bytes_dirtied) { +void C_WriteRequest::setup_buffer_resources( + uint64_t &bytes_cached, uint64_t &bytes_dirtied, uint64_t &bytes_allocated, + uint64_t &number_lanes, uint64_t &number_log_entries, + uint64_t &number_unpublished_reserves) { + + ceph_assert(!m_resources.allocated); + + auto image_extents_size = this->image_extents.size(); + m_resources.buffers.reserve(image_extents_size); + + bytes_cached = 0; + bytes_allocated = 0; + number_lanes = image_extents_size; + number_log_entries = image_extents_size; + number_unpublished_reserves = image_extents_size; + for (auto &extent : this->image_extents) { - resources.buffers.emplace_back(); - struct WriteBufferAllocation &buffer = resources.buffers.back(); + m_resources.buffers.emplace_back(); + struct WriteBufferAllocation &buffer = m_resources.buffers.back(); buffer.allocation_size = MIN_WRITE_ALLOC_SIZE; buffer.allocated = false; bytes_cached += extent.second; if (extent.second > buffer.allocation_size) { buffer.allocation_size = extent.second; } + bytes_allocated += buffer.allocation_size; } bytes_dirtied = bytes_cached; } template void C_WriteRequest::setup_log_operations() { - for (auto &extent : this->image_extents) { - /* operation->on_write_persist connected to m_prior_log_entries_persisted Gather */ - auto operation = - std::make_shared>(*m_op_set, extent.first, extent.second); - m_op_set->operations.emplace_back(operation); + { + std::lock_guard locker(m_lock); + // TODO: Add sync point if necessary + std::shared_ptr current_sync_point = rwl.get_current_sync_point(); + uint64_t current_sync_gen = rwl.get_current_sync_gen(); + op_set = + make_unique(this->m_dispatched_time, + m_perfcounter, + current_sync_point, + rwl.get_persist_on_flush(), + rwl.get_context(), this); + ldout(rwl.get_context(), 20) << "write_req=" << *this << " op_set=" << op_set.get() << dendl; + ceph_assert(m_resources.allocated); + /* op_set->operations initialized differently for plain write or write 
same */ + auto allocation = m_resources.buffers.begin(); + uint64_t buffer_offset = 0; + for (auto &extent : this->image_extents) { + /* operation->on_write_persist connected to m_prior_log_entries_persisted Gather */ + auto operation = + std::make_shared(*op_set, extent.first, extent.second, rwl.get_context()); + op_set->operations.emplace_back(operation); + + /* A WS is also a write */ + ldout(rwl.get_context(), 20) << "write_req=" << *this << " op_set=" << op_set.get() + << " operation=" << operation << dendl; + rwl.inc_last_op_sequence_num(); + operation->init(true, allocation, current_sync_gen, + rwl.get_last_op_sequence_num(), this->bl, buffer_offset, op_set->persist_on_flush); + buffer_offset += operation->log_entry->write_bytes(); + ldout(rwl.get_context(), 20) << "operation=[" << *operation << "]" << dendl; + allocation++; + } + } + /* All extent ops subs created */ + op_set->extent_ops_appending->activate(); + op_set->extent_ops_persist->activate(); + + /* Write data */ + for (auto &operation : op_set->operations) { + operation->copy_bl_to_pmem_buffer(); } } +template +bool C_WriteRequest::append_write_request(std::shared_ptr sync_point) { + std::lock_guard locker(m_lock); + auto write_req_sp = this; + if (sync_point->earlier_sync_point) { + Context *schedule_append_ctx = new LambdaContext([this, write_req_sp](int r) { + write_req_sp->schedule_append(); + }); + sync_point->earlier_sync_point->on_sync_point_appending.push_back(schedule_append_ctx); + return true; + } + return false; +} + template void C_WriteRequest::schedule_append() { ceph_assert(++m_appended == 1); if (m_do_early_flush) { /* This caller is waiting for persist, so we'll use their thread to * expedite it */ - rwl.flush_pmem_buffer(this->m_op_set->operations); - rwl.schedule_append(this->m_op_set->operations); + rwl.flush_pmem_buffer(this->op_set->operations); + rwl.schedule_append(this->op_set->operations); } else { /* This is probably not still the caller's thread, so do the payload * flushing/replicating later. */ - rwl.schedule_flush_and_append(this->m_op_set->operations); + rwl.schedule_flush_and_append(this->op_set->operations); } } @@ -250,274 +272,49 @@ void C_WriteRequest::schedule_append() { * Lanes are released after the write persists via release_write_lanes() */ template -bool C_WriteRequest::alloc_resources() -{ - bool alloc_succeeds = true; - bool no_space = false; - utime_t alloc_start = ceph_clock_now(); - uint64_t bytes_allocated = 0; - uint64_t bytes_cached = 0; - uint64_t bytes_dirtied = 0; - - ceph_assert(!resources.allocated); - resources.buffers.reserve(this->image_extents.size()); - { - std::lock_guard locker(rwl.m_lock); - if (rwl.m_free_lanes < this->image_extents.size()) { - this->m_waited_lanes = true; - if (RWL_VERBOSE_LOGGING) { - ldout(rwl.m_image_ctx.cct, 20) << "not enough free lanes (need " - << this->image_extents.size() - << ", have " << rwl.m_free_lanes << ") " - << *this << dendl; - } - alloc_succeeds = false; - /* This isn't considered a "no space" alloc fail. Lanes are a throttling mechanism. 
*/ - } - if (rwl.m_free_log_entries < this->image_extents.size()) { - this->m_waited_entries = true; - if (RWL_VERBOSE_LOGGING) { - ldout(rwl.m_image_ctx.cct, 20) << "not enough free entries (need " - << this->image_extents.size() - << ", have " << rwl.m_free_log_entries << ") " - << *this << dendl; - } - alloc_succeeds = false; - no_space = true; /* Entries must be retired */ - } - /* Don't attempt buffer allocate if we've exceeded the "full" threshold */ - if (rwl.m_bytes_allocated > rwl.m_bytes_allocated_cap) { - if (!this->m_waited_buffers) { - this->m_waited_buffers = true; - if (RWL_VERBOSE_LOGGING) { - ldout(rwl.m_image_ctx.cct, 1) << "Waiting for allocation cap (cap=" << rwl.m_bytes_allocated_cap - << ", allocated=" << rwl.m_bytes_allocated - << ") in write [" << *this << "]" << dendl; - } - } - alloc_succeeds = false; - no_space = true; /* Entries must be retired */ - } - } - if (alloc_succeeds) { - setup_buffer_resources(bytes_cached, bytes_dirtied); - } - - if (alloc_succeeds) { - for (auto &buffer : resources.buffers) { - bytes_allocated += buffer.allocation_size; - utime_t before_reserve = ceph_clock_now(); - buffer.buffer_oid = pmemobj_reserve(rwl.m_log_pool, - &buffer.buffer_alloc_action, - buffer.allocation_size, - 0 /* Object type */); - buffer.allocation_lat = ceph_clock_now() - before_reserve; - if (TOID_IS_NULL(buffer.buffer_oid)) { - if (!this->m_waited_buffers) { - this->m_waited_buffers = true; - } - if (RWL_VERBOSE_LOGGING) { - ldout(rwl.m_image_ctx.cct, 5) << "can't allocate all data buffers: " - << pmemobj_errormsg() << ". " - << *this << dendl; - } - alloc_succeeds = false; - no_space = true; /* Entries need to be retired */ - break; - } else { - buffer.allocated = true; - } - if (RWL_VERBOSE_LOGGING) { - ldout(rwl.m_image_ctx.cct, 20) << "Allocated " << buffer.buffer_oid.oid.pool_uuid_lo - << "." << buffer.buffer_oid.oid.off - << ", size=" << buffer.allocation_size << dendl; - } - } - } - - if (alloc_succeeds) { - unsigned int num_extents = this->image_extents.size(); - std::lock_guard locker(rwl.m_lock); - /* We need one free log entry per extent (each is a separate entry), and - * one free "lane" for remote replication. */ - if ((rwl.m_free_lanes >= num_extents) && - (rwl.m_free_log_entries >= num_extents)) { - rwl.m_free_lanes -= num_extents; - rwl.m_free_log_entries -= num_extents; - rwl.m_unpublished_reserves += num_extents; - rwl.m_bytes_allocated += bytes_allocated; - rwl.m_bytes_cached += bytes_cached; - rwl.m_bytes_dirty += bytes_dirtied; - resources.allocated = true; - } else { - alloc_succeeds = false; - } - } - - if (!alloc_succeeds) { - /* On alloc failure, free any buffers we did allocate */ - for (auto &buffer : resources.buffers) { - if (buffer.allocated) { - pmemobj_cancel(rwl.m_log_pool, &buffer.buffer_alloc_action, 1); - } - } - resources.buffers.clear(); - if (no_space) { - /* Expedite flushing and/or retiring */ - std::lock_guard locker(rwl.m_lock); - rwl.m_alloc_failed_since_retire = true; - rwl.m_last_alloc_fail = ceph_clock_now(); - } - } - - this->m_allocated_time = alloc_start; - return alloc_succeeds; +bool C_WriteRequest::alloc_resources() { + this->allocated_time = ceph_clock_now(); + return rwl.alloc_resources(this); } /** * Takes custody of write_req. Resources must already be allocated. 
* * Locking: - * Acquires m_lock + * Acquires lock */ template void C_WriteRequest::dispatch() { - CephContext *cct = rwl.m_image_ctx.cct; - GeneralWriteLogEntries log_entries; - DeferredContexts on_exit; + CephContext *cct = rwl.get_context(); utime_t now = ceph_clock_now(); - auto write_req_sp = this; this->m_dispatched_time = now; - if (RWL_VERBOSE_LOGGING) { - ldout(cct, 15) << "name: " << rwl.m_image_ctx.name << " id: " << rwl.m_image_ctx.id - << "write_req=" << this << " cell=" << this->get_cell() << dendl; - } - - { - uint64_t buffer_offset = 0; - std::lock_guard locker(rwl.m_lock); - Context *set_complete = this; - // TODO: Add sync point if necessary - // - m_op_set = - make_unique>(rwl, now, rwl.m_current_sync_point, rwl.m_persist_on_flush, - set_complete); - if (RWL_VERBOSE_LOGGING) { - ldout(cct, 20) << "write_req=" << this << " m_op_set=" << m_op_set.get() << dendl; - } - ceph_assert(resources.allocated); - /* m_op_set->operations initialized differently for plain write or write same */ - this->setup_log_operations(); - auto allocation = resources.buffers.begin(); - for (auto &gen_op : m_op_set->operations) { - /* A WS is also a write */ - auto operation = gen_op->get_write_op(); - if (RWL_VERBOSE_LOGGING) { - ldout(cct, 20) << "write_req=" << this << " m_op_set=" << m_op_set.get() - << " operation=" << operation << dendl; - } - log_entries.emplace_back(operation->log_entry); - rwl.m_perfcounter->inc(l_librbd_rwl_log_ops, 1); - - operation->log_entry->ram_entry.has_data = 1; - operation->log_entry->ram_entry.write_data = allocation->buffer_oid; - // TODO: make std::shared_ptr - operation->buffer_alloc = &(*allocation); - ceph_assert(!TOID_IS_NULL(operation->log_entry->ram_entry.write_data)); - operation->log_entry->pmem_buffer = D_RW(operation->log_entry->ram_entry.write_data); - operation->log_entry->ram_entry.sync_gen_number = rwl.m_current_sync_gen; - if (m_op_set->persist_on_flush) { - /* Persist on flush. Sequence #0 is never used. */ - operation->log_entry->ram_entry.write_sequence_number = 0; - } else { - /* Persist on write */ - operation->log_entry->ram_entry.write_sequence_number = ++rwl.m_last_op_sequence_num; - operation->log_entry->ram_entry.sequenced = 1; - } - operation->log_entry->ram_entry.sync_point = 0; - operation->log_entry->ram_entry.discard = 0; - operation->bl.substr_of(this->bl, buffer_offset, - operation->log_entry->write_bytes()); - buffer_offset += operation->log_entry->write_bytes(); - if (RWL_VERBOSE_LOGGING) { - ldout(cct, 20) << "operation=[" << *operation << "]" << dendl; - } - allocation++; - } - } - /* All extent ops subs created */ - m_op_set->extent_ops_appending->activate(); - m_op_set->extent_ops_persist->activate(); - - /* Write data */ - for (auto &operation : m_op_set->operations) { - /* operation is a shared_ptr, so write_op is only good as long as operation is in scope */ - auto write_op = operation->get_write_op(); - ceph_assert(write_op != nullptr); - bufferlist::iterator i(&write_op->bl); - rwl.m_perfcounter->inc(l_librbd_rwl_log_op_bytes, write_op->log_entry->write_bytes()); - if (RWL_VERBOSE_LOGGING) { - ldout(cct, 20) << write_op->bl << dendl; - } - i.copy((unsigned)write_op->log_entry->write_bytes(), (char*)write_op->log_entry->pmem_buffer); - } - - // TODO: Add to log map for read - - /* - * Entries are added to m_log_entries in alloc_op_log_entries() when their - * order is established. They're added to m_dirty_log_entries when the write - * completes to all replicas. They must not be flushed before then. 
We don't - * prevent the application from reading these before they persist. If we - * supported coherent shared access, that might be a problem (the write could - * fail after another initiator had read it). As it is the cost of running - * reads through the block guard (and exempting them from the barrier, which - * doesn't need to apply to them) to prevent reading before the previous - * write of that data persists doesn't seem justified. - */ - - if (rwl.m_persist_on_flush_early_user_comp && - m_op_set->persist_on_flush) { - /* - * We're done with the caller's buffer, and not guaranteeing - * persistence until the next flush. The block guard for this - * write_req will not be released until the write is persisted - * everywhere, but the caller's request can complete now. - */ - this->complete_user_request(0); - } + ldout(cct, 15) << "write_req=" << this << " cell=" << this->get_cell() << dendl; + setup_log_operations(); bool append_deferred = false; - { - std::lock_guard locker(rwl.m_lock); - if (!m_op_set->persist_on_flush && - m_op_set->sync_point->earlier_sync_point) { - /* In persist-on-write mode, we defer the append of this write until the - * previous sync point is appending (meaning all the writes before it are - * persisted and that previous sync point can now appear in the - * log). Since we insert sync points in persist-on-write mode when writes - * have already completed to the current sync point, this limits us to - * one inserted sync point in flight at a time, and gives the next - * inserted sync point some time to accumulate a few writes if they - * arrive soon. Without this we can insert an absurd number of sync - * points, each with one or two writes. That uses a lot of log entries, - * and limits flushing to very few writes at a time. */ - m_do_early_flush = false; - Context *schedule_append_ctx = new LambdaContext([this, write_req_sp](int r) { - write_req_sp->schedule_append(); - }); - m_op_set->sync_point->earlier_sync_point->on_sync_point_appending.push_back(schedule_append_ctx); - append_deferred = true; - } else { - /* The prior sync point is done, so we'll schedule append here. If this is - * persist-on-write, and probably still the caller's thread, we'll use this - * caller's thread to perform the persist & replication of the payload - * buffer. */ - m_do_early_flush = - !(this->detained || this->m_queued || this->m_deferred || m_op_set->persist_on_flush); - } + if (!op_set->persist_on_flush && + append_write_request(op_set->sync_point)) { + /* In persist-on-write mode, we defer the append of this write until the + * previous sync point is appending (meaning all the writes before it are + * persisted and that previous sync point can now appear in the + * log). Since we insert sync points in persist-on-write mode when writes + * have already completed to the current sync point, this limits us to + * one inserted sync point in flight at a time, and gives the next + * inserted sync point some time to accumulate a few writes if they + * arrive soon. Without this we can insert an absurd number of sync + * points, each with one or two writes. That uses a lot of log entries, + * and limits flushing to very few writes at a time. */ + m_do_early_flush = false; + append_deferred = true; + } else { + /* The prior sync point is done, so we'll schedule append here. If this is + * persist-on-write, and probably still the caller's thread, we'll use this + * caller's thread to perform the persist & replication of the payload + * buffer. 
*/ + m_do_early_flush = + !(this->detained || this->m_queued || this->m_deferred || op_set->persist_on_flush); } if (!append_deferred) { this->schedule_append(); @@ -539,18 +336,18 @@ GuardedRequestFunctionContext::GuardedRequestFunctionContext(boost::functionm_state=[" << r.guard_ctx->m_state << "], " + os << "guard_ctx->state=[" << r.guard_ctx->state << "], " << "block_extent.block_start=" << r.block_extent.block_start << ", " << "block_extent.block_start=" << r.block_extent.block_end; return os; }; -} // namespace rwl -} // namespace cache -} // namespace librbd +} // namespace rwl +} // namespace cache +} // namespace librbd diff --git a/src/librbd/cache/rwl/Request.h b/src/librbd/cache/rwl/Request.h index 7af0c196e88..595e06c4396 100644 --- a/src/librbd/cache/rwl/Request.h +++ b/src/librbd/cache/rwl/Request.h @@ -17,50 +17,43 @@ namespace rwl { class GuardedRequestFunctionContext; -/** - * A request that can be deferred in a BlockGuard to sequence - * overlapping operations. - */ -template -class C_GuardedBlockIORequest : public Context { -public: - T &rwl; - C_GuardedBlockIORequest(T &rwl); - ~C_GuardedBlockIORequest(); - C_GuardedBlockIORequest(const C_GuardedBlockIORequest&) = delete; - C_GuardedBlockIORequest &operator=(const C_GuardedBlockIORequest&) = delete; - - virtual const char *get_name() const = 0; - void set_cell(BlockGuardCell *cell); - BlockGuardCell *get_cell(void); - void release_cell(); - -private: - std::atomic m_cell_released = {false}; - BlockGuardCell* m_cell = nullptr; +struct WriteRequestResources { + bool allocated = false; + std::vector buffers; }; /** + * A request that can be deferred in a BlockGuard to sequence + * overlapping operations. * This is the custodian of the BlockGuard cell for this IO, and the * state information about the progress of this IO. This object lives * until the IO is persisted in all (live) log replicas. User request * may be completed from here before the IO persists. 
*/ template -class C_BlockIORequest : public C_GuardedBlockIORequest { +class C_BlockIORequest : public Context { public: - using C_GuardedBlockIORequest::rwl; - + T &rwl; io::Extents image_extents; bufferlist bl; int fadvise_flags; Context *user_req; /* User write request */ ExtentsSummary image_extents_summary; bool detained = false; /* Detained in blockguard (overlapped with a prior IO) */ + utime_t allocated_time; /* When allocation began */ + bool waited_lanes = false; /* This IO waited for free persist/replicate lanes */ + bool waited_entries = false; /* This IO waited for free log entries */ + bool waited_buffers = false; /* This IO waited for data buffers (pmemobj_reserve() failed) */ C_BlockIORequest(T &rwl, const utime_t arrived, io::Extents &&extents, bufferlist&& bl, const int fadvise_flags, Context *user_req); - virtual ~C_BlockIORequest(); + ~C_BlockIORequest() override; + C_BlockIORequest(const C_BlockIORequest&) = delete; + C_BlockIORequest &operator=(const C_BlockIORequest&) = delete; + + void set_cell(BlockGuardCell *cell); + BlockGuardCell *get_cell(void); + void release_cell(); void complete_user_request(int r); void finish(int r); @@ -74,34 +67,59 @@ public: virtual void dispatch() = 0; - virtual const char *get_name() const override { + virtual const char *get_name() const { return "C_BlockIORequest"; } + uint64_t get_image_extents_size() { + return image_extents.size(); + } + void set_io_waited_for_lanes(bool waited) { + waited_lanes = waited; + } + void set_io_waited_for_entries(bool waited) { + waited_entries = waited; + } + void set_io_waited_for_buffers(bool waited) { + waited_buffers = waited; + } + bool has_io_waited_for_buffers() { + return waited_buffers; + } + std::vector& get_resources_buffers() { + return m_resources.buffers; + } + + void set_allocated(bool allocated) { + if (allocated) { + m_resources.allocated = true; + } else { + m_resources.buffers.clear(); + } + } + + virtual void setup_buffer_resources( + uint64_t &bytes_cached, uint64_t &bytes_dirtied, uint64_t &bytes_allocated, + uint64_t &number_lanes, uint64_t &number_log_entries, + uint64_t &number_unpublished_reserves) {}; protected: utime_t m_arrived_time; - utime_t m_allocated_time; /* When allocation began */ utime_t m_dispatched_time; /* When dispatch began */ utime_t m_user_req_completed_time; - bool m_waited_lanes = false; /* This IO waited for free persist/replicate lanes */ - bool m_waited_entries = false; /* This IO waited for free log entries */ - bool m_waited_buffers = false; /* This IO waited for data buffers (pmemobj_reserve() failed) */ std::atomic m_deferred = {false}; /* Deferred because this or a prior IO had to wait for write resources */ + WriteRequestResources m_resources; private: std::atomic m_user_req_completed = {false}; std::atomic m_finish_called = {false}; + std::atomic m_cell_released = {false}; + BlockGuardCell* m_cell = nullptr; template friend std::ostream &operator<<(std::ostream &os, const C_BlockIORequest &req); }; -struct WriteRequestResources { - bool allocated = false; - std::vector buffers; -}; - /** * This is the custodian of the BlockGuard cell for this write. 
Block * guard is not released until the write persists everywhere (this is @@ -112,26 +130,24 @@ template class C_WriteRequest : public C_BlockIORequest { public: using C_BlockIORequest::rwl; - WriteRequestResources resources; + unique_ptr op_set = nullptr; C_WriteRequest(T &rwl, const utime_t arrived, io::Extents &&image_extents, - bufferlist&& bl, const int fadvise_flags, Context *user_req); + bufferlist&& bl, const int fadvise_flags, ceph::mutex &lock, + PerfCounters *perfcounter, Context *user_req); - ~C_WriteRequest(); + ~C_WriteRequest() override; void blockguard_acquired(GuardedRequestFunctionContext &guard_ctx); /* Common finish to plain write and compare-and-write (if it writes) */ - virtual void finish_req(int r); + void finish_req(int r) override; /* Compare and write will override this */ virtual void update_req_stats(utime_t &now) { // TODO: Add in later PRs } - virtual bool alloc_resources() override; - - /* Plain writes will allocate one buffer per request extent */ - virtual void setup_buffer_resources(uint64_t &bytes_cached, uint64_t &bytes_dirtied); + bool alloc_resources() override; void deferred_handler() override { } @@ -139,18 +155,28 @@ public: virtual void setup_log_operations(); + bool append_write_request(std::shared_ptr sync_point); + virtual void schedule_append(); const char *get_name() const override { return "C_WriteRequest"; } +protected: + using C_BlockIORequest::m_resources; + /* Plain writes will allocate one buffer per request extent */ + void setup_buffer_resources( + uint64_t &bytes_cached, uint64_t &bytes_dirtied, uint64_t &bytes_allocated, + uint64_t &number_lanes, uint64_t &number_log_entries, + uint64_t &number_unpublished_reserves) override; + private: - unique_ptr> m_op_set = nullptr; bool m_do_early_flush = false; std::atomic m_appended = {0}; bool m_queued = false; - + ceph::mutex &m_lock; + PerfCounters *m_perfcounter = nullptr; template friend std::ostream &operator<<(std::ostream &os, const C_WriteRequest &req); @@ -167,10 +193,10 @@ struct BlockGuardReqState { class GuardedRequestFunctionContext : public Context { public: - BlockGuardCell *m_cell = nullptr; - BlockGuardReqState m_state; + BlockGuardCell *cell = nullptr; + BlockGuardReqState state; GuardedRequestFunctionContext(boost::function &&callback); - ~GuardedRequestFunctionContext(void); + ~GuardedRequestFunctionContext(void) override; GuardedRequestFunctionContext(const GuardedRequestFunctionContext&) = delete; GuardedRequestFunctionContext &operator=(const GuardedRequestFunctionContext&) = delete; @@ -187,14 +213,14 @@ public: GuardedRequest(const BlockExtent block_extent, GuardedRequestFunctionContext *on_guard_acquire, bool barrier = false) : block_extent(block_extent), guard_ctx(on_guard_acquire) { - guard_ctx->m_state.barrier = barrier; + guard_ctx->state.barrier = barrier; } friend std::ostream &operator<<(std::ostream &os, const GuardedRequest &r); }; -} // namespace rwl -} // namespace cache -} // namespace librbd +} // namespace rwl +} // namespace cache +} // namespace librbd -#endif // CEPH_LIBRBD_CACHE_RWL_REQUEST_H +#endif // CEPH_LIBRBD_CACHE_RWL_REQUEST_H diff --git a/src/librbd/cache/rwl/SyncPoint.cc b/src/librbd/cache/rwl/SyncPoint.cc index c9c4582d8b2..d9573aa0750 100644 --- a/src/librbd/cache/rwl/SyncPoint.cc +++ b/src/librbd/cache/rwl/SyncPoint.cc @@ -3,7 +3,7 @@ #include "SyncPoint.h" -#define dout_subsys ceph_subsys_rbd +#define dout_subsys ceph_subsys_rbd_rwl #undef dout_prefix #define dout_prefix *_dout << "librbd::cache::rwl::SyncPoint: " << this << " " \ << 
__func__ << ": " @@ -12,39 +12,35 @@ namespace librbd { namespace cache { namespace rwl { -template -SyncPoint::SyncPoint(T &rwl, const uint64_t sync_gen_num) - : rwl(rwl), log_entry(std::make_shared(sync_gen_num)) { - prior_log_entries_persisted = new C_Gather(rwl.m_image_ctx.cct, nullptr); - sync_point_persist = new C_Gather(rwl.m_image_ctx.cct, nullptr); +SyncPoint::SyncPoint(uint64_t sync_gen_num, CephContext *cct) + : log_entry(std::make_shared(sync_gen_num)), m_cct(cct) { + prior_log_entries_persisted = new C_Gather(cct, nullptr); + sync_point_persist = new C_Gather(cct, nullptr); on_sync_point_appending.reserve(MAX_WRITES_PER_SYNC_POINT + 2); on_sync_point_persisted.reserve(MAX_WRITES_PER_SYNC_POINT + 2); - if (RWL_VERBOSE_LOGGING) { - ldout(rwl.m_image_ctx.cct, 20) << "sync point " << sync_gen_num << dendl; - } + ldout(m_cct, 20) << "sync point " << sync_gen_num << dendl; } -template -SyncPoint::~SyncPoint() { +SyncPoint::~SyncPoint() { ceph_assert(on_sync_point_appending.empty()); ceph_assert(on_sync_point_persisted.empty()); ceph_assert(!earlier_sync_point); } -template -std::ostream &SyncPoint::format(std::ostream &os) const { - os << "log_entry=[" << *log_entry << "], " - << "earlier_sync_point=" << earlier_sync_point << ", " - << "later_sync_point=" << later_sync_point << ", " - << "final_op_sequence_num=" << final_op_sequence_num << ", " - << "prior_log_entries_persisted=" << prior_log_entries_persisted << ", " - << "prior_log_entries_persisted_complete=" << prior_log_entries_persisted_complete << ", " - << "append_scheduled=" << append_scheduled << ", " - << "appending=" << appending << ", " - << "on_sync_point_appending=" << on_sync_point_appending.size() << ", " - << "on_sync_point_persisted=" << on_sync_point_persisted.size() << ""; +std::ostream &operator<<(std::ostream &os, + const SyncPoint &p) { + os << "log_entry=[" << *p.log_entry << "], " + << "earlier_sync_point=" << p.earlier_sync_point << ", " + << "later_sync_point=" << p.later_sync_point << ", " + << "final_op_sequence_num=" << p.final_op_sequence_num << ", " + << "prior_log_entries_persisted=" << p.prior_log_entries_persisted << ", " + << "prior_log_entries_persisted_complete=" << p.prior_log_entries_persisted_complete << ", " + << "append_scheduled=" << p.append_scheduled << ", " + << "appending=" << p.appending << ", " + << "on_sync_point_appending=" << p.on_sync_point_appending.size() << ", " + << "on_sync_point_persisted=" << p.on_sync_point_persisted.size() << ""; return os; -}; +} } // namespace rwl } // namespace cache diff --git a/src/librbd/cache/rwl/SyncPoint.h b/src/librbd/cache/rwl/SyncPoint.h index 7dad7204623..da3cd09610e 100644 --- a/src/librbd/cache/rwl/SyncPoint.h +++ b/src/librbd/cache/rwl/SyncPoint.h @@ -11,17 +11,13 @@ namespace librbd { namespace cache { namespace rwl { -/* Limit work between sync points */ -static const uint64_t MAX_WRITES_PER_SYNC_POINT = 256; -template class SyncPoint { public: - T &rwl; std::shared_ptr log_entry; - /* Use m_lock for earlier/later links */ - std::shared_ptr> earlier_sync_point; /* NULL if earlier has completed */ - std::shared_ptr> later_sync_point; + /* Use lock for earlier/later links */ + std::shared_ptr earlier_sync_point; /* NULL if earlier has completed */ + std::shared_ptr later_sync_point; uint64_t final_op_sequence_num = 0; /* A sync point can't appear in the log until all the writes bearing * it and all the prior sync points have been appended and @@ -46,15 +42,15 @@ public: * aio_flush() calls are added to this. 
*/ std::vector on_sync_point_persisted; - SyncPoint(T &rwl, const uint64_t sync_gen_num); + SyncPoint(uint64_t sync_gen_num, CephContext *cct); ~SyncPoint(); SyncPoint(const SyncPoint&) = delete; SyncPoint &operator=(const SyncPoint&) = delete; - std::ostream &format(std::ostream &os) const; + +private: + CephContext *m_cct; friend std::ostream &operator<<(std::ostream &os, - const SyncPoint &p) { - return p.format(os); - } + const SyncPoint &p); }; } // namespace rwl diff --git a/src/librbd/cache/rwl/Types.cc b/src/librbd/cache/rwl/Types.cc index 3a6cfa6a5ea..3ad62176f0d 100644 --- a/src/librbd/cache/rwl/Types.cc +++ b/src/librbd/cache/rwl/Types.cc @@ -6,7 +6,7 @@ #include "common/ceph_context.h" #include "include/Context.h" -#define dout_subsys ceph_subsys_rbd +#define dout_subsys ceph_subsys_rbd_rwl #undef dout_prefix #define dout_prefix *_dout << "librbd::cache::rwl::Types: " << this << " " \ << __func__ << ": " @@ -35,43 +35,21 @@ void DeferredContexts::add(Context* ctx) { * convert between image and block extents here using a "block size" * of 1. */ -const BlockExtent block_extent(const uint64_t offset_bytes, const uint64_t length_bytes) +BlockExtent convert_to_block_extent(const uint64_t offset_bytes, const uint64_t length_bytes) { return BlockExtent(offset_bytes, offset_bytes + length_bytes - 1); } -const BlockExtent WriteLogPmemEntry::block_extent() { - return BlockExtent(librbd::cache::rwl::block_extent(image_offset_bytes, write_bytes)); +BlockExtent WriteLogPmemEntry::block_extent() { + return convert_to_block_extent(image_offset_bytes, write_bytes); } -bool WriteLogPmemEntry::is_sync_point() { - return sync_point; -} - -bool WriteLogPmemEntry::is_discard() { - return discard; -} - -bool WriteLogPmemEntry::is_writesame() { - return writesame; -} - -bool WriteLogPmemEntry::is_write() { - /* Log entry is a basic write */ - return !is_sync_point() && !is_discard() && !is_writesame(); -} - -bool WriteLogPmemEntry::is_writer() { - /* Log entry is any type that writes data */ - return is_write() || is_discard() || is_writesame(); -} - -const uint64_t WriteLogPmemEntry::get_offset_bytes() { +uint64_t WriteLogPmemEntry::get_offset_bytes() { return image_offset_bytes; } -const uint64_t WriteLogPmemEntry::get_write_bytes() { +uint64_t WriteLogPmemEntry::get_write_bytes() { return write_bytes; } @@ -93,10 +71,9 @@ std::ostream& operator<<(std::ostream& os, }; template -ExtentsSummary::ExtentsSummary(const ExtentsType &extents) { - total_bytes = 0; - first_image_byte = 0; - last_image_byte = 0; +ExtentsSummary::ExtentsSummary(const ExtentsType &extents) + : total_bytes(0), first_image_byte(0), last_image_byte(0) +{ if (extents.empty()) return; /* These extents refer to image offsets between first_image_byte * and last_image_byte, inclusive, but we don't guarantee here @@ -119,7 +96,7 @@ ExtentsSummary::ExtentsSummary(const ExtentsType &extents) { template std::ostream &operator<<(std::ostream &os, - const ExtentsSummary &s) { + const ExtentsSummary &s) { os << "total_bytes=" << s.total_bytes << ", " << "first_image_byte=" << s.first_image_byte << ", " << "last_image_byte=" << s.last_image_byte << ""; diff --git a/src/librbd/cache/rwl/Types.h b/src/librbd/cache/rwl/Types.h index ac76aaf1f85..ac300a8b38a 100644 --- a/src/librbd/cache/rwl/Types.h +++ b/src/librbd/cache/rwl/Types.h @@ -137,6 +137,13 @@ enum { l_librbd_rwl_last, }; +namespace librbd { +namespace cache { +namespace rwl { + +/* Limit work between sync points */ +const uint64_t MAX_WRITES_PER_SYNC_POINT = 256; + const uint32_t 
diff --git a/src/librbd/cache/rwl/Types.h b/src/librbd/cache/rwl/Types.h
index ac76aaf1f85..ac300a8b38a 100644
--- a/src/librbd/cache/rwl/Types.h
+++ b/src/librbd/cache/rwl/Types.h
@@ -137,6 +137,13 @@ enum {
   l_librbd_rwl_last,
 };
 
+namespace librbd {
+namespace cache {
+namespace rwl {
+
+/* Limit work between sync points */
+const uint64_t MAX_WRITES_PER_SYNC_POINT = 256;
+
 const uint32_t MIN_WRITE_ALLOC_SIZE = 512;
 const uint32_t LOG_STATS_INTERVAL_SECONDS = 5;
@@ -152,12 +159,6 @@ const uint64_t BLOCK_ALLOC_OVERHEAD_BYTES = 16;
 const uint8_t RWL_POOL_VERSION = 1;
 const uint64_t MAX_LOG_ENTRIES = (1024 * 1024);
 
-namespace librbd {
-namespace cache {
-namespace rwl {
-
-static const bool RWL_VERBOSE_LOGGING = false;
-
 /* Defer a set of Contexts until destruct/exit. Used for deferring
  * work on a given thread until a required lock is dropped. */
 class DeferredContexts {
@@ -197,14 +198,9 @@ struct WriteLogPmemEntry {
     : image_offset_bytes(image_offset_bytes), write_bytes(write_bytes),
       entry_valid(0), sync_point(0), sequenced(0), has_data(0),
       discard(0), writesame(0) {
   }
-  const BlockExtent block_extent();
-  bool is_sync_point();
-  bool is_discard();
-  bool is_writesame();
-  bool is_write();
-  bool is_writer();
-  const uint64_t get_offset_bytes();
-  const uint64_t get_write_bytes();
+  BlockExtent block_extent();
+  uint64_t get_offset_bytes();
+  uint64_t get_write_bytes();
   friend std::ostream& operator<<(std::ostream& os,
                                   const WriteLogPmemEntry &entry);
 };
@@ -236,30 +232,31 @@ struct WriteBufferAllocation {
   utime_t allocation_lat;
 };
 
+static inline io::Extent image_extent(const BlockExtent& block_extent) {
+  return io::Extent(block_extent.block_start,
+                    block_extent.block_end - block_extent.block_start + 1);
+}
+
 template <typename ExtentsType>
 class ExtentsSummary {
 public:
   uint64_t total_bytes;
   uint64_t first_image_byte;
   uint64_t last_image_byte;
-  ExtentsSummary(const ExtentsType &extents);
+  explicit ExtentsSummary(const ExtentsType &extents);
   template <typename U>
   friend std::ostream &operator<<(std::ostream &os,
                                   const ExtentsSummary<U> &s);
-  const BlockExtent block_extent() {
+  BlockExtent block_extent() {
     return BlockExtent(first_image_byte, last_image_byte);
   }
-  const io::Extent image_extent(const BlockExtent& block_extent) {
-    return io::Extent(block_extent.block_start,
-                      block_extent.block_end - block_extent.block_start + 1);
-  }
-  const io::Extent image_extent() {
-    return image_extent(block_extent());
+  io::Extent image_extent() {
+    return rwl::image_extent(block_extent());
   }
 };
 
-} // namespace rwl
-} // namespace cache
-} // namespace librbd
+} // namespace rwl
+} // namespace cache
+} // namespace librbd
 
 #endif // CEPH_LIBRBD_CACHE_RWL_TYPES_H
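One subtlety with hoisting image_extent(const BlockExtent&) out of
ExtentsSummary, handled in the hunk above: inside the class, the member
image_extent() hides the namespace-scope helper of the same name (and
argument-dependent lookup is suppressed once a class member is found), so the
call has to be qualified as rwl::image_extent(...). The helper is the inverse
of convert_to_block_extent(); a quick round trip (illustrative only;
round_trip_example is a hypothetical function, and it assumes the renamed
helper is visible via Types.h as in the sketch above):

  #include "include/ceph_assert.h"
  #include "librbd/cache/rwl/Types.h"

  void round_trip_example() {
    using namespace librbd::cache::rwl;
    // {offset=4096, len=512} -> inclusive blocks [4096, 4607] -> back again.
    auto be = convert_to_block_extent(4096, 512);
    librbd::io::Extent ext = image_extent(be);  // io::Extent is (offset, length)
    ceph_assert(ext.first == 4096);
    ceph_assert(ext.second == 512);
  }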
diff --git a/src/test/librbd/cache/test_mock_ReplicatedWriteLog.cc b/src/test/librbd/cache/test_mock_ReplicatedWriteLog.cc
index 8cf86556c0b..b3ea123499f 100644
--- a/src/test/librbd/cache/test_mock_ReplicatedWriteLog.cc
+++ b/src/test/librbd/cache/test_mock_ReplicatedWriteLog.cc
@@ -100,8 +100,8 @@ struct TestMockCacheReplicatedWriteLog : public TestMockFixture {
   void expect_context_complete(MockContextRWL& mock_context, int r) {
     EXPECT_CALL(mock_context, complete(r))
       .WillRepeatedly(Invoke([&mock_context](int r) {
-                        mock_context.do_complete(r);
-                      }));
+          mock_context.do_complete(r);
+        }));
   }
 
   void expect_metadata_set(MockImageCtx& mock_image_ctx) {
@@ -196,5 +196,34 @@ TEST_F(TestMockCacheReplicatedWriteLog, init_shutdown) {
   ASSERT_EQ(0, finish_ctx2.wait());
 }
 
+TEST_F(TestMockCacheReplicatedWriteLog, aio_write) {
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+  MockImageCtx mock_image_ctx(*ictx);
+  MockReplicatedWriteLog rwl(mock_image_ctx, get_cache_state(mock_image_ctx));
+
+  MockContextRWL finish_ctx1;
+  expect_op_work_queue(mock_image_ctx);
+  expect_metadata_set(mock_image_ctx);
+  expect_context_complete(finish_ctx1, 0);
+  rwl.init(&finish_ctx1);
+  ASSERT_EQ(0, finish_ctx1.wait());
+
+  MockContextRWL finish_ctx2;
+  expect_context_complete(finish_ctx2, 0);
+  Extents image_extents{{0, 4096}};
+  bufferlist bl;
+  bl.append(std::string(4096, '1'));
+  int fadvise_flags = 0;
+  rwl.aio_write(std::move(image_extents), std::move(bl), fadvise_flags, &finish_ctx2);
+  ASSERT_EQ(0, finish_ctx2.wait());
+
+  MockContextRWL finish_ctx3;
+  expect_context_complete(finish_ctx3, 0);
+  rwl.shut_down(&finish_ctx3);
+  ASSERT_EQ(0, finish_ctx3.wait());
+}
+
 } // namespace cache
 } // namespace librbd
-- 
2.39.5
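The new aio_write case reuses the init/shutdown plumbing from init_shutdown,
so it can be exercised on its own with a gtest filter once the mock tests are
built. Assuming these tests still land in the unittest_librbd target (binary
name and path may differ by build setup):

  ./bin/unittest_librbd --gtest_filter='*ReplicatedWriteLog*.aio_write'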