def test_osds_listen_on_public_network(self, node, host):
# TODO: figure out way to paramaterize this test
- nb_port = (node["num_devices"] * 2)
+ nb_port = (node["num_osds"] * 2)
assert host.check_output("netstat -lntp | grep ceph-osd | grep %s | wc -l" % (node["address"])) == str(nb_port)
def test_osds_listen_on_cluster_network(self, node, host):
# TODO: figure out way to paramaterize this test
- nb_port = (node["num_devices"] * 2)
+ nb_port = (node["num_osds"] * 2)
assert host.check_output("netstat -lntp | grep ceph-osd | grep %s | wc -l" % (node["cluster_address"])) == str(nb_port)
def test_osd_services_are_running(self, node, host):
def test_all_osds_are_up_and_in(self, node, host):
cmd = "sudo ceph --cluster={cluster} --connect-timeout 5 --keyring /var/lib/ceph/bootstrap-osd/{cluster}.keyring -n client.bootstrap-osd osd tree -f json".format(cluster=node["cluster_name"])
output = json.loads(host.check_output(cmd))
- assert node["num_devices"] == self._get_nb_up_osds_from_ids(node, output)
+ assert node["num_osds"] == self._get_nb_up_osds_from_ids(node, output)
@pytest.mark.docker
def test_all_docker_osds_are_up_and_in(self, node, host):
cluster=node["cluster_name"]
)
output = json.loads(host.check_output(cmd))
- assert node["num_devices"] == self._get_nb_up_osds_from_ids(node, output)
\ No newline at end of file
+ assert node["num_osds"] == self._get_nb_up_osds_from_ids(node, output)