osd_map = self.get('osd_map')
r = {}
for o in osd_map['osds']:
- # only include OSDs that have ever started in this map. this way
- # an interrupted osd create can be repeated and succeed the second
- # time around.
+ # When only_up is set, include only OSDs that are currently up, so we do
+ # not treat a "just created" (still down) osd as "already present" for
+ # deploy_osd_daemons_for_existing_osds(). Otherwise include every osd in
+ # the map.
osd_id = o.get('osd')
if osd_id is None:
raise OrchestratorError("Could not retrieve osd_id from osd_map")
- if not only_up:
+ if only_up:
+ if o.get('up') == 1:
+ r[str(osd_id)] = o.get('uuid', '')
+ else:
r[str(osd_id)] = o.get('uuid', '')
return r
import json
import logging
-from asyncio import gather
+from asyncio import gather, to_thread
from threading import Lock
from typing import List, Dict, Any, Set, Tuple, cast, Optional, TYPE_CHECKING
replace_osd_ids = OsdIdClaims(self.mgr).filtered_by_host(host)
assert replace_osd_ids is not None
+ # ceph-volume registers new OSDs with the monitor before returning, but
+ # the mgr's view of the osd map can briefly lag behind. Without waiting,
+ # get_osd_uuid_map() would miss the new id and we would skip deploying
+ # the cephadm daemon (a misleading "Created no osd(s)" while the osd
+ # exists but is still down).
+ # wait_for_latest_osdmap() is synchronous, so run it in a separate thread
+ # via to_thread() to avoid blocking the cephadm asyncio event loop.
+ ret = await to_thread(self.mgr.rados.wait_for_latest_osdmap)
+ if ret < 0:
+ raise OrchestratorError(
+ 'wait_for_latest_osdmap failed with %d' % ret)
+
# check result: lvm
osds_elems: dict = await CephadmServe(self.mgr)._run_cephadm_json(
host, 'osd', 'ceph-volume',
@contextmanager
def with_osd_daemon(cephadm_module: CephadmOrchestrator, _run_cephadm, host: str, osd_id: int, ceph_volume_lvm_list=None):
+ # The OSD is in the cluster map but still down (no cephadm daemon yet).
+ # If it were marked up (up == 1), get_osd_uuid_map(only_up=True) would
+ # list it in before_osd_uuid_map and
+ # deploy_osd_daemons_for_existing_osds() would skip deploying the daemon.
cephadm_module.mock_store_set('_ceph_get', 'osd_map', {
'osds': [
{
'osd': 1,
'up_from': 0,
- 'up': True,
+ 'up': 0,
'uuid': 'uuid'
}
]
class TestCephadm(object):
+ def test_get_osd_uuid_map_includes_down_osds_when_only_up_false(self, cephadm_module):
+ cephadm_module.mock_store_set('_ceph_get', 'osd_map', {
+ 'osds': [
+ {'osd': 0, 'up': 1, 'uuid': 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'},
+ {'osd': 1, 'up': 0, 'uuid': 'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb'},
+ ]
+ })
+ m = cephadm_module.get_osd_uuid_map(only_up=False)
+ assert m == {
+ '0': 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa',
+ '1': 'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb',
+ }
+
+ def test_get_osd_uuid_map_only_up_includes_only_up_osds(self, cephadm_module):
+ cephadm_module.mock_store_set('_ceph_get', 'osd_map', {
+ 'osds': [
+ {'osd': 0, 'up': 1, 'uuid': 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'},
+ {'osd': 1, 'up': 0, 'uuid': 'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb'},
+ ]
+ })
+ m = cephadm_module.get_osd_uuid_map(only_up=True)
+ assert m == {'0': 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'}
+
+ @mock.patch('cephadm.services.osd.to_thread', new_callable=mock.AsyncMock)
+ def test_deploy_osd_daemons_wait_for_latest_osdmap_failure(self, mock_to_thread, cephadm_module):
+ mock_to_thread.return_value = -5
+ with pytest.raises(OrchestratorError, match='wait_for_latest_osdmap failed'):
+ cephadm_module.wait_async(
+ cephadm_module.osd_service.deploy_osd_daemons_for_existing_osds(
+ 'test',
+ DriveGroupSpec(service_type='osd', service_id=''),
+ replace_osd_ids=[]))
+
def test_get_unique_name(self, cephadm_module):
# type: (CephadmOrchestrator) -> None
existing = [
dd2.status = DaemonDescriptionStatus.error
cephadm_module.cache.update_host_daemons(dd1.hostname, {dd1.name(): dd1})
cephadm_module.cache.update_host_daemons(dd2.hostname, {dd2.name(): dd2})
+ # _check_for_moved_osds removes the stray duplicate only if
+ # get_osd_by_id reports the osd as up
+ cephadm_module.mock_store_set('_ceph_get', 'osd_map', {
+ 'osds': [
+ {'osd': 1, 'up_from': 0, 'up': 1, 'uuid': 'uuid'}
+ ]
+ })
CephadmServe(cephadm_module)._check_for_moved_osds()
assert len(cephadm_module.cache.get_daemons()) == 1