import json
import errno
import logging
+import re
import shlex
from collections import defaultdict
from configparser import ConfigParser
self._reset_cons()
+ def validate_ssh_config_content(self, ssh_config):
+ if ssh_config is None or len(ssh_config.strip()) == 0:
+ raise OrchestratorValidationError('ssh_config cannot be empty')
+ # StrictHostKeyChecking is [yes|no] ?
+ l = re.findall(r'StrictHostKeyChecking\s+.*', ssh_config)
+ if not l:
+ raise OrchestratorValidationError('ssh_config requires StrictHostKeyChecking')
+ for s in l:
+ if 'ask' in s.lower():
+ raise OrchestratorValidationError(f'ssh_config cannot contain: \'{s}\'')
+
def validate_ssh_config_fname(self, ssh_config_fname):
if not os.path.isfile(ssh_config_fname):
raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
def _set_ssh_config(self, inbuf=None):
"""
Set an ssh_config file provided from stdin
-
- TODO:
- - validation
"""
- if inbuf is None or len(inbuf) == 0:
- return -errno.EINVAL, "", "empty ssh config provided"
if inbuf == self.ssh_config:
return 0, "value unchanged", ""
+ self.validate_ssh_config_content(inbuf)
self.set_store("ssh_config", inbuf)
self.log.info('Set ssh_config')
self._reconfig_ssh()