import json
import re
from functools import wraps
+import collections
import requests
PLAYBOOK_EXEC_URL = "api/v1/playbooks"
PLAYBOOK_EVENTS = "api/v1/jobs/%s/events"
EVENT_DATA_URL = "api/v1/jobs/%s/events/%s"
+URL_MANAGE_GROUP = "api/v1/groups/{group_name}"
+URL_ADD_RM_HOSTS = "api/v1/hosts/{host_name}/groups/{inventory_group}"
class AnsibleRunnerServiceError(Exception):
"""Generic Ansible Runner Service Exception"""
"""
# TODO
raise NotImplementedError("TODO")
+
+
+ def add_hosts_to_group(self, hosts, group):
+ """ Add one or more hosts to an Ansible inventory group
+
+ :host : host to add
+ :group: Ansible inventory group where the hosts will be included
+
+ :return : Nothing
+
+ :raises : AnsibleRunnerServiceError if not possible to complete
+ the operation
+ """
+
+ url_group = URL_MANAGE_GROUP.format(group_name=group)
+
+ # Get/Create the group
+ response = self.http_get(url_group)
+ if response.status_code == 404:
+ # Create the new group
+ response = self.http_post(url_group, "", {})
+ if response.status_code != 200:
+ raise AnsibleRunnerServiceError("Error when trying to "\
+ "create group:{}".format(group))
+ hosts_in_group = []
+ else:
+ hosts_in_group = json.loads(response.text)["data"]["members"]
+
+ # Here we have the group in the inventory. Add the hosts
+ for host in hosts:
+ if host not in hosts_in_group:
+ add_url = URL_ADD_RM_HOSTS.format(host_name=host,
+ inventory_group=group)
+
+ response = self.http_post(add_url, "", {})
+ if response.status_code != 200:
+ raise AnsibleRunnerServiceError("Error when trying to "\
+ "include host '{}' in group"\
+ " '{}'".format(host, group))
+
+ def remove_hosts_from_group(self, group, hosts):
+ """ Remove all the hosts from group, it also removes the group itself if
+ it is empty
+
+ : group : Group name (str)
+ : hosts : List of hosts to remove
+ """
+
+ url_group = URL_MANAGE_GROUP.format(group_name=group)
+ response = self.http_get(url_group)
+
+ # Group not found is OK!
+ if response.status_code == 404:
+ return
+
+ # Once we have the group, we remove the hosts required
+ if response.status_code == 200:
+ hosts_in_group = json.loads(response.text)["data"]["members"]
+
+ # Delete the hosts (it does not matter if the host does not exist)
+ for host in hosts:
+ if host in hosts_in_group:
+ url_host = URL_ADD_RM_HOSTS.format(host_name=host,
+ inventory_group=group)
+ response = self.http_delete(url_host)
+ hosts_in_group.remove(host)
+
+ # Delete the group if no hosts in it
+ if not hosts_in_group:
+ response = self.http_delete(url_group)
+
+ def get_hosts_in_group(self, group):
+ """ Return the list of hosts in and inventory group
+
+ : group : Group name (str)
+ """
+ url_group = URL_MANAGE_GROUP.format(group_name=group)
+ response = self.http_get(url_group)
+ if response.status_code == 404:
+ raise AnsibleRunnerServiceError("Group {} not found in Ansible"\
+ " inventory".format(group))
+
+ return json.loads(response.text)["data"]["members"]
+
+
+class InventoryGroup(collections.MutableSet):
+ """ Manages an Ansible Inventory Group
+ """
+ def __init__(self, group_name, ars_client):
+ """Init the group_name attribute and
+ Create the inventory group if it does not exist
+
+ : group_name : Name of the group in the Ansible Inventory
+ : returns : Nothing
+ """
+
+ self.elements = set()
+
+ self.group_name = group_name
+ self.url_group = URL_MANAGE_GROUP.format(group_name=self.group_name)
+ self.created = False
+ self.ars_client = ars_client
+
+ # Get/Create the group
+ response = self.ars_client.http_get(self.url_group)
+ if response.status_code == 404:
+ return
+
+ # get members if the group exists previously
+ self.created = True
+ self.elements.update(json.loads(response.text)["data"]["members"])
+
+ def __contains__(self, host):
+ """ Check if the host is in the group
+
+ : host: Check if hosts is in Ansible Inventory Group
+ """
+ return host in self.elements
+
+ def __iter__(self):
+ return iter(self.elements)
+
+ def __len__(self):
+ return len(self.elements)
+
+ def add(self, value):
+ """ Add a new host to the group
+ Create the Ansible Inventory group if it does not exist
+
+ : value : The host(string) to add
+ """
+
+ if not self.created:
+ self.__create_group__()
+
+ add_url = URL_ADD_RM_HOSTS.format(host_name=value,
+ inventory_group=self.group_name)
+
+ response = self.ars_client.http_post(add_url, "", {})
+ if response.status_code != 200:
+ raise AnsibleRunnerServiceError("Error when trying to "\
+ "include host '{}' in group"\
+ " '{}'".format(value,
+ self.group_name))
+
+ # Refresh members
+ response = self.ars_client.http_get(self.url_group)
+ self.elements.update(json.loads(response.text)["data"]["members"])
+
+ def discard(self, value):
+ """Remove a host from the group.
+ Remove the group from the Ansible inventory if it is empty
+
+ : value : The host(string) to remove
+ """
+ url_host = URL_ADD_RM_HOSTS.format(host_name=value,
+ inventory_group=self.group_name)
+ response = self.ars_client.http_delete(url_host)
+
+ # Refresh members
+ response = self.ars_client.http_get(self.url_group)
+ self.elements.update(json.loads(response.text)["data"]["members"])
+
+ # Delete the group if no members
+ if not self.elements:
+ response = self.ars_client.http_delete(self.url_group)
+
+ def update(self, iterable=None):
+ """ Update the hosts in the group with the iterable items
+
+ :iterable : And iterable object with hosts names
+ """
+ for item in iterable:
+ self.add(item)
+
+ def clean(self, iterable=None):
+ """ Remove from the group the hosts included in iterable
+ If not provided an iterable, all the hosts are removed from the group
+
+ :iterable : And iterable object with hosts names
+ """
+
+ if not iterable:
+ iterable = self.elements
+
+ for item in iterable:
+ self.discard(item)
+
+ def __create_group__(self):
+ """ Create the Ansible inventory group
+ """
+ response = self.ars_client.http_post(self.url_group, "", {})
+
+ if response.status_code != 200:
+ raise AnsibleRunnerServiceError("Error when trying to "\
+ "create group:{}".format(
+ self.group_name))
+ self.created = True
+ self.elements = {}
import orchestrator
from .ansible_runner_svc import Client, PlayBookExecution, ExecutionStatusCode,\
- AnsibleRunnerServiceError
+ AnsibleRunnerServiceError, InventoryGroup,\
+ URL_MANAGE_GROUP, URL_ADD_RM_HOSTS
from .output_wizards import ProcessInventory, ProcessPlaybookResult, \
ProcessHostsList
# Used in the remove_osds method
REMOVE_OSD_PLAYBOOK = "shrink-osd.yml"
+# General multi purpose cluster playbook
+SITE_PLAYBOOK = "site.yml"
+
+# General multi-purpose playbook for removing daemons
+PURGE_PLAYBOOK = "purge-cluster.yml"
+
# Default name for the inventory group for hosts managed by the Orchestrator
ORCHESTRATOR_GROUP = "orchestrator"
# URLs for Ansible Runner Operations
-# Add or remove host in one group
-URL_ADD_RM_HOSTS = "api/v1/hosts/{host_name}/groups/{inventory_group}"
# Retrieve the groups where the host is included in.
URL_GET_HOST_GROUPS = "api/v1/hosts/{host_name}"
-# Manage groups
-URL_MANAGE_GROUP = "api/v1/groups/{group_name}"
+
+
+
# URLs for Ansible Runner Operations
URL_GET_HOSTS = "api/v1/hosts"
# An aditional filter of result events based in the event
self.event_filter_list = [""]
+ # A dict with groups and hosts to remove from inventory if operation is
+ # succesful. Ex: {"group1": ["host1"], "group2": ["host3", "host4"]}
+ self.clean_hosts_on_success = {}
+
# Playbook execution object
self.pb_execution = PlayBookExecution(client,
playbook,
if self._is_complete:
self.update_result()
+ # Clean hosts if operation is succesful
+ if self._status == ExecutionStatusCode.SUCCESS:
+ self.clean_inventory()
+
return self._status
def execute_playbook(self):
processed_result = []
if self._is_errored:
- processed_result = self.pb_execution.get_result(["runner_on_failed",
+ raw_result = self.pb_execution.get_result(["runner_on_failed",
"runner_on_unreachable",
"runner_on_no_hosts",
"runner_on_async_failed",
"runner_item_on_failed"])
-
elif self._is_complete:
raw_result = self.pb_execution.get_result(self.event_filter_list)
- if self.output_wizard:
- processed_result = self.output_wizard.process(self.pb_execution.play_uuid,
- raw_result)
- else:
- processed_result = raw_result
+ if self.output_wizard:
+ processed_result = self.output_wizard.process(self.pb_execution.play_uuid,
+ raw_result)
+ else:
+ processed_result = raw_result
self._result = processed_result
+ def clean_inventory(self):
+ """ Remove hosts from inventory groups
+ """
+
+ for group, hosts in self.clean_hosts_on_success.items():
+ InventoryGroup(group, self.ar_client).clean(hosts)
+ del self.clean_hosts_on_success[group]
+
+
class AnsibleChangeOperation(orchestrator.WriteCompletion):
"""Operations that changes the "cluster" state
return ARSChangeOperation(self.ar_client, self.log, operations)
+ def add_stateless_service(self, service_type, spec):
+ """ Add a stateless service in the cluster
+
+ : service_type: Kind of service (nfs, rgw, mds)
+ : spec : an Orchestrator.StatelessServiceSpec object
+
+ : returns : Completion object
+ """
+
+ # Check service_type is supported
+ if service_type not in ["rgw"]:
+ raise orchestrator.OrchestratorError(
+ "{} service not supported".format(service_type))
+
+ # Add the hosts to the inventory in the right group
+ hosts = spec.service_spec.hosts
+ if not hosts:
+ raise orchestrator.OrchestratorError("No hosts provided."\
+ "At least one destination host is needed to install the RGW "\
+ "service")
+ InventoryGroup("{}s".format(service_type), self.ar_client).update(hosts)
+
+ # Limit playbook execution to certain hosts
+ limited = ",".join(hosts)
+
+ # Add the settings for this service
+ extravars = vars(spec.service_spec)
+
+ # Group hosts by resource (used in rm ops)
+ if service_type == "rgw":
+ resource_group = "rgw_zone_{}".format(spec.service_spec.rgw_zone)
+ InventoryGroup(resource_group, self.ar_client).update(hosts)
+
+
+ # Execute the playbook to create the service
+ playbook_operation = PlaybookOperation(client=self.ar_client,
+ playbook=SITE_PLAYBOOK,
+ logger=self.log,
+ result_pattern="",
+ params=extravars,
+ querystr_dict={"limit": limited})
+
+ # Filter to get the result
+ playbook_operation.output_wizard = ProcessPlaybookResult(self.ar_client,
+ self.log)
+ playbook_operation.event_filter_list = ["playbook_on_stats"]
+
+ # Execute the playbook
+ self._launch_operation(playbook_operation)
+
+ return playbook_operation
+
+ def remove_stateless_service(self, service_type, id_resource):
+ """ Remove a stateles services providing <sv_id> resources
+
+ :svc_type : Kind of service (nfs, rgw, mds)
+ :id_resource : Id of the resource provided
+ <zone name> if service is RGW
+ ...
+ : returns : Completion object
+ """
+
+ # Check service_type is supported
+ if service_type not in ["rgw"]:
+ raise orchestrator.OrchestratorError(
+ "{} service not supported".format(service_type))
+
+ # Ansible Inventory group for the kind of service
+ group = "{}s".format(service_type)
+
+ # get the list of hosts where to remove the service
+ # (hosts in resource group)
+ if service_type == "rgw":
+ group_prefix = "rgw_zone_{}"
+
+ resource_group = group_prefix.format(id_resource)
+
+ hosts_list = list(InventoryGroup(resource_group, self.ar_client))
+ limited = ",".join(hosts_list)
+
+ # Avoid manual confirmation
+ extravars = {"ireallymeanit": "yes"}
+
+ # Execute the playbook to remove the service
+ playbook_operation = PlaybookOperation(client=self.ar_client,
+ playbook=PURGE_PLAYBOOK,
+ logger=self.log,
+ result_pattern="",
+ params=extravars,
+ querystr_dict={"limit": limited})
+
+ # Filter to get the result
+ playbook_operation.output_wizard = ProcessPlaybookResult(self.ar_client,
+ self.log)
+ playbook_operation.event_filter_list = ["playbook_on_stats"]
+
+ # Cleaning of inventory after a sucessful operation
+ clean_inventory = {}
+ clean_inventory[resource_group] = hosts_list
+ clean_inventory[group] = hosts_list
+ playbook_operation.clean_hosts_on_success = clean_inventory
+
+ # Execute the playbook
+ self.log.info("Removing service %s for resource %s", service_type,
+ id_resource)
+ self._launch_operation(playbook_operation)
+
+ return playbook_operation
+
def _launch_operation(self, ansible_operation):
"""Launch the operation and add the operation to the completion objects
ongoing
"""
# Just making more readable the method
inventory_events = raw_result
-
result = ""
# Loop over the result events and request the data
(operation_id, event_key))
result += event_response.text
-
return result
import time
import fnmatch
import uuid
+import string
+import random
import six
"""
raise NotImplementedError()
- def remove_stateless_service(self, service_type, id_):
+ def remove_stateless_service(self, service_type, id_resource):
# type: (str, str) -> WriteCompletion
"""
Uninstalls an existing service from the cluster.
# some replicaset special sauce for autoscaling?
self.extended = {}
+ # Object with specific settings for the service
+ self.service_spec = None
+
+class RGWSpec(object):
+ """
+ Settings to configure a multisite Ceph RGW
+ """
+ def __init__(self, hosts=None, rgw_multisite=True, rgw_zone="Default_Zone",
+ rgw_zonemaster=True, rgw_zonesecondary=False,
+ rgw_multisite_proto="http", rgw_frontend_port="8080",
+ rgw_zonegroup="Main", rgw_zone_user="zone.user",
+ rgw_realm="RGW_Realm", system_access_key=None,
+ system_secret_key=None):
+
+ self.hosts = hosts
+ self.rgw_multisite = rgw_multisite
+ self.rgw_zone = rgw_zone
+ self.rgw_zonemaster = rgw_zonemaster
+ self.rgw_zonesecondary = rgw_zonesecondary
+ self.rgw_multisite_proto = rgw_multisite_proto
+ self.rgw_frontend_port = rgw_frontend_port
+
+ if hosts:
+ self.rgw_multisite_endpoint_addr = hosts[0]
+
+ self.rgw_multisite_endpoints_list = ",".join(
+ ["{}://{}:{}".format(self.rgw_multisite_proto,
+ host,
+ self.rgw_frontend_port) for host in hosts])
+
+ self.rgw_zonegroup = rgw_zonegroup
+ self.rgw_zone_user = rgw_zone_user
+ self.rgw_realm = rgw_realm
+
+ if system_access_key:
+ self.system_access_key = system_access_key
+ else:
+ self.system_access_key = self.genkey(20)
+ if system_secret_key:
+ self.system_secret_key = system_secret_key
+ else:
+ self.system_secret_key = self.genkey(40)
+
+ def genkey(self, nchars):
+ """ Returns a random string of nchars
+
+ :nchars : Length of the returned string
+ """
+
+ return ''.join(random.choice(string.ascii_uppercase +
+ string.ascii_lowercase +
+ string.digits) for _ in range(nchars))
+
+ @classmethod
+ def from_json(self, json_rgw_spec):
+ """
+ Initialize 'RGWSpec' object geting data from a json estructure
+ :param json_rgw_spec: A valid json string with a the RGW settings
+ """
+ args = {k:v for k, v in json_rgw_spec.items()}
+ return RGWSpec(**args)
+
class InventoryFilter(object):
"""
completion = self.add_stateless_service(svc_type, spec)
self._orchestrator_wait([completion])
orchestrator.raise_if_exception(completion)
- return HandleCommandResult()
+ return HandleCommandResult(stdout=str(completion.result))
@_write_cli('orchestrator mds add',
"name=svc_arg,type=CephString",
return self._add_stateless_svc("mds", spec)
@_write_cli('orchestrator rgw add',
- "name=svc_arg,type=CephString",
- 'Create an RGW service')
- def _rgw_add(self, svc_arg):
+ 'name=svc_arg,type=CephString,req=false',
+ 'Create an RGW service. A complete <rgw_spec> can be provided'\
+ ' using <-i> to customize completelly the RGW service')
+ def _rgw_add(self, svc_arg=None, inbuf=None):
+ """
+ """
+ usage = """
+Usage:
+ ceph orchestrator rgw add -i <json_file>
+ ceph orchestrator rgw add <zone_name>
+ """
+
+ if inbuf:
+ try:
+ rgw_spec = orchestrator.RGWSpec.from_json(json.loads(inbuf))
+ except ValueError as e:
+ msg = 'Failed to read JSON input: {}'.format(str(e)) + usage
+ return HandleCommandResult(-errno.EINVAL, stderr=msg)
+ elif svc_arg:
+ rgw_spec = orchestrator.RGWSpec()
+ rgw_spec.zone_name = svc_arg
+
spec = orchestrator.StatelessServiceSpec()
- spec.name = svc_arg
+ spec.service_spec = rgw_spec
+ spec.name = rgw_spec.rgw_zone
+
return self._add_stateless_svc("rgw", spec)
@_write_cli('orchestrator nfs add',
completion = self.remove_stateless_service(svc_type, svc_id)
self._orchestrator_wait([completion])
orchestrator.raise_if_exception(completion)
- return HandleCommandResult()
+ return HandleCommandResult(stdout=str(completion.result))
@_write_cli('orchestrator mds rm',
"name=svc_id,type=CephString",