def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
assert self.TYPE == daemon_spec.daemon_type
daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
+ daemon_spec.ports = [self.mgr.agent_starting_port]
if not self.mgr.http_server.agent:
raise OrchestratorError('Cannot deploy agent before creating cephadm endpoint')
)
+class TestAgent:
+ @patch('cephadm.cert_mgr.CertMgr.get_root_ca', lambda instance: cephadm_root_ca)
+ @patch('cephadm.cert_mgr.CertMgr.generate_cert',
+ lambda instance, test, ip: (ceph_generated_cert, ceph_generated_key))
+ @patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_deploy_cephadm_agent(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+ agent_spec = ServiceSpec(service_type="agent", placement=PlacementSpec(count=1))
+ agent_config = {"agent.json": "{\"target_ip\": \"::1\", \"target_port\": 7150, \"refresh_period\": 20, \"listener_port\": 4721, \"host\": \"test\", \"device_enhanced_scan\": \"False\"}", "keyring": "[client.agent.test]\nkey = None\n", "root_cert.pem": f"{cephadm_root_ca}", "listener.crt": f"{ceph_generated_cert}", "listener.key": f"{ceph_generated_key}"}
+
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, agent_spec):
+ _run_cephadm.assert_called_with(
+ "test",
+ "agent.test",
+ ['_orch', 'deploy'],
+ [],
+ stdin=json.dumps({
+ "fsid": "fsid",
+ "name": 'agent.test',
+ "image": '',
+ "deploy_arguments": [],
+ "params": {
+ 'tcp_ports': [4721],
+ },
+ "meta": {
+ 'service_name': 'agent',
+ 'ports': [4721],
+ 'ip': None,
+ 'deployed_by': [],
+ 'rank': None,
+ 'rank_generation': None,
+ 'extra_container_args': None,
+ 'extra_entrypoint_args': None,
+ },
+ "config_blobs": agent_config,
+ }),
+ error_ok=True,
+ use_current_daemon_image=False,
+ )
+
+
class TestCustomContainer:
@patch("cephadm.serve.CephadmServe._run_cephadm")
def test_deploy_custom_container(