From: Adam King Date: Fri, 28 Oct 2022 20:10:28 +0000 (-0400) Subject: cephadm/tests: unit tests for CephadmAgent class X-Git-Tag: v18.1.0~552^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=7e3bc078294793348310a4ca67c4c42f5d6aafe2;p=ceph.git cephadm/tests: unit tests for CephadmAgent class Part of: https://tracker.ceph.com/issues/57621 Signed-off-by: Adam King --- diff --git a/src/cephadm/tests/test_agent.py b/src/cephadm/tests/test_agent.py new file mode 100644 index 000000000000..ee7a45ef2abe --- /dev/null +++ b/src/cephadm/tests/test_agent.py @@ -0,0 +1,541 @@ +from unittest import mock +import copy, json, os, threading + +import pytest + +from tests.fixtures import with_cephadm_ctx, cephadm_fs, import_cephadm + +_cephadm = import_cephadm() + + +FSID = "beefbeef-beef-beef-1234-beefbeefbeef" +AGENT_ID = 'host1' +AGENT_DIR = f'/var/lib/ceph/{FSID}/agent.{AGENT_ID}' + + +def test_agent_validate(): + required_files = _cephadm.CephadmAgent.required_files + with with_cephadm_ctx([]) as ctx: + agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID) + for i in range(len(required_files)): + incomplete_files = {s: 'text' for s in [f for j, f in enumerate(required_files) if j != i]} + with pytest.raises(_cephadm.Error, match=f'required file missing from config: {required_files[i]}'): + agent.validate(incomplete_files) + all_files = {s: 'text' for s in required_files} + agent.validate(all_files) + + +def _check_file(path, content): + assert os.path.exists(path) + with open(path) as f: + fcontent = f.read() + assert fcontent == content + + +@mock.patch('cephadm.call_throws') +def test_agent_deploy_daemon_unit(_call_throws, cephadm_fs): + _call_throws.return_value = ('', '', 0) + agent_id = AGENT_ID + + with with_cephadm_ctx([]) as ctx: + ctx.meta_json = json.dumps({'meta': 'data'}) + agent = _cephadm.CephadmAgent(ctx, FSID, agent_id) + cephadm_fs.create_dir(AGENT_DIR) + + with pytest.raises(_cephadm.Error, match='Agent needs a config'): + agent.deploy_daemon_unit() + + config = {s: f'text for {s}' for s in _cephadm.CephadmAgent.required_files} + config['not-required-file.txt'] = 'don\'t write me' + + agent.deploy_daemon_unit(config) + + # check required config file were all created + for fname in _cephadm.CephadmAgent.required_files: + _check_file(f'{AGENT_DIR}/{fname}', f'text for {fname}') + + # assert non-required file was not written + assert not os.path.exists(f'{AGENT_DIR}/not-required-file.txt') + + # check unit.run file was created correctly + _check_file(f'{AGENT_DIR}/unit.run', agent.unit_run()) + + # check unit.meta file created correctly + _check_file(f'{AGENT_DIR}/unit.meta', json.dumps({'meta': 'data'}, indent=4) + '\n') + + # check unit file was created correctly + _check_file(f'{ctx.unit_dir}/{agent.unit_name()}', agent.unit_file()) + + expected_call_throws_calls = [ + mock.call(ctx, ['systemctl', 'daemon-reload']), + mock.call(ctx, ['systemctl', 'enable', '--now', agent.unit_name()]), + ] + _call_throws.assert_has_calls(expected_call_throws_calls) + + expected_call_calls = [ + mock.call(ctx, ['systemctl', 'stop', agent.unit_name()], verbosity=_cephadm.CallVerbosity.DEBUG), + mock.call(ctx, ['systemctl', 'reset-failed', agent.unit_name()], verbosity=_cephadm.CallVerbosity.DEBUG), + ] + _cephadm.call.assert_has_calls(expected_call_calls) + + +@mock.patch('threading.Thread.is_alive') +def test_agent_shutdown(_is_alive): + with with_cephadm_ctx([]) as ctx: + agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID) + _is_alive.return_value = True + assert agent.stop == False + assert agent.mgr_listener.stop == False + agent.shutdown() + assert agent.stop == True + assert agent.mgr_listener.stop == True + + +def test_agent_wakeup(): + with with_cephadm_ctx([]) as ctx: + agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID) + assert agent.event.is_set() == False + agent.wakeup() + assert agent.event.is_set() == True + + +@mock.patch("cephadm.CephadmAgent.shutdown") +@mock.patch("cephadm.AgentGatherer.update_func") +def test_pull_conf_settings(_update_func, _shutdown, cephadm_fs): + target_ip = '192.168.0.0' + target_port = 9876 + refresh_period = 20 + listener_port = 5678 + host = AGENT_ID + device_enhanced_scan = 'True' + with with_cephadm_ctx([]) as ctx: + agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID) + full_config = { + 'target_ip': target_ip, + 'target_port': target_port, + 'refresh_period': refresh_period, + 'listener_port': listener_port, + 'host': host, + 'device_enhanced_scan': device_enhanced_scan + } + cephadm_fs.create_dir(AGENT_DIR) + with open(agent.config_path, 'w') as f: + f.write(json.dumps(full_config)) + + with pytest.raises(_cephadm.Error, match="Failed to get agent keyring:"): + agent.pull_conf_settings() + _shutdown.assert_called() + with open(agent.keyring_path, 'w') as f: + f.write('keyring') + + assert agent.device_enhanced_scan == False + agent.pull_conf_settings() + assert agent.host == host + assert agent.target_ip == target_ip + assert agent.target_port == target_port + assert agent.loop_interval == refresh_period + assert agent.starting_port == listener_port + assert agent.device_enhanced_scan == True + assert agent.keyring == 'keyring' + _update_func.assert_called() + + full_config.pop('target_ip') + with open(agent.config_path, 'w') as f: + f.write(json.dumps(full_config)) + with pytest.raises(_cephadm.Error, match="Failed to get agent target ip and port from config:"): + agent.pull_conf_settings() + + +@mock.patch("cephadm.command_ceph_volume") +def test_agent_ceph_volume(_ceph_volume): + + def _ceph_volume_outputter(_): + print("ceph-volume output") + + def _ceph_volume_empty(_): + pass + + with with_cephadm_ctx([]) as ctx: + agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID) + + _ceph_volume.side_effect = _ceph_volume_outputter + out, _ = agent._ceph_volume(False) + assert ctx.command == ['inventory', '--format=json'] + assert out == "ceph-volume output\n" + + out, _ = agent._ceph_volume(True) + assert ctx.command == ['inventory', '--format=json', '--with-lsm'] + assert out == "ceph-volume output\n" + + _ceph_volume.side_effect = _ceph_volume_empty + with pytest.raises(Exception, match='ceph-volume returned empty value'): + out, _ = agent._ceph_volume(False) + + +def test_agent_daemon_ls_subset(cephadm_fs): + # Basing part of this test on some actual sample output + + # Some sample "podman stats --format '{{.ID}},{{.MemUsage}}' --no-stream" output + # 3f2b31d19ecd,456.4MB / 41.96GB + # 5aca2499e0f8,7.082MB / 41.96GB + # fe0cef07d5f7,35.91MB / 41.96GB + + # Sample "podman ps --format '{{.ID}},{{.Names}}' --no-trunc" output with the same containers + # fe0cef07d5f71c5c604f7d1b4a4ac2e27873c96089d015014524e803361b4a30,ceph-4434fa7c-5602-11ed-b719-5254006ef86b-mon-host1 + # 3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e70660340cd1f,ceph-4434fa7c-5602-11ed-b719-5254006ef86b-mgr-host1-pntmho + # 5aca2499e0f8fb903788ff90eb03fe6ed58c7ed177caf278fed199936aff7b4a,ceph-4434fa7c-5602-11ed-b719-5254006ef86b-crash-host1 + + # Some of the components from that output + mgr_cid = '3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e70660340cd1f' + mon_cid = 'fe0cef07d5f71c5c604f7d1b4a4ac2e27873c96089d015014524e803361b4a30' + crash_cid = '5aca2499e0f8fb903788ff90eb03fe6ed58c7ed177caf278fed199936aff7b4a' + mgr_short_cid = mgr_cid[0:12] + mon_short_cid = mon_cid[0:12] + crash_short_cid = crash_cid[0:12] + + #Rebuilding the output but with our testing FSID and components (to allow alteration later for whatever reason) + mem_out = f"""{mgr_short_cid},456.4MB / 41.96GB +{crash_short_cid},7.082MB / 41.96GB +{mon_short_cid},35.91MB / 41.96GB""" + + ps_out = f"""{mon_cid},ceph-{FSID}-mon-host1 +{mgr_cid},ceph-{FSID}-mgr-host1-pntmho +{crash_cid},ceph-{FSID}-crash-host1""" + + def _fake_call(ctx, cmd, desc=None, verbosity=_cephadm.CallVerbosity.VERBOSE_ON_FAILURE, timeout=_cephadm.DEFAULT_TIMEOUT, **kwargs): + if 'stats' in cmd: + return (mem_out, '', 0) + elif 'ps' in cmd: + return (ps_out, '', 0) + return ('out', 'err', 0) + + cephadm_fs.create_dir(AGENT_DIR) + cephadm_fs.create_dir(f'/var/lib/ceph/mon/ceph-host1') # legacy daemon + cephadm_fs.create_dir(f'/var/lib/ceph/osd/nothing') # improper directory, should be skipped + cephadm_fs.create_dir(f'/var/lib/ceph/{FSID}/mgr.host1.pntmho') # cephadm daemon + cephadm_fs.create_dir(f'/var/lib/ceph/{FSID}/crash.host1') # cephadm daemon + + with with_cephadm_ctx([]) as ctx: + ctx.fsid = FSID + agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID) + _cephadm.call.side_effect = _fake_call + daemons = agent._daemon_ls_subset() + + assert 'agent.host1' in daemons + assert 'mgr.host1.pntmho' in daemons + assert 'crash.host1' in daemons + assert 'mon.host1' in daemons + + assert daemons['mon.host1']['style'] == 'legacy' + assert daemons['mgr.host1.pntmho']['style'] == 'cephadm:v1' + assert daemons['crash.host1']['style'] == 'cephadm:v1' + assert daemons['agent.host1']['style'] == 'cephadm:v1' + + assert daemons['mgr.host1.pntmho']['systemd_unit'] == f'ceph-{FSID}@mgr.host1.pntmho' + assert daemons['agent.host1']['systemd_unit'] == f'ceph-{FSID}@agent.host1' + assert daemons['crash.host1']['systemd_unit'] == f'ceph-{FSID}@crash.host1' + + assert daemons['mgr.host1.pntmho']['container_id'] == mgr_cid + assert daemons['crash.host1']['container_id'] == crash_cid + + assert daemons['mgr.host1.pntmho']['memory_usage'] == 478570086 # 456.4 MB + assert daemons['crash.host1']['memory_usage'] == 7426015 # 7.082 MB + + +@mock.patch("cephadm.list_daemons") +@mock.patch("cephadm.CephadmAgent._daemon_ls_subset") +def test_agent_get_ls(_ls_subset, _ls, cephadm_fs): + ls_out = [{ + "style": "cephadm:v1", + "name": "mgr.host1.pntmho", + "fsid": FSID, + "systemd_unit": f"ceph-{FSID}@mgr.host1.pntmho", + "enabled": True, + "state": "running", + "service_name": "mgr", + "memory_request": None, + "memory_limit": None, + "ports": [ + 9283, + 8765 + ], + "container_id": "3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e70660340cd1f", + "container_image_name": "quay.io/ceph/ceph:testing", + "container_image_id": "3300e39269f0c13ae45026cf233d8b3fff1303d52f2598a69c7fba0bb8405164", + "container_image_digests": [ + "quay.io/ceph/ceph@sha256:d4f3522528ee79904f9e530bdce438acac30a039e9a0b3cf31d8b614f9f96a30" + ], + "memory_usage": 507510784, + "cpu_percentage": "5.95%", + "version": "18.0.0-556-gb4d1a199", + "started": "2022-10-27T14:19:36.086664Z", + "created": "2022-10-27T14:19:36.282281Z", + "deployed": "2022-10-27T14:19:35.377275Z", + "configured": "2022-10-27T14:22:40.316912Z" + },{ + "style": "cephadm:v1", + "name": "agent.host1", + "fsid": FSID, + "systemd_unit": f"ceph-{FSID}@agent.host1", + "enabled": True, + "state": "running", + "service_name": "agent", + "ports": [], + "ip": None, + "deployed_by": [ + "quay.io/ceph/ceph@sha256:d4f3522528ee79904f9e530bdce438acac30a039e9a0b3cf31d8b614f9f96a30" + ], + "rank": None, + "rank_generation": None, + "extra_container_args": None, + "container_id": None, + "container_image_name": None, + "container_image_id": None, + "container_image_digests": None, + "version": None, + "started": None, + "created": "2022-10-27T19:46:49.751594Z", + "deployed": None, + "configured": "2022-10-27T19:46:49.751594Z" + }, { + "style": "legacy", + "name": "mon.host1", + "fsid": FSID, + "systemd_unit": "ceph-mon@host1", + "enabled": False, + "state": "stopped", + "host_version": None + }] + + ls_subset_out = { + 'mgr.host1.pntmho': { + "style": "cephadm:v1", + "fsid": FSID, + "systemd_unit": f"ceph-{FSID}@mgr.host1.pntmho", + "enabled": True, + "state": "running", + "container_id": "3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e70660340cd1f", + "memory_usage": 507510784, + }, + 'agent.host1': { + "style": "cephadm:v1", + "fsid": FSID, + "systemd_unit": f"ceph-{FSID}@agent.host1", + "enabled": True, + "state": "running", + "container_id": None + }, 'mon.host1': { + "style": "legacy", + "name": "mon.host1", + "fsid": FSID, + "systemd_unit": "ceph-mon@host1", + "enabled": False, + "state": "stopped", + "host_version": None + }} + + _ls.return_value = ls_out + _ls_subset.return_value = ls_subset_out + + with with_cephadm_ctx([]) as ctx: + ctx.fsid = FSID + agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID) + + # first pass, no cached daemon metadata + daemons, changed = agent._get_ls() + assert daemons == ls_out + assert changed + + # second pass, should recognize that daemons have not changed and just keep cached values + daemons, changed = agent._get_ls() + assert daemons == daemons + assert not changed + + # change a container id so it needs to get more info + ls_subset_out2 = copy.deepcopy(ls_subset_out) + ls_out2 = copy.deepcopy(ls_out) + ls_subset_out2['mgr.host1.pntmho']['container_id'] = '3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e7066034aaaaa' + ls_out2[0]['container_id'] = '3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e7066034aaaaa' + _ls.return_value = ls_out2 + _ls_subset.return_value = ls_subset_out2 + assert agent.cached_ls_values['mgr.host1.pntmho']['container_id'] == "3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e70660340cd1f" + daemons, changed = agent._get_ls() + assert daemons == ls_out2 + assert changed + + # run again with the same data so it should use cached values + daemons, changed = agent._get_ls() + assert daemons == ls_out2 + assert not changed + + # change the state of a container so new daemon metadata is needed + ls_subset_out3 = copy.deepcopy(ls_subset_out2) + ls_out3 = copy.deepcopy(ls_out2) + ls_subset_out3['mgr.host1.pntmho']['enabled'] = False + ls_out3[0]['enabled'] = False + _ls.return_value = ls_out3 + _ls_subset.return_value = ls_subset_out3 + assert agent.cached_ls_values['mgr.host1.pntmho']['enabled'] == True + daemons, changed = agent._get_ls() + assert daemons == ls_out3 + assert changed + + # run again with the same data so it should use cached values + daemons, changed = agent._get_ls() + assert daemons == ls_out3 + assert not changed + + # remove a daemon so new metadats is needed + ls_subset_out4 = copy.deepcopy(ls_subset_out3) + ls_out4 = copy.deepcopy(ls_out3) + ls_subset_out4.pop('mon.host1') + ls_out4.pop() + _ls.return_value = ls_out4 + _ls_subset.return_value = ls_subset_out4 + assert 'mon.host1' in agent.cached_ls_values + daemons, changed = agent._get_ls() + assert daemons == ls_out4 + assert changed + + # run again with the same data so it should use cached values + daemons, changed = agent._get_ls() + assert daemons == ls_out4 + assert not changed + + +@mock.patch("threading.Event.clear") +@mock.patch("threading.Event.wait") +@mock.patch("urllib.request.Request.__init__") +@mock.patch("cephadm.urlopen") +@mock.patch("cephadm.list_networks") +@mock.patch("cephadm.HostFacts.dump") +@mock.patch("cephadm.HostFacts.__init__", lambda _, __: None) +@mock.patch("ssl.SSLContext.load_verify_locations") +@mock.patch("threading.Thread.is_alive") +@mock.patch("cephadm.MgrListener.start") +@mock.patch("cephadm.AgentGatherer.start") +@mock.patch("cephadm.port_in_use") +@mock.patch("cephadm.CephadmAgent.pull_conf_settings") +def test_agent_run(_pull_conf_settings, _port_in_use, _gatherer_start, + _listener_start, _is_alive, _load_verify_locations, + _HF_dump, _list_networks, _urlopen, _RQ_init, _wait, _clear): + target_ip = '192.168.0.0' + target_port = '9999' + refresh_period = 20 + listener_port = 7770 + open_listener_port = 7777 + host = AGENT_ID + device_enhanced_scan = False + + def _fake_port_in_use(ctx, port): + if port == open_listener_port: + return False + return True + + network_data: Dict[str, Dict[str, Set[str]]] = { + "10.2.1.0/24": { + "eth1": set(["10.2.1.122"]) + }, + "192.168.122.0/24": { + "eth0": set(["192.168.122.221"]) + }, + "fe80::/64": { + "eth0": set(["fe80::5054:ff:fe3f:d94e"]), + "eth1": set(["fe80::5054:ff:fe3f:aa4a"]), + } + } + + # the json serializable version of the networks data + # we expect the agent to actually send + network_data_no_sets: Dict[str, Dict[str, List[str]]] = { + "10.2.1.0/24": { + "eth1": ["10.2.1.122"] + }, + "192.168.122.0/24": { + "eth0": ["192.168.122.221"] + }, + "fe80::/64": { + "eth0": ["fe80::5054:ff:fe3f:d94e"], + "eth1": ["fe80::5054:ff:fe3f:aa4a"], + } + } + + class FakeHTTPResponse(): + def __init__(self): + pass + + def __enter__(self): + return self + + def __exit__(self, type, value, tb): + pass + + def read(self): + return json.dumps({'valid': 'output', 'result': '400'}) + + _port_in_use.side_effect = _fake_port_in_use + _is_alive.return_value = False + _HF_dump.return_value = 'Host Facts' + _list_networks.return_value = network_data + _urlopen.side_effect = lambda *args, **kwargs: FakeHTTPResponse() + _RQ_init.side_effect = lambda *args, **kwargs: None + with with_cephadm_ctx([]) as ctx: + ctx.fsid = FSID + agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID) + agent.keyring = 'agent keyring' + agent.ack = 7 + agent.volume_gatherer.ack = 7 + agent.volume_gatherer.data = 'ceph-volume inventory data' + agent.ls_gatherer.ack = 7 + agent.ls_gatherer.data = [{'valid_daemon': 'valid_metadata'}] + + def _set_conf(): + agent.target_ip = target_ip + agent.target_port = target_port + agent.loop_interval = refresh_period + agent.starting_port = listener_port + agent.host = host + agent.device_enhanced_scan = device_enhanced_scan + _pull_conf_settings.side_effect = _set_conf + + # technically the run function loops forever unless the agent + # is told to stop. To get around that we're going to have the + # event.wait() (which happens at the end of the loop) to throw + # a special exception type. If we catch this exception we can + # consider it as being a "success" run + class EventCleared(Exception): + pass + + _clear.side_effect = EventCleared('SUCCESS') + with pytest.raises(EventCleared, match='SUCCESS'): + agent.run() + + expected_data = { + 'host': host, + 'ls': [{'valid_daemon': 'valid_metadata'}], + 'networks': network_data_no_sets, + 'facts': 'Host Facts', + 'volume': 'ceph-volume inventory data', + 'ack': str(7), + 'keyring': 'agent keyring', + 'port': str(open_listener_port) + } + _RQ_init.assert_called_with( + f'https://{target_ip}:{target_port}/data/', + json.dumps(expected_data).encode('ascii'), + {'Content-Type': 'application/json'} + ) + _listener_start.assert_called() + _gatherer_start.assert_called() + _urlopen.assert_called() + + # agent should not go down if connections fail + _urlopen.side_effect = Exception() + with pytest.raises(EventCleared, match='SUCCESS'): + agent.run() + + # should fail if no ports are open for listener + _port_in_use.side_effect = lambda _, __: True + agent.listener_port = None + with pytest.raises(Exception, match='Failed to pick port for agent to listen on: All 1000 ports starting at 7770 taken.'): + agent.run()