return ('', '', 0)
def test_init(self, monkeypatch):
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data,ceph.osd_fsid=1234'
source_db_tags = 'ceph.osd_id=0,journal_uuid=x,ceph.type=db, osd_fsid=1234'
source_wal_tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=wal'
assert 'wal' == t.old_wal_tags['ceph.type']
def test_update_tags_when_lv_create(self, monkeypatch):
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = \
'ceph.osd_id=0,ceph.journal_uuid=x,' \
'ceph.type=data,ceph.osd_fsid=1234'
'/dev/VolGroup/lv2'] == self.mock_process_input[2]
def test_remove_lvs(self, monkeypatch):
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = \
'ceph.osd_id=0,ceph.journal_uuid=x,' \
'ceph.type=data,ceph.osd_fsid=1234,ceph.wal_uuid=aaaaa'
'/dev/VolGroup/lv2'] == self.mock_process_input[2]
def test_replace_lvs(self, monkeypatch):
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = \
'ceph.osd_id=0,ceph.type=data,ceph.osd_fsid=1234,'\
'ceph.wal_uuid=wal_uuid,ceph.db_device=/dbdevice'
'/dev/VolGroup/lv_target'].sort()
def test_undo(self, monkeypatch):
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data,ceph.osd_fsid=1234'
source_db_tags = 'ceph.osd_id=0,journal_uuid=x,ceph.type=db, osd_fsid=1234'
source_wal_tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=wal'
expected = 'Target Logical Volume is already used by ceph: vgname/new_db'
assert expected in stderr
- @patch('os.getuid')
- def test_newdb(self, m_getuid, monkeypatch, capsys):
- m_getuid.return_value = 0
-
+ def test_newdb(self, is_root, monkeypatch, capsys):
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = \
'ceph.osd_id=0,ceph.type=data,ceph.osd_fsid=1234,'\
'ceph.wal_uuid=wal_uuid,ceph.db_device=/dbdevice'
assert not stdout
def test_newdb_no_systemd(self, is_root, monkeypatch):
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = \
'ceph.osd_id=0,ceph.type=data,ceph.osd_fsid=1234,'\
'ceph.wal_uuid=wal_uuid,ceph.db_device=/dbdevice'
'--dev-target', '/dev/VolGroup/target_volume',
'--command', 'bluefs-bdev-new-db']
- @patch('os.getuid')
- def test_newwal(self, m_getuid, monkeypatch, capsys):
- m_getuid.return_value = 0
-
+ def test_newwal(self, is_root, monkeypatch, capsys):
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = \
'ceph.osd_id=0,ceph.type=data,ceph.osd_fsid=1234'
assert not stdout
def test_newwal_no_systemd(self, is_root, monkeypatch):
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = \
'ceph.osd_id=0,ceph.type=data,ceph.osd_fsid=1234'
@patch('os.getuid')
def test_newwal_encrypted(self, m_getuid, monkeypatch, capsys):
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
m_getuid.return_value = 0
source_tags = \
@patch.object(Zap, 'main')
def test_migrate_data_db_to_new_db(self, m_zap, is_root, monkeypatch):
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev'
@patch.object(Zap, 'main')
@patch('os.getuid')
def test_migrate_data_db_to_new_db_encrypted(self, m_getuid, m_zap, monkeypatch):
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
m_getuid.return_value = 0
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
@patch.object(Zap, 'main')
def test_migrate_data_db_to_new_db_no_systemd(self, m_zap, is_root, monkeypatch):
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev'
source_db_tags = 'ceph.osd_id=2,ceph.type=db,ceph.osd_fsid=1234,' \
@patch.object(Zap, 'main')
def test_migrate_data_db_to_new_db_skip_wal(self, m_zap, is_root, monkeypatch):
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev'
source_db_tags = 'ceph.osd_id=2,ceph.type=db,ceph.osd_fsid=1234,' \
@patch.object(Zap, 'main')
def test_migrate_data_db_wal_to_new_db(self, m_zap, is_root, monkeypatch):
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \
'ceph.wal_uuid=waluuid,ceph.wal_device=wal_dev'
@patch.object(Zap, 'main')
@patch('os.getuid')
def test_migrate_data_db_wal_to_new_db_encrypted(self, m_getuid, m_zap, monkeypatch):
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
m_getuid.return_value = 0
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
m_getuid,
monkeypatch,
capsys):
- m_getuid.return_value = 0
-
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \
'ceph.wal_uuid=waluuid,ceph.wal_device=wal_dev'
assert not stdout
def test_migrate_data_db_to_db_no_systemd(self, is_root, monkeypatch):
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \
'ceph.wal_uuid=waluuid,ceph.wal_device=wal_dev'
is_root,
monkeypatch,
capsys):
-
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \
'ceph.wal_uuid=waluuid,ceph.wal_device=wal_dev'
m_zap,
monkeypatch,
capsys):
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
m_getuid.return_value = 0
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
m_zap,
monkeypatch,
capsys):
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
m_getuid.return_value = 0
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
@patch.object(Zap, 'main')
def test_migrate_data_wal_to_db_no_systemd(self, m_zap, is_root, monkeypatch):
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \
'ceph.wal_uuid=waluuid,ceph.wal_device=wal_dev'
disk = device.Device("/dev/sda")
assert disk.lvm_size.gb == 4
- def test_is_lv(self, fake_call, device_info):
+ def test_is_lv(self, fake_call, device_info, monkeypatch):
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
data = {"lv_path": "vg/lv", "vg_name": "vg", "name": "lv"}
lsblk = {"TYPE": "lvm", "NAME": "vg-lv"}
device_info(lv=data,lsblk=lsblk)
disk = device.Device("/dev/cdrom")
assert not disk.available
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ @patch('ceph_volume.util.device.os.path.realpath')
+ @patch('ceph_volume.util.device.os.path.islink')
+ def test_reject_lv_symlink_to_device(self,
+ m_os_path_islink,
+ m_os_path_realpath,
+ device_info,
+ fake_call):
+ m_os_path_islink.return_value = True
+ m_os_path_realpath.return_value = '/dev/mapper/vg-lv'
+ lv = {"lv_path": "/dev/vg/lv", "vg_name": "vg", "name": "lv"}
+ lsblk = {"TYPE": "lvm", "NAME": "vg-lv"}
+ device_info(lv=lv,lsblk=lsblk)
+ disk = device.Device("/dev/vg/lv")
+ assert disk.path == '/dev/vg/lv'
+
@patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
def test_reject_smaller_than_5gb(self, fake_call, device_info):
data = {"/dev/sda": {"size": 5368709119}}
assert disk.is_encrypted is False
@patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
- def test_lv_is_encrypted_blkid(self, fake_call, device_info):
+ def test_lv_is_encrypted_blkid(self, fake_call, device_info, monkeypatch):
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
lsblk = {'TYPE': 'lvm', 'NAME': 'sda'}
blkid = {'TYPE': 'crypto_LUKS'}
device_info(lsblk=lsblk, blkid=blkid)
assert disk.is_encrypted is True
@patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
- def test_lv_is_not_encrypted_blkid(self, fake_call, factory, device_info):
+ def test_lv_is_not_encrypted_blkid(self, fake_call, factory, device_info, monkeypatch):
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
lsblk = {'TYPE': 'lvm', 'NAME': 'sda'}
blkid = {'TYPE': 'xfs'}
device_info(lsblk=lsblk, blkid=blkid)
assert disk.is_encrypted is False
@patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
- def test_lv_is_encrypted_lsblk(self, fake_call, device_info):
+ def test_lv_is_encrypted_lsblk(self, fake_call, device_info, monkeypatch):
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
lsblk = {'FSTYPE': 'crypto_LUKS', 'NAME': 'sda', 'TYPE': 'lvm'}
blkid = {'TYPE': 'mapper'}
device_info(lsblk=lsblk, blkid=blkid)
assert disk.is_encrypted is True
@patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
- def test_lv_is_not_encrypted_lsblk(self, fake_call, factory, device_info):
+ def test_lv_is_not_encrypted_lsblk(self, fake_call, factory, device_info, monkeypatch):
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
lsblk = {'FSTYPE': 'xfs', 'NAME': 'sda', 'TYPE': 'lvm'}
blkid = {'TYPE': 'mapper'}
device_info(lsblk=lsblk, blkid=blkid)
assert disk.is_encrypted is False
@patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
- def test_lv_is_encrypted_lvm_api(self, fake_call, factory, device_info):
+ def test_lv_is_encrypted_lvm_api(self, fake_call, factory, device_info, monkeypatch):
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
lsblk = {'FSTYPE': 'xfs', 'NAME': 'sda', 'TYPE': 'lvm'}
blkid = {'TYPE': 'mapper'}
device_info(lsblk=lsblk, blkid=blkid)
assert disk.is_encrypted is True
@patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
- def test_lv_is_not_encrypted_lvm_api(self, fake_call, factory, device_info):
+ def test_lv_is_not_encrypted_lvm_api(self, fake_call, factory, device_info, monkeypatch):
+ monkeypatch.setattr('ceph_volume.util.device.Device.is_lv', lambda: True)
lsblk = {'FSTYPE': 'xfs', 'NAME': 'sda', 'TYPE': 'lvm'}
blkid = {'TYPE': 'mapper'}
device_info(lsblk=lsblk, blkid=blkid)