]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
src/common/mempool.cc: Choose shard with sched_getcpu() to reduce cache line thashing
authorBill Scales <bill_scales@uk.ibm.com>
Fri, 26 Jul 2024 09:11:25 +0000 (09:11 +0000)
committerBill Scales <bill_scales@uk.ibm.com>
Thu, 20 Mar 2025 07:46:07 +0000 (07:46 +0000)
Improve the performance of mempool sharding on systems with sched_getcpu()

The idea is to assign threads to shards based on which CPU core they are
executing on using sched_getcpu() to select the shard. All the threads
executing on the same CPU core share the same shard. If each CPU core has its
own shard there should be no cache line contention.

Fixes: https://tracker.ceph.com/issues/64100
Signed-off-by: Bill Scales <bill_scales@uk.ibm.com>
src/common/mempool.cc
src/include/mempool.h
src/test/test_c2c.cc
src/test/test_mempool.cc

index 9358727e148cd0ee7ffb36ddafed75fb311ab9b1..b16a68c062db3f8edc1506d76773800cd9fb1c7a 100644 (file)
  *
  */
 
+#include <thread>
 #include "include/mempool.h"
 #include "include/demangle.h"
 
-#if defined(_GNU_SOURCE) && defined(WITH_SEASTAR)
-#else
-// Thread local variables should save index, not &shard[index],
-// because shard[] is defined in the class
-static thread_local size_t thread_shard_index = mempool::num_shards;
-#endif
-
 // default to debug_mode off
 bool mempool::debug_mode = false;
 
 // --------------------------------------------------------------
 
+namespace mempool {
+
+static size_t num_shard_bits;
+static size_t num_shards;
+
+std::unique_ptr<shard_t[]> shards = std::make_unique<shard_t[]>(get_num_shards());
+}
+
+size_t mempool::get_num_shards(void) {
+  static std::once_flag once;
+  std::call_once(once,[&]() {
+    unsigned int threads = std::thread::hardware_concurrency();
+    if (threads == 0) {
+      threads = DEFAULT_SHARDS;
+    }
+    threads = std::clamp<unsigned int>(threads, MIN_SHARDS, MAX_SHARDS);
+    threads--;
+    while (threads != 0) {
+      num_shard_bits++;
+      threads>>=1;
+    }
+    num_shards = 1 << num_shard_bits;
+  });
+  return num_shards;
+}
+
+// There are 2 implementations of pick_a_shard_int, SCHED GETCPU is
+// the preferred implementaion, ROUND ROBIN is used if sched_getcpu() is not
+// available.
+int mempool::pick_a_shard_int(void) {
+#if defined(MEMPOOL_SCHED_GETCPU)
+  // SCHED_GETCPU: Shards are assigned to CPU cores. Threads use sched_getcpu()
+  // to query the core before every access to the shard. Other than the (very
+  // rare) situation where a context switch occurs between calling
+  // sched_getcpu() and updating the shard there is no cache line
+  // contention
+  return sched_getcpu() & ((1 << num_shard_bits) - 1);
+#else
+  // ROUND_ROBIN: Static assignment of threads to shards using a round robin
+  // distribution. This minimizes the number of threads sharing the same shard,
+  // but threads sharing the same shard will cause cache line ping pong when
+  // the threads are running on different cores (likely)
+  static int thread_shard_next;
+  static std::mutex thread_shard_mtx;
+  static thread_local size_t thread_shard_index = MAX_SHARDS;
+
+  if (thread_shard_index == MAX_SHARDS) {
+    // Thread has not been assigned to a shard yet
+    std::lock_guard<std::mutex> lck (thread_shard_mtx);
+    thread_shard_index = thread_shard_next++ & ((1 << num_shard_bits) - 1);
+  }
+  return thread_shard_index;
+#endif
+}
+
 mempool::pool_t& mempool::get_pool(mempool::pool_index_t ix)
 {
   // We rely on this array being initialized before any invocation of
   // this function, even if it is called by ctors in other compilation
   // units that are being initialized before this compilation unit.
   static mempool::pool_t table[num_pools];
+  table[ix].pool_index = ix;
   return table[ix];
 }
 
