From a398fc239f3c5bbeabb22b016448bd42ca43398c Mon Sep 17 00:00:00 2001 From: Daniel-Pivonka Date: Mon, 26 Oct 2020 14:34:39 -0400 Subject: [PATCH] cephadm: HA for RGW endpoints MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Cephadm deploying keepalived and HAproxy for providing High availability for RGW endpoints Fixes: https://tracker.ceph.com/issues/45116 Signed-off-by: Daniel-Pivonka Signed-off-by: Adam King Signed-off-by: Juan Miguel Olmo Martínez --- doc/images/HAProxy_for_RGW.svg | 1 + doc/mgr/orchestrator.rst | 174 ++++++++++- src/cephadm/cephadm | 269 ++++++++++++++++-- src/pybind/mgr/cephadm/inventory.py | 12 + src/pybind/mgr/cephadm/module.py | 53 +++- src/pybind/mgr/cephadm/schedule.py | 27 +- src/pybind/mgr/cephadm/serve.py | 139 +++++---- .../mgr/cephadm/services/cephadmservice.py | 8 +- src/pybind/mgr/cephadm/services/ha_rgw.py | 151 ++++++++++ .../templates/services/haproxy/haproxy.cfg.j2 | 66 +++++ .../services/keepalived/keepalived.conf.j2 | 32 +++ src/pybind/mgr/cephadm/tests/test_spec.py | 60 +++- src/pybind/mgr/orchestrator/_interface.py | 64 ++++- src/pybind/mgr/orchestrator/module.py | 4 +- .../ceph/deployment/service_spec.py | 94 +++++- 15 files changed, 1050 insertions(+), 104 deletions(-) create mode 100644 doc/images/HAProxy_for_RGW.svg create mode 100644 src/pybind/mgr/cephadm/services/ha_rgw.py create mode 100644 src/pybind/mgr/cephadm/templates/services/haproxy/haproxy.cfg.j2 create mode 100644 src/pybind/mgr/cephadm/templates/services/keepalived/keepalived.conf.j2 diff --git a/doc/images/HAProxy_for_RGW.svg b/doc/images/HAProxy_for_RGW.svg new file mode 100644 index 00000000000..c2fc95f1f9c --- /dev/null +++ b/doc/images/HAProxy_for_RGW.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/doc/mgr/orchestrator.rst b/doc/mgr/orchestrator.rst index 8940d463efd..f22a4a684b0 100644 --- a/doc/mgr/orchestrator.rst +++ b/doc/mgr/orchestrator.rst @@ -177,7 +177,7 @@ Example:: When the parameter ``all-available-devices`` or a DriveGroup specification is used, a cephadm service is created. This service guarantees that all available devices or devices included in the DriveGroup will be used for OSDs. -Note that the effect of ``--all-available-devices`` is persistent; that is, drives which are added to the system +Note that the effect of ``--all-available-devices`` is persistent; that is, drives which are added to the system or become available (say, by zapping) after the command is complete will be automatically found and added to the cluster. That is, after using:: @@ -312,7 +312,7 @@ error if it doesn't know how to do this transition. Update the number of monitor hosts:: ceph orch apply mon --placement= [--dry-run] - + Where ``placement`` is a :ref:`orchestrator-cli-placement-spec`. Each host can optionally specify a network for the monitor to listen on. @@ -320,7 +320,7 @@ Each host can optionally specify a network for the monitor to listen on. Update the number of manager hosts:: ceph orch apply mgr --placement= [--dry-run] - + Where ``placement`` is a :ref:`orchestrator-cli-placement-spec`. .. @@ -413,6 +413,174 @@ Service Commands:: ceph orch + +.. _orchestrator-haproxy-service-spec: + +High availability service for RGW +================================= + +This service allows the user to create a high avalilability RGW service +providing a mimimun set of configuration options. 
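+
+As a quick preview, deploying the service amounts to writing a small service
+specification and applying it with ``ceph orch apply -i <spec_file>``. The
+following is only a minimal sketch that reuses the example values shown later
+in this section; the prerequisites and the complete list of options are
+described below::
+
+    service_type: ha-rgw
+    service_id: haproxy_for_rgw
+    placement:
+      hosts:
+        - host1
+        - host2
+        - host3
+    spec:
+      virtual_ip_interface: eth0
+      virtual_ip_address: 192.168.20.1/24
+      frontend_port: 8080
+      ha_proxy_port: 1967
+      ha_proxy_stats_enabled: true
+      ha_proxy_stats_user: admin
+      ha_proxy_stats_password: admin
+      ha_proxy_enable_prometheus_exporter: true
+      ha_proxy_monitor_uri: /haproxy_health
+      keepalived_password: admin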
+
+The orchestrator automatically deploys and configures several HAProxy and
+Keepalived containers to assure the continuity of the RGW service as long as
+the Ceph cluster has at least one RGW daemon running.
+
+The following image shows graphically how this service works:
+
+.. image:: ../images/HAProxy_for_RGW.svg
+
+The HA RGW service is deployed on N hosts, which means that an HAProxy and a
+keepalived daemon are running on each of these hosts. Keepalived is used to
+provide a "virtual IP" bound to these hosts. All RGW clients use this
+"virtual IP" to connect to the RGW service.
+
+Each keepalived daemon checks every few seconds whether the HAProxy daemon
+running on the same host is responding. It also checks that the "master"
+keepalived daemon is running without problems.
+
+If the "master" keepalived daemon or the active HAProxy is not responding, one
+of the keepalived daemons running in backup mode will be elected as master,
+and the "virtual IP" will be moved to that node.
+
+The active HAProxy also acts as a load balancer, distributing all RGW requests
+between all the available RGW daemons.
+
+**Prerequisites:**
+
+* At least two RGW daemons running in the Ceph cluster
+* Operating system prerequisites:
+
+  In order for the Keepalived service to forward network packets properly to
+  the real servers, each router node must have IP forwarding turned on in the
+  kernel, so the following system option must be set::
+
+    net.ipv4.ip_forward = 1
+
+  Load balancing with HAProxy and Keepalived at the same time also requires
+  the ability to bind to an IP address that is nonlocal, meaning that it is
+  not assigned to a device on the local system. This allows a running load
+  balancer instance to bind to an IP that is not local, for failover, so the
+  following system option must also be set::
+
+    net.ipv4.ip_nonlocal_bind = 1
+
+  Be sure to set these two options in ``/etc/sysctl.conf`` so that the values
+  persist across host restarts. These configuration changes must be applied on
+  all the hosts where the HAProxy for RGW service is going to be deployed.
+
+
+**Deploying the high availability service for RGW**
+
+Use the command::
+
+    ceph orch apply -i <ha-rgw_spec_file>
+
+**Service specification file:**
+
+It is a YAML file with the following properties:
+
+.. code-block:: yaml
+
+    service_type: ha-rgw
+    service_id: haproxy_for_rgw
+    placement:
+      hosts:
+        - host1
+        - host2
+        - host3
+    spec:
+      virtual_ip_interface: <string>                  # ex: eth0
+      virtual_ip_address: <string>/<string>           # ex: 192.168.20.1/24
+      frontend_port: <integer>                        # ex: 8080
+      ha_proxy_port: <integer>                        # ex: 1967
+      ha_proxy_stats_enabled: <boolean>               # ex: true
+      ha_proxy_stats_user: <string>                   # ex: admin
+      ha_proxy_stats_password: <string>               # ex: admin
+      ha_proxy_enable_prometheus_exporter: <boolean>  # ex: true
+      ha_proxy_monitor_uri: <string>                  # ex: /haproxy_health
+      keepalived_user: <string>                       # ex: admin
+      keepalived_password: <string>                   # ex: admin
+      ha_proxy_frontend_ssl_certificate: <optional string>  # ex:
+        [
+          "-----BEGIN CERTIFICATE-----",
+          "MIIDZTCCAk2gAwIBAgIUClb9dnseOsgJWAfhPQvrZw2MP2kwDQYJKoZIhvcNAQEL",
+          ....
+          "-----END CERTIFICATE-----",
+          "-----BEGIN PRIVATE KEY-----",
+          ....
+          "sCHaZTUevxb4h6dCEk1XdPr2O2GdjV0uQ++9bKahAy357ELT3zPE8yYqw7aUCyBO",
+          "aW5DSCo8DgfNOgycVL/rqcrc",
+          "-----END PRIVATE KEY-----"
+        ]
+      ha_proxy_frontend_ssl_port: <optional integer>  # ex: 8090
+      ha_proxy_ssl_dh_param: <optional integer>       # ex: 1024
+      ha_proxy_ssl_ciphers: <optional string>         # ex: ECDH+AESGCM:!MD5
+      ha_proxy_ssl_options: <optional string>         # ex: no-sslv3
+      haproxy_container_image: <optional string>      # ex: haproxy:2.4-dev3-alpine
+      keepalived_container_image: <optional string>   # ex: arcts/keepalived:1.2.2
+
+where the properties of this service specification are:
+
+* ``service_type``
+    Mandatory and set to "ha-rgw"
+* ``service_id``
+    The name of the service.
+* ``placement hosts``
+    The hosts on which the HA daemons should run. An HAProxy container and a
+    Keepalived container will be deployed on each of these hosts.
+    The RGW daemons may run on these hosts or on different ones.
+* ``virtual_ip_interface``
+    The physical network interface to which the virtual IP will be bound
+* ``virtual_ip_address``
+    The virtual IP (and network) where the HA RGW service will be available.
+    All your RGW clients must point to this IP in order to use the HA RGW
+    service.
+* ``frontend_port``
+    The port used to access the HA RGW service
+* ``ha_proxy_port``
+    The port used by the HAProxy containers
+* ``ha_proxy_stats_enabled``
+    Whether to enable the statistics URL in the HAProxy daemons
+* ``ha_proxy_stats_user``
+    The user needed to access the HAProxy statistics URL
+* ``ha_proxy_stats_password``
+    The password needed to access the HAProxy statistics URL
+* ``ha_proxy_enable_prometheus_exporter``
+    Whether to enable the Prometheus exporter in HAProxy. This allows RGW
+    service metrics to be consumed from Grafana.
+* ``ha_proxy_monitor_uri``:
+    The API endpoint where the health of the HAProxy daemon is provided
+* ``keepalived_user``
+    The user needed to access the keepalived daemons
+* ``keepalived_password``:
+    The password needed to access the keepalived daemons
+* ``ha_proxy_frontend_ssl_certificate``:
+    SSL certificate. You must paste the content of your .pem file
+* ``ha_proxy_frontend_ssl_port``:
+    The HTTPS port used by the HAProxy containers
+* ``ha_proxy_ssl_dh_param``:
+    Value used for the ``tune.ssl.default-dh-param`` setting in the HAProxy
+    config file
+* ``ha_proxy_ssl_ciphers``:
+    Value used for the ``ssl-default-bind-ciphers`` setting in the HAProxy
+    config file.
+* ``ha_proxy_ssl_options``:
+    Value used for the ``ssl-default-bind-options`` setting in the HAProxy
+    config file.
+* ``haproxy_container_image``:
+    HAProxy image location used to pull the image
+* ``keepalived_container_image``:
+    Keepalived image location used to pull the image
+
+**Useful hints for the RGW Service:**
+
+* It is good to have at least 3 RGW daemons.
+* Use at least 3 hosts for the HAProxy for RGW service.
+* On each host an HAProxy and a Keepalived daemon will be deployed.
These + daemons can be managed as systemd services + + Deploying custom containers =========================== diff --git a/src/cephadm/cephadm b/src/cephadm/cephadm index ea58389c149..4cd676bfa14 100755 --- a/src/cephadm/cephadm +++ b/src/cephadm/cephadm @@ -218,7 +218,17 @@ class Monitoring(object): } # type: ignore ################################## - +def populate_files(config_dir, config_files, uid, gid): + # type: (str, Dict, int, int) -> None + """create config files for different services""" + for fname in config_files: + config_file = os.path.join(config_dir, fname) + config_content = dict_get_join(config_files, fname) + logger.info('Write file: %s' % (config_file)) + with open(config_file, 'w') as f: + os.fchown(f.fileno(), uid, gid) + os.fchmod(f.fileno(), 0o600) + f.write(config_content) class NFSGanesha(object): """Defines a NFS-Ganesha container""" @@ -343,14 +353,7 @@ class NFSGanesha(object): makedirs(config_dir, uid, gid, 0o755) # populate files from the config-json - for fname in self.files: - config_file = os.path.join(config_dir, fname) - config_content = dict_get_join(self.files, fname) - logger.info('Write file: %s' % (config_file)) - with open(config_file, 'w') as f: - os.fchown(f.fileno(), uid, gid) - os.fchmod(f.fileno(), 0o600) - f.write(config_content) + populate_files(config_dir, self.files, uid, gid) # write the RGW keyring if self.rgw: @@ -491,14 +494,7 @@ class CephIscsi(object): makedirs(configfs_dir, uid, gid, 0o755) # populate files from the config-json - for fname in self.files: - config_file = os.path.join(data_dir, fname) - config_content = dict_get_join(self.files, fname) - logger.info('Write file: %s' % (config_file)) - with open(config_file, 'w') as f: - os.fchown(f.fileno(), uid, gid) - os.fchmod(f.fileno(), 0o600) - f.write(config_content) + populate_files(data_dir, self.files, uid, gid) @staticmethod def configfs_mount_umount(data_dir, mount=True): @@ -524,6 +520,163 @@ class CephIscsi(object): ################################## +class HAproxy(object): + """Defines an HAproxy container""" + daemon_type = 'haproxy' + required_files = ['haproxy.cfg'] + default_image = 'haproxy' + + def __init__(self, fsid: str, daemon_id: Union[int, str], + config_json: Dict, image: str) -> None: + self.fsid = fsid + self.daemon_id = daemon_id + self.image = image + + # config-json options + self.files = dict_get(config_json, 'files', {}) + + self.validate() + + @classmethod + def init(cls, fsid: str, daemon_id: Union[int, str]) -> 'HAproxy': + return cls(fsid, daemon_id, get_parm(args.config_json), args.image) + + def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None: + """Create files under the container data dir""" + if not os.path.isdir(data_dir): + raise OSError('data_dir is not a directory: %s' % (data_dir)) + + # create additional directories in data dir for HAproxy to use + if not os.path.isdir(os.path.join(data_dir, 'haproxy')): + makedirs(os.path.join(data_dir, 'haproxy'), uid, gid, DATA_DIR_MODE) + + data_dir = os.path.join(data_dir, 'haproxy') + populate_files(data_dir, self.files, uid, gid) + + def get_daemon_args(self) -> List[str]: + return ['haproxy', '-f', '/var/lib/haproxy/haproxy.cfg'] + + def validate(self): + # type: () -> None + if not is_fsid(self.fsid): + raise Error('not an fsid: %s' % self.fsid) + if not self.daemon_id: + raise Error('invalid daemon_id: %s' % self.daemon_id) + if not self.image: + raise Error('invalid image: %s' % self.image) + + # check for the required files + if self.required_files: + for fname in 
self.required_files: + if fname not in self.files: + raise Error('required file missing from config-json: %s' % fname) + + def get_daemon_name(self): + # type: () -> str + return '%s.%s' % (self.daemon_type, self.daemon_id) + + def get_container_name(self, desc=None): + # type: (Optional[str]) -> str + cname = 'ceph-%s-%s' % (self.fsid, self.get_daemon_name()) + if desc: + cname = '%s-%s' % (cname, desc) + return cname + + def extract_uid_gid_haproxy(self): + # better directory for this? + return extract_uid_gid(file_path='/var/lib') + + @staticmethod + def get_container_mounts(data_dir: str) -> Dict[str, str]: + mounts = dict() + mounts[os.path.join(data_dir,'haproxy')] = '/var/lib/haproxy' + return mounts + +################################## + + +class Keepalived(object): + """Defines an Keepalived container""" + daemon_type = 'keepalived' + required_files = ['keepalived.conf'] + default_image = 'arcts/keepalived' + + def __init__(self, fsid: str, daemon_id: Union[int, str], + config_json: Dict, image: str) -> None: + self.fsid = fsid + self.daemon_id = daemon_id + self.image = image + + # config-json options + self.files = dict_get(config_json, 'files', {}) + + self.validate() + + @classmethod + def init(cls, fsid: str, daemon_id: Union[int, str]) -> 'Keepalived': + return cls(fsid, daemon_id, get_parm(args.config_json), args.image) + + def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None: + """Create files under the container data dir""" + if not os.path.isdir(data_dir): + raise OSError('data_dir is not a directory: %s' % (data_dir)) + + # create additional directories in data dir for keepalived to use + if not os.path.isdir(os.path.join(data_dir, 'keepalived')): + makedirs(os.path.join(data_dir, 'keepalived'), uid, gid, DATA_DIR_MODE) + + # populate files from the config-json + populate_files(data_dir, self.files, uid, gid) + + def validate(self): + # type: () -> None + if not is_fsid(self.fsid): + raise Error('not an fsid: %s' % self.fsid) + if not self.daemon_id: + raise Error('invalid daemon_id: %s' % self.daemon_id) + if not self.image: + raise Error('invalid image: %s' % self.image) + + # check for the required files + if self.required_files: + for fname in self.required_files: + if fname not in self.files: + raise Error('required file missing from config-json: %s' % fname) + + def get_daemon_name(self): + # type: () -> str + return '%s.%s' % (self.daemon_type, self.daemon_id) + + def get_container_name(self, desc=None): + # type: (Optional[str]) -> str + cname = 'ceph-%s-%s' % (self.fsid, self.get_daemon_name()) + if desc: + cname = '%s-%s' % (cname, desc) + return cname + + @staticmethod + def get_container_envs(): + # type: () -> List[str] + envs = [ + 'KEEPALIVED_AUTOCONF=false', + 'KEEPALIVED_CONF=/etc/keepalived/keepalived.conf', + 'KEEPALIVED_CMD="/usr/sbin/keepalived -n -l -f /etc/keepalived/keepalived.conf"', + 'KEEPALIVED_DEBUG=false' + ] + return envs + + def extract_uid_gid_keepalived(self): + # better directory for this? 
+ return extract_uid_gid(file_path='/var/lib') + + @staticmethod + def get_container_mounts(data_dir: str) -> Dict[str, str]: + mounts = dict() + mounts[os.path.join(data_dir,'keepalived.conf')] = '/etc/keepalived/keepalived.conf' + return mounts + +################################## + class CustomContainer(object): """Defines a custom container""" @@ -685,6 +838,8 @@ def get_supported_daemons(): supported_daemons.append(CephIscsi.daemon_type) supported_daemons.append(CustomContainer.daemon_type) supported_daemons.append(CephadmDaemon.daemon_type) + supported_daemons.append(HAproxy.daemon_type) + supported_daemons.append(Keepalived.daemon_type) assert len(supported_daemons) == len(set(supported_daemons)) return supported_daemons @@ -1401,6 +1556,10 @@ def default_image(func): type_ = args.name.split('.', 1)[0] if type_ in Monitoring.components: args.image = Monitoring.components[type_]['image'] + if type_ == 'haproxy': + args.image = HAproxy.default_image + if type_ == 'keepalived': + args.image = Keepalived.default_image if not args.image: args.image = os.environ.get('CEPHADM_IMAGE') if not args.image: @@ -1749,6 +1908,9 @@ def get_daemon_args(fsid, daemon_type, daemon_id): elif daemon_type == NFSGanesha.daemon_type: nfs_ganesha = NFSGanesha.init(fsid, daemon_id) r += nfs_ganesha.get_daemon_args() + elif daemon_type == HAproxy.daemon_type: + haproxy = HAproxy.init(fsid, daemon_id) + r += haproxy.get_daemon_args() elif daemon_type == CustomContainer.daemon_type: cc = CustomContainer.init(fsid, daemon_id) r.extend(cc.get_daemon_args()) @@ -1818,6 +1980,14 @@ def create_daemon_dirs(fsid, daemon_type, daemon_id, uid, gid, ceph_iscsi = CephIscsi.init(fsid, daemon_id) ceph_iscsi.create_daemon_dirs(data_dir, uid, gid) + elif daemon_type == HAproxy.daemon_type: + haproxy = HAproxy.init(fsid, daemon_id) + haproxy.create_daemon_dirs(data_dir, uid, gid) + + elif daemon_type == Keepalived.daemon_type: + keepalived = Keepalived.init(fsid, daemon_id) + keepalived.create_daemon_dirs(data_dir, uid, gid) + elif daemon_type == CustomContainer.daemon_type: cc = CustomContainer.init(fsid, daemon_id) cc.create_daemon_dirs(data_dir, uid, gid) @@ -1972,12 +2142,22 @@ def get_container_mounts(fsid, daemon_type, daemon_id, nfs_ganesha = NFSGanesha.init(fsid, daemon_id) mounts.update(nfs_ganesha.get_container_mounts(data_dir)) + if daemon_type == HAproxy.daemon_type: + assert daemon_id + data_dir = get_data_dir(fsid, daemon_type, daemon_id) + mounts.update(HAproxy.get_container_mounts(data_dir)) + if daemon_type == CephIscsi.daemon_type: assert daemon_id data_dir = get_data_dir(fsid, daemon_type, daemon_id) log_dir = get_log_dir(fsid) mounts.update(CephIscsi.get_container_mounts(data_dir, log_dir)) + if daemon_type == Keepalived.daemon_type: + assert daemon_id + data_dir = get_data_dir(fsid, daemon_type, daemon_id) + mounts.update(Keepalived.get_container_mounts(data_dir)) + if daemon_type == CustomContainer.daemon_type: assert daemon_id cc = CustomContainer.init(fsid, daemon_id) @@ -2020,6 +2200,12 @@ def get_container(fsid: str, daemon_type: str, daemon_id: Union[int, str], entrypoint = NFSGanesha.entrypoint name = '%s.%s' % (daemon_type, daemon_id) envs.extend(NFSGanesha.get_container_envs()) + elif daemon_type == HAproxy.daemon_type: + name = '%s.%s' % (daemon_type, daemon_id) + elif daemon_type == Keepalived.daemon_type: + name = '%s.%s' % (daemon_type, daemon_id) + envs.extend(Keepalived.get_container_envs()) + container_args.extend(['--cap-add NET_ADMIN']) elif daemon_type == CephIscsi.daemon_type: 
entrypoint = CephIscsi.entrypoint name = '%s.%s' % (daemon_type, daemon_id) @@ -3514,6 +3700,22 @@ def command_deploy(): reconfig=args.reconfig, ports=daemon_ports) + elif daemon_type == HAproxy.daemon_type: + haproxy = HAproxy.init(args.fsid, daemon_id) + uid, gid = haproxy.extract_uid_gid_haproxy() + c = get_container(args.fsid, daemon_type, daemon_id) + deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid, + reconfig=args.reconfig, + ports=daemon_ports) + + elif daemon_type == Keepalived.daemon_type: + keepalived = Keepalived.init(args.fsid, daemon_id) + uid, gid = keepalived.extract_uid_gid_keepalived() + c = get_container(args.fsid, daemon_type, daemon_id) + deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid, + reconfig=args.reconfig, + ports=daemon_ports) + elif daemon_type == CustomContainer.daemon_type: cc = CustomContainer.init(args.fsid, daemon_id) if not args.reconfig and not redeploy: @@ -3952,6 +4154,22 @@ def list_daemons(detail=True, legacy_dir=None): err.startswith('%s, version ' % cmd): version = err.split(' ')[2] seen_versions[image_id] = version + elif daemon_type == 'haproxy': + out, err, code = call( + [container_path, 'exec', container_id, + 'haproxy', '-v']) + if not code and \ + out.startswith('HA-Proxy version '): + version = out.split(' ')[2] + seen_versions[image_id] = version + elif daemon_type == 'keepalived': + out, err, code = call( + [container_path, 'exec', container_id, + 'keepalived', '--version']) + if not code and \ + err.startswith('Keepalived '): + version = err.split(' ')[1] + seen_versions[image_id] = version elif daemon_type == CustomContainer.daemon_type: # Because a custom container can contain # everything, we do not know which command @@ -5646,6 +5864,14 @@ def command_gather_facts(): ################################## +def command_verify_prereqs(): + if args.service_type == 'haproxy' or args.service_type == 'keepalived': + out, err, code = call(['sysctl', '-n', 'net.ipv4.ip_nonlocal_bind']) + if out.strip() != "1": + raise Error('net.ipv4.ip_nonlocal_bind not set to 1') + +################################## + class CephadmCache: task_types = ['disks', 'daemons', 'host', 'http_server'] @@ -6948,6 +7174,15 @@ def _get_parser(): help="Maintenance action - enter maintenance, or exit maintenance") parser_maintenance.set_defaults(func=command_maintenance) + parser_verify_prereqs = subparsers.add_parser( + 'verify-prereqs', + help='verify system prerequisites for a given service are met on this host') + parser_verify_prereqs.set_defaults(func=command_verify_prereqs) + parser_verify_prereqs.add_argument( + '--daemon-type', + required=True, + help='service type of service to whose prereqs will be checked') + return parser diff --git a/src/pybind/mgr/cephadm/inventory.py b/src/pybind/mgr/cephadm/inventory.py index f2d5ceea949..a643eb15341 100644 --- a/src/pybind/mgr/cephadm/inventory.py +++ b/src/pybind/mgr/cephadm/inventory.py @@ -417,6 +417,7 @@ class HostCache(): return r def get_daemon(self, daemon_name: str) -> orchestrator.DaemonDescription: + assert not daemon_name.startswith('ha-rgw.') for _, dm in self.daemons.items(): for _, dd in dm.items(): if dd.name() == daemon_name: @@ -437,6 +438,9 @@ class HostCache(): def get_daemons_by_service(self, service_name): # type: (str) -> List[orchestrator.DaemonDescription] + assert not service_name.startswith('keepalived.') + assert not service_name.startswith('haproxy.') + result = [] # type: List[orchestrator.DaemonDescription] for host, dm in self.daemons.items(): for name, d in 
dm.items(): @@ -446,6 +450,8 @@ class HostCache(): def get_daemons_by_type(self, service_type): # type: (str) -> List[orchestrator.DaemonDescription] + assert service_type not in ['keepalived', 'haproxy'] + result = [] # type: List[orchestrator.DaemonDescription] for host, dm in self.daemons.items(): for name, d in dm.items(): @@ -578,6 +584,8 @@ class HostCache(): self.daemons[host][dd.name()] = dd def rm_daemon(self, host: str, name: str) -> None: + assert not name.startswith('ha-rgw.') + if host in self.daemons: if name in self.daemons[host]: del self.daemons[host][name] @@ -594,6 +602,8 @@ class HostCache(): for h in self.get_hosts()) def schedule_daemon_action(self, host: str, daemon_name: str, action: str) -> None: + assert not daemon_name.startswith('ha-rgw.') + priorities = { 'start': 1, 'restart': 2, @@ -619,6 +629,8 @@ class HostCache(): del self.scheduled_daemon_actions[host] def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]: + assert not daemon.startswith('ha-rgw.') + return self.scheduled_daemon_actions.get(host, {}).get(daemon) diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index 77c8dbba325..47d8315b5eb 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -25,7 +25,7 @@ from ceph.deployment import inventory from ceph.deployment.drive_group import DriveGroupSpec from ceph.deployment.service_spec import \ NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host, \ - CustomContainerSpec, HostPlacementSpec + CustomContainerSpec, HostPlacementSpec, HA_RGWSpec from ceph.utils import str_to_datetime, datetime_to_str, datetime_now from cephadm.serve import CephadmServe from cephadm.services.cephadmservice import CephadmDaemonSpec @@ -37,6 +37,7 @@ import orchestrator from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \ CLICommandMeta, OrchestratorEvent, set_exception_subject, DaemonDescription from orchestrator._interface import GenericSpec +from orchestrator._interface import daemon_type_to_service, service_to_daemon_types from . import remotes from . 
import utils @@ -45,6 +46,7 @@ from .services.cephadmservice import MonService, MgrService, MdsService, RgwServ RbdMirrorService, CrashService, CephadmService, CephadmExporter, CephadmExporterConfig from .services.container import CustomContainerService from .services.iscsi import IscsiService +from .services.ha_rgw import HA_RGWService from .services.nfs import NFSService from .services.osd import RemoveUtil, OSDQueue, OSDService, OSD, NotFoundError from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \ @@ -217,6 +219,16 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, default='docker.io/prom/node-exporter:v0.18.1', desc='Prometheus container image', ), + Option( + 'container_image_haproxy', + default='haproxy', + desc='HAproxy container image', + ), + Option( + 'container_image_keepalived', + default='arcts/keepalived', + desc='Keepalived container image', + ), Option( 'warn_on_stray_hosts', type='bool', @@ -337,6 +349,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self.container_image_grafana = '' self.container_image_alertmanager = '' self.container_image_node_exporter = '' + self.container_image_haproxy = '' + self.container_image_keepalived = '' self.warn_on_stray_hosts = True self.warn_on_stray_daemons = True self.warn_on_failed_host_check = True @@ -417,6 +431,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self.node_exporter_service = NodeExporterService(self) self.crash_service = CrashService(self) self.iscsi_service = IscsiService(self) + self.ha_rgw_service = HA_RGWService(self) self.container_service = CustomContainerService(self) self.cephadm_exporter_service = CephadmExporter(self) self.cephadm_services = { @@ -433,6 +448,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, 'node-exporter': self.node_exporter_service, 'crash': self.crash_service, 'iscsi': self.iscsi_service, + 'ha-rgw': self.ha_rgw_service, 'container': self.container_service, 'cephadm-exporter': self.cephadm_exporter_service, } @@ -1189,6 +1205,10 @@ To check that the host is reachable: image = self.container_image_alertmanager elif daemon_type == 'node-exporter': image = self.container_image_node_exporter + elif daemon_type == 'haproxy': + image = self.container_image_haproxy + elif daemon_type == 'keepalived': + image = self.container_image_keepalived elif daemon_type == CustomContainerService.TYPE: # The image can't be resolved, the necessary information # is only available when a container is deployed (given @@ -1397,7 +1417,7 @@ To check that the host is reachable: daemon_map[dd.daemon_type].append(dd.daemon_id) for daemon_type, daemon_ids in daemon_map.items(): - r = self.cephadm_services[daemon_type].ok_to_stop(daemon_ids) + r = self.cephadm_services[daemon_type_to_service(daemon_type)].ok_to_stop(daemon_ids) if r.retval: self.log.error(f'It is NOT safe to stop host {hostname}') return r.retval, r.stderr @@ -1643,6 +1663,9 @@ To check that the host is reachable: sm[n].container_image_id = 'mix' if sm[n].container_image_name != dd.container_image_name: sm[n].container_image_name = 'mix' + if dd.daemon_type == 'haproxy' or dd.daemon_type == 'keepalived': + # ha-rgw has 2 daemons running per host + sm[n].size = sm[n].size*2 for n, spec in self.spec_store.specs.items(): if n in sm: continue @@ -1659,6 +1682,9 @@ To check that the host is reachable: if service_type == 'nfs': spec = cast(NFSServiceSpec, spec) sm[n].rados_config_location = spec.rados_config_location() + if spec.service_type == 
'ha-rgw': + # ha-rgw has 2 daemons running per host + sm[n].size = sm[n].size*2 return list(sm.values()) @trivial_completion @@ -2030,7 +2056,17 @@ To check that the host is reachable: self.log.warning(msg) return msg - cephadm_config, deps = self.cephadm_services[daemon_spec.daemon_type].generate_config( + if daemon_spec.daemon_type == 'haproxy': + haspec = cast(HA_RGWSpec, daemon_spec.spec) + if haspec.haproxy_container_image: + image = haspec.haproxy_container_image + + if daemon_spec.daemon_type == 'keepalived': + haspec = cast(HA_RGWSpec, daemon_spec.spec) + if haspec.keepalived_container_image: + image = haspec.keepalived_container_image + + cephadm_config, deps = self.cephadm_services[daemon_type_to_service(daemon_spec.daemon_type)].generate_config( daemon_spec) # TCP port to open in the host firewall @@ -2123,7 +2159,7 @@ To check that the host is reachable: with set_exception_subject('service', daemon.service_id(), overwrite=True): - self.cephadm_services[daemon_type].pre_remove(daemon) + self.cephadm_services[daemon_type_to_service(daemon_type)].pre_remove(daemon) args = ['--name', name, '--force'] self.log.info('Removing daemon %s from %s' % (name, host)) @@ -2134,7 +2170,7 @@ To check that the host is reachable: self.cache.rm_daemon(host, name) self.cache.invalidate_host_daemons(host) - self.cephadm_services[daemon_type].post_remove(daemon) + self.cephadm_services[daemon_type_to_service(daemon_type)].post_remove(daemon) return "Removed {} from host '{}'".format(name, host) @@ -2189,7 +2225,7 @@ To check that the host is reachable: config_func(spec) did_config = True - daemon_spec = self.cephadm_services[daemon_type].make_daemon_spec( + daemon_spec = self.cephadm_services[daemon_type_to_service(daemon_type)].make_daemon_spec( host, daemon_id, network, spec) self.log.debug('Placing %s.%s on host %s' % ( daemon_type, daemon_id, host)) @@ -2279,6 +2315,7 @@ To check that the host is reachable: 'mgr': PlacementSpec(count=2), 'mds': PlacementSpec(count=2), 'rgw': PlacementSpec(count=2), + 'ha-rgw': PlacementSpec(count=2), 'iscsi': PlacementSpec(count=1), 'rbd-mirror': PlacementSpec(count=2), 'nfs': PlacementSpec(count=1), @@ -2336,6 +2373,10 @@ To check that the host is reachable: def apply_rgw(self, spec: ServiceSpec) -> str: return self._apply(spec) + @trivial_completion + def apply_ha_rgw(self, spec: ServiceSpec) -> str: + return self._apply(spec) + @trivial_completion def add_iscsi(self, spec): # type: (ServiceSpec) -> List[str] diff --git a/src/pybind/mgr/cephadm/schedule.py b/src/pybind/mgr/cephadm/schedule.py index e2e3c926ab9..2018aa36777 100644 --- a/src/pybind/mgr/cephadm/schedule.py +++ b/src/pybind/mgr/cephadm/schedule.py @@ -127,6 +127,15 @@ class HostAssignment(object): logger.info("deploying %s monitor(s) instead of %s so monitors may achieve consensus" % ( len(candidates) - 1, len(candidates))) return candidates[0:len(candidates)-1] + + # do not deploy ha-rgw on hosts that don't support virtual ips + if self.spec.service_type == 'ha-rgw' and self.filter_new_host: + old = candidates + candidates = [h for h in candidates if self.filter_new_host(h.hostname)] + for h in list(set(old) - set(candidates)): + logger.info( + f"Filtered out host {h.hostname} for ha-rgw. 
Could not verify host allowed virtual ips") + logger.info('filtered %s down to %s' % (old, candidates)) return candidates # if asked to place even number of mons, deploy 1 less @@ -160,21 +169,29 @@ class HostAssignment(object): # we don't need any additional hosts if need < 0: - return self.prefer_hosts_with_active_daemons(hosts_with_daemons, count) + final_candidates = self.prefer_hosts_with_active_daemons(hosts_with_daemons, count) else: - # exclusive to 'mon' daemons. Filter out hosts that don't have a public network assigned + # exclusive to daemons from 'mon' and 'ha-rgw' services. + # Filter out hosts that don't have a public network assigned + # or don't allow virtual ips respectively if self.filter_new_host: old = others others = [h for h in others if self.filter_new_host(h.hostname)] - logger.debug('filtered %s down to %s' % (old, others)) + for h in list(set(old) - set(others)): + if self.spec.service_type == 'ha-rgw': + logger.info( + f"Filtered out host {h.hostname} for ha-rgw. Could not verify host allowed virtual ips") + logger.info('filtered %s down to %s' % (old, others)) # ask the scheduler to return a set of hosts with a up to the value of others = self.scheduler.place(others, need) - logger.debug('Combine hosts with existing daemons %s + new hosts %s' % ( + logger.info('Combine hosts with existing daemons %s + new hosts %s' % ( hosts_with_daemons, others)) # if a host already has the anticipated daemon, merge it with the candidates # to get a list of HostPlacementSpec that can be deployed on. - return list(merge_hostspecs(hosts_with_daemons, others)) + final_candidates = list(merge_hostspecs(hosts_with_daemons, others)) + + return final_candidates def get_hosts_with_active_daemon(self, hosts: List[HostPlacementSpec]) -> List[HostPlacementSpec]: active_hosts: List['HostPlacementSpec'] = [] diff --git a/src/pybind/mgr/cephadm/serve.py b/src/pybind/mgr/cephadm/serve.py index 13bcf14c65f..ffcc69d7197 100644 --- a/src/pybind/mgr/cephadm/serve.py +++ b/src/pybind/mgr/cephadm/serve.py @@ -11,7 +11,7 @@ except ImportError: from ceph.deployment import inventory from ceph.deployment.drive_group import DriveGroupSpec -from ceph.deployment.service_spec import ServiceSpec, HostPlacementSpec, RGWSpec +from ceph.deployment.service_spec import ServiceSpec, HostPlacementSpec, RGWSpec, HA_RGWSpec from ceph.utils import str_to_datetime, datetime_now import orchestrator @@ -19,6 +19,7 @@ from cephadm.schedule import HostAssignment from cephadm.upgrade import CEPH_UPGRADE_ORDER from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest from orchestrator import OrchestratorError +from orchestrator._interface import daemon_type_to_service, service_to_daemon_types if TYPE_CHECKING: from cephadm.module import CephadmOrchestrator, ContainerInspectInfo @@ -449,7 +450,7 @@ class CephadmServe: """ self.mgr.migration.verify_no_migration() - daemon_type = spec.service_type + service_type = spec.service_type service_name = spec.service_name() if spec.unmanaged: self.log.debug('Skipping unmanaged service %s' % service_name) @@ -459,9 +460,9 @@ class CephadmServe: return False self.log.debug('Applying service %s spec' % service_name) - config_func = self._config_fn(daemon_type) + config_func = self._config_fn(service_type) - if daemon_type == 'osd': + if service_type == 'osd': self.mgr.osd_service.create_from_spec(cast(DriveGroupSpec, spec)) # TODO: return True would result in a busy loop # can't know if daemon count changed; create_from_spec doesn't @@ -471,7 +472,7 @@ class CephadmServe: 
daemons = self.mgr.cache.get_daemons_by_service(service_name) public_network = None - if daemon_type == 'mon': + if service_type == 'mon': ret, out, err = self.mgr.check_mon_command({ 'prefix': 'config get', 'who': 'mon', @@ -489,20 +490,38 @@ class CephadmServe: # host return len(self.mgr.cache.networks[host].get(public_network, [])) > 0 + def virtual_ip_allowed(host): + # type: (str) -> bool + # Verify that it is possible to use Virtual IPs in the host + try: + if self.mgr.cache.facts[host]['kernel_parameters']['net.ipv4.ip_nonlocal_bind'] == '0': + return False + except KeyError: + return False + + return True + ha = HostAssignment( spec=spec, hosts=self.mgr._hosts_with_daemon_inventory(), get_daemons_func=self.mgr.cache.get_daemons_by_service, - filter_new_host=matches_network if daemon_type == 'mon' else None, + filter_new_host=matches_network if service_type == 'mon' + else virtual_ip_allowed if service_type == 'ha-rgw' else None, ) - hosts: List[HostPlacementSpec] = ha.place() - self.log.debug('Usable hosts: %s' % hosts) + try: + hosts: List[HostPlacementSpec] = ha.place() + self.log.debug('Usable hosts: %s' % hosts) + except OrchestratorError as e: + self.log.error('Failed to apply %s spec %s: %s' % ( + spec.service_name(), spec, e)) + self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e)) + return False r = None # sanity check - if daemon_type in ['mon', 'mgr'] and len(hosts) < 1: + if service_type in ['mon', 'mgr'] and len(hosts) < 1: self.log.debug('cannot scale mon|mgr below 1 (hosts=%s)' % hosts) return False @@ -515,50 +534,55 @@ class CephadmServe: remove_daemon_hosts: Set[orchestrator.DaemonDescription] = ha.remove_daemon_hosts(hosts) self.log.debug('Hosts that will loose daemons: %s' % remove_daemon_hosts) - for host, network, name in add_daemon_hosts: - daemon_id = self.mgr.get_unique_name(daemon_type, host, daemons, - prefix=spec.service_id, - forcename=name) - - if not did_config and config_func: - if daemon_type == 'rgw': - rgw_config_func = cast(Callable[[RGWSpec, str], None], config_func) - rgw_config_func(cast(RGWSpec, spec), daemon_id) - else: - config_func(spec) - did_config = True - - daemon_spec = self.mgr.cephadm_services[daemon_type].make_daemon_spec( - host, daemon_id, network, spec) - self.log.debug('Placing %s.%s on host %s' % ( - daemon_type, daemon_id, host)) + if service_type == 'ha-rgw': + spec = self.update_ha_rgw_definitive_hosts(spec, hosts, add_daemon_hosts) - try: - daemon_spec = self.mgr.cephadm_services[daemon_type].prepare_create(daemon_spec) - self.mgr._create_daemon(daemon_spec) - r = True - except (RuntimeError, OrchestratorError) as e: - self.mgr.events.for_service(spec, 'ERROR', - f"Failed while placing {daemon_type}.{daemon_id}" - f"on {host}: {e}") - # only return "no change" if no one else has already succeeded. 
- # later successes will also change to True - if r is None: - r = False - continue + for host, network, name in add_daemon_hosts: + for daemon_type in service_to_daemon_types(service_type): + daemon_id = self.mgr.get_unique_name(daemon_type, host, daemons, + prefix=spec.service_id, + forcename=name) + + if not did_config and config_func: + if daemon_type == 'rgw': + rgw_config_func = cast(Callable[[RGWSpec, str], None], config_func) + rgw_config_func(cast(RGWSpec, spec), daemon_id) + else: + config_func(spec) + did_config = True + + daemon_spec = self.mgr.cephadm_services[service_type].make_daemon_spec( + host, daemon_id, network, spec, daemon_type=daemon_type) + self.log.debug('Placing %s.%s on host %s' % ( + daemon_type, daemon_id, host)) - # add to daemon list so next name(s) will also be unique - sd = orchestrator.DaemonDescription( - hostname=host, - daemon_type=daemon_type, - daemon_id=daemon_id, - ) - daemons.append(sd) + try: + daemon_spec = self.mgr.cephadm_services[service_type].prepare_create( + daemon_spec) + self.mgr._create_daemon(daemon_spec) + r = True + except (RuntimeError, OrchestratorError) as e: + self.mgr.events.for_service(spec, 'ERROR', + f"Failed while placing {daemon_type}.{daemon_id}" + f"on {host}: {e}") + # only return "no change" if no one else has already succeeded. + # later successes will also change to True + if r is None: + r = False + continue + + # add to daemon list so next name(s) will also be unique + sd = orchestrator.DaemonDescription( + hostname=host, + daemon_type=daemon_type, + daemon_id=daemon_id, + ) + daemons.append(sd) # remove any? def _ok_to_stop(remove_daemon_hosts: Set[orchestrator.DaemonDescription]) -> bool: daemon_ids = [d.daemon_id for d in remove_daemon_hosts] - r = self.mgr.cephadm_services[daemon_type].ok_to_stop(daemon_ids) + r = self.mgr.cephadm_services[service_type].ok_to_stop(daemon_ids) return not r.retval while remove_daemon_hosts and not _ok_to_stop(remove_daemon_hosts): @@ -595,7 +619,7 @@ class CephadmServe: if dd.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'nfs']: daemons_post[dd.daemon_type].append(dd) - if self.mgr.cephadm_services[dd.daemon_type].get_active_daemon( + if self.mgr.cephadm_services[daemon_type_to_service(dd.daemon_type)].get_active_daemon( self.mgr.cache.get_daemons_by_service(dd.service_name())).daemon_id == dd.daemon_id: dd.is_active = True else: @@ -653,7 +677,8 @@ class CephadmServe: for daemon_type, daemon_descs in daemons_post.items(): if daemon_type in self.mgr.requires_post_actions: self.mgr.requires_post_actions.remove(daemon_type) - self.mgr._get_cephadm_service(daemon_type).daemon_check_post(daemon_descs) + self.mgr._get_cephadm_service(daemon_type_to_service( + daemon_type)).daemon_check_post(daemon_descs) def convert_tags_to_repo_digest(self) -> None: if not self.mgr.use_repo_digest: @@ -672,3 +697,19 @@ class CephadmServe: image_info = digests[container_image_ref] if image_info.repo_digest: self.mgr.set_container_image(entity, image_info.repo_digest) + + # ha-rgw needs definitve host list to create keepalived config files + # if definitive host list has changed, all ha-rgw daemons must get new + # config, including those that are already on the correct host and not + # going to be deployed + def update_ha_rgw_definitive_hosts(self, spec: ServiceSpec, hosts: List[HostPlacementSpec], + add_hosts: Set[HostPlacementSpec]) -> HA_RGWSpec: + spec = cast(HA_RGWSpec, spec) + if not (set(hosts) == set(spec.definitive_host_list)): + spec.definitive_host_list = hosts + 
ha_rgw_daemons = self.mgr.cache.get_daemons_by_service(spec.service_name()) + for daemon in ha_rgw_daemons: + if daemon.hostname in [h.hostname for h in hosts] and daemon.hostname not in add_hosts: + self.mgr.cache.schedule_daemon_action( + daemon.hostname, daemon.name(), 'reconfig') + return spec diff --git a/src/pybind/mgr/cephadm/services/cephadmservice.py b/src/pybind/mgr/cephadm/services/cephadmservice.py index 2ae4f1591a1..a2b26bf0690 100644 --- a/src/pybind/mgr/cephadm/services/cephadmservice.py +++ b/src/pybind/mgr/cephadm/services/cephadmservice.py @@ -94,12 +94,14 @@ class CephadmService(metaclass=ABCMeta): def make_daemon_spec(self, host: str, daemon_id: str, netowrk: str, - spec: ServiceSpecs) -> CephadmDaemonSpec: + spec: ServiceSpecs, + daemon_type: Optional[str] = None,) -> CephadmDaemonSpec: return CephadmDaemonSpec( host=host, daemon_id=daemon_id, spec=spec, - network=netowrk + network=netowrk, + daemon_type=daemon_type ) def prepare_create(self, daemon_spec: CephadmDaemonSpec) -> CephadmDaemonSpec: @@ -270,7 +272,7 @@ class CephService(CephadmService): """ Map the daemon id to a cephx keyring entity name """ - if self.TYPE in ['rgw', 'rbd-mirror', 'nfs', "iscsi"]: + if self.TYPE in ['rgw', 'rbd-mirror', 'nfs', "iscsi", 'haproxy', 'keepalived']: return AuthEntity(f'client.{self.TYPE}.{daemon_id}') elif self.TYPE == 'crash': if host == "": diff --git a/src/pybind/mgr/cephadm/services/ha_rgw.py b/src/pybind/mgr/cephadm/services/ha_rgw.py new file mode 100644 index 00000000000..1635d48a901 --- /dev/null +++ b/src/pybind/mgr/cephadm/services/ha_rgw.py @@ -0,0 +1,151 @@ +import json +import logging +from typing import List, cast, Tuple, Dict, Any + +from ceph.deployment.service_spec import HA_RGWSpec + +from orchestrator import DaemonDescription, OrchestratorError +from .cephadmservice import CephadmDaemonSpec, CephService +from ..utils import CephadmNoImage, cephadmNoImage, resolve_ip + +logger = logging.getLogger(__name__) + + +class HA_RGWService(CephService): + TYPE = 'ha-rgw' + + class rgw_server(): + def __init__(self, hostname: str, address: str): + self.name = hostname + self.ip = address + + def prepare_create(self, daemon_spec: CephadmDaemonSpec[HA_RGWSpec]) -> CephadmDaemonSpec: + assert daemon_spec.daemon_type == 'haproxy' or daemon_spec.daemon_type == 'keepalived' + assert daemon_spec.spec + + if daemon_spec.daemon_type == 'haproxy': + return self.haproxy_prepare_create(daemon_spec) + else: + return self.keepalived_prepare_create(daemon_spec) + + def generate_config(self, daemon_spec: CephadmDaemonSpec) -> Tuple[Dict[str, Any], List[str]]: + assert daemon_spec.daemon_type == 'haproxy' or daemon_spec.daemon_type == 'keepalived' + + if daemon_spec.daemon_type == 'haproxy': + return self.haproxy_generate_config(daemon_spec) + else: + return self.keepalived_generate_config(daemon_spec) + + def haproxy_prepare_create(self, daemon_spec: CephadmDaemonSpec[HA_RGWSpec]) -> CephadmDaemonSpec: + assert daemon_spec.daemon_type == 'haproxy' + assert daemon_spec.spec + + daemon_id = daemon_spec.daemon_id + host = daemon_spec.host + spec = daemon_spec.spec + + logger.info('Create daemon %s on host %s with spec %s' % ( + daemon_id, host, spec)) + return daemon_spec + + def keepalived_prepare_create(self, daemon_spec: CephadmDaemonSpec[HA_RGWSpec]) -> CephadmDaemonSpec: + assert daemon_spec.daemon_type == 'keepalived' + assert daemon_spec.spec + + daemon_id = daemon_spec.daemon_id + host = daemon_spec.host + spec = daemon_spec.spec + + logger.info('Create daemon %s on host %s with 
spec %s' % ( + daemon_id, host, spec)) + return daemon_spec + + def haproxy_generate_config(self, daemon_spec: CephadmDaemonSpec) -> Tuple[Dict[str, Any], List[str]]: + daemon_id = daemon_spec.daemon_id + host = daemon_spec.host + + service_name: str = "ha-rgw." + daemon_id.split('.')[0] + # if no service spec, return empty config + if not daemon_spec.spec and service_name not in self.mgr.spec_store.specs: + config_files = {'files': {}} # type: Dict[str, Any] + return config_files, [] + elif daemon_spec.spec: + spec = daemon_spec.spec + else: + # service spec is not attached to daemon spec but is in spec store + spec = cast(HA_RGWSpec, self.mgr.spec_store.specs[service_name]) + + rgw_daemons = self.mgr.cache.get_daemons_by_type('rgw') + rgw_servers = [] + for daemon in rgw_daemons: + rgw_servers.append(self.rgw_server( + daemon.name(), resolve_ip(daemon.hostname))) + + # virtual ip address cannot have netmask attached when passed to haproxy config + # since the port is added to the end and something like 123.123.123.10/24:8080 is invalid + virtual_ip_address = spec.virtual_ip_address + if "/" in str(spec.virtual_ip_address): + just_ip = str(spec.virtual_ip_address).split('/')[0] + virtual_ip_address = just_ip + + ha_context = {'spec': spec, 'rgw_servers': rgw_servers, + 'virtual_ip_address': virtual_ip_address} + + haproxy_conf = self.mgr.template.render('services/haproxy/haproxy.cfg.j2', ha_context) + + config_files = { + 'files': { + "haproxy.cfg": haproxy_conf, + } + } + if spec.ha_proxy_frontend_ssl_certificate: + ssl_cert = spec.ha_proxy_frontend_ssl_certificate + if isinstance(ssl_cert, list): + ssl_cert = '\n'.join(ssl_cert) + config_files['files']['haproxy.pem'] = ssl_cert + + return config_files, [] + + def keepalived_generate_config(self, daemon_spec: CephadmDaemonSpec) -> Tuple[Dict[str, Any], List[str]]: + daemon_id = daemon_spec.daemon_id + host = daemon_spec.host + + service_name: str = "ha-rgw." + daemon_id.split('.')[0] + # if no service spec, return empty config + if not daemon_spec.spec and service_name not in self.mgr.spec_store.specs: + config_file = {'files': {}} # type: Dict[str, Any] + return config_file, [] + elif daemon_spec.spec: + spec = daemon_spec.spec + else: + # service spec is not attached to daemon spec but is in spec store + spec = cast(HA_RGWSpec, self.mgr.spec_store.specs[service_name]) + + all_hosts = [] + for h, network, name in spec.definitive_host_list: + all_hosts.append(h) + + # set state. 
first host in placement is master all others backups + state = 'BACKUP' + if all_hosts[0] == host: + state = 'MASTER' + + # remove host, daemon is being deployed on from all_hosts list for + # other_ips in conf file and converter to ips + all_hosts.remove(host) + other_ips = [resolve_ip(h) for h in all_hosts] + + ka_context = {'spec': spec, 'state': state, + 'other_ips': other_ips, + 'host_ip': resolve_ip(host)} + + keepalived_conf = self.mgr.template.render( + 'services/keepalived/keepalived.conf.j2', ka_context) + + config_file = { + 'files': { + "keepalived.conf": keepalived_conf, + } + } + + return config_file, [] diff --git a/src/pybind/mgr/cephadm/templates/services/haproxy/haproxy.cfg.j2 b/src/pybind/mgr/cephadm/templates/services/haproxy/haproxy.cfg.j2 new file mode 100644 index 00000000000..4b3b4cf0782 --- /dev/null +++ b/src/pybind/mgr/cephadm/templates/services/haproxy/haproxy.cfg.j2 @@ -0,0 +1,66 @@ +# {{ cephadm_managed }} +global + log 127.0.0.1 local2 + chroot /var/lib/haproxy + pidfile /var/lib/haproxy/haproxy.pid + maxconn 8000 + daemon + stats socket /var/lib/haproxy/stats +{% if spec.ha_proxy_frontend_ssl_certificate %} + {% if spec.ha_proxy_ssl_dh_param %} + tune.ssl.default-dh-param {{ spec.ha_proxy_ssl_dh_param }} + {% endif %} + {% if spec.ha_proxy_ssl_ciphers %} + ssl-default-bind-ciphers {{ spec.ha_proxy_ssl_ciphers | join(':') }} + {% endif %} + {% if spec.ha_proxy_ssl_options %} + ssl-default-bind-options {{ spec.ha_proxy_ssl_options | join(' ') }} + {% endif %} +{% endif %} + +defaults + mode http + log global + option httplog + option dontlognull + option http-server-close + option forwardfor except 127.0.0.0/8 + option redispatch + retries 3 + timeout http-request 1s + timeout queue 20s + timeout connect 5s + timeout client 1s + timeout server 1s + timeout http-keep-alive 5s + timeout check 5s + maxconn 8000 + +frontend stats + bind *:{{ spec.ha_proxy_port }} +{% if spec.ha_proxy_stats_enabled %} + stats enable +{% endif %} + stats uri /stats + stats refresh 10s + stats auth {{ spec.ha_proxy_stats_user }}:{{ spec.ha_proxy_stats_password }} +{% if spec.ha_proxy_enable_prometheus_exporter %} + http-request use-service prometheus-exporter if { path /metrics } +{% endif %} + monitor-uri {{ spec.ha_proxy_monitor_uri }} + +frontend rgw-frontend +{% if spec.ha_proxy_frontend_ssl_certificate %} + bind {{ virtual_ip_address }}:{{ spec.ha_proxy_frontend_ssl_port }} ssl crt /var/lib/haproxy/haproxy.pem +{% else %} + bind {{ virtual_ip_address }}:{{ spec.frontend_port }} +{% endif %} + default_backend rgw-backend + +backend rgw-backend + option forwardfor + balance static-rr + option httpchk HEAD / HTTP/1.0 + {% for server in rgw_servers %} + server {{ server.name }} {{ server.ip }}:80 check weight 100 + {% endfor %} diff --git a/src/pybind/mgr/cephadm/templates/services/keepalived/keepalived.conf.j2 b/src/pybind/mgr/cephadm/templates/services/keepalived/keepalived.conf.j2 new file mode 100644 index 00000000000..e7a066244ce --- /dev/null +++ b/src/pybind/mgr/cephadm/templates/services/keepalived/keepalived.conf.j2 @@ -0,0 +1,32 @@ +# {{ cephadm_managed }} +vrrp_script check_haproxy { + script "curl http://localhost:{{ spec.ha_proxy_port }}/haproxy_test" + weight -20 + interval 2 + rise 2 + fall 2 +} + +vrrp_instance VI_0 { + state {{ state }} + priority 100 + interface {{ spec.virtual_ip_interface }} + virtual_router_id 51 + advert_int 1 + authentication { + auth_type PASS + auth_pass {{ spec.keepalived_password }} + } + unicast_src_ip {{ host_ip }} + unicast_peer { + {% 
for ip in other_ips %} + {{ ip }} + {% endfor %} + } + virtual_ipaddress { + {{ spec.virtual_ip_address }} dev {{ spec.virtual_ip_interface }} + } + track_script { + check_haproxy + } +} diff --git a/src/pybind/mgr/cephadm/tests/test_spec.py b/src/pybind/mgr/cephadm/tests/test_spec.py index f4f24f352cd..f59174f2a63 100644 --- a/src/pybind/mgr/cephadm/tests/test_spec.py +++ b/src/pybind/mgr/cephadm/tests/test_spec.py @@ -6,8 +6,11 @@ import json import pytest +import yaml + from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \ - IscsiServiceSpec, AlertManagerSpec, HostPlacementSpec, CustomContainerSpec + IscsiServiceSpec, AlertManagerSpec, HostPlacementSpec, CustomContainerSpec, \ + HA_RGWSpec from orchestrator import DaemonDescription, OrchestratorError @@ -137,7 +140,7 @@ def test_spec_octopus(spec_json): "last_refresh": "2020-04-03T15:31:48.725856", "created": "2020-04-02T19:23:08.829543", "started": "2020-04-03T07:29:16.932838", - "is_active": false + "is_active": false }, { "hostname": "ceph-001", @@ -152,7 +155,7 @@ def test_spec_octopus(spec_json): "last_refresh": "2020-04-03T15:31:48.725903", "created": "2020-04-02T19:23:11.390694", "started": "2020-04-03T07:29:16.910897", - "is_active": false + "is_active": false }, { "hostname": "ceph-001", @@ -167,7 +170,7 @@ def test_spec_octopus(spec_json): "last_refresh": "2020-04-03T15:31:48.725950", "created": "2020-04-02T19:23:52.025088", "started": "2020-04-03T07:29:16.847972", - "is_active": false + "is_active": false }, { "hostname": "ceph-001", @@ -182,7 +185,7 @@ def test_spec_octopus(spec_json): "last_refresh": "2020-04-03T15:31:48.725807", "created": "2020-04-02T19:22:18.648584", "started": "2020-04-03T07:29:16.856153", - "is_active": false + "is_active": false }, { "hostname": "ceph-001", @@ -197,7 +200,7 @@ def test_spec_octopus(spec_json): "last_refresh": "2020-04-03T15:31:48.725715", "created": "2020-04-02T19:22:13.863300", "started": "2020-04-03T07:29:17.206024", - "is_active": false + "is_active": false }, { "hostname": "ceph-001", @@ -212,7 +215,7 @@ def test_spec_octopus(spec_json): "last_refresh": "2020-04-03T15:31:48.725996", "created": "2020-04-02T19:23:53.880197", "started": "2020-04-03T07:29:16.880044", - "is_active": false + "is_active": false }, { "hostname": "ceph-001", @@ -227,7 +230,7 @@ def test_spec_octopus(spec_json): "last_refresh": "2020-04-03T15:31:48.726088", "created": "2020-04-02T20:35:02.991435", "started": "2020-04-03T07:29:19.373956", - "is_active": false + "is_active": false }, { "hostname": "ceph-001", @@ -242,7 +245,7 @@ def test_spec_octopus(spec_json): "last_refresh": "2020-04-03T15:31:48.726134", "created": "2020-04-02T20:35:17.142272", "started": "2020-04-03T07:29:19.374002", - "is_active": false + "is_active": false }, { "hostname": "ceph-001", @@ -257,7 +260,7 @@ def test_spec_octopus(spec_json): "last_refresh": "2020-04-03T15:31:48.726042", "created": "2020-04-02T19:24:10.281163", "started": "2020-04-03T07:29:16.926292", - "is_active": false + "is_active": false }, { "hostname": "ceph-001", @@ -265,7 +268,7 @@ def test_spec_octopus(spec_json): "daemon_type": "rgw", "status": 1, "status_desc": "starting", - "is_active": false + "is_active": false } ]""") ) @@ -657,3 +660,38 @@ def test_custom_container_spec_config_json(): config_json = spec.config_json() for key in ['entrypoint', 'uid', 'gid', 'bind_mounts', 'dirs']: assert key not in config_json + +def test_HA_RGW_spec(): + yaml_str ="""service_type: ha-rgw +service_id: haproxy_for_rgw +placement: + hosts: + - host1 + 
- host2 + - host3 +spec: + virtual_ip_interface: eth0 + virtual_ip_address: 192.168.20.1/24 + frontend_port: 8080 + ha_proxy_port: 1967 + ha_proxy_stats_enabled: true + ha_proxy_stats_user: admin + ha_proxy_stats_password: admin + ha_proxy_enable_prometheus_exporter: true + ha_proxy_monitor_uri: /haproxy_health + keepalived_password: admin +""" + yaml_file = yaml.safe_load(yaml_str) + spec = ServiceSpec.from_json(yaml_file) + assert spec.service_type == "ha-rgw" + assert spec.service_id == "haproxy_for_rgw" + assert spec.virtual_ip_interface == "eth0" + assert spec.virtual_ip_address == "192.168.20.1/24" + assert spec.frontend_port == 8080 + assert spec.ha_proxy_port == 1967 + assert spec.ha_proxy_stats_enabled == True + assert spec.ha_proxy_stats_user == "admin" + assert spec.ha_proxy_stats_password == "admin" + assert spec.ha_proxy_enable_prometheus_exporter == True + assert spec.ha_proxy_monitor_uri == "/haproxy_health" + assert spec.keepalived_password == "admin" diff --git a/src/pybind/mgr/orchestrator/_interface.py b/src/pybind/mgr/orchestrator/_interface.py index e05646c6fb3..2542b92398d 100644 --- a/src/pybind/mgr/orchestrator/_interface.py +++ b/src/pybind/mgr/orchestrator/_interface.py @@ -22,7 +22,7 @@ import yaml from ceph.deployment import inventory from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \ - ServiceSpecValidationError, IscsiServiceSpec + ServiceSpecValidationError, IscsiServiceSpec, HA_RGWSpec from ceph.deployment.drive_group import DriveGroupSpec from ceph.deployment.hostspec import HostSpec from ceph.utils import datetime_to_str, str_to_datetime @@ -889,6 +889,7 @@ class Orchestrator(object): 'prometheus': self.apply_prometheus, 'rbd-mirror': self.apply_rbd_mirror, 'rgw': self.apply_rgw, + 'ha-rgw': self.apply_ha_rgw, 'host': self.add_host, 'cephadm-exporter': self.apply_cephadm_exporter, } @@ -1055,6 +1056,10 @@ class Orchestrator(object): """Update RGW cluster""" raise NotImplementedError() + def apply_ha_rgw(self, spec: HA_RGWSpec) -> Completion[str]: + """Update ha-rgw daemons""" + raise NotImplementedError() + def add_rbd_mirror(self, spec: ServiceSpec) -> Completion[List[str]]: """Create rbd-mirror daemon(s)""" raise NotImplementedError() @@ -1171,6 +1176,51 @@ def json_to_generic_spec(spec: dict) -> GenericSpec: return ServiceSpec.from_json(spec) +def daemon_type_to_service(dtype: str) -> str: + mapping = { + 'mon': 'mon', + 'mgr': 'mgr', + 'mds': 'mds', + 'rgw': 'rgw', + 'osd': 'osd', + 'haproxy': 'ha-rgw', + 'keepalived': 'ha-rgw', + 'iscsi': 'iscsi', + 'rbd-mirror': 'rbd-mirror', + 'nfs': 'nfs', + 'grafana': 'grafana', + 'alertmanager': 'alertmanager', + 'prometheus': 'prometheus', + 'node-exporter': 'node-exporter', + 'crash': 'crash', + 'container': 'container', + 'cephadm-exporter': 'cephadm-exporter', + } + return mapping[dtype] + + +def service_to_daemon_types(stype: str) -> List[str]: + mapping = { + 'mon': ['mon'], + 'mgr': ['mgr'], + 'mds': ['mds'], + 'rgw': ['rgw'], + 'osd': ['osd'], + 'ha-rgw': ['haproxy', 'keepalived'], + 'iscsi': ['iscsi'], + 'rbd-mirror': ['rbd-mirror'], + 'nfs': ['nfs'], + 'grafana': ['grafana'], + 'alertmanager': ['alertmanager'], + 'prometheus': ['prometheus'], + 'node-exporter': ['node-exporter'], + 'crash': ['crash'], + 'container': ['container'], + 'cephadm-exporter': ['cephadm-exporter'], + } + return mapping[stype] + + class UpgradeStatusSpec(object): # Orchestrator's report on what's going on with any ongoing upgrade def __init__(self): @@ -1236,6 +1286,8 @@ class 
DaemonDescription(object): # The type of service (osd, mon, mgr, etc.) self.daemon_type = daemon_type + assert daemon_type not in ['HA_RGW', 'ha-rgw'] + # The orchestrator will have picked some names for daemons, # typically either based on hostnames or on pod names. # This is the in mds., the ID that will appear @@ -1271,7 +1323,7 @@ class DaemonDescription(object): def matches_service(self, service_name: Optional[str]) -> bool: if service_name: - return self.name().startswith(service_name + '.') + return (daemon_type_to_service(self.daemon_type) + '.' + self.daemon_id).startswith(service_name + '.') return False def service_id(self): @@ -1318,15 +1370,15 @@ class DaemonDescription(object): # daemon_id == "service_id" return self.daemon_id - if self.daemon_type in ServiceSpec.REQUIRES_SERVICE_ID: + if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID: return _match() return self.daemon_id def service_name(self): - if self.daemon_type in ServiceSpec.REQUIRES_SERVICE_ID: - return f'{self.daemon_type}.{self.service_id()}' - return self.daemon_type + if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID: + return f'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}' + return daemon_type_to_service(self.daemon_type) def __repr__(self): return "({type}.{id})".format(type=self.daemon_type, diff --git a/src/pybind/mgr/orchestrator/module.py b/src/pybind/mgr/orchestrator/module.py index 4b9bc9c0ac2..c2af2f3a2ea 100644 --- a/src/pybind/mgr/orchestrator/module.py +++ b/src/pybind/mgr/orchestrator/module.py @@ -17,7 +17,7 @@ from mgr_module import MgrModule, HandleCommandResult, Option from ._interface import OrchestratorClientMixin, DeviceLightLoc, _cli_read_command, \ raise_if_exception, _cli_write_command, TrivialReadCompletion, OrchestratorError, \ - NoOrchestrator, OrchestratorValidationError, NFSServiceSpec, \ + NoOrchestrator, OrchestratorValidationError, NFSServiceSpec, HA_RGWSpec, \ RGWSpec, InventoryFilter, InventoryHost, HostSpec, CLICommandMeta, \ ServiceDescription, DaemonDescription, IscsiServiceSpec, json_to_generic_spec, GenericSpec @@ -765,7 +765,7 @@ Examples: self._orchestrator_wait([completion]) data = completion.result if format == 'plain': - out = generate_preview_tables(data , True) + out = generate_preview_tables(data, True) else: out = to_format(data, format, many=True, cls=None) return HandleCommandResult(stdout=out) diff --git a/src/python-common/ceph/deployment/service_spec.py b/src/python-common/ceph/deployment/service_spec.py index 240a83ad1f9..6602c10f9dd 100644 --- a/src/python-common/ceph/deployment/service_spec.py +++ b/src/python-common/ceph/deployment/service_spec.py @@ -381,8 +381,8 @@ class ServiceSpec(object): """ KNOWN_SERVICE_TYPES = 'alertmanager crash grafana iscsi mds mgr mon nfs ' \ 'node-exporter osd prometheus rbd-mirror rgw ' \ - 'container cephadm-exporter'.split() - REQUIRES_SERVICE_ID = 'iscsi mds nfs osd rgw container'.split() + 'container cephadm-exporter ha-rgw'.split() + REQUIRES_SERVICE_ID = 'iscsi mds nfs osd rgw container ha-rgw '.split() @classmethod def _cls(cls, service_type): @@ -394,6 +394,7 @@ class ServiceSpec(object): 'osd': DriveGroupSpec, 'iscsi': IscsiServiceSpec, 'alertmanager': AlertManagerSpec, + 'ha-rgw': HA_RGWSpec, 'container': CustomContainerSpec, }.get(service_type, cls) if ret == ServiceSpec and not service_type: @@ -780,6 +781,95 @@ class AlertManagerSpec(ServiceSpec): yaml.add_representer(AlertManagerSpec, ServiceSpec.yaml_representer) +class 
HA_RGWSpec(ServiceSpec): + def __init__(self, + service_type: str = 'ha-rgw', + service_id: Optional[str] = None, + placement: Optional[PlacementSpec] = None, + virtual_ip_interface: Optional[str] = None, + virtual_ip_address: Optional[str] = None, + frontend_port: Optional[int] = None, + ha_proxy_port: Optional[int] = None, + ha_proxy_stats_enabled: Optional[bool] = None, + ha_proxy_stats_user: Optional[str] = None, + ha_proxy_stats_password: Optional[str] = None, + ha_proxy_enable_prometheus_exporter: Optional[bool] = None, + ha_proxy_monitor_uri: Optional[str] = None, + keepalived_password: Optional[str] = None, + ha_proxy_frontend_ssl_certificate: Optional[str] = None, + ha_proxy_frontend_ssl_port: Optional[int] = None, + ha_proxy_ssl_dh_param: Optional[str] = None, + ha_proxy_ssl_ciphers: Optional[List[str]] = None, + ha_proxy_ssl_options: Optional[List[str]] = None, + haproxy_container_image: Optional[str] = None, + keepalived_container_image: Optional[str] = None, + definitive_host_list: Optional[List[HostPlacementSpec]] = None + ): + assert service_type == 'ha-rgw' + super(HA_RGWSpec, self).__init__('ha-rgw', service_id=service_id, placement=placement) + + self.virtual_ip_interface = virtual_ip_interface + self.virtual_ip_address = virtual_ip_address + self.frontend_port = frontend_port + self.ha_proxy_port = ha_proxy_port + self.ha_proxy_stats_enabled = ha_proxy_stats_enabled + self.ha_proxy_stats_user = ha_proxy_stats_user + self.ha_proxy_stats_password = ha_proxy_stats_password + self.ha_proxy_enable_prometheus_exporter = ha_proxy_enable_prometheus_exporter + self.ha_proxy_monitor_uri = ha_proxy_monitor_uri + self.keepalived_password = keepalived_password + self.ha_proxy_frontend_ssl_certificate = ha_proxy_frontend_ssl_certificate + self.ha_proxy_frontend_ssl_port = ha_proxy_frontend_ssl_port + self.ha_proxy_ssl_dh_param = ha_proxy_ssl_dh_param + self.ha_proxy_ssl_ciphers = ha_proxy_ssl_ciphers + self.ha_proxy_ssl_options = ha_proxy_ssl_options + self.haproxy_container_image = haproxy_container_image + self.keepalived_container_image = keepalived_container_image + # placeholder variable. Need definitive list of hosts this service will + # be placed on in order to generate keepalived config. 
Will be populated + # when applying spec + self.definitive_host_list = [] # type: List[HostPlacementSpec] + + def validate(self): + super(HA_RGWSpec, self).validate() + + if not self.virtual_ip_interface: + raise ServiceSpecValidationError( + 'Cannot add ha-rgw: No Virtual IP Interface specified') + if not self.virtual_ip_address: + raise ServiceSpecValidationError( + 'Cannot add ha-rgw: No Virtual IP Address specified') + if not self.frontend_port and not self.ha_proxy_frontend_ssl_certificate: + raise ServiceSpecValidationError( + 'Cannot add ha-rgw: No Frontend Port specified') + if not self.ha_proxy_port: + raise ServiceSpecValidationError( + 'Cannot add ha-rgw: No HA Proxy Port specified') + if not self.ha_proxy_stats_enabled: + raise ServiceSpecValidationError( + 'Cannot add ha-rgw: Ha Proxy Stats Enabled option not set') + if not self.ha_proxy_stats_user: + raise ServiceSpecValidationError( + 'Cannot add ha-rgw: No HA Proxy Stats User specified') + if not self.ha_proxy_stats_password: + raise ServiceSpecValidationError( + 'Cannot add ha-rgw: No HA Proxy Stats Password specified') + if not self.ha_proxy_enable_prometheus_exporter: + raise ServiceSpecValidationError( + 'Cannot add ha-rgw: HA Proxy Enable Prometheus Exporter option not set') + if not self.ha_proxy_monitor_uri: + raise ServiceSpecValidationError( + 'Cannot add ha-rgw: No HA Proxy Monitor Uri specified') + if not self.keepalived_password: + raise ServiceSpecValidationError( + 'Cannot add ha-rgw: No Keepalived Password specified') + if self.ha_proxy_frontend_ssl_certificate: + if not self.ha_proxy_frontend_ssl_port: + raise ServiceSpecValidationError( + 'Cannot add ha-rgw: Specified Ha Proxy Frontend SSL ' + + 'Certificate but no SSL Port') + + class CustomContainerSpec(ServiceSpec): def __init__(self, service_type: str = 'container', -- 2.39.5