From: Sebastian Wagner Date: Thu, 30 Jan 2020 16:23:24 +0000 (+0100) Subject: mgr/progress: Add test_progress.py to tox X-Git-Tag: v15.1.1~520^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8ee0cece70bba1414542ac46af9420e8d9b77ba6;p=ceph.git mgr/progress: Add test_progress.py to tox Signed-off-by: Sebastian Wagner --- diff --git a/src/pybind/mgr/progress/__init__.py b/src/pybind/mgr/progress/__init__.py index fcbdd8fbdc25..e3379608af76 100644 --- a/src/pybind/mgr/progress/__init__.py +++ b/src/pybind/mgr/progress/__init__.py @@ -2,10 +2,6 @@ import os if 'UNITTEST' not in os.environ: from .module import * else: - import sys - import mock - sys.modules['ceph_module'] = mock.Mock() - sys.modules['rados'] = mock.Mock() - from .module import * - + import tests + diff --git a/src/pybind/mgr/progress/test_progress.py b/src/pybind/mgr/progress/test_progress.py new file mode 100644 index 000000000000..db2013deb3f2 --- /dev/null +++ b/src/pybind/mgr/progress/test_progress.py @@ -0,0 +1,204 @@ +#python unit test +import unittest +import os +import sys +from tests import mock + +import pytest +import json +os.environ['UNITTEST'] = "1" +sys.path.insert(0, "../../pybind/mgr") +from progress import module + +class TestPgRecoveryEvent(object): + # Testing PgRecoveryEvent class + + def setup(self): + # Creating the class and Mocking + # a bunch of attributes for testing + module._module = mock.Mock() # just so Event._refresh() works + self.test_event = module.PgRecoveryEvent(None, None, [module.PgId(1,i) for i in range(3)], [0], 30) + + def test_pg_update(self): + # Test for a completed event when the pg states show active+clear + pg_dump = { + "pg_stats":[ + { + "state": "active+clean", + "stat_sum": { + "num_bytes": 10, + "num_bytes_recovered": 10 + }, + "up": [ + 3, + 1 + ], + "acting": [ + 3, + 1 + ], + "pgid": "1.0", + "reported_epoch": "30" + }, + { + "state": "active+clean", + "stat_sum": { + "num_bytes": 10, + "num_bytes_recovered": 10 + }, + "up": [ + 3, + 1 + ], + "acting": [ + 3, + 1 + ], + "pgid": "1.1", + "reported_epoch": "30" + }, + { + "state": "active+clean", + "stat_sum": { + "num_bytes": 10, + "num_bytes_recovered": 10 + }, + "up": [ + 3, + 1 + ], + "acting": [ + 3, + 1 + ], + "pgid": "1.2", + "reported_epoch": "30" + } + ] + } + + self.test_event.pg_update(pg_dump, mock.Mock()) + assert self.test_event._progress == 1.0 + +class OSDMap: + + # This is an artificial class to help + # _osd_in_out function have all the + # necessary characteristics, some + # of the funcitons are copied from + # mgr_module + + def __init__(self, dump, pg_stats): + self._dump = dump + self._pg_stats = pg_stats + + def _pg_to_up_acting_osds(self, pool_id, ps): + pg_id = str(pool_id) + "." + str(ps) + for pg in self._pg_stats["pg_stats"]: + if pg["pg_id"] == pg_id: + ret = { + "up_primary": pg["up_primary"], + "acting_primary": pg["acting_primary"], + "up": pg["up"], + "acting": pg["acting"] + } + return ret + + def dump(self): + return self._dump + + def get_pools(self): + d = self._dump() + return dict([(p['pool'], p) for p in d['pools']]) + + def get_pools_by_name(self): + d = self._dump() + return dict([(p['pool_name'], p) for p in d['pools']]) + + def pg_to_up_acting_osds(self, pool_id, ps): + return self._pg_to_up_acting_osds(pool_id, ps) + +class TestModule(object): + # Testing Module Class + + def setup(self): + # Creating the class and Mocking a + # bunch of attributes for testing + + module.PgRecoveryEvent.pg_update = mock.Mock() + self.test_module = module.Module() # so we can see if an event gets created + self.test_module.log = mock.Mock() # we don't need to log anything + self.test_module.get = mock.Mock() # so we can call pg_update + self.test_module._complete = mock.Mock() # we want just to see if this event gets called + self.test_module.get_osdmap = mock.Mock() # so that self.get_osdmap().get_epoch() works + module._module = mock.Mock() # so that Event.refresh() works + + def test_osd_in_out(self): + # test for the correct event being + # triggered and completed. + + old_pg_stats = { + "pg_stats":[ + { + "pg_id": "1.0", + "up_primary": 3, + "acting_primary": 3, + "up": [ + 3, + 0 + ], + "acting": [ + 3, + 0 + ] + + }, + + ] + } + new_pg_stats = { + "pg_stats":[ + { + "pg_id": "1.0", + "up_primary": 0, + "acting_primary": 0, + "up": [ + 0, + 2 + ], + "acting": [ + 0, + 2 + ] + }, + ] + } + + old_dump ={ + "pools": [ + { + "pool": 1, + "pg_num": 1 + } + ] + } + + new_dump = { + "pools": [ + { + "pool": 1, + "pg_num": 1 + } + ] + } + + new_map = OSDMap(new_dump, new_pg_stats) + old_map = OSDMap(old_dump, old_pg_stats) + self.test_module._osd_in_out(old_map, old_dump, new_map, 3, "out") + # check if only one event is created + assert len(self.test_module._events) == 1 + self.test_module._osd_in_out(old_map, old_dump, new_map, 3, "in") + # check if complete function is called + assert self.test_module._complete.call_count == 1 + # check if a PgRecovery Event was created and pg_update gets triggered + assert module.PgRecoveryEvent.pg_update.call_count == 2 diff --git a/src/pybind/mgr/tox.ini b/src/pybind/mgr/tox.ini index 81ae3752e795..e6e3441e270c 100644 --- a/src/pybind/mgr/tox.ini +++ b/src/pybind/mgr/tox.ini @@ -5,7 +5,7 @@ skipsdist = true [testenv] setenv = UNITTEST = true deps = -r requirements.txt -commands = pytest -v --cov --cov-append --cov-report=term --doctest-modules {posargs:mgr_util.py tests/ cephadm/ ansible/} +commands = pytest -v --cov --cov-append --cov-report=term --doctest-modules {posargs:mgr_util.py tests/ cephadm/ ansible/ progress/} [testenv:mypy] basepython = python3 diff --git a/src/test/mgr/test_progress.py b/src/test/mgr/test_progress.py deleted file mode 100644 index 80f51a1aeb01..000000000000 --- a/src/test/mgr/test_progress.py +++ /dev/null @@ -1,203 +0,0 @@ -#python unit test -import unittest -import os -import sys -from mock import Mock -import pytest -import json -os.environ['UNITTEST'] = "1" -sys.path.insert(0, "../../pybind/mgr") -from progress import module - -class TestPgRecoveryEvent(object): - # Testing PgRecoveryEvent class - - def setup(self): - # Creating the class and Mocking - # a bunch of attributes for testing - module._module = Mock() # just so Event._refresh() works - self.test_event = module.PgRecoveryEvent(None, None, [module.PgId(1,i) for i in range(3)], [0], 30) - - def test_pg_update(self): - # Test for a completed event when the pg states show active+clear - pg_dump = { - "pg_stats":[ - { - "state": "active+clean", - "stat_sum": { - "num_bytes": 10, - "num_bytes_recovered": 10 - }, - "up": [ - 3, - 1 - ], - "acting": [ - 3, - 1 - ], - "pgid": "1.0", - "reported_epoch": "30" - }, - { - "state": "active+clean", - "stat_sum": { - "num_bytes": 10, - "num_bytes_recovered": 10 - }, - "up": [ - 3, - 1 - ], - "acting": [ - 3, - 1 - ], - "pgid": "1.1", - "reported_epoch": "30" - }, - { - "state": "active+clean", - "stat_sum": { - "num_bytes": 10, - "num_bytes_recovered": 10 - }, - "up": [ - 3, - 1 - ], - "acting": [ - 3, - 1 - ], - "pgid": "1.2", - "reported_epoch": "30" - } - ] - } - - self.test_event.pg_update(pg_dump, Mock()) - assert self.test_event._progress == 1.0 - -class OSDMap: - - # This is an artificial class to help - # _osd_in_out function have all the - # necessary characteristics, some - # of the funcitons are copied from - # mgr_module - - def __init__(self, dump, pg_stats): - self._dump = dump - self._pg_stats = pg_stats - - def _pg_to_up_acting_osds(self, pool_id, ps): - pg_id = str(pool_id) + "." + str(ps) - for pg in self._pg_stats["pg_stats"]: - if pg["pg_id"] == pg_id: - ret = { - "up_primary": pg["up_primary"], - "acting_primary": pg["acting_primary"], - "up": pg["up"], - "acting": pg["acting"] - } - return ret - - def dump(self): - return self._dump - - def get_pools(self): - d = self._dump() - return dict([(p['pool'], p) for p in d['pools']]) - - def get_pools_by_name(self): - d = self._dump() - return dict([(p['pool_name'], p) for p in d['pools']]) - - def pg_to_up_acting_osds(self, pool_id, ps): - return self._pg_to_up_acting_osds(pool_id, ps) - -class TestModule(object): - # Testing Module Class - - def setup(self): - # Creating the class and Mocking a - # bunch of attributes for testing - - module.PgRecoveryEvent.pg_update = Mock() - self.test_module = module.Module() # so we can see if an event gets created - self.test_module.log = Mock() # we don't need to log anything - self.test_module.get = Mock() # so we can call pg_update - self.test_module._complete = Mock() # we want just to see if this event gets called - self.test_module.get_osdmap = Mock() # so that self.get_osdmap().get_epoch() works - module._module = Mock() # so that Event.refresh() works - - def test_osd_in_out(self): - # test for the correct event being - # triggered and completed. - - old_pg_stats = { - "pg_stats":[ - { - "pg_id": "1.0", - "up_primary": 3, - "acting_primary": 3, - "up": [ - 3, - 0 - ], - "acting": [ - 3, - 0 - ] - - }, - - ] - } - new_pg_stats = { - "pg_stats":[ - { - "pg_id": "1.0", - "up_primary": 0, - "acting_primary": 0, - "up": [ - 0, - 2 - ], - "acting": [ - 0, - 2 - ] - }, - ] - } - - old_dump ={ - "pools": [ - { - "pool": 1, - "pg_num": 1 - } - ] - } - - new_dump = { - "pools": [ - { - "pool": 1, - "pg_num": 1 - } - ] - } - - new_map = OSDMap(new_dump, new_pg_stats) - old_map = OSDMap(old_dump, old_pg_stats) - self.test_module._osd_in_out(old_map, old_dump, new_map, 3, "out") - # check if only one event is created - assert len(self.test_module._events) == 1 - self.test_module._osd_in_out(old_map, old_dump, new_map, 3, "in") - # check if complete function is called - assert self.test_module._complete.call_count == 1 - # check if a PgRecovery Event was created and pg_update gets triggered - assert module.PgRecoveryEvent.pg_update.call_count == 2