]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd_replay: Add unit test for batch_unreachable_from
authorAdam Crume <adamcrume@gmail.com>
Tue, 19 Aug 2014 23:04:40 +0000 (16:04 -0700)
committerSage Weil <sage@redhat.com>
Thu, 21 Aug 2014 17:57:35 +0000 (10:57 -0700)
This requires a fair amount of code reorganization, since the types in
the function signature were previously not in a header file.

Signed-off-by: Adam Crume <adamcrume@gmail.com>
src/rbd_replay/Makefile.am
src/rbd_replay/ios.cc [new file with mode: 0644]
src/rbd_replay/ios.hpp [new file with mode: 0644]
src/rbd_replay/rbd-replay-prep.cc
src/test/Makefile.am
src/test/test_rbd_replay.cc

index 9420afdbab4a2b5711eb2d38c7ebc6049d30ecd8..f1f406f243504338e8d3bf565e2980268f75f18c 100644 (file)
@@ -14,6 +14,7 @@ noinst_HEADERS += rbd_replay/BoundedBuffer.hpp \
        rbd_replay/actions.hpp \
        rbd_replay/Deser.hpp \
        rbd_replay/ImageNameMap.hpp \
+       rbd_replay/ios.hpp \
        rbd_replay/PendingIO.hpp \
        rbd_replay/rbd_loc.hpp \
        rbd_replay/rbd_replay_debug.hpp \
@@ -31,12 +32,19 @@ if LINUX
 bin_PROGRAMS += rbd-replay
 endif #LINUX
 
-# TODO: See if we need any new dependencies
+librbd_replay_ios_la_SOURCES = rbd_replay/ios.cc
+librbd_replay_ios_la_LIBADD = $(LIBRBD) \
+       $(LIBRADOS) \
+       $(CEPH_GLOBAL) \
+       librbd_replay.la
+noinst_LTLIBRARIES += librbd_replay_ios.la
+
 rbd_replay_prep_SOURCES = rbd_replay/rbd-replay-prep.cc
 rbd_replay_prep_LDADD = $(LIBRBD) \
        $(LIBRADOS) \
        $(CEPH_GLOBAL) \
        librbd_replay.la \
+       librbd_replay_ios.la \
        -lbabeltrace \
        -lbabeltrace-ctf \
        -lboost_date_time
