m_image_ctx(image_ctx),
m_log_pool_config_size(DEFAULT_POOL_SIZE),
m_image_writeback(image_ctx), m_write_log_guard(image_ctx.cct),
+ m_entry_reader_lock("librbd::cache::ReplicatedWriteLog::m_entry_reader_lock"),
m_deferred_dispatch_lock(ceph::make_mutex(util::unique_lock_name(
"librbd::cache::ReplicatedWriteLog::m_deferred_dispatch_lock", this))),
m_log_append_lock(ceph::make_mutex(util::unique_lock_name(
Context *on_finish) {
}
-template <typename I>
-void ReplicatedWriteLog<I>::wake_up() {
- //TODO: handle the task to flush data from cache device to OSD
-}
-
template <typename I>
void ReplicatedWriteLog<I>::flush(Context *on_finish) {
}
return flush_req;
}
+/**
+ * Request an asynchronous run of process_work().
+ *
+ * The caller must hold m_lock (asserted). Does nothing when
+ * m_wake_up_enabled is false. Otherwise the request is recorded in
+ * m_wake_up_requested, and, unless a run is already scheduled, exactly one
+ * process_work() call is queued on m_work_queue.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::wake_up() {
+  CephContext *cct = m_image_ctx.cct;
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  if (!m_wake_up_enabled) {
+    // wake_up is disabled during shutdown after flushing completes
+    ldout(cct, 6) << "deferred processing disabled" << dendl;
+    return;
+  }
+
+  /* Already requested and already scheduled: nothing left to record */
+  if (m_wake_up_requested && m_wake_up_scheduled) {
+    return;
+  }
+
+  ldout(cct, 20) << dendl;
+
+  /* Wake-up can be requested while it's already scheduled */
+  m_wake_up_requested = true;
+
+  /* Wake-up cannot be scheduled if it's already scheduled */
+  if (m_wake_up_scheduled) {
+    return;
+  }
+  m_wake_up_scheduled = true;
+  m_async_process_work++;
+  m_async_op_tracker.start_op();
+  m_work_queue.queue(new LambdaContext(
+    [this](int r) {
+      process_work();
+      /* Update the counter before finish_op() so that no member is touched
+       * after the tracked op is released.
+       * NOTE(review): presumably shutdown waits on m_async_op_tracker before
+       * this object is torn down -- confirm. */
+      m_async_process_work--;
+      m_async_op_tracker.finish_op();
+    }), 0);
+}
+
+/**
+ * Body of the work item queued by wake_up().
+ *
+ * Each pass clears m_wake_up_requested, dispatches deferred writes and
+ * writes back dirty entries. Another pass follows only if wake_up() was
+ * requested again meanwhile, up to a fixed number of passes. Finally the
+ * scheduled flag is cleared and, if a request is still pending, a fresh
+ * work item is scheduled through wake_up().
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::process_work() {
+  // TODO: handle retiring entries in later PRs
+  CephContext *cct = m_image_ctx.cct;
+  constexpr int max_passes = 4;
+
+  ldout(cct, 20) << dendl;
+
+  for (int pass = 0; pass < max_passes; ++pass) {
+    {
+      std::lock_guard locker(m_lock);
+      m_wake_up_requested = false;
+    }
+    // TODO: retire entries if fulfill conditions
+    dispatch_deferred_writes();
+    process_writeback_dirty_entries();
+
+    bool requested_again = false;
+    {
+      std::lock_guard locker(m_lock);
+      requested_again = m_wake_up_requested;
+    }
+    if (!requested_again) {
+      break;
+    }
+  }
+
+  std::lock_guard locker(m_lock);
+  m_wake_up_scheduled = false;
+  /* Reschedule if it's still requested */
+  if (m_wake_up_requested) {
+    wake_up();
+  }
+}
+
+/**
+ * Decide whether log_entry may start flushing now.
+ *
+ * The caller must hold m_lock (asserted). Returns true only when the entry
+ * does not carry a higher sync gen number than the lowest one currently
+ * flushing, the entry itself reports can_writeback(), and the in-flight
+ * flush op and byte counters are within their limits.
+ */
+template <typename I>
+bool ReplicatedWriteLog<I>::can_flush_entry(std::shared_ptr<GenericLogEntry> log_entry) {
+  CephContext *cct = m_image_ctx.cct;
+
+  ldout(cct, 20) << "" << dendl;
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  //TODO handle invalidate
+
+  /* For OWB, entries with the same sync gen number (writes between
+   * aio_flush() calls) can be flushed concurrently. An entry is considered
+   * flushable if its sync gen number is <= the lowest sync gen number
+   * carried by all the entries currently flushing.
+   *
+   * Lower gen number than a previously flushed entry: the application must
+   * have submitted the higher-gen write before the lower-gen write
+   * completed, so flushing them concurrently is OK.
+   *
+   * Higher gen number than a currently flushing entry: the lower-gen write
+   * may have completed to the application before the higher-gen write was
+   * submitted, and the application may rely on that completion order for
+   * volume consistency. Such an entry is not flushable until all entries
+   * bearing lower sync gen numbers finish flushing.
+   */
+  const bool newer_than_in_flight =
+    m_flush_ops_in_flight &&
+    (log_entry->ram_entry.sync_gen_number > m_lowest_flushing_sync_gen);
+  if (newer_than_in_flight) {
+    return false;
+  }
+
+  if (!log_entry->can_writeback()) {
+    return false;
+  }
+  if (m_flush_ops_in_flight > IN_FLIGHT_FLUSH_WRITE_LIMIT) {
+    return false;
+  }
+  return m_flush_bytes_in_flight <= IN_FLIGHT_FLUSH_BYTES_LIMIT;
+}
+
+/**
+ * Account log_entry as an in-flight flush and build the context that
+ * performs it.
+ *
+ * The caller must hold m_entry_reader_lock and m_lock (both asserted).
+ * Completing the returned context queues the entry's writeback on
+ * m_image_ctx.op_work_queue. When the writeback succeeds, the lower cache is
+ * flushed through m_image_writeback.aio_flush(); on error that flush is
+ * skipped. The final stage runs under m_lock: on failure it puts the entry
+ * back at the front of m_dirty_log_entries, on success it updates
+ * m_bytes_dirty and the sync point; either way it drops the in-flight
+ * counters and calls wake_up().
+ */
+template <typename I>
+Context* ReplicatedWriteLog<I>::construct_flush_entry_ctx(std::shared_ptr<GenericLogEntry> log_entry) {
+  //TODO handle writesame, invalidate and discard in later PRs
+  CephContext *cct = m_image_ctx.cct;
+
+  ldout(cct, 20) << "" << dendl;
+  ceph_assert(m_entry_reader_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+  /* Track the lowest sync gen number among the entries now flushing */
+  if (!m_flush_ops_in_flight ||
+      (log_entry->ram_entry.sync_gen_number < m_lowest_flushing_sync_gen)) {
+    m_lowest_flushing_sync_gen = log_entry->ram_entry.sync_gen_number;
+  }
+  m_flush_ops_in_flight += 1;
+  /* For write same this is the bytes affected by the flush op, not the bytes transferred */
+  m_flush_bytes_in_flight += log_entry->ram_entry.write_bytes;
+
+  /* Flush write completion action */
+  Context *on_flushed = new LambdaContext(
+    [this, log_entry](int r) {
+      std::lock_guard locker(m_lock);
+      if (r < 0) {
+        lderr(m_image_ctx.cct) << "failed to flush log entry: "
+                               << cpp_strerror(r) << dendl;
+        /* Put the entry back so a later pass retries it first */
+        m_dirty_log_entries.push_front(log_entry);
+      } else {
+        ceph_assert(m_bytes_dirty >= log_entry->bytes_dirty());
+        m_bytes_dirty -= log_entry->bytes_dirty();
+        sync_point_writer_flushed(log_entry->get_sync_point_entry());
+        ldout(m_image_ctx.cct, 20) << "flushed: " << log_entry
+                                   << dendl;
+      }
+      m_flush_ops_in_flight -= 1;
+      m_flush_bytes_in_flight -= log_entry->ram_entry.write_bytes;
+      wake_up();
+    });
+  /* Flush through lower cache before completing */
+  Context *on_written = new LambdaContext(
+    [this, on_flushed](int r) {
+      if (r < 0) {
+        lderr(m_image_ctx.cct) << "failed to flush log entry: "
+                               << cpp_strerror(r) << dendl;
+        on_flushed->complete(r);
+      } else {
+        m_image_writeback.aio_flush(on_flushed);
+      }
+    });
+
+  /* The result of the returned context is ignored; it only hands the
+   * writeback over to the op work queue. */
+  return new LambdaContext(
+    [this, log_entry, on_written](int r) {
+      m_image_ctx.op_work_queue->queue(new LambdaContext(
+        [this, log_entry, on_written](int r) {
+          ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+                                     << " " << *log_entry << dendl;
+          log_entry->writeback(m_image_writeback, on_written);
+        }), 0);
+    });
+}
+
+/**
+ * Start flushing entries from the front of m_dirty_log_entries.
+ *
+ * At most IN_FLIGHT_FLUSH_WRITE_LIMIT entries are started per call. The scan
+ * stops early when shutting down, when the dirty list is empty, or when the
+ * front entry is not flushable yet. m_lock is taken per iteration while a
+ * shared hold on m_entry_reader_lock spans the whole scan.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::process_writeback_dirty_entries() {
+  CephContext *cct = m_image_ctx.cct;
+  bool all_clean = false;
+
+  ldout(cct, 20) << "Look for dirty entries" << dendl;
+  {
+    /* Declared before the lock so it is destroyed after the lock is
+     * released. NOTE(review): presumably DeferredContexts completes the
+     * added contexts on destruction -- confirm. */
+    DeferredContexts post_unlock;
+    std::shared_lock entry_reader_locker(m_entry_reader_lock);
+    for (int flushed = 0; flushed < IN_FLIGHT_FLUSH_WRITE_LIMIT; ++flushed) {
+      std::lock_guard locker(m_lock);
+      const bool shutting_down = m_shutting_down;
+      if (shutting_down || m_dirty_log_entries.empty()) {
+        if (shutting_down) {
+          ldout(cct, 5) << "Flush during shutdown supressed" << dendl;
+        } else {
+          ldout(cct, 20) << "Nothing new to flush" << dendl;
+        }
+        /* Do flush complete only when all flush ops are finished */
+        all_clean = !m_flush_ops_in_flight;
+        break;
+      }
+
+      auto candidate = m_dirty_log_entries.front();
+      if (!can_flush_entry(candidate)) {
+        ldout(cct, 20) << "Next dirty entry isn't flushable yet" << dendl;
+        break;
+      }
+      post_unlock.add(construct_flush_entry_ctx(candidate));
+      m_dirty_log_entries.pop_front();
+    }
+  }
+
+  if (all_clean) {
+    // TODO: all flushing complete
+  }
+}
+
/* Update/persist the last flushed sync point in the log */
template <typename I>
void ReplicatedWriteLog<I>::persist_last_flushed_sync_gen()
return false;
}
+/**
+ * Record that one write belonging to the given sync point entry has been
+ * flushed.
+ *
+ * The caller must hold m_lock and log_entry must be non-null (both
+ * asserted). Once every write of a completed sync point has been flushed,
+ * the entry is handed to handle_flushed_sync_point().
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::sync_point_writer_flushed(std::shared_ptr<SyncPointLogEntry> log_entry)
+{
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+  ceph_assert(log_entry);
+  ++log_entry->writes_flushed;
+
+  /* Nothing more to do unless this entry might be completely flushed */
+  const bool maybe_fully_flushed =
+    (log_entry->writes_flushed == log_entry->writes) && log_entry->completed;
+  if (!maybe_fully_flushed) {
+    return;
+  }
+
+  ldout(m_image_ctx.cct, 15) << "All writes flushed for sync point="
+                             << *log_entry << dendl;
+  handle_flushed_sync_point(log_entry);
+}
+
/* Make a new sync point and flush the previous during initialization, when there may or may
* not be a previous sync point */
template <typename I>
typedef std::list<std::shared_ptr<WriteLogEntry>> WriteLogEntries;
typedef std::list<std::shared_ptr<GenericLogEntry>> GenericLogEntries;
+typedef std::list<std::shared_ptr<GenericWriteLogEntry>> GenericWriteLogEntries;
/**** Write log entries end ****/
librbd::cache::rwl::ImageCacheState<ImageCtxT>* m_cache_state = nullptr;
std::atomic<bool> m_initialized = {false};
+ std::atomic<bool> m_shutting_down = {false};
PMEMobjpool *m_log_pool = nullptr;
const char* m_rwl_pool_layout_name;
std::atomic<int> m_async_append_ops = {0};
std::atomic<int> m_async_complete_ops = {0};
std::atomic<int> m_async_null_flush_finish = {0};
+ std::atomic<int> m_async_process_work = {0};
/* Acquire locks in order declared here */
+ /* Hold a read lock on m_entry_reader_lock to add readers to log entry
+ * bufs. Hold a write lock to prevent readers from being added (e.g. when
+ * removing log entries from the map). No lock required to remove readers. */
+ mutable RWLock m_entry_reader_lock;
/* Hold m_deferred_dispatch_lock while consuming from m_deferred_ios. */
mutable ceph::mutex m_deferred_dispatch_lock;
/* Hold m_log_append_lock while appending or retiring log entries. */
bool m_barrier_in_progress = false;
BlockGuardCell *m_barrier_cell = nullptr;
+ bool m_wake_up_requested = false;
+ bool m_wake_up_scheduled = false;
+ bool m_wake_up_enabled = true;
bool m_appending = false;
bool m_dispatching_deferred_ops = false;
std::shared_ptr<rwl::SyncPoint> m_current_sync_point = nullptr;
bool m_persist_on_flush = false; /* If false, persist each write before completion */
+ int m_flush_ops_in_flight = 0;
+ int m_flush_bytes_in_flight = 0;
+ uint64_t m_lowest_flushing_sync_gen = 0;
+
/* Writes that have left the block guard, but are waiting for resources */
C_BlockIORequests m_deferred_ios;
/* Throttle writes concurrently allocating & replicating */
void rwl_init(Context *on_finish, rwl::DeferredContexts &later);
void update_image_cache_state(Context *on_finish);
void wake_up();
+ void process_work();
+ bool can_flush_entry(const std::shared_ptr<rwl::GenericLogEntry> log_entry);
+ Context *construct_flush_entry_ctx(const std::shared_ptr<rwl::GenericLogEntry> log_entry);
void persist_last_flushed_sync_gen();
bool handle_flushed_sync_point(std::shared_ptr<rwl::SyncPointLogEntry> log_entry);
+ void sync_point_writer_flushed(std::shared_ptr<rwl::SyncPointLogEntry> log_entry);
+ void process_writeback_dirty_entries();
void init_flush_new_sync_point(rwl::DeferredContexts &later);
void new_sync_point(rwl::DeferredContexts &later);
namespace librbd {
namespace cache {
+class ImageWritebackInterface;
namespace rwl {
class SyncPointLogEntry;
/* Virtual destructor with an empty body. */
virtual ~GenericLogEntry() {}
GenericLogEntry(const GenericLogEntry&) = delete;
GenericLogEntry &operator=(const GenericLogEntry&) = delete;
+  /* Whether this entry can be written back now. The base default is
+   * false. */
+  virtual bool can_writeback() {
+    return false;
+  }
+  /* Number of dirty bytes reported by this entry. The base default is 0. */
+  virtual unsigned int bytes_dirty() {
+    return 0;
+  }
+  /* Sync point entry associated with this entry. The base default is
+   * nullptr. */
+  virtual std::shared_ptr<SyncPointLogEntry> get_sync_point_entry() {
+    return nullptr;
+  }
+  /* Write this entry back through image_writeback, completing ctx when
+   * done. The base default asserts unconditionally and must not be
+   * reached. */
+  virtual void writeback(librbd::cache::ImageWritebackInterface &image_writeback,
+                         Context *ctx) {
+    ceph_assert(false);
+  }
virtual std::ostream& format(std::ostream &os) const;
friend std::ostream &operator<<(std::ostream &os,
const GenericLogEntry &entry);
class GenericWriteLogEntry : public GenericLogEntry {
public:
uint32_t referring_map_entries = 0;
- bool flushing = false;
- bool flushed = false; /* or invalidated */
std::shared_ptr<SyncPointLogEntry> sync_point_entry;
GenericWriteLogEntry(std::shared_ptr<SyncPointLogEntry> sync_point_entry,
const uint64_t image_offset_bytes, const uint64_t write_bytes)
/* The valid bytes in this ops data buffer. Discard and WS override. */
return ram_entry.write_bytes;
};
- virtual inline unsigned int bytes_dirty() {
+ inline unsigned int bytes_dirty() override {
/* The bytes in the image this op makes dirty. Discard and WS override. */
return write_bytes();
};
}
/* Add one to referring_map_entries. */
void inc_map_ref() { ++referring_map_entries; }
/* Subtract one from referring_map_entries. */
void dec_map_ref() { --referring_map_entries; }
+ bool can_writeback() override;
+  /* Returns the sync_point_entry member of this write entry. */
+  std::shared_ptr<SyncPointLogEntry> get_sync_point_entry() override {
+    return sync_point_entry;
+  }
std::ostream &format(std::ostream &os) const;
friend std::ostream &operator<<(std::ostream &os,
const GenericWriteLogEntry &entry);
buffer::list &get_pmem_bl();
/* Constructs a new bl containing copies of pmem_bp */
void copy_pmem_bl(bufferlist *out_bl);
+ void writeback(librbd::cache::ImageWritebackInterface &image_writeback,
+ Context *ctx) override;
std::ostream &format(std::ostream &os) const;
friend std::ostream &operator<<(std::ostream &os,
const WriteLogEntry &entry);