]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-replay: Clean up prep-for-replay.py
authorAdam Crume <adamcrume@gmail.com>
Mon, 21 Jul 2014 19:02:54 +0000 (12:02 -0700)
committerSage Weil <sage@redhat.com>
Thu, 21 Aug 2014 17:57:30 +0000 (10:57 -0700)
Signed-off-by: Adam Crume <adamcrume@gmail.com>
examples/rbd-replay/run-prep-for-replay
src/rbd_replay/prep-for-replay.py

index 64afd31ed3481b59302f6d8b5909d9199e1b26cd..394caf89cf34c6dfed02f98f14c1f395a85dc19a 100755 (executable)
@@ -1,3 +1,3 @@
 #!/bin/bash
 
-PYTHONPATH=~/babeltrace/bindings/python/:~/babeltrace/bindings/python/.libs/ ../../src/rbd_replay/prep-for-replay.py traces/ust/uid/10002/64-bit
+PYTHONPATH=~/babeltrace/bindings/python/:~/babeltrace/bindings/python/.libs/ ../../src/rbd_replay/prep-for-replay.py traces/ust/uid/10002/64-bit replay.bin
index 8b4273925ff61ba08077104a5040ccaa77c8a662..5ae5782f7d1d4df561770576c5785de85299c426 100755 (executable)
 #
 
 from babeltrace import *
-import cProfile
 import datetime
 import struct
 import sys
 
-traces = TraceCollection()
-ret = traces.add_trace(sys.argv[1], "ctf")
-
-pendingIOs = {}
-threads = {}
-ios = []
-printOnRead = False
-recentCompletions = []
-limit = 100000000000
-ignoreWrites = True
-
-ioCount = 0
-def nextID():
-    global ioCount
-    val = ioCount
-    ioCount = ioCount + 2
-    return val
 
 class Extent(object):
     def __init__(self, offset, length):
@@ -47,8 +29,10 @@ class Extent(object):
         return "Extent(" + str(self.offset) + "," + str(self.length) + ")"
 
 class Thread(object):
-    def __init__(self, id):
+    def __init__(self, id, threads, window):
         self.id = id
+       self.threads = threads
+       self.window = window
         self.pendingIO = None
         self.latestIO = None # may not be meaningful
         self.latestCompletion = None # may not be meaningful
@@ -58,9 +42,9 @@ class Thread(object):
             self.lastTS = ts
     def issuedIO(self, io):
         latestIOs = []
-        for threadID in threads:
-            thread = threads[threadID]
-            if thread.latestIO and thread.latestIO.start_time > io.start_time - window:
+        for threadID in self.threads:
+            thread = self.threads[threadID]
+            if thread.latestIO and thread.latestIO.start_time > io.start_time - self.window:
                 latestIOs.append(thread.latestIO)
         io.addDependencies(latestIOs)
         self.latestIO = io
@@ -170,7 +154,7 @@ class IO(object):
         for dep in self.dependencies:
             deps[dep.ionum] = self.start_time - dep.start_time
         return deps
-    def addThreadCompletionDependencies(self, threads):
+    def addThreadCompletionDependencies(self, threads, recentCompletions):
         self.addDependencies(recentCompletions)
     def numCompletionSuccessors(self):
         return self.completion.numSuccessors if self.completion else 0
@@ -291,208 +275,220 @@ class CompletionIO(IO):
         return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": completion, thread = " + str(self.thread.id) + ", baseIO = " + str(self.baseIO) + ", deps = " + str(self.depMap())
 
 
