From ed3983edb96144c84a2cf18f9f49a6df35635af4 Mon Sep 17 00:00:00 2001
From: Yuan Lu
Date: Thu, 21 Nov 2019 16:11:01 +0800
Subject: [PATCH] librbd: aio_write to flush data into cache device

This part calls C_WriteRequest to flush data into the cache device.

Signed-off-by: Peterson, Scott
Signed-off-by: Li, Xiaoyan
Signed-off-by: Lu, Yuan
Signed-off-by: Chamarthy, Mahati
---
 src/librbd/cache/ReplicatedWriteLog.cc        | 780 +++++++++++++++++-
 src/librbd/cache/ReplicatedWriteLog.h         | 111 ++-
 src/librbd/cache/rwl/LogEntry.cc              |   6 +-
 src/librbd/cache/rwl/LogEntry.h               |  49 +-
 src/librbd/cache/rwl/LogOperation.cc          |  17 +-
 src/librbd/cache/rwl/LogOperation.h           |  76 +-
 src/librbd/cache/rwl/Request.cc               |  52 +-
 src/librbd/cache/rwl/Request.h                |  41 +
 src/librbd/cache/rwl/SyncPoint.cc             |   2 +-
 src/librbd/cache/rwl/Types.cc                 |   2 +-
 src/librbd/cache/rwl/Types.h                  |   8 +-
 .../cache/test_mock_ReplicatedWriteLog.cc     |  12 +-
 12 files changed, 1098 insertions(+), 58 deletions(-)

diff --git a/src/librbd/cache/ReplicatedWriteLog.cc b/src/librbd/cache/ReplicatedWriteLog.cc
index 5415fa569d0..343790a41c4 100644
--- a/src/librbd/cache/ReplicatedWriteLog.cc
+++ b/src/librbd/cache/ReplicatedWriteLog.cc
@@ -33,15 +33,25 @@ using namespace librbd::cache::rwl;
 typedef ReplicatedWriteLog<ImageCtx>::Extent Extent;
 typedef ReplicatedWriteLog<ImageCtx>::Extents Extents;
 
+const unsigned long int ops_appended_together = MAX_ALLOC_PER_TRANSACTION;
+
 template <typename I>
 ReplicatedWriteLog<I>::ReplicatedWriteLog(I &image_ctx, librbd::cache::rwl::ImageCacheState<I>* cache_state)
   : m_cache_state(cache_state),
     m_rwl_pool_layout_name(POBJ_LAYOUT_NAME(rbd_rwl)),
     m_image_ctx(image_ctx),
     m_log_pool_config_size(DEFAULT_POOL_SIZE),
-    m_image_writeback(image_ctx),
+    m_image_writeback(image_ctx), m_write_log_guard(image_ctx.cct),
+    m_deferred_dispatch_lock(ceph::make_mutex(util::unique_lock_name(
+      "librbd::cache::ReplicatedWriteLog::m_deferred_dispatch_lock", this))),
+    m_log_append_lock(ceph::make_mutex(util::unique_lock_name(
+      "librbd::cache::ReplicatedWriteLog::m_log_append_lock", this))),
     m_lock(ceph::make_mutex(util::unique_lock_name(
       "librbd::cache::ReplicatedWriteLog::m_lock", this))),
+    m_blockguard_lock(ceph::make_mutex(util::unique_lock_name(
+      "librbd::cache::ReplicatedWriteLog::m_blockguard_lock", this))),
+    m_entry_bl_lock(ceph::make_mutex(util::unique_lock_name(
+      "librbd::cache::ReplicatedWriteLog::m_entry_bl_lock", this))),
     m_thread_pool(image_ctx.cct, "librbd::cache::ReplicatedWriteLog::thread_pool", "tp_rwl",
                   4, ""),
@@ -337,7 +347,7 @@ void ReplicatedWriteLog<I>::rwl_init(Context *on_finish, DeferredContexts &later
                                m_log_pool_config_size,
                                (S_IWUSR | S_IRUSR))) == NULL) {
     lderr(cct) << "failed to create pool (" << m_log_pool_name << ")"
-      << pmemobj_errormsg() << dendl;
+               << pmemobj_errormsg() << dendl;
     m_cache_state->present = false;
     m_cache_state->clean = true;
     m_cache_state->empty = true;
@@ -407,6 +417,9 @@ void ReplicatedWriteLog<I>::rwl_init(Context *on_finish, DeferredContexts &later
   // TODO: Will init sync point, this will be covered in later PR.
// init_flush_new_sync_point(later); + ++m_current_sync_gen; + auto new_sync_point = std::make_shared(*this, m_current_sync_gen); + m_current_sync_point = new_sync_point; m_initialized = true; // Start the thread @@ -469,6 +482,41 @@ void ReplicatedWriteLog::aio_write(Extents &&image_extents, bufferlist&& bl, int fadvise_flags, Context *on_finish) { + CephContext *cct = m_image_ctx.cct; + if (RWL_VERBOSE_LOGGING) { + ldout(cct, 20) << "aio_write" << dendl; + } + utime_t now = ceph_clock_now(); + m_perfcounter->inc(l_librbd_rwl_wr_req, 1); + + ceph_assert(m_initialized); + { + std::shared_lock image_locker(m_image_ctx.image_lock); + if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) { + on_finish->complete(-EROFS); + return; + } + } + + if (ExtentsSummary(image_extents).total_bytes == 0) { + on_finish->complete(0); + return; + } + + auto *write_req = + new C_WriteRequestT(*this, now, std::move(image_extents), std::move(bl), fadvise_flags, on_finish); + m_perfcounter->inc(l_librbd_rwl_wr_bytes, write_req->image_extents_summary.total_bytes); + + /* The lambda below will be called when the block guard for all + * blocks affected by this write is obtained */ + GuardedRequestFunctionContext *guarded_ctx = + new GuardedRequestFunctionContext([this, write_req](GuardedRequestFunctionContext &guard_ctx) { + write_req->blockguard_acquired(guard_ctx); + alloc_and_dispatch_io_req(write_req); + }); + + detain_guarded_request(GuardedRequest(write_req->image_extents_summary.block_extent(), + guarded_ctx)); } template @@ -495,6 +543,11 @@ void ReplicatedWriteLog::aio_compare_and_write(Extents &&image_extents, Context *on_finish) { } +template +void ReplicatedWriteLog::wake_up() { + //TODO: handle the task to flush data from cache device to OSD +} + template void ReplicatedWriteLog::flush(Context *on_finish) { } @@ -503,9 +556,730 @@ template void ReplicatedWriteLog::invalidate(Context *on_finish) { } +template +BlockGuardCell* ReplicatedWriteLog::detain_guarded_request_helper(GuardedRequest &req) +{ + CephContext *cct = m_image_ctx.cct; + BlockGuardCell *cell; + + ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock)); + if (RWL_VERBOSE_LOGGING) { + ldout(cct, 20) << dendl; + } + + int r = m_write_log_guard.detain(req.block_extent, &req, &cell); + ceph_assert(r>=0); + if (r > 0) { + if (RWL_VERBOSE_LOGGING) { + ldout(cct, 20) << "detaining guarded request due to in-flight requests: " + << "req=" << req << dendl; + } + return nullptr; + } + + if (RWL_VERBOSE_LOGGING) { + ldout(cct, 20) << "in-flight request cell: " << cell << dendl; + } + return cell; +} + +template +BlockGuardCell* ReplicatedWriteLog::detain_guarded_request_barrier_helper(GuardedRequest &req) +{ + BlockGuardCell *cell = nullptr; + + ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock)); + if (RWL_VERBOSE_LOGGING) { + ldout(m_image_ctx.cct, 20) << dendl; + } + + if (m_barrier_in_progress) { + req.guard_ctx->m_state.queued = true; + m_awaiting_barrier.push_back(req); + } else { + bool barrier = req.guard_ctx->m_state.barrier; + if (barrier) { + m_barrier_in_progress = true; + req.guard_ctx->m_state.current_barrier = true; + } + cell = detain_guarded_request_helper(req); + if (barrier) { + /* Only non-null if the barrier acquires the guard now */ + m_barrier_cell = cell; + } + } + + return cell; +} + +template +void ReplicatedWriteLog::detain_guarded_request(GuardedRequest &&req) +{ + BlockGuardCell *cell = nullptr; + + if (RWL_VERBOSE_LOGGING) { + ldout(m_image_ctx.cct, 20) << dendl; + } + { + std::lock_guard 
locker(m_blockguard_lock); + cell = detain_guarded_request_barrier_helper(req); + } + if (cell) { + req.guard_ctx->m_cell = cell; + req.guard_ctx->complete(0); + } +} + +template +void ReplicatedWriteLog::release_guarded_request(BlockGuardCell *released_cell) +{ + CephContext *cct = m_image_ctx.cct; + WriteLogGuard::BlockOperations block_reqs; + if (RWL_VERBOSE_LOGGING) { + ldout(cct, 20) << "released_cell=" << released_cell << dendl; + } + + { + std::lock_guard locker(m_blockguard_lock); + m_write_log_guard.release(released_cell, &block_reqs); + + for (auto &req : block_reqs) { + req.guard_ctx->m_state.detained = true; + BlockGuardCell *detained_cell = detain_guarded_request_helper(req); + if (detained_cell) { + if (req.guard_ctx->m_state.current_barrier) { + /* The current barrier is acquiring the block guard, so now we know its cell */ + m_barrier_cell = detained_cell; + /* detained_cell could be == released_cell here */ + if (RWL_VERBOSE_LOGGING) { + ldout(cct, 20) << "current barrier cell=" << detained_cell << " req=" << req << dendl; + } + } + req.guard_ctx->m_cell = detained_cell; + m_work_queue.queue(req.guard_ctx); + } + } + + if (m_barrier_in_progress && (released_cell == m_barrier_cell)) { + if (RWL_VERBOSE_LOGGING) { + ldout(cct, 20) << "current barrier released cell=" << released_cell << dendl; + } + /* The released cell is the current barrier request */ + m_barrier_in_progress = false; + m_barrier_cell = nullptr; + /* Move waiting requests into the blockguard. Stop if there's another barrier */ + while (!m_barrier_in_progress && !m_awaiting_barrier.empty()) { + auto &req = m_awaiting_barrier.front(); + if (RWL_VERBOSE_LOGGING) { + ldout(cct, 20) << "submitting queued request to blockguard: " << req << dendl; + } + BlockGuardCell *detained_cell = detain_guarded_request_barrier_helper(req); + if (detained_cell) { + req.guard_ctx->m_cell = detained_cell; + m_work_queue.queue(req.guard_ctx); + } + m_awaiting_barrier.pop_front(); + } + } + } + + if (RWL_VERBOSE_LOGGING) { + ldout(cct, 20) << "exit" << dendl; + } +} + +/* + * Performs the log event append operation for all of the scheduled + * events. 
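The batching discipline used here is worth seeing in isolation: one thread at a time claims the appender role, splices off at most ops_appended_together operations, and re-checks the queue before giving the role up. A minimal standalone model, assuming hypothetical globals in place of m_lock, m_ops_to_append and m_appending (not the RWL API):

    // Standalone model of the single-appender batching loop in
    // append_scheduled_ops(). All names are illustrative.
    #include <algorithm>
    #include <iostream>
    #include <list>
    #include <mutex>

    constexpr size_t kMaxBatch = 8;   // stands in for MAX_ALLOC_PER_TRANSACTION

    std::mutex g_lock;                // stands in for m_lock
    std::list<int> g_ops_to_append;   // stands in for m_ops_to_append
    bool g_appending = false;         // stands in for m_appending

    void append_scheduled() {
      bool appending = false;  // true once this thread owns g_appending
      for (;;) {
        std::list<int> batch;
        {
          std::lock_guard<std::mutex> locker(g_lock);
          if (!appending && g_appending) {
            return;  // another thread is already appending
          }
          if (g_ops_to_append.empty()) {
            if (appending) {
              g_appending = false;  // give up the appender role
            }
            return;
          }
          appending = g_appending = true;
          auto last = g_ops_to_append.begin();
          std::advance(last, std::min(kMaxBatch, g_ops_to_append.size()));
          batch.splice(batch.end(), g_ops_to_append,
                       g_ops_to_append.begin(), last);
        }
        // Outside the lock: allocate entries, append, complete.
        std::cout << "appending batch of " << batch.size() << "\n";
      }
    }

    int main() {
      for (int i = 0; i < 20; i++) {
        g_ops_to_append.push_back(i);
      }
      append_scheduled();  // prints batches of 8, 8 and 4
    }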
+ */ +template +void ReplicatedWriteLog::append_scheduled_ops(void) +{ + GenericLogOperationsT ops; + int append_result = 0; + bool ops_remain = false; + bool appending = false; /* true if we set m_appending */ + if (RWL_VERBOSE_LOGGING) { + ldout(m_image_ctx.cct, 20) << dendl; + } + do { + ops.clear(); + + { + std::lock_guard locker(m_lock); + if (!appending && m_appending) { + /* Another thread is appending */ + if (RWL_VERBOSE_LOGGING) { + ldout(m_image_ctx.cct, 15) << "Another thread is appending" << dendl; + } + return; + } + if (m_ops_to_append.size()) { + appending = true; + m_appending = true; + auto last_in_batch = m_ops_to_append.begin(); + unsigned int ops_to_append = m_ops_to_append.size(); + if (ops_to_append > ops_appended_together) { + ops_to_append = ops_appended_together; + } + std::advance(last_in_batch, ops_to_append); + ops.splice(ops.end(), m_ops_to_append, m_ops_to_append.begin(), last_in_batch); + ops_remain = true; /* Always check again before leaving */ + if (RWL_VERBOSE_LOGGING) { + ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", " << m_ops_to_append.size() << " remain" << dendl; + } + } else { + ops_remain = false; + if (appending) { + appending = false; + m_appending = false; + } + } + } + + if (ops.size()) { + std::lock_guard locker(m_log_append_lock); + alloc_op_log_entries(ops); + append_result = append_op_log_entries(ops); + } + + int num_ops = ops.size(); + if (num_ops) { + /* New entries may be flushable. Completion will wake up flusher. */ + complete_op_log_entries(std::move(ops), append_result); + } + } while (ops_remain); +} + +template +void ReplicatedWriteLog::enlist_op_appender() +{ + m_async_append_ops++; + m_async_op_tracker.start_op(); + Context *append_ctx = new LambdaContext([this](int r) { + append_scheduled_ops(); + m_async_append_ops--; + m_async_op_tracker.finish_op(); + }); + m_work_queue.queue(append_ctx); +} + +/* + * Takes custody of ops. They'll all get their log entries appended, + * and have their on_write_persist contexts completed once they and + * all prior log entries are persisted everywhere. + */ +template +void ReplicatedWriteLog::schedule_append(GenericLogOperationsT &ops) +{ + bool need_finisher; + GenericLogOperationsVectorT appending; + + std::copy(std::begin(ops), std::end(ops), std::back_inserter(appending)); + { + std::lock_guard locker(m_lock); + + need_finisher = m_ops_to_append.empty() && !m_appending; + m_ops_to_append.splice(m_ops_to_append.end(), ops); + } + + if (need_finisher) { + enlist_op_appender(); + } + + for (auto &op : appending) { + op->appending(); + } +} + +template +void ReplicatedWriteLog::schedule_append(GenericLogOperationsVectorT &ops) +{ + GenericLogOperationsT to_append(ops.begin(), ops.end()); + + schedule_append(to_append); +} + +const unsigned long int ops_flushed_together = 4; +/* + * Performs the pmem buffer flush on all scheduled ops, then schedules + * the log event append operation for all of them. 
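The flush step leans on a persistent-memory idiom: queue one non-blocking flush per payload buffer, then issue a single drain as the barrier for all of them. A sketch of that idiom with raw libpmemobj calls; the pool path, layout name and buffer layout are placeholders, error handling is trimmed, and the demo assumes PMDK is installed (build with -lpmemobj):

    // Flush-then-drain, as used by flush_pmem_buffer(): many
    // pmemobj_flush() calls, one pmemobj_drain() barrier.
    #include <libpmemobj.h>
    #include <cstdio>
    #include <cstring>

    int main() {
      // Placeholder pool; PMEMOBJ_MIN_POOL keeps the demo small.
      PMEMobjpool *pop = pmemobj_create("./demo.pool", "demo",
                                        PMEMOBJ_MIN_POOL, 0666);
      if (pop == nullptr) {
        std::perror("pmemobj_create");
        return 1;
      }
      // Model three payload buffers inside the pool's root object.
      PMEMoid root = pmemobj_root(pop, 4096);
      char *base = static_cast<char *>(pmemobj_direct(root));
      for (int i = 0; i < 3; i++) {
        char *buf = base + i * 1024;
        std::memset(buf, 'A' + i, 512);  // stores to pmem
        pmemobj_flush(pop, buf, 512);    // queue a flush; does not wait
      }
      pmemobj_drain(pop);                // one barrier for all queued flushes
      pmemobj_close(pop);
      return 0;
    }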
+ */ +template +void ReplicatedWriteLog::flush_then_append_scheduled_ops(void) +{ + GenericLogOperationsT ops; + bool ops_remain = false; + if (RWL_VERBOSE_LOGGING) { + ldout(m_image_ctx.cct, 20) << dendl; + } + do { + { + ops.clear(); + std::lock_guard locker(m_lock); + if (m_ops_to_flush.size()) { + auto last_in_batch = m_ops_to_flush.begin(); + unsigned int ops_to_flush = m_ops_to_flush.size(); + if (ops_to_flush > ops_flushed_together) { + ops_to_flush = ops_flushed_together; + } + if (RWL_VERBOSE_LOGGING) { + ldout(m_image_ctx.cct, 20) << "should flush " << ops_to_flush << dendl; + } + std::advance(last_in_batch, ops_to_flush); + ops.splice(ops.end(), m_ops_to_flush, m_ops_to_flush.begin(), last_in_batch); + ops_remain = !m_ops_to_flush.empty(); + if (RWL_VERBOSE_LOGGING) { + ldout(m_image_ctx.cct, 20) << "flushing " << ops.size() << ", " << m_ops_to_flush.size() << " remain" << dendl; + } + } else { + ops_remain = false; + } + } + if (ops_remain) { + enlist_op_flusher(); + } + + /* Ops subsequently scheduled for flush may finish before these, + * which is fine. We're unconcerned with completion order until we + * get to the log message append step. */ + if (ops.size()) { + flush_pmem_buffer(ops); + schedule_append(ops); + } + } while (ops_remain); + append_scheduled_ops(); +} + +template +void ReplicatedWriteLog::enlist_op_flusher() +{ + m_async_flush_ops++; + m_async_op_tracker.start_op(); + Context *flush_ctx = new LambdaContext([this](int r) { + flush_then_append_scheduled_ops(); + m_async_flush_ops--; + m_async_op_tracker.finish_op(); + }); + m_work_queue.queue(flush_ctx); +} + +/* + * Takes custody of ops. They'll all get their pmem blocks flushed, + * then get their log entries appended. + */ +template +void ReplicatedWriteLog::schedule_flush_and_append(GenericLogOperationsVectorT &ops) +{ + GenericLogOperationsT to_flush(ops.begin(), ops.end()); + bool need_finisher; + if (RWL_VERBOSE_LOGGING) { + ldout(m_image_ctx.cct, 20) << dendl; + } + { + std::lock_guard locker(m_lock); + + need_finisher = m_ops_to_flush.empty(); + m_ops_to_flush.splice(m_ops_to_flush.end(), to_flush); + } + + if (need_finisher) { + enlist_op_flusher(); + } +} + +/* + * Flush the pmem regions for the data blocks of a set of operations + * + * V is expected to be GenericLogOperations, or GenericLogOperationsVector + */ +template +template +void ReplicatedWriteLog::flush_pmem_buffer(V& ops) +{ + for (auto &operation : ops) { + if (operation->is_write() || operation->is_writesame()) { + operation->buf_persist_time = ceph_clock_now(); + auto write_entry = operation->get_write_log_entry(); + + pmemobj_flush(m_log_pool, write_entry->pmem_buffer, write_entry->write_bytes()); + } + } + + /* Drain once for all */ + pmemobj_drain(m_log_pool); + + utime_t now = ceph_clock_now(); + for (auto &operation : ops) { + if (operation->is_write() || operation->is_writesame()) { + operation->buf_persist_comp_time = now; + } else { + if (RWL_VERBOSE_LOGGING) { + ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl; + } + } + } +} + +/* + * Allocate the (already reserved) write log entries for a set of operations. 
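Entry slots live in a ring: m_first_free_entry chases m_first_valid_entry, and one slot always stays unused so that equality can mean "empty". A toy model of that index arithmetic (names are illustrative, and the reserved-count bookkeeping of the real code is omitted):

    // Ring-slot allocation as in alloc_op_log_entries().
    #include <cassert>
    #include <cstdint>
    #include <iostream>
    #include <vector>

    struct Ring {
      uint32_t first_free = 0;   // next slot to hand out
      uint32_t first_valid = 0;  // oldest still-live entry
      const uint32_t total;
      explicit Ring(uint32_t n) : total(n) {}

      // Keep one slot unused so first_free == first_valid means "empty".
      uint32_t alloc() {
        assert((first_free + 1) % total != first_valid && "log full");
        uint32_t idx = first_free;
        first_free = (first_free + 1) % total;
        return idx;
      }
    };

    int main() {
      Ring ring(4);
      std::vector<uint32_t> got;
      for (int i = 0; i < 3; i++) {
        got.push_back(ring.alloc());
      }
      for (uint32_t idx : got) {
        std::cout << "entry index " << idx << "\n";  // 0, 1, 2
      }
    }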
+ * + * Locking: + * Acquires m_lock + */ +template +void ReplicatedWriteLog::alloc_op_log_entries(GenericLogOperationsT &ops) +{ + TOID(struct WriteLogPoolRoot) pool_root; + pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot); + struct WriteLogPmemEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries); + + ceph_assert(ceph_mutex_is_locked_by_me(m_log_append_lock)); + + /* Allocate the (already reserved) log entries */ + std::lock_guard locker(m_lock); + + for (auto &operation : ops) { + uint32_t entry_index = m_first_free_entry; + m_first_free_entry = (m_first_free_entry + 1) % m_total_log_entries; + auto &log_entry = operation->get_log_entry(); + log_entry->log_entry_index = entry_index; + log_entry->ram_entry.entry_index = entry_index; + log_entry->pmem_entry = &pmem_log_entries[entry_index]; + log_entry->ram_entry.entry_valid = 1; + m_log_entries.push_back(log_entry); + if (RWL_VERBOSE_LOGGING) { + ldout(m_image_ctx.cct, 20) << "operation=[" << *operation << "]" << dendl; + } + } +} + +/* + * Flush the persistent write log entries set of ops. The entries must + * be contiguous in persistent memory. + */ +template +void ReplicatedWriteLog::flush_op_log_entries(GenericLogOperationsVectorT &ops) +{ + if (ops.empty()) { + return; + } + + if (ops.size() > 1) { + ceph_assert(ops.front()->get_log_entry()->pmem_entry < ops.back()->get_log_entry()->pmem_entry); + } + + if (RWL_VERBOSE_LOGGING) { + ldout(m_image_ctx.cct, 20) << "entry count=" << ops.size() << " " + << "start address=" << ops.front()->get_log_entry()->pmem_entry << " " + << "bytes=" << ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry)) + << dendl; + } + pmemobj_flush(m_log_pool, + ops.front()->get_log_entry()->pmem_entry, + ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry))); +} + +/* + * Write and persist the (already allocated) write log entries and + * data buffer allocations for a set of ops. The data buffer for each + * of these must already have been persisted to its reserved area. 
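The append path closes over a reserve/publish pattern: data buffers are carved out earlier with pmemobj_reserve(), and only the log-append transaction makes them permanent via pmemobj_tx_publish(). A compressed libpmemobj sketch of that pattern; the pool path is a placeholder, and the typed WriteLogPoolRoot of the real code is reduced here to a bare counter:

    // Reserve early, publish atomically with the ring-head update,
    // as in append_op_log_entries(). Build with -lpmemobj.
    #include <libpmemobj.h>
    #include <cstdint>
    #include <cstdio>

    int main() {
      PMEMobjpool *pop = pmemobj_create("./demo.pool", "demo",
                                        PMEMOBJ_MIN_POOL, 0666);
      if (pop == nullptr) {
        std::perror("pmemobj_create");
        return 1;
      }
      // Earlier, at dispatch time: reserve a data buffer (not yet permanent).
      struct pobj_action act;
      PMEMoid buf = pmemobj_reserve(pop, &act, 4096, 0);
      if (OID_IS_NULL(buf)) {
        std::perror("pmemobj_reserve");
        return 1;
      }
      // ... payload would be copied into pmemobj_direct(buf), flushed, drained ...

      // Models first_free_entry in the pool root.
      uint64_t *head = static_cast<uint64_t *>(
          pmemobj_direct(pmemobj_root(pop, sizeof(uint64_t))));

      TX_BEGIN(pop) {
        pmemobj_tx_add_range_direct(head, sizeof(*head));
        *head = *head + 1;            // advance the ring head
        pmemobj_tx_publish(&act, 1);  // reservation becomes a real allocation
      } TX_ONABORT {
        std::fprintf(stderr, "append tx aborted\n");
      } TX_END

      pmemobj_close(pop);
      return 0;
    }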
+ */ +template +int ReplicatedWriteLog::append_op_log_entries(GenericLogOperationsT &ops) +{ + CephContext *cct = m_image_ctx.cct; + GenericLogOperationsVectorT entries_to_flush; + TOID(struct WriteLogPoolRoot) pool_root; + pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot); + int ret = 0; + + ceph_assert(ceph_mutex_is_locked_by_me(m_log_append_lock)); + + if (ops.empty()) { + return 0; + } + entries_to_flush.reserve(ops_appended_together); + + /* Write log entries to ring and persist */ + utime_t now = ceph_clock_now(); + for (auto &operation : ops) { + if (!entries_to_flush.empty()) { + /* Flush these and reset the list if the current entry wraps to the + * tail of the ring */ + if (entries_to_flush.back()->get_log_entry()->log_entry_index > + operation->get_log_entry()->log_entry_index) { + if (RWL_VERBOSE_LOGGING) { + ldout(m_image_ctx.cct, 20) << "entries to flush wrap around the end of the ring at " + << "operation=[" << *operation << "]" << dendl; + } + flush_op_log_entries(entries_to_flush); + entries_to_flush.clear(); + now = ceph_clock_now(); + } + } + if (RWL_VERBOSE_LOGGING) { + ldout(m_image_ctx.cct, 20) << "Copying entry for operation at index=" + << operation->get_log_entry()->log_entry_index << " " + << "from " << &operation->get_log_entry()->ram_entry << " " + << "to " << operation->get_log_entry()->pmem_entry << " " + << "operation=[" << *operation << "]" << dendl; + } + if (RWL_VERBOSE_LOGGING) { + ldout(m_image_ctx.cct, 05) << "APPENDING: index=" + << operation->get_log_entry()->log_entry_index << " " + << "operation=[" << *operation << "]" << dendl; + } + operation->log_append_time = now; + *operation->get_log_entry()->pmem_entry = operation->get_log_entry()->ram_entry; + if (RWL_VERBOSE_LOGGING) { + ldout(m_image_ctx.cct, 20) << "APPENDING: index=" + << operation->get_log_entry()->log_entry_index << " " + << "pmem_entry=[" << *operation->get_log_entry()->pmem_entry << "]" << dendl; + } + entries_to_flush.push_back(operation); + } + flush_op_log_entries(entries_to_flush); + + /* Drain once for all */ + pmemobj_drain(m_log_pool); + + /* + * Atomically advance the log head pointer and publish the + * allocations for all the data buffers they refer to. + */ + utime_t tx_start = ceph_clock_now(); + TX_BEGIN(m_log_pool) { + D_RW(pool_root)->first_free_entry = m_first_free_entry; + for (auto &operation : ops) { + if (operation->is_write() || operation->is_writesame()) { + auto write_op = (std::shared_ptr&) operation; + pmemobj_tx_publish(&write_op->buffer_alloc->buffer_alloc_action, 1); + } else { + if (RWL_VERBOSE_LOGGING) { + ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl; + } + } + } + } TX_ONCOMMIT { + } TX_ONABORT { + lderr(cct) << "failed to commit " << ops.size() << " log entries (" << m_log_pool_name << ")" << dendl; + ceph_assert(false); + ret = -EIO; + } TX_FINALLY { + } TX_END; + + utime_t tx_end = ceph_clock_now(); + m_perfcounter->tinc(l_librbd_rwl_append_tx_t, tx_end - tx_start); + m_perfcounter->hinc(l_librbd_rwl_append_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(), ops.size()); + for (auto &operation : ops) { + operation->log_append_comp_time = tx_end; + } + + return ret; +} + +/* + * Complete a set of write ops with the result of append_op_entries. 
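Completion fans out in three directions: every operation's completion context fires with the append result, writing operations feed m_dirty_log_entries for later writeback, and published buffer reserves are returned. A condensed standalone model with hypothetical types (perf counters and sync-point accounting omitted):

    // Condensed model of complete_op_log_entries(). Illustrative only:
    // real ops carry log entries, sync points and perf timestamps.
    #include <functional>
    #include <iostream>
    #include <list>
    #include <mutex>

    struct Op {
      bool is_write;
      std::function<void(int)> on_complete;  // models op->complete(r)
    };

    std::mutex g_lock;
    std::list<Op> g_dirty_log_entries;  // models m_dirty_log_entries
    int g_unpublished_reserves = 2;

    void complete_ops(std::list<Op> &&ops, int result) {
      std::list<Op> dirty;
      int published = 0;
      for (auto &op : ops) {
        if (op.is_write) {
          dirty.push_back(op);  // the flusher will write it back later
          published++;          // its buffer reservation was published
        }
        op.on_complete(result); // user completion fires here
      }
      std::lock_guard<std::mutex> locker(g_lock);
      g_unpublished_reserves -= published;
      g_dirty_log_entries.splice(g_dirty_log_entries.end(), dirty);
      // wake_up() would nudge the writeback flusher here.
    }

    int main() {
      std::list<Op> ops;
      ops.push_back({true,  [](int r) { std::cout << "write done r=" << r << "\n"; }});
      ops.push_back({false, [](int r) { std::cout << "sync point done r=" << r << "\n"; }});
      complete_ops(std::move(ops), 0);
      std::cout << "dirty entries: " << g_dirty_log_entries.size() << "\n";
    }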
+ */ +template +void ReplicatedWriteLog::complete_op_log_entries(GenericLogOperationsT &&ops, const int result) +{ + GenericLogEntries dirty_entries; + int published_reserves = 0; + if (RWL_VERBOSE_LOGGING) { + ldout(m_image_ctx.cct, 20) << __func__ << ": completing" << dendl; + } + for (auto &op : ops) { + utime_t now = ceph_clock_now(); + auto log_entry = op->get_log_entry(); + log_entry->completed = true; + if (op->is_writing_op()) { + op->get_gen_write_op()->sync_point->log_entry->writes_completed++; + dirty_entries.push_back(log_entry); + } + if (op->is_write() || op->is_writesame()) { + published_reserves++; + } + if (op->is_discard()) { + if (RWL_VERBOSE_LOGGING) { + ldout(m_image_ctx.cct, 20) << __func__ << ": completing discard" << dendl; + } + } + op->complete(result); + if (op->is_write()) { + m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_buf_t, op->buf_persist_time - op->dispatch_time); + } + m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_app_t, op->log_append_time - op->dispatch_time); + m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_cmp_t, now - op->dispatch_time); + m_perfcounter->hinc(l_librbd_rwl_log_op_dis_to_cmp_t_hist, utime_t(now - op->dispatch_time).to_nsec(), + log_entry->ram_entry.write_bytes); + if (op->is_write()) { + utime_t buf_lat = op->buf_persist_comp_time - op->buf_persist_time; + m_perfcounter->tinc(l_librbd_rwl_log_op_buf_to_bufc_t, buf_lat); + m_perfcounter->hinc(l_librbd_rwl_log_op_buf_to_bufc_t_hist, buf_lat.to_nsec(), + log_entry->ram_entry.write_bytes); + m_perfcounter->tinc(l_librbd_rwl_log_op_buf_to_app_t, op->log_append_time - op->buf_persist_time); + } + utime_t app_lat = op->log_append_comp_time - op->log_append_time; + m_perfcounter->tinc(l_librbd_rwl_log_op_app_to_appc_t, app_lat); + m_perfcounter->hinc(l_librbd_rwl_log_op_app_to_appc_t_hist, app_lat.to_nsec(), + log_entry->ram_entry.write_bytes); + m_perfcounter->tinc(l_librbd_rwl_log_op_app_to_cmp_t, now - op->log_append_time); + } + + { + std::lock_guard locker(m_lock); + m_unpublished_reserves -= published_reserves; + m_dirty_log_entries.splice(m_dirty_log_entries.end(), dirty_entries); + + /* New entries may be flushable */ + wake_up(); + } +} + +/** + * Dispatch as many deferred writes as possible + */ +template +void ReplicatedWriteLog::dispatch_deferred_writes(void) +{ + C_BlockIORequestT *front_req = nullptr; /* req still on front of deferred list */ + C_BlockIORequestT *allocated_req = nullptr; /* req that was allocated, and is now off the list */ + bool allocated = false; /* front_req allocate succeeded */ + bool cleared_dispatching_flag = false; + + /* If we can't become the dispatcher, we'll exit */ + { + std::lock_guard locker(m_lock); + if (m_dispatching_deferred_ops || + !m_deferred_ios.size()) { + return; + } + m_dispatching_deferred_ops = true; + } + + /* There are ops to dispatch, and this should be the only thread dispatching them */ + { + std::lock_guard deferred_dispatch(m_deferred_dispatch_lock); + do { + { + std::lock_guard locker(m_lock); + ceph_assert(m_dispatching_deferred_ops); + if (allocated) { + /* On the 2..n-1 th time we get m_lock, front_req->alloc_resources() will + * have succeeded, and we'll need to pop it off the deferred ops list + * here. */ + ceph_assert(front_req); + ceph_assert(!allocated_req); + m_deferred_ios.pop_front(); + allocated_req = front_req; + front_req = nullptr; + allocated = false; + } + ceph_assert(!allocated); + if (!allocated && front_req) { + /* front_req->alloc_resources() failed on the last iteration. We'll stop dispatching. 
*/ + front_req = nullptr; + ceph_assert(!cleared_dispatching_flag); + m_dispatching_deferred_ops = false; + cleared_dispatching_flag = true; + } else { + ceph_assert(!front_req); + if (m_deferred_ios.size()) { + /* New allocation candidate */ + front_req = m_deferred_ios.front(); + } else { + ceph_assert(!cleared_dispatching_flag); + m_dispatching_deferred_ops = false; + cleared_dispatching_flag = true; + } + } + } + /* Try allocating for front_req before we decide what to do with allocated_req + * (if any) */ + if (front_req) { + ceph_assert(!cleared_dispatching_flag); + allocated = front_req->alloc_resources(); + } + if (allocated_req && front_req && allocated) { + /* Push dispatch of the first allocated req to a wq */ + m_work_queue.queue(new LambdaContext( + [this, allocated_req](int r) { + allocated_req->dispatch(); + }), 0); + allocated_req = nullptr; + } + ceph_assert(!(allocated_req && front_req && allocated)); + + /* Continue while we're still considering the front of the deferred ops list */ + } while (front_req); + ceph_assert(!allocated); + } + ceph_assert(cleared_dispatching_flag); + + /* If any deferred requests were allocated, the last one will still be in allocated_req */ + if (allocated_req) { + allocated_req->dispatch(); + } +} + +/** + * Returns the lanes used by this write, and attempts to dispatch the next + * deferred write + */ +template +void ReplicatedWriteLog::release_write_lanes(C_WriteRequestT *write_req) +{ + { + std::lock_guard locker(m_lock); + ceph_assert(write_req->resources.allocated); + m_free_lanes += write_req->image_extents.size(); + write_req->resources.allocated = false; + } + dispatch_deferred_writes(); +} + +/** + * Attempts to allocate log resources for a write. Write is dispatched if + * resources are available, or queued if they aren't. 
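A standalone model of this dispatch-or-defer policy, combined with the single-dispatcher drain of dispatch_deferred_writes() above; the names and the lane-counting resource model are illustrative only:

    // Model of alloc_and_dispatch_io_req() plus dispatch_deferred_writes().
    #include <deque>
    #include <iostream>
    #include <mutex>

    struct Req { int id; int lanes; };

    std::mutex g_lock;
    std::deque<Req> g_deferred;  // models m_deferred_ios
    bool g_dispatching = false;  // models m_dispatching_deferred_ops
    int g_free_lanes = 4;        // models m_free_lanes

    bool alloc_resources(const Req &r) {
      std::lock_guard<std::mutex> locker(g_lock);
      if (g_free_lanes < r.lanes) return false;
      g_free_lanes -= r.lanes;
      return true;
    }

    void dispatch(const Req &r) { std::cout << "dispatch req " << r.id << "\n"; }

    void dispatch_deferred() {
      {
        std::lock_guard<std::mutex> locker(g_lock);
        if (g_dispatching || g_deferred.empty()) return;
        g_dispatching = true;  // single dispatcher at a time
      }
      for (;;) {
        Req front{};
        {
          std::lock_guard<std::mutex> locker(g_lock);
          if (g_deferred.empty()) break;
          front = g_deferred.front();
        }
        if (!alloc_resources(front)) break;  // out of resources: stop
        {
          std::lock_guard<std::mutex> locker(g_lock);
          g_deferred.pop_front();
        }
        dispatch(front);
      }
      std::lock_guard<std::mutex> locker(g_lock);
      g_dispatching = false;
    }

    void alloc_and_dispatch(Req req) {
      bool here;
      {
        std::lock_guard<std::mutex> locker(g_lock);
        here = g_deferred.empty();  // don't jump the deferred queue
      }
      if (here && alloc_resources(req)) {
        dispatch(req);
      } else {
        std::lock_guard<std::mutex> locker(g_lock);
        g_deferred.push_back(req);  // real code also calls req->deferred()
      }
      dispatch_deferred();
    }

    int main() {
      alloc_and_dispatch({1, 2});  // dispatches: 4 lanes free
      alloc_and_dispatch({2, 2});  // dispatches: 2 lanes free
      alloc_and_dispatch({3, 2});  // defers: no lanes left
    }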
+ */ +template +void ReplicatedWriteLog::alloc_and_dispatch_io_req(C_BlockIORequestT *req) +{ + bool dispatch_here = false; + + { + /* If there are already deferred writes, queue behind them for resources */ + { + std::lock_guard locker(m_lock); + dispatch_here = m_deferred_ios.empty(); + } + if (dispatch_here) { + dispatch_here = req->alloc_resources(); + } + if (dispatch_here) { + if (RWL_VERBOSE_LOGGING) { + ldout(m_image_ctx.cct, 20) << "dispatching" << dendl; + } + req->dispatch(); + } else { + req->deferred(); + { + std::lock_guard locker(m_lock); + m_deferred_ios.push_back(req); + } + if (RWL_VERBOSE_LOGGING) { + ldout(m_image_ctx.cct, 20) << "deferred IOs: " << m_deferred_ios.size() << dendl; + } + dispatch_deferred_writes(); + } + } +} } // namespace cache } // namespace librbd +#ifndef TEST_F template class librbd::cache::ReplicatedWriteLog; template class librbd::cache::ImageCache; - +#endif diff --git a/src/librbd/cache/ReplicatedWriteLog.h b/src/librbd/cache/ReplicatedWriteLog.h index c5f37b76f37..adbdddca9bd 100644 --- a/src/librbd/cache/ReplicatedWriteLog.h +++ b/src/librbd/cache/ReplicatedWriteLog.h @@ -12,6 +12,8 @@ #include "librbd/Utils.h" #include "librbd/BlockGuard.h" #include "librbd/cache/Types.h" +#include "librbd/cache/rwl/LogOperation.h" +#include "librbd/cache/rwl/Request.h" #include #include @@ -37,9 +39,23 @@ typedef std::list> GenericLogEntries; /**** Write log entries end ****/ +typedef librbd::BlockGuard WriteLogGuard; + +template +struct C_GuardedBlockIORequest; class DeferredContexts; template class ImageCacheState; + +template +struct C_BlockIORequest; + +template +struct C_WriteRequest; + +template +using GenericLogOperations = std::list>; + } // namespace rwl @@ -78,6 +94,35 @@ public: void flush(Context *on_finish) override; private: + using This = ReplicatedWriteLog; + using SyncPointT = rwl::SyncPoint; + using GenericLogOperationT = rwl::GenericLogOperation; + using GenericLogOperationSharedPtrT = rwl::GenericLogOperationSharedPtr; + using WriteLogOperationT = rwl::WriteLogOperation; + using WriteLogOperationSetT = rwl::WriteLogOperationSet; + using SyncPointLogOperationT = rwl::SyncPointLogOperation; + using GenericLogOperationsT = rwl::GenericLogOperations; + using GenericLogOperationsVectorT = rwl::GenericLogOperationsVector; + using C_BlockIORequestT = rwl::C_BlockIORequest; + using C_WriteRequestT = rwl::C_WriteRequest; + + friend class rwl::SyncPoint; + friend class rwl::GenericLogOperation; + friend class rwl::GeneralWriteLogOperation; + friend class rwl::WriteLogOperation; + friend class rwl::WriteLogOperationSet; + friend class rwl::SyncPointLogOperation; + friend struct rwl::C_GuardedBlockIORequest; + friend struct rwl::C_BlockIORequest; + friend struct rwl::C_WriteRequest; + typedef std::list *> C_WriteRequests; + typedef std::list *> C_BlockIORequests; + + BlockGuardCell* detain_guarded_request_helper(rwl::GuardedRequest &req); + BlockGuardCell* detain_guarded_request_barrier_helper(rwl::GuardedRequest &req); + void detain_guarded_request(rwl::GuardedRequest &&req); + void release_guarded_request(BlockGuardCell *cell); + librbd::cache::rwl::ImageCacheState* m_cache_state = nullptr; std::atomic m_initialized = {false}; @@ -99,8 +144,11 @@ private: uint64_t m_bytes_dirty = 0; /* Total bytes yet to flush to RBD */ uint64_t m_bytes_allocated_cap = 0; - ImageWriteback m_image_writeback; + utime_t m_last_alloc_fail; /* Entry or buffer allocation fail seen */ + std::atomic m_alloc_failed_since_retire = {false}; + ImageWriteback 
m_image_writeback;
+  rwl::WriteLogGuard m_write_log_guard;
   /*
    * When m_first_free_entry == m_first_valid_entry, the log is
    * empty. There is always at least one free entry, which can't be
    *
@@ -111,23 +159,62 @@ private:
   /* Starts at 0 for a new write log. Incremented on every flush. */
   uint64_t m_current_sync_gen = 0;
+  std::shared_ptr<SyncPointT> m_current_sync_point = nullptr;
   /* Starts at 0 on each sync gen increase. Incremented before applied to an operation */
   uint64_t m_last_op_sequence_num = 0;
   /* All writes bearing this and all prior sync gen numbers are flushed */
   uint64_t m_flushed_sync_gen = 0;
+  bool m_persist_on_write_until_flush = true;
+  /* True if it's safe to complete a user request in persist-on-flush
+   * mode before the write is persisted. This is only true if there is
+   * a local copy of the write data, or if local write failure always
+   * causes local node failure. */
+  bool m_persist_on_flush_early_user_comp = false; /* Assume local write failure does not cause node failure */
+  bool m_persist_on_flush = false; /* If false, persist each write before completion */
+  bool m_flush_seen = false;
+
+  AsyncOpTracker m_async_op_tracker;
+  /* Debug counters for the places m_async_op_tracker is used */
+  std::atomic<int> m_async_flush_ops = {0};
+  std::atomic<int> m_async_append_ops = {0};
+  std::atomic<int> m_async_complete_ops = {0};
+  /* Acquire locks in order declared here */
+  /* Hold m_deferred_dispatch_lock while consuming from m_deferred_ios. */
+  mutable ceph::mutex m_deferred_dispatch_lock;
+  /* Hold m_log_append_lock while appending or retiring log entries. */
+  mutable ceph::mutex m_log_append_lock;
   /* Used for most synchronization */
   mutable ceph::mutex m_lock;
+  /* Used in release/detain to make BlockGuard preserve submission order */
+  mutable ceph::mutex m_blockguard_lock;
+  /* Used in WriteLogEntry::get_pmem_bl() to synchronize between threads making entries readable */
+  mutable ceph::mutex m_entry_bl_lock;
+
+  /* Use m_blockguard_lock for the following 3 things */
+  rwl::WriteLogGuard::BlockOperations m_awaiting_barrier;
+  bool m_barrier_in_progress = false;
+  BlockGuardCell *m_barrier_cell = nullptr;
+
+  bool m_appending = false;
+  bool m_dispatching_deferred_ops = false;
-  librbd::cache::Contexts m_flush_complete_contexts;
+  GenericLogOperationsT m_ops_to_flush; /* Write ops needing flush in local log */
+  GenericLogOperationsT m_ops_to_append; /* Write ops needing event append in local log */
   /* New entries are at the back.
Oldest at the front */ rwl::GenericLogEntries m_log_entries; rwl::GenericLogEntries m_dirty_log_entries; + /* Writes that have left the block guard, but are waiting for resources */ + C_BlockIORequests m_deferred_ios; + /* Throttle writes concurrently allocating & replicating */ + unsigned int m_free_lanes = MAX_CONCURRENT_WRITES; + unsigned int m_unpublished_reserves = 0; PerfCounters *m_perfcounter = nullptr; /* Initialized from config, then set false during shutdown */ @@ -147,6 +234,26 @@ private: void rwl_init(Context *on_finish, rwl::DeferredContexts &later); void update_image_cache_state(Context *on_finish); + void start_workers(); + void wake_up(); + + void dispatch_deferred_writes(void); + void release_write_lanes(C_WriteRequestT *write_req); + void alloc_and_dispatch_io_req(C_BlockIORequestT *write_req); + void append_scheduled_ops(void); + void enlist_op_appender(); + void schedule_append(GenericLogOperationsVectorT &ops); + void schedule_append(GenericLogOperationsT &ops); + void flush_then_append_scheduled_ops(void); + void enlist_op_flusher(); + void schedule_flush_and_append(GenericLogOperationsVectorT &ops); + template + void flush_pmem_buffer(V& ops); + void alloc_op_log_entries(GenericLogOperationsT &ops); + void flush_op_log_entries(GenericLogOperationsVectorT &ops); + int append_op_log_entries(GenericLogOperationsT &ops); + void complete_op_log_entries(GenericLogOperationsT &&ops, const int r); + void schedule_complete_op_log_entries(GenericLogOperationsT &&ops, const int r); }; } // namespace cache diff --git a/src/librbd/cache/rwl/LogEntry.cc b/src/librbd/cache/rwl/LogEntry.cc index 5fb67ad7847..50bf5a9a1c0 100644 --- a/src/librbd/cache/rwl/LogEntry.cc +++ b/src/librbd/cache/rwl/LogEntry.cc @@ -62,7 +62,7 @@ std::ostream& SyncPointLogEntry::format(std::ostream &os) const { }; std::ostream &operator<<(std::ostream &os, - const SyncPointLogEntry &entry) { + const SyncPointLogEntry &entry) { return entry.format(os); } @@ -83,7 +83,7 @@ std::ostream& GeneralWriteLogEntry::format(std::ostream &os) const { }; std::ostream &operator<<(std::ostream &os, - const GeneralWriteLogEntry &entry) { + const GeneralWriteLogEntry &entry) { return entry.format(os); } @@ -143,7 +143,7 @@ std::ostream& WriteLogEntry::format(std::ostream &os) const { }; std::ostream &operator<<(std::ostream &os, - const WriteLogEntry &entry) { + const WriteLogEntry &entry) { return entry.format(os); } diff --git a/src/librbd/cache/rwl/LogEntry.h b/src/librbd/cache/rwl/LogEntry.h index e1a8744b908..b6d73b3f535 100644 --- a/src/librbd/cache/rwl/LogEntry.h +++ b/src/librbd/cache/rwl/LogEntry.h @@ -4,6 +4,7 @@ #ifndef CEPH_LIBRBD_CACHE_RWL_LOG_ENTRY_H #define CEPH_LIBRBD_CACHE_RWL_LOG_ENTRY_H +#include "librbd/Utils.h" #include "librbd/cache/rwl/Types.h" #include #include @@ -35,9 +36,15 @@ public: bool is_write(); bool is_writer(); virtual const GenericLogEntry* get_log_entry() = 0; - virtual const SyncPointLogEntry* get_sync_point_log_entry() { return nullptr;} - virtual const GeneralWriteLogEntry* get_gen_write_log_entry() { return nullptr; } - virtual const WriteLogEntry* get_write_log_entry() { return nullptr; } + virtual const SyncPointLogEntry* get_sync_point_log_entry() { + return nullptr; + } + virtual const GeneralWriteLogEntry* get_gen_write_log_entry() { + return nullptr; + } + virtual const WriteLogEntry* get_write_log_entry() { + return nullptr; + } virtual std::ostream& format(std::ostream &os) const; friend std::ostream &operator<<(std::ostream &os, const GenericLogEntry &entry); @@ -62,9 
+69,15 @@ public: }; SyncPointLogEntry(const SyncPointLogEntry&) = delete; SyncPointLogEntry &operator=(const SyncPointLogEntry&) = delete; - virtual inline unsigned int write_bytes() { return 0; } - const GenericLogEntry* get_log_entry() override { return get_sync_point_log_entry(); } - const SyncPointLogEntry* get_sync_point_log_entry() override { return this; } + virtual inline unsigned int write_bytes() { + return 0; + } + const GenericLogEntry* get_log_entry() override { + return get_sync_point_log_entry(); + } + const SyncPointLogEntry* get_sync_point_log_entry() override { + return this; + } std::ostream& format(std::ostream &os) const; friend std::ostream &operator<<(std::ostream &os, const SyncPointLogEntry &entry); @@ -91,10 +104,18 @@ public: /* The bytes in the image this op makes dirty. Discard and WS override. */ return write_bytes(); }; - const BlockExtent block_extent() { return ram_entry.block_extent(); } - const GenericLogEntry* get_log_entry() override { return get_gen_write_log_entry(); } - const GeneralWriteLogEntry* get_gen_write_log_entry() override { return this; } - uint32_t get_map_ref() { return(referring_map_entries); } + const BlockExtent block_extent() { + return ram_entry.block_extent(); + } + const GenericLogEntry* get_log_entry() override { + return get_gen_write_log_entry(); + } + const GeneralWriteLogEntry* get_gen_write_log_entry() override { + return this; + } + uint32_t get_map_ref() { + return(referring_map_entries); + } void inc_map_ref() { referring_map_entries++; } void dec_map_ref() { referring_map_entries--; } std::ostream &format(std::ostream &os) const; @@ -132,8 +153,12 @@ public: buffer::list &get_pmem_bl(ceph::mutex &entry_bl_lock); /* Constructs a new bl containing copies of pmem_bp */ void copy_pmem_bl(ceph::mutex &entry_bl_lock, bufferlist *out_bl); - virtual const GenericLogEntry* get_log_entry() override { return get_write_log_entry(); } - const WriteLogEntry* get_write_log_entry() override { return this; } + virtual const GenericLogEntry* get_log_entry() override { + return get_write_log_entry(); + } + const WriteLogEntry* get_write_log_entry() override { + return this; + } std::ostream &format(std::ostream &os) const; friend std::ostream &operator<<(std::ostream &os, const WriteLogEntry &entry); diff --git a/src/librbd/cache/rwl/LogOperation.cc b/src/librbd/cache/rwl/LogOperation.cc index 11ed3e4b2a0..a7cb581b536 100644 --- a/src/librbd/cache/rwl/LogOperation.cc +++ b/src/librbd/cache/rwl/LogOperation.cc @@ -8,7 +8,7 @@ #define dout_subsys ceph_subsys_rbd #undef dout_prefix #define dout_prefix *_dout << "librbd::cache::rwl::LogOperation: " << this << " " \ - << __func__ << ": " + << __func__ << ": " namespace librbd { @@ -33,7 +33,7 @@ std::ostream& GenericLogOperation::format(std::ostream &os) const { template std::ostream &operator<<(std::ostream &os, - const GenericLogOperation &op) { + const GenericLogOperation &op) { return op.format(os); } @@ -58,7 +58,7 @@ std::ostream &SyncPointLogOperation::format(std::ostream &os) const { template std::ostream &operator<<(std::ostream &os, - const SyncPointLogOperation &op) { + const SyncPointLogOperation &op) { return op.format(os); } @@ -122,7 +122,8 @@ GeneralWriteLogOperation::GeneralWriteLogOperation(T &rwl, std::shared_ptr> sync_point, const utime_t dispatch_time) : GenericLogOperation(rwl, dispatch_time), - m_lock("librbd::cache::rwl::GeneralWriteLogOperation::m_lock"), sync_point(sync_point) { + m_lock(ceph::make_mutex(util::unique_lock_name( + 
"librbd::cache::rwl::GenericWriteLogOperation::m_lock", this))), sync_point(sync_point) { } template @@ -136,7 +137,7 @@ std::ostream &GeneralWriteLogOperation::format(std::ostream &os) const { template std::ostream &operator<<(std::ostream &os, - const GeneralWriteLogOperation &op) { + const GeneralWriteLogOperation &op) { return op.format(os); } @@ -212,7 +213,7 @@ std::ostream &WriteLogOperation::format(std::ostream &os) const { template std::ostream &operator<<(std::ostream &os, - const WriteLogOperation &op) { + const WriteLogOperation &op) { return op.format(os); } @@ -220,9 +221,9 @@ std::ostream &operator<<(std::ostream &os, template WriteLogOperationSet::WriteLogOperationSet(T &rwl, utime_t dispatched, std::shared_ptr> sync_point, bool persist_on_flush, Context *on_finish) - : rwl(rwl), m_on_finish(on_finish), + : m_on_finish(on_finish), rwl(rwl), persist_on_flush(persist_on_flush), dispatch_time(dispatched), sync_point(sync_point) { - on_ops_appending = sync_point->m_prior_log_entries_persisted->new_sub(); + on_ops_appending = sync_point->prior_log_entries_persisted->new_sub(); on_ops_persist = nullptr; extent_ops_persist = new C_Gather(rwl.m_image_ctx.cct, diff --git a/src/librbd/cache/rwl/LogOperation.h b/src/librbd/cache/rwl/LogOperation.h index 96aaa692872..dc0dd547356 100644 --- a/src/librbd/cache/rwl/LogOperation.h +++ b/src/librbd/cache/rwl/LogOperation.h @@ -52,18 +52,38 @@ public: friend std::ostream &operator<<(std::ostream &os, const GenericLogOperation &op); virtual const std::shared_ptr get_log_entry() = 0; - virtual const std::shared_ptr get_sync_point_log_entry() { return nullptr; } - virtual const std::shared_ptr get_gen_write_log_entry() { return nullptr; } - virtual const std::shared_ptr get_write_log_entry() { return nullptr; } + virtual const std::shared_ptr get_sync_point_log_entry() { + return nullptr; + } + virtual const std::shared_ptr get_gen_write_log_entry() { + return nullptr; + } + virtual const std::shared_ptr get_write_log_entry() { + return nullptr; + } virtual void appending() = 0; virtual void complete(int r) = 0; - virtual bool is_write() { return false; } - virtual bool is_sync_point() { return false; } - virtual bool is_discard() { return false; } - virtual bool is_writesame() { return false; } - virtual bool is_writing_op() { return false; } - virtual GeneralWriteLogOperation *get_gen_write_op() { return nullptr; }; - virtual WriteLogOperation *get_write_op() { return nullptr; }; + virtual bool is_write() { + return false; + } + virtual bool is_sync_point() { + return false; + } + virtual bool is_discard() { + return false; + } + virtual bool is_writesame() { + return false; + } + virtual bool is_writing_op() { + return false; + } + virtual GeneralWriteLogOperation *get_gen_write_op() { + return nullptr; + }; + virtual WriteLogOperation *get_write_op() { + return nullptr; + }; }; template @@ -81,9 +101,15 @@ public: template friend std::ostream &operator<<(std::ostream &os, const SyncPointLogOperation &op); - const std::shared_ptr get_log_entry() { return get_sync_point_log_entry(); } - const std::shared_ptr get_sync_point_log_entry() { return sync_point->log_entry; } - bool is_sync_point() { return true; } + const std::shared_ptr get_log_entry() { + return get_sync_point_log_entry(); + } + const std::shared_ptr get_sync_point_log_entry() { + return sync_point->log_entry; + } + bool is_sync_point() { + return true; + } void appending(); void complete(int r); }; @@ -110,8 +136,12 @@ public: template friend std::ostream &operator<<(std::ostream 
&os, const GeneralWriteLogOperation &op); - GeneralWriteLogOperation *get_gen_write_op() { return this; }; - bool is_writing_op() { return true; } + GeneralWriteLogOperation *get_gen_write_op() { + return this; + } + bool is_writing_op() { + return true; + } void appending(); void complete(int r); }; @@ -135,10 +165,18 @@ public: template friend std::ostream &operator<<(std::ostream &os, const WriteLogOperation &op); - const std::shared_ptr get_log_entry() { return get_write_log_entry(); } - const std::shared_ptr get_write_log_entry() { return log_entry; } - WriteLogOperation *get_write_op() override { return this; } - bool is_write() { return true; } + const std::shared_ptr get_log_entry() { + return get_write_log_entry(); + } + const std::shared_ptr get_write_log_entry() { + return log_entry; + } + WriteLogOperation *get_write_op() override { + return this; + } + bool is_write() { + return true; + } }; diff --git a/src/librbd/cache/rwl/Request.cc b/src/librbd/cache/rwl/Request.cc index b02f6d9f321..c16f3363113 100644 --- a/src/librbd/cache/rwl/Request.cc +++ b/src/librbd/cache/rwl/Request.cc @@ -176,6 +176,18 @@ std::ostream &operator<<(std::ostream &os, return os; }; +template +void C_WriteRequest::blockguard_acquired(GuardedRequestFunctionContext &guard_ctx) { + if (RWL_VERBOSE_LOGGING) { + ldout(rwl.m_image_ctx.cct, 20) << __func__ << " write_req=" << this << " cell=" << guard_ctx.m_cell << dendl; + } + + ceph_assert(guard_ctx.m_cell); + this->detained = guard_ctx.m_state.detained; /* overlapped */ + this->m_queued = guard_ctx.m_state.queued; /* queued behind at least one barrier */ + this->set_cell(guard_ctx.m_cell); +} + template void C_WriteRequest::finish_req(int r) { if (RWL_VERBOSE_LOGGING) { @@ -216,7 +228,17 @@ void C_WriteRequest::setup_log_operations() { template void C_WriteRequest::schedule_append() { - // TODO: call rwl to complete it + ceph_assert(++m_appended == 1); + if (m_do_early_flush) { + /* This caller is waiting for persist, so we'll use their thread to + * expedite it */ + rwl.flush_pmem_buffer(this->m_op_set->operations); + rwl.schedule_append(this->m_op_set->operations); + } else { + /* This is probably not still the caller's thread, so do the payload + * flushing/replicating later. 
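The branch here is a latency trade-off: when the caller is already blocked waiting for persist, its own thread does the pmem flush; otherwise the flush is handed to the work queue. A minimal sketch of that decision, assuming a stand-in work queue and a stub flush (not the RWL types):

    // Models the m_do_early_flush choice in C_WriteRequest::schedule_append().
    #include <functional>
    #include <iostream>
    #include <queue>

    std::queue<std::function<void()>> g_work_queue;  // stands in for m_work_queue

    void flush_and_append(int op_id) {
      std::cout << "flush + append op " << op_id << "\n";
    }

    void schedule_append(int op_id, bool do_early_flush) {
      if (do_early_flush) {
        // Caller is blocked on persist; use its thread to cut latency.
        flush_and_append(op_id);
      } else {
        // Probably not the submitting thread anymore; defer the flush.
        g_work_queue.push([op_id] { flush_and_append(op_id); });
      }
    }

    int main() {
      schedule_append(1, true);   // persist-on-write path: immediate
      schedule_append(2, false);  // persist-on-flush path: deferred
      while (!g_work_queue.empty()) {
        g_work_queue.front()();
        g_work_queue.pop();
      }
    }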
*/ + rwl.schedule_flush_and_append(this->m_op_set->operations); + } } /** @@ -237,7 +259,6 @@ bool C_WriteRequest::alloc_resources() uint64_t bytes_cached = 0; uint64_t bytes_dirtied = 0; - ceph_assert(!rwl.m_lock.is_locked_by_me()); ceph_assert(!resources.allocated); resources.buffers.reserve(this->image_extents.size()); { @@ -503,6 +524,33 @@ void C_WriteRequest::dispatch() } } +std::ostream &operator<<(std::ostream &os, + const BlockGuardReqState &r) { + os << "barrier=" << r.barrier << ", " + << "current_barrier=" << r.current_barrier << ", " + << "detained=" << r.detained << ", " + << "queued=" << r.queued; + return os; +}; + +GuardedRequestFunctionContext::GuardedRequestFunctionContext(boost::function &&callback) + : m_callback(std::move(callback)){ } + +GuardedRequestFunctionContext::~GuardedRequestFunctionContext(void) { } + +void GuardedRequestFunctionContext::finish(int r) { + ceph_assert(m_cell); + m_callback(*this); +} + +std::ostream &operator<<(std::ostream &os, + const GuardedRequest &r) { + os << "guard_ctx->m_state=[" << r.guard_ctx->m_state << "], " + << "block_extent.block_start=" << r.block_extent.block_start << ", " + << "block_extent.block_start=" << r.block_extent.block_end; + return os; +}; + } // namespace rwl } // namespace cache } // namespace librbd diff --git a/src/librbd/cache/rwl/Request.h b/src/librbd/cache/rwl/Request.h index 9556c4f4f57..7af0c196e88 100644 --- a/src/librbd/cache/rwl/Request.h +++ b/src/librbd/cache/rwl/Request.h @@ -15,6 +15,8 @@ class BlockGuardCell; namespace cache { namespace rwl { +class GuardedRequestFunctionContext; + /** * A request that can be deferred in a BlockGuard to sequence * overlapping operations. @@ -117,6 +119,8 @@ public: ~C_WriteRequest(); + void blockguard_acquired(GuardedRequestFunctionContext &guard_ctx); + /* Common finish to plain write and compare-and-write (if it writes) */ virtual void finish_req(int r); @@ -152,6 +156,43 @@ private: const C_WriteRequest &req); }; +struct BlockGuardReqState { + bool barrier = false; /* This is a barrier request */ + bool current_barrier = false; /* This is the currently active barrier */ + bool detained = false; + bool queued = false; /* Queued for barrier */ + friend std::ostream &operator<<(std::ostream &os, + const BlockGuardReqState &r); +}; + +class GuardedRequestFunctionContext : public Context { +public: + BlockGuardCell *m_cell = nullptr; + BlockGuardReqState m_state; + GuardedRequestFunctionContext(boost::function &&callback); + ~GuardedRequestFunctionContext(void); + GuardedRequestFunctionContext(const GuardedRequestFunctionContext&) = delete; + GuardedRequestFunctionContext &operator=(const GuardedRequestFunctionContext&) = delete; + +private: + boost::function m_callback; + void finish(int r) override; +}; + +class GuardedRequest { +public: + const BlockExtent block_extent; + GuardedRequestFunctionContext *guard_ctx; /* Work to do when guard on range obtained */ + + GuardedRequest(const BlockExtent block_extent, + GuardedRequestFunctionContext *on_guard_acquire, bool barrier = false) + : block_extent(block_extent), guard_ctx(on_guard_acquire) { + guard_ctx->m_state.barrier = barrier; + } + friend std::ostream &operator<<(std::ostream &os, + const GuardedRequest &r); +}; + } // namespace rwl } // namespace cache } // namespace librbd diff --git a/src/librbd/cache/rwl/SyncPoint.cc b/src/librbd/cache/rwl/SyncPoint.cc index f486b934d0d..c9c4582d8b2 100644 --- a/src/librbd/cache/rwl/SyncPoint.cc +++ b/src/librbd/cache/rwl/SyncPoint.cc @@ -6,7 +6,7 @@ #define dout_subsys 
ceph_subsys_rbd #undef dout_prefix #define dout_prefix *_dout << "librbd::cache::rwl::SyncPoint: " << this << " " \ - << __func__ << ": " + << __func__ << ": " namespace librbd { namespace cache { diff --git a/src/librbd/cache/rwl/Types.cc b/src/librbd/cache/rwl/Types.cc index 768b85b3fa3..3a6cfa6a5ea 100644 --- a/src/librbd/cache/rwl/Types.cc +++ b/src/librbd/cache/rwl/Types.cc @@ -22,7 +22,7 @@ DeferredContexts::~DeferredContexts() { } void DeferredContexts::add(Context* ctx) { - contexts.push_back(ctx); + contexts.push_back(ctx); } /* diff --git a/src/librbd/cache/rwl/Types.h b/src/librbd/cache/rwl/Types.h index ecf65545356..ac76aaf1f85 100644 --- a/src/librbd/cache/rwl/Types.h +++ b/src/librbd/cache/rwl/Types.h @@ -141,6 +141,9 @@ const uint32_t MIN_WRITE_ALLOC_SIZE = 512; const uint32_t LOG_STATS_INTERVAL_SECONDS = 5; /**** Write log entries ****/ +const unsigned long int MAX_ALLOC_PER_TRANSACTION = 8; +const unsigned long int MAX_FREE_PER_TRANSACTION = 1; +const unsigned int MAX_CONCURRENT_WRITES = 256; const uint64_t DEFAULT_POOL_SIZE = 1u<<30; const uint64_t MIN_POOL_SIZE = DEFAULT_POOL_SIZE; @@ -246,10 +249,9 @@ public: const BlockExtent block_extent() { return BlockExtent(first_image_byte, last_image_byte); } - const io::Extent image_extent(const BlockExtent& block_extent) - { + const io::Extent image_extent(const BlockExtent& block_extent) { return io::Extent(block_extent.block_start, - block_extent.block_end - block_extent.block_start + 1); + block_extent.block_end - block_extent.block_start + 1); } const io::Extent image_extent() { return image_extent(block_extent()); diff --git a/src/test/librbd/cache/test_mock_ReplicatedWriteLog.cc b/src/test/librbd/cache/test_mock_ReplicatedWriteLog.cc index 43d4a820c63..8cf86556c0b 100644 --- a/src/test/librbd/cache/test_mock_ReplicatedWriteLog.cc +++ b/src/test/librbd/cache/test_mock_ReplicatedWriteLog.cc @@ -41,6 +41,10 @@ inline ImageCtx *get_image_ctx(MockImageCtx *image_ctx) { // template definitions #include "librbd/cache/ImageWriteback.cc" #include "librbd/cache/rwl/ImageCacheState.cc" +#include "librbd/cache/rwl/SyncPoint.cc" +#include "librbd/cache/rwl/Request.cc" +#include "librbd/cache/rwl/Types.cc" +#include "librbd/cache/rwl/LogOperation.cc" template class librbd::cache::ImageWriteback; template class librbd::cache::rwl::ImageCacheState; @@ -65,8 +69,8 @@ struct TestMockCacheReplicatedWriteLog : public TestMockFixture { void validate_cache_state(librbd::ImageCtx *image_ctx, MockImageCacheStateRWL &state, bool present, bool empty, bool clean, - string host="", string path="", - uint64_t size=0) { + string host="", string path="", + uint64_t size=0) { ConfigProxy &config = image_ctx->config; ASSERT_EQ(present, state.present); ASSERT_EQ(empty, state.empty); @@ -96,8 +100,8 @@ struct TestMockCacheReplicatedWriteLog : public TestMockFixture { void expect_context_complete(MockContextRWL& mock_context, int r) { EXPECT_CALL(mock_context, complete(r)) .WillRepeatedly(Invoke([&mock_context](int r) { - mock_context.do_complete(r); - })); + mock_context.do_complete(r); + })); } void expect_metadata_set(MockImageCtx& mock_image_ctx) { -- 2.39.5
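Read end to end, the patch gives a write the following life: it is detained in the block guard, reserves resources (or defers), copies its payload into reserved pmem buffers, flushes and drains them, appends and publishes its log entries in one transaction, completes back to the user, and finally releases its guard cell. A stub walk-through of those stages, with hypothetical function names standing in for the methods added above:

    // Linear recap of the write path this patch introduces; bodies are stubs.
    #include <iostream>

    void detain_guarded_request()  { std::cout << "1. block guard acquired\n"; }
    void alloc_resources()         { std::cout << "2. lanes/entries/buffers reserved\n"; }
    void dispatch()                { std::cout << "3. payload copied into pmem buffers\n"; }
    void flush_pmem_buffer()       { std::cout << "4. payload flushed, drained once\n"; }
    void append_op_log_entries()   { std::cout << "5. log entries appended + tx publish\n"; }
    void complete_op_log_entries() { std::cout << "6. user completion, perf counters\n"; }
    void release_guarded_request() { std::cout << "7. guard released, waiters resume\n"; }

    int main() {
      detain_guarded_request();
      alloc_resources();
      dispatch();
      flush_pmem_buffer();
      append_op_log_entries();
      complete_op_log_entries();
      release_guarded_request();
    }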