From 555e1011106d4e8174a2740f3cee5b5fec25e16e Mon Sep 17 00:00:00 2001
From: Yuan Lu
Date: Mon, 17 Feb 2020 15:43:27 +0800
Subject: [PATCH] librbd: flush dirty entries to osd

Signed-off-by: Peterson, Scott
Signed-off-by: Li, Xiaoyan
Signed-off-by: Lu, Yuan
Signed-off-by: Chamarthy, Mahati
---
 src/librbd/cache/ImageWriteback.h      |  26 ++-
 src/librbd/cache/ReplicatedWriteLog.cc | 230 ++++++++++++++++++++++++-
 src/librbd/cache/ReplicatedWriteLog.h  |  19 ++
 src/librbd/cache/rwl/LogEntry.cc       |  23 ++-
 src/librbd/cache/rwl/LogEntry.h        |  24 ++-
 src/librbd/cache/rwl/Types.h           |   3 +
 6 files changed, 312 insertions(+), 13 deletions(-)

diff --git a/src/librbd/cache/ImageWriteback.h b/src/librbd/cache/ImageWriteback.h
index 382c57c1d390d..bbcc85c8ae543 100644
--- a/src/librbd/cache/ImageWriteback.h
+++ b/src/librbd/cache/ImageWriteback.h
@@ -16,13 +16,35 @@ struct ImageCtx;
 
 namespace cache {
 
+class ImageWritebackInterface {
+public:
+  typedef std::vector<std::pair<uint64_t,uint64_t> > Extents;
+  virtual ~ImageWritebackInterface() {
+  }
+  virtual void aio_read(Extents &&image_extents, ceph::bufferlist *bl,
+                        int fadvise_flags, Context *on_finish) = 0;
+  virtual void aio_write(Extents &&image_extents, ceph::bufferlist&& bl,
+                         int fadvise_flags, Context *on_finish) = 0;
+  virtual void aio_discard(uint64_t offset, uint64_t length,
+                           uint32_t discard_granularity_bytes, Context *on_finish) = 0;
+  virtual void aio_flush(Context *on_finish) = 0;
+  virtual void aio_writesame(uint64_t offset, uint64_t length,
+                             ceph::bufferlist&& bl,
+                             int fadvise_flags, Context *on_finish) = 0;
+  virtual void aio_compare_and_write(Extents &&image_extents,
+                                     ceph::bufferlist&& cmp_bl,
+                                     ceph::bufferlist&& bl,
+                                     uint64_t *mismatch_offset,
+                                     int fadvise_flags, Context *on_finish) = 0;
+};
+
 /**
  * client-side, image extent cache writeback handler
  */
 template <typename ImageCtxT = librbd::ImageCtx>
-class ImageWriteback {
+class ImageWriteback : public ImageWritebackInterface {
 public:
-  typedef std::vector<std::pair<uint64_t,uint64_t> > Extents;
+  using ImageWritebackInterface::Extents;
 
   explicit ImageWriteback(ImageCtxT &image_ctx);
 
diff --git a/src/librbd/cache/ReplicatedWriteLog.cc b/src/librbd/cache/ReplicatedWriteLog.cc
index b43c073e88a0a..ca0fd2751f3de 100644
--- a/src/librbd/cache/ReplicatedWriteLog.cc
+++ b/src/librbd/cache/ReplicatedWriteLog.cc
@@ -42,6 +42,7 @@ ReplicatedWriteLog<I>::ReplicatedWriteLog(I &image_ctx, librbd::cache::rwl::Imag
     m_image_ctx(image_ctx),
     m_log_pool_config_size(DEFAULT_POOL_SIZE),
     m_image_writeback(image_ctx), m_write_log_guard(image_ctx.cct),
+    m_entry_reader_lock("librbd::cache::ReplicatedWriteLog::m_entry_reader_lock"),
     m_deferred_dispatch_lock(ceph::make_mutex(util::unique_lock_name(
       "librbd::cache::ReplicatedWriteLog::m_deferred_dispatch_lock", this))),
     m_log_append_lock(ceph::make_mutex(util::unique_lock_name(
@@ -546,11 +547,6 @@ void ReplicatedWriteLog<I>::aio_compare_and_write(Extents &&image_extents,
                                                   Context *on_finish) {
 }
 
-template <typename I>
-void ReplicatedWriteLog<I>::wake_up() {
-  //TODO: handle the task to flush data from cache device to OSD
-}
-
 template <typename I>
 void ReplicatedWriteLog<I>::flush(Context *on_finish) {
 }
@@ -1352,6 +1348,215 @@ C_FlushRequest<ReplicatedWriteLog<I>>* ReplicatedWriteLog<I>::make_flush_req(Con
   return flush_req;
 }
 
+template <typename I>
+void ReplicatedWriteLog<I>::wake_up() {
+  CephContext *cct = m_image_ctx.cct;
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  if (!m_wake_up_enabled) {
+    // wake_up is disabled during shutdown after flushing completes
+    ldout(m_image_ctx.cct, 6) << "deferred processing disabled" << dendl;
+    return;
+  }
+
+  if (m_wake_up_requested && m_wake_up_scheduled) {
+    return;
+  }
+
+  ldout(cct, 20) << dendl;
+
+  /* Wake-up can be requested while it's already scheduled */
+  m_wake_up_requested = true;
+
+  /* Wake-up cannot be scheduled if it's already scheduled */
+  if (m_wake_up_scheduled) {
+    return;
+  }
+  m_wake_up_scheduled = true;
+  m_async_process_work++;
+  m_async_op_tracker.start_op();
+  m_work_queue.queue(new LambdaContext(
+    [this](int r) {
+      process_work();
+      m_async_op_tracker.finish_op();
+      m_async_process_work--;
+    }), 0);
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::process_work() {
+  // TODO: handle retiring entries in later PRs
+  CephContext *cct = m_image_ctx.cct;
+  int max_iterations = 4;
+  bool wake_up_requested = false;
+
+  ldout(cct, 20) << dendl;
+
+  do {
+    {
+      std::lock_guard locker(m_lock);
+      m_wake_up_requested = false;
+    }
+    // TODO: retire entries if the retirement conditions are met
+    dispatch_deferred_writes();
+    process_writeback_dirty_entries();
+
+    {
+      std::lock_guard locker(m_lock);
+      wake_up_requested = m_wake_up_requested;
+    }
+  } while (wake_up_requested && --max_iterations > 0);
+
+  {
+    std::lock_guard locker(m_lock);
+    m_wake_up_scheduled = false;
+    /* Reschedule if it's still requested */
+    if (m_wake_up_requested) {
+      wake_up();
+    }
+  }
+}
+
+template <typename I>
+bool ReplicatedWriteLog<I>::can_flush_entry(std::shared_ptr<GenericLogEntry> log_entry) {
+  CephContext *cct = m_image_ctx.cct;
+
+  ldout(cct, 20) << "" << dendl;
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  //TODO handle invalidate
+
+  /* For OWB we can flush entries with the same sync gen number (write between
+   * aio_flush() calls) concurrently. Here we'll consider an entry flushable if
+   * its sync gen number is <= the lowest sync gen number carried by all the
+   * entries currently flushing.
+   *
+   * If the entry considered here bears a sync gen number lower than a
+   * previously flushed entry, the application had to have submitted the write
+   * bearing the higher gen number before the write with the lower gen number
+   * completed. So, flushing these concurrently is OK.
+   *
+   * If the entry considered here bears a sync gen number higher than a
+   * currently flushing entry, the write with the lower gen number may have
+   * completed to the application before the write with the higher sync gen
+   * number was submitted, and the application may rely on that completion
+   * order for volume consistency. In this case the entry will not be
+   * considered flushable until all the entries bearing lower sync gen numbers
+   * finish flushing.
+   */
+
+  if (m_flush_ops_in_flight &&
+      (log_entry->ram_entry.sync_gen_number > m_lowest_flushing_sync_gen)) {
+    return false;
+  }
+
+  return (log_entry->can_writeback() &&
+          (m_flush_ops_in_flight <= IN_FLIGHT_FLUSH_WRITE_LIMIT) &&
+          (m_flush_bytes_in_flight <= IN_FLIGHT_FLUSH_BYTES_LIMIT));
+}
+
+template <typename I>
+Context* ReplicatedWriteLog<I>::construct_flush_entry_ctx(std::shared_ptr<GenericLogEntry> log_entry) {
+  //TODO handle writesame, invalidate and discard in later PRs
+  CephContext *cct = m_image_ctx.cct;
+
+  ldout(cct, 20) << "" << dendl;
+  ceph_assert(m_entry_reader_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+  if (!m_flush_ops_in_flight ||
+      (log_entry->ram_entry.sync_gen_number < m_lowest_flushing_sync_gen)) {
+    m_lowest_flushing_sync_gen = log_entry->ram_entry.sync_gen_number;
+  }
+  m_flush_ops_in_flight += 1;
+  /* For write same this is the bytes affected by the flush op, not the bytes transferred */
+  m_flush_bytes_in_flight += log_entry->ram_entry.write_bytes;
+
+  /* Flush write completion action */
+  Context *ctx = new LambdaContext(
+    [this, log_entry](int r) {
+      {
+        std::lock_guard locker(m_lock);
+        if (r < 0) {
+          lderr(m_image_ctx.cct) << "failed to flush log entry"
+                                 << cpp_strerror(r) << dendl;
+          m_dirty_log_entries.push_front(log_entry);
+        } else {
+          ceph_assert(m_bytes_dirty >= log_entry->bytes_dirty());
+          m_bytes_dirty -= log_entry->bytes_dirty();
+          sync_point_writer_flushed(log_entry->get_sync_point_entry());
+          ldout(m_image_ctx.cct, 20) << "flushed: " << log_entry
+                                     << dendl;
+        }
+        m_flush_ops_in_flight -= 1;
+        m_flush_bytes_in_flight -= log_entry->ram_entry.write_bytes;
+        wake_up();
+      }
+    });
+  /* Flush through lower cache before completing */
+  ctx = new LambdaContext(
+    [this, ctx](int r) {
+      if (r < 0) {
+        lderr(m_image_ctx.cct) << "failed to flush log entry"
+                               << cpp_strerror(r) << dendl;
+        ctx->complete(r);
+      } else {
+        m_image_writeback.aio_flush(ctx);
+      }
+    });
+
+  return new LambdaContext(
+    [this, log_entry, ctx](int r) {
+      m_image_ctx.op_work_queue->queue(new LambdaContext(
+        [this, log_entry, ctx](int r) {
+          ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+                                     << " " << *log_entry << dendl;
+          log_entry->writeback(m_image_writeback, ctx);
+        }), 0);
+    });
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::process_writeback_dirty_entries() {
+  CephContext *cct = m_image_ctx.cct;
+  bool all_clean = false;
+  int flushed = 0;
+
+  ldout(cct, 20) << "Look for dirty entries" << dendl;
+  {
+    DeferredContexts post_unlock;
+    std::shared_lock entry_reader_locker(m_entry_reader_lock);
+    while (flushed < IN_FLIGHT_FLUSH_WRITE_LIMIT) {
+      std::lock_guard locker(m_lock);
+      if (m_shutting_down) {
+        ldout(cct, 5) << "Flush during shutdown suppressed" << dendl;
+        /* Do flush complete only when all flush ops are finished */
+        all_clean = !m_flush_ops_in_flight;
+        break;
+      }
+      if (m_dirty_log_entries.empty()) {
+        ldout(cct, 20) << "Nothing new to flush" << dendl;
+        /* Do flush complete only when all flush ops are finished */
+        all_clean = !m_flush_ops_in_flight;
+        break;
+      }
+      auto candidate = m_dirty_log_entries.front();
+      bool flushable = can_flush_entry(candidate);
+      if (flushable) {
+        post_unlock.add(construct_flush_entry_ctx(candidate));
+        flushed++;
+        m_dirty_log_entries.pop_front();
+      } else {
+        ldout(cct, 20) << "Next dirty entry isn't flushable yet" << dendl;
+        break;
+      }
+    }
+  }
+
+  if (all_clean) {
+    // TODO: all flushing complete
+  }
+}
+
 /* Update/persist the last flushed sync point in the log */
 template <typename I>
 void ReplicatedWriteLog<I>::persist_last_flushed_sync_gen()
@@ -1417,6 +1622,21 @@ bool ReplicatedWriteLog<I>::handle_flushed_sync_point(std::shared_ptr<SyncPointL
   return false;
 }
 
+template <typename I>
+void ReplicatedWriteLog<I>::sync_point_writer_flushed(std::shared_ptr<SyncPointLogEntry> log_entry)
+{
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+  ceph_assert(log_entry);
+  log_entry->writes_flushed++;
+
+  /* If this entry might be completely flushed, look closer */
+  if ((log_entry->writes_flushed == log_entry->writes) && log_entry->completed) {
+    ldout(m_image_ctx.cct, 15) << "All writes flushed for sync point="
+                               << *log_entry << dendl;
+    handle_flushed_sync_point(log_entry);
+  }
+}
+
 /* Make a new sync point and flush the previous during initialization, when there may or may
  * not be a previous sync point */
 template <typename I>
diff --git a/src/librbd/cache/ReplicatedWriteLog.h b/src/librbd/cache/ReplicatedWriteLog.h
index 30e53bc6afd08..482d2f9af5183 100644
--- a/src/librbd/cache/ReplicatedWriteLog.h
+++ b/src/librbd/cache/ReplicatedWriteLog.h
@@ -35,6 +35,7 @@ class GenericLogEntry;
 
 typedef std::list<std::shared_ptr<WriteLogEntry>> WriteLogEntries;
 typedef std::list<std::shared_ptr<GenericLogEntry>> GenericLogEntries;
+typedef std::list<std::shared_ptr<GenericWriteLogEntry>> GenericWriteLogEntries;
 
 /**** Write log entries end ****/
 
@@ -135,6 +136,7 @@ private:
   librbd::cache::rwl::ImageCacheState<I>* m_cache_state = nullptr;
 
   std::atomic<bool> m_initialized = {false};
+  std::atomic<bool> m_shutting_down = {false};
   PMEMobjpool *m_log_pool = nullptr;
   const char* m_rwl_pool_layout_name;
 
@@ -183,9 +185,14 @@
   std::atomic<int> m_async_append_ops = {0};
   std::atomic<int> m_async_complete_ops = {0};
   std::atomic<int> m_async_null_flush_finish = {0};
+  std::atomic<int> m_async_process_work = {0};
 
   /* Acquire locks in order declared here */
 
+  /* Hold a read lock on m_entry_reader_lock to add readers to log entry
+   * bufs. Hold a write lock to prevent readers from being added (e.g. when
+   * removing log entries from the map). No lock required to remove readers. */
+  mutable RWLock m_entry_reader_lock;
   /* Hold m_deferred_dispatch_lock while consuming from m_deferred_ios. */
   mutable ceph::mutex m_deferred_dispatch_lock;
   /* Hold m_log_append_lock while appending or retiring log entries. */
@@ -201,6 +208,9 @@
   bool m_barrier_in_progress = false;
   BlockGuardCell *m_barrier_cell = nullptr;
 
+  bool m_wake_up_requested = false;
+  bool m_wake_up_scheduled = false;
+  bool m_wake_up_enabled = true;
   bool m_appending = false;
   bool m_dispatching_deferred_ops = false;
 
@@ -216,6 +226,10 @@
   std::shared_ptr<rwl::SyncPoint> m_current_sync_point = nullptr;
   bool m_persist_on_flush = false; /* If false, persist each write before completion */
 
+  int m_flush_ops_in_flight = 0;
+  int m_flush_bytes_in_flight = 0;
+  uint64_t m_lowest_flushing_sync_gen = 0;
+
   /* Writes that have left the block guard, but are waiting for resources */
   C_BlockIORequests m_deferred_ios;
   /* Throttle writes concurrently allocating & replicating */
@@ -240,9 +254,14 @@
   void rwl_init(Context *on_finish, rwl::DeferredContexts &later);
   void update_image_cache_state(Context *on_finish);
   void wake_up();
+  void process_work();
 
+  bool can_flush_entry(const std::shared_ptr<rwl::GenericLogEntry> log_entry);
+  Context *construct_flush_entry_ctx(const std::shared_ptr<rwl::GenericLogEntry> log_entry);
   void persist_last_flushed_sync_gen();
   bool handle_flushed_sync_point(std::shared_ptr<rwl::SyncPointLogEntry> log_entry);
+  void sync_point_writer_flushed(std::shared_ptr<rwl::SyncPointLogEntry> log_entry);
+  void process_writeback_dirty_entries();
 
   void init_flush_new_sync_point(rwl::DeferredContexts &later);
   void new_sync_point(rwl::DeferredContexts &later);
diff --git a/src/librbd/cache/rwl/LogEntry.cc b/src/librbd/cache/rwl/LogEntry.cc
index 16858b5053764..9f72df7d77e62 100644
--- a/src/librbd/cache/rwl/LogEntry.cc
+++ b/src/librbd/cache/rwl/LogEntry.cc
@@ -3,6 +3,7 @@
 
 #include <iostream>
 #include "LogEntry.h"
+#include "librbd/cache/ImageWriteback.h"
 
 #define dout_subsys ceph_subsys_rbd_rwl
 #undef dout_prefix
@@ -46,6 +47,13 @@ std::ostream &operator<<(std::ostream &os,
   return entry.format(os);
 }
 
+bool GenericWriteLogEntry::can_writeback() {
+  return (this->completed &&
+          (ram_entry.sequenced ||
+           (sync_point_entry &&
+            sync_point_entry->completed)));
+}
+
 std::ostream& GenericWriteLogEntry::format(std::ostream &os) const {
   GenericLogEntry::format(os);
   os << ", "
@@ -56,9 +64,7 @@ std::ostream& GenericWriteLogEntry::format(std::ostream &os) const {
     os << "nullptr";
   }
   os << "], "
-     << "referring_map_entries=" << referring_map_entries << ", "
-     << "flushing=" << flushing << ", "
-     << "flushed=" << flushed;
+     << "referring_map_entries=" << referring_map_entries;
   return os;
 };
 
@@ -130,6 +136,17 @@ void WriteLogEntry::copy_pmem_bl(bufferlist *out_bl) {
   this->init_bl(cloned_bp, *out_bl);
 }
 
+void WriteLogEntry::writeback(librbd::cache::ImageWritebackInterface &image_writeback,
+                              Context *ctx) {
+  /* Pass a copy of the pmem buffer to ImageWriteback (which may hang on to the bl even after flush()). */
+  bufferlist entry_bl;
+  buffer::list entry_bl_copy;
+  copy_pmem_bl(&entry_bl_copy);
+  entry_bl_copy.begin(0).copy(write_bytes(), entry_bl);
+  image_writeback.aio_write({{ram_entry.image_offset_bytes, ram_entry.write_bytes}},
+                            std::move(entry_bl), 0, ctx);
+}
+
 std::ostream& WriteLogEntry::format(std::ostream &os) const {
   os << "(Write) ";
   GenericWriteLogEntry::format(os);
diff --git a/src/librbd/cache/rwl/LogEntry.h b/src/librbd/cache/rwl/LogEntry.h
index b26fddb366f41..471102f6c5e83 100644
--- a/src/librbd/cache/rwl/LogEntry.h
+++ b/src/librbd/cache/rwl/LogEntry.h
@@ -12,6 +12,7 @@
 
 namespace librbd {
 namespace cache {
+class ImageWritebackInterface;
 namespace rwl {
 
 class SyncPointLogEntry;
@@ -30,6 +31,19 @@ public:
   virtual ~GenericLogEntry() { };
   GenericLogEntry(const GenericLogEntry&) = delete;
   GenericLogEntry &operator=(const GenericLogEntry&) = delete;
+  virtual bool can_writeback() {
+    return false;
+  }
+  virtual inline unsigned int bytes_dirty() {
+    return 0;
+  };
+  virtual std::shared_ptr<SyncPointLogEntry> get_sync_point_entry() {
+    return nullptr;
+  }
+  virtual void writeback(librbd::cache::ImageWritebackInterface &image_writeback,
+                         Context *ctx) {
+    ceph_assert(false);
+  };
   virtual std::ostream& format(std::ostream &os) const;
   friend std::ostream &operator<<(std::ostream &os,
                                   const GenericLogEntry &entry);
@@ -63,8 +77,6 @@ public:
 class GenericWriteLogEntry : public GenericLogEntry {
 public:
   uint32_t referring_map_entries = 0;
-  bool flushing = false;
-  bool flushed = false; /* or invalidated */
   std::shared_ptr<SyncPointLogEntry> sync_point_entry;
   GenericWriteLogEntry(std::shared_ptr<SyncPointLogEntry> sync_point_entry,
                        const uint64_t image_offset_bytes, const uint64_t write_bytes)
@@ -78,7 +90,7 @@ public:
     /* The valid bytes in this ops data buffer. Discard and WS override. */
     return ram_entry.write_bytes;
   };
-  virtual inline unsigned int bytes_dirty() {
+  inline unsigned int bytes_dirty() override {
     /* The bytes in the image this op makes dirty. Discard and WS override. */
     return write_bytes();
   };
@@ -90,6 +102,10 @@ public:
   }
   void inc_map_ref() { referring_map_entries++; }
   void dec_map_ref() { referring_map_entries--; }
+  bool can_writeback() override;
+  std::shared_ptr<SyncPointLogEntry> get_sync_point_entry() override {
+    return sync_point_entry;
+  }
   std::ostream &format(std::ostream &os) const;
   friend std::ostream &operator<<(std::ostream &os,
                                   const GenericWriteLogEntry &entry);
@@ -136,6 +152,8 @@ public:
   buffer::list &get_pmem_bl();
   /* Constructs a new bl containing copies of pmem_bp */
   void copy_pmem_bl(bufferlist *out_bl);
+  void writeback(librbd::cache::ImageWritebackInterface &image_writeback,
+                 Context *ctx) override;
   std::ostream &format(std::ostream &os) const;
   friend std::ostream &operator<<(std::ostream &os,
                                   const WriteLogEntry &entry);
diff --git a/src/librbd/cache/rwl/Types.h b/src/librbd/cache/rwl/Types.h
index d5de52a4c9ef6..bd2ece22e2187 100644
--- a/src/librbd/cache/rwl/Types.h
+++ b/src/librbd/cache/rwl/Types.h
@@ -141,6 +141,9 @@ namespace librbd {
 namespace cache {
 namespace rwl {
 
+const int IN_FLIGHT_FLUSH_WRITE_LIMIT = 64;
+const int IN_FLIGHT_FLUSH_BYTES_LIMIT = (1 * 1024 * 1024);
+
 /* Limit work between sync points */
 const uint64_t MAX_WRITES_PER_SYNC_POINT = 256;
 const uint64_t MAX_BYTES_PER_SYNC_POINT = (1024 * 1024 * 8);
-- 
2.39.5
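
The ordering rule documented in can_flush_entry() can be reduced to a minimal
standalone sketch, compiled apart from the tree. The types below (Entry,
Flusher) are illustrative stand-ins, not librbd types: writes carrying the same
sync gen number may flush concurrently, while a write carrying a higher gen
number waits until every in-flight flush with a lower gen number finishes.

#include <cstdint>
#include <deque>
#include <iostream>

struct Entry {
  uint64_t sync_gen_number; // sync point generation this write belongs to
  bool completed = true;    // write already completed to the application
};

struct Flusher {
  int ops_in_flight = 0;            // flush ops currently writing back
  uint64_t lowest_flushing_gen = 0; // lowest gen among in-flight flushes

  // Writes within one sync gen may flush concurrently; a write bearing a
  // higher gen must wait until all lower-gen flushes finish, because the
  // application may depend on that completion order.
  bool can_flush(const Entry &e) const {
    if (ops_in_flight && e.sync_gen_number > lowest_flushing_gen) {
      return false;
    }
    return e.completed;
  }

  void start_flush(const Entry &e) {
    if (!ops_in_flight || e.sync_gen_number < lowest_flushing_gen) {
      lowest_flushing_gen = e.sync_gen_number;
    }
    ++ops_in_flight;
  }
};

int main() {
  Flusher f;
  std::deque<Entry> dirty{{1}, {1}, {2}};

  // Both gen-1 entries dispatch concurrently; the gen-2 entry must wait.
  while (!dirty.empty() && f.can_flush(dirty.front())) {
    f.start_flush(dirty.front());
    dirty.pop_front();
  }
  std::cout << "dispatched " << f.ops_in_flight
            << ", waiting " << dirty.size() << std::endl; // dispatched 2, waiting 1
}

Running it dispatches both gen-1 entries and leaves the gen-2 entry queued,
which mirrors how process_writeback_dirty_entries() stops at the first
non-flushable entry.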
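
The completion chaining in construct_flush_entry_ctx() reads inside-out: the
outermost context issues the writeback, its completion flushes through the
lower cache, and that completion performs the accounting. Below is a minimal
sketch of the same pattern using plain std::function in place of Ceph's
Context/LambdaContext; all names here are illustrative, and error paths
short-circuit to the final step just as the patch requeues a failed entry.

#include <functional>
#include <iostream>

using Ctx = std::function<void(int)>;

int main() {
  // Step 3 (innermost): account for the finished flush op.
  Ctx done = [](int r) {
    std::cout << (r < 0 ? "requeue dirty entry" : "entry clean") << "\n";
  };

  // Step 2: flush through the lower cache, then complete `done`.
  Ctx flush_lower = [done](int r) {
    if (r < 0) { done(r); return; } // write failed: skip the flush
    std::cout << "aio_flush to OSD\n";
    done(0);                        // pretend the flush succeeded
  };

  // Step 1 (outermost): write the entry back, then complete `flush_lower`.
  Ctx writeback = [flush_lower](int) {
    std::cout << "aio_write entry to OSD\n";
    flush_lower(0);                 // pretend the write succeeded
  };

  writeback(0);
}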