]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/ansible: Adapt to new orchestrator completions
authorSebastian Wagner <sebastian.wagner@suse.com>
Fri, 6 Sep 2019 08:45:30 +0000 (10:45 +0200)
committerSebastian Wagner <sebastian.wagner@suse.com>
Wed, 27 Nov 2019 12:38:20 +0000 (13:38 +0100)
Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
src/pybind/mgr/ansible/ansible_runner_svc.py
src/pybind/mgr/ansible/module.py
src/pybind/mgr/ansible/output_wizards.py
src/pybind/mgr/ansible/tests/test_client_playbooks.py
src/pybind/mgr/ansible/tests/test_output_wizards.py

index 18019455269699074ac6ec99084061f2d4989148..116136c6153034f4301e4c93acef481b5c0a800e 100644 (file)
@@ -6,8 +6,13 @@ import json
 import re
 from functools import wraps
 import collections
+import logging
+from typing import Optional
 
 import requests
+from orchestrator import OrchestratorError
+
+logger = logging.getLogger(__name__)
 
 # Ansible Runner service API endpoints
 API_URL = "api"
@@ -17,7 +22,7 @@ EVENT_DATA_URL = "api/v1/jobs/%s/events/%s"
 URL_MANAGE_GROUP = "api/v1/groups/{group_name}"
 URL_ADD_RM_HOSTS = "api/v1/hosts/{host_name}/groups/{inventory_group}"
 
-class AnsibleRunnerServiceError(Exception):
+class AnsibleRunnerServiceError(OrchestratorError):
     """Generic Ansible Runner Service Exception"""
     pass
 
@@ -46,9 +51,12 @@ class PlayBookExecution(object):
     """Object to provide all the results of a Playbook execution
     """
 
-    def __init__(self, rest_client, playbook, logger, result_pattern="",
-                 the_params=None,
-                 querystr_dict=None):
+    def __init__(self, rest_client,  # type: Client
+                 playbook,  # type: str
+                 result_pattern="",  # type: str
+                 the_params=None,  # type: Optional[dict]
+                 querystr_dict=None  # type: Optional[dict]
+                 ):
 
         self.rest_client = rest_client
 
@@ -67,9 +75,6 @@ class PlayBookExecution(object):
         # Query string used in the "launch" request
         self.querystr_dict = querystr_dict
 
-        # Logger
-        self.log = logger
-
     def launch(self):
         """ Launch the playbook execution
         """
@@ -82,7 +87,7 @@ class PlayBookExecution(object):
                                                   self.params,
                                                   self.querystr_dict)
         except AnsibleRunnerServiceError:
-            self.log.exception("Error launching playbook <%s>", self.playbook)
+            logger.exception("Error launching playbook <%s>", self.playbook)
             raise
 
         # Here we have a server response, but an error trying
@@ -91,7 +96,7 @@ class PlayBookExecution(object):
         # to the orchestrator (via completion object)
         if response.ok:
             self.play_uuid = json.loads(response.text)["data"]["play_uuid"]
-            self.log.info("Playbook execution launched succesfuly")
+            logger.info("Playbook execution launched succesfuly")
         else:
             raise AnsibleRunnerServiceError(response.reason)
 
@@ -117,7 +122,7 @@ class PlayBookExecution(object):
             try:
                 response = self.rest_client.http_get(endpoint)
             except AnsibleRunnerServiceError:
-                self.log.exception("Error getting playbook <%s> status",
+                logger.exception("Error getting playbook <%s> status",
                                    self.playbook)
 
             if response:
@@ -131,7 +136,7 @@ class PlayBookExecution(object):
             else:
                 status_value = ExecutionStatusCode.ERROR
 
-        self.log.info("Requested playbook execution status is: %s", status_value)
+        logger.info("Requested playbook execution status is: %s", status_value)
         return status_value
 
     def get_result(self, event_filter):
@@ -148,7 +153,7 @@ class PlayBookExecution(object):
         try:
             response = self.rest_client.http_get(PLAYBOOK_EVENTS % self.play_uuid)
         except AnsibleRunnerServiceError:
-            self.log.exception("Error getting playbook <%s> result", self.playbook)
+            logger.exception("Error getting playbook <%s> result", self.playbook)
 
         if not response:
             result_events = {}
@@ -170,7 +175,7 @@ class PlayBookExecution(object):
                 result_events = {event:data for event, data in result_events.items()
                                  if re.match(type_of_events, data['event'])}
 
-        self.log.info("Requested playbook result is: %s", json.dumps(result_events))
+        logger.info("Requested playbook result is: %s", json.dumps(result_events))
         return result_events
 
 class Client(object):
@@ -178,8 +183,13 @@ class Client(object):
     and execute easily playbooks
     """
 
-    def __init__(self, server_url, verify_server, ca_bundle, client_cert,
-                 client_key, logger):
+    def __init__(self,
+                 server_url,  # type: str
+                 verify_server,  # type: bool
+                 ca_bundle,  # type: str
+                 client_cert, # type: str
+                 client_key  # type: str
+                 ):
         """Provide an https client to make easy interact with the Ansible
         Runner Service"
 
@@ -194,17 +204,15 @@ class Client(object):
                             file
         :param client_key: Path to Ansible Runner Service client certificate key
                            file
-        :param logger: Log file
         """
         self.server_url = server_url
-        self.log = logger
         self.client_cert = (client_cert, client_key)
 
         # used to provide the "verify" parameter in requests
         # a boolean that sometimes contains a string :-(
         self.verify_server = verify_server
         if ca_bundle: # This intentionallly overwrites
-            self.verify_server = ca_bundle
+            self.verify_server = ca_bundle  # type: ignore
 
         self.server_url = "https://{0}".format(self.server_url)
 
@@ -238,11 +246,11 @@ class Client(object):
                                 headers={})
 
         if response.status_code != requests.codes.ok:
-            self.log.error("http GET %s <--> (%s - %s)\n%s",
+            logger.error("http GET %s <--> (%s - %s)\n%s",
                            the_url, response.status_code, response.reason,
                            response.text)
         else:
-            self.log.info("http GET %s <--> (%s - %s)",
+            logger.info("http GET %s <--> (%s - %s)",
                           the_url, response.status_code, response.text)
 
         return response
@@ -267,11 +275,11 @@ class Client(object):
                                  params=params_dict)
 
         if response.status_code != requests.codes.ok:
-            self.log.error("http POST %s [%s] <--> (%s - %s:%s)\n",
+            logger.error("http POST %s [%s] <--> (%s - %s:%s)\n",
                            the_url, payload, response.status_code,
                            response.reason, response.text)
         else:
-            self.log.info("http POST %s <--> (%s - %s)",
+            logger.info("http POST %s <--> (%s - %s)",
                           the_url, response.status_code, response.text)
 
         return response
@@ -292,11 +300,11 @@ class Client(object):
                                    headers={})
 
         if response.status_code != requests.codes.ok:
-            self.log.error("http DELETE %s <--> (%s - %s)\n%s",
+            logger.error("http DELETE %s <--> (%s - %s)\n%s",
                            the_url, response.status_code, response.reason,
                            response.text)
         else:
-            self.log.info("http DELETE %s <--> (%s - %s)",
+            logger.info("http DELETE %s <--> (%s - %s)",
                           the_url, response.status_code, response.text)
 
         return response
@@ -400,6 +408,7 @@ class InventoryGroup(collections.MutableSet):
     """ Manages an Ansible Inventory Group
     """
     def __init__(self, group_name, ars_client):
+        # type: (str, Client) -> None
         """Init the group_name attribute and
            Create the inventory group if it does not exist
 
index 0498625a596a94fdeac613e738972f690f31869b..05c86214c673b51b3800d34e6d870995ac5bf59e 100644 (file)
@@ -5,11 +5,14 @@ The external Orchestrator is the Ansible runner service (RESTful https service)
 """
 
 # pylint: disable=abstract-method, no-member, bad-continuation
-
+import functools
 import json
 import os
 import errno
 import tempfile
+from typing import List, Any, Optional, Callable, Tuple, TypeVar
+T = TypeVar('T')
+
 import requests
 
 from mgr_module import MgrModule, Option, CLIWriteCommand
@@ -21,7 +24,9 @@ from .ansible_runner_svc import Client, PlayBookExecution, ExecutionStatusCode,\
                                 URL_MANAGE_GROUP, URL_ADD_RM_HOSTS
 
 from .output_wizards import ProcessInventory, ProcessPlaybookResult, \
-                            ProcessHostsList
+                            ProcessHostsList, OutputWizard
+
+
 
 # Time to clean the completions list
 WAIT_PERIOD = 10
@@ -59,366 +64,158 @@ URL_GET_HOST_GROUPS = "api/v1/hosts/{host_name}"
 URL_GET_HOSTS = "api/v1/hosts"
 
 
-class AnsibleReadOperation(orchestrator.ReadCompletion):
-    """ A read operation means to obtain information from the cluster.
-    """
-    def __init__(self, client, logger):
-        """
-        :param client        : Ansible Runner Service Client
-        :param logger        : The object used to log messages
-        """
-        super(AnsibleReadOperation, self).__init__()
-
-        # Private attributes
-        self._is_complete = False
-        self._is_errored = False
-        self._result = []
-        self._status = ExecutionStatusCode.NOT_LAUNCHED
-
-        # Object used to process operation result in different ways
-        self.output_wizard = None
-
-        # Error description in operation
-        self.error = ""
-
-        # Ansible Runner Service client
-        self.ar_client = client
-
-        # Logger
-        self.log = logger
-
-    def __str__(self):
-        return "Playbook {playbook_name}".format(playbook_name=self.playbook)
-
-    @property
-    def has_result(self):
-        return self._is_complete
-
-    @property
-    def is_errored(self):
-        return self._is_errored
 
-    @property
-    def result(self):
-        return self._result
-
-    @property
-    def status(self):
-        """Retrieve the current status of the operation and update state
-        attributes
-        """
-        raise NotImplementedError()
-
-class ARSOperation(AnsibleReadOperation):
-    """Execute an Ansible Runner Service Operation
+def deferred(f):
+    # type: (Callable[..., T]) -> Callable[..., orchestrator.Completion[T]]
+    """
+    Decorator to make RookOrchestrator methods return
+    a completion object that executes themselves.
     """
 
-    def __init__(self, client, logger, url, get_operation=True, payload=None):
-        """
-        :param client        : Ansible Runner Service Client
-        :param logger        : The object used to log messages
-        :param url           : The Ansible Runner Service URL that provides
-                               the operation
-        :param get_operation : True if operation is provided using an http GET
-        :param payload       : http request payload
-        """
-        super(ARSOperation, self).__init__(client, logger)
-
-        self.url = url
-        self.get_operation = get_operation
-        self.payload = payload
-
-    def __str__(self):
-        return "Ansible Runner Service: {operation} {url}".format(
-                    operation="GET" if self.get_operation else "POST",
-                    url=self.url)
-
-    @property
-    def status(self):
-        """ Execute the Ansible Runner Service operation and update the status
-        and result of the underlying Completion object.
-        """
-
-        # Execute the right kind of http request
-        if self.get_operation:
-            response = self.ar_client.http_get(self.url)
-        else:
-            response = self.ar_client.http_post(self.url, self.payload)
-
-        # If no connection errors, the operation is complete
-        self._is_complete = True
-
-        # Depending of the response, status and result is updated
-        if not response:
-            self._is_errored = True
-            self._status = ExecutionStatusCode.ERROR
-            self._result = "Ansible Runner Service not Available"
-        else:
-            self._is_errored = (response.status_code != requests.codes.ok)
-
-            if not self._is_errored:
-                self._status = ExecutionStatusCode.SUCCESS
-                if self.output_wizard:
-                    self._result = self.output_wizard.process(self.url,
-                                                              response.text)
-                else:
-                    self._result = response.text
-            else:
-                self._status = ExecutionStatusCode.ERROR
-                self._result = response.reason
+    @functools.wraps(f)
+    def wrapper(*args, **kwargs):
+        return orchestrator.Completion(on_complete=lambda _: f(*args, **kwargs))
 
-        return self._status
+    return wrapper
 
 
-class PlaybookOperation(AnsibleReadOperation):
-    """Execute a playbook using the Ansible Runner Service
+def clean_inventory(ar_client, clean_hosts_on_success):
+    # type: (Client, dict) -> None
+    """ Remove hosts from inventory groups
     """
 
-    def __init__(self, client, playbook, logger, result_pattern,
-                 params,
-                 querystr_dict={}):
-        """
-        :param client        : Ansible Runner Service Client
-        :param playbook      : The playbook to execute
-        :param logger        : The object used to log messages
-        :param result_pattern: The "pattern" to discover what execution events
-                               have the information deemed as result
-        :param params        : http request payload for the playbook execution
-        :param querystr_dict : http request querystring for the playbook
-                               execution (DO NOT MODIFY HERE)
-
-        """
-        super(PlaybookOperation, self).__init__(client, logger)
-
-       # Private attributes
-        self.playbook = playbook
+    for group, hosts in clean_hosts_on_success.items():
+        InventoryGroup(group, ar_client).clean(hosts)
 
-        # An aditional filter of result events based in the event
-        self.event_filter_list = [""]
 
-        # A dict with groups and hosts to remove from inventory if operation is
-        # succesful. Ex: {"group1": ["host1"], "group2": ["host3", "host4"]}
-        self.clean_hosts_on_success = {}
-
-        # Playbook execution object
-        self.pb_execution = PlayBookExecution(client,
-                                              playbook,
-                                              logger,
-                                              result_pattern,
-                                              params,
-                                              querystr_dict)
+def playbook_operation(client,  # type: Client
+                       playbook,  # type: str
+                       result_pattern,  # type: str
+                       params,  # type: dict
+                       event_filter_list=None,  # type: Optional[List[str]]
+                       querystr_dict=None,  # type: Optional[dict]
+                       output_wizard=None,  # type: Optional[OutputWizard]
+                       clean_hosts_on_success=None  # type: Optional[dict]
+                       ):
+    # type: (...) -> orchestrator.Completion
+    """
+    :param client        : Ansible Runner Service Client
+    :param playbook      : The playbook to execute
+    :param result_pattern: The "pattern" to discover what execution events
+                           have the information deemed as result
+    :param params        : http request payload for the playbook execution
+    :param querystr_dict : http request querystring for the playbook
+                           execution (DO NOT MODIFY HERE)
+    :param event_filter_list: An aditional filter of result events based in the event
+    :param clean_hosts_on_success: A dict with groups and hosts to remove from inventory if operation is
+        succesful. Ex: {"group1": ["host1"], "group2": ["host3", "host4"]}
+    """
 
-    def __str__(self):
-        return "Playbook {playbook_name}".format(playbook_name=self.playbook)
+    querystr_dict = querystr_dict or {}
+    event_filter_list = event_filter_list or [""]
+    clean_hosts_on_success = clean_hosts_on_success or {}
 
-    @property
-    def status(self):
+    def status(_):
         """Check the status of the playbook execution and update the status
         and result of the underlying Completion object.
         """
 
-        if self._status in [ExecutionStatusCode.ON_GOING,
-                            ExecutionStatusCode.NOT_LAUNCHED]:
-            self._status = self.pb_execution.get_status()
-
-        self._is_complete = (self._status == ExecutionStatusCode.SUCCESS) or \
-                            (self._status == ExecutionStatusCode.ERROR)
+        status = pb_execution.get_status()
 
-        self._is_errored = (self._status == ExecutionStatusCode.ERROR)
+        if status in (ExecutionStatusCode.SUCCESS, ExecutionStatusCode.ERROR):
+            if status == ExecutionStatusCode.ERROR:
+                raw_result = pb_execution.get_result(["runner_on_failed",
+                                                      "runner_on_unreachable",
+                                                      "runner_on_no_hosts",
+                                                      "runner_on_async_failed",
+                                                      "runner_item_on_failed"])
+            else:
+                raw_result = pb_execution.get_result(event_filter_list)
 
-        if self._is_complete:
-            self.update_result()
+            if output_wizard:
+                processed_result = output_wizard.process(pb_execution.play_uuid,
+                                                         raw_result)
+            else:
+                processed_result = raw_result
 
             # Clean hosts if operation is succesful
-            if self._status == ExecutionStatusCode.SUCCESS:
-                self.clean_inventory()
-
-        return self._status
+            if status == ExecutionStatusCode.SUCCESS:
+                clean_inventory(client, clean_hosts_on_success)
 
-    def execute_playbook(self):
-        """Launch the execution of the playbook with the parameters configured
-        """
-        try:
-            self.pb_execution.launch()
-        except AnsibleRunnerServiceError:
-            self._status = ExecutionStatusCode.ERROR
-            raise
-
-    def update_result(self):
-        """Output of the read operation
-
-        The result of the playbook execution can be customized through the
-        function provided as 'process_output' attribute
+            return processed_result
+        else:
+            return orchestrator.Completion(on_complete=status)
 
-        :return string: Result of the operation formatted if it is possible
-        """
+    pb_execution = PlayBookExecution(client, playbook, result_pattern, params, querystr_dict)
 
-        processed_result = []
+    return orchestrator.Completion(on_complete=lambda _: pb_execution.launch()).then(status)
 
-        if self._is_errored:
-            raw_result = self.pb_execution.get_result(["runner_on_failed",
-                                                       "runner_on_unreachable",
-                                                       "runner_on_no_hosts",
-                                                       "runner_on_async_failed",
-                                                       "runner_item_on_failed"])
-        elif self._is_complete:
-            raw_result = self.pb_execution.get_result(self.event_filter_list)
 
-        if self.output_wizard:
-            processed_result = self.output_wizard.process(self.pb_execution.play_uuid,
-                                                          raw_result)
+def ars_http_operation(url, http_operation, payload="", params_dict=None):
+    def inner(ar_client):
+        # type: (Client) -> str
+        if http_operation == "post":
+            response = ar_client.http_post(url,
+                                           payload,
+                                           params_dict)
+        elif http_operation == "delete":
+            response = ar_client.http_delete(url)
+        elif http_operation == "get":
+            response = ar_client.http_get(url)
         else:
-            processed_result = raw_result
-
-        self._result = processed_result
-
-    def clean_inventory(self):
-        """ Remove hosts from inventory groups
-        """
-
-        for group, hosts in self.clean_hosts_on_success.items():
-            InventoryGroup(group, self.ar_client).clean(hosts)
-            del self.clean_hosts_on_success[group]
+            assert False, http_operation
 
+        # Any problem executing the secuence of operations will
+        # produce an errored completion object.
+        try:
+            response.raise_for_status()
+        except Exception as e:
+            raise AnsibleRunnerServiceError(str(e))
 
+        return response.text
+    return inner
 
-class AnsibleChangeOperation(orchestrator.WriteCompletion):
-    """Operations that changes the "cluster" state
 
-    Modifications/Changes (writes) are a two-phase thing, firstly execute
-    the playbook that is going to change elements in the Ceph Cluster.
-    When the playbook finishes execution (independently of the result),
-    the modification/change operation has finished.
+@deferred
+def ars_change(client, operations, output_wizard=None):
+    # type: (Client, List[Callable[[Client], str]], Optional[OutputWizard]) -> str
     """
-    def __init__(self):
-        super(AnsibleChangeOperation, self).__init__()
-
-        self._status = ExecutionStatusCode.NOT_LAUNCHED
-        self._result = None
-
-        # Object used to process operation result in different ways
-        self.output_wizard = None
-
-    @property
-    def status(self):
-        """Return the status code of the operation
-        """
-        raise NotImplementedError()
-
-    @property
-    def has_result(self):
-        """
-        Has the operation updated the orchestrator's configuration
-        persistently?  Typically this would indicate that an update
-        had been written to a manifest, but that the update
-        had not necessarily been pushed out to the cluster.
-
-        :return Boolean: True if the execution of the Ansible Playbook or the
-                         operation over the Ansible Runner Service has finished
-        """
-
-        return self._status in [ExecutionStatusCode.SUCCESS,
-                                ExecutionStatusCode.ERROR]
-
-    @property
-    def is_effective(self):
-        """Has the operation taken effect on the cluster?
-        For example, if we were adding a service, has it come up and appeared
-        in Ceph's cluster maps?
-
-        In the case of Ansible, this will be True if the playbooks has been
-        executed succesfully.
+    Execute one or more Ansible Runner Service Operations that implies
+    a change in the cluster
 
-        :return Boolean: if the playbook/ARS operation has been executed
-                         succesfully
-        """
+    :param client         : Ansible Runner Service Client
+    :param operations     : A list of http_operation objects
 
-        return self._status == ExecutionStatusCode.SUCCESS
+    Execute the Ansible Runner Service operations and update the status
+    and result of the underlying Completion object.
+    """
 
-    @property
-    def is_errored(self):
-        return self._status == ExecutionStatusCode.ERROR
+    out = None
+    for my_request in operations:
+        # Execute the right kind of http request
+        out = my_request(client)
+        # If this point is reached, all the operations has been succesfuly
+        # executed, and the final result is updated
+    assert out is not None
+    if output_wizard:
+        return output_wizard.process("", out)
+    else:
+        return out
 
-    @property
-    def result(self):
-        return self._result
 
-class HttpOperation(object):
-    """A class to ease the management of http operations
+def ars_read(client, url, get_operation=True, payload=None, output_wizard=None):
+    # type: (Client, str, bool, Optional[str], Optional[OutputWizard]) -> orchestrator.Completion[str]
     """
+    Execute the Ansible Runner Service operation
 
-    def __init__(self, url, http_operation, payload="", query_string="{}"):
-        self.url = url
-        self.http_operation = http_operation
-        self.payload = payload
-        self.query_string = query_string
-        self.response = None
-
-class ARSChangeOperation(AnsibleChangeOperation):
-    """Execute one or more Ansible Runner Service Operations that implies
-    a change in the cluster
+    :param client        : Ansible Runner Service Client
+    :param url           : The Ansible Runner Service URL that provides
+                           the operation
+    :param get_operation : True if operation is provided using an http GET
+    :param payload       : http request payload
     """
-    def __init__(self, client, logger, operations):
-        """
-        :param client         : Ansible Runner Service Client
-        :param logger         : The object used to log messages
-        :param operations     : A list of http_operation objects
-        :param payload        : dict with http request payload
-        """
-        super(ARSChangeOperation, self).__init__()
-
-        assert operations, "At least one operation is needed"
-        self.ar_client = client
-        self.log = logger
-        self.operations = operations
-
-    def __str__(self):
-        # Use the last operation as the main
-        return "Ansible Runner Service: {operation} {url}".format(
-                                operation=self.operations[-1].http_operation,
-                                url=self.operations[-1].url)
-
-    @property
-    def status(self):
-        """Execute the Ansible Runner Service operations and update the status
-        and result of the underlying Completion object.
-        """
+    return ars_change(client, [ars_http_operation(url, 'get' if get_operation else 'post', payload)], output_wizard)
 
-        for my_request in self.operations:
-            # Execute the right kind of http request
-            try:
-                if my_request.http_operation == "post":
-                    response = self.ar_client.http_post(my_request.url,
-                                                        my_request.payload,
-                                                        my_request.query_string)
-                elif my_request.http_operation == "delete":
-                    response = self.ar_client.http_delete(my_request.url)
-                elif my_request.http_operation == "get":
-                    response = self.ar_client.http_get(my_request.url)
-
-                # Any problem executing the secuence of operations will
-                # produce an errored completion object.
-                if response.status_code != requests.codes.ok:
-                    self._status = ExecutionStatusCode.ERROR
-                    self._result = response.text
-                    return self._status
-
-            # Any kind of error communicating with ARS or preventing
-            # to have a right http response
-            except AnsibleRunnerServiceError as ex:
-                self._status = ExecutionStatusCode.ERROR
-                self._result = str(ex)
-                return self._status
-
-        # If this point is reached, all the operations has been succesfuly
-        # executed, and the final result is updated
-        self._status = ExecutionStatusCode.SUCCESS
-        if self.output_wizard:
-            self._result = self.output_wizard.process("", response.text)
-        else:
-            self._result = response.text
-
-        return self._status
 
 class Module(MgrModule, orchestrator.Orchestrator):
     """An Orchestrator that uses <Ansible Runner Service> to perform operations
@@ -440,7 +237,7 @@ class Module(MgrModule, orchestrator.Orchestrator):
 
         self.all_completions = []
 
-        self.ar_client = None
+        self.ar_client = None  # type: Client
 
         # TLS certificate and key file names used to connect with the external
         # Ansible Runner Service
@@ -450,6 +247,9 @@ class Module(MgrModule, orchestrator.Orchestrator):
         # used to provide more verbose explanation of errors in status method
         self.status_message = ""
 
+        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]
+
+
     def available(self):
         """ Check if Ansible Runner service is working
         """
@@ -486,7 +286,7 @@ class Module(MgrModule, orchestrator.Orchestrator):
         # Check progress and update status in each operation
         # Access completion.status property do the trick
         for operation in completions:
-            self.log.info("<%s> status:%s", operation, operation.status)
+            self.log.info("<%s> is_finished:%s", operation, operation.is_finished)
 
     def serve(self):
         """ Mandatory for standby modules
@@ -503,8 +303,7 @@ class Module(MgrModule, orchestrator.Orchestrator):
                 verify_server=self.get_module_option('verify_server', True),
                 ca_bundle=self.get_module_option('ca_bundle', ''),
                 client_cert=self.client_cert_fname,
-                client_key=self.client_key_fname,
-                logger=self.log)
+                client_key=self.client_key_fname)
 
         except AnsibleRunnerServiceError:
             self.log.exception("Ansible Runner Service not available. "
@@ -530,54 +329,41 @@ class Module(MgrModule, orchestrator.Orchestrator):
         """
 
         # Create a new read completion object for execute the playbook
-        playbook_operation = PlaybookOperation(client=self.ar_client,
-                                               playbook=GET_STORAGE_DEVICES_CATALOG_PLAYBOOK,
-                                               logger=self.log,
-                                               result_pattern="list storage inventory",
-                                               params={})
-
+        op = playbook_operation(client=self.ar_client,
+                                playbook=GET_STORAGE_DEVICES_CATALOG_PLAYBOOK,
+                                result_pattern="list storage inventory",
+                                params={},
+                                output_wizard=ProcessInventory(self.ar_client),
+                                event_filter_list=["runner_on_ok"])
 
-        # Assign the process_output function
-        playbook_operation.output_wizard = ProcessInventory(self.ar_client,
-                                                            self.log)
-        playbook_operation.event_filter_list = ["runner_on_ok"]
+        self._launch_operation(op)
 
-        # Execute the playbook to obtain data
-        self._launch_operation(playbook_operation)
+        return op
 
-        return playbook_operation
-
-    def create_osds(self, drive_group, all_hosts):
+    def create_osds(self, drive_group):
         """Create one or more OSDs within a single Drive Group.
         If no host provided the operation affects all the host in the OSDS role
 
 
         :param drive_group: (ceph.deployment.drive_group.DriveGroupSpec),
                             Drive group with the specification of drives to use
-        :param all_hosts  : (List[str]),
-                            List of hosts where the OSD's must be created
         """
 
         # Transform drive group specification to Ansible playbook parameters
         host, osd_spec = dg_2_ansible(drive_group)
 
         # Create a new read completion object for execute the playbook
-        playbook_operation = PlaybookOperation(client=self.ar_client,
-                                               playbook=ADD_OSD_PLAYBOOK,
-                                               logger=self.log,
-                                               result_pattern="",
-                                               params=osd_spec,
-                                               querystr_dict={"limit": host})
+        op = playbook_operation(client=self.ar_client,
+                                playbook=ADD_OSD_PLAYBOOK,
+                                result_pattern="",
+                                params=osd_spec,
+                                querystr_dict={"limit": host},
+                                output_wizard=ProcessPlaybookResult(self.ar_client),
+                                event_filter_list=["playbook_on_stats"])
 
-        # Filter to get the result
-        playbook_operation.output_wizard = ProcessPlaybookResult(self.ar_client,
-                                                                  self.log)
-        playbook_operation.event_filter_list = ["playbook_on_stats"]
+        self._launch_operation(op)
 
-        # Execute the playbook
-        self._launch_operation(playbook_operation)
-
-        return playbook_operation
+        return op
 
     def remove_osds(self, osd_ids, destroy=False):
         """Remove osd's.
@@ -591,32 +377,24 @@ class Module(MgrModule, orchestrator.Orchestrator):
                      'ireallymeanit':'yes'}
 
         # Create a new read completion object for execute the playbook