-window = 1 * 1e9
-
-def completed(io):
-    global recentCompletions
-    recentCompletions.append(io)
-    recentCompletions[:] = [x for x in recentCompletions if x.start_time > io.start_time - window]
-    # while recentCompletions[0].start_time < io.start_time - window:
-    #     del recentCompletions[0]
+class Processor(object):
+    def __init__(self):
+       self.window = 1 * 1e9
+       self.threads = {}
+       self.ioCount = 0
+       self.recentCompletions = []
+    def nextID(self):
+       val = self.ioCount
+       self.ioCount = self.ioCount + 2
+       return val
+    def completed(self, io):
+       self.recentCompletions.append(io)
+       self.recentCompletions[:] = [x for x in self.recentCompletions if x.start_time > io.start_time - self.window]
+    def run(self, args):
+       inputFileName = args[0]
+       outputFileName = args[1]
+        ios = []
+        pendingIOs = {}
+        limit = 100000000000
+        ignoreWrites = True
+        printOnRead = False
+        printOnWrite = False
+        threads = {}
+        traces = TraceCollection()
+        traces.add_trace(inputFileName, "ctf")
 
-def main():
-    global ios
-    # Parse phase
-    trace_start = None
-    count = 0
-    for event in traces.events:
-        count = count + 1
-        if count > limit:
-            break
-        ts = event.timestamp
-        if not trace_start:
-            trace_start = ts
-        ts = ts - trace_start
-        threadID = event["pthread_id"]
-        if threadID in threads:
-            thread = threads[threadID]
-        else:
-            thread = Thread(threadID)
-            threads[threadID] = thread
-            ionum = nextID()
-            io = StartThreadIO(ionum, ts, thread)
-            ios.append(io)
-            if printOnRead:
-                print str(io)
-        thread.insertTS(ts)
-        if event.name == "librbd:read_enter":
-            name = event["name"]
-            readid = event["id"]
-            imagectx = event["imagectx"]
-            ionum = nextID()
-            thread.pendingIO = ReadIO(ionum, ts, thread, thread.pendingIO, imagectx, [])
-            thread.pendingIO.addThreadCompletionDependencies(threads)
-            thread.issuedIO(thread.pendingIO)
-            ios.append(thread.pendingIO)
-        elif event.name == "librbd:open_image_enter":
-            imagectx = event["imagectx"]
-            name = event["name"]
-            snap_name = event["snap_name"]
-            readid = event["id"]
-            readonly = event["read_only"]
-            ionum = nextID()
-            thread.pendingIO = OpenImageIO(ionum, ts, thread, thread.pendingIO, imagectx, name, snap_name, readonly)
-            thread.pendingIO.addThreadCompletionDependencies(threads)
-            thread.issuedIO(thread.pendingIO)
-            ios.append(thread.pendingIO)
-        elif event.name == "librbd:open_image_exit":
-            thread.pendingIO.end_time = ts
-            completionIO = CompletionIO(ts, thread, thread.pendingIO)
-            thread.completedIO(completionIO)
-            ios.append(completionIO)
-            completed(completionIO)
-            if printOnRead:
-                print str(thread.pendingIO)
-        elif event.name == "librbd:close_image_enter":
-            imagectx = event["imagectx"]
-            ionum = nextID()
-            thread.pendingIO = CloseImageIO(ionum, ts, thread, thread.pendingIO, imagectx)
-            thread.pendingIO.addThreadCompletionDependencies(threads)
-            thread.issuedIO(thread.pendingIO)
-            ios.append(thread.pendingIO)
-        elif event.name == "librbd:close_image_exit":
-            thread.pendingIO.end_time = ts
-            completionIO = CompletionIO(ts, thread, thread.pendingIO)
-            thread.completedIO(completionIO)
-            ios.append(completionIO)
-            completed(completionIO)
-            if printOnRead:
-                print str(thread.pendingIO)
-        elif event.name == "librbd:read_extent":
-            offset = event["offset"]
-            length = event["length"]
-            thread.pendingIO.extents.append(Extent(offset, length))
-        elif event.name == "librbd:read_exit":
-            thread.pendingIO.end_time = ts
-            completionIO = CompletionIO(ts, thread, thread.pendingIO)
-            thread.completedIO(completionIO)
-            ios.append(completionIO)
-            completed(completionIO)
-            if printOnRead:
-                print str(thread.pendingIO)
-        elif event.name == "librbd:write_enter":
-            if not ignoreWrites:
+        # Parse phase
+        trace_start = None
+        count = 0
+        for event in traces.events:
+            count = count + 1
+            if count > limit:
+                break
+            ts = event.timestamp
+            if not trace_start:
+                trace_start = ts
+            ts = ts - trace_start
+            threadID = event["pthread_id"]
+            if threadID in threads:
+                thread = threads[threadID]
+            else:
+                thread = Thread(threadID, threads, self.window)
+                threads[threadID] = thread
+                ionum = self.nextID()
+                io = StartThreadIO(ionum, ts, thread)
+                ios.append(io)
+                if printOnRead:
+                    print str(io)
+            thread.insertTS(ts)
+            if event.name == "librbd:read_enter":
                 name = event["name"]
                 readid = event["id"]
