assert fake_enable.calls == []
assert fake_start_osd.calls == []
+ def test_filestore_no_systemd_autodetect(self, is_root, volumes, monkeypatch, capture):
+ monkeypatch.setattr('ceph_volume.configuration.load', lambda: None)
+ fake_enable = Capture()
+ fake_start_osd = Capture()
+ monkeypatch.setattr('ceph_volume.util.system.device_is_mounted', lambda *a, **kw: True)
+ monkeypatch.setattr('ceph_volume.util.system.chown', lambda *a, **kw: True)
+ monkeypatch.setattr('ceph_volume.process.run', lambda *a, **kw: True)
+ monkeypatch.setattr(activate.systemctl, 'enable_volume', fake_enable)
+ monkeypatch.setattr(activate.systemctl, 'start_osd', fake_start_osd)
+ JournalVolume = api.Volume(
+ lv_name='journal',
+ lv_path='/dev/vg/journal',
+ lv_uuid='000',
+ lv_tags=','.join([
+ "ceph.cluster_name=ceph", "ceph.journal_device=/dev/vg/journal",
+ "ceph.journal_uuid=000", "ceph.type=journal",
+ "ceph.osd_id=0", "ceph.osd_fsid=1234"])
+ )
+ DataVolume = api.Volume(
+ lv_name='data',
+ lv_path='/dev/vg/data',
+ lv_tags="ceph.cluster_name=ceph,ceph.journal_device=/dev/vg/journal,ceph.journal_uuid=000,ceph.type=data,ceph.osd_id=0,ceph.osd_fsid=1234")
+ volumes.append(DataVolume)
+ volumes.append(JournalVolume)
+ monkeypatch.setattr(api, 'Volumes', lambda: volumes)
+ args = Args(osd_id=None, osd_fsid='1234', no_systemd=True, filestore=True, auto_detect_objectstore=True)
+ activate.Activate([]).activate(args)
+ assert fake_enable.calls == []
+ assert fake_start_osd.calls == []
+
+ def test_filestore_systemd_autodetect(self, is_root, volumes, monkeypatch, capture):
+ fake_enable = Capture()
+ fake_start_osd = Capture()
+ monkeypatch.setattr('ceph_volume.configuration.load', lambda: None)
+ monkeypatch.setattr('ceph_volume.util.system.device_is_mounted', lambda *a, **kw: True)
+ monkeypatch.setattr('ceph_volume.util.system.chown', lambda *a, **kw: True)
+ monkeypatch.setattr('ceph_volume.process.run', lambda *a, **kw: True)
+ monkeypatch.setattr(activate.systemctl, 'enable_volume', fake_enable)
+ monkeypatch.setattr(activate.systemctl, 'start_osd', fake_start_osd)
+ JournalVolume = api.Volume(
+ lv_name='journal',
+ lv_path='/dev/vg/journal',
+ lv_uuid='000',
+ lv_tags=','.join([
+ "ceph.cluster_name=ceph", "ceph.journal_device=/dev/vg/journal",
+ "ceph.journal_uuid=000", "ceph.type=journal",
+ "ceph.osd_id=0","ceph.osd_fsid=1234"])
+ )
+ DataVolume = api.Volume(
+ lv_name='data',
+ lv_path='/dev/vg/data',
+ lv_tags="ceph.cluster_name=ceph,ceph.journal_device=/dev/vg/journal,ceph.journal_uuid=000,ceph.type=data,ceph.osd_id=0,ceph.osd_fsid=1234")
+ volumes.append(DataVolume)
+ volumes.append(JournalVolume)
+ monkeypatch.setattr(api, 'Volumes', lambda: volumes)
+ args = Args(osd_id=None, osd_fsid='1234', no_systemd=False, filestore=True, auto_detect_objectstore=False)
+ activate.Activate([]).activate(args)
+ assert fake_enable.calls != []
+ assert fake_start_osd.calls != []
+
def test_filestore_systemd(self, is_root, volumes, monkeypatch, capture):
fake_enable = Capture()
fake_start_osd = Capture()
assert fake_enable.calls != []
assert fake_start_osd.calls != []
+ def test_bluestore_no_systemd_autodetect(self, is_root, volumes, monkeypatch, capture):
+ fake_enable = Capture()
+ fake_start_osd = Capture()
+ monkeypatch.setattr('ceph_volume.util.system.path_is_mounted', lambda *a, **kw: True)
+ monkeypatch.setattr('ceph_volume.util.system.chown', lambda *a, **kw: True)
+ monkeypatch.setattr('ceph_volume.process.run', lambda *a, **kw: True)
+ monkeypatch.setattr(activate.systemctl, 'enable_volume', fake_enable)
+ monkeypatch.setattr(activate.systemctl, 'start_osd', fake_start_osd)
+ DataVolume = api.Volume(
+ lv_name='data',
+ lv_path='/dev/vg/data',
+ lv_tags="ceph.cluster_name=ceph,,ceph.block_uuid=000,ceph.type=block,ceph.osd_id=0,ceph.osd_fsid=1234")
+ volumes.append(DataVolume)
+ monkeypatch.setattr(api, 'Volumes', lambda: volumes)
+ args = Args(osd_id=None, osd_fsid='1234', no_systemd=True, bluestore=True, auto_detect_objectstore=True)
+ activate.Activate([]).activate(args)
+ assert fake_enable.calls == []
+ assert fake_start_osd.calls == []
+
+ def test_bluestore_systemd_autodetect(self, is_root, volumes, monkeypatch, capture):
+ fake_enable = Capture()
+ fake_start_osd = Capture()
+ monkeypatch.setattr('ceph_volume.util.system.path_is_mounted', lambda *a, **kw: True)
+ monkeypatch.setattr('ceph_volume.util.system.chown', lambda *a, **kw: True)
+ monkeypatch.setattr('ceph_volume.process.run', lambda *a, **kw: True)
+ monkeypatch.setattr(activate.systemctl, 'enable_volume', fake_enable)
+ monkeypatch.setattr(activate.systemctl, 'start_osd', fake_start_osd)
+ DataVolume = api.Volume(
+ lv_name='data',
+ lv_path='/dev/vg/data',
+ lv_tags="ceph.cluster_name=ceph,,ceph.journal_uuid=000,ceph.type=block,ceph.osd_id=0,ceph.osd_fsid=1234")
+ volumes.append(DataVolume)
+ monkeypatch.setattr(api, 'Volumes', lambda: volumes)
+ args = Args(osd_id=None, osd_fsid='1234', no_systemd=False, bluestore=True, auto_detect_objectstore=False)
+ activate.Activate([]).activate(args)
+ assert fake_enable.calls != []
+ assert fake_start_osd.calls != []
class TestActivateFlags(object):