From 8c07345d331495e0b76cbe02e95593f66a5fcc43 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Fri, 30 Jul 2021 12:44:52 +0800 Subject: [PATCH] common: add ceph::fair_mutex a mutex which enqueues and wakes up the waiters in FIFO order, to ensure the fairness of the mutex. Signed-off-by: Kefu Chai --- src/common/fair_mutex.h | 54 +++++++++++++++++++++++ src/test/common/CMakeLists.txt | 5 +++ src/test/common/test_fair_mutex.cc | 71 ++++++++++++++++++++++++++++++ 3 files changed, 130 insertions(+) create mode 100644 src/common/fair_mutex.h create mode 100644 src/test/common/test_fair_mutex.cc diff --git a/src/common/fair_mutex.h b/src/common/fair_mutex.h new file mode 100644 index 0000000000000..17785053dd3d2 --- /dev/null +++ b/src/common/fair_mutex.h @@ -0,0 +1,54 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- + +#include "common/ceph_mutex.h" + +#include + +namespace ceph { +/// a FIFO mutex +class fair_mutex { +public: + fair_mutex(const std::string& name) + : mutex{ceph::make_mutex(name)} + {} + ~fair_mutex() = default; + fair_mutex(const fair_mutex&) = delete; + fair_mutex& operator=(const fair_mutex&) = delete; + + void lock() + { + std::unique_lock lock(mutex); + const unsigned my_id = next_id++; + cond.wait(lock, [&] { + return my_id == unblock_id; + }); + } + + bool try_lock() + { + std::lock_guard lock(mutex); + if (is_locked()) { + return false; + } + ++next_id; + return true; + } + + void unlock() + { + std::lock_guard lock(mutex); + ++unblock_id; + cond.notify_all(); + } + + bool is_locked() const + { + return next_id != unblock_id; + } +private: + unsigned next_id = 0; + unsigned unblock_id = 0; + ceph::condition_variable cond; + ceph::mutex mutex; +}; +} // namespace ceph diff --git a/src/test/common/CMakeLists.txt b/src/test/common/CMakeLists.txt index 07b361a072912..6d5780a983935 100644 --- a/src/test/common/CMakeLists.txt +++ b/src/test/common/CMakeLists.txt @@ -283,6 +283,11 @@ add_executable(unittest_shunique_lock add_ceph_unittest(unittest_shunique_lock) target_link_libraries(unittest_shunique_lock ceph-common) +add_executable(unittest_fair_mutex + test_fair_mutex.cc) +add_ceph_unittest(unittest_fair_mutex) +target_link_libraries(unittest_fair_mutex ceph-common) + # unittest_perf_histogram add_executable(unittest_perf_histogram test_perf_histogram.cc diff --git a/src/test/common/test_fair_mutex.cc b/src/test/common/test_fair_mutex.cc new file mode 100644 index 0000000000000..1ee8b8ca4ac84 --- /dev/null +++ b/src/test/common/test_fair_mutex.cc @@ -0,0 +1,71 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- + +#include +#include +#include +#include +#include +#include "common/fair_mutex.h" + +TEST(FairMutex, simple) +{ + ceph::fair_mutex mutex{"fair::simple"}; + { + std::unique_lock lock{mutex}; + ASSERT_TRUE(mutex.is_locked()); + // fair_mutex does not recursive ownership semantics + ASSERT_FALSE(mutex.try_lock()); + } + // re-acquire the lock + { + std::unique_lock lock{mutex}; + ASSERT_TRUE(mutex.is_locked()); + } + ASSERT_FALSE(mutex.is_locked()); +} + +TEST(FairMutex, fair) +{ + // waiters are queued in FIFO order, and they are woken up in the same order + // we have a marathon participated by multiple teams: + // - each team is represented by a thread. + // - each team should have equal chance of being selected and scoring, assuming + // the runners in each team are distributed evenly in the waiting queue. + ceph::fair_mutex mutex{"fair::fair"}; + const int NR_TEAMS = 2; + std::array scoreboard{0, 0}; + const int NR_ROUNDS = 256; + auto play = [&](int team) { + for (int i = 0; i < NR_ROUNDS; i++) { + std::unique_lock lock{mutex}; + // pretent that i am running.. and it takes time + std::this_thread::sleep_for(std::chrono::microseconds(20)); + // score! + scoreboard[team]++; + // fair? + unsigned total = std::accumulate(scoreboard.begin(), + scoreboard.end(), + 0); + for (unsigned score : scoreboard) { + if (total < NR_ROUNDS) { + // not quite statistically significant. to reduce the false positive, + // just consider it fair + continue; + } + // check if any team is donimating the game. + unsigned avg = total / scoreboard.size(); + // leave at least half of the average to other teams + ASSERT_LE(score, total - avg / 2); + // don't treat myself too bad + ASSERT_GT(score, avg / 2); + }; + } + }; + std::array teams; + for (int team = 0; team < NR_TEAMS; team++) { + teams[team] = std::thread(play, team); + } + for (auto& team : teams) { + team.join(); + } +} -- 2.47.3