}
/// Get ref to raw buffer
- bufferptr &get_bptr() {
+ virtual bufferptr &get_bptr() { // virtual: ObjectDataBlock overrides this to return its own view of the buffer
assert(ptr.has_value()); // the optional `ptr` must already hold a buffer
return *ptr;
}
- const bufferptr &get_bptr() const {
+ virtual const bufferptr &get_bptr() const { // const counterpart of the virtual accessor; also overridden by ObjectDataBlock
assert(ptr.has_value()); // the optional `ptr` must already hold a buffer
return *ptr;
}
return extent_index_hook.is_linked();
}
- /// set bufferptr
- void set_bptr(ceph::bufferptr &&nptr) {
- ptr = nptr;
- }
-
/// hook for intrusive ref list (mainly dirty or lru list)
boost::intrusive::list_member_hook<> primary_ref_list_hook;
using primary_ref_list_member_options = boost::intrusive::member_hook<
poffset = offset;
}
+ /// set bufferptr: make nptr the buffer held by this object
+ void set_bptr(ceph::bufferptr &&nptr) {
+ ptr = nptr; // nptr is a named rvalue reference (an lvalue here), so this copy-assigns; the caller's bufferptr is left intact. NOTE(review): ObjectDataBlock::prepare_commit() passes move_cached_bptr() here — confirm the copy is intended before changing it to std::move
+ }
+
/**
* maybe_generate_relative
*
}
};
+/**
+ * overwrite_buf_t
+ *
+ * Collects block_delta_t overwrites and, on demand, materializes a private
+ * copy of a buffer with those overwrites applied.
+ *
+ * Both members are mutable, so const accessors may create the cached copy
+ * and drain the pending list.
+ */
+class overwrite_buf_t {
+public:
+  overwrite_buf_t() = default;
+  /// true when there are neither pending changes nor a cached buffer
+  bool is_empty() const {
+    return changes.empty() && !has_cached_bptr();
+  }
+  /// true once the cached buffer has been created
+  bool has_cached_bptr() const {
+    return ptr.has_value();
+  }
+  /// queue a change; nothing is copied into any buffer at this point
+  void add(const block_delta_t &b) {
+    changes.push_back(b);
+  }
+  /// Copy every pending change into b at its offset, in insertion order,
+  /// then drop the pending list. Requires at least one pending change.
+  void apply_changes_to(bufferptr &b) const {
+    assert(!changes.empty());
+    // iterate by const reference: a per-iteration copy of block_delta_t
+    // (including its bufferlist) is not needed to read from it
+    for (const auto &p : changes) {
+      auto iter = p.bl.cbegin();
+      iter.copy(p.bl.length(), b.c_str() + p.offset);
+    }
+    changes.clear();
+  }
+  /// Return the cached buffer with all pending changes applied; the cache
+  /// is created as a copy of _ptr on first use. Requires !is_empty().
+  const bufferptr &get_cached_bptr(const bufferptr &_ptr) const {
+    apply_changes_to_cache(_ptr);
+    return *ptr;
+  }
+  /// Non-const overload of the accessor above.
+  bufferptr &get_cached_bptr(const bufferptr &_ptr) {
+    apply_changes_to_cache(_ptr);
+    return *ptr;
+  }
+  /// Apply any remaining pending changes to the cached buffer and hand it
+  /// out as an rvalue. Requires has_cached_bptr().
+  /// This function does not reset the optional, so has_cached_bptr() stays
+  /// true afterwards.
+  bufferptr &&move_cached_bptr() {
+    assert(has_cached_bptr());
+    // A preceding get_cached_bptr() may already have drained the pending
+    // list; apply_changes_to() asserts on an empty list, so only call it
+    // when there is something left to apply.
+    if (!changes.empty()) {
+      apply_changes_to(*ptr);
+    }
+    return std::move(*ptr);
+  }
+private:
+  /// Create the cached copy of _ptr if it does not exist yet, then apply
+  /// whatever is pending to it.
+  void apply_changes_to_cache(const bufferptr &_ptr) const {
+    assert(!is_empty());
+    if (!has_cached_bptr()) {
+      ptr = ceph::buffer::copy(_ptr.c_str(), _ptr.length());
+    }
+    if (!changes.empty()) {
+      apply_changes_to(*ptr);
+    }
+  }
+  // overwrites queued by add() and not yet applied to a buffer
+  mutable std::vector<block_delta_t> changes = {};
+  // private copy of the buffer, created lazily by apply_changes_to_cache()
+  mutable std::optional<ceph::bufferptr> ptr = std::nullopt;
+};
+
struct ObjectDataBlock : crimson::os::seastore::LogicalCachedExtent {
using Ref = TCachedExtentRef<ObjectDataBlock>;
interval_set<extent_len_t> modified_region;
+ // to provide the local modified view during transaction
+ overwrite_buf_t cached_overwrites;
+
explicit ObjectDataBlock(ceph::bufferptr &&ptr)
: LogicalCachedExtent(std::move(ptr)) {} // forwards the given buffer to the base class
- explicit ObjectDataBlock(const ObjectDataBlock &other)
- : LogicalCachedExtent(other), modified_region(other.modified_region) {}
+ explicit ObjectDataBlock(const ObjectDataBlock &other, share_buffer_t s)
+ : LogicalCachedExtent(other, s), modified_region(other.modified_region) {} // cached_overwrites is not in the initializer list, so the new object starts with an empty one
explicit ObjectDataBlock(extent_len_t length)
: LogicalCachedExtent(length) {} // forwards length to the base class; NOTE(review): presumably sizes the extent's buffer — confirm in LogicalCachedExtent
CachedExtentRef duplicate_for_write(Transaction&) final {
- return CachedExtentRef(new ObjectDataBlock(*this));
+ return CachedExtentRef(new ObjectDataBlock(*this, share_buffer_t{})); // selects the share_buffer_t constructor overload; NOTE(review): presumably shares the buffer with *this instead of copying it — confirm in LogicalCachedExtent
};
static constexpr extent_types_t TYPE = extent_types_t::OBJECT_DATA_BLOCK;
}
void overwrite(extent_len_t offset, bufferlist bl) {
- auto iter = bl.cbegin();
- iter.copy(bl.length(), get_bptr().c_str() + offset);
- delta.push_back({offset, bl.length(), bl});
+ block_delta_t b {offset, bl.length(), bl};
+ cached_overwrites.add(b); // queued only; the bytes are copied into a buffer later, by get_bptr() or prepare_commit()
+ delta.push_back(b); // the same change is also recorded in delta
modified_region.union_insert(offset, bl.length());
}
modified_region.clear();
}
+ /// Fold the queued overwrites into this extent's buffer ahead of commit.
+ void prepare_commit() final {
+   const bool mutated = is_mutation_pending() || is_exist_mutation_pending();
+   if (!mutated) {
+     // an extent that is not being mutated must not have queued anything
+     assert(cached_overwrites.is_empty());
+     return;
+   }
+   ceph_assert(!cached_overwrites.is_empty());
+   if (!cached_overwrites.has_cached_bptr()) {
+     // The optimized path to minimize data copy
+     cached_overwrites.apply_changes_to(CachedExtent::get_bptr());
+     return;
+   }
+   // a cached copy exists (the buffer was read after an overwrite): adopt it
+   set_bptr(cached_overwrites.move_cached_bptr());
+ }
+
void logical_on_delta_write() final {
delta.clear(); // drops the recorded deltas; cached_overwrites is not touched here
}
+
+ /// Buffer including any overwrites queued on this extent; falls back to the
+ /// base-class buffer when nothing has been queued or cached.
+ bufferptr &get_bptr() override {
+   if (!cached_overwrites.is_empty()) {
+     // the base buffer is only the source for the cached copy
+     return cached_overwrites.get_cached_bptr(CachedExtent::get_bptr());
+   }
+   return CachedExtent::get_bptr();
+ }
+
+ /// Const view of the buffer including any queued overwrites; falls back to
+ /// the base-class buffer when nothing has been queued or cached.
+ const bufferptr &get_bptr() const override {
+   if (!cached_overwrites.is_empty()) {
+     // the base buffer is only the source for the cached copy
+     return cached_overwrites.get_cached_bptr(CachedExtent::get_bptr());
+   }
+   return CachedExtent::get_bptr();
+ }
};
using ObjectDataBlockRef = TCachedExtentRef<ObjectDataBlock>;
return ret;
}
+ using remap_entry = TransactionManager::remap_entry;
+ /// Test helper: calls tm->remap_pin<ObjectDataBlock> with a single
+ /// remap_entry(new_offset, new_len) and returns the first mapping it yields.
+ /// An eagain error is turned into a null mapping, which then fails the
+ /// EXPECT_TRUE below.
+ LBAMappingRef remap_pin(
+   Transaction &t,
+   LBAMappingRef &&opin,
+   extent_len_t new_offset,
+   extent_len_t new_len) {
+   auto remapped = with_trans_intr(t, [&](auto& trans) {
+     return tm->remap_pin<ObjectDataBlock>(
+       trans,
+       std::move(opin),
+       std::array{remap_entry(new_offset, new_len)}
+     ).si_then([](auto mappings) {
+       return std::move(mappings[0]);
+     });
+   }).handle_error(
+     crimson::ct_error::eagain::handle([] {
+       LBAMappingRef none = nullptr;
+       return none;
+     }),
+     crimson::ct_error::pass_further_all{}
+   ).unsafe_get0();
+   EXPECT_TRUE(remapped);
+   return remapped;
+ }
+
+ /// Test helper: reads the ObjectDataBlock at (addr, len) through
+ /// tm->read_extent within t and expects its laddr to equal addr.
+ ObjectDataBlockRef get_extent(
+   Transaction &t,
+   laddr_t addr,
+   extent_len_t len) {
+   auto extent = with_trans_intr(t, [&](auto& trans) {
+     return tm->read_extent<ObjectDataBlock>(trans, addr, len);
+   }).unsafe_get0();
+   EXPECT_EQ(addr, extent->get_laddr());
+   return extent;
+ }
+
seastar::future<> set_up_fut() final {
onode = new TestOnode(
DEFAULT_OBJECT_DATA_RESERVATION,
});
}
+// Exercises overwrite followed by read inside one transaction, then checks
+// the data after commit and after restart, and finally checks that a read
+// transaction does not return data that another transaction has written but
+// not submitted.
+TEST_P(object_data_handler_test_t, overwrite_then_read_within_transaction) {
+  run_async([this] {
+    set_overwrite_threshold();
+    // Phase 1: commit 6 blocks of 'a' starting at block 4.
+    auto t = create_mutate_transaction();
+    auto base = 4096 * 4;
+    auto len = 4096 * 6;
+    write(*t, base, len, 'a');
+    submit_transaction(std::move(t));
+
+    // Phase 2: remap the mapping to (4096, 8192), take the resulting extent
+    // mutable and overwrite its first block with 'z'.
+    t = create_mutate_transaction();
+    {
+      auto pins = get_mappings(base, len);
+      // gtest assertion rather than assert(): it is still checked when
+      // NDEBUG is defined, so pins.front() below is never reached on an
+      // unexpected mapping count, and it matches the ASSERT_TRUE calls here.
+      ASSERT_EQ(pins.size(), 1u);
+      auto pin1 = remap_pin(*t, std::move(pins.front()), 4096, 8192);
+      auto ext = get_extent(*t, base + 4096, 4096 * 2);
+      ASSERT_TRUE(ext->is_exist_clean());
+      ext = tm->get_mutable_extent(*t, ext)->cast<ObjectDataBlock>();
+
+      auto l = 4096;
+      // keep the expected contents in step with the overwrite issued below
+      memset(
+        known_contents.c_str() + base + 4096,
+        'z',
+        l);
+      bufferlist bl;
+      bl.append(
+        bufferptr(
+          known_contents,
+          base + 4096,
+          l));
+
+      ext->overwrite(0, bl);
+      ASSERT_TRUE(ext->is_exist_mutation_pending());
+    }
+    submit_transaction(std::move(t));
+    read(base + 4096, 4096);
+    read(base + 4096, 8192);
+    restart();
+    epm->check_usage();
+    read(base + 4096, 8192);
+
+    // Phase 3: commit 3 blocks of 'a' at offset 0, then interleave
+    // overwrites and reads inside a single transaction.
+    t = create_mutate_transaction();
+    base = 0;
+    len = 4096 * 3;
+    write(*t, base, len, 'a');
+    submit_transaction(std::move(t));
+
+    t = create_mutate_transaction();
+    write(*t, base + 4096, 4096, 'b');
+    read(*t, base + 1024, 4096 + 1024);
+    write(*t, base + 8192, 4096, 'c');
+    read(*t, base + 2048, 8192);
+    write(*t, base, 4096, 'd');
+    write(*t, base + 4096, 4096, 'x');
+    submit_transaction(std::move(t));
+    read(base + 1024, 8192 - 1024);
+    read(base, 4096 * 3);
+    restart();
+    epm->check_usage();
+    read(base, 4096 * 3);
+
+    // Phase 4: t1 overwrites and reads one block but is never submitted;
+    // a separate read transaction t2 must return different bytes.
+    auto t1 = create_mutate_transaction();
+    write(*t1, base + 4096, 4096, 'e');
+    read(*t1, base + 4096, 4096);
+    auto t2 = create_read_transaction();
+    bufferlist committed = with_trans_intr(*t2, [&](auto &t) {
+      return ObjectDataHandler(MAX_OBJECT_SIZE).read(
+        ObjectDataHandler::context_t{
+          *tm,
+          t,
+          *onode
+        },
+        base + 4096,
+        4096);
+    }).unsafe_get0();
+    // NOTE(review): presumably write() above has already updated
+    // known_contents with t1's 'e' block — confirm in the fixture
+    bufferlist pending;
+    pending.append(
+      bufferptr(
+        known_contents,
+        base + 4096,
+        4096));
+    EXPECT_EQ(committed.length(), pending.length());
+    EXPECT_NE(committed, pending);
+    unset_overwrite_threshold();
+  });
+}
+
INSTANTIATE_TEST_SUITE_P(
object_data_handler_test,
object_data_handler_test_t,