diff --git a/src/rbd_replay/ios.cc b/src/rbd_replay/ios.cc
new file mode 100644 (file)
index 0000000..ccc560f
--- /dev/null
@@ -0,0 +1,273 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+// This code assumes that IO IDs and timestamps are related monotonically.
+// In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b.
+
+#include "ios.hpp"
+
+using namespace std;
+using namespace rbd_replay;
+
+bool rbd_replay::compare_io_ptrs_by_start_time(IO::ptr p1, IO::ptr p2) {
+  return p1->start_time() < p2->start_time();
+}
+
+static uint64_t min_time(const map<action_id_t, IO::ptr>& s) {
+  if (s.empty()) {
+    return 0;
+  }
+  return s.begin()->second->start_time();
+}
+
+static uint64_t max_time(const map<action_id_t, IO::ptr>& s) {
+  if (s.empty()) {
+    return 0;
+  }
+  map<action_id_t, IO::ptr>::const_iterator itr(s.end());
+  --itr;
+  return itr->second->start_time();
+}
+
+void IO::add_dependencies(const io_set_t& deps) {
+  io_set_t base(m_dependencies);
+  for (io_set_t::const_iterator itr = deps.begin(); itr != deps.end(); ++itr) {
+    ptr dep(*itr);
+    for (io_set_t::const_iterator itr2 = dep->m_dependencies.begin(); itr2 != dep->m_dependencies.end(); ++itr2) {
+      base.insert(*itr2);
+    }
+  }
+  batch_unreachable_from(deps, base, &m_dependencies);
+}
+
+void IO::write_debug_base(ostream& out, string type) const {
+  out << m_ionum << ": " << m_start_time / 1000000.0 << ": " << type << ", thread = " << m_thread_id << ", deps = {";
+  bool first = true;
+  for (io_set_t::iterator itr = m_dependencies.begin(), end = m_dependencies.end(); itr != end; ++itr) {
+    if (first) {
+      first = false;
+    } else {
+      out << ", ";
+    }
+    out << (*itr)->m_ionum << ": " << m_start_time - (*itr)->m_start_time;
+  }
+  out << "}, num_successors = " << m_num_successors << ", numCompletionSuccessors = " << num_completion_successors();
+}
+
+
+void IO::write_to(Ser& out, io_type iotype) const {
+  out.write_uint8_t(iotype);
+  out.write_uint32_t(m_ionum);
+  out.write_uint64_t(m_thread_id);
+  out.write_uint32_t(m_num_successors);
+  out.write_uint32_t(num_completion_successors());
+  out.write_uint32_t(m_dependencies.size());
+  vector<IO::ptr> deps;
+  for (io_set_t::const_iterator itr = m_dependencies.begin(), end = m_dependencies.end(); itr != end; ++itr) {
+    deps.push_back(*itr);
+  }
+  sort(deps.begin(), deps.end(), compare_io_ptrs_by_start_time);
+  for (vector<IO::ptr>::const_iterator itr = deps.begin(), end = deps.end(); itr != end; ++itr) {
+    out.write_uint32_t((*itr)->m_ionum);
+    out.write_uint64_t(m_start_time - (*itr)->m_start_time);
+  }
+}
+
+IO::ptr IO::create_completion(uint64_t start_time, thread_id_t thread_id) {
+  assert(!m_completion.lock());
+  IO::ptr completion(new CompletionIO(m_ionum + 1, start_time, thread_id));
+  m_completion = completion;
+  completion->m_dependencies.insert(shared_from_this());
+  return completion;
+}
+
+
+// TODO: Add unit tests
+// Anything in 'deps' which is not reachable from 'base' is added to 'unreachable'
+void rbd_replay::batch_unreachable_from(const io_set_t& deps, const io_set_t& base, io_set_t* unreachable) {
+  if (deps.empty()) {
+    return;
+  }
+
+  map<action_id_t, IO::ptr> searching_for;
+  for (io_set_t::const_iterator itr = deps.begin(); itr != deps.end(); ++itr) {
+    searching_for[(*itr)->ionum()] = *itr;
+  }
+
+  map<action_id_t, IO::ptr> boundary;
+  for (io_set_t::const_iterator itr = base.begin(); itr != base.end(); ++itr) {
+    boundary[(*itr)->ionum()] = *itr;
+  }
+
+  // The boundary horizon is the maximum timestamp of IOs in the boundary.
+  // This monotonically decreases, because dependencies (which are added to the set)
+  // have earlier timestamp than the dependent IOs (which were just removed from the set).
+  uint64_t boundary_horizon = max_time(boundary);
+
+  for (io_map_t::iterator itr = searching_for.begin(); itr != searching_for.end(); ) {
+    if (boundary_horizon >= itr->second->start_time()) {
+      break;
+    }
+    unreachable->insert(itr->second);
+    searching_for.erase(itr++);
+  }
+  if (searching_for.empty()) {
+    return;
+  }
+
+  // The searching horizon is the minimum timestamp of IOs in the searching set.
+  // This monotonically increases, because elements are only removed from the set.
+  uint64_t searching_horizon = min_time(searching_for);
+
+  while (!boundary.empty()) {
+    // Take an IO from the end, which has the highest timestamp.
+    // This reduces the boundary horizon as early as possible,
+    // which means we can short cut as soon as possible.
+    map<action_id_t, boost::shared_ptr<IO> >::iterator b_itr(boundary.end());
+    --b_itr;
+    boost::shared_ptr<IO> io(b_itr->second);
+    boundary.erase(b_itr);
+
+    for (io_set_t::const_iterator itr = io->dependencies().begin(), end = io->dependencies().end(); itr != end; ++itr) {
+      IO::ptr dep(*itr);
+      assertf(dep->ionum() < io->ionum(), "IO: %d, dependency: %d", io->ionum(), dep->ionum());
+      io_map_t::iterator p = searching_for.find(dep->ionum());
+      if (p != searching_for.end()) {
+       searching_for.erase(p);
+       if (dep->start_time() == searching_horizon) {
+         searching_horizon = min_time(searching_for);
+         if (searching_horizon == 0) {
+           return;
+         }
+       }
+      }
+      boundary[dep->ionum()] = dep;
+    }
+
+    boundary_horizon = max_time(boundary);
+    if (boundary_horizon != 0) {
+      // Anything we're searching for that has a timestamp greater than the
+      // boundary horizon will never be found, since the boundary horizon
+      // falls monotonically.
+      for (io_map_t::iterator itr = searching_for.begin(); itr != searching_for.end(); ) {
+       if (boundary_horizon >= itr->second->start_time()) {
+         break;
+       }
+       unreachable->insert(itr->second);
+       searching_for.erase(itr++);
+      }
+      searching_horizon = min_time(searching_for);
+      if (searching_horizon == 0) {
+       return;
+      }
+    }
+  }
+
+  // Anything we're still searching for has not been found.
+  for (io_map_t::iterator itr = searching_for.begin(), end = searching_for.end(); itr != end; ++itr) {
+    unreachable->insert(itr->second);
+  }
+}
+
+ostream& operator<<(ostream& out, IO::ptr io) {
+  io->write_debug(out);
+  return out;
+}
+
+void StartThreadIO::write_to(Ser& out) const {
+  IO::write_to(out, IO_START_THREAD);
+}
+
+void StartThreadIO::write_debug(std::ostream& out) const {
+  write_debug_base(out, "start thread");
+}
+
+void StopThreadIO::write_to(Ser& out) const {
+  IO::write_to(out, IO_STOP_THREAD);
+}
+
+void StopThreadIO::write_debug(std::ostream& out) const {
+  write_debug_base(out, "stop thread");
+}
+
+void ReadIO::write_to(Ser& out) const {
+  IO::write_to(out, IO_READ);
+  out.write_uint64_t(m_imagectx);
+  out.write_uint64_t(m_offset);
+  out.write_uint64_t(m_length);
+}
+
+void ReadIO::write_debug(std::ostream& out) const {
+  write_debug_base(out, "read");
+  out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
+}
+
+void WriteIO::write_to(Ser& out) const {
+  IO::write_to(out, IO_WRITE);
+  out.write_uint64_t(m_imagectx);
+  out.write_uint64_t(m_offset);
+  out.write_uint64_t(m_length);
+}
+
+void WriteIO::write_debug(std::ostream& out) const {
+  write_debug_base(out, "write");
+  out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
+}
+
+void AioReadIO::write_to(Ser& out) const {
+  IO::write_to(out, IO_ASYNC_READ);
+  out.write_uint64_t(m_imagectx);
+  out.write_uint64_t(m_offset);
+  out.write_uint64_t(m_length);
+}
+
+void AioReadIO::write_debug(std::ostream& out) const {
+  write_debug_base(out, "aio read");
+  out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
+}
+
+void AioWriteIO::write_to(Ser& out) const {
+  IO::write_to(out, IO_ASYNC_WRITE);
+  out.write_uint64_t(m_imagectx);
+  out.write_uint64_t(m_offset);
+  out.write_uint64_t(m_length);
+}
+
+void AioWriteIO::write_debug(std::ostream& out) const {
+  write_debug_base(out, "aio write");
+  out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
+}
+
+void OpenImageIO::write_to(Ser& out) const {
+  IO::write_to(out, IO_OPEN_IMAGE);
+  out.write_uint64_t(m_imagectx);
+  out.write_string(m_name);
+  out.write_string(m_snap_name);
+  out.write_bool(m_readonly);
+}
+
+void OpenImageIO::write_debug(std::ostream& out) const {
+  write_debug_base(out, "open image");
+  out << ", imagectx=" << m_imagectx << ", name='" << m_name << "', snap_name='" << m_snap_name << "', readonly=" << m_readonly;
+}
+
+void CloseImageIO::write_to(Ser& out) const {
+  IO::write_to(out, IO_CLOSE_IMAGE);
+  out.write_uint64_t(m_imagectx);
+}
+
+void CloseImageIO::write_debug(std::ostream& out) const {
+  write_debug_base(out, "close image");
+  out << ", imagectx=" << m_imagectx;
+}
diff --git a/src/rbd_replay/ios.hpp b/src/rbd_replay/ios.hpp
new file mode 100644 (file)
index 0000000..ad9ff2a
--- /dev/null
@@ -0,0 +1,334 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef _INCLUDED_RBD_REPLAY_IOS_HPP
+#define _INCLUDED_RBD_REPLAY_IOS_HPP
+
+// This code assumes that IO IDs and timestamps are related monotonically.
+// In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b.
+
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/shared_ptr.hpp>
+#include <iostream>
+#include <map>
+#include <set>
+#include "actions.hpp"
+#include "Ser.hpp"
+
+
+namespace rbd_replay {
+
+class IO;
+
+typedef std::set<boost::shared_ptr<IO> > io_set_t;
+
+typedef std::map<action_id_t, boost::shared_ptr<IO> > io_map_t;
+
+void batch_unreachable_from(const io_set_t& deps, const io_set_t& base, io_set_t* unreachable);
+
+class IO : public boost::enable_shared_from_this<IO> {
+public:
+  typedef boost::shared_ptr<IO> ptr;
+
+  typedef boost::weak_ptr<IO> weak_ptr;
+
+  IO(action_id_t ionum,
+     uint64_t start_time,
+     thread_id_t thread_id,
+     ptr prev)
+    : m_ionum(ionum),
+      m_start_time(start_time),
+      m_dependencies(io_set_t()),
+      m_completion(weak_ptr()),
+      m_num_successors(0),
+      m_thread_id(thread_id),
+      m_prev(prev) {
+  }
+
+  virtual ~IO() {
+  }
+
+  uint64_t start_time() const {
+    return m_start_time;
+  }
+
+  io_set_t& dependencies() {
+    return m_dependencies;
+  }
+
+  const io_set_t& dependencies() const {
+    return m_dependencies;
+  }
+
+  void add_dependencies(const io_set_t& deps);
+
+  uint64_t num_completion_successors() const {
+    ptr c(m_completion.lock());
+    return c ? c->m_num_successors : 0;
+  }
+
+  void write_to(Ser& out, io_type iotype) const;
+
+  virtual void write_to(Ser& out) const = 0;
+
+  virtual bool is_completion() const {
+    return false;
+  }
+
+  void set_ionum(action_id_t ionum) {
+    m_ionum = ionum;
+  }
+
+  action_id_t ionum() const {
+    return m_ionum;
+  }
+
+  ptr prev() const {
+    return m_prev;
+  }
+
+  void set_num_successors(uint32_t n) {
+    m_num_successors = n;
+  }
+
+  uint32_t num_successors() const {
+    return m_num_successors;
+  }
+
+  void write_debug_base(std::ostream& out, std::string iotype) const;
+
+  virtual void write_debug(std::ostream& out) const = 0;
+
+  // The result must be stored somewhere, or else m_completion will expire
+  ptr create_completion(uint64_t start_time, thread_id_t thread_id);
+
+private:
+  action_id_t m_ionum;
+  uint64_t m_start_time;
+  io_set_t m_dependencies;
+  boost::weak_ptr<IO> m_completion;
+  uint32_t m_num_successors;
+  thread_id_t m_thread_id;
+  ptr m_prev;
+};
+
+std::ostream& operator<<(std::ostream& out, IO::ptr io);
+
+
+class StartThreadIO : public IO {
+public:
+  StartThreadIO(action_id_t ionum,
+               uint64_t start_time,
+               thread_id_t thread_id)
+    : IO(ionum, start_time, thread_id, IO::ptr()) {
+  }
+
+  void write_to(Ser& out) const;
+
+  void write_debug(std::ostream& out) const;
+};
+
+class StopThreadIO : public IO {
+public:
+  StopThreadIO(action_id_t ionum,
+              uint64_t start_time,
+              thread_id_t thread_id)
+    : IO(ionum, start_time, thread_id, IO::ptr()) {
+  }
+
+  void write_to(Ser& out) const;
+
+  void write_debug(std::ostream& out) const;
+};
+
+class ReadIO : public IO {
+public:
+  ReadIO(action_id_t ionum,
+        uint64_t start_time,
+        thread_id_t thread_id,
+        IO::ptr prev,
+        imagectx_id_t imagectx,
+        uint64_t offset,
+        uint64_t length)
+    : IO(ionum, start_time, thread_id, prev),
+      m_imagectx(imagectx),
+      m_offset(offset),
+      m_length(length) {
+  }
+
+  void write_to(Ser& out) const;
+
+  void write_debug(std::ostream& out) const;
+
+private:
+  imagectx_id_t m_imagectx;
+  uint64_t m_offset;
+  uint64_t m_length;
+};
+
+class WriteIO : public IO {
+public:
+  WriteIO(action_id_t ionum,
+         uint64_t start_time,
+         thread_id_t thread_id,
+         IO::ptr prev,
+         imagectx_id_t imagectx,
+         uint64_t offset,
+         uint64_t length)
+    : IO(ionum, start_time, thread_id, prev),
+      m_imagectx(imagectx),
+      m_offset(offset),
+      m_length(length) {
+  }
+
+  void write_to(Ser& out) const;
+
+  void write_debug(std::ostream& out) const;
+
+private:
+  imagectx_id_t m_imagectx;
+  uint64_t m_offset;
+  uint64_t m_length;
+};
+
+class AioReadIO : public IO {
+public:
+  AioReadIO(action_id_t ionum,
+           uint64_t start_time,
+           thread_id_t thread_id,
+           IO::ptr prev,
+           imagectx_id_t imagectx,
+           uint64_t offset,
+           uint64_t length)
+    : IO(ionum, start_time, thread_id, prev),
+      m_imagectx(imagectx),
+      m_offset(offset),
+      m_length(length) {
+  }
+
+  void write_to(Ser& out) const;
+
+  void write_debug(std::ostream& out) const;
+
+private:
+  imagectx_id_t m_imagectx;
+  uint64_t m_offset;
+  uint64_t m_length;
+};
+
+class AioWriteIO : public IO {
+public:
+  AioWriteIO(action_id_t ionum,
+            uint64_t start_time,
+            thread_id_t thread_id,
+            IO::ptr prev,
+            imagectx_id_t imagectx,
+            uint64_t offset,
+            uint64_t length)
+    : IO(ionum, start_time, thread_id, prev),
+      m_imagectx(imagectx),
+      m_offset(offset),
+      m_length(length) {
+  }
+
+  void write_to(Ser& out) const;
+
+  void write_debug(std::ostream& out) const;
+
+private:
+  imagectx_id_t m_imagectx;
+  uint64_t m_offset;
+  uint64_t m_length;
+};
+
+class OpenImageIO : public IO {
+public:
+  OpenImageIO(action_id_t ionum,
+             uint64_t start_time,
+             thread_id_t thread_id,
+             IO::ptr prev,
+             imagectx_id_t imagectx,
+             const std::string& name,
+             const std::string& snap_name,
+             bool readonly)
+    : IO(ionum, start_time, thread_id, prev),
+      m_imagectx(imagectx),
+      m_name(name),
+      m_snap_name(snap_name),
+      m_readonly(readonly) {
+  }
+
+  void write_to(Ser& out) const;
+
+  imagectx_id_t imagectx() const {
+    return m_imagectx;
+  }
+
+  void write_debug(std::ostream& out) const;
+
+private:
+  imagectx_id_t m_imagectx;
+  std::string m_name;
+  std::string m_snap_name;
+  bool m_readonly;
+};
+
+class CloseImageIO : public IO {
+public:
+  CloseImageIO(action_id_t ionum,
+              uint64_t start_time,
+              thread_id_t thread_id,
+              IO::ptr prev,
+              imagectx_id_t imagectx)
+    : IO(ionum, start_time, thread_id, prev),
+      m_imagectx(imagectx) {
+  }
+
+  void write_to(Ser& out) const;
+
+  imagectx_id_t imagectx() const {
+    return m_imagectx;
+  }
+
+  void write_debug(std::ostream& out) const;
+
+private:
+  imagectx_id_t m_imagectx;
+};
+
+class CompletionIO : public IO {
+public:
+  CompletionIO(action_id_t ionum,
+              uint64_t start_time,
+              thread_id_t thread_id)
+    : IO(ionum, start_time, thread_id, IO::ptr()) {
+  }
+
+  void write_to(Ser& out) const {
+  }
+
+  bool is_completion() const {
+    return true;
+  }
+
+  void write_debug(std::ostream& out) const {
+    write_debug_base(out, "completion");
+  }
+};
+
+bool compare_io_ptrs_by_start_time(IO::ptr p1, IO::ptr p2);
+
+}
+
+#endif
index 9e789a0a4924584b1ecda74e2565fd635cc3fec2..63a5fda55ac49e82c81a8aaa28f4425402732403 100644 (file)
 #include <cstdlib>
 #include <string>
 #include <assert.h>
