import json
-import time
from contextlib import contextmanager
+import fnmatch
from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
return 0, '', ''
+def match_glob(val, pat):
+ ok = fnmatch.fnmatchcase(val, pat)
+ if not ok:
+ assert pat in val
+
class TestCephadm(object):
@contextmanager
ServiceDescription(service_instance='mon.a')
]
new_mon = cephadm_module.get_unique_name('myhost', existing, 'mon')
- assert new_mon.startswith('mon.')
- assert new_mon != 'mon.a'
- assert '.myhost.' in new_mon
+ match_glob(new_mon, 'mon.myhost.*')
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
c = cephadm_module.update_mgrs(ServiceSpec(placement=ps))
[out] = wait(cephadm_module, c)
- assert "Deployed mgr." in out
- assert " on host 'test'" in out
+ match_glob(out, "Deployed mgr.* on host 'test'")
+
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.CephadmOrchestrator.send_command")
ps = PlacementSpec(hosts=['test'], count=1)
c = cephadm_module.add_mds(ServiceSpec('name', placement=ps))
[out] = wait(cephadm_module, c)
- assert "Deployed mds.name." in out
- assert " on host 'test'" in out
+ match_glob(out, "Deployed mds.name.* on host 'test'")
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.CephadmOrchestrator.send_command")
ps = PlacementSpec(hosts=['test'], count=1)
c = cephadm_module.add_rbd_mirror(ServiceSpec(name='name', placement=ps))
[out] = wait(cephadm_module, c)
- assert "Deployed rbd-mirror." in out
- assert " on host 'test'" in out
+ match_glob(out, "Deployed rbd-mirror.* on host 'test'")
+
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.CephadmOrchestrator.send_command")