]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
include/frag: use more efficient small vectors
authorPatrick Donnelly <pdonnell@redhat.com>
Tue, 18 Dec 2018 16:46:23 +0000 (08:46 -0800)
committerPatrick Donnelly <pdonnell@redhat.com>
Tue, 18 Dec 2018 21:21:57 +0000 (13:21 -0800)
This is a performance refactor. The API now allows any external container type
but giving it a vector/stack is most efficient.

Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
src/include/frag.h

index 285f4775e36ca200a9eb8c270cf14eda9946d8d4..5e8b154f12fa1d2926f5ae87d72acb6c4884a715 100644 (file)
 #ifndef CEPH_FRAG_H
 #define CEPH_FRAG_H
 
-#include <stdint.h>
-#include <list>
+#include <boost/container/small_vector.hpp>
+
 #include <iostream>
+
+#include <stdint.h>
 #include <stdio.h>
 
 #include "buffer.h"
  *    matches the network/netmask concept.
  */
 
-typedef uint32_t _frag_t;
-
 class frag_t {
   /*
    * encoding is dictated by frag_* functions in ceph_fs.h.  use those
    * helpers _exclusively_.
    */
- public:
-  _frag_t _enc;  
+public:
+  using _frag_t = uint32_t;
   
-  frag_t() : _enc(0) { }
+  frag_t() = default;
   frag_t(unsigned v, unsigned b) : _enc(ceph_frag_make(b, v)) { }
   frag_t(_frag_t e) : _enc(e) { }
 
@@ -113,7 +113,8 @@ class frag_t {
     ceph_assert(i < (1<<nb));
     return frag_t(ceph_frag_make_child(_enc, nb, i));
   }
-  void split(int nb, std::list<frag_t>& fragments) const {
+  template<typename T>
+  void split(int nb, T& fragments) const {
     ceph_assert(nb > 0);
     unsigned nway = 1 << nb;
     for (unsigned i=0; i<nway; i++) 
@@ -149,6 +150,18 @@ class frag_t {
     }
     return false;
   }
+
+  void encode(bufferlist& bl) const {
+    encode_raw(_enc, bl);
+  }
+  void decode(bufferlist::const_iterator& p) {
+    __u32 v;
+    decode_raw(v, p);
+    _enc = v;
+  }
+
+private:
+  _frag_t _enc = 0;
 };
 
 inline std::ostream& operator<<(std::ostream& out, const frag_t& hb)
@@ -163,14 +176,10 @@ inline std::ostream& operator<<(std::ostream& out, const frag_t& hb)
   return out << '*';
 }
 
-inline void encode(const frag_t &f, bufferlist& bl) { encode_raw(f._enc, bl); }
-inline void decode(frag_t &f, bufferlist::const_iterator& p) {
-  __u32 v;
-  decode_raw(v, p);
-  f._enc = v;
-}
-
+inline void encode(const frag_t &f, bufferlist& bl) { f.encode(bl); }
+inline void decode(frag_t &f, bufferlist::const_iterator& p) { f.decode(p); }
 
+using frag_vec_t = boost::container::small_vector<frag_t, 4>;
 
 /**
  * fragtree_t -- partition an entire namespace into one or more frag_t's. 
@@ -207,37 +216,35 @@ public:
 
   
   bool is_leaf(frag_t x) const {
-    std::list<frag_t> ls;
-    get_leaves_under(x, ls);
+    frag_vec_t s;
+    get_leaves_under(x, s);
     //generic_dout(10) << "is_leaf(" << x << ") -> " << ls << dendl;
-    if (!ls.empty() &&
-       ls.front() == x &&
-       ls.size() == 1)
-      return true;
-    return false;
+    return s.size() == 1 && s.front() == x;
   }
 
   /**
    * get_leaves -- list all leaves
    */
