]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common: introduce bl::_carriage to track writeable area.
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Mon, 22 Oct 2018 16:19:52 +0000 (18:19 +0200)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Fri, 1 Feb 2019 21:54:51 +0000 (22:54 +0100)
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/common/buffer.cc
src/include/buffer.h

index 4e309ac5eb7a649d468c66f1c8cfeb4c73790425..6a79d2431da72a13e2dee79e2d612f23a5947707 100644 (file)
@@ -996,6 +996,7 @@ static ceph::spinlock debug_lock;
 
   buffer::list::list(list&& other) noexcept
     : _buffers(std::move(other._buffers)),
+      _carriage(&always_empty_bptr),
       _len(other._len),
       _memcopy_count(other._memcopy_count),
       last_p(this) {
@@ -1006,6 +1007,7 @@ static ceph::spinlock debug_lock;
   {
     std::swap(_len, other._len);
     std::swap(_memcopy_count, other._memcopy_count);
+    std::swap(_carriage, other._carriage);
     _buffers.swap(other._buffers);
     //last_p.swap(other.last_p);
     last_p = begin();
@@ -1209,6 +1211,7 @@ static ceph::spinlock debug_lock;
   void buffer::list::rebuild()
   {
     if (_len == 0) {
+      _carriage = &always_empty_bptr;
       _buffers.clear_and_dispose();
       return;
     }
@@ -1227,8 +1230,10 @@ static ceph::spinlock debug_lock;
       pos += node.length();
     }
     _memcopy_count += pos;
+    _carriage = &always_empty_bptr;
     _buffers.clear_and_dispose();
     if (likely(nb->length())) {
+      _carriage = nb.get();
       _buffers.push_back(*nb.release());
     }
     invalidate_crc();
@@ -1308,6 +1313,7 @@ static ceph::spinlock debug_lock;
     if (get_append_buffer_unused_tail_length() < prealloc) {
       auto ptr = ptr_node::create(buffer::create_page_aligned(prealloc));
       ptr->set_length(0);   // unused, so far.
+      _carriage = ptr.get();
       _buffers.push_back(*ptr.release());
     }
   }
@@ -1327,6 +1333,8 @@ static ceph::spinlock debug_lock;
     if (!(flags & CLAIM_ALLOW_NONSHAREABLE))
       bl.make_shareable();
     _buffers.splice_back(bl._buffers);
+    bl._carriage = &always_empty_bptr;
+    bl._buffers.clear_and_dispose();
     bl._len = 0;
     bl.last_p = bl.begin();
   }
@@ -1391,12 +1399,19 @@ static ceph::spinlock debug_lock;
       auto buf = ptr_node::create(
        raw_combined::create(CEPH_BUFFER_APPEND_SIZE, 0, get_mempool()));
       buf->set_length(0);   // unused, so far.
+      _carriage = buf.get();
       _buffers.push_back(*buf.release());
+    } else if (unlikely(_carriage != &_buffers.back())) {
+      auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0);
+      _carriage = bptr.get();
+      _buffers.push_back(*bptr.release());
     }
-    _buffers.back().append(c);
+    _carriage->append(c);
     _len++;
   }
 
+  buffer::ptr buffer::list::always_empty_bptr;
+
   buffer::ptr_node& buffer::list::refill_append_space(const unsigned len)
   {
     // make a new buffer.  fill out a complete page, factoring in the
@@ -1407,6 +1422,7 @@ static ceph::spinlock debug_lock;
     auto new_back = \
       ptr_node::create(raw_combined::create(alen, 0, get_mempool()));
     new_back->set_length(0);   // unused, so far.
+    _carriage = new_back.get();
     _buffers.push_back(*new_back.release());
     return _buffers.back();
   }
