default=None,
help='OS (distro) version such as "12.10"',
)
+ parser.add_argument(
+ '--json-query',
+ default=None,
+ help=textwrap.dedent('''\
+ JSON fragment, explicitly given, or a file containing
+ JSON, containing a query for --list or --brief.
+ Example: teuthology-lock --list --all --json-query
+ '{"vm_host":{"name":"mira003.front.sepia.ceph.com"}'
+ will list all machines who have a vm_host entry
+ with a dictionary that contains at least the name key
+ with value mira003.front.sepia.ceph.com.'''),
+ )
return parser.parse_args()
return statuses
+def json_matching_statuses(json_file_or_str, statuses):
+ """
+ Filter statuses by json dict in file or fragment; return list of
+ matching statuses. json_file_or_str must be a file containing
+ json or json in a string.
+ """
+ try:
+ open(json_file_or_str, 'r')
+ except IOError:
+ query = json.loads(json_file_or_str)
+ else:
+ query = json.load(json_file_or_str)
+
+ if not isinstance(query, dict):
+ raise RuntimeError('--json-query must be a dict')
+
+ return_statuses = list()
+ for status in statuses:
+ for k, v in query.iteritems():
+ if misc.is_in_dict(k, v, status):
+ return_statuses.append(status)
+
+ return return_statuses
+
+
def main(ctx):
if ctx.verbose:
teuthology.log.setLevel(logging.DEBUG)
statuses = [_status for _status in statuses
if _status['description'] is not None and
_status['description'].find(ctx.desc_pattern) >= 0]
+ if ctx.json_query:
+ statuses = json_matching_statuses(ctx.json_query, statuses)
# When listing, only show the vm_host's name, not every detail
for s in statuses:
if not machinetypes:
machinetypes.append(machinetype)
return machinetypes
+
+
+def is_in_dict(searchkey, searchval, d):
+ """
+ Test if searchkey/searchval are in dictionary. searchval may
+ itself be a dict, in which case, recurse. searchval may be
+ a subset at any nesting level (that is, all subkeys in searchval
+ must be found in d at the same level/nest position, but searchval
+ is not required to fully comprise d[searchkey]).
+
+ >>> is_in_dict('a', 'foo', {'a':'foo', 'b':'bar'})
+ True
+
+ >>> is_in_dict(
+ ... 'a',
+ ... {'sub1':'key1', 'sub2':'key2'},
+ ... {'a':{'sub1':'key1', 'sub2':'key2', 'sub3':'key3'}}
+ ... )
+ True
+
+ >>> is_in_dict('a', 'foo', {'a':'bar', 'b':'foo'})
+ False
+
+ >>> is_in_dict('a', 'foo', {'a':{'a': 'foo'}})
+ False
+ """
+ val = d.get(searchkey, None)
+ if isinstance(val, dict) and isinstance(searchval, dict):
+ for foundkey, foundval in searchval.iteritems():
+ if not is_in_dict(foundkey, foundval, val):
+ return False
+ return True
+ else:
+ return searchval == val
def test_invalid_b_deep_merge(self):
with pytest.raises(AssertionError):
misc.deep_merge({"a": "b"}, "invalid")
+
+
+class TestIsInDict(object):
+ def test_simple_membership(self):
+ assert misc.is_in_dict('a', 'foo', {'a':'foo', 'b':'bar'})
+
+ def test_dict_membership(self):
+ assert misc.is_in_dict(
+ 'a', {'sub1':'key1', 'sub2':'key2'},
+ {'a':{'sub1':'key1', 'sub2':'key2', 'sub3':'key3'}}
+ )
+
+ def test_simple_nonmembership(self):
+ assert not misc.is_in_dict('a', 'foo', {'a':'bar', 'b':'foo'})
+
+ def test_nonmembership_with_presence_at_lower_level(self):
+ assert not misc.is_in_dict('a', 'foo', {'a':{'a': 'foo'}})