-        playbook_operation = PlaybookOperation(client=self.ar_client,
-                                               playbook=REMOVE_OSD_PLAYBOOK,
-                                               logger=self.log,
-                                               result_pattern="",
-                                               params=extravars)
-
-        # Filter to get the result
-        playbook_operation.output_wizard = ProcessPlaybookResult(self.ar_client,
-                                                                 self.log)
-        playbook_operation.event_filter_list = ["playbook_on_stats"]
-
+        op = playbook_operation(client=self.ar_client,
+                                playbook=REMOVE_OSD_PLAYBOOK,
+                                result_pattern="",
+                                params=extravars,
+                                output_wizard=ProcessPlaybookResult(self.ar_client),
+                                event_filter_list=["playbook_on_stats"])
 
         # Execute the playbook
-        self._launch_operation(playbook_operation)
+        self._launch_operation(op)
 
-        return playbook_operation
+        return op
 
     def get_hosts(self):
         """Provides a list Inventory nodes
         """
 
-        host_ls_op = ARSOperation(self.ar_client, self.log, URL_GET_HOSTS)
-
-        host_ls_op.output_wizard = ProcessHostsList(self.ar_client,
-                                                    self.log)
-
+        host_ls_op = ars_read(self.ar_client, url=URL_GET_HOSTS,
+                              output_wizard=ProcessHostsList(self.ar_client))
         return host_ls_op
 
     def add_host(self, host):
