def node(host, request):
"""
This fixture represents a single node in the ceph cluster. Using the
- host.ansible fixture provided by testinfra it can access all the ansible variables
- provided to it by the specific test scenario being ran.
+ host.ansible fixture provided by testinfra it can access all the ansible
+ variables provided to it by the specific test scenario being ran.
- You must include this fixture on any tests that operate on specific type of node
- because it contains the logic to manage which tests a node should run.
+ You must include this fixture on any tests that operate on specific type
+ of node because it contains the logic to manage which tests a node
+ should run.
"""
ansible_vars = host.ansible.get_variables()
# tox will pass in this environment variable. we need to do it this way
osd_auto_discovery = ansible_vars.get("osd_auto_discovery")
lvm_scenario = ansible_vars.get("osd_scenario") == 'lvm'
ceph_release_num = {
- 'jewel': 10,
- 'kraken': 11,
- 'luminous': 12,
- 'mimic': 13,
- 'dev': 99
+ 'jewel': 10,
+ 'kraken': 11,
+ 'luminous': 12,
+ 'mimic': 13,
+ 'dev': 99
}
- if not request.node.get_marker(node_type) and not request.node.get_marker('all'):
+ if not request.node.get_marker(node_type) and not request.node.get_marker('all'): # noqa E501
pytest.skip("Not a valid test for node type: %s" % node_type)
if request.node.get_marker("no_lvm_scenario") and lvm_scenario:
pytest.skip("Not a valid test for non-lvm scenarios")
if request.node.get_marker("no_docker") and docker:
- pytest.skip("Not a valid test for containerized deployments or atomic hosts")
+ pytest.skip(
+ "Not a valid test for containerized deployments or atomic hosts")
if request.node.get_marker("docker") and not docker:
- pytest.skip("Not a valid test for non-containerized deployments or atomic hosts")
+ pytest.skip(
+ "Not a valid test for non-containerized deployments or atomic hosts") # noqa E501
if node_type == "mgrs" and ceph_stable_release == "jewel":
pytest.skip("mgr nodes can not be tested with ceph release jewel")
if node_type == "nfss" and ceph_stable_release == "jewel":
pytest.skip("nfs nodes can not be tested with ceph release jewel")
- if request.node.get_marker("from_luminous") and ceph_release_num[ceph_stable_release] < ceph_release_num['luminous']:
- pytest.skip("This test is only valid for releases starting from Luminous and above")
+ if node_type == "iscsi_gws" and ceph_stable_release == "jewel":
+ pytest.skip("iscsi_gws nodes can not be tested with ceph release jewel") # noqa E501
- if request.node.get_marker("before_luminous") and ceph_release_num[ceph_stable_release] >= ceph_release_num['luminous']:
+ if request.node.get_marker("from_luminous") and ceph_release_num[ceph_stable_release] < ceph_release_num['luminous']: # noqa E501
+ pytest.skip(
+ "This test is only valid for releases starting from Luminous and above") # noqa E501
+
+ if request.node.get_marker("before_luminous") and ceph_release_num[ceph_stable_release] >= ceph_release_num['luminous']: # noqa E501
pytest.skip("This test is only valid for release before Luminous")
journal_collocation_test = ansible_vars.get("osd_scenario") == "collocated"
- if request.node.get_marker("journal_collocation") and not journal_collocation_test:
+ if request.node.get_marker("journal_collocation") and not journal_collocation_test: # noqa E501
pytest.skip("Scenario is not using journal collocation")
osd_ids = []
item.add_marker(pytest.mark.rgws)
elif "nfs" in test_path:
item.add_marker(pytest.mark.nfss)
+ elif "iscsi" in test_path:
+ item.add_marker(pytest.mark.iscsi_gws)
else:
item.add_marker(pytest.mark.all)
--- /dev/null
+import pytest
+import json
+
+
+class TestiSCSIs(object):
+
+ @pytest.mark.no_docker
+ def test_tcmu_runner_is_installed(self, node, host):
+ assert host.package("tcmu-runner").is_installed
+
+ @pytest.mark.no_docker
+ def test_rbd_target_api_is_installed(self, node, host):
+ assert host.package("rbd-target-api").is_installed
+
+ @pytest.mark.no_docker
+ def test_rbd_target_gw_is_installed(self, node, host):
+ assert host.package("rbd-target-gw").is_installed
+
+ def test_tcmu_runner_service_is_running(self, node, host):
+ service_name = "tcmu-runner"
+ assert host.service(service_name).is_running
+
+ def test_rbd_target_api_service_is_running(self, node, host):
+ service_name = "rbd-target-api"
+ assert host.service(service_name).is_running
+
+ def test_rbd_target_gw_service_is_running(self, node, host):
+ service_name = "rbd-target-gw"
+ assert host.service(service_name).is_running
+
+ def test_tcmu_runner_service_is_enabled(self, node, host):
+ service_name = "tcmu-runner"
+ assert host.service(service_name).is_enabled
+
+ def test_rbd_target_api_service_is_enabled(self, node, host):
+ service_name = "rbd-target-api"
+ assert host.service(service_name).is_enabled
+
+ def test_rbd_target_gw_service_is_enabled(self, node, host):
+ service_name = "rbd-target-gw"
+ assert host.service(service_name).is_enabled