-  void get_leaves(std::list<frag_t>& ls) const {
-    return get_leaves_under_split(frag_t(), ls);
+  template<typename T>
+  void get_leaves(T& c) const {
+    return get_leaves_under_split(frag_t(), c);
   }
 
   /**
    * get_leaves_under_split -- list all leaves under a known split point (or root)
    */
-  void get_leaves_under_split(frag_t under, std::list<frag_t>& ls) const {
-    std::list<frag_t> q;
-    q.push_back(under);
-    while (!q.empty()) {
-      frag_t t = q.back();
-      q.pop_back();
+  template<typename T>
+  void get_leaves_under_split(frag_t under, T& c) const {
+    frag_vec_t s;
+    s.push_back(under);
+    while (!s.empty()) {
+      frag_t t = s.back();
+      s.pop_back();
       int nb = get_split(t);
       if (nb) 
-       t.split(nb, q);   // queue up children
+       t.split(nb, s);   // queue up children
       else
-       ls.push_front(t);  // not spit, it's a leaf.
+       c.push_back(t);  // not spit, it's a leaf.
     }
   }
 
@@ -286,20 +293,21 @@ public:
   /**
    * get_leaves_under(x, ls) -- search for any leaves fully contained by x
    */
-  void get_leaves_under(frag_t x, std::list<frag_t>& ls) const {
-    std::list<frag_t> q;
-    q.push_back(get_branch_or_leaf(x));
-    while (!q.empty()) {
-      frag_t t = q.front();
-      q.pop_front();
+  template<typename T>
+  void get_leaves_under(frag_t x, T& c) const {
+    frag_vec_t s;
+    s.push_back(get_branch_or_leaf(x));
+    while (!s.empty()) {
+      frag_t t = s.back();
+      s.pop_back();
       if (t.bits() >= x.bits() &&    // if t is more specific than x, and
          !x.contains(t))            // x does not contain t,
        continue;         // then skip
       int nb = get_split(t);
       if (nb) 
-       t.split(nb, q);   // queue up children
+       t.split(nb, s);   // queue up children
       else if (x.contains(t))
-       ls.push_back(t);  // not spit, it's a leaf.
+       c.push_back(t);  // not spit, it's a leaf.
     }
   }
 
@@ -307,18 +315,18 @@ public:
    * contains(fg) -- does fragtree contain the specific frag @a x
    */
   bool contains(frag_t x) const {
-    std::list<frag_t> q;
-    q.push_back(get_branch(x));
-    while (!q.empty()) {
-      frag_t t = q.front();
-      q.pop_front();
+    frag_vec_t s;
+    s.push_back(get_branch(x));
+    while (!s.empty()) {
+      frag_t t = s.back();
+      s.pop_back();
       if (t.bits() >= x.bits() &&  // if t is more specific than x, and
          !x.contains(t))          // x does not contain t,
        continue;         // then skip 
       int nb = get_split(t);
       if (nb) {
        if (t == x) return false;  // it's split.
-       t.split(nb, q);   // queue up children
+       t.split(nb, s);   // queue up children
       } else {
        if (t == x) return true;   // it's there.
       }
@@ -378,22 +386,18 @@ public:
   void try_assimilate_children(frag_t x) {
     int nb = get_split(x);
     if (!nb) return;
-    std::list<frag_t> children;
+    frag_vec_t children;
     x.split(nb, children);
     int childbits = 0;
-    for (std::list<frag_t>::iterator p = children.begin();
-        p != children.end();
-        ++p) {
-      int cb = get_split(*p);
+    for (auto& frag : children) {
+      int cb = get_split(frag);
       if (!cb) return;  // nope.
       if (childbits && cb != childbits) return;  // not the same
       childbits = cb;
     }
     // all children are split with childbits!
-    for (std::list<frag_t>::iterator p = children.begin();
-        p != children.end();
-        ++p)
-      _splits.erase(*p);
+    for (auto& frag : children)
+      _splits.erase(frag);
     _splits[x] += childbits;
   }
 
@@ -425,28 +429,26 @@ public:
       merge(parent, nb, false);
       split(parent, spread, false);
 
-      std::list<frag_t> subs;
+      frag_vec_t subs;
       parent.split(spread, subs);
-      for (std::list<frag_t>::iterator p = subs.begin();
-          p != subs.end();
-          ++p) {
-       lgeneric_dout(cct, 10) << "splitting intermediate " << *p << " by " << (nb-spread) << dendl;
-       split(*p, nb - spread, false);
+      for (auto& frag : subs) {
+       lgeneric_dout(cct, 10) << "splitting intermediate " << frag << " by " << (nb-spread) << dendl;
+       split(frag, nb - spread, false);
       }
     }
 
     // x is now a leaf or split.  
     // hoover up any children.
-    std::list<frag_t> q;
-    q.push_back(x);
-    while (!q.empty()) {
-      frag_t t = q.front();
-      q.pop_front();
+    frag_vec_t s;
+    s.push_back(x);
+    while (!s.empty()) {
+      frag_t t = s.back();
+      s.pop_back();
       int nb = get_split(t);
       if (nb) {
        lgeneric_dout(cct, 10) << "merging child " << t << " by " << nb << dendl;
        merge(t, nb, false);    // merge this point, and
-       t.split(nb, q);         // queue up children
+       t.split(nb, s);         // queue up children
       }
     }
 
@@ -485,11 +487,11 @@ public:
 
   void print(std::ostream& out) {
     out << "fragtree_t(";
-    std::list<frag_t> q;
-    q.push_back(frag_t());
-    while (!q.empty()) {
-      frag_t t = q.front();
-      q.pop_front();
+    frag_vec_t s;
+    s.push_back(frag_t());
+    while (!s.empty()) {
+      frag_t t = s.back();
+      s.pop_back();
       // newline + indent?
       if (t.bits()) {
        out << std::endl;
@@ -498,7 +500,7 @@ public:
       int nb = get_split(t);
       if (nb) {
        out << t << " %" << nb;
-       t.split(nb, q);   // queue up children
+       t.split(nb, s);   // queue up children
       } else {
        out << t;
       }