@@ -625,7 +403,7 @@ class Module(MgrModule, orchestrator.Orchestrator):
         group
 
         :param host: hostname
-        :returns : orchestrator.WriteCompletion
+        :returns : orchestrator.Completion
         """
 
         url_group = URL_MANAGE_GROUP.format(group_name=ORCHESTRATOR_GROUP)
@@ -640,16 +418,16 @@ class Module(MgrModule, orchestrator.Orchestrator):
             add_url = URL_ADD_RM_HOSTS.format(host_name=host,
                                               inventory_group=ORCHESTRATOR_GROUP)
 
-            operations = [HttpOperation(add_url, "post", "", None)]
+            operations = [ars_http_operation(add_url, "post", "", None)]
 
         except AnsibleRunnerServiceError as ex:
             # Problems with the external orchestrator.
             # Prepare the operation to return the error in a Completion object.
             self.log.exception("Error checking <orchestrator> group: %s",
                                str(ex))
-            operations = [HttpOperation(url_group, "post", "", None)]
+            operations = [ars_http_operation(url_group, "post", "", None)]
 
-        return ARSChangeOperation(self.ar_client, self.log, operations)
+        return ars_change(self.ar_client, operations)
 
     def remove_host(self, host):
         """
@@ -657,10 +435,9 @@ class Module(MgrModule, orchestrator.Orchestrator):
         inventory.
 
         :param host: hostname
