hosts:
- host1
- host2
+ networks:
+ - 1.2.3.4/24
+ ip_addrs:
+ host1: 10.0.0.100
+ host2: 10.0.0.101
spec:
port: 12345
monitoring_port: 567
In this example, we run the server on the non-default ``port`` of
12345 (instead of the default 2049) on ``host1`` and ``host2``.
-The default monitoring port can be customized using the ``monitoring_port``
+You can bind the NFS data port to a specific IP address using either the
+``ip_addrs`` or ``networks`` section. If ``ip_addrs`` is provided and
+the specified IP is assigned to the host, that IP will be used. If the
+IP is not present but ``networks`` is specified, an IP matching one of
+the given networks will be selected. If neither condition is met, the
+daemon will not start on that node.
+The default NFS monitoring port can be customized using the ``monitoring_port``
parameter. Additionally, you can specify the ``monitoring_ip_addrs`` or
``monitoring_networks`` parameters to bind the monitoring port to a specific
IP address or network. If ``monitoring_ip_addrs`` is provided and the specified
"placement spec is empty: no hosts, no label, no pattern, no count")
# allocate an IP?
- if self.spec.networks:
+ if self.spec.networks or self.spec.ip_addrs:
orig = ls.copy()
ls = []
for p in orig:
- ip = self.find_ip_on_host(p.hostname, self.spec.networks)
+ ip = None
+ # A daemon can bind to a specific IP if 'ip_addrs' is specified in the spec; this
+ # parameter can be used by any service that needs to bind to a specific IP.
+ # If that IP is not present and 'networks' is passed, an IP from that network will be used.
+ if self.spec.ip_addrs:
+ ip = self.spec.ip_addrs.get(p.hostname)
+ host_ips: List[str] = []
+ for net_details in self.networks.get(p.hostname, {}).values():
+ for ips in net_details.values():
+ host_ips.extend(ips)
+ if ip and ip not in host_ips:
+ logger.debug(f"IP {ip} is not configured on host {p.hostname}.")
+ ip = None
+ if not ip and self.spec.networks:
+ ip = self.find_ip_on_host(p.hostname, self.spec.networks)
if ip:
ls.append(DaemonPlacement(daemon_type=self.primary_daemon_type,
hostname=p.hostname, network=p.network,
name=p.name, ports=p.ports, ip=ip))
else:
logger.debug(
- f'Skipping {p.hostname} with no IP in network(s) {self.spec.networks}'
+ f"Skipping {p.hostname} with no IP in provided networks or ip_addrs "
+ f"{f'networks: {self.spec.networks}' if self.spec.networks else ''}"
+ f"{f'ip_addrs: {self.spec.ip_addrs}' if self.spec.ip_addrs else ''}"
)
if self.filter_new_host:
from cephadm.services.ingress import IngressSpec
from cephadm.services.cephadmservice import CephExporterService
from cephadm.services.nvmeof import NvmeofService
+from cephadm.services.service_registry import service_registry
if TYPE_CHECKING:
from cephadm.module import CephadmOrchestrator
srv_entries = []
for dd in self.mgr.cache.get_daemons_by_type('nfs'):
assert dd.hostname is not None
- addr = dd.ip if dd.ip else self.mgr.inventory.get_addr(dd.hostname)
- port = NFSService.DEFAULT_EXPORTER_PORT
+ nfs = cast(NFSService, service_registry.get_service('nfs'))
+ monitoring_ip, monitoring_port = nfs.get_monitoring_details(dd.service_name(), dd.hostname)
+ addr = monitoring_ip or dd.ip or self.mgr.inventory.get_addr(dd.hostname)
+ port = monitoring_port or NFSService.DEFAULT_EXPORTER_PORT
srv_entries.append({
'targets': [build_url(host=addr, port=port).lstrip('/')],
'labels': {'instance': dd.hostname}
self.create_rados_config_obj(spec)
port = daemon_spec.ports[0] if daemon_spec.ports else 2049
- monitoring_port = spec.monitoring_port if spec.monitoring_port else 9587
+ monitoring_ip, monitoring_port = self.get_monitoring_details(daemon_spec.service_name, host)
# create the RGW keyring
rgw_user = f'{rados_user}-rgw'
rgw_keyring = self.create_rgw_keyring(daemon_spec)
+ bind_addr = ''
if spec.virtual_ip and not spec.enable_haproxy_protocol:
bind_addr = spec.virtual_ip
daemon_spec.port_ips = {str(port): spec.virtual_ip}
- else:
- bind_addr = daemon_spec.ip if daemon_spec.ip else ''
+ # Update the daemon spec IP for Prometheus, as monitoring will happen on this
+ # IP if no monitoring IP is specified.
+ daemon_spec.ip = bind_addr
+ elif daemon_spec.ip:
+ bind_addr = daemon_spec.ip
+ daemon_spec.port_ips = {str(port): daemon_spec.ip}
if not bind_addr:
logger.warning(f'Bind address in {daemon_type}.{daemon_id}\'s ganesha conf is defaulting to empty')
else:
logger.debug("using haproxy bind address: %r", bind_addr)
- # check if monitor needs to be bind on specific ip
- monitoring_addr = spec.monitoring_ip_addrs.get(host) if spec.monitoring_ip_addrs else None
- if monitoring_addr and monitoring_addr not in self.mgr.cache.get_host_network_ips(host):
- logger.debug(f"Monitoring IP {monitoring_addr} is not configured on host {daemon_spec.host}.")
- monitoring_addr = None
- if not monitoring_addr and spec.monitoring_networks:
- monitoring_addr = self.mgr.get_first_matching_network_ip(daemon_spec.host, spec, spec.monitoring_networks)
- if not monitoring_addr:
- logger.debug(f"No IP address found in the network {spec.monitoring_networks} on host {daemon_spec.host}.")
- if monitoring_addr:
- daemon_spec.port_ips.update({str(monitoring_port): monitoring_addr})
+ if monitoring_ip:
+ daemon_spec.port_ips.update({str(monitoring_port): monitoring_ip})
# generate the ganesha config
def get_ganesha_conf() -> str:
"url": f'rados://{POOL_NAME}/{spec.service_id}/{spec.rados_config_name()}',
# fall back to default NFS port if not present in daemon_spec
"port": port,
- "monitoring_addr": monitoring_addr,
+ "monitoring_addr": monitoring_ip,
"monitoring_port": monitoring_port,
"bind_addr": bind_addr,
"haproxy_hosts": [],
# one address per interface/subnet is enough
cluster_ips.append(addrs[0])
return cluster_ips
+
+ def get_monitoring_details(self, service_name: str, host: str) -> Tuple[Optional[str], Optional[int]]:
+ spec = cast(NFSServiceSpec, self.mgr.spec_store[service_name].spec)
+ monitoring_port = spec.monitoring_port if spec.monitoring_port else 9587
+
+ # check if the monitoring port needs to be bound to a specific IP
+ monitoring_addr = spec.monitoring_ip_addrs.get(host) if spec.monitoring_ip_addrs else None
+ if monitoring_addr and monitoring_addr not in self.mgr.cache.get_host_network_ips(host):
+ logger.debug(f"Monitoring IP {monitoring_addr} is not configured on host {host}.")
+ monitoring_addr = None
+ if not monitoring_addr and spec.monitoring_networks:
+ monitoring_addr = self.mgr.get_first_matching_network_ip(host, spec, spec.monitoring_networks)
+ if not monitoring_addr:
+ logger.debug(f"No IP address found in the network {spec.monitoring_networks} on host {host}.")
+ return monitoring_addr, monitoring_port
from unittest.mock import MagicMock
from cephadm.service_discovery import Root
+from cephadm.services.service_registry import service_registry
class FakeDaemonDescription:
FakeDaemonDescription('1.2.3.5', [9200], 'node1')]
def get_daemons_by_type(self, daemon_type):
- return [FakeDaemonDescription('1.2.3.4', [9100], 'node0', 'ingress', 'haproxy'),
- FakeDaemonDescription('1.2.3.5', [9200], 'node1', 'ingress', 'haproxy')]
+ if daemon_type == 'ingress':
+ return [FakeDaemonDescription('1.2.3.4', [9100], 'node0', 'ingress', 'haproxy'),
+ FakeDaemonDescription('1.2.3.5', [9200], 'node1', 'ingress', 'haproxy')]
+ else:
+ return [FakeDaemonDescription('1.2.3.4', [1234], 'node0', daemon_type, daemon_type),
+ FakeDaemonDescription('1.2.3.5', [1234], 'node1', daemon_type, daemon_type)]
class FakeInventory:
return '1.2.3.4'
+class FakeNFSServiceSpec:
+ def __init__(self, port):
+ self.monitoring_port = None
+ self.monitoring_ip_addrs = None
+ self.monitoring_networks = None
+
+
+class FakeIngressServiceSpec:
+ def __init__(self, port):
+ self.monitor_port = port
+
+
class FakeServiceSpec:
def __init__(self, port):
self.monitor_port = port
class FakeSpecDescription:
- def __init__(self, port):
- self.spec = FakeServiceSpec(port)
+ def __init__(self, service, port):
+ if service == 'ingress':
+ self.spec = FakeIngressServiceSpec(port)
+ elif service == 'nfs':
+ self.spec = FakeNFSServiceSpec(port)
+ else:
+ self.spec = FakeServiceSpec(port)
class FakeSpecStore():
def __init__(self, mgr):
self.mgr = mgr
- self._specs = {'ingress': FakeSpecDescription(9049)}
+ self._specs = {'ingress': FakeSpecDescription('ingress', 9049), 'nfs': FakeSpecDescription('nfs', 9587)}
def __contains__(self, name):
return name in self._specs
def __getitem__(self, name):
- return self._specs['ingress']
+ return self._specs[name]
class FakeMgr:
self.inventory = FakeInventory()
self.cache = FakeCache()
self.spec_store = FakeSpecStore(self)
+ service_registry.init_services(self)
def get_mgr_id(self):
return 'mgr-1'
MgmtGatewaySpec,
OAuth2ProxySpec
)
-from cephadm.tests.fixtures import with_host, with_service, _run_cephadm, async_side_effect
+from cephadm.tests.fixtures import with_host, with_service, _run_cephadm, async_side_effect, wait
from ceph.utils import datetime_now
# check keepalived config
assert keepalived_generated_conf[0] == keepalived_expected_conf
- @patch("cephadm.serve.CephadmServe._run_cephadm")
- @patch("cephadm.services.nfs.NFSService.fence_old_ranks", MagicMock())
- @patch("cephadm.services.nfs.NFSService.run_grace_tool", MagicMock())
- @patch("cephadm.services.nfs.NFSService.purge", MagicMock())
- @patch("cephadm.services.nfs.NFSService.create_rados_config_obj", MagicMock())
- def test_nfs_config_monitoring_ip(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
- _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
-
- with with_host(cephadm_module, 'test', addr='1.2.3.7'):
- cephadm_module.cache.update_host_networks('test', {
- '1.2.3.0/24': {
- 'if0': ['1.2.3.1']
- }
- })
-
- nfs_spec = NFSServiceSpec(service_id="foo", placement=PlacementSpec(hosts=['test']),
- monitoring_ip_addrs={'test': '1.2.3.1'})
- with with_service(cephadm_module, nfs_spec) as _:
- nfs_generated_conf, _ = service_registry.get_service('nfs').generate_config(
- CephadmDaemonDeploySpec(host='test', daemon_id='foo.test.0.0', service_name=nfs_spec.service_name()))
- ganesha_conf = nfs_generated_conf['files']['ganesha.conf']
- assert "Monitoring_Addr = 1.2.3.1" in ganesha_conf
-
- nfs_spec = NFSServiceSpec(service_id="foo", placement=PlacementSpec(hosts=['test']),
- monitoring_networks=['1.2.3.0/24'])
- with with_service(cephadm_module, nfs_spec) as _:
- nfs_generated_conf, _ = service_registry.get_service('nfs').generate_config(
- CephadmDaemonDeploySpec(host='test', daemon_id='foo.test.0.0', service_name=nfs_spec.service_name()))
- ganesha_conf = nfs_generated_conf['files']['ganesha.conf']
- assert "Monitoring_Addr = 1.2.3.1" in ganesha_conf
-
@patch("cephadm.services.nfs.NFSService.fence_old_ranks", MagicMock())
@patch("cephadm.services.nfs.NFSService.run_grace_tool", MagicMock())
@patch("cephadm.services.nfs.NFSService.purge", MagicMock())
)
assert nfs_generated_conf == nfs_expected_conf
+ @patch("cephadm.services.nfs.NFSService.fence_old_ranks", MagicMock())
+ @patch("cephadm.services.nfs.NFSService.run_grace_tool", MagicMock())
+ @patch("cephadm.services.nfs.NFSService.purge", MagicMock())
+ @patch("cephadm.services.nfs.NFSService.create_rados_config_obj", MagicMock())
+ @patch("cephadm.inventory.Inventory.keys")
+ @patch("cephadm.inventory.Inventory.get_addr")
+ @patch("cephadm.utils.resolve_ip")
+ @patch("cephadm.inventory.HostCache.get_daemons_by_service")
+ @patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_haproxy_protocol_nfs_config_with_ip_addrs(
+ self,
+ _run_cephadm,
+ _get_daemons_by_service,
+ _resolve_ip,
+ _get_addr,
+ _inventory_keys,
+ cephadm_module: CephadmOrchestrator,
+ ):
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+ nfs_service = NFSServiceSpec(
+ service_id="foo",
+ placement=PlacementSpec(
+ count=1,
+ hosts=['host1', 'host2']),
+ port=12049,
+ ip_addrs={
+ 'host1': '10.10.2.20',
+ 'host2': '10.10.2.21'
+ },
+ enable_haproxy_protocol=True,
+ )
+
+ ispec = IngressSpec(
+ service_type='ingress',
+ service_id='nfs.foo',
+ backend_service='nfs.foo',
+ frontend_port=2049,
+ monitor_port=9049,
+ virtual_ip='192.168.122.100/24',
+ monitor_user='admin',
+ monitor_password='12345',
+ keepalived_password='12345',
+ enable_haproxy_protocol=True,
+ )
+ cephadm_module.spec_store._specs = {
+ 'nfs.foo': nfs_service,
+ 'ingress.nfs.foo': ispec
+ }
+ cephadm_module.spec_store.spec_created = {
+ 'nfs.foo': datetime_now(),
+ 'ingress.nfs.foo': datetime_now()
+ }
+ nfs_daemons = [
+ DaemonDescription(
+ daemon_type='nfs',
+ daemon_id='foo.0.1.host1.qwerty',
+ hostname='host1',
+ ip='10.10.2.20',
+ rank=0,
+ rank_generation=1,
+ ports=[12049],
+ ),
+ DaemonDescription(
+ daemon_type='nfs',
+ daemon_id='foo.0.0.host2.abcdef',
+ hostname='host2',
+ ip='10.10.2.21',
+ rank=0,
+ rank_generation=0,
+ ports=[12049],
+ ),
+ ]
+ _get_daemons_by_service.return_value = nfs_daemons
+
+ ingress_svc = service_registry.get_service('ingress')
+ nfs_svc = service_registry.get_service('nfs')
+
+ cephadm_module.cache.update_host_networks('host1', {
+ # this one is additional
+ '10.10.2.0/24': {
+ 'eth1': ['10.10.2.20']
+ },
+ # this is redundant and will be skipped
+ '192.168.122.0/24': {
+ 'eth0': ['192.168.122.111']
+ },
+ })
+ cephadm_module.cache.update_host_networks('host2', {
+ # this one is additional
+ '10.10.2.0/24': {
+ 'eth1': ['10.10.2.22']
+ },
+ # this is redundant and will be skipped
+ '192.168.122.0/24': {
+ 'eth0': ['192.168.122.112']
+ },
+ })
+
+ haproxy_generated_conf, _ = ingress_svc.haproxy_generate_config(
+ CephadmDaemonDeploySpec(
+ host='host1',
+ daemon_id='ingress',
+ service_name=ispec.service_name(),
+ ),
+ )
+ gen_config_lines = haproxy_generated_conf['files']['haproxy.cfg']
+ assert 'server nfs.foo.0 10.10.2.20:12049 check' in gen_config_lines
+
+ nfs_generated_conf, _ = nfs_svc.generate_config(
+ CephadmDaemonDeploySpec(
+ host='test',
+ daemon_id='foo.test.0.0',
+ service_name=nfs_service.service_name(),
+ rank=0,
+ ip='10.10.2.20'
+ ),
+ )
+ ganesha_conf = nfs_generated_conf['files']['ganesha.conf']
+ assert "Bind_addr = 10.10.2.20" in ganesha_conf
+
+
+class TestNFS:
+ @patch("cephadm.serve.CephadmServe._run_cephadm")
+ @patch("cephadm.services.nfs.NFSService.fence_old_ranks", MagicMock())
+ @patch("cephadm.services.nfs.NFSService.run_grace_tool", MagicMock())
+ @patch("cephadm.services.nfs.NFSService.purge", MagicMock())
+ @patch("cephadm.services.nfs.NFSService.create_rados_config_obj", MagicMock())
+ def test_nfs_config_monitoring_ip(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+
+ with with_host(cephadm_module, 'test', addr='1.2.3.7'):
+ cephadm_module.cache.update_host_networks('test', {
+ '1.2.3.0/24': {
+ 'if0': ['1.2.3.1']
+ }
+ })
+
+ nfs_spec = NFSServiceSpec(service_id="foo", placement=PlacementSpec(hosts=['test']),
+ monitoring_ip_addrs={'test': '1.2.3.1'})
+ with with_service(cephadm_module, nfs_spec) as _:
+ nfs_generated_conf, _ = service_registry.get_service('nfs').generate_config(
+ CephadmDaemonDeploySpec(host='test', daemon_id='foo.test.0.0', service_name=nfs_spec.service_name()))
+ ganesha_conf = nfs_generated_conf['files']['ganesha.conf']
+ assert "Monitoring_Addr = 1.2.3.1" in ganesha_conf
+
+ nfs_spec = NFSServiceSpec(service_id="foo", placement=PlacementSpec(hosts=['test']),
+ monitoring_networks=['1.2.3.0/24'])
+ with with_service(cephadm_module, nfs_spec) as _:
+ nfs_generated_conf, _ = service_registry.get_service('nfs').generate_config(
+ CephadmDaemonDeploySpec(host='test', daemon_id='foo.test.0.0', service_name=nfs_spec.service_name()))
+ ganesha_conf = nfs_generated_conf['files']['ganesha.conf']
+ assert "Monitoring_Addr = 1.2.3.1" in ganesha_conf
+
+ @patch("cephadm.serve.CephadmServe._run_cephadm")
+ @patch("cephadm.services.nfs.NFSService.fence_old_ranks", MagicMock())
+ @patch("cephadm.services.nfs.NFSService.run_grace_tool", MagicMock())
+ @patch("cephadm.services.nfs.NFSService.purge", MagicMock())
+ @patch("cephadm.services.nfs.NFSService.create_rados_config_obj", MagicMock())
+ def test_nfs_config_bind_addr(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+
+ with with_host(cephadm_module, 'host1', addr='1.2.3.7'):
+ cephadm_module.cache.update_host_networks('host1', {
+ '1.2.3.0/24': {
+ 'if0': ['1.2.3.7']
+ }
+ })
+
+ nfs_spec = NFSServiceSpec(service_id="foo", placement=PlacementSpec(hosts=['host1']),
+ ip_addrs={'host1': '1.2.3.7'})
+ with with_service(cephadm_module, nfs_spec, status_running=True) as _:
+ dds = wait(cephadm_module, cephadm_module.list_daemons())
+ daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(dds[0])
+ nfs_generated_conf, _ = service_registry.get_service('nfs').generate_config(daemon_spec)
+ ganesha_conf = nfs_generated_conf['files']['ganesha.conf']
+ assert "Bind_addr = 1.2.3.7" in ganesha_conf
+
+ with with_host(cephadm_module, 'host1', addr='1.2.3.7'):
+ cephadm_module.cache.update_host_networks('host1', {
+ '1.2.3.0/24': {
+ 'if0': ['1.2.3.7']
+ }
+ })
+ nfs_spec = NFSServiceSpec(service_id="foo", placement=PlacementSpec(hosts=['host1']),
+ networks=['1.2.3.0/24'])
+ with with_service(cephadm_module, nfs_spec, status_running=True) as _:
+ dds = wait(cephadm_module, cephadm_module.list_daemons())
+ daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(dds[0])
+ nfs_generated_conf, _ = service_registry.get_service('nfs').generate_config(daemon_spec)
+ ganesha_conf = nfs_generated_conf['files']['ganesha.conf']
+ assert "Bind_addr = 1.2.3.7" in ganesha_conf
+
class TestCephFsMirror:
@patch("cephadm.serve.CephadmServe._run_cephadm")
extra_container_args: Optional[GeneralArgList] = None,
extra_entrypoint_args: Optional[GeneralArgList] = None,
custom_configs: Optional[List[CustomConfig]] = None,
+ ip_addrs: Optional[Dict[str, str]] = None,
):
#: See :ref:`orchestrator-cli-placement-spec`.
extra_entrypoint_args)
self.custom_configs: Optional[List[CustomConfig]] = custom_configs
+ # ip_addrs is a dict mapping each hostname to the IP address ({hostname: ip})
+ # that the service should bind to on that host.
+ self.ip_addrs = ip_addrs
+
def __setattr__(self, name: str, value: Any) -> None:
if value is not None and name in ('extra_container_args', 'extra_entrypoint_args'):
for v in value:
preview_only: bool = False,
config: Optional[Dict[str, str]] = None,
networks: Optional[List[str]] = None,
+ ip_addrs: Optional[Dict[str, str]] = None,
port: Optional[int] = None,
monitoring_networks: Optional[List[str]] = None,
monitoring_ip_addrs: Optional[Dict[str, str]] = None,
'nfs', service_id=service_id,
placement=placement, unmanaged=unmanaged, preview_only=preview_only,
config=config, networks=networks, extra_container_args=extra_container_args,
- extra_entrypoint_args=extra_entrypoint_args, custom_configs=custom_configs)
+ extra_entrypoint_args=extra_entrypoint_args, custom_configs=custom_configs,
+ ip_addrs=ip_addrs)
self.port = port
# type: () -> str
return 'conf-' + self.service_name()
+ def validate(self) -> None:
+ super(NFSServiceSpec, self).validate()
+
+ if self.virtual_ip and (self.ip_addrs or self.networks):
+ raise SpecValidationError("Invalid NFS spec: Cannot set virtual_ip and "
+ f"{'ip_addrs' if self.ip_addrs else 'networks'} fields")
+
yaml.add_representer(NFSServiceSpec, ServiceSpec.yaml_representer)