From: Radoslaw Zarzynski Date: Tue, 28 Jan 2020 11:39:56 +0000 (+0100) Subject: include, common: introduce unsigned-typed _num to bufferlist. X-Git-Tag: v15.1.1~496^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a99cce08526510d136eb97b0ff656936a2153f46;p=ceph.git include, common: introduce unsigned-typed _num to bufferlist. This commit introduces `_num` field for counting the number of nodes a bufferlist instance stores. It duplicates `_size` in bl's `_buffers` with one but principal difference: space overhead. `_size` is typed as `size_t` while `bufferlist::_len` is and always was `unsigned`. On my x86 platform this means 64 bits vs 32 bits. After the change `sizeof(bufferlist)` is still `40 bytes`. Further commit will will drop `_size` shrinking bufferlist to just `32` bytes which is exactly half of cache-line on nowadays x86. Signed-off-by: Radoslaw Zarzynski --- diff --git a/src/common/buffer.cc b/src/common/buffer.cc index b55a0aa488e0..f62351958a5c 100644 --- a/src/common/buffer.cc +++ b/src/common/buffer.cc @@ -925,13 +925,15 @@ static ceph::spinlock debug_lock; buffer::list::list(list&& other) noexcept : _buffers(std::move(other._buffers)), _carriage(&always_empty_bptr), - _len(other._len) { + _len(other._len), + _num(other._num) { other.clear(); } void buffer::list::swap(list& other) noexcept { std::swap(_len, other._len); + std::swap(_num, other._num); std::swap(_carriage, other._carriage); _buffers.swap(other._buffers); } @@ -1090,7 +1092,7 @@ static ceph::spinlock debug_lock; bool buffer::list::is_contiguous() const { - return _buffers.size() <= 1; + return _num <= 1; } bool buffer::list::is_n_page_sized() const @@ -1127,11 +1129,11 @@ static ceph::spinlock debug_lock; uint64_t buffer::list::get_wasted_space() const { - if (_buffers.size() == 1) + if (_num == 1) return _buffers.back().wasted(); std::vector raw_vec; - raw_vec.reserve(_buffers.size()); + raw_vec.reserve(_num); for (const auto& p : _buffers) raw_vec.push_back(p._raw); std::sort(raw_vec.begin(), raw_vec.end()); @@ -1156,6 +1158,7 @@ static ceph::spinlock debug_lock; if (_len == 0) { _carriage = &always_empty_bptr; _buffers.clear_and_dispose(); + _num = 0; return; } if ((_len & ~CEPH_PAGE_MASK) == 0) @@ -1172,11 +1175,14 @@ static ceph::spinlock debug_lock; nb->copy_in(pos, node.length(), node.c_str(), false); pos += node.length(); } - _carriage = &always_empty_bptr; _buffers.clear_and_dispose(); if (likely(nb->length())) { _carriage = nb.get(); _buffers.push_back(*nb.release()); + _num = 1; + } else { + _carriage = &always_empty_bptr; + _num = 0; } invalidate_crc(); } @@ -1192,8 +1198,7 @@ static ceph::spinlock debug_lock; { bool had_to_rebuild = false; - if (max_buffers && _buffers.size() > max_buffers - && _len > (max_buffers * align_size)) { + if (max_buffers && _num > max_buffers && _len > (max_buffers * align_size)) { align_size = round_up_to(round_up_to(_len, max_buffers) / max_buffers, align_size); } auto p = std::begin(_buffers); @@ -1223,8 +1228,10 @@ static ceph::spinlock debug_lock; offset += p->length(); // no need to reallocate, relinking is enough thankfully to bi::list. auto p_after = _buffers.erase_after(p_prev); + _num -= 1; unaligned._buffers.push_back(*p); unaligned._len += p->length(); + unaligned._num += 1; p = p_after; } while (p != std::end(_buffers) && (!p->is_aligned(align_memory) || @@ -1237,6 +1244,7 @@ static ceph::spinlock debug_lock; had_to_rebuild = true; } _buffers.insert_after(p_prev, *ptr_node::create(unaligned._buffers.front()).release()); + _num += 1; ++p_prev; } return had_to_rebuild; @@ -1254,6 +1262,7 @@ static ceph::spinlock debug_lock; ptr->set_length(0); // unused, so far. _carriage = ptr.get(); _buffers.push_back(*ptr.release()); + _num += 1; } } @@ -1269,10 +1278,12 @@ static ceph::spinlock debug_lock; { // steal the other guy's buffers _len += bl._len; + _num += bl._num; _buffers.splice_back(bl._buffers); bl._carriage = &always_empty_bptr; bl._buffers.clear_and_dispose(); bl._len = 0; + bl._num = 0; } void buffer::list::claim_append_piecewise(list& bl) @@ -1295,10 +1306,12 @@ static ceph::spinlock debug_lock; buf->set_length(0); // unused, so far. _carriage = buf.get(); _buffers.push_back(*buf.release()); + _num += 1; } else if (unlikely(_carriage != &_buffers.back())) { auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0); _carriage = bptr.get(); _buffers.push_back(*bptr.release()); + _num += 1; } _carriage->append(c); _len++; @@ -1318,6 +1331,7 @@ static ceph::spinlock debug_lock; new_back->set_length(0); // unused, so far. _carriage = new_back.get(); _buffers.push_back(*new_back.release()); + _num += 1; return _buffers.back(); } @@ -1336,6 +1350,7 @@ static ceph::spinlock debug_lock; auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0); _carriage = bptr.get(); _buffers.push_back(*bptr.release()); + _num += 1; } _carriage->append(data, first_round); } @@ -1363,6 +1378,7 @@ static ceph::spinlock debug_lock; buffer::ptr_node::create(buffer::create(len)).release(); new_back->set_length(0); // unused, so far. _buffers.push_back(*new_back); + _num += 1; _carriage = new_back; return { new_back->c_str(), &new_back->_len, &_len }; } else { @@ -1370,6 +1386,7 @@ static ceph::spinlock debug_lock; auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0); _carriage = bptr.get(); _buffers.push_back(*bptr.release()); + _num += 1; } return { _carriage->end_c_str(), &_carriage->_len, &_len }; } @@ -1400,11 +1417,13 @@ static ceph::spinlock debug_lock; // add new item to list _buffers.push_back(*ptr_node::create(bp, off, len).release()); _len += len; + _num += 1; } void buffer::list::append(const list& bl) { _len += bl._len; + _num += bl._num; for (const auto& node : bl._buffers) { _buffers.push_back(*ptr_node::create(node).release()); } @@ -1435,6 +1454,7 @@ static ceph::spinlock debug_lock; auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0); _carriage = bptr.get(); _buffers.push_back(*bptr.release()); + _num += 1; } _carriage->set_length(_carriage->length() + len); return { _carriage->end_c_str() - len }; @@ -1445,6 +1465,7 @@ static ceph::spinlock debug_lock; auto bp = ptr_node::create(len); bp->zero(false); _len += len; + _num += 1; _buffers.push_front(*bp.release()); } @@ -1459,6 +1480,7 @@ static ceph::spinlock debug_lock; auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0); _carriage = bptr.get(); _buffers.push_back(*bptr.release()); + _num += 1; } _carriage->append_zeros(first_round); } @@ -1541,6 +1563,7 @@ static ceph::spinlock debug_lock; //cout << "copying partial of " << *curbuf << std::endl; _buffers.push_back(*ptr_node::create( *curbuf, off, len ).release()); _len += len; + _num += 1; break; } @@ -1549,6 +1572,7 @@ static ceph::spinlock debug_lock; unsigned howmuch = curbuf->length() - off; _buffers.push_back(*ptr_node::create( *curbuf, off, howmuch ).release()); _len += howmuch; + _num += 1; len -= howmuch; off = 0; ++curbuf; @@ -1591,6 +1615,7 @@ static ceph::spinlock debug_lock; _buffers.insert_after(curbuf_prev, *ptr_node::create(*curbuf, 0, off).release()); _len += off; + _num += 1; ++curbuf_prev; } @@ -1615,6 +1640,7 @@ static ceph::spinlock debug_lock; if (claim_by) claim_by->append( *curbuf, off, howmuch ); _len -= (*curbuf).length(); + _num -= 1; curbuf = _buffers.erase_after_and_dispose(curbuf_prev); len -= howmuch; off = 0; diff --git a/src/common/buffer_seastar.cc b/src/common/buffer_seastar.cc index fd535f268f38..0d4b0fd9960a 100644 --- a/src/common/buffer_seastar.cc +++ b/src/common/buffer_seastar.cc @@ -74,7 +74,7 @@ ptr::operator seastar::temporary_buffer() && list::operator seastar::net::packet() && { seastar::net::packet p; - p.reserve(_buffers.size()); + p.reserve(_num); for (auto& ptr : _buffers) { // append each ptr as a temporary_buffer p = seastar::net::packet(std::move(p), std::move(ptr)); diff --git a/src/include/buffer.h b/src/include/buffer.h index e6cb0907f21a..9cb14d49ab56 100644 --- a/src/include/buffer.h +++ b/src/include/buffer.h @@ -673,7 +673,7 @@ inline namespace v14_2_0 { // bufferlist holds have this trait -- if somebody ::push_back(const ptr&), // he expects it won't change. ptr* _carriage; - unsigned _len; + unsigned _len, _num; template class CEPH_BUFFER_API iterator_impl { @@ -953,19 +953,22 @@ inline namespace v14_2_0 { // cons/des list() : _carriage(&always_empty_bptr), - _len(0) { + _len(0), + _num(0) { } // cppcheck-suppress noExplicitConstructor // cppcheck-suppress noExplicitConstructor list(unsigned prealloc) : _carriage(&always_empty_bptr), - _len(0) { + _len(0), + _num(0) { reserve(prealloc); } list(const list& other) : _carriage(&always_empty_bptr), - _len(other._len) { + _len(other._len), + _num(other._num) { _buffers.clone_from(other._buffers); } list(list&& other) noexcept; @@ -979,6 +982,7 @@ inline namespace v14_2_0 { _carriage = &always_empty_bptr; _buffers.clone_from(other._buffers); _len = other._len; + _num = other._num; } return *this; } @@ -986,12 +990,20 @@ inline namespace v14_2_0 { _buffers = std::move(other._buffers); _carriage = other._carriage; _len = other._len; + _num = other._num; other.clear(); return *this; } uint64_t get_wasted_space() const; - unsigned get_num_buffers() const { return _buffers.size(); } + unsigned get_num_buffers() const { +#ifdef __CEPH__ + ceph_assert(_buffers.size() == _num); +#else + assert(_buffers.size() == _num); +#endif // __CEPH__ + return _num; + } const ptr_node& front() const { return _buffers.front(); } const ptr_node& back() const { return _buffers.back(); } @@ -1041,17 +1053,20 @@ inline namespace v14_2_0 { _carriage = &always_empty_bptr; _buffers.clear_and_dispose(); _len = 0; + _num = 0; } void push_back(const ptr& bp) { if (bp.length() == 0) return; _buffers.push_back(*ptr_node::create(bp).release()); _len += bp.length(); + _num += 1; } void push_back(ptr&& bp) { if (bp.length() == 0) return; _len += bp.length(); + _num += 1; _buffers.push_back(*ptr_node::create(std::move(bp)).release()); _carriage = &always_empty_bptr; } @@ -1063,6 +1078,7 @@ inline namespace v14_2_0 { return; _carriage = bp.get(); _len += bp->length(); + _num += 1; _buffers.push_back(*bp.release()); } void push_back(raw* const r) = delete; @@ -1070,6 +1086,7 @@ inline namespace v14_2_0 { _buffers.push_back(*ptr_node::create(std::move(r)).release()); _carriage = &_buffers.back(); _len += _buffers.back().length(); + _num += 1; } void zero(); @@ -1102,6 +1119,7 @@ inline namespace v14_2_0 { _buffers.push_back(*ptr_node::create(bp).release()); } _len = bl._len; + _num = bl._num; } } @@ -1184,11 +1202,11 @@ inline namespace v14_2_0 { template void prepare_iov(VectorT *piov) const { #ifdef __CEPH__ - ceph_assert(_buffers.size() <= IOV_MAX); + ceph_assert(_num <= IOV_MAX); #else - assert(_buffers.size() <= IOV_MAX); + assert(_num <= IOV_MAX); #endif - piov->resize(_buffers.size()); + piov->resize(_num); unsigned n = 0; for (auto& p : _buffers) { (*piov)[n].iov_base = (void *)p.c_str();