CertificateSource,
RequiresCertificatesEntry
)
-from ceph.deployment.utils import is_ipv6, unwrap_ipv6
+from ceph.deployment.utils import is_ipv6, unwrap_ipv6, wrap_ipv6
from mgr_util import build_url, merge_dicts
from orchestrator import (
OrchestratorError,
if not port:
continue
assert dd.hostname is not None
+ # fqdn may already be a name or numeric address; ensure IPv6
+ # literals are bracketed.
addr = svc.mgr.get_fqdn(dd.hostname)
- dashboard_endpoints.append(f'{addr}:{port}')
+ dashboard_endpoints.append(f'{wrap_ipv6(addr)}:{port}')
return dashboard_endpoints, protocol
import logging
from typing import List, Any, Tuple, Dict, cast, Optional, TYPE_CHECKING
+from ceph.deployment.utils import wrap_ipv6
+
from orchestrator import DaemonDescription
from ceph.deployment.service_spec import MgmtGatewaySpec, GrafanaSpec, ServiceSpec
from cephadm.services.cephadmservice import CephadmService, CephadmDaemonDeploySpec, get_dashboard_endpoints
return daemon_spec
def get_service_endpoints(self, service_name: str) -> List[str]:
+ # return host:port strings for every daemon of the given service
+ # wrap IPv6 addresses in square brackets so a port can be added later
srv_entries = []
for dd in self.mgr.cache.get_daemons_by_service(service_name):
assert dd.hostname is not None
addr = dd.ip if dd.ip else self.mgr.inventory.get_addr(dd.hostname)
port = dd.ports[0] if dd.ports else None
- srv_entries.append(f'{addr}:{port}')
+ srv_entries.append(f'{wrap_ipv6(addr)}:{port}')
return srv_entries
def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
self.mgr.set_module_option_ex('dashboard', 'standby_behaviour', 'error')
def get_service_discovery_endpoints(self) -> List[str]:
+ # the mgmt gateway uses this internally when generating its nginx
+ # configuration and the URL prefixes that we publish to the world.
+ # A literal IPv6 address needs to be wrapped in brackets.
sd_endpoints = []
for dd in self.mgr.cache.get_daemons_by_service('mgr'):
assert dd.hostname is not None
addr = dd.ip if dd.ip else self.mgr.inventory.get_addr(dd.hostname)
- sd_endpoints.append(f"{addr}:{self.mgr.service_discovery_port}")
+ sd_endpoints.append(f"{wrap_ipv6(addr)}:{self.mgr.service_discovery_port}")
return sd_endpoints
@classmethod
from unittest.mock import patch
from typing import List
+from orchestrator._interface import DaemonDescription
+
from cephadm.module import CephadmOrchestrator
from ceph.deployment.service_spec import (
MgmtGatewaySpec,
OAuth2ProxySpec
)
+from cephadm.services.service_registry import service_registry
from cephadm.tests.fixtures import with_host, with_service, async_side_effect
from cephadm.tlsobject_types import TLSCredentials
class TestMgmtGateway:
+ def test_ipv6_formatting_helpers(self, cephadm_module: CephadmOrchestrator):
+ # verify that endpoints generated by the mgmt-gateway helper methods
+ # correctly bracket IPv6 addresses before the port portion is added.
+ svc = service_registry.get_service('mgmt-gateway')
+
+ # service discovery endpoints use a fixed port from the orchestrator
+ port = cephadm_module.service_discovery_port
+ mgr_daemons = [
+ DaemonDescription(daemon_type='mgr', hostname='h1', ip='fe80::1', ports=[port]),
+ DaemonDescription(daemon_type='mgr', hostname='h2', ip='192.0.2.1', ports=[port]),
+ ]
+ cephadm_module.cache.get_daemons_by_service = lambda name: mgr_daemons if name == 'mgr' else []
+
+ sd = svc.get_service_discovery_endpoints()
+ assert sd == [f'[fe80::1]:{port}', f'192.0.2.1:{port}']
+
+ # generic service endpoints also need the same treatment
+ foo_daemons = [DaemonDescription(daemon_type='foo', hostname='f1', ip='fe80::2', ports=[8080])]
+ cephadm_module.cache.get_daemons_by_service = lambda name: foo_daemons if name == 'foo' else []
+ assert svc.get_service_endpoints('foo') == ['[fe80::2]:8080']
+
@patch("cephadm.serve.CephadmServe._run_cephadm")
@patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_service_endpoints")
@patch("cephadm.services.mgmt_gateway.MgmtGatewayService.get_service_discovery_endpoints")