-                offset = event["off"]
-                length = event["buf_len"]
                 imagectx = event["imagectx"]
-                ionum = nextID()
-                thread.pendingIO = WriteIO(ionum, ts, thread, thread.pendingIO, imagectx, [Extent(offset, length)])
-                thread.pendingIO.addThreadCompletionDependencies(threads)
+                ionum = self.nextID()
+                thread.pendingIO = ReadIO(ionum, ts, thread, thread.pendingIO, imagectx, [])
+                thread.pendingIO.addThreadCompletionDependencies(threads, self.recentCompletions)
                 thread.issuedIO(thread.pendingIO)
                 ios.append(thread.pendingIO)
-        elif event.name == "librbd:write_exit":
-            if not ignoreWrites:
+            elif event.name == "librbd:open_image_enter":
+                imagectx = event["imagectx"]
+                name = event["name"]
+                snap_name = event["snap_name"]
+                readid = event["id"]
+                readonly = event["read_only"]
+                ionum = self.nextID()
+                thread.pendingIO = OpenImageIO(ionum, ts, thread, thread.pendingIO, imagectx, name, snap_name, readonly)
+                thread.pendingIO.addThreadCompletionDependencies(threads, self.recentCompletions)
+                thread.issuedIO(thread.pendingIO)
+                ios.append(thread.pendingIO)
+            elif event.name == "librbd:open_image_exit":
                 thread.pendingIO.end_time = ts
                 completionIO = CompletionIO(ts, thread, thread.pendingIO)
                 thread.completedIO(completionIO)
                 ios.append(completionIO)
-                completed(completionIO)
+                self.completed(completionIO)
                 if printOnRead:
                     print str(thread.pendingIO)
-        elif event.name == "librbd:aio_read_enter":
-            name = event["name"]
-            readid = event["id"]
-            completion = event["completion"]
-            imagectx = event["imagectx"]
-            ionum = nextID()
-            thread.pendingIO = AioReadIO(ionum, ts, thread, thread.pendingIO, imagectx, [])
-            thread.pendingIO.addThreadCompletionDependencies(threads)
-            ios.append(thread.pendingIO)
-            thread.issuedIO(thread.pendingIO)
-            pendingIOs[completion] = thread.pendingIO
-        elif event.name == "librbd:aio_read_extent":
-            offset = event["offset"]
-            length = event["length"]
-            thread.pendingIO.extents.append(Extent(offset, length))
-        elif event.name == "librbd:aio_read_exit":
-            if printOnRead:
-                print str(thread.pendingIO)
-        elif event.name == "librbd:aio_write_enter":
-            if not ignoreWrites:
-                name = event["name"]
-                writeid = event["id"]
-                offset = event["off"]
-                length = event["len"]
-                completion = event["completion"]
+            elif event.name == "librbd:close_image_enter":
                 imagectx = event["imagectx"]
