From 80179c76d503b704b6fdb953a8ba47f5267017d1 Mon Sep 17 00:00:00 2001 From: Yuan Lu Date: Wed, 20 Nov 2019 22:21:55 +0800 Subject: [PATCH] librbd: add WriteRequest to handle write This class is used to handle write operations. Signed-off-by: Peterson, Scott Signed-off-by: Li, Xiaoyan Signed-off-by: Lu, Yuan Signed-off-by: Chamarthy, Mahati --- src/librbd/CMakeLists.txt | 1 + src/librbd/cache/ImageCache.h | 1 + src/librbd/cache/ReplicatedWriteLog.cc | 12 +- src/librbd/cache/ReplicatedWriteLog.h | 1 + src/librbd/cache/rwl/LogEntry.cc | 2 +- src/librbd/cache/rwl/LogOperation.cc | 8 +- src/librbd/cache/rwl/Request.cc | 508 +++++++++++++++++++++++++ src/librbd/cache/rwl/Request.h | 159 ++++++++ src/librbd/cache/rwl/SyncPoint.cc | 6 +- src/librbd/cache/rwl/Types.cc | 34 ++ src/librbd/cache/rwl/Types.h | 36 ++ src/librbd/io/Types.h | 4 +- 12 files changed, 752 insertions(+), 20 deletions(-) create mode 100644 src/librbd/cache/rwl/Request.cc create mode 100644 src/librbd/cache/rwl/Request.h diff --git a/src/librbd/CMakeLists.txt b/src/librbd/CMakeLists.txt index 9dfb760d6d8..0645d781ee9 100644 --- a/src/librbd/CMakeLists.txt +++ b/src/librbd/CMakeLists.txt @@ -166,6 +166,7 @@ if(WITH_RBD_RWL) cache/rwl/ImageCacheState.cc cache/rwl/LogEntry.cc cache/rwl/LogOperation.cc + cache/rwl/Request.cc cache/rwl/SyncPoint.cc cache/rwl/Types.cc cache/ReplicatedWriteLog.cc) diff --git a/src/librbd/cache/ImageCache.h b/src/librbd/cache/ImageCache.h index bd3adcf4272..01ca6d188bf 100644 --- a/src/librbd/cache/ImageCache.h +++ b/src/librbd/cache/ImageCache.h @@ -23,6 +23,7 @@ struct ImageCache { protected: ImageCache() {} public: + typedef io::Extent Extent; typedef io::Extents Extents; virtual ~ImageCache() {} diff --git a/src/librbd/cache/ReplicatedWriteLog.cc b/src/librbd/cache/ReplicatedWriteLog.cc index 9f1e13040b0..5415fa569d0 100644 --- a/src/librbd/cache/ReplicatedWriteLog.cc +++ b/src/librbd/cache/ReplicatedWriteLog.cc @@ -25,22 +25,12 @@ #define dout_prefix *_dout << "librbd::cache::ReplicatedWriteLog: " << this << " " \ << __func__ << ": " -const uint32_t MIN_WRITE_ALLOC_SIZE = 512; -const uint32_t LOG_STATS_INTERVAL_SECONDS = 5; - -/**** Write log entries ****/ -const uint64_t DEFAULT_POOL_SIZE = 1u<<30; -const uint64_t MIN_POOL_SIZE = DEFAULT_POOL_SIZE; -constexpr double USABLE_SIZE = (7.0 / 10); -const uint64_t BLOCK_ALLOC_OVERHEAD_BYTES = 16; -const uint8_t RWL_POOL_VERSION = 1; -const uint64_t MAX_LOG_ENTRIES = (1024 * 1024); - namespace librbd { namespace cache { using namespace librbd::cache::rwl; +typedef ReplicatedWriteLog::Extent Extent; typedef ReplicatedWriteLog::Extents Extents; template diff --git a/src/librbd/cache/ReplicatedWriteLog.h b/src/librbd/cache/ReplicatedWriteLog.h index b81b90bd6eb..c5f37b76f37 100644 --- a/src/librbd/cache/ReplicatedWriteLog.h +++ b/src/librbd/cache/ReplicatedWriteLog.h @@ -46,6 +46,7 @@ template class ImageCacheState; template class ReplicatedWriteLog : public ImageCache { public: + using typename ImageCache::Extent; using typename ImageCache::Extents; ReplicatedWriteLog(ImageCtxT &image_ctx, librbd::cache::rwl::ImageCacheState* cache_state); diff --git a/src/librbd/cache/rwl/LogEntry.cc b/src/librbd/cache/rwl/LogEntry.cc index 3accf27c766..5fb67ad7847 100644 --- a/src/librbd/cache/rwl/LogEntry.cc +++ b/src/librbd/cache/rwl/LogEntry.cc @@ -117,7 +117,7 @@ buffer::list& WriteLogEntry::get_pmem_bl(ceph::mutex &entry_bl_lock) { if (0 == bl_refs) { init_pmem_bl(); } - assert(0 != bl_refs); + ceph_assert(0 != bl_refs); } return pmem_bl; }; diff --git 
a/src/librbd/cache/rwl/LogOperation.cc b/src/librbd/cache/rwl/LogOperation.cc index c31cd909e2b..11ed3e4b2a0 100644 --- a/src/librbd/cache/rwl/LogOperation.cc +++ b/src/librbd/cache/rwl/LogOperation.cc @@ -66,7 +66,7 @@ template void SyncPointLogOperation::appending() { std::vector appending_contexts; - assert(sync_point); + ceph_assert(sync_point); { std::lock_guard locker(rwl.m_lock); if (!sync_point->m_appending) { @@ -85,7 +85,7 @@ template void SyncPointLogOperation::complete(int result) { std::vector persisted_contexts; - assert(sync_point); + ceph_assert(sync_point); if (RWL_VERBOSE_LOGGING) { ldout(rwl.m_image_ctx.cct, 20) << "Sync point op =[" << *this << "] completed" << dendl; @@ -93,8 +93,8 @@ void SyncPointLogOperation::complete(int result) { { std::lock_guard locker(rwl.m_lock); /* Remove link from next sync point */ - assert(sync_point->later_sync_point); - assert(sync_point->later_sync_point->earlier_sync_point == + ceph_assert(sync_point->later_sync_point); + ceph_assert(sync_point->later_sync_point->earlier_sync_point == sync_point); sync_point->later_sync_point->earlier_sync_point = nullptr; } diff --git a/src/librbd/cache/rwl/Request.cc b/src/librbd/cache/rwl/Request.cc new file mode 100644 index 00000000000..b02f6d9f321 --- /dev/null +++ b/src/librbd/cache/rwl/Request.cc @@ -0,0 +1,508 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "Request.h" +#include "librbd/BlockGuard.h" +#include "librbd/cache/rwl/LogEntry.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::cache::rwl::Request: " << this << " " \ + << __func__ << ": " + +namespace librbd { +namespace cache { +namespace rwl { + +typedef std::list> GeneralWriteLogEntries; + +template +C_GuardedBlockIORequest::C_GuardedBlockIORequest(T &rwl) + : rwl(rwl) { + if (RWL_VERBOSE_LOGGING) { + ldout(rwl.m_image_ctx.cct, 99) << this << dendl; + } +} + +template +C_GuardedBlockIORequest::~C_GuardedBlockIORequest() { + if (RWL_VERBOSE_LOGGING) { + ldout(rwl.m_image_ctx.cct, 99) << this << dendl; + } + ceph_assert(m_cell_released || !m_cell); +} + +template +void C_GuardedBlockIORequest::set_cell(BlockGuardCell *cell) { + if (RWL_VERBOSE_LOGGING) { + ldout(rwl.m_image_ctx.cct, 20) << this << " cell=" << cell << dendl; + } + ceph_assert(cell); + ceph_assert(!m_cell); + m_cell = cell; +} + +template +BlockGuardCell *C_GuardedBlockIORequest::get_cell(void) { + if (RWL_VERBOSE_LOGGING) { + ldout(rwl.m_image_ctx.cct, 20) << this << " cell=" << m_cell << dendl; + } + return m_cell; +} + +template +void C_GuardedBlockIORequest::release_cell() { + if (RWL_VERBOSE_LOGGING) { + ldout(rwl.m_image_ctx.cct, 20) << this << " cell=" << m_cell << dendl; + } + ceph_assert(m_cell); + bool initial = false; + if (m_cell_released.compare_exchange_strong(initial, true)) { + rwl.release_guarded_request(m_cell); + } else { + ldout(rwl.m_image_ctx.cct, 5) << "cell " << m_cell << " already released for " << this << dendl; + } +} + +template +C_BlockIORequest::C_BlockIORequest(T &rwl, const utime_t arrived, io::Extents &&extents, + bufferlist&& bl, const int fadvise_flags, Context *user_req) + : C_GuardedBlockIORequest(rwl), image_extents(std::move(extents)), + bl(std::move(bl)), fadvise_flags(fadvise_flags), + user_req(user_req), image_extents_summary(image_extents), m_arrived_time(arrived) { + if (RWL_VERBOSE_LOGGING) { + ldout(rwl.m_image_ctx.cct, 99) << this << dendl; + } + /* Remove zero length image extents from input */ + for (auto 
it = image_extents.begin(); it != image_extents.end(); ) { + if (0 == it->second) { + it = image_extents.erase(it); + continue; + } + ++it; + } +} + +template +C_BlockIORequest::~C_BlockIORequest() { + if (RWL_VERBOSE_LOGGING) { + ldout(rwl.m_image_ctx.cct, 99) << this << dendl; + } +} + +template +std::ostream &operator<<(std::ostream &os, + const C_BlockIORequest &req) { + os << "image_extents=[" << req.image_extents << "], " + << "image_extents_summary=[" << req.image_extents_summary << "], " + << "bl=" << req.bl << ", " + << "user_req=" << req.user_req << ", " + << "m_user_req_completed=" << req.m_user_req_completed << ", " + << "m_deferred=" << req.m_deferred << ", " + << "detained=" << req.detained << ", " + << "m_waited_lanes=" << req.m_waited_lanes << ", " + << "m_waited_entries=" << req.m_waited_entries << ", " + << "m_waited_buffers=" << req.m_waited_buffers << ""; + return os; + } + +template +void C_BlockIORequest::complete_user_request(int r) { + bool initial = false; + if (m_user_req_completed.compare_exchange_strong(initial, true)) { + if (RWL_VERBOSE_LOGGING) { + ldout(rwl.m_image_ctx.cct, 15) << this << " completing user req" << dendl; + } + m_user_req_completed_time = ceph_clock_now(); + user_req->complete(r); + // Set user_req as null as it is deleted + user_req = nullptr; + } else { + if (RWL_VERBOSE_LOGGING) { + ldout(rwl.m_image_ctx.cct, 20) << this << " user req already completed" << dendl; + } + } +} + +template +void C_BlockIORequest::finish(int r) { + ldout(rwl.m_image_ctx.cct, 20) << this << dendl; + + complete_user_request(r); + bool initial = false; + if (m_finish_called.compare_exchange_strong(initial, true)) { + if (RWL_VERBOSE_LOGGING) { + ldout(rwl.m_image_ctx.cct, 15) << this << " finishing" << dendl; + } + finish_req(0); + } else { + ldout(rwl.m_image_ctx.cct, 20) << this << " already finished" << dendl; + ceph_assert(0); + } +} + +template +void C_BlockIORequest::deferred() { + bool initial = false; + if (m_deferred.compare_exchange_strong(initial, true)) { + deferred_handler(); + } +} + +template +C_WriteRequest::C_WriteRequest(T &rwl, const utime_t arrived, io::Extents &&image_extents, + bufferlist&& bl, const int fadvise_flags, Context *user_req) + : C_BlockIORequest(rwl, arrived, std::move(image_extents), std::move(bl), fadvise_flags, user_req) { + if (RWL_VERBOSE_LOGGING) { + ldout(rwl.m_image_ctx.cct, 99) << this << dendl; + } +} + +template +C_WriteRequest::~C_WriteRequest() { + if (RWL_VERBOSE_LOGGING) { + ldout(rwl.m_image_ctx.cct, 99) << this << dendl; + } +} + +template +std::ostream &operator<<(std::ostream &os, + const C_WriteRequest &req) { + os << (C_BlockIORequest&)req + << " m_resources.allocated=" << req.resources.allocated; + if (req.m_op_set) { + os << "m_op_set=" << *req.m_op_set; + } + return os; +}; + +template +void C_WriteRequest::finish_req(int r) { + if (RWL_VERBOSE_LOGGING) { + ldout(rwl.m_image_ctx.cct, 15) << "write_req=" << this << " cell=" << this->get_cell() << dendl; + } + + /* Completed to caller by here (in finish(), which calls this) */ + utime_t now = ceph_clock_now(); + rwl.release_write_lanes(this); + this->release_cell(); /* TODO: Consider doing this in appending state */ + update_req_stats(now); +} + +template +void C_WriteRequest::setup_buffer_resources(uint64_t &bytes_cached, uint64_t &bytes_dirtied) { + for (auto &extent : this->image_extents) { + resources.buffers.emplace_back(); + struct WriteBufferAllocation &buffer = resources.buffers.back(); + buffer.allocation_size = MIN_WRITE_ALLOC_SIZE; + 
buffer.allocated = false; + bytes_cached += extent.second; + if (extent.second > buffer.allocation_size) { + buffer.allocation_size = extent.second; + } + } + bytes_dirtied = bytes_cached; +} + +template +void C_WriteRequest::setup_log_operations() { + for (auto &extent : this->image_extents) { + /* operation->on_write_persist connected to m_prior_log_entries_persisted Gather */ + auto operation = + std::make_shared>(*m_op_set, extent.first, extent.second); + m_op_set->operations.emplace_back(operation); + } +} + +template +void C_WriteRequest::schedule_append() { + // TODO: call rwl to complete it +} + +/** + * Attempts to allocate log resources for a write. Returns true if successful. + * + * Resources include 1 lane per extent, 1 log entry per extent, and the payload + * data space for each extent. + * + * Lanes are released after the write persists via release_write_lanes() + */ +template +bool C_WriteRequest::alloc_resources() +{ + bool alloc_succeeds = true; + bool no_space = false; + utime_t alloc_start = ceph_clock_now(); + uint64_t bytes_allocated = 0; + uint64_t bytes_cached = 0; + uint64_t bytes_dirtied = 0; + + ceph_assert(!rwl.m_lock.is_locked_by_me()); + ceph_assert(!resources.allocated); + resources.buffers.reserve(this->image_extents.size()); + { + std::lock_guard locker(rwl.m_lock); + if (rwl.m_free_lanes < this->image_extents.size()) { + this->m_waited_lanes = true; + if (RWL_VERBOSE_LOGGING) { + ldout(rwl.m_image_ctx.cct, 20) << "not enough free lanes (need " + << this->image_extents.size() + << ", have " << rwl.m_free_lanes << ") " + << *this << dendl; + } + alloc_succeeds = false; + /* This isn't considered a "no space" alloc fail. Lanes are a throttling mechanism. */ + } + if (rwl.m_free_log_entries < this->image_extents.size()) { + this->m_waited_entries = true; + if (RWL_VERBOSE_LOGGING) { + ldout(rwl.m_image_ctx.cct, 20) << "not enough free entries (need " + << this->image_extents.size() + << ", have " << rwl.m_free_log_entries << ") " + << *this << dendl; + } + alloc_succeeds = false; + no_space = true; /* Entries must be retired */ + } + /* Don't attempt buffer allocate if we've exceeded the "full" threshold */ + if (rwl.m_bytes_allocated > rwl.m_bytes_allocated_cap) { + if (!this->m_waited_buffers) { + this->m_waited_buffers = true; + if (RWL_VERBOSE_LOGGING) { + ldout(rwl.m_image_ctx.cct, 1) << "Waiting for allocation cap (cap=" << rwl.m_bytes_allocated_cap + << ", allocated=" << rwl.m_bytes_allocated + << ") in write [" << *this << "]" << dendl; + } + } + alloc_succeeds = false; + no_space = true; /* Entries must be retired */ + } + } + if (alloc_succeeds) { + setup_buffer_resources(bytes_cached, bytes_dirtied); + } + + if (alloc_succeeds) { + for (auto &buffer : resources.buffers) { + bytes_allocated += buffer.allocation_size; + utime_t before_reserve = ceph_clock_now(); + buffer.buffer_oid = pmemobj_reserve(rwl.m_log_pool, + &buffer.buffer_alloc_action, + buffer.allocation_size, + 0 /* Object type */); + buffer.allocation_lat = ceph_clock_now() - before_reserve; + if (TOID_IS_NULL(buffer.buffer_oid)) { + if (!this->m_waited_buffers) { + this->m_waited_buffers = true; + } + if (RWL_VERBOSE_LOGGING) { + ldout(rwl.m_image_ctx.cct, 5) << "can't allocate all data buffers: " + << pmemobj_errormsg() << ". 
" + << *this << dendl; + } + alloc_succeeds = false; + no_space = true; /* Entries need to be retired */ + break; + } else { + buffer.allocated = true; + } + if (RWL_VERBOSE_LOGGING) { + ldout(rwl.m_image_ctx.cct, 20) << "Allocated " << buffer.buffer_oid.oid.pool_uuid_lo + << "." << buffer.buffer_oid.oid.off + << ", size=" << buffer.allocation_size << dendl; + } + } + } + + if (alloc_succeeds) { + unsigned int num_extents = this->image_extents.size(); + std::lock_guard locker(rwl.m_lock); + /* We need one free log entry per extent (each is a separate entry), and + * one free "lane" for remote replication. */ + if ((rwl.m_free_lanes >= num_extents) && + (rwl.m_free_log_entries >= num_extents)) { + rwl.m_free_lanes -= num_extents; + rwl.m_free_log_entries -= num_extents; + rwl.m_unpublished_reserves += num_extents; + rwl.m_bytes_allocated += bytes_allocated; + rwl.m_bytes_cached += bytes_cached; + rwl.m_bytes_dirty += bytes_dirtied; + resources.allocated = true; + } else { + alloc_succeeds = false; + } + } + + if (!alloc_succeeds) { + /* On alloc failure, free any buffers we did allocate */ + for (auto &buffer : resources.buffers) { + if (buffer.allocated) { + pmemobj_cancel(rwl.m_log_pool, &buffer.buffer_alloc_action, 1); + } + } + resources.buffers.clear(); + if (no_space) { + /* Expedite flushing and/or retiring */ + std::lock_guard locker(rwl.m_lock); + rwl.m_alloc_failed_since_retire = true; + rwl.m_last_alloc_fail = ceph_clock_now(); + } + } + + this->m_allocated_time = alloc_start; + return alloc_succeeds; +} + +/** + * Takes custody of write_req. Resources must already be allocated. + * + * Locking: + * Acquires m_lock + */ +template +void C_WriteRequest::dispatch() +{ + CephContext *cct = rwl.m_image_ctx.cct; + GeneralWriteLogEntries log_entries; + DeferredContexts on_exit; + utime_t now = ceph_clock_now(); + auto write_req_sp = this; + this->m_dispatched_time = now; + + if (RWL_VERBOSE_LOGGING) { + ldout(cct, 15) << "name: " << rwl.m_image_ctx.name << " id: " << rwl.m_image_ctx.id + << "write_req=" << this << " cell=" << this->get_cell() << dendl; + } + + { + uint64_t buffer_offset = 0; + std::lock_guard locker(rwl.m_lock); + Context *set_complete = this; + // TODO: Add sync point if necessary + // + m_op_set = + make_unique>(rwl, now, rwl.m_current_sync_point, rwl.m_persist_on_flush, + set_complete); + if (RWL_VERBOSE_LOGGING) { + ldout(cct, 20) << "write_req=" << this << " m_op_set=" << m_op_set.get() << dendl; + } + ceph_assert(resources.allocated); + /* m_op_set->operations initialized differently for plain write or write same */ + this->setup_log_operations(); + auto allocation = resources.buffers.begin(); + for (auto &gen_op : m_op_set->operations) { + /* A WS is also a write */ + auto operation = gen_op->get_write_op(); + if (RWL_VERBOSE_LOGGING) { + ldout(cct, 20) << "write_req=" << this << " m_op_set=" << m_op_set.get() + << " operation=" << operation << dendl; + } + log_entries.emplace_back(operation->log_entry); + rwl.m_perfcounter->inc(l_librbd_rwl_log_ops, 1); + + operation->log_entry->ram_entry.has_data = 1; + operation->log_entry->ram_entry.write_data = allocation->buffer_oid; + // TODO: make std::shared_ptr + operation->buffer_alloc = &(*allocation); + ceph_assert(!TOID_IS_NULL(operation->log_entry->ram_entry.write_data)); + operation->log_entry->pmem_buffer = D_RW(operation->log_entry->ram_entry.write_data); + operation->log_entry->ram_entry.sync_gen_number = rwl.m_current_sync_gen; + if (m_op_set->persist_on_flush) { + /* Persist on flush. 
Sequence #0 is never used. */ + operation->log_entry->ram_entry.write_sequence_number = 0; + } else { + /* Persist on write */ + operation->log_entry->ram_entry.write_sequence_number = ++rwl.m_last_op_sequence_num; + operation->log_entry->ram_entry.sequenced = 1; + } + operation->log_entry->ram_entry.sync_point = 0; + operation->log_entry->ram_entry.discard = 0; + operation->bl.substr_of(this->bl, buffer_offset, + operation->log_entry->write_bytes()); + buffer_offset += operation->log_entry->write_bytes(); + if (RWL_VERBOSE_LOGGING) { + ldout(cct, 20) << "operation=[" << *operation << "]" << dendl; + } + allocation++; + } + } + /* All extent ops subs created */ + m_op_set->extent_ops_appending->activate(); + m_op_set->extent_ops_persist->activate(); + + /* Write data */ + for (auto &operation : m_op_set->operations) { + /* operation is a shared_ptr, so write_op is only good as long as operation is in scope */ + auto write_op = operation->get_write_op(); + ceph_assert(write_op != nullptr); + bufferlist::iterator i(&write_op->bl); + rwl.m_perfcounter->inc(l_librbd_rwl_log_op_bytes, write_op->log_entry->write_bytes()); + if (RWL_VERBOSE_LOGGING) { + ldout(cct, 20) << write_op->bl << dendl; + } + i.copy((unsigned)write_op->log_entry->write_bytes(), (char*)write_op->log_entry->pmem_buffer); + } + + // TODO: Add to log map for read + + /* + * Entries are added to m_log_entries in alloc_op_log_entries() when their + * order is established. They're added to m_dirty_log_entries when the write + * completes to all replicas. They must not be flushed before then. We don't + * prevent the application from reading these before they persist. If we + * supported coherent shared access, that might be a problem (the write could + * fail after another initiator had read it). As it is the cost of running + * reads through the block guard (and exempting them from the barrier, which + * doesn't need to apply to them) to prevent reading before the previous + * write of that data persists doesn't seem justified. + */ + + if (rwl.m_persist_on_flush_early_user_comp && + m_op_set->persist_on_flush) { + /* + * We're done with the caller's buffer, and not guaranteeing + * persistence until the next flush. The block guard for this + * write_req will not be released until the write is persisted + * everywhere, but the caller's request can complete now. + */ + this->complete_user_request(0); + } + + bool append_deferred = false; + { + std::lock_guard locker(rwl.m_lock); + if (!m_op_set->persist_on_flush && + m_op_set->sync_point->earlier_sync_point) { + /* In persist-on-write mode, we defer the append of this write until the + * previous sync point is appending (meaning all the writes before it are + * persisted and that previous sync point can now appear in the + * log). Since we insert sync points in persist-on-write mode when writes + * have already completed to the current sync point, this limits us to + * one inserted sync point in flight at a time, and gives the next + * inserted sync point some time to accumulate a few writes if they + * arrive soon. Without this we can insert an absurd number of sync + * points, each with one or two writes. That uses a lot of log entries, + * and limits flushing to very few writes at a time. 
*/ + m_do_early_flush = false; + Context *schedule_append_ctx = new LambdaContext([this, write_req_sp](int r) { + write_req_sp->schedule_append(); + }); + m_op_set->sync_point->earlier_sync_point->on_sync_point_appending.push_back(schedule_append_ctx); + append_deferred = true; + } else { + /* The prior sync point is done, so we'll schedule append here. If this is + * persist-on-write, and probably still the caller's thread, we'll use this + * caller's thread to perform the persist & replication of the payload + * buffer. */ + m_do_early_flush = + !(this->detained || this->m_queued || this->m_deferred || m_op_set->persist_on_flush); + } + } + if (!append_deferred) { + this->schedule_append(); + } +} + +} // namespace rwl +} // namespace cache +} // namespace librbd diff --git a/src/librbd/cache/rwl/Request.h b/src/librbd/cache/rwl/Request.h new file mode 100644 index 00000000000..9556c4f4f57 --- /dev/null +++ b/src/librbd/cache/rwl/Request.h @@ -0,0 +1,159 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_CACHE_RWL_REQUEST_H +#define CEPH_LIBRBD_CACHE_RWL_REQUEST_H + +#include "include/Context.h" +#include "librbd/cache/ImageCache.h" +#include "librbd/cache/rwl/Types.h" +#include "librbd/cache/rwl/LogOperation.h" + +namespace librbd { +class BlockGuardCell; + +namespace cache { +namespace rwl { + +/** + * A request that can be deferred in a BlockGuard to sequence + * overlapping operations. + */ +template +class C_GuardedBlockIORequest : public Context { +public: + T &rwl; + C_GuardedBlockIORequest(T &rwl); + ~C_GuardedBlockIORequest(); + C_GuardedBlockIORequest(const C_GuardedBlockIORequest&) = delete; + C_GuardedBlockIORequest &operator=(const C_GuardedBlockIORequest&) = delete; + + virtual const char *get_name() const = 0; + void set_cell(BlockGuardCell *cell); + BlockGuardCell *get_cell(void); + void release_cell(); + +private: + std::atomic m_cell_released = {false}; + BlockGuardCell* m_cell = nullptr; +}; + +/** + * This is the custodian of the BlockGuard cell for this IO, and the + * state information about the progress of this IO. This object lives + * until the IO is persisted in all (live) log replicas. User request + * may be completed from here before the IO persists. 
+ */ +template +class C_BlockIORequest : public C_GuardedBlockIORequest { +public: + using C_GuardedBlockIORequest::rwl; + + io::Extents image_extents; + bufferlist bl; + int fadvise_flags; + Context *user_req; /* User write request */ + ExtentsSummary image_extents_summary; + bool detained = false; /* Detained in blockguard (overlapped with a prior IO) */ + + C_BlockIORequest(T &rwl, const utime_t arrived, io::Extents &&extents, + bufferlist&& bl, const int fadvise_flags, Context *user_req); + virtual ~C_BlockIORequest(); + + void complete_user_request(int r); + void finish(int r); + virtual void finish_req(int r) = 0; + + virtual bool alloc_resources() = 0; + + void deferred(); + + virtual void deferred_handler() = 0; + + virtual void dispatch() = 0; + + virtual const char *get_name() const override { + return "C_BlockIORequest"; + } + +protected: + utime_t m_arrived_time; + utime_t m_allocated_time; /* When allocation began */ + utime_t m_dispatched_time; /* When dispatch began */ + utime_t m_user_req_completed_time; + bool m_waited_lanes = false; /* This IO waited for free persist/replicate lanes */ + bool m_waited_entries = false; /* This IO waited for free log entries */ + bool m_waited_buffers = false; /* This IO waited for data buffers (pmemobj_reserve() failed) */ + std::atomic m_deferred = {false}; /* Deferred because this or a prior IO had to wait for write resources */ + +private: + std::atomic m_user_req_completed = {false}; + std::atomic m_finish_called = {false}; + + template + friend std::ostream &operator<<(std::ostream &os, + const C_BlockIORequest &req); +}; + +struct WriteRequestResources { + bool allocated = false; + std::vector buffers; +}; + +/** + * This is the custodian of the BlockGuard cell for this write. Block + * guard is not released until the write persists everywhere (this is + * how we guarantee to each log replica that they will never see + * overlapping writes). 
+ */ +template +class C_WriteRequest : public C_BlockIORequest { +public: + using C_BlockIORequest::rwl; + WriteRequestResources resources; + + C_WriteRequest(T &rwl, const utime_t arrived, io::Extents &&image_extents, + bufferlist&& bl, const int fadvise_flags, Context *user_req); + + ~C_WriteRequest(); + + /* Common finish to plain write and compare-and-write (if it writes) */ + virtual void finish_req(int r); + + /* Compare and write will override this */ + virtual void update_req_stats(utime_t &now) { + // TODO: Add in later PRs + } + virtual bool alloc_resources() override; + + /* Plain writes will allocate one buffer per request extent */ + virtual void setup_buffer_resources(uint64_t &bytes_cached, uint64_t &bytes_dirtied); + + void deferred_handler() override { } + + void dispatch() override; + + virtual void setup_log_operations(); + + virtual void schedule_append(); + + const char *get_name() const override { + return "C_WriteRequest"; + } + +private: + unique_ptr> m_op_set = nullptr; + bool m_do_early_flush = false; + std::atomic m_appended = {0}; + bool m_queued = false; + + template + friend std::ostream &operator<<(std::ostream &os, + const C_WriteRequest &req); +}; + +} // namespace rwl +} // namespace cache +} // namespace librbd + +#endif // CEPH_LIBRBD_CACHE_RWL_REQUEST_H diff --git a/src/librbd/cache/rwl/SyncPoint.cc b/src/librbd/cache/rwl/SyncPoint.cc index a135c6c3d5b..f486b934d0d 100644 --- a/src/librbd/cache/rwl/SyncPoint.cc +++ b/src/librbd/cache/rwl/SyncPoint.cc @@ -26,9 +26,9 @@ SyncPoint::SyncPoint(T &rwl, const uint64_t sync_gen_num) template SyncPoint::~SyncPoint() { - assert(on_sync_point_appending.empty()); - assert(on_sync_point_persisted.empty()); - assert(!earlier_sync_point); + ceph_assert(on_sync_point_appending.empty()); + ceph_assert(on_sync_point_persisted.empty()); + ceph_assert(!earlier_sync_point); } template diff --git a/src/librbd/cache/rwl/Types.cc b/src/librbd/cache/rwl/Types.cc index 8da93875e71..768b85b3fa3 100644 --- a/src/librbd/cache/rwl/Types.cc +++ b/src/librbd/cache/rwl/Types.cc @@ -92,6 +92,40 @@ std::ostream& operator<<(std::ostream& os, return os; }; +template +ExtentsSummary::ExtentsSummary(const ExtentsType &extents) { + total_bytes = 0; + first_image_byte = 0; + last_image_byte = 0; + if (extents.empty()) return; + /* These extents refer to image offsets between first_image_byte + * and last_image_byte, inclusive, but we don't guarantee here + * that they address all of those bytes. There may be gaps. 
*/ + first_image_byte = extents.front().first; + last_image_byte = first_image_byte + extents.front().second; + for (auto &extent : extents) { + /* Ignore zero length extents */ + if (extent.second) { + total_bytes += extent.second; + if (extent.first < first_image_byte) { + first_image_byte = extent.first; + } + if ((extent.first + extent.second) > last_image_byte) { + last_image_byte = extent.first + extent.second; + } + } + } +} + +template +std::ostream &operator<<(std::ostream &os, + const ExtentsSummary &s) { + os << "total_bytes=" << s.total_bytes << ", " + << "first_image_byte=" << s.first_image_byte << ", " + << "last_image_byte=" << s.last_image_byte << ""; + return os; +}; + } // namespace rwl } // namespace cache } // namespace librbd diff --git a/src/librbd/cache/rwl/Types.h b/src/librbd/cache/rwl/Types.h index 972fd2ea724..ecf65545356 100644 --- a/src/librbd/cache/rwl/Types.h +++ b/src/librbd/cache/rwl/Types.h @@ -7,6 +7,7 @@ #include #include #include "librbd/BlockGuard.h" +#include "librbd/io/Types.h" class Context; @@ -136,6 +137,18 @@ enum { l_librbd_rwl_last, }; +const uint32_t MIN_WRITE_ALLOC_SIZE = 512; +const uint32_t LOG_STATS_INTERVAL_SECONDS = 5; + +/**** Write log entries ****/ + +const uint64_t DEFAULT_POOL_SIZE = 1u<<30; +const uint64_t MIN_POOL_SIZE = DEFAULT_POOL_SIZE; +constexpr double USABLE_SIZE = (7.0 / 10); +const uint64_t BLOCK_ALLOC_OVERHEAD_BYTES = 16; +const uint8_t RWL_POOL_VERSION = 1; +const uint64_t MAX_LOG_ENTRIES = (1024 * 1024); + namespace librbd { namespace cache { namespace rwl { @@ -220,6 +233,29 @@ struct WriteBufferAllocation { utime_t allocation_lat; }; +template +class ExtentsSummary { +public: + uint64_t total_bytes; + uint64_t first_image_byte; + uint64_t last_image_byte; + ExtentsSummary(const ExtentsType &extents); + template + friend std::ostream &operator<<(std::ostream &os, + const ExtentsSummary &s); + const BlockExtent block_extent() { + return BlockExtent(first_image_byte, last_image_byte); + } + const io::Extent image_extent(const BlockExtent& block_extent) + { + return io::Extent(block_extent.block_start, + block_extent.block_end - block_extent.block_start + 1); + } + const io::Extent image_extent() { + return image_extent(block_extent()); + } +}; + } // namespace rwl } // namespace cache } // namespace librbd diff --git a/src/librbd/io/Types.h b/src/librbd/io/Types.h index 53c8f4d74f7..1742d70fb42 100644 --- a/src/librbd/io/Types.h +++ b/src/librbd/io/Types.h @@ -81,7 +81,9 @@ using striper::LightweightBufferExtents; using striper::LightweightObjectExtent; using striper::LightweightObjectExtents; -typedef std::vector > Extents; +typedef std::pair Extent; +typedef std::vector Extents; + typedef std::map ExtentMap; } // namespace io -- 2.39.5
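
Note for readers (not part of the patch): the comments in C_WriteRequest::alloc_resources() and C_WriteRequest::dispatch() above describe two decisions — whether a write can obtain its lanes, log entries, and payload buffer space, and whether, in persist-on-write mode, its log append must wait until the earlier sync point starts appending. The standalone C++ sketch below only condenses that reasoning for illustration; every type, name, and constant in it is hypothetical and merely mirrors the shape of the real checks in ReplicatedWriteLog/Request.cc.

// Standalone illustration; hypothetical names, not Ceph code.
#include <cstdint>
#include <iostream>

struct MockLogPool {
  uint64_t free_lanes = 7;           // throttle on in-flight persist/replicate operations
  uint64_t free_log_entries = 4;     // freed only when old entries are retired
  uint64_t bytes_allocated = 0;
  uint64_t bytes_allocated_cap = 1u << 20;
};

struct MockWrite {
  uint64_t num_extents = 0;
  uint64_t payload_bytes = 0;
  bool persist_on_flush = false;
  bool earlier_sync_point_open = false;  // a prior sync point has not begun appending yet
};

// Mirrors the alloc_resources() checks: one lane and one log entry per extent,
// and the pool must be under its allocation cap before buffers are reserved.
bool can_alloc(const MockLogPool &pool, const MockWrite &w) {
  if (pool.free_lanes < w.num_extents) return false;        // throttled; not a "no space" failure
  if (pool.free_log_entries < w.num_extents) return false;  // entries must be retired first
  if (pool.bytes_allocated + w.payload_bytes > pool.bytes_allocated_cap) return false;
  return true;
}

// Mirrors the dispatch() decision: a persist-on-write request whose earlier
// sync point is still open defers its append until that sync point begins
// appending; otherwise it can schedule its append (and possibly flush) now.
bool must_defer_append(const MockWrite &w) {
  return !w.persist_on_flush && w.earlier_sync_point_open;
}

int main() {
  MockLogPool pool;
  MockWrite w;
  w.num_extents = 2;
  w.payload_bytes = 8192;
  w.persist_on_flush = false;
  w.earlier_sync_point_open = true;
  std::cout << "alloc ok: " << can_alloc(pool, w)
            << ", defer append: " << must_defer_append(w) << "\n";
  return 0;
}

Running this prints both decisions for a two-extent persist-on-write write whose earlier sync point has not yet appended (allocation succeeds, append is deferred), which is the case the long comment in dispatch() is guarding against.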