-        :returns : orchestrator.WriteCompletion
+        :returns : orchestrator.Completion
         """
 
-        operations = []
         host_groups = []
 
         try:
@@ -673,25 +450,26 @@ class Module(MgrModule, orchestrator.Orchestrator):
 
         except AnsibleRunnerServiceError:
             self.log.exception("Error retrieving host groups")
+            raise
 
         if not host_groups:
             # Error retrieving the groups, prepare the completion object to
             # execute the problematic operation just to provide the error
             # to the caller
-            operations = [HttpOperation(groups_url, "get")]
+            operations = [ars_http_operation(groups_url, "get")]
         else:
             # Build the operations list
             operations = list(map(lambda x:
-                                  HttpOperation(URL_ADD_RM_HOSTS.format(
-                                                    host_name=host,
-                                                    inventory_group=x),
-                                                "delete"),
+                                  ars_http_operation(URL_ADD_RM_HOSTS.format(
+                                      host_name=host,
+                                      inventory_group=x),
+                                      "delete"),
                                   host_groups))
 
-        return ARSChangeOperation(self.ar_client, self.log, operations)
+        return ars_change(self.ar_client, operations)
 
     def add_rgw(self, spec):
-        # type: (orchestrator.RGWSpec) -> PlaybookOperation
+        # type: (orchestrator.RGWSpec) -> orchestrator.Completion
         """ Add a RGW service in the cluster
 
         : spec        : an Orchestrator.RGWSpec object
