import collections
import logging
try:
- from typing import Optional
+ from typing import Optional, Dict, Any, List, Set
except ImportError:
pass # just for type checking
logger.exception("Error getting playbook <%s> result", self.playbook)
if not response:
- result_events = {}
+ result_events = {} # type: Dict[str, Any]
else:
events = json.loads(response.text)["data"]["events"]
if response.status_code != 200:
raise AnsibleRunnerServiceError("Error when trying to "\
"create group:{}".format(group))
- hosts_in_group = []
+ hosts_in_group = [] # type: List[str]
else:
hosts_in_group = json.loads(response.text)["data"]["members"]
: returns : Nothing
"""
- self.elements = set()
+ self.elements = set() # type: Set[Any]
self.group_name = group_name
self.url_group = URL_MANAGE_GROUP.format(group_name=self.group_name)
"create group:{}".format(
self.group_name))
self.created = True
- self.elements = {}
+ self.elements = set()
import tempfile
try:
- from typing import List, Optional, Callable
+ from typing import List, Optional, Callable, Any
except ImportError:
pass # just for type checking
# Clean hosts if operation is succesful
if status == ExecutionStatusCode.SUCCESS:
+ assert clean_hosts_on_success is not None
clean_inventory(client, clean_hosts_on_success)
return processed_result
def ars_read(client, url, get_operation=True, payload=None, output_wizard=None):
- # type: (Client, str, bool, Optional[str], Optional[OutputWizard]) -> orchestrator.Completion[str]
+ # type: (Client, str, bool, Optional[str], Optional[OutputWizard]) -> orchestrator.Completion
"""
Execute the Ansible Runner Service operation
self.all_completions = []
- self.ar_client = None # type: Client
+ self._ar_client = None # type: Optional[Client]
# TLS certificate and key file names used to connect with the external
# Ansible Runner Service
self.all_progress_references = list() # type: List[orchestrator.ProgressReference]
+ @property
+ def ar_client(self):
+ # type: () -> Client
+ assert self._ar_client is not None
+ return self._ar_client
def available(self):
""" Check if Ansible Runner service is working
msg = ""
try:
- if self.ar_client:
+ if self._ar_client:
available = self.ar_client.is_operative()
if not available:
msg = "No response from Ansible Runner Service"
self.verify_config()
# Ansible runner service client
- self.ar_client = Client(
+ self._ar_client = Client(
server_url=self.get_module_option('server_location', ''),
verify_server=self.get_module_option('verify_server', True),
ca_bundle=self.get_module_option('ca_bundle', ''),
:returns : orchestrator.Completion
"""
- host_groups = []
+ host_groups = [] # type: List[Any]
try:
# Get the list of groups where the host is included