@@ -1418,7 +1434,16 @@ static ceph::spinlock debug_lock;
     const unsigned free_in_last = get_append_buffer_unused_tail_length();
     const unsigned first_round = std::min(len, free_in_last);
     if (first_round) {
-      _buffers.back().append(data, first_round);
+      // _buffers and carriage can desynchronize when 1) a new ptr
+      // we don't own has been added into the _buffers 2) _buffers
+      // has been emptied as as a result of std::move or stolen by
+      // claim_append.
+      if (unlikely(_carriage != &_buffers.back())) {
+        auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0);
+       _carriage = bptr.get();
+       _buffers.push_back(*bptr.release());
+      }
+      _carriage->append(data, first_round);
     }
 
     const unsigned second_round = len - first_round;
@@ -1452,7 +1477,8 @@ static ceph::spinlock debug_lock;
       }
     }
     // add new item to list
-    push_back(ptr_node::create(bp, off, len));
+    _buffers.push_back(*ptr_node::create(bp, off, len).release());
+    _len += len;
   }
 
   void buffer::list::append(const list& bl)
@@ -1484,11 +1510,13 @@ static ceph::spinlock debug_lock;
       auto& new_back = refill_append_space(len);
       new_back.set_length(len);
       return { new_back.c_str() };
+    } else if (unlikely(_carriage != &_buffers.back())) {
+      auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0);
+      _carriage = bptr.get();
+      _buffers.push_back(*bptr.release());
     }
-
-    auto& cur_back = _buffers.back();
-    cur_back.set_length(cur_back.length() + len);
-    return { cur_back.end_c_str() - len };
+    _carriage->set_length(_carriage->length() + len);
+    return { _carriage->end_c_str() - len };
   }
 
   void buffer::list::prepend_zero(unsigned len)
@@ -1506,7 +1534,12 @@ static ceph::spinlock debug_lock;
     const unsigned free_in_last = get_append_buffer_unused_tail_length();
     const unsigned first_round = std::min(len, free_in_last);
     if (first_round) {
-      _buffers.back().append_zeros(first_round);
+      if (unlikely(_carriage != &_buffers.back())) {
+        auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0);
+       _carriage = bptr.get();
+       _buffers.push_back(*bptr.release());
+      }
+      _carriage->append_zeros(first_round);
     }
 
     const unsigned second_round = len - first_round;
@@ -1640,6 +1673,8 @@ static ceph::spinlock debug_lock;
       ++curbuf_prev;
     }
     