@@ -73,8 +123,8 @@ void mempool::set_debug_mode(bool d)
 size_t mempool::pool_t::allocated_bytes() const
 {
   ssize_t result = 0;
-  for (size_t i = 0; i < num_shards; ++i) {
-    result += shard[i].bytes;
+  for (size_t i = 0; i < get_num_shards(); ++i) {
+    result += shards[i].pool[pool_index].bytes;
   }
   if (result < 0) {
     // we raced with some unbalanced allocations/deallocations
@@ -86,8 +136,8 @@ size_t mempool::pool_t::allocated_bytes() const
 size_t mempool::pool_t::allocated_items() const
 {
   ssize_t result = 0;
-  for (size_t i = 0; i < num_shards; ++i) {
-    result += shard[i].items;
+  for (size_t i = 0; i < get_num_shards(); ++i) {
+    result += shards[i].pool[pool_index].items;
   }
   if (result < 0) {
     // we raced with some unbalanced allocations/deallocations
@@ -98,47 +148,31 @@ size_t mempool::pool_t::allocated_items() const
 
 void mempool::pool_t::adjust_count(ssize_t items, ssize_t bytes)
 {
-#if defined(_GNU_SOURCE) && defined(WITH_SEASTAR)
-  // the expected path: we alway pick the shard for a cpu core
-  // a thread is executing on.
-  const size_t shard_index = pick_a_shard_int();
-#else
-  // fallback for lack of sched_getcpu()
-  const size_t shard_index = []() {
-    if (thread_shard_index == num_shards) {
-      thread_shard_index = pick_a_shard_int();
-    }
-    return thread_shard_index;
-  }();
-#endif
-  shard[shard_index].items += items;
-  shard[shard_index].bytes += bytes;
+  const auto shid = pick_a_shard_int();
+  auto& shard = shards[shid].pool[pool_index];
+  shard.items += items;
+  shard.bytes += bytes;
 }
 
 void mempool::pool_t::get_stats(
   stats_t *total,
   std::map<std::string, stats_t> *by_type) const
 {
-  for (size_t i = 0; i < num_shards; ++i) {
-    total->items += shard[i].items;
-    total->bytes += shard[i].bytes;
+  for (size_t i = 0; i < get_num_shards(); ++i) {
+    total->items += shards[i].pool[pool_index].items;
+    total->bytes += shards[i].pool[pool_index].bytes;
   }
   if (debug_mode) {
     std::lock_guard shard_lock(lock);
     for (auto &p : type_map) {
       std::string n = ceph_demangle(p.second.type_name);
       stats_t &s = (*by_type)[n];
-#ifdef WITH_SEASTAR
       s.bytes = 0;
       s.items = 0;
-      for (size_t i = 0 ; i < num_shards; ++i) {
+      for (size_t i = 0 ; i < get_num_shards(); ++i) {
         s.bytes += p.second.shards[i].items * p.second.item_size;
         s.items += p.second.shards[i].items;
       }
-#else
-      s.bytes = p.second.items * p.second.item_size;
-      s.items = p.second.items;
-#endif
     }
   }
 }
index 4535bffcde78f8e5142de76a40cb6888625a0bf3..3910d8228cd00aa05f0fdac1a501dc88e83bff76 100644 (file)
 #include <boost/container/flat_set.hpp>
 #include <boost/container/flat_map.hpp>
 
-#if defined(_GNU_SOURCE) && defined(WITH_SEASTAR)
+#if defined(_GNU_SOURCE)
+#  define MEMPOOL_SCHED_GETCPU
 #  include <sched.h>
 #endif
 
 #include "common/Formatter.h"
-#include "common/ceph_atomic.h"
 #include "include/ceph_assert.h"
 #include "include/compact_map.h"
 #include "include/compact_set.h"
@@ -196,54 +196,23 @@ extern void set_debug_mode(bool d);
 // --------------------------------------------------------------
 class pool_t;
 
-// we shard pool stats across many shard_t's to reduce the amount
-// of cacheline ping pong.
 enum {
-  num_shard_bits = 5
-};
-enum {
-  num_shards = 1 << num_shard_bits
-};
-
-static size_t pick_a_shard_int() {
-#if defined(_GNU_SOURCE) && defined(WITH_SEASTAR)
-  // a thread local storage is actually just an approximation;
-  // what we truly want is a _cpu local storage_.
-  //
-  // on the architectures we care about sched_getcpu() is
-  // a syscall-handled-in-userspace (vdso!). it grabs the cpu
-  // id kernel exposes to a task on context switch.
-  return sched_getcpu() & ((1 << num_shard_bits) - 1);
+#if defined(MEMPOOL_SCHED_GETCPU)
+  MIN_SHARDS = 1,        //1
 #else
-  // Dirt cheap, see:
-  //   https://fossies.org/dox/glibc-2.32/pthread__self_8c_source.html
-  size_t me = (size_t)pthread_self();
-  size_t i = (me >> CEPH_PAGE_SHIFT) & ((1 << num_shard_bits) - 1);
-  return i;
+  MIN_SHARDS = 1<<5,     //32
 #endif
-}
-
-//
-// Align shard to a cacheline.
-//
-// It would be possible to retrieve the value at runtime (for instance
-// with getconf LEVEL1_DCACHE_LINESIZE or grep -m1 cache_alignment
-// /proc/cpuinfo). It is easier to hard code the largest cache
-// linesize for all known processors (128 bytes). If the actual cache
-// linesize is smaller on a given processor, it will just waste a few
-// bytes.
-//
-struct shard_t {
-  ceph::atomic<size_t> bytes = {0};
-  ceph::atomic<size_t> items = {0};
-  char __padding[128 - sizeof(ceph::atomic<size_t>)*2];
-} __attribute__ ((aligned (128)));
+  DEFAULT_SHARDS = 1<<5, //32
+  MAX_SHARDS = 1<<7      //128
+};
 
-static_assert(sizeof(shard_t) == 128, "shard_t should be cacheline-sized");
+int pick_a_shard_int(void);
+size_t get_num_shards(void);
 
 struct stats_t {
-  ssize_t items = 0;
-  ssize_t bytes = 0;
+  std::atomic<size_t> items = {0};
+  std::atomic<size_t> bytes = {0};
+
   void dump(ceph::Formatter *f) const {
     f->dump_int("items", items);
     f->dump_int("bytes", bytes);
@@ -256,24 +225,36 @@ struct stats_t {
   }
 };
 
+// Align shard to a cacheline, group stats for all mempools in the
+// same shard to improve cache line density.
+//
+// It would be possible to retrieve the value at runtime (for instance
+// with getconf LEVEL1_DCACHE_LINESIZE or grep -m1 cache_alignment
+// /proc/cpuinfo). It is easier to hard code the largest cache
+// linesize for all known processors (128 bytes). If the actual cache
+// linesize is smaller on a given processor, it will just waste a few
+// bytes.
+//
+struct shard_t {
+  stats_t pool[num_pools];
+} __attribute__ ((aligned (128)));
+static_assert(sizeof(shard_t)%128 == 0, "shard_t should be cacheline-sized");
+
+extern std::unique_ptr<shard_t[]> shards;
+
 pool_t& get_pool(pool_index_t ix);
 const char *get_pool_name(pool_index_t ix);
 
 struct type_t {
   const char *type_name;
   size_t item_size;
-#ifdef WITH_SEASTAR
   struct type_shard_t {
-    ceph::atomic<ssize_t> items = {0}; // signed
-    char __padding[128 - sizeof(ceph::atomic<ssize_t>)];
+    std::atomic<ssize_t> items = {0}; // signed
+    char __padding[128 - sizeof(std::atomic<ssize_t>)];
   } __attribute__ ((aligned (128)));
   static_assert(sizeof(type_shard_t) == 128,
                 "type_shard_t should be cacheline-sized");
-  type_shard_t shards[num_shards];
-#else
-// XXX: consider dropping this case for classic with perf tests
-  ceph::atomic<ssize_t> items = {0};  // signed
-#endif
+  std::unique_ptr<type_shard_t[]> shards = std::make_unique<type_shard_t[]>(get_num_shards());
 };
 
 struct type_info_hash {
@@ -283,14 +264,14 @@ struct type_info_hash {
 };
 
 class pool_t {
-  shard_t shard[num_shards];
-
   mutable std::mutex lock;  // only used for types list
   std::unordered_map<const char *, type_t> type_map;
 
   template<pool_index_t, typename T>
   friend class pool_allocator;
 public:
+  pool_index_t pool_index;
+
   //
   // How much this pool consumes. O(<num_shards>)
   //
@@ -362,15 +343,11 @@ public:
   T* allocate(size_t n, void *p = nullptr) {
     size_t total = sizeof(T) * n;
     const auto shid = pick_a_shard_int();
-    auto& shard = pool->shard[shid];
+    auto& shard = shards[shid].pool[pool->pool_index];
     shard.bytes += total;
     shard.items += n;
     if (type) {
-#ifdef WITH_SEASTAR
       type->shards[shid].items += n;
-#else
-      type->items += n;
-#endif
     }
     T* r = reinterpret_cast<T*>(new char[total]);
     return r;
@@ -379,15 +356,11 @@ public:
   void deallocate(T* p, size_t n) {
     size_t total = sizeof(T) * n;
     const auto shid = pick_a_shard_int();
-    auto& shard = pool->shard[shid];
+    auto& shard = shards[shid].pool[pool->pool_index];
     shard.bytes -= total;
     shard.items -= n;
     if (type) {
-#ifdef WITH_SEASTAR
       type->shards[shid].items -= n;
-#else
-      type->items -= n;
-#endif
     }
     delete[] reinterpret_cast<char*>(p);
   }
@@ -395,15 +368,11 @@ public:
   T* allocate_aligned(size_t n, size_t align, void *p = nullptr) {
     size_t total = sizeof(T) * n;
     const auto shid = pick_a_shard_int();
-    auto& shard = pool->shard[shid];
+    auto& shard = shards[shid].pool[pool->pool_index];
     shard.bytes += total;
     shard.items += n;
     if (type) {
-#ifdef WITH_SEASTAR
       type->shards[shid].items += n;
-#else
-      type->items += n;
-#endif
     }
     char *ptr;
     int rc = ::posix_memalign((void**)(void*)&ptr, align, total);
@@ -416,15 +385,11 @@ public:
   void deallocate_aligned(T* p, size_t n) {
     size_t total = sizeof(T) * n;
     const auto shid = pick_a_shard_int();
-    auto& shard = pool->shard[shid];
+    auto& shard = shards[shid].pool[pool->pool_index];
     shard.bytes -= total;
     shard.items -= n;
     if (type) {
-#ifdef WITH_SEASTAR
       type->shards[shid].items -= n;
-#else
-      type->items -= n;
-#endif
     }
     aligned_free(p);
   }
index 1569be305e58b3dfe5af12af45618a13ea53e8c3..7dd3a78d00a45963a85feb94ea559557647b403c 100644 (file)
@@ -19,16 +19,18 @@ static void usage(void)
 }
 
 
-mempool::shard_t shards[mempool::num_shards] = {0};
+mempool::shard_t *shards = new mempool::shard_t[mempool::get_num_shards()]();
 
 void sigterm_handler(int signum)
 {
   size_t total = 0;
-  for (auto& shard : shards) {
+  for (size_t i = 0; i < mempool::get_num_shards();i++) {
+    auto& shard = shards[i].pool[mempool::mempool_unittest_1];
     total += shard.bytes;
   }
   std::cout << total << std::endl;
-  exit(0);
+  signal(SIGTERM,SIG_DFL);
+  raise(SIGTERM);
 }
 
 int main(int argc, const char **argv)
@@ -74,7 +76,7 @@ int main(int argc, const char **argv)
            } else {
              i = 0;
            }
-           shards[i].bytes++;
+           shards[i].pool[mempool::mempool_unittest_1].bytes++;
          }
        }));
   }
index b806282d039937ef2b0fc4d7a44c19f4d6b591d6..cde8d293fdf4717eac2b09dd16d35127a43f4200 100644 (file)
@@ -406,11 +406,13 @@ TEST(mempool, btree_map_test)
   ASSERT_EQ(0, mempool::osd::allocated_bytes());
 }
 
-#if !defined(__arm__) && !defined(__aarch64__)
 TEST(mempool, check_shard_select)
 {
-  const size_t samples = mempool::num_shards * 100;
-  std::atomic_int shards[mempool::num_shards] = {0};
+  const size_t samples = mempool::get_num_shards() * 30;
+
+  std::unique_ptr<std::atomic_int[]> shards =
+    std::make_unique<std::atomic_int[]>(mempool::get_num_shards());
+
   std::vector<std::thread> workers;
   for (size_t i = 0; i < samples; i++) {
     workers.push_back(
@@ -424,18 +426,18 @@ TEST(mempool, check_shard_select)
   }
   workers.clear();
 
+#if !defined(MEMPOOL_SCHED_GETCPU)
   size_t missed = 0;
-  for (size_t i = 0; i < mempool::num_shards; i++) {
-    if (shards[i] == 0) {
+  for (size_t i = 0; i < mempool::get_num_shards(); i++) {
+    if (shards[i] != 30) {
+      // Each shard is expected to have exactly 30 threads
       missed++;
     }
   }
-
-  // If more than half of the shards did not get anything,
-  // the distribution is bad enough to deserve a failure.
-  EXPECT_LT(missed, mempool::num_shards / 2);
-}
+  EXPECT_EQ(missed, 0u);
 #endif
+  // Else: test_c2c.cc is a better test of the sharding algorithm
+}
 
 
 int main(int argc, char **argv)