]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/ansible: Host ls implementation 26185/head
authorJuan Miguel Olmo Martínez <jolmomar@redhat.com>
Tue, 29 Jan 2019 10:24:12 +0000 (11:24 +0100)
committerJuan Miguel Olmo Martínez <jolmomar@redhat.com>
Wed, 24 Apr 2019 16:00:08 +0000 (18:00 +0200)
Implement the <<host ls>> operation in the orchestrator

Example:
[ ~]$ ceph orchestrator host ls
Hosts
192.168.121.20
192.168.121.170
192.168.121.213
192.168.121.212

Followed @sebastian-philipp suggestions
- Improved exception management in process_hosts_ls
- use the orchestrator <get_host> function instead <host_ls>
- Orchestrator now returns a list of Inventory Node
- Orchestrator-cli use the list of inventory nodes to print the hosts names
- Fixed comparation error
- <process_hosts_ls> method: include json errors in output
- removed <host_ls> orchestrator method stuff (<get_hosts> is the new method to use)
- Fixed wrong assignations
- rebased
02/01/19:
- Addressed @sebastian-philipp suggestions
- Housekeeping tasks.
- Refactor completion's result management. Created Output wizards and basic unit tests for them.
- rebased

Signed-off-by: Juan Miguel Olmo Martínez <jolmomar@redhat.com>
src/pybind/mgr/ansible/module.py
src/pybind/mgr/ansible/output_wizards.py [new file with mode: 0644]
src/pybind/mgr/ansible/tests/test_output_wizards.py [new file with mode: 0644]
src/pybind/mgr/orchestrator.py

