Make flake8 happy on the testinfra files.
Signed-off-by: Sébastien Han <seb@redhat.com>
if marker.name in group_names or marker.name == 'all':
test_is_applicable = True
break
- # Check if any markers on the test method exist in the nodes group_names. If they do not, this test is not valid for the node being tested.
+ # Check if any markers on the test method exist in the nodes group_names.
+ # If they do not, this test is not valid for the node being tested.
if not test_is_applicable:
- reason = "%s: Not a valid test for node type: %s" % (request.function, group_names)
+ reason = "%s: Not a valid test for node type: %s" % (
+ request.function, group_names)
pytest.skip(reason)
if request.node.get_closest_marker("no_lvm_scenario") and lvm_scenario:
import pytest
-import json
class TestiSCSIs(object):
import pytest
import json
+
class TestMDSs(object):
@pytest.mark.no_docker
container_binary = 'docker'
if host.exists('podman') and host.ansible("setup")["ansible_facts"]["ansible_distribution"] == 'Fedora': # noqa E501
container_binary = 'podman'
- docker_exec_cmd = '{container_binary} exec ceph-mds-{hostname}'.format(
+ docker_exec_cmd = '{container_binary} exec ceph-mds-{hostname}'.format( # noqa E501
hostname=hostname, container_binary=container_binary)
else:
docker_exec_cmd = ''
- cmd = "sudo {docker_exec_cmd} ceph --name client.bootstrap-mds --keyring /var/lib/ceph/bootstrap-mds/{cluster}.keyring --cluster={cluster} --connect-timeout 5 -f json -s".format(
+ cmd = "sudo {docker_exec_cmd} ceph --name client.bootstrap-mds --keyring /var/lib/ceph/bootstrap-mds/{cluster}.keyring --cluster={cluster} --connect-timeout 5 -f json -s".format( # noqa E501
docker_exec_cmd=docker_exec_cmd,
cluster=node['cluster_name']
)
cluster_status = json.loads(host.check_output(cmd))
- assert (cluster_status['fsmap'].get('up', 0) + cluster_status['fsmap'].get('up:standby', 0)) == len(node["vars"]["groups"]["mdss"])
+ assert (cluster_status['fsmap'].get('up', 0) + cluster_status['fsmap'].get( # noqa E501
+ 'up:standby', 0)) == len(node["vars"]["groups"]["mdss"])
import pytest
import json
+
class TestMGRs(object):
@pytest.mark.no_docker
container_binary = 'docker'
if host.exists('podman') and host.ansible("setup")["ansible_facts"]["ansible_distribution"] == 'Fedora': # noqa E501
container_binary = 'podman'
- docker_exec_cmd = '{container_binary} exec ceph-mgr-{hostname}'.format(
+ docker_exec_cmd = '{container_binary} exec ceph-mgr-{hostname}'.format( # noqa E501
hostname=hostname, container_binary=container_binary)
else:
docker_exec_cmd = ''
- cmd = "sudo {docker_exec_cmd} ceph --name mgr.{hostname} --keyring /var/lib/ceph/mgr/{cluster}-{hostname}/keyring --cluster={cluster} --connect-timeout 5 -f json -s".format(
+ cmd = "sudo {docker_exec_cmd} ceph --name mgr.{hostname} --keyring /var/lib/ceph/mgr/{cluster}-{hostname}/keyring --cluster={cluster} --connect-timeout 5 -f json -s".format( # noqa E501
docker_exec_cmd=docker_exec_cmd,
hostname=node["vars"]["inventory_hostname"],
cluster=cluster
import pytest
import re
+
class TestMons(object):
@pytest.mark.no_docker
@pytest.mark.no_docker
def test_can_get_cluster_health(self, node, host):
- cmd = "sudo ceph --cluster={} --connect-timeout 5 -s".format(node["cluster_name"])
+ cmd = "sudo ceph --cluster={} --connect-timeout 5 -s".format(node["cluster_name"]) # noqa E501
output = host.check_output(cmd)
assert output.strip().startswith("cluster")
assert File(node["conf_path"]).contains("^mon initial members = .*$")
def test_initial_members_line_has_correct_value(self, node, host, File):
- mon_initial_members_line = host.check_output("grep 'mon initial members = ' /etc/ceph/{cluster}.conf".format(cluster=node['cluster_name']))
+ mon_initial_members_line = host.check_output("grep 'mon initial members = ' /etc/ceph/{cluster}.conf".format(cluster=node['cluster_name'])) # noqa E501
result = True
for host in node["vars"]["groups"]["mons"]:
pattern = re.compile(host)
- if pattern.search(mon_initial_members_line) == None:
+ if pattern.search(mon_initial_members_line) == None: # noqa E501
result = False
assert result
-
import json
import pytest
+
class TestNFSs(object):
@pytest.mark.no_docker
@pytest.mark.no_docker
def test_nfs_config_override(self, node, host):
- assert host.file("/etc/ganesha/ganesha.conf").contains("Entries_HWMark")
+ assert host.file(
+ "/etc/ganesha/ganesha.conf").contains("Entries_HWMark")
def test_nfs_is_up(self, node, host):
hostname = node["vars"]["inventory_hostname"]
container_binary = 'docker'
if host.exists('podman') and host.ansible("setup")["ansible_facts"]["ansible_distribution"] == 'Fedora': # noqa E501
container_binary = 'podman'
- docker_exec_cmd = '{container_binary} exec ceph-nfs-{hostname}'.format(
+ docker_exec_cmd = '{container_binary} exec ceph-nfs-{hostname}'.format( # noqa E501
hostname=hostname, container_binary=container_binary)
else:
docker_exec_cmd = ''
- cmd = "sudo {docker_exec_cmd} ceph --name client.rgw.{hostname} --keyring /var/lib/ceph/radosgw/{cluster}-rgw.{hostname}/keyring --cluster={cluster} --connect-timeout 5 -f json -s".format(
+ cmd = "sudo {docker_exec_cmd} ceph --name client.rgw.{hostname} --keyring /var/lib/ceph/radosgw/{cluster}-rgw.{hostname}/keyring --cluster={cluster} --connect-timeout 5 -f json -s".format( # noqa E501
docker_exec_cmd=docker_exec_cmd,
hostname=hostname,
cluster=cluster
)
output = host.check_output(cmd)
- daemons = [i for i in json.loads(output)["servicemap"]["services"]["rgw-nfs"]["daemons"]]
+ daemons = [i for i in json.loads(
+ output)["servicemap"]["services"]["rgw-nfs"]["daemons"]]
assert hostname in daemons
-#NOTE (guits): This check must be fixed. (Permission denied error)
+# NOTE (guits): This check must be fixed. (Permission denied error)
# @pytest.mark.no_docker
# def test_nfs_rgw_fsal_export(self, node, host):
# if(host.mount_point("/mnt").exists):
class TestOSD(object):
def test_osds_are_all_collocated(self, node, host):
- # TODO: figure out way to paramaterize node['vars']['devices'] for this test
+ # TODO: figure out way to paramaterize node['vars']['devices'] for this test # noqa E501
osd_auto_discovery = node["vars"].get('osd_auto_discovery', False)
if osd_auto_discovery:
- node["vars"]["devices"] = ["/dev/sda", "/dev/sdb", "/dev/sdc"] # Hardcoded since we can't retrieve the devices list generated during playbook run
+ node["vars"]["devices"] = ["/dev/sda", "/dev/sdb", "/dev/sdc"] # Hardcoded since we can't retrieve the devices list generated during playbook run # noqa E501
for device in node["vars"]["devices"]:
- assert host.check_output("sudo blkid -s PARTLABEL -o value $(readlink -f %s)2" % device) in ["ceph journal", "ceph block"]
+ assert host.check_output("sudo blkid -s PARTLABEL -o value $(readlink -f %s)2" % device) in ["ceph journal", "ceph block"] # noqa E501
def test_osds_listen_on_public_network(self, node, host):
# TODO: figure out way to paramaterize this test
nb_port = (node["num_osds"] * 2)
- assert host.check_output("netstat -lntp | grep ceph-osd | grep %s | wc -l" % (node["address"])) == str(nb_port)
+ assert host.check_output(
+ "netstat -lntp | grep ceph-osd | grep %s | wc -l" % (node["address"])) == str(nb_port) # noqa E501
def test_osds_listen_on_cluster_network(self, node, host):
# TODO: figure out way to paramaterize this test
nb_port = (node["num_osds"] * 2)
- assert host.check_output("netstat -lntp | grep ceph-osd | grep %s | wc -l" % (node["cluster_address"])) == str(nb_port)
+ assert host.check_output("netstat -lntp | grep ceph-osd | grep %s | wc -l" % # noqa E501
+ (node["cluster_address"])) == str(nb_port)
def test_osd_services_are_running(self, node, host):
# TODO: figure out way to paramaterize node['osds'] for this test
def _get_osd_id_from_host(self, node, osd_tree):
children = []
for n in osd_tree['nodes']:
- if n['name'] == node['vars']['inventory_hostname'] and n['type'] == 'host':
+ if n['name'] == node['vars']['inventory_hostname'] and n['type'] == 'host': # noqa E501
children = n['children']
return children
@pytest.mark.no_docker
def test_all_osds_are_up_and_in(self, node, host):
- cmd = "sudo ceph --cluster={cluster} --connect-timeout 5 --keyring /var/lib/ceph/bootstrap-osd/{cluster}.keyring -n client.bootstrap-osd osd tree -f json".format(cluster=node["cluster_name"])
+ cmd = "sudo ceph --cluster={cluster} --connect-timeout 5 --keyring /var/lib/ceph/bootstrap-osd/{cluster}.keyring -n client.bootstrap-osd osd tree -f json".format( # noqa E501
+ cluster=node["cluster_name"])
output = json.loads(host.check_output(cmd))
assert node["num_osds"] == self._get_nb_up_osds_from_ids(node, output)
container_binary = 'podman'
osd_id = host.check_output(os.path.join(
container_binary + " ps -q --filter='name=ceph-osd' | head -1"))
- cmd = "sudo {container_binary} exec {osd_id} ceph --cluster={cluster} --connect-timeout 5 --keyring /var/lib/ceph/bootstrap-osd/{cluster}.keyring -n client.bootstrap-osd osd tree -f json".format(
+ cmd = "sudo {container_binary} exec {osd_id} ceph --cluster={cluster} --connect-timeout 5 --keyring /var/lib/ceph/bootstrap-osd/{cluster}.keyring -n client.bootstrap-osd osd tree -f json".format( # noqa E501
osd_id=osd_id,
cluster=node["cluster_name"],
container_binary=container_binary
import pytest
import json
-import os
+
class TestRbdMirrors(object):
container_binary = 'docker'
if host.exists('podman') and host.ansible("setup")["ansible_facts"]["ansible_distribution"] == 'Fedora': # noqa E501
container_binary = 'podman'
- docker_exec_cmd = '{container_binary} exec ceph-rbd-mirror-{hostname}'.format(
+ docker_exec_cmd = '{container_binary} exec ceph-rbd-mirror-{hostname}'.format( # noqa E501
hostname=hostname, container_binary=container_binary)
else:
docker_exec_cmd = ''
hostname = node["vars"]["inventory_hostname"]
cluster = node['cluster_name']
- cmd = "sudo {docker_exec_cmd} ceph --name client.bootstrap-rbd-mirror --keyring /var/lib/ceph/bootstrap-rbd-mirror/{cluster}.keyring --cluster={cluster} --connect-timeout 5 -f json -s".format(
+ cmd = "sudo {docker_exec_cmd} ceph --name client.bootstrap-rbd-mirror --keyring /var/lib/ceph/bootstrap-rbd-mirror/{cluster}.keyring --cluster={cluster} --connect-timeout 5 -f json -s".format( # noqa E501
docker_exec_cmd=docker_exec_cmd,
hostname=hostname,
cluster=cluster
)
output = host.check_output(cmd)
status = json.loads(output)
- daemon_ids = [i for i in status["servicemap"]["services"]["rbd-mirror"]["daemons"].keys() if i != "summary"]
+ daemon_ids = [i for i in status["servicemap"]["services"]
+ ["rbd-mirror"]["daemons"].keys() if i != "summary"]
for daemon_id in daemon_ids:
- daemons.append(status["servicemap"]["services"]["rbd-mirror"]["daemons"][daemon_id]["metadata"]["hostname"])
- assert hostname in daemons
\ No newline at end of file
+ daemons.append(status["servicemap"]["services"]["rbd-mirror"]
+ ["daemons"][daemon_id]["metadata"]["hostname"])
+ assert hostname in daemons
import pytest
import json
+
class TestRGWs(object):
@pytest.mark.no_docker
container_binary = 'docker'
if host.exists('podman') and host.ansible("setup")["ansible_facts"]["ansible_distribution"] == 'Fedora': # noqa E501
container_binary = 'podman'
- docker_exec_cmd = '{container_binary} exec ceph-rgw-{hostname}'.format(
+ docker_exec_cmd = '{container_binary} exec ceph-rgw-{hostname}'.format( # noqa E501
hostname=hostname, container_binary=container_binary)
else:
docker_exec_cmd = ''
- cmd = "sudo {docker_exec_cmd} ceph --name client.bootstrap-rgw --keyring /var/lib/ceph/bootstrap-rgw/{cluster}.keyring --cluster={cluster} --connect-timeout 5 -f json -s".format(
+ cmd = "sudo {docker_exec_cmd} ceph --name client.bootstrap-rgw --keyring /var/lib/ceph/bootstrap-rgw/{cluster}.keyring --cluster={cluster} --connect-timeout 5 -f json -s".format( # noqa E501
docker_exec_cmd=docker_exec_cmd,
hostname=hostname,
cluster=cluster
)
output = host.check_output(cmd)
- daemons = [i for i in json.loads(output)["servicemap"]["services"]["rgw"]["daemons"]]
+ daemons = [i for i in json.loads(
+ output)["servicemap"]["services"]["rgw"]["daemons"]]
assert hostname in daemons
@pytest.mark.no_docker
def test_rgw_http_endpoint(self, node, host):
# rgw frontends ip_addr is configured on eth1
ip_addr = host.interface("eth1").addresses[0]
- assert host.socket("tcp://{ip_addr}:{port}".format(ip_addr=ip_addr, port=8080)).is_listening
+ assert host.socket(
+ "tcp://{ip_addr}:{port}".format(ip_addr=ip_addr, port=8080)).is_listening # noqa E501
@pytest.mark.no_docker
def test_rgw_bucket_default_quota_is_set(self, node, host):
- assert host.file(node["conf_path"]).contains("rgw override bucket index max shards")
- assert host.file(node["conf_path"]).contains("rgw bucket default quota max objects")
+ assert host.file(node["conf_path"]).contains(
+ "rgw override bucket index max shards")
+ assert host.file(node["conf_path"]).contains(
+ "rgw bucket default quota max objects")
@pytest.mark.no_docker
def test_rgw_bucket_default_quota_is_applied(self, node, host):
- radosgw_admin_cmd = "sudo radosgw-admin --cluster={cluster} -n client.rgw.{hostname} --keyring /var/lib/ceph/radosgw/{cluster}-rgw.{hostname}/keyring user create --uid=test --display-name Test".format(
+ radosgw_admin_cmd = "sudo radosgw-admin --cluster={cluster} -n client.rgw.{hostname} --keyring /var/lib/ceph/radosgw/{cluster}-rgw.{hostname}/keyring user create --uid=test --display-name Test".format( # noqa E501
hostname=node["vars"]["inventory_hostname"],
cluster=node['cluster_name']
)
radosgw_admin_output = host.check_output(radosgw_admin_cmd)
radosgw_admin_output_json = json.loads(radosgw_admin_output)
- assert radosgw_admin_output_json["bucket_quota"]["enabled"] == True
- assert radosgw_admin_output_json["bucket_quota"]["max_objects"] == 1638400
+ assert radosgw_admin_output_json["bucket_quota"]["enabled"] == True # noqa E501
+ assert radosgw_admin_output_json["bucket_quota"]["max_objects"] == 1638400 # noqa E501
@pytest.mark.no_docker
def test_rgw_tuning_pools_are_set(self, node, host):
- cmd = "sudo ceph --cluster={cluster} --connect-timeout 5 -n client.rgw.{hostname} --keyring /var/lib/ceph/radosgw/{cluster}-rgw.{hostname}/keyring osd dump".format(
+ cmd = "sudo ceph --cluster={cluster} --connect-timeout 5 -n client.rgw.{hostname} --keyring /var/lib/ceph/radosgw/{cluster}-rgw.{hostname}/keyring osd dump".format( # noqa E501
hostname=node["vars"]["inventory_hostname"],
cluster=node['cluster_name']
)
container_binary = 'docker'
if host.exists('podman') and host.ansible("setup")["ansible_facts"]["ansible_distribution"] == 'Fedora': # noqa E501
container_binary = 'podman'
- cmd = "sudo {container_binary} exec ceph-rgw-{hostname} ceph --cluster={cluster} -n client.rgw.{hostname} --connect-timeout 5 --keyring /var/lib/ceph/radosgw/{cluster}-rgw.{hostname}/keyring osd dump".format(
+ cmd = "sudo {container_binary} exec ceph-rgw-{hostname} ceph --cluster={cluster} -n client.rgw.{hostname} --connect-timeout 5 --keyring /var/lib/ceph/radosgw/{cluster}-rgw.{hostname}/keyring osd dump".format( # noqa E501
hostname=hostname,
cluster=cluster,
container_binary=container_binary
)
output = host.check_output(cmd)
pools = node["vars"].get("rgw_create_pools")
- if pools == None:
+ if pools is None:
pytest.skip('rgw_create_pools not defined, nothing to test')
for pool_name, pg_num in pools.items():
assert pool_name in output
import pytest
import re
+
class TestInstall(object):
def test_ceph_dir_exists(self, host, node):
assert File(node["conf_path"]).contains("^mon host = .*$")
def test_mon_host_line_has_correct_value(self, node, host):
- mon_host_line = host.check_output("grep 'mon host = ' /etc/ceph/{cluster}.conf".format(cluster=node['cluster_name']))
- result=True
+ mon_host_line = host.check_output("grep 'mon host = ' /etc/ceph/{cluster}.conf".format(cluster=node['cluster_name'])) # noqa E501
+ result = True
for x in range(0, node["num_mons"]):
- pattern=re.compile(("{}.1{}".format(node["subnet"], x)))
- if pattern.search(mon_host_line) == None:
- result=False
+ pattern = re.compile(("{}.1{}".format(node["subnet"], x)))
+ if pattern.search(mon_host_line) is None:
+ result = False
assert result