@@ -743,31 +521,26 @@ class Module(MgrModule, orchestrator.Orchestrator):
         resource_group = "rgw_zone_{}".format(spec.name)
         InventoryGroup(resource_group, self.ar_client).update(hosts)
 
-
         # Execute the playbook to create the service
-        playbook_operation = PlaybookOperation(client=self.ar_client,
-                                               playbook=SITE_PLAYBOOK,
-                                               logger=self.log,
-                                               result_pattern="",
-                                               params=extravars,
-                                               querystr_dict={"limit": limited})
-
-        # Filter to get the result
-        playbook_operation.output_wizard = ProcessPlaybookResult(self.ar_client,
-                                                                 self.log)
-        playbook_operation.event_filter_list = ["playbook_on_stats"]
+        op = playbook_operation(client=self.ar_client,
+                                playbook=SITE_PLAYBOOK,
+                                result_pattern="",
+                                params=extravars,
+                                querystr_dict={"limit": limited},
+                                output_wizard=ProcessPlaybookResult(self.ar_client),
+                                event_filter_list=["playbook_on_stats"])
 
         # Execute the playbook
-        self._launch_operation(playbook_operation)
+        self._launch_operation(op)
 
-        return playbook_operation
+        return op
 
     def remove_rgw(self, zone):
         """ Remove a RGW service providing <zone>
 
-        :zone : <zone name> of the RGW
+        :param zone: <zone name> of the RGW
                             ...
-        : returns    : Completion object
+        :returns    : Completion object
         """
 
 
