From ded528e66f4bac477b448a81ef170b925aacadc4 Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Mon, 22 Oct 2018 18:19:52 +0200 Subject: [PATCH] common: introduce bl::_carriage to track writeable area. Signed-off-by: Radoslaw Zarzynski --- src/common/buffer.cc | 51 +++++++++++++++++++++++++++++++++------- src/include/buffer.h | 56 ++++++++++++++++++++++++++++++-------------- 2 files changed, 82 insertions(+), 25 deletions(-) diff --git a/src/common/buffer.cc b/src/common/buffer.cc index 4e309ac5eb7..6a79d2431da 100644 --- a/src/common/buffer.cc +++ b/src/common/buffer.cc @@ -996,6 +996,7 @@ static ceph::spinlock debug_lock; buffer::list::list(list&& other) noexcept : _buffers(std::move(other._buffers)), + _carriage(&always_empty_bptr), _len(other._len), _memcopy_count(other._memcopy_count), last_p(this) { @@ -1006,6 +1007,7 @@ static ceph::spinlock debug_lock; { std::swap(_len, other._len); std::swap(_memcopy_count, other._memcopy_count); + std::swap(_carriage, other._carriage); _buffers.swap(other._buffers); //last_p.swap(other.last_p); last_p = begin(); @@ -1209,6 +1211,7 @@ static ceph::spinlock debug_lock; void buffer::list::rebuild() { if (_len == 0) { + _carriage = &always_empty_bptr; _buffers.clear_and_dispose(); return; } @@ -1227,8 +1230,10 @@ static ceph::spinlock debug_lock; pos += node.length(); } _memcopy_count += pos; + _carriage = &always_empty_bptr; _buffers.clear_and_dispose(); if (likely(nb->length())) { + _carriage = nb.get(); _buffers.push_back(*nb.release()); } invalidate_crc(); @@ -1308,6 +1313,7 @@ static ceph::spinlock debug_lock; if (get_append_buffer_unused_tail_length() < prealloc) { auto ptr = ptr_node::create(buffer::create_page_aligned(prealloc)); ptr->set_length(0); // unused, so far. + _carriage = ptr.get(); _buffers.push_back(*ptr.release()); } } @@ -1327,6 +1333,8 @@ static ceph::spinlock debug_lock; if (!(flags & CLAIM_ALLOW_NONSHAREABLE)) bl.make_shareable(); _buffers.splice_back(bl._buffers); + bl._carriage = &always_empty_bptr; + bl._buffers.clear_and_dispose(); bl._len = 0; bl.last_p = bl.begin(); } @@ -1391,12 +1399,19 @@ static ceph::spinlock debug_lock; auto buf = ptr_node::create( raw_combined::create(CEPH_BUFFER_APPEND_SIZE, 0, get_mempool())); buf->set_length(0); // unused, so far. + _carriage = buf.get(); _buffers.push_back(*buf.release()); + } else if (unlikely(_carriage != &_buffers.back())) { + auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0); + _carriage = bptr.get(); + _buffers.push_back(*bptr.release()); } - _buffers.back().append(c); + _carriage->append(c); _len++; } + buffer::ptr buffer::list::always_empty_bptr; + buffer::ptr_node& buffer::list::refill_append_space(const unsigned len) { // make a new buffer. fill out a complete page, factoring in the @@ -1407,6 +1422,7 @@ static ceph::spinlock debug_lock; auto new_back = \ ptr_node::create(raw_combined::create(alen, 0, get_mempool())); new_back->set_length(0); // unused, so far. + _carriage = new_back.get(); _buffers.push_back(*new_back.release()); return _buffers.back(); } @@ -1418,7 +1434,16 @@ static ceph::spinlock debug_lock; const unsigned free_in_last = get_append_buffer_unused_tail_length(); const unsigned first_round = std::min(len, free_in_last); if (first_round) { - _buffers.back().append(data, first_round); + // _buffers and carriage can desynchronize when 1) a new ptr + // we don't own has been added into the _buffers 2) _buffers + // has been emptied as as a result of std::move or stolen by + // claim_append. + if (unlikely(_carriage != &_buffers.back())) { + auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0); + _carriage = bptr.get(); + _buffers.push_back(*bptr.release()); + } + _carriage->append(data, first_round); } const unsigned second_round = len - first_round; @@ -1452,7 +1477,8 @@ static ceph::spinlock debug_lock; } } // add new item to list - push_back(ptr_node::create(bp, off, len)); + _buffers.push_back(*ptr_node::create(bp, off, len).release()); + _len += len; } void buffer::list::append(const list& bl) @@ -1484,11 +1510,13 @@ static ceph::spinlock debug_lock; auto& new_back = refill_append_space(len); new_back.set_length(len); return { new_back.c_str() }; + } else if (unlikely(_carriage != &_buffers.back())) { + auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0); + _carriage = bptr.get(); + _buffers.push_back(*bptr.release()); } - - auto& cur_back = _buffers.back(); - cur_back.set_length(cur_back.length() + len); - return { cur_back.end_c_str() - len }; + _carriage->set_length(_carriage->length() + len); + return { _carriage->end_c_str() - len }; } void buffer::list::prepend_zero(unsigned len) @@ -1506,7 +1534,12 @@ static ceph::spinlock debug_lock; const unsigned free_in_last = get_append_buffer_unused_tail_length(); const unsigned first_round = std::min(len, free_in_last); if (first_round) { - _buffers.back().append_zeros(first_round); + if (unlikely(_carriage != &_buffers.back())) { + auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0); + _carriage = bptr.get(); + _buffers.push_back(*bptr.release()); + } + _carriage->append_zeros(first_round); } const unsigned second_round = len - first_round; @@ -1640,6 +1673,8 @@ static ceph::spinlock debug_lock; ++curbuf_prev; } + _carriage = &always_empty_bptr; + while (len > 0) { // partial? if (off + len < (*curbuf).length()) { diff --git a/src/include/buffer.h b/src/include/buffer.h index 6b2dc275744..334a594ab23 100644 --- a/src/include/buffer.h +++ b/src/include/buffer.h @@ -178,7 +178,9 @@ namespace buffer CEPH_BUFFER_API { */ class CEPH_BUFFER_API ptr { raw *_raw; + public: // dirty hack for testing; if it works, this will be abstracted unsigned _off, _len; + private: void release(); @@ -653,6 +655,11 @@ namespace buffer CEPH_BUFFER_API { private: // my private bits buffers_t _buffers; + + // track bufferptr we can modify (especially ::append() to). Not all bptrs + // bufferlist holds have this trait -- if somebody ::push_back(const ptr&), + // he expects it won't change. + ptr* _carriage; unsigned _len; unsigned _memcopy_count; //the total of memcopy using rebuild(). @@ -780,7 +787,7 @@ namespace buffer CEPH_BUFFER_API { bp = buffer::create(len); pos = bp.c_str(); } else { - pos = pbl->_buffers.back().end_c_str(); + pos = pbl->_carriage->end_c_str(); } } @@ -959,18 +966,35 @@ namespace buffer CEPH_BUFFER_API { private: mutable iterator last_p; + // always_empty_bptr has no underlying raw but its _len is always 0. + // This is useful for e.g. get_append_buffer_unused_tail_length() as + // it allows to avoid conditionals on hot paths. + static ptr always_empty_bptr; ptr_node& refill_append_space(const unsigned len); public: // cons/des - list() : _len(0), _memcopy_count(0), last_p(this) {} + list() + : _carriage(&always_empty_bptr), + _len(0), + _memcopy_count(0), + last_p(this) { + } + // cppcheck-suppress noExplicitConstructor // cppcheck-suppress noExplicitConstructor - list(unsigned prealloc) : _len(0), _memcopy_count(0), last_p(this) { + list(unsigned prealloc) + : _carriage(&always_empty_bptr), + _len(0), + _memcopy_count(0), + last_p(this) { reserve(prealloc); } - list(const list& other) : _len(other._len), - _memcopy_count(other._memcopy_count), last_p(this) { + list(const list& other) + : _carriage(&always_empty_bptr), + _len(other._len), + _memcopy_count(other._memcopy_count), + last_p(this) { _buffers.clone_from(other._buffers); make_shareable(); } @@ -982,6 +1006,7 @@ namespace buffer CEPH_BUFFER_API { list& operator= (const list& other) { if (this != &other) { + _carriage = &always_empty_bptr; _buffers.clone_from(other._buffers); _len = other._len; make_shareable(); @@ -990,6 +1015,7 @@ namespace buffer CEPH_BUFFER_API { } list& operator= (list&& other) noexcept { _buffers = std::move(other._buffers); + _carriage = other._carriage; _len = other._len; _memcopy_count = other._memcopy_count; last_p = begin(); @@ -1007,16 +1033,7 @@ namespace buffer CEPH_BUFFER_API { void try_assign_to_mempool(int pool); size_t get_append_buffer_unused_tail_length() const { - if (_buffers.empty()) { - return 0; - } - - auto& buf = _buffers.back(); - if (buf.raw_nref() != 1) { - return 0; - } - - return buf.unused_tail_length(); + return _carriage->unused_tail_length(); } unsigned get_memcopy_count() const {return _memcopy_count; } @@ -1054,6 +1071,7 @@ namespace buffer CEPH_BUFFER_API { // modifiers void clear() noexcept { + _carriage = &always_empty_bptr; _buffers.clear_and_dispose(); _len = 0; _memcopy_count = 0; @@ -1070,6 +1088,7 @@ namespace buffer CEPH_BUFFER_API { return; _len += bp.length(); _buffers.push_back(*ptr_node::create(std::move(bp)).release()); + _carriage = &always_empty_bptr; } void push_back(const ptr_node&) = delete; void push_back(ptr_node&) = delete; @@ -1077,11 +1096,13 @@ namespace buffer CEPH_BUFFER_API { void push_back(std::unique_ptr bp) { if (bp->length() == 0) return; + _carriage = bp.get(); _len += bp->length(); _buffers.push_back(*bp.release()); } void push_back(raw* const r) { _buffers.push_back(*ptr_node::create(r).release()); + _carriage = &_buffers.back(); _len += _buffers.back().length(); } @@ -1123,9 +1144,10 @@ namespace buffer CEPH_BUFFER_API { { if (this != &bl) { clear(); - for (const auto& pb : bl._buffers) { - push_back(static_cast(pb)); + for (const auto& bp : bl._buffers) { + _buffers.push_back(*ptr_node::create(bp).release()); } + _len = bl._len; } } -- 2.39.5