import argparse
import pytest
import ceph_disk
+import StringIO
def fail_to_mount(dev, fstype, options):
raise ceph_disk.MountError(dev + " mount fail")
self.assertRaises(IOError, ceph_disk.main_deactivate, args)
# clear the created file by unit test
- def test_main_deactivate_non_exists_by_dev(self):
+ def test_main_deactivate_non_exists_non_cluster_by_dev(self):
args = ceph_disk.parse_args(['deactivate', \
- '--cluster', 'ceph', \
'/dev/Xda1'])
self.assertRaises(Exception, ceph_disk.main_deactivate, args)
def test_main_deactivate_osd_in_and_down_by_dev(self):
args = ceph_disk.parse_args(['deactivate', \
'--cluster', 'ceph', \
+ '--mark-out', \
'/dev/Xda1'])
patcher = patch('os.path.exists')
patch_path = patcher.start()
is_mounted=lambda dev: mount_path,
get_oneliner=lambda mount_path, filen: 5566,
_check_osd_status=lambda ceph, status: 2,
+ _mark_osd_out=lambda ceph, osd_id: True
):
ceph_disk.main_deactivate(args)
def test_main_deactivate_osd_in_and_up_by_dev(self):
args = ceph_disk.parse_args(['deactivate', \
'--cluster', 'ceph', \
+ '--mark-out', \
'/dev/Xda1'])
patcher = patch('os.path.exists')
patch_path = patcher.start()
is_mounted=lambda dev: mount_path,
get_oneliner=lambda mount_path, filen: 5566,
_check_osd_status=lambda ceph, osd_id: 3,
+ _mark_osd_out=lambda ceph, osd_id: True,
stop_daemon=lambda ceph, osd_id: True,
_remove_osd_directory_files=lambda mount_path, ceph: True,
unmount=lambda path: True,
):
self.assertRaises(Exception, ceph_disk._mark_osd_out, 'ceph', '5566')
- def test_stop_daemon_fail(self):
- dev = {
- 'cluster': 'ceph',
- 'osd_id': '5566',
- }
+ def test_check_osd_status_fail(self):
+ with patch.multiple(
+ ceph_disk,
+ command=raise_command_error,
+ ):
+ self.assertRaises(Exception, ceph_disk._check_osd_status, 'ceph', '5566')
+
+ def test_check_osd_status_osd_not_found(self):
+
+ fake_value = '{"osds":[{"osd":0,"up":1,"in":1},{"osd":1,"up":1,"in":1}]}'
+
+ def return_fake_value(cmd):
+ return fake_value, 0
+
+ with patch.multiple(
+ ceph_disk,
+ command=return_fake_value,
+ ):
+ #ceph_disk._check_osd_status('ceph', '5566')
+ self.assertRaises(Exception, ceph_disk._check_osd_status, 'ceph', '5566')
+
+ def test_check_osd_status_success(self):
- def stop_daemon_fail():
- raise Error('ceph osd stop failed')
+ fake_value = '{"osds":[{"osd":0,"up":1,"in":1},{"osd":5566,"up":1,"in":1}]}'
+
+ def return_fake_value(cmd):
+ return fake_value, 0
with patch.multiple(
ceph_disk,
+ command=return_fake_value,
+ ):
+ ceph_disk._check_osd_status('ceph', '5566')
+
+ @patch('os.path.exists', return_value=False)
+ def test_stop_daemon_fail_all_init_type(self, mock_path_exists):
+ self.assertRaises(Exception, ceph_disk.stop_daemon, 'ceph', '5566')
+
+ @patch('os.path.exists', return_value=Exception)
+ def test_stop_daemon_fail_on_os_path_check(self, mock_path_exists):
+ self.assertRaises(Exception, ceph_disk.stop_daemon, 'ceph', '5566')
+
+ def test_stop_daemon_fail_upstart(self):
+ STATEDIR = '/var/lib/ceph'
+ cluster = 'ceph'
+ osd_id = '5566'
+
+ path = (STATEDIR + '/osd/{cluster}-{osd_id}/upstart').format(
+ cluster=cluster, osd_id=osd_id)
+
+ def path_file_test(check_path):
+ if check_path == path:
+ return True
+ else:
+ False
+
+ def stop_daemon_fail(cmd):
+ raise Exception('ceph osd stop failed')
+
+ patcher = patch('os.path.exists')
+ check_path = patcher.start()
+ check_path.side_effect = path_file_test
+ with patch.multiple(
+ ceph_disk,
+ check_path,
command_check_call=stop_daemon_fail,
):
self.assertRaises(Exception, ceph_disk.stop_daemon, 'ceph', '5566')
+ def test_stop_daemon_fail_sysvinit_usr_sbin_service(self):
+ STATEDIR = '/var/lib/ceph'
+ cluster = 'ceph'
+ osd_id = '5566'
+
+ path = (STATEDIR + '/osd/{cluster}-{osd_id}/sysvinit').format(
+ cluster=cluster, osd_id=osd_id)
+
+ def path_file_test(check_path):
+ if check_path == path:
+ return True
+ elif check_path == '/usr/sbin/service':
+ return True
+ else:
+ False
+
+ def stop_daemon_fail(cmd):
+ raise Exception('ceph osd stop failed')
+
+ patcher = patch('os.path.exists')
+ check_path = patcher.start()
+ check_path.side_effect = path_file_test
+ with patch.multiple(
+ ceph_disk,
+ check_path,
+ #join_path,
+ command_check_call=stop_daemon_fail,
+ ):
+ self.assertRaises(Exception, ceph_disk.stop_daemon, 'ceph', '5566')
+
+ def test_stop_daemon_fail_sysvinit_sbin_service(self):
+ STATEDIR = '/var/lib/ceph'
+ cluster = 'ceph'
+ osd_id = '5566'
+
+ path = (STATEDIR + '/osd/{cluster}-{osd_id}/sysvinit').format(
+ cluster=cluster, osd_id=osd_id)
+
+ def path_file_test(check_path):
+ if check_path == path:
+ return True
+ elif check_path == '/sbin/service':
+ return True
+ else:
+ False
+
+ def stop_daemon_fail(cmd):
+ raise Exception('ceph osd stop failed')
+
+ patcher = patch('os.path.exists')
+ check_path = patcher.start()
+ check_path.side_effect = path_file_test
+ with patch.multiple(
+ ceph_disk,
+ check_path,
+ command_check_call=stop_daemon_fail,
+ ):
+ self.assertRaises(Exception, ceph_disk.stop_daemon, 'ceph', '5566')
+
+ def test_stop_daemon_fail_systemd_disable_stop(self):
+ STATEDIR = '/var/lib/ceph'
+ cluster = 'ceph'
+ osd_id = '5566'
+
+ path = (STATEDIR + '/osd/{cluster}-{osd_id}/systemd').format(
+ cluster=cluster, osd_id=osd_id)
+
+ def path_file_test(check_path):
+ if check_path == path:
+ return True
+ else:
+ False
+
+ def stop_daemon_fail(cmd):
+ if 'stop' in cmd:
+ raise Exception('ceph osd stop failed')
+ else:
+ return True
+
+ patcher = patch('os.path.exists')
+ check_path = patcher.start()
+ check_path.side_effect = path_file_test
+ with patch.multiple(
+ ceph_disk,
+ check_path,
+ command_check_call=stop_daemon_fail,
+ ):
+ self.assertRaises(Exception, ceph_disk.stop_daemon, 'ceph', '5566')
+
+ def test_convert_osd_id(self):
+ file_output = StringIO.StringIO('/dev/sdX1 /var/lib/ceph/osd/ceph-1234 xfs rw,noatime 0 0\n' \
+ '/dev/sdX1 /var/lib/ceph/osd/ceph-5566 xfs rw,noatime 0 0\n')
+ with patch('__builtin__.open', return_value=file_output):
+ ceph_disk.convert_osd_id('ceph', '5566')
+
+ def test_convert_osd_id_not_found(self):
+ file_output = StringIO.StringIO('/dev/sdX1 /var/lib/ceph/osd/ceph-1234 xfs rw,noatime 0 0\n' \
+ '/dev/sdY1 /var/lib/ceph/osd/ceph-5678 xfs rw,noatime 0 0\n')
+ with patch('__builtin__.open', return_value=file_output):
+ self.assertRaises(Exception, ceph_disk.convert_osd_id, 'ceph', '5566')
+
+ def test_convert_osd_id_get_mounts_fail(self):
+ with patch('__builtin__.open', return_value=Exception):
+ self.assertRaises(Exception, ceph_disk.convert_osd_id, 'ceph', '5566')
+
def test_convert_osd_id_fail(self):
dev = {
'cluster': 'ceph',
}
self.assertRaises(Exception, ceph_disk.convert_osd_id, dev)
- def test_remove_osd_directory_files(self):
+ @patch('os.remove', return_value=True)
+ def test_remove_osd_directory_files(self, mock_remove):
cluster = 'ceph'
init_file = 'init'
with patch.multiple(
):
ceph_disk._remove_osd_directory_files('/somewhere', cluster)
+ def test_remove_osd_directory_files_remove_OSError(self):
+ cluster = 'ceph'
+ init_file = 'init'
+ with patch.multiple(
+ ceph_disk,
+ get_conf=lambda cluster, **kwargs: None,
+ init_get=lambda : init_file
+ ):
+ ceph_disk._remove_osd_directory_files('/somewhere', cluster)
+
+ @patch('os.path.exists', return_value=False)
+ def test_remove_osd_directory_files_already_remove(self, mock_exists):
+ cluster = 'ceph'
+ init_file = 'upstart'
+ with patch.multiple(
+ ceph_disk,
+ get_conf=lambda cluster, **kwargs: init_file,
+ ):
+ ceph_disk._remove_osd_directory_files('/tmp', cluster)
+
def test_path_set_context(self):
path = '/somewhere'
with patch.multiple(
):
ceph_disk.main_destroy(args)
- def test_main_destroy_without_zap_by_id(self):
+ def test_main_destroy_with_zap_find_part_fail_by_id(self):
args = ceph_disk.parse_args(['destroy', \
'--cluster', 'ceph', \
+ '--zap', \
'--destroy-by-id', '5566'])
cluster = 'ceph'
osd_id = '5566'
+ fake_part_return = {'Xda': ['Xda1'], 'Xdb': []}
+ disk = 'Xda'
+ partition = 'Xda1'
with patch.multiple(
ceph_disk,
_check_osd_status=lambda cluster, osd_id: 0,
_remove_from_crush_map=lambda cluster, osd_id: True,
_delete_osd_auth_key=lambda cluster, osd_id: True,
_deallocate_osd_id=lambda cluster, osd_id: True,
+ list_all_partitions=lambda names: fake_part_return,
+ split_dev_base_partnum=lambda names: (disk, 1)
):
- ceph_disk.main_destroy(args)
+ self.assertRaises(Exception, ceph_disk.main_destroy, args)
- def test_main_destroy_with_zap_find_part_fail_by_id(self):
+ def test_main_destroy_with_zap_mount_part_fail_by_id(self):
args = ceph_disk.parse_args(['destroy', \
'--cluster', 'ceph', \
'--zap', \
'--destroy-by-id', '5566'])
cluster = 'ceph'
osd_id = '5566'
- disk = "Xda"
- partition = "Xda1"
+ fake_part_return = {'Xda': ['Xda1'], 'Xdb': []}
+ disk = 'Xda'
+ partition = 'Xda1'
+ fstype = 'ext4'
with patch.multiple(
ceph_disk,
_check_osd_status=lambda cluster, osd_id: 0,
_remove_from_crush_map=lambda cluster, osd_id: True,
_delete_osd_auth_key=lambda cluster, osd_id: True,
_deallocate_osd_id=lambda cluster, osd_id: True,
- list_all_partitions=lambda names: { disk: [partition] },
- split_dev_base_partnum=lambda names: (disk, 1)
+ list_all_partitions=lambda names: fake_part_return,
+ split_dev_base_partnum=lambda names: (disk, 1),
+ get_dev_fs=lambda dev:fstype,
+ mount=lambda dev, fstype, options:Exception
):
self.assertRaises(Exception, ceph_disk.main_destroy, args)
):
self.assertRaises(Exception, ceph_disk.main_destroy, args)
- def test_main_destroy_non_exist_by_dev(self):
+ @patch('os.path.exists', return_value=False)
+ def test_main_destroy_non_exist_non_cluster_by_dev(self, mock_exists):
args = ceph_disk.parse_args(['destroy', \
- '--cluster', 'ceph', \
'/dev/Xda1'])
self.assertRaises(Exception, ceph_disk.main_destroy, args)