SUBSYS(rbd, 0, 5)
SUBSYS(rbd_mirror, 0, 5)
SUBSYS(rbd_replay, 0, 5)
+SUBSYS(rbd_rwl, 0, 5)
SUBSYS(journaler, 0, 5)
SUBSYS(objectcacher, 0, 5)
SUBSYS(immutable_obj_cache, 0, 5)
#include <map>
#include <vector>
-#define dout_subsys ceph_subsys_rbd
+#define dout_subsys ceph_subsys_rbd_rwl
#undef dout_prefix
#define dout_prefix *_dout << "librbd::cache::ReplicatedWriteLog: " << this << " " \
<< __func__ << ": "
typedef ReplicatedWriteLog<ImageCtx>::Extent Extent;
typedef ReplicatedWriteLog<ImageCtx>::Extents Extents;
-const unsigned long int ops_appended_together = MAX_ALLOC_PER_TRANSACTION;
+const unsigned long int OPS_APPENDED_TOGETHER = MAX_ALLOC_PER_TRANSACTION;
template <typename I>
ReplicatedWriteLog<I>::ReplicatedWriteLog(I &image_ctx, librbd::cache::rwl::ImageCacheState<I>* cache_state)
"librbd::cache::ReplicatedWriteLog::m_lock", this))),
m_blockguard_lock(ceph::make_mutex(util::unique_lock_name(
"librbd::cache::ReplicatedWriteLog::m_blockguard_lock", this))),
- m_entry_bl_lock(ceph::make_mutex(util::unique_lock_name(
- "librbd::cache::ReplicatedWriteLog::m_entry_bl_lock", this))),
m_thread_pool(image_ctx.cct, "librbd::cache::ReplicatedWriteLog::thread_pool", "tp_rwl",
4,
""),
plb.add_u64_avg(l_librbd_rwl_log_op_bytes, "log_op_bytes", "Average log append bytes");
plb.add_time_avg(
- l_librbd_rwl_req_arr_to_all_t, "req_arr_to_all_t",
- "Average arrival to allocation time (time deferred for overlap)");
+ l_librbd_rwl_req_arr_to_all_t, "req_arr_to_all_t",
+ "Average arrival to allocation time (time deferred for overlap)");
plb.add_time_avg(
- l_librbd_rwl_req_arr_to_dis_t, "req_arr_to_dis_t",
- "Average arrival to dispatch time (includes time deferred for overlaps and allocation)");
+ l_librbd_rwl_req_arr_to_dis_t, "req_arr_to_dis_t",
+ "Average arrival to dispatch time (includes time deferred for overlaps and allocation)");
plb.add_time_avg(
- l_librbd_rwl_req_all_to_dis_t, "req_all_to_dis_t",
- "Average allocation to dispatch time (time deferred for log resources)");
+ l_librbd_rwl_req_all_to_dis_t, "req_all_to_dis_t",
+ "Average allocation to dispatch time (time deferred for log resources)");
plb.add_time_avg(
- l_librbd_rwl_wr_latency, "wr_latency",
- "Latency of writes (persistent completion)");
+ l_librbd_rwl_wr_latency, "wr_latency",
+ "Latency of writes (persistent completion)");
plb.add_u64_counter_histogram(
l_librbd_rwl_wr_latency_hist, "wr_latency_bytes_histogram",
op_hist_x_axis_config, op_hist_y_axis_config,
"Histogram of write request latency (nanoseconds) vs. bytes written");
plb.add_time_avg(
- l_librbd_rwl_wr_caller_latency, "caller_wr_latency",
- "Latency of write completion to caller");
+ l_librbd_rwl_wr_caller_latency, "caller_wr_latency",
+ "Latency of write completion to caller");
plb.add_time_avg(
- l_librbd_rwl_nowait_req_arr_to_all_t, "req_arr_to_all_nw_t",
- "Average arrival to allocation time (time deferred for overlap)");
+ l_librbd_rwl_nowait_req_arr_to_all_t, "req_arr_to_all_nw_t",
+ "Average arrival to allocation time (time deferred for overlap)");
plb.add_time_avg(
- l_librbd_rwl_nowait_req_arr_to_dis_t, "req_arr_to_dis_nw_t",
- "Average arrival to dispatch time (includes time deferred for overlaps and allocation)");
+ l_librbd_rwl_nowait_req_arr_to_dis_t, "req_arr_to_dis_nw_t",
+ "Average arrival to dispatch time (includes time deferred for overlaps and allocation)");
plb.add_time_avg(
- l_librbd_rwl_nowait_req_all_to_dis_t, "req_all_to_dis_nw_t",
- "Average allocation to dispatch time (time deferred for log resources)");
+ l_librbd_rwl_nowait_req_all_to_dis_t, "req_all_to_dis_nw_t",
+ "Average allocation to dispatch time (time deferred for log resources)");
plb.add_time_avg(
- l_librbd_rwl_nowait_wr_latency, "wr_latency_nw",
- "Latency of writes (persistent completion) not deferred for free space");
+ l_librbd_rwl_nowait_wr_latency, "wr_latency_nw",
+ "Latency of writes (persistent completion) not deferred for free space");
plb.add_u64_counter_histogram(
l_librbd_rwl_nowait_wr_latency_hist, "wr_latency_nw_bytes_histogram",
op_hist_x_axis_config, op_hist_y_axis_config,
"Histogram of write request latency (nanoseconds) vs. bytes written for writes not deferred for free space");
plb.add_time_avg(
- l_librbd_rwl_nowait_wr_caller_latency, "caller_wr_latency_nw",
- "Latency of write completion to callerfor writes not deferred for free space");
+ l_librbd_rwl_nowait_wr_caller_latency, "caller_wr_latency_nw",
+    "Latency of write completion to caller for writes not deferred for free space");
plb.add_time_avg(l_librbd_rwl_log_op_alloc_t, "op_alloc_t", "Average buffer pmemobj_reserve() time");
plb.add_u64_counter_histogram(
l_librbd_rwl_log_op_alloc_t_hist, "op_alloc_t_bytes_histogram",
"Histogram of op dispatch to persist complete time (nanoseconds) vs. bytes written");
plb.add_time_avg(
- l_librbd_rwl_log_op_buf_to_app_t, "op_buf_to_app_t",
- "Average buffer persist to log append time (write data persist/replicate + wait for append time)");
+ l_librbd_rwl_log_op_buf_to_app_t, "op_buf_to_app_t",
+ "Average buffer persist to log append time (write data persist/replicate + wait for append time)");
plb.add_time_avg(
- l_librbd_rwl_log_op_buf_to_bufc_t, "op_buf_to_bufc_t",
- "Average buffer persist time (write data persist/replicate time)");
+ l_librbd_rwl_log_op_buf_to_bufc_t, "op_buf_to_bufc_t",
+ "Average buffer persist time (write data persist/replicate time)");
plb.add_u64_counter_histogram(
l_librbd_rwl_log_op_buf_to_bufc_t_hist, "op_buf_to_bufc_t_bytes_histogram",
op_hist_x_axis_config, op_hist_y_axis_config,
"Histogram of write buffer persist time (nanoseconds) vs. bytes written");
plb.add_time_avg(
- l_librbd_rwl_log_op_app_to_cmp_t, "op_app_to_cmp_t",
- "Average log append to persist complete time (log entry append/replicate + wait for complete time)");
+ l_librbd_rwl_log_op_app_to_cmp_t, "op_app_to_cmp_t",
+ "Average log append to persist complete time (log entry append/replicate + wait for complete time)");
plb.add_time_avg(
- l_librbd_rwl_log_op_app_to_appc_t, "op_app_to_appc_t",
- "Average log append to persist complete time (log entry append/replicate time)");
+ l_librbd_rwl_log_op_app_to_appc_t, "op_app_to_appc_t",
+ "Average log append to persist complete time (log entry append/replicate time)");
plb.add_u64_counter_histogram(
l_librbd_rwl_log_op_app_to_appc_t_hist, "op_app_to_appc_t_bytes_histogram",
op_hist_x_axis_config, op_hist_y_axis_config,
<< " first_valid=" << m_first_valid_entry
<< ", first_free=" << m_first_free_entry
<< ", flushed_sync_gen=" << m_flushed_sync_gen
- << ", current_sync_gen=" << m_current_sync_gen << dendl;
+ << ", m_current_sync_gen=" << m_current_sync_gen << dendl;
if (m_first_free_entry == m_first_valid_entry) {
    ldout(cct, 1) << "write log is empty" << dendl;
m_cache_state->empty = true;
// TODO: Will init sync point, this will be covered in later PR.
// init_flush_new_sync_point(later);
++m_current_sync_gen;
- auto new_sync_point = std::make_shared<SyncPointT>(*this, m_current_sync_gen);
- m_current_sync_point = new_sync_point;
+ m_current_sync_point = std::make_shared<SyncPoint>(m_current_sync_gen,
+ this->m_image_ctx.cct);
m_initialized = true;
// Start the thread
m_thread_pool.start();
m_periodic_stats_enabled = m_cache_state->log_periodic_stats;
- /* Do these after we drop m_lock */
+ /* Do these after we drop lock */
later.add(new LambdaContext([this](int r) {
if (m_periodic_stats_enabled) {
/* Log stats for the first time */
template <typename I>
void ReplicatedWriteLog<I>::shut_down(Context *on_finish) {
- // TODO: This is cover in later PR.
+  // Here we only close the pmem pool and remove the pool file.
+ // TODO: We'll continue to update this part in later PRs.
+ if (m_log_pool) {
+ ldout(m_image_ctx.cct, 6) << "closing pmem pool" << dendl;
+ pmemobj_close(m_log_pool);
+ }
+ if (m_log_is_poolset) {
+ ldout(m_image_ctx.cct, 5) << "Not removing poolset " << m_log_pool_name << dendl;
+ } else {
+ ldout(m_image_ctx.cct, 5) << "Removing empty pool file: "
+ << m_log_pool_name << dendl;
+ if (remove(m_log_pool_name.c_str()) != 0) {
+ lderr(m_image_ctx.cct) << "failed to remove empty pool \""
+ << m_log_pool_name << "\": "
+ << pmemobj_errormsg() << dendl;
+ }
+ }
on_finish->complete(0);
}
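/* A minimal standalone sketch of the libpmemobj pool lifecycle that
 * shut_down() tears down above. The path and layout name are hypothetical;
 * error handling is reduced to the essentials. */
#include <libpmemobj.h>
#include <cstdio>

static int pool_lifecycle_sketch() {
  const char *path = "/tmp/rwl_pool_sketch";  // hypothetical pool file
  PMEMobjpool *pool = pmemobj_create(path, "rwl_sketch",
                                     PMEMOBJ_MIN_POOL, 0666);
  if (pool == nullptr) {
    std::fprintf(stderr, "pmemobj_create: %s\n", pmemobj_errormsg());
    return -1;
  }
  /* ... reserve write buffers and append log entries here ... */
  pmemobj_close(pool);  // flush and unmap the pool
  std::remove(path);    // only valid for a plain pool file, not a poolset
  return 0;
}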
int fadvise_flags,
Context *on_finish) {
CephContext *cct = m_image_ctx.cct;
- if (RWL_VERBOSE_LOGGING) {
- ldout(cct, 20) << "aio_write" << dendl;
- }
+
+ ldout(cct, 20) << "aio_write" << dendl;
+
utime_t now = ceph_clock_now();
m_perfcounter->inc(l_librbd_rwl_wr_req, 1);
ceph_assert(m_initialized);
- {
- std::shared_lock image_locker(m_image_ctx.image_lock);
- if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) {
- on_finish->complete(-EROFS);
- return;
- }
- }
-
- if (ExtentsSummary<Extents>(image_extents).total_bytes == 0) {
- on_finish->complete(0);
- return;
- }
auto *write_req =
- new C_WriteRequestT(*this, now, std::move(image_extents), std::move(bl), fadvise_flags, on_finish);
+ new C_WriteRequestT(*this, now, std::move(image_extents), std::move(bl), fadvise_flags,
+ m_lock, m_perfcounter, on_finish);
m_perfcounter->inc(l_librbd_rwl_wr_bytes, write_req->image_extents_summary.total_bytes);
/* The lambda below will be called when the block guard for all
alloc_and_dispatch_io_req(write_req);
});
- detain_guarded_request(GuardedRequest(write_req->image_extents_summary.block_extent(),
- guarded_ctx));
+ detain_guarded_request(write_req, guarded_ctx);
}
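/* How the write path above hands work to the block guard, reduced to a
 * standalone sketch with stand-in types (CellToken and GuardSketch are
 * hypothetical, not the real BlockGuard API). A request either gets its
 * cell immediately or waits until overlapping requests drain. */
#include <functional>
#include <queue>
#include <utility>

struct CellToken {};  // stand-in for BlockGuardCell

class GuardSketch {
  std::queue<std::function<void(CellToken*)>> m_waiting;
  CellToken m_cell;
public:
  void detain(bool overlaps_inflight,
              std::function<void(CellToken*)> on_acquired) {
    if (overlaps_inflight) {
      m_waiting.push(std::move(on_acquired));  // runs later, from release()
    } else {
      on_acquired(&m_cell);  // immediate, like the cell != nullptr path above
    }
  }
  void release() {
    if (!m_waiting.empty()) {
      auto next = std::move(m_waiting.front());
      m_waiting.pop();
      next(&m_cell);
    }
  }
};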
template <typename I>
void ReplicatedWriteLog<I>::aio_discard(uint64_t offset, uint64_t length,
- uint32_t discard_granularity_bytes, Context *on_finish) {
+ uint32_t discard_granularity_bytes,
+ Context *on_finish) {
}
template <typename I>
void ReplicatedWriteLog<I>::invalidate(Context *on_finish) {
}
+template <typename I>
+CephContext *ReplicatedWriteLog<I>::get_context() {
+ return m_image_ctx.cct;
+}
+
template <typename I>
BlockGuardCell* ReplicatedWriteLog<I>::detain_guarded_request_helper(GuardedRequest &req)
{
BlockGuardCell *cell;
ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock));
- if (RWL_VERBOSE_LOGGING) {
- ldout(cct, 20) << dendl;
- }
+ ldout(cct, 20) << dendl;
int r = m_write_log_guard.detain(req.block_extent, &req, &cell);
  ceph_assert(r >= 0);
if (r > 0) {
- if (RWL_VERBOSE_LOGGING) {
- ldout(cct, 20) << "detaining guarded request due to in-flight requests: "
- << "req=" << req << dendl;
- }
+ ldout(cct, 20) << "detaining guarded request due to in-flight requests: "
+ << "req=" << req << dendl;
return nullptr;
}
- if (RWL_VERBOSE_LOGGING) {
- ldout(cct, 20) << "in-flight request cell: " << cell << dendl;
- }
+ ldout(cct, 20) << "in-flight request cell: " << cell << dendl;
return cell;
}
template <typename I>
-BlockGuardCell* ReplicatedWriteLog<I>::detain_guarded_request_barrier_helper(GuardedRequest &req)
+BlockGuardCell* ReplicatedWriteLog<I>::detain_guarded_request_barrier_helper(
+ GuardedRequest &req)
{
BlockGuardCell *cell = nullptr;
ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock));
- if (RWL_VERBOSE_LOGGING) {
- ldout(m_image_ctx.cct, 20) << dendl;
- }
+ ldout(m_image_ctx.cct, 20) << dendl;
if (m_barrier_in_progress) {
- req.guard_ctx->m_state.queued = true;
+ req.guard_ctx->state.queued = true;
m_awaiting_barrier.push_back(req);
} else {
- bool barrier = req.guard_ctx->m_state.barrier;
+ bool barrier = req.guard_ctx->state.barrier;
if (barrier) {
m_barrier_in_progress = true;
- req.guard_ctx->m_state.current_barrier = true;
+ req.guard_ctx->state.current_barrier = true;
}
cell = detain_guarded_request_helper(req);
if (barrier) {
}
template <typename I>
-void ReplicatedWriteLog<I>::detain_guarded_request(GuardedRequest &&req)
+void ReplicatedWriteLog<I>::detain_guarded_request(
+ C_BlockIORequestT *request, GuardedRequestFunctionContext *guarded_ctx)
{
+  // TODO: add is_barrier for flush request in later PRs
+ auto req = GuardedRequest(request->image_extents_summary.block_extent(), guarded_ctx);
BlockGuardCell *cell = nullptr;
- if (RWL_VERBOSE_LOGGING) {
- ldout(m_image_ctx.cct, 20) << dendl;
- }
+ ldout(m_image_ctx.cct, 20) << dendl;
{
std::lock_guard locker(m_blockguard_lock);
cell = detain_guarded_request_barrier_helper(req);
}
if (cell) {
- req.guard_ctx->m_cell = cell;
+ req.guard_ctx->cell = cell;
req.guard_ctx->complete(0);
}
}
{
CephContext *cct = m_image_ctx.cct;
WriteLogGuard::BlockOperations block_reqs;
- if (RWL_VERBOSE_LOGGING) {
- ldout(cct, 20) << "released_cell=" << released_cell << dendl;
- }
+ ldout(cct, 20) << "released_cell=" << released_cell << dendl;
{
std::lock_guard locker(m_blockguard_lock);
m_write_log_guard.release(released_cell, &block_reqs);
for (auto &req : block_reqs) {
- req.guard_ctx->m_state.detained = true;
+ req.guard_ctx->state.detained = true;
BlockGuardCell *detained_cell = detain_guarded_request_helper(req);
if (detained_cell) {
- if (req.guard_ctx->m_state.current_barrier) {
+ if (req.guard_ctx->state.current_barrier) {
/* The current barrier is acquiring the block guard, so now we know its cell */
m_barrier_cell = detained_cell;
/* detained_cell could be == released_cell here */
- if (RWL_VERBOSE_LOGGING) {
- ldout(cct, 20) << "current barrier cell=" << detained_cell << " req=" << req << dendl;
- }
+ ldout(cct, 20) << "current barrier cell=" << detained_cell << " req=" << req << dendl;
}
- req.guard_ctx->m_cell = detained_cell;
+ req.guard_ctx->cell = detained_cell;
m_work_queue.queue(req.guard_ctx);
}
}
if (m_barrier_in_progress && (released_cell == m_barrier_cell)) {
- if (RWL_VERBOSE_LOGGING) {
- ldout(cct, 20) << "current barrier released cell=" << released_cell << dendl;
- }
+ ldout(cct, 20) << "current barrier released cell=" << released_cell << dendl;
/* The released cell is the current barrier request */
m_barrier_in_progress = false;
m_barrier_cell = nullptr;
/* Move waiting requests into the blockguard. Stop if there's another barrier */
while (!m_barrier_in_progress && !m_awaiting_barrier.empty()) {
auto &req = m_awaiting_barrier.front();
- if (RWL_VERBOSE_LOGGING) {
- ldout(cct, 20) << "submitting queued request to blockguard: " << req << dendl;
- }
+ ldout(cct, 20) << "submitting queued request to blockguard: " << req << dendl;
BlockGuardCell *detained_cell = detain_guarded_request_barrier_helper(req);
if (detained_cell) {
- req.guard_ctx->m_cell = detained_cell;
+ req.guard_ctx->cell = detained_cell;
m_work_queue.queue(req.guard_ctx);
}
m_awaiting_barrier.pop_front();
}
}
- if (RWL_VERBOSE_LOGGING) {
- ldout(cct, 20) << "exit" << dendl;
- }
+ ldout(cct, 20) << "exit" << dendl;
}
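/* The barrier drain above in isolation: a sketch with hypothetical types
 * showing how queued requests are resubmitted until the next barrier is
 * encountered, which stops the drain again. */
#include <deque>

struct ReqSketch { bool barrier = false; };

static void drain_until_barrier(std::deque<ReqSketch> &awaiting,
                                bool &barrier_in_progress) {
  while (!barrier_in_progress && !awaiting.empty()) {
    ReqSketch req = awaiting.front();
    awaiting.pop_front();
    if (req.barrier) {
      barrier_in_progress = true;  // later requests keep waiting on this one
    }
    /* ... detain req in the block guard and queue its guard context ... */
  }
}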
/*
template <typename I>
void ReplicatedWriteLog<I>::append_scheduled_ops(void)
{
- GenericLogOperationsT ops;
+ GenericLogOperations ops;
int append_result = 0;
bool ops_remain = false;
bool appending = false; /* true if we set m_appending */
- if (RWL_VERBOSE_LOGGING) {
- ldout(m_image_ctx.cct, 20) << dendl;
- }
+ ldout(m_image_ctx.cct, 20) << dendl;
do {
ops.clear();
std::lock_guard locker(m_lock);
if (!appending && m_appending) {
/* Another thread is appending */
- if (RWL_VERBOSE_LOGGING) {
- ldout(m_image_ctx.cct, 15) << "Another thread is appending" << dendl;
- }
+ ldout(m_image_ctx.cct, 15) << "Another thread is appending" << dendl;
return;
}
if (m_ops_to_append.size()) {
m_appending = true;
auto last_in_batch = m_ops_to_append.begin();
unsigned int ops_to_append = m_ops_to_append.size();
- if (ops_to_append > ops_appended_together) {
- ops_to_append = ops_appended_together;
+ if (ops_to_append > OPS_APPENDED_TOGETHER) {
+ ops_to_append = OPS_APPENDED_TOGETHER;
}
std::advance(last_in_batch, ops_to_append);
ops.splice(ops.end(), m_ops_to_append, m_ops_to_append.begin(), last_in_batch);
ops_remain = true; /* Always check again before leaving */
- if (RWL_VERBOSE_LOGGING) {
- ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", " << m_ops_to_append.size() << " remain" << dendl;
- }
+ ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", "
+ << m_ops_to_append.size() << " remain" << dendl;
} else {
ops_remain = false;
if (appending) {
* all prior log entries are persisted everywhere.
*/
template <typename I>
-void ReplicatedWriteLog<I>::schedule_append(GenericLogOperationsT &ops)
+void ReplicatedWriteLog<I>::schedule_append(GenericLogOperations &ops)
{
bool need_finisher;
- GenericLogOperationsVectorT appending;
+ GenericLogOperationsVector appending;
std::copy(std::begin(ops), std::end(ops), std::back_inserter(appending));
{
}
template <typename I>
-void ReplicatedWriteLog<I>::schedule_append(GenericLogOperationsVectorT &ops)
+void ReplicatedWriteLog<I>::schedule_append(GenericLogOperationsVector &ops)
{
- GenericLogOperationsT to_append(ops.begin(), ops.end());
+ GenericLogOperations to_append(ops.begin(), ops.end());
schedule_append(to_append);
}
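/* The O(1) batching idiom the appender uses: splice at most max_batch
 * elements off the shared list without copying or reallocating nodes.
 * Standalone sketch; int stands in for the log operation pointer type. */
#include <algorithm>
#include <cstddef>
#include <iterator>
#include <list>

static std::list<int> take_batch(std::list<int> &pending,
                                 std::size_t max_batch) {
  std::size_t n = std::min(pending.size(), max_batch);
  auto last_in_batch = pending.begin();
  std::advance(last_in_batch, n);
  std::list<int> batch;
  batch.splice(batch.end(), pending, pending.begin(), last_in_batch);
  return batch;  // whatever did not fit stays queued in 'pending'
}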
template <typename I>
void ReplicatedWriteLog<I>::flush_then_append_scheduled_ops(void)
{
- GenericLogOperationsT ops;
+ GenericLogOperations ops;
bool ops_remain = false;
- if (RWL_VERBOSE_LOGGING) {
- ldout(m_image_ctx.cct, 20) << dendl;
- }
+ ldout(m_image_ctx.cct, 20) << dendl;
do {
{
ops.clear();
if (ops_to_flush > ops_flushed_together) {
ops_to_flush = ops_flushed_together;
}
- if (RWL_VERBOSE_LOGGING) {
- ldout(m_image_ctx.cct, 20) << "should flush " << ops_to_flush << dendl;
- }
+ ldout(m_image_ctx.cct, 20) << "should flush " << ops_to_flush << dendl;
std::advance(last_in_batch, ops_to_flush);
ops.splice(ops.end(), m_ops_to_flush, m_ops_to_flush.begin(), last_in_batch);
ops_remain = !m_ops_to_flush.empty();
- if (RWL_VERBOSE_LOGGING) {
- ldout(m_image_ctx.cct, 20) << "flushing " << ops.size() << ", " << m_ops_to_flush.size() << " remain" << dendl;
- }
+ ldout(m_image_ctx.cct, 20) << "flushing " << ops.size() << ", "
+ << m_ops_to_flush.size() << " remain" << dendl;
} else {
ops_remain = false;
}
* then get their log entries appended.
*/
template <typename I>
-void ReplicatedWriteLog<I>::schedule_flush_and_append(GenericLogOperationsVectorT &ops)
+void ReplicatedWriteLog<I>::schedule_flush_and_append(GenericLogOperationsVector &ops)
{
- GenericLogOperationsT to_flush(ops.begin(), ops.end());
+ GenericLogOperations to_flush(ops.begin(), ops.end());
bool need_finisher;
- if (RWL_VERBOSE_LOGGING) {
- ldout(m_image_ctx.cct, 20) << dendl;
- }
+ ldout(m_image_ctx.cct, 20) << dendl;
{
std::lock_guard locker(m_lock);
void ReplicatedWriteLog<I>::flush_pmem_buffer(V& ops)
{
for (auto &operation : ops) {
- if (operation->is_write() || operation->is_writesame()) {
- operation->buf_persist_time = ceph_clock_now();
- auto write_entry = operation->get_write_log_entry();
-
- pmemobj_flush(m_log_pool, write_entry->pmem_buffer, write_entry->write_bytes());
- }
+ operation->flush_pmem_buf_to_cache(m_log_pool);
}
/* Drain once for all */
utime_t now = ceph_clock_now();
for (auto &operation : ops) {
- if (operation->is_write() || operation->is_writesame()) {
+ if (operation->reserved_allocated()) {
operation->buf_persist_comp_time = now;
} else {
- if (RWL_VERBOSE_LOGGING) {
- ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl;
- }
+ ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl;
}
}
}
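/* The flush-then-drain idiom used above, reduced to the bare libpmemobj
 * calls: one non-blocking flush per buffer, then a single drain for the
 * whole batch. Buffer addresses and lengths are placeholders. */
#include <libpmemobj.h>
#include <cstddef>

static void flush_buffers_sketch(PMEMobjpool *pool, void *const bufs[],
                                 const size_t lens[], size_t n) {
  for (size_t i = 0; i < n; ++i) {
    pmemobj_flush(pool, bufs[i], lens[i]);  // starts the flush, does not wait
  }
  pmemobj_drain(pool);  // one barrier covers every flush issued above
}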
* Allocate the (already reserved) write log entries for a set of operations.
*
* Locking:
- * Acquires m_lock
+ * Acquires lock
*/
template <typename I>
-void ReplicatedWriteLog<I>::alloc_op_log_entries(GenericLogOperationsT &ops)
+void ReplicatedWriteLog<I>::alloc_op_log_entries(GenericLogOperations &ops)
{
TOID(struct WriteLogPoolRoot) pool_root;
pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
log_entry->pmem_entry = &pmem_log_entries[entry_index];
log_entry->ram_entry.entry_valid = 1;
m_log_entries.push_back(log_entry);
- if (RWL_VERBOSE_LOGGING) {
- ldout(m_image_ctx.cct, 20) << "operation=[" << *operation << "]" << dendl;
- }
+ ldout(m_image_ctx.cct, 20) << "operation=[" << *operation << "]" << dendl;
}
}
* be contiguous in persistent memory.
*/
template <typename I>
-void ReplicatedWriteLog<I>::flush_op_log_entries(GenericLogOperationsVectorT &ops)
+void ReplicatedWriteLog<I>::flush_op_log_entries(GenericLogOperationsVector &ops)
{
if (ops.empty()) {
return;
ceph_assert(ops.front()->get_log_entry()->pmem_entry < ops.back()->get_log_entry()->pmem_entry);
}
- if (RWL_VERBOSE_LOGGING) {
- ldout(m_image_ctx.cct, 20) << "entry count=" << ops.size() << " "
- << "start address=" << ops.front()->get_log_entry()->pmem_entry << " "
- << "bytes=" << ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry))
- << dendl;
- }
+ ldout(m_image_ctx.cct, 20) << "entry count=" << ops.size() << " "
+ << "start address="
+ << ops.front()->get_log_entry()->pmem_entry << " "
+ << "bytes="
+ << ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry))
+ << dendl;
pmemobj_flush(m_log_pool,
ops.front()->get_log_entry()->pmem_entry,
ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry)));
* of these must already have been persisted to its reserved area.
*/
template <typename I>
-int ReplicatedWriteLog<I>::append_op_log_entries(GenericLogOperationsT &ops)
+int ReplicatedWriteLog<I>::append_op_log_entries(GenericLogOperations &ops)
{
CephContext *cct = m_image_ctx.cct;
- GenericLogOperationsVectorT entries_to_flush;
+ GenericLogOperationsVector entries_to_flush;
TOID(struct WriteLogPoolRoot) pool_root;
pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
int ret = 0;
if (ops.empty()) {
return 0;
}
- entries_to_flush.reserve(ops_appended_together);
+ entries_to_flush.reserve(OPS_APPENDED_TOGETHER);
/* Write log entries to ring and persist */
utime_t now = ceph_clock_now();
* tail of the ring */
if (entries_to_flush.back()->get_log_entry()->log_entry_index >
operation->get_log_entry()->log_entry_index) {
- if (RWL_VERBOSE_LOGGING) {
- ldout(m_image_ctx.cct, 20) << "entries to flush wrap around the end of the ring at "
- << "operation=[" << *operation << "]" << dendl;
- }
+ ldout(m_image_ctx.cct, 20) << "entries to flush wrap around the end of the ring at "
+ << "operation=[" << *operation << "]" << dendl;
flush_op_log_entries(entries_to_flush);
entries_to_flush.clear();
now = ceph_clock_now();
}
}
- if (RWL_VERBOSE_LOGGING) {
- ldout(m_image_ctx.cct, 20) << "Copying entry for operation at index="
- << operation->get_log_entry()->log_entry_index << " "
- << "from " << &operation->get_log_entry()->ram_entry << " "
- << "to " << operation->get_log_entry()->pmem_entry << " "
- << "operation=[" << *operation << "]" << dendl;
- }
- if (RWL_VERBOSE_LOGGING) {
- ldout(m_image_ctx.cct, 05) << "APPENDING: index="
- << operation->get_log_entry()->log_entry_index << " "
- << "operation=[" << *operation << "]" << dendl;
- }
+ ldout(m_image_ctx.cct, 20) << "Copying entry for operation at index="
+ << operation->get_log_entry()->log_entry_index << " "
+ << "from " << &operation->get_log_entry()->ram_entry << " "
+ << "to " << operation->get_log_entry()->pmem_entry << " "
+ << "operation=[" << *operation << "]" << dendl;
+  ldout(m_image_ctx.cct, 5) << "APPENDING: index="
+ << operation->get_log_entry()->log_entry_index << " "
+ << "operation=[" << *operation << "]" << dendl;
operation->log_append_time = now;
*operation->get_log_entry()->pmem_entry = operation->get_log_entry()->ram_entry;
- if (RWL_VERBOSE_LOGGING) {
- ldout(m_image_ctx.cct, 20) << "APPENDING: index="
- << operation->get_log_entry()->log_entry_index << " "
- << "pmem_entry=[" << *operation->get_log_entry()->pmem_entry << "]" << dendl;
- }
+ ldout(m_image_ctx.cct, 20) << "APPENDING: index="
+ << operation->get_log_entry()->log_entry_index << " "
+ << "pmem_entry=[" << *operation->get_log_entry()->pmem_entry
+ << "]" << dendl;
entries_to_flush.push_back(operation);
}
flush_op_log_entries(entries_to_flush);
TX_BEGIN(m_log_pool) {
D_RW(pool_root)->first_free_entry = m_first_free_entry;
for (auto &operation : ops) {
- if (operation->is_write() || operation->is_writesame()) {
- auto write_op = (std::shared_ptr<WriteLogOperationT>&) operation;
+ if (operation->reserved_allocated()) {
+ auto write_op = (std::shared_ptr<WriteLogOperation>&) operation;
pmemobj_tx_publish(&write_op->buffer_alloc->buffer_alloc_action, 1);
} else {
- if (RWL_VERBOSE_LOGGING) {
- ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl;
- }
+ ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl;
}
}
} TX_ONCOMMIT {
} TX_ONABORT {
- lderr(cct) << "failed to commit " << ops.size() << " log entries (" << m_log_pool_name << ")" << dendl;
+ lderr(cct) << "failed to commit " << ops.size()
+ << " log entries (" << m_log_pool_name << ")" << dendl;
ceph_assert(false);
ret = -EIO;
} TX_FINALLY {
utime_t tx_end = ceph_clock_now();
m_perfcounter->tinc(l_librbd_rwl_append_tx_t, tx_end - tx_start);
- m_perfcounter->hinc(l_librbd_rwl_append_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(), ops.size());
+ m_perfcounter->hinc(
+ l_librbd_rwl_append_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(), ops.size());
for (auto &operation : ops) {
operation->log_append_comp_time = tx_end;
}
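/* The transaction shape used for the append, as a minimal libpmemobj
 * sketch: snapshot and update the root inside TX_BEGIN, publish the
 * reserved buffer actions, and treat an abort as fatal, as the code above
 * does. The root layout here is hypothetical. */
#include <libpmemobj.h>
#include <cassert>
#include <cstdint>

struct root_sketch {
  uint64_t first_free_entry;
};

static void commit_append_sketch(PMEMobjpool *pool, struct pobj_action *acts,
                                 size_t nacts, uint64_t new_first_free) {
  TX_BEGIN(pool) {
    PMEMoid root = pmemobj_root(pool, sizeof(struct root_sketch));
    pmemobj_tx_add_range(root, 0, sizeof(struct root_sketch));  // undo log
    static_cast<struct root_sketch *>(pmemobj_direct(root))->first_free_entry =
        new_first_free;
    pmemobj_tx_publish(acts, nacts);  // reserved buffers become permanent
  } TX_ONABORT {
    assert(false);  // a failed append commit is treated as unrecoverable
  } TX_END
}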
* Complete a set of write ops with the result of append_op_entries.
*/
template <typename I>
-void ReplicatedWriteLog<I>::complete_op_log_entries(GenericLogOperationsT &&ops, const int result)
+void ReplicatedWriteLog<I>::complete_op_log_entries(GenericLogOperations &&ops,
+ const int result)
{
GenericLogEntries dirty_entries;
int published_reserves = 0;
- if (RWL_VERBOSE_LOGGING) {
- ldout(m_image_ctx.cct, 20) << __func__ << ": completing" << dendl;
- }
+ ldout(m_image_ctx.cct, 20) << __func__ << ": completing" << dendl;
for (auto &op : ops) {
utime_t now = ceph_clock_now();
auto log_entry = op->get_log_entry();
log_entry->completed = true;
- if (op->is_writing_op()) {
- op->get_gen_write_op()->sync_point->log_entry->writes_completed++;
+ if (op->reserved_allocated()) {
+ op->mark_log_entry_completed();
dirty_entries.push_back(log_entry);
- }
- if (op->is_write() || op->is_writesame()) {
published_reserves++;
}
- if (op->is_discard()) {
- if (RWL_VERBOSE_LOGGING) {
- ldout(m_image_ctx.cct, 20) << __func__ << ": completing discard" << dendl;
- }
- }
op->complete(result);
- if (op->is_write()) {
- m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_buf_t, op->buf_persist_time - op->dispatch_time);
- }
- m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_app_t, op->log_append_time - op->dispatch_time);
+ m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_app_t,
+ op->log_append_time - op->dispatch_time);
m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_cmp_t, now - op->dispatch_time);
- m_perfcounter->hinc(l_librbd_rwl_log_op_dis_to_cmp_t_hist, utime_t(now - op->dispatch_time).to_nsec(),
+ m_perfcounter->hinc(l_librbd_rwl_log_op_dis_to_cmp_t_hist,
+ utime_t(now - op->dispatch_time).to_nsec(),
log_entry->ram_entry.write_bytes);
- if (op->is_write()) {
- utime_t buf_lat = op->buf_persist_comp_time - op->buf_persist_time;
- m_perfcounter->tinc(l_librbd_rwl_log_op_buf_to_bufc_t, buf_lat);
- m_perfcounter->hinc(l_librbd_rwl_log_op_buf_to_bufc_t_hist, buf_lat.to_nsec(),
- log_entry->ram_entry.write_bytes);
- m_perfcounter->tinc(l_librbd_rwl_log_op_buf_to_app_t, op->log_append_time - op->buf_persist_time);
- }
utime_t app_lat = op->log_append_comp_time - op->log_append_time;
m_perfcounter->tinc(l_librbd_rwl_log_op_app_to_appc_t, app_lat);
m_perfcounter->hinc(l_librbd_rwl_log_op_app_to_appc_t_hist, app_lat.to_nsec(),
- log_entry->ram_entry.write_bytes);
+ log_entry->ram_entry.write_bytes);
m_perfcounter->tinc(l_librbd_rwl_log_op_app_to_cmp_t, now - op->log_append_time);
}
std::lock_guard locker(m_lock);
ceph_assert(m_dispatching_deferred_ops);
if (allocated) {
- /* On the 2..n-1 th time we get m_lock, front_req->alloc_resources() will
+ /* On the 2..n-1 th time we get lock, front_req->alloc_resources() will
* have succeeded, and we'll need to pop it off the deferred ops list
* here. */
ceph_assert(front_req);
* deferred write
*/
template <typename I>
-void ReplicatedWriteLog<I>::release_write_lanes(C_WriteRequestT *write_req)
+void ReplicatedWriteLog<I>::release_write_lanes(C_BlockIORequestT *req)
{
{
std::lock_guard locker(m_lock);
- ceph_assert(write_req->resources.allocated);
- m_free_lanes += write_req->image_extents.size();
- write_req->resources.allocated = false;
+ m_free_lanes += req->image_extents.size();
}
dispatch_deferred_writes();
}
dispatch_here = req->alloc_resources();
}
if (dispatch_here) {
- if (RWL_VERBOSE_LOGGING) {
- ldout(m_image_ctx.cct, 20) << "dispatching" << dendl;
- }
+ ldout(m_image_ctx.cct, 20) << "dispatching" << dendl;
req->dispatch();
} else {
req->deferred();
std::lock_guard locker(m_lock);
m_deferred_ios.push_back(req);
}
- if (RWL_VERBOSE_LOGGING) {
- ldout(m_image_ctx.cct, 20) << "deferred IOs: " << m_deferred_ios.size() << dendl;
- }
+ ldout(m_image_ctx.cct, 20) << "deferred IOs: " << m_deferred_ios.size() << dendl;
dispatch_deferred_writes();
}
}
}
+
+template <typename I>
+bool ReplicatedWriteLog<I>::alloc_resources(C_BlockIORequestT *req) {
+ bool alloc_succeeds = true;
+ bool no_space = false;
+ uint64_t bytes_allocated = 0;
+ uint64_t bytes_cached = 0;
+ uint64_t bytes_dirtied = 0;
+ uint64_t num_lanes = 0;
+ uint64_t num_unpublished_reserves = 0;
+ uint64_t num_log_entries = 0;
+
+ // Setup buffer, and get all the number of required resources
+ req->setup_buffer_resources(bytes_cached, bytes_dirtied, bytes_allocated,
+ num_lanes, num_log_entries, num_unpublished_reserves);
+
+ {
+ std::lock_guard locker(m_lock);
+ if (m_free_lanes < num_lanes) {
+ req->set_io_waited_for_lanes(true);
+ ldout(m_image_ctx.cct, 20) << "not enough free lanes (need "
+ << num_lanes
+ << ", have " << m_free_lanes << ") "
+ << *req << dendl;
+ alloc_succeeds = false;
+ /* This isn't considered a "no space" alloc fail. Lanes are a throttling mechanism. */
+ }
+ if (m_free_log_entries < num_log_entries) {
+ req->set_io_waited_for_entries(true);
+ ldout(m_image_ctx.cct, 20) << "not enough free entries (need "
+ << num_log_entries
+ << ", have " << m_free_log_entries << ") "
+ << *req << dendl;
+ alloc_succeeds = false;
+ no_space = true; /* Entries must be retired */
+ }
+ /* Don't attempt buffer allocate if we've exceeded the "full" threshold */
+ if (m_bytes_allocated + bytes_allocated > m_bytes_allocated_cap) {
+ if (!req->has_io_waited_for_buffers()) {
+        req->set_io_waited_for_buffers(true);
+ ldout(m_image_ctx.cct, 1) << "Waiting for allocation cap (cap="
+ << m_bytes_allocated_cap
+ << ", allocated=" << m_bytes_allocated
+ << ") in write [" << *req << "]" << dendl;
+ }
+ alloc_succeeds = false;
+ no_space = true; /* Entries must be retired */
+ }
+ }
+
+ std::vector<WriteBufferAllocation>& buffers = req->get_resources_buffers();
+ if (alloc_succeeds) {
+ for (auto &buffer : buffers) {
+ utime_t before_reserve = ceph_clock_now();
+ buffer.buffer_oid = pmemobj_reserve(m_log_pool,
+ &buffer.buffer_alloc_action,
+ buffer.allocation_size,
+ 0 /* Object type */);
+ buffer.allocation_lat = ceph_clock_now() - before_reserve;
+ if (TOID_IS_NULL(buffer.buffer_oid)) {
+ if (!req->has_io_waited_for_buffers()) {
+          req->set_io_waited_for_buffers(true);
+ }
+ ldout(m_image_ctx.cct, 5) << "can't allocate all data buffers: "
+ << pmemobj_errormsg() << ". "
+ << *req << dendl;
+ alloc_succeeds = false;
+ no_space = true; /* Entries need to be retired */
+ break;
+ } else {
+ buffer.allocated = true;
+ }
+ ldout(m_image_ctx.cct, 20) << "Allocated " << buffer.buffer_oid.oid.pool_uuid_lo
+ << "." << buffer.buffer_oid.oid.off
+ << ", size=" << buffer.allocation_size << dendl;
+ }
+ }
+
+ if (alloc_succeeds) {
+ std::lock_guard locker(m_lock);
+ /* We need one free log entry per extent (each is a separate entry), and
+ * one free "lane" for remote replication. */
+ if ((m_free_lanes >= num_lanes) &&
+ (m_free_log_entries >= num_log_entries)) {
+ m_free_lanes -= num_lanes;
+ m_free_log_entries -= num_log_entries;
+ m_unpublished_reserves += num_unpublished_reserves;
+ m_bytes_allocated += bytes_allocated;
+ m_bytes_cached += bytes_cached;
+ m_bytes_dirty += bytes_dirtied;
+ } else {
+ alloc_succeeds = false;
+ }
+ }
+
+ if (!alloc_succeeds) {
+ /* On alloc failure, free any buffers we did allocate */
+ for (auto &buffer : buffers) {
+ if (buffer.allocated) {
+ pmemobj_cancel(m_log_pool, &buffer.buffer_alloc_action, 1);
+ }
+ }
+ if (no_space) {
+ /* Expedite flushing and/or retiring */
+ std::lock_guard locker(m_lock);
+ m_alloc_failed_since_retire = true;
+ m_last_alloc_fail = ceph_clock_now();
+ }
+ }
+
+ req->set_allocated(alloc_succeeds);
+
+ return alloc_succeeds;
+}
+
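/* The defer/retry contract around alloc_resources(), as a standalone
 * sketch with simplified types: a request either dispatches immediately or
 * parks on a deferred queue that is re-polled whenever lanes, entries, or
 * buffer space are released. */
#include <deque>
#include <functional>

struct IoSketch {
  std::function<bool()> alloc_resources;  // true if all resources acquired
  std::function<void()> dispatch;
};

static void alloc_and_dispatch_sketch(IoSketch &io,
                                      std::deque<IoSketch*> &deferred) {
  // Preserve order: never jump ahead of already-deferred requests.
  if (deferred.empty() && io.alloc_resources()) {
    io.dispatch();
  } else {
    deferred.push_back(&io);  // retried when resources are released
  }
}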
} // namespace cache
} // namespace librbd
-#ifndef TEST_F
template class librbd::cache::ReplicatedWriteLog<librbd::ImageCtx>;
template class librbd::cache::ImageCache<librbd::ImageCtx>;
-#endif
namespace rwl {
class SyncPointLogEntry;
-class GeneralWriteLogEntry;
+class GenericWriteLogEntry;
class WriteLogEntry;
class GenericLogEntry;
-typedef std::list<std::shared_ptr<GeneralWriteLogEntry>> GeneralWriteLogEntries;
typedef std::list<std::shared_ptr<WriteLogEntry>> WriteLogEntries;
typedef std::list<std::shared_ptr<GenericLogEntry>> GenericLogEntries;
typedef librbd::BlockGuard<GuardedRequest> WriteLogGuard;
-template <typename T>
-struct C_GuardedBlockIORequest;
-
class DeferredContexts;
template <typename> class ImageCacheState;
template <typename T>
struct C_WriteRequest;
-template <typename T>
-using GenericLogOperations = std::list<GenericLogOperationSharedPtr<T>>;
+using GenericLogOperations = std::list<GenericLogOperationSharedPtr>;
} // namespace rwl
void invalidate(Context *on_finish);
void flush(Context *on_finish) override;
-private:
using This = ReplicatedWriteLog<ImageCtxT>;
- using SyncPointT = rwl::SyncPoint<This>;
- using GenericLogOperationT = rwl::GenericLogOperation<This>;
- using GenericLogOperationSharedPtrT = rwl::GenericLogOperationSharedPtr<This>;
- using WriteLogOperationT = rwl::WriteLogOperation<This>;
- using WriteLogOperationSetT = rwl::WriteLogOperationSet<This>;
- using SyncPointLogOperationT = rwl::SyncPointLogOperation<This>;
- using GenericLogOperationsT = rwl::GenericLogOperations<This>;
- using GenericLogOperationsVectorT = rwl::GenericLogOperationsVector<This>;
- using C_BlockIORequestT = rwl::C_BlockIORequest<This>;
using C_WriteRequestT = rwl::C_WriteRequest<This>;
-
- friend class rwl::SyncPoint<This>;
- friend class rwl::GenericLogOperation<This>;
- friend class rwl::GeneralWriteLogOperation<This>;
- friend class rwl::WriteLogOperation<This>;
- friend class rwl::WriteLogOperationSet<This>;
- friend class rwl::SyncPointLogOperation<This>;
- friend struct rwl::C_GuardedBlockIORequest<This>;
- friend struct rwl::C_BlockIORequest<This>;
- friend struct rwl::C_WriteRequest<This>;
+ using C_BlockIORequestT = rwl::C_BlockIORequest<This>;
+ CephContext * get_context();
+ void release_guarded_request(BlockGuardCell *cell);
+ void release_write_lanes(C_BlockIORequestT *req);
+ bool alloc_resources(C_BlockIORequestT *req);
+ template <typename V>
+ void flush_pmem_buffer(V& ops);
+ void schedule_append(rwl::GenericLogOperationsVector &ops);
+ void schedule_flush_and_append(rwl::GenericLogOperationsVector &ops);
+ std::shared_ptr<rwl::SyncPoint> get_current_sync_point() {
+ return m_current_sync_point;
+ }
+ bool get_persist_on_flush() {
+ return m_persist_on_flush;
+ }
+ void inc_last_op_sequence_num() {
+ m_perfcounter->inc(l_librbd_rwl_log_ops, 1);
+ ++m_last_op_sequence_num;
+ }
+ uint64_t get_last_op_sequence_num() {
+ return m_last_op_sequence_num;
+ }
+ uint64_t get_current_sync_gen() {
+ return m_current_sync_gen;
+ }
+ unsigned int get_free_lanes() {
+ return m_free_lanes;
+ }
+ uint32_t get_free_log_entries() {
+ return m_free_log_entries;
+ }
+private:
typedef std::list<rwl::C_WriteRequest<This> *> C_WriteRequests;
typedef std::list<rwl::C_BlockIORequest<This> *> C_BlockIORequests;
BlockGuardCell* detain_guarded_request_helper(rwl::GuardedRequest &req);
BlockGuardCell* detain_guarded_request_barrier_helper(rwl::GuardedRequest &req);
- void detain_guarded_request(rwl::GuardedRequest &&req);
- void release_guarded_request(BlockGuardCell *cell);
+ void detain_guarded_request(C_BlockIORequestT *request, rwl::GuardedRequestFunctionContext *guarded_ctx);
librbd::cache::rwl::ImageCacheState<ImageCtxT>* m_cache_state = nullptr;
/* Starts at 0 for a new write log. Incremented on every flush. */
uint64_t m_current_sync_gen = 0;
- std::shared_ptr<SyncPointT> m_current_sync_point = nullptr;
/* Starts at 0 on each sync gen increase. Incremented before applied
to an operation */
uint64_t m_last_op_sequence_num = 0;
uint64_t m_flushed_sync_gen = 0;
bool m_persist_on_write_until_flush = true;
- /* True if it's safe to complete a user request in persist-on-flush
- * mode before the write is persisted. This is only true if there is
- * a local copy of the write data, or if local write failure always
- * causes local node failure. */
- bool m_persist_on_flush_early_user_comp = false; /* Assume local write failure does not cause node failure */
- bool m_persist_on_flush = false; /* If false, persist each write before completion */
bool m_flush_seen = false;
AsyncOpTracker m_async_op_tracker;
mutable ceph::mutex m_deferred_dispatch_lock;
/* Hold m_log_append_lock while appending or retiring log entries. */
mutable ceph::mutex m_log_append_lock;
-
/* Used for most synchronization */
mutable ceph::mutex m_lock;
+
/* Used in release/detain to make BlockGuard preserve submission order */
mutable ceph::mutex m_blockguard_lock;
- /* Used in WriteLogEntry::get_pmem_bl() to syncronize between threads making entries readable */
- mutable ceph::mutex m_entry_bl_lock;
/* Use m_blockguard_lock for the following 3 things */
rwl::WriteLogGuard::BlockOperations m_awaiting_barrier;
bool m_appending = false;
bool m_dispatching_deferred_ops = false;
- GenericLogOperationsT m_ops_to_flush; /* Write ops needing flush in local log */
- GenericLogOperationsT m_ops_to_append; /* Write ops needing event append in local log */
+ rwl::GenericLogOperations m_ops_to_flush; /* Write ops needing flush in local log */
+ rwl::GenericLogOperations m_ops_to_append; /* Write ops needing event append in local log */
/* New entries are at the back. Oldest at the front */
rwl::GenericLogEntries m_log_entries;
rwl::GenericLogEntries m_dirty_log_entries;
+ PerfCounters *m_perfcounter = nullptr;
+
+ std::shared_ptr<rwl::SyncPoint> m_current_sync_point = nullptr;
+ bool m_persist_on_flush = false; /* If false, persist each write before completion */
+
/* Writes that have left the block guard, but are waiting for resources */
C_BlockIORequests m_deferred_ios;
/* Throttle writes concurrently allocating & replicating */
- unsigned int m_free_lanes = MAX_CONCURRENT_WRITES;
+ unsigned int m_free_lanes = rwl::MAX_CONCURRENT_WRITES;
unsigned int m_unpublished_reserves = 0;
- PerfCounters *m_perfcounter = nullptr;
/* Initialized from config, then set false during shutdown */
std::atomic<bool> m_periodic_stats_enabled = {false};
void rwl_init(Context *on_finish, rwl::DeferredContexts &later);
void update_image_cache_state(Context *on_finish);
- void start_workers();
void wake_up();
void dispatch_deferred_writes(void);
- void release_write_lanes(C_WriteRequestT *write_req);
void alloc_and_dispatch_io_req(C_BlockIORequestT *write_req);
void append_scheduled_ops(void);
void enlist_op_appender();
- void schedule_append(GenericLogOperationsVectorT &ops);
- void schedule_append(GenericLogOperationsT &ops);
+ void schedule_append(rwl::GenericLogOperations &ops);
void flush_then_append_scheduled_ops(void);
void enlist_op_flusher();
- void schedule_flush_and_append(GenericLogOperationsVectorT &ops);
- template <typename V>
- void flush_pmem_buffer(V& ops);
- void alloc_op_log_entries(GenericLogOperationsT &ops);
- void flush_op_log_entries(GenericLogOperationsVectorT &ops);
- int append_op_log_entries(GenericLogOperationsT &ops);
- void complete_op_log_entries(GenericLogOperationsT &&ops, const int r);
- void schedule_complete_op_log_entries(GenericLogOperationsT &&ops, const int r);
+ void alloc_op_log_entries(rwl::GenericLogOperations &ops);
+ void flush_op_log_entries(rwl::GenericLogOperationsVector &ops);
+ int append_op_log_entries(rwl::GenericLogOperations &ops);
+ void complete_op_log_entries(rwl::GenericLogOperations &&ops, const int r);
+ void schedule_complete_op_log_entries(rwl::GenericLogOperations &&ops, const int r);
};
} // namespace cache
#include "common/config_proxy.h"
#include "common/ceph_json.h"
-#define dout_subsys ceph_subsys_rbd
+#define dout_subsys ceph_subsys_rbd_rwl
#undef dout_prefix
#define dout_prefix *_dout << "librbd::cache::rwl::ImageCacheState: " << this << " " \
<< __func__ << ": "
#include <iostream>
#include "LogEntry.h"
-#define dout_subsys ceph_subsys_rbd
+#define dout_subsys ceph_subsys_rbd_rwl
#undef dout_prefix
#define dout_prefix *_dout << "librbd::cache::rwl::LogEntry: " << this << " " \
<< __func__ << ": "
namespace rwl {
-bool GenericLogEntry::is_sync_point() {
- return ram_entry.is_sync_point();
-}
-
-bool GenericLogEntry::is_discard() {
- return ram_entry.is_discard();
-}
-
-bool GenericLogEntry::is_writesame() {
- return ram_entry.is_writesame();
-}
-
-bool GenericLogEntry::is_write() {
- return ram_entry.is_write();
-}
-
-bool GenericLogEntry::is_writer() {
- return ram_entry.is_writer();
-}
-
std::ostream& GenericLogEntry::format(std::ostream &os) const {
os << "ram_entry=[" << ram_entry << "], "
<< "pmem_entry=" << (void*)pmem_entry << ", "
return entry.format(os);
}
-std::ostream& GeneralWriteLogEntry::format(std::ostream &os) const {
+std::ostream& GenericWriteLogEntry::format(std::ostream &os) const {
GenericLogEntry::format(os);
os << ", "
<< "sync_point_entry=[";
};
std::ostream &operator<<(std::ostream &os,
- const GeneralWriteLogEntry &entry) {
+ const GenericWriteLogEntry &entry) {
return entry.format(os);
}
+void WriteLogEntry::init(bool has_data, std::vector<WriteBufferAllocation>::iterator allocation,
+ uint64_t current_sync_gen, uint64_t last_op_sequence_num, bool persist_on_flush) {
+ ram_entry.has_data = 1;
+ ram_entry.write_data = allocation->buffer_oid;
+ ceph_assert(!TOID_IS_NULL(ram_entry.write_data));
+ pmem_buffer = D_RW(ram_entry.write_data);
+ ram_entry.sync_gen_number = current_sync_gen;
+ if (persist_on_flush) {
+ /* Persist on flush. Sequence #0 is never used. */
+ ram_entry.write_sequence_number = 0;
+ } else {
+ /* Persist on write */
+ ram_entry.write_sequence_number = last_op_sequence_num;
+ ram_entry.sequenced = 1;
+ }
+ ram_entry.sync_point = 0;
+ ram_entry.discard = 0;
+}
+
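/* The sequence-number rule encoded above, as a tiny sketch: persist-on-write
 * entries take the next sequence number and are marked sequenced, while
 * persist-on-flush entries carry 0 (never a valid sequence number) until a
 * sync point orders them. */
#include <cstdint>

struct EntryNumberingSketch {
  uint64_t write_sequence_number = 0;
  bool sequenced = false;
};

static EntryNumberingSketch number_entry(bool persist_on_flush,
                                         uint64_t last_op_sequence_num) {
  EntryNumberingSketch e;
  if (!persist_on_flush) {
    e.write_sequence_number = last_op_sequence_num;  // explicit ordering
    e.sequenced = true;
  }
  return e;  // persist-on-flush: sequence stays 0 until a sync point
}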
void WriteLogEntry::init_pmem_bp() {
- assert(!pmem_bp.have_raw());
+ ceph_assert(!pmem_bp.have_raw());
pmem_bp = buffer::ptr(buffer::create_static(this->write_bytes(), (char*)pmem_buffer));
}
void WriteLogEntry::init_pmem_bl() {
pmem_bl.clear();
init_pmem_bp();
- assert(pmem_bp.have_raw());
+ ceph_assert(pmem_bp.have_raw());
int before_bl = pmem_bp.raw_nref();
this->init_bl(pmem_bp, pmem_bl);
int after_bl = pmem_bp.raw_nref();
}
/* Returns a ref to a bl containing bufferptrs to the entry pmem buffer */
-buffer::list& WriteLogEntry::get_pmem_bl(ceph::mutex &entry_bl_lock) {
+buffer::list& WriteLogEntry::get_pmem_bl() {
if (0 == bl_refs) {
- std::lock_guard locker(entry_bl_lock);
+ std::lock_guard locker(m_entry_bl_lock);
if (0 == bl_refs) {
init_pmem_bl();
}
};
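/* The double-checked initialization get_pmem_bl() relies on, in isolation:
 * an atomic ref count keeps the fast path lock-free, and the re-check under
 * the lock ensures only one thread builds the buffer view. */
#include <atomic>
#include <mutex>

class LazyViewSketch {
  std::atomic<int> m_refs{0};
  std::mutex m_lock;
  void build() {  // stand-in for init_pmem_bl()
    m_refs.store(1, std::memory_order_release);
  }
public:
  void ensure_built() {
    if (m_refs.load(std::memory_order_acquire) == 0) {  // fast path, no lock
      std::lock_guard<std::mutex> locker(m_lock);
      if (m_refs.load(std::memory_order_relaxed) == 0) {  // re-check held
        build();
      }
    }
  }
};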
/* Constructs a new bl containing copies of pmem_bp */
-void WriteLogEntry::copy_pmem_bl(ceph::mutex &entry_bl_lock, bufferlist *out_bl) {
- this->get_pmem_bl(entry_bl_lock);
+void WriteLogEntry::copy_pmem_bl(bufferlist *out_bl) {
+ this->get_pmem_bl();
/* pmem_bp is now initialized */
buffer::ptr cloned_bp(pmem_bp.clone());
out_bl->clear();
std::ostream& WriteLogEntry::format(std::ostream &os) const {
os << "(Write) ";
- GeneralWriteLogEntry::format(os);
+ GenericWriteLogEntry::format(os);
os << ", "
<< "pmem_buffer=" << (void*)pmem_buffer << ", ";
os << "pmem_bp=" << pmem_bp << ", ";
#ifndef CEPH_LIBRBD_CACHE_RWL_LOG_ENTRY_H
#define CEPH_LIBRBD_CACHE_RWL_LOG_ENTRY_H
+#include "common/ceph_mutex.h"
#include "librbd/Utils.h"
#include "librbd/cache/rwl/Types.h"
#include <atomic>
namespace rwl {
class SyncPointLogEntry;
-class GeneralWriteLogEntry;
+class GenericWriteLogEntry;
class WriteLogEntry;
class GenericLogEntry {
virtual ~GenericLogEntry() { };
GenericLogEntry(const GenericLogEntry&) = delete;
GenericLogEntry &operator=(const GenericLogEntry&) = delete;
- virtual unsigned int write_bytes() = 0;
- bool is_sync_point();
- bool is_discard();
- bool is_writesame();
- bool is_write();
- bool is_writer();
- virtual const GenericLogEntry* get_log_entry() = 0;
- virtual const SyncPointLogEntry* get_sync_point_log_entry() {
- return nullptr;
- }
- virtual const GeneralWriteLogEntry* get_gen_write_log_entry() {
- return nullptr;
- }
- virtual const WriteLogEntry* get_write_log_entry() {
- return nullptr;
- }
virtual std::ostream& format(std::ostream &os) const;
friend std::ostream &operator<<(std::ostream &os,
const GenericLogEntry &entry);
ram_entry.sync_gen_number = sync_gen_number;
ram_entry.sync_point = 1;
};
+ ~SyncPointLogEntry() override {};
SyncPointLogEntry(const SyncPointLogEntry&) = delete;
SyncPointLogEntry &operator=(const SyncPointLogEntry&) = delete;
- virtual inline unsigned int write_bytes() {
- return 0;
- }
- const GenericLogEntry* get_log_entry() override {
- return get_sync_point_log_entry();
- }
- const SyncPointLogEntry* get_sync_point_log_entry() override {
- return this;
- }
std::ostream& format(std::ostream &os) const;
friend std::ostream &operator<<(std::ostream &os,
const SyncPointLogEntry &entry);
};
-class GeneralWriteLogEntry : public GenericLogEntry {
+class GenericWriteLogEntry : public GenericLogEntry {
public:
uint32_t referring_map_entries = 0;
bool flushing = false;
bool flushed = false; /* or invalidated */
std::shared_ptr<SyncPointLogEntry> sync_point_entry;
- GeneralWriteLogEntry(std::shared_ptr<SyncPointLogEntry> sync_point_entry,
+ GenericWriteLogEntry(std::shared_ptr<SyncPointLogEntry> sync_point_entry,
const uint64_t image_offset_bytes, const uint64_t write_bytes)
: GenericLogEntry(image_offset_bytes, write_bytes), sync_point_entry(sync_point_entry) { }
- GeneralWriteLogEntry(const uint64_t image_offset_bytes, const uint64_t write_bytes)
+ GenericWriteLogEntry(const uint64_t image_offset_bytes, const uint64_t write_bytes)
: GenericLogEntry(image_offset_bytes, write_bytes), sync_point_entry(nullptr) { }
- GeneralWriteLogEntry(const GeneralWriteLogEntry&) = delete;
- GeneralWriteLogEntry &operator=(const GeneralWriteLogEntry&) = delete;
- virtual inline unsigned int write_bytes() {
+ ~GenericWriteLogEntry() override {};
+ GenericWriteLogEntry(const GenericWriteLogEntry&) = delete;
+ GenericWriteLogEntry &operator=(const GenericWriteLogEntry&) = delete;
+ inline unsigned int write_bytes() {
    /* The valid bytes in this op's data buffer. Discard and WS override. */
return ram_entry.write_bytes;
};
/* The bytes in the image this op makes dirty. Discard and WS override. */
return write_bytes();
};
- const BlockExtent block_extent() {
+ BlockExtent block_extent() {
return ram_entry.block_extent();
}
- const GenericLogEntry* get_log_entry() override {
- return get_gen_write_log_entry();
- }
- const GeneralWriteLogEntry* get_gen_write_log_entry() override {
- return this;
- }
uint32_t get_map_ref() {
return(referring_map_entries);
}
void dec_map_ref() { referring_map_entries--; }
std::ostream &format(std::ostream &os) const;
friend std::ostream &operator<<(std::ostream &os,
- const GeneralWriteLogEntry &entry);
+ const GenericWriteLogEntry &entry);
};
-class WriteLogEntry : public GeneralWriteLogEntry {
+class WriteLogEntry : public GenericWriteLogEntry {
protected:
buffer::ptr pmem_bp;
buffer::list pmem_bl;
std::atomic<int> bl_refs = {0}; /* The refs held on pmem_bp by pmem_bl */
+  /* Used in WriteLogEntry::get_pmem_bl() to synchronize between threads making entries readable */
+ mutable ceph::mutex m_entry_bl_lock;
void init_pmem_bp();
uint8_t *pmem_buffer = nullptr;
WriteLogEntry(std::shared_ptr<SyncPointLogEntry> sync_point_entry,
const uint64_t image_offset_bytes, const uint64_t write_bytes)
- : GeneralWriteLogEntry(sync_point_entry, image_offset_bytes, write_bytes) { }
+ : GenericWriteLogEntry(sync_point_entry, image_offset_bytes, write_bytes),
+ m_entry_bl_lock(ceph::make_mutex(util::unique_lock_name(
+ "librbd::cache::rwl::WriteLogEntry::m_entry_bl_lock", this)))
+ { }
WriteLogEntry(const uint64_t image_offset_bytes, const uint64_t write_bytes)
- : GeneralWriteLogEntry(nullptr, image_offset_bytes, write_bytes) { }
+ : GenericWriteLogEntry(nullptr, image_offset_bytes, write_bytes),
+ m_entry_bl_lock(ceph::make_mutex(util::unique_lock_name(
+ "librbd::cache::rwl::WriteLogEntry::m_entry_bl_lock", this)))
+ { }
+ ~WriteLogEntry() override {};
WriteLogEntry(const WriteLogEntry&) = delete;
WriteLogEntry &operator=(const WriteLogEntry&) = delete;
- const BlockExtent block_extent();
+ void init(bool has_data, std::vector<WriteBufferAllocation>::iterator allocation,
+ uint64_t current_sync_gen, uint64_t last_op_sequence_num, bool persist_on_flush);
+ BlockExtent block_extent();
unsigned int reader_count();
/* Returns a ref to a bl containing bufferptrs to the entry pmem buffer */
- buffer::list &get_pmem_bl(ceph::mutex &entry_bl_lock);
+ buffer::list &get_pmem_bl();
/* Constructs a new bl containing copies of pmem_bp */
- void copy_pmem_bl(ceph::mutex &entry_bl_lock, bufferlist *out_bl);
- virtual const GenericLogEntry* get_log_entry() override {
- return get_write_log_entry();
- }
- const WriteLogEntry* get_write_log_entry() override {
- return this;
- }
+ void copy_pmem_bl(bufferlist *out_bl);
std::ostream &format(std::ostream &os) const;
friend std::ostream &operator<<(std::ostream &os,
const WriteLogEntry &entry);
};
-} // namespace rwl
-} // namespace cache
-} // namespace librbd
+} // namespace rwl
+} // namespace cache
+} // namespace librbd
#endif // CEPH_LIBRBD_CACHE_RWL_LOG_ENTRY_H
#include "LogOperation.h"
#include "librbd/cache/rwl/Types.h"
-#define dout_subsys ceph_subsys_rbd
+#define dout_subsys ceph_subsys_rbd_rwl
#undef dout_prefix
#define dout_prefix *_dout << "librbd::cache::rwl::LogOperation: " << this << " " \
<< __func__ << ": "
namespace rwl {
-template <typename T>
-GenericLogOperation<T>::GenericLogOperation(T &rwl, const utime_t dispatch_time)
- : rwl(rwl), dispatch_time(dispatch_time) {
+GenericLogOperation::GenericLogOperation(const utime_t dispatch_time, PerfCounters *perfcounter)
+ : m_perfcounter(perfcounter), dispatch_time(dispatch_time) {
}
-template <typename T>
-std::ostream& GenericLogOperation<T>::format(std::ostream &os) const {
+std::ostream& GenericLogOperation::format(std::ostream &os) const {
os << "dispatch_time=[" << dispatch_time << "], "
<< "buf_persist_time=[" << buf_persist_time << "], "
<< "buf_persist_comp_time=[" << buf_persist_comp_time << "], "
return os;
};
-template <typename T>
std::ostream &operator<<(std::ostream &os,
- const GenericLogOperation<T> &op) {
+ const GenericLogOperation &op) {
return op.format(os);
}
-template <typename T>
-SyncPointLogOperation<T>::SyncPointLogOperation(T &rwl,
- std::shared_ptr<SyncPoint<T>> sync_point,
- const utime_t dispatch_time)
- : GenericLogOperation<T>(rwl, dispatch_time), sync_point(sync_point) {
+SyncPointLogOperation::SyncPointLogOperation(ceph::mutex &lock,
+ std::shared_ptr<SyncPoint> sync_point,
+ const utime_t dispatch_time,
+ PerfCounters *perfcounter,
+ CephContext *cct)
+ : GenericLogOperation(dispatch_time, perfcounter), m_cct(cct), m_lock(lock), sync_point(sync_point) {
}
-template <typename T>
-SyncPointLogOperation<T>::~SyncPointLogOperation() { }
+SyncPointLogOperation::~SyncPointLogOperation() { }
-template <typename T>
-std::ostream &SyncPointLogOperation<T>::format(std::ostream &os) const {
+std::ostream &SyncPointLogOperation::format(std::ostream &os) const {
os << "(Sync Point) ";
- GenericLogOperation<T>::format(os);
+ GenericLogOperation::format(os);
os << ", "
<< "sync_point=[" << *sync_point << "]";
return os;
};
-template <typename T>
std::ostream &operator<<(std::ostream &os,
- const SyncPointLogOperation<T> &op) {
+ const SyncPointLogOperation &op) {
return op.format(os);
}
-template <typename T>
-void SyncPointLogOperation<T>::appending() {
+std::vector<Context*> SyncPointLogOperation::append_sync_point() {
std::vector<Context*> appending_contexts;
+ std::lock_guard locker(m_lock);
+ if (!sync_point->appending) {
+ sync_point->appending = true;
+ }
+ appending_contexts.swap(sync_point->on_sync_point_appending);
+ return appending_contexts;
+}
+
+void SyncPointLogOperation::clear_earlier_sync_point() {
+ std::lock_guard locker(m_lock);
+ ceph_assert(sync_point->later_sync_point);
+ ceph_assert(sync_point->later_sync_point->earlier_sync_point ==
+ sync_point);
+ sync_point->later_sync_point->earlier_sync_point = nullptr;
+}
+std::vector<Context*> SyncPointLogOperation::swap_on_sync_point_persisted() {
+ std::lock_guard locker(m_lock);
+ std::vector<Context*> persisted_contexts;
+ persisted_contexts.swap(sync_point->on_sync_point_persisted);
+ return persisted_contexts;
+}
+
+void SyncPointLogOperation::appending() {
ceph_assert(sync_point);
- {
- std::lock_guard locker(rwl.m_lock);
- if (!sync_point->m_appending) {
- ldout(rwl.m_image_ctx.cct, 20) << "Sync point op=[" << *this
- << "] appending" << dendl;
- sync_point->m_appending = true;
- }
- appending_contexts.swap(sync_point->m_on_sync_point_appending);
- }
+ ldout(m_cct, 20) << "Sync point op=[" << *this
+ << "] appending" << dendl;
+ auto appending_contexts = append_sync_point();
for (auto &ctx : appending_contexts) {
ctx->complete(0);
}
}
-template <typename T>
-void SyncPointLogOperation<T>::complete(int result) {
- std::vector<Context*> persisted_contexts;
-
+void SyncPointLogOperation::complete(int result) {
ceph_assert(sync_point);
- if (RWL_VERBOSE_LOGGING) {
- ldout(rwl.m_image_ctx.cct, 20) << "Sync point op =[" << *this
- << "] completed" << dendl;
- }
- {
- std::lock_guard locker(rwl.m_lock);
- /* Remove link from next sync point */
- ceph_assert(sync_point->later_sync_point);
- ceph_assert(sync_point->later_sync_point->earlier_sync_point ==
- sync_point);
- sync_point->later_sync_point->earlier_sync_point = nullptr;
- }
+ ldout(m_cct, 20) << "Sync point op =[" << *this
+ << "] completed" << dendl;
+ clear_earlier_sync_point();
/* Do append now in case completion occurred before the
* normal append callback executed, and to handle
* on_append work that was queued after the sync point
* entered the appending state. */
appending();
-
- {
- std::lock_guard locker(rwl.m_lock);
- /* The flush request that scheduled this op will be one of these
- * contexts */
- persisted_contexts.swap(sync_point->m_on_sync_point_persisted);
- // TODO handle flushed sync point in later PRs
- }
+ auto persisted_contexts = swap_on_sync_point_persisted();
for (auto &ctx : persisted_contexts) {
ctx->complete(result);
}
}
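/* The swap-under-lock idiom append_sync_point() and
 * swap_on_sync_point_persisted() share, shown standalone: detach the waiter
 * list while holding the lock, then run the callbacks without it, so a
 * callback can safely register new waiters. */
#include <functional>
#include <mutex>
#include <vector>

struct WaiterListSketch {
  std::mutex lock;
  std::vector<std::function<void(int)>> waiters;

  void complete_all(int result) {
    std::vector<std::function<void(int)>> to_run;
    {
      std::lock_guard<std::mutex> locker(lock);
      to_run.swap(waiters);  // detach under the lock
    }
    for (auto &cb : to_run) {
      cb(result);  // invoke outside the lock
    }
  }
};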
-template <typename T>
-GeneralWriteLogOperation<T>::GeneralWriteLogOperation(T &rwl,
- std::shared_ptr<SyncPoint<T>> sync_point,
- const utime_t dispatch_time)
- : GenericLogOperation<T>(rwl, dispatch_time),
+GenericWriteLogOperation::GenericWriteLogOperation(std::shared_ptr<SyncPoint> sync_point,
+ const utime_t dispatch_time,
+ PerfCounters *perfcounter,
+ CephContext *cct)
+ : GenericLogOperation(dispatch_time, perfcounter),
m_lock(ceph::make_mutex(util::unique_lock_name(
- "librbd::cache::rwl::GenericWriteLogOperation::m_lock", this))), sync_point(sync_point) {
+ "librbd::cache::rwl::GenericWriteLogOperation::m_lock", this))),
+ m_cct(cct),
+ sync_point(sync_point) {
}
-template <typename T>
-GeneralWriteLogOperation<T>::~GeneralWriteLogOperation() { }
+GenericWriteLogOperation::~GenericWriteLogOperation() { }
-template <typename T>
-std::ostream &GeneralWriteLogOperation<T>::format(std::ostream &os) const {
- GenericLogOperation<T>::format(os);
+std::ostream &GenericWriteLogOperation::format(std::ostream &os) const {
+ GenericLogOperation::format(os);
return os;
};
-template <typename T>
std::ostream &operator<<(std::ostream &os,
- const GeneralWriteLogOperation<T> &op) {
+ const GenericWriteLogOperation &op) {
return op.format(os);
}
/* Called when the write log operation is appending and its log position is guaranteed */
-template <typename T>
-void GeneralWriteLogOperation<T>::appending() {
+void GenericWriteLogOperation::appending() {
Context *on_append = nullptr;
- if (RWL_VERBOSE_LOGGING) {
- ldout(rwl.m_image_ctx.cct, 20) << __func__ << " " << this << dendl;
- }
+ ldout(m_cct, 20) << __func__ << " " << this << dendl;
{
std::lock_guard locker(m_lock);
on_append = on_write_append;
on_write_append = nullptr;
}
if (on_append) {
- if (RWL_VERBOSE_LOGGING) {
- ldout(rwl.m_image_ctx.cct, 20) << __func__ << " " << this << " on_append=" << on_append << dendl;
- }
+ ldout(m_cct, 20) << __func__ << " " << this << " on_append=" << on_append << dendl;
on_append->complete(0);
}
}
/* Called when the write log operation is completed in all log replicas */
-template <typename T>
-void GeneralWriteLogOperation<T>::complete(int result) {
+void GenericWriteLogOperation::complete(int result) {
appending();
Context *on_persist = nullptr;
- if (RWL_VERBOSE_LOGGING) {
- ldout(rwl.m_image_ctx.cct, 20) << __func__ << " " << this << dendl;
- }
+ ldout(m_cct, 20) << __func__ << " " << this << dendl;
{
std::lock_guard locker(m_lock);
on_persist = on_write_persist;
on_write_persist = nullptr;
}
if (on_persist) {
- if (RWL_VERBOSE_LOGGING) {
- ldout(rwl.m_image_ctx.cct, 20) << __func__ << " " << this << " on_persist=" << on_persist << dendl;
- }
+ ldout(m_cct, 20) << __func__ << " " << this << " on_persist=" << on_persist << dendl;
on_persist->complete(result);
}
}
-template <typename T>
-WriteLogOperation<T>::WriteLogOperation(WriteLogOperationSet<T> &set,
- uint64_t image_offset_bytes, uint64_t write_bytes)
- : GeneralWriteLogOperation<T>(set.rwl, set.sync_point, set.dispatch_time),
+WriteLogOperation::WriteLogOperation(WriteLogOperationSet &set,
+ uint64_t image_offset_bytes, uint64_t write_bytes,
+ CephContext *cct)
+ : GenericWriteLogOperation(set.sync_point, set.dispatch_time, set.perfcounter, cct),
log_entry(std::make_shared<WriteLogEntry>(set.sync_point->log_entry, image_offset_bytes, write_bytes)) {
on_write_append = set.extent_ops_appending->new_sub();
on_write_persist = set.extent_ops_persist->new_sub();
log_entry->sync_point_entry->bytes += write_bytes;
}
-template <typename T>
-WriteLogOperation<T>::~WriteLogOperation() { }
+WriteLogOperation::~WriteLogOperation() { }
-template <typename T>
-std::ostream &WriteLogOperation<T>::format(std::ostream &os) const {
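+/* Initializes this write's log entry from the reserved buffer allocation
+ * and slices this extent's data out of the request's bufferlist */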
+void WriteLogOperation::init(bool has_data, std::vector<WriteBufferAllocation>::iterator allocation, uint64_t current_sync_gen,
+ uint64_t last_op_sequence_num, bufferlist &write_req_bl, uint64_t buffer_offset,
+ bool persist_on_flush) {
+ log_entry->init(has_data, allocation, current_sync_gen, last_op_sequence_num, persist_on_flush);
+ buffer_alloc = &(*allocation);
+ bl.substr_of(write_req_bl, buffer_offset,
+ log_entry->write_bytes());
+}
+
+std::ostream &WriteLogOperation::format(std::ostream &os) const {
os << "(Write) ";
- GeneralWriteLogOperation<T>::format(os);
+ GenericWriteLogOperation::format(os);
os << ", ";
if (log_entry) {
os << "log_entry=[" << *log_entry << "], ";
return os;
};
-template <typename T>
std::ostream &operator<<(std::ostream &os,
- const WriteLogOperation<T> &op) {
+ const WriteLogOperation &op) {
return op.format(os);
}
-template <typename T>
-WriteLogOperationSet<T>::WriteLogOperationSet(T &rwl, utime_t dispatched, std::shared_ptr<SyncPoint<T>> sync_point,
- bool persist_on_flush, Context *on_finish)
- : m_on_finish(on_finish), rwl(rwl),
- persist_on_flush(persist_on_flush), dispatch_time(dispatched), sync_point(sync_point) {
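+/* Completes the op and records the write's latency breakdown: dispatch to
+ * buffer-persist start, buffer-persist duration (with a bytes histogram),
+ * and buffer-persist start to log append */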
+void WriteLogOperation::complete(int result) {
+ GenericWriteLogOperation::complete(result);
+ m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_buf_t, buf_persist_time - dispatch_time);
+ utime_t buf_lat = buf_persist_comp_time - buf_persist_time;
+ m_perfcounter->tinc(l_librbd_rwl_log_op_buf_to_bufc_t, buf_lat);
+ m_perfcounter->hinc(l_librbd_rwl_log_op_buf_to_bufc_t_hist, buf_lat.to_nsec(),
+ log_entry->ram_entry.write_bytes);
+ m_perfcounter->tinc(l_librbd_rwl_log_op_buf_to_app_t, log_append_time - buf_persist_time);
+}
+
+void WriteLogOperation::copy_bl_to_pmem_buffer() {
+  /* operation is a shared_ptr; it must stay in scope while its data is copied to the pmem buffer */
+ bufferlist::iterator i(&bl);
+ m_perfcounter->inc(l_librbd_rwl_log_op_bytes, log_entry->write_bytes());
+ ldout(m_cct, 20) << bl << dendl;
+ i.copy((unsigned)log_entry->write_bytes(), (char*)log_entry->pmem_buffer);
+}
+
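+/* Flushes this write's data from the pmem buffer, recording when buffer persist began */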
+void WriteLogOperation::flush_pmem_buf_to_cache(PMEMobjpool *log_pool) {
+ buf_persist_time = ceph_clock_now();
+ pmemobj_flush(log_pool, log_entry->pmem_buffer, log_entry->write_bytes());
+}
+
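+/* Two C_Gathers track the operations in this set: extent_ops_appending
+ * completes when every op is appending, firing on_ops_appending and one
+ * sub of extent_ops_persist; extent_ops_persist completes when every op
+ * has persisted, firing on_ops_persist if it is set */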
+WriteLogOperationSet::WriteLogOperationSet(utime_t dispatched, PerfCounters *perfcounter, std::shared_ptr<SyncPoint> sync_point,
+ bool persist_on_flush, CephContext *cct, Context *on_finish)
+ : m_cct(cct), m_on_finish(on_finish),
+ persist_on_flush(persist_on_flush),
+ dispatch_time(dispatched),
+ perfcounter(perfcounter),
+ sync_point(sync_point) {
on_ops_appending = sync_point->prior_log_entries_persisted->new_sub();
on_ops_persist = nullptr;
extent_ops_persist =
- new C_Gather(rwl.m_image_ctx.cct,
+ new C_Gather(m_cct,
new LambdaContext( [this](int r) {
- if (RWL_VERBOSE_LOGGING) {
- ldout(this->rwl.m_image_ctx.cct,20) << __func__ << " " << this << " m_extent_ops_persist completed" << dendl;
- }
+        ldout(this->m_cct, 20) << __func__ << " " << this << " extent_ops_persist completed" << dendl;
if (on_ops_persist) {
on_ops_persist->complete(r);
}
}));
auto appending_persist_sub = extent_ops_persist->new_sub();
extent_ops_appending =
- new C_Gather(rwl.m_image_ctx.cct,
+ new C_Gather(m_cct,
new LambdaContext( [this, appending_persist_sub](int r) {
- if (RWL_VERBOSE_LOGGING) {
- ldout(this->rwl.m_image_ctx.cct, 20) << __func__ << " " << this << " m_extent_ops_appending completed" << dendl;
- }
+        ldout(this->m_cct, 20) << __func__ << " " << this << " extent_ops_appending completed" << dendl;
on_ops_appending->complete(r);
appending_persist_sub->complete(r);
}));
}
-template <typename T>
-WriteLogOperationSet<T>::~WriteLogOperationSet() { }
+WriteLogOperationSet::~WriteLogOperationSet() { }
-template <typename T>
std::ostream &operator<<(std::ostream &os,
- const WriteLogOperationSet<T> &s) {
+ const WriteLogOperationSet &s) {
os << "cell=" << (void*)s.cell << ", "
<< "extent_ops_appending=[" << s.extent_ops_appending << ", "
<< "extent_ops_persist=[" << s.extent_ops_persist << "]";
namespace rwl {
struct WriteBufferAllocation;
-template <typename T>
class WriteLogOperationSet;
-template <typename T>
class WriteLogOperation;
-template <typename T>
-class GeneralWriteLogOperation;
+class GenericWriteLogOperation;
-template <typename T>
class SyncPointLogOperation;
-template <typename T>
class GenericLogOperation;
-template <typename T>
-using GenericLogOperationSharedPtr = std::shared_ptr<GenericLogOperation<T>>;
+using GenericLogOperationSharedPtr = std::shared_ptr<GenericLogOperation>;
-template <typename T>
-using GenericLogOperationsVector = std::vector<GenericLogOperationSharedPtr<T>>;
+using GenericLogOperationsVector = std::vector<GenericLogOperationSharedPtr>;
-template <typename T>
class GenericLogOperation {
+protected:
+ PerfCounters *m_perfcounter = nullptr;
public:
- T &rwl;
utime_t dispatch_time; // When op created
utime_t buf_persist_time; // When buffer persist begins
utime_t buf_persist_comp_time; // When buffer persist completes
utime_t log_append_time; // When log append begins
utime_t log_append_comp_time; // When log append completes
- GenericLogOperation(T &rwl, const utime_t dispatch_time);
+ GenericLogOperation(const utime_t dispatch_time, PerfCounters *perfcounter);
virtual ~GenericLogOperation() { };
GenericLogOperation(const GenericLogOperation&) = delete;
GenericLogOperation &operator=(const GenericLogOperation&) = delete;
virtual std::ostream &format(std::ostream &os) const;
- template <typename U>
friend std::ostream &operator<<(std::ostream &os,
- const GenericLogOperation<U> &op);
+ const GenericLogOperation &op);
virtual const std::shared_ptr<GenericLogEntry> get_log_entry() = 0;
- virtual const std::shared_ptr<SyncPointLogEntry> get_sync_point_log_entry() {
- return nullptr;
- }
- virtual const std::shared_ptr<GeneralWriteLogEntry> get_gen_write_log_entry() {
- return nullptr;
- }
- virtual const std::shared_ptr<WriteLogEntry> get_write_log_entry() {
- return nullptr;
- }
virtual void appending() = 0;
virtual void complete(int r) = 0;
- virtual bool is_write() {
- return false;
- }
- virtual bool is_sync_point() {
+ virtual void mark_log_entry_completed() {};
+ virtual bool reserved_allocated() {
return false;
}
- virtual bool is_discard() {
- return false;
- }
- virtual bool is_writesame() {
- return false;
- }
- virtual bool is_writing_op() {
- return false;
- }
- virtual GeneralWriteLogOperation<T> *get_gen_write_op() {
- return nullptr;
- };
- virtual WriteLogOperation<T> *get_write_op() {
- return nullptr;
- };
+ virtual void copy_bl_to_pmem_buffer() {};
+ virtual void flush_pmem_buf_to_cache(PMEMobjpool *log_pool) {};
};
-template <typename T>
-class SyncPointLogOperation : public GenericLogOperation<T> {
+class SyncPointLogOperation : public GenericLogOperation {
+private:
+ CephContext *m_cct;
+ ceph::mutex &m_lock;
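+  /* Helpers that take m_lock to update the sync point's links and context lists */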
+ std::vector<Context*> append_sync_point();
+ void clear_earlier_sync_point();
+ std::vector<Context*> swap_on_sync_point_persisted();
public:
- using GenericLogOperation<T>::rwl;
- std::shared_ptr<SyncPoint<T>> sync_point;
- SyncPointLogOperation(T &rwl,
- std::shared_ptr<SyncPoint<T>> sync_point,
- const utime_t dispatch_time);
- ~SyncPointLogOperation();
+ std::shared_ptr<SyncPoint> sync_point;
+ SyncPointLogOperation(ceph::mutex &lock,
+ std::shared_ptr<SyncPoint> sync_point,
+ const utime_t dispatch_time,
+ PerfCounters *perfcounter,
+ CephContext *cct);
+ ~SyncPointLogOperation() override;
SyncPointLogOperation(const SyncPointLogOperation&) = delete;
SyncPointLogOperation &operator=(const SyncPointLogOperation&) = delete;
std::ostream &format(std::ostream &os) const;
- template <typename U>
friend std::ostream &operator<<(std::ostream &os,
- const SyncPointLogOperation<U> &op);
- const std::shared_ptr<GenericLogEntry> get_log_entry() {
- return get_sync_point_log_entry();
- }
- const std::shared_ptr<SyncPointLogEntry> get_sync_point_log_entry() {
+ const SyncPointLogOperation &op);
+ const std::shared_ptr<GenericLogEntry> get_log_entry() override {
return sync_point->log_entry;
}
- bool is_sync_point() {
- return true;
- }
- void appending();
- void complete(int r);
+ void appending() override;
+ void complete(int r) override;
};
-template <typename T>
-class GeneralWriteLogOperation : public GenericLogOperation<T> {
+class GenericWriteLogOperation : public GenericLogOperation {
protected:
ceph::mutex m_lock;
+ CephContext *m_cct;
public:
- using GenericLogOperation<T>::rwl;
- std::shared_ptr<SyncPoint<T>> sync_point;
+ std::shared_ptr<SyncPoint> sync_point;
Context *on_write_append = nullptr; /* Completion for things waiting on this
* write's position in the log to be
* guaranteed */
Context *on_write_persist = nullptr; /* Completion for things waiting on this
* write to persist */
- GeneralWriteLogOperation(T &rwl,
- std::shared_ptr<SyncPoint<T>> sync_point,
- const utime_t dispatch_time);
- ~GeneralWriteLogOperation();
- GeneralWriteLogOperation(const GeneralWriteLogOperation&) = delete;
- GeneralWriteLogOperation &operator=(const GeneralWriteLogOperation&) = delete;
+ GenericWriteLogOperation(std::shared_ptr<SyncPoint> sync_point,
+ const utime_t dispatch_time,
+ PerfCounters *perfcounter,
+ CephContext *cct);
+ ~GenericWriteLogOperation() override;
+ GenericWriteLogOperation(const GenericWriteLogOperation&) = delete;
+ GenericWriteLogOperation &operator=(const GenericWriteLogOperation&) = delete;
std::ostream &format(std::ostream &os) const;
- template <typename U>
friend std::ostream &operator<<(std::ostream &os,
- const GeneralWriteLogOperation<U> &op);
- GeneralWriteLogOperation<T> *get_gen_write_op() {
- return this;
+ const GenericWriteLogOperation &op);
+  void mark_log_entry_completed() override {
+ sync_point->log_entry->writes_completed++;
}
- bool is_writing_op() {
+ bool reserved_allocated() override {
return true;
}
- void appending();
- void complete(int r);
+ void appending() override;
+ void complete(int r) override;
};
-template <typename T>
-class WriteLogOperation : public GeneralWriteLogOperation<T> {
+class WriteLogOperation : public GenericWriteLogOperation {
public:
- using GenericLogOperation<T>::rwl;
- using GeneralWriteLogOperation<T>::m_lock;
- using GeneralWriteLogOperation<T>::sync_point;
- using GeneralWriteLogOperation<T>::on_write_append;
- using GeneralWriteLogOperation<T>::on_write_persist;
+ using GenericWriteLogOperation::m_lock;
+ using GenericWriteLogOperation::sync_point;
+ using GenericWriteLogOperation::on_write_append;
+ using GenericWriteLogOperation::on_write_persist;
std::shared_ptr<WriteLogEntry> log_entry;
bufferlist bl;
WriteBufferAllocation *buffer_alloc = nullptr;
- WriteLogOperation(WriteLogOperationSet<T> &set, const uint64_t image_offset_bytes, const uint64_t write_bytes);
- ~WriteLogOperation();
+ WriteLogOperation(WriteLogOperationSet &set, const uint64_t image_offset_bytes,
+ const uint64_t write_bytes, CephContext *cct);
+ ~WriteLogOperation() override;
WriteLogOperation(const WriteLogOperation&) = delete;
WriteLogOperation &operator=(const WriteLogOperation&) = delete;
+ void init(bool has_data, std::vector<WriteBufferAllocation>::iterator allocation, uint64_t current_sync_gen,
+ uint64_t last_op_sequence_num, bufferlist &write_req_bl, uint64_t buffer_offset,
+ bool persist_on_flush);
std::ostream &format(std::ostream &os) const;
- template <typename U>
friend std::ostream &operator<<(std::ostream &os,
- const WriteLogOperation<T> &op);
- const std::shared_ptr<GenericLogEntry> get_log_entry() {
- return get_write_log_entry();
- }
- const std::shared_ptr<WriteLogEntry> get_write_log_entry() {
+ const WriteLogOperation &op);
+ const std::shared_ptr<GenericLogEntry> get_log_entry() override {
return log_entry;
}
- WriteLogOperation<T> *get_write_op() override {
- return this;
- }
- bool is_write() {
- return true;
- }
+
+ void complete(int r) override;
+ void copy_bl_to_pmem_buffer() override;
+ void flush_pmem_buf_to_cache(PMEMobjpool *log_pool) override;
};
-template <typename T>
class WriteLogOperationSet {
private:
+ CephContext *m_cct;
Context *m_on_finish;
public:
- T &rwl;
bool persist_on_flush;
BlockGuardCell *cell;
C_Gather *extent_ops_appending;
Context *on_ops_appending;
C_Gather *extent_ops_persist;
Context *on_ops_persist;
- GenericLogOperationsVector<T> operations;
+ GenericLogOperationsVector operations;
utime_t dispatch_time; /* When set created */
- std::shared_ptr<SyncPoint<T>> sync_point;
- WriteLogOperationSet(T &rwl, const utime_t dispatched, std::shared_ptr<SyncPoint<T>> sync_point,
- const bool persist_on_flush, Context *on_finish);
+ PerfCounters *perfcounter = nullptr;
+ std::shared_ptr<SyncPoint> sync_point;
+ WriteLogOperationSet(const utime_t dispatched, PerfCounters *perfcounter, std::shared_ptr<SyncPoint> sync_point,
+ const bool persist_on_flush, CephContext *cct, Context *on_finish);
~WriteLogOperationSet();
WriteLogOperationSet(const WriteLogOperationSet&) = delete;
WriteLogOperationSet &operator=(const WriteLogOperationSet&) = delete;
- template <typename U>
friend std::ostream &operator<<(std::ostream &os,
- const WriteLogOperationSet<U> &s);
+ const WriteLogOperationSet &s);
};
-} // namespace rwl
-} // namespace cache
-} // namespace librbd
+} // namespace rwl
+} // namespace cache
+} // namespace librbd
#endif // CEPH_LIBRBD_CACHE_RWL_LOG_OPERATION_H
#include "librbd/BlockGuard.h"
#include "librbd/cache/rwl/LogEntry.h"
-#define dout_subsys ceph_subsys_rbd
+#define dout_subsys ceph_subsys_rbd_rwl
#undef dout_prefix
#define dout_prefix *_dout << "librbd::cache::rwl::Request: " << this << " " \
<< __func__ << ": "
namespace cache {
namespace rwl {
-typedef std::list<std::shared_ptr<GeneralWriteLogEntry>> GeneralWriteLogEntries;
+typedef std::list<std::shared_ptr<GenericWriteLogEntry>> GenericWriteLogEntries;
template <typename T>
-C_GuardedBlockIORequest<T>::C_GuardedBlockIORequest(T &rwl)
- : rwl(rwl) {
- if (RWL_VERBOSE_LOGGING) {
- ldout(rwl.m_image_ctx.cct, 99) << this << dendl;
- }
+C_BlockIORequest<T>::C_BlockIORequest(T &rwl, const utime_t arrived, io::Extents &&extents,
+ bufferlist&& bl, const int fadvise_flags, Context *user_req)
+ : rwl(rwl), image_extents(std::move(extents)),
+ bl(std::move(bl)), fadvise_flags(fadvise_flags),
+ user_req(user_req), image_extents_summary(image_extents), m_arrived_time(arrived) {
+ ldout(rwl.get_context(), 99) << this << dendl;
}
template <typename T>
-C_GuardedBlockIORequest<T>::~C_GuardedBlockIORequest() {
- if (RWL_VERBOSE_LOGGING) {
- ldout(rwl.m_image_ctx.cct, 99) << this << dendl;
- }
+C_BlockIORequest<T>::~C_BlockIORequest() {
+ ldout(rwl.get_context(), 99) << this << dendl;
ceph_assert(m_cell_released || !m_cell);
}
template <typename T>
-void C_GuardedBlockIORequest<T>::set_cell(BlockGuardCell *cell) {
- if (RWL_VERBOSE_LOGGING) {
- ldout(rwl.m_image_ctx.cct, 20) << this << " cell=" << cell << dendl;
- }
+std::ostream &operator<<(std::ostream &os,
+ const C_BlockIORequest<T> &req) {
+ os << "image_extents=[" << req.image_extents << "], "
+ << "image_extents_summary=[" << req.image_extents_summary << "], "
+ << "bl=" << req.bl << ", "
+ << "user_req=" << req.user_req << ", "
+ << "m_user_req_completed=" << req.m_user_req_completed << ", "
+ << "m_deferred=" << req.m_deferred << ", "
+ << "detained=" << req.detained << ", "
+ << "waited_lanes=" << req.waited_lanes << ", "
+ << "waited_entries=" << req.waited_entries << ", "
+ << "waited_buffers=" << req.waited_buffers << "";
+ return os;
+}
+
+template <typename T>
+void C_BlockIORequest<T>::set_cell(BlockGuardCell *cell) {
+ ldout(rwl.get_context(), 20) << this << " cell=" << cell << dendl;
ceph_assert(cell);
ceph_assert(!m_cell);
m_cell = cell;
}
template <typename T>
-BlockGuardCell *C_GuardedBlockIORequest<T>::get_cell(void) {
- if (RWL_VERBOSE_LOGGING) {
- ldout(rwl.m_image_ctx.cct, 20) << this << " cell=" << m_cell << dendl;
- }
+BlockGuardCell *C_BlockIORequest<T>::get_cell(void) {
+ ldout(rwl.get_context(), 20) << this << " cell=" << m_cell << dendl;
return m_cell;
}
template <typename T>
-void C_GuardedBlockIORequest<T>::release_cell() {
- if (RWL_VERBOSE_LOGGING) {
- ldout(rwl.m_image_ctx.cct, 20) << this << " cell=" << m_cell << dendl;
- }
+void C_BlockIORequest<T>::release_cell() {
+ ldout(rwl.get_context(), 20) << this << " cell=" << m_cell << dendl;
ceph_assert(m_cell);
bool initial = false;
if (m_cell_released.compare_exchange_strong(initial, true)) {
rwl.release_guarded_request(m_cell);
} else {
- ldout(rwl.m_image_ctx.cct, 5) << "cell " << m_cell << " already released for " << this << dendl;
- }
-}
-
-template <typename T>
-C_BlockIORequest<T>::C_BlockIORequest(T &rwl, const utime_t arrived, io::Extents &&extents,
- bufferlist&& bl, const int fadvise_flags, Context *user_req)
- : C_GuardedBlockIORequest<T>(rwl), image_extents(std::move(extents)),
- bl(std::move(bl)), fadvise_flags(fadvise_flags),
- user_req(user_req), image_extents_summary(image_extents), m_arrived_time(arrived) {
- if (RWL_VERBOSE_LOGGING) {
- ldout(rwl.m_image_ctx.cct, 99) << this << dendl;
- }
- /* Remove zero length image extents from input */
- for (auto it = image_extents.begin(); it != image_extents.end(); ) {
- if (0 == it->second) {
- it = image_extents.erase(it);
- continue;
- }
- ++it;
- }
-}
-
-template <typename T>
-C_BlockIORequest<T>::~C_BlockIORequest() {
- if (RWL_VERBOSE_LOGGING) {
- ldout(rwl.m_image_ctx.cct, 99) << this << dendl;
+ ldout(rwl.get_context(), 5) << "cell " << m_cell << " already released for " << this << dendl;
}
}
-template <typename T>
-std::ostream &operator<<(std::ostream &os,
- const C_BlockIORequest<T> &req) {
- os << "image_extents=[" << req.image_extents << "], "
- << "image_extents_summary=[" << req.image_extents_summary << "], "
- << "bl=" << req.bl << ", "
- << "user_req=" << req.user_req << ", "
- << "m_user_req_completed=" << req.m_user_req_completed << ", "
- << "m_deferred=" << req.m_deferred << ", "
- << "detained=" << req.detained << ", "
- << "m_waited_lanes=" << req.m_waited_lanes << ", "
- << "m_waited_entries=" << req.m_waited_entries << ", "
- << "m_waited_buffers=" << req.m_waited_buffers << "";
- return os;
- }
-
template <typename T>
void C_BlockIORequest<T>::complete_user_request(int r) {
bool initial = false;
if (m_user_req_completed.compare_exchange_strong(initial, true)) {
- if (RWL_VERBOSE_LOGGING) {
- ldout(rwl.m_image_ctx.cct, 15) << this << " completing user req" << dendl;
- }
+ ldout(rwl.get_context(), 15) << this << " completing user req" << dendl;
m_user_req_completed_time = ceph_clock_now();
user_req->complete(r);
    // Clear user_req, which deletes itself on completion
user_req = nullptr;
} else {
- if (RWL_VERBOSE_LOGGING) {
- ldout(rwl.m_image_ctx.cct, 20) << this << " user req already completed" << dendl;
- }
+ ldout(rwl.get_context(), 20) << this << " user req already completed" << dendl;
}
}
template <typename T>
void C_BlockIORequest<T>::finish(int r) {
- ldout(rwl.m_image_ctx.cct, 20) << this << dendl;
+ ldout(rwl.get_context(), 20) << this << dendl;
complete_user_request(r);
bool initial = false;
if (m_finish_called.compare_exchange_strong(initial, true)) {
- if (RWL_VERBOSE_LOGGING) {
- ldout(rwl.m_image_ctx.cct, 15) << this << " finishing" << dendl;
- }
+ ldout(rwl.get_context(), 15) << this << " finishing" << dendl;
finish_req(0);
} else {
- ldout(rwl.m_image_ctx.cct, 20) << this << " already finished" << dendl;
+ ldout(rwl.get_context(), 20) << this << " already finished" << dendl;
ceph_assert(0);
}
}
template <typename T>
C_WriteRequest<T>::C_WriteRequest(T &rwl, const utime_t arrived, io::Extents &&image_extents,
- bufferlist&& bl, const int fadvise_flags, Context *user_req)
- : C_BlockIORequest<T>(rwl, arrived, std::move(image_extents), std::move(bl), fadvise_flags, user_req) {
- if (RWL_VERBOSE_LOGGING) {
- ldout(rwl.m_image_ctx.cct, 99) << this << dendl;
- }
+ bufferlist&& bl, const int fadvise_flags, ceph::mutex &lock,
+ PerfCounters *perfcounter, Context *user_req)
+ : C_BlockIORequest<T>(rwl, arrived, std::move(image_extents), std::move(bl), fadvise_flags, user_req),
+ m_lock(lock), m_perfcounter(perfcounter) {
+ ldout(rwl.get_context(), 99) << this << dendl;
}
template <typename T>
C_WriteRequest<T>::~C_WriteRequest() {
- if (RWL_VERBOSE_LOGGING) {
- ldout(rwl.m_image_ctx.cct, 99) << this << dendl;
- }
+ ldout(rwl.get_context(), 99) << this << dendl;
}
template <typename T>
std::ostream &operator<<(std::ostream &os,
- const C_WriteRequest<T> &req) {
+ const C_WriteRequest<T> &req) {
os << (C_BlockIORequest<T>&)req
- << " m_resources.allocated=" << req.resources.allocated;
- if (req.m_op_set) {
- os << "m_op_set=" << *req.m_op_set;
+ << " m_resources.allocated=" << req.m_resources.allocated;
+ if (req.op_set) {
+ os << "op_set=" << *req.op_set;
}
return os;
};
template <typename T>
void C_WriteRequest<T>::blockguard_acquired(GuardedRequestFunctionContext &guard_ctx) {
- if (RWL_VERBOSE_LOGGING) {
- ldout(rwl.m_image_ctx.cct, 20) << __func__ << " write_req=" << this << " cell=" << guard_ctx.m_cell << dendl;
- }
+ ldout(rwl.get_context(), 20) << __func__ << " write_req=" << this << " cell=" << guard_ctx.cell << dendl;
- ceph_assert(guard_ctx.m_cell);
- this->detained = guard_ctx.m_state.detained; /* overlapped */
- this->m_queued = guard_ctx.m_state.queued; /* queued behind at least one barrier */
- this->set_cell(guard_ctx.m_cell);
+ ceph_assert(guard_ctx.cell);
+ this->detained = guard_ctx.state.detained; /* overlapped */
+ this->m_queued = guard_ctx.state.queued; /* queued behind at least one barrier */
+ this->set_cell(guard_ctx.cell);
}
template <typename T>
void C_WriteRequest<T>::finish_req(int r) {
- if (RWL_VERBOSE_LOGGING) {
- ldout(rwl.m_image_ctx.cct, 15) << "write_req=" << this << " cell=" << this->get_cell() << dendl;
- }
+ ldout(rwl.get_context(), 15) << "write_req=" << this << " cell=" << this->get_cell() << dendl;
/* Completed to caller by here (in finish(), which calls this) */
utime_t now = ceph_clock_now();
rwl.release_write_lanes(this);
+ ceph_assert(m_resources.allocated);
+ m_resources.allocated = false;
this->release_cell(); /* TODO: Consider doing this in appending state */
update_req_stats(now);
}
template <typename T>
-void C_WriteRequest<T>::setup_buffer_resources(uint64_t &bytes_cached, uint64_t &bytes_dirtied) {
+void C_WriteRequest<T>::setup_buffer_resources(
+ uint64_t &bytes_cached, uint64_t &bytes_dirtied, uint64_t &bytes_allocated,
+ uint64_t &number_lanes, uint64_t &number_log_entries,
+ uint64_t &number_unpublished_reserves) {
+
+ ceph_assert(!m_resources.allocated);
+
+ auto image_extents_size = this->image_extents.size();
+ m_resources.buffers.reserve(image_extents_size);
+
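+  /* One lane, one log entry, and one unpublished reserve is needed per
+   * extent; each extent becomes a separate log entry */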
+ bytes_cached = 0;
+ bytes_allocated = 0;
+ number_lanes = image_extents_size;
+ number_log_entries = image_extents_size;
+ number_unpublished_reserves = image_extents_size;
+
for (auto &extent : this->image_extents) {
- resources.buffers.emplace_back();
- struct WriteBufferAllocation &buffer = resources.buffers.back();
+ m_resources.buffers.emplace_back();
+ struct WriteBufferAllocation &buffer = m_resources.buffers.back();
buffer.allocation_size = MIN_WRITE_ALLOC_SIZE;
buffer.allocated = false;
bytes_cached += extent.second;
if (extent.second > buffer.allocation_size) {
buffer.allocation_size = extent.second;
}
+ bytes_allocated += buffer.allocation_size;
}
bytes_dirtied = bytes_cached;
}
template <typename T>
void C_WriteRequest<T>::setup_log_operations() {
- for (auto &extent : this->image_extents) {
- /* operation->on_write_persist connected to m_prior_log_entries_persisted Gather */
- auto operation =
- std::make_shared<WriteLogOperation<T>>(*m_op_set, extent.first, extent.second);
- m_op_set->operations.emplace_back(operation);
+ {
+ std::lock_guard locker(m_lock);
+ // TODO: Add sync point if necessary
+ std::shared_ptr<SyncPoint> current_sync_point = rwl.get_current_sync_point();
+ uint64_t current_sync_gen = rwl.get_current_sync_gen();
+ op_set =
+ make_unique<WriteLogOperationSet>(this->m_dispatched_time,
+ m_perfcounter,
+ current_sync_point,
+ rwl.get_persist_on_flush(),
+ rwl.get_context(), this);
+ ldout(rwl.get_context(), 20) << "write_req=" << *this << " op_set=" << op_set.get() << dendl;
+ ceph_assert(m_resources.allocated);
+ /* op_set->operations initialized differently for plain write or write same */
+ auto allocation = m_resources.buffers.begin();
+ uint64_t buffer_offset = 0;
+ for (auto &extent : this->image_extents) {
+ /* operation->on_write_persist connected to m_prior_log_entries_persisted Gather */
+ auto operation =
+ std::make_shared<WriteLogOperation>(*op_set, extent.first, extent.second, rwl.get_context());
+ op_set->operations.emplace_back(operation);
+
+      /* A WriteSame (WS) is also a write */
+ ldout(rwl.get_context(), 20) << "write_req=" << *this << " op_set=" << op_set.get()
+ << " operation=" << operation << dendl;
+ rwl.inc_last_op_sequence_num();
+ operation->init(true, allocation, current_sync_gen,
+ rwl.get_last_op_sequence_num(), this->bl, buffer_offset, op_set->persist_on_flush);
+ buffer_offset += operation->log_entry->write_bytes();
+ ldout(rwl.get_context(), 20) << "operation=[" << *operation << "]" << dendl;
+ allocation++;
+ }
+ }
+ /* All extent ops subs created */
+ op_set->extent_ops_appending->activate();
+ op_set->extent_ops_persist->activate();
+
+ /* Write data */
+ for (auto &operation : op_set->operations) {
+ operation->copy_bl_to_pmem_buffer();
}
}
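+/* Defers this write's append behind the earlier sync point, if there is
+ * one; returns true if the append was deferred. */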
+template <typename T>
+bool C_WriteRequest<T>::append_write_request(std::shared_ptr<SyncPoint> sync_point) {
+ std::lock_guard locker(m_lock);
+ auto write_req_sp = this;
+ if (sync_point->earlier_sync_point) {
+ Context *schedule_append_ctx = new LambdaContext([this, write_req_sp](int r) {
+ write_req_sp->schedule_append();
+ });
+ sync_point->earlier_sync_point->on_sync_point_appending.push_back(schedule_append_ctx);
+ return true;
+ }
+ return false;
+}
+
template <typename T>
void C_WriteRequest<T>::schedule_append() {
ceph_assert(++m_appended == 1);
if (m_do_early_flush) {
/* This caller is waiting for persist, so we'll use their thread to
* expedite it */
- rwl.flush_pmem_buffer(this->m_op_set->operations);
- rwl.schedule_append(this->m_op_set->operations);
+ rwl.flush_pmem_buffer(this->op_set->operations);
+ rwl.schedule_append(this->op_set->operations);
} else {
/* This is probably not still the caller's thread, so do the payload
* flushing/replicating later. */
- rwl.schedule_flush_and_append(this->m_op_set->operations);
+ rwl.schedule_flush_and_append(this->op_set->operations);
}
}
* Lanes are released after the write persists via release_write_lanes()
*/
template <typename T>
-bool C_WriteRequest<T>::alloc_resources()
-{
- bool alloc_succeeds = true;
- bool no_space = false;
- utime_t alloc_start = ceph_clock_now();
- uint64_t bytes_allocated = 0;
- uint64_t bytes_cached = 0;
- uint64_t bytes_dirtied = 0;
-
- ceph_assert(!resources.allocated);
- resources.buffers.reserve(this->image_extents.size());
- {
- std::lock_guard locker(rwl.m_lock);
- if (rwl.m_free_lanes < this->image_extents.size()) {
- this->m_waited_lanes = true;
- if (RWL_VERBOSE_LOGGING) {
- ldout(rwl.m_image_ctx.cct, 20) << "not enough free lanes (need "
- << this->image_extents.size()
- << ", have " << rwl.m_free_lanes << ") "
- << *this << dendl;
- }
- alloc_succeeds = false;
- /* This isn't considered a "no space" alloc fail. Lanes are a throttling mechanism. */
- }
- if (rwl.m_free_log_entries < this->image_extents.size()) {
- this->m_waited_entries = true;
- if (RWL_VERBOSE_LOGGING) {
- ldout(rwl.m_image_ctx.cct, 20) << "not enough free entries (need "
- << this->image_extents.size()
- << ", have " << rwl.m_free_log_entries << ") "
- << *this << dendl;
- }
- alloc_succeeds = false;
- no_space = true; /* Entries must be retired */
- }
- /* Don't attempt buffer allocate if we've exceeded the "full" threshold */
- if (rwl.m_bytes_allocated > rwl.m_bytes_allocated_cap) {
- if (!this->m_waited_buffers) {
- this->m_waited_buffers = true;
- if (RWL_VERBOSE_LOGGING) {
- ldout(rwl.m_image_ctx.cct, 1) << "Waiting for allocation cap (cap=" << rwl.m_bytes_allocated_cap
- << ", allocated=" << rwl.m_bytes_allocated
- << ") in write [" << *this << "]" << dendl;
- }
- }
- alloc_succeeds = false;
- no_space = true; /* Entries must be retired */
- }
- }
- if (alloc_succeeds) {
- setup_buffer_resources(bytes_cached, bytes_dirtied);
- }
-
- if (alloc_succeeds) {
- for (auto &buffer : resources.buffers) {
- bytes_allocated += buffer.allocation_size;
- utime_t before_reserve = ceph_clock_now();
- buffer.buffer_oid = pmemobj_reserve(rwl.m_log_pool,
- &buffer.buffer_alloc_action,
- buffer.allocation_size,
- 0 /* Object type */);
- buffer.allocation_lat = ceph_clock_now() - before_reserve;
- if (TOID_IS_NULL(buffer.buffer_oid)) {
- if (!this->m_waited_buffers) {
- this->m_waited_buffers = true;
- }
- if (RWL_VERBOSE_LOGGING) {
- ldout(rwl.m_image_ctx.cct, 5) << "can't allocate all data buffers: "
- << pmemobj_errormsg() << ". "
- << *this << dendl;
- }
- alloc_succeeds = false;
- no_space = true; /* Entries need to be retired */
- break;
- } else {
- buffer.allocated = true;
- }
- if (RWL_VERBOSE_LOGGING) {
- ldout(rwl.m_image_ctx.cct, 20) << "Allocated " << buffer.buffer_oid.oid.pool_uuid_lo
- << "." << buffer.buffer_oid.oid.off
- << ", size=" << buffer.allocation_size << dendl;
- }
- }
- }
-
- if (alloc_succeeds) {
- unsigned int num_extents = this->image_extents.size();
- std::lock_guard locker(rwl.m_lock);
- /* We need one free log entry per extent (each is a separate entry), and
- * one free "lane" for remote replication. */
- if ((rwl.m_free_lanes >= num_extents) &&
- (rwl.m_free_log_entries >= num_extents)) {
- rwl.m_free_lanes -= num_extents;
- rwl.m_free_log_entries -= num_extents;
- rwl.m_unpublished_reserves += num_extents;
- rwl.m_bytes_allocated += bytes_allocated;
- rwl.m_bytes_cached += bytes_cached;
- rwl.m_bytes_dirty += bytes_dirtied;
- resources.allocated = true;
- } else {
- alloc_succeeds = false;
- }
- }
-
- if (!alloc_succeeds) {
- /* On alloc failure, free any buffers we did allocate */
- for (auto &buffer : resources.buffers) {
- if (buffer.allocated) {
- pmemobj_cancel(rwl.m_log_pool, &buffer.buffer_alloc_action, 1);
- }
- }
- resources.buffers.clear();
- if (no_space) {
- /* Expedite flushing and/or retiring */
- std::lock_guard locker(rwl.m_lock);
- rwl.m_alloc_failed_since_retire = true;
- rwl.m_last_alloc_fail = ceph_clock_now();
- }
- }
-
- this->m_allocated_time = alloc_start;
- return alloc_succeeds;
+bool C_WriteRequest<T>::alloc_resources() {
+ this->allocated_time = ceph_clock_now();
+ return rwl.alloc_resources(this);
}
/**
* Takes custody of write_req. Resources must already be allocated.
*
* Locking:
- * Acquires m_lock
+ * Acquires lock
*/
template <typename T>
void C_WriteRequest<T>::dispatch()
{
- CephContext *cct = rwl.m_image_ctx.cct;
- GeneralWriteLogEntries log_entries;
- DeferredContexts on_exit;
+ CephContext *cct = rwl.get_context();
utime_t now = ceph_clock_now();
- auto write_req_sp = this;
this->m_dispatched_time = now;
- if (RWL_VERBOSE_LOGGING) {
- ldout(cct, 15) << "name: " << rwl.m_image_ctx.name << " id: " << rwl.m_image_ctx.id
- << "write_req=" << this << " cell=" << this->get_cell() << dendl;
- }
-
- {
- uint64_t buffer_offset = 0;
- std::lock_guard locker(rwl.m_lock);
- Context *set_complete = this;
- // TODO: Add sync point if necessary
- //
- m_op_set =
- make_unique<WriteLogOperationSet<T>>(rwl, now, rwl.m_current_sync_point, rwl.m_persist_on_flush,
- set_complete);
- if (RWL_VERBOSE_LOGGING) {
- ldout(cct, 20) << "write_req=" << this << " m_op_set=" << m_op_set.get() << dendl;
- }
- ceph_assert(resources.allocated);
- /* m_op_set->operations initialized differently for plain write or write same */
- this->setup_log_operations();
- auto allocation = resources.buffers.begin();
- for (auto &gen_op : m_op_set->operations) {
- /* A WS is also a write */
- auto operation = gen_op->get_write_op();
- if (RWL_VERBOSE_LOGGING) {
- ldout(cct, 20) << "write_req=" << this << " m_op_set=" << m_op_set.get()
- << " operation=" << operation << dendl;
- }
- log_entries.emplace_back(operation->log_entry);
- rwl.m_perfcounter->inc(l_librbd_rwl_log_ops, 1);
-
- operation->log_entry->ram_entry.has_data = 1;
- operation->log_entry->ram_entry.write_data = allocation->buffer_oid;
- // TODO: make std::shared_ptr
- operation->buffer_alloc = &(*allocation);
- ceph_assert(!TOID_IS_NULL(operation->log_entry->ram_entry.write_data));
- operation->log_entry->pmem_buffer = D_RW(operation->log_entry->ram_entry.write_data);
- operation->log_entry->ram_entry.sync_gen_number = rwl.m_current_sync_gen;
- if (m_op_set->persist_on_flush) {
- /* Persist on flush. Sequence #0 is never used. */
- operation->log_entry->ram_entry.write_sequence_number = 0;
- } else {
- /* Persist on write */
- operation->log_entry->ram_entry.write_sequence_number = ++rwl.m_last_op_sequence_num;
- operation->log_entry->ram_entry.sequenced = 1;
- }
- operation->log_entry->ram_entry.sync_point = 0;
- operation->log_entry->ram_entry.discard = 0;
- operation->bl.substr_of(this->bl, buffer_offset,
- operation->log_entry->write_bytes());
- buffer_offset += operation->log_entry->write_bytes();
- if (RWL_VERBOSE_LOGGING) {
- ldout(cct, 20) << "operation=[" << *operation << "]" << dendl;
- }
- allocation++;
- }
- }
- /* All extent ops subs created */
- m_op_set->extent_ops_appending->activate();
- m_op_set->extent_ops_persist->activate();
-
- /* Write data */
- for (auto &operation : m_op_set->operations) {
- /* operation is a shared_ptr, so write_op is only good as long as operation is in scope */
- auto write_op = operation->get_write_op();
- ceph_assert(write_op != nullptr);
- bufferlist::iterator i(&write_op->bl);
- rwl.m_perfcounter->inc(l_librbd_rwl_log_op_bytes, write_op->log_entry->write_bytes());
- if (RWL_VERBOSE_LOGGING) {
- ldout(cct, 20) << write_op->bl << dendl;
- }
- i.copy((unsigned)write_op->log_entry->write_bytes(), (char*)write_op->log_entry->pmem_buffer);
- }
-
- // TODO: Add to log map for read
-
- /*
- * Entries are added to m_log_entries in alloc_op_log_entries() when their
- * order is established. They're added to m_dirty_log_entries when the write
- * completes to all replicas. They must not be flushed before then. We don't
- * prevent the application from reading these before they persist. If we
- * supported coherent shared access, that might be a problem (the write could
- * fail after another initiator had read it). As it is the cost of running
- * reads through the block guard (and exempting them from the barrier, which
- * doesn't need to apply to them) to prevent reading before the previous
- * write of that data persists doesn't seem justified.
- */
-
- if (rwl.m_persist_on_flush_early_user_comp &&
- m_op_set->persist_on_flush) {
- /*
- * We're done with the caller's buffer, and not guaranteeing
- * persistence until the next flush. The block guard for this
- * write_req will not be released until the write is persisted
- * everywhere, but the caller's request can complete now.
- */
- this->complete_user_request(0);
- }
+ ldout(cct, 15) << "write_req=" << this << " cell=" << this->get_cell() << dendl;
+ setup_log_operations();
bool append_deferred = false;
- {
- std::lock_guard locker(rwl.m_lock);
- if (!m_op_set->persist_on_flush &&
- m_op_set->sync_point->earlier_sync_point) {
- /* In persist-on-write mode, we defer the append of this write until the
- * previous sync point is appending (meaning all the writes before it are
- * persisted and that previous sync point can now appear in the
- * log). Since we insert sync points in persist-on-write mode when writes
- * have already completed to the current sync point, this limits us to
- * one inserted sync point in flight at a time, and gives the next
- * inserted sync point some time to accumulate a few writes if they
- * arrive soon. Without this we can insert an absurd number of sync
- * points, each with one or two writes. That uses a lot of log entries,
- * and limits flushing to very few writes at a time. */
- m_do_early_flush = false;
- Context *schedule_append_ctx = new LambdaContext([this, write_req_sp](int r) {
- write_req_sp->schedule_append();
- });
- m_op_set->sync_point->earlier_sync_point->on_sync_point_appending.push_back(schedule_append_ctx);
- append_deferred = true;
- } else {
- /* The prior sync point is done, so we'll schedule append here. If this is
- * persist-on-write, and probably still the caller's thread, we'll use this
- * caller's thread to perform the persist & replication of the payload
- * buffer. */
- m_do_early_flush =
- !(this->detained || this->m_queued || this->m_deferred || m_op_set->persist_on_flush);
- }
+ if (!op_set->persist_on_flush &&
+ append_write_request(op_set->sync_point)) {
+ /* In persist-on-write mode, we defer the append of this write until the
+ * previous sync point is appending (meaning all the writes before it are
+ * persisted and that previous sync point can now appear in the
+ * log). Since we insert sync points in persist-on-write mode when writes
+ * have already completed to the current sync point, this limits us to
+ * one inserted sync point in flight at a time, and gives the next
+ * inserted sync point some time to accumulate a few writes if they
+ * arrive soon. Without this we can insert an absurd number of sync
+ * points, each with one or two writes. That uses a lot of log entries,
+ * and limits flushing to very few writes at a time. */
+ m_do_early_flush = false;
+ append_deferred = true;
+ } else {
+ /* The prior sync point is done, so we'll schedule append here. If this is
+ * persist-on-write, and probably still the caller's thread, we'll use this
+ * caller's thread to perform the persist & replication of the payload
+ * buffer. */
+ m_do_early_flush =
+ !(this->detained || this->m_queued || this->m_deferred || op_set->persist_on_flush);
}
if (!append_deferred) {
this->schedule_append();
GuardedRequestFunctionContext::~GuardedRequestFunctionContext(void) { }
void GuardedRequestFunctionContext::finish(int r) {
- ceph_assert(m_cell);
+ ceph_assert(cell);
m_callback(*this);
}
std::ostream &operator<<(std::ostream &os,
const GuardedRequest &r) {
- os << "guard_ctx->m_state=[" << r.guard_ctx->m_state << "], "
+ os << "guard_ctx->state=[" << r.guard_ctx->state << "], "
<< "block_extent.block_start=" << r.block_extent.block_start << ", "
<< "block_extent.block_start=" << r.block_extent.block_end;
return os;
};
-} // namespace rwl
-} // namespace cache
-} // namespace librbd
+} // namespace rwl
+} // namespace cache
+} // namespace librbd
class GuardedRequestFunctionContext;
-/**
- * A request that can be deferred in a BlockGuard to sequence
- * overlapping operations.
- */
-template <typename T>
-class C_GuardedBlockIORequest : public Context {
-public:
- T &rwl;
- C_GuardedBlockIORequest(T &rwl);
- ~C_GuardedBlockIORequest();
- C_GuardedBlockIORequest(const C_GuardedBlockIORequest&) = delete;
- C_GuardedBlockIORequest &operator=(const C_GuardedBlockIORequest&) = delete;
-
- virtual const char *get_name() const = 0;
- void set_cell(BlockGuardCell *cell);
- BlockGuardCell *get_cell(void);
- void release_cell();
-
-private:
- std::atomic<bool> m_cell_released = {false};
- BlockGuardCell* m_cell = nullptr;
+struct WriteRequestResources {
+ bool allocated = false;
+ std::vector<WriteBufferAllocation> buffers;
};
/**
+ * A request that can be deferred in a BlockGuard to sequence
+ * overlapping operations.
* This is the custodian of the BlockGuard cell for this IO, and the
* state information about the progress of this IO. This object lives
* until the IO is persisted in all (live) log replicas. User request
* may be completed from here before the IO persists.
*/
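+/*
+ * Typical lifecycle (a sketch): the request enters the block guard and
+ * set_cell() records its guard cell; alloc_resources() reserves lanes,
+ * log entries, and data buffers; dispatch() builds and schedules the log
+ * operations; finish() completes the user request, and finish_req()
+ * releases the write lanes and the guard cell.
+ */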
template <typename T>
-class C_BlockIORequest : public C_GuardedBlockIORequest<T> {
+class C_BlockIORequest : public Context {
public:
- using C_GuardedBlockIORequest<T>::rwl;
-
+ T &rwl;
io::Extents image_extents;
bufferlist bl;
int fadvise_flags;
Context *user_req; /* User write request */
ExtentsSummary<io::Extents> image_extents_summary;
bool detained = false; /* Detained in blockguard (overlapped with a prior IO) */
+ utime_t allocated_time; /* When allocation began */
+ bool waited_lanes = false; /* This IO waited for free persist/replicate lanes */
+ bool waited_entries = false; /* This IO waited for free log entries */
+ bool waited_buffers = false; /* This IO waited for data buffers (pmemobj_reserve() failed) */
C_BlockIORequest(T &rwl, const utime_t arrived, io::Extents &&extents,
bufferlist&& bl, const int fadvise_flags, Context *user_req);
- virtual ~C_BlockIORequest();
+ ~C_BlockIORequest() override;
+ C_BlockIORequest(const C_BlockIORequest&) = delete;
+ C_BlockIORequest &operator=(const C_BlockIORequest&) = delete;
+
+ void set_cell(BlockGuardCell *cell);
+ BlockGuardCell *get_cell(void);
+ void release_cell();
void complete_user_request(int r);
void finish(int r);
virtual void dispatch() = 0;
- virtual const char *get_name() const override {
+ virtual const char *get_name() const {
return "C_BlockIORequest";
}
+ uint64_t get_image_extents_size() {
+ return image_extents.size();
+ }
+ void set_io_waited_for_lanes(bool waited) {
+ waited_lanes = waited;
+ }
+ void set_io_waited_for_entries(bool waited) {
+ waited_entries = waited;
+ }
+ void set_io_waited_for_buffers(bool waited) {
+ waited_buffers = waited;
+ }
+ bool has_io_waited_for_buffers() {
+ return waited_buffers;
+ }
+ std::vector<WriteBufferAllocation>& get_resources_buffers() {
+ return m_resources.buffers;
+ }
+
+ void set_allocated(bool allocated) {
+ if (allocated) {
+ m_resources.allocated = true;
+ } else {
+ m_resources.buffers.clear();
+ }
+ }
+
+ virtual void setup_buffer_resources(
+ uint64_t &bytes_cached, uint64_t &bytes_dirtied, uint64_t &bytes_allocated,
+ uint64_t &number_lanes, uint64_t &number_log_entries,
+ uint64_t &number_unpublished_reserves) {};
protected:
utime_t m_arrived_time;
- utime_t m_allocated_time; /* When allocation began */
utime_t m_dispatched_time; /* When dispatch began */
utime_t m_user_req_completed_time;
- bool m_waited_lanes = false; /* This IO waited for free persist/replicate lanes */
- bool m_waited_entries = false; /* This IO waited for free log entries */
- bool m_waited_buffers = false; /* This IO waited for data buffers (pmemobj_reserve() failed) */
std::atomic<bool> m_deferred = {false}; /* Deferred because this or a prior IO had to wait for write resources */
+ WriteRequestResources m_resources;
private:
std::atomic<bool> m_user_req_completed = {false};
std::atomic<bool> m_finish_called = {false};
+ std::atomic<bool> m_cell_released = {false};
+ BlockGuardCell* m_cell = nullptr;
template <typename U>
friend std::ostream &operator<<(std::ostream &os,
const C_BlockIORequest<U> &req);
};
-struct WriteRequestResources {
- bool allocated = false;
- std::vector<WriteBufferAllocation> buffers;
-};
-
/**
* This is the custodian of the BlockGuard cell for this write. Block
* guard is not released until the write persists everywhere (this is
class C_WriteRequest : public C_BlockIORequest<T> {
public:
using C_BlockIORequest<T>::rwl;
- WriteRequestResources resources;
+ unique_ptr<WriteLogOperationSet> op_set = nullptr;
C_WriteRequest(T &rwl, const utime_t arrived, io::Extents &&image_extents,
- bufferlist&& bl, const int fadvise_flags, Context *user_req);
+ bufferlist&& bl, const int fadvise_flags, ceph::mutex &lock,
+ PerfCounters *perfcounter, Context *user_req);
- ~C_WriteRequest();
+ ~C_WriteRequest() override;
void blockguard_acquired(GuardedRequestFunctionContext &guard_ctx);
/* Common finish to plain write and compare-and-write (if it writes) */
- virtual void finish_req(int r);
+ void finish_req(int r) override;
/* Compare and write will override this */
virtual void update_req_stats(utime_t &now) {
// TODO: Add in later PRs
}
- virtual bool alloc_resources() override;
-
- /* Plain writes will allocate one buffer per request extent */
- virtual void setup_buffer_resources(uint64_t &bytes_cached, uint64_t &bytes_dirtied);
+ bool alloc_resources() override;
void deferred_handler() override { }
virtual void setup_log_operations();
+ bool append_write_request(std::shared_ptr<SyncPoint> sync_point);
+
virtual void schedule_append();
const char *get_name() const override {
return "C_WriteRequest";
}
+protected:
+ using C_BlockIORequest<T>::m_resources;
+ /* Plain writes will allocate one buffer per request extent */
+ void setup_buffer_resources(
+ uint64_t &bytes_cached, uint64_t &bytes_dirtied, uint64_t &bytes_allocated,
+ uint64_t &number_lanes, uint64_t &number_log_entries,
+ uint64_t &number_unpublished_reserves) override;
+
private:
- unique_ptr<WriteLogOperationSet<T>> m_op_set = nullptr;
bool m_do_early_flush = false;
std::atomic<int> m_appended = {0};
bool m_queued = false;
-
+ ceph::mutex &m_lock;
+ PerfCounters *m_perfcounter = nullptr;
template <typename U>
friend std::ostream &operator<<(std::ostream &os,
const C_WriteRequest<U> &req);
class GuardedRequestFunctionContext : public Context {
public:
- BlockGuardCell *m_cell = nullptr;
- BlockGuardReqState m_state;
+ BlockGuardCell *cell = nullptr;
+ BlockGuardReqState state;
GuardedRequestFunctionContext(boost::function<void(GuardedRequestFunctionContext&)> &&callback);
- ~GuardedRequestFunctionContext(void);
+ ~GuardedRequestFunctionContext(void) override;
GuardedRequestFunctionContext(const GuardedRequestFunctionContext&) = delete;
GuardedRequestFunctionContext &operator=(const GuardedRequestFunctionContext&) = delete;
GuardedRequest(const BlockExtent block_extent,
GuardedRequestFunctionContext *on_guard_acquire, bool barrier = false)
: block_extent(block_extent), guard_ctx(on_guard_acquire) {
- guard_ctx->m_state.barrier = barrier;
+ guard_ctx->state.barrier = barrier;
}
friend std::ostream &operator<<(std::ostream &os,
const GuardedRequest &r);
};
-} // namespace rwl
-} // namespace cache
-} // namespace librbd
+} // namespace rwl
+} // namespace cache
+} // namespace librbd
-#endif // CEPH_LIBRBD_CACHE_RWL_REQUEST_H
+#endif // CEPH_LIBRBD_CACHE_RWL_REQUEST_H
#include "SyncPoint.h"
-#define dout_subsys ceph_subsys_rbd
+#define dout_subsys ceph_subsys_rbd_rwl
#undef dout_prefix
#define dout_prefix *_dout << "librbd::cache::rwl::SyncPoint: " << this << " " \
<< __func__ << ": "
namespace cache {
namespace rwl {
-template <typename T>
-SyncPoint<T>::SyncPoint(T &rwl, const uint64_t sync_gen_num)
- : rwl(rwl), log_entry(std::make_shared<SyncPointLogEntry>(sync_gen_num)) {
- prior_log_entries_persisted = new C_Gather(rwl.m_image_ctx.cct, nullptr);
- sync_point_persist = new C_Gather(rwl.m_image_ctx.cct, nullptr);
+SyncPoint::SyncPoint(uint64_t sync_gen_num, CephContext *cct)
+ : log_entry(std::make_shared<SyncPointLogEntry>(sync_gen_num)), m_cct(cct) {
+ prior_log_entries_persisted = new C_Gather(cct, nullptr);
+ sync_point_persist = new C_Gather(cct, nullptr);
on_sync_point_appending.reserve(MAX_WRITES_PER_SYNC_POINT + 2);
on_sync_point_persisted.reserve(MAX_WRITES_PER_SYNC_POINT + 2);
- if (RWL_VERBOSE_LOGGING) {
- ldout(rwl.m_image_ctx.cct, 20) << "sync point " << sync_gen_num << dendl;
- }
+ ldout(m_cct, 20) << "sync point " << sync_gen_num << dendl;
}
-template <typename T>
-SyncPoint<T>::~SyncPoint() {
+SyncPoint::~SyncPoint() {
ceph_assert(on_sync_point_appending.empty());
ceph_assert(on_sync_point_persisted.empty());
ceph_assert(!earlier_sync_point);
}
-template <typename T>
-std::ostream &SyncPoint<T>::format(std::ostream &os) const {
- os << "log_entry=[" << *log_entry << "], "
- << "earlier_sync_point=" << earlier_sync_point << ", "
- << "later_sync_point=" << later_sync_point << ", "
- << "final_op_sequence_num=" << final_op_sequence_num << ", "
- << "prior_log_entries_persisted=" << prior_log_entries_persisted << ", "
- << "prior_log_entries_persisted_complete=" << prior_log_entries_persisted_complete << ", "
- << "append_scheduled=" << append_scheduled << ", "
- << "appending=" << appending << ", "
- << "on_sync_point_appending=" << on_sync_point_appending.size() << ", "
- << "on_sync_point_persisted=" << on_sync_point_persisted.size() << "";
+std::ostream &operator<<(std::ostream &os,
+ const SyncPoint &p) {
+ os << "log_entry=[" << *p.log_entry << "], "
+ << "earlier_sync_point=" << p.earlier_sync_point << ", "
+ << "later_sync_point=" << p.later_sync_point << ", "
+ << "final_op_sequence_num=" << p.final_op_sequence_num << ", "
+ << "prior_log_entries_persisted=" << p.prior_log_entries_persisted << ", "
+ << "prior_log_entries_persisted_complete=" << p.prior_log_entries_persisted_complete << ", "
+ << "append_scheduled=" << p.append_scheduled << ", "
+ << "appending=" << p.appending << ", "
+ << "on_sync_point_appending=" << p.on_sync_point_appending.size() << ", "
+ << "on_sync_point_persisted=" << p.on_sync_point_persisted.size() << "";
return os;
-};
+}
} // namespace rwl
} // namespace cache
namespace librbd {
namespace cache {
namespace rwl {
-/* Limit work between sync points */
-static const uint64_t MAX_WRITES_PER_SYNC_POINT = 256;
-template <typename T>
class SyncPoint {
public:
- T &rwl;
std::shared_ptr<SyncPointLogEntry> log_entry;
- /* Use m_lock for earlier/later links */
- std::shared_ptr<SyncPoint<T>> earlier_sync_point; /* NULL if earlier has completed */
- std::shared_ptr<SyncPoint<T>> later_sync_point;
+ /* Use lock for earlier/later links */
+ std::shared_ptr<SyncPoint> earlier_sync_point; /* NULL if earlier has completed */
+ std::shared_ptr<SyncPoint> later_sync_point;
uint64_t final_op_sequence_num = 0;
  /* A sync point can't appear in the log until all the writes bearing
   * it and all the prior sync points have been appended and
   * persisted; aio_flush() completions tied to this sync point are
   * added to this vector. */
std::vector<Context*> on_sync_point_persisted;
- SyncPoint(T &rwl, const uint64_t sync_gen_num);
+ SyncPoint(uint64_t sync_gen_num, CephContext *cct);
~SyncPoint();
SyncPoint(const SyncPoint&) = delete;
SyncPoint &operator=(const SyncPoint&) = delete;
- std::ostream &format(std::ostream &os) const;
+
+private:
+ CephContext *m_cct;
friend std::ostream &operator<<(std::ostream &os,
- const SyncPoint &p) {
- return p.format(os);
- }
+ const SyncPoint &p);
};
} // namespace rwl
#include "common/ceph_context.h"
#include "include/Context.h"
-#define dout_subsys ceph_subsys_rbd
+#define dout_subsys ceph_subsys_rbd_rwl
#undef dout_prefix
#define dout_prefix *_dout << "librbd::cache::rwl::Types: " << this << " " \
<< __func__ << ": "
* convert between image and block extents here using a "block size"
* of 1.
*/
-const BlockExtent block_extent(const uint64_t offset_bytes, const uint64_t length_bytes)
+BlockExtent convert_to_block_extent(const uint64_t offset_bytes, const uint64_t length_bytes)
{
return BlockExtent(offset_bytes,
offset_bytes + length_bytes - 1);
}
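+/* For example, convert_to_block_extent(4096, 512) yields
+ * BlockExtent(4096, 4607); block_end is inclusive. */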
-const BlockExtent WriteLogPmemEntry::block_extent() {
- return BlockExtent(librbd::cache::rwl::block_extent(image_offset_bytes, write_bytes));
+BlockExtent WriteLogPmemEntry::block_extent() {
+ return convert_to_block_extent(image_offset_bytes, write_bytes);
}
-bool WriteLogPmemEntry::is_sync_point() {
- return sync_point;
-}
-
-bool WriteLogPmemEntry::is_discard() {
- return discard;
-}
-
-bool WriteLogPmemEntry::is_writesame() {
- return writesame;
-}
-
-bool WriteLogPmemEntry::is_write() {
- /* Log entry is a basic write */
- return !is_sync_point() && !is_discard() && !is_writesame();
-}
-
-bool WriteLogPmemEntry::is_writer() {
- /* Log entry is any type that writes data */
- return is_write() || is_discard() || is_writesame();
-}
-
-const uint64_t WriteLogPmemEntry::get_offset_bytes() {
+uint64_t WriteLogPmemEntry::get_offset_bytes() {
return image_offset_bytes;
}
-const uint64_t WriteLogPmemEntry::get_write_bytes() {
+uint64_t WriteLogPmemEntry::get_write_bytes() {
return write_bytes;
}
};
template <typename ExtentsType>
-ExtentsSummary<ExtentsType>::ExtentsSummary(const ExtentsType &extents) {
- total_bytes = 0;
- first_image_byte = 0;
- last_image_byte = 0;
+ExtentsSummary<ExtentsType>::ExtentsSummary(const ExtentsType &extents)
+ : total_bytes(0), first_image_byte(0), last_image_byte(0)
+{
if (extents.empty()) return;
/* These extents refer to image offsets between first_image_byte
* and last_image_byte, inclusive, but we don't guarantee here
template <typename T>
std::ostream &operator<<(std::ostream &os,
- const ExtentsSummary<T> &s) {
+ const ExtentsSummary<T> &s) {
os << "total_bytes=" << s.total_bytes << ", "
<< "first_image_byte=" << s.first_image_byte << ", "
<< "last_image_byte=" << s.last_image_byte << "";
l_librbd_rwl_last,
};
+namespace librbd {
+namespace cache {
+namespace rwl {
+
+/* Limit work between sync points */
+const uint64_t MAX_WRITES_PER_SYNC_POINT = 256;
+
const uint32_t MIN_WRITE_ALLOC_SIZE = 512;
const uint32_t LOG_STATS_INTERVAL_SECONDS = 5;
const uint8_t RWL_POOL_VERSION = 1;
const uint64_t MAX_LOG_ENTRIES = (1024 * 1024);
-namespace librbd {
-namespace cache {
-namespace rwl {
-
-static const bool RWL_VERBOSE_LOGGING = false;
-
/* Defer a set of Contexts until destruct/exit. Used for deferring
* work on a given thread until a required lock is dropped. */
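+/* For example, a caller holding m_lock can push completions into a
+ * stack-allocated DeferredContexts; they run when it destructs, after
+ * the lock has been dropped. */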
class DeferredContexts {
: image_offset_bytes(image_offset_bytes), write_bytes(write_bytes),
entry_valid(0), sync_point(0), sequenced(0), has_data(0), discard(0), writesame(0) {
}
- const BlockExtent block_extent();
- bool is_sync_point();
- bool is_discard();
- bool is_writesame();
- bool is_write();
- bool is_writer();
- const uint64_t get_offset_bytes();
- const uint64_t get_write_bytes();
+ BlockExtent block_extent();
+ uint64_t get_offset_bytes();
+ uint64_t get_write_bytes();
friend std::ostream& operator<<(std::ostream& os,
const WriteLogPmemEntry &entry);
};
utime_t allocation_lat;
};
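+/* Inverse of convert_to_block_extent(): image_extent(BlockExtent(4096, 4607))
+ * yields io::Extent(4096, 512), i.e. (offset, length). */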
+static inline io::Extent image_extent(const BlockExtent& block_extent) {
+ return io::Extent(block_extent.block_start,
+ block_extent.block_end - block_extent.block_start + 1);
+}
+
template <typename ExtentsType>
class ExtentsSummary {
public:
uint64_t total_bytes;
uint64_t first_image_byte;
uint64_t last_image_byte;
- ExtentsSummary(const ExtentsType &extents);
+ explicit ExtentsSummary(const ExtentsType &extents);
template <typename U>
friend std::ostream &operator<<(std::ostream &os,
const ExtentsSummary<U> &s);
- const BlockExtent block_extent() {
+ BlockExtent block_extent() {
return BlockExtent(first_image_byte, last_image_byte);
}
- const io::Extent image_extent(const BlockExtent& block_extent) {
- return io::Extent(block_extent.block_start,
- block_extent.block_end - block_extent.block_start + 1);
- }
- const io::Extent image_extent() {
+ io::Extent image_extent() {
return image_extent(block_extent());
}
};
-} // namespace rwl
-} // namespace cache
-} // namespace librbd
+} // namespace rwl
+} // namespace cache
+} // namespace librbd
#endif // CEPH_LIBRBD_CACHE_RWL_TYPES_H
void expect_context_complete(MockContextRWL& mock_context, int r) {
EXPECT_CALL(mock_context, complete(r))
.WillRepeatedly(Invoke([&mock_context](int r) {
- mock_context.do_complete(r);
- }));
+ mock_context.do_complete(r);
+ }));
}
void expect_metadata_set(MockImageCtx& mock_image_ctx) {
ASSERT_EQ(0, finish_ctx2.wait());
}
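+// Exercises the full aio_write path: init the cache, write 4 KiB at offset 0, then shut down.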
+TEST_F(TestMockCacheReplicatedWriteLog, aio_write) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockImageCtx mock_image_ctx(*ictx);
+ MockReplicatedWriteLog rwl(mock_image_ctx, get_cache_state(mock_image_ctx));
+
+ MockContextRWL finish_ctx1;
+ expect_op_work_queue(mock_image_ctx);
+ expect_metadata_set(mock_image_ctx);
+ expect_context_complete(finish_ctx1, 0);
+ rwl.init(&finish_ctx1);
+ ASSERT_EQ(0, finish_ctx1.wait());
+
+ MockContextRWL finish_ctx2;
+ expect_context_complete(finish_ctx2, 0);
+ Extents image_extents{{0, 4096}};
+ bufferlist bl;
+ bl.append(std::string(4096, '1'));
+ int fadvise_flags = 0;
+ rwl.aio_write(std::move(image_extents), std::move(bl), fadvise_flags, &finish_ctx2);
+ ASSERT_EQ(0, finish_ctx2.wait());
+
+ MockContextRWL finish_ctx3;
+ expect_context_complete(finish_ctx3, 0);
+ rwl.shut_down(&finish_ctx3);
+ ASSERT_EQ(0, finish_ctx3.wait());
+}
+
} // namespace cache
} // namespace librbd