From: Kyle Marsh Date: Tue, 9 Aug 2011 22:44:25 +0000 (-0700) Subject: S3 Fuzzer: Added SpecialVariables dict subclass X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=7d9ec02686d04882ef6448f481c5abd8652f4186;p=s3-tests.git S3 Fuzzer: Added SpecialVariables dict subclass Helper class to catch sentinal keys and turn them into random values. This will be used to generate garbage data when expanding a decision. Also add unit tests for expand_decision and assemble_decision --- diff --git a/s3tests/functional/test_fuzzer.py b/s3tests/functional/test_fuzzer.py index 69dd8fc6..b8393fb4 100644 --- a/s3tests/functional/test_fuzzer.py +++ b/s3tests/functional/test_fuzzer.py @@ -23,7 +23,7 @@ def build_graph(): graph = {} graph['start'] = { 'set': {}, - 'choices': ['node1'] + 'choices': ['node2'] } graph['leaf'] = { 'set': { @@ -38,6 +38,14 @@ def build_graph(): }, 'choices': ['leaf'] } + graph['node2'] = { + 'set': { + 'randkey': 'value-{random 10-15 printable}', + 'path': '/{bucket_readable}', + 'indirect_key1': '{key1}' + }, + 'choices': ['leaf'] + } graph['bad_node'] = { 'set': { 'key1': 'value1' @@ -62,6 +70,7 @@ def test_descend_leaf_node(): eq(decision['key2'], 'value2') e = assert_raises(KeyError, lambda x: decision[x], 'key3') + def test_descend_node(): graph = build_graph() prng = random.Random(1) @@ -71,8 +80,45 @@ def test_descend_node(): eq(decision['key2'], 'value2') eq(decision['key3'], 'value3') + def test_descend_bad_node(): graph = build_graph() prng = random.Random(1) assert_raises(KeyError, descend_graph, graph, 'bad_node', prng) + +def test_SpecialVariables_dict(): + prng = random.Random(1) + testdict = {'foo': 'bar'} + tester = SpecialVariables(testdict, prng) + + eq(tester['foo'], 'bar') + eq(tester['random 10-15 printable'], '[/pNI$;92@') #FIXME: how should I test pseudorandom content? + +def test_assemble_decision(): + graph = build_graph() + prng = random.Random(1) + decision = assemble_decision(graph, prng) + + eq(decision['key1'], 'value1') + eq(decision['key2'], 'value2') + eq(decision['randkey'], 'value-{random 10-15 printable}') + eq(decision['indirect_key1'], '{key1}') + eq(decision['path'], '/{bucket_readable}') + assert_raises(KeyError, lambda x: decision[x], 'key3') + +def test_expand_decision(): + graph = build_graph() + prng = random.Random(1) + + decision = assemble_decision(graph, prng) + decision.update({'bucket_readable': 'my-readable-bucket'}) + + request = expand_decision(decision, prng) + + eq(request['key1'], 'value1') + eq(request['indirect_key1'], 'value1') + eq(request['path'], '/my-readable-bucket') + eq(request['randkey'], 'value-?') #FIXME: again, how to handle the pseudorandom content? + assert_raises(KeyError, lambda x: decision[x], 'key3') + diff --git a/s3tests/fuzz_headers.py b/s3tests/fuzz_headers.py index 5d538dd1..3ac6464e 100644 --- a/s3tests/fuzz_headers.py +++ b/s3tests/fuzz_headers.py @@ -22,14 +22,16 @@ def descend_graph(decision_graph, node_name, prng): the node's "set" list, pick a choice from the "choice" list, and recurse. Finally, return dictionary of values """ + node = decision_graph[node_name] + try: - choice = prng.choice(decision_graph[node_name]['choices']) + #TODO: Give weights to each choice + choice = prng.choice(node['choices']) decision = descend_graph(decision_graph, choice, prng) except IndexError: decision = {} - node = decision_graph[node_name] - + #TODO: Add in headers for key in node['set']: if decision.has_key(key): raise KeyError("Node %s tried to set '%s', but that key was already set by a lower node!" %(node_name, key)) @@ -45,6 +47,46 @@ def expand_decision(decision, prng): raise NotImplementedError +class SpecialVariables(dict): + charsets = { + 'printable': string.printable, + 'punctuation': string.punctuation, + 'whitespace': string.whitespace + } + + def __init__(self, orig_dict, prng): + self.update(orig_dict) + self.prng = prng + + + def __getitem__(self, key): + fields = key.split(None, 1) + fn = getattr(self, 'special_{name}'.format(name=fields[0]), None) + if fn is None: + return super(SpecialVariables, self).__getitem__(key) + + if len(fields) == 1: + fields.apppend('') + return fn(fields[1]) + + + def special_random(self, args): + arg_list = args.split() + try: + size_min, size_max = [int(x) for x in arg_list[0].split('-')] + except IndexError: + size_min = 0 + size_max = 1000 + try: + charset = self.charsets[arg_list[1]] + except IndexError: + charset = self.charsets['printable'] + + length = self.prng.randint(size_min, size_max) + return ''.join([self.prng.choice(charset) for _ in xrange(length)]) # Won't scale nicely + + + def parse_options(): parser = OptionParser() parser.add_option('-O', '--outfile', help='write output to FILE. Defaults to STDOUT', metavar='FILE')