-                ionum = nextID()
-                thread.pendingIO = AioWriteIO(ionum, ts, thread, thread.pendingIO, imagectx, [Extent(offset, length)])
-                thread.pendingIO.addThreadCompletionDependencies(threads)
+                ionum = self.nextID()
+                thread.pendingIO = CloseImageIO(ionum, ts, thread, thread.pendingIO, imagectx)
+                thread.pendingIO.addThreadCompletionDependencies(threads, self.recentCompletions)
                 thread.issuedIO(thread.pendingIO)
                 ios.append(thread.pendingIO)
-                pendingIOs[completion] = thread.pendingIO
+            elif event.name == "librbd:close_image_exit":
+                thread.pendingIO.end_time = ts
+                completionIO = CompletionIO(ts, thread, thread.pendingIO)
+                thread.completedIO(completionIO)
+                ios.append(completionIO)
+                self.completed(completionIO)
                 if printOnRead:
                     print str(thread.pendingIO)
-        elif event.name == "librbd:aio_complete_enter":
-            completion = event["completion"]
-            retval = event["rval"]
-            if completion in pendingIOs:
-                completedIO = pendingIOs[completion]
-                del pendingIOs[completion]
-                completedIO.end_time = ts
-                completionIO = CompletionIO(ts, thread, completedIO)
-                completedIO.thread.completedIO(completionIO)
+            elif event.name == "librbd:read_extent":
+                offset = event["offset"]
+                length = event["length"]
+                thread.pendingIO.extents.append(Extent(offset, length))
+            elif event.name == "librbd:read_exit":
+                thread.pendingIO.end_time = ts
+                completionIO = CompletionIO(ts, thread, thread.pendingIO)
+                thread.completedIO(completionIO)
                 ios.append(completionIO)
                 completed(completionIO)
                 if printOnRead:
-                    print str(completionIO)
-
+                    print str(thread.pendingIO)
+            elif event.name == "librbd:write_enter":
+                if not ignoreWrites:
+                    name = event["name"]
+                    readid = event["id"]
+                    offset = event["off"]
+                    length = event["buf_len"]
+                    imagectx = event["imagectx"]
+                    ionum = self.nextID()
+                    thread.pendingIO = WriteIO(ionum, ts, thread, thread.pendingIO, imagectx, [Extent(offset, length)])
+                    thread.pendingIO.addThreadCompletionDependencies(threads, self.recentCompletions)
+                    thread.issuedIO(thread.pendingIO)
+                    ios.append(thread.pendingIO)
+            elif event.name == "librbd:write_exit":
+                if not ignoreWrites:
+                    thread.pendingIO.end_time = ts
+                    completionIO = CompletionIO(ts, thread, thread.pendingIO)
+                    thread.completedIO(completionIO)
+                    ios.append(completionIO)
+                    completed(completionIO)
+                    if printOnRead:
+                        print str(thread.pendingIO)
+            elif event.name == "librbd:aio_read_enter":
+                name = event["name"]
+                readid = event["id"]
+                completion = event["completion"]
+                imagectx = event["imagectx"]
+                ionum = self.nextID()
+                thread.pendingIO = AioReadIO(ionum, ts, thread, thread.pendingIO, imagectx, [])
+                thread.pendingIO.addThreadCompletionDependencies(threads, self.recentCompletions)
+                ios.append(thread.pendingIO)
+                thread.issuedIO(thread.pendingIO)
+                pendingIOs[completion] = thread.pendingIO
+            elif event.name == "librbd:aio_read_extent":
+                offset = event["offset"]
+                length = event["length"]
+                thread.pendingIO.extents.append(Extent(offset, length))
+            elif event.name == "librbd:aio_read_exit":
+                if printOnRead:
+                    print str(thread.pendingIO)
+            elif event.name == "librbd:aio_write_enter":
+                if not ignoreWrites:
+                    name = event["name"]
+                    writeid = event["id"]
+                    offset = event["off"]
+                    length = event["len"]
+                    completion = event["completion"]
+                    imagectx = event["imagectx"]
+                    ionum = self.nextID()
+                    thread.pendingIO = AioWriteIO(ionum, ts, thread, thread.pendingIO, imagectx, [Extent(offset, length)])
+                    thread.pendingIO.addThreadCompletionDependencies(threads, self.recentCompletions)
+                    thread.issuedIO(thread.pendingIO)
+                    ios.append(thread.pendingIO)
+                    pendingIOs[completion] = thread.pendingIO
+                    if printOnRead:
+                        print str(thread.pendingIO)
+            elif event.name == "librbd:aio_complete_enter":
+                completion = event["completion"]
+                retval = event["rval"]
+                if completion in pendingIOs:
+                    completedIO = pendingIOs[completion]
+                    del pendingIOs[completion]
+                    completedIO.end_time = ts
+                    completionIO = CompletionIO(ts, thread, completedIO)
+                    completedIO.thread.completedIO(completionIO)
+                    ios.append(completionIO)
+                    self.completed(completionIO)
+                    if printOnRead:
+                        print str(completionIO)
 
