m_image_ctx(image_ctx),
m_log_pool_config_size(DEFAULT_POOL_SIZE),
m_image_writeback(image_ctx), m_write_log_guard(image_ctx.cct),
+ m_log_retire_lock(ceph::make_mutex(util::unique_lock_name(
+ "librbd::cache::ReplicatedWriteLog::m_log_retire_lock", this))),
m_entry_reader_lock("librbd::cache::ReplicatedWriteLog::m_entry_reader_lock"),
m_deferred_dispatch_lock(ceph::make_mutex(util::unique_lock_name(
"librbd::cache::ReplicatedWriteLog::m_deferred_dispatch_lock", this))),
std::lock_guard locker(m_lock);
m_timer->cancel_event(m_timer_ctx);
m_thread_pool.stop();
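+ /* All deferred IOs and in-flight log operations must have drained by this point. */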
+ ceph_assert(m_deferred_ios.empty());
+ ceph_assert(m_ops_to_flush.empty());
+ ceph_assert(m_ops_to_append.empty());
+ ceph_assert(m_flush_ops_in_flight == 0);
+
m_log_pool = nullptr;
delete m_cache_state;
m_cache_state = nullptr;
template <typename I>
void ReplicatedWriteLog<I>::process_work() {
- // TODO: handle retiring entries in later PRs
CephContext *cct = m_image_ctx.cct;
int max_iterations = 4;
bool wake_up_requested = false;
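+ /* Retire thresholds, as fractions of pool capacity: retire in large
+  * batches above the aggressive high-water mark, start retiring above
+  * the high-water mark, and continue down to the low-water mark while
+  * within the batch time limit. */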
+ uint64_t aggressive_high_water_bytes = m_bytes_allocated_cap * AGGRESSIVE_RETIRE_HIGH_WATER;
+ uint64_t high_water_bytes = m_bytes_allocated_cap * RETIRE_HIGH_WATER;
+ uint64_t low_water_bytes = m_bytes_allocated_cap * RETIRE_LOW_WATER;
+ uint64_t aggressive_high_water_entries = m_total_log_entries * AGGRESSIVE_RETIRE_HIGH_WATER;
+ uint64_t high_water_entries = m_total_log_entries * RETIRE_HIGH_WATER;
+ uint64_t low_water_entries = m_total_log_entries * RETIRE_LOW_WATER;
ldout(cct, 20) << dendl;
std::lock_guard locker(m_lock);
m_wake_up_requested = false;
}
- // TODO: retire entries if fulfill conditions
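+ /* Retire entries if a buffer allocation failed since the last retire,
+  * or if allocated bytes or log entries exceed their high-water marks. */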
+ if (m_alloc_failed_since_retire ||
+ m_bytes_allocated > high_water_bytes ||
+ (m_log_entries.size() > high_water_entries)) {
+ int retired = 0;
+ utime_t started = ceph_clock_now();
+ ldout(cct, 10) << "alloc_fail=" << m_alloc_failed_since_retire
+ << ", allocated > high_water="
+ << (m_bytes_allocated > high_water_bytes)
+ << ", allocated_entries > high_water="
+ << (m_log_entries.size() > high_water_entries)
+ << dendl;
+ while (m_alloc_failed_since_retire ||
+ (m_bytes_allocated > high_water_bytes) ||
+ (m_log_entries.size() > high_water_entries) ||
+ (((m_bytes_allocated > low_water_bytes) || (m_log_entries.size() > low_water_entries)) &&
+ (utime_t(ceph_clock_now() - started).to_msec() < RETIRE_BATCH_TIME_LIMIT_MS))) {
+ if (!retire_entries((m_shutting_down ||
+ (m_bytes_allocated > aggressive_high_water_bytes) ||
+ (m_log_entries.size() > aggressive_high_water_entries))
+ ? MAX_ALLOC_PER_TRANSACTION
+ : MAX_FREE_PER_TRANSACTION)) {
+ break;
+ }
+ retired++;
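+ /* Retiring frees pool space, so deferred IOs may now be dispatchable,
+  * and further writeback may make more entries retirable. */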
+ dispatch_deferred_writes();
+ process_writeback_dirty_entries();
+ }
+ ldout(m_image_ctx.cct, 10) << "Retired " << retired << " times" << dendl;
+ }
dispatch_deferred_writes();
process_writeback_dirty_entries();
m_dirty_log_entries.push_front(log_entry);
} else {
ceph_assert(m_bytes_dirty >= log_entry->bytes_dirty());
+ log_entry->set_flushed(true);
m_bytes_dirty -= log_entry->bytes_dirty();
sync_point_writer_flushed(log_entry->get_sync_point_entry());
ldout(m_image_ctx.cct, 20) << "flushed: " << log_entry
m_blocks_to_log_entries.add_log_entries(log_entries);
}
+template <typename I>
+bool ReplicatedWriteLog<I>::can_retire_entry(std::shared_ptr<GenericLogEntry> log_entry) {
+ CephContext *cct = m_image_ctx.cct;
+
+ ldout(cct, 20) << dendl;
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+ return log_entry->can_retire();
+}
+
+/**
+ * Retire up to frees_per_tx of the oldest log entries that are
+ * eligible to be retired. Returns true if anything was retired.
+ */
+template <typename I>
+bool ReplicatedWriteLog<I>::retire_entries(const unsigned long int frees_per_tx) {
+ CephContext *cct = m_image_ctx.cct;
+ GenericLogEntriesVector retiring_entries;
+ uint32_t initial_first_valid_entry;
+ uint32_t first_valid_entry;
+
+ std::lock_guard retire_locker(m_log_retire_lock);
+ ldout(cct, 20) << "Look for entries to retire" << dendl;
+ {
+ /* Entry readers can't be added while we hold m_entry_reader_lock */
+ RWLock::WLocker entry_reader_locker(m_entry_reader_lock);
+ std::lock_guard locker(m_lock);
+ initial_first_valid_entry = m_first_valid_entry;
+ first_valid_entry = m_first_valid_entry;
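+ /* Retire in order from the oldest entry; stop at the first entry
+  * that cannot yet be retired. */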
+ while (!m_log_entries.empty() &&
+ retiring_entries.size() < frees_per_tx &&
+ can_retire_entry(m_log_entries.front())) {
+ auto entry = m_log_entries.front();
+ if (entry->log_entry_index != first_valid_entry) {
+ lderr(cct) << "Retiring entry index (" << entry->log_entry_index
+ << ") and first valid log entry index (" << first_valid_entry
+ << ") must be ==." << dendl;
+ }
+ ceph_assert(entry->log_entry_index == first_valid_entry);
+ first_valid_entry = (first_valid_entry + 1) % m_total_log_entries;
+ m_log_entries.pop_front();
+ retiring_entries.push_back(entry);
+ /* Remove entry from map so there will be no more readers */
+ if ((entry->write_bytes() > 0) || (entry->bytes_dirty() > 0)) {
+ auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(entry);
+ m_blocks_to_log_entries.remove_log_entry(gen_write_entry);
+ }
+ }
+ }
+
+ if (!retiring_entries.empty()) {
+ ldout(cct, 20) << "Retiring " << retiring_entries.size() << " entries" << dendl;
+ TOID(struct WriteLogPoolRoot) pool_root;
+ pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+
+ utime_t tx_start;
+ utime_t tx_end;
+ /* Advance first valid entry and release buffers */
+ {
+ uint64_t flushed_sync_gen;
+ std::lock_guard append_locker(m_log_append_lock);
+ {
+ std::lock_guard locker(m_lock);
+ flushed_sync_gen = m_flushed_sync_gen;
+ }
+
+ tx_start = ceph_clock_now();
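+ /* One pmem transaction atomically persists the advanced first_valid_entry,
+  * updates flushed_sync_gen, and frees the retired entries' data buffers. */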
+ TX_BEGIN(m_log_pool) {
+ if (D_RO(pool_root)->flushed_sync_gen < flushed_sync_gen) {
+ ldout(cct, 20) << "flushed_sync_gen in log updated from "
+ << D_RO(pool_root)->flushed_sync_gen << " to "
+ << flushed_sync_gen << dendl;
+ D_RW(pool_root)->flushed_sync_gen = flushed_sync_gen;
+ }
+ D_RW(pool_root)->first_valid_entry = first_valid_entry;
+ for (auto &entry: retiring_entries) {
+ if (entry->write_bytes()) {
+ ldout(cct, 20) << "Freeing " << entry->ram_entry.write_data.oid.pool_uuid_lo
+ << "." << entry->ram_entry.write_data.oid.off << dendl;
+ TX_FREE(entry->ram_entry.write_data);
+ } else {
+ ldout(cct, 20) << "Retiring non-write: " << *entry << dendl;
+ }
+ }
+ } TX_ONCOMMIT {
+ } TX_ONABORT {
+ lderr(cct) << "failed to commit free of" << retiring_entries.size() << " log entries (" << m_log_pool_name << ")" << dendl;
+ ceph_assert(false);
+ } TX_FINALLY {
+ } TX_END;
+ tx_end = ceph_clock_now();
+ }
+ m_perfcounter->tinc(l_librbd_rwl_retire_tx_t, tx_end - tx_start);
+ m_perfcounter->hinc(l_librbd_rwl_retire_tx_t_hist,
+ utime_t(tx_end - tx_start).to_nsec(),
+ retiring_entries.size());
+
+ /* Update runtime copy of first_valid, and free entries counts */
+ {
+ std::lock_guard locker(m_lock);
+
+ ceph_assert(m_first_valid_entry == initial_first_valid_entry);
+ m_first_valid_entry = first_valid_entry;
+ m_free_log_entries += retiring_entries.size();
+ for (auto &entry: retiring_entries) {
+ if (entry->write_bytes()) {
+ ceph_assert(m_bytes_cached >= entry->write_bytes());
+ m_bytes_cached -= entry->write_bytes();
+ uint64_t entry_allocation_size = entry->write_bytes();
+ if (entry_allocation_size < MIN_WRITE_ALLOC_SIZE) {
+ entry_allocation_size = MIN_WRITE_ALLOC_SIZE;
+ }
+ ceph_assert(m_bytes_allocated >= entry_allocation_size);
+ m_bytes_allocated -= entry_allocation_size;
+ }
+ }
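+ /* Space was freed; clear the allocation failure flag and wake the
+  * work thread so deferred IOs can retry their allocations. */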
+ m_alloc_failed_since_retire = false;
+ wake_up();
+ }
+ } else {
+ ldout(cct, 20) << "Nothing to retire" << dendl;
+ return false;
+ }
+ return true;
+}
+
} // namespace cache
} // namespace librbd
virtual bool can_writeback() const {
return false;
}
+ // TODO: discard will need to override this
+ virtual bool can_retire() const {
+ return false;
+ }
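+ /* Only write entries track flushed state; other entry types must
+  * never be marked flushed. */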
+ virtual void set_flushed(bool flushed) {
+ ceph_assert(false);
+ }
+ virtual unsigned int write_bytes() const {
+ return 0;
+ };
virtual unsigned int bytes_dirty() const {
return 0;
};
~SyncPointLogEntry() override {};
SyncPointLogEntry(const SyncPointLogEntry&) = delete;
SyncPointLogEntry &operator=(const SyncPointLogEntry&) = delete;
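+ /* A sync point entry holds no data buffer, so it can be retired as
+  * soon as it has completed. */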
+ bool can_retire() const override {
+ return this->completed;
+ }
std::ostream& format(std::ostream &os) const;
friend std::ostream &operator<<(std::ostream &os,
const SyncPointLogEntry &entry);
~GenericWriteLogEntry() override {};
GenericWriteLogEntry(const GenericWriteLogEntry&) = delete;
GenericWriteLogEntry &operator=(const GenericWriteLogEntry&) = delete;
- virtual unsigned int write_bytes() const {
+ unsigned int write_bytes() const override {
/* The valid bytes in this op's data buffer. Discard and WS override. */
return ram_entry.write_bytes;
};
return sync_point_entry;
}
virtual void copy_pmem_bl(bufferlist *out_bl) = 0;
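+ /* Whether this entry's data has been flushed to the image (or
+  * invalidated); consulted by can_retire(). */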
+ void set_flushed(bool flushed) override {
+ m_flushed = flushed;
+ }
+ bool get_flushed() const {
+ return m_flushed;
+ }
std::ostream &format(std::ostream &os) const;
friend std::ostream &operator<<(std::ostream &os,
const GenericWriteLogEntry &entry);
+
+private:
+ bool m_flushed = false; /* or invalidated */
};
class WriteLogEntry : public GenericWriteLogEntry {
void init(bool has_data, std::vector<WriteBufferAllocation>::iterator allocation,
uint64_t current_sync_gen, uint64_t last_op_sequence_num, bool persist_on_flush);
BlockExtent block_extent();
- unsigned int reader_count();
+ unsigned int reader_count() const;
/* Returns a ref to a bl containing bufferptrs to the entry pmem buffer */
buffer::list &get_pmem_bl();
/* Constructs a new bl containing copies of pmem_bp */
void copy_pmem_bl(bufferlist *out_bl) override;
void writeback(librbd::cache::ImageWritebackInterface &image_writeback,
Context *ctx) override;
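+ /* A write entry is retirable once it has completed, its data has been
+  * flushed to the image, and no readers hold its pmem buffer. */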
+ bool can_retire() const override {
+ return (this->completed && this->get_flushed() && (0 == reader_count()));
+ }
std::ostream &format(std::ostream &os) const;
friend std::ostream &operator<<(std::ostream &os,
const WriteLogEntry &entry);