@@ -784,30 +557,25 @@ class Module(MgrModule, orchestrator.Orchestrator):
         # Avoid manual confirmation
         extravars = {"ireallymeanit": "yes"}
 
-        # Execute the playbook to remove the service
-        playbook_operation = PlaybookOperation(client=self.ar_client,
-                                               playbook=PURGE_PLAYBOOK,
-                                               logger=self.log,
-                                               result_pattern="",
-                                               params=extravars,
-                                               querystr_dict={"limit": limited})
-
-        # Filter to get the result
-        playbook_operation.output_wizard = ProcessPlaybookResult(self.ar_client,
-                                                                 self.log)
-        playbook_operation.event_filter_list = ["playbook_on_stats"]
-
         # Cleaning of inventory after a sucessful operation
         clean_inventory = {}
         clean_inventory[resource_group] = hosts_list
         clean_inventory[group] = hosts_list
-        playbook_operation.clean_hosts_on_success = clean_inventory
+
+        # Execute the playbook to remove the service
+        op = playbook_operation(client=self.ar_client,
+                                               playbook=PURGE_PLAYBOOK,
+                                               result_pattern="",
+                                               params=extravars,
+                                               querystr_dict={"limit": limited},
+                                output_wizard=ProcessPlaybookResult(self.ar_client),
+                                event_filter_list=["playbook_on_stats"],
+                                clean_hosts_on_success=clean_inventory)
 
         # Execute the playbook
-        self.log.info("Removing service rgw for resource %s", zone)
-        self._launch_operation(playbook_operation)
+        self._launch_operation(op)
 
-        return playbook_operation
+        return op
 
     def _launch_operation(self, ansible_operation):
         """Launch the operation and add the operation to the completion objects
@@ -816,9 +584,6 @@ class Module(MgrModule, orchestrator.Orchestrator):
         :ansible_operation: A read/write ansible operation (completion object)
         """
 
-        # Execute the playbook
-        ansible_operation.execute_playbook()
-
         # Add the operation to the list of things ongoing
         self.all_completions.append(ansible_operation)
 
@@ -837,7 +602,7 @@ class Module(MgrModule, orchestrator.Orchestrator):
         if the_crt is None or the_key is None:
             # If not possible... try to get generic certificates and key content
             # ex: mgr/ansible/[crt/key]