index 297f98ec54a35e23861c04f8ad9aa5c1b4c322fd..81e1c40a1041841aecc739467d1ddfdd08ebfb5b 100644 (file)
@@ -4,21 +4,23 @@ ceph-mgr Ansible orchestrator module
 The external Orchestrator is the Ansible runner service (RESTful https service)
 """
 
-import types
+# pylint: disable=abstract-method, no-member, bad-continuation
+
 import json
 import requests
 
-
 from mgr_module import MgrModule
 import orchestrator
 
 from .ansible_runner_svc import Client, PlayBookExecution, ExecutionStatusCode,\
-                               EVENT_DATA_URL, AnsibleRunnerServiceError
+                                AnsibleRunnerServiceError
+
+from .output_wizards import ProcessInventory, ProcessPlaybookResult, \
+                            ProcessHostsList
 
 # Time to clean the completions list
 WAIT_PERIOD = 10
 
-
 # List of playbooks names used
 
 # Name of the playbook used in the "get_inventory" method.
@@ -41,26 +43,31 @@ URL_ADD_RM_HOSTS = "api/v1/hosts/{host_name}/groups/{inventory_group}"
 
 # Retrieve the groups where the host is included in.
 URL_GET_HOST_GROUPS = "api/v1/hosts/{host_name}"
-
 # Manage groups
 URL_MANAGE_GROUP = "api/v1/groups/{group_name}"
+# URLs for Ansible Runner Operations
+URL_GET_HOSTS = "api/v1/hosts"
+
 
 class AnsibleReadOperation(orchestrator.ReadCompletion):
     """ A read operation means to obtain information from the cluster.
     """
-
-    def __init__(self, client, playbook, logger, result_pattern,
-                 params,
-                 querystr_dict=None):
+    def __init__(self, client, logger):
+        """
+        :param client        : Ansible Runner Service Client
+        :param logger        : The object used to log messages
+        """
         super(AnsibleReadOperation, self).__init__()
 
         # Private attributes
-        self.playbook = playbook
         self._is_complete = False
         self._is_errored = False
         self._result = []
         self._status = ExecutionStatusCode.NOT_LAUNCHED
 
+        # Object used to process operation result in different ways
+        self.output_wizard = None
+
         # Error description in operation
         self.error = ""
 
@@ -70,22 +77,8 @@ class AnsibleReadOperation(orchestrator.ReadCompletion):
         # Logger
         self.log = logger
 
-        # An aditional filter of result events based in the event
-        self.event_filter = ""
-
-        # Function assigned dinamically to process the result
-        self.process_output = None
-
-        # Playbook execution object
-        self.pb_execution = PlayBookExecution(client,
-                                              playbook,
-                                              logger,
-                                              result_pattern,
-                                              params,
-                                              querystr_dict)
-
-    def __str__(self):
-         return "Playbook {playbook_name}".format(playbook_name = self.playbook)
+        # OutputWizard object used to process the result
+        self.output_wizard = None
 
     @property
     def is_complete(self):
@@ -101,8 +94,113 @@ class AnsibleReadOperation(orchestrator.ReadCompletion):
 
     @property
     def status(self):
-        """Return the status code of the operation
-        updating conceptually 'linked' attributes
+        """Retrieve the current status of the operation and update state
+        attributes
+        """
+        raise NotImplementedError()
+
+class ARSOperation(AnsibleReadOperation):
+    """Execute an Ansible Runner Service Operation
+    """
+
+    def __init__(self, client, logger, url, get_operation=True, payload=None):
+        """
+        :param client        : Ansible Runner Service Client
+        :param logger        : The object used to log messages
+        :param url           : The Ansible Runner Service URL that provides
+                               the operation
+        :param get_operation : True if operation is provided using an http GET
+        :param payload       : http request payload
+        """
+        super(ARSOperation, self).__init__(client, logger)
+
+        self.url = url
+        self.get_operation = get_operation
+        self.payload = payload
+
+    def __str__(self):
+        return "Ansible Runner Service: {operation} {url}".format(
+                    operation="GET" if self.get_operation else "POST",
+                    url=self.url)
+
+    @property
+    def status(self):
+        """ Execute the Ansible Runner Service operation and update the status
+        and result of the underlying Completion object.
+        """
+
+        # Execute the right kind of http request
+        if self.get_operation:
+            response = self.ar_client.http_get(self.url)
+        else:
+            response = self.ar_client.http_post(self.url, self.payload)
+
+        # If no connection errors, the operation is complete
+        self._is_complete = True
+
+        # Depending of the response, status and result is updated
+        if not response:
+            self._is_errored = True
+            self._status = ExecutionStatusCode.ERROR
+            self._result = "Ansible Runner Service not Available"
+        else:
+            self._is_errored = (response.status_code != requests.codes.ok)
+
+            if not self._is_errored:
+                self._status = ExecutionStatusCode.SUCCESS
+                if self.output_wizard:
+                    self._result = self.output_wizard.process(self.url,
+                                                              response.text)
+                else:
+                    self._result = response.text
+            else:
+                self._status = ExecutionStatusCode.ERROR
+                self._result = response.reason
+
+        return self._status
+
+
+class PlaybookOperation(AnsibleReadOperation):
+    """Execute a playbook using the Ansible Runner Service
+    """
+
+    def __init__(self, client, playbook, logger, result_pattern,
+                 params,
+                 querystr_dict={}):
+        """
+        :param client        : Ansible Runner Service Client
+        :param playbook      : The playbook to execute
+        :param logger        : The object used to log messages
+        :param result_pattern: The "pattern" to discover what execution events
+                               have the information deemed as result
+        :param params        : http request payload for the playbook execution
+        :param querystr_dict : http request querystring for the playbook
+                               execution (DO NOT MODIFY HERE)
+
+        """
+        super(PlaybookOperation, self).__init__(client, logger)
+
+       # Private attributes
+        self.playbook = playbook
+
+        # An aditional filter of result events based in the event
+        self.event_filter = ""
+
+        # Playbook execution object
+        self.pb_execution = PlayBookExecution(client,
+                                              playbook,
+                                              logger,
+                                              result_pattern,
+                                              params,
+                                              querystr_dict)
+
+    def __str__(self):
+        return "Playbook {playbook_name}".format(playbook_name=self.playbook)
+
+    @property
+    def status(self):
+        """Check the status of the playbook execution and update the status
+        and result of the underlying Completion object.
         """
 
         if self._status in [ExecutionStatusCode.ON_GOING,
@@ -139,16 +237,12 @@ class AnsibleReadOperation(orchestrator.ReadCompletion):
 
         processed_result = []
 
-
         if self._is_complete:
             raw_result = self.pb_execution.get_result(self.event_filter)
 
-            if self.process_output:
-                processed_result = self.process_output(
-                                            raw_result,
-                                            self.ar_client,
-                                            self.pb_execution.play_uuid,
-                                            self.log)
+            if self.output_wizard:
+                processed_result = self.output_wizard.process(self.pb_execution.play_uuid,
+                                                              raw_result)
             else:
                 processed_result = raw_result
 
@@ -169,6 +263,9 @@ class AnsibleChangeOperation(orchestrator.WriteCompletion):
         self._status = ExecutionStatusCode.NOT_LAUNCHED
         self._result = None
 
+        # Object used to process operation result in different ways
+        self.output_wizard = None
+
     @property
     def status(self):
         """Return the status code of the operation
@@ -214,10 +311,10 @@ class AnsibleChangeOperation(orchestrator.WriteCompletion):
         return self._result
 
 class HttpOperation(object):
+    """A class to ease the management of http operations
+    """
 
     def __init__(self, url, http_operation, payload="", query_string="{}"):
-        """ A class to ease the management of http operations
-        """
         self.url = url
         self.http_operation = http_operation
         self.payload = payload
@@ -242,13 +339,11 @@ class ARSChangeOperation(AnsibleChangeOperation):
         self.log = logger
         self.operations = operations
 
-        self.process_output = None
-
     def __str__(self):
-             # Use the last operation as the main
-             return "Ansible Runner Service: {operation} {url}".format(
-                operation = self.operations[-1].http_operation,
-                url = self.operations[-1].url)
+        # Use the last operation as the main
+        return "Ansible Runner Service: {operation} {url}".format(
+                                operation=self.operations[-1].http_operation,
+                                url=self.operations[-1].url)
 
     @property
     def status(self):
@@ -256,15 +351,17 @@ class ARSChangeOperation(AnsibleChangeOperation):
         and result of the underlying Completion object.
         """
 
-        for op in self.operations:
+        for my_request in self.operations:
             # Execute the right kind of http request
             try:
-                if op.http_operation == "post":
-                    response = self.ar_client.http_post(op.url, op.payload, op.query_string)
-                elif op.http_operation == "delete":
-                    response = self.ar_client.http_delete(op.url)
-                elif op.http_operation == "get":
-                    response = self.ar_client.http_get(op.url)
+                if my_request.http_operation == "post":
+                    response = self.ar_client.http_post(my_request.url,
+                                                        my_request.payload,
+                                                        my_request.query_string)
+                elif my_request.http_operation == "delete":
+                    response = self.ar_client.http_delete(my_request.url)
+                elif my_request.http_operation == "get":
+                    response = self.ar_client.http_get(my_request.url)
 
                 # Any problem executing the secuence of operations will
                 # produce an errored completion object.
@@ -283,8 +380,8 @@ class ARSChangeOperation(AnsibleChangeOperation):
         # If this point is reached, all the operations has been succesfuly
         # executed, and the final result is updated
         self._status = ExecutionStatusCode.SUCCESS
-        if self.process_output:
-            self._result = self.process_output(response.text)
+        if self.output_wizard:
+            self._result = self.output_wizard.process("", response.text)
         else:
             self._result = response.text
 
@@ -346,11 +443,11 @@ class Module(MgrModule, orchestrator.Orchestrator):
 
         # Ansible runner service client
         try:
-            self.ar_client = Client(server_url = self.get_module_option('server_url', ''),
-                                    user = self.get_module_option('username', ''),
-                                    password = self.get_module_option('password', ''),
-                                    verify_server = self.get_module_option('verify_server', True),
-                                    logger = self.log)
+            self.ar_client = Client(server_url=self.get_module_option('server_url', ''),
+                                    user=self.get_module_option('username', ''),
+                                    password=self.get_module_option('password', ''),
+                                    verify_server=self.get_module_option('verify_server', True),
+                                    logger=self.log)
         except AnsibleRunnerServiceError:
             self.log.exception("Ansible Runner Service not available. "
                           "Check external server status/TLS identity or "
@@ -362,32 +459,35 @@ class Module(MgrModule, orchestrator.Orchestrator):
         self.run = True
 
     def shutdown(self):
+
         self.log.info('Stopping Ansible orchestrator module')
         self.run = False
 
     def get_inventory(self, node_filter=None, refresh=False):
         """
 
-        :param   :     node_filter instance
-        :param   :      refresh any cached state
-        :Return  :     A AnsibleReadOperation instance (Completion Object)
+        :param   :  node_filter instance
+        :param   :  refresh any cached state
+        :Return  :  A AnsibleReadOperation instance (Completion Object)
         """
 
         # Create a new read completion object for execute the playbook
-        ansible_operation = AnsibleReadOperation(client = self.ar_client,
-                                                 playbook = GET_STORAGE_DEVICES_CATALOG_PLAYBOOK,
-                                                 logger = self.log,
-                                                 result_pattern = "list storage inventory",
-                                                 params = {})
+        playbook_operation = PlaybookOperation(client=self.ar_client,
+                                               playbook=GET_STORAGE_DEVICES_CATALOG_PLAYBOOK,
+                                               logger=self.log,
+                                               result_pattern="list storage inventory",
+                                               params={})
+
 
         # Assign the process_output function
-        ansible_operation.process_output = process_inventory_json
-        ansible_operation.event_filter = "runner_on_ok"
+        playbook_operation.output_wizard = ProcessInventory(self.ar_client,
+                                                            self.log)
+        playbook_operation.event_filter = "runner_on_ok"
 
         # Execute the playbook to obtain data
-        self._launch_operation(ansible_operation)
+        self._launch_operation(playbook_operation)
 
-        return ansible_operation
+        return playbook_operation
 
     def create_osds(self, drive_group, all_hosts):
         """Create one or more OSDs within a single Drive Group.
@@ -396,29 +496,30 @@ class Module(MgrModule, orchestrator.Orchestrator):
 
         :param drive_group: (orchestrator.DriveGroupSpec),
                             Drive group with the specification of drives to use
-        :param all_hosts: (List[str]),
-                           List of hosts where the OSD's must be created
+        :param all_hosts  : (List[str]),
+                            List of hosts where the OSD's must be created
         """
 
         # Transform drive group specification to Ansible playbook parameters
         host, osd_spec = dg_2_ansible(drive_group)
 
         # Create a new read completion object for execute the playbook
-        ansible_operation = AnsibleReadOperation(client = self.ar_client,
-                                                 playbook = ADD_OSD_PLAYBOOK,
-                                                 logger = self.log,
-                                                 result_pattern = "",
-                                                 params = osd_spec,
-                                                 querystr_dict = {"limit": host})
+        playbook_operation = PlaybookOperation(client=self.ar_client,
+                                               playbook=ADD_OSD_PLAYBOOK,
+                                               logger=self.log,
+                                               result_pattern="",
+                                               params=osd_spec,
+                                               querystr_dict={"limit": host})
 
         # Filter to get the result
-        ansible_operation.process_output = process_playbook_result
-        ansible_operation.event_filter = "playbook_on_stats"
+        playbook_operation.output_wizard = ProcessPlaybookResult(self.ar_client,
+                                                                  self.log)
+        playbook_operation.event_filter = "playbook_on_stats"
 
         # Execute the playbook
-        self._launch_operation(ansible_operation)
+        self._launch_operation(playbook_operation)
 
-        return ansible_operation
+        return playbook_operation
 
     def remove_osds(self, osd_ids):
         """Remove osd's.
@@ -426,24 +527,36 @@ class Module(MgrModule, orchestrator.Orchestrator):
         :param osd_ids: List of osd's to be removed (List[int])
         """
 
-        extravars = {'osd_to_kill': ",".join([str(id) for id in osd_ids]),
+        extravars = {'osd_to_kill': ",".join([str(osd_id) for osd_id in osd_ids]),
                      'ireallymeanit':'yes'}
 
         # Create a new read completion object for execute the playbook
-        ansible_operation = AnsibleReadOperation(client = self.ar_client,
-                                                 playbook = REMOVE_OSD_PLAYBOOK,
-                                                 logger = self.log,
-                                                 result_pattern = "",
-                                                 params = extravars)
+        playbook_operation = PlaybookOperation(client=self.ar_client,
+                                               playbook=REMOVE_OSD_PLAYBOOK,
+                                               logger=self.log,
+                                               result_pattern="",
+                                               params=extravars)
 
         # Filter to get the result
-        ansible_operation.process_output = process_playbook_result
-        ansible_operation.event_filter = "playbook_on_stats"
+        playbook_operation.output_wizard = ProcessPlaybookResult(self.ar_client,
+                                                                 self.log)
+        playbook_operation.event_filter = "playbook_on_stats"
 
         # Execute the playbook
-        self._launch_operation(ansible_operation)
+        self._launch_operation(playbook_operation)
 
-        return ansible_operation
+        return playbook_operation
+
+    def get_hosts(self):
+        """Provides a list Inventory nodes
+        """
+
+        host_ls_op = ARSOperation(self.ar_client, self.log, URL_GET_HOSTS)
+
+        host_ls_op.output_wizard = ProcessHostsList(self.ar_client,
+                                                    self.log)
+
+        return host_ls_op
 
     def add_host(self, host):
         """
@@ -454,7 +567,7 @@ class Module(MgrModule, orchestrator.Orchestrator):
         :returns : orchestrator.WriteCompletion
         """
 
-        url_group = URL_MANAGE_GROUP.format(group_name = ORCHESTRATOR_GROUP)
+        url_group = URL_MANAGE_GROUP.format(group_name=ORCHESTRATOR_GROUP)
 
         try:
             # Create the orchestrator default group if not exist.
@@ -463,20 +576,19 @@ class Module(MgrModule, orchestrator.Orchestrator):
 
             # Here, the default group exists so...
             # Prepare the operation for adding the new host
-            add_url = URL_ADD_RM_HOSTS.format(host_name = host,
-                                              inventory_group = ORCHESTRATOR_GROUP)
+            add_url = URL_ADD_RM_HOSTS.format(host_name=host,
+                                              inventory_group=ORCHESTRATOR_GROUP)
 
-            operations =  [HttpOperation(add_url, "post")]
+            operations = [HttpOperation(add_url, "post")]
 
         except AnsibleRunnerServiceError as ex:
             # Problems with the external orchestrator.
             # Prepare the operation to return the error in a Completion object.
             self.log.exception("Error checking <orchestrator> group: %s", ex)
-            operations =  [HttpOperation(url_group, "post")]
+            operations = [HttpOperation(url_group, "post")]
 
         return ARSChangeOperation(self.ar_client, self.log, operations)
 
-
     def remove_host(self, host):
         """
         Remove a host from all the groups in the Ansible Runner Service
@@ -491,7 +603,7 @@ class Module(MgrModule, orchestrator.Orchestrator):
 
         try:
             # Get the list of groups where the host is included
-            groups_url = URL_GET_HOST_GROUPS.format(host_name = host)
+            groups_url = URL_GET_HOST_GROUPS.format(host_name=host)
             response = self.ar_client.http_get(groups_url)
 
             if response.status_code == requests.codes.ok:
@@ -509,14 +621,13 @@ class Module(MgrModule, orchestrator.Orchestrator):
             # Build the operations list
             operations = list(map(lambda x:
                                   HttpOperation(URL_ADD_RM_HOSTS.format(
-                                                    host_name = host,
-                                                    inventory_group = x),
+                                                    host_name=host,
+                                                    inventory_group=x),
                                                 "delete"),
                                   host_groups))
 
         return ARSChangeOperation(self.ar_client, self.log, operations)
 
-
     def _launch_operation(self, ansible_operation):
         """Launch the operation and add the operation to the completion objects
         ongoing
@@ -531,96 +642,49 @@ class Module(MgrModule, orchestrator.Orchestrator):
         self.all_completions.append(ansible_operation)
 
     def verify_config(self):
+        """ Verify configuration options for the Ansible orchestrator module
+        """
+        client_msg = ""
 
         if not self.get_module_option('server_url', ''):
-            self.log.error(
-                "No Ansible Runner Service base URL <server_name>:<port>"
-                "Try 'ceph config set mgr mgr/%s/server_url <server name/ip>:<port>'",
-                self.module_name)
+            msg = "No Ansible Runner Service base URL <server_name>:<port>." \
+            "Try 'ceph config set mgr mgr/{0}/server_url " \
+            "<server name/ip>:<port>'".format(self.module_name)
+            self.log.error(msg)
+            client_msg += msg
 
         if not self.get_module_option('username', ''):
-            self.log.error(
-                "No Ansible Runner Service user. "
-                "Try 'ceph config set mgr mgr/%s/username <string value>'",
-                self.module_name)
+            msg = "No Ansible Runner Service user. " \
+            "Try 'ceph config set mgr mgr/{0}/username " \
+            "<string value>'".format(self.module_name)
+            self.log.error(msg)
+            client_msg += msg
 
         if not self.get_module_option('password', ''):
-            self.log.error(
-                "No Ansible Runner Service User password. "
-                "Try 'ceph config set mgr mgr/%s/password <string value>'",
-                self.module_name)
+            msg = "No Ansible Runner Service User password. " \
+            "Try 'ceph config set mgr mgr/{0}/password " \
+            "<string value>'".format(self.module_name)
+            self.log.error(msg)
+            client_msg += msg
 
         if not self.get_module_option('verify_server', ''):
-            self.log.error(
-                "TLS server identity verification is enabled by default."
-                "Use 'ceph config set mgr mgr/{0}/verify_server False' to disable it."
-                "Use 'ceph config set mgr mgr/{0}/verify_server <path>' to "
-                "point the CA bundle path used for verification".format(self.module_name))
-
-
-# Auxiliary functions
-#==============================================================================
-
-def process_inventory_json(inventory_events, ar_client, playbook_uuid, logger):
-    """ Adapt the output of the playbook used in 'get_inventory'
-        to the Orchestrator expected output (list of InventoryNode)
-
-    :param inventory_events: events dict with the results
-
-        Example:
-        inventory_events =
-        {'37-100564f1-9fed-48c2-bd62-4ae8636dfcdb': {'host': '192.168.121.254',
-                                                    'task': 'list storage inventory',
-                                                    'event': 'runner_on_ok'},
-        '36-2016b900-e38f-7dcd-a2e7-00000000000e': {'host': '192.168.121.252'
-                                                    'task': 'list storage inventory',
-                                                    'event': 'runner_on_ok'}}
-    :param ar_client: Ansible Runner Service client
-    :param playbook_uuid: Playbook identifier
-
-    :return              : list of InventoryNode
-    """
-
-    #Obtain the needed data for each result event
-    inventory_nodes = []
-
-    # Loop over the result events and request the event data
-    for event_key, dummy_data in inventory_events.items():
-
-        event_response = ar_client.http_get(EVENT_DATA_URL % (playbook_uuid,
-                                                              event_key))
-
-        # Process the data for each event
-        if event_response:
-            event_data = json.loads(event_response.text)["data"]["event_data"]
-
-            host = event_data["host"]
-            devices = json.loads(event_data["res"]["stdout"])
-
-            devs = []
-            for storage_device in devices:
-                dev = orchestrator.InventoryDevice.from_ceph_volume_inventory(storage_device)
-                devs.append(dev)
-
-            inventory_nodes.append(orchestrator.InventoryNode(host, devs))
+            msg = "TLS server identity verification is enabled by default." \
+            "Use 'ceph config set mgr mgr/{0}/verify_server False' " \
+            "to disable it. Use 'ceph config set mgr mgr/{0}/verify_server " \
+            "<path>' to point the CA bundle path used for " \
+            "verification".format(self.module_name)
+            self.log.error(msg)
+            client_msg += msg
 
+        if client_msg:
+            # Raise error
+            # TODO: Use OrchestratorValidationError
+            raise Exception(client_msg)
 
-    return inventory_nodes
 
 
-def process_playbook_result(inventory_events, ar_client, playbook_uuid):
-
-    result = ""
-
-    # Loop over the result events and request the data
-    for event_key, dummy_data in inventory_events.items():
-        event_response = ar_client.http_get(EVENT_DATA_URL % (playbook_uuid,
-                                                              event_key))
-
-        result += event_response.text
-
-    return result
-
+# Auxiliary functions
+#==============================================================================
 def dg_2_ansible(drive_group):
     """ Transform a drive group especification into:
 
diff --git a/src/pybind/mgr/ansible/output_wizards.py b/src/pybind/mgr/ansible/output_wizards.py
new file mode 100644 (file)
index 0000000..d924bf0
--- /dev/null
@@ -0,0 +1,158 @@
+"""
+ceph-mgr Output Wizards module
+
+Output wizards are used to process results in different ways in
+completion objects
+"""
+
+# pylint: disable=bad-continuation
+
+import json
+
+
+from orchestrator import InventoryDevice, InventoryNode
+
+from .ansible_runner_svc import EVENT_DATA_URL
+
+class OutputWizard(object):
+    """Base class for help to process output in completion objects
+    """
+    def __init__(self, ar_client, logger):
+        """Make easy to work in output wizards using this attributes:
+
+        :param ars_client: Ansible Runner Service client
+        :param logger: log object
+        """
+        self.ar_client = ar_client
+        self.log = logger
+
+    def process(self, operation_id, raw_result):
+        """Make the magic here
+
+        :param operation_id: Allows to identify the Ansible Runner Service
+                             operation whose result we wnat to process
+        :param raw_result: input for processing
+        """
+        raise NotImplementedError
+
+class ProcessInventory(OutputWizard):
+    """ Adapt the output of the playbook used in 'get_inventory'
+        to the Orchestrator expected output (list of InventoryNode)
+    """
+
+    def process(self, operation_id, raw_result):
+        """
+        :param operation_id: Playbook uuid
+        :param raw_result: events dict with the results
+
+            Example:
+            inventory_events =
+            {'37-100564f1-9fed-48c2-bd62-4ae8636dfcdb': {'host': '192.168.121.254',
+                                                        'task': 'list storage inventory',
+                                                        'event': 'runner_on_ok'},
+            '36-2016b900-e38f-7dcd-a2e7-00000000000e': {'host': '192.168.121.252'
+                                                        'task': 'list storage inventory',
+                                                        'event': 'runner_on_ok'}}
+
+        :return              : list of InventoryNode
+        """
+        # Just making more readable the method
+        inventory_events = raw_result
+
+        #Obtain the needed data for each result event
+        inventory_nodes = []
+
+        # Loop over the result events and request the event data
+        for event_key, dummy_data in inventory_events.items():
+
+            event_response = self.ar_client.http_get(EVENT_DATA_URL %
+                            (operation_id, event_key))
+
+            # self.pb_execution.play_uuid
+
+            # Process the data for each event
+            if event_response:
+                event_data = json.loads(event_response.text)["data"]["event_data"]
+
+                host = event_data["host"]
+
+                devices = json.loads(event_data["res"]["stdout"])
+                devs = []
+                for storage_device in devices:
+                    dev = InventoryDevice.from_ceph_volume_inventory(storage_device)
+                    devs.append(dev)
+
+                inventory_nodes.append(InventoryNode(host, devs))
+
+
+        return inventory_nodes
+
+class ProcessPlaybookResult(OutputWizard):
+    """ Provides the result of a playbook execution as plain text
+    """
+    def process(self, operation_id, raw_result):
+        """
+        :param operation_id: Playbook uuid
+        :param raw_result: events dict with the results
+
+        :return              : String with the playbook execution event list
+        """
+        # Just making more readable the method
+        inventory_events = raw_result
+
+        result = ""
+
+        # Loop over the result events and request the data
+        for event_key, dummy_data in inventory_events.items():
+            event_response = self.ar_client.http_get(EVENT_DATA_URL %
+                            (operation_id, event_key))
+
+            result += event_response.text
+
+        return result
+
+
+class ProcessHostsList(OutputWizard):
+    """ Format the output of host ls call
+    """
+    def process(self, operation_id, raw_result):
+        """ Format the output of host ls call
+
+        :param operation_id: Not used in this output wizard
+        :param raw_result: In this case is like the following json:
+                {
+                "status": "OK",
+                "msg": "",
+                "data": {
+                    "hosts": [
+                        "host_a",
+                        "host_b",
+                        ...
+                        "host_x",
+                        ]
+                    }
+                }
+
+        :return: list of InventoryNodes
+        """
+        # Just making more readable the method
+        host_ls_json = raw_result
+
+        inventory_nodes = []
+
+        try:
+            json_resp = json.loads(host_ls_json)
+
+            for host in json_resp["data"]["hosts"]:
+                inventory_nodes.append(InventoryNode(host, []))
+
+        except ValueError:
+            self.log.exception("Malformed json response")
+        except KeyError:
+            self.log.exception("Unexpected content in Ansible Runner Service"
+                               " response")
+        except TypeError:
+            self.log.exception("Hosts data must be iterable in Ansible Runner "
+                               "Service response")
+
+        return inventory_nodes
diff --git a/src/pybind/mgr/ansible/tests/test_output_wizards.py b/src/pybind/mgr/ansible/tests/test_output_wizards.py
new file mode 100644 (file)
index 0000000..2a3a901
--- /dev/null
@@ -0,0 +1,207 @@
+""" Test output wizards
+"""
+import unittest
+import mock
+
+from ..ansible_runner_svc import EVENT_DATA_URL
+from ..output_wizards import ProcessHostsList, ProcessPlaybookResult, \
+                             ProcessInventory
+
+class  OutputWizardProcessHostsList(unittest.TestCase):
+    """Test ProcessHostsList Output Wizard
+    """
+    RESULT_OK = """
+                {
+                "status": "OK",
+                "msg": "",
+                "data": {
+                    "hosts": [
+                        "host_a",
+                        "host_b",
+                        "host_c"
+                        ]
+                    }
+                }
+                """
+    ar_client = mock.Mock()
+    logger = mock.Mock()
+    test_wizard = ProcessHostsList(ar_client, logger)
+
+    def test_process(self):
+        """Test a normal call"""
+
+        nodes_list = self.test_wizard.process("", self.RESULT_OK)
+        self.assertEqual([node.name for node in nodes_list],
+                         ["host_a", "host_b", "host_c"])
+
+    def test_errors(self):
+        """Test different kind of errors processing result"""
+
+         # Malformed json
+        host_list = self.test_wizard.process("", """{"msg": """"")
+        self.assertEqual(host_list, [])
+
+        # key error
+        host_list = self.test_wizard.process("", """{"msg": ""}""")
+        self.assertEqual(host_list, [])
+
+        # Hosts not in iterable
+        host_list = self.test_wizard.process("", """{"data":{"hosts": 123} }""")
+        self.assertEqual(host_list, [])
+
+class OutputWizardProcessPlaybookResult(unittest.TestCase):
+    """Test ProcessPlaybookResult Output Wizard
+    """
+    # Input to process
+    INVENTORY_EVENTS = {1:"first event", 2:"second event"}
+    EVENT_INFORMATION = "event information\n"
+
+    # Mocked response
+    mocked_response = mock.Mock()
+    mocked_response.text = EVENT_INFORMATION
+
+    # The Ansible Runner Service client
+    ar_client = mock.Mock()
+    ar_client.http_get = mock.MagicMock(return_value=mocked_response)
+
+    logger = mock.Mock()
+
+    test_wizard = ProcessPlaybookResult(ar_client, logger)
+
+    def test_process(self):
+        """Test a normal call
+        """
+
+        operation_id = 24
+        result = self.test_wizard.process(operation_id, self.INVENTORY_EVENTS)
+
+        # Check http request are correct and compose expected result
+        expected_result = ""
+        for key, dummy_data in self.INVENTORY_EVENTS.items():
+            http_request = EVENT_DATA_URL % (operation_id, key)
+            self.ar_client.http_get.assert_any_call(http_request)
+            expected_result += self.EVENT_INFORMATION
+
+        #Check result
+        self.assertEqual(result, expected_result)
+
+class OutputWizardProcessInventory(unittest.TestCase):
+    """Test ProcessInventory Output Wizard
+    """
+    # Input to process
+    INVENTORY_EVENTS = {'event_uuid_1': {'host': '192.168.121.144',
+                                         'task': 'list storage inventory',
+                                         'event': 'runner_on_ok'}}
+    EVENT_DATA = r"""
+    {
+    "status": "OK",
+    "msg": "",
+    "data": {
+        "uuid": "5e96d509-174d-4f5f-bd94-e278c3a5b85b",
+        "counter": 11,
+        "stdout": "changed: [192.168.121.144]",
+        "start_line": 17,
+        "end_line": 18,
+        "runner_ident": "6e98b2ba-3ce1-11e9-be81-2016b900e38f",
+        "created": "2019-03-02T11:50:56.582112",
+        "pid": 482,
+        "event_data": {
+            "play_pattern": "osds",
+            "play": "query each host for storage device inventory",
+            "task": "list storage inventory",
+            "task_args": "_ansible_version=2.6.5, _ansible_selinux_special_fs=['fuse', 'nfs', 'vboxsf', 'ramfs', '9p'], _ansible_no_log=False, _ansible_module_name=ceph_volume, _ansible_debug=False, _ansible_verbosity=0, _ansible_keep_remote_files=False, _ansible_syslog_facility=LOG_USER, _ansible_socket=None, action=inventory, _ansible_diff=False, _ansible_remote_tmp=~/.ansible/tmp, _ansible_shell_executable=/bin/sh, _ansible_check_mode=False, _ansible_tmpdir=None",
+            "remote_addr": "192.168.121.144",
+            "res": {
+                "_ansible_parsed": true,
+                "stderr_lines": [],
+                "changed": true,
+                "end": "2019-03-02 11:50:56.554937",
+                "_ansible_no_log": false,
+                "stdout": "[{\"available\": true, \"rejected_reasons\": [], \"sys_api\": {\"scheduler_mode\": \"noop\", \"rotational\": \"1\", \"vendor\": \"ATA\", \"human_readable_size\": \"50.00 GB\", \"sectors\": 0, \"sas_device_handle\": \"\", \"partitions\": {}, \"rev\": \"2.5+\", \"sas_address\": \"\", \"locked\": 0, \"sectorsize\": \"512\", \"removable\": \"0\", \"path\": \"/dev/sdc\", \"support_discard\": \"\", \"model\": \"QEMU HARDDISK\", \"ro\": \"0\", \"nr_requests\": \"128\", \"size\": 53687091200.0}, \"lvs\": [], \"path\": \"/dev/sdc\"}, {\"available\": false, \"rejected_reasons\": [\"locked\"], \"sys_api\": {\"scheduler_mode\": \"noop\", \"rotational\": \"1\", \"vendor\": \"ATA\", \"human_readable_size\": \"50.00 GB\", \"sectors\": 0, \"sas_device_handle\": \"\", \"partitions\": {}, \"rev\": \"2.5+\", \"sas_address\": \"\", \"locked\": 1, \"sectorsize\": \"512\", \"removable\": \"0\", \"path\": \"/dev/sda\", \"support_discard\": \"\", \"model\": \"QEMU HARDDISK\", \"ro\": \"0\", \"nr_requests\": \"128\", \"size\": 53687091200.0}, \"lvs\": [{\"cluster_name\": \"ceph\", \"name\": \"osd-data-dcf8a88c-5546-42d2-afa4-b36f7fb23b66\", \"osd_id\": \"3\", \"cluster_fsid\": \"30d61f3e-7ee4-4bdc-8fe7-2ad5bb3f5317\", \"type\": \"block\", \"block_uuid\": \"fVqujC-9dgh-cN9W-1XD4-zVx1-1UdA-fUS3ha\", \"osd_fsid\": \"8b7cbeba-5e86-44ff-a5f3-2e7df77753fe\"}], \"path\": \"/dev/sda\"}, {\"available\": false, \"rejected_reasons\": [\"locked\"], \"sys_api\": {\"scheduler_mode\": \"noop\", \"rotational\": \"1\", \"vendor\": \"ATA\", \"human_readable_size\": \"50.00 GB\", \"sectors\": 0, \"sas_device_handle\": \"\", \"partitions\": {}, \"rev\": \"2.5+\", \"sas_address\": \"\", \"locked\": 1, \"sectorsize\": \"512\", \"removable\": \"0\", \"path\": \"/dev/sdb\", \"support_discard\": \"\", \"model\": \"QEMU HARDDISK\", \"ro\": \"0\", \"nr_requests\": \"128\", \"size\": 53687091200.0}, \"lvs\": [{\"cluster_name\": \"ceph\", \"name\": \"osd-data-8c92e986-bd97-4b3d-ba77-2cb88e15d80f\", \"osd_id\": \"1\", \"cluster_fsid\": \"30d61f3e-7ee4-4bdc-8fe7-2ad5bb3f5317\", \"type\": \"block\", \"block_uuid\": \"mgzO7O-vUfu-H3mf-4R3K-2f97-ZMRH-SngBFP\", \"osd_fsid\": \"6d067688-3e1b-45f9-ad03-8abd19e9f117\"}], \"path\": \"/dev/sdb\"}, {\"available\": false, \"rejected_reasons\": [\"locked\"], \"sys_api\": {\"scheduler_mode\": \"mq-deadline\", \"rotational\": \"1\", \"vendor\": \"0x1af4\", \"human_readable_size\": \"41.00 GB\", \"sectors\": 0, \"sas_device_handle\": \"\", \"partitions\": {\"vda1\": {\"start\": \"2048\", \"holders\": [], \"sectorsize\": 512, \"sectors\": \"2048\", \"size\": \"1024.00 KB\"}, \"vda3\": {\"start\": \"2101248\", \"holders\": [\"dm-0\", \"dm-1\"], \"sectorsize\": 512, \"sectors\": \"81784832\", \"size\": \"39.00 GB\"}, \"vda2\": {\"start\": \"4096\", \"holders\": [], \"sectorsize\": 512, \"sectors\": \"2097152\", \"size\": \"1024.00 MB\"}}, \"rev\": \"\", \"sas_address\": \"\", \"locked\": 1, \"sectorsize\": \"512\", \"removable\": \"0\", \"path\": \"/dev/vda\", \"support_discard\": \"\", \"model\": \"\", \"ro\": \"0\", \"nr_requests\": \"256\", \"size\": 44023414784.0}, \"lvs\": [{\"comment\": \"not used by ceph\", \"name\": \"LogVol00\"}, {\"comment\": \"not used by ceph\", \"name\": \"LogVol01\"}], \"path\": \"/dev/vda\"}]",
+                "cmd": [
+                    "ceph-volume",
+                    "inventory",
+                    "--format=json"
+                ],
+                "rc": 0,
+                "start": "2019-03-02 11:50:55.150121",
+                "stderr": "",
+                "delta": "0:00:01.404816",
+                "invocation": {
+                    "module_args": {
+                        "wal_vg": null,
+                        "wal": null,
+                        "dmcrypt": false,
+                        "block_db_size": "-1",
+                        "journal": null,
+                        "objectstore": "bluestore",
+                        "db": null,
+                        "batch_devices": [],
+                        "db_vg": null,
+                        "journal_vg": null,
+                        "cluster": "ceph",
+                        "osds_per_device": 1,
+                        "containerized": "False",
+                        "crush_device_class": null,
+                        "report": false,
+                        "data_vg": null,
+                        "data": null,
+                        "action": "inventory",
+                        "journal_size": "5120"
+                    }
+                },
+                "stdout_lines": [
+                    "[{\"available\": true, \"rejected_reasons\": [], \"sys_api\": {\"scheduler_mode\": \"noop\", \"rotational\": \"1\", \"vendor\": \"ATA\", \"human_readable_size\": \"50.00 GB\", \"sectors\": 0, \"sas_device_handle\": \"\", \"partitions\": {}, \"rev\": \"2.5+\", \"sas_address\": \"\", \"locked\": 0, \"sectorsize\": \"512\", \"removable\": \"0\", \"path\": \"/dev/sdc\", \"support_discard\": \"\", \"model\": \"QEMU HARDDISK\", \"ro\": \"0\", \"nr_requests\": \"128\", \"size\": 53687091200.0}, \"lvs\": [], \"path\": \"/dev/sdc\"}, {\"available\": false, \"rejected_reasons\": [\"locked\"], \"sys_api\": {\"scheduler_mode\": \"noop\", \"rotational\": \"1\", \"vendor\": \"ATA\", \"human_readable_size\": \"50.00 GB\", \"sectors\": 0, \"sas_device_handle\": \"\", \"partitions\": {}, \"rev\": \"2.5+\", \"sas_address\": \"\", \"locked\": 1, \"sectorsize\": \"512\", \"removable\": \"0\", \"path\": \"/dev/sda\", \"support_discard\": \"\", \"model\": \"QEMU HARDDISK\", \"ro\": \"0\", \"nr_requests\": \"128\", \"size\": 53687091200.0}, \"lvs\": [{\"cluster_name\": \"ceph\", \"name\": \"osd-data-dcf8a88c-5546-42d2-afa4-b36f7fb23b66\", \"osd_id\": \"3\", \"cluster_fsid\": \"30d61f3e-7ee4-4bdc-8fe7-2ad5bb3f5317\", \"type\": \"block\", \"block_uuid\": \"fVqujC-9dgh-cN9W-1XD4-zVx1-1UdA-fUS3ha\", \"osd_fsid\": \"8b7cbeba-5e86-44ff-a5f3-2e7df77753fe\"}], \"path\": \"/dev/sda\"}, {\"available\": false, \"rejected_reasons\": [\"locked\"], \"sys_api\": {\"scheduler_mode\": \"noop\", \"rotational\": \"1\", \"vendor\": \"ATA\", \"human_readable_size\": \"50.00 GB\", \"sectors\": 0, \"sas_device_handle\": \"\", \"partitions\": {}, \"rev\": \"2.5+\", \"sas_address\": \"\", \"locked\": 1, \"sectorsize\": \"512\", \"removable\": \"0\", \"path\": \"/dev/sdb\", \"support_discard\": \"\", \"model\": \"QEMU HARDDISK\", \"ro\": \"0\", \"nr_requests\": \"128\", \"size\": 53687091200.0}, \"lvs\": [{\"cluster_name\": \"ceph\", \"name\": \"osd-data-8c92e986-bd97-4b3d-ba77-2cb88e15d80f\", \"osd_id\": \"1\", \"cluster_fsid\": \"30d61f3e-7ee4-4bdc-8fe7-2ad5bb3f5317\", \"type\": \"block\", \"block_uuid\": \"mgzO7O-vUfu-H3mf-4R3K-2f97-ZMRH-SngBFP\", \"osd_fsid\": \"6d067688-3e1b-45f9-ad03-8abd19e9f117\"}], \"path\": \"/dev/sdb\"}, {\"available\": false, \"rejected_reasons\": [\"locked\"], \"sys_api\": {\"scheduler_mode\": \"mq-deadline\", \"rotational\": \"1\", \"vendor\": \"0x1af4\", \"human_readable_size\": \"41.00 GB\", \"sectors\": 0, \"sas_device_handle\": \"\", \"partitions\": {\"vda1\": {\"start\": \"2048\", \"holders\": [], \"sectorsize\": 512, \"sectors\": \"2048\", \"size\": \"1024.00 KB\"}, \"vda3\": {\"start\": \"2101248\", \"holders\": [\"dm-0\", \"dm-1\"], \"sectorsize\": 512, \"sectors\": \"81784832\", \"size\": \"39.00 GB\"}, \"vda2\": {\"start\": \"4096\", \"holders\": [], \"sectorsize\": 512, \"sectors\": \"2097152\", \"size\": \"1024.00 MB\"}}, \"rev\": \"\", \"sas_address\": \"\", \"locked\": 1, \"sectorsize\": \"512\", \"removable\": \"0\", \"path\": \"/dev/vda\", \"support_discard\": \"\", \"model\": \"\", \"ro\": \"0\", \"nr_requests\": \"256\", \"size\": 44023414784.0}, \"lvs\": [{\"comment\": \"not used by ceph\", \"name\": \"LogVol00\"}, {\"comment\": \"not used by ceph\", \"name\": \"LogVol01\"}], \"path\": \"/dev/vda\"}]"
+                ]
+            },
+            "pid": 482,
+            "play_uuid": "2016b900-e38f-0e09-19be-00000000000c",
+            "task_uuid": "2016b900-e38f-0e09-19be-000000000012",
+            "event_loop": null,
+            "playbook_uuid": "e80e66f2-4a78-4a96-aaf6-fbe473f11312",
+            "playbook": "storage-inventory.yml",
+            "task_action": "ceph_volume",
+            "host": "192.168.121.144",
+            "task_path": "/usr/share/ansible-runner-service/project/storage-inventory.yml:29"
+        },
+        "event": "runner_on_ok"
+    }
+    }
+    """
+
+    # Mocked response
+    mocked_response = mock.Mock()
+    mocked_response.text = EVENT_DATA
+
+    # The Ansible Runner Service client
+    ar_client = mock.Mock()
+    ar_client.http_get = mock.MagicMock(return_value=mocked_response)
+
+    logger = mock.Mock()
+
+    test_wizard = ProcessInventory(ar_client, logger)
+
+    def test_process(self):
+        """Test a normal call
+        """
+        operation_id = 12
+        nodes_list = self.test_wizard.process(operation_id, self.INVENTORY_EVENTS)
+
+        for key, dummy_data in self.INVENTORY_EVENTS.items():
+            http_request = EVENT_DATA_URL % (operation_id, key)
+            self.ar_client.http_get.assert_any_call(http_request)
+
+
+        # Only one host
+        self.assertTrue(len(nodes_list), 1)
+
+        # Host retrieved OK
+        self.assertEqual(nodes_list[0].name, "192.168.121.144")
+
+        # Devices
+        self.assertTrue(len(nodes_list[0].devices), 4)
+
+        expected_device_ids = ["/dev/sdc", "/dev/sda", "/dev/sdb", "/dev/vda"]
+        device_ids = [dev.id for dev in nodes_list[0].devices]
+
+        self.assertEqual(expected_device_ids, device_ids)
index 00f87ec01fd4610d19ce1751758617847fac725b..e66f926a3f33f669a61b84f841319bf3e14e4565 100644 (file)
@@ -438,7 +438,6 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-
 class UpgradeSpec(object):
     # Request to orchestrator to initiate an upgrade to a particular
     # version of Ceph