git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
lttng: Add rbd-replay
author Adam Crume <adamcrume@gmail.com>
Thu, 17 Jul 2014 16:39:35 +0000 (09:39 -0700)
committer Sage Weil <sage@redhat.com>
Thu, 21 Aug 2014 17:57:29 +0000 (10:57 -0700)
Signed-off-by: Adam Crume <adamcrume@gmail.com>
19 files changed:
examples/rbd-replay/.gitignore [new file with mode: 0644]
examples/rbd-replay/create-image [new file with mode: 0755]
examples/rbd-replay/replay [new file with mode: 0755]
examples/rbd-replay/run-prep-for-replay [new file with mode: 0755]
examples/rbd-replay/trace [new file with mode: 0755]
src/.gitignore
src/Makefile.am
src/rbd_replay/BoundedBuffer.hpp [new file with mode: 0644]
src/rbd_replay/Deser.cc [new file with mode: 0644]
src/rbd_replay/Deser.hpp [new file with mode: 0644]
src/rbd_replay/Makefile.am [new file with mode: 0644]
src/rbd_replay/PendingIO.cc [new file with mode: 0644]
src/rbd_replay/PendingIO.hpp [new file with mode: 0644]
src/rbd_replay/Replayer.cc [new file with mode: 0644]
src/rbd_replay/Replayer.hpp [new file with mode: 0644]
src/rbd_replay/actions.cc [new file with mode: 0644]
src/rbd_replay/actions.hpp [new file with mode: 0644]
src/rbd_replay/prep-for-replay.py [new file with mode: 0755]
src/rbd_replay/rbd-replay.cc [new file with mode: 0644]

diff --git a/examples/rbd-replay/.gitignore b/examples/rbd-replay/.gitignore
new file mode 100644 (file)
index 0000000..f9e7053
--- /dev/null
@@ -0,0 +1,3 @@
+/*.log
+/replayer
+/traces
diff --git a/examples/rbd-replay/create-image b/examples/rbd-replay/create-image
new file mode 100755 (executable)
index 0000000..3486d98
--- /dev/null
@@ -0,0 +1,8 @@
+#!/bin/bash
+
+pool=rbd
+image=my-image
+size=10G
+export LD_LIBRARY_PATH=../../src/.libs
+#qemu-img create -f raw rbd:$pool/$image:conf=../../src/ceph.conf $size
+qemu-img convert linux-0.2.img -O raw rbd:$pool/$image:conf=../../src/ceph.conf
diff --git a/examples/rbd-replay/replay b/examples/rbd-replay/replay
new file mode 100755 (executable)
index 0000000..78e0168
--- /dev/null
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+lttng create
+lttng enable-event -u 'librbd:*'
+lttng add-context -u -t pthread_id
+lttng start
+LD_LIBRARY_PATH=../../src/.libs ../../src/rbd-replay --conf=../../src/ceph.conf replay.bin | tee replay.log
+lttng stop
+lttng view > replay-trace.log
diff --git a/examples/rbd-replay/run-prep-for-replay b/examples/rbd-replay/run-prep-for-replay
new file mode 100755 (executable)
index 0000000..64afd31
--- /dev/null
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+PYTHONPATH=~/babeltrace/bindings/python/:~/babeltrace/bindings/python/.libs/ ../../src/rbd_replay/prep-for-replay.py traces/ust/uid/10002/64-bit
diff --git a/examples/rbd-replay/trace b/examples/rbd-replay/trace
new file mode 100755 (executable)
index 0000000..02a5358
--- /dev/null
@@ -0,0 +1,10 @@
+#!/bin/bash
+
+mkdir -p traces
+lttng create -o traces librbd
+lttng enable-event -u 'librbd:*'
+lttng add-context -u -t pthread_id
+lttng start
+LD_LIBRARY_PATH=../../src/.libs/ qemu-system-i386 -m 1024 rbd:rbd/my-image:conf=../../src/ceph.conf
+lttng stop
+lttng view > trace.log
diff --git a/src/.gitignore b/src/.gitignore
index fec9f70c31d871e2dc24dd104e0dff281620103d..e7c09457d5f9d1dbe868fc8a56c4cdf7753eefdb 100644 (file)
@@ -66,6 +66,7 @@ Makefile
 /radosgw-admin
 /rbd
 /rbd-fuse
+/rbd-replay
 /rest-bench
 /sample.fetch_config
 /TAGS
diff --git a/src/Makefile.am b/src/Makefile.am
index 2cde64543061af3fbe2fa23af2fd8c5a4ac52340..adaf73e47620f43ea3a6777441001f6c4017deb8 100644 (file)
@@ -32,6 +32,7 @@ include librbd/Makefile.am
 include rgw/Makefile.am
 include cls/Makefile.am
 include key_value_store/Makefile.am
+include rbd_replay/Makefile.am
 include test/Makefile.am
 include tools/Makefile.am
 
diff --git a/src/rbd_replay/BoundedBuffer.hpp b/src/rbd_replay/BoundedBuffer.hpp
new file mode 100644 (file)
index 0000000..540fd31
--- /dev/null
@@ -0,0 +1,60 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef _INCLUDED_BOUNDED_BUFFER_HPP
+#define _INCLUDED_BOUNDED_BUFFER_HPP
+
+#include <boost/bind.hpp>
+#include <boost/circular_buffer.hpp>
+#include <boost/thread/condition.hpp>
+#include <boost/thread/mutex.hpp>
+
+// Taken from http://www.boost.org/doc/libs/1_55_0/libs/circular_buffer/example/circular_buffer_bound_example.cpp
+template <class T>
+class BoundedBuffer {
+public:
+  typedef boost::circular_buffer<T> container_type;
+  typedef typename container_type::size_type size_type;
+  typedef typename container_type::value_type value_type;
+  typedef typename boost::call_traits<value_type>::param_type param_type;
+
+  explicit BoundedBuffer(size_type capacity) : m_unread(0), m_container(capacity) {
+  }
+
+  void push_front(typename boost::call_traits<value_type>::param_type item) {
+    // `param_type` represents the "best" way to pass a parameter of type `value_type` to a method.
+    boost::mutex::scoped_lock lock(m_mutex);
+    m_not_full.wait(lock, boost::bind(&BoundedBuffer<value_type>::is_not_full, this));
+    m_container.push_front(item);
+    ++m_unread;
+    lock.unlock();
+    m_not_empty.notify_one();
+  }
+
+  void pop_back(value_type* pItem) {
+    boost::mutex::scoped_lock lock(m_mutex);
+    m_not_empty.wait(lock, boost::bind(&BoundedBuffer<value_type>::is_not_empty, this));
+    *pItem = m_container[--m_unread];
+    lock.unlock();
+    m_not_full.notify_one();
+  }
+
+private:
+  BoundedBuffer(const BoundedBuffer&);             // Disabled copy constructor.
+  BoundedBuffer& operator= (const BoundedBuffer&); // Disabled assign operator.
+
+  bool is_not_empty() const {
+    return m_unread > 0;
+  }
+  bool is_not_full() const {
+    return m_unread < m_container.capacity();
+  }
+
+  size_type m_unread;
+  container_type m_container;
+  boost::mutex m_mutex;
+  boost::condition m_not_empty;
+  boost::condition m_not_full;
+};
+
+#endif
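
BoundedBuffer is the fixed-capacity producer/consumer queue the replayer uses to hand actions to its worker threads (Worker::send() pushes, Worker::run() pops; see Replayer.cc below). As a rough sketch of the blocking semantics only, and not part of this commit, a hypothetical standalone use might look like this:

// Hypothetical usage sketch, not part of the commit: one producer thread,
// the main thread consuming.  Assumes only BoundedBuffer.hpp and Boost.Thread.
#include "BoundedBuffer.hpp"
#include <boost/thread/thread.hpp>
#include <iostream>

static void producer(BoundedBuffer<int> *buf) {
  for (int i = 0; i < 10; i++) {
    buf->push_front(i);                 // blocks while the buffer is full
  }
}

int main() {
  BoundedBuffer<int> buf(4);            // small capacity; Worker uses m_buffer(100)
  boost::thread t(producer, &buf);
  for (int i = 0; i < 10; i++) {
    int v;
    buf.pop_back(&v);                   // blocks until an item is available
    std::cout << v << std::endl;
  }
  t.join();
  return 0;
}
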
diff --git a/src/rbd_replay/Deser.cc b/src/rbd_replay/Deser.cc
new file mode 100644 (file)
index 0000000..986a18c
--- /dev/null
@@ -0,0 +1,67 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "Deser.hpp"
+#include <arpa/inet.h>
+#include <cstdlib>
+#include <endian.h>
+
+
+rbd_replay::Deser::Deser(std::istream &in)
+  : m_in(in) {
+}
+
+uint8_t rbd_replay::Deser::read_uint8_t() {
+  uint8_t data;
+  m_in.read(reinterpret_cast<char*>(&data), sizeof(data));
+  return data;
+}
+
+uint16_t rbd_replay::Deser::read_uint16_t() {
+  uint16_t data;
+  m_in.read(reinterpret_cast<char*>(&data), sizeof(data));
+  return ntohs(data);
+}
+
+uint32_t rbd_replay::Deser::read_uint32_t() {
+  uint32_t data;
+  m_in.read(reinterpret_cast<char*>(&data), sizeof(data));
+  return ntohl(data);
+}
+
+uint64_t rbd_replay::Deser::read_uint64_t() {
+  uint64_t data;
+  m_in.read(reinterpret_cast<char*>(&data), sizeof(data));
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+  data = (static_cast<uint64_t>(ntohl(data)) << 32 | ntohl(data >> 32));
+#endif
+  return data;
+}
+
+std::string rbd_replay::Deser::read_string() {
+  uint32_t length = read_uint32_t();
+  char* data = reinterpret_cast<char*>(malloc(length));
+  m_in.read(data, length);
+  std::string s(data, length);
+  free(data);
+  return s;
+}
+
+bool rbd_replay::Deser::read_bool() {
+  return read_uint8_t() != 0;
+}
+
+bool rbd_replay::Deser::eof() {
+  return m_in.eof();
+}
diff --git a/src/rbd_replay/Deser.hpp b/src/rbd_replay/Deser.hpp
new file mode 100644 (file)
index 0000000..d5fad5a
--- /dev/null
@@ -0,0 +1,47 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef _INCLUDED_RBD_REPLAY_DESER_HPP
+#define _INCLUDED_RBD_REPLAY_DESER_HPP
+
+#include <iostream>
+#include <stdint.h>
+
+namespace rbd_replay {
+
+class Deser {
+public:
+  Deser(std::istream &in);
+
+  uint8_t read_uint8_t();
+
+  uint16_t read_uint16_t();
+
+  uint32_t read_uint32_t();
+
+  uint64_t read_uint64_t();
+
+  std::string read_string();
+
+  bool read_bool();
+
+  bool eof();
+
+private:
+  std::istream &m_in;
+};
+
+}
+
+#endif
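
Deser consumes a simple binary stream: fixed-width integers in network byte order plus 32-bit-length-prefixed strings, which is the format prep-for-replay.py below emits via struct.pack("!..."). A minimal, hypothetical round-trip sketch (not part of the commit; it assumes Deser.hpp is on the include path and Deser.cc is compiled in):

// Hypothetical round-trip, not part of the commit: write a value and a
// length-prefixed string in the big-endian format Deser expects, read them back.
#include "Deser.hpp"
#include <arpa/inet.h>
#include <cassert>
#include <sstream>
#include <string>

int main() {
  std::ostringstream out;
  uint32_t be32 = htonl(42);                                   // integers are network byte order
  out.write(reinterpret_cast<char*>(&be32), sizeof(be32));
  std::string name = "image-1";
  uint32_t be_len = htonl(static_cast<uint32_t>(name.size())); // strings are length-prefixed
  out.write(reinterpret_cast<char*>(&be_len), sizeof(be_len));
  out.write(name.data(), name.size());

  std::istringstream in(out.str());
  rbd_replay::Deser deser(in);
  assert(deser.read_uint32_t() == 42);
  assert(deser.read_string() == "image-1");
  return 0;
}
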
diff --git a/src/rbd_replay/Makefile.am b/src/rbd_replay/Makefile.am
new file mode 100644 (file)
index 0000000..fee53d2
--- /dev/null
@@ -0,0 +1,14 @@
+rbd_replay_SOURCES = rbd_replay/rbd-replay.cc \
+       rbd_replay/actions.cc \
+       rbd_replay/Deser.cc \
+       rbd_replay/PendingIO.cc \
+       rbd_replay/Replayer.cc
+rbd_replay_LDADD = $(LIBRBD) $(LIBRADOS) $(CEPH_GLOBAL)
+noinst_HEADERS += rbd_replay/BoundedBuffer.hpp \
+       rbd_replay/actions.hpp \
+       rbd_replay/Deser.hpp \
+       rbd_replay/PendingIO.hpp \
+       rbd_replay/Replayer.hpp
+if LINUX
+bin_PROGRAMS += rbd-replay
+endif #LINUX
diff --git a/src/rbd_replay/PendingIO.cc b/src/rbd_replay/PendingIO.cc
new file mode 100644 (file)
index 0000000..40f379a
--- /dev/null
@@ -0,0 +1,37 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "PendingIO.hpp"
+
+
+using namespace std;
+using namespace rbd_replay;
+
+extern "C"
+void pending_io_callback(librbd::completion_t cb, void *arg) {
+  PendingIO *io = (PendingIO*)arg;
+  io->completed(cb);
+}
+
+PendingIO::PendingIO(action_id_t id,
+                    ActionCtx &worker)
+  : m_id(id),
+    m_completion(this, pending_io_callback),
+    m_worker(worker) {
+    }
+
+void PendingIO::completed(librbd::completion_t cb) {
+  cout << "Completed pending IO #" << m_id << endl;
+  m_worker.remove_pending(shared_from_this());
+}
diff --git a/src/rbd_replay/PendingIO.hpp b/src/rbd_replay/PendingIO.hpp
new file mode 100644 (file)
index 0000000..7413a59
--- /dev/null
@@ -0,0 +1,53 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef _INCLUDED_RBD_REPLAY_PENDINGIO_HPP
+#define _INCLUDED_RBD_REPLAY_PENDINGIO_HPP
+
+#include <boost/enable_shared_from_this.hpp>
+#include "actions.hpp"
+
+namespace rbd_replay {
+
+class PendingIO : public boost::enable_shared_from_this<PendingIO> {
+public:
+  typedef boost::shared_ptr<PendingIO> ptr;
+
+  PendingIO(action_id_t id,
+            ActionCtx &worker);
+
+  void completed(librbd::completion_t cb);
+
+  action_id_t id() const {
+    return m_id;
+  }
+
+  ceph::bufferlist &bufferlist() {
+    return m_bl;
+  }
+
+  librbd::RBD::AioCompletion &completion() {
+    return m_completion;
+  }
+
+private:
+  const action_id_t m_id;
+  ceph::bufferlist m_bl;
+  librbd::RBD::AioCompletion m_completion;
+  ActionCtx &m_worker;
+};
+
+}
+
+#endif
diff --git a/src/rbd_replay/Replayer.cc b/src/rbd_replay/Replayer.cc
new file mode 100644 (file)
index 0000000..e2d32ff
--- /dev/null
@@ -0,0 +1,261 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "Replayer.hpp"
+#include <boost/foreach.hpp>
+#include <boost/thread/thread.hpp>
+#include <fstream>
+
+
+using namespace std;
+using namespace rbd_replay;
+
+
+Worker::Worker(Replayer &replayer)
+  : m_replayer(replayer),
+    m_buffer(100),
+    m_done(false) {
+}
+
+void Worker::start() {
+  m_thread = boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&Worker::run, this)));
+}
+
+// Should only be called by StopThreadAction
+void Worker::stop() {
+  m_done = true;
+}
+
+void Worker::join() {
+  m_thread->join();
+}
+
+void Worker::send(Action::ptr action) {
+  m_buffer.push_front(action);
+}
+
+void Worker::add_pending(PendingIO::ptr io) {
+  boost::mutex::scoped_lock lock(m_pending_ios_mutex);
+  m_pending_ios.push_back(io);
+}
+
+void Worker::run() {
+  cout << "Worker thread started" << endl;
+  while (!m_done) {
+    Action::ptr action;
+    m_buffer.pop_back(&action);
+    m_replayer.wait_for_actions(action->predecessors());
+    action->perform(*this);
+    m_replayer.set_action_complete(action->id());
+  }
+  {
+    boost::mutex::scoped_lock lock(m_pending_ios_mutex);
+    while (!m_pending_ios.empty()) {
+      m_pending_ios_empty.wait(lock);
+    }
+  }
+  cout << "Worker thread stopped" << endl;
+}
+
+
+void Worker::remove_pending(PendingIO::ptr io) {
+  m_replayer.set_action_complete(io->id());
+  boost::mutex::scoped_lock lock(m_pending_ios_mutex);
+  for (vector<PendingIO::ptr>::iterator itr = m_pending_ios.begin(); itr != m_pending_ios.end(); itr++) {
+    if (*itr == io) {
+      m_pending_ios.erase(itr);
+      break;
+    }
+  }
+  if (m_pending_ios.empty()) {
+    m_pending_ios_empty.notify_all();
+  }
+}
+
+
+librbd::Image* Worker::get_image(imagectx_id_t imagectx_id) {
+  return m_replayer.get_image(imagectx_id);
+}
+
+
+void Worker::put_image(imagectx_id_t imagectx_id, librbd::Image* image) {
+  m_replayer.put_image(imagectx_id, image);
+}
+
+
+void Worker::erase_image(imagectx_id_t imagectx_id) {
+  m_replayer.erase_image(imagectx_id);
+}
+
+
+librbd::RBD* Worker::rbd() {
+  return m_replayer.get_rbd();
+}
+
+
+librados::IoCtx* Worker::ioctx() {
+  return m_replayer.get_ioctx();
+}
+
+
+void Worker::set_action_complete(action_id_t id) {
+  m_replayer.set_action_complete(id);
+}
+
+
+Replayer::Replayer() {
+}
+
+void Replayer::run(const std::string conf_file, const std::string replay_file) {
+  cout << "IO thread started" << endl;
+  {
+    librados::Rados rados;
+    rados.init(NULL);
+    int r = rados.conf_read_file(conf_file.c_str());
+    if (r) {
+      cerr << "Unable to read conf file: " << r << endl;
+      goto out;
+    }
+    r = rados.connect();
+    if (r) {
+      cerr << "Unable to connect to Rados: " << r << endl;
+      goto out;
+    }
+    m_ioctx = new librados::IoCtx();
+    {
+      const char* pool_name = "rbd";
+      r = rados.ioctx_create(pool_name, *m_ioctx);
+      if (r) {
+       cerr << "Unable to create IoCtx: " << r << endl;
+       goto out2;
+      }
+      m_rbd = new librbd::RBD();
+      map<thread_id_t, Worker*> workers;
+
+      ifstream input(replay_file.c_str(), ios::in | ios::binary);
+      if (!input.is_open()) {
+       cerr << "Unable to open " << replay_file << endl;
+       exit(1);
+      }
+
+      Deser deser(input);
+      while (true) {
+       Action::ptr action = Action::read_from(deser);
+       if (!action) {
+         break;
+       }
+       if (action->is_start_thread()) {
+         Worker *worker = new Worker(*this);
+         workers[action->thread_id()] = worker;
+         worker->start();
+       } else {
+         workers[action->thread_id()]->send(action);
+       }
+      }
+
+      cout << "Waiting for workers to die" << endl;
+      pair<thread_id_t, Worker*> w;
+      BOOST_FOREACH(w, workers) {
+       w.second->join();
+       delete w.second;
+      }
+      clear_images();
+      delete m_rbd;
+      m_rbd = NULL;
+    }
+  out2:
+    delete m_ioctx;
+    m_ioctx = NULL;
+  }
+ out:
+  cout << "IO thread stopped" << endl;
+}
+
+
+librbd::Image* Replayer::get_image(imagectx_id_t imagectx_id) {
+  boost::shared_lock<boost::shared_mutex> lock(m_images_mutex);
+  return m_images[imagectx_id];
+}
+
+void Replayer::put_image(imagectx_id_t imagectx_id, librbd::Image *image) {
+  boost::unique_lock<boost::shared_mutex> lock(m_images_mutex);
+  assert(m_images.count(imagectx_id) == 0);
+  m_images[imagectx_id] = image;
+}
+
+void Replayer::erase_image(imagectx_id_t imagectx_id) {
+  boost::unique_lock<boost::shared_mutex> lock(m_images_mutex);
+  delete m_images[imagectx_id];
+  m_images.erase(imagectx_id);
+}
+
+void Replayer::set_action_complete(action_id_t id) {
+  cout << "ActionTracker::set_complete(" << id << ")" << endl;
+  boost::system_time now(boost::get_system_time());
+  boost::unique_lock<boost::shared_mutex> lock(m_actions_complete_mutex);
+  assert(m_actions_complete.count(id) == 0);
+  m_actions_complete[id] = now;
+  m_actions_complete_condition.notify_all();
+}
+
+bool Replayer::is_action_complete(action_id_t id) {
+  boost::shared_lock<boost::shared_mutex> lock(m_actions_complete_mutex);
+  return _is_action_complete(id);
+}
+
+void Replayer::wait_for_actions(const vector<dependency_d> &deps) {
+  boost::posix_time::ptime release_time(boost::posix_time::neg_infin);
+  BOOST_FOREACH(const dependency_d &dep, deps) {
+    cout << "Waiting for " << dep.id << endl;
+    boost::system_time start_time(boost::get_system_time());
+    boost::shared_lock<boost::shared_mutex> lock(m_actions_complete_mutex);
+    while (!_is_action_complete(dep.id)) {
+      //m_actions_complete_condition.wait(lock);
+      m_actions_complete_condition.timed_wait(lock, boost::posix_time::seconds(1));
+      cout << "Still waiting for " << dep.id << endl;
+    }
+    boost::system_time action_completed_time(m_actions_complete[dep.id]);
+    lock.unlock();
+    boost::system_time end_time(boost::get_system_time());
+    long long micros = (end_time - start_time).total_microseconds();
+    cout << "Finished waiting for " << dep.id << " after " << micros << " microseconds" << endl;
+    // Apparently the nanoseconds constructor is optional:
+    // http://www.boost.org/doc/libs/1_46_0/doc/html/date_time/details.html#compile_options
+    boost::system_time sub_release_time(action_completed_time + boost::posix_time::microseconds(dep.time_delta * m_latency_multiplier / 1000));
+    if (sub_release_time > release_time) {
+      release_time = sub_release_time;
+    }
+  }
+  if (release_time > boost::get_system_time()) {
+    cout << "Sleeping for " << (release_time - boost::get_system_time()).total_microseconds() << " microseconds" << endl;
+    boost::this_thread::sleep(release_time);
+  }
+}
+
+void Replayer::clear_images() {
+  boost::unique_lock<boost::shared_mutex> lock(m_images_mutex);
+  pair<imagectx_id_t, librbd::Image*> p;
+  BOOST_FOREACH(p, m_images) {
+    delete p.second;
+  }
+  m_images.clear();
+}
+
+bool Replayer::_is_action_complete(action_id_t id) {
+  return m_actions_complete.count(id) > 0;
+}
+
+void Replayer::set_latency_multiplier(float f) {
+  m_latency_multiplier = f;
+}
diff --git a/src/rbd_replay/Replayer.hpp b/src/rbd_replay/Replayer.hpp
new file mode 100644 (file)
index 0000000..a2f18ea
--- /dev/null
@@ -0,0 +1,121 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef _INCLUDED_RBD_REPLAY_REPLAYER_HPP
+#define _INCLUDED_RBD_REPLAY_REPLAYER_HPP
+
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/shared_mutex.hpp>
+#include "BoundedBuffer.hpp"
+#include "PendingIO.hpp"
+
+namespace rbd_replay {
+
+class Replayer;
+
+class Worker : public ActionCtx {
+public:
+  explicit Worker(Replayer &replayer);
+
+  void start();
+
+  // Should only be called by StopThreadAction
+  void stop();
+
+  void join();
+
+  void send(Action::ptr action);
+
+  void add_pending(PendingIO::ptr io);
+
+  void remove_pending(PendingIO::ptr io);
+
+  librbd::Image* get_image(imagectx_id_t imagectx_id);
+
+  void put_image(imagectx_id_t imagectx_id, librbd::Image* image);
+
+  void erase_image(imagectx_id_t imagectx_id);
+
+  librbd::RBD* rbd();
+
+  librados::IoCtx* ioctx();
+
+  void set_action_complete(action_id_t id);
+
+private:
+  void run();
+
+  Replayer &m_replayer;
+  BoundedBuffer<Action::ptr> m_buffer;
+  boost::shared_ptr<boost::thread> m_thread;
+  std::vector<PendingIO::ptr> m_pending_ios;
+  boost::mutex m_pending_ios_mutex;
+  boost::condition m_pending_ios_empty;
+  bool m_done;
+};
+
+
+class Replayer {
+public:
+  Replayer();
+
+  void run(const std::string conf_file, const std::string replay_file);
+
+  librbd::RBD* get_rbd() {
+    return m_rbd;
+  }
+
+  librados::IoCtx* get_ioctx() {
+    return m_ioctx;
+  }
+
+  librbd::Image* get_image(imagectx_id_t imagectx_id);
+
+  void put_image(imagectx_id_t imagectx_id, librbd::Image *image);
+
+  void erase_image(imagectx_id_t imagectx_id);
+
+  void set_action_complete(action_id_t id);
+
+  bool is_action_complete(action_id_t id);
+
+  void wait_for_actions(const std::vector<dependency_d> &deps);
+
+  void set_latency_multiplier(float f);
+
+private:
+  void clear_images();
+
+  bool _is_action_complete(action_id_t id);
+
+  // Disallow assignment and copying
+  Replayer(const Replayer& rhs);
+  const Replayer& operator=(const Replayer& rhs);
+
+  librbd::RBD* m_rbd;
+  librados::IoCtx* m_ioctx;
+  float m_latency_multiplier;
+
+  std::map<imagectx_id_t, librbd::Image*> m_images;
+  boost::shared_mutex m_images_mutex;
+
+  // Maps an action ID to the time the action completed
+  std::map<action_id_t, boost::system_time> m_actions_complete;
+  boost::shared_mutex m_actions_complete_mutex;
+  boost::condition m_actions_complete_condition;
+};
+
+}
+
+#endif
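
Taken together, Replayer is intended to be driven the way rbd-replay.cc below drives it: construct it, optionally scale the recorded inter-action latencies, then point it at a ceph.conf and the replay.bin emitted by prep-for-replay.py. A minimal, hypothetical driver (file names are placeholders):

// Hypothetical driver, not part of the commit; rbd-replay.cc below wraps the
// same three calls in proper argument parsing.
#include "Replayer.hpp"

int main() {
  rbd_replay::Replayer replayer;
  replayer.set_latency_multiplier(1.0);      // 1.0 replays with the recorded delays
  replayer.run("ceph.conf", "replay.bin");   // config file and trace from prep-for-replay.py
  return 0;
}
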
diff --git a/src/rbd_replay/actions.cc b/src/rbd_replay/actions.cc
new file mode 100644 (file)
index 0000000..baaa0ba
--- /dev/null
@@ -0,0 +1,276 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "actions.hpp"
+#include <cstdlib>
+#include "PendingIO.hpp"
+
+
+using namespace rbd_replay;
+using namespace std;
+
+
+Action::Action(action_id_t id,
+               thread_id_t thread_id,
+               int num_successors,
+               int num_completion_successors,
+               std::vector<dependency_d> &predecessors)
+  : m_id(id),
+    m_thread_id(thread_id),
+    m_num_successors(num_successors),
+    m_num_completion_successors(num_completion_successors),
+    m_predecessors(predecessors) {
+    }
+
+Action::~Action() {
+}
+
+Action::ptr Action::read_from(Deser &d) {
+  uint8_t type = d.read_uint8_t();
+  if (d.eof()) {
+    return Action::ptr();
+  }
+  uint32_t ionum = d.read_uint32_t();
+  uint64_t thread_id = d.read_uint64_t();
+  uint32_t num_successors = d.read_uint32_t();
+  uint32_t num_completion_successors = d.read_uint32_t();
+  uint32_t num_dependencies = d.read_uint32_t();
+  vector<dependency_d> deps;
+  for (unsigned int i = 0; i < num_dependencies; i++) {
+    uint32_t dep_id = d.read_uint32_t();
+    uint64_t time_delta = d.read_uint64_t();
+    deps.push_back(dependency_d(dep_id, time_delta));
+  }
+  DummyAction dummy(ionum, thread_id, num_successors, num_completion_successors, deps);
+  switch (type) {
+  case 0:
+    return StartThreadAction::read_from(dummy, d);
+  case 1:
+    return StopThreadAction::read_from(dummy, d);
+  case 2:
+    return ReadAction::read_from(dummy, d);
+  case 3:
+    return WriteAction::read_from(dummy, d);
+  case 4:
+    return AioReadAction::read_from(dummy, d);
+  case 5:
+    return AioWriteAction::read_from(dummy, d);
+  case 6:
+    return OpenImageAction::read_from(dummy, d);
+  case 7:
+    return CloseImageAction::read_from(dummy, d);
+  default:
+    cerr << "Invalid action type: " << type << endl;
+    exit(1);
+  }
+}
+
+
+StartThreadAction::StartThreadAction(Action &src)
+  : Action(src) {
+}
+
+void StartThreadAction::perform(ActionCtx &ctx) {
+  cerr << "StartThreadAction should never actually be performed" << endl;
+  exit(1);
+}
+
+bool StartThreadAction::is_start_thread() {
+  return true;
+}
+
+Action::ptr StartThreadAction::read_from(Action &src, Deser &d) {
+  return Action::ptr(new StartThreadAction(src));
+}
+
+
+StopThreadAction::StopThreadAction(Action &src)
+  : Action(src) {
+}
+
+void StopThreadAction::perform(ActionCtx &ctx) {
+  cout << "Performing stop thread action #" << id() << endl;
+  ctx.stop();
+}
+
+Action::ptr StopThreadAction::read_from(Action &src, Deser &d) {
+  return Action::ptr(new StopThreadAction(src));
+}
+
+AioReadAction::AioReadAction(const Action &src,
+                             imagectx_id_t imagectx_id,
+                             uint64_t offset,
+                             uint64_t length)
+  : Action(src),
+    m_imagectx_id(imagectx_id),
+    m_offset(offset),
+    m_length(length) {
+    }
+
+Action::ptr AioReadAction::read_from(Action &src, Deser &d) {
+  imagectx_id_t imagectx_id = d.read_uint64_t();
+  uint64_t offset = d.read_uint64_t();
+  uint64_t length = d.read_uint64_t();
+  return Action::ptr(new AioReadAction(src, imagectx_id, offset, length));
+}
+
+void AioReadAction::perform(ActionCtx &worker) {
+  cout << "Performing AIO read action #" << id() << endl;
+  librbd::Image *image = worker.get_image(m_imagectx_id);
+  assert(image);
+  PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
+  image->aio_read(m_offset, m_length, io->bufferlist(), &io->completion());
+  worker.add_pending(io);
+}
+
+
+ReadAction::ReadAction(const Action &src,
+                       imagectx_id_t imagectx_id,
+                       uint64_t offset,
+                       uint64_t length)
+  : Action(src),
+    m_imagectx_id(imagectx_id),
+    m_offset(offset),
+    m_length(length) {
+    }
+
+Action::ptr ReadAction::read_from(Action &src, Deser &d) {
+  imagectx_id_t imagectx_id = d.read_uint64_t();
+  uint64_t offset = d.read_uint64_t();
+  uint64_t length = d.read_uint64_t();
+  return Action::ptr(new ReadAction(src, imagectx_id, offset, length));
+}
+
+void ReadAction::perform(ActionCtx &worker) {
+  cout << "Performing read action #" << id() << endl;
+  librbd::Image *image = worker.get_image(m_imagectx_id);
+  PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
+  worker.add_pending(io);
+  image->read(m_offset, m_length, io->bufferlist());
+  worker.remove_pending(io);
+}
+
+
+AioWriteAction::AioWriteAction(const Action &src,
+                               imagectx_id_t imagectx_id,
+                               uint64_t offset,
+                               uint64_t length)
+  : Action(src),
+    m_imagectx_id(imagectx_id),
+    m_offset(offset),
+    m_length(length) {
+    }
+
+Action::ptr AioWriteAction::read_from(Action &src, Deser &d) {
+  imagectx_id_t imagectx_id = d.read_uint64_t();
+  uint64_t offset = d.read_uint64_t();
+  uint64_t length = d.read_uint64_t();
+  return Action::ptr(new AioWriteAction(src, imagectx_id, offset, length));
+}
+
+void AioWriteAction::perform(ActionCtx &worker) {
+  cout << "Performing AIO write action #" << id() << endl;
+  librbd::Image *image = worker.get_image(m_imagectx_id);
+  PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
+  io->bufferlist().append_zero(m_length);
+  image->aio_write(m_offset, m_length, io->bufferlist(), &io->completion());
+  worker.add_pending(io);
+}
+
+
+WriteAction::WriteAction(const Action &src,
+                         imagectx_id_t imagectx_id,
+                         uint64_t offset,
+                         uint64_t length)
+  : Action(src),
+    m_imagectx_id(imagectx_id),
+    m_offset(offset),
+    m_length(length) {
+    }
+
+Action::ptr WriteAction::read_from(Action &src, Deser &d) {
+  imagectx_id_t imagectx_id = d.read_uint64_t();
+  uint64_t offset = d.read_uint64_t();
+  uint64_t length = d.read_uint64_t();
+  return Action::ptr(new WriteAction(src, imagectx_id, offset, length));
+}
+
+void WriteAction::perform(ActionCtx &worker) {
+  cout << "Performing write action #" << id() << endl;
+  librbd::Image *image = worker.get_image(m_imagectx_id);
+  PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
+  worker.add_pending(io);
+  io->bufferlist().append_zero(m_length);
+  image->write(m_offset, m_length, io->bufferlist());
+  worker.remove_pending(io);
+}
+
+
+OpenImageAction::OpenImageAction(Action &src,
+                                 imagectx_id_t imagectx_id,
+                                 string name,
+                                 string snap_name,
+                                 bool readonly)
+  : Action(src),
+    m_imagectx_id(imagectx_id),
+    m_name(name),
+    m_snap_name(snap_name),
+    m_readonly(readonly) {
+    }
+
+Action::ptr OpenImageAction::read_from(Action &src, Deser &d) {
+  imagectx_id_t imagectx_id = d.read_uint64_t();
+  string name = d.read_string();
+  string snap_name = d.read_string();
+  bool readonly = d.read_bool();
+  return Action::ptr(new OpenImageAction(src, imagectx_id, name, snap_name, readonly));
+}
+
+void OpenImageAction::perform(ActionCtx &worker) {
+  cout << "Performing open image action #" << id() << endl;
+  PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
+  worker.add_pending(io);
+  librbd::Image *image = new librbd::Image();
+  librbd::RBD *rbd = worker.rbd();
+  int r;
+  if (m_readonly) {
+    r = rbd->open_read_only(*worker.ioctx(), *image, m_name.c_str(), m_snap_name.c_str());
+  } else {
+    r = rbd->open(*worker.ioctx(), *image, m_name.c_str(), m_snap_name.c_str());
+  }
+  if (r) {
+    cerr << "Unable to open image '" << m_name << "' with snap '" << m_snap_name << "' and readonly " << m_readonly << ": " << strerror(-r) << endl;
+    exit(1);
+  }
+  worker.put_image(m_imagectx_id, image);
+  worker.remove_pending(io);
+}
+
+
+CloseImageAction::CloseImageAction(Action &src,
+                                   imagectx_id_t imagectx_id)
+  : Action(src),
+    m_imagectx_id(imagectx_id) {
+    }
+
+Action::ptr CloseImageAction::read_from(Action &src, Deser &d) {
+  imagectx_id_t imagectx_id = d.read_uint64_t();
+  return Action::ptr(new CloseImageAction(src, imagectx_id));
+}
+
+void CloseImageAction::perform(ActionCtx &worker) {
+  cout << "Performing close image action #" << id() << endl;
+  worker.erase_image(m_imagectx_id);
+  worker.set_action_complete(pending_io_id());
+}
diff --git a/src/rbd_replay/actions.hpp b/src/rbd_replay/actions.hpp
new file mode 100644 (file)
index 0000000..95f12d9
--- /dev/null
@@ -0,0 +1,260 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef _INCLUDED_RBD_REPLAY_ACTIONS_HPP
+#define _INCLUDED_RBD_REPLAY_ACTIONS_HPP
+
+#include <boost/shared_ptr.hpp>
+#include "include/rbd/librbd.hpp"
+#include "Deser.hpp"
+
+namespace rbd_replay {
+
+typedef uint64_t imagectx_id_t;
+typedef uint64_t thread_id_t;
+
+// Even IDs are normal actions, odd IDs are completions
+typedef uint32_t action_id_t;
+
+struct dependency_d {
+  action_id_t id;
+
+  uint64_t time_delta;
+
+  dependency_d(action_id_t id,
+              uint64_t time_delta)
+    : id(id),
+      time_delta(time_delta) {
+  }
+};
+
+
+class PendingIO;
+
+
+class ActionCtx {
+public:
+  virtual ~ActionCtx() {
+  }
+
+  virtual librbd::Image* get_image(imagectx_id_t imagectx_id) = 0;
+
+  virtual void put_image(imagectx_id_t imagectx_id, librbd::Image* image) = 0;
+
+  virtual void erase_image(imagectx_id_t imagectx_id) = 0;
+
+  virtual librbd::RBD* rbd() = 0;
+
+  virtual librados::IoCtx* ioctx() = 0;
+
+  virtual void add_pending(boost::shared_ptr<PendingIO> io) = 0;
+
+  virtual void remove_pending(boost::shared_ptr<PendingIO> io) = 0;
+
+  virtual void set_action_complete(action_id_t id) = 0;
+
+  virtual void stop() = 0;
+};
+
+
+class Action {
+public:
+  typedef boost::shared_ptr<Action> ptr;
+
+  Action(action_id_t id,
+        thread_id_t thread_id,
+        int num_successors,
+        int num_completion_successors,
+        std::vector<dependency_d> &predecessors);
+
+  virtual ~Action();
+
+  virtual void perform(ActionCtx &ctx) = 0;
+
+  action_id_t pending_io_id() {
+    return m_id + 1;
+  }
+
+  // There's probably a better way to do this, but oh well.
+  virtual bool is_start_thread() {
+    return false;
+  }
+
+  action_id_t id() const {
+    return m_id;
+  }
+
+  thread_id_t thread_id() const {
+    return m_thread_id;
+  }
+
+  const std::vector<dependency_d>& predecessors() const {
+    return m_predecessors;
+  }
+
+  static ptr read_from(Deser &d);
+
+private:
+  const action_id_t m_id;
+  const thread_id_t m_thread_id;
+  const int m_num_successors;
+  const int m_num_completion_successors;
+  const std::vector<dependency_d> m_predecessors;
+};
+
+
+class DummyAction : public Action {
+public:
+  DummyAction(action_id_t id,
+             thread_id_t thread_id,
+             int num_successors,
+             int num_completion_successors,
+             std::vector<dependency_d> &predecessors)
+    : Action(id, thread_id, num_successors, num_completion_successors, predecessors) {
+  }
+
+  void perform(ActionCtx &ctx) {
+  }
+};
+
+class StopThreadAction : public Action {
+public:
+  explicit StopThreadAction(Action &src);
+
+  void perform(ActionCtx &ctx);
+
+  static Action::ptr read_from(Action &src, Deser &d);
+};
+
+
+class AioReadAction : public Action {
+public:
+  AioReadAction(const Action &src,
+               imagectx_id_t imagectx_id,
+               uint64_t offset,
+               uint64_t length);
+
+  void perform(ActionCtx &ctx);
+
+  static Action::ptr read_from(Action &src, Deser &d);
+
+private:
+  imagectx_id_t m_imagectx_id;
+  uint64_t m_offset;
+  uint64_t m_length;
+};
+
+
+class ReadAction : public Action {
+public:
+  ReadAction(const Action &src,
+            imagectx_id_t imagectx_id,
+            uint64_t offset,
+            uint64_t length);
+
+  void perform(ActionCtx &ctx);
+
+  static Action::ptr read_from(Action &src, Deser &d);
+
+private:
+  imagectx_id_t m_imagectx_id;
+  uint64_t m_offset;
+  uint64_t m_length;
+};
+
+
+class AioWriteAction : public Action {
+public:
+  AioWriteAction(const Action &src,
+                imagectx_id_t imagectx_id,
+                uint64_t offset,
+                uint64_t length);
+
+  void perform(ActionCtx &ctx);
+
+  static Action::ptr read_from(Action &src, Deser &d);
+
+private:
+  imagectx_id_t m_imagectx_id;
+  uint64_t m_offset;
+  uint64_t m_length;
+};
+
+
+class WriteAction : public Action {
+public:
+  WriteAction(const Action &src,
+             imagectx_id_t imagectx_id,
+             uint64_t offset,
+             uint64_t length);
+
+  void perform(ActionCtx &ctx);
+
+  static Action::ptr read_from(Action &src, Deser &d);
+
+private:
+  imagectx_id_t m_imagectx_id;
+  uint64_t m_offset;
+  uint64_t m_length;
+};
+
+
+class OpenImageAction : public Action {
+public:
+  OpenImageAction(Action &src,
+                 imagectx_id_t imagectx_id,
+                 std::string name,
+                 std::string snap_name,
+                 bool readonly);
+
+  void perform(ActionCtx &ctx);
+
+  static Action::ptr read_from(Action &src, Deser &d);
+
+private:
+  imagectx_id_t m_imagectx_id;
+  std::string m_name;
+  std::string m_snap_name;
+  bool m_readonly;
+};
+
+
+class CloseImageAction : public Action {
+public:
+  CloseImageAction(Action &src,
+                  imagectx_id_t imagectx_id);
+
+  void perform(ActionCtx &ctx);
+
+  static Action::ptr read_from(Action &src, Deser &d);
+
+private:
+  imagectx_id_t m_imagectx_id;
+};
+
+
+class StartThreadAction : public Action {
+public:
+  explicit StartThreadAction(Action &src);
+
+  void perform(ActionCtx &ctx);
+
+  bool is_start_thread();
+
+  static Action::ptr read_from(Action &src, Deser &d);
+};
+
+}
+
+#endif
diff --git a/src/rbd_replay/prep-for-replay.py b/src/rbd_replay/prep-for-replay.py
new file mode 100755 (executable)
index 0000000..8b42739
--- /dev/null
@@ -0,0 +1,498 @@
+#!/usr/bin/python
+# -*- mode:Python; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+# vim: ts=8 sw=2 smarttab
+#
+# Ceph - scalable distributed file system
+#
+# Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+#
+# This is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1, as published by the Free Software
+# Foundation.  See file COPYING.
+#
+#
+
+from babeltrace import *
+import cProfile
+import datetime
+import struct
+import sys
+
+traces = TraceCollection()
+ret = traces.add_trace(sys.argv[1], "ctf")
+
+pendingIOs = {}
+threads = {}
+ios = []
+printOnRead = False
+recentCompletions = []
+limit = 100000000000
+ignoreWrites = True
+
+ioCount = 0
+def nextID():
+    global ioCount
+    val = ioCount
+    ioCount = ioCount + 2
+    return val
+
+class Extent(object):
+    def __init__(self, offset, length):
+        self.offset = offset
+        self.length = length
+    def __str__(self):
+        return str(self.offset) + "+" + str(self.length)
+    def __repr__(self):
+        return "Extent(" + str(self.offset) + "," + str(self.length) + ")"
+
+class Thread(object):
+    def __init__(self, id):
+        self.id = id
+        self.pendingIO = None
+        self.latestIO = None # may not be meaningful
+        self.latestCompletion = None # may not be meaningful
+        self.lastTS = None
+    def insertTS(self, ts):
+        if not self.lastTS or ts > self.lastTS:
+            self.lastTS = ts
+    def issuedIO(self, io):
+        latestIOs = []
+        for threadID in threads:
+            thread = threads[threadID]
+            if thread.latestIO and thread.latestIO.start_time > io.start_time - window:
+                latestIOs.append(thread.latestIO)
+        io.addDependencies(latestIOs)
+        self.latestIO = io
+    def completedIO(self, io):
+        self.latestCompletion = io
+
+def batchUnreachableFrom(deps, base):
+    if not base:
+        return set()
+    if not deps:
+        return set()
+    unreachable = set()
+    searchingFor = set(deps)
+    discovered = set()
+    boundary = set(base)
+    boundaryHorizon = None
+    for io in boundary:
+        if not boundaryHorizon or io.start_time > boundaryHorizon:
+            boundaryHorizon = io.start_time
+    searchingHorizon = None
+    for io in searchingFor:
+        if not searchingHorizon or io.start_time < searchingHorizon:
+            searchingHorizon = io.start_time
+    tmp = [x for x in searchingFor if boundaryHorizon < x.start_time]
+    searchingFor.difference_update(tmp)
+    unreachable.update(tmp)
+    while boundary and searchingFor:
+        io = boundary.pop()
+        for dep in io.dependencies:
+            if dep in searchingFor:
+                searchingFor.remove(dep)
+                if dep.start_time == searchingHorizon:
+                    searchingHorizon = None
+                    for io in searchingFor:
+                        if not searchingHorizon or io.start_time < searchingHorizon:
+                            searchingHorizon = io.start_time
+            if not dep in discovered:
+                boundary.add(dep)
+        if io.start_time == boundaryHorizon:
+            boundaryHorizon = None
+            for io in boundary:
+                if not boundaryHorizon or io.start_time > boundaryHorizon:
+                    boundaryHorizon = io.start_time
+            if boundaryHorizon:
+                tmp = [x for x in searchingFor if boundaryHorizon < x.start_time]
+                searchingFor.difference_update(tmp)
+                unreachable.update(tmp)
+                searchingHorizon = None
+                for io in searchingFor:
+                    if not searchingHorizon or io.start_time < searchingHorizon:
+                        searchingHorizon = io.start_time
+    unreachable.update(searchingFor)
+    return unreachable
+
+class IO(object):
+    def __init__(self, ionum, start_time, thread, prev):
+        self.ionum = ionum
+        self.start_time = start_time
+        self.thread = thread
+        self.dependencies = set()
+        self.isCompletion = False
+        self.prev = prev
+        self.numSuccessors = 0
+        self.completion = None
+    def reachableFrom(self, ios):
+        if not ios:
+            return False
+        discovered = set()
+        boundary = set(ios)
+        horizon = None
+        for io in boundary:
+            if not horizon or io.start_time > horizon:
+                horizon = io.start_time
+        if horizon < self.start_time:
+            return False
+        while boundary:
+            io = boundary.pop()
+            for dep in io.dependencies:
+                if self == dep:
+                    return True
+                if not dep in discovered:
+                    boundary.add(dep)
+            if io.start_time == horizon:
+                horizon = None
+                for io in boundary:
+                    if not horizon or io.start_time > horizon:
+                        horizon = io.start_time
+                if horizon and horizon < self.start_time:
+                    return False
+        return False
+    def addDependency(self, dep):
+        if not dep.reachableFrom(self.dependencies):
+            self.dependencies.add(dep)
+    def addDependencies(self, deps):
+        base = set(self.dependencies)
+        for dep in deps:
+            base.update(dep.dependencies)
+        unreachable = batchUnreachableFrom(deps, base)
+        self.dependencies.update(unreachable)
+    def depIDs(self):
+        ids = []
+        for dep in self.dependencies:
+            ids.append(dep.ionum)
+        return ids
+    def depMap(self):
+        deps = dict()
+        for dep in self.dependencies:
+            deps[dep.ionum] = self.start_time - dep.start_time
+        return deps
+    def addThreadCompletionDependencies(self, threads):
+        self.addDependencies(recentCompletions)
+    def numCompletionSuccessors(self):
+        return self.completion.numSuccessors if self.completion else 0
+    def writeTo(self, f, iotype):
+        f.write(struct.pack("!BIQIII", iotype, self.ionum, self.thread.id, self.numSuccessors, self.numCompletionSuccessors(), len(self.dependencies)))
+        for dep in self.dependencies:
+            f.write(struct.pack("!IQ", dep.ionum, self.start_time - dep.start_time))
+
+class StartThreadIO(IO):
+    def __init__(self, ionum, start_time, thread):
+        IO.__init__(self, ionum, start_time, thread, None)
+    def writeTo(self, f):
+        IO.writeTo(self, f, 0)
+    def __str__(self):
+        return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": start thread, thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors())
+
+class StopThreadIO(IO):
+    def __init__(self, ionum, start_time, thread):
+        IO.__init__(self, ionum, start_time, thread, None)
+    def writeTo(self, f):
+        IO.writeTo(self, f, 1)
+    def __str__(self):
+        return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": stop thread, thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors())
+
+class ReadIO(IO):
+    def __init__(self, ionum, start_time, thread, prev, imagectx, extents):
+        IO.__init__(self, ionum, start_time, thread, prev)
+        self.imagectx = imagectx
+        self.extents = extents
+    def writeTo(self, f):
+        IO.writeTo(self, f, 2)
+        if len(self.extents) != 1:
+            raise ValueError("Expected read to have 1 extent, but it had " + str(len(self.extents)))
+        extent = self.extents[0]
+        f.write(struct.pack("!QQQ", self.imagectx, extent.offset, extent.length))
+    def __str__(self):
+        return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": read, extents = " + str(self.extents) + ", thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors())
+
+class WriteIO(IO):
+    def __init__(self, ionum, start_time, thread, prev, imagectx, extents):
+        IO.__init__(self, ionum, start_time, thread, prev)
+        self.imagectx = imagectx
+        self.extents = extents
+    def writeTo(self, f):
+        IO.writeTo(self, f, 3)
+        if len(self.extents) != 1:
+            raise ValueError("Expected read to have 1 extent, but it had " + str(len(self.extents)))
+        extent = self.extents[0]
+        f.write(struct.pack("!QQQ", self.imagectx, extent.offset, extent.length))
+    def __str__(self):
+        return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": write, extents = " + str(self.extents) + ", thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors())
+
+class AioReadIO(IO):
+    def __init__(self, ionum, start_time, thread, prev, imagectx, extents):
+        IO.__init__(self, ionum, start_time, thread, prev)
+        self.imagectx = imagectx
+        self.extents = extents
+    def writeTo(self, f):
+        IO.writeTo(self, f, 4)
+        if len(self.extents) != 1:
+            raise ValueError("Expected read to have 1 extent, but it had " + str(len(self.extents)))
+        extent = self.extents[0]
+        f.write(struct.pack("!QQQ", self.imagectx, extent.offset, extent.length))
+    def __str__(self):
+        return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": aio read, extents = " + str(self.extents) + ", thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors())
+
+class AioWriteIO(IO):
+    def __init__(self, ionum, start_time, thread, prev, imagectx, extents):
+        IO.__init__(self, ionum, start_time, thread, prev)
+        self.imagectx = imagectx
+        self.extents = extents
+    def writeTo(self, f):
+        IO.writeTo(self, f, 5)
+        if len(self.extents) != 1:
+            raise ValueError("Expected read to have 1 extent, but it had " + str(len(self.extents)))
+        extent = self.extents[0]
+        f.write(struct.pack("!QQQ", imagectx, extent.offset, extent.length))
+    def __str__(self):
+        return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": aio write, extents = " + str(self.extents) + ", thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors())
+
+class OpenImageIO(IO):
+    def __init__(self, ionum, start_time, thread, prev, imagectx, name, snap_name, readonly):
+        IO.__init__(self, ionum, start_time, thread, prev)
+        self.imagectx = imagectx
+        self.name = name
+        self.snap_name = snap_name
+        self.readonly = readonly
+    def writeTo(self, f):
+        IO.writeTo(self, f, 6)
+        f.write(struct.pack("!QI", self.imagectx, len(self.name)))
+        f.write(self.name)
+        f.write(struct.pack("!I", len(self.snap_name)))
+        f.write(self.snap_name)
+        f.write(struct.pack("!b", self.readonly))
+    def __str__(self):
+        return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": open image, thread = " + str(self.thread.id) + ", imagectx = " + str(self.imagectx) + ", name = " + self.name + ", snap_name = " + self.snap_name + ", readonly = " + str(self.readonly) + ", deps = " + str(self.depMap())
+
+class CloseImageIO(IO):
+    def __init__(self, ionum, start_time, thread, prev, imagectx):
+        IO.__init__(self, ionum, start_time, thread, prev)
+        self.imagectx = imagectx
+    def writeTo(self, f):
+        IO.writeTo(self, f, 7)
+        f.write(struct.pack("!Q", self.imagectx))
+    def __str__(self):
+        return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": close image, thread = " + str(self.thread.id) + ", imagectx = " + str(self.imagectx) + ", deps = " + str(self.depMap())
+
+class CompletionIO(IO):
+    def __init__(self, start_time, thread, baseIO):
+        IO.__init__(self, baseIO.ionum + 1, start_time, thread, None)
+        self.baseIO = baseIO
+        self.isCompletion = True
+        self.addDependency(baseIO)
+        baseIO.completion = self
+    def writeTo(self, f):
+        pass
+    def __str__(self):
+        return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": completion, thread = " + str(self.thread.id) + ", baseIO = " + str(self.baseIO) + ", deps = " + str(self.depMap())
+
+
+window = 1 * 1e9
+
+def completed(io):
+    global recentCompletions
+    recentCompletions.append(io)
+    recentCompletions[:] = [x for x in recentCompletions if x.start_time > io.start_time - window]
+    # while recentCompletions[0].start_time < io.start_time - window:
+    #     del recentCompletions[0]
+
+def main():
+    global ios
+    # Parse phase
+    trace_start = None
+    count = 0
+    for event in traces.events:
+        count = count + 1
+        if count > limit:
+            break
+        ts = event.timestamp
+        if not trace_start:
+            trace_start = ts
+        ts = ts - trace_start
+        threadID = event["pthread_id"]
+        if threadID in threads:
+            thread = threads[threadID]
+        else:
+            thread = Thread(threadID)
+            threads[threadID] = thread
+            ionum = nextID()
+            io = StartThreadIO(ionum, ts, thread)
+            ios.append(io)
+            if printOnRead:
+                print str(io)
+        thread.insertTS(ts)
+        if event.name == "librbd:read_enter":
+            name = event["name"]
+            readid = event["id"]
+            imagectx = event["imagectx"]
+            ionum = nextID()
+            thread.pendingIO = ReadIO(ionum, ts, thread, thread.pendingIO, imagectx, [])
+            thread.pendingIO.addThreadCompletionDependencies(threads)
+            thread.issuedIO(thread.pendingIO)
+            ios.append(thread.pendingIO)
+        elif event.name == "librbd:open_image_enter":
+            imagectx = event["imagectx"]
+            name = event["name"]
+            snap_name = event["snap_name"]
+            readid = event["id"]
+            readonly = event["read_only"]
+            ionum = nextID()
+            thread.pendingIO = OpenImageIO(ionum, ts, thread, thread.pendingIO, imagectx, name, snap_name, readonly)
+            thread.pendingIO.addThreadCompletionDependencies(threads)
+            thread.issuedIO(thread.pendingIO)
+            ios.append(thread.pendingIO)
+        elif event.name == "librbd:open_image_exit":
+            thread.pendingIO.end_time = ts
+            completionIO = CompletionIO(ts, thread, thread.pendingIO)
+            thread.completedIO(completionIO)
+            ios.append(completionIO)
+            completed(completionIO)
+            if printOnRead:
+                print str(thread.pendingIO)
+        elif event.name == "librbd:close_image_enter":
+            imagectx = event["imagectx"]
+            ionum = nextID()
+            thread.pendingIO = CloseImageIO(ionum, ts, thread, thread.pendingIO, imagectx)
+            thread.pendingIO.addThreadCompletionDependencies(threads)
+            thread.issuedIO(thread.pendingIO)
+            ios.append(thread.pendingIO)
+        elif event.name == "librbd:close_image_exit":
+            thread.pendingIO.end_time = ts
+            completionIO = CompletionIO(ts, thread, thread.pendingIO)
+            thread.completedIO(completionIO)
+            ios.append(completionIO)
+            completed(completionIO)
+            if printOnRead:
+                print str(thread.pendingIO)
+        elif event.name == "librbd:read_extent":
+            offset = event["offset"]
+            length = event["length"]
+            thread.pendingIO.extents.append(Extent(offset, length))
+        elif event.name == "librbd:read_exit":
+            thread.pendingIO.end_time = ts
+            completionIO = CompletionIO(ts, thread, thread.pendingIO)
+            thread.completedIO(completionIO)
+            ios.append(completionIO)
+            completed(completionIO)
+            if printOnRead:
+                print str(thread.pendingIO)
+        elif event.name == "librbd:write_enter":
+            if not ignoreWrites:
+                name = event["name"]
+                readid = event["id"]
+                offset = event["off"]
+                length = event["buf_len"]
+                imagectx = event["imagectx"]
+                ionum = nextID()
+                thread.pendingIO = WriteIO(ionum, ts, thread, thread.pendingIO, imagectx, [Extent(offset, length)])
+                thread.pendingIO.addThreadCompletionDependencies(threads)
+                thread.issuedIO(thread.pendingIO)
+                ios.append(thread.pendingIO)
+        elif event.name == "librbd:write_exit":
+            if not ignoreWrites:
+                thread.pendingIO.end_time = ts
+                completionIO = CompletionIO(ts, thread, thread.pendingIO)
+                thread.completedIO(completionIO)
+                ios.append(completionIO)
+                completed(completionIO)
+                if printOnRead:
+                    print str(thread.pendingIO)
+        elif event.name == "librbd:aio_read_enter":
+            name = event["name"]
+            readid = event["id"]
+            completion = event["completion"]
+            imagectx = event["imagectx"]
+            ionum = nextID()
+            thread.pendingIO = AioReadIO(ionum, ts, thread, thread.pendingIO, imagectx, [])
+            thread.pendingIO.addThreadCompletionDependencies(threads)
+            ios.append(thread.pendingIO)
+            thread.issuedIO(thread.pendingIO)
+            pendingIOs[completion] = thread.pendingIO
+        elif event.name == "librbd:aio_read_extent":
+            offset = event["offset"]
+            length = event["length"]
+            thread.pendingIO.extents.append(Extent(offset, length))
+        elif event.name == "librbd:aio_read_exit":
+            if printOnRead:
+                print str(thread.pendingIO)
+        elif event.name == "librbd:aio_write_enter":
+            if not ignoreWrites:
+                name = event["name"]
+                writeid = event["id"]
+                offset = event["off"]
+                length = event["len"]
+                completion = event["completion"]
+                imagectx = event["imagectx"]
+                ionum = nextID()
+                thread.pendingIO = AioWriteIO(ionum, ts, thread, thread.pendingIO, imagectx, [Extent(offset, length)])
+                thread.pendingIO.addThreadCompletionDependencies(threads)
+                thread.issuedIO(thread.pendingIO)
+                ios.append(thread.pendingIO)
+                pendingIOs[completion] = thread.pendingIO
+                if printOnRead:
+                    print str(thread.pendingIO)
+        elif event.name == "librbd:aio_complete_enter":
+            completion = event["completion"]
+            retval = event["rval"]
+            if completion in pendingIOs:
+                completedIO = pendingIOs[completion]
+                del pendingIOs[completion]
+                completedIO.end_time = ts
+                completionIO = CompletionIO(ts, thread, completedIO)
+                completedIO.thread.completedIO(completionIO)
+                ios.append(completionIO)
+                completed(completionIO)
+                if printOnRead:
+                    print str(completionIO)
+
+
+    # Insert-thread-stop phase
+    ios = sorted(ios, key = lambda io: io.start_time)
+    for threadID in threads:
+        thread = threads[threadID]
+        ionum = None
+        maxIONum = 0 # only valid if ionum is None
+        for io in ios:
+            if io.ionum > maxIONum:
+                maxIONum = io.ionum
+            if io.start_time > thread.lastTS:
+                ionum = io.ionum
+                if ionum % 2 == 1:
+                    ionum = ionum + 1
+                break
+        if not ionum:
+            if maxIONum % 2 == 1:
+                maxIONum = maxIONum - 1
+            ionum = maxIONum + 2
+        for io in ios:
+            if io.ionum >= ionum:
+                io.ionum = io.ionum + 2
+        # TODO: Insert in the right place, don't append and re-sort
+        ios.append(StopThreadIO(ionum, thread.lastTS, thread))
+        ios = sorted(ios, key = lambda io: io.start_time)
+
+
+    for io in ios:
+        if io.prev and io.prev in io.dependencies:
+            io.dependencies.remove(io.prev)
+        if io.isCompletion:
+            io.dependencies.clear()
+        for dep in io.dependencies:
+            dep.numSuccessors = dep.numSuccessors + 1
+
+    print
+    # for io in ios:
+    #     if not io.isCompletion:
+    #         print str(io)
+
+    with open("replay.bin", "wb") as f:
+        for io in ios:
+            if not io.isCompletion:
+                print str(io)
+            io.writeTo(f)
+
+cProfile.run("main()")
diff --git a/src/rbd_replay/rbd-replay.cc b/src/rbd_replay/rbd-replay.cc
new file mode 100644 (file)
index 0000000..1825bbc
--- /dev/null
@@ -0,0 +1,88 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include <vector>
+#include "common/ceph_argparse.h"
+#include "Replayer.hpp"
+
+
+using namespace std;
+using namespace rbd_replay;
+
+
+static const char* get_remainder(const char *string, const char *prefix) {
+  while (*prefix) {
+    if (*prefix++ != *string++) {
+      return NULL;
+    }
+  }
+  return string;
+}
+
+static void usage(const char* program) {
+  cout << "Usage: " << program << " --conf=<config_file> <replay_file>" << endl;
+}
+
+int main(int argc, const char **argv) {
+  vector<const char*> args;
+
+  argv_to_vec(argc, argv, args);
+  env_to_vec(args);
+
+  std::vector<const char*>::iterator i;
+  string conf;
+  float latency_multiplier = 1;
+  std::string val;
+  std::ostringstream err;
+  for (i = args.begin(); i != args.end(); ) {
+    if (ceph_argparse_double_dash(args, i)) {
+      break;
+    } else if (ceph_argparse_witharg(args, i, &val, "-c", "--conf", (char*)NULL)) {
+      conf = val;
+    } else if (ceph_argparse_withfloat(args, i, &latency_multiplier, &err, "--latency-multiplier",
+                                    (char*)NULL)) {
+      if (!err.str().empty()) {
+       cerr << err.str() << std::endl;
+       return 1;
+      }
+    } else if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) {
+      usage(argv[0]);
+      return 0;
+    } else if (get_remainder(*i, "-")) {
+      cerr << "Unrecognized argument: " << *i << endl;
+      return 1;
+    } else {
+      ++i;
+    }
+  }
+
+  if (conf.empty()) {
+    cerr << "No config file specified.  Use -c or --conf." << endl;
+    return 1;
+  }
+
+  string replay_file;
+  if (!args.empty()) {
+    replay_file = args[0];
+  }
+
+  if (replay_file.empty()) {
+    cerr << "No replay file specified." << endl;
+    return 1;
+  }
+
+  Replayer replayer;
+  replayer.set_latency_multiplier(latency_multiplier);
+  replayer.run(conf, replay_file);
+}