"""Object to provide all the results of a Playbook execution
"""
- def __init__(self, rest_client, playbook, logger, result_pattern="", the_params={}):
+ def __init__(self, rest_client, playbook, logger, result_pattern="",
+ the_params={},
+ querystr_dict={}):
self.rest_client = rest_client
# Playbook name
self.playbook = playbook
- # Params used in the playbook
+ # Parameters used in the playbook
self.params = the_params
+ # Query string used in the "launch" request
+ self.querystr_dict = querystr_dict
+
+ # Logger
self.log = logger
def launch(self):
endpoint = "%s/%s" % (PLAYBOOK_EXEC_URL, self.playbook)
- response = self.rest_client.http_post(endpoint, self.params)
+ response = self.rest_client.http_post(endpoint,
+ self.params,
+ self.querystr_dict)
if response:
self.play_uuid = json.loads(response.text)["data"]["play_uuid"]
@returns: the events that matches with the patterns provided
"""
- if not self.result_task_pattern or not self.play_uuid:
- result_events = {}
+ if not self.play_uuid:
+ return {}
response = self.rest_client.http_get(PLAYBOOK_EVENTS % self.play_uuid)
result_events = {}
else:
events = json.loads(response.text)["data"]["events"]
- result_events = {event:data for event,data in events.items()
- if "task" in data and
- re.match(self.result_task_pattern, data["task"])}
+
+ if self.result_task_pattern:
+ result_events = {event:data for event,data in events.items()
+ if "task" in data and
+ re.match(self.result_task_pattern, data["task"])}
+ else:
+ result_events = events
+
if event_filter:
result_events = {event:data for event,data in result_events.items()
if re.match(event_filter, data['event'])}
"""Provide an https client to make easy interact with the Ansible
Runner Service"
- @param servers_url: The base URL >server>:<port> of the Ansible Runner Service
- @param user: User name of the authorized user
- @param password: Password of the authotized user
- @param verify_server: Either a boolean, in which case it controls whether we verify
+ :param servers_url: The base URL >server>:<port> of the Ansible Runner Service
+ :param user: Username of the authorized user
+ :param password: Password of the authorized user
+ :param verify_server: Either a boolean, in which case it controls whether we verify
the server's TLS certificate, or a string, in which case it must be a path
to a CA bundle to use. Defaults to ``True``.
- @param logger: Log file
+ :param logger: Log file
"""
self.server_url = server_url
self.user = user
def http_get(self, endpoint):
"""Execute an http get request
- @param endpoint: Ansible Runner service RESTful API endpoint
+ :param endpoint: Ansible Runner service RESTful API endpoint
- @returns: A requests object
+ :returns: A requests object
"""
response = None
return response
- def http_post(self, endpoint, payload):
+ def http_post(self, endpoint, payload, params_dict = {}):
"""Execute an http post request
- @param endpoint: Ansible Runner service RESTful API endpoint
- @param payload: Dictionary with the data used in the post request
+ :param endpoint: Ansible Runner service RESTful API endpoint
+ :param payload: Dictionary with the data used in the post request
+ :param params_dict: A dict used to build a query string
- @returns: A requests object
+ :returns: A requests object
"""
response = None
verify = self.verify_server,
headers = {"Authorization": self.token,
"Content-type": "application/json"},
- data = payload)
+ json = payload,
+ params = params_dict)
if r.status_code != requests.codes.ok:
- self.log.error("http POST %s [%s] <--> (%s - %s)\n%s",
+ self.log.error("http POST %s [%s] <--> (%s - %s:%s)\n",
the_url, payload, r.status_code, r.reason, r.text)
else:
self.log.info("http POST %s <--> (%s - %s)",
def http_put(self, endpoint, payload):
"""Execute an http put request
- @param endpoint: Ansible Runner service RESTful API endpoint
- @param payload: Dictionary with the data used in the put request
+ :param endpoint: Ansible Runner service RESTful API endpoint
+ :param payload: Dictionary with the data used in the put request
- @returns: A requests object
+ :returns: A requests object
"""
# TODO
raise NotImplementedError("TODO")
# where the playbook is executed.
GET_STORAGE_DEVICES_CATALOG_PLAYBOOK = "host-disks.yml"
+# Used in the create_osds method
+ADD_OSD_PLAYBOOK = "add-osd.yml"
+
+# Used in the remove_osds method
+REMOVE_OSD_PLAYBOOK = "shrink-osd.yml"
+
class AnsibleReadOperation(orchestrator.ReadCompletion):
""" A read operation means to obtain information from the cluster.
"""
- def __init__(self, client, playbook, logger, result_pattern, params):
+ def __init__(self, client, playbook, logger, result_pattern,
+ params,
+ querystr_dict={}):
super(AnsibleReadOperation, self).__init__()
-
# Private attributes
self.playbook = playbook
self._is_complete = False
self._is_errored = False
self._result = []
+ self._status = ExecutionStatusCode.NOT_LAUNCHED
# Error description in operation
self.error = ""
playbook,
logger,
result_pattern,
- params)
+ params,
+ querystr_dict)
@property
def is_complete(self):
"""Return the status code of the operation
updating conceptually 'linked' attributes
"""
- current_status = self.pb_execution.get_status()
- self._is_complete = (current_status == ExecutionStatusCode.SUCCESS) or \
- (current_status == ExecutionStatusCode.ERROR)
+ if self._status in [ExecutionStatusCode.ON_GOING,
+ ExecutionStatusCode.NOT_LAUNCHED]:
+ self._status = self.pb_execution.get_status()
+
+ self._is_complete = (self._status == ExecutionStatusCode.SUCCESS) or \
+ (self._status == ExecutionStatusCode.ERROR)
- self._is_errored = (current_status == ExecutionStatusCode.ERROR)
+ self._is_errored = (self._status == ExecutionStatusCode.ERROR)
- return current_status
+ if self._is_complete:
+ self.update_result()
+
+ return self._status
def execute_playbook(self):
- """Execute the playbook with the provided params.
+ """Launch the execution of the playbook with the parameters configured
"""
self.pb_execution.launch()
The result of the playbook execution can be customized through the
function provided as 'process_output' attribute
- @return string: Result of the operation formatted if it is possible
+ :return string: Result of the operation formatted if it is possible
"""
processed_result = []
In the case of Ansible, this will be True if the playbooks has been
executed succesfully.
- @return Boolean: if the playbook has been executed succesfully
+ :return Boolean: if the playbook has been executed successfully
"""
return self.status == ExecutionStatusCode.SUCCESS
class Module(MgrModule, orchestrator.Orchestrator):
- """An Orchestrator that an external Ansible runner service to perform
- operations
+ """An Orchestrator that uses <Ansible Runner Service> to perform operations
"""
MODULE_OPTIONS = [
{'name': 'server_url'},
"""Given a list of Completion instances, progress any which are
incomplete.
- @param completions: list of Completion instances
- @Returns : true if everything is done.
+ :param completions: list of Completion instances
+ :Returns : True if everything is done.
"""
# Check progress and update status in each operation
+ # Accessing the completion.status property does the trick
for operation in completions:
self.log.info("playbook <%s> status:%s", operation.playbook, operation.status)
- if operation.is_complete:
- operation.update_result()
completions = filter(lambda x: not x.is_complete, completions)
def get_inventory(self, node_filter=None):
"""
- @param : node_filter instance
- @Return : A AnsibleReadOperation instance (Completion Object)
+ :param : node_filter instance
+ :Return : A AnsibleReadOperation instance (Completion Object)
"""
# Create a new read completion object for execute the playbook
playbook = GET_STORAGE_DEVICES_CATALOG_PLAYBOOK,
logger = self.log,
result_pattern = "RESULTS",
- params = "{}")
+ params = {})
# Assign the process_output function
ansible_operation.process_output = process_inventory_json
ansible_operation.event_filter = "runner_on_ok"
# Execute the playbook to obtain data
- ansible_operation.execute_playbook()
+ self._launch_operation(ansible_operation)
- self.all_completions.append(ansible_operation)
+ return ansible_operation
+
+ def create_osds(self, drive_group, all_hosts=None):
+ """Create one or more OSDs within a single Drive Group.
+ If no host is provided the operation affects all the hosts in the OSDs role
+
+
+ :param drive_group: (orchestrator.DriveGroupSpec),
+ Drive group with the specification of drives to use
+ :param all_hosts: (List[str]),
+ List of hosts where the OSDs must be created
+ """
+
+ # Transform drive group specification to Ansible playbook parameters
+ host, osd_spec = dg_2_ansible(drive_group)
+
+ # Create a new read completion object to execute the playbook
+ ansible_operation = AnsibleReadOperation(client = self.ar_client,
+ playbook = ADD_OSD_PLAYBOOK,
+ logger = self.log,
+ result_pattern = "",
+ params = osd_spec,
+ querystr_dict = {"limit": host})
+
+ # Filter to get the result
+ ansible_operation.process_output = process_playbook_result
+ ansible_operation.event_filter = "playbook_on_stats"
+
+ # Execute the playbook
+ self._launch_operation(ansible_operation)
return ansible_operation
- def create_osds(self, drive_group, all_hosts):
+ def remove_osds(self, osd_ids):
+ """Remove osd's.
+
+ :param osd_ids: List of osd's to be removed (List[int])
"""
- Create one or more OSDs within a single Drive Group.
- The principal argument here is the drive_group member
- of OsdSpec: other fields are advisory/extensible for any
- finer-grained OSD feature enablement (choice of backing store,
- compression/encryption, etc).
+ extravars = {'osd_to_kill': ",".join([str(id) for id in osd_ids]),
+ 'ireallymeanit':'yes'}
- :param osd_spec: DriveGroupSpec
+ # Create a new read completion object to execute the playbook
+ ansible_operation = AnsibleReadOperation(client = self.ar_client,
+ playbook = REMOVE_OSD_PLAYBOOK,
+ logger = self.log,
+ result_pattern = "",
+ params = extravars)
+
+ # Filter to get the result
+ ansible_operation.process_output = process_playbook_result
+ ansible_operation.event_filter = "playbook_on_stats"
+
+ # Execute the playbook
+ self._launch_operation(ansible_operation)
+
+ return ansible_operation
+
+ def _launch_operation(self, ansible_operation):
+ """Launch the operation and add the operation to the completion objects
+ ongoing
+
+ :ansible_operation: A read/write ansible operation (completion object)
"""
+ # Execute the playbook
+ ansible_operation.execute_playbook()
+
+ # Add the operation to the list of things ongoing
+ self.all_completions.append(ansible_operation)
+
def verify_config(self):
if not self.get_module_option('server_url', ''):
""" Adapt the output of the playbook used in 'get_inventory'
to the Orchestrator expected output (list of InventoryNode)
- @param inventory_events: events dict with the results
+ :param inventory_events: events dict with the results
Example:
inventory_events =
'36-2016b900-e38f-7dcd-a2e7-00000000000e': {'host': '192.168.121.252'
'task': 'RESULTS',
'event': 'runner_on_ok'}}
- @param ar_client: Ansible Runner Service client
- @param playbook_uuid: Palybooud identifier
+ :param ar_client: Ansible Runner Service client
+ :param playbook_uuid: Playbook identifier
- @return : list of InventoryNode
+ :return : list of InventoryNode
"""
#Obtain the needed data for each result event
return inventory_nodes
+
+
+def process_playbook_result(inventory_events, ar_client, playbook_uuid):
+
+ result = ""
+
+ # Loop over the result events and request the data
+ for event_key, dummy_data in inventory_events.items():
+ event_response = ar_client.http_get(EVENT_DATA_URL % (playbook_uuid,
+ event_key))
+
+ result += event_response.text
+
+ return result
+
+def dg_2_ansible(drive_group):
+ """ Transform a drive group especification into:
+
+ a host : limit the playbook execution to this host
+ an osd_spec : dict of parameters to pass to the Ansible playbook used
+ to create the osds
+
+ :param drive_group: (type: DriveGroupSpec)
+
+ TODO: Possibly this function will be removed or heavily modified when
+ the ansible playbook to create OSDs uses "ceph-volume batch" with the
+ drive group parameter
+ """
+
+ # Limit the execution of the playbook to certain hosts
+ # TODO: Currently only "*" (all the hosts) or a single host_name are
+ # accepted in drive_group.host_pattern
+ # This attribute is intended to be used with "fnmatch" patterns, so when
+ # this become effective it will be needed to use the "get_inventory" method
+ # in order to have a list of hosts to be filtered with the "host_pattern"
+ if drive_group.host_pattern in ["*"]:
+ host = None # No limit in the playbook
+ else:
+ # For the moment, we assume that we only have 1 host
+ host = drive_group.host_pattern
+
+ # Compose the OSD configuration
+
+
+ osd = {}
+ osd["data"] = drive_group.data_devices.paths[0]
+ # Other parameters will be extracted in the same way
+ #osd["dmcrypt"] = drive_group.encryption
+
+ # lvm_volumes parameters
+ # (for the moment this is what the current playbook accepts)
+ osd_spec = {"lvm_volumes":[osd]}
+
+ #Global scope variables can also be included in the osd_spec
+ #osd_spec["osd_objectstore"] = drive_group.objectstore
+
+ return host, osd_spec
assert not (service_name and service_id)
raise NotImplementedError()
- def create_osds(self, drive_group, all_hosts):
+ def create_osds(self, drive_group, all_hosts=None):
# type: (DriveGroupSpec, List[str]) -> WriteCompletion
"""
Create one or more OSDs within a single Drive Group.
:param drive_group: DriveGroupSpec
:param all_hosts: TODO, this is required because the orchestrator methods are not composable
+ Probably this parameter can be easily removed because each orchestrator can use
+ the "get_inventory" method and the "drive_group.host_pattern" attribute
+ to obtain the list of hosts where to apply the operation
"""
raise NotImplementedError()
"""
raise NotImplementedError()
- def remove_osds(self, node, osd_ids):
- # type: (str, List[str]) -> WriteCompletion
+ def remove_osds(self, osd_ids):
+ # type: (List[str]) -> WriteCompletion
"""
- :param node: A node name, must exist.
:param osd_ids: list of OSD IDs
Note that this can only remove OSDs that were successfully
"cmd": "orchestrator status",
"desc": "Report configured backend and its status",
"perm": "r"
+ },
+ { 'cmd': "orchestrator osd create "
+ "name=drive_group,type=CephString,req=false ",
+ "desc": "OSD's creation following specification \
+ in <drive_group> parameter or readed from -i <file> input.",
+ "perm": "rw"
+ },
+ { 'cmd': "orchestrator osd remove "
+ "name=osd_ids,type=CephInt,n=N ",
+ "desc": "Remove Osd's",
+ "perm": "rw"
}
]
spec = orchestrator.DriveGroupSpec(node_name, data_devices=devs)
# TODO: Remove this and make the orchestrator composable
+ # or
+ # Probably this should be moved to each of the orchestrators,
+ # then we wouldn't need the "all_hosts" parameter at all.
host_completion = self.get_hosts()
self.wait([host_completion])
all_hosts = [h.name for h in host_completion.result]
+ completion = self.create_osds(spec, all_hosts=all_hosts)
+ self._orchestrator_wait([completion])
+
+ return HandleCommandResult()
+
+ def _create_osd(self, inbuf, cmd):
+ """Create one or more OSDs
+
+ :cmd : Arguments for the OSD creation
+ """
+ #Obtain/validate parameters for the operation
+ cmdline_error = ""
+ if "drive_group" in cmd.keys():
+ params = cmd["drive_group"]
+ elif inbuf:
+ params = inbuf
+ else:
+ cmdline_error = "Please, use 'drive_group' parameter \
+ or specify -i <input file>"
+
+ if cmdline_error:
+ return HandleCommandResult(-errno.EINVAL, stderr=cmdline_error)
+
try:
json_dg = json.loads(params)
except ValueError as msg:
return HandleCommandResult(-errno.EINVAL, stderr=msg)
+ # Create the drive group
+ drive_group = orchestrator.DriveGroupSpec.from_json(json_dg)
+ #Read other Drive_group
+
+ #Launch the operation in the orchestrator module
+ completion = self.create_osds(drive_group)
+
+ #Wait until the operation finishes
+ self._orchestrator_wait([completion])
+
+ #return result
+ return HandleCommandResult(stdout=completion.result)
+
+ def _remove_osd(self, cmd):
+ """
+ Remove OSDs
+ :cmd : Arguments for removing the OSDs
+ """
+
+ #Launch the operation in the orchestrator module
+ completion = self.remove_osds(cmd["osd_ids"])
+
+ #Wait until the operation finishes
+ self._orchestrator_wait([completion])
+
+ #return result
+ return HandleCommandResult(stdout=completion.result)
+
def _add_stateless_svc(self, svc_type, spec):
completion = self.add_stateless_service(svc_type, spec)
self._orchestrator_wait([completion])
enabled = module['name'] in mgr_map['modules']
if not enabled:
return HandleCommandResult(-errno.EINVAL,
- stdout="Module '{module_name}' is not enabled. \n Run "
+ stderr="Module '{module_name}' is not enabled. \n Run "
"`ceph mgr module enable {module_name}` "
"to enable.".format(module_name=module_name))
try:
avail, why = self.available()
except NoOrchestrator:
- return HandleCommandResult(stderr="No orchestrator configured (try "
+ return HandleCommandResult(-errno.ENODEV,
+ stderr="No orchestrator configured (try "
"`ceph orchestrator set backend`)")
if avail is None:
return self._set_backend(cmd)
elif cmd['prefix'] == "orchestrator status":
return self._status()
+ elif cmd['prefix'] == "orchestrator osd create":
+ return self._create_osd(_, cmd)
+ elif cmd['prefix'] == "orchestrator osd remove":
+ return self._remove_osd(cmd)
else:
raise NotImplementedError()