git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-replay: Hash completions across multiple mutexes to reduce contention
author Adam Crume <adamcrume@gmail.com>
Mon, 21 Jul 2014 18:14:01 +0000 (11:14 -0700)
committer Sage Weil <sage@redhat.com>
Thu, 21 Aug 2014 17:57:30 +0000 (10:57 -0700)
Signed-off-by: Adam Crume <adamcrume@gmail.com>
src/rbd_replay/Replayer.cc
src/rbd_replay/Replayer.hpp
src/rbd_replay/rbd-replay.cc

index f85d569ce573b110acac91d560664956e1ecf2fe..e9ec1260bed167eca2fdd24d3e7b05e93d2a3cc9 100644 (file)
@@ -122,7 +122,17 @@ void Worker::set_action_complete(action_id_t id) {
 }
 
 
-Replayer::Replayer() {
+Replayer::Replayer(int num_action_trackers)  // must be > 0: it is the modulus used by tracker_for()
+  : m_num_action_trackers(num_action_trackers),
+    m_action_trackers(new action_tracker_d[m_num_action_trackers]) {  // one tracker (map + mutex + condition) per slot
+}
+
+Replayer::~Replayer() {
+  delete[] m_action_trackers;  // array form matches the new[] in the constructor
+}
+
+Replayer::action_tracker_d &Replayer::tracker_for(action_id_t id) {  // returns the tracker responsible for action `id`
+  return m_action_trackers[id % m_num_action_trackers];  // IDs are spread by modulo; NOTE(review): assumes id is non-negative — confirm action_id_t is unsigned
 }
 
 void Replayer::run(const std::string replay_file) {
@@ -210,15 +220,17 @@ void Replayer::erase_image(imagectx_id_t imagectx_id) {
 void Replayer::set_action_complete(action_id_t id) {  // records the completion time of action `id` and wakes waiters
   dout(DEPGRAPH_LEVEL) << "ActionTracker::set_complete(" << id << ")" << dendl;
   boost::system_time now(boost::get_system_time());  // sampled before the tracker lock is taken
-  boost::unique_lock<boost::shared_mutex> lock(m_actions_complete_mutex);
-  assert(m_actions_complete.count(id) == 0);
-  m_actions_complete[id] = now;
-  m_actions_complete_condition.notify_all();
+  action_tracker_d &tracker = tracker_for(id);
+  boost::unique_lock<boost::shared_mutex> lock(tracker.mutex);  // exclusive lock: the map is modified below
+  assert(tracker.actions.count(id) == 0);  // an action must not be marked complete twice
+  tracker.actions[id] = now;
+  tracker.condition.notify_all();  // wakes every waiter on this tracker, not only those waiting for `id`
 }
 
 bool Replayer::is_action_complete(action_id_t id) {  // true once a completion time has been recorded for `id`
-  boost::shared_lock<boost::shared_mutex> lock(m_actions_complete_mutex);
-  return _is_action_complete(id);
+  action_tracker_d &tracker = tracker_for(id);
+  boost::shared_lock<boost::shared_mutex> lock(tracker.mutex);  // shared lock: read-only lookup
+  return tracker.actions.count(id) > 0;
 }
 
 void Replayer::wait_for_actions(const vector<dependency_d> &deps) {
@@ -226,17 +238,17 @@ void Replayer::wait_for_actions(const vector<dependency_d> &deps) {
   BOOST_FOREACH(const dependency_d &dep, deps) {
     dout(DEPGRAPH_LEVEL) << "Waiting for " << dep.id << dendl;
     boost::system_time start_time(boost::get_system_time());
-    boost::shared_lock<boost::shared_mutex> lock(m_actions_complete_mutex);
+    action_tracker_d &tracker = tracker_for(dep.id);
+    boost::shared_lock<boost::shared_mutex> lock(tracker.mutex);
     bool first_time = true;
-    while (!_is_action_complete(dep.id)) {
-      //m_actions_complete_condition.wait(lock);
+    while (tracker.actions.count(dep.id) == 0) {
       if (!first_time) {
        dout(DEPGRAPH_LEVEL) << "Still waiting for " << dep.id << dendl;
       }
-      m_actions_complete_condition.timed_wait(lock, boost::posix_time::seconds(1));
+      tracker.condition.timed_wait(lock, boost::posix_time::seconds(1));
       first_time = false;
     }
-    boost::system_time action_completed_time(m_actions_complete[dep.id]);
+    boost::system_time action_completed_time(tracker.actions[dep.id]);
     lock.unlock();
     boost::system_time end_time(boost::get_system_time());
     long long micros = (end_time - start_time).total_microseconds();
@@ -263,10 +275,6 @@ void Replayer::clear_images() {
   m_images.clear();
 }
 
-bool Replayer::_is_action_complete(action_id_t id) {
-  return m_actions_complete.count(id) > 0;
-}
-
 void Replayer::set_latency_multiplier(float f) {  // setter; how the multiplier is applied is not visible in this diff
   m_latency_multiplier = f;
 }
index a585f5615963ac6d0ee732e4cf9444dbe5f9cfa2..d2ae9daf1f5522f1ccdc5c110c3e949c172631ef 100644 (file)
@@ -68,7 +68,9 @@ private:
 
 class Replayer {
 public:
-  Replayer();
+  Replayer(int num_action_trackers);
+
+  ~Replayer();
 
   void run(const std::string replay_file);
 
@@ -95,9 +97,16 @@ public:
   void set_latency_multiplier(float f);
 
 private:
+  struct action_tracker_d {  // one shard of the completed-action bookkeeping, with its own lock
+    // Maps an action ID to the time the action completed
+    std::map<action_id_t, boost::system_time> actions;
+    boost::shared_mutex mutex;  // guards `actions`: unique lock to insert, shared lock to look up
+    boost::condition condition;  // notified whenever a completion is recorded in `actions`
+  };
+
   void clear_images();
 
-  bool _is_action_complete(action_id_t id);
+  action_tracker_d &tracker_for(action_id_t id);
 
   // Disallow assignment and copying
   Replayer(const Replayer& rhs);
@@ -110,10 +119,11 @@ private:
   std::map<imagectx_id_t, librbd::Image*> m_images;
   boost::shared_mutex m_images_mutex;
 
-  // Maps an action ID to the time the action completed
-  std::map<action_id_t, boost::system_time> m_actions_complete;
-  boost::shared_mutex m_actions_complete_mutex;
-  boost::condition m_actions_complete_condition;
+  // Actions are hashed across the trackers by ID.
+  // Number of trackers should probably be larger than the number of cores and prime.
+  // Should definitely be odd.
+  const int m_num_action_trackers;
+  action_tracker_d* m_action_trackers;
 };
 
 }
index a42091c864bfd2a4d851c25cb46263ebaa9c4186..616a0276f7815ea29a1d349a86709010e8fd983b 100644 (file)
@@ -13,6 +13,7 @@
  */
 
 #include <vector>
+#include <boost/thread.hpp>
 #include "common/ceph_argparse.h"
 #include "global/global_init.h"
 #include "Replayer.hpp"
@@ -81,7 +82,8 @@ int main(int argc, const char **argv) {
     return 1;
   }
 
-  Replayer replayer;
+  unsigned int nthreads = boost::thread::hardware_concurrency();
+  Replayer replayer(2 * nthreads + 1);
   replayer.set_latency_multiplier(latency_multiplier);
   replayer.run(replay_file);
 }