-#include <iostream>
 #include <fstream>
 #include <boost/thread/thread.hpp>
-#include "actions.hpp"
-#include "Ser.hpp"
+#include "ios.hpp"
 
 using namespace std;
 using namespace rbd_replay;
 
-// Allows us to easily expose all the functions to make debugging easier.
-#define STATIC static
-
-class IO;
-
-typedef set<boost::shared_ptr<IO> > io_set_t;
-
-typedef map<action_id_t, boost::shared_ptr<IO> > io_map_t;
-
-STATIC void batch_unreachable_from(const io_set_t& deps, const io_set_t& base, io_set_t* unreachable);
-
-class IO : public boost::enable_shared_from_this<IO> {
-public:
-  typedef boost::shared_ptr<IO> ptr;
-
-  typedef boost::weak_ptr<IO> weak_ptr;
-
-  IO(action_id_t ionum,
-     uint64_t start_time,
-     thread_id_t thread_id,
-     ptr prev)
-    : m_ionum(ionum),
-      m_start_time(start_time),
-      m_dependencies(io_set_t()),
-      m_completion(weak_ptr()),
-      m_num_successors(0),
-      m_thread_id(thread_id),
-      m_prev(prev) {
-  }
-
-  virtual ~IO() {
-  }
-
-  uint64_t start_time() const {
-    return m_start_time;
-  }
-
-  io_set_t& dependencies() {
-    return m_dependencies;
-  }
-
-  const io_set_t& dependencies() const {
-    return m_dependencies;
-  }
-
-  void add_dependencies(const io_set_t& deps) {
-    io_set_t base(m_dependencies);
-    for (io_set_t::const_iterator itr = deps.begin(); itr != deps.end(); ++itr) {
-      ptr dep(*itr);
-      for (io_set_t::const_iterator itr2 = dep->m_dependencies.begin(); itr2 != dep->m_dependencies.end(); ++itr2) {
-       base.insert(*itr2);
-      }
-    }
-    batch_unreachable_from(deps, base, &m_dependencies);
-  }
-
-  uint64_t num_completion_successors() const {
-    ptr c(m_completion.lock());
-    return c ? c->m_num_successors : 0;
-  }
-
-  void write_to(Ser& out, io_type iotype) const;
-
-  virtual void write_to(Ser& out) const = 0;
-
-  virtual bool is_completion() const {
-    return false;
-  }
-
-  void set_ionum(action_id_t ionum) {
-    m_ionum = ionum;
-  }
-
-  action_id_t ionum() const {
-    return m_ionum;
-  }
-
-  ptr prev() const {
-    return m_prev;
-  }
-
-  void set_num_successors(uint32_t n) {
-    m_num_successors = n;
-  }
-
-  uint32_t num_successors() const {
-    return m_num_successors;
-  }
-
-  void write_debug_base(ostream& out, string iotype);
-
-  virtual void write_debug(ostream& out) = 0;
-
-  // The result must be stored somewhere, or else m_completion will expire
-  ptr create_completion(uint64_t start_time, thread_id_t thread_id);
-
-private:
-  action_id_t m_ionum;
-  uint64_t m_start_time;
-  io_set_t m_dependencies;
-  boost::weak_ptr<IO> m_completion;
-  uint32_t m_num_successors;
-  thread_id_t m_thread_id;
-  ptr m_prev;
-};
-
-ostream& operator<<(ostream& out, IO::ptr io) {
-  io->write_debug(out);
-  return out;
-}
 
 class Thread {
 public:
@@ -197,413 +85,11 @@ private:
   uint64_t m_max_ts;
 };
 
