from ceph_volume import decorators, terminal, process
from ceph_volume.api import lvm as api
-from ceph_volume.util import system, encryption, disk, arg_validators, str_to_int
+from ceph_volume.util import system, encryption, disk, arg_validators, str_to_int, merge_dict
from ceph_volume.util.device import Device
from ceph_volume.systemd import systemctl
raise RuntimeError('Unable to find any LV for zapping OSD: '
'%s' % osd_id or osd_fsid)
- devices_to_zap = ensure_associated_lvs(lvs)
+ devices_to_zap = ensure_associated_lvs(lvs, lv_tags)
return [Device(path) for path in set(devices_to_zap) if path]
-def ensure_associated_lvs(lvs):
+def ensure_associated_lvs(lvs, lv_tags={}):
"""
Go through each LV and ensure if backing devices (journal, wal, block)
are LVs or partitions, so that they can be accurately reported.
# receive a filtering for osd.1, and have multiple failed deployments
# leaving many journals with osd.1 - usually, only a single LV will be
# returned
- journal_lvs = api.get_lvs(tags={'ceph.type': 'journal'})
- db_lvs = api.get_lvs(tags={'ceph.type': 'db'})
- wal_lvs = api.get_lvs(tags={'ceph.type': 'wal'})
+
+ journal_lvs = api.get_lvs(tags=merge_dict(lv_tags, {'ceph.type': 'journal'}))
+ db_lvs = api.get_lvs(tags=merge_dict(lv_tags, {'ceph.type': 'db'}))
+ wal_lvs = api.get_lvs(tags=merge_dict(lv_tags, {'ceph.type': 'wal'}))
backing_devices = [(journal_lvs, 'journal'), (db_lvs, 'db'),
(wal_lvs, 'wal')]
import os
import pytest
from copy import deepcopy
+from mock.mock import patch, call
from ceph_volume.api import lvm as api
from ceph_volume.devices.lvm import zap
assert '/dev/VolGroup/lvwal' in result
assert '/dev/VolGroup/lvdb' in result
+ @patch('ceph_volume.devices.lvm.zap.api.get_lvs')
+ def test_ensure_associated_lvs(self, m_get_lvs):
+ zap.ensure_associated_lvs([], lv_tags={'ceph.osd_id': '1'})
+ calls = [
+ call(tags={'ceph.type': 'journal', 'ceph.osd_id': '1'}),
+ call(tags={'ceph.type': 'db', 'ceph.osd_id': '1'}),
+ call(tags={'ceph.type': 'wal', 'ceph.osd_id': '1'})
+ ]
+ m_get_lvs.assert_has_calls(calls, any_order=True)
+
class TestWipeFs(object):