+import sys
+import itertools
import nose
import random
import string
'key1': 'value1',
'key2': 'value2'
},
+ 'headers': [
+ ['1-2', 'random-header-{random 5-10 printable}', '{random 20-30 punctuation}']
+ ],
'choices': []
}
graph['node1'] = {
'set': {
- 'key3': 'value3'
+ 'key3': 'value3',
+ 'header_val': [
+ '3 h1',
+ '2 h2',
+ 'h3'
+ ]
},
+ 'headers': [
+ ['1-1', 'my-header', '{header_val}'],
+ ],
'choices': ['leaf']
}
graph['node2'] = {
}
decision = SpecialVariables(test_decision, prng)
- randkey = expand_key(decision, 'randkey')
- indirect = expand_key(decision, 'indirect')
- dbl_indirect = expand_key(decision, 'dbl_indirect')
+ randkey = expand_key(decision, test_decision['randkey'])
+ indirect = expand_key(decision, test_decision['indirect'])
+ dbl_indirect = expand_key(decision, test_decision['dbl_indirect'])
eq(indirect, 'value1')
eq(dbl_indirect, 'value1')
'key2': '{key1}',
}
decision = SpecialVariables(test_decision, prng)
- assert_raises(RuntimeError, expand_key, decision, 'key1')
+ assert_raises(RuntimeError, expand_key, decision, test_decision['key1'])
def test_expand_decision():
graph = build_graph()
eq(request['key1'], 'value1')
eq(request['indirect_key1'], 'value1')
eq(request['path'], '/my-readable-bucket')
- eq(request['randkey'], 'value-NI$;92@H/0I')
+ eq(request['randkey'], 'value-cx+*~G@&uW_[OW3')
assert_raises(KeyError, lambda x: decision[x], 'key3')
def test_weighted_choices():
nose.tools.assert_almost_equal(bar_percentage, 0.50, 1)
nose.tools.assert_almost_equal(baz_percentage, 0.25, 1)
+def test_header_presence():
+ graph = build_graph()
+ prng = random.Random(1)
+ decision = descend_graph(graph, 'node1', prng)
+
+ c1 = itertools.count()
+ c2 = itertools.count()
+ for header, value in decision['headers']:
+ if header == 'my-header':
+ eq(value, '{header_val}')
+ nose.tools.assert_true(next(c1) < 1)
+ elif header == 'random-header-{random 5-10 printable}':
+ eq(value, '{random 20-30 punctuation}')
+ nose.tools.assert_true(next(c2) < 2)
+ else:
+ raise KeyError('unexpected header found: %s' % header)
+
+ nose.tools.assert_true(next(c1))
+ nose.tools.assert_true(next(c2))
+
+
+
+def test_header_expansion():
+ graph = build_graph()
+ prng = random.Random(1)
+ decision = descend_graph(graph, 'node1', prng)
+ expanded_decision = expand_decision(decision, prng)
+
+ for header, value in expanded_decision['headers']:
+ if header == 'my-header':
+ nose.tools.assert_true(value in ['h1', 'h2', 'h3'])
+ elif header.startswith('random-header-'):
+ nose.tools.assert_true(20 <= len(value) <= 30)
+ nose.tools.assert_true(string.strip(value, SpecialVariables.charsets['punctuation']) is '')
+ else:
+ raise KeyError('unexpected header found: "%s"' % header)
+
import struct
import yaml
import sys
+import re
def assemble_decision(decision_graph, prng):
except IndexError:
decision = {}
- #TODO: Add in headers
for key in node['set']:
if decision.has_key(key):
raise KeyError("Node %s tried to set '%s', but that key was already set by a lower node!" %(node_name, key))
decision[key] = make_choice(node['set'][key], prng)
+
+ if node.has_key('headers'):
+ if not decision.has_key('headers'):
+ decision['headers'] = []
+
+ for desc in node['headers']:
+ if len(desc) == 3:
+ repetition_range = desc.pop(0)
+ try:
+ size_min, size_max = [int(x) for x in repetition_range.split('-')]
+ except IndexError:
+ size_min = size_max = int(repetition_range)
+ else:
+ size_min = size_max = 1
+ num_reps = prng.randint(size_min, size_max)
+ for _ in xrange(num_reps):
+ header = desc[0]
+ value = desc[1]
+ if header in [h for h, v in decision['headers']]:
+ if not re.search('{[a-zA-Z_0-9 -]+}', header):
+ raise KeyError("Node %s tried to add header '%s', but that header already exists!" %(node_name, header))
+ decision['headers'].append([header, value])
+
return decision
def make_choice(choices, prng):
+ """ Given a list of (possibly weighted) options or just a single option!,
+ choose one of the options taking weights into account and return the
+ choice
+ """
if isinstance(choices, str):
return choices
weighted_choices = []
"""
special_decision = SpecialVariables(decision, prng)
for key in special_decision:
- decision[key] = expand_key(special_decision, key)
-
+ if not key == 'headers':
+ decision[key] = expand_key(special_decision, decision[key])
+ else:
+ for header in special_decision[key]:
+ header[0] = expand_key(special_decision, header[0])
+ header[1] = expand_key(special_decision, header[1])
return decision
-def expand_key(decision, key):
+def expand_key(decision, value):
c = itertools.count()
fmt = string.Formatter()
- old = decision[key]
+ old = value
while True:
new = fmt.vformat(old, [], decision)
- if new == old:
+ if new == old.replace('{{', '{').replace('}}', '}'):
return old
if next(c) > 5:
raise RuntimeError
old = new
-
class SpecialVariables(dict):
charsets = {
'binary': 'binary',
tmpstring = struct.pack((num_bytes / 8) * 'Q', *tmplist)
return tmpstring[0:length]
else:
- return ''.join([self.prng.choice(charset) for _ in xrange(length)]) # Won't scale nicely; won't do binary
+ tmpstring = ''.join([self.prng.choice(charset) for _ in xrange(length)]) # Won't scale nicely; won't do binary
+ return tmpstring.replace('{', '{{').replace('}', '}}')