// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
-#include <libpmemobj.h>
#include "AbstractWriteLog.h"
#include "include/buffer.h"
#include "include/Context.h"
typedef AbstractWriteLog<ImageCtx>::Extent Extent;
typedef AbstractWriteLog<ImageCtx>::Extents Extents;
-const unsigned long int OPS_APPENDED_TOGETHER = MAX_ALLOC_PER_TRANSACTION;
-
template <typename I>
AbstractWriteLog<I>::AbstractWriteLog(I &image_ctx, librbd::cache::pwl::ImageCacheState<I>* cache_state)
- : m_cache_state(cache_state),
+ : m_write_log_guard(image_ctx.cct),
+ m_deferred_dispatch_lock(ceph::make_mutex(util::unique_lock_name(
+ "librbd::cache::pwl::AbstractWriteLog::m_deferred_dispatch_lock", this))),
+ m_blockguard_lock(ceph::make_mutex(util::unique_lock_name(
+ "librbd::cache::pwl::AbstractWriteLog::m_blockguard_lock", this))),
+ m_thread_pool(
+ image_ctx.cct, "librbd::cache::pwl::AbstractWriteLog::thread_pool", "tp_pwl", 4, ""),
+ m_cache_state(cache_state),
m_pwl_pool_layout_name(POBJ_LAYOUT_NAME(rbd_pwl)),
m_image_ctx(image_ctx),
m_log_pool_config_size(DEFAULT_POOL_SIZE),
- m_image_writeback(image_ctx), m_write_log_guard(image_ctx.cct),
+ m_image_writeback(image_ctx),
m_log_retire_lock(ceph::make_mutex(util::unique_lock_name(
"librbd::cache::pwl::AbstractWriteLog::m_log_retire_lock", this))),
m_entry_reader_lock("librbd::cache::pwl::AbstractWriteLog::m_entry_reader_lock"),
- m_deferred_dispatch_lock(ceph::make_mutex(util::unique_lock_name(
- "librbd::cache::pwl::AbstractWriteLog::m_deferred_dispatch_lock", this))),
- m_log_append_lock(ceph::make_mutex(util::unique_lock_name(
+ m_log_append_lock(ceph::make_mutex(util::unique_lock_name(
"librbd::cache::pwl::AbstractWriteLog::m_log_append_lock", this))),
m_lock(ceph::make_mutex(util::unique_lock_name(
"librbd::cache::pwl::AbstractWriteLog::m_lock", this))),
- m_blockguard_lock(ceph::make_mutex(util::unique_lock_name(
- "librbd::cache::pwl::AbstractWriteLog::m_blockguard_lock", this))),
m_blocks_to_log_entries(image_ctx.cct),
- m_thread_pool(image_ctx.cct, "librbd::cache::pwl::AbstractWriteLog::thread_pool", "tp_pwl",
- 4,
- ""),
m_work_queue("librbd::cache::pwl::ReplicatedWriteLog::work_queue",
ceph::make_timespan(
image_ctx.config.template get_val<uint64_t>(
ceph_assert(m_ops_to_append.size() == 0);
ceph_assert(m_flush_ops_in_flight == 0);
- m_log_pool = nullptr;
delete m_cache_state;
m_cache_state = nullptr;
}
}
}
-/*
- * Loads the log entries from an existing log.
- *
- * Creates the in-memory structures to represent the state of the
- * re-opened log.
- *
- * Finds the last appended sync point, and any sync points referred to
- * in log entries, but missing from the log. These missing sync points
- * are created and scheduled for append. Some rudimentary consistency
- * checking is done.
- *
- * Rebuilds the m_blocks_to_log_entries map, to make log entries
- * readable.
- *
- * Places all writes on the dirty entries list, which causes them all
- * to be flushed.
- *
- */
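+/*
+ * Reconstructs one in-memory log entry from its pmem counterpart
+ * during log reload. Sync point entries are recorded in
+ * sync_point_entries; sync gen numbers referenced by writes but
+ * absent from the log are recorded in missing_sync_points, to be
+ * created later.
+ */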
template <typename I>
-void AbstractWriteLog<I>::load_existing_entries(DeferredContexts &later) {
- TOID(struct WriteLogPoolRoot) pool_root;
- pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
- struct WriteLogPmemEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries);
- uint64_t entry_index = m_first_valid_entry;
- /* The map below allows us to find sync point log entries by sync
- * gen number, which is necessary so write entries can be linked to
- * their sync points. */
- std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> sync_point_entries;
- /* The map below tracks sync points referred to in writes but not
- * appearing in the sync_point_entries map. We'll use this to
- * determine which sync points are missing and need to be
- * created. */
- std::map<uint64_t, bool> missing_sync_points;
-
- /*
- * Read the existing log entries. Construct an in-memory log entry
- * object of the appropriate type for each. Add these to the global
- * log entries list.
- *
- * Write entries will not link to their sync points yet. We'll do
- * that in the next pass. Here we'll accumulate a map of sync point
- * gen numbers that are referred to in writes but do not appearing in
- * the log.
- */
- while (entry_index != m_first_free_entry) {
- WriteLogPmemEntry *pmem_entry = &pmem_log_entries[entry_index];
- std::shared_ptr<GenericLogEntry> log_entry = nullptr;
+void AbstractWriteLog<I>::update_entries(std::shared_ptr<GenericLogEntry> log_entry,
+ WriteLogPmemEntry *pmem_entry, std::map<uint64_t, bool> &missing_sync_points,
+ std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> &sync_point_entries,
+ int entry_index) {
bool writer = pmem_entry->is_writer();
-
- ceph_assert(pmem_entry->entry_index == entry_index);
if (pmem_entry->is_sync_point()) {
ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
<< " is a sync point. pmem_entry=[" << *pmem_entry << "]" << dendl;
<< " is a write. pmem_entry=[" << *pmem_entry << "]" << dendl;
auto write_entry =
std::make_shared<WriteLogEntry>(nullptr, pmem_entry->image_offset_bytes, pmem_entry->write_bytes);
- write_entry->pmem_buffer = D_RW(pmem_entry->write_data);
+ write_data_to_buffer(write_entry, pmem_entry);
log_entry = write_entry;
} else if (pmem_entry->is_writesame()) {
ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
auto ws_entry =
std::make_shared<WriteSameLogEntry>(nullptr, pmem_entry->image_offset_bytes,
pmem_entry->write_bytes, pmem_entry->ws_datalen);
- ws_entry->pmem_buffer = D_RW(pmem_entry->write_data);
+ write_data_to_buffer(ws_entry, pmem_entry);
log_entry = ws_entry;
} else if (pmem_entry->is_discard()) {
ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
missing_sync_points[pmem_entry->sync_gen_number] = true;
}
}
+}
- log_entry->ram_entry = *pmem_entry;
- log_entry->pmem_entry = pmem_entry;
- log_entry->log_entry_index = entry_index;
- log_entry->completed = true;
-
- m_log_entries.push_back(log_entry);
-
- entry_index = (entry_index + 1) % m_total_log_entries;
- }
-
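+/*
+ * Creates the sync points found missing during reload and schedules
+ * them for append once the entry reload is complete.
+ */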
+template <typename I>
+void AbstractWriteLog<I>::update_sync_points(std::map<uint64_t, bool> &missing_sync_points,
+ std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> &sync_point_entries,
+ DeferredContexts &later) {
/* Create missing sync points. These must not be appended until the
* entry reload is complete and the write map is up to
* date. Currently this is handled by the deferred contexts object
void AbstractWriteLog<I>::pwl_init(Context *on_finish, DeferredContexts &later) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << dendl;
- TOID(struct WriteLogPoolRoot) pool_root;
ceph_assert(m_cache_state);
std::lock_guard locker(m_lock);
ceph_assert(!m_initialized);
std::string log_poolset_name = pwl_path + "/rbd-pwl." + pool_name + "." + m_image_ctx.id + ".poolset";
m_log_pool_config_size = max(m_cache_state->size, MIN_POOL_SIZE);
- if (access(log_poolset_name.c_str(), F_OK) == 0) {
- m_log_pool_name = log_poolset_name;
- m_log_is_poolset = true;
- } else {
- m_log_pool_name = log_pool_name;
- ldout(cct, 5) << "Poolset file " << log_poolset_name
- << " not present (or can't open). Using unreplicated pool" << dendl;
- }
+ m_log_pool_name = log_pool_name;
+ get_pool_name(log_poolset_name);
if ((!m_cache_state->present) &&
(access(m_log_pool_name.c_str(), F_OK) == 0)) {
}
}
- if (access(m_log_pool_name.c_str(), F_OK) != 0) {
- if ((m_log_pool =
- pmemobj_create(m_log_pool_name.c_str(),
- m_pwl_pool_layout_name,
- m_log_pool_config_size,
- (S_IWUSR | S_IRUSR))) == NULL) {
- lderr(cct) << "failed to create pool (" << m_log_pool_name << ")"
- << pmemobj_errormsg() << dendl;
- m_cache_state->present = false;
- m_cache_state->clean = true;
- m_cache_state->empty = true;
- /* TODO: filter/replace errnos that are meaningless to the caller */
- on_finish->complete(-errno);
- return;
- }
- m_cache_state->present = true;
- m_cache_state->clean = true;
- m_cache_state->empty = true;
- pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
-
- /* new pool, calculate and store metadata */
- size_t effective_pool_size = (size_t)(m_log_pool_config_size * USABLE_SIZE);
- size_t small_write_size = MIN_WRITE_ALLOC_SIZE + BLOCK_ALLOC_OVERHEAD_BYTES + sizeof(struct WriteLogPmemEntry);
- uint64_t num_small_writes = (uint64_t)(effective_pool_size / small_write_size);
- if (num_small_writes > MAX_LOG_ENTRIES) {
- num_small_writes = MAX_LOG_ENTRIES;
- }
- if (num_small_writes <= 2) {
- lderr(cct) << "num_small_writes needs to > 2" << dendl;
- on_finish->complete(-EINVAL);
- return;
- }
- m_log_pool_actual_size = m_log_pool_config_size;
- m_bytes_allocated_cap = effective_pool_size;
- /* Log ring empty */
- m_first_free_entry = 0;
- m_first_valid_entry = 0;
- TX_BEGIN(m_log_pool) {
- TX_ADD(pool_root);
- D_RW(pool_root)->header.layout_version = RWL_POOL_VERSION;
- D_RW(pool_root)->log_entries =
- TX_ZALLOC(struct WriteLogPmemEntry,
- sizeof(struct WriteLogPmemEntry) * num_small_writes);
- D_RW(pool_root)->pool_size = m_log_pool_actual_size;
- D_RW(pool_root)->flushed_sync_gen = m_flushed_sync_gen;
- D_RW(pool_root)->block_size = MIN_WRITE_ALLOC_SIZE;
- D_RW(pool_root)->num_log_entries = num_small_writes;
- D_RW(pool_root)->first_free_entry = m_first_free_entry;
- D_RW(pool_root)->first_valid_entry = m_first_valid_entry;
- } TX_ONCOMMIT {
- m_total_log_entries = D_RO(pool_root)->num_log_entries;
- m_free_log_entries = D_RO(pool_root)->num_log_entries - 1; // leave one free
- } TX_ONABORT {
- m_total_log_entries = 0;
- m_free_log_entries = 0;
- lderr(cct) << "failed to initialize pool (" << m_log_pool_name << ")" << dendl;
- on_finish->complete(-pmemobj_tx_errno());
- return;
- } TX_FINALLY {
- } TX_END;
- } else {
- m_cache_state->present = true;
- /* Open existing pool */
- if ((m_log_pool =
- pmemobj_open(m_log_pool_name.c_str(),
- m_pwl_pool_layout_name)) == NULL) {
- lderr(cct) << "failed to open pool (" << m_log_pool_name << "): "
- << pmemobj_errormsg() << dendl;
- on_finish->complete(-errno);
- return;
- }
- pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
- if (D_RO(pool_root)->header.layout_version != RWL_POOL_VERSION) {
- // TODO: will handle upgrading version in the future
- lderr(cct) << "Pool layout version is " << D_RO(pool_root)->header.layout_version
- << " expected " << RWL_POOL_VERSION << dendl;
- on_finish->complete(-EINVAL);
- return;
- }
- if (D_RO(pool_root)->block_size != MIN_WRITE_ALLOC_SIZE) {
- lderr(cct) << "Pool block size is " << D_RO(pool_root)->block_size
- << " expected " << MIN_WRITE_ALLOC_SIZE << dendl;
- on_finish->complete(-EINVAL);
- return;
- }
- m_log_pool_actual_size = D_RO(pool_root)->pool_size;
- m_flushed_sync_gen = D_RO(pool_root)->flushed_sync_gen;
- m_total_log_entries = D_RO(pool_root)->num_log_entries;
- m_first_free_entry = D_RO(pool_root)->first_free_entry;
- m_first_valid_entry = D_RO(pool_root)->first_valid_entry;
- if (m_first_free_entry < m_first_valid_entry) {
- /* Valid entries wrap around the end of the ring, so first_free is lower
- * than first_valid. If first_valid was == first_free+1, the entry at
- * first_free would be empty. The last entry is never used, so in
- * that case there would be zero free log entries. */
- m_free_log_entries = m_total_log_entries - (m_first_valid_entry - m_first_free_entry) -1;
- } else {
- /* first_valid is <= first_free. If they are == we have zero valid log
- * entries, and n-1 free log entries */
- m_free_log_entries = m_total_log_entries - (m_first_free_entry - m_first_valid_entry) -1;
- }
- size_t effective_pool_size = (size_t)(m_log_pool_config_size * USABLE_SIZE);
- m_bytes_allocated_cap = effective_pool_size;
- load_existing_entries(later);
- m_cache_state->clean = m_dirty_log_entries.empty();
- m_cache_state->empty = m_log_entries.empty();
- }
+ initialize_pool(on_finish, later);
ldout(cct,1) << "pool " << m_log_pool_name << " has " << m_total_log_entries
<< " log entries, " << m_free_log_entries << " of which are free."
m_wake_up_enabled = false;
m_cache_state->clean = true;
m_log_entries.clear();
- if (m_log_pool) {
- ldout(m_image_ctx.cct, 6) << "closing pmem pool" << dendl;
- pmemobj_close(m_log_pool);
- }
- if (m_cache_state->clean) {
- if (m_log_is_poolset) {
- ldout(m_image_ctx.cct, 5) << "Not removing poolset " << m_log_pool_name << dendl;
- } else {
- ldout(m_image_ctx.cct, 5) << "Removing empty pool file: " << m_log_pool_name << dendl;
- if (remove(m_log_pool_name.c_str()) != 0) {
- lderr(m_image_ctx.cct) << "failed to remove empty pool \"" << m_log_pool_name << "\": "
- << pmemobj_errormsg() << dendl;
- } else {
- m_cache_state->clean = true;
- m_cache_state->empty = true;
- m_cache_state->present = false;
- }
- }
- } else {
- if (m_log_is_poolset) {
- ldout(m_image_ctx.cct, 5) << "Not removing poolset " << m_log_pool_name << dendl;
- } else {
- ldout(m_image_ctx.cct, 5) << "Not removing pool file: " << m_log_pool_name << dendl;
- }
- }
+
+ remove_pool_file();
+
if (m_perfcounter) {
perf_stop();
}
ldout(cct, 20) << "exit" << dendl;
}
-/*
- * Performs the log event append operation for all of the scheduled
- * events.
- */
template <typename I>
-void AbstractWriteLog<I>::append_scheduled_ops(void)
+void AbstractWriteLog<I>::append_scheduled(GenericLogOperations &ops, bool &ops_remain,
+ bool &appending, bool isRWL)
{
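+  /* The append batch size is capped per backend: RWL appends a batch
+   * within a single pmem transaction, capped by
+   * MAX_ALLOC_PER_TRANSACTION; other backends batch up to
+   * MAX_WRITES_PER_SYNC_POINT ops. */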
- GenericLogOperations ops;
- int append_result = 0;
- bool ops_remain = false;
- bool appending = false; /* true if we set m_appending */
- ldout(m_image_ctx.cct, 20) << dendl;
- do {
- ops.clear();
-
- {
- std::lock_guard locker(m_lock);
- if (!appending && m_appending) {
- /* Another thread is appending */
- ldout(m_image_ctx.cct, 15) << "Another thread is appending" << dendl;
- return;
+ const unsigned long int OPS_APPENDED = isRWL ? MAX_ALLOC_PER_TRANSACTION
+ : MAX_WRITES_PER_SYNC_POINT;
+ {
+ std::lock_guard locker(m_lock);
+ if (!appending && m_appending) {
+ /* Another thread is appending */
+ ldout(m_image_ctx.cct, 15) << "Another thread is appending" << dendl;
+ return;
+ }
+ if (m_ops_to_append.size()) {
+ appending = true;
+ m_appending = true;
+ auto last_in_batch = m_ops_to_append.begin();
+ unsigned int ops_to_append = m_ops_to_append.size();
+ if (ops_to_append > OPS_APPENDED) {
+ ops_to_append = OPS_APPENDED;
}
- if (m_ops_to_append.size()) {
- appending = true;
- m_appending = true;
- auto last_in_batch = m_ops_to_append.begin();
- unsigned int ops_to_append = m_ops_to_append.size();
- if (ops_to_append > OPS_APPENDED_TOGETHER) {
- ops_to_append = OPS_APPENDED_TOGETHER;
- }
- std::advance(last_in_batch, ops_to_append);
- ops.splice(ops.end(), m_ops_to_append, m_ops_to_append.begin(), last_in_batch);
- ops_remain = true; /* Always check again before leaving */
- ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", "
- << m_ops_to_append.size() << " remain" << dendl;
- } else {
- ops_remain = false;
- if (appending) {
- appending = false;
- m_appending = false;
- }
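+      /* Detach up to the cap from the front of m_ops_to_append in
+       * O(1) with std::list::splice(); the remainder stays queued. */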
+ std::advance(last_in_batch, ops_to_append);
+ ops.splice(ops.end(), m_ops_to_append, m_ops_to_append.begin(), last_in_batch);
+ ops_remain = true; /* Always check again before leaving */
+ ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", "
+ << m_ops_to_append.size() << " remain" << dendl;
+ } else if (isRWL) {
+ ops_remain = false;
+ if (appending) {
+ appending = false;
+ m_appending = false;
}
}
-
- if (ops.size()) {
- std::lock_guard locker(m_log_append_lock);
- alloc_op_log_entries(ops);
- append_result = append_op_log_entries(ops);
- }
-
- int num_ops = ops.size();
- if (num_ops) {
- /* New entries may be flushable. Completion will wake up flusher. */
- complete_op_log_entries(std::move(ops), append_result);
- }
- } while (ops_remain);
+ }
}
template <typename I>
m_work_queue.queue(append_ctx);
}
-/*
- * Takes custody of ops. They'll all get their log entries appended,
- * and have their on_write_persist contexts completed once they and
- * all prior log entries are persisted everywhere.
- */
-template <typename I>
-void AbstractWriteLog<I>::schedule_append(GenericLogOperations &ops)
-{
- bool need_finisher;
- GenericLogOperationsVector appending;
-
- std::copy(std::begin(ops), std::end(ops), std::back_inserter(appending));
- {
- std::lock_guard locker(m_lock);
-
- need_finisher = m_ops_to_append.empty() && !m_appending;
- m_ops_to_append.splice(m_ops_to_append.end(), ops);
- }
-
- if (need_finisher) {
- enlist_op_appender();
- }
-
- for (auto &op : appending) {
- op->appending();
- }
-}
-
template <typename I>
void AbstractWriteLog<I>::schedule_append(GenericLogOperationsVector &ops)
{
GenericLogOperations to_append(ops.begin(), ops.end());
- schedule_append(to_append);
+ schedule_append_ops(to_append);
}
template <typename I>
{
GenericLogOperations to_append { op };
- schedule_append(to_append);
-}
-
-const unsigned long int ops_flushed_together = 4;
-/*
- * Performs the pmem buffer flush on all scheduled ops, then schedules
- * the log event append operation for all of them.
- */
-template <typename I>
-void AbstractWriteLog<I>::flush_then_append_scheduled_ops(void)
-{
- GenericLogOperations ops;
- bool ops_remain = false;
- ldout(m_image_ctx.cct, 20) << dendl;
- do {
- {
- ops.clear();
- std::lock_guard locker(m_lock);
- if (m_ops_to_flush.size()) {
- auto last_in_batch = m_ops_to_flush.begin();
- unsigned int ops_to_flush = m_ops_to_flush.size();
- if (ops_to_flush > ops_flushed_together) {
- ops_to_flush = ops_flushed_together;
- }
- ldout(m_image_ctx.cct, 20) << "should flush " << ops_to_flush << dendl;
- std::advance(last_in_batch, ops_to_flush);
- ops.splice(ops.end(), m_ops_to_flush, m_ops_to_flush.begin(), last_in_batch);
- ops_remain = !m_ops_to_flush.empty();
- ldout(m_image_ctx.cct, 20) << "flushing " << ops.size() << ", "
- << m_ops_to_flush.size() << " remain" << dendl;
- } else {
- ops_remain = false;
- }
- }
- if (ops_remain) {
- enlist_op_flusher();
- }
-
- /* Ops subsequently scheduled for flush may finish before these,
- * which is fine. We're unconcerned with completion order until we
- * get to the log message append step. */
- if (ops.size()) {
- flush_pmem_buffer(ops);
- schedule_append(ops);
- }
- } while (ops_remain);
- append_scheduled_ops();
-}
-
-template <typename I>
-void AbstractWriteLog<I>::enlist_op_flusher()
-{
- m_async_flush_ops++;
- m_async_op_tracker.start_op();
- Context *flush_ctx = new LambdaContext([this](int r) {
- flush_then_append_scheduled_ops();
- m_async_flush_ops--;
- m_async_op_tracker.finish_op();
- });
- m_work_queue.queue(flush_ctx);
-}
-
-/*
- * Takes custody of ops. They'll all get their pmem blocks flushed,
- * then get their log entries appended.
- */
-template <typename I>
-void AbstractWriteLog<I>::schedule_flush_and_append(GenericLogOperationsVector &ops)
-{
- GenericLogOperations to_flush(ops.begin(), ops.end());
- bool need_finisher;
- ldout(m_image_ctx.cct, 20) << dendl;
- {
- std::lock_guard locker(m_lock);
-
- need_finisher = m_ops_to_flush.empty();
- m_ops_to_flush.splice(m_ops_to_flush.end(), to_flush);
- }
-
- if (need_finisher) {
- enlist_op_flusher();
- }
-}
-
-/*
- * Flush the pmem regions for the data blocks of a set of operations
- *
- * V is expected to be GenericLogOperations<I>, or GenericLogOperationsVector<I>
- */
-template <typename I>
-template <typename V>
-void AbstractWriteLog<I>::flush_pmem_buffer(V& ops)
-{
- for (auto &operation : ops) {
- operation->flush_pmem_buf_to_cache(m_log_pool);
- }
-
- /* Drain once for all */
- pmemobj_drain(m_log_pool);
-
- utime_t now = ceph_clock_now();
- for (auto &operation : ops) {
- if (operation->reserved_allocated()) {
- operation->buf_persist_comp_time = now;
- } else {
- ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl;
- }
- }
-}
-
-/*
- * Allocate the (already reserved) write log entries for a set of operations.
- *
- * Locking:
- * Acquires lock
- */
-template <typename I>
-void AbstractWriteLog<I>::alloc_op_log_entries(GenericLogOperations &ops)
-{
- TOID(struct WriteLogPoolRoot) pool_root;
- pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
- struct WriteLogPmemEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries);
-
- ceph_assert(ceph_mutex_is_locked_by_me(m_log_append_lock));
-
- /* Allocate the (already reserved) log entries */
- std::lock_guard locker(m_lock);
-
- for (auto &operation : ops) {
- uint32_t entry_index = m_first_free_entry;
- m_first_free_entry = (m_first_free_entry + 1) % m_total_log_entries;
- auto &log_entry = operation->get_log_entry();
- log_entry->log_entry_index = entry_index;
- log_entry->ram_entry.entry_index = entry_index;
- log_entry->pmem_entry = &pmem_log_entries[entry_index];
- log_entry->ram_entry.entry_valid = 1;
- m_log_entries.push_back(log_entry);
- ldout(m_image_ctx.cct, 20) << "operation=[" << *operation << "]" << dendl;
- }
-}
-
-/*
- * Flush the persistent write log entries set of ops. The entries must
- * be contiguous in persistent memory.
- */
-template <typename I>
-void AbstractWriteLog<I>::flush_op_log_entries(GenericLogOperationsVector &ops)
-{
- if (ops.empty()) {
- return;
- }
-
- if (ops.size() > 1) {
- ceph_assert(ops.front()->get_log_entry()->pmem_entry < ops.back()->get_log_entry()->pmem_entry);
- }
-
- ldout(m_image_ctx.cct, 20) << "entry count=" << ops.size() << " "
- << "start address="
- << ops.front()->get_log_entry()->pmem_entry << " "
- << "bytes="
- << ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry))
- << dendl;
- pmemobj_flush(m_log_pool,
- ops.front()->get_log_entry()->pmem_entry,
- ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry)));
-}
-
-/*
- * Write and persist the (already allocated) write log entries and
- * data buffer allocations for a set of ops. The data buffer for each
- * of these must already have been persisted to its reserved area.
- */
-template <typename I>
-int AbstractWriteLog<I>::append_op_log_entries(GenericLogOperations &ops)
-{
- CephContext *cct = m_image_ctx.cct;
- GenericLogOperationsVector entries_to_flush;
- TOID(struct WriteLogPoolRoot) pool_root;
- pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
- int ret = 0;
-
- ceph_assert(ceph_mutex_is_locked_by_me(m_log_append_lock));
-
- if (ops.empty()) {
- return 0;
- }
- entries_to_flush.reserve(OPS_APPENDED_TOGETHER);
-
- /* Write log entries to ring and persist */
- utime_t now = ceph_clock_now();
- for (auto &operation : ops) {
- if (!entries_to_flush.empty()) {
- /* Flush these and reset the list if the current entry wraps to the
- * tail of the ring */
- if (entries_to_flush.back()->get_log_entry()->log_entry_index >
- operation->get_log_entry()->log_entry_index) {
- ldout(m_image_ctx.cct, 20) << "entries to flush wrap around the end of the ring at "
- << "operation=[" << *operation << "]" << dendl;
- flush_op_log_entries(entries_to_flush);
- entries_to_flush.clear();
- now = ceph_clock_now();
- }
- }
- ldout(m_image_ctx.cct, 20) << "Copying entry for operation at index="
- << operation->get_log_entry()->log_entry_index << " "
- << "from " << &operation->get_log_entry()->ram_entry << " "
- << "to " << operation->get_log_entry()->pmem_entry << " "
- << "operation=[" << *operation << "]" << dendl;
- ldout(m_image_ctx.cct, 05) << "APPENDING: index="
- << operation->get_log_entry()->log_entry_index << " "
- << "operation=[" << *operation << "]" << dendl;
- operation->log_append_time = now;
- *operation->get_log_entry()->pmem_entry = operation->get_log_entry()->ram_entry;
- ldout(m_image_ctx.cct, 20) << "APPENDING: index="
- << operation->get_log_entry()->log_entry_index << " "
- << "pmem_entry=[" << *operation->get_log_entry()->pmem_entry
- << "]" << dendl;
- entries_to_flush.push_back(operation);
- }
- flush_op_log_entries(entries_to_flush);
-
- /* Drain once for all */
- pmemobj_drain(m_log_pool);
-
- /*
- * Atomically advance the log head pointer and publish the
- * allocations for all the data buffers they refer to.
- */
- utime_t tx_start = ceph_clock_now();
- TX_BEGIN(m_log_pool) {
- D_RW(pool_root)->first_free_entry = m_first_free_entry;
- for (auto &operation : ops) {
- if (operation->reserved_allocated()) {
- auto write_op = (std::shared_ptr<WriteLogOperation>&) operation;
- pmemobj_tx_publish(&write_op->buffer_alloc->buffer_alloc_action, 1);
- } else {
- ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl;
- }
- }
- } TX_ONCOMMIT {
- } TX_ONABORT {
- lderr(cct) << "failed to commit " << ops.size()
- << " log entries (" << m_log_pool_name << ")" << dendl;
- ceph_assert(false);
- ret = -EIO;
- } TX_FINALLY {
- } TX_END;
-
- utime_t tx_end = ceph_clock_now();
- m_perfcounter->tinc(l_librbd_pwl_append_tx_t, tx_end - tx_start);
- m_perfcounter->hinc(
- l_librbd_pwl_append_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(), ops.size());
- for (auto &operation : ops) {
- operation->log_append_comp_time = tx_end;
- }
-
- return ret;
+ schedule_append_ops(to_append);
}
/*
}
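+/*
+ * Checks whether the per-request resource counts computed by the
+ * subclass (buffer bytes, log entries, lanes, reserves) fit within
+ * the current limits, reserving buffer space via the reserve_pmem()
+ * hook when they do. Returns true only if all allocations succeed.
+ */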
template <typename I>
-bool AbstractWriteLog<I>::alloc_resources(C_BlockIORequestT *req) {
+bool AbstractWriteLog<I>::check_allocation(C_BlockIORequestT *req,
+ uint64_t &bytes_cached, uint64_t &bytes_dirtied, uint64_t &bytes_allocated,
+ uint64_t &num_lanes, uint64_t &num_log_entries,
+    uint64_t &num_unpublished_reserves, uint64_t bytes_allocated_cap) {
bool alloc_succeeds = true;
bool no_space = false;
- uint64_t bytes_allocated = 0;
- uint64_t bytes_cached = 0;
- uint64_t bytes_dirtied = 0;
- uint64_t num_lanes = 0;
- uint64_t num_unpublished_reserves = 0;
- uint64_t num_log_entries = 0;
-
- // Setup buffer, and get all the number of required resources
- req->setup_buffer_resources(bytes_cached, bytes_dirtied, bytes_allocated,
- num_lanes, num_log_entries, num_unpublished_reserves);
-
{
std::lock_guard locker(m_lock);
if (m_free_lanes < num_lanes) {
no_space = true; /* Entries must be retired */
}
/* Don't attempt buffer allocate if we've exceeded the "full" threshold */
- if (m_bytes_allocated + bytes_allocated > m_bytes_allocated_cap) {
+ if (m_bytes_allocated + bytes_allocated > bytes_allocated_cap) {
if (!req->has_io_waited_for_buffers()) {
req->set_io_waited_for_entries(true);
ldout(m_image_ctx.cct, 1) << "Waiting for allocation cap (cap="
- << m_bytes_allocated_cap
+ << bytes_allocated_cap
<< ", allocated=" << m_bytes_allocated
<< ") in write [" << *req << "]" << dendl;
}
}
}
- std::vector<WriteBufferAllocation>& buffers = req->get_resources_buffers();
if (alloc_succeeds) {
- for (auto &buffer : buffers) {
- utime_t before_reserve = ceph_clock_now();
- buffer.buffer_oid = pmemobj_reserve(m_log_pool,
- &buffer.buffer_alloc_action,
- buffer.allocation_size,
- 0 /* Object type */);
- buffer.allocation_lat = ceph_clock_now() - before_reserve;
- if (TOID_IS_NULL(buffer.buffer_oid)) {
- if (!req->has_io_waited_for_buffers()) {
- req->set_io_waited_for_entries(true);
- }
- ldout(m_image_ctx.cct, 5) << "can't allocate all data buffers: "
- << pmemobj_errormsg() << ". "
- << *req << dendl;
- alloc_succeeds = false;
- no_space = true; /* Entries need to be retired */
- break;
- } else {
- buffer.allocated = true;
- }
- ldout(m_image_ctx.cct, 20) << "Allocated " << buffer.buffer_oid.oid.pool_uuid_lo
- << "." << buffer.buffer_oid.oid.off
- << ", size=" << buffer.allocation_size << dendl;
- }
+ reserve_pmem(req, alloc_succeeds, no_space);
}
if (alloc_succeeds) {
}
}
- if (!alloc_succeeds) {
- /* On alloc failure, free any buffers we did allocate */
- for (auto &buffer : buffers) {
- if (buffer.allocated) {
- pmemobj_cancel(m_log_pool, &buffer.buffer_alloc_action, 1);
- }
- }
- if (no_space) {
- /* Expedite flushing and/or retiring */
- std::lock_guard locker(m_lock);
- m_alloc_failed_since_retire = true;
- m_last_alloc_fail = ceph_clock_now();
- }
+ if (!alloc_succeeds && no_space) {
+ /* Expedite flushing and/or retiring */
+ std::lock_guard locker(m_lock);
+ m_alloc_failed_since_retire = true;
+ m_last_alloc_fail = ceph_clock_now();
}
- req->set_allocated(alloc_succeeds);
-
return alloc_succeeds;
}
}), 0);
}
-template <typename I>
-void AbstractWriteLog<I>::process_work() {
- CephContext *cct = m_image_ctx.cct;
- int max_iterations = 4;
- bool wake_up_requested = false;
- uint64_t aggressive_high_water_bytes = m_bytes_allocated_cap * AGGRESSIVE_RETIRE_HIGH_WATER;
- uint64_t high_water_bytes = m_bytes_allocated_cap * RETIRE_HIGH_WATER;
- uint64_t low_water_bytes = m_bytes_allocated_cap * RETIRE_LOW_WATER;
- uint64_t aggressive_high_water_entries = m_total_log_entries * AGGRESSIVE_RETIRE_HIGH_WATER;
- uint64_t high_water_entries = m_total_log_entries * RETIRE_HIGH_WATER;
- uint64_t low_water_entries = m_total_log_entries * RETIRE_LOW_WATER;
-
- ldout(cct, 20) << dendl;
-
- do {
- {
- std::lock_guard locker(m_lock);
- m_wake_up_requested = false;
- }
- if (m_alloc_failed_since_retire || m_invalidating ||
- m_bytes_allocated > high_water_bytes ||
- (m_log_entries.size() > high_water_entries)) {
- int retired = 0;
- utime_t started = ceph_clock_now();
- ldout(m_image_ctx.cct, 10) << "alloc_fail=" << m_alloc_failed_since_retire
- << ", allocated > high_water="
- << (m_bytes_allocated > high_water_bytes)
- << ", allocated_entries > high_water="
- << (m_log_entries.size() > high_water_entries)
- << dendl;
- while (m_alloc_failed_since_retire || m_invalidating ||
- (m_bytes_allocated > high_water_bytes) ||
- (m_log_entries.size() > high_water_entries) ||
- (((m_bytes_allocated > low_water_bytes) || (m_log_entries.size() > low_water_entries)) &&
- (utime_t(ceph_clock_now() - started).to_msec() < RETIRE_BATCH_TIME_LIMIT_MS))) {
- if (!retire_entries((m_shutting_down || m_invalidating ||
- (m_bytes_allocated > aggressive_high_water_bytes) ||
- (m_log_entries.size() > aggressive_high_water_entries))
- ? MAX_ALLOC_PER_TRANSACTION
- : MAX_FREE_PER_TRANSACTION)) {
- break;
- }
- retired++;
- dispatch_deferred_writes();
- process_writeback_dirty_entries();
- }
- ldout(m_image_ctx.cct, 10) << "Retired " << retired << " times" << dendl;
- }
- dispatch_deferred_writes();
- process_writeback_dirty_entries();
-
- {
- std::lock_guard locker(m_lock);
- wake_up_requested = m_wake_up_requested;
- }
- } while (wake_up_requested && --max_iterations > 0);
-
- {
- std::lock_guard locker(m_lock);
- m_wake_up_scheduled = false;
- /* Reschedule if it's still requested */
- if (m_wake_up_requested) {
- wake_up();
- }
- }
-}
-
template <typename I>
bool AbstractWriteLog<I>::can_flush_entry(std::shared_ptr<GenericLogEntry> log_entry) {
CephContext *cct = m_image_ctx.cct;
}
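+/*
+ * Builds the context that flushes one log entry back to the image.
+ * The caller snapshots m_invalidating and passes it in, so behavior
+ * stays consistent for the life of this flush.
+ */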
template <typename I>
-Context* AbstractWriteLog<I>::construct_flush_entry_ctx(std::shared_ptr<GenericLogEntry> log_entry) {
+Context* AbstractWriteLog<I>::construct_flush_entry(std::shared_ptr<GenericLogEntry> log_entry,
+ bool invalidating) {
CephContext *cct = m_image_ctx.cct;
- bool invalidating = m_invalidating; // snapshot so we behave consistently
ldout(cct, 20) << "" << dendl;
ceph_assert(m_entry_reader_lock.is_locked());
m_lowest_flushing_sync_gen = log_entry->ram_entry.sync_gen_number;
}
m_flush_ops_in_flight += 1;
- /* For write same this is the bytes affected bt the flush op, not the bytes transferred */
+ /* For write same this is the bytes affected by the flush op, not the bytes transferred */
m_flush_bytes_in_flight += log_entry->ram_entry.write_bytes;
/* Flush write completion action */
m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, ctx);
}
});
-
- if (invalidating) {
- return ctx;
- }
- return new LambdaContext(
- [this, log_entry, ctx](int r) {
- m_image_ctx.op_work_queue->queue(new LambdaContext(
- [this, log_entry, ctx](int r) {
- ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
- << " " << *log_entry << dendl;
- log_entry->writeback(m_image_writeback, ctx);
- }), 0);
- });
+ return ctx;
}
template <typename I>
}
}
-/**
- * Update/persist the last flushed sync point in the log
- */
-template <typename I>
-void AbstractWriteLog<I>::persist_last_flushed_sync_gen()
-{
- TOID(struct WriteLogPoolRoot) pool_root;
- pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
- uint64_t flushed_sync_gen;
-
- std::lock_guard append_locker(m_log_append_lock);
- {
- std::lock_guard locker(m_lock);
- flushed_sync_gen = m_flushed_sync_gen;
- }
-
- if (D_RO(pool_root)->flushed_sync_gen < flushed_sync_gen) {
- ldout(m_image_ctx.cct, 15) << "flushed_sync_gen in log updated from "
- << D_RO(pool_root)->flushed_sync_gen << " to "
- << flushed_sync_gen << dendl;
- TX_BEGIN(m_log_pool) {
- D_RW(pool_root)->flushed_sync_gen = flushed_sync_gen;
- } TX_ONCOMMIT {
- } TX_ONABORT {
- lderr(m_image_ctx.cct) << "failed to commit update of flushed sync point" << dendl;
- ceph_assert(false);
- } TX_FINALLY {
- } TX_END;
- }
-}
-
/* Returns true if the specified SyncPointLogEntry is considered flushed, and
* the log will be updated to reflect this. */
template <typename I>
return log_entry->can_retire();
}
-/**
- * Retire up to MAX_ALLOC_PER_TRANSACTION of the oldest log entries
- * that are eligible to be retired. Returns true if anything was
- * retired.
- */
-template <typename I>
-bool AbstractWriteLog<I>::retire_entries(const unsigned long int frees_per_tx) {
- CephContext *cct = m_image_ctx.cct;
- GenericLogEntriesVector retiring_entries;
- uint32_t initial_first_valid_entry;
- uint32_t first_valid_entry;
-
- std::lock_guard retire_locker(m_log_retire_lock);
- ldout(cct, 20) << "Look for entries to retire" << dendl;
- {
- /* Entry readers can't be added while we hold m_entry_reader_lock */
- RWLock::WLocker entry_reader_locker(m_entry_reader_lock);
- std::lock_guard locker(m_lock);
- initial_first_valid_entry = m_first_valid_entry;
- first_valid_entry = m_first_valid_entry;
- auto entry = m_log_entries.front();
- while (!m_log_entries.empty() &&
- retiring_entries.size() < frees_per_tx &&
- can_retire_entry(entry)) {
- if (entry->log_entry_index != first_valid_entry) {
- lderr(cct) << "Retiring entry index (" << entry->log_entry_index
- << ") and first valid log entry index (" << first_valid_entry
- << ") must be ==." << dendl;
- }
- ceph_assert(entry->log_entry_index == first_valid_entry);
- first_valid_entry = (first_valid_entry + 1) % m_total_log_entries;
- m_log_entries.pop_front();
- retiring_entries.push_back(entry);
- /* Remove entry from map so there will be no more readers */
- if ((entry->write_bytes() > 0) || (entry->bytes_dirty() > 0)) {
- auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(entry);
- if (gen_write_entry) {
- m_blocks_to_log_entries.remove_log_entry(gen_write_entry);
- }
- }
- entry = m_log_entries.front();
- }
- }
-
- if (retiring_entries.size()) {
- ldout(cct, 20) << "Retiring " << retiring_entries.size() << " entries" << dendl;
- TOID(struct WriteLogPoolRoot) pool_root;
- pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
-
- utime_t tx_start;
- utime_t tx_end;
- /* Advance first valid entry and release buffers */
- {
- uint64_t flushed_sync_gen;
- std::lock_guard append_locker(m_log_append_lock);
- {
- std::lock_guard locker(m_lock);
- flushed_sync_gen = m_flushed_sync_gen;
- }
-
- tx_start = ceph_clock_now();
- TX_BEGIN(m_log_pool) {
- if (D_RO(pool_root)->flushed_sync_gen < flushed_sync_gen) {
- ldout(m_image_ctx.cct, 20) << "flushed_sync_gen in log updated from "
- << D_RO(pool_root)->flushed_sync_gen << " to "
- << flushed_sync_gen << dendl;
- D_RW(pool_root)->flushed_sync_gen = flushed_sync_gen;
- }
- D_RW(pool_root)->first_valid_entry = first_valid_entry;
- for (auto &entry: retiring_entries) {
- if (entry->write_bytes()) {
- ldout(cct, 20) << "Freeing " << entry->ram_entry.write_data.oid.pool_uuid_lo
- << "." << entry->ram_entry.write_data.oid.off << dendl;
- TX_FREE(entry->ram_entry.write_data);
- } else {
- ldout(cct, 20) << "Retiring non-write: " << *entry << dendl;
- }
- }
- } TX_ONCOMMIT {
- } TX_ONABORT {
- lderr(cct) << "failed to commit free of" << retiring_entries.size() << " log entries (" << m_log_pool_name << ")" << dendl;
- ceph_assert(false);
- } TX_FINALLY {
- } TX_END;
- tx_end = ceph_clock_now();
- }
- m_perfcounter->tinc(l_librbd_pwl_retire_tx_t, tx_end - tx_start);
- m_perfcounter->hinc(l_librbd_pwl_retire_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(), retiring_entries.size());
-
- /* Update runtime copy of first_valid, and free entries counts */
- {
- std::lock_guard locker(m_lock);
-
- ceph_assert(m_first_valid_entry == initial_first_valid_entry);
- m_first_valid_entry = first_valid_entry;
- m_free_log_entries += retiring_entries.size();
- for (auto &entry: retiring_entries) {
- if (entry->write_bytes()) {
- ceph_assert(m_bytes_cached >= entry->write_bytes());
- m_bytes_cached -= entry->write_bytes();
- uint64_t entry_allocation_size = entry->write_bytes();
- if (entry_allocation_size < MIN_WRITE_ALLOC_SIZE) {
- entry_allocation_size = MIN_WRITE_ALLOC_SIZE;
- }
- ceph_assert(m_bytes_allocated >= entry_allocation_size);
- m_bytes_allocated -= entry_allocation_size;
- }
- }
- m_alloc_failed_since_retire = false;
- wake_up();
- }
- } else {
- ldout(cct, 20) << "Nothing to retire" << dendl;
- return false;
- }
- return true;
-}
-
} // namespace pwl
} // namespace cache
} // namespace librbd
template class librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx>;
-template void librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx>:: \
- flush_pmem_buffer(std::vector<std::shared_ptr< \
- librbd::cache::pwl::GenericLogOperation>>&);
#include "librbd/cache/pwl/LogOperation.h"
#include "librbd/cache/pwl/Request.h"
#include "librbd/cache/pwl/LogMap.h"
+#include "librbd/cache/pwl/Types.h"
#include <functional>
#include <list>
namespace pwl {
-class SyncPointLogEntry;
+class GenericLogEntry;
class GenericWriteLogEntry;
+class SyncPointLogEntry;
class WriteLogEntry;
-class GenericLogEntry;
+struct WriteLogPmemEntry;
typedef std::list<std::shared_ptr<WriteLogEntry>> WriteLogEntries;
typedef std::list<std::shared_ptr<GenericLogEntry>> GenericLogEntries;
typedef io::Extents Extents;
AbstractWriteLog(ImageCtxT &image_ctx, librbd::cache::pwl::ImageCacheState<ImageCtxT>* cache_state);
- ~AbstractWriteLog();
+ virtual ~AbstractWriteLog();
AbstractWriteLog(const AbstractWriteLog&) = delete;
AbstractWriteLog &operator=(const AbstractWriteLog&) = delete;
/// IO methods
- void read(Extents&& image_extents, ceph::bufferlist *bl,
+ void read(
+ Extents&& image_extents, ceph::bufferlist *bl,
int fadvise_flags, Context *on_finish);
- void write(Extents&& image_extents, ceph::bufferlist&& bl,
+ void write(
+ Extents&& image_extents, ceph::bufferlist&& bl,
int fadvise_flags,
Context *on_finish);
- void discard(uint64_t offset, uint64_t length,
+ void discard(
+ uint64_t offset, uint64_t length,
uint32_t discard_granularity_bytes,
Context *on_finish);
- void flush(io::FlushSource flush_source, Context *on_finish);
- void writesame(uint64_t offset, uint64_t length,
+ void flush(
+ io::FlushSource flush_source, Context *on_finish);
+ void writesame(
+ uint64_t offset, uint64_t length,
ceph::bufferlist&& bl,
int fadvise_flags, Context *on_finish);
- void compare_and_write(Extents&& image_extents,
+ void compare_and_write(
+ Extents&& image_extents,
ceph::bufferlist&& cmp_bl, ceph::bufferlist&& bl,
uint64_t *mismatch_offset,int fadvise_flags,
Context *on_finish);
CephContext * get_context();
void release_guarded_request(BlockGuardCell *cell);
void release_write_lanes(C_BlockIORequestT *req);
- bool alloc_resources(C_BlockIORequestT *req);
- template <typename V>
- void flush_pmem_buffer(V& ops);
+ virtual bool alloc_resources(C_BlockIORequestT *req) = 0;
+ virtual void setup_schedule_append(
+ pwl::GenericLogOperationsVector &ops, bool do_early_flush) = 0;
void schedule_append(pwl::GenericLogOperationsVector &ops);
void schedule_append(pwl::GenericLogOperationSharedPtr op);
- void schedule_flush_and_append(pwl::GenericLogOperationsVector &ops);
void flush_new_sync_point(C_FlushRequestT *flush_req, pwl::DeferredContexts &later);
+
std::shared_ptr<pwl::SyncPoint> get_current_sync_point() {
return m_current_sync_point;
}
return m_free_log_entries;
}
void add_into_log_map(pwl::GenericWriteLogEntries &log_entries);
-protected:
- typedef std::list<pwl::C_WriteRequest<This> *> C_WriteRequests;
- typedef std::list<pwl::C_BlockIORequest<This> *> C_BlockIORequests;
+
+private:
+ typedef std::list<pwl::C_WriteRequest<This> *> C_WriteRequests;
+ typedef std::list<pwl::C_BlockIORequest<This> *> C_BlockIORequests;
+
+ std::atomic<bool> m_initialized = {false};
+
+ uint64_t m_bytes_dirty = 0; /* Total bytes yet to flush to RBD */
+ utime_t m_last_alloc_fail; /* Entry or buffer allocation fail seen */
+
+ pwl::WriteLogGuard m_write_log_guard;
+
+ /* Starts at 0 for a new write log. Incremented on every flush. */
+ uint64_t m_current_sync_gen = 0;
+  /* Starts at 0 on each sync gen increase. Incremented before being
+     applied to an operation */
+ uint64_t m_last_op_sequence_num = 0;
+
+ bool m_persist_on_write_until_flush = true;
+
+ /* Debug counters for the places m_async_op_tracker is used */
+ std::atomic<int> m_async_append_ops = {0};
+ std::atomic<int> m_async_complete_ops = {0};
+ std::atomic<int> m_async_null_flush_finish = {0};
+ std::atomic<int> m_async_process_work = {0};
+
+ /* Hold m_deferred_dispatch_lock while consuming from m_deferred_ios. */
+ mutable ceph::mutex m_deferred_dispatch_lock;
+
+ /* Used in release/detain to make BlockGuard preserve submission order */
+ mutable ceph::mutex m_blockguard_lock;
+
+  /* Use m_blockguard_lock for the following 2 things */
+ bool m_barrier_in_progress = false;
+ BlockGuardCell *m_barrier_cell = nullptr;
+
+ bool m_wake_up_enabled = true;
+
+ Contexts m_flush_complete_contexts;
+
+ std::shared_ptr<pwl::SyncPoint> m_current_sync_point = nullptr;
+ bool m_persist_on_flush = false; /* If false, persist each write before completion */
+
+ int m_flush_ops_in_flight = 0;
+ int m_flush_bytes_in_flight = 0;
+ uint64_t m_lowest_flushing_sync_gen = 0;
+
+ /* Writes that have left the block guard, but are waiting for resources */
+ C_BlockIORequests m_deferred_ios;
+ /* Throttle writes concurrently allocating & replicating */
+ unsigned int m_free_lanes = pwl::MAX_CONCURRENT_WRITES;
+
+ /* Initialized from config, then set false during shutdown */
+ std::atomic<bool> m_periodic_stats_enabled = {false};
+ SafeTimer *m_timer = nullptr; /* Used with m_timer_lock */
+ mutable ceph::mutex *m_timer_lock = nullptr; /* Used with and by m_timer */
+ Context *m_timer_ctx = nullptr;
+
+ ThreadPool m_thread_pool;
+
+ uint32_t m_discard_granularity_bytes;
BlockGuardCell* detain_guarded_request_helper(pwl::GuardedRequest &req);
BlockGuardCell* detain_guarded_request_barrier_helper(pwl::GuardedRequest &req);
pwl::GuardedRequestFunctionContext *guarded_ctx,
bool is_barrier);
+ void perf_start(const std::string name);
+ void perf_stop();
+ void log_perf();
+ void periodic_stats();
+ void arm_periodic_stats();
+
+ void pwl_init(Context *on_finish, pwl::DeferredContexts &later);
+ void update_image_cache_state(Context *on_finish);
+
+ void flush_dirty_entries(Context *on_finish);
+ bool can_flush_entry(const std::shared_ptr<pwl::GenericLogEntry> log_entry);
+ bool handle_flushed_sync_point(std::shared_ptr<pwl::SyncPointLogEntry> log_entry);
+ void sync_point_writer_flushed(std::shared_ptr<pwl::SyncPointLogEntry> log_entry);
+
+ void init_flush_new_sync_point(pwl::DeferredContexts &later);
+ void new_sync_point(pwl::DeferredContexts &later);
+ pwl::C_FlushRequest<AbstractWriteLog<ImageCtxT>>* make_flush_req(Context *on_finish);
+ void flush_new_sync_point_if_needed(C_FlushRequestT *flush_req, pwl::DeferredContexts &later);
+
+ void alloc_and_dispatch_io_req(C_BlockIORequestT *write_req);
+ void schedule_complete_op_log_entries(pwl::GenericLogOperations &&ops, const int r);
+ void internal_flush(bool invalidate, Context *on_finish);
+
+protected:
librbd::cache::pwl::ImageCacheState<ImageCtxT>* m_cache_state = nullptr;
- std::atomic<bool> m_initialized = {false};
std::atomic<bool> m_shutting_down = {false};
std::atomic<bool> m_invalidating = {false};
- PMEMobjpool *m_log_pool = nullptr;
const char* m_pwl_pool_layout_name;
ImageCtxT &m_image_ctx;
std::atomic<uint64_t> m_bytes_allocated = {0}; /* Total bytes allocated in write buffers */
uint64_t m_bytes_cached = 0; /* Total bytes used in write buffers */
- uint64_t m_bytes_dirty = 0; /* Total bytes yet to flush to RBD */
uint64_t m_bytes_allocated_cap = 0;
- utime_t m_last_alloc_fail; /* Entry or buffer allocation fail seen */
std::atomic<bool> m_alloc_failed_since_retire = {false};
ImageWriteback<ImageCtxT> m_image_writeback;
- pwl::WriteLogGuard m_write_log_guard;
/*
* When m_first_free_entry == m_first_valid_entry, the log is
* empty. There is always at least one free entry, which can't be
uint64_t m_first_free_entry = 0; /* Entries from here to m_first_valid_entry-1 are free */
uint64_t m_first_valid_entry = 0; /* Entries from here to m_first_free_entry-1 are valid */
- /* Starts at 0 for a new write log. Incremented on every flush. */
- uint64_t m_current_sync_gen = 0;
- /* Starts at 0 on each sync gen increase. Incremented before applied
- to an operation */
- uint64_t m_last_op_sequence_num = 0;
/* All writes bearing this and all prior sync gen numbers are flushed */
uint64_t m_flushed_sync_gen = 0;
- bool m_persist_on_write_until_flush = true;
-
AsyncOpTracker m_async_op_tracker;
/* Debug counters for the places m_async_op_tracker is used */
std::atomic<int> m_async_flush_ops = {0};
- std::atomic<int> m_async_append_ops = {0};
- std::atomic<int> m_async_complete_ops = {0};
- std::atomic<int> m_async_null_flush_finish = {0};
- std::atomic<int> m_async_process_work = {0};
/* Acquire locks in order declared here */
* bufs. Hold a write lock to prevent readers from being added (e.g. when
 * removing log entries from the map). No lock required to remove readers. */
mutable RWLock m_entry_reader_lock;
- /* Hold m_deferred_dispatch_lock while consuming from m_deferred_ios. */
- mutable ceph::mutex m_deferred_dispatch_lock;
/* Hold m_log_append_lock while appending or retiring log entries. */
mutable ceph::mutex m_log_append_lock;
/* Used for most synchronization */
mutable ceph::mutex m_lock;
- /* Used in release/detain to make BlockGuard preserve submission order */
- mutable ceph::mutex m_blockguard_lock;
-
/* Use m_blockguard_lock for the following 3 things */
pwl::WriteLogGuard::BlockOperations m_awaiting_barrier;
- bool m_barrier_in_progress = false;
- BlockGuardCell *m_barrier_cell = nullptr;
bool m_wake_up_requested = false;
bool m_wake_up_scheduled = false;
- bool m_wake_up_enabled = true;
bool m_appending = false;
bool m_dispatching_deferred_ops = false;
- Contexts m_flush_complete_contexts;
-
pwl::GenericLogOperations m_ops_to_flush; /* Write ops needing flush in local log */
pwl::GenericLogOperations m_ops_to_append; /* Write ops needing event append in local log */
PerfCounters *m_perfcounter = nullptr;
- std::shared_ptr<pwl::SyncPoint> m_current_sync_point = nullptr;
- bool m_persist_on_flush = false; /* If false, persist each write before completion */
-
- int m_flush_ops_in_flight = 0;
- int m_flush_bytes_in_flight = 0;
- uint64_t m_lowest_flushing_sync_gen = 0;
-
- /* Writes that have left the block guard, but are waiting for resources */
- C_BlockIORequests m_deferred_ios;
- /* Throttle writes concurrently allocating & replicating */
- unsigned int m_free_lanes = pwl::MAX_CONCURRENT_WRITES;
unsigned int m_unpublished_reserves = 0;
- /* Initialized from config, then set false during shutdown */
- std::atomic<bool> m_periodic_stats_enabled = {false};
- SafeTimer *m_timer = nullptr; /* Used with m_timer_lock */
- mutable ceph::mutex *m_timer_lock = nullptr; /* Used with and by m_timer */
- Context *m_timer_ctx = nullptr;
-
- ThreadPool m_thread_pool;
ContextWQ m_work_queue;
- uint32_t m_discard_granularity_bytes;
-
- void perf_start(const std::string name);
- void perf_stop();
- void log_perf();
- void periodic_stats();
- void arm_periodic_stats();
-
- void pwl_init(Context *on_finish, pwl::DeferredContexts &later);
- void update_image_cache_state(Context *on_finish);
- void load_existing_entries(pwl::DeferredContexts &later);
void wake_up();
- void process_work();
- void flush_dirty_entries(Context *on_finish);
- bool can_flush_entry(const std::shared_ptr<pwl::GenericLogEntry> log_entry);
- Context *construct_flush_entry_ctx(const std::shared_ptr<pwl::GenericLogEntry> log_entry);
- void persist_last_flushed_sync_gen();
- bool handle_flushed_sync_point(std::shared_ptr<pwl::SyncPointLogEntry> log_entry);
- void sync_point_writer_flushed(std::shared_ptr<pwl::SyncPointLogEntry> log_entry);
+ void update_entries(
+ std::shared_ptr<pwl::GenericLogEntry> log_entry,
+ pwl::WriteLogPmemEntry *pmem_entry, std::map<uint64_t, bool> &missing_sync_points,
+ std::map<uint64_t, std::shared_ptr<pwl::SyncPointLogEntry>> &sync_point_entries,
+ int entry_index);
+ void update_sync_points(
+ std::map<uint64_t, bool> &missing_sync_points,
+ std::map<uint64_t, std::shared_ptr<pwl::SyncPointLogEntry>> &sync_point_entries,
+ pwl::DeferredContexts &later);
+ Context *construct_flush_entry(
+ const std::shared_ptr<pwl::GenericLogEntry> log_entry, bool invalidating);
void process_writeback_dirty_entries();
bool can_retire_entry(const std::shared_ptr<pwl::GenericLogEntry> log_entry);
- bool retire_entries(const unsigned long int frees_per_tx);
-
- void init_flush_new_sync_point(pwl::DeferredContexts &later);
- void new_sync_point(pwl::DeferredContexts &later);
- pwl::C_FlushRequest<AbstractWriteLog<ImageCtxT>>* make_flush_req(Context *on_finish);
- void flush_new_sync_point_if_needed(C_FlushRequestT *flush_req, pwl::DeferredContexts &later);
void dispatch_deferred_writes(void);
- void alloc_and_dispatch_io_req(C_BlockIORequestT *write_req);
- void append_scheduled_ops(void);
void enlist_op_appender();
- void schedule_append(pwl::GenericLogOperations &ops);
- void flush_then_append_scheduled_ops(void);
- void enlist_op_flusher();
- void alloc_op_log_entries(pwl::GenericLogOperations &ops);
- void flush_op_log_entries(pwl::GenericLogOperationsVector &ops);
- int append_op_log_entries(pwl::GenericLogOperations &ops);
void complete_op_log_entries(pwl::GenericLogOperations &&ops, const int r);
- void schedule_complete_op_log_entries(pwl::GenericLogOperations &&ops, const int r);
- void internal_flush(bool invalidate, Context *on_finish);
+
+ bool check_allocation(
+ C_BlockIORequestT *req,
+ uint64_t &bytes_cached, uint64_t &bytes_dirtied, uint64_t &bytes_allocated,
+ uint64_t &num_lanes, uint64_t &num_log_entries,
+ uint64_t &num_unpublished_reserves, uint64_t bytes_allocated_cap);
+ void append_scheduled(
+ pwl::GenericLogOperations &ops, bool &ops_remain, bool &appending, bool isRWL=false);
+
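+  /* Backend-specific hooks: the pure virtuals below must be provided
+   * by every subclass, while the pmem-specific ones default to no-ops
+   * for backends that don't need them. */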
+ virtual void process_work() = 0;
+ virtual void append_scheduled_ops(void) = 0;
+ virtual void schedule_append_ops(pwl::GenericLogOperations &ops) = 0;
+ virtual void remove_pool_file() = 0;
+ virtual void initialize_pool(Context *on_finish, pwl::DeferredContexts &later) = 0;
+ virtual void write_data_to_buffer(
+ std::shared_ptr<pwl::WriteLogEntry> ws_entry, pwl::WriteLogPmemEntry *pmem_entry) {}
+ virtual void get_pool_name(const std::string log_poolset_name) {}
+ virtual void alloc_op_log_entries(pwl::GenericLogOperations &ops) {}
+ virtual bool retire_entries(const unsigned long int frees_per_tx) {return false;}
+ virtual void schedule_flush_and_append(pwl::GenericLogOperationsVector &ops) {}
+ virtual void persist_last_flushed_sync_gen() {}
+ virtual void reserve_pmem(C_BlockIORequestT *req, bool &alloc_succeeds, bool &no_space) {}
+ virtual Context *construct_flush_entry_ctx(
+ const std::shared_ptr<pwl::GenericLogEntry> log_entry) {return nullptr;}
};
} // namespace pwl
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// // vim: ts=8 sw=2 smarttab
+// vim: ts=8 sw=2 smarttab
+#include <libpmemobj.h>
#include "ReplicatedWriteLog.h"
#include "include/buffer.h"
#include "include/Context.h"
#include "common/Timer.h"
#include "common/perf_counters.h"
#include "librbd/ImageCtx.h"
+#include "librbd/asio/ContextWQ.h"
#include "librbd/cache/pwl/ImageCacheState.h"
#include "librbd/cache/pwl/LogEntry.h"
+#include "librbd/cache/pwl/Types.h"
#include <map>
#include <vector>
namespace librbd {
namespace cache {
namespace pwl {
+
using namespace librbd::cache::pwl;
+const unsigned long int OPS_APPENDED_TOGETHER = MAX_ALLOC_PER_TRANSACTION;
+
template <typename I>
-ReplicatedWriteLog<I>::ReplicatedWriteLog(I &image_ctx, librbd::cache::pwl::ImageCacheState<I>* cache_state)
+ReplicatedWriteLog<I>::ReplicatedWriteLog(
+ I &image_ctx, librbd::cache::pwl::ImageCacheState<I>* cache_state)
: AbstractWriteLog<I>(image_ctx, cache_state)
{
}
+template <typename I>
+ReplicatedWriteLog<I>::~ReplicatedWriteLog() {
+ m_log_pool = nullptr;
+}
+
+/*
+ * Allocate the (already reserved) write log entries for a set of operations.
+ *
+ * Locking:
+ * Acquires lock
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::alloc_op_log_entries(GenericLogOperations &ops)
+{
+ TOID(struct WriteLogPoolRoot) pool_root;
+ pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+ struct WriteLogPmemEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries);
+
+ ceph_assert(ceph_mutex_is_locked_by_me(this->m_log_append_lock));
+
+ /* Allocate the (already reserved) log entries */
+ std::lock_guard locker(m_lock);
+
+ for (auto &operation : ops) {
+ uint32_t entry_index = this->m_first_free_entry;
+ this->m_first_free_entry = (this->m_first_free_entry + 1) % this->m_total_log_entries;
+ auto &log_entry = operation->get_log_entry();
+ log_entry->log_entry_index = entry_index;
+ log_entry->ram_entry.entry_index = entry_index;
+ log_entry->pmem_entry = &pmem_log_entries[entry_index];
+ log_entry->ram_entry.entry_valid = 1;
+ m_log_entries.push_back(log_entry);
+ ldout(m_image_ctx.cct, 20) << "operation=[" << *operation << "]" << dendl;
+ }
+}
+
+/*
+ * Write and persist the (already allocated) write log entries and
+ * data buffer allocations for a set of ops. The data buffer for each
+ * of these must already have been persisted to its reserved area.
+ */
+template <typename I>
+int ReplicatedWriteLog<I>::append_op_log_entries(GenericLogOperations &ops)
+{
+ CephContext *cct = m_image_ctx.cct;
+ GenericLogOperationsVector entries_to_flush;
+ TOID(struct WriteLogPoolRoot) pool_root;
+ pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+ int ret = 0;
+
+ ceph_assert(ceph_mutex_is_locked_by_me(this->m_log_append_lock));
+
+ if (ops.empty()) {
+ return 0;
+ }
+ entries_to_flush.reserve(OPS_APPENDED_TOGETHER);
+
+ /* Write log entries to ring and persist */
+ utime_t now = ceph_clock_now();
+ for (auto &operation : ops) {
+ if (!entries_to_flush.empty()) {
+ /* Flush these and reset the list if the current entry wraps to the
+ * tail of the ring */
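+      /* e.g. a batch at ring indices {510, 511, 0, 1} is persisted as
+       * {510, 511} then {0, 1}, since flush_op_log_entries() requires
+       * a contiguous pmem range. */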
+ if (entries_to_flush.back()->get_log_entry()->log_entry_index >
+ operation->get_log_entry()->log_entry_index) {
+ ldout(m_image_ctx.cct, 20) << "entries to flush wrap around the end of the ring at "
+ << "operation=[" << *operation << "]" << dendl;
+ flush_op_log_entries(entries_to_flush);
+ entries_to_flush.clear();
+ now = ceph_clock_now();
+ }
+ }
+ ldout(m_image_ctx.cct, 20) << "Copying entry for operation at index="
+ << operation->get_log_entry()->log_entry_index << " "
+ << "from " << &operation->get_log_entry()->ram_entry << " "
+ << "to " << operation->get_log_entry()->pmem_entry << " "
+ << "operation=[" << *operation << "]" << dendl;
+ ldout(m_image_ctx.cct, 05) << "APPENDING: index="
+ << operation->get_log_entry()->log_entry_index << " "
+ << "operation=[" << *operation << "]" << dendl;
+ operation->log_append_time = now;
+ *operation->get_log_entry()->pmem_entry = operation->get_log_entry()->ram_entry;
+ ldout(m_image_ctx.cct, 20) << "APPENDING: index="
+ << operation->get_log_entry()->log_entry_index << " "
+ << "pmem_entry=[" << *operation->get_log_entry()->pmem_entry
+ << "]" << dendl;
+ entries_to_flush.push_back(operation);
+ }
+ flush_op_log_entries(entries_to_flush);
+
+ /* Drain once for all */
+ pmemobj_drain(m_log_pool);
+
+ /*
+ * Atomically advance the log head pointer and publish the
+ * allocations for all the data buffers they refer to.
+ */
+ utime_t tx_start = ceph_clock_now();
+ TX_BEGIN(m_log_pool) {
+ D_RW(pool_root)->first_free_entry = this->m_first_free_entry;
+ for (auto &operation : ops) {
+ if (operation->reserved_allocated()) {
+ auto write_op = (std::shared_ptr<WriteLogOperation>&) operation;
+ pmemobj_tx_publish(&write_op->buffer_alloc->buffer_alloc_action, 1);
+ } else {
+ ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl;
+ }
+ }
+ } TX_ONCOMMIT {
+ } TX_ONABORT {
+ lderr(cct) << "failed to commit " << ops.size()
+ << " log entries (" << this->m_log_pool_name << ")" << dendl;
+ ceph_assert(false);
+ ret = -EIO;
+ } TX_FINALLY {
+ } TX_END;
+
+ utime_t tx_end = ceph_clock_now();
+ m_perfcounter->tinc(l_librbd_pwl_append_tx_t, tx_end - tx_start);
+ m_perfcounter->hinc(
+ l_librbd_pwl_append_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(), ops.size());
+ for (auto &operation : ops) {
+ operation->log_append_comp_time = tx_end;
+ }
+
+ return ret;
+}
+
+/*
+ * Flush the persistent write log entries for a set of ops. The entries
+ * must be contiguous in persistent memory.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::flush_op_log_entries(GenericLogOperationsVector &ops)
+{
+ if (ops.empty()) {
+ return;
+ }
+
+ if (ops.size() > 1) {
+ ceph_assert(ops.front()->get_log_entry()->pmem_entry < ops.back()->get_log_entry()->pmem_entry);
+ }
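+  /* Sanity check: entries are appended in ring order and callers split
+   * batches at the wrap, so ops spans one contiguous ascending range
+   * that the single pmemobj_flush() below can cover. */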
+
+ ldout(m_image_ctx.cct, 20) << "entry count=" << ops.size() << " "
+ << "start address="
+ << ops.front()->get_log_entry()->pmem_entry << " "
+ << "bytes="
+ << ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry))
+ << dendl;
+ pmemobj_flush(m_log_pool,
+ ops.front()->get_log_entry()->pmem_entry,
+ ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry)));
+}
+
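+/*
+ * Use the replicated poolset if its file exists; otherwise fall back to
+ * a plain (unreplicated) pool file. With a poolset, libpmemobj mirrors
+ * writes to each replica listed in the set.
+ */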
+template <typename I>
+void ReplicatedWriteLog<I>::get_pool_name(const std::string log_poolset_name) {
+ CephContext *cct = m_image_ctx.cct;
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+ if (access(log_poolset_name.c_str(), F_OK) == 0) {
+ this->m_log_pool_name = log_poolset_name;
+ this->m_log_is_poolset = true;
+ } else {
+ ldout(cct, 5) << "Poolset file " << log_poolset_name
+ << " not present (or can't open). Using unreplicated pool" << dendl;
+ }
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::remove_pool_file() {
+ if (m_log_pool) {
+ ldout(m_image_ctx.cct, 6) << "closing pmem pool" << dendl;
+ pmemobj_close(m_log_pool);
+ }
+ if (m_cache_state->clean) {
+ if (this->m_log_is_poolset) {
+ ldout(m_image_ctx.cct, 5) << "Not removing poolset " << this->m_log_pool_name << dendl;
+ } else {
+ ldout(m_image_ctx.cct, 5) << "Removing empty pool file: " << this->m_log_pool_name << dendl;
+ if (remove(this->m_log_pool_name.c_str()) != 0) {
+ lderr(m_image_ctx.cct) << "failed to remove empty pool \"" << this->m_log_pool_name << "\": "
+ << pmemobj_errormsg() << dendl;
+ } else {
+ m_cache_state->clean = true;
+ m_cache_state->empty = true;
+ m_cache_state->present = false;
+ }
+ }
+ } else {
+ if (this->m_log_is_poolset) {
+ ldout(m_image_ctx.cct, 5) << "Not removing poolset " << this->m_log_pool_name << dendl;
+ } else {
+ ldout(m_image_ctx.cct, 5) << "Not removing pool file: " << this->m_log_pool_name << dendl;
+ }
+ }
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::initialize_pool(Context *on_finish, pwl::DeferredContexts &later) {
+ CephContext *cct = m_image_ctx.cct;
+ TOID(struct WriteLogPoolRoot) pool_root;
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+ if (access(this->m_log_pool_name.c_str(), F_OK) != 0) {
+ if ((m_log_pool =
+ pmemobj_create(this->m_log_pool_name.c_str(),
+ this->m_pwl_pool_layout_name,
+ this->m_log_pool_config_size,
+ (S_IWUSR | S_IRUSR))) == NULL) {
+ lderr(cct) << "failed to create pool (" << this->m_log_pool_name << ")"
+ << pmemobj_errormsg() << dendl;
+ m_cache_state->present = false;
+ m_cache_state->clean = true;
+ m_cache_state->empty = true;
+ /* TODO: filter/replace errnos that are meaningless to the caller */
+ on_finish->complete(-errno);
+ return;
+ }
+ m_cache_state->present = true;
+ m_cache_state->clean = true;
+ m_cache_state->empty = true;
+ pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+
+ /* new pool, calculate and store metadata */
+ size_t effective_pool_size = (size_t)(this->m_log_pool_config_size * USABLE_SIZE);
+ size_t small_write_size = MIN_WRITE_ALLOC_SIZE + BLOCK_ALLOC_OVERHEAD_BYTES + sizeof(struct WriteLogPmemEntry);
+ uint64_t num_small_writes = (uint64_t)(effective_pool_size / small_write_size);
+ if (num_small_writes > MAX_LOG_ENTRIES) {
+ num_small_writes = MAX_LOG_ENTRIES;
+ }
+ if (num_small_writes <= 2) {
+ lderr(cct) << "num_small_writes needs to > 2" << dendl;
+ on_finish->complete(-EINVAL);
+ return;
+ }
+ this->m_log_pool_actual_size = this->m_log_pool_config_size;
+ this->m_bytes_allocated_cap = effective_pool_size;
+ /* Log ring empty */
+ m_first_free_entry = 0;
+ m_first_valid_entry = 0;
+ TX_BEGIN(m_log_pool) {
+ TX_ADD(pool_root);
+ D_RW(pool_root)->header.layout_version = RWL_POOL_VERSION;
+ D_RW(pool_root)->log_entries =
+ TX_ZALLOC(struct WriteLogPmemEntry,
+ sizeof(struct WriteLogPmemEntry) * num_small_writes);
+ D_RW(pool_root)->pool_size = this->m_log_pool_actual_size;
+ D_RW(pool_root)->flushed_sync_gen = this->m_flushed_sync_gen;
+ D_RW(pool_root)->block_size = MIN_WRITE_ALLOC_SIZE;
+ D_RW(pool_root)->num_log_entries = num_small_writes;
+ D_RW(pool_root)->first_free_entry = m_first_free_entry;
+ D_RW(pool_root)->first_valid_entry = m_first_valid_entry;
+ } TX_ONCOMMIT {
+ this->m_total_log_entries = D_RO(pool_root)->num_log_entries;
+ this->m_free_log_entries = D_RO(pool_root)->num_log_entries - 1; // leave one free
+ } TX_ONABORT {
+ this->m_total_log_entries = 0;
+ this->m_free_log_entries = 0;
+ lderr(cct) << "failed to initialize pool (" << this->m_log_pool_name << ")" << dendl;
+ on_finish->complete(-pmemobj_tx_errno());
+ return;
+ } TX_FINALLY {
+ } TX_END;
+ } else {
+ m_cache_state->present = true;
+ /* Open existing pool */
+ if ((m_log_pool =
+ pmemobj_open(this->m_log_pool_name.c_str(),
+ this->m_pwl_pool_layout_name)) == NULL) {
+ lderr(cct) << "failed to open pool (" << this->m_log_pool_name << "): "
+ << pmemobj_errormsg() << dendl;
+ on_finish->complete(-errno);
+ return;
+ }
+ pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+ if (D_RO(pool_root)->header.layout_version != RWL_POOL_VERSION) {
+ // TODO: will handle upgrading version in the future
+ lderr(cct) << "Pool layout version is " << D_RO(pool_root)->header.layout_version
+ << " expected " << RWL_POOL_VERSION << dendl;
+ on_finish->complete(-EINVAL);
+ return;
+ }
+ if (D_RO(pool_root)->block_size != MIN_WRITE_ALLOC_SIZE) {
+ lderr(cct) << "Pool block size is " << D_RO(pool_root)->block_size
+ << " expected " << MIN_WRITE_ALLOC_SIZE << dendl;
+ on_finish->complete(-EINVAL);
+ return;
+ }
+ this->m_log_pool_actual_size = D_RO(pool_root)->pool_size;
+ this->m_flushed_sync_gen = D_RO(pool_root)->flushed_sync_gen;
+ this->m_total_log_entries = D_RO(pool_root)->num_log_entries;
+ m_first_free_entry = D_RO(pool_root)->first_free_entry;
+ m_first_valid_entry = D_RO(pool_root)->first_valid_entry;
+    if (m_first_free_entry < m_first_valid_entry) {
+      /* Valid entries wrap around the end of the ring, so first_free is
+       * lower than first_valid. The free slots are the ones from first_free
+       * up to first_valid, less the one slot that is always left unused.
+       * If first_valid == first_free+1, the only non-valid slot is the
+       * reserved one, so there are zero free log entries. */
+      this->m_free_log_entries = m_first_valid_entry - m_first_free_entry - 1;
+    } else {
+      /* first_valid is <= first_free. If they are == we have zero valid log
+       * entries, and n-1 free log entries */
+      this->m_free_log_entries = this->m_total_log_entries - (m_first_free_entry - m_first_valid_entry) - 1;
+ }
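+    /* Worked example with 10 slots: first_valid=8, first_free=2 means
+     * entries 8,9,0,1 are valid and slots 2..7 are unoccupied; with one
+     * slot always reserved, that leaves 8-2-1 = 5 free entries. */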
+ size_t effective_pool_size = (size_t)(this->m_log_pool_config_size * USABLE_SIZE);
+ this->m_bytes_allocated_cap = effective_pool_size;
+ load_existing_entries(later);
+ m_cache_state->clean = this->m_dirty_log_entries.empty();
+ m_cache_state->empty = m_log_entries.empty();
+ }
+}
+
+/*
+ * Loads the log entries from an existing log.
+ *
+ * Creates the in-memory structures to represent the state of the
+ * re-opened log.
+ *
+ * Finds the last appended sync point, and any sync points referred to
+ * in log entries, but missing from the log. These missing sync points
+ * are created and scheduled for append. Some rudimentary consistency
+ * checking is done.
+ *
+ * Rebuilds the m_blocks_to_log_entries map, to make log entries
+ * readable.
+ *
+ * Places all writes on the dirty entries list, which causes them all
+ * to be flushed.
+ *
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::load_existing_entries(DeferredContexts &later) {
+ TOID(struct WriteLogPoolRoot) pool_root;
+ pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+ struct WriteLogPmemEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries);
+ uint64_t entry_index = m_first_valid_entry;
+ /* The map below allows us to find sync point log entries by sync
+ * gen number, which is necessary so write entries can be linked to
+ * their sync points. */
+ std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> sync_point_entries;
+ /* The map below tracks sync points referred to in writes but not
+ * appearing in the sync_point_entries map. We'll use this to
+ * determine which sync points are missing and need to be
+ * created. */
+ std::map<uint64_t, bool> missing_sync_points;
+
+ /*
+ * Read the existing log entries. Construct an in-memory log entry
+ * object of the appropriate type for each. Add these to the global
+ * log entries list.
+ *
+ * Write entries will not link to their sync points yet. We'll do
+ * that in the next pass. Here we'll accumulate a map of sync point
+ * gen numbers that are referred to in writes but do not appear in
+ * the log.
+ */
+ while (entry_index != m_first_free_entry) {
+ WriteLogPmemEntry *pmem_entry = &pmem_log_entries[entry_index];
+ std::shared_ptr<GenericLogEntry> log_entry = nullptr;
+ ceph_assert(pmem_entry->entry_index == entry_index);
+
+ this->update_entries(log_entry, pmem_entry, missing_sync_points,
+ sync_point_entries, entry_index);
+
+ log_entry->ram_entry = *pmem_entry;
+ log_entry->pmem_entry = pmem_entry;
+ log_entry->log_entry_index = entry_index;
+ log_entry->completed = true;
+
+ m_log_entries.push_back(log_entry);
+
+ entry_index = (entry_index + 1) % this->m_total_log_entries;
+ }
+
+ this->update_sync_points(missing_sync_points, sync_point_entries, later);
+}
+
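+/*
+ * Resolve a write entry's persistent data allocation to a direct
+ * pointer; D_RW() maps the pmem OID to an address in the mapped pool,
+ * letting readers copy cached data in place.
+ */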
+template <typename I>
+void ReplicatedWriteLog<I>::write_data_to_buffer(std::shared_ptr<WriteLogEntry> ws_entry,
+ WriteLogPmemEntry *pmem_entry) {
+ ws_entry->pmem_buffer = D_RW(pmem_entry->write_data);
+}
+
+/**
+ * Retire up to MAX_ALLOC_PER_TRANSACTION of the oldest log entries
+ * that are eligible to be retired. Returns true if anything was
+ * retired.
+ */
+template <typename I>
+bool ReplicatedWriteLog<I>::retire_entries(const unsigned long int frees_per_tx) {
+ CephContext *cct = m_image_ctx.cct;
+ GenericLogEntriesVector retiring_entries;
+ uint32_t initial_first_valid_entry;
+ uint32_t first_valid_entry;
+
+ std::lock_guard retire_locker(this->m_log_retire_lock);
+ ldout(cct, 20) << "Look for entries to retire" << dendl;
+ {
+ /* Entry readers can't be added while we hold m_entry_reader_lock */
+ RWLock::WLocker entry_reader_locker(this->m_entry_reader_lock);
+ std::lock_guard locker(m_lock);
+ initial_first_valid_entry = this->m_first_valid_entry;
+ first_valid_entry = this->m_first_valid_entry;
+    while (!m_log_entries.empty() &&
+           retiring_entries.size() < frees_per_tx &&
+           this->can_retire_entry(m_log_entries.front())) {
+      auto entry = m_log_entries.front();
+      if (entry->log_entry_index != first_valid_entry) {
+        lderr(cct) << "Retiring entry index (" << entry->log_entry_index
+                   << ") and first valid log entry index (" << first_valid_entry
+                   << ") must be ==." << dendl;
+      }
+      ceph_assert(entry->log_entry_index == first_valid_entry);
+      first_valid_entry = (first_valid_entry + 1) % this->m_total_log_entries;
+      m_log_entries.pop_front();
+      retiring_entries.push_back(entry);
+      /* Remove entry from map so there will be no more readers */
+      if ((entry->write_bytes() > 0) || (entry->bytes_dirty() > 0)) {
+        auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(entry);
+        if (gen_write_entry) {
+          this->m_blocks_to_log_entries.remove_log_entry(gen_write_entry);
+        }
+      }
+    }
+ }
+
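+  /* Phase two: persist the retirement. The entries were unlinked from
+   * the in-memory structures above; now first_valid_entry advances and
+   * the data buffers are freed in a single pmem transaction. */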
+ if (retiring_entries.size()) {
+ ldout(cct, 20) << "Retiring " << retiring_entries.size() << " entries" << dendl;
+ TOID(struct WriteLogPoolRoot) pool_root;
+ pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+
+ utime_t tx_start;
+ utime_t tx_end;
+ /* Advance first valid entry and release buffers */
+ {
+ uint64_t flushed_sync_gen;
+ std::lock_guard append_locker(this->m_log_append_lock);
+ {
+ std::lock_guard locker(m_lock);
+ flushed_sync_gen = this->m_flushed_sync_gen;
+ }
+
+ tx_start = ceph_clock_now();
+ TX_BEGIN(m_log_pool) {
+ if (D_RO(pool_root)->flushed_sync_gen < flushed_sync_gen) {
+ ldout(m_image_ctx.cct, 20) << "flushed_sync_gen in log updated from "
+ << D_RO(pool_root)->flushed_sync_gen << " to "
+ << flushed_sync_gen << dendl;
+ D_RW(pool_root)->flushed_sync_gen = flushed_sync_gen;
+ }
+ D_RW(pool_root)->first_valid_entry = first_valid_entry;
+ for (auto &entry: retiring_entries) {
+ if (entry->write_bytes()) {
+ ldout(cct, 20) << "Freeing " << entry->ram_entry.write_data.oid.pool_uuid_lo
+ << "." << entry->ram_entry.write_data.oid.off << dendl;
+ TX_FREE(entry->ram_entry.write_data);
+ } else {
+ ldout(cct, 20) << "Retiring non-write: " << *entry << dendl;
+ }
+ }
+ } TX_ONCOMMIT {
+ } TX_ONABORT {
+ lderr(cct) << "failed to commit free of" << retiring_entries.size()
+ << " log entries (" << this->m_log_pool_name << ")" << dendl;
+ ceph_assert(false);
+ } TX_FINALLY {
+ } TX_END;
+ tx_end = ceph_clock_now();
+ }
+ m_perfcounter->tinc(l_librbd_pwl_retire_tx_t, tx_end - tx_start);
+ m_perfcounter->hinc(l_librbd_pwl_retire_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(),
+ retiring_entries.size());
+
+ /* Update runtime copy of first_valid, and free entries counts */
+ {
+ std::lock_guard locker(m_lock);
+
+ ceph_assert(this->m_first_valid_entry == initial_first_valid_entry);
+ this->m_first_valid_entry = first_valid_entry;
+ this->m_free_log_entries += retiring_entries.size();
+ for (auto &entry: retiring_entries) {
+ if (entry->write_bytes()) {
+ ceph_assert(this->m_bytes_cached >= entry->write_bytes());
+ this->m_bytes_cached -= entry->write_bytes();
+ uint64_t entry_allocation_size = entry->write_bytes();
+ if (entry_allocation_size < MIN_WRITE_ALLOC_SIZE) {
+ entry_allocation_size = MIN_WRITE_ALLOC_SIZE;
+ }
+ ceph_assert(this->m_bytes_allocated >= entry_allocation_size);
+ this->m_bytes_allocated -= entry_allocation_size;
+ }
+ }
+ this->m_alloc_failed_since_retire = false;
+ this->wake_up();
+ }
+ } else {
+ ldout(cct, 20) << "Nothing to retire" << dendl;
+ return false;
+ }
+ return true;
+}
+
+template <typename I>
+Context* ReplicatedWriteLog<I>::construct_flush_entry_ctx(
+ std::shared_ptr<GenericLogEntry> log_entry) {
+ bool invalidating = this->m_invalidating; // snapshot so we behave consistently
+ Context *ctx = this->construct_flush_entry(log_entry, invalidating);
+
+ if (invalidating) {
+ return ctx;
+ }
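+  /* Not invalidating: wrap the flush so the writeback to the image is
+   * queued on op_work_queue instead of running in the caller's context. */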
+ return new LambdaContext(
+ [this, log_entry, ctx](int r) {
+ m_image_ctx.op_work_queue->queue(new LambdaContext(
+ [this, log_entry, ctx](int r) {
+ ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+ << " " << *log_entry << dendl;
+ log_entry->writeback(this->m_image_writeback, ctx);
+ }), 0);
+ });
+}
+
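+/* Bound how many ops one flusher pass drains from m_ops_to_flush;
+ * anything left is handed to a newly enlisted flusher. */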
+const unsigned long int ops_flushed_together = 4;
+/*
+ * Performs the pmem buffer flush on all scheduled ops, then schedules
+ * the log event append operation for all of them.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::flush_then_append_scheduled_ops(void)
+{
+ GenericLogOperations ops;
+ bool ops_remain = false;
+ ldout(m_image_ctx.cct, 20) << dendl;
+ do {
+ {
+ ops.clear();
+ std::lock_guard locker(m_lock);
+ if (m_ops_to_flush.size()) {
+ auto last_in_batch = m_ops_to_flush.begin();
+ unsigned int ops_to_flush = m_ops_to_flush.size();
+ if (ops_to_flush > ops_flushed_together) {
+ ops_to_flush = ops_flushed_together;
+ }
+ ldout(m_image_ctx.cct, 20) << "should flush " << ops_to_flush << dendl;
+ std::advance(last_in_batch, ops_to_flush);
+ ops.splice(ops.end(), m_ops_to_flush, m_ops_to_flush.begin(), last_in_batch);
+ ops_remain = !m_ops_to_flush.empty();
+ ldout(m_image_ctx.cct, 20) << "flushing " << ops.size() << ", "
+ << m_ops_to_flush.size() << " remain" << dendl;
+ } else {
+ ops_remain = false;
+ }
+ }
+ if (ops_remain) {
+ enlist_op_flusher();
+ }
+
+ /* Ops subsequently scheduled for flush may finish before these,
+ * which is fine. We're unconcerned with completion order until we
+ * get to the log message append step. */
+ if (ops.size()) {
+ flush_pmem_buffer(ops);
+ schedule_append_ops(ops);
+ }
+ } while (ops_remain);
+ append_scheduled_ops();
+}
+
+/*
+ * Performs the log event append operation for all of the scheduled
+ * events.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::append_scheduled_ops(void) {
+ GenericLogOperations ops;
+ int append_result = 0;
+ bool ops_remain = false;
+ bool appending = false; /* true if we set m_appending */
+ ldout(m_image_ctx.cct, 20) << dendl;
+ do {
+ ops.clear();
+ this->append_scheduled(ops, ops_remain, appending, true);
+
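+    /* Both the ring-slot allocation and the pmem append happen under
+     * m_log_append_lock, so entries land in the ring in the order
+     * their indexes were assigned. */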
+ if (ops.size()) {
+ std::lock_guard locker(this->m_log_append_lock);
+ alloc_op_log_entries(ops);
+ append_result = append_op_log_entries(ops);
+ }
+
+ int num_ops = ops.size();
+ if (num_ops) {
+ /* New entries may be flushable. Completion will wake up flusher. */
+ this->complete_op_log_entries(std::move(ops), append_result);
+ }
+ } while (ops_remain);
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::enlist_op_flusher()
+{
+ this->m_async_flush_ops++;
+ this->m_async_op_tracker.start_op();
+ Context *flush_ctx = new LambdaContext([this](int r) {
+ flush_then_append_scheduled_ops();
+ this->m_async_flush_ops--;
+ this->m_async_op_tracker.finish_op();
+ });
+ this->m_work_queue.queue(flush_ctx);
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::setup_schedule_append(
+ pwl::GenericLogOperationsVector &ops, bool do_early_flush) {
+ if (do_early_flush) {
+ /* This caller is waiting for persist, so we'll use their thread to
+ * expedite it */
+ flush_pmem_buffer(ops);
+ this->schedule_append(ops);
+ } else {
+ /* This is probably not still the caller's thread, so do the payload
+ * flushing/replicating later. */
+ schedule_flush_and_append(ops);
+ }
+}
+
+/*
+ * Takes custody of ops. They'll all get their log entries appended,
+ * and have their on_write_persist contexts completed once they and
+ * all prior log entries are persisted everywhere.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::schedule_append_ops(GenericLogOperations &ops)
+{
+ bool need_finisher;
+ GenericLogOperationsVector appending;
+
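+  /* Keep a second reference to each op: the splice below empties ops,
+   * but appending() must still be called on each op afterwards. */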
+ std::copy(std::begin(ops), std::end(ops), std::back_inserter(appending));
+ {
+ std::lock_guard locker(m_lock);
+
+ need_finisher = this->m_ops_to_append.empty() && !this->m_appending;
+ this->m_ops_to_append.splice(this->m_ops_to_append.end(), ops);
+ }
+
+ if (need_finisher) {
+ this->enlist_op_appender();
+ }
+
+ for (auto &op : appending) {
+ op->appending();
+ }
+}
+
+/*
+ * Takes custody of ops. They'll all get their pmem blocks flushed,
+ * then get their log entries appended.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::schedule_flush_and_append(GenericLogOperationsVector &ops)
+{
+ GenericLogOperations to_flush(ops.begin(), ops.end());
+ bool need_finisher;
+ ldout(m_image_ctx.cct, 20) << dendl;
+ {
+ std::lock_guard locker(m_lock);
+
+ need_finisher = m_ops_to_flush.empty();
+ m_ops_to_flush.splice(m_ops_to_flush.end(), to_flush);
+ }
+
+ if (need_finisher) {
+ enlist_op_flusher();
+ }
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::process_work() {
+ CephContext *cct = m_image_ctx.cct;
+ int max_iterations = 4;
+ bool wake_up_requested = false;
+ uint64_t aggressive_high_water_bytes = this->m_bytes_allocated_cap * AGGRESSIVE_RETIRE_HIGH_WATER;
+ uint64_t high_water_bytes = this->m_bytes_allocated_cap * RETIRE_HIGH_WATER;
+ uint64_t low_water_bytes = this->m_bytes_allocated_cap * RETIRE_LOW_WATER;
+ uint64_t aggressive_high_water_entries = this->m_total_log_entries * AGGRESSIVE_RETIRE_HIGH_WATER;
+ uint64_t high_water_entries = this->m_total_log_entries * RETIRE_HIGH_WATER;
+ uint64_t low_water_entries = this->m_total_log_entries * RETIRE_LOW_WATER;
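+  /* Retirement kicks in above the high water marks (bytes or entries),
+   * frees down toward the low water marks within RETIRE_BATCH_TIME_LIMIT_MS,
+   * and uses larger transactions past the aggressive marks or when
+   * shutting down or invalidating. */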
+
+ ldout(cct, 20) << dendl;
+
+ do {
+ {
+ std::lock_guard locker(m_lock);
+ this->m_wake_up_requested = false;
+ }
+ if (this->m_alloc_failed_since_retire || this->m_invalidating ||
+ this->m_bytes_allocated > high_water_bytes ||
+ (m_log_entries.size() > high_water_entries)) {
+ int retired = 0;
+ utime_t started = ceph_clock_now();
+ ldout(m_image_ctx.cct, 10) << "alloc_fail=" << this->m_alloc_failed_since_retire
+ << ", allocated > high_water="
+ << (this->m_bytes_allocated > high_water_bytes)
+ << ", allocated_entries > high_water="
+ << (m_log_entries.size() > high_water_entries)
+ << dendl;
+ while (this->m_alloc_failed_since_retire || this->m_invalidating ||
+ (this->m_bytes_allocated > high_water_bytes) ||
+ (m_log_entries.size() > high_water_entries) ||
+ (((this->m_bytes_allocated > low_water_bytes) ||
+ (m_log_entries.size() > low_water_entries)) &&
+ (utime_t(ceph_clock_now() - started).to_msec() < RETIRE_BATCH_TIME_LIMIT_MS))) {
+ if (!retire_entries((this->m_shutting_down || this->m_invalidating ||
+ (this->m_bytes_allocated > aggressive_high_water_bytes) ||
+ (m_log_entries.size() > aggressive_high_water_entries))
+ ? MAX_ALLOC_PER_TRANSACTION
+ : MAX_FREE_PER_TRANSACTION)) {
+ break;
+ }
+ retired++;
+ this->dispatch_deferred_writes();
+ this->process_writeback_dirty_entries();
+ }
+ ldout(m_image_ctx.cct, 10) << "Retired " << retired << " times" << dendl;
+ }
+ this->dispatch_deferred_writes();
+ this->process_writeback_dirty_entries();
+
+ {
+ std::lock_guard locker(m_lock);
+ wake_up_requested = this->m_wake_up_requested;
+ }
+ } while (wake_up_requested && --max_iterations > 0);
+
+ {
+ std::lock_guard locker(m_lock);
+ this->m_wake_up_scheduled = false;
+ /* Reschedule if it's still requested */
+ if (this->m_wake_up_requested) {
+ this->wake_up();
+ }
+ }
+}
+
+/*
+ * Flush the pmem regions for the data blocks of a set of operations
+ *
+ * V is expected to be GenericLogOperations<I> or GenericLogOperationsVector<I>
+ */
+template <typename I>
+template <typename V>
+void ReplicatedWriteLog<I>::flush_pmem_buffer(V& ops)
+{
+ for (auto &operation : ops) {
+ operation->flush_pmem_buf_to_cache(m_log_pool);
+ }
+
+ /* Drain once for all */
+ pmemobj_drain(m_log_pool);
+
+ utime_t now = ceph_clock_now();
+ for (auto &operation : ops) {
+ if (operation->reserved_allocated()) {
+ operation->buf_persist_comp_time = now;
+ } else {
+ ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl;
+ }
+ }
+}
+
+/**
+ * Update/persist the last flushed sync point in the log
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::persist_last_flushed_sync_gen()
+{
+ TOID(struct WriteLogPoolRoot) pool_root;
+ pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+ uint64_t flushed_sync_gen;
+
+ std::lock_guard append_locker(this->m_log_append_lock);
+ {
+ std::lock_guard locker(m_lock);
+ flushed_sync_gen = this->m_flushed_sync_gen;
+ }
+
+ if (D_RO(pool_root)->flushed_sync_gen < flushed_sync_gen) {
+ ldout(m_image_ctx.cct, 15) << "flushed_sync_gen in log updated from "
+ << D_RO(pool_root)->flushed_sync_gen << " to "
+ << flushed_sync_gen << dendl;
+ TX_BEGIN(m_log_pool) {
+ D_RW(pool_root)->flushed_sync_gen = flushed_sync_gen;
+ } TX_ONCOMMIT {
+ } TX_ONABORT {
+ lderr(m_image_ctx.cct) << "failed to commit update of flushed sync point" << dendl;
+ ceph_assert(false);
+ } TX_FINALLY {
+ } TX_END;
+ }
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::reserve_pmem(C_BlockIORequestT *req,
+ bool &alloc_succeeds, bool &no_space) {
+ std::vector<WriteBufferAllocation>& buffers = req->get_resources_buffers();
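+  /* pmemobj_reserve() stages an allocation without publishing it; the
+   * returned action is only published later, inside the append
+   * transaction, so a reservation cancelled or lost in a crash before
+   * then is simply discarded. */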
+ for (auto &buffer : buffers) {
+ utime_t before_reserve = ceph_clock_now();
+ buffer.buffer_oid = pmemobj_reserve(m_log_pool,
+ &buffer.buffer_alloc_action,
+ buffer.allocation_size,
+ 0 /* Object type */);
+ buffer.allocation_lat = ceph_clock_now() - before_reserve;
+ if (TOID_IS_NULL(buffer.buffer_oid)) {
+ if (!req->has_io_waited_for_buffers()) {
+        req->set_io_waited_for_buffers(true);
+ }
+ ldout(m_image_ctx.cct, 5) << "can't allocate all data buffers: "
+ << pmemobj_errormsg() << ". "
+ << *req << dendl;
+ alloc_succeeds = false;
+ no_space = true; /* Entries need to be retired */
+ break;
+ } else {
+ buffer.allocated = true;
+ }
+ ldout(m_image_ctx.cct, 20) << "Allocated " << buffer.buffer_oid.oid.pool_uuid_lo
+ << "." << buffer.buffer_oid.oid.off
+ << ", size=" << buffer.allocation_size << dendl;
+ }
+}
+
+template <typename I>
+bool ReplicatedWriteLog<I>::alloc_resources(C_BlockIORequestT *req) {
+ bool alloc_succeeds = true;
+ uint64_t bytes_allocated = 0;
+ uint64_t bytes_cached = 0;
+ uint64_t bytes_dirtied = 0;
+ uint64_t num_lanes = 0;
+ uint64_t num_unpublished_reserves = 0;
+ uint64_t num_log_entries = 0;
+
+ ldout(m_image_ctx.cct, 20) << dendl;
+  // Set up buffers and count the resources this request will need
+ req->setup_buffer_resources(bytes_cached, bytes_dirtied, bytes_allocated,
+ num_lanes, num_log_entries, num_unpublished_reserves);
+
+ alloc_succeeds = this->check_allocation(req, bytes_cached, bytes_dirtied, bytes_allocated,
+ num_lanes, num_log_entries, num_unpublished_reserves,
+ this->m_bytes_allocated_cap);
+
+ std::vector<WriteBufferAllocation>& buffers = req->get_resources_buffers();
+ if (!alloc_succeeds) {
+ /* On alloc failure, free any buffers we did allocate */
+ for (auto &buffer : buffers) {
+ if (buffer.allocated) {
+ pmemobj_cancel(m_log_pool, &buffer.buffer_alloc_action, 1);
+ }
+ }
+ }
+
+ req->set_allocated(alloc_succeeds);
+ return alloc_succeeds;
+}
+
} // namespace pwl
} // namespace cache
} // namespace librbd
template <typename ImageCtxT>
class ReplicatedWriteLog : public AbstractWriteLog<ImageCtxT> {
public:
- typedef io::Extent Extent;
- typedef io::Extents Extents;
-
- ReplicatedWriteLog(ImageCtxT &image_ctx, librbd::cache::pwl::ImageCacheState<ImageCtxT>* cache_state);
+ ReplicatedWriteLog(
+ ImageCtxT &image_ctx, librbd::cache::pwl::ImageCacheState<ImageCtxT>* cache_state);
~ReplicatedWriteLog();
ReplicatedWriteLog(const ReplicatedWriteLog&) = delete;
ReplicatedWriteLog &operator=(const ReplicatedWriteLog&) = delete;
using C_WriteSameRequestT = pwl::C_WriteSameRequest<This>;
using C_CompAndWriteRequestT = pwl::C_CompAndWriteRequest<This>;
+ PMEMobjpool *m_log_pool = nullptr;
+
+ void remove_pool_file();
+ void load_existing_entries(pwl::DeferredContexts &later);
+ void alloc_op_log_entries(pwl::GenericLogOperations &ops);
+ int append_op_log_entries(pwl::GenericLogOperations &ops);
+ void flush_then_append_scheduled_ops(void);
+ void enlist_op_flusher();
+ void flush_op_log_entries(pwl::GenericLogOperationsVector &ops);
+ template <typename V>
+ void flush_pmem_buffer(V& ops);
+
+protected:
+ using AbstractWriteLog<ImageCtxT>::m_lock;
+ using AbstractWriteLog<ImageCtxT>::m_log_entries;
+ using AbstractWriteLog<ImageCtxT>::m_image_ctx;
+ using AbstractWriteLog<ImageCtxT>::m_perfcounter;
+ using AbstractWriteLog<ImageCtxT>::m_ops_to_flush;
+ using AbstractWriteLog<ImageCtxT>::m_cache_state;
+ using AbstractWriteLog<ImageCtxT>::m_first_free_entry;
+ using AbstractWriteLog<ImageCtxT>::m_first_valid_entry;
+
+ void process_work() override;
+ void schedule_append_ops(pwl::GenericLogOperations &ops) override;
+ void append_scheduled_ops(void) override;
+ void reserve_pmem(C_BlockIORequestT *req, bool &alloc_succeeds, bool &no_space) override;
+ bool retire_entries(const unsigned long int frees_per_tx) override;
+ void persist_last_flushed_sync_gen() override;
+ bool alloc_resources(C_BlockIORequestT *req) override;
+ void schedule_flush_and_append(pwl::GenericLogOperationsVector &ops) override;
+ void setup_schedule_append(
+ pwl::GenericLogOperationsVector &ops, bool do_early_flush) override;
+ Context *construct_flush_entry_ctx(
+ const std::shared_ptr<pwl::GenericLogEntry> log_entry) override;
+ void get_pool_name(const std::string log_poolset_name) override;
+ void initialize_pool(Context *on_finish, pwl::DeferredContexts &later) override;
+ void write_data_to_buffer(
+ std::shared_ptr<pwl::WriteLogEntry> ws_entry,
+ pwl::WriteLogPmemEntry *pmem_entry) override;
};
} // namespace pwl
template <typename T>
void C_WriteRequest<T>::schedule_append() {
ceph_assert(++m_appended == 1);
- if (m_do_early_flush) {
- /* This caller is waiting for persist, so we'll use their thread to
- * expedite it */
- pwl.flush_pmem_buffer(this->op_set->operations);
- pwl.schedule_append(this->op_set->operations);
- } else {
- /* This is probably not still the caller's thread, so do the payload
- * flushing/replicating later. */
- pwl.schedule_flush_and_append(this->op_set->operations);
- }
+ pwl.setup_schedule_append(this->op_set->operations, m_do_early_flush);
}
/**