import subprocess
import sys
import tempfile
+import time
import uuid
LOG = logging.getLogger('CephDisk')
def destroy_osd(self, uuid):
id = self.sh("ceph osd create " + uuid)
self.helper("control_osd stop " + id + " || true")
+ self.wait_for_osd_down(uuid)
try:
partition = self.get_journal_partition(uuid)
if partition:
ceph osd crush rm osd.{id}
""".format(id=id))
+ @staticmethod
+ def osd_up_predicate(osds, uuid):
+ # True when the OSD whose 'uuid' matches reports 'up' among its state
+ # flags in the "ceph osd dump -f json" osds list; False if no match.
+ for osd in osds:
+ if osd['uuid'] == uuid and 'up' in osd['state']:
+ return True
+ return False
+
+ @staticmethod
+ def wait_for_osd_up(uuid):
+ # Block until the OSD identified by uuid is seen up in the osd map
+ # (polls via wait_for_osd; raises on timeout).
+ CephDisk.wait_for_osd(uuid, CephDisk.osd_up_predicate, 'up')
+
+ @staticmethod
+ def osd_down_predicate(osds, uuid):
+ # An OSD counts as down when it is absent from the dump entirely,
+ # explicitly carries the 'down' state flag, or has been reduced to
+ # exactly the single 'exists' flag (no longer up).
+ found = False
+ for osd in osds:
+ if osd['uuid'] == uuid:
+ found = True
+ if 'down' in osd['state'] or ['exists'] == osd['state']:
+ return True
+ return not found
+
+ @staticmethod
+ def wait_for_osd_down(uuid):
+ # Block until the OSD identified by uuid is reported down or gone
+ # (polls via wait_for_osd; raises on timeout).
+ CephDisk.wait_for_osd(uuid, CephDisk.osd_down_predicate, 'down')
+
+ @staticmethod
+ def wait_for_osd(uuid, predicate, info):
+ # Poll "ceph osd dump -f json" with exponential backoff (delays of
+ # 1s doubling to 1024s, ~34 min of sleep worst case) until
+ # predicate(osds, uuid) is truthy; 'info' is only used for logging
+ # and the timeout message. Returns True on success, raises Exception
+ # if the predicate never holds.
+ LOG.info("wait_for_osd " + info + " " + uuid)
+ for delay in (1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024):
+ dump = json.loads(CephDisk.sh("ceph osd dump -f json"))
+ if predicate(dump['osds'], uuid):
+ return True
+ time.sleep(delay)
+ raise Exception('timeout waiting for osd ' + uuid + ' to be ' + info)
+
def run_osd(self, uuid, data, journal=None):
prepare = ("ceph-disk prepare --osd-uuid " + uuid +
" " + data)
" --journal-uuid " + journal_uuid +
" --dmcrypt " +
" " + disk)
- data_partition = c.get_osd_partition(osd_uuid)
- c.sh("ceph-disk activate --dmcrypt " + data_partition['path'])
+ c.wait_for_osd_up(osd_uuid)
data_partition = c.get_osd_partition(osd_uuid)
assert data_partition['type'] == 'data'
assert data_partition['state'] == 'active'
c.augtool("set /files/etc/ceph/ceph.conf/global/osd_objectstore memstore")
c.sh("ceph-disk prepare --osd-uuid " + osd_uuid +
" " + disk)
- device = json.loads(c.helper("ceph-disk list --format json " + disk))[0]
+ c.wait_for_osd_up(osd_uuid)
+ device = json.loads(c.sh("ceph-disk list --format json " + disk))[0]
assert len(device['partitions']) == 1
partition = device['partitions'][0]
assert partition['type'] == 'data'
c.sh("ceph-disk zap " + disk)
c.sh("ceph-disk prepare --osd-uuid " + osd_uuid +
" " + disk)
- device = json.loads(c.helper("ceph-disk list --format json " + disk))[0]
+ c.wait_for_osd_up(osd_uuid)
+ device = json.loads(c.sh("ceph-disk list --format json " + disk))[0]
assert len(device['partitions']) == 2
data_partition = c.get_osd_partition(osd_uuid)
assert data_partition['type'] == 'data'
osd_uuid = str(uuid.uuid1())
c.sh("ceph-disk prepare --osd-uuid " + osd_uuid +
" " + data_disk + " " + journal_disk)
- device = json.loads(c.helper("ceph-disk list --format json " + data_disk))[0]
+ c.wait_for_osd_up(osd_uuid)
+ device = json.loads(c.sh("ceph-disk list --format json " + data_disk))[0]
assert len(device['partitions']) == 1
data_partition = c.get_osd_partition(osd_uuid)
assert data_partition['type'] == 'data'
c.sh("ceph-disk prepare --osd-uuid " + osd_uuid +
" " + data_disk + " " + journal_path)
c.helper("pool_read_write 1") # 1 == pool size
- device = json.loads(c.helper("ceph-disk list --format json " + data_disk))[0]
+ c.wait_for_osd_up(osd_uuid)
+ device = json.loads(c.sh("ceph-disk list --format json " + data_disk))[0]
assert len(device['partitions']) == 1
data_partition = c.get_osd_partition(osd_uuid)
assert data_partition['type'] == 'data'
c.sh("ceph-disk zap " + multipath)
c.sh("ceph-disk prepare --osd-uuid " + osd_uuid +
" " + multipath)
- device = json.loads(c.helper("ceph-disk list --format json " + multipath))[0]
+ c.wait_for_osd_up(osd_uuid)
+ device = json.loads(c.sh("ceph-disk list --format json " + multipath))[0]
assert len(device['partitions']) == 2
data_partition = c.get_osd_partition(osd_uuid)
assert data_partition['type'] == 'data'