-    # Insert-thread-stop phase
-    ios = sorted(ios, key = lambda io: io.start_time)
-    for threadID in threads:
-        thread = threads[threadID]
-        ionum = None
-        maxIONum = 0 # only valid if ionum is None
-        for io in ios:
-            if io.ionum > maxIONum:
-                maxIONum = io.ionum
-            if io.start_time > thread.lastTS:
-                ionum = io.ionum
-                if ionum % 2 == 1:
-                    ionum = ionum + 1
-                break
-        if not ionum:
-            if maxIONum % 2 == 1:
-                maxIONum = maxIONum - 1
-            ionum = maxIONum + 2
-        for io in ios:
-            if io.ionum >= ionum:
-                io.ionum = io.ionum + 2
-        # TODO: Insert in the right place, don't append and re-sort
-        ios.append(StopThreadIO(ionum, thread.lastTS, thread))
+        # Insert-thread-stop phase
         ios = sorted(ios, key = lambda io: io.start_time)
+        for threadID in threads:
+            thread = threads[threadID]
+            ionum = None
+            maxIONum = 0 # only valid if ionum is None
+            for io in ios:
+                if io.ionum > maxIONum:
+                    maxIONum = io.ionum
+                if io.start_time > thread.lastTS:
+                    ionum = io.ionum
+                    if ionum % 2 == 1:
+                        ionum = ionum + 1
+                    break
+            if not ionum:
+                if maxIONum % 2 == 1:
+                    maxIONum = maxIONum - 1
+                ionum = maxIONum + 2
+            for io in ios:
+                if io.ionum >= ionum:
+                    io.ionum = io.ionum + 2
+            # TODO: Insert in the right place, don't append and re-sort
+            ios.append(StopThreadIO(ionum, thread.lastTS, thread))
+            ios = sorted(ios, key = lambda io: io.start_time)
 
+        for io in ios:
+            if io.prev and io.prev in io.dependencies:
+                io.dependencies.remove(io.prev)
+            if io.isCompletion:
+                io.dependencies.clear()
+            for dep in io.dependencies:
+                dep.numSuccessors = dep.numSuccessors + 1
 
-    for io in ios:
-        if io.prev and io.prev in io.dependencies:
-            io.dependencies.remove(io.prev)
-        if io.isCompletion:
-            io.dependencies.clear()
-        for dep in io.dependencies:
-            dep.numSuccessors = dep.numSuccessors + 1
-
-    print
-    # for io in ios:
-    #     if not io.isCompletion:
-    #         print str(io)
+        if printOnRead and printOnWrite:
+           print
 
-    with open("replay.bin", "wb") as f:
-        for io in ios:
-            if not io.isCompletion:
-                print str(io)
-            io.writeTo(f)
+        with open(outputFileName, "wb") as f:
+            for io in ios:
+                if printOnWrite and not io.isCompletion:
+                    print str(io)
+                io.writeTo(f)
 
-cProfile.run("main()")
+if __name__ == '__main__':
+    Processor().run(sys.argv[1:])