--- /dev/null
+/*.log
+/replayer
+/traces
--- /dev/null
+#!/bin/bash
+
+pool=rbd
+image=my-image
+size=10G
+export LD_LIBRARY_PATH=../../src/.libs
+#qemu-img create -f raw rbd:$pool/$image:conf=../../src/ceph.conf $size
+qemu-img convert linux-0.2.img -O raw rbd:$pool/$image:conf=../../src/ceph.conf
--- /dev/null
+#!/bin/bash
+
+lttng create
+lttng enable-event -u 'librbd:*'
+lttng add-context -u -t pthread_id
+lttng start
+LD_LIBRARY_PATH=../../src/.libs ../../src/rbd-replay --conf=../../src/ceph.conf replay.bin | tee replay.log
+lttng stop
+lttng view > replay-trace.log
--- /dev/null
+#!/bin/bash
+
+PYTHONPATH=~/babeltrace/bindings/python/:~/babeltrace/bindings/python/.libs/ ../../src/rbd_replay/prep-for-replay.py traces/ust/uid/10002/64-bit
--- /dev/null
+#!/bin/bash
+
+mkdir -p traces
+lttng create -o traces librbd
+lttng enable-event -u 'librbd:*'
+lttng add-context -u -t pthread_id
+lttng start
+LD_LIBRARY_PATH=../../src/.libs/ qemu-system-i386 -m 1024 rbd:rbd/my-image:conf=../../src/ceph.conf
+lttng stop
+lttng view > trace.log
/radosgw-admin
/rbd
/rbd-fuse
+/rbd-replay
/rest-bench
/sample.fetch_config
/TAGS
include rgw/Makefile.am
include cls/Makefile.am
include key_value_store/Makefile.am
+include rbd_replay/Makefile.am
include test/Makefile.am
include tools/Makefile.am
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef _INCLUDED_BOUNDED_BUFFER_HPP
+#define _INCLUDED_BOUNDED_BUFFER_HPP
+
+#include <boost/bind.hpp>
+#include <boost/circular_buffer.hpp>
+#include <boost/thread/condition.hpp>
+#include <boost/thread/mutex.hpp>
+
+// Thread-safe, blocking, fixed-capacity FIFO for producer/consumer handoff.
+// Taken from http://www.boost.org/doc/libs/1_55_0/libs/circular_buffer/example/circular_buffer_bound_example.cpp
+template <class T>
+class BoundedBuffer {
+public:
+  typedef boost::circular_buffer<T> container_type;
+  typedef typename container_type::size_type size_type;
+  typedef typename container_type::value_type value_type;
+  typedef typename boost::call_traits<value_type>::param_type param_type;
+
+  explicit BoundedBuffer(size_type capacity) : m_unread(0), m_container(capacity) {
+  }
+
+  // Blocks until space is available, then inserts `item` at the front.
+  // Uses the `param_type` typedef declared above; the original spelled out
+  // the full boost::call_traits expression again, defeating the typedef.
+  void push_front(param_type item) {
+    // `param_type` represents the "best" way to pass a parameter of type `value_type` to a method.
+    boost::mutex::scoped_lock lock(m_mutex);
+    m_not_full.wait(lock, boost::bind(&BoundedBuffer<value_type>::is_not_full, this));
+    m_container.push_front(item);
+    ++m_unread;
+    lock.unlock();
+    m_not_empty.notify_one();
+  }
+
+  // Blocks until an item is available, then copies the oldest unread item
+  // into *pItem.
+  void pop_back(value_type* pItem) {
+    boost::mutex::scoped_lock lock(m_mutex);
+    m_not_empty.wait(lock, boost::bind(&BoundedBuffer<value_type>::is_not_empty, this));
+    *pItem = m_container[--m_unread];
+    lock.unlock();
+    m_not_full.notify_one();
+  }
+
+private:
+  BoundedBuffer(const BoundedBuffer&); // Disabled copy constructor.
+  BoundedBuffer& operator= (const BoundedBuffer&); // Disabled assign operator.
+
+  bool is_not_empty() const {
+    return m_unread > 0;
+  }
+  bool is_not_full() const {
+    return m_unread < m_container.capacity();
+  }
+
+  size_type m_unread;         // Items pushed but not yet popped.
+  container_type m_container; // Fixed-capacity ring buffer.
+  boost::mutex m_mutex;
+  boost::condition m_not_empty;
+  boost::condition m_not_full;
+};
+
+#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "Deser.hpp"
+#include <arpa/inet.h>
+#include <cstdlib>
+#include <endian.h>
+
+
+// Wraps the given stream; `in` must outlive this Deser.
+rbd_replay::Deser::Deser(std::istream &in)
+  : m_in(in) {
+}
+
+// Reads a single byte (no endianness concerns).
+uint8_t rbd_replay::Deser::read_uint8_t() {
+  uint8_t data;
+  m_in.read(reinterpret_cast<char*>(&data), sizeof(data));
+  return data;
+}
+
+// Reads a network-order (big-endian) 16-bit value, returned in host order.
+uint16_t rbd_replay::Deser::read_uint16_t() {
+  uint16_t net_order;
+  m_in.read(reinterpret_cast<char*>(&net_order), sizeof(net_order));
+  const uint16_t host_order = ntohs(net_order);
+  return host_order;
+}
+
+// Reads a network-order (big-endian) 32-bit value, returned in host order.
+uint32_t rbd_replay::Deser::read_uint32_t() {
+  uint32_t net_order;
+  m_in.read(reinterpret_cast<char*>(&net_order), sizeof(net_order));
+  const uint32_t host_order = ntohl(net_order);
+  return host_order;
+}
+
+// Reads a network-order (big-endian) 64-bit value, returned in host order.
+uint64_t rbd_replay::Deser::read_uint64_t() {
+  uint64_t data;
+  m_in.read(reinterpret_cast<char*>(&data), sizeof(data));
+  // be64toh (from the already-included <endian.h>) is a no-op on big-endian
+  // hosts and a byte swap on little-endian ones, replacing the hand-rolled
+  // two-ntohl shuffle guarded by __BYTE_ORDER.
+  return be64toh(data);
+}
+
+// Reads a length-prefixed string: uint32 byte count, then the raw bytes
+// (no terminator).
+std::string rbd_replay::Deser::read_string() {
+  uint32_t length = read_uint32_t();
+  if (length == 0) {
+    return std::string();
+  }
+  // Read straight into the string's own buffer.  The original malloc'd a
+  // scratch buffer without checking for NULL and then copied; reading in
+  // place avoids both the unchecked allocation and the extra copy.
+  std::string s(length, '\0');
+  m_in.read(&s[0], length);
+  return s;
+}
+
+// A bool is serialized as one byte; any non-zero value means true.
+bool rbd_replay::Deser::read_bool() {
+  uint8_t flag = read_uint8_t();
+  return flag != 0;
+}
+
+// True only after a read has already hit end-of-stream (standard istream
+// semantics), so callers must attempt a read before checking.
+bool rbd_replay::Deser::eof() {
+  return m_in.eof();
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef _INCLUDED_RBD_REPLAY_DESER_HPP
+#define _INCLUDED_RBD_REPLAY_DESER_HPP
+
+#include <iostream>
+#include <stdint.h>
+
+namespace rbd_replay {
+
+// Deserializes primitive values from a binary replay stream.
+// Multi-byte integers are stored big-endian; strings are length-prefixed.
+class Deser {
+public:
+  // `explicit` prevents accidental implicit conversion from an istream.
+  // The referenced stream must outlive the Deser.
+  explicit Deser(std::istream &in);
+
+  uint8_t read_uint8_t();
+
+  uint16_t read_uint16_t();
+
+  uint32_t read_uint32_t();
+
+  uint64_t read_uint64_t();
+
+  // Reads a uint32 length followed by that many raw bytes.
+  std::string read_string();
+
+  // One byte; non-zero means true.
+  bool read_bool();
+
+  // True only after a read has already failed at end-of-stream.
+  bool eof();
+
+private:
+  std::istream &m_in;
+};
+
+}
+
+#endif
--- /dev/null
+# Build rules for the rbd-replay trace playback tool.
+rbd_replay_SOURCES = rbd_replay/rbd-replay.cc \
+	rbd_replay/actions.cc \
+	rbd_replay/Deser.cc \
+	rbd_replay/PendingIO.cc \
+	rbd_replay/Replayer.cc
+rbd_replay_LDADD = $(LIBRBD) $(LIBRADOS) $(CEPH_GLOBAL)
+noinst_HEADERS += rbd_replay/BoundedBuffer.hpp \
+	rbd_replay/actions.hpp \
+	rbd_replay/Deser.hpp \
+	rbd_replay/PendingIO.hpp \
+	rbd_replay/Replayer.hpp
+# Only built on Linux.
+if LINUX
+bin_PROGRAMS += rbd-replay
+endif #LINUX
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "PendingIO.hpp"
+
+
+using namespace std;
+using namespace rbd_replay;
+
+// C-style trampoline registered with librbd's AioCompletion; forwards the
+// completion back to the owning PendingIO instance (passed as `arg`).
+extern "C"
+void pending_io_callback(librbd::completion_t cb, void *arg) {
+  PendingIO *io = (PendingIO*)arg;
+  io->completed(cb);
+}
+
+// Wires pending_io_callback into the AIO completion so completed() fires
+// when librbd finishes the I/O.
+PendingIO::PendingIO(action_id_t id,
+                     ActionCtx &worker)
+  : m_id(id),
+    m_completion(this, pending_io_callback),
+    m_worker(worker) {
+  }
+
+// Invoked (via pending_io_callback) when the AIO finishes; removes this I/O
+// from the worker's pending set, which may unblock a draining worker.
+void PendingIO::completed(librbd::completion_t cb) {
+  cout << "Completed pending IO #" << m_id << endl;
+  m_worker.remove_pending(shared_from_this());
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef _INCLUDED_RBD_REPLAY_PENDINGIO_HPP
+#define _INCLUDED_RBD_REPLAY_PENDINGIO_HPP
+
+#include <boost/enable_shared_from_this.hpp>
+#include "actions.hpp"
+
+namespace rbd_replay {
+
+// Tracks a single in-flight asynchronous I/O.  Instances must be managed by
+// boost::shared_ptr (required by enable_shared_from_this, used when the
+// completion callback hands the object back to the worker).
+class PendingIO : public boost::enable_shared_from_this<PendingIO> {
+public:
+  typedef boost::shared_ptr<PendingIO> ptr;
+
+  PendingIO(action_id_t id,
+	    ActionCtx &worker);
+
+  // Called from the librbd completion callback.
+  void completed(librbd::completion_t cb);
+
+  // ID of the completion action this I/O corresponds to.
+  action_id_t id() const {
+    return m_id;
+  }
+
+  // Data buffer for the I/O; kept alive until completion.
+  ceph::bufferlist &bufferlist() {
+    return m_bl;
+  }
+
+  // AIO completion handed to librbd when the I/O is issued.
+  librbd::RBD::AioCompletion &completion() {
+    return m_completion;
+  }
+
+private:
+  const action_id_t m_id;
+  ceph::bufferlist m_bl;
+  librbd::RBD::AioCompletion m_completion;
+  ActionCtx &m_worker;
+};
+
+}
+
+#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "Replayer.hpp"
+#include <boost/foreach.hpp>
+#include <boost/thread/thread.hpp>
+#include <fstream>
+
+
+using namespace std;
+using namespace rbd_replay;
+
+
+Worker::Worker(Replayer &replayer)
+  : m_replayer(replayer),
+    m_buffer(100),   // bounded queue: at most 100 queued actions per worker
+    m_done(false) {
+}
+
+// Spawns the worker's thread, which runs run() until stopped.
+void Worker::start() {
+  m_thread = boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&Worker::run, this)));
+}
+
+// Should only be called by StopThreadAction
+// (perform() runs on this worker's own thread, so m_done is only written by
+// the thread that reads it in run()'s loop).
+void Worker::stop() {
+  m_done = true;
+}
+
+// Blocks until the worker thread has exited.
+void Worker::join() {
+  m_thread->join();
+}
+
+// Queues an action for this worker; blocks if the buffer is full.
+void Worker::send(Action::ptr action) {
+  m_buffer.push_front(action);
+}
+
+// Records an in-flight AIO so run() can wait for it before exiting.
+void Worker::add_pending(PendingIO::ptr io) {
+  boost::mutex::scoped_lock lock(m_pending_ios_mutex);
+  m_pending_ios.push_back(io);
+}
+
+// Worker thread main loop: pop actions, wait for their dependencies, perform
+// them, mark them complete.  After stop() is observed, waits for all pending
+// AIOs to drain (m_pending_ios_empty is signaled by remove_pending).
+void Worker::run() {
+  cout << "Worker thread started" << endl;
+  while (!m_done) {
+    Action::ptr action;
+    m_buffer.pop_back(&action);
+    m_replayer.wait_for_actions(action->predecessors());
+    action->perform(*this);
+    m_replayer.set_action_complete(action->id());
+  }
+  {
+    boost::mutex::scoped_lock lock(m_pending_ios_mutex);
+    while (!m_pending_ios.empty()) {
+      m_pending_ios_empty.wait(lock);
+    }
+  }
+  cout << "Worker thread stopped" << endl;
+}
+
+
+// Called when an I/O finishes (from the AIO callback, or directly for
+// synchronous actions): marks the I/O's completion action done, erases it
+// from the pending list, and wakes run()'s drain loop if the list empties.
+void Worker::remove_pending(PendingIO::ptr io) {
+  m_replayer.set_action_complete(io->id());
+  boost::mutex::scoped_lock lock(m_pending_ios_mutex);
+  for (vector<PendingIO::ptr>::iterator itr = m_pending_ios.begin(); itr != m_pending_ios.end(); itr++) {
+    if (*itr == io) {
+      m_pending_ios.erase(itr);
+      break;
+    }
+  }
+  if (m_pending_ios.empty()) {
+    m_pending_ios_empty.notify_all();
+  }
+}
+
+
+// The methods below simply delegate to the shared Replayer, which owns the
+// image registry, cluster handles, and action-completion bookkeeping.
+
+librbd::Image* Worker::get_image(imagectx_id_t imagectx_id) {
+  return m_replayer.get_image(imagectx_id);
+}
+
+
+void Worker::put_image(imagectx_id_t imagectx_id, librbd::Image* image) {
+  m_replayer.put_image(imagectx_id, image);
+}
+
+
+void Worker::erase_image(imagectx_id_t imagectx_id) {
+  m_replayer.erase_image(imagectx_id);
+}
+
+
+librbd::RBD* Worker::rbd() {
+  return m_replayer.get_rbd();
+}
+
+
+librados::IoCtx* Worker::ioctx() {
+  return m_replayer.get_ioctx();
+}
+
+
+void Worker::set_action_complete(action_id_t id) {
+  m_replayer.set_action_complete(id);
+}
+
+
+// Initialize all members: the original constructor left m_rbd, m_ioctx and
+// m_latency_multiplier uninitialized, so get_rbd()/get_ioctx() returned
+// garbage before run(), and wait_for_actions() read an indeterminate
+// multiplier if set_latency_multiplier() was never called.  1.0 replays
+// inter-action delays exactly as traced.
+Replayer::Replayer()
+  : m_rbd(NULL),
+    m_ioctx(NULL),
+    m_latency_multiplier(1.0) {
+}
+
+// Top-level replay driver: connect to the cluster, open the pool, stream
+// actions from replay_file, dispatch each to its per-thread Worker, then
+// join all workers and tear everything down.
+void Replayer::run(const std::string conf_file, const std::string replay_file) {
+  cout << "IO thread started" << endl;
+  {
+    librados::Rados rados;
+    rados.init(NULL);
+    int r = rados.conf_read_file(conf_file.c_str());
+    if (r) {
+      cerr << "Unable to read conf file: " << r << endl;
+      goto out;
+    }
+    r = rados.connect();
+    if (r) {
+      cerr << "Unable to connect to Rados: " << r << endl;
+      goto out;
+    }
+    m_ioctx = new librados::IoCtx();
+    {
+      // Pool name is hard-coded; all traced images are assumed to live in
+      // the default "rbd" pool.
+      const char* pool_name = "rbd";
+      r = rados.ioctx_create(pool_name, *m_ioctx);
+      if (r) {
+        cerr << "Unable to create IoCtx: " << r << endl;
+        goto out2;
+      }
+      m_rbd = new librbd::RBD();
+      map<thread_id_t, Worker*> workers;
+
+      ifstream input(replay_file.c_str(), ios::in | ios::binary);
+      if (!input.is_open()) {
+        cerr << "Unable to open " << replay_file << endl;
+        exit(1);
+      }
+
+      Deser deser(input);
+      while (true) {
+        // read_from() returns a null pointer at end-of-stream.
+        Action::ptr action = Action::read_from(deser);
+        if (!action) {
+          break;
+        }
+        if (action->is_start_thread()) {
+          // Start-thread actions spawn the worker itself rather than being
+          // queued to one.
+          Worker *worker = new Worker(*this);
+          workers[action->thread_id()] = worker;
+          worker->start();
+        } else {
+          workers[action->thread_id()]->send(action);
+        }
+      }
+
+      cout << "Waiting for workers to die" << endl;
+      pair<thread_id_t, Worker*> w;
+      BOOST_FOREACH(w, workers) {
+        w.second->join();
+        delete w.second;
+      }
+      clear_images();
+      delete m_rbd;
+      m_rbd = NULL;
+    }
+    // Labels sit after the inner scope closes so the gotos above legally
+    // exit past the non-trivially-constructed locals.
+  out2:
+    delete m_ioctx;
+    m_ioctx = NULL;
+  }
+  out:
+  cout << "IO thread stopped" << endl;
+}
+
+
+// Looks up an open image by its traced context ID under a reader lock;
+// returns NULL if the ID is unknown.
+librbd::Image* Replayer::get_image(imagectx_id_t imagectx_id) {
+  boost::shared_lock<boost::shared_mutex> lock(m_images_mutex);
+  // Use find() rather than operator[]: operator[] default-inserts a NULL
+  // entry for a missing key, mutating the map while holding only a shared
+  // (read) lock -- a data race with concurrent readers.
+  map<imagectx_id_t, librbd::Image*>::const_iterator itr = m_images.find(imagectx_id);
+  return itr == m_images.end() ? NULL : itr->second;
+}
+
+// Registers a newly opened image under its traced context id; the id must
+// not already be present (asserted).
+void Replayer::put_image(imagectx_id_t imagectx_id, librbd::Image *image) {
+  boost::unique_lock<boost::shared_mutex> lock(m_images_mutex);
+  assert(m_images.count(imagectx_id) == 0);
+  m_images[imagectx_id] = image;
+}
+
+// Deletes the image object (closing it) and drops it from the registry.
+void Replayer::erase_image(imagectx_id_t imagectx_id) {
+  boost::unique_lock<boost::shared_mutex> lock(m_images_mutex);
+  delete m_images[imagectx_id];
+  m_images.erase(imagectx_id);
+}
+
+// Records the completion time of action `id` and wakes all waiters in
+// wait_for_actions().  Each action may complete only once (asserted).
+void Replayer::set_action_complete(action_id_t id) {
+  // Fixed the trace message: it previously claimed to be
+  // "ActionTracker::set_complete", a method that does not exist here.
+  cout << "Replayer::set_action_complete(" << id << ")" << endl;
+  boost::system_time now(boost::get_system_time());
+  boost::unique_lock<boost::shared_mutex> lock(m_actions_complete_mutex);
+  assert(m_actions_complete.count(id) == 0);
+  m_actions_complete[id] = now;
+  m_actions_complete_condition.notify_all();
+}
+
+// Thread-safe query: has action `id` completed?
+bool Replayer::is_action_complete(action_id_t id) {
+  boost::shared_lock<boost::shared_mutex> lock(m_actions_complete_mutex);
+  return _is_action_complete(id);
+}
+
+// Blocks until every dependency in `deps` has completed, then sleeps so the
+// caller's action starts time_delta (scaled by m_latency_multiplier) after
+// the latest dependency's completion, reproducing the traced pacing.
+void Replayer::wait_for_actions(const vector<dependency_d> &deps) {
+  boost::posix_time::ptime release_time(boost::posix_time::neg_infin);
+  BOOST_FOREACH(const dependency_d &dep, deps) {
+    cout << "Waiting for " << dep.id << endl;
+    boost::system_time start_time(boost::get_system_time());
+    boost::shared_lock<boost::shared_mutex> lock(m_actions_complete_mutex);
+    while (!_is_action_complete(dep.id)) {
+      //m_actions_complete_condition.wait(lock);
+      // Timed wait so progress is logged even if a notification is missed.
+      m_actions_complete_condition.timed_wait(lock, boost::posix_time::seconds(1));
+      cout << "Still waiting for " << dep.id << endl;
+    }
+    boost::system_time action_completed_time(m_actions_complete[dep.id]);
+    lock.unlock();
+    boost::system_time end_time(boost::get_system_time());
+    long long micros = (end_time - start_time).total_microseconds();
+    cout << "Finished waiting for " << dep.id << " after " << micros << " microseconds" << endl;
+    // Apparently the nanoseconds constructor is optional:
+    // http://www.boost.org/doc/libs/1_46_0/doc/html/date_time/details.html#compile_options
+    // (The /1000 implies dep.time_delta is nanoseconds -- confirm against
+    // the trace-preparation script.)
+    boost::system_time sub_release_time(action_completed_time + boost::posix_time::microseconds(dep.time_delta * m_latency_multiplier / 1000));
+    if (sub_release_time > release_time) {
+      release_time = sub_release_time;
+    }
+  }
+  if (release_time > boost::get_system_time()) {
+    cout << "Sleeping for " << (release_time - boost::get_system_time()).total_microseconds() << " microseconds" << endl;
+    boost::this_thread::sleep(release_time);
+  }
+}
+
+// Deletes every still-open image and empties the registry; called from
+// run() after all workers have been joined.
+void Replayer::clear_images() {
+  boost::unique_lock<boost::shared_mutex> lock(m_images_mutex);
+  pair<imagectx_id_t, librbd::Image*> p;
+  BOOST_FOREACH(p, m_images) {
+    delete p.second;
+  }
+  m_images.clear();
+}
+
+// Unlocked helper; caller must hold m_actions_complete_mutex.
+bool Replayer::_is_action_complete(action_id_t id) {
+  return m_actions_complete.count(id) > 0;
+}
+
+// Scales replayed inter-action delays (1.0 = as traced).  Not synchronized;
+// presumably set once before run() starts workers -- confirm at call sites.
+void Replayer::set_latency_multiplier(float f) {
+  m_latency_multiplier = f;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef _INCLUDED_RBD_REPLAY_REPLAYER_HPP
+#define _INCLUDED_RBD_REPLAY_REPLAYER_HPP
+
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/shared_mutex.hpp>
+#include "BoundedBuffer.hpp"
+#include "PendingIO.hpp"
+
+namespace rbd_replay {
+
+class Replayer;
+
+// A Worker replays the actions of one traced thread.  Actions arrive via a
+// bounded queue (send()) and are performed on a dedicated boost::thread.
+class Worker : public ActionCtx {
+public:
+  explicit Worker(Replayer &replayer);
+
+  // Launches the worker thread (entry point: run()).
+  void start();
+
+  // Should only be called by StopThreadAction
+  void stop();
+
+  // Blocks until the worker thread exits.
+  void join();
+
+  // Queues an action; blocks if the queue is full.
+  void send(Action::ptr action);
+
+  // Tracks an in-flight AIO until remove_pending() is called for it.
+  void add_pending(PendingIO::ptr io);
+
+  void remove_pending(PendingIO::ptr io);
+
+  // The remaining overrides delegate to the shared Replayer.
+  librbd::Image* get_image(imagectx_id_t imagectx_id);
+
+  void put_image(imagectx_id_t imagectx_id, librbd::Image* image);
+
+  void erase_image(imagectx_id_t imagectx_id);
+
+  librbd::RBD* rbd();
+
+  librados::IoCtx* ioctx();
+
+  void set_action_complete(action_id_t id);
+
+private:
+  // Thread entry point; loops until stop(), then drains pending I/Os.
+  void run();
+
+  Replayer &m_replayer;
+  BoundedBuffer<Action::ptr> m_buffer;        // queue of actions to perform
+  boost::shared_ptr<boost::thread> m_thread;
+  std::vector<PendingIO::ptr> m_pending_ios;  // guarded by m_pending_ios_mutex
+  boost::mutex m_pending_ios_mutex;
+  boost::condition m_pending_ios_empty;       // signaled when the list drains
+  bool m_done;                                // set by stop(), read by run()
+};
+
+
+// Coordinates a replay run: owns the cluster connection, the registry of
+// open images, and the completion times of finished actions (used to pace
+// dependent actions across worker threads).
+class Replayer {
+public:
+  Replayer();
+
+  // Connects to the cluster, reads actions from replay_file, dispatches
+  // them to per-thread Workers, and returns when all workers are done.
+  void run(const std::string conf_file, const std::string replay_file);
+
+  librbd::RBD* get_rbd() {
+    return m_rbd;
+  }
+
+  librados::IoCtx* get_ioctx() {
+    return m_ioctx;
+  }
+
+  librbd::Image* get_image(imagectx_id_t imagectx_id);
+
+  // Registers a newly opened image; the id must not already exist.
+  void put_image(imagectx_id_t imagectx_id, librbd::Image *image);
+
+  // Closes (deletes) and forgets the image with the given id.
+  void erase_image(imagectx_id_t imagectx_id);
+
+  // Records an action's completion time and wakes waiters.
+  void set_action_complete(action_id_t id);
+
+  bool is_action_complete(action_id_t id);
+
+  // Blocks until all deps have completed plus their (scaled) time deltas.
+  void wait_for_actions(const std::vector<dependency_d> &deps);
+
+  // Scales replayed inter-action delays (1.0 = as traced).
+  void set_latency_multiplier(float f);
+
+private:
+  void clear_images();
+
+  // Caller must hold m_actions_complete_mutex.
+  bool _is_action_complete(action_id_t id);
+
+  // Disallow assignment and copying
+  Replayer(const Replayer& rhs);
+  const Replayer& operator=(const Replayer& rhs);
+
+  librbd::RBD* m_rbd;
+  librados::IoCtx* m_ioctx;
+  float m_latency_multiplier;
+
+  std::map<imagectx_id_t, librbd::Image*> m_images;
+  boost::shared_mutex m_images_mutex;
+
+  // Maps an action ID to the time the action completed
+  std::map<action_id_t, boost::system_time> m_actions_complete;
+  boost::shared_mutex m_actions_complete_mutex;
+  boost::condition m_actions_complete_condition;
+};
+
+}
+
+#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "actions.hpp"
+#include <cstdlib>
+#include "PendingIO.hpp"
+
+
+using namespace rbd_replay;
+using namespace std;
+
+
+// Base-class constructor: copies the common header fields shared by every
+// action type (decoded by Action::read_from).
+Action::Action(action_id_t id,
+               thread_id_t thread_id,
+               int num_successors,
+               int num_completion_successors,
+               std::vector<dependency_d> &predecessors)
+  : m_id(id),
+    m_thread_id(thread_id),
+    m_num_successors(num_successors),
+    m_num_completion_successors(num_completion_successors),
+    m_predecessors(predecessors) {
+  }
+
+Action::~Action() {
+}
+
+// Factory: decodes the next action from the stream, or returns a null
+// pointer at end-of-stream.  Wire format: type byte, io number, thread id,
+// successor counts, dependency count, then (id, time delta) pairs, followed
+// by type-specific fields read by the subclass factories.
+Action::ptr Action::read_from(Deser &d) {
+  uint8_t type = d.read_uint8_t();
+  if (d.eof()) {
+    return Action::ptr();
+  }
+  uint32_t ionum = d.read_uint32_t();
+  uint64_t thread_id = d.read_uint64_t();
+  uint32_t num_successors = d.read_uint32_t();
+  uint32_t num_completion_successors = d.read_uint32_t();
+  uint32_t num_dependencies = d.read_uint32_t();
+  vector<dependency_d> deps;
+  deps.reserve(num_dependencies);
+  for (unsigned int i = 0; i < num_dependencies; i++) {
+    uint32_t dep_id = d.read_uint32_t();
+    uint64_t time_delta = d.read_uint64_t();
+    deps.push_back(dependency_d(dep_id, time_delta));
+  }
+  DummyAction dummy(ionum, thread_id, num_successors, num_completion_successors, deps);
+  switch (type) {
+  case 0:
+    return StartThreadAction::read_from(dummy, d);
+  case 1:
+    return StopThreadAction::read_from(dummy, d);
+  case 2:
+    return ReadAction::read_from(dummy, d);
+  case 3:
+    return WriteAction::read_from(dummy, d);
+  case 4:
+    return AioReadAction::read_from(dummy, d);
+  case 5:
+    return AioWriteAction::read_from(dummy, d);
+  case 6:
+    return CloseImageAction::read_from(dummy, d);
+  default:
+    // Cast to int: streaming a uint8_t prints it as a (likely unprintable)
+    // character rather than the numeric type code.
+    cerr << "Invalid action type: " << static_cast<int>(type) << endl;
+    exit(1);
+  }
+}
+
+
+StartThreadAction::StartThreadAction(Action &src)
+  : Action(src) {
+}
+
+// StartThreadAction is consumed directly by Replayer::run() (which spawns
+// the Worker); it is never queued to a worker, so performing one is fatal.
+void StartThreadAction::perform(ActionCtx &ctx) {
+  cerr << "StartThreadAction should never actually be performed" << endl;
+  exit(1);
+}
+
+bool StartThreadAction::is_start_thread() {
+  return true;
+}
+
+Action::ptr StartThreadAction::read_from(Action &src, Deser &d) {
+  return Action::ptr(new StartThreadAction(src));
+}
+
+
+StopThreadAction::StopThreadAction(Action &src)
+  : Action(src) {
+}
+
+// Tells the executing worker to exit its action loop after this action.
+void StopThreadAction::perform(ActionCtx &ctx) {
+  cout << "Performing stop thread action #" << id() << endl;
+  ctx.stop();
+}
+
+Action::ptr StopThreadAction::read_from(Action &src, Deser &d) {
+  return Action::ptr(new StopThreadAction(src));
+}
+
+AioReadAction::AioReadAction(const Action &src,
+                             imagectx_id_t imagectx_id,
+                             uint64_t offset,
+                             uint64_t length)
+  : Action(src),
+    m_imagectx_id(imagectx_id),
+    m_offset(offset),
+    m_length(length) {
+  }
+
+// Decodes imagectx id, offset, and length.
+Action::ptr AioReadAction::read_from(Action &src, Deser &d) {
+  imagectx_id_t imagectx_id = d.read_uint64_t();
+  uint64_t offset = d.read_uint64_t();
+  uint64_t length = d.read_uint64_t();
+  return Action::ptr(new AioReadAction(src, imagectx_id, offset, length));
+}
+
+// Issues an asynchronous read against the traced image.
+void AioReadAction::perform(ActionCtx &worker) {
+  cout << "Performing AIO read action #" << id() << endl;
+  librbd::Image *image = worker.get_image(m_imagectx_id);
+  assert(image);
+  PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
+  // Register the pending I/O *before* issuing it.  The original issued the
+  // AIO first, so a completion firing immediately could invoke
+  // remove_pending() before add_pending() ran, leaving a stale entry that
+  // deadlocks Worker::run()'s drain loop.  ReadAction/WriteAction already
+  // register first.
+  worker.add_pending(io);
+  image->aio_read(m_offset, m_length, io->bufferlist(), &io->completion());
+}
+
+
+ReadAction::ReadAction(const Action &src,
+                       imagectx_id_t imagectx_id,
+                       uint64_t offset,
+                       uint64_t length)
+  : Action(src),
+    m_imagectx_id(imagectx_id),
+    m_offset(offset),
+    m_length(length) {
+  }
+
+// Decodes imagectx id, offset, and length.
+Action::ptr ReadAction::read_from(Action &src, Deser &d) {
+  imagectx_id_t imagectx_id = d.read_uint64_t();
+  uint64_t offset = d.read_uint64_t();
+  uint64_t length = d.read_uint64_t();
+  return Action::ptr(new ReadAction(src, imagectx_id, offset, length));
+}
+
+// Performs a synchronous read.  The PendingIO only brackets the operation
+// so its completion action is marked done via remove_pending().
+void ReadAction::perform(ActionCtx &worker) {
+  cout << "Performing read action #" << id() << endl;
+  librbd::Image *image = worker.get_image(m_imagectx_id);
+  PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
+  worker.add_pending(io);
+  image->read(m_offset, m_length, io->bufferlist());
+  worker.remove_pending(io);
+}
+
+
+AioWriteAction::AioWriteAction(const Action &src,
+                               imagectx_id_t imagectx_id,
+                               uint64_t offset,
+                               uint64_t length)
+  : Action(src),
+    m_imagectx_id(imagectx_id),
+    m_offset(offset),
+    m_length(length) {
+  }
+
+// Decodes imagectx id, offset, and length.
+Action::ptr AioWriteAction::read_from(Action &src, Deser &d) {
+  imagectx_id_t imagectx_id = d.read_uint64_t();
+  uint64_t offset = d.read_uint64_t();
+  uint64_t length = d.read_uint64_t();
+  return Action::ptr(new AioWriteAction(src, imagectx_id, offset, length));
+}
+
+// Issues an asynchronous write of zero-filled data (the trace does not
+// record the original payload).
+void AioWriteAction::perform(ActionCtx &worker) {
+  cout << "Performing AIO write action #" << id() << endl;
+  librbd::Image *image = worker.get_image(m_imagectx_id);
+  PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
+  io->bufferlist().append_zero(m_length);
+  // Register the pending I/O *before* issuing it, mirroring the fix in
+  // AioReadAction::perform(): an immediate completion could otherwise call
+  // remove_pending() before add_pending(), stranding the entry and
+  // deadlocking Worker::run()'s drain loop.
+  worker.add_pending(io);
+  image->aio_write(m_offset, m_length, io->bufferlist(), &io->completion());
+}
+
+
+WriteAction::WriteAction(const Action &src,
+                         imagectx_id_t imagectx_id,
+                         uint64_t offset,
+                         uint64_t length)
+  : Action(src),
+    m_imagectx_id(imagectx_id),
+    m_offset(offset),
+    m_length(length) {
+  }
+
+// Decodes imagectx id, offset, and length.
+Action::ptr WriteAction::read_from(Action &src, Deser &d) {
+  imagectx_id_t imagectx_id = d.read_uint64_t();
+  uint64_t offset = d.read_uint64_t();
+  uint64_t length = d.read_uint64_t();
+  return Action::ptr(new WriteAction(src, imagectx_id, offset, length));
+}
+
+// Performs a synchronous write of zero-filled data (the trace does not
+// record the original payload).
+void WriteAction::perform(ActionCtx &worker) {
+  cout << "Performing write action #" << id() << endl;
+  librbd::Image *image = worker.get_image(m_imagectx_id);
+  PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
+  worker.add_pending(io);
+  io->bufferlist().append_zero(m_length);
+  image->write(m_offset, m_length, io->bufferlist());
+  worker.remove_pending(io);
+}
+
+
+OpenImageAction::OpenImageAction(Action &src,
+				 imagectx_id_t imagectx_id,
+				 string name,
+				 string snap_name,
+				 bool readonly)
+  : Action(src),
+    m_imagectx_id(imagectx_id),
+    m_name(name),
+    m_snap_name(snap_name),
+    m_readonly(readonly) {
+  }
+
+// Decodes imagectx id, image name, snapshot name, and read-only flag.
+Action::ptr OpenImageAction::read_from(Action &src, Deser &d) {
+  imagectx_id_t imagectx_id = d.read_uint64_t();
+  string name = d.read_string();
+  string snap_name = d.read_string();
+  bool readonly = d.read_bool();
+  return Action::ptr(new OpenImageAction(src, imagectx_id, name, snap_name, readonly));
+}
+
+// Opens the named image (read-only if it was traced that way) and registers
+// it under its traced context id so later actions can find it.
+void OpenImageAction::perform(ActionCtx &worker) {
+  cout << "Performing open image action #" << id() << endl;
+  PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
+  worker.add_pending(io);
+  librbd::Image *image = new librbd::Image();
+  librbd::RBD *rbd = worker.rbd();
+  int r;
+  if (m_readonly) {
+    r = rbd->open_read_only(*worker.ioctx(), *image, m_name.c_str(), m_snap_name.c_str());
+  } else {
+    r = rbd->open(*worker.ioctx(), *image, m_name.c_str(), m_snap_name.c_str());
+  }
+  if (r) {
+    // NOTE(review): strerror needs <cstring>, which this file does not
+    // include directly -- presumably pulled in transitively; confirm.
+    cerr << "Unable to open image '" << m_name << "' with snap '" << m_snap_name << "' and readonly " << m_readonly << ": " << strerror(-r) << endl;
+    exit(1);
+  }
+  worker.put_image(m_imagectx_id, image);
+  worker.remove_pending(io);
+}
+
+
+CloseImageAction::CloseImageAction(Action &src,
+				   imagectx_id_t imagectx_id)
+  : Action(src),
+    m_imagectx_id(imagectx_id) {
+  }
+
+// Decodes the imagectx id of the image to close.
+Action::ptr CloseImageAction::read_from(Action &src, Deser &d) {
+  imagectx_id_t imagectx_id = d.read_uint64_t();
+  return Action::ptr(new CloseImageAction(src, imagectx_id));
+}
+
+// Deletes (and thereby closes) the image object and marks this action's
+// completion counterpart as done.
+void CloseImageAction::perform(ActionCtx &worker) {
+  cout << "Performing close image action #" << id() << endl;
+  worker.erase_image(m_imagectx_id);
+  worker.set_action_complete(pending_io_id());
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef _INCLUDED_RBD_REPLAY_ACTIONS_HPP
+#define _INCLUDED_RBD_REPLAY_ACTIONS_HPP
+
+#include <boost/shared_ptr.hpp>
+#include "include/rbd/librbd.hpp"
+#include "Deser.hpp"
+
+namespace rbd_replay {
+
+typedef uint64_t imagectx_id_t;
+typedef uint64_t thread_id_t;
+
+// Even IDs are normal actions, odd IDs are completions
+typedef uint32_t action_id_t;
+
+// An edge in the action dependency graph: the dependent action is released
+// time_delta after action `id` completes.  (Units appear to be nanoseconds,
+// given the /1000-to-microseconds conversion in Replayer -- confirm.)
+struct dependency_d {
+  action_id_t id;
+
+  uint64_t time_delta;
+
+  dependency_d(action_id_t id,
+	       uint64_t time_delta)
+    : id(id),
+      time_delta(time_delta) {
+  }
+};
+
+
+class PendingIO;
+
+
+// Interface through which an Action interacts with its executing worker:
+// image registry access, cluster handles, pending-I/O tracking, shutdown.
+class ActionCtx {
+public:
+  virtual ~ActionCtx() {
+  }
+
+  virtual librbd::Image* get_image(imagectx_id_t imagectx_id) = 0;
+
+  virtual void put_image(imagectx_id_t imagectx_id, librbd::Image* image) = 0;
+
+  virtual void erase_image(imagectx_id_t imagectx_id) = 0;
+
+  virtual librbd::RBD* rbd() = 0;
+
+  virtual librados::IoCtx* ioctx() = 0;
+
+  // Track an in-flight asynchronous I/O until it completes.
+  virtual void add_pending(boost::shared_ptr<PendingIO> io) = 0;
+
+  virtual void remove_pending(boost::shared_ptr<PendingIO> io) = 0;
+
+  virtual void set_action_complete(action_id_t id) = 0;
+
+  // Ask the executing worker to shut down after the current action.
+  virtual void stop() = 0;
+};
+
+
+// Abstract base for one replayable trace event.  Carries the common header
+// (id, thread, successor counts, dependencies) decoded by read_from();
+// subclasses implement perform().
+class Action {
+public:
+  typedef boost::shared_ptr<Action> ptr;
+
+  Action(action_id_t id,
+	 thread_id_t thread_id,
+	 int num_successors,
+	 int num_completion_successors,
+	 std::vector<dependency_d> &predecessors);
+
+  virtual ~Action();
+
+  // Executes this action against the given worker context.
+  virtual void perform(ActionCtx &ctx) = 0;
+
+  // ID of this action's completion counterpart (actions use even IDs,
+  // completions the following odd ID).
+  action_id_t pending_io_id() {
+    return m_id + 1;
+  }
+
+  // There's probably a better way to do this, but oh well.
+  virtual bool is_start_thread() {
+    return false;
+  }
+
+  action_id_t id() const {
+    return m_id;
+  }
+
+  thread_id_t thread_id() const {
+    return m_thread_id;
+  }
+
+  // Actions that must complete before this one may run.
+  const std::vector<dependency_d>& predecessors() const {
+    return m_predecessors;
+  }
+
+  // Factory: decodes the next action, or returns null at end-of-stream.
+  static ptr read_from(Deser &d);
+
+private:
+  const action_id_t m_id;
+  const thread_id_t m_thread_id;
+  const int m_num_successors;
+  const int m_num_completion_successors;
+  const std::vector<dependency_d> m_predecessors;
+};
+
+
+// Concrete no-op carrier for the common header fields decoded by
+// Action::read_from(); subclass factories copy-construct their base part
+// from one of these.
+class DummyAction : public Action {
+public:
+  DummyAction(action_id_t id,
+	      thread_id_t thread_id,
+	      int num_successors,
+	      int num_completion_successors,
+	      std::vector<dependency_d> &predecessors)
+    : Action(id, thread_id, num_successors, num_completion_successors, predecessors) {
+  }
+
+  void perform(ActionCtx &ctx) {
+  }
+};
+
+// Tells the executing worker to exit its action loop.
+class StopThreadAction : public Action {
+public:
+  explicit StopThreadAction(Action &src);
+
+  void perform(ActionCtx &ctx);
+
+  static Action::ptr read_from(Action &src, Deser &d);
+};
+
+
+// Replays an asynchronous read (librbd aio_read).
+class AioReadAction : public Action {
+public:
+  AioReadAction(const Action &src,
+		imagectx_id_t imagectx_id,
+		uint64_t offset,
+		uint64_t length);
+
+  void perform(ActionCtx &ctx);
+
+  static Action::ptr read_from(Action &src, Deser &d);
+
+private:
+  imagectx_id_t m_imagectx_id;
+  uint64_t m_offset;
+  uint64_t m_length;
+};
+
+
+// Replays a synchronous read.
+class ReadAction : public Action {
+public:
+  ReadAction(const Action &src,
+	     imagectx_id_t imagectx_id,
+	     uint64_t offset,
+	     uint64_t length);
+
+  void perform(ActionCtx &ctx);
+
+  static Action::ptr read_from(Action &src, Deser &d);
+
+private:
+  imagectx_id_t m_imagectx_id;
+  uint64_t m_offset;
+  uint64_t m_length;
+};
+
+
+// Replays an asynchronous write; the payload is zero-filled because the
+// trace does not record written data.
+class AioWriteAction : public Action {
+public:
+  AioWriteAction(const Action &src,
+		 imagectx_id_t imagectx_id,
+		 uint64_t offset,
+		 uint64_t length);
+
+  void perform(ActionCtx &ctx);
+
+  static Action::ptr read_from(Action &src, Deser &d);
+
+private:
+  imagectx_id_t m_imagectx_id;
+  uint64_t m_offset;
+  uint64_t m_length;
+};
+
+
+// Replays a synchronous write (payload zero-filled, as above).
+class WriteAction : public Action {
+public:
+  WriteAction(const Action &src,
+	      imagectx_id_t imagectx_id,
+	      uint64_t offset,
+	      uint64_t length);
+
+  void perform(ActionCtx &ctx);
+
+  static Action::ptr read_from(Action &src, Deser &d);
+
+private:
+  imagectx_id_t m_imagectx_id;
+  uint64_t m_offset;
+  uint64_t m_length;
+};
+
+
+// Opens an image (optionally read-only at a snapshot) and registers it
+// under its traced context id.
+class OpenImageAction : public Action {
+public:
+  OpenImageAction(Action &src,
+		  imagectx_id_t imagectx_id,
+		  std::string name,
+		  std::string snap_name,
+		  bool readonly);
+
+  void perform(ActionCtx &ctx);
+
+  static Action::ptr read_from(Action &src, Deser &d);
+
+private:
+  imagectx_id_t m_imagectx_id;
+  std::string m_name;
+  std::string m_snap_name;
+  bool m_readonly;
+};
+
+
+// Closes the image registered under the traced context id.
+class CloseImageAction : public Action {
+public:
+  CloseImageAction(Action &src,
+		   imagectx_id_t imagectx_id);
+
+  void perform(ActionCtx &ctx);
+
+  static Action::ptr read_from(Action &src, Deser &d);
+
+private:
+  imagectx_id_t m_imagectx_id;
+};
+
+
+// Marker action: Replayer::run() creates a new Worker when it sees one;
+// it is never dispatched to a worker itself.
+class StartThreadAction : public Action {
+public:
+  explicit StartThreadAction(Action &src);
+
+  void perform(ActionCtx &ctx);
+
+  bool is_start_thread();
+
+  static Action::ptr read_from(Action &src, Deser &d);
+};
+
+}
+
+#endif
--- /dev/null
+#!/usr/bin/python
+# -*- mode:Python; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+# vim: ts=8 sw=2 smarttab
+#
+# Ceph - scalable distributed file system
+#
+# Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+#
+# This is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1, as published by the Free Software
+# Foundation. See file COPYING.
+#
+#
+
+from babeltrace import *
+import cProfile
+import datetime
+import struct
+import sys
+
+traces = TraceCollection()
+ret = traces.add_trace(sys.argv[1], "ctf")
+
+# In-flight aio requests, keyed by completion handle from the trace.
+pendingIOs = {}
+# pthread id -> Thread state object.
+threads = {}
+# Every IO parsed or synthesized, later sorted by start time.
+ios = []
+# When True, echo each IO to stdout as soon as it is finalized.
+printOnRead = False
+# Completions seen within the last `window` ns; dependency candidates.
+recentCompletions = []
+# Cap on the number of trace events processed.
+limit = 100000000000
+# Writes are skipped by default; only reads (and open/close) are emitted.
+ignoreWrites = True
+
+ioCount = 0
+def nextID():
+ global ioCount
+ val = ioCount
+ ioCount = ioCount + 2
+ return val
+
+class Extent(object):
+ def __init__(self, offset, length):
+ self.offset = offset
+ self.length = length
+ def __str__(self):
+ return str(self.offset) + "+" + str(self.length)
+ def __repr__(self):
+ return "Extent(" + str(self.offset) + "," + str(self.length) + ")"
+
+class Thread(object):
+ def __init__(self, id):
+ self.id = id
+ self.pendingIO = None
+ self.latestIO = None # may not be meaningful
+ self.latestCompletion = None # may not be meaningful
+ self.lastTS = None
+ def insertTS(self, ts):
+ if not self.lastTS or ts > self.lastTS:
+ self.lastTS = ts
+ def issuedIO(self, io):
+ latestIOs = []
+ for threadID in threads:
+ thread = threads[threadID]
+ if thread.latestIO and thread.latestIO.start_time > io.start_time - window:
+ latestIOs.append(thread.latestIO)
+ io.addDependencies(latestIOs)
+ self.latestIO = io
+ def completedIO(self, io):
+ self.latestCompletion = io
+
+def batchUnreachableFrom(deps, base):
+ # Returns the members of `deps` that are NOT reachable from any IO in
+ # `base` by following IO.dependencies edges.  Dependency edges always
+ # point to strictly earlier IOs, so once every IO still being searched
+ # for is newer than the newest boundary IO (boundaryHorizon), the
+ # remainder can be ruled unreachable without further expansion.
+ # NOTE(review): `discovered` is never added to, so nodes may be
+ # re-expanded; looks like only an efficiency issue — confirm.
+ if not base:
+ return set()
+ if not deps:
+ return set()
+ unreachable = set()
+ searchingFor = set(deps)
+ discovered = set()
+ boundary = set(base)
+ boundaryHorizon = None
+ for io in boundary:
+ if not boundaryHorizon or io.start_time > boundaryHorizon:
+ boundaryHorizon = io.start_time
+ searchingHorizon = None
+ for io in searchingFor:
+ if not searchingHorizon or io.start_time < searchingHorizon:
+ searchingHorizon = io.start_time
+ # Anything newer than the whole boundary is unreachable immediately.
+ tmp = [x for x in searchingFor if boundaryHorizon < x.start_time]
+ searchingFor.difference_update(tmp)
+ unreachable.update(tmp)
+ while boundary and searchingFor:
+ io = boundary.pop()
+ for dep in io.dependencies:
+ if dep in searchingFor:
+ searchingFor.remove(dep)
+ if dep.start_time == searchingHorizon:
+ searchingHorizon = None
+ for io in searchingFor:
+ if not searchingHorizon or io.start_time < searchingHorizon:
+ searchingHorizon = io.start_time
+ if not dep in discovered:
+ boundary.add(dep)
+ # Recompute the boundary horizon if we just popped its newest IO,
+ # then prune any targets that have become unreachable.
+ if io.start_time == boundaryHorizon:
+ boundaryHorizon = None
+ for io in boundary:
+ if not boundaryHorizon or io.start_time > boundaryHorizon:
+ boundaryHorizon = io.start_time
+ if boundaryHorizon:
+ tmp = [x for x in searchingFor if boundaryHorizon < x.start_time]
+ searchingFor.difference_update(tmp)
+ unreachable.update(tmp)
+ searchingHorizon = None
+ for io in searchingFor:
+ if not searchingHorizon or io.start_time < searchingHorizon:
+ searchingHorizon = io.start_time
+ # Whatever was never found is unreachable.
+ unreachable.update(searchingFor)
+ return unreachable
+
+class IO(object):
+ # A node in the replay dependency DAG.  Real IOs get even ionums; the
+ # matching CompletionIO (if any) takes ionum + 1.
+ def __init__(self, ionum, start_time, thread, prev):
+ self.ionum = ionum
+ self.start_time = start_time
+ self.thread = thread
+ self.dependencies = set()
+ self.isCompletion = False
+ self.prev = prev
+ self.numSuccessors = 0
+ self.completion = None
+ def reachableFrom(self, ios):
+ # True if self can be reached from any IO in `ios` via dependency
+ # edges.  Edges point to earlier IOs, so the search stops once the
+ # newest boundary IO (`horizon`) is older than self.
+ # NOTE(review): `discovered` is never populated, so nodes may be
+ # re-expanded; looks like only an efficiency issue — confirm.
+ if not ios:
+ return False
+ discovered = set()
+ boundary = set(ios)
+ horizon = None
+ for io in boundary:
+ if not horizon or io.start_time > horizon:
+ horizon = io.start_time
+ if horizon < self.start_time:
+ return False
+ while boundary:
+ io = boundary.pop()
+ for dep in io.dependencies:
+ if self == dep:
+ return True
+ if not dep in discovered:
+ boundary.add(dep)
+ if io.start_time == horizon:
+ horizon = None
+ for io in boundary:
+ if not horizon or io.start_time > horizon:
+ horizon = io.start_time
+ if horizon and horizon < self.start_time:
+ return False
+ return False
+ def addDependency(self, dep):
+ # Only add the edge if dep is not already implied transitively.
+ if not dep.reachableFrom(self.dependencies):
+ self.dependencies.add(dep)
+ def addDependencies(self, deps):
+ # Batch variant: add only those deps not already reachable from the
+ # current dependency set or from the deps' own dependencies.
+ base = set(self.dependencies)
+ for dep in deps:
+ base.update(dep.dependencies)
+ unreachable = batchUnreachableFrom(deps, base)
+ self.dependencies.update(unreachable)
+ def depIDs(self):
+ ids = []
+ for dep in self.dependencies:
+ ids.append(dep.ionum)
+ return ids
+ def depMap(self):
+ # Maps dependency ionum -> elapsed time from the dep's start to ours.
+ deps = dict()
+ for dep in self.dependencies:
+ deps[dep.ionum] = self.start_time - dep.start_time
+ return deps
+ def addThreadCompletionDependencies(self, threads):
+ # Depends on every completion in the recent window; the `threads`
+ # argument is currently unused.
+ self.addDependencies(recentCompletions)
+ def numCompletionSuccessors(self):
+ return self.completion.numSuccessors if self.completion else 0
+ def writeTo(self, f, iotype):
+ # Common binary header (big-endian): type, ionum, thread id,
+ # successor counts, dependency count, then (dep ionum, delta) pairs.
+ f.write(struct.pack("!BIQIII", iotype, self.ionum, self.thread.id, self.numSuccessors, self.numCompletionSuccessors(), len(self.dependencies)))
+ for dep in self.dependencies:
+ f.write(struct.pack("!IQ", dep.ionum, self.start_time - dep.start_time))
+
+class StartThreadIO(IO):
+ def __init__(self, ionum, start_time, thread):
+ IO.__init__(self, ionum, start_time, thread, None)
+ def writeTo(self, f):
+ IO.writeTo(self, f, 0)
+ def __str__(self):
+ return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": start thread, thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors())
+
+class StopThreadIO(IO):
+ def __init__(self, ionum, start_time, thread):
+ IO.__init__(self, ionum, start_time, thread, None)
+ def writeTo(self, f):
+ IO.writeTo(self, f, 1)
+ def __str__(self):
+ return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": stop thread, thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors())
+
+class ReadIO(IO):
+ def __init__(self, ionum, start_time, thread, prev, imagectx, extents):
+ IO.__init__(self, ionum, start_time, thread, prev)
+ self.imagectx = imagectx
+ self.extents = extents
+ def writeTo(self, f):
+ IO.writeTo(self, f, 2)
+ if len(self.extents) != 1:
+ raise ValueError("Expected read to have 1 extent, but it had " + str(len(self.extents)))
+ extent = self.extents[0]
+ f.write(struct.pack("!QQQ", self.imagectx, extent.offset, extent.length))
+ def __str__(self):
+ return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": read, extents = " + str(self.extents) + ", thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors())
+
+class WriteIO(IO):
+ def __init__(self, ionum, start_time, thread, prev, imagectx, extents):
+ IO.__init__(self, ionum, start_time, thread, prev)
+ self.imagectx = imagectx
+ self.extents = extents
+ def writeTo(self, f):
+ IO.writeTo(self, f, 3)
+ if len(self.extents) != 1:
+ raise ValueError("Expected read to have 1 extent, but it had " + str(len(self.extents)))
+ extent = self.extents[0]
+ f.write(struct.pack("!QQQ", self.imagectx, extent.offset, extent.length))
+ def __str__(self):
+ return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": write, extents = " + str(self.extents) + ", thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors())
+
+class AioReadIO(IO):
+ def __init__(self, ionum, start_time, thread, prev, imagectx, extents):
+ IO.__init__(self, ionum, start_time, thread, prev)
+ self.imagectx = imagectx
+ self.extents = extents
+ def writeTo(self, f):
+ IO.writeTo(self, f, 4)
+ if len(self.extents) != 1:
+ raise ValueError("Expected read to have 1 extent, but it had " + str(len(self.extents)))
+ extent = self.extents[0]
+ f.write(struct.pack("!QQQ", self.imagectx, extent.offset, extent.length))
+ def __str__(self):
+ return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": aio read, extents = " + str(self.extents) + ", thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors())
+
+class AioWriteIO(IO):
+ def __init__(self, ionum, start_time, thread, prev, imagectx, extents):
+ IO.__init__(self, ionum, start_time, thread, prev)
+ self.imagectx = imagectx
+ self.extents = extents
+ def writeTo(self, f):
+ IO.writeTo(self, f, 5)
+ if len(self.extents) != 1:
+ raise ValueError("Expected read to have 1 extent, but it had " + str(len(self.extents)))
+ extent = self.extents[0]
+ f.write(struct.pack("!QQQ", imagectx, extent.offset, extent.length))
+ def __str__(self):
+ return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": aio write, extents = " + str(self.extents) + ", thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors())
+
+class OpenImageIO(IO):
+ def __init__(self, ionum, start_time, thread, prev, imagectx, name, snap_name, readonly):
+ IO.__init__(self, ionum, start_time, thread, prev)
+ self.imagectx = imagectx
+ self.name = name
+ self.snap_name = snap_name
+ self.readonly = readonly
+ def writeTo(self, f):
+ IO.writeTo(self, f, 6)
+ f.write(struct.pack("!QI", self.imagectx, len(self.name)))
+ f.write(self.name)
+ f.write(struct.pack("!I", len(self.snap_name)))
+ f.write(self.snap_name)
+ f.write(struct.pack("!b", self.readonly))
+ def __str__(self):
+ return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": open image, thread = " + str(self.thread.id) + ", imagectx = " + str(self.imagectx) + ", name = " + self.name + ", snap_name = " + self.snap_name + ", readonly = " + str(self.readonly) + ", deps = " + str(self.depMap())
+
+class CloseImageIO(IO):
+ def __init__(self, ionum, start_time, thread, prev, imagectx):
+ IO.__init__(self, ionum, start_time, thread, prev)
+ self.imagectx = imagectx
+ def writeTo(self, f):
+ IO.writeTo(self, f, 7)
+ f.write(struct.pack("!Q", self.imagectx))
+ def __str__(self):
+ return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": close image, thread = " + str(self.thread.id) + ", imagectx = " + str(self.imagectx) + ", deps = " + str(self.depMap())
+
+class CompletionIO(IO):
+ def __init__(self, start_time, thread, baseIO):
+ IO.__init__(self, baseIO.ionum + 1, start_time, thread, None)
+ self.baseIO = baseIO
+ self.isCompletion = True
+ self.addDependency(baseIO)
+ baseIO.completion = self
+ def writeTo(self, f):
+ pass
+ def __str__(self):
+ return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": completion, thread = " + str(self.thread.id) + ", baseIO = " + str(self.baseIO) + ", deps = " + str(self.depMap())
+
+
+# Dependency window: only IOs/completions from the last 1e9 timestamp
+# units are dependency candidates — presumably one second of babeltrace
+# nanosecond timestamps; confirm against the trace clock.
+window = 1 * 1e9
+
+def completed(io):
+ global recentCompletions
+ recentCompletions.append(io)
+ recentCompletions[:] = [x for x in recentCompletions if x.start_time > io.start_time - window]
+ # while recentCompletions[0].start_time < io.start_time - window:
+ # del recentCompletions[0]
+
+def main():
+ # Parses the LTTng trace into an IO dependency DAG, synthesizes
+ # thread start/stop markers, then serializes the result to replay.bin.
+ # NOTE: Python 2 script (print statements, str writes to binary file).
+ global ios
+ # Parse phase
+ # Timestamps are rebased so the first event lands at t=0; Thread
+ # objects and their StartThreadIOs are created lazily on first sight.
+ trace_start = None
+ count = 0
+ for event in traces.events:
+ count = count + 1
+ if count > limit:
+ break
+ ts = event.timestamp
+ if not trace_start:
+ trace_start = ts
+ ts = ts - trace_start
+ threadID = event["pthread_id"]
+ if threadID in threads:
+ thread = threads[threadID]
+ else:
+ thread = Thread(threadID)
+ threads[threadID] = thread
+ ionum = nextID()
+ io = StartThreadIO(ionum, ts, thread)
+ ios.append(io)
+ if printOnRead:
+ print str(io)
+ thread.insertTS(ts)
+ # Synchronous ops: extents arrive via *_extent events between the
+ # enter and exit events on the same thread (thread.pendingIO).
+ if event.name == "librbd:read_enter":
+ name = event["name"]
+ readid = event["id"]
+ imagectx = event["imagectx"]
+ ionum = nextID()
+ thread.pendingIO = ReadIO(ionum, ts, thread, thread.pendingIO, imagectx, [])
+ thread.pendingIO.addThreadCompletionDependencies(threads)
+ thread.issuedIO(thread.pendingIO)
+ ios.append(thread.pendingIO)
+ elif event.name == "librbd:open_image_enter":
+ imagectx = event["imagectx"]
+ name = event["name"]
+ snap_name = event["snap_name"]
+ readid = event["id"]
+ readonly = event["read_only"]
+ ionum = nextID()
+ thread.pendingIO = OpenImageIO(ionum, ts, thread, thread.pendingIO, imagectx, name, snap_name, readonly)
+ thread.pendingIO.addThreadCompletionDependencies(threads)
+ thread.issuedIO(thread.pendingIO)
+ ios.append(thread.pendingIO)
+ elif event.name == "librbd:open_image_exit":
+ thread.pendingIO.end_time = ts
+ completionIO = CompletionIO(ts, thread, thread.pendingIO)
+ thread.completedIO(completionIO)
+ ios.append(completionIO)
+ completed(completionIO)
+ if printOnRead:
+ print str(thread.pendingIO)
+ elif event.name == "librbd:close_image_enter":
+ imagectx = event["imagectx"]
+ ionum = nextID()
+ thread.pendingIO = CloseImageIO(ionum, ts, thread, thread.pendingIO, imagectx)
+ thread.pendingIO.addThreadCompletionDependencies(threads)
+ thread.issuedIO(thread.pendingIO)
+ ios.append(thread.pendingIO)
+ elif event.name == "librbd:close_image_exit":
+ thread.pendingIO.end_time = ts
+ completionIO = CompletionIO(ts, thread, thread.pendingIO)
+ thread.completedIO(completionIO)
+ ios.append(completionIO)
+ completed(completionIO)
+ if printOnRead:
+ print str(thread.pendingIO)
+ elif event.name == "librbd:read_extent":
+ offset = event["offset"]
+ length = event["length"]
+ thread.pendingIO.extents.append(Extent(offset, length))
+ elif event.name == "librbd:read_exit":
+ thread.pendingIO.end_time = ts
+ completionIO = CompletionIO(ts, thread, thread.pendingIO)
+ thread.completedIO(completionIO)
+ ios.append(completionIO)
+ completed(completionIO)
+ if printOnRead:
+ print str(thread.pendingIO)
+ elif event.name == "librbd:write_enter":
+ if not ignoreWrites:
+ name = event["name"]
+ readid = event["id"]
+ offset = event["off"]
+ length = event["buf_len"]
+ imagectx = event["imagectx"]
+ ionum = nextID()
+ thread.pendingIO = WriteIO(ionum, ts, thread, thread.pendingIO, imagectx, [Extent(offset, length)])
+ thread.pendingIO.addThreadCompletionDependencies(threads)
+ thread.issuedIO(thread.pendingIO)
+ ios.append(thread.pendingIO)
+ elif event.name == "librbd:write_exit":
+ if not ignoreWrites:
+ thread.pendingIO.end_time = ts
+ completionIO = CompletionIO(ts, thread, thread.pendingIO)
+ thread.completedIO(completionIO)
+ ios.append(completionIO)
+ completed(completionIO)
+ if printOnRead:
+ print str(thread.pendingIO)
+ # Aio requests are matched to their aio_complete_enter events through
+ # the pendingIOs map, keyed by the completion handle.
+ elif event.name == "librbd:aio_read_enter":
+ name = event["name"]
+ readid = event["id"]
+ completion = event["completion"]
+ imagectx = event["imagectx"]
+ ionum = nextID()
+ thread.pendingIO = AioReadIO(ionum, ts, thread, thread.pendingIO, imagectx, [])
+ thread.pendingIO.addThreadCompletionDependencies(threads)
+ ios.append(thread.pendingIO)
+ thread.issuedIO(thread.pendingIO)
+ pendingIOs[completion] = thread.pendingIO
+ elif event.name == "librbd:aio_read_extent":
+ offset = event["offset"]
+ length = event["length"]
+ thread.pendingIO.extents.append(Extent(offset, length))
+ elif event.name == "librbd:aio_read_exit":
+ if printOnRead:
+ print str(thread.pendingIO)
+ elif event.name == "librbd:aio_write_enter":
+ if not ignoreWrites:
+ name = event["name"]
+ writeid = event["id"]
+ offset = event["off"]
+ length = event["len"]
+ completion = event["completion"]
+ imagectx = event["imagectx"]
+ ionum = nextID()
+ thread.pendingIO = AioWriteIO(ionum, ts, thread, thread.pendingIO, imagectx, [Extent(offset, length)])
+ thread.pendingIO.addThreadCompletionDependencies(threads)
+ thread.issuedIO(thread.pendingIO)
+ ios.append(thread.pendingIO)
+ pendingIOs[completion] = thread.pendingIO
+ if printOnRead:
+ print str(thread.pendingIO)
+ elif event.name == "librbd:aio_complete_enter":
+ completion = event["completion"]
+ retval = event["rval"]
+ if completion in pendingIOs:
+ completedIO = pendingIOs[completion]
+ del pendingIOs[completion]
+ completedIO.end_time = ts
+ completionIO = CompletionIO(ts, thread, completedIO)
+ completedIO.thread.completedIO(completionIO)
+ ios.append(completionIO)
+ completed(completionIO)
+ if printOnRead:
+ print str(completionIO)
+
+
+ # Insert-thread-stop phase
+ # Real IO ids are even (completions use id + 1), so each synthetic
+ # StopThreadIO claims the first unused even id after the thread's
+ # last event, and every id at or above it is shifted up by 2.
+ ios = sorted(ios, key = lambda io: io.start_time)
+ for threadID in threads:
+ thread = threads[threadID]
+ ionum = None
+ maxIONum = 0 # only valid if ionum is None
+ for io in ios:
+ if io.ionum > maxIONum:
+ maxIONum = io.ionum
+ if io.start_time > thread.lastTS:
+ ionum = io.ionum
+ if ionum % 2 == 1:
+ ionum = ionum + 1
+ break
+ if not ionum:
+ # No IO starts after this thread's last event: append past the max id.
+ if maxIONum % 2 == 1:
+ maxIONum = maxIONum - 1
+ ionum = maxIONum + 2
+ for io in ios:
+ if io.ionum >= ionum:
+ io.ionum = io.ionum + 2
+ # TODO: Insert in the right place, don't append and re-sort
+ ios.append(StopThreadIO(ionum, thread.lastTS, thread))
+ ios = sorted(ios, key = lambda io: io.start_time)
+
+
+ # Dependency cleanup: program order within a thread is implicit in the
+ # replay format, so drop each IO's edge to its predecessor, and strip
+ # all edges from completions.  Then count each IO's successors for the
+ # replayer's fan-out bookkeeping.
+ for io in ios:
+ if io.prev and io.prev in io.dependencies:
+ io.dependencies.remove(io.prev)
+ if io.isCompletion:
+ io.dependencies.clear()
+ for dep in io.dependencies:
+ dep.numSuccessors = dep.numSuccessors + 1
+
+ print
+ # for io in ios:
+ # if not io.isCompletion:
+ # print str(io)
+
+ # Serialization phase: completions are implicit and not written out.
+ with open("replay.bin", "wb") as f:
+ for io in ios:
+ if not io.isCompletion:
+ print str(io)
+ io.writeTo(f)
+
+cProfile.run("main()")
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <vector>
+#include "common/ceph_argparse.h"
+#include "Replayer.hpp"
+
+
+using namespace std;
+using namespace rbd_replay;
+
+
+// Returns the tail of `string` after `prefix`, or NULL when `string`
+// does not start with `prefix`.  An empty prefix matches everything.
+static const char* get_remainder(const char *string, const char *prefix) {
+  for (; *prefix != '\0'; ++prefix, ++string) {
+    if (*string != *prefix) {
+      return NULL;
+    }
+  }
+  return string;
+}
+
+// Prints a one-line usage summary for the replayer binary.
+static void usage(const char* program) {
+  std::cout << "Usage: " << program << " --conf=<config_file> <replay_file>" << std::endl;
+}
+
+// Entry point: parses -c/--conf (required), an optional
+// --latency-multiplier, and a positional replay file, then hands off
+// to Replayer.  The ceph_argparse_* helpers consume matched arguments
+// and advance `i` themselves; unmatched arguments are left in `args`.
+int main(int argc, const char **argv) {
+ vector<const char*> args;
+
+ argv_to_vec(argc, argv, args);
+ env_to_vec(args);
+
+ std::vector<const char*>::iterator i;
+ string conf;
+ float latency_multiplier = 1;
+ std::string val;
+ std::ostringstream err;
+ for (i = args.begin(); i != args.end(); ) {
+ if (ceph_argparse_double_dash(args, i)) {
+ break;
+ } else if (ceph_argparse_witharg(args, i, &val, "-c", "--conf", (char*)NULL)) {
+ conf = val;
+ } else if (ceph_argparse_withfloat(args, i, &latency_multiplier, &err, "--latency-multiplier",
+ (char*)NULL)) {
+ if (!err.str().empty()) {
+ cerr << err.str() << std::endl;
+ return 1;
+ }
+ } else if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) {
+ usage(argv[0]);
+ return 0;
+ } else if (get_remainder(*i, "-")) {
+ // Any unconsumed argument still starting with '-' is an unknown flag.
+ cerr << "Unrecognized argument: " << *i << endl;
+ return 1;
+ } else {
+ // Positional argument: leave it in args for the replay-file lookup below.
+ ++i;
+ }
+ }
+
+ if (conf.empty()) {
+ cerr << "No config file specified. Use -c or --conf." << endl;
+ return 1;
+ }
+
+ string replay_file;
+ if (!args.empty()) {
+ replay_file = args[0];
+ }
+
+ if (replay_file.empty()) {
+ cerr << "No replay file specified." << endl;
+ return 1;
+ }
+
+ Replayer replayer;
+ replayer.set_latency_multiplier(latency_multiplier);
+ // Result of run() (if any) is ignored; main returns 0 implicitly.
+ replayer.run(conf, replay_file);
+}