From 4c4289c24603312aa9fee960d7b401a223be445b Mon Sep 17 00:00:00 2001 From: Adam Crume Date: Thu, 17 Jul 2014 09:39:35 -0700 Subject: [PATCH] lttng: Add rbd-replay Signed-off-by: Adam Crume --- examples/rbd-replay/.gitignore | 3 + examples/rbd-replay/create-image | 8 + examples/rbd-replay/replay | 9 + examples/rbd-replay/run-prep-for-replay | 3 + examples/rbd-replay/trace | 10 + src/.gitignore | 1 + src/Makefile.am | 1 + src/rbd_replay/BoundedBuffer.hpp | 60 +++ src/rbd_replay/Deser.cc | 67 ++++ src/rbd_replay/Deser.hpp | 47 +++ src/rbd_replay/Makefile.am | 14 + src/rbd_replay/PendingIO.cc | 37 ++ src/rbd_replay/PendingIO.hpp | 53 +++ src/rbd_replay/Replayer.cc | 261 +++++++++++++ src/rbd_replay/Replayer.hpp | 121 ++++++ src/rbd_replay/actions.cc | 276 +++++++++++++ src/rbd_replay/actions.hpp | 260 +++++++++++++ src/rbd_replay/prep-for-replay.py | 498 ++++++++++++++++++++++++ src/rbd_replay/rbd-replay.cc | 88 +++++ 19 files changed, 1817 insertions(+) create mode 100644 examples/rbd-replay/.gitignore create mode 100755 examples/rbd-replay/create-image create mode 100755 examples/rbd-replay/replay create mode 100755 examples/rbd-replay/run-prep-for-replay create mode 100755 examples/rbd-replay/trace create mode 100644 src/rbd_replay/BoundedBuffer.hpp create mode 100644 src/rbd_replay/Deser.cc create mode 100644 src/rbd_replay/Deser.hpp create mode 100644 src/rbd_replay/Makefile.am create mode 100644 src/rbd_replay/PendingIO.cc create mode 100644 src/rbd_replay/PendingIO.hpp create mode 100644 src/rbd_replay/Replayer.cc create mode 100644 src/rbd_replay/Replayer.hpp create mode 100644 src/rbd_replay/actions.cc create mode 100644 src/rbd_replay/actions.hpp create mode 100755 src/rbd_replay/prep-for-replay.py create mode 100644 src/rbd_replay/rbd-replay.cc diff --git a/examples/rbd-replay/.gitignore b/examples/rbd-replay/.gitignore new file mode 100644 index 0000000000000..f9e70539ce7d0 --- /dev/null +++ b/examples/rbd-replay/.gitignore @@ -0,0 +1,3 @@ +/*.log +/replayer +/traces diff --git a/examples/rbd-replay/create-image b/examples/rbd-replay/create-image new file mode 100755 index 0000000000000..3486d98d9db83 --- /dev/null +++ b/examples/rbd-replay/create-image @@ -0,0 +1,8 @@ +#!/bin/bash + +pool=rbd +image=my-image +size=10G +export LD_LIBRARY_PATH=../../src/.libs +#qemu-img create -f raw rbd:$pool/$image:conf=../../src/ceph.conf $size +qemu-img convert linux-0.2.img -O raw rbd:$pool/$image:conf=../../src/ceph.conf diff --git a/examples/rbd-replay/replay b/examples/rbd-replay/replay new file mode 100755 index 0000000000000..78e01686f870e --- /dev/null +++ b/examples/rbd-replay/replay @@ -0,0 +1,9 @@ +#!/bin/bash + +lttng create +lttng enable-event -u 'librbd:*' +lttng add-context -u -t pthread_id +lttng start +LD_LIBRARY_PATH=../../src/.libs ../../src/rbd-replay --conf=../../src/ceph.conf replay.bin | tee replay.log +lttng stop +lttng view > replay-trace.log diff --git a/examples/rbd-replay/run-prep-for-replay b/examples/rbd-replay/run-prep-for-replay new file mode 100755 index 0000000000000..64afd31ed3481 --- /dev/null +++ b/examples/rbd-replay/run-prep-for-replay @@ -0,0 +1,3 @@ +#!/bin/bash + +PYTHONPATH=~/babeltrace/bindings/python/:~/babeltrace/bindings/python/.libs/ ../../src/rbd_replay/prep-for-replay.py traces/ust/uid/10002/64-bit diff --git a/examples/rbd-replay/trace b/examples/rbd-replay/trace new file mode 100755 index 0000000000000..02a53589ef9dc --- /dev/null +++ b/examples/rbd-replay/trace @@ -0,0 +1,10 @@ +#!/bin/bash + +mkdir -p traces +lttng create -o traces librbd 
+lttng enable-event -u 'librbd:*' +lttng add-context -u -t pthread_id +lttng start +LD_LIBRARY_PATH=../../src/.libs/ qemu-system-i386 -m 1024 rbd:rbd/my-image:conf=../../src/ceph.conf +lttng stop +lttng view > trace.log diff --git a/src/.gitignore b/src/.gitignore index fec9f70c31d87..e7c09457d5f9d 100644 --- a/src/.gitignore +++ b/src/.gitignore @@ -66,6 +66,7 @@ Makefile /radosgw-admin /rbd /rbd-fuse +/rbd-replay /rest-bench /sample.fetch_config /TAGS diff --git a/src/Makefile.am b/src/Makefile.am index 2cde64543061a..adaf73e47620f 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -32,6 +32,7 @@ include librbd/Makefile.am include rgw/Makefile.am include cls/Makefile.am include key_value_store/Makefile.am +include rbd_replay/Makefile.am include test/Makefile.am include tools/Makefile.am diff --git a/src/rbd_replay/BoundedBuffer.hpp b/src/rbd_replay/BoundedBuffer.hpp new file mode 100644 index 0000000000000..540fd317e2164 --- /dev/null +++ b/src/rbd_replay/BoundedBuffer.hpp @@ -0,0 +1,60 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef _INCLUDED_BOUNDED_BUFFER_HPP +#define _INCLUDED_BOUNDED_BUFFER_HPP + +#include +#include +#include +#include + +// Taken from http://www.boost.org/doc/libs/1_55_0/libs/circular_buffer/example/circular_buffer_bound_example.cpp +template +class BoundedBuffer { +public: + typedef boost::circular_buffer container_type; + typedef typename container_type::size_type size_type; + typedef typename container_type::value_type value_type; + typedef typename boost::call_traits::param_type param_type; + + explicit BoundedBuffer(size_type capacity) : m_unread(0), m_container(capacity) { + } + + void push_front(typename boost::call_traits::param_type item) { + // `param_type` represents the "best" way to pass a parameter of type `value_type` to a method. + boost::mutex::scoped_lock lock(m_mutex); + m_not_full.wait(lock, boost::bind(&BoundedBuffer::is_not_full, this)); + m_container.push_front(item); + ++m_unread; + lock.unlock(); + m_not_empty.notify_one(); + } + + void pop_back(value_type* pItem) { + boost::mutex::scoped_lock lock(m_mutex); + m_not_empty.wait(lock, boost::bind(&BoundedBuffer::is_not_empty, this)); + *pItem = m_container[--m_unread]; + lock.unlock(); + m_not_full.notify_one(); + } + +private: + BoundedBuffer(const BoundedBuffer&); // Disabled copy constructor. + BoundedBuffer& operator= (const BoundedBuffer&); // Disabled assign operator. + + bool is_not_empty() const { + return m_unread > 0; + } + bool is_not_full() const { + return m_unread < m_container.capacity(); + } + + size_type m_unread; + container_type m_container; + boost::mutex m_mutex; + boost::condition m_not_empty; + boost::condition m_not_full; +}; + +#endif diff --git a/src/rbd_replay/Deser.cc b/src/rbd_replay/Deser.cc new file mode 100644 index 0000000000000..986a18c166a66 --- /dev/null +++ b/src/rbd_replay/Deser.cc @@ -0,0 +1,67 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Adam Crume + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#include "Deser.hpp" +#include +#include +#include + + +rbd_replay::Deser::Deser(std::istream &in) + : m_in(in) { +} + +uint8_t rbd_replay::Deser::read_uint8_t() { + uint8_t data; + m_in.read(reinterpret_cast(&data), sizeof(data)); + return data; +} + +uint16_t rbd_replay::Deser::read_uint16_t() { + uint16_t data; + m_in.read(reinterpret_cast(&data), sizeof(data)); + return ntohs(data); +} + +uint32_t rbd_replay::Deser::read_uint32_t() { + uint32_t data; + m_in.read(reinterpret_cast(&data), sizeof(data)); + return ntohl(data); +} + +uint64_t rbd_replay::Deser::read_uint64_t() { + uint64_t data; + m_in.read(reinterpret_cast(&data), sizeof(data)); +#if __BYTE_ORDER == __LITTLE_ENDIAN + data = (static_cast(ntohl(data)) << 32 | ntohl(data >> 32)); +#endif + return data; +} + +std::string rbd_replay::Deser::read_string() { + uint32_t length = read_uint32_t(); + char* data = reinterpret_cast(malloc(length)); + m_in.read(data, length); + std::string s(data, length); + free(data); + return s; +} + +bool rbd_replay::Deser::read_bool() { + return read_uint8_t() != 0; +} + +bool rbd_replay::Deser::eof() { + return m_in.eof(); +} diff --git a/src/rbd_replay/Deser.hpp b/src/rbd_replay/Deser.hpp new file mode 100644 index 0000000000000..d5fad5ae6650e --- /dev/null +++ b/src/rbd_replay/Deser.hpp @@ -0,0 +1,47 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Adam Crume + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef _INCLUDED_RBD_REPLAY_DESER_HPP +#define _INCLUDED_RBD_REPLAY_DESER_HPP + +#include +#include + +namespace rbd_replay { + +class Deser { +public: + Deser(std::istream &in); + + uint8_t read_uint8_t(); + + uint16_t read_uint16_t(); + + uint32_t read_uint32_t(); + + uint64_t read_uint64_t(); + + std::string read_string(); + + bool read_bool(); + + bool eof(); + +private: + std::istream &m_in; +}; + +} + +#endif diff --git a/src/rbd_replay/Makefile.am b/src/rbd_replay/Makefile.am new file mode 100644 index 0000000000000..fee53d2acc3f5 --- /dev/null +++ b/src/rbd_replay/Makefile.am @@ -0,0 +1,14 @@ +rbd_replay_SOURCES = rbd_replay/rbd-replay.cc \ + rbd_replay/actions.cc \ + rbd_replay/Deser.cc \ + rbd_replay/PendingIO.cc \ + rbd_replay/Replayer.cc +rbd_replay_LDADD = $(LIBRBD) $(LIBRADOS) $(CEPH_GLOBAL) +noinst_HEADERS += rbd_replay/BoundedBuffer.hpp \ + rbd_replay/actions.hpp \ + rbd_replay/Deser.hpp \ + rbd_replay/PendingIO.hpp \ + rbd_replay/Replayer.hpp +if LINUX +bin_PROGRAMS += rbd-replay +endif #LINUX diff --git a/src/rbd_replay/PendingIO.cc b/src/rbd_replay/PendingIO.cc new file mode 100644 index 0000000000000..40f379a69c162 --- /dev/null +++ b/src/rbd_replay/PendingIO.cc @@ -0,0 +1,37 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Adam Crume + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#include "PendingIO.hpp" + + +using namespace std; +using namespace rbd_replay; + +extern "C" +void pending_io_callback(librbd::completion_t cb, void *arg) { + PendingIO *io = (PendingIO*)arg; + io->completed(cb); +} + +PendingIO::PendingIO(action_id_t id, + ActionCtx &worker) + : m_id(id), + m_completion(this, pending_io_callback), + m_worker(worker) { + } + +void PendingIO::completed(librbd::completion_t cb) { + cout << "Completed pending IO #" << m_id << endl; + m_worker.remove_pending(shared_from_this()); +} diff --git a/src/rbd_replay/PendingIO.hpp b/src/rbd_replay/PendingIO.hpp new file mode 100644 index 0000000000000..7413a59cb5eab --- /dev/null +++ b/src/rbd_replay/PendingIO.hpp @@ -0,0 +1,53 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Adam Crume + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef _INCLUDED_RBD_REPLAY_PENDINGIO_HPP +#define _INCLUDED_RBD_REPLAY_PENDINGIO_HPP + +#include +#include "actions.hpp" + +namespace rbd_replay { + +class PendingIO : public boost::enable_shared_from_this { +public: + typedef boost::shared_ptr ptr; + + PendingIO(action_id_t id, + ActionCtx &worker); + + void completed(librbd::completion_t cb); + + action_id_t id() const { + return m_id; + } + + ceph::bufferlist &bufferlist() { + return m_bl; + } + + librbd::RBD::AioCompletion &completion() { + return m_completion; + } + +private: + const action_id_t m_id; + ceph::bufferlist m_bl; + librbd::RBD::AioCompletion m_completion; + ActionCtx &m_worker; +}; + +} + +#endif diff --git a/src/rbd_replay/Replayer.cc b/src/rbd_replay/Replayer.cc new file mode 100644 index 0000000000000..e2d32ffd165bf --- /dev/null +++ b/src/rbd_replay/Replayer.cc @@ -0,0 +1,261 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Adam Crume + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#include "Replayer.hpp" +#include +#include +#include + + +using namespace std; +using namespace rbd_replay; + + +Worker::Worker(Replayer &replayer) + : m_replayer(replayer), + m_buffer(100), + m_done(false) { +} + +void Worker::start() { + m_thread = boost::shared_ptr(new boost::thread(boost::bind(&Worker::run, this))); +} + +// Should only be called by StopThreadAction +void Worker::stop() { + m_done = true; +} + +void Worker::join() { + m_thread->join(); +} + +void Worker::send(Action::ptr action) { + m_buffer.push_front(action); +} + +void Worker::add_pending(PendingIO::ptr io) { + boost::mutex::scoped_lock lock(m_pending_ios_mutex); + m_pending_ios.push_back(io); +} + +void Worker::run() { + cout << "Worker thread started" << endl; + while (!m_done) { + Action::ptr action; + m_buffer.pop_back(&action); + m_replayer.wait_for_actions(action->predecessors()); + action->perform(*this); + m_replayer.set_action_complete(action->id()); + } + { + boost::mutex::scoped_lock lock(m_pending_ios_mutex); + while (!m_pending_ios.empty()) { + m_pending_ios_empty.wait(lock); + } + } + cout << "Worker thread stopped" << endl; +} + + +void Worker::remove_pending(PendingIO::ptr io) { + m_replayer.set_action_complete(io->id()); + boost::mutex::scoped_lock lock(m_pending_ios_mutex); + for (vector::iterator itr = m_pending_ios.begin(); itr != m_pending_ios.end(); itr++) { + if (*itr == io) { + m_pending_ios.erase(itr); + break; + } + } + if (m_pending_ios.empty()) { + m_pending_ios_empty.notify_all(); + } +} + + +librbd::Image* Worker::get_image(imagectx_id_t imagectx_id) { + return m_replayer.get_image(imagectx_id); +} + + +void Worker::put_image(imagectx_id_t imagectx_id, librbd::Image* image) { + m_replayer.put_image(imagectx_id, image); +} + + +void Worker::erase_image(imagectx_id_t imagectx_id) { + m_replayer.erase_image(imagectx_id); +} + + +librbd::RBD* Worker::rbd() { + return m_replayer.get_rbd(); +} + + +librados::IoCtx* Worker::ioctx() { + return m_replayer.get_ioctx(); +} + + +void Worker::set_action_complete(action_id_t id) { + m_replayer.set_action_complete(id); +} + + +Replayer::Replayer() { +} + +void Replayer::run(const std::string conf_file, const std::string replay_file) { + cout << "IO thread started" << endl; + { + librados::Rados rados; + rados.init(NULL); + int r = rados.conf_read_file(conf_file.c_str()); + if (r) { + cerr << "Unable to read conf file: " << r << endl; + goto out; + } + r = rados.connect(); + if (r) { + cerr << "Unable to connect to Rados: " << r << endl; + goto out; + } + m_ioctx = new librados::IoCtx(); + { + const char* pool_name = "rbd"; + r = rados.ioctx_create(pool_name, *m_ioctx); + if (r) { + cerr << "Unable to create IoCtx: " << r << endl; + goto out2; + } + m_rbd = new librbd::RBD(); + map workers; + + ifstream input(replay_file.c_str(), ios::in | ios::binary); + if (!input.is_open()) { + cerr << "Unable to open " << replay_file << endl; + exit(1); + } + + Deser deser(input); + while (true) { + Action::ptr action = Action::read_from(deser); + if (!action) { + break; + } + if (action->is_start_thread()) { + Worker *worker = new Worker(*this); + workers[action->thread_id()] = worker; + worker->start(); + } else { + workers[action->thread_id()]->send(action); + } + } + + cout << "Waiting for workers to die" << endl; + pair w; + BOOST_FOREACH(w, workers) { + w.second->join(); + delete w.second; + } + clear_images(); + delete m_rbd; + m_rbd = NULL; + } + out2: + delete m_ioctx; + m_ioctx = NULL; + } + out: + cout << "IO thread stopped" << endl; +} + + 
+librbd::Image* Replayer::get_image(imagectx_id_t imagectx_id) { + boost::shared_lock lock(m_images_mutex); + return m_images[imagectx_id]; +} + +void Replayer::put_image(imagectx_id_t imagectx_id, librbd::Image *image) { + boost::unique_lock lock(m_images_mutex); + assert(m_images.count(imagectx_id) == 0); + m_images[imagectx_id] = image; +} + +void Replayer::erase_image(imagectx_id_t imagectx_id) { + boost::unique_lock lock(m_images_mutex); + delete m_images[imagectx_id]; + m_images.erase(imagectx_id); +} + +void Replayer::set_action_complete(action_id_t id) { + cout << "ActionTracker::set_complete(" << id << ")" << endl; + boost::system_time now(boost::get_system_time()); + boost::unique_lock lock(m_actions_complete_mutex); + assert(m_actions_complete.count(id) == 0); + m_actions_complete[id] = now; + m_actions_complete_condition.notify_all(); +} + +bool Replayer::is_action_complete(action_id_t id) { + boost::shared_lock lock(m_actions_complete_mutex); + return _is_action_complete(id); +} + +void Replayer::wait_for_actions(const vector &deps) { + boost::posix_time::ptime release_time(boost::posix_time::neg_infin); + BOOST_FOREACH(const dependency_d &dep, deps) { + cout << "Waiting for " << dep.id << endl; + boost::system_time start_time(boost::get_system_time()); + boost::shared_lock lock(m_actions_complete_mutex); + while (!_is_action_complete(dep.id)) { + //m_actions_complete_condition.wait(lock); + m_actions_complete_condition.timed_wait(lock, boost::posix_time::seconds(1)); + cout << "Still waiting for " << dep.id << endl; + } + boost::system_time action_completed_time(m_actions_complete[dep.id]); + lock.unlock(); + boost::system_time end_time(boost::get_system_time()); + long long micros = (end_time - start_time).total_microseconds(); + cout << "Finished waiting for " << dep.id << " after " << micros << " microseconds" << endl; + // Apparently the nanoseconds constructor is optional: + // http://www.boost.org/doc/libs/1_46_0/doc/html/date_time/details.html#compile_options + boost::system_time sub_release_time(action_completed_time + boost::posix_time::microseconds(dep.time_delta * m_latency_multiplier / 1000)); + if (sub_release_time > release_time) { + release_time = sub_release_time; + } + } + if (release_time > boost::get_system_time()) { + cout << "Sleeping for " << (release_time - boost::get_system_time()).total_microseconds() << " microseconds" << endl; + boost::this_thread::sleep(release_time); + } +} + +void Replayer::clear_images() { + boost::unique_lock lock(m_images_mutex); + pair p; + BOOST_FOREACH(p, m_images) { + delete p.second; + } + m_images.clear(); +} + +bool Replayer::_is_action_complete(action_id_t id) { + return m_actions_complete.count(id) > 0; +} + +void Replayer::set_latency_multiplier(float f) { + m_latency_multiplier = f; +} diff --git a/src/rbd_replay/Replayer.hpp b/src/rbd_replay/Replayer.hpp new file mode 100644 index 0000000000000..a2f18eaa1bf13 --- /dev/null +++ b/src/rbd_replay/Replayer.hpp @@ -0,0 +1,121 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Adam Crume + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#ifndef _INCLUDED_RBD_REPLAY_REPLAYER_HPP +#define _INCLUDED_RBD_REPLAY_REPLAYER_HPP + +#include +#include +#include "BoundedBuffer.hpp" +#include "PendingIO.hpp" + +namespace rbd_replay { + +class Replayer; + +class Worker : public ActionCtx { +public: + explicit Worker(Replayer &replayer); + + void start(); + + // Should only be called by StopThreadAction + void stop(); + + void join(); + + void send(Action::ptr action); + + void add_pending(PendingIO::ptr io); + + void remove_pending(PendingIO::ptr io); + + librbd::Image* get_image(imagectx_id_t imagectx_id); + + void put_image(imagectx_id_t imagectx_id, librbd::Image* image); + + void erase_image(imagectx_id_t imagectx_id); + + librbd::RBD* rbd(); + + librados::IoCtx* ioctx(); + + void set_action_complete(action_id_t id); + +private: + void run(); + + Replayer &m_replayer; + BoundedBuffer m_buffer; + boost::shared_ptr m_thread; + std::vector m_pending_ios; + boost::mutex m_pending_ios_mutex; + boost::condition m_pending_ios_empty; + bool m_done; +}; + + +class Replayer { +public: + Replayer(); + + void run(const std::string conf_file, const std::string replay_file); + + librbd::RBD* get_rbd() { + return m_rbd; + } + + librados::IoCtx* get_ioctx() { + return m_ioctx; + } + + librbd::Image* get_image(imagectx_id_t imagectx_id); + + void put_image(imagectx_id_t imagectx_id, librbd::Image *image); + + void erase_image(imagectx_id_t imagectx_id); + + void set_action_complete(action_id_t id); + + bool is_action_complete(action_id_t id); + + void wait_for_actions(const std::vector &deps); + + void set_latency_multiplier(float f); + +private: + void clear_images(); + + bool _is_action_complete(action_id_t id); + + // Disallow assignment and copying + Replayer(const Replayer& rhs); + const Replayer& operator=(const Replayer& rhs); + + librbd::RBD* m_rbd; + librados::IoCtx* m_ioctx; + float m_latency_multiplier; + + std::map m_images; + boost::shared_mutex m_images_mutex; + + // Maps an action ID to the time the action completed + std::map m_actions_complete; + boost::shared_mutex m_actions_complete_mutex; + boost::condition m_actions_complete_condition; +}; + +} + +#endif diff --git a/src/rbd_replay/actions.cc b/src/rbd_replay/actions.cc new file mode 100644 index 0000000000000..baaa0ba96e3a8 --- /dev/null +++ b/src/rbd_replay/actions.cc @@ -0,0 +1,276 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Adam Crume + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#include "actions.hpp" +#include +#include "PendingIO.hpp" + + +using namespace rbd_replay; +using namespace std; + + +Action::Action(action_id_t id, + thread_id_t thread_id, + int num_successors, + int num_completion_successors, + std::vector &predecessors) + : m_id(id), + m_thread_id(thread_id), + m_num_successors(num_successors), + m_num_completion_successors(num_completion_successors), + m_predecessors(predecessors) { + } + +Action::~Action() { +} + +Action::ptr Action::read_from(Deser &d) { + uint8_t type = d.read_uint8_t(); + if (d.eof()) { + return Action::ptr(); + } + uint32_t ionum = d.read_uint32_t(); + uint64_t thread_id = d.read_uint64_t(); + uint32_t num_successors = d.read_uint32_t(); + uint32_t num_completion_successors = d.read_uint32_t(); + uint32_t num_dependencies = d.read_uint32_t(); + vector deps; + for (unsigned int i = 0; i < num_dependencies; i++) { + uint32_t dep_id = d.read_uint32_t(); + uint64_t time_delta = d.read_uint64_t(); + deps.push_back(dependency_d(dep_id, time_delta)); + } + DummyAction dummy(ionum, thread_id, num_successors, num_completion_successors, deps); + switch (type) { + case 0: + return StartThreadAction::read_from(dummy, d); + case 1: + return StopThreadAction::read_from(dummy, d); + case 2: + return ReadAction::read_from(dummy, d); + case 3: + return WriteAction::read_from(dummy, d); + case 4: + return AioReadAction::read_from(dummy, d); + case 5: + return AioWriteAction::read_from(dummy, d); + case 6: + return OpenImageAction::read_from(dummy, d); + case 7: + return CloseImageAction::read_from(dummy, d); + default: + cerr << "Invalid action type: " << type << endl; + exit(1); + } +} + + +StartThreadAction::StartThreadAction(Action &src) + : Action(src) { +} + +void StartThreadAction::perform(ActionCtx &ctx) { + cerr << "StartThreadAction should never actually be performed" << endl; + exit(1); +} + +bool StartThreadAction::is_start_thread() { + return true; +} + +Action::ptr StartThreadAction::read_from(Action &src, Deser &d) { + return Action::ptr(new StartThreadAction(src)); +} + + +StopThreadAction::StopThreadAction(Action &src) + : Action(src) { +} + +void StopThreadAction::perform(ActionCtx &ctx) { + cout << "Performing stop thread action #" << id() << endl; + ctx.stop(); +} + +Action::ptr StopThreadAction::read_from(Action &src, Deser &d) { + return Action::ptr(new StopThreadAction(src)); +} + +AioReadAction::AioReadAction(const Action &src, + imagectx_id_t imagectx_id, + uint64_t offset, + uint64_t length) + : Action(src), + m_imagectx_id(imagectx_id), + m_offset(offset), + m_length(length) { + } + +Action::ptr AioReadAction::read_from(Action &src, Deser &d) { + imagectx_id_t imagectx_id = d.read_uint64_t(); + uint64_t offset = d.read_uint64_t(); + uint64_t length = d.read_uint64_t(); + return Action::ptr(new AioReadAction(src, imagectx_id, offset, length)); +} + +void AioReadAction::perform(ActionCtx &worker) { + cout << "Performing AIO read action #" << id() << endl; + librbd::Image *image = worker.get_image(m_imagectx_id); + assert(image); + PendingIO::ptr io(new PendingIO(pending_io_id(), worker)); + image->aio_read(m_offset, m_length, io->bufferlist(), &io->completion()); + worker.add_pending(io); +} + + +ReadAction::ReadAction(const Action &src, + imagectx_id_t imagectx_id, + uint64_t offset, + uint64_t length) + : Action(src), + m_imagectx_id(imagectx_id), + m_offset(offset), + m_length(length) { + } + +Action::ptr ReadAction::read_from(Action &src, Deser &d) { + imagectx_id_t imagectx_id = d.read_uint64_t(); + 
uint64_t offset = d.read_uint64_t(); + uint64_t length = d.read_uint64_t(); + return Action::ptr(new ReadAction(src, imagectx_id, offset, length)); +} + +void ReadAction::perform(ActionCtx &worker) { + cout << "Performing read action #" << id() << endl; + librbd::Image *image = worker.get_image(m_imagectx_id); + PendingIO::ptr io(new PendingIO(pending_io_id(), worker)); + worker.add_pending(io); + image->read(m_offset, m_length, io->bufferlist()); + worker.remove_pending(io); +} + + +AioWriteAction::AioWriteAction(const Action &src, + imagectx_id_t imagectx_id, + uint64_t offset, + uint64_t length) + : Action(src), + m_imagectx_id(imagectx_id), + m_offset(offset), + m_length(length) { + } + +Action::ptr AioWriteAction::read_from(Action &src, Deser &d) { + imagectx_id_t imagectx_id = d.read_uint64_t(); + uint64_t offset = d.read_uint64_t(); + uint64_t length = d.read_uint64_t(); + return Action::ptr(new AioWriteAction(src, imagectx_id, offset, length)); +} + +void AioWriteAction::perform(ActionCtx &worker) { + cout << "Performing AIO write action #" << id() << endl; + librbd::Image *image = worker.get_image(m_imagectx_id); + PendingIO::ptr io(new PendingIO(pending_io_id(), worker)); + io->bufferlist().append_zero(m_length); + image->aio_write(m_offset, m_length, io->bufferlist(), &io->completion()); + worker.add_pending(io); +} + + +WriteAction::WriteAction(const Action &src, + imagectx_id_t imagectx_id, + uint64_t offset, + uint64_t length) + : Action(src), + m_imagectx_id(imagectx_id), + m_offset(offset), + m_length(length) { + } + +Action::ptr WriteAction::read_from(Action &src, Deser &d) { + imagectx_id_t imagectx_id = d.read_uint64_t(); + uint64_t offset = d.read_uint64_t(); + uint64_t length = d.read_uint64_t(); + return Action::ptr(new WriteAction(src, imagectx_id, offset, length)); +} + +void WriteAction::perform(ActionCtx &worker) { + cout << "Performing write action #" << id() << endl; + librbd::Image *image = worker.get_image(m_imagectx_id); + PendingIO::ptr io(new PendingIO(pending_io_id(), worker)); + worker.add_pending(io); + io->bufferlist().append_zero(m_length); + image->write(m_offset, m_length, io->bufferlist()); + worker.remove_pending(io); +} + + +OpenImageAction::OpenImageAction(Action &src, + imagectx_id_t imagectx_id, + string name, + string snap_name, + bool readonly) + : Action(src), + m_imagectx_id(imagectx_id), + m_name(name), + m_snap_name(snap_name), + m_readonly(readonly) { + } + +Action::ptr OpenImageAction::read_from(Action &src, Deser &d) { + imagectx_id_t imagectx_id = d.read_uint64_t(); + string name = d.read_string(); + string snap_name = d.read_string(); + bool readonly = d.read_bool(); + return Action::ptr(new OpenImageAction(src, imagectx_id, name, snap_name, readonly)); +} + +void OpenImageAction::perform(ActionCtx &worker) { + cout << "Performing open image action #" << id() << endl; + PendingIO::ptr io(new PendingIO(pending_io_id(), worker)); + worker.add_pending(io); + librbd::Image *image = new librbd::Image(); + librbd::RBD *rbd = worker.rbd(); + int r; + if (m_readonly) { + r = rbd->open_read_only(*worker.ioctx(), *image, m_name.c_str(), m_snap_name.c_str()); + } else { + r = rbd->open(*worker.ioctx(), *image, m_name.c_str(), m_snap_name.c_str()); + } + if (r) { + cerr << "Unable to open image '" << m_name << "' with snap '" << m_snap_name << "' and readonly " << m_readonly << ": " << strerror(-r) << endl; + exit(1); + } + worker.put_image(m_imagectx_id, image); + worker.remove_pending(io); +} + + +CloseImageAction::CloseImageAction(Action 
&src, + imagectx_id_t imagectx_id) + : Action(src), + m_imagectx_id(imagectx_id) { + } + +Action::ptr CloseImageAction::read_from(Action &src, Deser &d) { + imagectx_id_t imagectx_id = d.read_uint64_t(); + return Action::ptr(new CloseImageAction(src, imagectx_id)); +} + +void CloseImageAction::perform(ActionCtx &worker) { + cout << "Performing close image action #" << id() << endl; + worker.erase_image(m_imagectx_id); + worker.set_action_complete(pending_io_id()); +} diff --git a/src/rbd_replay/actions.hpp b/src/rbd_replay/actions.hpp new file mode 100644 index 0000000000000..95f12d92e2e46 --- /dev/null +++ b/src/rbd_replay/actions.hpp @@ -0,0 +1,260 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Adam Crume + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef _INCLUDED_RBD_REPLAY_ACTIONS_HPP +#define _INCLUDED_RBD_REPLAY_ACTIONS_HPP + +#include +#include "include/rbd/librbd.hpp" +#include "Deser.hpp" + +namespace rbd_replay { + +typedef uint64_t imagectx_id_t; +typedef uint64_t thread_id_t; + +// Even IDs are normal actions, odd IDs are completions +typedef uint32_t action_id_t; + +struct dependency_d { + action_id_t id; + + uint64_t time_delta; + + dependency_d(action_id_t id, + uint64_t time_delta) + : id(id), + time_delta(time_delta) { + } +}; + + +class PendingIO; + + +class ActionCtx { +public: + virtual ~ActionCtx() { + } + + virtual librbd::Image* get_image(imagectx_id_t imagectx_id) = 0; + + virtual void put_image(imagectx_id_t imagectx_id, librbd::Image* image) = 0; + + virtual void erase_image(imagectx_id_t imagectx_id) = 0; + + virtual librbd::RBD* rbd() = 0; + + virtual librados::IoCtx* ioctx() = 0; + + virtual void add_pending(boost::shared_ptr io) = 0; + + virtual void remove_pending(boost::shared_ptr io) = 0; + + virtual void set_action_complete(action_id_t id) = 0; + + virtual void stop() = 0; +}; + + +class Action { +public: + typedef boost::shared_ptr ptr; + + Action(action_id_t id, + thread_id_t thread_id, + int num_successors, + int num_completion_successors, + std::vector &predecessors); + + virtual ~Action(); + + virtual void perform(ActionCtx &ctx) = 0; + + action_id_t pending_io_id() { + return m_id + 1; + } + + // There's probably a better way to do this, but oh well. 
+ virtual bool is_start_thread() { + return false; + } + + action_id_t id() const { + return m_id; + } + + thread_id_t thread_id() const { + return m_thread_id; + } + + const std::vector& predecessors() const { + return m_predecessors; + } + + static ptr read_from(Deser &d); + +private: + const action_id_t m_id; + const thread_id_t m_thread_id; + const int m_num_successors; + const int m_num_completion_successors; + const std::vector m_predecessors; +}; + + +class DummyAction : public Action { +public: + DummyAction(action_id_t id, + thread_id_t thread_id, + int num_successors, + int num_completion_successors, + std::vector &predecessors) + : Action(id, thread_id, num_successors, num_completion_successors, predecessors) { + } + + void perform(ActionCtx &ctx) { + } +}; + +class StopThreadAction : public Action { +public: + explicit StopThreadAction(Action &src); + + void perform(ActionCtx &ctx); + + static Action::ptr read_from(Action &src, Deser &d); +}; + + +class AioReadAction : public Action { +public: + AioReadAction(const Action &src, + imagectx_id_t imagectx_id, + uint64_t offset, + uint64_t length); + + void perform(ActionCtx &ctx); + + static Action::ptr read_from(Action &src, Deser &d); + +private: + imagectx_id_t m_imagectx_id; + uint64_t m_offset; + uint64_t m_length; +}; + + +class ReadAction : public Action { +public: + ReadAction(const Action &src, + imagectx_id_t imagectx_id, + uint64_t offset, + uint64_t length); + + void perform(ActionCtx &ctx); + + static Action::ptr read_from(Action &src, Deser &d); + +private: + imagectx_id_t m_imagectx_id; + uint64_t m_offset; + uint64_t m_length; +}; + + +class AioWriteAction : public Action { +public: + AioWriteAction(const Action &src, + imagectx_id_t imagectx_id, + uint64_t offset, + uint64_t length); + + void perform(ActionCtx &ctx); + + static Action::ptr read_from(Action &src, Deser &d); + +private: + imagectx_id_t m_imagectx_id; + uint64_t m_offset; + uint64_t m_length; +}; + + +class WriteAction : public Action { +public: + WriteAction(const Action &src, + imagectx_id_t imagectx_id, + uint64_t offset, + uint64_t length); + + void perform(ActionCtx &ctx); + + static Action::ptr read_from(Action &src, Deser &d); + +private: + imagectx_id_t m_imagectx_id; + uint64_t m_offset; + uint64_t m_length; +}; + + +class OpenImageAction : public Action { +public: + OpenImageAction(Action &src, + imagectx_id_t imagectx_id, + std::string name, + std::string snap_name, + bool readonly); + + void perform(ActionCtx &ctx); + + static Action::ptr read_from(Action &src, Deser &d); + +private: + imagectx_id_t m_imagectx_id; + std::string m_name; + std::string m_snap_name; + bool m_readonly; +}; + + +class CloseImageAction : public Action { +public: + CloseImageAction(Action &src, + imagectx_id_t imagectx_id); + + void perform(ActionCtx &ctx); + + static Action::ptr read_from(Action &src, Deser &d); + +private: + imagectx_id_t m_imagectx_id; +}; + + +class StartThreadAction : public Action { +public: + explicit StartThreadAction(Action &src); + + void perform(ActionCtx &ctx); + + bool is_start_thread(); + + static Action::ptr read_from(Action &src, Deser &d); +}; + +} + +#endif diff --git a/src/rbd_replay/prep-for-replay.py b/src/rbd_replay/prep-for-replay.py new file mode 100755 index 0000000000000..8b4273925ff61 --- /dev/null +++ b/src/rbd_replay/prep-for-replay.py @@ -0,0 +1,498 @@ +#!/usr/bin/python +# -*- mode:Python; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +# vim: ts=8 sw=2 smarttab +# +# Ceph - scalable distributed file system 
+# +# Copyright (C) 2014 Adam Crume +# +# This is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1, as published by the Free Software +# Foundation. See file COPYING. +# +# + +from babeltrace import * +import cProfile +import datetime +import struct +import sys + +traces = TraceCollection() +ret = traces.add_trace(sys.argv[1], "ctf") + +pendingIOs = {} +threads = {} +ios = [] +printOnRead = False +recentCompletions = [] +limit = 100000000000 +ignoreWrites = True + +ioCount = 0 +def nextID(): + global ioCount + val = ioCount + ioCount = ioCount + 2 + return val + +class Extent(object): + def __init__(self, offset, length): + self.offset = offset + self.length = length + def __str__(self): + return str(self.offset) + "+" + str(self.length) + def __repr__(self): + return "Extent(" + str(self.offset) + "," + str(self.length) + ")" + +class Thread(object): + def __init__(self, id): + self.id = id + self.pendingIO = None + self.latestIO = None # may not be meaningful + self.latestCompletion = None # may not be meaningful + self.lastTS = None + def insertTS(self, ts): + if not self.lastTS or ts > self.lastTS: + self.lastTS = ts + def issuedIO(self, io): + latestIOs = [] + for threadID in threads: + thread = threads[threadID] + if thread.latestIO and thread.latestIO.start_time > io.start_time - window: + latestIOs.append(thread.latestIO) + io.addDependencies(latestIOs) + self.latestIO = io + def completedIO(self, io): + self.latestCompletion = io + +def batchUnreachableFrom(deps, base): + if not base: + return set() + if not deps: + return set() + unreachable = set() + searchingFor = set(deps) + discovered = set() + boundary = set(base) + boundaryHorizon = None + for io in boundary: + if not boundaryHorizon or io.start_time > boundaryHorizon: + boundaryHorizon = io.start_time + searchingHorizon = None + for io in searchingFor: + if not searchingHorizon or io.start_time < searchingHorizon: + searchingHorizon = io.start_time + tmp = [x for x in searchingFor if boundaryHorizon < x.start_time] + searchingFor.difference_update(tmp) + unreachable.update(tmp) + while boundary and searchingFor: + io = boundary.pop() + for dep in io.dependencies: + if dep in searchingFor: + searchingFor.remove(dep) + if dep.start_time == searchingHorizon: + searchingHorizon = None + for io in searchingFor: + if not searchingHorizon or io.start_time < searchingHorizon: + searchingHorizon = io.start_time + if not dep in discovered: + boundary.add(dep) + if io.start_time == boundaryHorizon: + boundaryHorizon = None + for io in boundary: + if not boundaryHorizon or io.start_time > boundaryHorizon: + boundaryHorizon = io.start_time + if boundaryHorizon: + tmp = [x for x in searchingFor if boundaryHorizon < x.start_time] + searchingFor.difference_update(tmp) + unreachable.update(tmp) + searchingHorizon = None + for io in searchingFor: + if not searchingHorizon or io.start_time < searchingHorizon: + searchingHorizon = io.start_time + unreachable.update(searchingFor) + return unreachable + +class IO(object): + def __init__(self, ionum, start_time, thread, prev): + self.ionum = ionum + self.start_time = start_time + self.thread = thread + self.dependencies = set() + self.isCompletion = False + self.prev = prev + self.numSuccessors = 0 + self.completion = None + def reachableFrom(self, ios): + if not ios: + return False + discovered = set() + boundary = set(ios) + horizon = None + for io in boundary: + if not horizon or io.start_time > horizon: + 
horizon = io.start_time + if horizon < self.start_time: + return False + while boundary: + io = boundary.pop() + for dep in io.dependencies: + if self == dep: + return True + if not dep in discovered: + boundary.add(dep) + if io.start_time == horizon: + horizon = None + for io in boundary: + if not horizon or io.start_time > horizon: + horizon = io.start_time + if horizon and horizon < self.start_time: + return False + return False + def addDependency(self, dep): + if not dep.reachableFrom(self.dependencies): + self.dependencies.add(dep) + def addDependencies(self, deps): + base = set(self.dependencies) + for dep in deps: + base.update(dep.dependencies) + unreachable = batchUnreachableFrom(deps, base) + self.dependencies.update(unreachable) + def depIDs(self): + ids = [] + for dep in self.dependencies: + ids.append(dep.ionum) + return ids + def depMap(self): + deps = dict() + for dep in self.dependencies: + deps[dep.ionum] = self.start_time - dep.start_time + return deps + def addThreadCompletionDependencies(self, threads): + self.addDependencies(recentCompletions) + def numCompletionSuccessors(self): + return self.completion.numSuccessors if self.completion else 0 + def writeTo(self, f, iotype): + f.write(struct.pack("!BIQIII", iotype, self.ionum, self.thread.id, self.numSuccessors, self.numCompletionSuccessors(), len(self.dependencies))) + for dep in self.dependencies: + f.write(struct.pack("!IQ", dep.ionum, self.start_time - dep.start_time)) + +class StartThreadIO(IO): + def __init__(self, ionum, start_time, thread): + IO.__init__(self, ionum, start_time, thread, None) + def writeTo(self, f): + IO.writeTo(self, f, 0) + def __str__(self): + return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": start thread, thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors()) + +class StopThreadIO(IO): + def __init__(self, ionum, start_time, thread): + IO.__init__(self, ionum, start_time, thread, None) + def writeTo(self, f): + IO.writeTo(self, f, 1) + def __str__(self): + return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": stop thread, thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors()) + +class ReadIO(IO): + def __init__(self, ionum, start_time, thread, prev, imagectx, extents): + IO.__init__(self, ionum, start_time, thread, prev) + self.imagectx = imagectx + self.extents = extents + def writeTo(self, f): + IO.writeTo(self, f, 2) + if len(self.extents) != 1: + raise ValueError("Expected read to have 1 extent, but it had " + str(len(self.extents))) + extent = self.extents[0] + f.write(struct.pack("!QQQ", self.imagectx, extent.offset, extent.length)) + def __str__(self): + return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": read, extents = " + str(self.extents) + ", thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors()) + +class WriteIO(IO): + def __init__(self, ionum, start_time, thread, prev, imagectx, extents): + IO.__init__(self, ionum, start_time, thread, prev) + self.imagectx = imagectx + self.extents = extents + def writeTo(self, f): + IO.writeTo(self, f, 3) + if len(self.extents) != 1: + raise ValueError("Expected read to have 1 extent, but it had " + 
str(len(self.extents))) + extent = self.extents[0] + f.write(struct.pack("!QQQ", self.imagectx, extent.offset, extent.length)) + def __str__(self): + return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": write, extents = " + str(self.extents) + ", thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors()) + +class AioReadIO(IO): + def __init__(self, ionum, start_time, thread, prev, imagectx, extents): + IO.__init__(self, ionum, start_time, thread, prev) + self.imagectx = imagectx + self.extents = extents + def writeTo(self, f): + IO.writeTo(self, f, 4) + if len(self.extents) != 1: + raise ValueError("Expected read to have 1 extent, but it had " + str(len(self.extents))) + extent = self.extents[0] + f.write(struct.pack("!QQQ", self.imagectx, extent.offset, extent.length)) + def __str__(self): + return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": aio read, extents = " + str(self.extents) + ", thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors()) + +class AioWriteIO(IO): + def __init__(self, ionum, start_time, thread, prev, imagectx, extents): + IO.__init__(self, ionum, start_time, thread, prev) + self.imagectx = imagectx + self.extents = extents + def writeTo(self, f): + IO.writeTo(self, f, 5) + if len(self.extents) != 1: + raise ValueError("Expected read to have 1 extent, but it had " + str(len(self.extents))) + extent = self.extents[0] + f.write(struct.pack("!QQQ", imagectx, extent.offset, extent.length)) + def __str__(self): + return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": aio write, extents = " + str(self.extents) + ", thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors()) + +class OpenImageIO(IO): + def __init__(self, ionum, start_time, thread, prev, imagectx, name, snap_name, readonly): + IO.__init__(self, ionum, start_time, thread, prev) + self.imagectx = imagectx + self.name = name + self.snap_name = snap_name + self.readonly = readonly + def writeTo(self, f): + IO.writeTo(self, f, 6) + f.write(struct.pack("!QI", self.imagectx, len(self.name))) + f.write(self.name) + f.write(struct.pack("!I", len(self.snap_name))) + f.write(self.snap_name) + f.write(struct.pack("!b", self.readonly)) + def __str__(self): + return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": open image, thread = " + str(self.thread.id) + ", imagectx = " + str(self.imagectx) + ", name = " + self.name + ", snap_name = " + self.snap_name + ", readonly = " + str(self.readonly) + ", deps = " + str(self.depMap()) + +class CloseImageIO(IO): + def __init__(self, ionum, start_time, thread, prev, imagectx): + IO.__init__(self, ionum, start_time, thread, prev) + self.imagectx = imagectx + def writeTo(self, f): + IO.writeTo(self, f, 7) + f.write(struct.pack("!Q", self.imagectx)) + def __str__(self): + return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": close image, thread = " + str(self.thread.id) + ", imagectx = " + str(self.imagectx) + ", deps = " + str(self.depMap()) + +class CompletionIO(IO): + def __init__(self, start_time, thread, baseIO): + IO.__init__(self, baseIO.ionum + 1, start_time, thread, None) + self.baseIO = baseIO + self.isCompletion = True + 
self.addDependency(baseIO) + baseIO.completion = self + def writeTo(self, f): + pass + def __str__(self): + return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": completion, thread = " + str(self.thread.id) + ", baseIO = " + str(self.baseIO) + ", deps = " + str(self.depMap()) + + +window = 1 * 1e9 + +def completed(io): + global recentCompletions + recentCompletions.append(io) + recentCompletions[:] = [x for x in recentCompletions if x.start_time > io.start_time - window] + # while recentCompletions[0].start_time < io.start_time - window: + # del recentCompletions[0] + +def main(): + global ios + # Parse phase + trace_start = None + count = 0 + for event in traces.events: + count = count + 1 + if count > limit: + break + ts = event.timestamp + if not trace_start: + trace_start = ts + ts = ts - trace_start + threadID = event["pthread_id"] + if threadID in threads: + thread = threads[threadID] + else: + thread = Thread(threadID) + threads[threadID] = thread + ionum = nextID() + io = StartThreadIO(ionum, ts, thread) + ios.append(io) + if printOnRead: + print str(io) + thread.insertTS(ts) + if event.name == "librbd:read_enter": + name = event["name"] + readid = event["id"] + imagectx = event["imagectx"] + ionum = nextID() + thread.pendingIO = ReadIO(ionum, ts, thread, thread.pendingIO, imagectx, []) + thread.pendingIO.addThreadCompletionDependencies(threads) + thread.issuedIO(thread.pendingIO) + ios.append(thread.pendingIO) + elif event.name == "librbd:open_image_enter": + imagectx = event["imagectx"] + name = event["name"] + snap_name = event["snap_name"] + readid = event["id"] + readonly = event["read_only"] + ionum = nextID() + thread.pendingIO = OpenImageIO(ionum, ts, thread, thread.pendingIO, imagectx, name, snap_name, readonly) + thread.pendingIO.addThreadCompletionDependencies(threads) + thread.issuedIO(thread.pendingIO) + ios.append(thread.pendingIO) + elif event.name == "librbd:open_image_exit": + thread.pendingIO.end_time = ts + completionIO = CompletionIO(ts, thread, thread.pendingIO) + thread.completedIO(completionIO) + ios.append(completionIO) + completed(completionIO) + if printOnRead: + print str(thread.pendingIO) + elif event.name == "librbd:close_image_enter": + imagectx = event["imagectx"] + ionum = nextID() + thread.pendingIO = CloseImageIO(ionum, ts, thread, thread.pendingIO, imagectx) + thread.pendingIO.addThreadCompletionDependencies(threads) + thread.issuedIO(thread.pendingIO) + ios.append(thread.pendingIO) + elif event.name == "librbd:close_image_exit": + thread.pendingIO.end_time = ts + completionIO = CompletionIO(ts, thread, thread.pendingIO) + thread.completedIO(completionIO) + ios.append(completionIO) + completed(completionIO) + if printOnRead: + print str(thread.pendingIO) + elif event.name == "librbd:read_extent": + offset = event["offset"] + length = event["length"] + thread.pendingIO.extents.append(Extent(offset, length)) + elif event.name == "librbd:read_exit": + thread.pendingIO.end_time = ts + completionIO = CompletionIO(ts, thread, thread.pendingIO) + thread.completedIO(completionIO) + ios.append(completionIO) + completed(completionIO) + if printOnRead: + print str(thread.pendingIO) + elif event.name == "librbd:write_enter": + if not ignoreWrites: + name = event["name"] + readid = event["id"] + offset = event["off"] + length = event["buf_len"] + imagectx = event["imagectx"] + ionum = nextID() + thread.pendingIO = WriteIO(ionum, ts, thread, thread.pendingIO, imagectx, [Extent(offset, length)]) + 
thread.pendingIO.addThreadCompletionDependencies(threads) + thread.issuedIO(thread.pendingIO) + ios.append(thread.pendingIO) + elif event.name == "librbd:write_exit": + if not ignoreWrites: + thread.pendingIO.end_time = ts + completionIO = CompletionIO(ts, thread, thread.pendingIO) + thread.completedIO(completionIO) + ios.append(completionIO) + completed(completionIO) + if printOnRead: + print str(thread.pendingIO) + elif event.name == "librbd:aio_read_enter": + name = event["name"] + readid = event["id"] + completion = event["completion"] + imagectx = event["imagectx"] + ionum = nextID() + thread.pendingIO = AioReadIO(ionum, ts, thread, thread.pendingIO, imagectx, []) + thread.pendingIO.addThreadCompletionDependencies(threads) + ios.append(thread.pendingIO) + thread.issuedIO(thread.pendingIO) + pendingIOs[completion] = thread.pendingIO + elif event.name == "librbd:aio_read_extent": + offset = event["offset"] + length = event["length"] + thread.pendingIO.extents.append(Extent(offset, length)) + elif event.name == "librbd:aio_read_exit": + if printOnRead: + print str(thread.pendingIO) + elif event.name == "librbd:aio_write_enter": + if not ignoreWrites: + name = event["name"] + writeid = event["id"] + offset = event["off"] + length = event["len"] + completion = event["completion"] + imagectx = event["imagectx"] + ionum = nextID() + thread.pendingIO = AioWriteIO(ionum, ts, thread, thread.pendingIO, imagectx, [Extent(offset, length)]) + thread.pendingIO.addThreadCompletionDependencies(threads) + thread.issuedIO(thread.pendingIO) + ios.append(thread.pendingIO) + pendingIOs[completion] = thread.pendingIO + if printOnRead: + print str(thread.pendingIO) + elif event.name == "librbd:aio_complete_enter": + completion = event["completion"] + retval = event["rval"] + if completion in pendingIOs: + completedIO = pendingIOs[completion] + del pendingIOs[completion] + completedIO.end_time = ts + completionIO = CompletionIO(ts, thread, completedIO) + completedIO.thread.completedIO(completionIO) + ios.append(completionIO) + completed(completionIO) + if printOnRead: + print str(completionIO) + + + # Insert-thread-stop phase + ios = sorted(ios, key = lambda io: io.start_time) + for threadID in threads: + thread = threads[threadID] + ionum = None + maxIONum = 0 # only valid if ionum is None + for io in ios: + if io.ionum > maxIONum: + maxIONum = io.ionum + if io.start_time > thread.lastTS: + ionum = io.ionum + if ionum % 2 == 1: + ionum = ionum + 1 + break + if not ionum: + if maxIONum % 2 == 1: + maxIONum = maxIONum - 1 + ionum = maxIONum + 2 + for io in ios: + if io.ionum >= ionum: + io.ionum = io.ionum + 2 + # TODO: Insert in the right place, don't append and re-sort + ios.append(StopThreadIO(ionum, thread.lastTS, thread)) + ios = sorted(ios, key = lambda io: io.start_time) + + + for io in ios: + if io.prev and io.prev in io.dependencies: + io.dependencies.remove(io.prev) + if io.isCompletion: + io.dependencies.clear() + for dep in io.dependencies: + dep.numSuccessors = dep.numSuccessors + 1 + + print + # for io in ios: + # if not io.isCompletion: + # print str(io) + + with open("replay.bin", "wb") as f: + for io in ios: + if not io.isCompletion: + print str(io) + io.writeTo(f) + +cProfile.run("main()") diff --git a/src/rbd_replay/rbd-replay.cc b/src/rbd_replay/rbd-replay.cc new file mode 100644 index 0000000000000..1825bbc50208a --- /dev/null +++ b/src/rbd_replay/rbd-replay.cc @@ -0,0 +1,88 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph 
- scalable distributed file system + * + * Copyright (C) 2014 Adam Crume + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include +#include "common/ceph_argparse.h" +#include "Replayer.hpp" + + +using namespace std; +using namespace rbd_replay; + + +static const char* get_remainder(const char *string, const char *prefix) { + while (*prefix) { + if (*prefix++ != *string++) { + return NULL; + } + } + return string; +} + +static void usage(const char* program) { + cout << "Usage: " << program << " --conf= " << endl; +} + +int main(int argc, const char **argv) { + vector args; + + argv_to_vec(argc, argv, args); + env_to_vec(args); + + std::vector::iterator i; + string conf; + float latency_multiplier = 1; + std::string val; + std::ostringstream err; + for (i = args.begin(); i != args.end(); ) { + if (ceph_argparse_double_dash(args, i)) { + break; + } else if (ceph_argparse_witharg(args, i, &val, "-c", "--conf", (char*)NULL)) { + conf = val; + } else if (ceph_argparse_withfloat(args, i, &latency_multiplier, &err, "--latency-multiplier", + (char*)NULL)) { + if (!err.str().empty()) { + cerr << err.str() << std::endl; + return 1; + } + } else if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) { + usage(argv[0]); + return 0; + } else if (get_remainder(*i, "-")) { + cerr << "Unrecognized argument: " << *i << endl; + return 1; + } else { + ++i; + } + } + + if (conf.empty()) { + cerr << "No config file specified. Use -c or --conf." << endl; + return 1; + } + + string replay_file; + if (!args.empty()) { + replay_file = args[0]; + } + + if (replay_file.empty()) { + cerr << "No replay file specified." << endl; + return 1; + } + + Replayer replayer; + replayer.set_latency_multiplier(latency_multiplier); + replayer.run(conf, replay_file); +} -- 2.39.5
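Note on the replay.bin wire format: prep-for-replay.py emits each action as a big-endian header (type u8, io id u32, thread id u64, successor count u32, completion-successor count u32, dependency count u32), followed by one (dep id u32, time delta u64) pair per dependency and a type-specific payload, and Deser.cc reads the same layout back on the C++ side. Below is a minimal decoding sketch under that assumption; the helper name and the standalone script are illustrative only and not part of the patch.

#!/usr/bin/python
# Illustrative sketch (not part of the patch): decode one action header from
# replay.bin, assuming the big-endian layout written by IO.writeTo above.
import struct

HEADER_FMT = "!BIQIII"   # type, io id, thread id, successors, completion successors, dep count
DEP_FMT = "!IQ"          # dependency io id, time delta (ns)

def read_action_header(f):
    hdr = f.read(struct.calcsize(HEADER_FMT))
    if len(hdr) < struct.calcsize(HEADER_FMT):
        return None  # end of file
    atype, ionum, thread_id, nsucc, ncsucc, ndeps = struct.unpack(HEADER_FMT, hdr)
    # each dependency is (dep io id, time delta in nanoseconds)
    deps = [struct.unpack(DEP_FMT, f.read(struct.calcsize(DEP_FMT))) for _ in range(ndeps)]
    return atype, ionum, thread_id, nsucc, ncsucc, deps

if __name__ == "__main__":
    with open("replay.bin", "rb") as f:
        # prints only the first action header; the type-specific payload
        # (e.g. imagectx/offset/length for reads and writes) follows it in the file
        print(read_action_header(f))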