From 34b993c2cfc7484b11e5f73694941c43cb879f47 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Juan=20Miguel=20Olmo=20Mart=C3=ADnez?= Date: Wed, 23 Jan 2019 13:49:07 +0100 Subject: [PATCH] mgr/ansible: Solve merge problems MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Add/fix things after a Signed-off-by: Juan Miguel Olmo Martínez --- src/pybind/mgr/ansible/ansible_runner_svc.py | 63 +++--- src/pybind/mgr/ansible/module.py | 191 +++++++++++++++---- src/pybind/mgr/orchestrator.py | 10 +- src/pybind/mgr/orchestrator_cli/module.py | 74 ++++++- 4 files changed, 275 insertions(+), 63 deletions(-) diff --git a/src/pybind/mgr/ansible/ansible_runner_svc.py b/src/pybind/mgr/ansible/ansible_runner_svc.py index a7dd30f1724a7..ac49d04067390 100644 --- a/src/pybind/mgr/ansible/ansible_runner_svc.py +++ b/src/pybind/mgr/ansible/ansible_runner_svc.py @@ -26,7 +26,9 @@ class PlayBookExecution(object): """Object to provide all the results of a Playbook execution """ - def __init__(self, rest_client, playbook, logger, result_pattern="", the_params={}): + def __init__(self, rest_client, playbook, logger, result_pattern="", + the_params={}, + querystr_dict={}): self.rest_client = rest_client @@ -39,9 +41,13 @@ class PlayBookExecution(object): # Playbook name self.playbook = playbook - # Params used in the playbook + # Parameters used in the playbook self.params = the_params + # Query string used in the "launch" request + self.querystr_dict = querystr_dict + + # Logger self.log = logger def launch(self): @@ -50,7 +56,9 @@ class PlayBookExecution(object): endpoint = "%s/%s" % (PLAYBOOK_EXEC_URL, self.playbook) - response = self.rest_client.http_post(endpoint, self.params) + response = self.rest_client.http_post(endpoint, + self.params, + self.querystr_dict) if response: self.play_uuid = json.loads(response.text)["data"]["play_uuid"] @@ -101,8 +109,8 @@ class PlayBookExecution(object): @returns: the events that matches with the patterns provided """ - if not self.result_task_pattern or not self.play_uuid: - result_events = {} + if not self.play_uuid: + return {} response = self.rest_client.http_get(PLAYBOOK_EVENTS % self.play_uuid) @@ -110,9 +118,14 @@ class PlayBookExecution(object): result_events = {} else: events = json.loads(response.text)["data"]["events"] - result_events = {event:data for event,data in events.items() - if "task" in data and - re.match(self.result_task_pattern, data["task"])} + + if self.result_task_pattern: + result_events = {event:data for event,data in events.items() + if "task" in data and + re.match(self.result_task_pattern, data["task"])} + else: + result_events = events + if event_filter: result_events = {event:data for event,data in result_events.items() if re.match(event_filter, data['event'])} @@ -129,13 +142,13 @@ class Client(object): """Provide an https client to make easy interact with the Ansible Runner Service" - @param servers_url: The base URL >server>: of the Ansible Runner Service - @param user: User name of the authorized user - @param password: Password of the authotized user - @param verify_server: Either a boolean, in which case it controls whether we verify + :param servers_url: The base URL >server>: of the Ansible Runner Service + :param user: Username of the authorized user + :param password: Password of the authorized user + :param verify_server: Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use. Defaults to ``True``. - @param logger: Log file + :param logger: Log file """ self.server_url = server_url self.user = user @@ -197,9 +210,9 @@ class Client(object): def http_get(self, endpoint): """Execute an http get request - @param endpoint: Ansible Runner service RESTful API endpoint + :param endpoint: Ansible Runner service RESTful API endpoint - @returns: A requests object + :returns: A requests object """ response = None @@ -224,13 +237,14 @@ class Client(object): return response - def http_post(self, endpoint, payload): + def http_post(self, endpoint, payload, params_dict = {}): """Execute an http post request - @param endpoint: Ansible Runner service RESTful API endpoint - @param payload: Dictionary with the data used in the post request + :param endpoint: Ansible Runner service RESTful API endpoint + :param payload: Dictionary with the data used in the post request + :param params_dict: A dict used to build a query string - @returns: A requests object + :returns: A requests object """ response = None @@ -241,10 +255,11 @@ class Client(object): verify = self.verify_server, headers = {"Authorization": self.token, "Content-type": "application/json"}, - data = payload) + json = payload, + params = params_dict) if r.status_code != requests.codes.ok: - self.log.error("http POST %s [%s] <--> (%s - %s)\n%s", + self.log.error("http POST %s [%s] <--> (%s - %s:%s)\n", the_url, payload, r.status_code, r.reason, r.text) else: self.log.info("http POST %s <--> (%s - %s)", @@ -259,10 +274,10 @@ class Client(object): def http_put(self, endpoint, payload): """Execute an http put request - @param endpoint: Ansible Runner service RESTful API endpoint - @param payload: Dictionary with the data used in the put request + :param endpoint: Ansible Runner service RESTful API endpoint + :param payload: Dictionary with the data used in the put request - @returns: A requests object + :returns: A requests object """ # TODO raise NotImplementedError("TODO") diff --git a/src/pybind/mgr/ansible/module.py b/src/pybind/mgr/ansible/module.py index 87d016d135bf1..d22f1bb6c6365 100644 --- a/src/pybind/mgr/ansible/module.py +++ b/src/pybind/mgr/ansible/module.py @@ -23,21 +23,29 @@ WAIT_PERIOD = 10 # where the playbook is executed. GET_STORAGE_DEVICES_CATALOG_PLAYBOOK = "host-disks.yml" +# Used in the create_osd method +ADD_OSD_PLAYBOOK = "add-osd.yml" + +# Used in the remove_osds method +REMOVE_OSD_PLAYBOOK = "shrink-osd.yml" + class AnsibleReadOperation(orchestrator.ReadCompletion): """ A read operation means to obtain information from the cluster. """ - def __init__(self, client, playbook, logger, result_pattern, params): + def __init__(self, client, playbook, logger, result_pattern, + params, + querystr_dict={}): super(AnsibleReadOperation, self).__init__() - # Private attributes self.playbook = playbook self._is_complete = False self._is_errored = False self._result = [] + self._status = ExecutionStatusCode.NOT_LAUNCHED # Error description in operation self.error = "" @@ -59,7 +67,8 @@ class AnsibleReadOperation(orchestrator.ReadCompletion): playbook, logger, result_pattern, - params) + params, + querystr_dict) @property def is_complete(self): @@ -78,17 +87,23 @@ class AnsibleReadOperation(orchestrator.ReadCompletion): """Return the status code of the operation updating conceptually 'linked' attributes """ - current_status = self.pb_execution.get_status() - self._is_complete = (current_status == ExecutionStatusCode.SUCCESS) or \ - (current_status == ExecutionStatusCode.ERROR) + if self._status in [ExecutionStatusCode.ON_GOING, + ExecutionStatusCode.NOT_LAUNCHED]: + self._status = self.pb_execution.get_status() + + self._is_complete = (self._status == ExecutionStatusCode.SUCCESS) or \ + (self._status == ExecutionStatusCode.ERROR) - self._is_errored = (current_status == ExecutionStatusCode.ERROR) + self._is_errored = (self._status == ExecutionStatusCode.ERROR) - return current_status + if self._is_complete: + self.update_result() + + return self._status def execute_playbook(self): - """Execute the playbook with the provided params. + """Launch the execution of the playbook with the parameters configured """ self.pb_execution.launch() @@ -99,7 +114,7 @@ class AnsibleReadOperation(orchestrator.ReadCompletion): The result of the playbook execution can be customized through the function provided as 'process_output' attribute - @return string: Result of the operation formatted if it is possible + :return string: Result of the operation formatted if it is possible """ processed_result = [] @@ -164,7 +179,7 @@ class AnsibleChangeOperation(orchestrator.WriteCompletion): In the case of Ansible, this will be True if the playbooks has been executed succesfully. - @return Boolean: if the playbook has been executed succesfully + :return Boolean: if the playbook has been executed succesfully """ return self.status == ExecutionStatusCode.SUCCESS @@ -179,8 +194,7 @@ class AnsibleChangeOperation(orchestrator.WriteCompletion): class Module(MgrModule, orchestrator.Orchestrator): - """An Orchestrator that an external Ansible runner service to perform - operations + """An Orchestrator that uses to perform operations """ MODULE_OPTIONS = [ {'name': 'server_url'}, @@ -208,15 +222,14 @@ class Module(MgrModule, orchestrator.Orchestrator): """Given a list of Completion instances, progress any which are incomplete. - @param completions: list of Completion instances - @Returns : true if everything is done. + :param completions: list of Completion instances + :Returns : True if everything is done. """ # Check progress and update status in each operation + # Access completion.status property do the trick for operation in completions: self.log.info("playbook <%s> status:%s", operation.playbook, operation.status) - if operation.is_complete: - operation.update_result() completions = filter(lambda x: not x.is_complete, completions) @@ -257,8 +270,8 @@ class Module(MgrModule, orchestrator.Orchestrator): def get_inventory(self, node_filter=None): """ - @param : node_filter instance - @Return : A AnsibleReadOperation instance (Completion Object) + :param : node_filter instance + :Return : A AnsibleReadOperation instance (Completion Object) """ # Create a new read completion object for execute the playbook @@ -266,31 +279,86 @@ class Module(MgrModule, orchestrator.Orchestrator): playbook = GET_STORAGE_DEVICES_CATALOG_PLAYBOOK, logger = self.log, result_pattern = "RESULTS", - params = "{}") + params = {}) # Assign the process_output function ansible_operation.process_output = process_inventory_json ansible_operation.event_filter = "runner_on_ok" # Execute the playbook to obtain data - ansible_operation.execute_playbook() + self._launch_operation(ansible_operation) - self.all_completions.append(ansible_operation) + return ansible_operation + + def create_osds(self, drive_group, all_hosts=None): + """Create one or more OSDs within a single Drive Group. + If no host provided the operation affects all the host in the OSDS role + + + :param drive_group: (orchestrator.DriveGroupSpec), + Drive group with the specification of drives to use + :param all_hosts: (List[str]), + List of hosts where the OSD's must be created + """ + + # Transform drive group specification to Ansible playbook parameters + host, osd_spec = dg_2_ansible(drive_group) + + # Create a new read completion object for execute the playbook + ansible_operation = AnsibleReadOperation(client = self.ar_client, + playbook = ADD_OSD_PLAYBOOK, + logger = self.log, + result_pattern = "", + params = osd_spec, + querystr_dict = {"limit": host}) + + # Filter to get the result + ansible_operation.process_output = process_playbook_result + ansible_operation.event_filter = "playbook_on_stats" + + # Execute the playbook + self._launch_operation(ansible_operation) return ansible_operation - def create_osds(self, drive_group, all_hosts): + def remove_osds(self, osd_ids): + """Remove osd's. + + :param osd_ids: List of osd's to be removed (List[int]) """ - Create one or more OSDs within a single Drive Group. - The principal argument here is the drive_group member - of OsdSpec: other fields are advisory/extensible for any - finer-grained OSD feature enablement (choice of backing store, - compression/encryption, etc). + extravars = {'osd_to_kill': ",".join([str(id) for id in osd_ids]), + 'ireallymeanit':'yes'} - :param osd_spec: DriveGroupSpec + # Create a new read completion object for execute the playbook + ansible_operation = AnsibleReadOperation(client = self.ar_client, + playbook = REMOVE_OSD_PLAYBOOK, + logger = self.log, + result_pattern = "", + params = extravars) + + # Filter to get the result + ansible_operation.process_output = process_playbook_result + ansible_operation.event_filter = "playbook_on_stats" + + # Execute the playbook + self._launch_operation(ansible_operation) + + return ansible_operation + + def _launch_operation(self, ansible_operation): + """Launch the operation and add the operation to the completion objects + ongoing + + :ansible_operation: A read/write ansible operation (completion object) """ + # Execute the playbook + ansible_operation.execute_playbook() + + # Add the operation to the list of things ongoing + self.all_completions.append(ansible_operation) + def verify_config(self): if not self.get_module_option('server_url', ''): @@ -326,7 +394,7 @@ def process_inventory_json(inventory_events, ar_client, playbook_uuid): """ Adapt the output of the playbook used in 'get_inventory' to the Orchestrator expected output (list of InventoryNode) - @param inventory_events: events dict with the results + :param inventory_events: events dict with the results Example: inventory_events = @@ -336,10 +404,10 @@ def process_inventory_json(inventory_events, ar_client, playbook_uuid): '36-2016b900-e38f-7dcd-a2e7-00000000000e': {'host': '192.168.121.252' 'task': 'RESULTS', 'event': 'runner_on_ok'}} - @param ar_client: Ansible Runner Service client - @param playbook_uuid: Palybooud identifier + :param ar_client: Ansible Runner Service client + :param playbook_uuid: Playbook identifier - @return : list of InventoryNode + :return : list of InventoryNode """ #Obtain the needed data for each result event @@ -372,3 +440,60 @@ def process_inventory_json(inventory_events, ar_client, playbook_uuid): return inventory_nodes + + +def process_playbook_result(inventory_events, ar_client, playbook_uuid): + + result = "" + + # Loop over the result events and request the data + for event_key, dummy_data in inventory_events.items(): + event_response = ar_client.http_get(EVENT_DATA_URL % (playbook_uuid, + event_key)) + + result += event_response.text + + return result + +def dg_2_ansible(drive_group): + """ Transform a drive group especification into: + + a host : limit the playbook execution to this host + a osd_spec : dict of parameters to pass to the Ansible playbook used + to create the osds + + :param drive_group: (type: DriveGroupSpec) + + TODO: Possible this function will be removed/or modified heavily when + the ansible playbook to create osd's use ceph volume batch with + drive group parameter + """ + + # Limit the execution of the playbook to certain hosts + # TODO: Now only accepted "*" (all the hosts) or a host_name in the + # drive_group.host_pattern + # This attribute is intended to be used with "fnmatch" patterns, so when + # this become effective it will be needed to use the "get_inventory" method + # in order to have a list of hosts to be filtered with the "host_pattern" + if drive_group.host_pattern in ["*"]: + host = None # No limit in the playbook + else: + # For the moment, we assume that we only have 1 host + host = drive_group.host_pattern + + # Compose the OSD configuration + + + osd = {} + osd["data"] = drive_group.data_devices.paths[0] + # Other parameters will be extracted in the same way + #osd["dmcrypt"] = drive_group.encryption + + # lvm_volumes parameters + # (by the moment is what is accepted in the current playbook) + osd_spec = {"lvm_volumes":[osd]} + + #Global scope variables also can be included in the osd_spec + #osd_spec["osd_objectstore"] = drive_group.objectstore + + return host, osd_spec diff --git a/src/pybind/mgr/orchestrator.py b/src/pybind/mgr/orchestrator.py index 2f113ce1cdef1..bfa71e66a1c2d 100644 --- a/src/pybind/mgr/orchestrator.py +++ b/src/pybind/mgr/orchestrator.py @@ -255,7 +255,7 @@ class Orchestrator(object): assert not (service_name and service_id) raise NotImplementedError() - def create_osds(self, drive_group, all_hosts): + def create_osds(self, drive_group, all_hosts=None): # type: (DriveGroupSpec, List[str]) -> WriteCompletion """ Create one or more OSDs within a single Drive Group. @@ -267,6 +267,9 @@ class Orchestrator(object): :param drive_group: DriveGroupSpec :param all_hosts: TODO, this is required because the orchestrator methods are not composable + Probably this parameter can be easily removed because each orchestrator can use + the "get_inventory" method and the "drive_group.host_pattern" attribute + to obtain the list of hosts where to apply the operation """ raise NotImplementedError() @@ -278,10 +281,9 @@ class Orchestrator(object): """ raise NotImplementedError() - def remove_osds(self, node, osd_ids): - # type: (str, List[str]) -> WriteCompletion + def remove_osds(self, osd_ids): + # type: (List[str]) -> WriteCompletion """ - :param node: A node name, must exist. :param osd_ids: list of OSD IDs Note that this can only remove OSDs that were successfully diff --git a/src/pybind/mgr/orchestrator_cli/module.py b/src/pybind/mgr/orchestrator_cli/module.py index 76711170754ab..916b4d0fcd980 100644 --- a/src/pybind/mgr/orchestrator_cli/module.py +++ b/src/pybind/mgr/orchestrator_cli/module.py @@ -117,6 +117,17 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule): "cmd": "orchestrator status", "desc": "Report configured backend and its status", "perm": "r" + }, + { 'cmd': "orchestrator osd create " + "name=drive_group,type=CephString,req=false ", + "desc": "OSD's creation following specification \ + in parameter or readed from -i input.", + "perm": "rw" + }, + { 'cmd': "orchestrator osd remove " + "name=osd_ids,type=CephInt,n=N ", + "desc": "Remove Osd's", + "perm": "rw" } ] @@ -228,15 +239,69 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule): spec = orchestrator.DriveGroupSpec(node_name, data_devices=devs) # TODO: Remove this and make the orchestrator composable + # or + # Probably this should be moved to each of the orchestrators, + # then we wouldn't need the "all_hosts" parameter at all. host_completion = self.get_hosts() self.wait([host_completion]) all_hosts = [h.name for h in host_completion.result] + completion = self.create_osds(spec, all_hosts=all_hosts) + self._orchestrator_wait([completion]) + + return HandleCommandResult() + + def _create_osd(self, inbuf, cmd): + """Create one or more OSDs + + :cmd : Arguments for the create osd + """ + #Obtain/validate parameters for the operation + cmdline_error = "" + if "drive_group" in cmd.keys(): + params = cmd["drive_group"] + elif inbuf: + params = inbuf + else: + cmdline_error = "Please, use 'drive_group' parameter \ + or specify -i " + + if cmdline_error: + return HandleCommandResult(-errno.EINVAL, stderr=cmdline_error) + try: json_dg = json.loads(params) except ValueError as msg: return HandleCommandResult(-errno.EINVAL, stderr=msg) + # Create the drive group + drive_group = orchestrator.DriveGroupSpec.from_json(json_dg) + #Read other Drive_group + + #Launch the operation in the orchestrator module + completion = self.create_osds(drive_group) + + #Wait until the operation finishes + self._orchestrator_wait([completion]) + + #return result + return HandleCommandResult(stdout=completion.result) + + def _remove_osd(self, cmd): + """ + Remove OSD's + :cmd : Arguments for remove the osd + """ + + #Launch the operation in the orchestrator module + completion = self.remove_osds(cmd["osd_ids"]) + + #Wait until the operation finishes + self._orchestrator_wait([completion]) + + #return result + return HandleCommandResult(stdout=completion.result) + def _add_stateless_svc(self, svc_type, spec): completion = self.add_stateless_service(svc_type, spec) self._orchestrator_wait([completion]) @@ -328,7 +393,7 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule): enabled = module['name'] in mgr_map['modules'] if not enabled: return HandleCommandResult(-errno.EINVAL, - stdout="Module '{module_name}' is not enabled. \n Run " + stderr="Module '{module_name}' is not enabled. \n Run " "`ceph mgr module enable {module_name}` " "to enable.".format(module_name=module_name)) @@ -352,7 +417,8 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule): try: avail, why = self.available() except NoOrchestrator: - return HandleCommandResult(stderr="No orchestrator configured (try " + return HandleCommandResult(-errno.ENODEV, + stderr="No orchestrator configured (try " "`ceph orchestrator set backend`)") if avail is None: @@ -406,5 +472,9 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule): return self._set_backend(cmd) elif cmd['prefix'] == "orchestrator status": return self._status() + elif cmd['prefix'] == "orchestrator osd create": + return self._create_osd(_, cmd) + elif cmd['prefix'] == "orchestrator osd remove": + return self._remove_osd(cmd) else: raise NotImplementedError() -- 2.39.5