-            self.log.warning("Specific tls files for this manager not "\
+            self.log.warning("Specific tls files for this manager not "
                              "configured, trying to use generic files")
             the_crt = self.get_store("crt")
             the_key = self.get_store("key")
index a49b70d478a091954e9f6cd232b0ecba8456e9c5..c0d8325ba12e148dd1f39eae51a9b409f28ddf7a 100644 (file)
@@ -6,23 +6,24 @@ completion objects
 """
 
 import json
+import logging
 
 from ceph.deployment import inventory
 from orchestrator import InventoryNode
 
 from .ansible_runner_svc import EVENT_DATA_URL
 
+logger = logging.getLogger(__name__)
+
 class OutputWizard(object):
     """Base class for help to process output in completion objects
     """
-    def __init__(self, ar_client, logger):
+    def __init__(self, ar_client):
         """Make easy to work in output wizards using this attributes:
 
         :param ars_client: Ansible Runner Service client
-        :param logger: log object
         """
         self.ar_client = ar_client
-        self.log = logger
 
     def process(self, operation_id, raw_result):
         """Make the magic here
@@ -139,12 +140,12 @@ class ProcessHostsList(OutputWizard):
                 inventory_nodes.append(InventoryNode(host, inventory.Devices([])))
 
         except ValueError:
-            self.log.exception("Malformed json response")
+            logger.exception("Malformed json response")
         except KeyError:
-            self.log.exception("Unexpected content in Ansible Runner Service"
+            logger.exception("Unexpected content in Ansible Runner Service"
                                " response")
         except TypeError:
-            self.log.exception("Hosts data must be iterable in Ansible Runner "
+            logger.exception("Hosts data must be iterable in Ansible Runner "
                                "Service response")
 
         return inventory_nodes
index 23cbbe4555bca4bdfb410c81dfd01a9d325823b2..cadcd9b84509f0423131a8b00b24a65e7dd0c86f 100644 (file)
@@ -33,8 +33,7 @@ logger.addHandler(handler)
 def mock_get_pb(mock_server, playbook_name, return_code):
 
     ars_client = Client(SERVER_URL, verify_server=False, ca_bundle="",
-                        client_cert = "DUMMY_PATH", client_key = "DUMMY_PATH",
-                        logger = logger)
+                        client_cert = "DUMMY_PATH", client_key = "DUMMY_PATH")
 
     the_pb_url = "https://%s/%s/%s" % (SERVER_URL, PLAYBOOK_EXEC_URL, playbook_name)
 
@@ -53,7 +52,7 @@ def mock_get_pb(mock_server, playbook_name, return_code):
                                "data": { "play_uuid": "1733c3ac" }},
                         status_code=return_code)
 
-    return PlayBookExecution(ars_client, playbook_name, logger,
+    return PlayBookExecution(ars_client, playbook_name,
                              result_pattern = "RESULTS")
 
 class ARSclientTest(unittest.TestCase):
@@ -62,8 +61,7 @@ class ARSclientTest(unittest.TestCase):
 
         with self.assertRaises(AnsibleRunnerServiceError):
             ars_client = Client(SERVER_URL, verify_server=False, ca_bundle="",
-                            client_cert = "DUMMY_PATH", client_key = "DUMMY_PATH",
-                            logger = logger)
+                            client_cert = "DUMMY_PATH", client_key = "DUMMY_PATH")
 
             status = ars_client.is_operative()
 
@@ -73,8 +71,7 @@ class ARSclientTest(unittest.TestCase):
         with requests_mock.Mocker() as mock_server:
 
             ars_client = Client(SERVER_URL, verify_server=False, ca_bundle="",
-                                client_cert = "DUMMY_PATH", client_key = "DUMMY_PATH",
-                                logger = logger)
+                                client_cert = "DUMMY_PATH", client_key = "DUMMY_PATH")
 
             the_api_url = "https://%s/%s" % (SERVER_URL,API_URL)
             mock_server.register_uri("GET",
@@ -90,8 +87,7 @@ class ARSclientTest(unittest.TestCase):
         with requests_mock.Mocker() as mock_server:
 
             ars_client = Client(SERVER_URL, verify_server=False, ca_bundle="",
-                                client_cert = "DUMMY_PATH", client_key = "DUMMY_PATH",
-                                logger = logger)
+                                client_cert = "DUMMY_PATH", client_key = "DUMMY_PATH")
 
             url = "https://%s/test" % (SERVER_URL)
             mock_server.register_uri("DELETE",
index 3c3437659d4fa93f86740a5e541a264ab3a111bf..02d72e95aefb3b61cb961756c800808cd006d497 100644 (file)
@@ -24,8 +24,7 @@ class  OutputWizardProcessHostsList(unittest.TestCase):
                 }
                 """
     ar_client = mock.Mock()
-    logger = mock.Mock()
-    test_wizard = ProcessHostsList(ar_client, logger)
+    test_wizard = ProcessHostsList(ar_client)
 
     def test_process(self):
         """Test a normal call"""
@@ -64,9 +63,7 @@ class OutputWizardProcessPlaybookResult(unittest.TestCase):
     ar_client = mock.Mock()
     ar_client.http_get = mock.MagicMock(return_value=mocked_response)
 
-    logger = mock.Mock()
-
-    test_wizard = ProcessPlaybookResult(ar_client, logger)
+    test_wizard = ProcessPlaybookResult(ar_client)
 
     def test_process(self):
         """Test a normal call
@@ -177,9 +174,7 @@ class OutputWizardProcessInventory(unittest.TestCase):
     ar_client = mock.Mock()
     ar_client.http_get = mock.MagicMock(return_value=mocked_response)
 
-    logger = mock.Mock()
-
-    test_wizard = ProcessInventory(ar_client, logger)
+    test_wizard = ProcessInventory(ar_client)
 
     def test_process(self):
         """Test a normal call