+    _carriage = &always_empty_bptr;
+
     while (len > 0) {
       // partial?
       if (off + len < (*curbuf).length()) {
index 6b2dc275744b149cb9d178122343bc32d4b35d34..334a594ab230fe2314d32d95844262c81d6afcf5 100644 (file)
@@ -178,7 +178,9 @@ namespace buffer CEPH_BUFFER_API {
    */
   class CEPH_BUFFER_API ptr {
     raw *_raw;
+  public: // dirty hack for testing; if it works, this will be abstracted
     unsigned _off, _len;
+  private:
 
     void release();
 
@@ -653,6 +655,11 @@ namespace buffer CEPH_BUFFER_API {
   private:
     // my private bits
     buffers_t _buffers;
+
+    // track bufferptr we can modify (especially ::append() to). Not all bptrs
+    // bufferlist holds have this trait -- if somebody ::push_back(const ptr&),
+    // he expects it won't change.
+    ptr* _carriage;
     unsigned _len;
     unsigned _memcopy_count; //the total of memcopy using rebuild().
 
@@ -780,7 +787,7 @@ namespace buffer CEPH_BUFFER_API {
          bp = buffer::create(len);
          pos = bp.c_str();
        } else {
-         pos = pbl->_buffers.back().end_c_str();
+         pos = pbl->_carriage->end_c_str();
        }
       }
 
@@ -959,18 +966,35 @@ namespace buffer CEPH_BUFFER_API {
   private:
     mutable iterator last_p;
 
+    // always_empty_bptr has no underlying raw but its _len is always 0.
+    // This is useful for e.g. get_append_buffer_unused_tail_length() as
+    // it allows to avoid conditionals on hot paths.
+    static ptr always_empty_bptr;
     ptr_node& refill_append_space(const unsigned len);
 
   public:
     // cons/des
-    list() : _len(0), _memcopy_count(0), last_p(this) {}
+    list()
+      : _carriage(&always_empty_bptr),
+        _len(0),
+        _memcopy_count(0),
+        last_p(this) {
+    }
+    // cppcheck-suppress noExplicitConstructor
     // cppcheck-suppress noExplicitConstructor
-    list(unsigned prealloc) : _len(0), _memcopy_count(0), last_p(this) {
+    list(unsigned prealloc)
+      : _carriage(&always_empty_bptr),
+        _len(0),
+        _memcopy_count(0),
+       last_p(this) {
       reserve(prealloc);
     }
 
-    list(const list& other) : _len(other._len),
-                             _memcopy_count(other._memcopy_count), last_p(this) {
+    list(const list& other)
+      : _carriage(&always_empty_bptr),
+        _len(other._len),
+        _memcopy_count(other._memcopy_count),
+        last_p(this) {
       _buffers.clone_from(other._buffers);
       make_shareable();
     }
@@ -982,6 +1006,7 @@ namespace buffer CEPH_BUFFER_API {
 
     list& operator= (const list& other) {
       if (this != &other) {
+        _carriage = &always_empty_bptr;
         _buffers.clone_from(other._buffers);
         _len = other._len;
        make_shareable();
@@ -990,6 +1015,7 @@ namespace buffer CEPH_BUFFER_API {
     }
     list& operator= (list&& other) noexcept {
       _buffers = std::move(other._buffers);
+      _carriage = other._carriage;
       _len = other._len;
       _memcopy_count = other._memcopy_count;
       last_p = begin();
@@ -1007,16 +1033,7 @@ namespace buffer CEPH_BUFFER_API {
     void try_assign_to_mempool(int pool);
 
     size_t get_append_buffer_unused_tail_length() const {
-      if (_buffers.empty()) {
-       return 0;
-      }
-
-      auto& buf = _buffers.back();
-      if (buf.raw_nref() != 1) {
-       return 0;
-      }
-
-      return buf.unused_tail_length();
+      return _carriage->unused_tail_length();
     }
 
     unsigned get_memcopy_count() const {return _memcopy_count; }
@@ -1054,6 +1071,7 @@ namespace buffer CEPH_BUFFER_API {
 
     // modifiers
     void clear() noexcept {
+      _carriage = &always_empty_bptr;
       _buffers.clear_and_dispose();
       _len = 0;
       _memcopy_count = 0;
@@ -1070,6 +1088,7 @@ namespace buffer CEPH_BUFFER_API {
        return;
       _len += bp.length();
       _buffers.push_back(*ptr_node::create(std::move(bp)).release());
+      _carriage = &always_empty_bptr;
     }
     void push_back(const ptr_node&) = delete;
     void push_back(ptr_node&) = delete;
@@ -1077,11 +1096,13 @@ namespace buffer CEPH_BUFFER_API {
     void push_back(std::unique_ptr<ptr_node, ptr_node::disposer> bp) {
       if (bp->length() == 0)
        return;
+      _carriage = bp.get();
       _len += bp->length();
       _buffers.push_back(*bp.release());
     }
     void push_back(raw* const r) {
       _buffers.push_back(*ptr_node::create(r).release());
+      _carriage = &_buffers.back();
       _len += _buffers.back().length();
     }
 
@@ -1123,9 +1144,10 @@ namespace buffer CEPH_BUFFER_API {
     {
       if (this != &bl) {
         clear();
-       for (const auto& pb : bl._buffers) {
-          push_back(static_cast<const ptr&>(pb));
+       for (const auto& bp : bl._buffers) {
+          _buffers.push_back(*ptr_node::create(bp).release());
         }
+        _len = bl._len;
       }
     }