from ceph_volume.util.device import Device
from ceph_volume.util import system
from ceph_volume.util import encryption as encryption_utils
+from ceph_volume.devices.lvm.zap import Zap
+
class TestGetClusterName(object):
assert not stderr
- def test_migrate_data_db_to_new_db(self, is_root, monkeypatch):
+ @patch.object(Zap, 'main')
+ def test_migrate_data_db_to_new_db(self, m_zap, is_root, monkeypatch):
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev'
'--command', 'bluefs-bdev-migrate',
'--devs-source', '/var/lib/ceph/osd/ceph-2/block',
'--devs-source', '/var/lib/ceph/osd/ceph-2/block.db']
+ m_zap.assert_called_once()
+ @patch.object(Zap, 'main')
@patch('os.getuid')
- def test_migrate_data_db_to_new_db_encrypted(self, m_getuid, monkeypatch):
+ def test_migrate_data_db_to_new_db_encrypted(self, m_getuid, m_zap, monkeypatch):
m_getuid.return_value = 0
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'--devs-source', '/var/lib/ceph/osd/ceph-2/block',
'--devs-source', '/var/lib/ceph/osd/ceph-2/block.db']
+ m_zap.assert_called_once()
+
def test_migrate_data_db_to_new_db_active_systemd(self, is_root, monkeypatch, capsys):
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev'
assert '--> OSD is running, stop it with: systemctl stop ceph-osd@2' == stderr.rstrip()
assert not stdout
- def test_migrate_data_db_to_new_db_no_systemd(self, is_root, monkeypatch):
+ @patch.object(Zap, 'main')
+ def test_migrate_data_db_to_new_db_no_systemd(self, m_zap, is_root, monkeypatch):
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev'
source_db_tags = 'ceph.osd_id=2,ceph.type=db,ceph.osd_fsid=1234,' \
'--devs-source', '/var/lib/ceph/osd/ceph-2/block',
'--devs-source', '/var/lib/ceph/osd/ceph-2/block.db']
- def test_migrate_data_db_to_new_db_skip_wal(self, is_root, monkeypatch):
+ m_zap.assert_called_once()
+
+ @patch.object(Zap, 'main')
+ def test_migrate_data_db_to_new_db_skip_wal(self, m_zap, is_root, monkeypatch):
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev'
source_db_tags = 'ceph.osd_id=2,ceph.type=db,ceph.osd_fsid=1234,' \
'--devs-source', '/var/lib/ceph/osd/ceph-2/block',
'--devs-source', '/var/lib/ceph/osd/ceph-2/block.db']
- def test_migrate_data_db_wal_to_new_db(self, is_root, monkeypatch):
+ m_zap.assert_called_once()
+
+ @patch.object(Zap, 'main')
+ def test_migrate_data_db_wal_to_new_db(self, m_zap, is_root, monkeypatch):
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \
'ceph.wal_uuid=waluuid,ceph.wal_device=wal_dev'
'--devs-source', '/var/lib/ceph/osd/ceph-2/block.db',
'--devs-source', '/var/lib/ceph/osd/ceph-2/block.wal']
+ assert len(m_zap.mock_calls) == 2
+
+ @patch.object(Zap, 'main')
@patch('os.getuid')
- def test_migrate_data_db_wal_to_new_db_encrypted(self, m_getuid, monkeypatch):
+ def test_migrate_data_db_wal_to_new_db_encrypted(self, m_getuid, m_zap, monkeypatch):
m_getuid.return_value = 0
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'--devs-source', '/var/lib/ceph/osd/ceph-2/block.db',
'--devs-source', '/var/lib/ceph/osd/ceph-2/block.wal']
+ assert len(m_zap.mock_calls) == 2
+
@patch('os.getuid')
def test_dont_migrate_data_db_wal_to_new_data(self,
m_getuid,
'--command', 'bluefs-bdev-migrate',
'--devs-source', '/var/lib/ceph/osd/ceph-2/block']
+ @patch.object(Zap, 'main')
def test_migrate_data_wal_to_db(self,
+ m_zap,
is_root,
monkeypatch,
capsys):
'--devs-source', '/var/lib/ceph/osd/ceph-2/block',
'--devs-source', '/var/lib/ceph/osd/ceph-2/block.wal']
+ m_zap.assert_called_once()
+
+ @patch.object(Zap, 'main')
@patch('os.getuid')
def test_migrate_wal_to_db(self,
- m_getuid,
- monkeypatch,
- capsys):
+ m_getuid,
+ m_zap,
+ monkeypatch,
+ capsys):
m_getuid.return_value = 0
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'--command', 'bluefs-bdev-migrate',
'--devs-source', '/var/lib/ceph/osd/ceph-2/block.wal']
+ m_zap.assert_called_once()
+
+ @patch.object(Zap, 'main')
@patch('os.getuid')
def test_migrate_data_wal_to_db_encrypted(self,
m_getuid,
+ m_zap,
monkeypatch,
capsys):
m_getuid.return_value = 0
'--devs-source', '/var/lib/ceph/osd/ceph-2/block',
'--devs-source', '/var/lib/ceph/osd/ceph-2/block.wal']
+ m_zap.assert_called_once()
+
def test_migrate_data_wal_to_db_active_systemd(self, is_root, monkeypatch, capsys):
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \
assert '--> OSD is running, stop it with: systemctl stop ceph-osd@2' == stderr.rstrip()
assert not stdout
- def test_migrate_data_wal_to_db_no_systemd(self, is_root, monkeypatch):
+ @patch.object(Zap, 'main')
+ def test_migrate_data_wal_to_db_no_systemd(self, m_zap, is_root, monkeypatch):
source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \
'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \
'ceph.wal_uuid=waluuid,ceph.wal_device=wal_dev'
'--command', 'bluefs-bdev-migrate',
'--devs-source', '/var/lib/ceph/osd/ceph-2/block',
'--devs-source', '/var/lib/ceph/osd/ceph-2/block.wal']
+
+ m_zap.assert_called_once()
\ No newline at end of file