rbd_replay/actions.hpp \
rbd_replay/Deser.hpp \
rbd_replay/ImageNameMap.hpp \
+ rbd_replay/ios.hpp \
rbd_replay/PendingIO.hpp \
rbd_replay/rbd_loc.hpp \
rbd_replay/rbd_replay_debug.hpp \
bin_PROGRAMS += rbd-replay
endif #LINUX
-# TODO: See if we need any new dependencies
+librbd_replay_ios_la_SOURCES = rbd_replay/ios.cc
+librbd_replay_ios_la_LIBADD = $(LIBRBD) \
+ $(LIBRADOS) \
+ $(CEPH_GLOBAL) \
+ librbd_replay.la
+noinst_LTLIBRARIES += librbd_replay_ios.la
+
rbd_replay_prep_SOURCES = rbd_replay/rbd-replay-prep.cc
rbd_replay_prep_LDADD = $(LIBRBD) \
$(LIBRADOS) \
$(CEPH_GLOBAL) \
librbd_replay.la \
+ librbd_replay_ios.la \
-lbabeltrace \
-lbabeltrace-ctf \
-lboost_date_time
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+// This code assumes that IO IDs and timestamps are related monotonically.
+// In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b.
+
+#include "ios.hpp"
+
+using namespace std;
+using namespace rbd_replay;
+
+// Ordering predicate for IO pointers: true when p1 starts strictly before p2.
+bool rbd_replay::compare_io_ptrs_by_start_time(IO::ptr p1, IO::ptr p2) {
+  const uint64_t first_start = p1->start_time();
+  const uint64_t second_start = p2->start_time();
+  return first_start < second_start;
+}
+
+// Start time of the first entry of 's' (the one with the lowest ID),
+// or 0 when 's' is empty.
+static uint64_t min_time(const map<action_id_t, IO::ptr>& s) {
+  return s.empty() ? 0 : s.begin()->second->start_time();
+}
+
+// Start time of the last entry of 's' (the one with the highest ID),
+// or 0 when 's' is empty.
+static uint64_t max_time(const map<action_id_t, IO::ptr>& s) {
+  return s.empty() ? 0 : s.rbegin()->second->start_time();
+}
+
+// Builds a base set from the current dependencies plus the direct dependencies
+// of every entry in 'deps', then lets batch_unreachable_from() insert into
+// m_dependencies each entry of 'deps' that it reports as not reachable from
+// that base.
+void IO::add_dependencies(const io_set_t& deps) {
+  io_set_t known(m_dependencies);
+  io_set_t::const_iterator cur = deps.begin();
+  while (cur != deps.end()) {
+    const io_set_t& indirect = (*cur)->m_dependencies;
+    known.insert(indirect.begin(), indirect.end());
+    ++cur;
+  }
+  batch_unreachable_from(deps, known, &m_dependencies);
+}
+
+// Writes the part of the debug line shared by every IO type: the ID, the start
+// time divided by 1000000.0, the caller-supplied type label, the thread ID,
+// each dependency as "<id>: <start-time delta>", and the successor counts.
+void IO::write_debug_base(ostream& out, string type) const {
+  out << m_ionum << ": " << m_start_time / 1000000.0 << ": " << type
+      << ", thread = " << m_thread_id << ", deps = {";
+  // Empty before the first dependency, ", " before every later one.
+  const char* separator = "";
+  for (io_set_t::const_iterator dep = m_dependencies.begin(); dep != m_dependencies.end(); ++dep) {
+    out << separator << (*dep)->m_ionum << ": " << m_start_time - (*dep)->m_start_time;
+    separator = ", ";
+  }
+  out << "}, num_successors = " << m_num_successors
+      << ", numCompletionSuccessors = " << num_completion_successors();
+}
+
+
+// Serializes the record header shared by every IO type: the type tag, ID,
+// thread ID, successor counts and dependency count, followed by one
+// (dependency ID, start-time delta) pair per dependency, ordered by the
+// dependencies' start times.
+void IO::write_to(Ser& out, io_type iotype) const {
+  out.write_uint8_t(iotype);
+  out.write_uint32_t(m_ionum);
+  out.write_uint64_t(m_thread_id);
+  out.write_uint32_t(m_num_successors);
+  out.write_uint32_t(num_completion_successors());
+  out.write_uint32_t(m_dependencies.size());
+
+  // Copy the set into a vector so it can be re-ordered by start time.
+  vector<IO::ptr> ordered(m_dependencies.begin(), m_dependencies.end());
+  sort(ordered.begin(), ordered.end(), compare_io_ptrs_by_start_time);
+  for (size_t i = 0; i < ordered.size(); ++i) {
+    const IO::ptr& dep = ordered[i];
+    out.write_uint32_t(dep->m_ionum);
+    out.write_uint64_t(m_start_time - dep->m_start_time);
+  }
+}
+
+// Creates the CompletionIO paired with this IO and returns it.
+// The completion gets the ID m_ionum + 1 and the given start time and thread,
+// and this IO is recorded as its dependency. Only a weak reference is kept
+// here, so the caller must hold on to the returned pointer.
+// Asserts that no live completion is already attached.
+IO::ptr IO::create_completion(uint64_t start_time, thread_id_t thread_id) {
+ assert(!m_completion.lock());
+ IO::ptr completion(new CompletionIO(m_ionum + 1, start_time, thread_id));
+ m_completion = completion;
+ // Requires that this IO is already owned by a shared_ptr.
+ completion->m_dependencies.insert(shared_from_this());
+ return completion;
+}
+
+
+// TODO: Add unit tests
+// Anything in 'deps' which is not reachable from 'base' is added to 'unreachable'.
+//
+// The search walks backwards from 'base' along dependency edges. Both working
+// sets are keyed by IO ID, which (per the assumption stated at the top of this
+// file) orders them by timestamp as well.
+//
+// Emptiness of the working sets is always tested with empty() rather than by
+// comparing a timestamp against 0, so an IO whose start time is 0 is handled
+// like any other IO.
+void rbd_replay::batch_unreachable_from(const io_set_t& deps, const io_set_t& base, io_set_t* unreachable) {
+  if (deps.empty()) {
+    return;
+  }
+
+  // IOs from 'deps' that have not yet been reached or ruled out.
+  io_map_t searching_for;
+  for (io_set_t::const_iterator itr = deps.begin(); itr != deps.end(); ++itr) {
+    searching_for[(*itr)->ionum()] = *itr;
+  }
+
+  // IOs whose dependencies still have to be expanded.
+  io_map_t boundary;
+  for (io_set_t::const_iterator itr = base.begin(); itr != base.end(); ++itr) {
+    boundary[(*itr)->ionum()] = *itr;
+  }
+
+  // The boundary horizon is the maximum timestamp of IOs in the boundary.
+  // This monotonically decreases, because dependencies (which are added to the set)
+  // have earlier timestamp than the dependent IOs (which were just removed from the set).
+  uint64_t boundary_horizon = max_time(boundary);
+
+  for (io_map_t::iterator itr = searching_for.begin(); itr != searching_for.end(); ) {
+    if (boundary_horizon >= itr->second->start_time()) {
+      break;
+    }
+    unreachable->insert(itr->second);
+    searching_for.erase(itr++);
+  }
+  if (searching_for.empty()) {
+    return;
+  }
+
+  while (!boundary.empty()) {
+    // Take an IO from the end, which has the highest timestamp.
+    // This reduces the boundary horizon as early as possible,
+    // which means we can short cut as soon as possible.
+    io_map_t::iterator b_itr(boundary.end());
+    --b_itr;
+    IO::ptr io(b_itr->second);
+    boundary.erase(b_itr);
+
+    for (io_set_t::const_iterator itr = io->dependencies().begin(), end = io->dependencies().end(); itr != end; ++itr) {
+      IO::ptr dep(*itr);
+      assertf(dep->ionum() < io->ionum(), "IO: %d, dependency: %d", io->ionum(), dep->ionum());
+      io_map_t::iterator p = searching_for.find(dep->ionum());
+      if (p != searching_for.end()) {
+        searching_for.erase(p);
+        // Everything has been reached; nothing more can become unreachable.
+        if (searching_for.empty()) {
+          return;
+        }
+      }
+      boundary[dep->ionum()] = dep;
+    }
+
+    if (!boundary.empty()) {
+      boundary_horizon = max_time(boundary);
+      // Anything we're searching for that has a timestamp greater than the
+      // boundary horizon will never be found, since the boundary horizon
+      // falls monotonically.
+      for (io_map_t::iterator itr = searching_for.begin(); itr != searching_for.end(); ) {
+        if (boundary_horizon >= itr->second->start_time()) {
+          break;
+        }
+        unreachable->insert(itr->second);
+        searching_for.erase(itr++);
+      }
+      if (searching_for.empty()) {
+        return;
+      }
+    }
+  }
+
+  // Anything we're still searching for has not been found.
+  for (io_map_t::iterator itr = searching_for.begin(), end = searching_for.end(); itr != end; ++itr) {
+    unreachable->insert(itr->second);
+  }
+}
+
+// Streams the debug representation of 'io' by calling its write_debug().
+// The definition is qualified with rbd_replay:: so that it defines the operator
+// declared inside namespace rbd_replay in ios.hpp. An unqualified definition
+// here would introduce a separate ::operator<< and leave the declared one
+// without a definition.
+ostream& rbd_replay::operator<<(ostream& out, IO::ptr io) {
+  io->write_debug(out);
+  return out;
+}
+
+// Serializes this event as the common IO record tagged IO_START_THREAD.
+void StartThreadIO::write_to(Ser& out) const {
+ IO::write_to(out, IO_START_THREAD);
+}
+
+// Writes the common debug line labelled "start thread".
+void StartThreadIO::write_debug(std::ostream& out) const {
+ write_debug_base(out, "start thread");
+}
+
+// Serializes this event as the common IO record tagged IO_STOP_THREAD.
+void StopThreadIO::write_to(Ser& out) const {
+ IO::write_to(out, IO_STOP_THREAD);
+}
+
+// Writes the common debug line labelled "stop thread".
+void StopThreadIO::write_debug(std::ostream& out) const {
+ write_debug_base(out, "stop thread");
+}
+
+// Serializes this read: the common IO record tagged IO_READ, followed by the
+// image context ID, the offset and the length, each as a 64-bit value.
+void ReadIO::write_to(Ser& out) const {
+ IO::write_to(out, IO_READ);
+ out.write_uint64_t(m_imagectx);
+ out.write_uint64_t(m_offset);
+ out.write_uint64_t(m_length);
+}
+
+// Writes the common debug line labelled "read", then the image context ID,
+// offset and length.
+// NOTE(review): the trailing "]" has no matching "[" here or in
+// IO::write_debug_base -- confirm whether it is intended.
+void ReadIO::write_debug(std::ostream& out) const {
+ write_debug_base(out, "read");
+ out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
+}
+
+// Serializes this write: the common IO record tagged IO_WRITE, followed by the
+// image context ID, the offset and the length, each as a 64-bit value.
+void WriteIO::write_to(Ser& out) const {
+ IO::write_to(out, IO_WRITE);
+ out.write_uint64_t(m_imagectx);
+ out.write_uint64_t(m_offset);
+ out.write_uint64_t(m_length);
+}
+
+// Writes the common debug line labelled "write", then the image context ID,
+// offset and length.
+// NOTE(review): the trailing "]" has no matching "[" here or in
+// IO::write_debug_base -- confirm whether it is intended.
+void WriteIO::write_debug(std::ostream& out) const {
+ write_debug_base(out, "write");
+ out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
+}
+
+// Serializes this asynchronous read: the common IO record tagged IO_ASYNC_READ,
+// followed by the image context ID, the offset and the length, each as a
+// 64-bit value.
+void AioReadIO::write_to(Ser& out) const {
+ IO::write_to(out, IO_ASYNC_READ);
+ out.write_uint64_t(m_imagectx);
+ out.write_uint64_t(m_offset);
+ out.write_uint64_t(m_length);
+}
+
+// Writes the common debug line labelled "aio read", then the image context ID,
+// offset and length.
+// NOTE(review): the trailing "]" has no matching "[" here or in
+// IO::write_debug_base -- confirm whether it is intended.
+void AioReadIO::write_debug(std::ostream& out) const {
+ write_debug_base(out, "aio read");
+ out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
+}
+
+// Serializes this asynchronous write: the common IO record tagged
+// IO_ASYNC_WRITE, followed by the image context ID, the offset and the length,
+// each as a 64-bit value.
+void AioWriteIO::write_to(Ser& out) const {
+ IO::write_to(out, IO_ASYNC_WRITE);
+ out.write_uint64_t(m_imagectx);
+ out.write_uint64_t(m_offset);
+ out.write_uint64_t(m_length);
+}
+
+// Writes the common debug line labelled "aio write", then the image context ID,
+// offset and length.
+// NOTE(review): the trailing "]" has no matching "[" here or in
+// IO::write_debug_base -- confirm whether it is intended.
+void AioWriteIO::write_debug(std::ostream& out) const {
+ write_debug_base(out, "aio write");
+ out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
+}
+
+// Serializes this image open: the common IO record tagged IO_OPEN_IMAGE,
+// followed by the image context ID, the image name, the snapshot name and the
+// read-only flag.
+void OpenImageIO::write_to(Ser& out) const {
+ IO::write_to(out, IO_OPEN_IMAGE);
+ out.write_uint64_t(m_imagectx);
+ out.write_string(m_name);
+ out.write_string(m_snap_name);
+ out.write_bool(m_readonly);
+}
+
+// Writes the common debug line labelled "open image", then the image context
+// ID, the quoted image and snapshot names, and the read-only flag.
+void OpenImageIO::write_debug(std::ostream& out) const {
+ write_debug_base(out, "open image");
+ out << ", imagectx=" << m_imagectx << ", name='" << m_name << "', snap_name='" << m_snap_name << "', readonly=" << m_readonly;
+}
+
+// Serializes this image close: the common IO record tagged IO_CLOSE_IMAGE,
+// followed by the image context ID.
+void CloseImageIO::write_to(Ser& out) const {
+ IO::write_to(out, IO_CLOSE_IMAGE);
+ out.write_uint64_t(m_imagectx);
+}
+
+// Writes the common debug line labelled "close image", then the image context ID.
+void CloseImageIO::write_debug(std::ostream& out) const {
+ write_debug_base(out, "close image");
+ out << ", imagectx=" << m_imagectx;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef _INCLUDED_RBD_REPLAY_IOS_HPP
+#define _INCLUDED_RBD_REPLAY_IOS_HPP
+
+// This code assumes that IO IDs and timestamps are related monotonically.
+// In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b.
+
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/shared_ptr.hpp>
+#include <iostream>
+#include <map>
+#include <set>
+#include "actions.hpp"
+#include "Ser.hpp"
+
+
+namespace rbd_replay {
+
+class IO;
+
+typedef std::set<boost::shared_ptr<IO> > io_set_t;
+
+typedef std::map<action_id_t, boost::shared_ptr<IO> > io_map_t;
+
+void batch_unreachable_from(const io_set_t& deps, const io_set_t& base, io_set_t* unreachable);
+
+// Base class for one traced IO event.
+// Holds the event's ID, start time and thread ID, the set of IOs it depends
+// on, a weak reference to its completion event, a successor count, and a
+// pointer to a previous IO. Concrete event types implement write_to(Ser&) and
+// write_debug().
+class IO : public boost::enable_shared_from_this<IO> {
+public:
+ typedef boost::shared_ptr<IO> ptr;
+
+ typedef boost::weak_ptr<IO> weak_ptr;
+
+ // Creates an IO with no dependencies, no completion and zero successors.
+ IO(action_id_t ionum,
+ uint64_t start_time,
+ thread_id_t thread_id,
+ ptr prev)
+ : m_ionum(ionum),
+ m_start_time(start_time),
+ m_dependencies(io_set_t()),
+ m_completion(weak_ptr()),
+ m_num_successors(0),
+ m_thread_id(thread_id),
+ m_prev(prev) {
+ }
+
+ virtual ~IO() {
+ }
+
+ uint64_t start_time() const {
+ return m_start_time;
+ }
+
+ // Mutable access to the dependency set.
+ io_set_t& dependencies() {
+ return m_dependencies;
+ }
+
+ // Read-only access to the dependency set.
+ const io_set_t& dependencies() const {
+ return m_dependencies;
+ }
+
+ // Adds entries of 'deps' to the dependency set; defined out of line.
+ void add_dependencies(const io_set_t& deps);
+
+ // Successor count of the completion IO, or 0 if there is no live completion.
+ uint64_t num_completion_successors() const {
+ ptr c(m_completion.lock());
+ return c ? c->m_num_successors : 0;
+ }
+
+ // Writes the record header shared by all IO types, tagged with 'iotype'.
+ void write_to(Ser& out, io_type iotype) const;
+
+ // Writes the complete record for the concrete IO type.
+ virtual void write_to(Ser& out) const = 0;
+
+ // False here; CompletionIO overrides this to return true.
+ virtual bool is_completion() const {
+ return false;
+ }
+
+ void set_ionum(action_id_t ionum) {
+ m_ionum = ionum;
+ }
+
+ action_id_t ionum() const {
+ return m_ionum;
+ }
+
+ // The 'prev' pointer passed to the constructor (may be empty).
+ ptr prev() const {
+ return m_prev;
+ }
+
+ void set_num_successors(uint32_t n) {
+ m_num_successors = n;
+ }
+
+ uint32_t num_successors() const {
+ return m_num_successors;
+ }
+
+ // Writes the debug text shared by all IO types, labelled with 'iotype'.
+ void write_debug_base(std::ostream& out, std::string iotype) const;
+
+ // Writes the full debug text for the concrete IO type.
+ virtual void write_debug(std::ostream& out) const = 0;
+
+ // The result must be stored somewhere, or else m_completion will expire
+ ptr create_completion(uint64_t start_time, thread_id_t thread_id);
+
+private:
+ action_id_t m_ionum;
+ uint64_t m_start_time;
+ io_set_t m_dependencies;
+ // Weak, so this IO does not keep its completion alive.
+ boost::weak_ptr<IO> m_completion;
+ uint32_t m_num_successors;
+ thread_id_t m_thread_id;
+ ptr m_prev;
+};
+
+std::ostream& operator<<(std::ostream& out, IO::ptr io);
+
+
+// IO event serialized with the IO_START_THREAD tag.
+// Constructed without a previous IO.
+class StartThreadIO : public IO {
+public:
+ StartThreadIO(action_id_t ionum,
+ uint64_t start_time,
+ thread_id_t thread_id)
+ : IO(ionum, start_time, thread_id, IO::ptr()) {
+ }
+
+ void write_to(Ser& out) const;
+
+ void write_debug(std::ostream& out) const;
+};
+
+// IO event serialized with the IO_STOP_THREAD tag.
+// Constructed without a previous IO.
+class StopThreadIO : public IO {
+public:
+ StopThreadIO(action_id_t ionum,
+ uint64_t start_time,
+ thread_id_t thread_id)
+ : IO(ionum, start_time, thread_id, IO::ptr()) {
+ }
+
+ void write_to(Ser& out) const;
+
+ void write_debug(std::ostream& out) const;
+};
+
+// Read event: carries an image context ID, an offset and a length in
+// addition to the common IO fields. Serialized with the IO_READ tag.
+class ReadIO : public IO {
+public:
+ ReadIO(action_id_t ionum,
+ uint64_t start_time,
+ thread_id_t thread_id,
+ IO::ptr prev,
+ imagectx_id_t imagectx,
+ uint64_t offset,
+ uint64_t length)
+ : IO(ionum, start_time, thread_id, prev),
+ m_imagectx(imagectx),
+ m_offset(offset),
+ m_length(length) {
+ }
+
+ void write_to(Ser& out) const;
+
+ void write_debug(std::ostream& out) const;
+
+private:
+ imagectx_id_t m_imagectx;
+ uint64_t m_offset;
+ uint64_t m_length;
+};
+
+// Write event: carries an image context ID, an offset and a length in
+// addition to the common IO fields. Serialized with the IO_WRITE tag.
+class WriteIO : public IO {
+public:
+ WriteIO(action_id_t ionum,
+ uint64_t start_time,
+ thread_id_t thread_id,
+ IO::ptr prev,
+ imagectx_id_t imagectx,
+ uint64_t offset,
+ uint64_t length)
+ : IO(ionum, start_time, thread_id, prev),
+ m_imagectx(imagectx),
+ m_offset(offset),
+ m_length(length) {
+ }
+
+ void write_to(Ser& out) const;
+
+ void write_debug(std::ostream& out) const;
+
+private:
+ imagectx_id_t m_imagectx;
+ uint64_t m_offset;
+ uint64_t m_length;
+};
+
+// Asynchronous read event: carries an image context ID, an offset and a length
+// in addition to the common IO fields. Serialized with the IO_ASYNC_READ tag.
+class AioReadIO : public IO {
+public:
+ AioReadIO(action_id_t ionum,
+ uint64_t start_time,
+ thread_id_t thread_id,
+ IO::ptr prev,
+ imagectx_id_t imagectx,
+ uint64_t offset,
+ uint64_t length)
+ : IO(ionum, start_time, thread_id, prev),
+ m_imagectx(imagectx),
+ m_offset(offset),
+ m_length(length) {
+ }
+
+ void write_to(Ser& out) const;
+
+ void write_debug(std::ostream& out) const;
+
+private:
+ imagectx_id_t m_imagectx;
+ uint64_t m_offset;
+ uint64_t m_length;
+};
+
+// Asynchronous write event: carries an image context ID, an offset and a length
+// in addition to the common IO fields. Serialized with the IO_ASYNC_WRITE tag.
+class AioWriteIO : public IO {
+public:
+ AioWriteIO(action_id_t ionum,
+ uint64_t start_time,
+ thread_id_t thread_id,
+ IO::ptr prev,
+ imagectx_id_t imagectx,
+ uint64_t offset,
+ uint64_t length)
+ : IO(ionum, start_time, thread_id, prev),
+ m_imagectx(imagectx),
+ m_offset(offset),
+ m_length(length) {
+ }
+
+ void write_to(Ser& out) const;
+
+ void write_debug(std::ostream& out) const;
+
+private:
+ imagectx_id_t m_imagectx;
+ uint64_t m_offset;
+ uint64_t m_length;
+};
+
+// Image-open event: carries an image context ID, the image name, the snapshot
+// name and a read-only flag in addition to the common IO fields.
+// Serialized with the IO_OPEN_IMAGE tag.
+class OpenImageIO : public IO {
+public:
+ OpenImageIO(action_id_t ionum,
+ uint64_t start_time,
+ thread_id_t thread_id,
+ IO::ptr prev,
+ imagectx_id_t imagectx,
+ const std::string& name,
+ const std::string& snap_name,
+ bool readonly)
+ : IO(ionum, start_time, thread_id, prev),
+ m_imagectx(imagectx),
+ m_name(name),
+ m_snap_name(snap_name),
+ m_readonly(readonly) {
+ }
+
+ void write_to(Ser& out) const;
+
+ // ID of the image context this event opened.
+ imagectx_id_t imagectx() const {
+ return m_imagectx;
+ }
+
+ void write_debug(std::ostream& out) const;
+
+private:
+ imagectx_id_t m_imagectx;
+ std::string m_name;
+ std::string m_snap_name;
+ bool m_readonly;
+};
+
+// Image-close event: carries an image context ID in addition to the common IO
+// fields. Serialized with the IO_CLOSE_IMAGE tag.
+class CloseImageIO : public IO {
+public:
+ CloseImageIO(action_id_t ionum,
+ uint64_t start_time,
+ thread_id_t thread_id,
+ IO::ptr prev,
+ imagectx_id_t imagectx)
+ : IO(ionum, start_time, thread_id, prev),
+ m_imagectx(imagectx) {
+ }
+
+ void write_to(Ser& out) const;
+
+ // ID of the image context this event closed.
+ imagectx_id_t imagectx() const {
+ return m_imagectx;
+ }
+
+ void write_debug(std::ostream& out) const;
+
+private:
+ imagectx_id_t m_imagectx;
+};
+
+// Completion event, created by IO::create_completion().
+// Constructed without a previous IO; is_completion() returns true.
+class CompletionIO : public IO {
+public:
+ CompletionIO(action_id_t ionum,
+ uint64_t start_time,
+ thread_id_t thread_id)
+ : IO(ionum, start_time, thread_id, IO::ptr()) {
+ }
+
+ // Writes nothing.
+ // NOTE(review): presumably a completion is represented only through the
+ // owning IO's num_completion_successors() field -- confirm.
+ void write_to(Ser& out) const {
+ }
+
+ bool is_completion() const {
+ return true;
+ }
+
+ void write_debug(std::ostream& out) const {
+ write_debug_base(out, "completion");
+ }
+};
+
+bool compare_io_ptrs_by_start_time(IO::ptr p1, IO::ptr p2);
+
+}
+
+#endif
#include <cstdlib>
#include <string>
#include <assert.h>
-#include <iostream>
#include <fstream>
#include <boost/thread/thread.hpp>
-#include "actions.hpp"
-#include "Ser.hpp"
+#include "ios.hpp"
using namespace std;
using namespace rbd_replay;
-// Allows us to easily expose all the functions to make debugging easier.
-#define STATIC static
-
-class IO;
-
-typedef set<boost::shared_ptr<IO> > io_set_t;
-
-typedef map<action_id_t, boost::shared_ptr<IO> > io_map_t;
-
-STATIC void batch_unreachable_from(const io_set_t& deps, const io_set_t& base, io_set_t* unreachable);
-
-class IO : public boost::enable_shared_from_this<IO> {
-public:
- typedef boost::shared_ptr<IO> ptr;
-
- typedef boost::weak_ptr<IO> weak_ptr;
-
- IO(action_id_t ionum,
- uint64_t start_time,
- thread_id_t thread_id,
- ptr prev)
- : m_ionum(ionum),
- m_start_time(start_time),
- m_dependencies(io_set_t()),
- m_completion(weak_ptr()),
- m_num_successors(0),
- m_thread_id(thread_id),
- m_prev(prev) {
- }
-
- virtual ~IO() {
- }
-
- uint64_t start_time() const {
- return m_start_time;
- }
-
- io_set_t& dependencies() {
- return m_dependencies;
- }
-
- const io_set_t& dependencies() const {
- return m_dependencies;
- }
-
- void add_dependencies(const io_set_t& deps) {
- io_set_t base(m_dependencies);
- for (io_set_t::const_iterator itr = deps.begin(); itr != deps.end(); ++itr) {
- ptr dep(*itr);
- for (io_set_t::const_iterator itr2 = dep->m_dependencies.begin(); itr2 != dep->m_dependencies.end(); ++itr2) {
- base.insert(*itr2);
- }
- }
- batch_unreachable_from(deps, base, &m_dependencies);
- }
-
- uint64_t num_completion_successors() const {
- ptr c(m_completion.lock());
- return c ? c->m_num_successors : 0;
- }
-
- void write_to(Ser& out, io_type iotype) const;
-
- virtual void write_to(Ser& out) const = 0;
-
- virtual bool is_completion() const {
- return false;
- }
-
- void set_ionum(action_id_t ionum) {
- m_ionum = ionum;
- }
-
- action_id_t ionum() const {
- return m_ionum;
- }
-
- ptr prev() const {
- return m_prev;
- }
-
- void set_num_successors(uint32_t n) {
- m_num_successors = n;
- }
-
- uint32_t num_successors() const {
- return m_num_successors;
- }
-
- void write_debug_base(ostream& out, string iotype);
-
- virtual void write_debug(ostream& out) = 0;
-
- // The result must be stored somewhere, or else m_completion will expire
- ptr create_completion(uint64_t start_time, thread_id_t thread_id);
-
-private:
- action_id_t m_ionum;
- uint64_t m_start_time;
- io_set_t m_dependencies;
- boost::weak_ptr<IO> m_completion;
- uint32_t m_num_successors;
- thread_id_t m_thread_id;
- ptr m_prev;
-};
-
-ostream& operator<<(ostream& out, IO::ptr io) {
- io->write_debug(out);
- return out;
-}
class Thread {
public:
uint64_t m_max_ts;
};
-void IO::write_debug_base(ostream& out, string type) {
- out << m_ionum << ": " << m_start_time / 1000000.0 << ": " << type << ", thread = " << m_thread_id << ", deps = {";
- bool first = true;
- for (io_set_t::iterator itr = m_dependencies.begin(), end = m_dependencies.end(); itr != end; ++itr) {
- if (first) {
- first = false;
- } else {
- out << ", ";
- }
- out << (*itr)->m_ionum << ": " << m_start_time - (*itr)->m_start_time;
- }
- out << "}, num_successors = " << m_num_successors << ", numCompletionSuccessors = " << num_completion_successors();
-}
-
-class StartThreadIO : public IO {
-public:
- StartThreadIO(action_id_t ionum,
- uint64_t start_time,
- thread_id_t thread_id)
- : IO(ionum, start_time, thread_id, IO::ptr()) {
- }
-
- void write_to(Ser& out) const {
- IO::write_to(out, IO_START_THREAD);
- }
-
- void write_debug(ostream& out) {
- write_debug_base(out, "start thread");
- }
-};
-
-class StopThreadIO : public IO {
-public:
- StopThreadIO(action_id_t ionum,
- uint64_t start_time,
- thread_id_t thread_id)
- : IO(ionum, start_time, thread_id, IO::ptr()) {
- }
-
- void write_to(Ser& out) const {
- IO::write_to(out, IO_STOP_THREAD);
- }
-
- void write_debug(ostream& out) {
- write_debug_base(out, "stop thread");
- }
-};
-
-class ReadIO : public IO {
-public:
- ReadIO(action_id_t ionum,
- uint64_t start_time,
- thread_id_t thread_id,
- IO::ptr prev,
- imagectx_id_t imagectx,
- uint64_t offset,
- uint64_t length)
- : IO(ionum, start_time, thread_id, prev),
- m_imagectx(imagectx),
- m_offset(offset),
- m_length(length) {
- }
-
- void write_to(Ser& out) const {
- IO::write_to(out, IO_READ);
- out.write_uint64_t(m_imagectx);
- out.write_uint64_t(m_offset);
- out.write_uint64_t(m_length);
- }
-
- void write_debug(ostream& out) {
- write_debug_base(out, "read");
- out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
- }
-
-private:
- imagectx_id_t m_imagectx;
- uint64_t m_offset;
- uint64_t m_length;
-};
-
-class WriteIO : public IO {
-public:
- WriteIO(action_id_t ionum,
- uint64_t start_time,
- thread_id_t thread_id,
- IO::ptr prev,
- imagectx_id_t imagectx,
- uint64_t offset,
- uint64_t length)
- : IO(ionum, start_time, thread_id, prev),
- m_imagectx(imagectx),
- m_offset(offset),
- m_length(length) {
- }
-
- void write_to(Ser& out) const {
- IO::write_to(out, IO_WRITE);
- out.write_uint64_t(m_imagectx);
- out.write_uint64_t(m_offset);
- out.write_uint64_t(m_length);
- }
-
- void write_debug(ostream& out) {
- write_debug_base(out, "write");
- out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
- }
-
-private:
- imagectx_id_t m_imagectx;
- uint64_t m_offset;
- uint64_t m_length;
-};
-
-class AioReadIO : public IO {
-public:
- AioReadIO(action_id_t ionum,
- uint64_t start_time,
- thread_id_t thread_id,
- IO::ptr prev,
- imagectx_id_t imagectx,
- uint64_t offset,
- uint64_t length)
- : IO(ionum, start_time, thread_id, prev),
- m_imagectx(imagectx),
- m_offset(offset),
- m_length(length) {
- }
-
- void write_to(Ser& out) const {
- IO::write_to(out, IO_ASYNC_READ);
- out.write_uint64_t(m_imagectx);
- out.write_uint64_t(m_offset);
- out.write_uint64_t(m_length);
- }
-
- void write_debug(ostream& out) {
- write_debug_base(out, "aio read");
- out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
- }
-private:
- imagectx_id_t m_imagectx;
- uint64_t m_offset;
- uint64_t m_length;
-};
-
-class AioWriteIO : public IO {
-public:
- AioWriteIO(action_id_t ionum,
- uint64_t start_time,
- thread_id_t thread_id,
- IO::ptr prev,
- imagectx_id_t imagectx,
- uint64_t offset,
- uint64_t length)
- : IO(ionum, start_time, thread_id, prev),
- m_imagectx(imagectx),
- m_offset(offset),
- m_length(length) {
- }
-
- void write_to(Ser& out) const {
- IO::write_to(out, IO_ASYNC_WRITE);
- out.write_uint64_t(m_imagectx);
- out.write_uint64_t(m_offset);
- out.write_uint64_t(m_length);
- }
-
- void write_debug(ostream& out) {
- write_debug_base(out, "aio write");
- out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
- }
-
-private:
- imagectx_id_t m_imagectx;
- uint64_t m_offset;
- uint64_t m_length;
-};
-
-class OpenImageIO : public IO {
-public:
- OpenImageIO(action_id_t ionum,
- uint64_t start_time,
- thread_id_t thread_id,
- IO::ptr prev,
- imagectx_id_t imagectx,
- const string& name,
- const string& snap_name,
- bool readonly)
- : IO(ionum, start_time, thread_id, prev),
- m_imagectx(imagectx),
- m_name(name),
- m_snap_name(snap_name),
- m_readonly(readonly) {
- }
-
- void write_to(Ser& out) const {
- IO::write_to(out, IO_OPEN_IMAGE);
- out.write_uint64_t(m_imagectx);
- out.write_string(m_name);
- out.write_string(m_snap_name);
- out.write_bool(m_readonly);
- }
-
- imagectx_id_t imagectx() const {
- return m_imagectx;
- }
-
- void write_debug(ostream& out) {
- write_debug_base(out, "open image");
- out << ", imagectx=" << m_imagectx << ", name='" << m_name << "', snap_name='" << m_snap_name << "', readonly=" << m_readonly;
- }
-
-private:
- imagectx_id_t m_imagectx;
- string m_name;
- string m_snap_name;
- bool m_readonly;
-};
-
-class CloseImageIO : public IO {
-public:
- CloseImageIO(action_id_t ionum,
- uint64_t start_time,
- thread_id_t thread_id,
- IO::ptr prev,
- imagectx_id_t imagectx)
- : IO(ionum, start_time, thread_id, prev),
- m_imagectx(imagectx) {
- }
-
- void write_to(Ser& out) const {
- IO::write_to(out, IO_CLOSE_IMAGE);
- out.write_uint64_t(m_imagectx);
- }
-
- imagectx_id_t imagectx() const {
- return m_imagectx;
- }
-
- void write_debug(ostream& out) {
- write_debug_base(out, "close image");
- out << ", imagectx=" << m_imagectx;
- }
-
-private:
- imagectx_id_t m_imagectx;
-};
-
-class CompletionIO : public IO {
-public:
- CompletionIO(action_id_t ionum,
- uint64_t start_time,
- thread_id_t thread_id)
- : IO(ionum, start_time, thread_id, IO::ptr()) {
- }
-
- void write_to(Ser& out) const {
- }
-
- bool is_completion() const {
- return true;
- }
-
- void write_debug(ostream& out) {
- write_debug_base(out, "completion");
- }
-};
-
-STATIC bool compare_io_ptrs_by_start_time(IO::ptr p1, IO::ptr p2) {
- return p1->start_time() < p2->start_time();
-}
-
-void IO::write_to(Ser& out, io_type iotype) const {
- out.write_uint8_t(iotype);
- out.write_uint32_t(m_ionum);
- out.write_uint64_t(m_thread_id);
- out.write_uint32_t(m_num_successors);
- out.write_uint32_t(num_completion_successors());
- out.write_uint32_t(m_dependencies.size());
- vector<IO::ptr> deps;
- for (io_set_t::const_iterator itr = m_dependencies.begin(), end = m_dependencies.end(); itr != end; ++itr) {
- deps.push_back(*itr);
- }
- sort(deps.begin(), deps.end(), compare_io_ptrs_by_start_time);
- for (vector<IO::ptr>::const_iterator itr = deps.begin(), end = deps.end(); itr != end; ++itr) {
- out.write_uint32_t((*itr)->m_ionum);
- out.write_uint64_t(m_start_time - (*itr)->m_start_time);
- }
-}
-
-IO::ptr IO::create_completion(uint64_t start_time, thread_id_t thread_id) {
- assert(!m_completion.lock());
- IO::ptr completion(new CompletionIO(m_ionum + 1, start_time, thread_id));
- m_completion = completion;
- completion->m_dependencies.insert(shared_from_this());
- return completion;
-}
-
-STATIC uint64_t min_time(const map<action_id_t, IO::ptr>& s) {
- if (s.empty()) {
- return 0;
- }
- return s.begin()->second->start_time();
-}
-
-STATIC uint64_t max_time(const map<action_id_t, IO::ptr>& s) {
- if (s.empty()) {
- return 0;
- }
- map<action_id_t, IO::ptr>::const_iterator itr(s.end());
- --itr;
- return itr->second->start_time();
-}
-
-// TODO: Add unit tests
-// Anything in 'deps' which is not reachable from 'base' is added to 'unreachable'
-STATIC void batch_unreachable_from(const io_set_t& deps, const io_set_t& base, io_set_t* unreachable) {
- if (deps.empty()) {
- return;
- }
-
- map<action_id_t, IO::ptr> searching_for;
- for (io_set_t::const_iterator itr = deps.begin(); itr != deps.end(); ++itr) {
- searching_for[(*itr)->ionum()] = *itr;
- }
-
- map<action_id_t, IO::ptr> boundary;
- for (io_set_t::const_iterator itr = base.begin(); itr != base.end(); ++itr) {
- boundary[(*itr)->ionum()] = *itr;
- }
-
- // The boundary horizon is the maximum timestamp of IOs in the boundary.
- // This monotonically decreases, because dependencies (which are added to the set)
- // have earlier timestamp than the dependent IOs (which were just removed from the set).
- uint64_t boundary_horizon = max_time(boundary);
-
- for (io_map_t::iterator itr = searching_for.begin(); itr != searching_for.end(); ) {
- if (boundary_horizon >= itr->second->start_time()) {
- break;
- }
- unreachable->insert(itr->second);
- searching_for.erase(itr++);
- }
- if (searching_for.empty()) {
- return;
- }
-
- // The searching horizon is the minimum timestamp of IOs in the searching set.
- // This monotonically increases, because elements are only removed from the set.
- uint64_t searching_horizon = min_time(searching_for);
-
- while (!boundary.empty()) {
- // Take an IO from the end, which has the highest timestamp.
- // This reduces the boundary horizon as early as possible,
- // which means we can short cut as soon as possible.
- map<action_id_t, boost::shared_ptr<IO> >::iterator b_itr(boundary.end());
- --b_itr;
- boost::shared_ptr<IO> io(b_itr->second);
- boundary.erase(b_itr);
-
- for (io_set_t::const_iterator itr = io->dependencies().begin(), end = io->dependencies().end(); itr != end; ++itr) {
- IO::ptr dep(*itr);
- assertf(dep->ionum() < io->ionum(), "IO: %d, dependency: %d", io->ionum(), dep->ionum());
- io_map_t::iterator p = searching_for.find(dep->ionum());
- if (p != searching_for.end()) {
- searching_for.erase(p);
- if (dep->start_time() == searching_horizon) {
- searching_horizon = min_time(searching_for);
- if (searching_horizon == 0) {
- return;
- }
- }
- }
- boundary[dep->ionum()] = dep;
- }
-
- boundary_horizon = max_time(boundary);
- if (boundary_horizon != 0) {
- // Anything we're searching for that has a timestamp greater than the
- // boundary horizon will never be found, since the boundary horizon
- // falls monotonically.
- for (io_map_t::iterator itr = searching_for.begin(); itr != searching_for.end(); ) {
- if (boundary_horizon >= itr->second->start_time()) {
- break;
- }
- unreachable->insert(itr->second);
- searching_for.erase(itr++);
- }
- searching_horizon = min_time(searching_for);
- if (searching_horizon == 0) {
- return;
- }
- }
- }
-
- // Anything we're still searching for has not been found.
- for (io_map_t::iterator itr = searching_for.begin(), end = searching_for.end(); itr != end; ++itr) {
- unreachable->insert(itr->second);
- }
-}
-
-STATIC void usage(string prog) {
+static void usage(string prog) {
cout << "Usage: " << prog << " [ --window <seconds> ] <trace-input> <replay-output>" << endl;
}
-__attribute__((noreturn)) STATIC void usage_exit(string prog, string msg) {
+__attribute__((noreturn)) static void usage_exit(string prog, string msg) {
cerr << msg << endl;
usage(prog);
exit(1);
$(LIBRADOS) \
$(CEPH_GLOBAL) \
librbd_replay.la \
+ librbd_replay_ios.la \
$(UNITTEST_LDADD)
unittest_rbd_replay_CXXFLAGS = $(UNITTEST_CXXFLAGS)
check_PROGRAMS += unittest_rbd_replay
#include "gtest/gtest.h"
#include <stdint.h>
#include <boost/foreach.hpp>
+#include <cstdarg>
#include "rbd_replay/Deser.hpp"
#include "rbd_replay/ImageNameMap.hpp"
+#include "rbd_replay/ios.hpp"
#include "rbd_replay/rbd_loc.hpp"
#include "rbd_replay/Ser.hpp"
-using rbd_replay::ImageNameMap;
-using rbd_replay::rbd_loc;
+using namespace rbd_replay;
std::ostream& operator<<(std::ostream& o, const rbd_loc& name) {
return o << "('" << name.pool << "', '" << name.image << "', '" << name.snap << "')";
EXPECT_FALSE(m.parse("a/b/c"));
EXPECT_FALSE(m.parse("a@b/c"));
}
+
+// Test helper: builds a StartThreadIO whose ID and start time are both 'ionum'
+// and whose thread ID is 0, then adds each IO passed in the variable argument
+// list as a dependency.
+// Every variable argument is read as an IO::ptr*; the list ends at the first
+// null pointer.
+// NOTE(review): the terminator is read with va_arg(ap, IO::ptr*), so callers
+// should pass a null of pointer type; a bare integer 0 would not match.
+static IO::ptr mkio(action_id_t ionum, ...) {
+ IO::ptr io(new StartThreadIO(ionum, ionum, 0));
+
+ va_list ap;
+ va_start(ap, ionum);
+ while (true) {
+ IO::ptr* dep = va_arg(ap, IO::ptr*);
+ if (!dep) {
+ break;
+ }
+ io->dependencies().insert(*dep);
+ }
+ va_end(ap);
+
+ return io;
+}
+
+// Checks batch_unreachable_from() on a small 3x3 dependency grid.
+TEST(RBDReplay, batch_unreachable_from) {
+  // mkio() reads its variable arguments as IO::ptr*, so the list terminator
+  // must have exactly that type. A bare NULL may be a plain integer 0, which
+  // is not a valid IO::ptr* argument when passed through "...".
+  IO::ptr* const no_more_deps = NULL;
+
+  io_set_t deps;
+  io_set_t base;
+  io_set_t unreachable;
+  IO::ptr io1(mkio(1, no_more_deps));
+  IO::ptr io2(mkio(2, &io1, no_more_deps));
+  IO::ptr io3(mkio(3, &io2, no_more_deps));
+  IO::ptr io4(mkio(4, &io1, no_more_deps));
+  IO::ptr io5(mkio(5, &io2, &io4, no_more_deps));
+  IO::ptr io6(mkio(6, &io3, &io5, no_more_deps));
+  IO::ptr io7(mkio(7, &io4, no_more_deps));
+  IO::ptr io8(mkio(8, &io5, &io7, no_more_deps));
+  IO::ptr io9(mkio(9, &io6, &io8, no_more_deps));
+  // 1 (deps) <-- 2 (deps) <-- 3 (deps)
+  // ^            ^            ^
+  // |            |            |
+  // 4 <--------- 5 (base) <-- 6 (deps)
+  // ^            ^            ^
+  // |            |            |
+  // 7 <--------- 8 <--------- 9
+  deps.insert(io1);
+  deps.insert(io2);
+  deps.insert(io3);
+  deps.insert(io6);
+  base.insert(io5);
+  // Anything in 'deps' which is not reachable from 'base' is added to 'unreachable'
+  batch_unreachable_from(deps, base, &unreachable);
+  // set::count() returns an unsigned size, so compare against unsigned literals.
+  EXPECT_EQ(0U, unreachable.count(io1));
+  EXPECT_EQ(0U, unreachable.count(io2));
+  EXPECT_EQ(1U, unreachable.count(io3));
+  EXPECT_EQ(0U, unreachable.count(io4));
+  EXPECT_EQ(0U, unreachable.count(io5));
+  EXPECT_EQ(1U, unreachable.count(io6));
+  EXPECT_EQ(0U, unreachable.count(io7));
+  EXPECT_EQ(0U, unreachable.count(io8));
+  EXPECT_EQ(0U, unreachable.count(io9));
+}