raise Exception("uuid = " + uuid + " not found in " + str(disks))
def get_journal_partition(self, uuid):
+ return self.get_space_partition('journal', uuid)
+
+ def get_space_partition(self, name, uuid):
data_partition = self.get_osd_partition(uuid)
- journal_dev = data_partition['journal_dev']
+ space_dev = data_partition[name + '_dev']
disks = json.loads(self.sh("ceph-disk list --format json"))
for disk in disks:
if 'partitions' in disk:
for partition in disk['partitions']:
- if partition['path'] == journal_dev:
- if 'journal_for' in partition:
+ if partition['path'] == space_dev:
+ if name + '_for' in partition:
assert partition[
- 'journal_for'] == data_partition['path']
+ name + '_for'] == data_partition['path']
return partition
raise Exception(
- "journal for uuid = " + uuid + " not found in " + str(disks))
+ name + " for uuid = " + uuid + " not found in " + str(disks))
def destroy_osd(self, uuid):
id = self.sh("ceph osd create " + uuid).strip()
time.sleep(delay)
raise Exception('timeout waiting for osd ' + uuid + ' to be ' + info)
- def check_osd_status(self, uuid, have_journal=False):
+ def check_osd_status(self, uuid, space_name=None):
data_partition = self.get_osd_partition(uuid)
assert data_partition['type'] == 'data'
assert data_partition['state'] == 'active'
- if have_journal:
- journal_partition = self.get_journal_partition(uuid)
- assert journal_partition
+ if space_name is not None:
+ space_partition = self.get_space_partition(space_name, uuid)
+ assert space_partition
class TestCephDisk(object):
def test_deactivate_reactivate_osd(self):
c = CephDisk()
- have_journal = True
disk = c.unused_disks()[0]
osd_uuid = str(uuid.uuid1())
c.sh("ceph-disk --verbose zap " + disk)
c.wait_for_osd_up(osd_uuid)
device = json.loads(c.sh("ceph-disk list --format json " + disk))[0]
assert len(device['partitions']) == 2
- c.check_osd_status(osd_uuid, have_journal)
+ c.check_osd_status(osd_uuid, 'journal')
data_partition = c.get_osd_partition(osd_uuid)
c.sh("ceph-disk --verbose deactivate " + data_partition['path'])
c.wait_for_osd_down(osd_uuid)
c.wait_for_osd_up(osd_uuid)
device = json.loads(c.sh("ceph-disk list --format json " + disk))[0]
assert len(device['partitions']) == 2
- c.check_osd_status(osd_uuid, have_journal)
+ c.check_osd_status(osd_uuid, 'journal')
c.helper("pool_read_write")
c.destroy_osd(osd_uuid)
def activate_reactivate_dmcrypt(self, type):
c = CephDisk()
- have_journal = True
disk = c.unused_disks()[0]
osd_uuid = str(uuid.uuid1())
journal_uuid = str(uuid.uuid1())
" --dmcrypt " +
" " + disk)
c.wait_for_osd_up(osd_uuid)
- c.check_osd_status(osd_uuid, have_journal)
+ c.check_osd_status(osd_uuid, 'journal')
data_partition = c.get_osd_partition(osd_uuid)
c.sh("ceph-disk --verbose deactivate " + data_partition['path'])
c.wait_for_osd_down(osd_uuid)
c.sh("ceph-disk --verbose activate-journal " + data_partition['journal_dev'] +
" --reactivate" + " --dmcrypt")
c.wait_for_osd_up(osd_uuid)
- c.check_osd_status(osd_uuid, have_journal)
+ c.check_osd_status(osd_uuid, 'journal')
c.destroy_osd(osd_uuid)
def test_activate_dmcrypt_plain(self):
def activate_dmcrypt(self, type):
c = CephDisk()
- have_journal = True
disk = c.unused_disks()[0]
osd_uuid = str(uuid.uuid1())
journal_uuid = str(uuid.uuid1())
" --dmcrypt " +
" " + disk)
c.wait_for_osd_up(osd_uuid)
- c.check_osd_status(osd_uuid, have_journal)
+ c.check_osd_status(osd_uuid, 'journal')
c.destroy_osd(osd_uuid)
def test_activate_no_journal(self):
c.destroy_osd(osd_uuid)
c.save_conf()
- def test_activate_with_journal(self):
+ def test_activate_with_journal_dev_no_symlink(self):
c = CephDisk()
- have_journal = True
disk = c.unused_disks()[0]
osd_uuid = str(uuid.uuid1())
c.sh("ceph-disk --verbose zap " + disk)
c.wait_for_osd_up(osd_uuid)
device = json.loads(c.sh("ceph-disk list --format json " + disk))[0]
assert len(device['partitions']) == 2
- c.check_osd_status(osd_uuid, have_journal)
+ c.check_osd_status(osd_uuid, 'journal')
c.helper("pool_read_write")
c.destroy_osd(osd_uuid)
def activate_separated_journal(self, data_disk, journal_disk):
c = CephDisk()
- have_journal = True
osd_uuid = str(uuid.uuid1())
c.sh("ceph-disk --verbose prepare --osd-uuid " + osd_uuid +
" " + data_disk + " " + journal_disk)
device = json.loads(
c.sh("ceph-disk list --format json " + data_disk))[0]
assert len(device['partitions']) == 1
- c.check_osd_status(osd_uuid, have_journal)
+ c.check_osd_status(osd_uuid, 'journal')
return osd_uuid
#