#
from babeltrace import *
-import cProfile
import datetime
import struct
import sys
-traces = TraceCollection()
-ret = traces.add_trace(sys.argv[1], "ctf")
-
-pendingIOs = {}
-threads = {}
-ios = []
-printOnRead = False
-recentCompletions = []
-limit = 100000000000
-ignoreWrites = True
-
-ioCount = 0
-def nextID():
- global ioCount
- val = ioCount
- ioCount = ioCount + 2
- return val
class Extent(object):
def __init__(self, offset, length):
return "Extent(" + str(self.offset) + "," + str(self.length) + ")"
class Thread(object):
- def __init__(self, id):
+ def __init__(self, id, threads, window):
self.id = id
+ self.threads = threads
+ self.window = window
self.pendingIO = None
self.latestIO = None # may not be meaningful
self.latestCompletion = None # may not be meaningful
self.lastTS = ts
def issuedIO(self, io):
latestIOs = []
- for threadID in threads:
- thread = threads[threadID]
- if thread.latestIO and thread.latestIO.start_time > io.start_time - window:
+ for threadID in self.threads:
+ thread = self.threads[threadID]
+ if thread.latestIO and thread.latestIO.start_time > io.start_time - self.window:
latestIOs.append(thread.latestIO)
io.addDependencies(latestIOs)
self.latestIO = io
for dep in self.dependencies:
deps[dep.ionum] = self.start_time - dep.start_time
return deps
- def addThreadCompletionDependencies(self, threads):
+ def addThreadCompletionDependencies(self, threads, recentCompletions):
self.addDependencies(recentCompletions)
def numCompletionSuccessors(self):
return self.completion.numSuccessors if self.completion else 0
return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": completion, thread = " + str(self.thread.id) + ", baseIO = " + str(self.baseIO) + ", deps = " + str(self.depMap())
-window = 1 * 1e9
-
-def completed(io):
- global recentCompletions
- recentCompletions.append(io)
- recentCompletions[:] = [x for x in recentCompletions if x.start_time > io.start_time - window]
- # while recentCompletions[0].start_time < io.start_time - window:
- # del recentCompletions[0]
+class Processor(object):
+ def __init__(self):
+ self.window = 1 * 1e9
+ self.threads = {}
+ self.ioCount = 0
+ self.recentCompletions = []
+ def nextID(self):
+ val = self.ioCount
+ self.ioCount = self.ioCount + 2
+ return val
+ def completed(self, io):
+ self.recentCompletions.append(io)
+ self.recentCompletions[:] = [x for x in self.recentCompletions if x.start_time > io.start_time - self.window]
+ def run(self, args):
+ inputFileName = args[0]
+ outputFileName = args[1]
+ ios = []
+ pendingIOs = {}
+ limit = 100000000000
+ ignoreWrites = True
+ printOnRead = False
+ printOnWrite = False
+ threads = {}
+ traces = TraceCollection()
+ traces.add_trace(inputFileName, "ctf")
-def main():
- global ios
- # Parse phase
- trace_start = None
- count = 0
- for event in traces.events:
- count = count + 1
- if count > limit:
- break
- ts = event.timestamp
- if not trace_start:
- trace_start = ts
- ts = ts - trace_start
- threadID = event["pthread_id"]
- if threadID in threads:
- thread = threads[threadID]
- else:
- thread = Thread(threadID)
- threads[threadID] = thread
- ionum = nextID()
- io = StartThreadIO(ionum, ts, thread)
- ios.append(io)
- if printOnRead:
- print str(io)
- thread.insertTS(ts)
- if event.name == "librbd:read_enter":
- name = event["name"]
- readid = event["id"]
- imagectx = event["imagectx"]
- ionum = nextID()
- thread.pendingIO = ReadIO(ionum, ts, thread, thread.pendingIO, imagectx, [])
- thread.pendingIO.addThreadCompletionDependencies(threads)
- thread.issuedIO(thread.pendingIO)
- ios.append(thread.pendingIO)
- elif event.name == "librbd:open_image_enter":
- imagectx = event["imagectx"]
- name = event["name"]
- snap_name = event["snap_name"]
- readid = event["id"]
- readonly = event["read_only"]
- ionum = nextID()
- thread.pendingIO = OpenImageIO(ionum, ts, thread, thread.pendingIO, imagectx, name, snap_name, readonly)
- thread.pendingIO.addThreadCompletionDependencies(threads)
- thread.issuedIO(thread.pendingIO)
- ios.append(thread.pendingIO)
- elif event.name == "librbd:open_image_exit":
- thread.pendingIO.end_time = ts
- completionIO = CompletionIO(ts, thread, thread.pendingIO)
- thread.completedIO(completionIO)
- ios.append(completionIO)
- completed(completionIO)
- if printOnRead:
- print str(thread.pendingIO)
- elif event.name == "librbd:close_image_enter":
- imagectx = event["imagectx"]
- ionum = nextID()
- thread.pendingIO = CloseImageIO(ionum, ts, thread, thread.pendingIO, imagectx)
- thread.pendingIO.addThreadCompletionDependencies(threads)
- thread.issuedIO(thread.pendingIO)
- ios.append(thread.pendingIO)
- elif event.name == "librbd:close_image_exit":
- thread.pendingIO.end_time = ts
- completionIO = CompletionIO(ts, thread, thread.pendingIO)
- thread.completedIO(completionIO)
- ios.append(completionIO)
- completed(completionIO)
- if printOnRead:
- print str(thread.pendingIO)
- elif event.name == "librbd:read_extent":
- offset = event["offset"]
- length = event["length"]
- thread.pendingIO.extents.append(Extent(offset, length))
- elif event.name == "librbd:read_exit":
- thread.pendingIO.end_time = ts
- completionIO = CompletionIO(ts, thread, thread.pendingIO)
- thread.completedIO(completionIO)
- ios.append(completionIO)
- completed(completionIO)
- if printOnRead:
- print str(thread.pendingIO)
- elif event.name == "librbd:write_enter":
- if not ignoreWrites:
+ # Parse phase
+ trace_start = None
+ count = 0
+ for event in traces.events:
+ count = count + 1
+ if count > limit:
+ break
+ ts = event.timestamp
+ if not trace_start:
+ trace_start = ts
+ ts = ts - trace_start
+ threadID = event["pthread_id"]
+ if threadID in threads:
+ thread = threads[threadID]
+ else:
+ thread = Thread(threadID, threads, self.window)
+ threads[threadID] = thread
+ ionum = self.nextID()
+ io = StartThreadIO(ionum, ts, thread)
+ ios.append(io)
+ if printOnRead:
+ print str(io)
+ thread.insertTS(ts)
+ if event.name == "librbd:read_enter":
name = event["name"]
readid = event["id"]
- offset = event["off"]
- length = event["buf_len"]
imagectx = event["imagectx"]
- ionum = nextID()
- thread.pendingIO = WriteIO(ionum, ts, thread, thread.pendingIO, imagectx, [Extent(offset, length)])
- thread.pendingIO.addThreadCompletionDependencies(threads)
+ ionum = self.nextID()
+ thread.pendingIO = ReadIO(ionum, ts, thread, thread.pendingIO, imagectx, [])
+ thread.pendingIO.addThreadCompletionDependencies(threads, self.recentCompletions)
thread.issuedIO(thread.pendingIO)
ios.append(thread.pendingIO)
- elif event.name == "librbd:write_exit":
- if not ignoreWrites:
+ elif event.name == "librbd:open_image_enter":
+ imagectx = event["imagectx"]
+ name = event["name"]
+ snap_name = event["snap_name"]
+ readid = event["id"]
+ readonly = event["read_only"]
+ ionum = self.nextID()
+ thread.pendingIO = OpenImageIO(ionum, ts, thread, thread.pendingIO, imagectx, name, snap_name, readonly)
+ thread.pendingIO.addThreadCompletionDependencies(threads, self.recentCompletions)
+ thread.issuedIO(thread.pendingIO)
+ ios.append(thread.pendingIO)
+ elif event.name == "librbd:open_image_exit":
thread.pendingIO.end_time = ts
completionIO = CompletionIO(ts, thread, thread.pendingIO)
thread.completedIO(completionIO)
ios.append(completionIO)
- completed(completionIO)
+ self.completed(completionIO)
if printOnRead:
print str(thread.pendingIO)
- elif event.name == "librbd:aio_read_enter":
- name = event["name"]
- readid = event["id"]
- completion = event["completion"]
- imagectx = event["imagectx"]
- ionum = nextID()
- thread.pendingIO = AioReadIO(ionum, ts, thread, thread.pendingIO, imagectx, [])
- thread.pendingIO.addThreadCompletionDependencies(threads)
- ios.append(thread.pendingIO)
- thread.issuedIO(thread.pendingIO)
- pendingIOs[completion] = thread.pendingIO
- elif event.name == "librbd:aio_read_extent":
- offset = event["offset"]
- length = event["length"]
- thread.pendingIO.extents.append(Extent(offset, length))
- elif event.name == "librbd:aio_read_exit":
- if printOnRead:
- print str(thread.pendingIO)
- elif event.name == "librbd:aio_write_enter":
- if not ignoreWrites:
- name = event["name"]
- writeid = event["id"]
- offset = event["off"]
- length = event["len"]
- completion = event["completion"]
+ elif event.name == "librbd:close_image_enter":
imagectx = event["imagectx"]
- ionum = nextID()
- thread.pendingIO = AioWriteIO(ionum, ts, thread, thread.pendingIO, imagectx, [Extent(offset, length)])
- thread.pendingIO.addThreadCompletionDependencies(threads)
+ ionum = self.nextID()
+ thread.pendingIO = CloseImageIO(ionum, ts, thread, thread.pendingIO, imagectx)
+ thread.pendingIO.addThreadCompletionDependencies(threads, self.recentCompletions)
thread.issuedIO(thread.pendingIO)
ios.append(thread.pendingIO)
- pendingIOs[completion] = thread.pendingIO
+ elif event.name == "librbd:close_image_exit":
+ thread.pendingIO.end_time = ts
+ completionIO = CompletionIO(ts, thread, thread.pendingIO)
+ thread.completedIO(completionIO)
+ ios.append(completionIO)
+ self.completed(completionIO)
if printOnRead:
print str(thread.pendingIO)
- elif event.name == "librbd:aio_complete_enter":
- completion = event["completion"]
- retval = event["rval"]
- if completion in pendingIOs:
- completedIO = pendingIOs[completion]
- del pendingIOs[completion]
- completedIO.end_time = ts
- completionIO = CompletionIO(ts, thread, completedIO)
- completedIO.thread.completedIO(completionIO)
+ elif event.name == "librbd:read_extent":
+ offset = event["offset"]
+ length = event["length"]
+ thread.pendingIO.extents.append(Extent(offset, length))
+ elif event.name == "librbd:read_exit":
+ thread.pendingIO.end_time = ts
+ completionIO = CompletionIO(ts, thread, thread.pendingIO)
+ thread.completedIO(completionIO)
ios.append(completionIO)
completed(completionIO)
if printOnRead:
- print str(completionIO)
-
+ print str(thread.pendingIO)
+ elif event.name == "librbd:write_enter":
+ if not ignoreWrites:
+ name = event["name"]
+ readid = event["id"]
+ offset = event["off"]
+ length = event["buf_len"]
+ imagectx = event["imagectx"]
+ ionum = self.nextID()
+ thread.pendingIO = WriteIO(ionum, ts, thread, thread.pendingIO, imagectx, [Extent(offset, length)])
+ thread.pendingIO.addThreadCompletionDependencies(threads, self.recentCompletions)
+ thread.issuedIO(thread.pendingIO)
+ ios.append(thread.pendingIO)
+ elif event.name == "librbd:write_exit":
+ if not ignoreWrites:
+ thread.pendingIO.end_time = ts
+ completionIO = CompletionIO(ts, thread, thread.pendingIO)
+ thread.completedIO(completionIO)
+ ios.append(completionIO)
+ completed(completionIO)
+ if printOnRead:
+ print str(thread.pendingIO)
+ elif event.name == "librbd:aio_read_enter":
+ name = event["name"]
+ readid = event["id"]
+ completion = event["completion"]
+ imagectx = event["imagectx"]
+ ionum = self.nextID()
+ thread.pendingIO = AioReadIO(ionum, ts, thread, thread.pendingIO, imagectx, [])
+ thread.pendingIO.addThreadCompletionDependencies(threads, self.recentCompletions)
+ ios.append(thread.pendingIO)
+ thread.issuedIO(thread.pendingIO)
+ pendingIOs[completion] = thread.pendingIO
+ elif event.name == "librbd:aio_read_extent":
+ offset = event["offset"]
+ length = event["length"]
+ thread.pendingIO.extents.append(Extent(offset, length))
+ elif event.name == "librbd:aio_read_exit":
+ if printOnRead:
+ print str(thread.pendingIO)
+ elif event.name == "librbd:aio_write_enter":
+ if not ignoreWrites:
+ name = event["name"]
+ writeid = event["id"]
+ offset = event["off"]
+ length = event["len"]
+ completion = event["completion"]
+ imagectx = event["imagectx"]
+ ionum = self.nextID()
+ thread.pendingIO = AioWriteIO(ionum, ts, thread, thread.pendingIO, imagectx, [Extent(offset, length)])
+ thread.pendingIO.addThreadCompletionDependencies(threads, self.recentCompletions)
+ thread.issuedIO(thread.pendingIO)
+ ios.append(thread.pendingIO)
+ pendingIOs[completion] = thread.pendingIO
+ if printOnRead:
+ print str(thread.pendingIO)
+ elif event.name == "librbd:aio_complete_enter":
+ completion = event["completion"]
+ retval = event["rval"]
+ if completion in pendingIOs:
+ completedIO = pendingIOs[completion]
+ del pendingIOs[completion]
+ completedIO.end_time = ts
+ completionIO = CompletionIO(ts, thread, completedIO)
+ completedIO.thread.completedIO(completionIO)
+ ios.append(completionIO)
+ self.completed(completionIO)
+ if printOnRead:
+ print str(completionIO)
- # Insert-thread-stop phase
- ios = sorted(ios, key = lambda io: io.start_time)
- for threadID in threads:
- thread = threads[threadID]
- ionum = None
- maxIONum = 0 # only valid if ionum is None
- for io in ios:
- if io.ionum > maxIONum:
- maxIONum = io.ionum
- if io.start_time > thread.lastTS:
- ionum = io.ionum
- if ionum % 2 == 1:
- ionum = ionum + 1
- break
- if not ionum:
- if maxIONum % 2 == 1:
- maxIONum = maxIONum - 1
- ionum = maxIONum + 2
- for io in ios:
- if io.ionum >= ionum:
- io.ionum = io.ionum + 2
- # TODO: Insert in the right place, don't append and re-sort
- ios.append(StopThreadIO(ionum, thread.lastTS, thread))
+ # Insert-thread-stop phase
ios = sorted(ios, key = lambda io: io.start_time)
+ for threadID in threads:
+ thread = threads[threadID]
+ ionum = None
+ maxIONum = 0 # only valid if ionum is None
+ for io in ios:
+ if io.ionum > maxIONum:
+ maxIONum = io.ionum
+ if io.start_time > thread.lastTS:
+ ionum = io.ionum
+ if ionum % 2 == 1:
+ ionum = ionum + 1
+ break
+ if not ionum:
+ if maxIONum % 2 == 1:
+ maxIONum = maxIONum - 1
+ ionum = maxIONum + 2
+ for io in ios:
+ if io.ionum >= ionum:
+ io.ionum = io.ionum + 2
+ # TODO: Insert in the right place, don't append and re-sort
+ ios.append(StopThreadIO(ionum, thread.lastTS, thread))
+ ios = sorted(ios, key = lambda io: io.start_time)
+ for io in ios:
+ if io.prev and io.prev in io.dependencies:
+ io.dependencies.remove(io.prev)
+ if io.isCompletion:
+ io.dependencies.clear()
+ for dep in io.dependencies:
+ dep.numSuccessors = dep.numSuccessors + 1
- for io in ios:
- if io.prev and io.prev in io.dependencies:
- io.dependencies.remove(io.prev)
- if io.isCompletion:
- io.dependencies.clear()
- for dep in io.dependencies:
- dep.numSuccessors = dep.numSuccessors + 1
-
- print
- # for io in ios:
- # if not io.isCompletion:
- # print str(io)
+ if printOnRead and printOnWrite:
+ print
- with open("replay.bin", "wb") as f:
- for io in ios:
- if not io.isCompletion:
- print str(io)
- io.writeTo(f)
+ with open(outputFileName, "wb") as f:
+ for io in ios:
+ if printOnWrite and not io.isCompletion:
+ print str(io)
+ io.writeTo(f)
-cProfile.run("main()")
+if __name__ == '__main__':
+ Processor().run(sys.argv[1:])