-void IO::write_debug_base(ostream& out, string type) {
-  out << m_ionum << ": " << m_start_time / 1000000.0 << ": " << type << ", thread = " << m_thread_id << ", deps = {";
-  bool first = true;
-  for (io_set_t::iterator itr = m_dependencies.begin(), end = m_dependencies.end(); itr != end; ++itr) {
-    if (first) {
-      first = false;
-    } else {
-      out << ", ";
-    }
-    out << (*itr)->m_ionum << ": " << m_start_time - (*itr)->m_start_time;
-  }
-  out << "}, num_successors = " << m_num_successors << ", numCompletionSuccessors = " << num_completion_successors();
-}
-
-class StartThreadIO : public IO {
-public:
-  StartThreadIO(action_id_t ionum,
-               uint64_t start_time,
-               thread_id_t thread_id)
-    : IO(ionum, start_time, thread_id, IO::ptr()) {
-  }
-
-  void write_to(Ser& out) const {
-    IO::write_to(out, IO_START_THREAD);
-  }
-
-  void write_debug(ostream& out) {
-    write_debug_base(out, "start thread");
-  }
-};
-
-class StopThreadIO : public IO {
-public:
-  StopThreadIO(action_id_t ionum,
-              uint64_t start_time,
-              thread_id_t thread_id)
-    : IO(ionum, start_time, thread_id, IO::ptr()) {
-  }
-
-  void write_to(Ser& out) const {
-    IO::write_to(out, IO_STOP_THREAD);
-  }
-
-  void write_debug(ostream& out) {
-    write_debug_base(out, "stop thread");
-  }
-};
-
-class ReadIO : public IO {
-public:
-  ReadIO(action_id_t ionum,
-        uint64_t start_time,
-        thread_id_t thread_id,
-        IO::ptr prev,
-        imagectx_id_t imagectx,
-        uint64_t offset,
-        uint64_t length)
-    : IO(ionum, start_time, thread_id, prev),
-      m_imagectx(imagectx),
-      m_offset(offset),
-      m_length(length) {
-  }
-
-  void write_to(Ser& out) const {
-    IO::write_to(out, IO_READ);
-    out.write_uint64_t(m_imagectx);
-    out.write_uint64_t(m_offset);
-    out.write_uint64_t(m_length);
-  }
-
-  void write_debug(ostream& out) {
-    write_debug_base(out, "read");
-    out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
-  }
-
-private:
-  imagectx_id_t m_imagectx;
-  uint64_t m_offset;
-  uint64_t m_length;
-};
-
-class WriteIO : public IO {
-public:
-  WriteIO(action_id_t ionum,
-         uint64_t start_time,
-         thread_id_t thread_id,
-         IO::ptr prev,
-         imagectx_id_t imagectx,
-         uint64_t offset,
-         uint64_t length)
-    : IO(ionum, start_time, thread_id, prev),
-      m_imagectx(imagectx),
-      m_offset(offset),
-      m_length(length) {
-  }
-
-  void write_to(Ser& out) const {
-    IO::write_to(out, IO_WRITE);
-    out.write_uint64_t(m_imagectx);
-    out.write_uint64_t(m_offset);
-    out.write_uint64_t(m_length);
-  }
-
-  void write_debug(ostream& out) {
-    write_debug_base(out, "write");
-    out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
-  }
-
-private:
-  imagectx_id_t m_imagectx;
-  uint64_t m_offset;
-  uint64_t m_length;
-};
-
-class AioReadIO : public IO {
-public:
-  AioReadIO(action_id_t ionum,
-           uint64_t start_time,
-           thread_id_t thread_id,
-           IO::ptr prev,
-           imagectx_id_t imagectx,
-           uint64_t offset,
-           uint64_t length)
-    : IO(ionum, start_time, thread_id, prev),
-      m_imagectx(imagectx),
-      m_offset(offset),
-      m_length(length) {
-  }
-
-  void write_to(Ser& out) const {
-    IO::write_to(out, IO_ASYNC_READ);
-    out.write_uint64_t(m_imagectx);
-    out.write_uint64_t(m_offset);
-    out.write_uint64_t(m_length);
-  }
-
-  void write_debug(ostream& out) {
-    write_debug_base(out, "aio read");
-    out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
-  }
-private:
-  imagectx_id_t m_imagectx;
-  uint64_t m_offset;
-  uint64_t m_length;
-};
-
-class AioWriteIO : public IO {
-public:
-  AioWriteIO(action_id_t ionum,
-            uint64_t start_time,
-            thread_id_t thread_id,
-            IO::ptr prev,
-            imagectx_id_t imagectx,
-            uint64_t offset,
-            uint64_t length)
-    : IO(ionum, start_time, thread_id, prev),
-      m_imagectx(imagectx),
-      m_offset(offset),
-      m_length(length) {
-  }
-
-  void write_to(Ser& out) const {
-    IO::write_to(out, IO_ASYNC_WRITE);
-    out.write_uint64_t(m_imagectx);
-    out.write_uint64_t(m_offset);
-    out.write_uint64_t(m_length);
-  }
-
-  void write_debug(ostream& out) {
-    write_debug_base(out, "aio write");
-    out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
-  }
-
-private:
-  imagectx_id_t m_imagectx;
-  uint64_t m_offset;
-  uint64_t m_length;
-};
-
-class OpenImageIO : public IO {
-public:
-  OpenImageIO(action_id_t ionum,
-             uint64_t start_time,
-             thread_id_t thread_id,
-             IO::ptr prev,
-             imagectx_id_t imagectx,
-             const string& name,
-             const string& snap_name,
-             bool readonly)
-    : IO(ionum, start_time, thread_id, prev),
-      m_imagectx(imagectx),
-      m_name(name),
-      m_snap_name(snap_name),
-      m_readonly(readonly) {
-  }
-
-  void write_to(Ser& out) const {
-    IO::write_to(out, IO_OPEN_IMAGE);
-    out.write_uint64_t(m_imagectx);
-    out.write_string(m_name);
-    out.write_string(m_snap_name);
-    out.write_bool(m_readonly);
-  }
-
-  imagectx_id_t imagectx() const {
-    return m_imagectx;
-  }
-
-  void write_debug(ostream& out) {
-    write_debug_base(out, "open image");
-    out << ", imagectx=" << m_imagectx << ", name='" << m_name << "', snap_name='" << m_snap_name << "', readonly=" << m_readonly;
-  }
-
-private:
-  imagectx_id_t m_imagectx;
-  string m_name;
-  string m_snap_name;
-  bool m_readonly;
-};
-
-class CloseImageIO : public IO {
-public:
-  CloseImageIO(action_id_t ionum,
-              uint64_t start_time,
-              thread_id_t thread_id,
-              IO::ptr prev,
-              imagectx_id_t imagectx)
-    : IO(ionum, start_time, thread_id, prev),
-      m_imagectx(imagectx) {
-  }
-
-  void write_to(Ser& out) const {
-    IO::write_to(out, IO_CLOSE_IMAGE);
-    out.write_uint64_t(m_imagectx);
-  }
-
-  imagectx_id_t imagectx() const {
-    return m_imagectx;
-  }
-
-  void write_debug(ostream& out) {
-    write_debug_base(out, "close image");
-    out << ", imagectx=" << m_imagectx;
-  }
-
-private:
-  imagectx_id_t m_imagectx;
-};
-
-class CompletionIO : public IO {
-public:
-  CompletionIO(action_id_t ionum,
-              uint64_t start_time,
-              thread_id_t thread_id)
-    : IO(ionum, start_time, thread_id, IO::ptr()) {
-  }
-
-  void write_to(Ser& out) const {
-  }
-
-  bool is_completion() const {
-    return true;
-  }
-
-  void write_debug(ostream& out) {
-    write_debug_base(out, "completion");
-  }
-};
-
-STATIC bool compare_io_ptrs_by_start_time(IO::ptr p1, IO::ptr p2) {
-  return p1->start_time() < p2->start_time();
-}
-
-void IO::write_to(Ser& out, io_type iotype) const {
-  out.write_uint8_t(iotype);
-  out.write_uint32_t(m_ionum);
-  out.write_uint64_t(m_thread_id);
-  out.write_uint32_t(m_num_successors);
-  out.write_uint32_t(num_completion_successors());
-  out.write_uint32_t(m_dependencies.size());
-  vector<IO::ptr> deps;
-  for (io_set_t::const_iterator itr = m_dependencies.begin(), end = m_dependencies.end(); itr != end; ++itr) {
-    deps.push_back(*itr);
-  }
-  sort(deps.begin(), deps.end(), compare_io_ptrs_by_start_time);
-  for (vector<IO::ptr>::const_iterator itr = deps.begin(), end = deps.end(); itr != end; ++itr) {
-    out.write_uint32_t((*itr)->m_ionum);
-    out.write_uint64_t(m_start_time - (*itr)->m_start_time);
-  }
-}
-
-IO::ptr IO::create_completion(uint64_t start_time, thread_id_t thread_id) {
-  assert(!m_completion.lock());
-  IO::ptr completion(new CompletionIO(m_ionum + 1, start_time, thread_id));
-  m_completion = completion;
-  completion->m_dependencies.insert(shared_from_this());
-  return completion;
-}
-
-STATIC uint64_t min_time(const map<action_id_t, IO::ptr>& s) {
-  if (s.empty()) {
-    return 0;
-  }
-  return s.begin()->second->start_time();
-}
-
-STATIC uint64_t max_time(const map<action_id_t, IO::ptr>& s) {
-  if (s.empty()) {
-    return 0;
-  }
-  map<action_id_t, IO::ptr>::const_iterator itr(s.end());
-  --itr;
-  return itr->second->start_time();
-}
-
-// TODO: Add unit tests
-// Anything in 'deps' which is not reachable from 'base' is added to 'unreachable'
-STATIC void batch_unreachable_from(const io_set_t& deps, const io_set_t& base, io_set_t* unreachable) {
-  if (deps.empty()) {
-    return;
-  }
-
-  map<action_id_t, IO::ptr> searching_for;
-  for (io_set_t::const_iterator itr = deps.begin(); itr != deps.end(); ++itr) {
-    searching_for[(*itr)->ionum()] = *itr;
-  }
-
-  map<action_id_t, IO::ptr> boundary;
-  for (io_set_t::const_iterator itr = base.begin(); itr != base.end(); ++itr) {
-    boundary[(*itr)->ionum()] = *itr;
-  }
-
-  // The boundary horizon is the maximum timestamp of IOs in the boundary.
-  // This monotonically decreases, because dependencies (which are added to the set)
-  // have earlier timestamp than the dependent IOs (which were just removed from the set).
-  uint64_t boundary_horizon = max_time(boundary);
-
-  for (io_map_t::iterator itr = searching_for.begin(); itr != searching_for.end(); ) {
-    if (boundary_horizon >= itr->second->start_time()) {
-      break;
-    }
-    unreachable->insert(itr->second);
-    searching_for.erase(itr++);
-  }
-  if (searching_for.empty()) {
-    return;
-  }
-
-  // The searching horizon is the minimum timestamp of IOs in the searching set.
-  // This monotonically increases, because elements are only removed from the set.
-  uint64_t searching_horizon = min_time(searching_for);
-
-  while (!boundary.empty()) {
-    // Take an IO from the end, which has the highest timestamp.
-    // This reduces the boundary horizon as early as possible,
-    // which means we can short cut as soon as possible.
-    map<action_id_t, boost::shared_ptr<IO> >::iterator b_itr(boundary.end());
-    --b_itr;
-    boost::shared_ptr<IO> io(b_itr->second);
-    boundary.erase(b_itr);
-
-    for (io_set_t::const_iterator itr = io->dependencies().begin(), end = io->dependencies().end(); itr != end; ++itr) {
-      IO::ptr dep(*itr);
-      assertf(dep->ionum() < io->ionum(), "IO: %d, dependency: %d", io->ionum(), dep->ionum());
-      io_map_t::iterator p = searching_for.find(dep->ionum());
-      if (p != searching_for.end()) {
-       searching_for.erase(p);
-       if (dep->start_time() == searching_horizon) {
-         searching_horizon = min_time(searching_for);
-         if (searching_horizon == 0) {
-           return;
-         }
-       }
-      }
-      boundary[dep->ionum()] = dep;
-    }
-
-    boundary_horizon = max_time(boundary);
-    if (boundary_horizon != 0) {
-      // Anything we're searching for that has a timestamp greater than the
-      // boundary horizon will never be found, since the boundary horizon
-      // falls monotonically.
-      for (io_map_t::iterator itr = searching_for.begin(); itr != searching_for.end(); ) {
-       if (boundary_horizon >= itr->second->start_time()) {
-         break;
-       }
-       unreachable->insert(itr->second);
-       searching_for.erase(itr++);
-      }
-      searching_horizon = min_time(searching_for);
-      if (searching_horizon == 0) {
-       return;
-      }
-    }
-  }
-
-  // Anything we're still searching for has not been found.
-  for (io_map_t::iterator itr = searching_for.begin(), end = searching_for.end(); itr != end; ++itr) {
-    unreachable->insert(itr->second);
-  }
-}
-
-STATIC void usage(string prog) {
+static void usage(string prog) {
   cout << "Usage: " << prog << " [ --window <seconds> ] <trace-input> <replay-output>" << endl;
 }
 
-__attribute__((noreturn)) STATIC void usage_exit(string prog, string msg) {
+__attribute__((noreturn)) static void usage_exit(string prog, string msg) {
   cerr << msg << endl;
   usage(prog);
   exit(1);
index 6ba23f9ca08bfee9be8cb86caebeebf8ea6950eb..ba994921594e251c8fe709cb807a63e7a800f9bf 100644 (file)
@@ -583,6 +583,7 @@ unittest_rbd_replay_LDADD = $(LIBRBD) \
        $(LIBRADOS) \
        $(CEPH_GLOBAL) \
        librbd_replay.la \
+       librbd_replay_ios.la \
        $(UNITTEST_LDADD)
 unittest_rbd_replay_CXXFLAGS = $(UNITTEST_CXXFLAGS)
 check_PROGRAMS += unittest_rbd_replay
index d05074af231b34c5f5c1df90f5f73868648163a7..eb3c8e34543cc489c4b721f73a5555740df2725b 100644 (file)
 #include "gtest/gtest.h"
 #include <stdint.h>
 #include <boost/foreach.hpp>
+#include <cstdarg>
 #include "rbd_replay/Deser.hpp"
 #include "rbd_replay/ImageNameMap.hpp"
+#include "rbd_replay/ios.hpp"
 #include "rbd_replay/rbd_loc.hpp"
 #include "rbd_replay/Ser.hpp"
 
 
-using rbd_replay::ImageNameMap;
-using rbd_replay::rbd_loc;
+using namespace rbd_replay;
 
 std::ostream& operator<<(std::ostream& o, const rbd_loc& name) {
   return o << "('" << name.pool << "', '" << name.image << "', '" << name.snap << "')";
@@ -167,3 +168,58 @@ TEST(RBDReplay, rbd_loc_parse) {
   EXPECT_FALSE(m.parse("a/b/c"));
   EXPECT_FALSE(m.parse("a@b/c"));
 }
+
+static IO::ptr mkio(action_id_t ionum, ...) {
+  IO::ptr io(new StartThreadIO(ionum, ionum, 0));
+
+  va_list ap;
+  va_start(ap, ionum);
+  while (true) {
+    IO::ptr* dep = va_arg(ap, IO::ptr*);
+    if (!dep) {
+      break;
+    }
+    io->dependencies().insert(*dep);
+  }
+  va_end(ap);
+
+  return io;
+}
+
+TEST(RBDReplay, batch_unreachable_from) {
+  io_set_t deps;
+  io_set_t base;
+  io_set_t unreachable;
+  IO::ptr io1(mkio(1, NULL));
+  IO::ptr io2(mkio(2, &io1, NULL));
+  IO::ptr io3(mkio(3, &io2, NULL));
+  IO::ptr io4(mkio(4, &io1, NULL));
+  IO::ptr io5(mkio(5, &io2, &io4, NULL));
+  IO::ptr io6(mkio(6, &io3, &io5, NULL));
+  IO::ptr io7(mkio(7, &io4, NULL));
+  IO::ptr io8(mkio(8, &io5, &io7, NULL));
+  IO::ptr io9(mkio(9, &io6, &io8, NULL));
+  // 1 (deps) <-- 2 (deps) <-- 3 (deps)
+  // ^            ^            ^
+  // |            |            |
+  // 4 <--------- 5 (base) <-- 6 (deps)
+  // ^            ^            ^
+  // |            |            |
+  // 7 <--------- 8 <--------- 9
+  deps.insert(io1);
+  deps.insert(io2);
+  deps.insert(io3);
+  deps.insert(io6);
+  base.insert(io5);
+  // Anything in 'deps' which is not reachable from 'base' is added to 'unreachable'
+  batch_unreachable_from(deps, base, &unreachable);
+  EXPECT_EQ(0, unreachable.count(io1));
+  EXPECT_EQ(0, unreachable.count(io2));
+  EXPECT_EQ(1, unreachable.count(io3));
+  EXPECT_EQ(0, unreachable.count(io4));
+  EXPECT_EQ(0, unreachable.count(io5));
+  EXPECT_EQ(1, unreachable.count(io6));
+  EXPECT_EQ(0, unreachable.count(io7));
+  EXPECT_EQ(0, unreachable.count(io8));
+  EXPECT_EQ(0, unreachable.count(io9));
+}