Every update request is mapped to a WriteLogPmemEntry log entry on PMEM; the classes added
here are the corresponding representations of those log entries in RAM.
Signed-off-by: Peterson, Scott <scott.d.peterson@intel.com>
Signed-off-by: Li, Xiaoyan <xiaoyan.li@intel.com>
Signed-off-by: Lu, Yuan <yuan.y.lu@intel.com>
Signed-off-by: Chamarthy, Mahati <mahati.chamarthy@intel.com>
if(WITH_RBD_RWL) # the sources below are appended only when WITH_RBD_RWL is set
set(librbd_internal_srcs
${librbd_internal_srcs}
- cache/rwl/Types.cc
- cache/rwl/LogEntry.cc
cache/rwl/ImageCacheState.cc
+ cache/rwl/LogEntry.cc # re-added after ImageCacheState.cc so the cache/rwl/ entries are alphabetical
+ cache/rwl/Types.cc
cache/ReplicatedWriteLog.cc)
endif()
m_image_ctx(image_ctx),
m_log_pool_config_size(DEFAULT_POOL_SIZE),
m_image_writeback(image_ctx),
- m_lock("librbd::cache::ReplicatedWriteLog::m_lock",
- true, true),
+ m_lock(ceph::make_mutex(util::unique_lock_name(
+ "librbd::cache::ReplicatedWriteLog::m_lock", this))),
m_thread_pool(image_ctx.cct, "librbd::cache::ReplicatedWriteLog::thread_pool", "tp_rwl",
4,
""),
template <typename I>
void ReplicatedWriteLog<I>::arm_periodic_stats() {
- ceph_assert(m_timer_lock->is_locked());
+ ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
if (m_periodic_stats_enabled) {
m_timer_ctx = new LambdaContext(
[this](int r) {
namespace rwl { // forward declarations and list aliases only
+class SyncPointLogEntry; // derives from GenericLogEntry; constructed from a sync gen number
+class GeneralWriteLogEntry; // derives from GenericLogEntry; holds a shared_ptr to its SyncPointLogEntry
+class WriteLogEntry; // derives from GeneralWriteLogEntry; adds the pmem buffer ptr/list
class GenericLogEntry;
+
+typedef std::list<std::shared_ptr<GeneralWriteLogEntry>> GeneralWriteLogEntries; // lists of shared entry handles
+typedef std::list<std::shared_ptr<WriteLogEntry>> WriteLogEntries;
typedef std::list<std::shared_ptr<GenericLogEntry>> GenericLogEntries;
+/**** End of log entry forward declarations and list aliases ****/
+
class DeferredContexts;
template <typename> class ImageCacheState;
} // namespace rwl
return ram_entry.is_writer();
}
-std::ostream &format(std::ostream &os, const GenericLogEntry &entry) {
- os << "ram_entry=[" << entry.ram_entry << "], "
- << "pmem_entry=" << (void*)entry.pmem_entry << ", "
- << "log_entry_index=" << entry.log_entry_index << ", "
- << "completed=" << entry.completed;
+std::ostream& GenericLogEntry::format(std::ostream &os) const { // writes the base entry fields to os as name=value pairs; the derived format() implementations call this for the shared fields
+ os << "ram_entry=[" << ram_entry << "], "
+ << "pmem_entry=" << (void*)pmem_entry << ", " // cast to void* so the pointer value itself is streamed
+ << "log_entry_index=" << log_entry_index << ", "
+ << "completed=" << completed;
return os; // hand the stream back so callers can keep chaining
}
std::ostream &operator<<(std::ostream &os,
const GenericLogEntry &entry) {
- return entry.format(os, entry);
+ return entry.format(os); // format() is virtual, so a derived entry streamed through a base reference prints its own fields
+}
+
+/*
+ * Writes "(Sync Point) ", the GenericLogEntry fields, and then this sync
+ * point's counters and flags to os as comma-separated name=value pairs.
+ * Returns os so the call can be chained.
+ */
+std::ostream& SyncPointLogEntry::format(std::ostream &os) const {
+  os << "(Sync Point) ";
+  GenericLogEntry::format(os);
+  os << ", "
+     << "writes=" << writes << ", "
+     << "bytes=" << bytes << ", "
+     << "writes_completed=" << writes_completed << ", "
+     << "writes_flushed=" << writes_flushed << ", "
+     << "prior_sync_point_flushed=" << prior_sync_point_flushed << ", "
+     << "next_sync_point_entry=" << next_sync_point_entry;
+  return os;
+}
+
+/* Stream insertion for SyncPointLogEntry; the text is produced by format(). */
+std::ostream &operator<<(std::ostream &os, const SyncPointLogEntry &entry) {
+  std::ostream &out = entry.format(os);
+  return out;
+}
+
+/*
+ * Writes the GenericLogEntry fields, then the referenced sync point entry
+ * (or the text "nullptr" when sync_point_entry is empty) followed by this
+ * entry's map-reference count and flush state. Returns os.
+ */
+std::ostream& GeneralWriteLogEntry::format(std::ostream &os) const {
+  GenericLogEntry::format(os);
+  os << ", "
+     << "sync_point_entry=[";
+  if (sync_point_entry) {
+    os << *sync_point_entry;
+  } else {
+    os << "nullptr";
+  }
+  os << "], "
+     << "referring_map_entries=" << referring_map_entries << ", "
+     << "flushing=" << flushing << ", "
+     << "flushed=" << flushed;
+  return os;
+}
+
+/* Stream insertion for GeneralWriteLogEntry; the text is produced by format(). */
+std::ostream &operator<<(std::ostream &os, const GeneralWriteLogEntry &entry) {
+  std::ostream &out = entry.format(os);
+  return out;
+}
+
+/*
+ * Points pmem_bp at this entry's pmem_buffer by wrapping write_bytes() bytes
+ * of it with buffer::create_static(). Must only be called while pmem_bp has
+ * no raw buffer yet.
+ */
+void WriteLogEntry::init_pmem_bp() {
+  /* ceph_assert, as used by the rest of the cache code, rather than assert */
+  ceph_assert(!pmem_bp.have_raw());
+  pmem_bp = buffer::ptr(buffer::create_static(this->write_bytes(), (char*)pmem_buffer));
+}
+
+/*
+ * Rebuilds pmem_bl from a freshly initialized pmem_bp and records in bl_refs
+ * how many references on pmem_bp's raw buffer the list took, measured as the
+ * raw_nref() difference across init_bl().
+ */
+void WriteLogEntry::init_pmem_bl() {
+  pmem_bl.clear();
+  init_pmem_bp();
+  /* ceph_assert, as used by the rest of the cache code, rather than assert */
+  ceph_assert(pmem_bp.have_raw());
+  int before_bl = pmem_bp.raw_nref();
+  this->init_bl(pmem_bp, pmem_bl);
+  int after_bl = pmem_bp.raw_nref();
+  bl_refs = after_bl - before_bl;
+}
+
+/*
+ * Returns the raw buffer's reference count minus the references held by
+ * pmem_bl (bl_refs) and one more (presumably pmem_bp's own reference --
+ * TODO confirm). Returns 0 while pmem_bp has no raw buffer.
+ */
+unsigned int WriteLogEntry::reader_count() {
+  if (!pmem_bp.have_raw()) {
+    return 0;
+  }
+  return (pmem_bp.raw_nref() - bl_refs - 1);
+}
+
+/*
+ * Returns a ref to a bl containing bufferptrs to the entry pmem buffer.
+ *
+ * pmem_bl is built on first use: bl_refs is tested once without the lock and
+ * again after taking entry_bl_lock, and init_pmem_bl() is only called when
+ * both tests see 0.
+ */
+buffer::list& WriteLogEntry::get_pmem_bl(ceph::mutex &entry_bl_lock) {
+  if (0 == bl_refs) {
+    std::lock_guard locker(entry_bl_lock);
+    if (0 == bl_refs) {
+      init_pmem_bl();
+    }
+    /* ceph_assert, as used by the rest of the cache code, rather than assert */
+    ceph_assert(0 != bl_refs);
+  }
+  return pmem_bl;
+}
+
+/*
+ * Fills *out_bl with a bufferlist built from a clone of pmem_bp. Any previous
+ * content of *out_bl is discarded.
+ */
+void WriteLogEntry::copy_pmem_bl(ceph::mutex &entry_bl_lock, bufferlist *out_bl) {
+  /* Called for its side effect only: it makes sure pmem_bp is initialized */
+  (void)this->get_pmem_bl(entry_bl_lock);
+  buffer::ptr bp_copy(pmem_bp.clone());
+  out_bl->clear();
+  this->init_bl(bp_copy, *out_bl);
+}
+
+/*
+ * Writes "(Write) ", the GeneralWriteLogEntry fields, and then the pmem
+ * buffer address, pmem_bp, pmem_bl and bl_refs to os. Returns os.
+ */
+std::ostream& WriteLogEntry::format(std::ostream &os) const {
+  os << "(Write) ";
+  GeneralWriteLogEntry::format(os);
+  os << ", "
+     << "pmem_buffer=" << (void*)pmem_buffer << ", ";
+  os << "pmem_bp=" << pmem_bp << ", ";
+  os << "pmem_bl=" << pmem_bl << ", ";
+  os << "bl_refs=" << bl_refs;
+  return os;
+}
+
+/* Stream insertion for WriteLogEntry; the text is produced by format(). */
+std::ostream &operator<<(std::ostream &os, const WriteLogEntry &entry) {
+  std::ostream &out = entry.format(os);
+  return out;
}
} // namespace rwl
#define CEPH_LIBRBD_CACHE_RWL_LOG_ENTRY_H
#include "librbd/cache/rwl/Types.h"
+#include <atomic>
+#include <memory>
namespace librbd {
namespace cache {
namespace rwl {
+class SyncPointLogEntry;
+class GeneralWriteLogEntry;
+class WriteLogEntry;
+
class GenericLogEntry {
public:
WriteLogPmemEntry ram_entry;
bool is_writesame();
bool is_write();
bool is_writer();
- virtual std::ostream &format(std::ostream &os, const GenericLogEntry &entry) const;
+ virtual const GenericLogEntry* get_log_entry() = 0; // pure virtual: each concrete entry type returns itself
+ virtual const SyncPointLogEntry* get_sync_point_log_entry() { return nullptr;} // typed accessors default to nullptr; the matching subclass overrides its accessor to return this
+ virtual const GeneralWriteLogEntry* get_gen_write_log_entry() { return nullptr; }
+ virtual const WriteLogEntry* get_write_log_entry() { return nullptr; }
+ virtual std::ostream& format(std::ostream &os) const; // writes this entry's fields to os; subclasses extend the output
friend std::ostream &operator<<(std::ostream &os,
const GenericLogEntry &entry);
};
+/*
+ * Log entry that marks a sync point. The constructor stamps ram_entry with
+ * the sync gen number and sets its sync_point flag; the atomic members track
+ * the writing entries that use this sync gen number. Not copyable.
+ */
+class SyncPointLogEntry : public GenericLogEntry {
+public:
+  /* Writing entries using this sync gen number */
+  std::atomic<unsigned int> writes = {0};
+  /* Total bytes for all writing entries using this sync gen number */
+  std::atomic<uint64_t> bytes = {0};
+  /* Writing entries using this sync gen number that have completed */
+  std::atomic<unsigned int> writes_completed = {0};
+  /* Writing entries using this sync gen number that have completed flushing to the writeback interface */
+  std::atomic<unsigned int> writes_flushed = {0};
+  /* All writing entries using all prior sync gen numbers have been flushed */
+  std::atomic<bool> prior_sync_point_flushed = {true};
+  std::shared_ptr<SyncPointLogEntry> next_sync_point_entry = nullptr;
+  SyncPointLogEntry(const uint64_t sync_gen_number) {
+    ram_entry.sync_gen_number = sync_gen_number;
+    ram_entry.sync_point = 1;
+  }
+  SyncPointLogEntry(const SyncPointLogEntry&) = delete;
+  SyncPointLogEntry &operator=(const SyncPointLogEntry&) = delete;
+  /* Always 0 for a sync point entry */
+  virtual unsigned int write_bytes() { return 0; }
+  const GenericLogEntry* get_log_entry() override { return get_sync_point_log_entry(); }
+  const SyncPointLogEntry* get_sync_point_log_entry() override { return this; }
+  /* Overrides GenericLogEntry::format(); marked so the compiler checks the signature */
+  std::ostream& format(std::ostream &os) const override;
+  friend std::ostream &operator<<(std::ostream &os,
+                                  const SyncPointLogEntry &entry);
+};
+
+/*
+ * Common base for write-type log entries. Holds a shared_ptr to the sync
+ * point entry it belongs to (may be null), a count of map entries referring
+ * to it, and its flushing/flushed state. Not copyable.
+ */
+class GeneralWriteLogEntry : public GenericLogEntry {
+public:
+  uint32_t referring_map_entries = 0;
+  bool flushing = false;
+  bool flushed = false; /* or invalidated */
+  std::shared_ptr<SyncPointLogEntry> sync_point_entry;
+  GeneralWriteLogEntry(std::shared_ptr<SyncPointLogEntry> sync_point_entry,
+                       const uint64_t image_offset_bytes, const uint64_t write_bytes)
+    : GenericLogEntry(image_offset_bytes, write_bytes), sync_point_entry(sync_point_entry) { }
+  GeneralWriteLogEntry(const uint64_t image_offset_bytes, const uint64_t write_bytes)
+    : GenericLogEntry(image_offset_bytes, write_bytes), sync_point_entry(nullptr) { }
+  GeneralWriteLogEntry(const GeneralWriteLogEntry&) = delete;
+  GeneralWriteLogEntry &operator=(const GeneralWriteLogEntry&) = delete;
+  virtual unsigned int write_bytes() {
+    /* The valid bytes in this ops data buffer. Discard and WS override. */
+    return ram_entry.write_bytes;
+  }
+  virtual unsigned int bytes_dirty() {
+    /* The bytes in the image this op makes dirty. Discard and WS override. */
+    return write_bytes();
+  }
+  const BlockExtent block_extent() { return ram_entry.block_extent(); }
+  const GenericLogEntry* get_log_entry() override { return get_gen_write_log_entry(); }
+  const GeneralWriteLogEntry* get_gen_write_log_entry() override { return this; }
+  /* Plain (non-atomic) counter accessors; NOTE(review): presumably callers serialize access -- confirm */
+  uint32_t get_map_ref() { return referring_map_entries; }
+  void inc_map_ref() { referring_map_entries++; }
+  void dec_map_ref() { referring_map_entries--; }
+  /* Overrides GenericLogEntry::format(); marked so the compiler checks the signature */
+  std::ostream &format(std::ostream &os) const override;
+  friend std::ostream &operator<<(std::ostream &os,
+                                  const GeneralWriteLogEntry &entry);
+};
+
+/*
+ * Write log entry with a data buffer. pmem_buffer is the raw data pointer;
+ * pmem_bp and pmem_bl are built from it on demand by get_pmem_bl(), and
+ * bl_refs records the references pmem_bl holds on pmem_bp. Not copyable.
+ */
+class WriteLogEntry : public GeneralWriteLogEntry {
+protected:
+  buffer::ptr pmem_bp;
+  buffer::list pmem_bl;
+  std::atomic<int> bl_refs = {0}; /* The refs held on pmem_bp by pmem_bl */
+
+  void init_pmem_bp();
+
+  /* Write same will override */
+  virtual void init_bl(buffer::ptr &bp, buffer::list &bl) {
+    bl.append(bp);
+  }
+
+  void init_pmem_bl();
+
+public:
+  uint8_t *pmem_buffer = nullptr;
+  WriteLogEntry(std::shared_ptr<SyncPointLogEntry> sync_point_entry,
+                const uint64_t image_offset_bytes, const uint64_t write_bytes)
+    : GeneralWriteLogEntry(sync_point_entry, image_offset_bytes, write_bytes) { }
+  WriteLogEntry(const uint64_t image_offset_bytes, const uint64_t write_bytes)
+    : GeneralWriteLogEntry(nullptr, image_offset_bytes, write_bytes) { }
+  WriteLogEntry(const WriteLogEntry&) = delete;
+  WriteLogEntry &operator=(const WriteLogEntry&) = delete;
+  /* NOTE(review): hides GeneralWriteLogEntry::block_extent(); no definition is visible here -- confirm one exists */
+  const BlockExtent block_extent();
+  unsigned int reader_count();
+  /* Returns a ref to a bl containing bufferptrs to the entry pmem buffer */
+  buffer::list &get_pmem_bl(ceph::mutex &entry_bl_lock);
+  /* Constructs a new bl containing copies of pmem_bp */
+  void copy_pmem_bl(ceph::mutex &entry_bl_lock, bufferlist *out_bl);
+  const GenericLogEntry* get_log_entry() override { return get_write_log_entry(); }
+  const WriteLogEntry* get_write_log_entry() override { return this; }
+  /* Overrides GeneralWriteLogEntry::format(); marked so the compiler checks the signature */
+  std::ostream &format(std::ostream &os) const override;
+  friend std::ostream &operator<<(std::ostream &os,
+                                  const WriteLogEntry &entry);
+};
+
} // namespace rwl
} // namespace cache
} // namespace librbd
contexts.push_back(ctx);
}
+/*
+ * Converts an image extent (start offset and length, in bytes) into a
+ * BlockExtent, which identifies a range by its first and last element.
+ *
+ * The ImageCache interface works in image extents with no alignment
+ * requirement on either end, so the conversion uses a "block size" of 1:
+ * the first block is offset_bytes and the last block is the final byte
+ * covered by the extent.
+ *
+ * NOTE(review): with length_bytes == 0 the computed last block is
+ * offset_bytes - 1 (wrapping around when offset_bytes is 0); callers
+ * presumably never pass an empty extent -- confirm.
+ */
+const BlockExtent block_extent(const uint64_t offset_bytes, const uint64_t length_bytes)
+{
+  const uint64_t first_block = offset_bytes;
+  const uint64_t last_block = offset_bytes + length_bytes - 1;
+  return BlockExtent(first_block, last_block);
+}
+
+/*
+ * Block extent covered by this entry, derived from image_offset_bytes and
+ * write_bytes. The free function is named with full qualification because
+ * the unqualified name would resolve to this member.
+ */
+const BlockExtent WriteLogPmemEntry::block_extent() {
+  return librbd::cache::rwl::block_extent(image_offset_bytes, write_bytes);
+}
+
bool WriteLogPmemEntry::is_sync_point() {
return sync_point; // the sync_point flag converted to bool
}
#include <vector>
#include <libpmemobj.h>
+#include "librbd/BlockGuard.h"
class Context;
: image_offset_bytes(image_offset_bytes), write_bytes(write_bytes),
entry_valid(0), sync_point(0), sequenced(0), has_data(0), discard(0), writesame(0) {
}
+ const BlockExtent block_extent();
bool is_sync_point();
bool is_discard();
bool is_writesame();