]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-replay-prep: simplify IO dependency calculation
authorJason Dillaman <dillaman@redhat.com>
Wed, 7 Oct 2015 18:56:22 +0000 (14:56 -0400)
committerJason Dillaman <dillaman@redhat.com>
Wed, 7 Oct 2015 20:34:55 +0000 (16:34 -0400)
Only track read-after-write and write-after-write IO dependencies
via the associated write completions.  All IO events after a write
completion are considered to be dependent and can be pruned down
to at most the number of concurrent IOs.  This reduces the prep
time from a simple 'rbd bench-write' from over 4 hrs down to seconds.

Fixes: #13378, #13384
Backport: hammer

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/rbd_replay/actions.cc
src/rbd_replay/actions.hpp
src/rbd_replay/ios.cc
src/rbd_replay/ios.hpp
src/rbd_replay/rbd-replay-prep.cc

index 2bcdfb194ec608031668082372513674d0f8bea8..0c327b653edf4e8eaf6f260069065f7b965e1eb9 100644 (file)
@@ -25,13 +25,9 @@ using namespace std;
 
 Action::Action(action_id_t id,
                thread_id_t thread_id,
-               int num_successors,
-               int num_completion_successors,
                std::vector<dependency_d> &predecessors)
   : m_id(id),
     m_thread_id(thread_id),
-    m_num_successors(num_successors),
-    m_num_completion_successors(num_completion_successors),
     m_predecessors(predecessors) {
     }
 
@@ -45,8 +41,8 @@ Action::ptr Action::read_from(Deser &d) {
   }
   uint32_t ionum = d.read_uint32_t();
   uint64_t thread_id = d.read_uint64_t();
