typedef ReplicatedWriteLog<ImageCtx>::Extent Extent;
typedef ReplicatedWriteLog<ImageCtx>::Extents Extents;
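+/* Maximum number of log entries appended per pass; each pass publishes its buffer allocations in one pmemobj transaction */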
+const unsigned long int ops_appended_together = MAX_ALLOC_PER_TRANSACTION;
+
template <typename I>
ReplicatedWriteLog<I>::ReplicatedWriteLog(I &image_ctx, librbd::cache::rwl::ImageCacheState<I>* cache_state)
: m_cache_state(cache_state),
m_rwl_pool_layout_name(POBJ_LAYOUT_NAME(rbd_rwl)),
m_image_ctx(image_ctx),
m_log_pool_config_size(DEFAULT_POOL_SIZE),
- m_image_writeback(image_ctx),
+ m_image_writeback(image_ctx), m_write_log_guard(image_ctx.cct),
+ m_deferred_dispatch_lock(ceph::make_mutex(util::unique_lock_name(
+ "librbd::cache::ReplicatedWriteLog::m_deferred_dispatch_lock", this))),
+ m_log_append_lock(ceph::make_mutex(util::unique_lock_name(
+ "librbd::cache::ReplicatedWriteLog::m_log_append_lock", this))),
m_lock(ceph::make_mutex(util::unique_lock_name(
"librbd::cache::ReplicatedWriteLog::m_lock", this))),
+ m_blockguard_lock(ceph::make_mutex(util::unique_lock_name(
+ "librbd::cache::ReplicatedWriteLog::m_blockguard_lock", this))),
+ m_entry_bl_lock(ceph::make_mutex(util::unique_lock_name(
+ "librbd::cache::ReplicatedWriteLog::m_entry_bl_lock", this))),
m_thread_pool(image_ctx.cct, "librbd::cache::ReplicatedWriteLog::thread_pool", "tp_rwl",
4,
""),
m_log_pool_config_size,
(S_IWUSR | S_IRUSR))) == NULL) {
lderr(cct) << "failed to create pool (" << m_log_pool_name << ")"
- << pmemobj_errormsg() << dendl;
+ << pmemobj_errormsg() << dendl;
m_cache_state->present = false;
m_cache_state->clean = true;
m_cache_state->empty = true;
// TODO: Will init sync point, this will be covered in later PR.
// init_flush_new_sync_point(later);
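+ // For now, create the initial sync point in place of init_flush_new_sync_point().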
+ ++m_current_sync_gen;
+ auto new_sync_point = std::make_shared<SyncPointT>(*this, m_current_sync_gen);
+ m_current_sync_point = new_sync_point;
m_initialized = true;
// Start the thread
bufferlist&& bl,
int fadvise_flags,
Context *on_finish) {
+ CephContext *cct = m_image_ctx.cct;
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(cct, 20) << "aio_write" << dendl;
+ }
+ utime_t now = ceph_clock_now();
+ m_perfcounter->inc(l_librbd_rwl_wr_req, 1);
+
+ ceph_assert(m_initialized);
+ {
+ std::shared_lock image_locker(m_image_ctx.image_lock);
+ if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) {
+ on_finish->complete(-EROFS);
+ return;
+ }
+ }
+
+ if (ExtentsSummary<Extents>(image_extents).total_bytes == 0) {
+ on_finish->complete(0);
+ return;
+ }
+
+ auto *write_req =
+ new C_WriteRequestT(*this, now, std::move(image_extents), std::move(bl), fadvise_flags, on_finish);
+ m_perfcounter->inc(l_librbd_rwl_wr_bytes, write_req->image_extents_summary.total_bytes);
+
+ /* The lambda below will be called when the block guard for all
+ * blocks affected by this write is obtained */
+ GuardedRequestFunctionContext *guarded_ctx =
+ new GuardedRequestFunctionContext([this, write_req](GuardedRequestFunctionContext &guard_ctx) {
+ write_req->blockguard_acquired(guard_ctx);
+ alloc_and_dispatch_io_req(write_req);
+ });
+
+ detain_guarded_request(GuardedRequest(write_req->image_extents_summary.block_extent(),
+ guarded_ctx));
}
template <typename I>
Context *on_finish) {
}
+template <typename I>
+void ReplicatedWriteLog<I>::wake_up() {
+ // TODO: Handle flushing data from the cache device to the OSDs.
+}
+
template <typename I>
void ReplicatedWriteLog<I>::flush(Context *on_finish) {
}
void ReplicatedWriteLog<I>::invalidate(Context *on_finish) {
}
+template <typename I>
+BlockGuardCell* ReplicatedWriteLog<I>::detain_guarded_request_helper(GuardedRequest &req)
+{
+ CephContext *cct = m_image_ctx.cct;
+ BlockGuardCell *cell;
+
+ ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock));
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(cct, 20) << dendl;
+ }
+
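+ /* A return of 0 means the request acquired a cell and can proceed; a positive
+ * return means it overlaps an in-flight request and was queued behind it */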
+ int r = m_write_log_guard.detain(req.block_extent, &req, &cell);
+ ceph_assert(r >= 0);
+ if (r > 0) {
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(cct, 20) << "detaining guarded request due to in-flight requests: "
+ << "req=" << req << dendl;
+ }
+ return nullptr;
+ }
+
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(cct, 20) << "in-flight request cell: " << cell << dendl;
+ }
+ return cell;
+}
+
+template <typename I>
+BlockGuardCell* ReplicatedWriteLog<I>::detain_guarded_request_barrier_helper(GuardedRequest &req)
+{
+ BlockGuardCell *cell = nullptr;
+
+ ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock));
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(m_image_ctx.cct, 20) << dendl;
+ }
+
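+ /* While a barrier is in progress, all new requests wait behind it regardless of overlap */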
+ if (m_barrier_in_progress) {
+ req.guard_ctx->m_state.queued = true;
+ m_awaiting_barrier.push_back(req);
+ } else {
+ bool barrier = req.guard_ctx->m_state.barrier;
+ if (barrier) {
+ m_barrier_in_progress = true;
+ req.guard_ctx->m_state.current_barrier = true;
+ }
+ cell = detain_guarded_request_helper(req);
+ if (barrier) {
+ /* Only non-null if the barrier acquires the guard now */
+ m_barrier_cell = cell;
+ }
+ }
+
+ return cell;
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::detain_guarded_request(GuardedRequest &&req)
+{
+ BlockGuardCell *cell = nullptr;
+
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(m_image_ctx.cct, 20) << dendl;
+ }
+ {
+ std::lock_guard locker(m_blockguard_lock);
+ cell = detain_guarded_request_barrier_helper(req);
+ }
+ if (cell) {
+ req.guard_ctx->m_cell = cell;
+ req.guard_ctx->complete(0);
+ }
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::release_guarded_request(BlockGuardCell *released_cell)
+{
+ CephContext *cct = m_image_ctx.cct;
+ WriteLogGuard::BlockOperations block_reqs;
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(cct, 20) << "released_cell=" << released_cell << dendl;
+ }
+
+ {
+ std::lock_guard locker(m_blockguard_lock);
+ m_write_log_guard.release(released_cell, &block_reqs);
+
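+ /* Requests that were queued behind the released cell now try to acquire cells of their own */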
+ for (auto &req : block_reqs) {
+ req.guard_ctx->m_state.detained = true;
+ BlockGuardCell *detained_cell = detain_guarded_request_helper(req);
+ if (detained_cell) {
+ if (req.guard_ctx->m_state.current_barrier) {
+ /* The current barrier is acquiring the block guard, so now we know its cell */
+ m_barrier_cell = detained_cell;
+ /* detained_cell could be == released_cell here */
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(cct, 20) << "current barrier cell=" << detained_cell << " req=" << req << dendl;
+ }
+ }
+ req.guard_ctx->m_cell = detained_cell;
+ m_work_queue.queue(req.guard_ctx);
+ }
+ }
+
+ if (m_barrier_in_progress && (released_cell == m_barrier_cell)) {
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(cct, 20) << "current barrier released cell=" << released_cell << dendl;
+ }
+ /* The released cell is the current barrier request */
+ m_barrier_in_progress = false;
+ m_barrier_cell = nullptr;
+ /* Move waiting requests into the blockguard. Stop if there's another barrier */
+ while (!m_barrier_in_progress && !m_awaiting_barrier.empty()) {
+ auto &req = m_awaiting_barrier.front();
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(cct, 20) << "submitting queued request to blockguard: " << req << dendl;
+ }
+ BlockGuardCell *detained_cell = detain_guarded_request_barrier_helper(req);
+ if (detained_cell) {
+ req.guard_ctx->m_cell = detained_cell;
+ m_work_queue.queue(req.guard_ctx);
+ }
+ m_awaiting_barrier.pop_front();
+ }
+ }
+ }
+
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(cct, 20) << "exit" << dendl;
+ }
+}
+
+/*
+ * Performs the log event append operation for all of the scheduled
+ * events.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::append_scheduled_ops(void)
+{
+ GenericLogOperationsT ops;
+ int append_result = 0;
+ bool ops_remain = false;
+ bool appending = false; /* true if we set m_appending */
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(m_image_ctx.cct, 20) << dendl;
+ }
+ do {
+ ops.clear();
+
+ {
+ std::lock_guard locker(m_lock);
+ if (!appending && m_appending) {
+ /* Another thread is appending */
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(m_image_ctx.cct, 15) << "Another thread is appending" << dendl;
+ }
+ return;
+ }
+ if (m_ops_to_append.size()) {
+ appending = true;
+ m_appending = true;
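+ /* Take up to ops_appended_together ops from the front of the append queue */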
+ auto last_in_batch = m_ops_to_append.begin();
+ unsigned int ops_to_append = m_ops_to_append.size();
+ if (ops_to_append > ops_appended_together) {
+ ops_to_append = ops_appended_together;
+ }
+ std::advance(last_in_batch, ops_to_append);
+ ops.splice(ops.end(), m_ops_to_append, m_ops_to_append.begin(), last_in_batch);
+ ops_remain = true; /* Always check again before leaving */
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", " << m_ops_to_append.size() << " remain" << dendl;
+ }
+ } else {
+ ops_remain = false;
+ if (appending) {
+ appending = false;
+ m_appending = false;
+ }
+ }
+ }
+
+ if (ops.size()) {
+ std::lock_guard locker(m_log_append_lock);
+ alloc_op_log_entries(ops);
+ append_result = append_op_log_entries(ops);
+ }
+
+ int num_ops = ops.size();
+ if (num_ops) {
+ /* New entries may be flushable. Completion will wake up flusher. */
+ complete_op_log_entries(std::move(ops), append_result);
+ }
+ } while (ops_remain);
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::enlist_op_appender()
+{
+ m_async_append_ops++;
+ m_async_op_tracker.start_op();
+ Context *append_ctx = new LambdaContext([this](int r) {
+ append_scheduled_ops();
+ m_async_append_ops--;
+ m_async_op_tracker.finish_op();
+ });
+ m_work_queue.queue(append_ctx);
+}
+
+/*
+ * Takes custody of ops. They'll all get their log entries appended,
+ * and have their on_write_persist contexts completed once they and
+ * all prior log entries are persisted everywhere.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::schedule_append(GenericLogOperationsT &ops)
+{
+ bool need_finisher;
+ GenericLogOperationsVectorT appending;
+
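+ /* Keep shared_ptr copies so each op can be notified it is appending after ops is spliced onto m_ops_to_append */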
+ std::copy(std::begin(ops), std::end(ops), std::back_inserter(appending));
+ {
+ std::lock_guard locker(m_lock);
+
+ need_finisher = m_ops_to_append.empty() && !m_appending;
+ m_ops_to_append.splice(m_ops_to_append.end(), ops);
+ }
+
+ if (need_finisher) {
+ enlist_op_appender();
+ }
+
+ for (auto &op : appending) {
+ op->appending();
+ }
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::schedule_append(GenericLogOperationsVectorT &ops)
+{
+ GenericLogOperationsT to_append(ops.begin(), ops.end());
+
+ schedule_append(to_append);
+}
+
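+/* Maximum number of ops whose pmem data buffers are flushed in one pass */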
+const unsigned long int ops_flushed_together = 4;
+/*
+ * Performs the pmem buffer flush on all scheduled ops, then schedules
+ * the log event append operation for all of them.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::flush_then_append_scheduled_ops(void)
+{
+ GenericLogOperationsT ops;
+ bool ops_remain = false;
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(m_image_ctx.cct, 20) << dendl;
+ }
+ do {
+ {
+ ops.clear();
+ std::lock_guard locker(m_lock);
+ if (m_ops_to_flush.size()) {
+ auto last_in_batch = m_ops_to_flush.begin();
+ unsigned int ops_to_flush = m_ops_to_flush.size();
+ if (ops_to_flush > ops_flushed_together) {
+ ops_to_flush = ops_flushed_together;
+ }
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(m_image_ctx.cct, 20) << "should flush " << ops_to_flush << dendl;
+ }
+ std::advance(last_in_batch, ops_to_flush);
+ ops.splice(ops.end(), m_ops_to_flush, m_ops_to_flush.begin(), last_in_batch);
+ ops_remain = !m_ops_to_flush.empty();
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(m_image_ctx.cct, 20) << "flushing " << ops.size() << ", " << m_ops_to_flush.size() << " remain" << dendl;
+ }
+ } else {
+ ops_remain = false;
+ }
+ }
+ if (ops_remain) {
+ enlist_op_flusher();
+ }
+
+ /* Ops subsequently scheduled for flush may finish before these,
+ * which is fine. We're unconcerned with completion order until we
+ * get to the log message append step. */
+ if (ops.size()) {
+ flush_pmem_buffer(ops);
+ schedule_append(ops);
+ }
+ } while (ops_remain);
+ append_scheduled_ops();
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::enlist_op_flusher()
+{
+ m_async_flush_ops++;
+ m_async_op_tracker.start_op();
+ Context *flush_ctx = new LambdaContext([this](int r) {
+ flush_then_append_scheduled_ops();
+ m_async_flush_ops--;
+ m_async_op_tracker.finish_op();
+ });
+ m_work_queue.queue(flush_ctx);
+}
+
+/*
+ * Takes custody of ops. They'll all get their pmem blocks flushed,
+ * then get their log entries appended.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::schedule_flush_and_append(GenericLogOperationsVectorT &ops)
+{
+ GenericLogOperationsT to_flush(ops.begin(), ops.end());
+ bool need_finisher;
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(m_image_ctx.cct, 20) << dendl;
+ }
+ {
+ std::lock_guard locker(m_lock);
+
+ need_finisher = m_ops_to_flush.empty();
+ m_ops_to_flush.splice(m_ops_to_flush.end(), to_flush);
+ }
+
+ if (need_finisher) {
+ enlist_op_flusher();
+ }
+}
+
+/*
+ * Flush the pmem regions for the data blocks of a set of operations
+ *
+ * V is expected to be GenericLogOperations<I>, or GenericLogOperationsVector<I>
+ */
+template <typename I>
+template <typename V>
+void ReplicatedWriteLog<I>::flush_pmem_buffer(V& ops)
+{
+ for (auto &operation : ops) {
+ if (operation->is_write() || operation->is_writesame()) {
+ operation->buf_persist_time = ceph_clock_now();
+ auto write_entry = operation->get_write_log_entry();
+
+ pmemobj_flush(m_log_pool, write_entry->pmem_buffer, write_entry->write_bytes());
+ }
+ }
+
+ /* Drain once for all */
+ pmemobj_drain(m_log_pool);
+
+ utime_t now = ceph_clock_now();
+ for (auto &operation : ops) {
+ if (operation->is_write() || operation->is_writesame()) {
+ operation->buf_persist_comp_time = now;
+ } else {
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl;
+ }
+ }
+ }
+}
+
+/*
+ * Allocate the (already reserved) write log entries for a set of operations.
+ *
+ * Locking:
+ * Acquires m_lock
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::alloc_op_log_entries(GenericLogOperationsT &ops)
+{
+ TOID(struct WriteLogPoolRoot) pool_root;
+ pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+ struct WriteLogPmemEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries);
+
+ ceph_assert(ceph_mutex_is_locked_by_me(m_log_append_lock));
+
+ /* Allocate the (already reserved) log entries */
+ std::lock_guard locker(m_lock);
+
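+ /* Hand out ring slots in order; m_first_free_entry wraps around m_total_log_entries */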
+ for (auto &operation : ops) {
+ uint32_t entry_index = m_first_free_entry;
+ m_first_free_entry = (m_first_free_entry + 1) % m_total_log_entries;
+ auto &log_entry = operation->get_log_entry();
+ log_entry->log_entry_index = entry_index;
+ log_entry->ram_entry.entry_index = entry_index;
+ log_entry->pmem_entry = &pmem_log_entries[entry_index];
+ log_entry->ram_entry.entry_valid = 1;
+ m_log_entries.push_back(log_entry);
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(m_image_ctx.cct, 20) << "operation=[" << *operation << "]" << dendl;
+ }
+ }
+}
+
+/*
+ * Flush the persistent write log entries for a set of ops. The entries
+ * must be contiguous in persistent memory.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::flush_op_log_entries(GenericLogOperationsVectorT &ops)
+{
+ if (ops.empty()) {
+ return;
+ }
+
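+ /* Entries in a batch are allocated in ring order; callers split batches that wrap, so the range flushed here is contiguous */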
+ if (ops.size() > 1) {
+ ceph_assert(ops.front()->get_log_entry()->pmem_entry < ops.back()->get_log_entry()->pmem_entry);
+ }
+
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(m_image_ctx.cct, 20) << "entry count=" << ops.size() << " "
+ << "start address=" << ops.front()->get_log_entry()->pmem_entry << " "
+ << "bytes=" << ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry))
+ << dendl;
+ }
+ pmemobj_flush(m_log_pool,
+ ops.front()->get_log_entry()->pmem_entry,
+ ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry)));
+}
+
+/*
+ * Write and persist the (already allocated) write log entries and
+ * data buffer allocations for a set of ops. The data buffer for each
+ * of these must already have been persisted to its reserved area.
+ */
+template <typename I>
+int ReplicatedWriteLog<I>::append_op_log_entries(GenericLogOperationsT &ops)
+{
+ CephContext *cct = m_image_ctx.cct;
+ GenericLogOperationsVectorT entries_to_flush;
+ TOID(struct WriteLogPoolRoot) pool_root;
+ pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+ int ret = 0;
+
+ ceph_assert(ceph_mutex_is_locked_by_me(m_log_append_lock));
+
+ if (ops.empty()) {
+ return 0;
+ }
+ entries_to_flush.reserve(ops_appended_together);
+
+ /* Write log entries to ring and persist */
+ utime_t now = ceph_clock_now();
+ for (auto &operation : ops) {
+ if (!entries_to_flush.empty()) {
+ /* Flush these and reset the list if the current entry wraps to the
+ * tail of the ring */
+ if (entries_to_flush.back()->get_log_entry()->log_entry_index >
+ operation->get_log_entry()->log_entry_index) {
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(m_image_ctx.cct, 20) << "entries to flush wrap around the end of the ring at "
+ << "operation=[" << *operation << "]" << dendl;
+ }
+ flush_op_log_entries(entries_to_flush);
+ entries_to_flush.clear();
+ now = ceph_clock_now();
+ }
+ }
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(m_image_ctx.cct, 20) << "Copying entry for operation at index="
+ << operation->get_log_entry()->log_entry_index << " "
+ << "from " << &operation->get_log_entry()->ram_entry << " "
+ << "to " << operation->get_log_entry()->pmem_entry << " "
+ << "operation=[" << *operation << "]" << dendl;
+ }
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(m_image_ctx.cct, 5) << "APPENDING: index="
+ << operation->get_log_entry()->log_entry_index << " "
+ << "operation=[" << *operation << "]" << dendl;
+ }
+ operation->log_append_time = now;
+ *operation->get_log_entry()->pmem_entry = operation->get_log_entry()->ram_entry;
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(m_image_ctx.cct, 20) << "APPENDING: index="
+ << operation->get_log_entry()->log_entry_index << " "
+ << "pmem_entry=[" << *operation->get_log_entry()->pmem_entry << "]" << dendl;
+ }
+ entries_to_flush.push_back(operation);
+ }
+ flush_op_log_entries(entries_to_flush);
+
+ /* Drain once for all */
+ pmemobj_drain(m_log_pool);
+
+ /*
+ * Atomically advance the log head pointer and publish the
+ * allocations for all the data buffers they refer to.
+ */
+ utime_t tx_start = ceph_clock_now();
+ TX_BEGIN(m_log_pool) {
+ D_RW(pool_root)->first_free_entry = m_first_free_entry;
+ for (auto &operation : ops) {
+ if (operation->is_write() || operation->is_writesame()) {
+ auto write_op = (std::shared_ptr<WriteLogOperationT>&) operation;
+ pmemobj_tx_publish(&write_op->buffer_alloc->buffer_alloc_action, 1);
+ } else {
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl;
+ }
+ }
+ }
+ } TX_ONCOMMIT {
+ } TX_ONABORT {
+ lderr(cct) << "failed to commit " << ops.size() << " log entries (" << m_log_pool_name << ")" << dendl;
+ ceph_assert(false);
+ ret = -EIO;
+ } TX_FINALLY {
+ } TX_END;
+
+ utime_t tx_end = ceph_clock_now();
+ m_perfcounter->tinc(l_librbd_rwl_append_tx_t, tx_end - tx_start);
+ m_perfcounter->hinc(l_librbd_rwl_append_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(), ops.size());
+ for (auto &operation : ops) {
+ operation->log_append_comp_time = tx_end;
+ }
+
+ return ret;
+}
+
+/*
+ * Complete a set of write ops with the result of append_op_log_entries.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::complete_op_log_entries(GenericLogOperationsT &&ops, const int result)
+{
+ GenericLogEntries dirty_entries;
+ int published_reserves = 0;
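+ /* Count buffer reservations published by the append transaction and collect entries that are now dirty */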
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(m_image_ctx.cct, 20) << __func__ << ": completing" << dendl;
+ }
+ for (auto &op : ops) {
+ utime_t now = ceph_clock_now();
+ auto log_entry = op->get_log_entry();
+ log_entry->completed = true;
+ if (op->is_writing_op()) {
+ op->get_gen_write_op()->sync_point->log_entry->writes_completed++;
+ dirty_entries.push_back(log_entry);
+ }
+ if (op->is_write() || op->is_writesame()) {
+ published_reserves++;
+ }
+ if (op->is_discard()) {
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(m_image_ctx.cct, 20) << __func__ << ": completing discard" << dendl;
+ }
+ }
+ op->complete(result);
+ if (op->is_write()) {
+ m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_buf_t, op->buf_persist_time - op->dispatch_time);
+ }
+ m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_app_t, op->log_append_time - op->dispatch_time);
+ m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_cmp_t, now - op->dispatch_time);
+ m_perfcounter->hinc(l_librbd_rwl_log_op_dis_to_cmp_t_hist, utime_t(now - op->dispatch_time).to_nsec(),
+ log_entry->ram_entry.write_bytes);
+ if (op->is_write()) {
+ utime_t buf_lat = op->buf_persist_comp_time - op->buf_persist_time;
+ m_perfcounter->tinc(l_librbd_rwl_log_op_buf_to_bufc_t, buf_lat);
+ m_perfcounter->hinc(l_librbd_rwl_log_op_buf_to_bufc_t_hist, buf_lat.to_nsec(),
+ log_entry->ram_entry.write_bytes);
+ m_perfcounter->tinc(l_librbd_rwl_log_op_buf_to_app_t, op->log_append_time - op->buf_persist_time);
+ }
+ utime_t app_lat = op->log_append_comp_time - op->log_append_time;
+ m_perfcounter->tinc(l_librbd_rwl_log_op_app_to_appc_t, app_lat);
+ m_perfcounter->hinc(l_librbd_rwl_log_op_app_to_appc_t_hist, app_lat.to_nsec(),
+ log_entry->ram_entry.write_bytes);
+ m_perfcounter->tinc(l_librbd_rwl_log_op_app_to_cmp_t, now - op->log_append_time);
+ }
+
+ {
+ std::lock_guard locker(m_lock);
+ m_unpublished_reserves -= published_reserves;
+ m_dirty_log_entries.splice(m_dirty_log_entries.end(), dirty_entries);
+
+ /* New entries may be flushable */
+ wake_up();
+ }
+}
+
+/**
+ * Dispatch as many deferred writes as possible
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::dispatch_deferred_writes(void)
+{
+ C_BlockIORequestT *front_req = nullptr; /* req still on front of deferred list */
+ C_BlockIORequestT *allocated_req = nullptr; /* req that was allocated, and is now off the list */
+ bool allocated = false; /* front_req allocate succeeded */
+ bool cleared_dispatching_flag = false;
+
+ /* If we can't become the dispatcher, we'll exit */
+ {
+ std::lock_guard locker(m_lock);
+ if (m_dispatching_deferred_ops ||
+ !m_deferred_ios.size()) {
+ return;
+ }
+ m_dispatching_deferred_ops = true;
+ }
+
+ /* There are ops to dispatch, and this should be the only thread dispatching them */
+ {
+ std::lock_guard deferred_dispatch(m_deferred_dispatch_lock);
+ do {
+ {
+ std::lock_guard locker(m_lock);
+ ceph_assert(m_dispatching_deferred_ops);
+ if (allocated) {
+ /* front_req->alloc_resources() succeeded on the previous pass of this
+ * loop, so pop that request off the deferred ops list here. */
+ ceph_assert(front_req);
+ ceph_assert(!allocated_req);
+ m_deferred_ios.pop_front();
+ allocated_req = front_req;
+ front_req = nullptr;
+ allocated = false;
+ }
+ ceph_assert(!allocated);
+ if (!allocated && front_req) {
+ /* front_req->alloc_resources() failed on the last iteration. We'll stop dispatching. */
+ front_req = nullptr;
+ ceph_assert(!cleared_dispatching_flag);
+ m_dispatching_deferred_ops = false;
+ cleared_dispatching_flag = true;
+ } else {
+ ceph_assert(!front_req);
+ if (m_deferred_ios.size()) {
+ /* New allocation candidate */
+ front_req = m_deferred_ios.front();
+ } else {
+ ceph_assert(!cleared_dispatching_flag);
+ m_dispatching_deferred_ops = false;
+ cleared_dispatching_flag = true;
+ }
+ }
+ }
+ /* Try allocating for front_req before we decide what to do with allocated_req
+ * (if any) */
+ if (front_req) {
+ ceph_assert(!cleared_dispatching_flag);
+ allocated = front_req->alloc_resources();
+ }
+ if (allocated_req && front_req && allocated) {
+ /* Push dispatch of the first allocated req to a wq */
+ m_work_queue.queue(new LambdaContext(
+ [this, allocated_req](int r) {
+ allocated_req->dispatch();
+ }), 0);
+ allocated_req = nullptr;
+ }
+ ceph_assert(!(allocated_req && front_req && allocated));
+
+ /* Continue while we're still considering the front of the deferred ops list */
+ } while (front_req);
+ ceph_assert(!allocated);
+ }
+ ceph_assert(cleared_dispatching_flag);
+
+ /* If any deferred requests were allocated, the last one will still be in allocated_req */
+ if (allocated_req) {
+ allocated_req->dispatch();
+ }
+}
+
+/**
+ * Releases the lanes used by this write, then attempts to dispatch the
+ * next deferred write.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::release_write_lanes(C_WriteRequestT *write_req)
+{
+ {
+ std::lock_guard locker(m_lock);
+ ceph_assert(write_req->resources.allocated);
+ m_free_lanes += write_req->image_extents.size();
+ write_req->resources.allocated = false;
+ }
+ dispatch_deferred_writes();
+}
+
+/**
+ * Attempts to allocate log resources for a write. The write is dispatched
+ * if resources are available, or queued if they aren't.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::alloc_and_dispatch_io_req(C_BlockIORequestT *req)
+{
+ bool dispatch_here = false;
+
+ {
+ /* If there are already deferred writes, queue behind them for resources */
+ {
+ std::lock_guard locker(m_lock);
+ dispatch_here = m_deferred_ios.empty();
+ }
+ if (dispatch_here) {
+ dispatch_here = req->alloc_resources();
+ }
+ if (dispatch_here) {
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(m_image_ctx.cct, 20) << "dispatching" << dendl;
+ }
+ req->dispatch();
+ } else {
+ req->deferred();
+ {
+ std::lock_guard locker(m_lock);
+ m_deferred_ios.push_back(req);
+ }
+ if (RWL_VERBOSE_LOGGING) {
+ ldout(m_image_ctx.cct, 20) << "deferred IOs: " << m_deferred_ios.size() << dendl;
+ }
+ dispatch_deferred_writes();
+ }
+ }
+}
} // namespace cache
} // namespace librbd
+#ifndef TEST_F
template class librbd::cache::ReplicatedWriteLog<librbd::ImageCtx>;
template class librbd::cache::ImageCache<librbd::ImageCtx>;
-
+#endif
#include "librbd/Utils.h"
#include "librbd/BlockGuard.h"
#include "librbd/cache/Types.h"
+#include "librbd/cache/rwl/LogOperation.h"
+#include "librbd/cache/rwl/Request.h"
#include <functional>
#include <list>
/**** Write log entries end ****/
+typedef librbd::BlockGuard<GuardedRequest> WriteLogGuard;
+
+template <typename T>
+struct C_GuardedBlockIORequest;
class DeferredContexts;
template <typename> class ImageCacheState;
+
+template <typename T>
+struct C_BlockIORequest;
+
+template <typename T>
+struct C_WriteRequest;
+
+template <typename T>
+using GenericLogOperations = std::list<GenericLogOperationSharedPtr<T>>;
+
} // namespace rwl
void flush(Context *on_finish) override;
private:
+ using This = ReplicatedWriteLog<ImageCtxT>;
+ using SyncPointT = rwl::SyncPoint<This>;
+ using GenericLogOperationT = rwl::GenericLogOperation<This>;
+ using GenericLogOperationSharedPtrT = rwl::GenericLogOperationSharedPtr<This>;
+ using WriteLogOperationT = rwl::WriteLogOperation<This>;
+ using WriteLogOperationSetT = rwl::WriteLogOperationSet<This>;
+ using SyncPointLogOperationT = rwl::SyncPointLogOperation<This>;
+ using GenericLogOperationsT = rwl::GenericLogOperations<This>;
+ using GenericLogOperationsVectorT = rwl::GenericLogOperationsVector<This>;
+ using C_BlockIORequestT = rwl::C_BlockIORequest<This>;
+ using C_WriteRequestT = rwl::C_WriteRequest<This>;
+
+ friend class rwl::SyncPoint<This>;
+ friend class rwl::GenericLogOperation<This>;
+ friend class rwl::GeneralWriteLogOperation<This>;
+ friend class rwl::WriteLogOperation<This>;
+ friend class rwl::WriteLogOperationSet<This>;
+ friend class rwl::SyncPointLogOperation<This>;
+ friend struct rwl::C_GuardedBlockIORequest<This>;
+ friend struct rwl::C_BlockIORequest<This>;
+ friend struct rwl::C_WriteRequest<This>;
+ typedef std::list<rwl::C_WriteRequest<This> *> C_WriteRequests;
+ typedef std::list<rwl::C_BlockIORequest<This> *> C_BlockIORequests;
+
+ BlockGuardCell* detain_guarded_request_helper(rwl::GuardedRequest &req);
+ BlockGuardCell* detain_guarded_request_barrier_helper(rwl::GuardedRequest &req);
+ void detain_guarded_request(rwl::GuardedRequest &&req);
+ void release_guarded_request(BlockGuardCell *cell);
+
librbd::cache::rwl::ImageCacheState<ImageCtxT>* m_cache_state = nullptr;
std::atomic<bool> m_initialized = {false};
uint64_t m_bytes_dirty = 0; /* Total bytes yet to flush to RBD */
uint64_t m_bytes_allocated_cap = 0;
- ImageWriteback<ImageCtxT> m_image_writeback;
+ utime_t m_last_alloc_fail; /* Time of the last entry or buffer allocation failure */
+ std::atomic<bool> m_alloc_failed_since_retire = {false};
+ ImageWriteback<ImageCtxT> m_image_writeback;
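+ /* Detains in-flight IO extents so overlapping requests (and write barriers) are ordered */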
+ rwl::WriteLogGuard m_write_log_guard;
/*
* When m_first_free_entry == m_first_valid_entry, the log is
* empty. There is always at least one free entry, which can't be
/* Starts at 0 for a new write log. Incremented on every flush. */
uint64_t m_current_sync_gen = 0;
+ std::shared_ptr<SyncPointT> m_current_sync_point = nullptr;
/* Starts at 0 on each sync gen increase. Incremented before applied
to an operation */
uint64_t m_last_op_sequence_num = 0;
/* All writes bearing this and all prior sync gen numbers are flushed */
uint64_t m_flushed_sync_gen = 0;
+ bool m_persist_on_write_until_flush = true;
+ /* True if it's safe to complete a user request in persist-on-flush
+ * mode before the write is persisted. This is only true if there is
+ * a local copy of the write data, or if local write failure always
+ * causes local node failure. */
+ bool m_persist_on_flush_early_user_comp = false; /* Assume local write failure does not cause node failure */
+ bool m_persist_on_flush = false; /* If false, persist each write before completion */
+ bool m_flush_seen = false;
+
+ AsyncOpTracker m_async_op_tracker;
+ /* Debug counters for the places m_async_op_tracker is used */
+ std::atomic<int> m_async_flush_ops = {0};
+ std::atomic<int> m_async_append_ops = {0};
+ std::atomic<int> m_async_complete_ops = {0};
+
/* Acquire locks in order declared here */
+ /* Hold m_deferred_dispatch_lock while consuming from m_deferred_ios. */
+ mutable ceph::mutex m_deferred_dispatch_lock;
+ /* Hold m_log_append_lock while appending or retiring log entries. */
+ mutable ceph::mutex m_log_append_lock;
+
/* Used for most synchronization */
mutable ceph::mutex m_lock;
+ /* Used in release/detain to make BlockGuard preserve submission order */
+ mutable ceph::mutex m_blockguard_lock;
+ /* Used in WriteLogEntry::get_pmem_bl() to synchronize between threads making entries readable */
+ mutable ceph::mutex m_entry_bl_lock;
+
+ /* The following three members are protected by m_blockguard_lock */
+ rwl::WriteLogGuard::BlockOperations m_awaiting_barrier;
+ bool m_barrier_in_progress = false;
+ BlockGuardCell *m_barrier_cell = nullptr;
+
+ /* True while a thread is appending ops from m_ops_to_append (see append_scheduled_ops()) */
+ bool m_appending = false;
+ /* True while a thread is draining m_deferred_ios (see dispatch_deferred_writes()) */
+ bool m_dispatching_deferred_ops = false;
- librbd::cache::Contexts m_flush_complete_contexts;
+ GenericLogOperationsT m_ops_to_flush; /* Write ops needing flush in local log */
+ GenericLogOperationsT m_ops_to_append; /* Write ops needing event append in local log */
/* New entries are at the back. Oldest at the front */
rwl::GenericLogEntries m_log_entries;
rwl::GenericLogEntries m_dirty_log_entries;
+ /* Writes that have left the block guard, but are waiting for resources */
+ C_BlockIORequests m_deferred_ios;
+ /* Throttle writes concurrently allocating & replicating */
+ unsigned int m_free_lanes = MAX_CONCURRENT_WRITES;
+ unsigned int m_unpublished_reserves = 0;
PerfCounters *m_perfcounter = nullptr;
/* Initialized from config, then set false during shutdown */
void rwl_init(Context *on_finish, rwl::DeferredContexts &later);
void update_image_cache_state(Context *on_finish);
+ void start_workers();
+ void wake_up();
+
+ void dispatch_deferred_writes(void);
+ void release_write_lanes(C_WriteRequestT *write_req);
+ void alloc_and_dispatch_io_req(C_BlockIORequestT *write_req);
+ void append_scheduled_ops(void);
+ void enlist_op_appender();
+ void schedule_append(GenericLogOperationsVectorT &ops);
+ void schedule_append(GenericLogOperationsT &ops);
+ void flush_then_append_scheduled_ops(void);
+ void enlist_op_flusher();
+ void schedule_flush_and_append(GenericLogOperationsVectorT &ops);
+ template <typename V>
+ void flush_pmem_buffer(V& ops);
+ void alloc_op_log_entries(GenericLogOperationsT &ops);
+ void flush_op_log_entries(GenericLogOperationsVectorT &ops);
+ int append_op_log_entries(GenericLogOperationsT &ops);
+ void complete_op_log_entries(GenericLogOperationsT &&ops, const int r);
+ void schedule_complete_op_log_entries(GenericLogOperationsT &&ops, const int r);
};
} // namespace cache