From 4f6e775c320a9fa0091a74a6813549b6eb056109 Mon Sep 17 00:00:00 2001 From: Adam Crume Date: Mon, 21 Jul 2014 12:02:54 -0700 Subject: [PATCH] rbd-replay: Clean up prep-for-replay.py Signed-off-by: Adam Crume --- examples/rbd-replay/run-prep-for-replay | 2 +- src/rbd_replay/prep-for-replay.py | 400 ++++++++++++------------ 2 files changed, 199 insertions(+), 203 deletions(-) diff --git a/examples/rbd-replay/run-prep-for-replay b/examples/rbd-replay/run-prep-for-replay index 64afd31ed3481..394caf89cf34c 100755 --- a/examples/rbd-replay/run-prep-for-replay +++ b/examples/rbd-replay/run-prep-for-replay @@ -1,3 +1,3 @@ #!/bin/bash -PYTHONPATH=~/babeltrace/bindings/python/:~/babeltrace/bindings/python/.libs/ ../../src/rbd_replay/prep-for-replay.py traces/ust/uid/10002/64-bit +PYTHONPATH=~/babeltrace/bindings/python/:~/babeltrace/bindings/python/.libs/ ../../src/rbd_replay/prep-for-replay.py traces/ust/uid/10002/64-bit replay.bin diff --git a/src/rbd_replay/prep-for-replay.py b/src/rbd_replay/prep-for-replay.py index 8b4273925ff61..5ae5782f7d1d4 100755 --- a/src/rbd_replay/prep-for-replay.py +++ b/src/rbd_replay/prep-for-replay.py @@ -14,28 +14,10 @@ # from babeltrace import * -import cProfile import datetime import struct import sys -traces = TraceCollection() -ret = traces.add_trace(sys.argv[1], "ctf") - -pendingIOs = {} -threads = {} -ios = [] -printOnRead = False -recentCompletions = [] -limit = 100000000000 -ignoreWrites = True - -ioCount = 0 -def nextID(): - global ioCount - val = ioCount - ioCount = ioCount + 2 - return val class Extent(object): def __init__(self, offset, length): @@ -47,8 +29,10 @@ class Extent(object): return "Extent(" + str(self.offset) + "," + str(self.length) + ")" class Thread(object): - def __init__(self, id): + def __init__(self, id, threads, window): self.id = id + self.threads = threads + self.window = window self.pendingIO = None self.latestIO = None # may not be meaningful self.latestCompletion = None # may not be meaningful @@ -58,9 +42,9 @@ class Thread(object): self.lastTS = ts def issuedIO(self, io): latestIOs = [] - for threadID in threads: - thread = threads[threadID] - if thread.latestIO and thread.latestIO.start_time > io.start_time - window: + for threadID in self.threads: + thread = self.threads[threadID] + if thread.latestIO and thread.latestIO.start_time > io.start_time - self.window: latestIOs.append(thread.latestIO) io.addDependencies(latestIOs) self.latestIO = io @@ -170,7 +154,7 @@ class IO(object): for dep in self.dependencies: deps[dep.ionum] = self.start_time - dep.start_time return deps - def addThreadCompletionDependencies(self, threads): + def addThreadCompletionDependencies(self, threads, recentCompletions): self.addDependencies(recentCompletions) def numCompletionSuccessors(self): return self.completion.numSuccessors if self.completion else 0 @@ -291,208 +275,220 @@ class CompletionIO(IO): return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": completion, thread = " + str(self.thread.id) + ", baseIO = " + str(self.baseIO) + ", deps = " + str(self.depMap()) -window = 1 * 1e9 - -def completed(io): - global recentCompletions - recentCompletions.append(io) - recentCompletions[:] = [x for x in recentCompletions if x.start_time > io.start_time - window] - # while recentCompletions[0].start_time < io.start_time - window: - # del recentCompletions[0] +class Processor(object): + def __init__(self): + self.window = 1 * 1e9 + self.threads = {} + self.ioCount = 0 + self.recentCompletions = [] + def nextID(self): + val = self.ioCount + self.ioCount = self.ioCount + 2 + return val + def completed(self, io): + self.recentCompletions.append(io) + self.recentCompletions[:] = [x for x in self.recentCompletions if x.start_time > io.start_time - self.window] + def run(self, args): + inputFileName = args[0] + outputFileName = args[1] + ios = [] + pendingIOs = {} + limit = 100000000000 + ignoreWrites = True + printOnRead = False + printOnWrite = False + threads = {} + traces = TraceCollection() + traces.add_trace(inputFileName, "ctf") -def main(): - global ios - # Parse phase - trace_start = None - count = 0 - for event in traces.events: - count = count + 1 - if count > limit: - break - ts = event.timestamp - if not trace_start: - trace_start = ts - ts = ts - trace_start - threadID = event["pthread_id"] - if threadID in threads: - thread = threads[threadID] - else: - thread = Thread(threadID) - threads[threadID] = thread - ionum = nextID() - io = StartThreadIO(ionum, ts, thread) - ios.append(io) - if printOnRead: - print str(io) - thread.insertTS(ts) - if event.name == "librbd:read_enter": - name = event["name"] - readid = event["id"] - imagectx = event["imagectx"] - ionum = nextID() - thread.pendingIO = ReadIO(ionum, ts, thread, thread.pendingIO, imagectx, []) - thread.pendingIO.addThreadCompletionDependencies(threads) - thread.issuedIO(thread.pendingIO) - ios.append(thread.pendingIO) - elif event.name == "librbd:open_image_enter": - imagectx = event["imagectx"] - name = event["name"] - snap_name = event["snap_name"] - readid = event["id"] - readonly = event["read_only"] - ionum = nextID() - thread.pendingIO = OpenImageIO(ionum, ts, thread, thread.pendingIO, imagectx, name, snap_name, readonly) - thread.pendingIO.addThreadCompletionDependencies(threads) - thread.issuedIO(thread.pendingIO) - ios.append(thread.pendingIO) - elif event.name == "librbd:open_image_exit": - thread.pendingIO.end_time = ts - completionIO = CompletionIO(ts, thread, thread.pendingIO) - thread.completedIO(completionIO) - ios.append(completionIO) - completed(completionIO) - if printOnRead: - print str(thread.pendingIO) - elif event.name == "librbd:close_image_enter": - imagectx = event["imagectx"] - ionum = nextID() - thread.pendingIO = CloseImageIO(ionum, ts, thread, thread.pendingIO, imagectx) - thread.pendingIO.addThreadCompletionDependencies(threads) - thread.issuedIO(thread.pendingIO) - ios.append(thread.pendingIO) - elif event.name == "librbd:close_image_exit": - thread.pendingIO.end_time = ts - completionIO = CompletionIO(ts, thread, thread.pendingIO) - thread.completedIO(completionIO) - ios.append(completionIO) - completed(completionIO) - if printOnRead: - print str(thread.pendingIO) - elif event.name == "librbd:read_extent": - offset = event["offset"] - length = event["length"] - thread.pendingIO.extents.append(Extent(offset, length)) - elif event.name == "librbd:read_exit": - thread.pendingIO.end_time = ts - completionIO = CompletionIO(ts, thread, thread.pendingIO) - thread.completedIO(completionIO) - ios.append(completionIO) - completed(completionIO) - if printOnRead: - print str(thread.pendingIO) - elif event.name == "librbd:write_enter": - if not ignoreWrites: + # Parse phase + trace_start = None + count = 0 + for event in traces.events: + count = count + 1 + if count > limit: + break + ts = event.timestamp + if not trace_start: + trace_start = ts + ts = ts - trace_start + threadID = event["pthread_id"] + if threadID in threads: + thread = threads[threadID] + else: + thread = Thread(threadID, threads, self.window) + threads[threadID] = thread + ionum = self.nextID() + io = StartThreadIO(ionum, ts, thread) + ios.append(io) + if printOnRead: + print str(io) + thread.insertTS(ts) + if event.name == "librbd:read_enter": name = event["name"] readid = event["id"] - offset = event["off"] - length = event["buf_len"] imagectx = event["imagectx"] - ionum = nextID() - thread.pendingIO = WriteIO(ionum, ts, thread, thread.pendingIO, imagectx, [Extent(offset, length)]) - thread.pendingIO.addThreadCompletionDependencies(threads) + ionum = self.nextID() + thread.pendingIO = ReadIO(ionum, ts, thread, thread.pendingIO, imagectx, []) + thread.pendingIO.addThreadCompletionDependencies(threads, self.recentCompletions) thread.issuedIO(thread.pendingIO) ios.append(thread.pendingIO) - elif event.name == "librbd:write_exit": - if not ignoreWrites: + elif event.name == "librbd:open_image_enter": + imagectx = event["imagectx"] + name = event["name"] + snap_name = event["snap_name"] + readid = event["id"] + readonly = event["read_only"] + ionum = self.nextID() + thread.pendingIO = OpenImageIO(ionum, ts, thread, thread.pendingIO, imagectx, name, snap_name, readonly) + thread.pendingIO.addThreadCompletionDependencies(threads, self.recentCompletions) + thread.issuedIO(thread.pendingIO) + ios.append(thread.pendingIO) + elif event.name == "librbd:open_image_exit": thread.pendingIO.end_time = ts completionIO = CompletionIO(ts, thread, thread.pendingIO) thread.completedIO(completionIO) ios.append(completionIO) - completed(completionIO) + self.completed(completionIO) if printOnRead: print str(thread.pendingIO) - elif event.name == "librbd:aio_read_enter": - name = event["name"] - readid = event["id"] - completion = event["completion"] - imagectx = event["imagectx"] - ionum = nextID() - thread.pendingIO = AioReadIO(ionum, ts, thread, thread.pendingIO, imagectx, []) - thread.pendingIO.addThreadCompletionDependencies(threads) - ios.append(thread.pendingIO) - thread.issuedIO(thread.pendingIO) - pendingIOs[completion] = thread.pendingIO - elif event.name == "librbd:aio_read_extent": - offset = event["offset"] - length = event["length"] - thread.pendingIO.extents.append(Extent(offset, length)) - elif event.name == "librbd:aio_read_exit": - if printOnRead: - print str(thread.pendingIO) - elif event.name == "librbd:aio_write_enter": - if not ignoreWrites: - name = event["name"] - writeid = event["id"] - offset = event["off"] - length = event["len"] - completion = event["completion"] + elif event.name == "librbd:close_image_enter": imagectx = event["imagectx"] - ionum = nextID() - thread.pendingIO = AioWriteIO(ionum, ts, thread, thread.pendingIO, imagectx, [Extent(offset, length)]) - thread.pendingIO.addThreadCompletionDependencies(threads) + ionum = self.nextID() + thread.pendingIO = CloseImageIO(ionum, ts, thread, thread.pendingIO, imagectx) + thread.pendingIO.addThreadCompletionDependencies(threads, self.recentCompletions) thread.issuedIO(thread.pendingIO) ios.append(thread.pendingIO) - pendingIOs[completion] = thread.pendingIO + elif event.name == "librbd:close_image_exit": + thread.pendingIO.end_time = ts + completionIO = CompletionIO(ts, thread, thread.pendingIO) + thread.completedIO(completionIO) + ios.append(completionIO) + self.completed(completionIO) if printOnRead: print str(thread.pendingIO) - elif event.name == "librbd:aio_complete_enter": - completion = event["completion"] - retval = event["rval"] - if completion in pendingIOs: - completedIO = pendingIOs[completion] - del pendingIOs[completion] - completedIO.end_time = ts - completionIO = CompletionIO(ts, thread, completedIO) - completedIO.thread.completedIO(completionIO) + elif event.name == "librbd:read_extent": + offset = event["offset"] + length = event["length"] + thread.pendingIO.extents.append(Extent(offset, length)) + elif event.name == "librbd:read_exit": + thread.pendingIO.end_time = ts + completionIO = CompletionIO(ts, thread, thread.pendingIO) + thread.completedIO(completionIO) ios.append(completionIO) completed(completionIO) if printOnRead: - print str(completionIO) - + print str(thread.pendingIO) + elif event.name == "librbd:write_enter": + if not ignoreWrites: + name = event["name"] + readid = event["id"] + offset = event["off"] + length = event["buf_len"] + imagectx = event["imagectx"] + ionum = self.nextID() + thread.pendingIO = WriteIO(ionum, ts, thread, thread.pendingIO, imagectx, [Extent(offset, length)]) + thread.pendingIO.addThreadCompletionDependencies(threads, self.recentCompletions) + thread.issuedIO(thread.pendingIO) + ios.append(thread.pendingIO) + elif event.name == "librbd:write_exit": + if not ignoreWrites: + thread.pendingIO.end_time = ts + completionIO = CompletionIO(ts, thread, thread.pendingIO) + thread.completedIO(completionIO) + ios.append(completionIO) + completed(completionIO) + if printOnRead: + print str(thread.pendingIO) + elif event.name == "librbd:aio_read_enter": + name = event["name"] + readid = event["id"] + completion = event["completion"] + imagectx = event["imagectx"] + ionum = self.nextID() + thread.pendingIO = AioReadIO(ionum, ts, thread, thread.pendingIO, imagectx, []) + thread.pendingIO.addThreadCompletionDependencies(threads, self.recentCompletions) + ios.append(thread.pendingIO) + thread.issuedIO(thread.pendingIO) + pendingIOs[completion] = thread.pendingIO + elif event.name == "librbd:aio_read_extent": + offset = event["offset"] + length = event["length"] + thread.pendingIO.extents.append(Extent(offset, length)) + elif event.name == "librbd:aio_read_exit": + if printOnRead: + print str(thread.pendingIO) + elif event.name == "librbd:aio_write_enter": + if not ignoreWrites: + name = event["name"] + writeid = event["id"] + offset = event["off"] + length = event["len"] + completion = event["completion"] + imagectx = event["imagectx"] + ionum = self.nextID() + thread.pendingIO = AioWriteIO(ionum, ts, thread, thread.pendingIO, imagectx, [Extent(offset, length)]) + thread.pendingIO.addThreadCompletionDependencies(threads, self.recentCompletions) + thread.issuedIO(thread.pendingIO) + ios.append(thread.pendingIO) + pendingIOs[completion] = thread.pendingIO + if printOnRead: + print str(thread.pendingIO) + elif event.name == "librbd:aio_complete_enter": + completion = event["completion"] + retval = event["rval"] + if completion in pendingIOs: + completedIO = pendingIOs[completion] + del pendingIOs[completion] + completedIO.end_time = ts + completionIO = CompletionIO(ts, thread, completedIO) + completedIO.thread.completedIO(completionIO) + ios.append(completionIO) + self.completed(completionIO) + if printOnRead: + print str(completionIO) - # Insert-thread-stop phase - ios = sorted(ios, key = lambda io: io.start_time) - for threadID in threads: - thread = threads[threadID] - ionum = None - maxIONum = 0 # only valid if ionum is None - for io in ios: - if io.ionum > maxIONum: - maxIONum = io.ionum - if io.start_time > thread.lastTS: - ionum = io.ionum - if ionum % 2 == 1: - ionum = ionum + 1 - break - if not ionum: - if maxIONum % 2 == 1: - maxIONum = maxIONum - 1 - ionum = maxIONum + 2 - for io in ios: - if io.ionum >= ionum: - io.ionum = io.ionum + 2 - # TODO: Insert in the right place, don't append and re-sort - ios.append(StopThreadIO(ionum, thread.lastTS, thread)) + # Insert-thread-stop phase ios = sorted(ios, key = lambda io: io.start_time) + for threadID in threads: + thread = threads[threadID] + ionum = None + maxIONum = 0 # only valid if ionum is None + for io in ios: + if io.ionum > maxIONum: + maxIONum = io.ionum + if io.start_time > thread.lastTS: + ionum = io.ionum + if ionum % 2 == 1: + ionum = ionum + 1 + break + if not ionum: + if maxIONum % 2 == 1: + maxIONum = maxIONum - 1 + ionum = maxIONum + 2 + for io in ios: + if io.ionum >= ionum: + io.ionum = io.ionum + 2 + # TODO: Insert in the right place, don't append and re-sort + ios.append(StopThreadIO(ionum, thread.lastTS, thread)) + ios = sorted(ios, key = lambda io: io.start_time) + for io in ios: + if io.prev and io.prev in io.dependencies: + io.dependencies.remove(io.prev) + if io.isCompletion: + io.dependencies.clear() + for dep in io.dependencies: + dep.numSuccessors = dep.numSuccessors + 1 - for io in ios: - if io.prev and io.prev in io.dependencies: - io.dependencies.remove(io.prev) - if io.isCompletion: - io.dependencies.clear() - for dep in io.dependencies: - dep.numSuccessors = dep.numSuccessors + 1 - - print - # for io in ios: - # if not io.isCompletion: - # print str(io) + if printOnRead and printOnWrite: + print - with open("replay.bin", "wb") as f: - for io in ios: - if not io.isCompletion: - print str(io) - io.writeTo(f) + with open(outputFileName, "wb") as f: + for io in ios: + if printOnWrite and not io.isCompletion: + print str(io) + io.writeTo(f) -cProfile.run("main()") +if __name__ == '__main__': + Processor().run(sys.argv[1:]) -- 2.39.5