]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
common: add ceph::fair_mutex 42556/head
authorKefu Chai <kchai@redhat.com>
Fri, 30 Jul 2021 04:44:52 +0000 (12:44 +0800)
committerKefu Chai <kchai@redhat.com>
Fri, 30 Jul 2021 05:01:20 +0000 (13:01 +0800)
a mutex which enqueues and wakes up the waiters in FIFO order, to
ensure the fairness of the mutex.

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/common/fair_mutex.h [new file with mode: 0644]
src/test/common/CMakeLists.txt
src/test/common/test_fair_mutex.cc [new file with mode: 0644]

diff --git a/src/common/fair_mutex.h b/src/common/fair_mutex.h
new file mode 100644 (file)
index 0000000..1778505
--- /dev/null
@@ -0,0 +1,54 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+
+#include "common/ceph_mutex.h"
+
+#include <string>
+
+namespace ceph {
+/// a FIFO mutex
+class fair_mutex {
+public:
+  fair_mutex(const std::string& name)
+    : mutex{ceph::make_mutex(name)}
+  {}
+  ~fair_mutex() = default;
+  fair_mutex(const fair_mutex&) = delete;
+  fair_mutex& operator=(const fair_mutex&) = delete;
+
+  void lock()
+  {
+    std::unique_lock lock(mutex);
+    const unsigned my_id = next_id++;
+    cond.wait(lock, [&] {
+      return my_id == unblock_id;
+    });
+  }
+
+  bool try_lock()
+  {
+    std::lock_guard lock(mutex);
+    if (is_locked()) {
+      return false;
+    }
+    ++next_id;
+    return true;
+  }
+
+  void unlock()
+  {
+    std::lock_guard lock(mutex);
+    ++unblock_id;
+    cond.notify_all();
+  }
+
+  bool is_locked() const
+  {
+    return next_id != unblock_id;
+  }
+private:
+  unsigned next_id = 0;
+  unsigned unblock_id = 0;
+  ceph::condition_variable cond;
+  ceph::mutex mutex;
+};
+} // namespace ceph
index 07b361a07291214334d1555a88e301753ac8c8e9..6d5780a983935cc7702de1c1bfdfe537346c3bdd 100644 (file)
@@ -283,6 +283,11 @@ add_executable(unittest_shunique_lock
 add_ceph_unittest(unittest_shunique_lock)
 target_link_libraries(unittest_shunique_lock ceph-common)
 
+add_executable(unittest_fair_mutex
+  test_fair_mutex.cc)
+add_ceph_unittest(unittest_fair_mutex)
+target_link_libraries(unittest_fair_mutex ceph-common)
+
 # unittest_perf_histogram
 add_executable(unittest_perf_histogram
   test_perf_histogram.cc
diff --git a/src/test/common/test_fair_mutex.cc b/src/test/common/test_fair_mutex.cc
new file mode 100644 (file)
index 0000000..1ee8b8c
--- /dev/null
@@ -0,0 +1,71 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+
+#include <array>
+#include <mutex>
+#include <numeric>
+#include <thread>
+#include <gtest/gtest.h>
+#include "common/fair_mutex.h"
+
+TEST(FairMutex, simple)
+{
+  ceph::fair_mutex mutex{"fair::simple"};
+  {
+    std::unique_lock lock{mutex};
+    ASSERT_TRUE(mutex.is_locked());
+    // fair_mutex does not recursive ownership semantics
+    ASSERT_FALSE(mutex.try_lock());
+  }
+  // re-acquire the lock
+  {
+    std::unique_lock lock{mutex};
+    ASSERT_TRUE(mutex.is_locked());
+  }
+  ASSERT_FALSE(mutex.is_locked());
+}
+
+TEST(FairMutex, fair)
+{
+  // waiters are queued in FIFO order, and they are woken up in the same order
+  // we have a marathon participated by multiple teams:
+  // - each team is represented by a thread.
+  // - each team should have equal chance of being selected and scoring, assuming
+  //   the runners in each team are distributed evenly in the waiting queue.
+  ceph::fair_mutex mutex{"fair::fair"};
+  const int NR_TEAMS = 2;
+  std::array<unsigned, NR_TEAMS> scoreboard{0, 0};
+  const int NR_ROUNDS = 256;
+  auto play = [&](int team) {
+    for (int i = 0; i < NR_ROUNDS; i++) {
+      std::unique_lock lock{mutex};
+      // pretent that i am running.. and it takes time
+      std::this_thread::sleep_for(std::chrono::microseconds(20));
+      // score!
+      scoreboard[team]++;
+      // fair?
+      unsigned total = std::accumulate(scoreboard.begin(),
+                                       scoreboard.end(),
+                                       0);
+      for (unsigned score : scoreboard) {
+        if (total < NR_ROUNDS) {
+          // not quite statistically significant. to reduce the false positive,
+          // just consider it fair
+          continue;
+        }
+        // check if any team is donimating the game.
+        unsigned avg = total / scoreboard.size();
+        // leave at least half of the average to other teams
+        ASSERT_LE(score, total - avg / 2);
+        // don't treat myself too bad
+        ASSERT_GT(score, avg / 2);
+      };
+    }
+  };
+  std::array<std::thread, NR_TEAMS> teams;
+  for (int team = 0; team < NR_TEAMS; team++) {
+    teams[team] = std::thread(play, team);
+  }
+  for (auto& team : teams) {
+    team.join();
+  }
+}