from unittest import mock
-import copy, json, os, socket, threading
+import copy, datetime, json, os, socket, threading
import pytest
agent.mgr_listener.run()
FakeConn.send.assert_not_called()
FSS_exc_testing.accept.call_count == 3
+
+
+@mock.patch("cephadm.CephadmAgent._get_ls")
+def test_gatherer_update_func(_get_ls, cephadm_fs):
+ with with_cephadm_ctx([]) as ctx:
+ ctx.fsid = FSID
+ agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
+ cephadm_fs.create_dir(AGENT_DIR)
+
+ def _sample_func():
+ return 7
+
+ agent.ls_gatherer.func()
+ _get_ls.assert_called()
+
+ _get_ls = mock.MagicMock()
+ agent.ls_gatherer.update_func(_sample_func)
+ out = agent.ls_gatherer.func()
+ assert out == 7
+ _get_ls.assert_not_called()
+
+
+@mock.patch("cephadm.CephadmAgent.wakeup")
+@mock.patch("time.monotonic")
+@mock.patch("threading.Event.wait")
+def test_gatherer_run(_wait, _time, _agent_wakeup, cephadm_fs):
+ with with_cephadm_ctx([]) as ctx:
+ ctx.fsid = FSID
+ agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
+ cephadm_fs.create_dir(AGENT_DIR)
+ agent.loop_interval = 30
+ agent.ack = 23
+
+ _sample_func = lambda *args, **kwargs: ('sample out', True)
+ agent.ls_gatherer.update_func(_sample_func)
+ agent.ls_gatherer.ack = 20
+ agent.ls_gatherer.stop = False
+
+ def _fake_clear(*args, **kwargs):
+ agent.ls_gatherer.stop = True
+
+ _time.side_effect = [0, 20, 0, 20, 0, 20] # start at time 0, complete at time 20
+ _wait.return_value = None
+
+ with mock.patch("threading.Event.clear") as _clear:
+ _clear.side_effect = _fake_clear
+ agent.ls_gatherer.run()
+
+ _wait.assert_called_with(10) # agent loop_interval - run time
+ assert agent.ls_gatherer.data == 'sample out'
+ assert agent.ls_gatherer.ack == 23
+ _agent_wakeup.assert_called_once()
+ _clear.assert_called_once()
+
+ _exc_func = lambda *args, **kwargs: Exception()
+ agent.ls_gatherer.update_func(_exc_func)
+ agent.ls_gatherer.ack = 20
+ agent.ls_gatherer.stop = False
+
+ with mock.patch("threading.Event.clear") as _clear:
+ _clear.side_effect = _fake_clear
+ agent.ls_gatherer.run()
+ assert agent.ls_gatherer.data is None
+ assert agent.ls_gatherer.ack == agent.ack
+ # should have run full loop despite exception
+ _clear.assert_called_once()
+
+ # test general exception for full coverage
+ _agent_wakeup.side_effect = [Exception()]
+ agent.ls_gatherer.update_func(_sample_func)
+ agent.ls_gatherer.stop = False
+ # just to force only one iteration
+ _time.side_effect = _fake_clear
+ with mock.patch("threading.Event.clear") as _clear:
+ _clear.side_effect = Exception()
+ agent.ls_gatherer.run()
+ assert agent.ls_gatherer.data == 'sample out'
+ assert agent.ls_gatherer.ack == agent.ack
+ # should not have gotten to end of loop
+ _clear.assert_not_called()