LOG = logging.getLogger('CephDisk')
+
class CephDisk:
def __init__(self):
return output.strip()
def unused_disks(self, pattern='[vs]d.'):
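+ # candidate names come from /sys/block; the default pattern matches
+ # virtio (vd?) and SCSI (sd?) device names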
- names = filter(lambda x: re.match(pattern, x), os.listdir("/sys/block"))
+ names = filter(
+ lambda x: re.match(pattern, x), os.listdir("/sys/block"))
if not names:
return []
- disks = json.loads(self.sh("ceph-disk list --format json " + " ".join(names)))
+ disks = json.loads(
+ self.sh("ceph-disk list --format json " + " ".join(names)))
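+ # "ceph-disk list --format json" describes every disk; only disks
+ # without a 'partitions' entry are reported as unused below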
unused = []
for disk in disks:
if 'partitions' not in disk:
for partition in disk['partitions']:
if partition['path'] == journal_dev:
if 'journal_for' in partition:
- assert partition['journal_for'] == data_partition['path']
+ assert partition[
+ 'journal_for'] == data_partition['path']
return partition
- raise Exception("journal for uuid = " + uuid + " not found in " + str(disks))
+ raise Exception(
+ "journal for uuid = " + uuid + " not found in " + str(disks))
def destroy_osd(self, uuid):
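+ # "ceph osd create <uuid>" is idempotent: it returns the id already
+ # allocated to this uuid, which identifies the OSD to tear down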
id = self.sh("ceph osd create " + uuid)
journal_partition = self.get_journal_partition(uuid)
assert journal_partition
+
class TestCephDisk(object):
def setup_class(self):
def test_deactivate_reactivate_osd(self):
c = CephDisk()
- have_journal=True
+ have_journal = True
disk = c.unused_disks()[0]
osd_uuid = str(uuid.uuid1())
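+ # zap erases the partition table so prepare starts from a clean disk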
c.sh("ceph-disk zap " + disk)
c.check_osd_status(osd_uuid, have_journal)
c.destroy_osd(osd_uuid)
-
def test_activate_dmcrypt_plain(self):
c = CephDisk()
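+ # "plain" dm-crypt maps the device without an on-disk LUKS header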
c.conf['global']['osd dmcrypt type'] = 'plain'
disk = c.unused_disks()[0]
osd_uuid = str(uuid.uuid1())
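+ # hand zap/prepare a symlink instead of the device itself: ceph-disk
+ # must resolve it to the underlying disk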
tempdir = tempfile.mkdtemp()
- symlink = os.path.join(tempdir,'osd')
+ symlink = os.path.join(tempdir, 'osd')
os.symlink(disk, symlink)
c.sh("ceph-disk zap " + symlink)
c.sh("ceph-disk prepare --osd-uuid " + osd_uuid +
data_disk = disks[0]
journal_disk = disks[1]
osd_uuid = self.activate_separated_journal(data_disk, journal_disk)
- c.helper("pool_read_write 1") # 1 == pool size
+ c.helper("pool_read_write 1") # 1 == pool size
c.destroy_osd(osd_uuid)
c.sh("ceph-disk zap " + data_disk + " " + journal_disk)
c.sh("ceph-disk prepare --osd-uuid " + osd_uuid +
" " + data_disk + " " + journal_disk)
c.wait_for_osd_up(osd_uuid)
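+ # with the journal on its own disk, the data disk must carry exactly
+ # one partition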
- device = json.loads(c.sh("ceph-disk list --format json " + data_disk))[0]
+ device = json.loads(
+ c.sh("ceph-disk list --format json " + data_disk))[0]
assert len(device['partitions']) == 1
c.check_osd_status(osd_uuid, have_journal)
return osd_uuid
other_data_disk = disks[1]
journal_disk = disks[2]
osd_uuid = self.activate_separated_journal(data_disk, journal_disk)
- other_osd_uuid = self.activate_separated_journal(other_data_disk, journal_disk)
+ other_osd_uuid = self.activate_separated_journal(
+ other_data_disk, journal_disk)
#
# read/write can only succeed if the two OSDs are up because
# the pool needs two OSDs
#
- c.helper("pool_read_write 2") # 2 == pool size
+ c.helper("pool_read_write 2") # 2 == pool size
c.destroy_osd(osd_uuid)
c.destroy_osd(other_osd_uuid)
- c.sh("ceph-disk zap " + data_disk + " " + journal_disk + " " + other_data_disk)
+ c.sh("ceph-disk zap " + data_disk + " " +
+ journal_disk + " " + other_data_disk)
#
# Create an OSD and reuse an existing journal partition
#
c.sh("ceph-disk prepare --osd-uuid " + osd_uuid +
" " + data_disk + " " + journal_path)
- c.helper("pool_read_write 1") # 1 == pool size
+ c.helper("pool_read_write 1") # 1 == pool size
c.wait_for_osd_up(osd_uuid)
- device = json.loads(c.sh("ceph-disk list --format json " + data_disk))[0]
+ device = json.loads(
+ c.sh("ceph-disk list --format json " + data_disk))[0]
assert len(device['partitions']) == 1
c.check_osd_status(osd_uuid)
journal_partition = c.get_journal_partition(osd_uuid)
def test_activate_multipath(self):
c = CephDisk()
if c.sh("lsb_release -si") != 'CentOS':
- pytest.skip("see issue https://bugs.launchpad.net/ubuntu/+source/multipath-tools/+bug/1488688")
+ pytest.skip(
+ "see issue https://bugs.launchpad.net/ubuntu/+source/multipath-tools/+bug/1488688")
c.ensure_sd()
#
# Figure out the name of the multipath device
disk = c.unused_disks('sd.')[0]
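+ # mpathconf --enable installs a default multipath.conf; mapping the
+ # disk creates a dm device that shows up as its only sysfs holder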
c.sh("mpathconf --enable || true")
c.sh("multipath " + disk)
- holders = os.listdir("/sys/block/" + os.path.basename(disk) + "/holders")
+ holders = os.listdir(
+ "/sys/block/" + os.path.basename(disk) + "/holders")
assert 1 == len(holders)
name = open("/sys/block/" + holders[0] + "/dm/name").read()
multipath = "/dev/mapper/" + name
c.sh("ceph-disk prepare --osd-uuid " + osd_uuid +
" " + multipath)
c.wait_for_osd_up(osd_uuid)
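+ # data and journal are both carved out of the multipath device,
+ # hence two partitions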
- device = json.loads(c.sh("ceph-disk list --format json " + multipath))[0]
+ device = json.loads(
+ c.sh("ceph-disk list --format json " + multipath))[0]
assert len(device['partitions']) == 2
data_partition = c.get_osd_partition(osd_uuid)
assert data_partition['type'] == 'data'
c.sh("multipath -F")
c.unload_scsi_debug()
+
class CephDiskTest(CephDisk):
def main(self, argv):