-  uint32_t num_successors = d.read_uint32_t();
-  uint32_t num_completion_successors = d.read_uint32_t();
+  d.read_uint32_t(); // unused
+  d.read_uint32_t(); // unused
   uint32_t num_dependencies = d.read_uint32_t();
   vector<dependency_d> deps;
   for (unsigned int i = 0; i < num_dependencies; i++) {
@@ -54,7 +50,7 @@ Action::ptr Action::read_from(Deser &d) {
     uint64_t time_delta = d.read_uint64_t();
     deps.push_back(dependency_d(dep_id, time_delta));
   }
-  DummyAction dummy(ionum, thread_id, num_successors, num_completion_successors, deps);
+  DummyAction dummy(ionum, thread_id, deps);
   switch (type) {
   case IO_START_THREAD:
     return StartThreadAction::read_from(dummy, d);
index 068e4dc413944ee378a48d067b8f5ae04484bebb..e9522dbf99fcdbdb53dac2b771203e673375c274 100644 (file)
@@ -133,8 +133,6 @@ public:
 
   Action(action_id_t id,
         thread_id_t thread_id,
-        int num_successors,
-        int num_completion_successors,
         std::vector<dependency_d> &predecessors);
 
   virtual ~Action();
@@ -176,8 +174,6 @@ private:
 
   const action_id_t m_id;
   const thread_id_t m_thread_id;
-  const int m_num_successors;
-  const int m_num_completion_successors;
   const std::vector<dependency_d> m_predecessors;
 };
 
@@ -194,10 +190,8 @@ class DummyAction : public Action {
 public:
   DummyAction(action_id_t id,
              thread_id_t thread_id,
-             int num_successors,
-             int num_completion_successors,
              std::vector<dependency_d> &predecessors)
-    : Action(id, thread_id, num_successors, num_completion_successors, predecessors) {
+    : Action(id, thread_id, predecessors) {
   }
 
   void perform(ActionCtx &ctx) {
index ccc560f85c814bd24be844f0902f4ce46f919632..21a68019ececd82b0999bea38ed1b27add4363a6 100644 (file)
@@ -24,33 +24,6 @@ bool rbd_replay::compare_io_ptrs_by_start_time(IO::ptr p1, IO::ptr p2) {
   return p1->start_time() < p2->start_time();
 }
 
-static uint64_t min_time(const map<action_id_t, IO::ptr>& s) {
-  if (s.empty()) {
-    return 0;
-  }
-  return s.begin()->second->start_time();
-}
-
-static uint64_t max_time(const map<action_id_t, IO::ptr>& s) {
-  if (s.empty()) {
-    return 0;
-  }
-  map<action_id_t, IO::ptr>::const_iterator itr(s.end());
-  --itr;
-  return itr->second->start_time();
-}
-
-void IO::add_dependencies(const io_set_t& deps) {
-  io_set_t base(m_dependencies);
-  for (io_set_t::const_iterator itr = deps.begin(); itr != deps.end(); ++itr) {
-    ptr dep(*itr);
-    for (io_set_t::const_iterator itr2 = dep->m_dependencies.begin(); itr2 != dep->m_dependencies.end(); ++itr2) {
-      base.insert(*itr2);
-    }
-  }
-  batch_unreachable_from(deps, base, &m_dependencies);
-}
-
 void IO::write_debug_base(ostream& out, string type) const {
   out << m_ionum << ": " << m_start_time / 1000000.0 << ": " << type << ", thread = " << m_thread_id << ", deps = {";
   bool first = true;
@@ -62,16 +35,17 @@ void IO::write_debug_base(ostream& out, string type) const {
     }
     out << (*itr)->m_ionum << ": " << m_start_time - (*itr)->m_start_time;
   }
-  out << "}, num_successors = " << m_num_successors << ", numCompletionSuccessors = " << num_completion_successors();
+  out << "}";
 }
 
 
 void IO::write_to(Ser& out, io_type iotype) const {
+  // TODO break compatibility now to add version (and yank unused fields)?
   out.write_uint8_t(iotype);
   out.write_uint32_t(m_ionum);
   out.write_uint64_t(m_thread_id);
-  out.write_uint32_t(m_num_successors);
-  out.write_uint32_t(num_completion_successors());
+  out.write_uint32_t(0);
+  out.write_uint32_t(0);
   out.write_uint32_t(m_dependencies.size());
   vector<IO::ptr> deps;
   for (io_set_t::const_iterator itr = m_dependencies.begin(), end = m_dependencies.end(); itr != end; ++itr) {
@@ -84,102 +58,6 @@ void IO::write_to(Ser& out, io_type iotype) const {
   }
 }
 
-IO::ptr IO::create_completion(uint64_t start_time, thread_id_t thread_id) {
-  assert(!m_completion.lock());
-  IO::ptr completion(new CompletionIO(m_ionum + 1, start_time, thread_id));
-  m_completion = completion;
-  completion->m_dependencies.insert(shared_from_this());
-  return completion;
-}
-
-
-// TODO: Add unit tests
-// Anything in 'deps' which is not reachable from 'base' is added to 'unreachable'
-void rbd_replay::batch_unreachable_from(const io_set_t& deps, const io_set_t& base, io_set_t* unreachable) {
-  if (deps.empty()) {
-    return;
-  }
-
-  map<action_id_t, IO::ptr> searching_for;
-  for (io_set_t::const_iterator itr = deps.begin(); itr != deps.end(); ++itr) {
-    searching_for[(*itr)->ionum()] = *itr;
-  }
-
-  map<action_id_t, IO::ptr> boundary;
-  for (io_set_t::const_iterator itr = base.begin(); itr != base.end(); ++itr) {
-    boundary[(*itr)->ionum()] = *itr;
-  }
-
-  // The boundary horizon is the maximum timestamp of IOs in the boundary.
-  // This monotonically decreases, because dependencies (which are added to the set)
-  // have earlier timestamp than the dependent IOs (which were just removed from the set).
-  uint64_t boundary_horizon = max_time(boundary);
-
-  for (io_map_t::iterator itr = searching_for.begin(); itr != searching_for.end(); ) {
-    if (boundary_horizon >= itr->second->start_time()) {
-      break;
-    }
-    unreachable->insert(itr->second);
-    searching_for.erase(itr++);
-  }
-  if (searching_for.empty()) {
-    return;
-  }
-
-  // The searching horizon is the minimum timestamp of IOs in the searching set.
-  // This monotonically increases, because elements are only removed from the set.
-  uint64_t searching_horizon = min_time(searching_for);
-
-  while (!boundary.empty()) {
-    // Take an IO from the end, which has the highest timestamp.
-    // This reduces the boundary horizon as early as possible,
-    // which means we can short cut as soon as possible.
-    map<action_id_t, boost::shared_ptr<IO> >::iterator b_itr(boundary.end());
-    --b_itr;
-    boost::shared_ptr<IO> io(b_itr->second);
-    boundary.erase(b_itr);
-
-    for (io_set_t::const_iterator itr = io->dependencies().begin(), end = io->dependencies().end(); itr != end; ++itr) {
-      IO::ptr dep(*itr);
-      assertf(dep->ionum() < io->ionum(), "IO: %d, dependency: %d", io->ionum(), dep->ionum());
-      io_map_t::iterator p = searching_for.find(dep->ionum());
-      if (p != searching_for.end()) {
-       searching_for.erase(p);
-       if (dep->start_time() == searching_horizon) {
-         searching_horizon = min_time(searching_for);
-         if (searching_horizon == 0) {
-           return;
-         }
-       }
-      }
-      boundary[dep->ionum()] = dep;
-    }
-
-    boundary_horizon = max_time(boundary);
-    if (boundary_horizon != 0) {
-      // Anything we're searching for that has a timestamp greater than the
-      // boundary horizon will never be found, since the boundary horizon
-      // falls monotonically.
-      for (io_map_t::iterator itr = searching_for.begin(); itr != searching_for.end(); ) {
-       if (boundary_horizon >= itr->second->start_time()) {
-         break;
-       }
-       unreachable->insert(itr->second);
-       searching_for.erase(itr++);
-      }
-      searching_horizon = min_time(searching_for);
-      if (searching_horizon == 0) {
-       return;
-      }
-    }
-  }
-
-  // Anything we're still searching for has not been found.
-  for (io_map_t::iterator itr = searching_for.begin(), end = searching_for.end(); itr != end; ++itr) {
-    unreachable->insert(itr->second);
-  }
-}
-
 ostream& operator<<(ostream& out, IO::ptr io) {
   io->write_debug(out);
   return out;
index 5bebcd71731d58a3a4434eade548bd28bc6df1bc..7d4153a82e37b60a28318ce17edd909852225068 100644 (file)
@@ -35,23 +35,6 @@ typedef std::set<boost::shared_ptr<IO> > io_set_t;
 
 typedef std::map<action_id_t, boost::shared_ptr<IO> > io_map_t;
 
-/**
-   Calculates reachability of IOs in the dependency graph.
-   All IOs in \c deps which are not transitive dependencies of anything in \c base
-   is added to \c unreachable.
-   In other words, for every IO \c x in \c deps: if nothing in \c base depends on \c x,
-   and nothing in \c base has dependencies that depend on \c x, etc.,
-   then \c x is added to \c unreachable.
-   Note that \c unreachable is \em not cleared, so the same set can be used across multiple
-   calls to collect dependencies.
-   @param[in] deps IOs to search for
-   @param[in] base root set of IOs to search from
-   @param[out] unreachable collects unreachable IOs
-   @related IO
-*/
-void batch_unreachable_from(const io_set_t& deps, const io_set_t& base, io_set_t* unreachable);
-
-
 /**
    Used by rbd-replay-prep for processing the raw trace.
    Corresponds to the Action class, except that Actions are executed by rbd-replay,
@@ -67,19 +50,16 @@ public:
      @param ionum ID of this %IO
      @param start_time time the %IO started, in nanoseconds
      @param thread_id ID of the thread that issued the %IO
-     @param prev previously issued %IO on the same thread.  NULL for the first %IO on a thread.
    */
   IO(action_id_t ionum,
      uint64_t start_time,
      thread_id_t thread_id,
-     ptr prev)
+     const io_set_t& deps)
     : m_ionum(ionum),
       m_start_time(start_time),
-      m_dependencies(io_set_t()),
-      m_completion(weak_ptr()),
-      m_num_successors(0),
+      m_dependencies(deps),
       m_thread_id(thread_id),
-      m_prev(prev) {
+      m_completed(false) {
   }
 
   virtual ~IO() {
@@ -97,22 +77,8 @@ public:
     return m_dependencies;
   }
 
-  void add_dependencies(const io_set_t& deps);
-
-  /**
-     Returns the completion's number of successors, or 0 if the %IO does not have a completion.
-   */
-  uint64_t num_completion_successors() const {
-    ptr c(m_completion.lock());
-    return c ? c->m_num_successors : 0;
-  }
-
   virtual void write_to(Ser& out) const = 0;
 
-  virtual bool is_completion() const {
-    return false;
-  }
-
   void set_ionum(action_id_t ionum) {
     m_ionum = ionum;
   }
@@ -121,27 +87,8 @@ public:
     return m_ionum;
   }
 
-  ptr prev() const {
-    return m_prev;
-  }
-
-  void set_num_successors(uint32_t n) {
-    m_num_successors = n;
-  }
-
-  uint32_t num_successors() const {
-    return m_num_successors;
-  }
-
   virtual void write_debug(std::ostream& out) const = 0;
 
-  /**
-     Creates the completion for this IO.
-     This may only be called once per IO, and may not be called on completion IOs.
-     The completion must be stored, or else m_completion will expire.
-   */
-  ptr create_completion(uint64_t start_time, thread_id_t thread_id);
-
 protected:
   void write_to(Ser& out, io_type iotype) const;
 
@@ -151,10 +98,8 @@ private:
   action_id_t m_ionum;
   uint64_t m_start_time;
   io_set_t m_dependencies;
-  boost::weak_ptr<IO> m_completion;
-  uint32_t m_num_successors;
   thread_id_t m_thread_id;
-  ptr m_prev;
+  bool m_completed;
 };
 
 /// Used for dumping debug info.
@@ -167,7 +112,7 @@ public:
   StartThreadIO(action_id_t ionum,
                uint64_t start_time,
                thread_id_t thread_id)
-    : IO(ionum, start_time, thread_id, IO::ptr()) {
+    : IO(ionum, start_time, thread_id, io_set_t()) {
   }
 
   void write_to(Ser& out) const;
@@ -179,8 +124,9 @@ class StopThreadIO : public IO {
 public:
   StopThreadIO(action_id_t ionum,
               uint64_t start_time,
-              thread_id_t thread_id)
-    : IO(ionum, start_time, thread_id, IO::ptr()) {
+              thread_id_t thread_id,
+               const io_set_t& deps)
+    : IO(ionum, start_time, thread_id, deps) {
   }
 
   void write_to(Ser& out) const;
@@ -193,11 +139,11 @@ public:
   ReadIO(action_id_t ionum,
         uint64_t start_time,
         thread_id_t thread_id,
-        IO::ptr prev,
+         const io_set_t& deps,
         imagectx_id_t imagectx,
         uint64_t offset,
         uint64_t length)
-    : IO(ionum, start_time, thread_id, prev),
+    : IO(ionum, start_time, thread_id, deps),
       m_imagectx(imagectx),
       m_offset(offset),
       m_length(length) {
@@ -218,11 +164,11 @@ public:
   WriteIO(action_id_t ionum,
          uint64_t start_time,
          thread_id_t thread_id,
-         IO::ptr prev,
+          const io_set_t& deps,
          imagectx_id_t imagectx,
          uint64_t offset,
          uint64_t length)
-    : IO(ionum, start_time, thread_id, prev),
+    : IO(ionum, start_time, thread_id, deps),
       m_imagectx(imagectx),
       m_offset(offset),
       m_length(length) {
@@ -243,11 +189,11 @@ public:
   AioReadIO(action_id_t ionum,
            uint64_t start_time,
            thread_id_t thread_id,
-           IO::ptr prev,
+            const io_set_t& deps,
            imagectx_id_t imagectx,
            uint64_t offset,
            uint64_t length)
-    : IO(ionum, start_time, thread_id, prev),
+    : IO(ionum, start_time, thread_id, deps),
       m_imagectx(imagectx),
       m_offset(offset),
       m_length(length) {
@@ -268,11 +214,11 @@ public:
   AioWriteIO(action_id_t ionum,
             uint64_t start_time,
             thread_id_t thread_id,
-            IO::ptr prev,
+             const io_set_t& deps,
             imagectx_id_t imagectx,
             uint64_t offset,
             uint64_t length)
-    : IO(ionum, start_time, thread_id, prev),
+    : IO(ionum, start_time, thread_id, deps),
       m_imagectx(imagectx),
       m_offset(offset),
       m_length(length) {
@@ -293,12 +239,12 @@ public:
   OpenImageIO(action_id_t ionum,
              uint64_t start_time,
              thread_id_t thread_id,
-             IO::ptr prev,
+              const io_set_t& deps,
              imagectx_id_t imagectx,
              const std::string& name,
              const std::string& snap_name,
              bool readonly)
-    : IO(ionum, start_time, thread_id, prev),
+    : IO(ionum, start_time, thread_id, deps),
       m_imagectx(imagectx),
       m_name(name),
       m_snap_name(snap_name),
@@ -325,9 +271,9 @@ public:
   CloseImageIO(action_id_t ionum,
               uint64_t start_time,
               thread_id_t thread_id,
-              IO::ptr prev,
+               const io_set_t& deps,
               imagectx_id_t imagectx)
-    : IO(ionum, start_time, thread_id, prev),
+    : IO(ionum, start_time, thread_id, deps),
       m_imagectx(imagectx) {
   }
 
@@ -343,26 +289,6 @@ private:
   imagectx_id_t m_imagectx;
 };
 
-class CompletionIO : public IO {
-public:
-  CompletionIO(action_id_t ionum,
-              uint64_t start_time,
-              thread_id_t thread_id)
-    : IO(ionum, start_time, thread_id, IO::ptr()) {
-  }
-
-  void write_to(Ser& out) const {
-  }
-
-  bool is_completion() const {
-    return true;
-  }
-
-  void write_debug(std::ostream& out) const {
-    write_debug_base(out, "completion");
-  }
-};
-
 /// @related IO
 bool compare_io_ptrs_by_start_time(IO::ptr p1, IO::ptr p2);
 
index e0998f5b2f3724f42e7db26929dc08678bd2b350..0a108759008f8329300a9d481de55b8edfdd7ec7 100644 (file)
@@ -22,6 +22,7 @@
 #include <string>
 #include <assert.h>
 #include <fstream>
+#include <set>
 #include <boost/thread/thread.hpp>
 #include "ios.hpp"
 
@@ -42,7 +43,6 @@ public:
         uint64_t window)
     : m_id(id),
       m_window(window),
-      m_pending_io(IO::ptr()),
       m_latest_io(IO::ptr()),
       m_max_ts(0) {
   }
@@ -57,35 +57,26 @@ public:
     return m_max_ts;
   }
 
-  void issued_io(IO::ptr io, const map<thread_id_t, ptr>& threads) {
+  void issued_io(IO::ptr io, std::set<IO::ptr> *latest_ios) {
     assert(io);
-    io_set_t latest_ios;
-    for (map<thread_id_t, ptr>::const_iterator itr = threads.begin(), end = threads.end(); itr != end; ++itr) {
-      assertf(itr->second, "id = %ld", itr->first);
-      ptr thread(itr->second);
-      if (thread->m_latest_io) {
-       if (thread->m_latest_io->start_time() + m_window > io->start_time()) {
-         latest_ios.insert(thread->m_latest_io);
-       }
-      }
+    if (m_latest_io.get() != NULL) {
+      latest_ios->erase(m_latest_io);
     }
-    io->add_dependencies(latest_ios);
     m_latest_io = io;
-    m_pending_io = io;
+    latest_ios->insert(io);
   }
 
   thread_id_t id() const {
     return m_id;
   }
 
-  IO::ptr pending_io() {
-    return m_pending_io;
+  IO::ptr latest_io() {
+    return m_latest_io;
   }
 
 private:
   thread_id_t m_id;
   uint64_t m_window;
-  IO::ptr m_pending_io;
   IO::ptr m_latest_io;
   uint64_t m_max_ts;
 };
@@ -137,14 +128,8 @@ class Processor {
 public:
   Processor()
     : m_window(1000000000ULL), // 1 billion nanoseconds, i.e., one second
-      m_threads(),
       m_io_count(0),
-      m_recent_completions(io_set_t()),
-      m_open_images(set<imagectx_id_t>()),
-      m_ios(vector<IO::ptr>()),
-      m_pending_ios(map<uint64_t, IO::ptr>()),
-      m_anonymize(false),
-      m_anonymized_images(map<string, AnonymizedImage>()) {
+      m_anonymize(false) {
   }
 
   void run(vector<string> args) {
@@ -160,8 +145,6 @@ public:
        }
        m_window = (uint64_t)(1e9 * atof(args[++i].c_str()));
       } else if (arg.find("--window=") == 0) {
-       // TODO: test
-       printf("Arg: '%s'\n", arg.c_str() + sizeof("--window="));
        m_window = (uint64_t)(1e9 * atof(arg.c_str() + sizeof("--window=")));
       } else if (arg == "--anonymize") {
        m_anonymize = true;
@@ -231,25 +214,6 @@ public:
 
     insert_thread_stops();
 
-    for (vector<IO::ptr>::const_iterator itr = m_ios.begin(); itr != m_ios.end(); ++itr) {
-      IO::ptr io(*itr);
-      IO::ptr prev(io->prev());
-      if (prev) {
-       // TODO: explain when prev is and isn't a dep
-       io_set_t::iterator depitr = io->dependencies().find(prev);
-       if (depitr != io->dependencies().end()) {
-         io->dependencies().erase(depitr);
-       }
-      }
-      if (io->is_completion()) {
-       io->dependencies().clear();
-      }
-      for (io_set_t::const_iterator depitr = io->dependencies().begin(); depitr != io->dependencies().end(); ++depitr) {
-       IO::ptr dep(*depitr);
-       dep->set_num_successors(dep->num_successors() + 1);
-      }
-    }
-
     ofstream myfile;
     myfile.open(output_file_name.c_str(), ios::out | ios::binary);
     Ser ser(myfile);
@@ -274,16 +238,10 @@ private:
        }
        if (io->start_time() > thread->max_ts()) {
          ionum = io->ionum();
-         if (ionum & 1) {
-           ionum++;
-         }
          break;
        }
       }
       if (ionum == none) {
-       if (maxIONum & 1) {
-         maxIONum--;
-       }
        ionum = maxIONum + 2;
       }
       for (vector<IO::ptr>::const_iterator itr2 = m_ios.begin(); itr2 != m_ios.end(); ++itr2) {
@@ -292,7 +250,9 @@ private:
          io->set_ionum(io->ionum() + 2);
        }
       }
-      IO::ptr stop_thread_io(new StopThreadIO(ionum, thread->max_ts(), thread->id()));
+      IO::ptr stop_thread_io(new StopThreadIO(ionum, thread->max_ts(),
+                                              thread->id(),
+                                              m_recent_completions));
       vector<IO::ptr>::iterator insertion_point = lower_bound(m_ios.begin(), m_ios.end(), stop_thread_io, compare_io_ptrs_by_start_time);
       m_ios.insert(insertion_point, stop_thread_io);
     }
@@ -366,55 +326,51 @@ private:
       const struct bt_definition *m_scope;
     } fields(evt, scope_fields);
 
-    if (strcmp(event_name, "librbd:read_enter") == 0 ||
-        strcmp(event_name, "librbd:read2_enter") == 0) {
-      string name(fields.string("name"));
-      string snap_name(fields.string("snap_name"));
-      bool readonly = fields.int64("read_only");
-      imagectx_id_t imagectx = fields.uint64("imagectx");
-      uint64_t offset = fields.uint64("offset");
-      uint64_t length = fields.uint64("length");
-      require_image(ts, thread, imagectx, name, snap_name, readonly);
-      action_id_t ionum = next_id();
-      IO::ptr io(new ReadIO(ionum, ts, threadID, thread->pending_io(), imagectx, offset, length));
-      io->add_dependencies(m_recent_completions);
-      thread->issued_io(io, m_threads);
-      m_ios.push_back(io);
-    } else if (strcmp(event_name, "librbd:open_image_enter") == 0) {
+    if (strcmp(event_name, "librbd:open_image_enter") == 0) {
       string name(fields.string("name"));
       string snap_name(fields.string("snap_name"));
-      bool readonly = fields.int64("read_only");
+      bool readonly = fields.uint64("read_only");
       imagectx_id_t imagectx = fields.uint64("imagectx");
       action_id_t ionum = next_id();
       pair<string, string> aname(map_image_snap(name, snap_name));
-      IO::ptr io(new OpenImageIO(ionum, ts, threadID, thread->pending_io(), imagectx, aname.first, aname.second, readonly));
-      io->add_dependencies(m_recent_completions);
-      thread->issued_io(io, m_threads);
+      IO::ptr io(new OpenImageIO(ionum, ts, threadID, m_recent_completions,
+                                 imagectx, aname.first, aname.second,
+                                 readonly));
+      thread->issued_io(io, &m_latest_ios);
       m_ios.push_back(io);
     } else if (strcmp(event_name, "librbd:open_image_exit") == 0) {
-      IO::ptr completionIO(thread->pending_io()->create_completion(ts, threadID));
-      m_ios.push_back(completionIO);
-      boost::shared_ptr<OpenImageIO> io(boost::dynamic_pointer_cast<OpenImageIO>(thread->pending_io()));
+      completed(thread->latest_io());
+      boost::shared_ptr<OpenImageIO> io(boost::dynamic_pointer_cast<OpenImageIO>(thread->latest_io()));
       assert(io);
       m_open_images.insert(io->imagectx());
     } else if (strcmp(event_name, "librbd:close_image_enter") == 0) {
       imagectx_id_t imagectx = fields.uint64("imagectx");
       action_id_t ionum = next_id();
-      IO::ptr io(new CloseImageIO(ionum, ts, threadID, thread->pending_io(), imagectx));
-      io->add_dependencies(m_recent_completions);
-      thread->issued_io(io, m_threads);
-      m_ios.push_back(thread->pending_io());
+      IO::ptr io(new CloseImageIO(ionum, ts, threadID, m_recent_completions,
+                                  imagectx));
+      thread->issued_io(io, &m_latest_ios);
+      m_ios.push_back(thread->latest_io());
     } else if (strcmp(event_name, "librbd:close_image_exit") == 0) {
-      IO::ptr completionIO(thread->pending_io()->create_completion(ts, threadID));
-      m_ios.push_back(completionIO);
-      completed(completionIO);
-      boost::shared_ptr<CloseImageIO> io(boost::dynamic_pointer_cast<CloseImageIO>(thread->pending_io()));
+      completed(thread->latest_io());
+      boost::shared_ptr<CloseImageIO> io(boost::dynamic_pointer_cast<CloseImageIO>(thread->latest_io()));
       assert(io);
       m_open_images.erase(io->imagectx());
+    } else if (strcmp(event_name, "librbd:read_enter") == 0 ||
+               strcmp(event_name, "librbd:read2_enter") == 0) {
+      string name(fields.string("name"));
+      string snap_name(fields.string("snap_name"));
+      bool readonly = fields.int64("read_only");
+      imagectx_id_t imagectx = fields.uint64("imagectx");
+      uint64_t offset = fields.uint64("offset");
+      uint64_t length = fields.uint64("length");
+      require_image(ts, thread, imagectx, name, snap_name, readonly);
+      action_id_t ionum = next_id();
+      IO::ptr io(new ReadIO(ionum, ts, threadID, m_recent_completions, imagectx,
+                            offset, length));
+      thread->issued_io(io, &m_latest_ios);
+      m_ios.push_back(io);
     } else if (strcmp(event_name, "librbd:read_exit") == 0) {
-      IO::ptr completionIO(thread->pending_io()->create_completion(ts, threadID));
-      m_ios.push_back(completionIO);
-      completed(completionIO);
+      completed(thread->latest_io());
     } else if (strcmp(event_name, "librbd:write_enter") == 0 ||
                strcmp(event_name, "librbd:write2_enter") == 0) {
       string name(fields.string("name"));
@@ -425,14 +381,12 @@ private:
       imagectx_id_t imagectx = fields.uint64("imagectx");
       require_image(ts, thread, imagectx, name, snap_name, readonly);
       action_id_t ionum = next_id();
-      IO::ptr io(new WriteIO(ionum, ts, threadID, thread->pending_io(), imagectx, offset, length));
-      io->add_dependencies(m_recent_completions);
-      thread->issued_io(io, m_threads);
+      IO::ptr io(new WriteIO(ionum, ts, threadID, m_recent_completions,
+                             imagectx, offset, length));
+      thread->issued_io(io, &m_latest_ios);
       m_ios.push_back(io);
     } else if (strcmp(event_name, "librbd:write_exit") == 0) {
-      IO::ptr completionIO(thread->pending_io()->create_completion(ts, threadID));
-      m_ios.push_back(completionIO);
-      completed(completionIO);
+      completed(thread->latest_io());
     } else if (strcmp(event_name, "librbd:aio_read_enter") == 0 ||
                strcmp(event_name, "librbd:aio_read2_enter") == 0) {
       string name(fields.string("name"));
@@ -444,10 +398,10 @@ private:
       uint64_t length = fields.uint64("length");
       require_image(ts, thread, imagectx, name, snap_name, readonly);
       action_id_t ionum = next_id();
-      IO::ptr io(new AioReadIO(ionum, ts, threadID, thread->pending_io(), imagectx, offset, length));
-      io->add_dependencies(m_recent_completions);
+      IO::ptr io(new AioReadIO(ionum, ts, threadID, m_recent_completions,
+                               imagectx, offset, length));
       m_ios.push_back(io);
-      thread->issued_io(io, m_threads);
+      thread->issued_io(io, &m_latest_ios);
       m_pending_ios[completion] = io;
     } else if (strcmp(event_name, "librbd:aio_write_enter") == 0 ||
                strcmp(event_name, "librbd:aio_write2_enter") == 0) {
@@ -460,9 +414,9 @@ private:
       imagectx_id_t imagectx = fields.uint64("imagectx");
       require_image(ts, thread, imagectx, name, snap_name, readonly);
       action_id_t ionum = next_id();
-      IO::ptr io(new AioWriteIO(ionum, ts, threadID, thread->pending_io(), imagectx, offset, length));
-      io->add_dependencies(m_recent_completions);
-      thread->issued_io(io, m_threads);
+      IO::ptr io(new AioWriteIO(ionum, ts, threadID, m_recent_completions,
+                                imagectx, offset, length));
+      thread->issued_io(io, &m_latest_ios);
       m_ios.push_back(io);
       m_pending_ios[completion] = io;
     } else if (strcmp(event_name, "librbd:aio_complete_enter") == 0) {
@@ -471,9 +425,7 @@ private:
       if (itr != m_pending_ios.end()) {
        IO::ptr completedIO(itr->second);
        m_pending_ios.erase(itr);
-       IO::ptr completionIO(completedIO->create_completion(ts, threadID));
-       m_ios.push_back(completionIO);
-       completed(completionIO);
+        completed(completedIO);
       }
     }
 
@@ -487,9 +439,14 @@ private:
   }
 
   void completed(IO::ptr io) {
-    uint64_t limit = io->start_time() < m_window ? 0 : io->start_time() - m_window;
-    for (io_set_t::iterator itr = m_recent_completions.begin(); itr != m_recent_completions.end(); ) {
-      if ((*itr)->start_time() < limit) {
+    uint64_t limit = (io->start_time() < m_window ?
+      0 : io->start_time() - m_window);
+    for (io_set_t::iterator itr = m_recent_completions.begin();
+         itr != m_recent_completions.end(); ) {
+      IO::ptr recent_comp(*itr);
+      if ((recent_comp->start_time() < limit ||
+           io->dependencies().count(recent_comp) != 0) &&
+          m_latest_ios.count(recent_comp) == 0) {
        m_recent_completions.erase(itr++);
       } else {
        ++itr;
@@ -521,13 +478,12 @@ private:
     }
     action_id_t ionum = next_id();
     pair<string, string> aname(map_image_snap(name, snap_name));
-    IO::ptr io(new OpenImageIO(ionum, ts - 2, thread->id(), thread->pending_io(), imagectx, aname.first, aname.second, readonly));
-    io->add_dependencies(m_recent_completions);
-    thread->issued_io(io, m_threads);
+    IO::ptr io(new OpenImageIO(ionum, ts - 2, thread->id(),
+                               m_recent_completions, imagectx, aname.first,
+                               aname.second, readonly));
+    thread->issued_io(io, &m_latest_ios);
     m_ios.push_back(io);
-    IO::ptr completionIO(io->create_completion(ts - 1, thread->id()));
-    m_ios.push_back(completionIO);
-    completed(completionIO);
+    completed(io);
     m_open_images.insert(imagectx);
   }
 
@@ -540,6 +496,7 @@ private:
 
   // keyed by completion
   map<uint64_t, IO::ptr> m_pending_ios;
+  std::set<IO::ptr> m_latest_ios;
 
   bool m_anonymize;
   map<string, AnonymizedImage> m_anonymized_images;