*
*/
+#include <thread>
#include "include/mempool.h"
#include "include/demangle.h"
-#if defined(_GNU_SOURCE) && defined(WITH_SEASTAR)
-#else
-// Thread local variables should save index, not &shard[index],
-// because shard[] is defined in the class
-static thread_local size_t thread_shard_index = mempool::num_shards;
-#endif
-
// default to debug_mode off
bool mempool::debug_mode = false;
// --------------------------------------------------------------
+namespace mempool {
+
+static size_t num_shard_bits;
+static size_t num_shards;
+
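+// One global array of shards shared by all pools, allocated once with the
+// shard count chosen by get_num_shards().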
+std::unique_ptr<shard_t[]> shards = std::make_unique<shard_t[]>(get_num_shards());
+}
+
+size_t mempool::get_num_shards(void) {
+ static std::once_flag once;
+ std::call_once(once, []() {
+ unsigned int threads = std::thread::hardware_concurrency();
+ if (threads == 0) {
+ threads = DEFAULT_SHARDS;
+ }
+ threads = std::clamp<unsigned int>(threads, MIN_SHARDS, MAX_SHARDS);
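+ // Round the shard count up to the next power of two so that
+ // pick_a_shard_int() can simply mask with (num_shards - 1); e.g. 12
+ // cores round up to 16 shards.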
+ threads--;
+ while (threads != 0) {
+ num_shard_bits++;
+ threads>>=1;
+ }
+ num_shards = 1 << num_shard_bits;
+ });
+ return num_shards;
+}
+
+// There are two implementations of pick_a_shard_int: SCHED_GETCPU is the
+// preferred implementation; ROUND_ROBIN is used if sched_getcpu() is not
+// available.
+int mempool::pick_a_shard_int(void) {
+#if defined(MEMPOOL_SCHED_GETCPU)
+ // SCHED_GETCPU: Shards are assigned to CPU cores. Threads use sched_getcpu()
+ // to query the core before every access to the shard. Other than the (very
+ // rare) case where a context switch occurs between calling sched_getcpu()
+ // and updating the shard, there is no cache line contention.
+ return sched_getcpu() & ((1 << num_shard_bits) - 1);
+#else
+ // ROUND_ROBIN: Static assignment of threads to shards using a round robin
+ // distribution. This minimizes the number of threads sharing the same shard,
+ // but threads that do share a shard will cause cache line ping pong when
+ // they run on different cores (which is likely).
+ static int thread_shard_next;
+ static std::mutex thread_shard_mtx;
+ static thread_local size_t thread_shard_index = MAX_SHARDS;
+
+ if (thread_shard_index == MAX_SHARDS) {
+ // Thread has not been assigned to a shard yet
+ std::lock_guard<std::mutex> lck(thread_shard_mtx);
+ thread_shard_index = thread_shard_next++ & ((1 << num_shard_bits) - 1);
+ }
+ return thread_shard_index;
+#endif
+}
+
mempool::pool_t& mempool::get_pool(mempool::pool_index_t ix)
{
// We rely on this array being initialized before any invocation of
// this function, even if it is called by ctors in other compilation
// units that are being initialized before this compilation unit.
static mempool::pool_t table[num_pools];
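+ // Stash the pool's own index so its per-shard stats can be addressed as
+ // shards[i].pool[pool_index].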
+ table[ix].pool_index = ix;
return table[ix];
}
size_t mempool::pool_t::allocated_bytes() const
{
ssize_t result = 0;
- for (size_t i = 0; i < num_shards; ++i) {
- result += shard[i].bytes;
+ for (size_t i = 0; i < get_num_shards(); ++i) {
+ result += shards[i].pool[pool_index].bytes;
}
if (result < 0) {
// we raced with some unbalanced allocations/deallocations
size_t mempool::pool_t::allocated_items() const
{
ssize_t result = 0;
- for (size_t i = 0; i < num_shards; ++i) {
- result += shard[i].items;
+ for (size_t i = 0; i < get_num_shards(); ++i) {
+ result += shards[i].pool[pool_index].items;
}
if (result < 0) {
// we raced with some unbalanced allocations/deallocations
void mempool::pool_t::adjust_count(ssize_t items, ssize_t bytes)
{
-#if defined(_GNU_SOURCE) && defined(WITH_SEASTAR)
- // the expected path: we alway pick the shard for a cpu core
- // a thread is executing on.
- const size_t shard_index = pick_a_shard_int();
-#else
- // fallback for lack of sched_getcpu()
- const size_t shard_index = []() {
- if (thread_shard_index == num_shards) {
- thread_shard_index = pick_a_shard_int();
- }
- return thread_shard_index;
- }();
-#endif
- shard[shard_index].items += items;
- shard[shard_index].bytes += bytes;
+ const auto shid = pick_a_shard_int();
+ auto& shard = shards[shid].pool[pool_index];
+ shard.items += items;
+ shard.bytes += bytes;
}
void mempool::pool_t::get_stats(
stats_t *total,
std::map<std::string, stats_t> *by_type) const
{
- for (size_t i = 0; i < num_shards; ++i) {
- total->items += shard[i].items;
- total->bytes += shard[i].bytes;
+ for (size_t i = 0; i < get_num_shards(); ++i) {
+ total->items += shards[i].pool[pool_index].items;
+ total->bytes += shards[i].pool[pool_index].bytes;
}
if (debug_mode) {
std::lock_guard shard_lock(lock);
for (auto &p : type_map) {
std::string n = ceph_demangle(p.second.type_name);
stats_t &s = (*by_type)[n];
-#ifdef WITH_SEASTAR
s.bytes = 0;
s.items = 0;
- for (size_t i = 0 ; i < num_shards; ++i) {
+ for (size_t i = 0 ; i < get_num_shards(); ++i) {
s.bytes += p.second.shards[i].items * p.second.item_size;
s.items += p.second.shards[i].items;
}
-#else
- s.bytes = p.second.items * p.second.item_size;
- s.items = p.second.items;
-#endif
}
}
}
#include <boost/container/flat_set.hpp>
#include <boost/container/flat_map.hpp>
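+// sched_getcpu() is a GNU extension; when it is available we use it to pick
+// a shard per CPU core (see pick_a_shard_int).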
-#if defined(_GNU_SOURCE) && defined(WITH_SEASTAR)
+#if defined(_GNU_SOURCE)
+# define MEMPOOL_SCHED_GETCPU
# include <sched.h>
#endif
#include "common/Formatter.h"
-#include "common/ceph_atomic.h"
#include "include/ceph_assert.h"
#include "include/compact_map.h"
#include "include/compact_set.h"
// --------------------------------------------------------------
class pool_t;
-// we shard pool stats across many shard_t's to reduce the amount
-// of cacheline ping pong.
enum {
- num_shard_bits = 5
-};
-enum {
- num_shards = 1 << num_shard_bits
-};
-
-static size_t pick_a_shard_int() {
-#if defined(_GNU_SOURCE) && defined(WITH_SEASTAR)
- // a thread local storage is actually just an approximation;
- // what we truly want is a _cpu local storage_.
- //
- // on the architectures we care about sched_getcpu() is
- // a syscall-handled-in-userspace (vdso!). it grabs the cpu
- // id kernel exposes to a task on context switch.
- return sched_getcpu() & ((1 << num_shard_bits) - 1);
+#if defined(MEMPOOL_SCHED_GETCPU)
+ MIN_SHARDS = 1,
#else
- // Dirt cheap, see:
- // https://fossies.org/dox/glibc-2.32/pthread__self_8c_source.html
- size_t me = (size_t)pthread_self();
- size_t i = (me >> CEPH_PAGE_SHIFT) & ((1 << num_shard_bits) - 1);
- return i;
+ MIN_SHARDS = 1 << 5,     // 32
#endif
-}
-
-//
-// Align shard to a cacheline.
-//
-// It would be possible to retrieve the value at runtime (for instance
-// with getconf LEVEL1_DCACHE_LINESIZE or grep -m1 cache_alignment
-// /proc/cpuinfo). It is easier to hard code the largest cache
-// linesize for all known processors (128 bytes). If the actual cache
-// linesize is smaller on a given processor, it will just waste a few
-// bytes.
-//
-struct shard_t {
- ceph::atomic<size_t> bytes = {0};
- ceph::atomic<size_t> items = {0};
- char __padding[128 - sizeof(ceph::atomic<size_t>)*2];
-} __attribute__ ((aligned (128)));
+ DEFAULT_SHARDS = 1 << 5, // 32
+ MAX_SHARDS = 1 << 7      // 128
+};
-static_assert(sizeof(shard_t) == 128, "shard_t should be cacheline-sized");
+int pick_a_shard_int(void);
+size_t get_num_shards(void);
struct stats_t {
- ssize_t items = 0;
- ssize_t bytes = 0;
+ std::atomic<size_t> items = {0};
+ std::atomic<size_t> bytes = {0};
+
void dump(ceph::Formatter *f) const {
f->dump_int("items", items);
f->dump_int("bytes", bytes);
}
};
+// Align shard to a cacheline, group stats for all mempools in the
+// same shard to improve cache line density.
+//
+// It would be possible to retrieve the value at runtime (for instance
+// with getconf LEVEL1_DCACHE_LINESIZE or grep -m1 cache_alignment
+// /proc/cpuinfo). It is easier to hard code the largest cache
+// linesize for all known processors (128 bytes). If the actual cache
+// linesize is smaller on a given processor, it will just waste a few
+// bytes.
+//
+struct shard_t {
+ stats_t pool[num_pools];
+} __attribute__ ((aligned (128)));
+static_assert(sizeof(shard_t) % 128 == 0,
+              "shard_t size should be a multiple of the cacheline size");
+
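+// A single set of shards shared by every pool; the definition lives in the
+// mempool implementation and is sized at startup by get_num_shards().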
+extern std::unique_ptr<shard_t[]> shards;
+
pool_t& get_pool(pool_index_t ix);
const char *get_pool_name(pool_index_t ix);
struct type_t {
const char *type_name;
size_t item_size;
-#ifdef WITH_SEASTAR
struct type_shard_t {
- ceph::atomic<ssize_t> items = {0}; // signed
- char __padding[128 - sizeof(ceph::atomic<ssize_t>)];
+ std::atomic<ssize_t> items = {0}; // signed
+ char __padding[128 - sizeof(std::atomic<ssize_t>)];
} __attribute__ ((aligned (128)));
static_assert(sizeof(type_shard_t) == 128,
"type_shard_t should be cacheline-sized");
- type_shard_t shards[num_shards];
-#else
-// XXX: consider dropping this case for classic with perf tests
- ceph::atomic<ssize_t> items = {0}; // signed
-#endif
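+ // Per-type item counts, sharded like the pool stats and sized at runtime
+ // to match get_num_shards().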
+ std::unique_ptr<type_shard_t[]> shards =
+   std::make_unique<type_shard_t[]>(get_num_shards());
};
struct type_info_hash {
};
class pool_t {
- shard_t shard[num_shards];
-
mutable std::mutex lock; // only used for types list
std::unordered_map<const char *, type_t> type_map;
template<pool_index_t, typename T>
friend class pool_allocator;
public:
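+ // This pool's index; used to address its per-shard stats in shards[].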
+ pool_index_t pool_index;
+
//
// How much this pool consumes. O(<num_shards>)
//
T* allocate(size_t n, void *p = nullptr) {
size_t total = sizeof(T) * n;
const auto shid = pick_a_shard_int();
- auto& shard = pool->shard[shid];
+ auto& shard = shards[shid].pool[pool->pool_index];
shard.bytes += total;
shard.items += n;
if (type) {
-#ifdef WITH_SEASTAR
type->shards[shid].items += n;
-#else
- type->items += n;
-#endif
}
T* r = reinterpret_cast<T*>(new char[total]);
return r;
void deallocate(T* p, size_t n) {
size_t total = sizeof(T) * n;
const auto shid = pick_a_shard_int();
- auto& shard = pool->shard[shid];
+ auto& shard = shards[shid].pool[pool->pool_index];
shard.bytes -= total;
shard.items -= n;
if (type) {
-#ifdef WITH_SEASTAR
type->shards[shid].items -= n;
-#else
- type->items -= n;
-#endif
}
delete[] reinterpret_cast<char*>(p);
}
T* allocate_aligned(size_t n, size_t align, void *p = nullptr) {
size_t total = sizeof(T) * n;
const auto shid = pick_a_shard_int();
- auto& shard = pool->shard[shid];
+ auto& shard = shards[shid].pool[pool->pool_index];
shard.bytes += total;
shard.items += n;
if (type) {
-#ifdef WITH_SEASTAR
type->shards[shid].items += n;
-#else
- type->items += n;
-#endif
}
char *ptr;
int rc = ::posix_memalign((void**)(void*)&ptr, align, total);
void deallocate_aligned(T* p, size_t n) {
size_t total = sizeof(T) * n;
const auto shid = pick_a_shard_int();
- auto& shard = pool->shard[shid];
+ auto& shard = shards[shid].pool[pool->pool_index];
shard.bytes -= total;
shard.items -= n;
if (type) {
-#ifdef WITH_SEASTAR
type->shards[shid].items -= n;
-#else
- type->items -= n;
-#endif
}
aligned_free(p);
}
}
-mempool::shard_t shards[mempool::num_shards] = {0};
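+// Sized at runtime and value-initialized (zeroed); never freed, lives for
+// the whole process.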
+mempool::shard_t *shards = new mempool::shard_t[mempool::get_num_shards()]();
void sigterm_handler(int signum)
{
size_t total = 0;
- for (auto& shard : shards) {
+ for (size_t i = 0; i < mempool::get_num_shards(); i++) {
+ auto& shard = shards[i].pool[mempool::mempool_unittest_1];
total += shard.bytes;
}
std::cout << total << std::endl;
- exit(0);
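+ // Restore the default disposition and re-raise so the process terminates
+ // with SIGTERM status rather than exiting normally.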
+ signal(SIGTERM, SIG_DFL);
+ raise(SIGTERM);
}
int main(int argc, const char **argv)
} else {
i = 0;
}
- shards[i].bytes++;
+ shards[i].pool[mempool::mempool_unittest_1].bytes++;
}
}));
}
ASSERT_EQ(0, mempool::osd::allocated_bytes());
}
-#if !defined(__arm__) && !defined(__aarch64__)
TEST(mempool, check_shard_select)
{
- const size_t samples = mempool::num_shards * 100;
- std::atomic_int shards[mempool::num_shards] = {0};
+ const size_t samples = mempool::get_num_shards() * 30;
+
+ std::unique_ptr<std::atomic_int[]> shards =
+ std::make_unique<std::atomic_int[]>(mempool::get_num_shards());
+
std::vector<std::thread> workers;
for (size_t i = 0; i < samples; i++) {
workers.push_back(
}
workers.clear();
+#if !defined(MEMPOOL_SCHED_GETCPU)
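+ // The ROUND_ROBIN picker assigns threads to shards deterministically, so
+ // each shard should see an exact count.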
size_t missed = 0;
- for (size_t i = 0; i < mempool::num_shards; i++) {
- if (shards[i] == 0) {
+ for (size_t i = 0; i < mempool::get_num_shards(); i++) {
+ if (shards[i] != 30) {
+ // Each shard is expected to have exactly 30 threads
missed++;
}
}
-
- // If more than half of the shards did not get anything,
- // the distribution is bad enough to deserve a failure.
- EXPECT_LT(missed, mempool::num_shards / 2);
-}
+ EXPECT_EQ(missed, 0u);
#endif
+ // With SCHED_GETCPU the shard choice depends on the scheduler;
+ // test_c2c.cc is a better test of that sharding algorithm.
+}
int main(int argc, char **argv)