from s3tests.fuzz_headers import *
from nose.tools import eq_ as eq
+from nose.tools import assert_true
from nose.plugins.attrib import attr
from .utils import assert_raises
'set': {},
'choices': [None]
}
+ graph['repeated_headers_node'] = {
+ 'set': {},
+ 'headers': [
+ ['1-2', 'random-header-{random 5-10 printable}', '{random 20-30 punctuation}']
+ ],
+ 'choices': ['leaf']
+ }
graph['weighted_null_choice_node'] = {
'set': {},
'choices': ['3 null']
def test_descend_bad_node():
graph = build_graph()
prng = random.Random(1)
- assert_raises(KeyError, descend_graph, graph, 'bad_node', prng)
+ assert_raises(DecisionGraphError, descend_graph, graph, 'bad_node', prng)
def test_descend_nonexistant_child():
eq(tester['random 10-15 binary'], '\xdfj\xf1\xd80>a\xcd\xc4\xbb')
+def test_SpeicalVariables_random_no_args():
+ prng = random.Random(1)
+ tester = SpecialVariables({}, prng)
+
+ for _ in xrange(1000):
+ val = tester['random']
+ val = val.replace('{{', '{').replace('}}','}')
+ assert_true(0 <= len(val) <= 1000)
+ assert_true(reduce(lambda x, y: x and y, [x in string.printable for x in val]))
+
+
+def test_SpeicalVariables_random_no_charset():
+ prng = random.Random(1)
+ tester = SpecialVariables({}, prng)
+
+ for _ in xrange(1000):
+ val = tester['random 10-30']
+ val = val.replace('{{', '{').replace('}}','}')
+ assert_true(10 <= len(val) <= 30)
+ assert_true(reduce(lambda x, y: x and y, [x in string.printable for x in val]))
+
+
+def test_SpeicalVariables_random_exact_length():
+ prng = random.Random(1)
+ tester = SpecialVariables({}, prng)
+
+ for _ in xrange(1000):
+ val = tester['random 10 digits']
+ assert_true(len(val) == 10)
+ assert_true(reduce(lambda x, y: x and y, [x in string.digits for x in val]))
+
+
+def test_SpecialVariables_random_errors():
+ prng = random.Random(1)
+ tester = SpecialVariables({}, prng)
+
+ assert_raises(KeyError, lambda x: tester[x], 'random 10-30 foo')
+ assert_raises(ValueError, lambda x: tester[x], 'random printable')
+
+
def test_assemble_decision():
graph = build_graph()
prng = random.Random(1)
assert_raises(KeyError, lambda x: decision[x], 'key3')
-def test_expand_key():
- prng = random.Random(1)
- test_decision = {
- 'key1': 'value1',
- 'randkey': 'value-{random 10-15 printable}',
- 'indirect': '{key1}',
- 'dbl_indirect': '{indirect}'
- }
- decision = SpecialVariables(test_decision, prng)
+def test_expand_escape():
+ decision = dict(
+ foo='{{bar}}',
+ )
+ got = expand(decision, '{foo}')
+ eq(got, '{bar}')
- randkey = expand_key(decision, test_decision['randkey'])
- indirect = expand_key(decision, test_decision['indirect'])
- dbl_indirect = expand_key(decision, test_decision['dbl_indirect'])
- eq(indirect, 'value1')
- eq(dbl_indirect, 'value1')
- eq(randkey, 'value-[/pNI$;92@')
+def test_expand_indirect():
+ decision = dict(
+ foo='{bar}',
+ bar='quux',
+ )
+ got = expand(decision, '{foo}')
+ eq(got, 'quux')
-def test_expand_loop():
- prng = random.Random(1)
- test_decision = {
- 'key1': '{key2}',
- 'key2': '{key1}',
- }
- decision = SpecialVariables(test_decision, prng)
- assert_raises(RuntimeError, expand_key, decision, test_decision['key1'])
+def test_expand_indirect_double():
+ decision = dict(
+ foo='{bar}',
+ bar='{quux}',
+ quux='thud',
+ )
+ got = expand(decision, '{foo}')
+ eq(got, 'thud')
-def test_expand_decision():
- graph = build_graph()
- prng = random.Random(1)
+def test_expand_recursive():
+ decision = dict(
+ foo='{foo}',
+ )
+ e = assert_raises(RecursionError, expand, decision, '{foo}')
+ eq(str(e), "Runaway recursion in string formatting: 'foo'")
- decision = assemble_decision(graph, prng)
- decision.update({'bucket_readable': 'my-readable-bucket'})
- request = expand_decision(decision, prng)
+def test_expand_recursive_mutual():
+ decision = dict(
+ foo='{bar}',
+ bar='{foo}',
+ )
+ e = assert_raises(RecursionError, expand, decision, '{foo}')
+ eq(str(e), "Runaway recursion in string formatting: 'foo'")
- eq(request['key1'], 'value1')
- eq(request['indirect_key1'], 'value1')
- eq(request['path'], '/my-readable-bucket')
- eq(request['randkey'], 'value-cx+*~G@&uW_[OW3')
- assert_raises(KeyError, lambda x: decision[x], 'key3')
+
+def test_expand_recursive_not_too_eager():
+ decision = dict(
+ foo='bar',
+ )
+ got = expand(decision, 100*'{foo}')
+ eq(got, 100*'bar')
def test_weighted_choices():
for header, value in decision['headers']:
if header == 'my-header':
eq(value, '{header_val}')
- nose.tools.assert_true(next(c1) < 1)
+ assert_true(next(c1) < 1)
elif header == 'random-header-{random 5-10 printable}':
eq(value, '{random 20-30 punctuation}')
- nose.tools.assert_true(next(c2) < 2)
+ assert_true(next(c2) < 2)
else:
raise KeyError('unexpected header found: %s' % header)
- nose.tools.assert_true(next(c1))
- nose.tools.assert_true(next(c2))
+ assert_true(next(c1))
+ assert_true(next(c2))
+
+
+def test_duplicate_header():
+ graph = build_graph()
+ prng = random.Random(1)
+ assert_raises(DecisionGraphError, descend_graph, graph, 'repeated_headers_node', prng)
-def test_header_expansion():
+def test_expand_headers():
graph = build_graph()
prng = random.Random(1)
decision = descend_graph(graph, 'node1', prng)
- expanded_decision = expand_decision(decision, prng)
+ special_decision = SpecialVariables(decision, prng)
+ expanded_headers = expand_headers(special_decision)
- for header, value in expanded_decision['headers']:
+ for header, value in expanded_headers:
if header == 'my-header':
- nose.tools.assert_true(value in ['h1', 'h2', 'h3'])
+ assert_true(value in ['h1', 'h2', 'h3'])
elif header.startswith('random-header-'):
- nose.tools.assert_true(20 <= len(value) <= 30)
- nose.tools.assert_true(string.strip(value, SpecialVariables.charsets['punctuation']) is '')
+ assert_true(20 <= len(value) <= 30)
+ assert_true(string.strip(value, SpecialVariables.charsets['punctuation']) is '')
else:
- raise KeyError('unexpected header found: "%s"' % header)
+ raise DecisionGraphError('unexpected header found: "%s"' % header)
import re
+class DecisionGraphError(Exception):
+ """ Raised when a node in a graph tries to set a header or
+ key that was previously set by another node
+ """
+ def __init__(self, value):
+ self.value = value
+
+ def __str__(self):
+ return repr(self.value)
+
+
+class RecursionError(Exception):
+ """Runaway recursion in string formatting"""
+
+ def __init__(self, msg):
+ self.msg = msg
+
+ def __str__(self):
+ return '{0.__doc__}: {0.msg!r}'.format(self)
+
+
def assemble_decision(decision_graph, prng):
""" Take in a graph describing the possible decision space and a random
number generator and traverse the graph to build a decision
except IndexError:
decision = {}
- for key in node['set']:
- if decision.has_key(key):
- raise KeyError("Node %s tried to set '%s', but that key was already set by a lower node!" %(node_name, key))
- decision[key] = make_choice(node['set'][key], prng)
+ for key, choices in node['set'].iteritems():
+ if key in decision:
+ raise DecisionGraphError("Node %s tried to set '%s', but that key was already set by a lower node!" %(node_name, key))
+ decision[key] = make_choice(choices, prng)
- if node.has_key('headers'):
- if not decision.has_key('headers'):
- decision['headers'] = []
+ if 'headers' in node:
+ decision.setdefault('headers', [])
for desc in node['headers']:
- if len(desc) == 3:
- repetition_range = desc.pop(0)
- try:
- size_min, size_max = [int(x) for x in repetition_range.split('-')]
- except IndexError:
- size_min = size_max = int(repetition_range)
- else:
- size_min = size_max = 1
+ try:
+ (repetition_range, header, value) = desc
+ except ValueError:
+ (header, value) = desc
+ repetition_range = '1'
+
+ try:
+ size_min, size_max = repetition_range.split('-', 1)
+ except ValueError:
+ size_min = size_max = repetition_range
+
+ size_min = int(size_min)
+ size_max = int(size_max)
+
num_reps = prng.randint(size_min, size_max)
+ if header in [h for h, v in decision['headers']]:
+ raise DecisionGraphError("Node %s tried to add header '%s', but that header already exists!" %(node_name, header))
for _ in xrange(num_reps):
- header = desc[0]
- value = desc[1]
- if header in [h for h, v in decision['headers']]:
- if not re.search('{[a-zA-Z_0-9 -]+}', header):
- raise KeyError("Node %s tried to add header '%s', but that header already exists!" %(node_name, header))
decision['headers'].append([header, value])
return decision
if option is None:
weighted_choices.append('')
continue
- fields = option.split(None, 1)
- if len(fields) == 1:
- weight = 1
- value = fields[0]
- else:
- weight = int(fields[0])
- value = fields[1]
- if value == 'null' or value == 'None':
- value = ''
+ try:
+ (weight, value) = option.split(None, 1)
+ except ValueError:
+ weight = '1'
+ value = option
+
+ weight = int(weight)
+ if value == 'null' or value == 'None':
+ value = ''
+
for _ in xrange(weight):
weighted_choices.append(value)
return prng.choice(weighted_choices)
-def expand_decision(decision, prng):
- """ Take in a decision and a random number generator. Expand variables in
- decision's values and headers until all values are fully expanded and
- build a request out of the information
- """
- special_decision = SpecialVariables(decision, prng)
- for key in special_decision:
- if not key == 'headers':
- decision[key] = expand_key(special_decision, decision[key])
- else:
- for header in special_decision[key]:
- header[0] = expand_key(special_decision, header[0])
- header[1] = expand_key(special_decision, header[1])
- return decision
+def expand_headers(decision):
+ expanded_headers = []
+ for header in decision['headers']:
+ h = expand(decision, header[0])
+ v = expand(decision, header[1])
+ expanded_headers.append([h, v])
+ return expanded_headers
-def expand_key(decision, value):
+def expand(decision, value):
c = itertools.count()
- fmt = string.Formatter()
- old = value
- while True:
- new = fmt.vformat(old, [], decision)
- if new == old.replace('{{', '{').replace('}}', '}'):
- return old
- if next(c) > 5:
- raise RuntimeError
- old = new
+ fmt = RepeatExpandingFormatter()
+ new = fmt.vformat(value, [], decision)
+ return new
+
+
+class RepeatExpandingFormatter(string.Formatter):
+
+ def __init__(self, _recursion=0):
+ super(RepeatExpandingFormatter, self).__init__()
+ # this class assumes it is always instantiated once per
+ # formatting; use that to detect runaway recursion
+ self._recursion = _recursion
+
+ def get_value(self, key, args, kwargs):
+ val = super(RepeatExpandingFormatter, self).get_value(key, args, kwargs)
+ if self._recursion > 5:
+ raise RecursionError(key)
+ fmt = self.__class__(_recursion=self._recursion+1)
+ # must use vformat not **kwargs so our SpecialVariables is not
+ # downgraded to just a dict
+ n = fmt.vformat(val, args, kwargs)
+ return n
+
class SpecialVariables(dict):
charsets = {
- 'binary': 'binary',
'printable': string.printable,
'punctuation': string.punctuation,
'whitespace': string.whitespace,
}
def __init__(self, orig_dict, prng):
- self.update(orig_dict)
+ super(SpecialVariables, self).__init__(orig_dict)
self.prng = prng
return super(SpecialVariables, self).__getitem__(key)
if len(fields) == 1:
- fields.apppend('')
+ fields.append('')
return fn(fields[1])
def special_random(self, args):
arg_list = args.split()
try:
- size_min, size_max = [int(x) for x in arg_list[0].split('-')]
+ size_min, size_max = arg_list[0].split('-', 1)
+ except ValueError:
+ size_min = size_max = arg_list[0]
except IndexError:
- size_min = 0
- size_max = 1000
+ size_min = '0'
+ size_max = '1000'
+
+ size_min = int(size_min)
+ size_max = int(size_max)
+ length = self.prng.randint(size_min, size_max)
+
try:
- charset = self.charsets[arg_list[1]]
+ charset_arg = arg_list[1]
except IndexError:
- charset = self.charsets['printable']
+ charset_arg = 'printable'
- length = self.prng.randint(size_min, size_max)
- if charset is 'binary':
+ if charset_arg == 'binary':
num_bytes = length + 8
tmplist = [self.prng.getrandbits(64) for _ in xrange(num_bytes / 8)]
tmpstring = struct.pack((num_bytes / 8) * 'Q', *tmplist)
- return tmpstring[0:length]
+ tmpstring = tmpstring[0:length]
else:
- tmpstring = ''.join([self.prng.choice(charset) for _ in xrange(length)]) # Won't scale nicely; won't do binary
- return tmpstring.replace('{', '{{').replace('}', '}}')
+ charset = self.charsets[charset_arg]
+ tmpstring = ''.join([self.prng.choice(charset) for _ in xrange(length)]) # Won't scale nicely
+
+ return tmpstring.replace('{', '{{').replace('}', '}}')
def parse_options():
return parser.parse_args()
-def randomlist(n, seed=None):
- """ Returns a generator function that spits out a list of random numbers n elements long.
+def randomlist(seed=None):
+ """ Returns an infinite generator of random numbers
"""
- rng = random.Random()
- rng.seed(seed if seed else None)
- for _ in xrange(n):
+ rng = random.Random(seed)
+ while True:
yield rng.random()
FH = open(options.seedfile, 'r')
request_seeds = FH.readlines()
else:
- request_seeds = randomlist(options.num_requests, options.seed)
+ random_list = randomlist(options.seed)
+ request_seeds = itertools.islice(random_list, options.num_requests)
+
graph_file = open(options.graph_filename, 'r')
decision_graph = yaml.safe_load(graph_file)