import argparse
import configobj
import contextlib
+import functools
+import json
import logging
import os
-import json
import re
import uuid
import yaml
+import jinja2
+
from copy import deepcopy
from io import BytesIO, StringIO
from tarfile import ReadError
log = logging.getLogger(__name__)
+def _convert_strs_in(o, conv):
+ """A function to walk the contents of a dict/list and recurisvely apply
+ a conversion function (`conv`) to the strings within.
+ """
+ if isinstance(o, str):
+ return conv(o)
+ if isinstance(o, dict):
+ for k in o:
+ o[k] = _convert_strs_in(o[k], conv)
+ if isinstance(o, list):
+ o[:] = [_convert_strs_in(v, conv) for v in o]
+ return o
+
+
+def _apply_template(jinja_env, rctx, template):
+ """Apply jinja2 templating to the template string `template` via the jinja
+ environment `jinja_env`, passing a dictionary containing top-level context
+ to render into the template.
+ """
+ if '{{' in template or '{%' in template:
+ return jinja_env.from_string(template).render(**rctx)
+ return template
+
+
+def _template_transform(ctx, config, target):
+ """Apply jinja2 based templates to strings within the target object,
+ returning a transformed target. Target objects may be a list or dict or
+ str.
+
+ Note that only string values in the list or dict objects are modified.
+ Therefore one can read & parse yaml or json that contain templates in
+ string values without the risk of changing the structure of the yaml/json.
+ """
+ jenv = getattr(ctx, '_jinja_env', None)
+ if jenv is None:
+ loader = jinja2.BaseLoader()
+ jenv = jinja2.Environment(loader=loader)
+ setattr(ctx, '_jinja_env', jenv)
+ rctx = dict(ctx=ctx, config=config, cluster_name=config.get('cluster', ''))
+ _vip_vars(rctx)
+ conv = functools.partial(_apply_template, jenv, rctx)
+ return _convert_strs_in(target, conv)
+
+
+def _vip_vars(rctx):
+ """For backwards compat with the previous subst_vip function."""
+ ctx = rctx['ctx']
+ if 'vnet' in getattr(ctx, 'vip', {}):
+ rctx['VIPPREFIXLEN'] = str(ctx.vip["vnet"].prefixlen)
+ rctx['VIPSUBNET'] = str(ctx.vip["vnet"].network_address)
+ if 'vips' in getattr(ctx, 'vip', {}):
+ vips = ctx.vip['vips']
+ for idx, vip in enumerate(vips):
+ rctx[f'VIP{idx}'] = str(vip)
+
+
def _shell(ctx, cluster_name, remote, args, extra_cephadm_args=[], **kwargs):
teuthology.get_testdir(ctx)
return remote.run(