]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-replay: Convert prep-for-replay.py to rbd-replay-prep.cc
authorAdam Crume <adamcrume@gmail.com>
Thu, 7 Aug 2014 20:38:19 +0000 (13:38 -0700)
committerSage Weil <sage@redhat.com>
Thu, 21 Aug 2014 17:57:33 +0000 (10:57 -0700)
Signed-off-by: Adam Crume <adamcrume@gmail.com>
examples/rbd-replay/run-prep-for-replay [deleted file]
examples/rbd-replay/run-rbd-replay-prep [new file with mode: 0755]
src/.gitignore
src/rbd_replay/Makefile.am
src/rbd_replay/Ser.cc [new file with mode: 0644]
src/rbd_replay/Ser.hpp [new file with mode: 0644]
src/rbd_replay/actions.cc
src/rbd_replay/actions.hpp
src/rbd_replay/prep-for-replay.py [deleted file]
src/rbd_replay/rbd-replay-prep.cc [new file with mode: 0644]

diff --git a/examples/rbd-replay/run-prep-for-replay b/examples/rbd-replay/run-prep-for-replay
deleted file mode 100755 (executable)
index 394caf8..0000000
+++ /dev/null
@@ -1,3 +0,0 @@
-#!/bin/bash
-
-PYTHONPATH=~/babeltrace/bindings/python/:~/babeltrace/bindings/python/.libs/ ../../src/rbd_replay/prep-for-replay.py traces/ust/uid/10002/64-bit replay.bin
diff --git a/examples/rbd-replay/run-rbd-replay-prep b/examples/rbd-replay/run-rbd-replay-prep
new file mode 100755 (executable)
index 0000000..28f4876
--- /dev/null
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+../../src/rbd-replay-prep traces/ust/uid/10002/64-bit replay.bin
index e7c09457d5f9d1dbe868fc8a56c4cdf7753eefdb..2482e1a8a280473b0a7a21d9b237cea090d71179 100644 (file)
@@ -67,6 +67,7 @@ Makefile
 /rbd
 /rbd-fuse
 /rbd-replay
+/rbd-replay-prep
 /rest-bench
 /sample.fetch_config
 /TAGS
index 6e85b40537b63cb02e9f9199dac9ec640a13e665..cd004b67392523f4c8811f7902d0da0752995fda 100644 (file)
@@ -4,7 +4,8 @@ librbd_replay_la_SOURCES = rbd_replay/actions.cc \
        rbd_replay/ImageNameMap.cc \
        rbd_replay/PendingIO.cc \
        rbd_replay/rbd_loc.cc \
-       rbd_replay/Replayer.cc
+       rbd_replay/Replayer.cc \
+       rbd_replay/Ser.cc
 librbd_replay_la_LIBADD = $(LIBRBD) \
        $(LIBRADOS) \
        $(CEPH_GLOBAL)
@@ -16,7 +17,9 @@ noinst_HEADERS += rbd_replay/BoundedBuffer.hpp \
        rbd_replay/PendingIO.hpp \
        rbd_replay/rbd_loc.hpp \
        rbd_replay/rbd_replay_debug.hpp \
-       rbd_replay/Replayer.hpp
+       rbd_replay/Replayer.hpp \
+       rbd_replay/Ser.hpp
+
 
 rbd_replay_SOURCES = rbd_replay/rbd-replay.cc
 rbd_replay_LDADD = $(LIBRBD) \
@@ -27,3 +30,14 @@ rbd_replay_LDADD = $(LIBRBD) \
 if LINUX
 bin_PROGRAMS += rbd-replay
 endif #LINUX
+
+# TODO: See if we need any new dependencies
+rbd_replay_prep_SOURCES = rbd_replay/rbd-replay-prep.cc
+rbd_replay_prep_LDADD = $(LIBRBD) \
+       $(LIBRADOS) \
+       $(CEPH_GLOBAL) \
+       librbd_replay.la \
+       -lbabeltrace \
+       -lbabeltrace-ctf \
+       -lboost_date_time
+bin_PROGRAMS += rbd-replay-prep
diff --git a/src/rbd_replay/Ser.cc b/src/rbd_replay/Ser.cc
new file mode 100644 (file)
index 0000000..97a63cd
--- /dev/null
@@ -0,0 +1,53 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "Ser.hpp"
+#include <arpa/inet.h>
+#include <cstdlib>
+#include <endian.h>
+
+
+rbd_replay::Ser::Ser(std::ostream &out)
+  : m_out(out) {
+}
+
+void rbd_replay::Ser::write_uint8_t(uint8_t data) {
+  m_out.write(reinterpret_cast<char*>(&data), sizeof(data));
+}
+
+void rbd_replay::Ser::write_uint16_t(uint16_t data) {
+  data = htons(data);
+  m_out.write(reinterpret_cast<char*>(&data), sizeof(data));
+}
+
+void rbd_replay::Ser::write_uint32_t(uint32_t data) {
+  data = htonl(data);
+  m_out.write(reinterpret_cast<char*>(&data), sizeof(data));
+}
+
+void rbd_replay::Ser::write_uint64_t(uint64_t data) {
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+  data = (static_cast<uint64_t>(htonl(data)) << 32 | htonl(data >> 32));
+#endif
+  m_out.write(reinterpret_cast<char*>(&data), sizeof(data));
+}
+
+void rbd_replay::Ser::write_string(const std::string& data) {
+  write_uint32_t(data.length());
+  m_out.write(data.data(), data.length());
+}
+
+void rbd_replay::Ser::write_bool(bool data) {
+  write_uint8_t(data ? 1 : 0);
+}
diff --git a/src/rbd_replay/Ser.hpp b/src/rbd_replay/Ser.hpp
new file mode 100644 (file)
index 0000000..2bada8f
--- /dev/null
@@ -0,0 +1,45 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef _INCLUDED_RBD_REPLAY_SER_HPP
+#define _INCLUDED_RBD_REPLAY_SER_HPP
+
+#include <iostream>
+#include <stdint.h>
+
+namespace rbd_replay {
+
+class Ser {
+public:
+  Ser(std::ostream &out);
+
+  void write_uint8_t(uint8_t);
+
+  void write_uint16_t(uint16_t);
+
+  void write_uint32_t(uint32_t);
+
+  void write_uint64_t(uint64_t);
+
+  void write_string(const std::string&);
+
+  void write_bool(bool b);
+
+private:
+  std::ostream &m_out;
+};
+
+}
+
+#endif
index d9e143dcf88c9a4f8979b4f3c39cd5b8d0688ac5..2e9a4cb27a62e0dd825da98a95d638b22a5da4a4 100644 (file)
@@ -56,21 +56,21 @@ Action::ptr Action::read_from(Deser &d) {
   }
   DummyAction dummy(ionum, thread_id, num_successors, num_completion_successors, deps);
   switch (type) {
-  case 0:
+  case IO_START_THREAD:
     return StartThreadAction::read_from(dummy, d);
-  case 1:
+  case IO_STOP_THREAD:
     return StopThreadAction::read_from(dummy, d);
-  case 2:
+  case IO_READ:
     return ReadAction::read_from(dummy, d);
-  case 3:
+  case IO_WRITE:
     return WriteAction::read_from(dummy, d);
-  case 4:
+  case IO_ASYNC_READ:
     return AioReadAction::read_from(dummy, d);
-  case 5:
+  case IO_ASYNC_WRITE:
     return AioWriteAction::read_from(dummy, d);
-  case 6:
+  case IO_OPEN_IMAGE:
     return OpenImageAction::read_from(dummy, d);
-  case 7:
+  case IO_CLOSE_IMAGE:
     return CloseImageAction::read_from(dummy, d);
   default:
     cerr << "Invalid action type: " << type << std::endl;
index fdc9eaa517483e5d94e08d861f4d1e5299f5e3e1..5d3f4dc4cd45ba952f95f0340796e1e632afcf03 100644 (file)
@@ -40,6 +40,18 @@ struct dependency_d {
   }
 };
 
+// These are written to files, so don't change existing assignments.
+enum io_type {
+  IO_START_THREAD,
+  IO_STOP_THREAD,
+  IO_READ,
+  IO_WRITE,
+  IO_ASYNC_READ,
+  IO_ASYNC_WRITE,
+  IO_OPEN_IMAGE,
+  IO_CLOSE_IMAGE,
+};
+
 
 class PendingIO;
 
