]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
interval_set: optimize intersection_of
authorZac Medico <zmedico@gmail.com>
Thu, 31 Aug 2017 03:59:32 +0000 (20:59 -0700)
committerZac Medico <zmedico@gmail.com>
Thu, 31 Aug 2017 03:59:32 +0000 (20:59 -0700)
Iterate over all elements of the smaller set, and use find_inc to
locate elements from the larger set in logarithmic time. This greatly
improves performance when one set is much larger than the other:

     2 +-+--+----+----+----+----+----+----+----+----+--+-+
P      +*                                                +
E      |*                                                |
R  1.8 +*                                                +
F      | *                                               |
O      | *                                               |
R  1.6 + *                                               +
M      | *                                               |
A      | *                                               |
N  1.4 + *                                               +
C      |  *                                              |
E      |  *                                              |
   1.2 +   *                                             +
R      |    *                                            |
A      |     *                                           |
T    1 +      ***                                        +
I      |         ******                                  |
O      +               ***********************************
   0.8 +-+--+----+----+----+----+----+----+----+----+--+-+
       0   0.1  0.2  0.3  0.4  0.5  0.6  0.7  0.8  0.9   1

                          SET SIZE RATIO

The above plot compares performance of the new intersection_size_asym
function to the existing intersection_of function. The performance of
intersection_size_asym gets worse as the set size ratio approaches 1.
For set size ratios where the performance ratio is greater than 1, the
performance of intersection_size_asym is superior. Therefore, this
patch only uses intersection_size_asym when the set size ratio is less
than or equal to 0.1 (code uses the reciprocal which is 10).

The plot was generated using benchmark results produced by the
following program:

#include <iostream>
#include <sys/timeb.h>
#include "include/interval_set.h"

int main()
{
  const int interval_count = 100000;
  const int interval_distance = 4;
  const int interval_size = 2;
  const int sample_count = 8;
  const int max_offset = interval_count * interval_distance;
  interval_set<int> a, b, intersection;

  for (int i = 0; i < max_offset; i+=interval_distance) {
    a.insert(i, interval_size);
  }

  for (int m = 1; m < 100; m++) {
    float ratio = 1 / float(m);

    for (int i = 0; i < max_offset; i+=interval_distance*m) {
      b.insert(i, interval_size);
    }

    struct timeb start, end;
    int ms = 0;
    for (int i = 0; i < sample_count; i++) {
      ftime(&start);
      intersection.intersection_of(a, b);
      ftime(&end);
      ms += (int) (1000.0 * (end.time - start.time)
        + (end.millitm - start.millitm));
      intersection.clear();
    }
    b.clear();

    std::cout << ratio << "\t" << ms << std::endl << std::flush;
  }
}

Signed-off-by: Zac Medico <zmedico@gmail.com>
src/include/interval_set.h

index b84d8187052d68eaa2a2c14706207ef3dc3883c5..8b309db8470ad45b89cc0098fa6a0bca9b510013 100644 (file)
@@ -235,6 +235,60 @@ class interval_set {
     }
     return p;
   }
+
+  void intersection_size_asym(const interval_set &s, const interval_set &l) {
+    typename decltype(m)::const_iterator ps = s.m.begin(), pl;
+    assert(ps != s.m.end());
+    T offset = ps->first, prev_offset;
+    bool first = true;
+    typename decltype(m)::iterator mi = m.begin();
+
+    while (1) {
+      if (first)
+        first = false;
+      else
+        assert(offset > prev_offset);
+      pl = l.find_inc(offset);
+      prev_offset = offset;
+      if (pl == l.m.end())
+        break;
+      while (ps != s.m.end() && ps->first + ps->second <= pl->first)
+        ++ps;
+      if (ps == s.m.end())
+        break;
+      offset = pl->first + pl->second;
+      if (offset <= ps->first) {
+        offset = ps->first;
+        continue;
+      }
+
+      if (*ps == *pl) {
+        do {
+          mi = m.insert(mi, *ps);
+          _size += ps->second;
+          ++ps;
+          ++pl;
+        } while (ps != s.m.end() && pl != l.m.end() && *ps == *pl);
+        if (ps == s.m.end())
+          break;
+        offset = ps->first;
+        continue;
+      }
+
+      T start = std::max<T>(ps->first, pl->first);
+      T en = std::min<T>(ps->first + ps->second, offset);
+      assert(en > start);
+      typename decltype(m)::value_type i{start, en - start};
+      mi = m.insert(mi, i);
+      _size += i.second;
+      if (ps->first + ps->second <= offset) {
+        ++ps;
+        if (ps == s.m.end())
+          break;
+        offset = ps->first;
+      }
+    }
+  }
   
  public:
   bool operator==(const interval_set& other) const {
@@ -461,6 +515,29 @@ class interval_set {
     assert(&b != this);
     clear();
 
+    const interval_set *s, *l;
+
+    if (a.size() < b.size()) {
+      s = &a;
+      l = &b;
+    } else {
+      s = &b;
+      l = &a;
+    }
+
+    if (!s->size())
+      return;
+
+    /*
+     * Use the lower_bound algorithm for larger size ratios
+     * where it performs better, but not for smaller size
+     * ratios where sequential search performs better.
+     */
+    if (l->size() / s->size() >= 10) {
+      intersection_size_asym(*s, *l);
+      return;
+    }
+
     typename std::map<T,T>::const_iterator pa = a.m.begin();
     typename std::map<T,T>::const_iterator pb = b.m.begin();