}
-Replayer::Replayer() {
+Replayer::Replayer(int num_action_trackers)
+ : m_num_action_trackers(num_action_trackers),
+ m_action_trackers(new action_tracker_d[m_num_action_trackers]) {
+}
+
+Replayer::~Replayer() {
+ delete[] m_action_trackers;
+}
+
+Replayer::action_tracker_d &Replayer::tracker_for(action_id_t id) {
+ return m_action_trackers[id % m_num_action_trackers];
}
void Replayer::run(const std::string replay_file) {
void Replayer::set_action_complete(action_id_t id) {
dout(DEPGRAPH_LEVEL) << "ActionTracker::set_complete(" << id << ")" << dendl;
boost::system_time now(boost::get_system_time());
- boost::unique_lock<boost::shared_mutex> lock(m_actions_complete_mutex);
- assert(m_actions_complete.count(id) == 0);
- m_actions_complete[id] = now;
- m_actions_complete_condition.notify_all();
+ action_tracker_d &tracker = tracker_for(id);
+ boost::unique_lock<boost::shared_mutex> lock(tracker.mutex);
+ assert(tracker.actions.count(id) == 0);
+ tracker.actions[id] = now;
+ tracker.condition.notify_all();
}
bool Replayer::is_action_complete(action_id_t id) {
- boost::shared_lock<boost::shared_mutex> lock(m_actions_complete_mutex);
- return _is_action_complete(id);
+ action_tracker_d &tracker = tracker_for(id);
+ boost::shared_lock<boost::shared_mutex> lock(tracker.mutex);
+ return tracker.actions.count(id) > 0;
}
void Replayer::wait_for_actions(const vector<dependency_d> &deps) {
BOOST_FOREACH(const dependency_d &dep, deps) {
dout(DEPGRAPH_LEVEL) << "Waiting for " << dep.id << dendl;
boost::system_time start_time(boost::get_system_time());
- boost::shared_lock<boost::shared_mutex> lock(m_actions_complete_mutex);
+ action_tracker_d &tracker = tracker_for(dep.id);
+ boost::shared_lock<boost::shared_mutex> lock(tracker.mutex);
bool first_time = true;
- while (!_is_action_complete(dep.id)) {
- //m_actions_complete_condition.wait(lock);
+ while (tracker.actions.count(dep.id) == 0) {
if (!first_time) {
dout(DEPGRAPH_LEVEL) << "Still waiting for " << dep.id << dendl;
}
- m_actions_complete_condition.timed_wait(lock, boost::posix_time::seconds(1));
+ tracker.condition.timed_wait(lock, boost::posix_time::seconds(1));
first_time = false;
}
- boost::system_time action_completed_time(m_actions_complete[dep.id]);
+ boost::system_time action_completed_time(tracker.actions[dep.id]);
lock.unlock();
boost::system_time end_time(boost::get_system_time());
long long micros = (end_time - start_time).total_microseconds();
m_images.clear();
}
-bool Replayer::_is_action_complete(action_id_t id) {
- return m_actions_complete.count(id) > 0;
-}
-
void Replayer::set_latency_multiplier(float f) {
m_latency_multiplier = f;
}
class Replayer {
public:
- Replayer();
+ Replayer(int num_action_trackers);
+
+ ~Replayer();
void run(const std::string replay_file);
void set_latency_multiplier(float f);
private:
+ struct action_tracker_d {
+ // Maps an action ID to the time the action completed
+ std::map<action_id_t, boost::system_time> actions;
+ boost::shared_mutex mutex;
+ boost::condition condition;
+ };
+
void clear_images();
- bool _is_action_complete(action_id_t id);
+ action_tracker_d &tracker_for(action_id_t id);
// Disallow assignment and copying
Replayer(const Replayer& rhs);
std::map<imagectx_id_t, librbd::Image*> m_images;
boost::shared_mutex m_images_mutex;
- // Maps an action ID to the time the action completed
- std::map<action_id_t, boost::system_time> m_actions_complete;
- boost::shared_mutex m_actions_complete_mutex;
- boost::condition m_actions_complete_condition;
+ // Actions are hashed across the trackers by ID.
+ // Number of trackers should probably be larger than the number of cores and prime.
+ // Should definitely be odd.
+ const int m_num_action_trackers;
+ action_tracker_d* m_action_trackers;
};
}