diff --git a/src/rbd_replay/prep-for-replay.py b/src/rbd_replay/prep-for-replay.py
deleted file mode 100755 (executable)
index a630075..0000000
+++ /dev/null
@@ -1,526 +0,0 @@
-#!/usr/bin/python
-# -*- mode:Python; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-# vim: ts=8 sw=2 smarttab
-#
-# Ceph - scalable distributed file system
-#
-# Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
-#
-# This is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License version 2.1, as published by the Free Software
-# Foundation.  See file COPYING.
-#
-#
-
-import argparse
-from babeltrace import *
-import datetime
-import struct
-import sys
-
-
-class Extent(object):
-    def __init__(self, offset, length):
-        self.offset = offset
-        self.length = length
-    def __str__(self):
-        return str(self.offset) + "+" + str(self.length)
-    def __repr__(self):
-        return "Extent(" + str(self.offset) + "," + str(self.length) + ")"
-
-class Thread(object):
-    def __init__(self, id, threads, window):
-        self.id = id
-       self.threads = threads
-       self.window = window
-        self.pendingIO = None
-        self.latestIO = None # may not be meaningful
-        self.latestCompletion = None # may not be meaningful
-        self.lastTS = None
-    def insertTS(self, ts):
-        if not self.lastTS or ts > self.lastTS:
-            self.lastTS = ts
-    def issuedIO(self, io):
-        latestIOs = []
-        for threadID in self.threads:
-            thread = self.threads[threadID]
-            if thread.latestIO and thread.latestIO.start_time > io.start_time - self.window:
-                latestIOs.append(thread.latestIO)
-        io.addDependencies(latestIOs)
-        self.latestIO = io
-    def completedIO(self, io):
-        self.latestCompletion = io
-
-def batchUnreachableFrom(deps, base):
-    if not base:
-        return set()
-    if not deps:
-        return set()
-    unreachable = set()
-    searchingFor = set(deps)
-    discovered = set()
-    boundary = set(base)
-    boundaryHorizon = None
-    for io in boundary:
-        if not boundaryHorizon or io.start_time > boundaryHorizon:
-            boundaryHorizon = io.start_time
-    searchingHorizon = None
-    for io in searchingFor:
-        if not searchingHorizon or io.start_time < searchingHorizon:
-            searchingHorizon = io.start_time
-    tmp = [x for x in searchingFor if boundaryHorizon < x.start_time]
-    searchingFor.difference_update(tmp)
-    unreachable.update(tmp)
-    while boundary and searchingFor:
-        io = boundary.pop()
-        for dep in io.dependencies:
-            if dep in searchingFor:
-                searchingFor.remove(dep)
-                if dep.start_time == searchingHorizon:
-                    searchingHorizon = None
-                    for io in searchingFor:
-                        if not searchingHorizon or io.start_time < searchingHorizon:
-                            searchingHorizon = io.start_time
-            if not dep in discovered:
-                boundary.add(dep)
-        if io.start_time == boundaryHorizon:
-            boundaryHorizon = None
-            for io in boundary:
-                if not boundaryHorizon or io.start_time > boundaryHorizon:
-                    boundaryHorizon = io.start_time
-            if boundaryHorizon:
-                tmp = [x for x in searchingFor if boundaryHorizon < x.start_time]
-                searchingFor.difference_update(tmp)
-                unreachable.update(tmp)
-                searchingHorizon = None
-                for io in searchingFor:
-                    if not searchingHorizon or io.start_time < searchingHorizon:
-                        searchingHorizon = io.start_time
-    unreachable.update(searchingFor)
-    return unreachable
-
-class IO(object):
-    def __init__(self, ionum, start_time, thread, prev):
-        self.ionum = ionum
-        self.start_time = start_time
-        self.thread = thread
-        self.dependencies = set()
-        self.isCompletion = False
-        self.prev = prev
-        self.numSuccessors = 0
-        self.completion = None
-    def reachableFrom(self, ios):
-        if not ios:
-            return False
-        discovered = set()
-        boundary = set(ios)
-        horizon = None
-        for io in boundary:
-            if not horizon or io.start_time > horizon:
-                horizon = io.start_time
-        if horizon < self.start_time:
-            return False
-        while boundary:
-            io = boundary.pop()
-            for dep in io.dependencies:
-                if self == dep:
-                    return True
-                if not dep in discovered:
-                    boundary.add(dep)
-            if io.start_time == horizon:
-                horizon = None
-                for io in boundary:
-                    if not horizon or io.start_time > horizon:
-                        horizon = io.start_time
-                if horizon and horizon < self.start_time:
-                    return False
-        return False
-    def addDependency(self, dep):
-        if not dep.reachableFrom(self.dependencies):
-            self.dependencies.add(dep)
-    def addDependencies(self, deps):
-        base = set(self.dependencies)
-        for dep in deps:
-            base.update(dep.dependencies)
-        unreachable = batchUnreachableFrom(deps, base)
-        self.dependencies.update(unreachable)
-    def depIDs(self):
-        ids = []
-        for dep in self.dependencies:
-            ids.append(dep.ionum)
-        return ids
-    def depMap(self):
-        deps = dict()
-        for dep in self.dependencies:
-            deps[dep.ionum] = self.start_time - dep.start_time
-        return deps
-    def addThreadCompletionDependencies(self, threads, recentCompletions):
-        self.addDependencies(recentCompletions)
-    def numCompletionSuccessors(self):
-        return self.completion.numSuccessors if self.completion else 0
-    def writeTo(self, f, iotype):
-        f.write(struct.pack("!BIQIII", iotype, self.ionum, self.thread.id, self.numSuccessors, self.numCompletionSuccessors(), len(self.dependencies)))
-        for dep in self.dependencies:
-            f.write(struct.pack("!IQ", dep.ionum, self.start_time - dep.start_time))
-
-class StartThreadIO(IO):
-    def __init__(self, ionum, start_time, thread):
-        IO.__init__(self, ionum, start_time, thread, None)
-    def writeTo(self, f):
-        IO.writeTo(self, f, 0)
-    def __str__(self):
-        return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": start thread, thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors())
-
-class StopThreadIO(IO):
-    def __init__(self, ionum, start_time, thread):
-        IO.__init__(self, ionum, start_time, thread, None)
-    def writeTo(self, f):
-        IO.writeTo(self, f, 1)
-    def __str__(self):
-        return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": stop thread, thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors())
-
-class ReadIO(IO):
-    def __init__(self, ionum, start_time, thread, prev, imagectx, extents):
-        IO.__init__(self, ionum, start_time, thread, prev)
-        self.imagectx = imagectx
-        self.extents = extents
-    def writeTo(self, f):
-        IO.writeTo(self, f, 2)
-        if len(self.extents) != 1:
-            raise ValueError("Expected read to have 1 extent, but it had " + str(len(self.extents)))
-        extent = self.extents[0]
-        f.write(struct.pack("!QQQ", self.imagectx, extent.offset, extent.length))
-    def __str__(self):
-        return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": read, extents = " + str(self.extents) + ", thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors())
-
-class WriteIO(IO):
-    def __init__(self, ionum, start_time, thread, prev, imagectx, extents):
-        IO.__init__(self, ionum, start_time, thread, prev)
-        self.imagectx = imagectx
-        self.extents = extents
-    def writeTo(self, f):
-        IO.writeTo(self, f, 3)
-        if len(self.extents) != 1:
-            raise ValueError("Expected read to have 1 extent, but it had " + str(len(self.extents)))
-        extent = self.extents[0]
-        f.write(struct.pack("!QQQ", self.imagectx, extent.offset, extent.length))
-    def __str__(self):
-        return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": write, extents = " + str(self.extents) + ", thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors())
-
-class AioReadIO(IO):
-    def __init__(self, ionum, start_time, thread, prev, imagectx, extents):
-        IO.__init__(self, ionum, start_time, thread, prev)
-        self.imagectx = imagectx
-        self.extents = extents
-    def writeTo(self, f):
-        IO.writeTo(self, f, 4)
-        if len(self.extents) != 1:
-            raise ValueError("Expected read to have 1 extent, but it had " + str(len(self.extents)))
-        extent = self.extents[0]
-        f.write(struct.pack("!QQQ", self.imagectx, extent.offset, extent.length))
-    def __str__(self):
-        return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": aio read, extents = " + str(self.extents) + ", thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors())
-
-class AioWriteIO(IO):
-    def __init__(self, ionum, start_time, thread, prev, imagectx, extents):
-        IO.__init__(self, ionum, start_time, thread, prev)
-        self.imagectx = imagectx
-        self.extents = extents
-    def writeTo(self, f):
-        IO.writeTo(self, f, 5)
-        if len(self.extents) != 1:
-            raise ValueError("Expected read to have 1 extent, but it had " + str(len(self.extents)))
-        extent = self.extents[0]
-        f.write(struct.pack("!QQQ", self.imagectx, extent.offset, extent.length))
-    def __str__(self):
-        return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": aio write, extents = " + str(self.extents) + ", thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors())
-
-class OpenImageIO(IO):
-    def __init__(self, ionum, start_time, thread, prev, imagectx, name, snap_name, readonly):
-        IO.__init__(self, ionum, start_time, thread, prev)
-        self.imagectx = imagectx
-        self.name = name
-        self.snap_name = snap_name
-        self.readonly = readonly
-    def writeTo(self, f):
-        IO.writeTo(self, f, 6)
-        f.write(struct.pack("!QI", self.imagectx, len(self.name)))
-        f.write(self.name)
-        f.write(struct.pack("!I", len(self.snap_name)))
-        f.write(self.snap_name)
-        f.write(struct.pack("!b", self.readonly))
-    def __str__(self):
-        return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": open image, thread = " + str(self.thread.id) + ", imagectx = " + str(self.imagectx) + ", name = " + self.name + ", snap_name = " + self.snap_name + ", readonly = " + str(self.readonly) + ", deps = " + str(self.depMap())
-
-class CloseImageIO(IO):
-    def __init__(self, ionum, start_time, thread, prev, imagectx):
-        IO.__init__(self, ionum, start_time, thread, prev)
-        self.imagectx = imagectx
-    def writeTo(self, f):
-        IO.writeTo(self, f, 7)
-        f.write(struct.pack("!Q", self.imagectx))
-    def __str__(self):
-        return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": close image, thread = " + str(self.thread.id) + ", imagectx = " + str(self.imagectx) + ", deps = " + str(self.depMap())
-
-class CompletionIO(IO):
-    def __init__(self, start_time, thread, baseIO):
-        IO.__init__(self, baseIO.ionum + 1, start_time, thread, None)
-        self.baseIO = baseIO
-        self.isCompletion = True
-        self.addDependency(baseIO)
-        baseIO.completion = self
-    def writeTo(self, f):
-        pass
-    def __str__(self):
-        return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": completion, thread = " + str(self.thread.id) + ", baseIO = " + str(self.baseIO) + ", deps = " + str(self.depMap())
-
-
-class Processor(object):
-    def __init__(self):
-       self.window = 1 * 1e9
-       self.threads = {}
-       self.ioCount = 0
-       self.recentCompletions = []
-       self.openImages = {}
-       self.threads = {}
-       self.ios = []
-    def nextID(self):
-       val = self.ioCount
-       self.ioCount = self.ioCount + 2
-       return val
-    def completed(self, io):
-       self.recentCompletions.append(io)
-       self.recentCompletions[:] = [x for x in self.recentCompletions if x.start_time > io.start_time - self.window]
-    def requireImage(self, ts, thread, imagectx, name, snap_name, readonly):
-       if imagectx in self.openImages:
-           return
-       ionum = self.nextID()
-       thread.pendingIO = OpenImageIO(ionum, ts, thread, thread.pendingIO, imagectx, name, snap_name, readonly)
-       thread.pendingIO.addThreadCompletionDependencies(self.threads, self.recentCompletions)
-       thread.issuedIO(thread.pendingIO)
-       self.ios.append(thread.pendingIO)
-       thread.pendingIO.end_time = ts
-       completionIO = CompletionIO(ts, thread, thread.pendingIO)
-       thread.completedIO(completionIO)
-       self.ios.append(completionIO)
-       self.completed(completionIO)
-       self.openImages[thread.pendingIO.imagectx] = thread.pendingIO.imagectx
-       if self.printOnRead:
-           print str(thread.pendingIO)
-    def run(self, raw_args):
-       parser = argparse.ArgumentParser(description='convert librbd trace output to an rbd-replay input file.')
-       parser.add_argument('--print-on-read', action="store_true", help='print events as they are read in (for debugging)')
-       parser.add_argument('--print-on-write', action="store_true", help='print events as they are written out (for debugging)')
-       parser.add_argument('--window', default=1, type=float, help='size of the window, in seconds.  Larger values slow down processing, and smaller values may miss dependencies.  Default: 1')
-       parser.add_argument('input', help='trace to read')
-       parser.add_argument('output', help='replay file to write')
-       args = parser.parse_args(raw_args)
-       self.window = 1e9 * args.window
-       inputFileName = args.input
-       outputFileName = args.output
-        pendingIOs = {}
-        limit = 100000000000
-        self.printOnRead = args.print_on_read
-        printOnWrite = args.print_on_write
-        traces = TraceCollection()
-        traces.add_trace(inputFileName, "ctf")
-
-        # Parse phase
-        trace_start = None
-        count = 0
-        for event in traces.events:
-            count = count + 1
-            if count > limit:
-                break
-            ts = event.timestamp
-            if not trace_start:
-                trace_start = ts
-            ts = ts - trace_start
-            threadID = event["pthread_id"]
-            if threadID in self.threads:
-                thread = self.threads[threadID]
-            else:
-                thread = Thread(threadID, self.threads, self.window)
-                self.threads[threadID] = thread
-                ionum = self.nextID()
-                io = StartThreadIO(ionum, ts, thread)
-                self.ios.append(io)
-                if self.printOnRead:
-                    print str(io)
-            thread.insertTS(ts)
-            if event.name == "librbd:read_enter":
-                name = event["name"]
-                snap_name = event["snap_name"]
-                readonly = event["read_only"]
-                imagectx = event["imagectx"]
-               self.requireImage(ts, thread, imagectx, name, snap_name, readonly)
-                ionum = self.nextID()
-                thread.pendingIO = ReadIO(ionum, ts, thread, thread.pendingIO, imagectx, [])
-                thread.pendingIO.addThreadCompletionDependencies(self.threads, self.recentCompletions)
-                thread.issuedIO(thread.pendingIO)
-                self.ios.append(thread.pendingIO)
-            elif event.name == "librbd:open_image_enter":
-                imagectx = event["imagectx"]
-                name = event["name"]
-                snap_name = event["snap_name"]
-                readid = event["id"]
-                readonly = event["read_only"]
-                ionum = self.nextID()
-                thread.pendingIO = OpenImageIO(ionum, ts, thread, thread.pendingIO, imagectx, name, snap_name, readonly)
-                thread.pendingIO.addThreadCompletionDependencies(self.threads, self.recentCompletions)
-                thread.issuedIO(thread.pendingIO)
-                self.ios.append(thread.pendingIO)
-            elif event.name == "librbd:open_image_exit":
-                thread.pendingIO.end_time = ts
-                completionIO = CompletionIO(ts, thread, thread.pendingIO)
-                thread.completedIO(completionIO)
-                self.ios.append(completionIO)
-                self.completed(completionIO)
-               self.openImages[thread.pendingIO.imagectx] = thread.pendingIO.imagectx
-                if self.printOnRead:
-                    print str(thread.pendingIO)
-            elif event.name == "librbd:close_image_enter":
-                imagectx = event["imagectx"]
-                ionum = self.nextID()
-                thread.pendingIO = CloseImageIO(ionum, ts, thread, thread.pendingIO, imagectx)
-                thread.pendingIO.addThreadCompletionDependencies(self.threads, self.recentCompletions)
-                thread.issuedIO(thread.pendingIO)
-                self.ios.append(thread.pendingIO)
-            elif event.name == "librbd:close_image_exit":
-                thread.pendingIO.end_time = ts
-                completionIO = CompletionIO(ts, thread, thread.pendingIO)
-                thread.completedIO(completionIO)
-                self.ios.append(completionIO)
-                self.completed(completionIO)
-               del self.openImages[thread.pendingIO.imagectx]
-                if self.printOnRead:
-                    print str(thread.pendingIO)
-            elif event.name == "librbd:read_extent":
-                offset = event["offset"]
-                length = event["length"]
-                thread.pendingIO.extents.append(Extent(offset, length))
-            elif event.name == "librbd:read_exit":
-                thread.pendingIO.end_time = ts
-                completionIO = CompletionIO(ts, thread, thread.pendingIO)
-                thread.completedIO(completionIO)
-                self.ios.append(completionIO)
-                completed(completionIO)
-                if self.printOnRead:
-                    print str(thread.pendingIO)
-            elif event.name == "librbd:write_enter":
-                name = event["name"]
-                snap_name = event["snap_name"]
-                readonly = event["read_only"]
-                offset = event["off"]
-                length = event["buf_len"]
-                imagectx = event["imagectx"]
-               self.requireImage(ts, thread, imagectx, name, snap_name, readonly)
-                ionum = self.nextID()
-                thread.pendingIO = WriteIO(ionum, ts, thread, thread.pendingIO, imagectx, [Extent(offset, length)])
-                thread.pendingIO.addThreadCompletionDependencies(self.threads, self.recentCompletions)
-                thread.issuedIO(thread.pendingIO)
-                self.ios.append(thread.pendingIO)
-            elif event.name == "librbd:write_exit":
-                thread.pendingIO.end_time = ts
-                completionIO = CompletionIO(ts, thread, thread.pendingIO)
-                thread.completedIO(completionIO)
-                self.ios.append(completionIO)
-                completed(completionIO)
-                if self.printOnRead:
-                    print str(thread.pendingIO)
-            elif event.name == "librbd:aio_read_enter":
-                name = event["name"]
-                snap_name = event["snap_name"]
-                readonly = event["read_only"]
-                completion = event["completion"]
-                imagectx = event["imagectx"]
-               self.requireImage(ts, thread, imagectx, name, snap_name, readonly)
-                ionum = self.nextID()
-                thread.pendingIO = AioReadIO(ionum, ts, thread, thread.pendingIO, imagectx, [])
-                thread.pendingIO.addThreadCompletionDependencies(self.threads, self.recentCompletions)
-                self.ios.append(thread.pendingIO)
-                thread.issuedIO(thread.pendingIO)
-                pendingIOs[completion] = thread.pendingIO
-            elif event.name == "librbd:aio_read_extent":
-                offset = event["offset"]
-                length = event["length"]
-                thread.pendingIO.extents.append(Extent(offset, length))
-            elif event.name == "librbd:aio_read_exit":
-                if self.printOnRead:
-                    print str(thread.pendingIO)
-            elif event.name == "librbd:aio_write_enter":
-                name = event["name"]
-                snap_name = event["snap_name"]
-                readonly = event["read_only"]
-                offset = event["off"]
-                length = event["len"]
-                completion = event["completion"]
-                imagectx = event["imagectx"]
-               self.requireImage(ts, thread, imagectx, name, snap_name, readonly)
-                ionum = self.nextID()
-                thread.pendingIO = AioWriteIO(ionum, ts, thread, thread.pendingIO, imagectx, [Extent(offset, length)])
-                thread.pendingIO.addThreadCompletionDependencies(self.threads, self.recentCompletions)
-                thread.issuedIO(thread.pendingIO)
-                self.ios.append(thread.pendingIO)
-                pendingIOs[completion] = thread.pendingIO
-                if self.printOnRead:
-                    print str(thread.pendingIO)
-            elif event.name == "librbd:aio_complete_enter":
-                completion = event["completion"]
-                retval = event["rval"]
-                if completion in pendingIOs:
-                    completedIO = pendingIOs[completion]
-                    del pendingIOs[completion]
-                    completedIO.end_time = ts
-                    completionIO = CompletionIO(ts, thread, completedIO)
-                    completedIO.thread.completedIO(completionIO)
-                    self.ios.append(completionIO)
-                    self.completed(completionIO)
-                    if self.printOnRead:
-                        print str(completionIO)
-
-        # Insert-thread-stop phase
-        self.ios = sorted(self.ios, key = lambda io: io.start_time)
-        for threadID in self.threads:
-            thread = self.threads[threadID]
-            ionum = None
-            maxIONum = 0 # only valid if ionum is None
-            for io in self.ios:
-                if io.ionum > maxIONum:
-                    maxIONum = io.ionum
-                if io.start_time > thread.lastTS:
-                    ionum = io.ionum
-                    if ionum % 2 == 1:
-                        ionum = ionum + 1
-                    break
-            if not ionum:
-                if maxIONum % 2 == 1:
-                    maxIONum = maxIONum - 1
-                ionum = maxIONum + 2
-            for io in self.ios:
-                if io.ionum >= ionum:
-                    io.ionum = io.ionum + 2
-            # TODO: Insert in the right place, don't append and re-sort
-            self.ios.append(StopThreadIO(ionum, thread.lastTS, thread))
-            self.ios = sorted(self.ios, key = lambda io: io.start_time)
-
-        for io in self.ios:
-            if io.prev and io.prev in io.dependencies:
-                io.dependencies.remove(io.prev)
-            if io.isCompletion:
-                io.dependencies.clear()
-            for dep in io.dependencies:
-                dep.numSuccessors = dep.numSuccessors + 1
-
-        if self.printOnRead and printOnWrite:
-           print
-
-        with open(outputFileName, "wb") as f:
-            for io in self.ios:
-                if printOnWrite and not io.isCompletion:
-                    print str(io)
-                io.writeTo(f)
-
-if __name__ == '__main__':
-    Processor().run(sys.argv[1:])
diff --git a/src/rbd_replay/rbd-replay-prep.cc b/src/rbd_replay/rbd-replay-prep.cc
new file mode 100644 (file)
index 0000000..96adfc1
--- /dev/null
@@ -0,0 +1,1051 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+// This code assumes that IO IDs and timestamps are related monotonically.
+// In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b.
+
+#include <babeltrace/babeltrace.h>
+#include <babeltrace/ctf/events.h>
+#include <babeltrace/ctf/iterator.h>
+#include <cstdlib>
+#include <string>
+#include <assert.h>
+#include <iostream>
+#include <fstream>
+#include <boost/thread/thread.hpp>
+#include "actions.hpp"
+#include "Ser.hpp"
+
+using namespace std;
+using namespace rbd_replay;
+
+// Allows us to easily expose all the functions to make debugging easier.
+#define STATIC static
+
+struct extent {
+  extent() : offset(0), length(0) {
+  }
+  extent(uint64_t offset, uint64_t length) : offset(offset), length(length) {
+  }
+  uint64_t offset;
+  uint64_t length;
+};
+
+class IO;
+
+typedef set<boost::shared_ptr<IO> > io_set_t;
+
+typedef map<action_id_t, boost::shared_ptr<IO> > io_map_t;
+
+STATIC void batch_unreachable_from(const io_set_t& deps, const io_set_t& base, io_set_t* unreachable);
+
+class IO : public boost::enable_shared_from_this<IO> {
+public:
+  typedef boost::shared_ptr<IO> ptr;
+
+  typedef boost::weak_ptr<IO> weak_ptr;
+
+  IO(action_id_t ionum,
+     uint64_t start_time,
+     thread_id_t thread_id,
+     ptr prev)
+    : m_ionum(ionum),
+      m_start_time(start_time),
+      m_dependencies(io_set_t()),
+      m_completion(weak_ptr()),
+      m_num_successors(0),
+      m_thread_id(thread_id),
+      m_prev(prev) {
+  }
+
+  virtual ~IO() {
+  }
+
+  uint64_t start_time() const {
+    return m_start_time;
+  }
+
+  io_set_t& dependencies() {
+    return m_dependencies;
+  }
+
+  const io_set_t& dependencies() const {
+    return m_dependencies;
+  }
+
+  void add_dependencies(const io_set_t& deps) {
+    io_set_t base(m_dependencies);
+    for (io_set_t::const_iterator itr = deps.begin(); itr != deps.end(); ++itr) {
+      ptr dep(*itr);
+      for (io_set_t::const_iterator itr2 = dep->m_dependencies.begin(); itr2 != dep->m_dependencies.end(); ++itr2) {
+       base.insert(*itr2);
+      }
+    }
+    batch_unreachable_from(deps, base, &m_dependencies);
+  }
+
+  uint64_t num_completion_successors() const {
+    ptr c(m_completion.lock());
+    return c ? c->m_num_successors : 0;
+  }
+
+  void write_to(Ser& out, io_type iotype) const;
+
+  virtual void write_to(Ser& out) const = 0;
+
+  virtual bool is_completion() const {
+    return false;
+  }
+
+  void set_ionum(action_id_t ionum) {
+    m_ionum = ionum;
+  }
+
+  action_id_t ionum() const {
+    return m_ionum;
+  }
+
+  ptr prev() const {
+    return m_prev;
+  }
+
+  void set_num_successors(uint32_t n) {
+    m_num_successors = n;
+  }
+
+  uint32_t num_successors() const {
+    return m_num_successors;
+  }
+
+  void write_debug_base(ostream& out, string iotype);
+
+  virtual void write_debug(ostream& out) = 0;
+
+  // The result must be stored somewhere, or else m_completion will expire
+  ptr create_completion(uint64_t start_time, thread_id_t thread_id);
+
+private:
+  action_id_t m_ionum;
+  uint64_t m_start_time;
+  io_set_t m_dependencies;
+  boost::weak_ptr<IO> m_completion;
+  uint32_t m_num_successors;
+  thread_id_t m_thread_id;
+  ptr m_prev;
+};
+
+ostream& operator<<(ostream& out, IO::ptr io) {
+  io->write_debug(out);
+  return out;
+}
+
+class Thread {
+public:
+  typedef boost::shared_ptr<Thread> ptr;
+
+  Thread(thread_id_t id,
+        uint64_t window)
+    : m_id(id),
+      m_window(window),
+      m_pending_io(IO::ptr()),
+      m_latest_io(IO::ptr()),
+      m_max_ts(0) {
+  }
+
+  void insert_ts(uint64_t ts) {
+    if (m_max_ts == 0 || ts > m_max_ts) {
+      m_max_ts = ts;
+    }
+  }
+
+  uint64_t max_ts() const {
+    return m_max_ts;
+  }
+
+  void issued_io(IO::ptr io, const map<thread_id_t, ptr>& threads) {
+    assert(io);
+    io_set_t latest_ios;
+    for (map<thread_id_t, ptr>::const_iterator itr = threads.begin(), end = threads.end(); itr != end; ++itr) {
+      assertf(itr->second, "id = %ld", itr->first);
+      ptr thread(itr->second);
+      if (thread->m_latest_io) {
+       if (thread->m_latest_io->start_time() + m_window > io->start_time()) {
+         latest_ios.insert(thread->m_latest_io);
+       }
+      }
+    }
+    io->add_dependencies(latest_ios);
+    m_latest_io = io;
+    m_pending_io = io;
+  }
+
+  thread_id_t id() const {
+    return m_id;
+  }
+
+  IO::ptr pending_io() {
+    return m_pending_io;
+  }
+
+private:
+  thread_id_t m_id;
+  uint64_t m_window;
+  IO::ptr m_pending_io;
+  IO::ptr m_latest_io;
+  uint64_t m_max_ts;
+};
+
+void IO::write_debug_base(ostream& out, string type) {
+  out << m_ionum << ": " << m_start_time / 1000000.0 << ": " << type << ", thread = " << m_thread_id << ", deps = {";
+  bool first = true;
+  for (io_set_t::iterator itr = m_dependencies.begin(), end = m_dependencies.end(); itr != end; ++itr) {
+    if (first) {
+      first = false;
+    } else {
+      out << ", ";
+    }
+    out << (*itr)->m_ionum << ": " << m_start_time - (*itr)->m_start_time;
+  }
+  out << "}, num_successors = " << m_num_successors << ", numCompletionSuccessors = " << num_completion_successors();
+}
+
+class StartThreadIO : public IO {
+public:
+  StartThreadIO(action_id_t ionum,
+               uint64_t start_time,
+               thread_id_t thread_id)
+    : IO(ionum, start_time, thread_id, IO::ptr()) {
+  }
+
+  void write_to(Ser& out) const {
+    IO::write_to(out, IO_START_THREAD);
+  }
+
+  void write_debug(ostream& out) {
+    write_debug_base(out, "start thread");
+  }
+};
+
+class StopThreadIO : public IO {
+public:
+  StopThreadIO(action_id_t ionum,
+              uint64_t start_time,
+              thread_id_t thread_id)
+    : IO(ionum, start_time, thread_id, IO::ptr()) {
+  }
+
+  void write_to(Ser& out) const {
+    IO::write_to(out, IO_STOP_THREAD);
+  }
+
+  void write_debug(ostream& out) {
+    write_debug_base(out, "stop thread");
+  }
+};
+
+class ReadIO : public IO {
+public:
+  ReadIO(action_id_t ionum,
+        uint64_t start_time,
+        thread_id_t thread_id,
+        IO::ptr prev,
+        imagectx_id_t imagectx)
+    : IO(ionum, start_time, thread_id, prev),
+      m_imagectx(imagectx),
+      m_extents(vector<extent>()) {
+  }
+
+  void write_to(Ser& out) const {
+    IO::write_to(out, IO_READ);
+    // TODO: figure out how to handle empty IO, i.e. reads/writes with no extents.
+    // These happen if the trace cuts off mid-IO.  We should just toss it, but it
+    // might mess up the dependency graph.
+    assertf(m_extents.size() == 1, "m_extents.size() = %d", m_extents.size());
+    out.write_uint64_t(m_imagectx);
+    out.write_uint64_t(m_extents[0].offset);
+    out.write_uint64_t(m_extents[0].length);
+  }
+
+  void add_extent(const extent& e) {
+    m_extents.push_back(e);
+  }
+
+  void write_debug(ostream& out) {
+    write_debug_base(out, "read");
+    out << ", imagectx=" << m_imagectx << ", extents=[";
+    for (int i = 0, n = m_extents.size(); i < n; i++) {
+      if (i > 0) {
+       out << ", ";
+      }
+      out << m_extents[i].offset << "+" << m_extents[i].length;
+    }
+    out << "]";
+  }
+
+private:
+  imagectx_id_t m_imagectx;
+  vector<extent> m_extents;
+};
+
+class WriteIO : public IO {
+public:
+  WriteIO(action_id_t ionum,
+         uint64_t start_time,
+         thread_id_t thread_id,
+         IO::ptr prev,
+         imagectx_id_t imagectx,
+         const vector<extent>& extents)
+    : IO(ionum, start_time, thread_id, prev),
+      m_imagectx(imagectx),
+      m_extents(extents) {
+  }
+
+  void write_to(Ser& out) const {
+    IO::write_to(out, IO_WRITE);
+    assertf(m_extents.size() == 1, "m_extents.size() = %d", m_extents.size());
+    out.write_uint64_t(m_imagectx);
+    out.write_uint64_t(m_extents[0].offset);
+    out.write_uint64_t(m_extents[0].length);
+  }
+
+  void write_debug(ostream& out) {
+    write_debug_base(out, "write");
+    out << ", imagectx=" << m_imagectx << ", extents=[";
+    for (int i = 0, n = m_extents.size(); i < n; i++) {
+      if (i > 0) {
+       out << ", ";
+      }
+      out << m_extents[i].offset << "+" << m_extents[i].length;
+    }
+    out << "]";
+  }
+
+private:
+  imagectx_id_t m_imagectx;
+  vector<extent> m_extents;
+};
+
+class AioReadIO : public IO {
+public:
+  AioReadIO(action_id_t ionum,
+           uint64_t start_time,
+           thread_id_t thread_id,
+           IO::ptr prev,
+           imagectx_id_t imagectx)
+    : IO(ionum, start_time, thread_id, prev),
+      m_imagectx(imagectx),
+      m_extents(vector<extent>()) {
+  }
+
+  void write_to(Ser& out) const {
+    IO::write_to(out, IO_ASYNC_READ);
+    assertf(m_extents.size() == 1, "m_extents.size() = %d", m_extents.size());
+    out.write_uint64_t(m_imagectx);
+    out.write_uint64_t(m_extents[0].offset);
+    out.write_uint64_t(m_extents[0].length);
+  }
+
+  void add_extent(const extent& e) {
+    m_extents.push_back(e);
+  }
+
+
+  void write_debug(ostream& out) {
+    write_debug_base(out, "aio read");
+    out << ", imagectx=" << m_imagectx << ", extents=[";
+    for (int i = 0, n = m_extents.size(); i < n; i++) {
+      if (i > 0) {
+       out << ", ";
+      }
+      out << m_extents[i].offset << "+" << m_extents[i].length;
+    }
+    out << "]";
+  }
+private:
+  imagectx_id_t m_imagectx;
+  vector<extent> m_extents;
+};
+
+class AioWriteIO : public IO {
+public:
+  AioWriteIO(action_id_t ionum,
+            uint64_t start_time,
+            thread_id_t thread_id,
+            IO::ptr prev,
+            imagectx_id_t imagectx,
+            const vector<extent>& extents)
+    : IO(ionum, start_time, thread_id, prev),
+      m_imagectx(imagectx),
+      m_extents(extents) {
+  }
+
+  void write_to(Ser& out) const {
+    IO::write_to(out, IO_ASYNC_WRITE);
+    assertf(m_extents.size() == 1, "m_extents.size() = %d", m_extents.size());
+    out.write_uint64_t(m_imagectx);
+    out.write_uint64_t(m_extents[0].offset);
+    out.write_uint64_t(m_extents[0].length);
+  }
+
+  void write_debug(ostream& out) {
+    write_debug_base(out, "aio write");
+    out << ", imagectx=" << m_imagectx << ", extents=[";
+    for (int i = 0, n = m_extents.size(); i < n; i++) {
+      if (i > 0) {
+       out << ", ";
+      }
+      out << m_extents[i].offset << "+" << m_extents[i].length;
+    }
+    out << "]";
+  }
+
+private:
+  imagectx_id_t m_imagectx;
+  vector<extent> m_extents;
+};
+
+class OpenImageIO : public IO {
+public:
+  OpenImageIO(action_id_t ionum,
+             uint64_t start_time,
+             thread_id_t thread_id,
+             IO::ptr prev,
+             imagectx_id_t imagectx,
+             const string& name,
+             const string& snap_name,
+             bool readonly)
+    : IO(ionum, start_time, thread_id, prev),
+      m_imagectx(imagectx),
+      m_name(name),
+      m_snap_name(snap_name),
+      m_readonly(readonly) {
+  }
+
+  void write_to(Ser& out) const {
+    IO::write_to(out, IO_OPEN_IMAGE);
+    out.write_uint64_t(m_imagectx);
+    out.write_string(m_name);
+    out.write_string(m_snap_name);
+    out.write_bool(m_readonly);
+  }
+
+  imagectx_id_t imagectx() const {
+    return m_imagectx;
+  }
+
+  void write_debug(ostream& out) {
+    write_debug_base(out, "open image");
+    out << ", imagectx=" << m_imagectx << ", name='" << m_name << "', snap_name='" << m_snap_name << "', readonly=" << m_readonly;
+  }
+
+private:
+  imagectx_id_t m_imagectx;
+  string m_name;
+  string m_snap_name;
+  bool m_readonly;
+};
+
+class CloseImageIO : public IO {
+public:
+  CloseImageIO(action_id_t ionum,
+              uint64_t start_time,
+              thread_id_t thread_id,
+              IO::ptr prev,
+              imagectx_id_t imagectx)
+    : IO(ionum, start_time, thread_id, prev),
+      m_imagectx(imagectx) {
+  }
+
+  void write_to(Ser& out) const {
+    IO::write_to(out, IO_CLOSE_IMAGE);
+    out.write_uint64_t(m_imagectx);
+  }
+
+  imagectx_id_t imagectx() const {
+    return m_imagectx;
+  }
+
+  void write_debug(ostream& out) {
+    write_debug_base(out, "close image");
+    out << ", imagectx=" << m_imagectx;
+  }
+
+private:
+  imagectx_id_t m_imagectx;
+};
+
+class CompletionIO : public IO {
+public:
+  CompletionIO(action_id_t ionum,
+              uint64_t start_time,
+              thread_id_t thread_id)
+    : IO(ionum, start_time, thread_id, IO::ptr()) {
+  }
+
+  void write_to(Ser& out) const {
+  }
+
+  bool is_completion() const {
+    return true;
+  }
+
+  void write_debug(ostream& out) {
+    write_debug_base(out, "completion");
+  }
+};
+
+STATIC bool compare_io_ptrs_by_start_time(IO::ptr p1, IO::ptr p2) {
+  return p1->start_time() < p2->start_time();
+}
+
+void IO::write_to(Ser& out, io_type iotype) const {
+  out.write_uint8_t(iotype);
+  out.write_uint32_t(m_ionum);
+  out.write_uint64_t(m_thread_id);
+  out.write_uint32_t(m_num_successors);
+  out.write_uint32_t(num_completion_successors());
+  out.write_uint32_t(m_dependencies.size());
+  vector<IO::ptr> deps;
+  for (io_set_t::const_iterator itr = m_dependencies.begin(), end = m_dependencies.end(); itr != end; ++itr) {
+    deps.push_back(*itr);
+  }
+  sort(deps.begin(), deps.end(), compare_io_ptrs_by_start_time);
+  for (vector<IO::ptr>::const_iterator itr = deps.begin(), end = deps.end(); itr != end; ++itr) {
+    out.write_uint32_t((*itr)->m_ionum);
+    out.write_uint64_t(m_start_time - (*itr)->m_start_time);
+  }
+}
+
+IO::ptr IO::create_completion(uint64_t start_time, thread_id_t thread_id) {
+  assert(!m_completion.lock());
+  IO::ptr completion(new CompletionIO(m_ionum + 1, start_time, thread_id));
+  m_completion = completion;
+  completion->m_dependencies.insert(shared_from_this());
+  return completion;
+}
+
+STATIC uint64_t min_time(const map<action_id_t, IO::ptr>& s) {
+  if (s.empty()) {
+    return 0;
+  }
+  return s.begin()->second->start_time();
+}
+
+STATIC uint64_t max_time(const map<action_id_t, IO::ptr>& s) {
+  if (s.empty()) {
+    return 0;
+  }
+  map<action_id_t, IO::ptr>::const_iterator itr(s.end());
+  --itr;
+  return itr->second->start_time();
+}
+
+// TODO: Add unit tests
+// Anything in 'deps' which is not reachable from 'base' is added to 'unreachable'
+STATIC void batch_unreachable_from(const io_set_t& deps, const io_set_t& base, io_set_t* unreachable) {
+  if (deps.empty()) {
+    return;
+  }
+
+  map<action_id_t, IO::ptr> searching_for;
+  for (io_set_t::const_iterator itr = deps.begin(); itr != deps.end(); ++itr) {
+    searching_for[(*itr)->ionum()] = *itr;
+  }
+
+  map<action_id_t, IO::ptr> boundary;
+  for (io_set_t::const_iterator itr = base.begin(); itr != base.end(); ++itr) {
+    boundary[(*itr)->ionum()] = *itr;
+  }
+
+  // The boundary horizon is the maximum timestamp of IOs in the boundary.
+  // This monotonically decreases, because dependencies (which are added to the set)
+  // have earlier timestamp than the dependent IOs (which were just removed from the set).
+  uint64_t boundary_horizon = max_time(boundary);
+
+  for (io_map_t::iterator itr = searching_for.begin(); itr != searching_for.end(); ) {
+    if (boundary_horizon >= itr->second->start_time()) {
+      break;
+    }
+    unreachable->insert(itr->second);
+    searching_for.erase(itr++);
+  }
+  if (searching_for.empty()) {
+    return;
+  }
+
+  // The searching horizon is the minimum timestamp of IOs in the searching set.
+  // This monotonically increases, because elements are only removed from the set.
+  uint64_t searching_horizon = min_time(searching_for);
+
+  while (!boundary.empty()) {
+    // Take an IO from the end, which has the highest timestamp.
+    // This reduces the boundary horizon as early as possible,
+    // which means we can short cut as soon as possible.
+    map<action_id_t, boost::shared_ptr<IO> >::iterator b_itr(boundary.end());
+    --b_itr;
+    boost::shared_ptr<IO> io(b_itr->second);
+    boundary.erase(b_itr);
+
+    for (io_set_t::const_iterator itr = io->dependencies().begin(), end = io->dependencies().end(); itr != end; ++itr) {
+      IO::ptr dep(*itr);
+      assertf(dep->ionum() < io->ionum(), "IO: %d, dependency: %d", io->ionum(), dep->ionum());
+      io_map_t::iterator p = searching_for.find(dep->ionum());
+      if (p != searching_for.end()) {
+       searching_for.erase(p);
+       if (dep->start_time() == searching_horizon) {
+         searching_horizon = min_time(searching_for);
+         if (searching_horizon == 0) {
+           return;
+         }
+       }
+      }
+      boundary[dep->ionum()] = dep;
+    }
+
+    boundary_horizon = max_time(boundary);
+    if (boundary_horizon != 0) {
+      // Anything we're searching for that has a timestamp greater than the
+      // boundary horizon will never be found, since the boundary horizon
+      // falls monotonically.
+      for (io_map_t::iterator itr = searching_for.begin(); itr != searching_for.end(); ) {
+       if (boundary_horizon >= itr->second->start_time()) {
+         break;
+       }
+       unreachable->insert(itr->second);
+       searching_for.erase(itr++);
+      }
+      searching_horizon = min_time(searching_for);
+      if (searching_horizon == 0) {
+       return;
+      }
+    }
+  }
+
+  // Anything we're still searching for has not been found.
+  for (io_map_t::iterator itr = searching_for.begin(), end = searching_for.end(); itr != end; ++itr) {
+    unreachable->insert(itr->second);
+  }
+}
+
+STATIC void usage(string prog) {
+  cout << "Usage: " << prog << " [ --window <seconds> ] <trace-input> <replay-output>" << endl;
+}
+
+__attribute__((noreturn)) STATIC void usage_exit(string prog, string msg) {
+  cerr << msg << endl;
+  usage(prog);
+  exit(1);
+}
+
+class Processor {
+public:
+  Processor()
+    : m_window(1000000000ULL), // 1 billion nanoseconds, i.e., one second
+      m_threads(),
+      m_io_count(0),
+      m_recent_completions(io_set_t()),
+      m_open_images(set<imagectx_id_t>()),
+      m_ios(vector<IO::ptr>()),
+      m_pending_ios(map<uint64_t, IO::ptr>()) {
+  }
+
+  void run(vector<string> args) {
+    string input_file_name;
+    string output_file_name;
+    bool got_input = false;
+    bool got_output = false;
+    for (int i = 1, nargs = args.size(); i < nargs; i++) {
+      const string& arg(args[i]);
+      if (arg == "--window") {
+       if (i == nargs - 1) {
+         usage_exit(args[0], "--window requires an argument");
+       }
+       m_window = (uint64_t)(1e9 * atof(args[++i].c_str()));
+      } else if (arg.find("--window=") == 0) {
+       // TODO: test
+       printf("Arg: '%s'\n", arg.c_str() + sizeof("--window="));
+       m_window = (uint64_t)(1e9 * atof(arg.c_str() + sizeof("--window=")));
+      } else if (arg == "-h" || arg == "--help") {
+       usage(args[0]);
+       exit(0);
+      } else if (arg.find("-") == 0) {
+       usage_exit(args[0], "Unrecognized argument: " + arg);
+      } else if (!got_input) {
+       input_file_name = arg;
+       got_input = true;
+      } else if (!got_output) {
+       output_file_name = arg;
+       got_output = true;
+      } else {
+       usage_exit(args[0], "Too many arguments");
+      }
+    }
+    if (!got_output) {
+      usage_exit(args[0], "Not enough arguments");
+    }
+
+    struct bt_context *ctx = bt_context_create();
+    int trace_handle = bt_context_add_trace(ctx,
+                                           input_file_name.c_str(), // path
+                                           "ctf", // format
+                                           NULL, // packet_seek
+                                           NULL, // stream_list
+                                           NULL); // metadata
+    assertf(trace_handle >= 0, "trace_handle = %d", trace_handle);
+
+    uint64_t start_time_ns = bt_trace_handle_get_timestamp_begin(ctx, trace_handle, BT_CLOCK_REAL);
+    assert(start_time_ns != -1ULL);
+
+    struct bt_ctf_iter *itr = bt_ctf_iter_create(ctx,
+                                                NULL, // begin_pos
+                                                NULL); // end_pos
+    assert(itr);
+
+    struct bt_iter *bt_itr = bt_ctf_get_iter(itr);
+
+    uint64_t trace_start = 0;
+    struct bt_ctf_event *evt;
+    bool first = true;
+    while(true) {
+      evt = bt_ctf_iter_read_event(itr);
+      if(!evt) {
+       break;
+      }
+      uint64_t ts = bt_ctf_get_timestamp(evt);
+      assert(ts != -1ULL);
+
+      if (first) {
+       trace_start = ts;
+       first = false;
+      }
+      ts -= trace_start;
+      ts += 4; // This is so we have room to insert two events (thread start and open image) at unique timestamps before whatever the first event is.
+
+      process_event(ts, evt);
+
+      int r = bt_iter_next(bt_itr);
+      assert(!r);
+    }
+
+    bt_ctf_iter_destroy(itr);
+
+    insert_thread_stops();
+
+    for (vector<IO::ptr>::const_iterator itr = m_ios.begin(); itr != m_ios.end(); ++itr) {
+      IO::ptr io(*itr);
+      IO::ptr prev(io->prev());
+      if (prev) {
+       // TODO: explain when prev is and isn't a dep
+       io_set_t::iterator depitr = io->dependencies().find(prev);
+       if (depitr != io->dependencies().end()) {
+         io->dependencies().erase(depitr);
+       }
+      }
+      if (io->is_completion()) {
+       io->dependencies().clear();
+      }
+      for (io_set_t::const_iterator depitr = io->dependencies().begin(); depitr != io->dependencies().end(); ++depitr) {
+       IO::ptr dep(*depitr);
+       dep->set_num_successors(dep->num_successors() + 1);
+      }
+    }
+
+    ofstream myfile;
+    myfile.open(output_file_name.c_str(), ios::out | ios::binary);
+    Ser ser(myfile);
+    for (vector<IO::ptr>::iterator itr = m_ios.begin(); itr != m_ios.end(); ++itr) {
+      (*itr)->write_to(ser);
+    }
+    myfile.close();
+  }
+
+private:
+  void insert_thread_stops() {
+    sort(m_ios.begin(), m_ios.end(), compare_io_ptrs_by_start_time);
+    for (map<thread_id_t, Thread::ptr>::const_iterator itr = m_threads.begin(), end = m_threads.end(); itr != end; ++itr) {
+      Thread::ptr thread(itr->second);
+      const action_id_t none = -1;
+      action_id_t ionum = none;
+      action_id_t maxIONum = 0; // only valid if ionum is none
+      for (vector<IO::ptr>::const_iterator itr2 = m_ios.begin(); itr2 != m_ios.end(); ++itr2) {
+       IO::ptr io(*itr2);
+       if (io->ionum() > maxIONum) {
+         maxIONum = io->ionum();
+       }
+       if (io->start_time() > thread->max_ts()) {
+         ionum = io->ionum();
+         if (ionum & 1) {
+           ionum++;
+         }
+         break;
+       }
+      }
+      if (ionum == none) {
+       if (maxIONum & 1) {
+         maxIONum--;
+       }
+       ionum = maxIONum + 2;
+      }
+      for (vector<IO::ptr>::const_iterator itr2 = m_ios.begin(); itr2 != m_ios.end(); ++itr2) {
+       IO::ptr io(*itr2);
+       if (io->ionum() >= ionum) {
+         io->set_ionum(io->ionum() + 2);
+       }
+      }
+      IO::ptr stop_thread_io(new StopThreadIO(ionum, thread->max_ts(), thread->id()));
+      vector<IO::ptr>::iterator insertion_point = lower_bound(m_ios.begin(), m_ios.end(), stop_thread_io, compare_io_ptrs_by_start_time);
+      m_ios.insert(insertion_point, stop_thread_io);
+    }
+  }
+
+  void process_event(uint64_t ts, struct bt_ctf_event *evt) {
+    const char *event_name = bt_ctf_event_name(evt);
+    const struct bt_definition *scope_context = bt_ctf_get_top_level_scope(evt,
+                                                                          BT_STREAM_EVENT_CONTEXT);
+    assert(scope_context);
+    const struct bt_definition *scope_fields = bt_ctf_get_top_level_scope(evt,
+                                                                         BT_EVENT_FIELDS);
+    assert(scope_fields);
+
+    const struct bt_definition *pthread_id_field = bt_ctf_get_field(evt, scope_context, "pthread_id");
+    assert(pthread_id_field);
+    thread_id_t threadID = bt_ctf_get_uint64(pthread_id_field);
+    Thread::ptr &thread(m_threads[threadID]);
+    if (!thread) {
+      thread.reset(new Thread(threadID, m_window));
+      IO::ptr io(new StartThreadIO(next_id(), ts - 4, threadID));
+      m_ios.push_back(io);
+    }
+    thread->insert_ts(ts);
+
+    class FieldLookup {
+    public:
+      FieldLookup(struct bt_ctf_event *evt,
+                 const struct bt_definition *scope)
+       : m_evt(evt),
+         m_scope(scope) {
+      }
+
+      const char* string(const char* name) {
+       const struct bt_definition *field = bt_ctf_get_field(m_evt, m_scope, name);
+       assertf(field, "field name = '%s'", name);
+       const char* c = bt_ctf_get_string(field);
+       int err = bt_ctf_field_get_error();
+       assertf(c && err == 0, "field name = '%s', err = %d", name, err);
+       return c;
+      }
+
+      int64_t int64(const char* name) {
+       const struct bt_definition *field = bt_ctf_get_field(m_evt, m_scope, name);
+       assertf(field, "field name = '%s'", name);
+       int64_t val = bt_ctf_get_int64(field);
+       int err = bt_ctf_field_get_error();
+       assertf(err == 0, "field name = '%s', err = %d", name, err);
+       return val;
+      }
+
+      uint64_t uint64(const char* name) {
+       const struct bt_definition *field = bt_ctf_get_field(m_evt, m_scope, name);
+       assertf(field, "field name = '%s'", name);
+       uint64_t val = bt_ctf_get_uint64(field);
+       int err = bt_ctf_field_get_error();
+       assertf(err == 0, "field name = '%s', err = %d", name, err);
+       return val;
+      }
+
+    private:
+      struct bt_ctf_event *m_evt;
+      const struct bt_definition *m_scope;
+    } fields(evt, scope_fields);
+
+    if (strcmp(event_name, "librbd:read_enter") == 0) {
+      string name(fields.string("name"));
+      string snap_name(fields.string("snap_name"));
+      bool readonly = fields.int64("read_only");
+      imagectx_id_t imagectx = fields.uint64("imagectx");
+      require_image(ts, thread, imagectx, name, snap_name, readonly);
+      action_id_t ionum = next_id();
+      IO::ptr io(new ReadIO(ionum, ts, threadID, thread->pending_io(), imagectx));
+      io->add_dependencies(m_recent_completions);
+      thread->issued_io(io, m_threads);
+      m_ios.push_back(io);
+    } else if (strcmp(event_name, "librbd:open_image_enter") == 0) {
+      string name(fields.string("name"));
+      string snap_name(fields.string("snap_name"));
+      bool readonly = fields.int64("read_only");
+      imagectx_id_t imagectx = fields.uint64("imagectx");
+      action_id_t ionum = next_id();
+      IO::ptr io(new OpenImageIO(ionum, ts, threadID, thread->pending_io(), imagectx, name, snap_name, readonly));
+      io->add_dependencies(m_recent_completions);
+      thread->issued_io(io, m_threads);
+      m_ios.push_back(io);
+    } else if (strcmp(event_name, "librbd:open_image_exit") == 0) {
+      IO::ptr completionIO(thread->pending_io()->create_completion(ts, threadID));
+      m_ios.push_back(completionIO);
+      boost::shared_ptr<OpenImageIO> io(boost::dynamic_pointer_cast<OpenImageIO>(thread->pending_io()));
+      assert(io);
+      m_open_images.insert(io->imagectx());
+    } else if (strcmp(event_name, "librbd:close_image_enter") == 0) {
+      imagectx_id_t imagectx = fields.uint64("imagectx");
+      action_id_t ionum = next_id();
+      IO::ptr io(new CloseImageIO(ionum, ts, threadID, thread->pending_io(), imagectx));
+      io->add_dependencies(m_recent_completions);
+      thread->issued_io(thread->pending_io(), m_threads);
+      m_ios.push_back(thread->pending_io());
+    } else if (strcmp(event_name, "librbd:close_image_exit") == 0) {
+      IO::ptr completionIO(thread->pending_io()->create_completion(ts, threadID));
+      m_ios.push_back(completionIO);
+      completed(completionIO);
+      boost::shared_ptr<CloseImageIO> io(boost::dynamic_pointer_cast<CloseImageIO>(thread->pending_io()));
+      assert(io);
+      m_open_images.erase(io->imagectx());
+    } else if (strcmp(event_name, "librbd:read_extent") == 0) {
+      boost::shared_ptr<ReadIO> io(boost::dynamic_pointer_cast<ReadIO>(thread->pending_io()));
+      assert(io);
+      uint64_t offset = fields.uint64("offset");
+      uint64_t length = fields.uint64("length");
+      io->add_extent(extent(offset, length));
+    } else if (strcmp(event_name, "librbd:read_exit") == 0) {
+      IO::ptr completionIO(thread->pending_io()->create_completion(ts, threadID));
+      m_ios.push_back(completionIO);
+      completed(completionIO);
+    } else if (strcmp(event_name, "librbd:write_enter") == 0) {
+      string name(fields.string("name"));
+      string snap_name(fields.string("snap_name"));
+      bool readonly = fields.int64("read_only");
+      uint64_t offset = fields.uint64("off");
+      uint64_t length = fields.uint64("buf_len");
+      imagectx_id_t imagectx = fields.uint64("imagectx");
+      require_image(ts, thread, imagectx, name, snap_name, readonly);
+      action_id_t ionum = next_id();
+      vector<extent> extents;
+      extents.push_back(extent(offset, length));
+      IO::ptr io(new WriteIO(ionum, ts, threadID, thread->pending_io(), imagectx, extents));
+      io->add_dependencies(m_recent_completions);
+      thread->issued_io(io, m_threads);
+      m_ios.push_back(io);
+    } else if (strcmp(event_name, "librbd:write_exit") == 0) {
+      IO::ptr completionIO(thread->pending_io()->create_completion(ts, threadID));
+      m_ios.push_back(completionIO);
+      completed(completionIO);
+    } else if (strcmp(event_name, "librbd:aio_read_enter") == 0) {
+      string name(fields.string("name"));
+      string snap_name(fields.string("snap_name"));
+      bool readonly = fields.int64("read_only");
+      uint64_t completion = fields.uint64("completion");
+      imagectx_id_t imagectx = fields.uint64("imagectx");
+      require_image(ts, thread, imagectx, name, snap_name, readonly);
+      action_id_t ionum = next_id();
+      IO::ptr io(new AioReadIO(ionum, ts, threadID, thread->pending_io(), imagectx));
+      io->add_dependencies(m_recent_completions);
+      m_ios.push_back(io);
+      thread->issued_io(io, m_threads);
+      m_pending_ios[completion] = io;
+    } else if (strcmp(event_name, "librbd:aio_read_extent") == 0) {
+      boost::shared_ptr<AioReadIO> io(boost::dynamic_pointer_cast<AioReadIO>(thread->pending_io()));
+      assert(io);
+      uint64_t offset = fields.uint64("offset");
+      uint64_t length = fields.uint64("length");
+      io->add_extent(extent(offset, length));
+    } else if (strcmp(event_name, "librbd:aio_write_enter") == 0) {
+      string name(fields.string("name"));
+      string snap_name(fields.string("snap_name"));
+      bool readonly = fields.int64("read_only");
+      uint64_t offset = fields.uint64("off");
+      uint64_t length = fields.uint64("len");
+      uint64_t completion = fields.uint64("completion");
+      imagectx_id_t imagectx = fields.uint64("imagectx");
+      require_image(ts, thread, imagectx, name, snap_name, readonly);
+      action_id_t ionum = next_id();
+      vector<extent> extents;
+      extents.push_back(extent(offset, length));
+      IO::ptr io(new AioWriteIO(ionum, ts, threadID, thread->pending_io(), imagectx, extents));
+      io->add_dependencies(m_recent_completions);
+      thread->issued_io(io, m_threads);
+      m_ios.push_back(io);
+      m_pending_ios[completion] = io;
+    } else if (strcmp(event_name, "librbd:aio_complete_enter") == 0) {
+      uint64_t completion = fields.uint64("completion");
+      map<uint64_t, IO::ptr>::iterator itr = m_pending_ios.find(completion);
+      if (itr != m_pending_ios.end()) {
+       IO::ptr completedIO(itr->second);
+       m_pending_ios.erase(itr);
+       IO::ptr completionIO(completedIO->create_completion(ts, threadID));
+       m_ios.push_back(completionIO);
+       completed(completionIO);
+      }
+    }
+
+    //        cout << ts << "\t" << event_name << "\tthreadID = " << threadID << endl;
+  }
+
+  action_id_t next_id() {
+    action_id_t id = m_io_count;
+    m_io_count += 2;
+    return id;
+  }
+
+  void completed(IO::ptr io) {
+    uint64_t limit = io->start_time() < m_window ? 0 : io->start_time() - m_window;
+    for (io_set_t::iterator itr = m_recent_completions.begin(); itr != m_recent_completions.end(); ) {
+      if ((*itr)->start_time() < limit) {
+       m_recent_completions.erase(itr++);
+      } else {
+       ++itr;
+      }
+    }
+    m_recent_completions.insert(io);
+  }
+
+  void require_image(uint64_t ts,
+                    Thread::ptr thread,
+                    imagectx_id_t imagectx,
+                    const string& name,
+                    const string& snap_name,
+                    bool readonly) {
+    assert(thread);
+    if (m_open_images.count(imagectx) > 0) {
+      return;
+    }
+    action_id_t ionum = next_id();
+    IO::ptr io(new OpenImageIO(ionum, ts - 2, thread->id(), thread->pending_io(), imagectx, name, snap_name, readonly));
+    io->add_dependencies(m_recent_completions);
+    thread->issued_io(io, m_threads);
+    m_ios.push_back(io);
+    IO::ptr completionIO(io->create_completion(ts - 1, thread->id()));
+    m_ios.push_back(completionIO);
+    completed(completionIO);
+    m_open_images.insert(imagectx);
+  }
+
+  uint64_t m_window;
+  map<thread_id_t, Thread::ptr> m_threads;
+  uint32_t m_io_count;
+  io_set_t m_recent_completions;
+  set<imagectx_id_t> m_open_images;
+  vector<IO::ptr> m_ios;
+
+  // keyed by completion
+  map<uint64_t, IO::ptr> m_pending_ios;
+};
+
+int main(int argc, char** argv) {
+  vector<string> args;
+  for (int i = 0; i < argc; i++) {
+    args.push_back(string(argv[i]));
+  }
+
+  Processor p;
+  p.run(args);
+}