]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
ceph-volume: introduce baseobjectstore class
authorGuillaume Abrioux <gabrioux@ibm.com>
Mon, 22 May 2023 14:32:23 +0000 (16:32 +0200)
committerGuillaume Abrioux <gabrioux@redhat.com>
Wed, 15 May 2024 09:12:56 +0000 (09:12 +0000)
This introduces a refactor in order to ease the addition or removal
of an OSD objectstore backend in ceph-volume.

Fixes: https://tracker.ceph.com/issues/61827
Signed-off-by: Guillaume Abrioux <gabrioux@ibm.com>
Co-authored-by: Rongqi Sun <sunrongqi@huawei.com>
(cherry picked from commit 04c93a1ed42397d814ae4001c4ee3bddeb6809de)

17 files changed:
src/ceph-volume/ceph_volume/activate/main.py
src/ceph-volume/ceph_volume/devices/lvm/activate.py
src/ceph-volume/ceph_volume/devices/lvm/batch.py
src/ceph-volume/ceph_volume/devices/lvm/common.py
src/ceph-volume/ceph_volume/devices/lvm/create.py
src/ceph-volume/ceph_volume/devices/lvm/prepare.py
src/ceph-volume/ceph_volume/devices/raw/activate.py
src/ceph-volume/ceph_volume/devices/raw/common.py
src/ceph-volume/ceph_volume/devices/raw/prepare.py
src/ceph-volume/ceph_volume/objectstore/__init__.py [new file with mode: 0644]
src/ceph-volume/ceph_volume/objectstore/baseobjectstore.py [new file with mode: 0644]
src/ceph-volume/ceph_volume/objectstore/bluestore.py [new file with mode: 0644]
src/ceph-volume/ceph_volume/objectstore/lvmbluestore.py [new file with mode: 0644]
src/ceph-volume/ceph_volume/objectstore/rawbluestore.py [new file with mode: 0644]
src/ceph-volume/ceph_volume/tests/devices/lvm/test_batch.py
src/ceph-volume/ceph_volume/util/arg_validators.py
src/ceph-volume/ceph_volume/util/prepare.py

index 1cef038b62fe734e86e62d3f0964867e439b83f9..161cd1cf4368ce6f0b4586738f640e617d87f88c 100644 (file)
@@ -3,8 +3,8 @@
 import argparse
 
 from ceph_volume import terminal
-from ceph_volume.devices.lvm.activate import Activate as LVMActivate
-from ceph_volume.devices.raw.activate import Activate as RAWActivate
+from ceph_volume.objectstore.lvmbluestore import LvmBlueStore as LVMActivate
+from ceph_volume.objectstore.rawbluestore import RawBlueStore as RAWActivate
 from ceph_volume.devices.simple.activate import Activate as SimpleActivate
 
 
@@ -44,27 +44,24 @@ class Activate(object):
 
         # first try raw
         try:
-            RAWActivate([]).activate(
-                devs=None,
-                start_osd_id=self.args.osd_id,
-                start_osd_uuid=self.args.osd_uuid,
-                tmpfs=not self.args.no_tmpfs,
-                systemd=not self.args.no_systemd,
-            )
+            raw_activate = RAWActivate([])
+            raw_activate.activate(None,
+                                  self.args.osd_id,
+                                  self.args.osd_uuid,
+                                  not self.args.no_tmpfs)
             return
         except Exception as e:
             terminal.info(f'Failed to activate via raw: {e}')
 
         # then try lvm
         try:
-            LVMActivate([]).activate(
-                argparse.Namespace(
-                    osd_id=self.args.osd_id,
-                    osd_fsid=self.args.osd_uuid,
-                    no_tmpfs=self.args.no_tmpfs,
-                    no_systemd=self.args.no_systemd,
-                )
-            )
+            lvm_activate = LVMActivate(argparse.Namespace(
+                no_tmpfs=self.args.no_tmpfs,
+                no_systemd=self.args.no_systemd,
+                osd_fsid=self.args.osd_uuid))
+            lvm_activate.activate(None,
+                                  self.args.osd_id,
+                                  self.args.osd_uuid)
             return
         except Exception as e:
             terminal.info(f'Failed to activate via LVM: {e}')
index 17c66194c67e36f9562aa9145f1922810e03f7a0..7b4d57c9509144da5aa03fa8bb6ab8963c117cf5 100644 (file)
 from __future__ import print_function
 import argparse
 import logging
-import os
 from textwrap import dedent
-from ceph_volume import process, conf, decorators, terminal, configuration
-from ceph_volume.util import system, disk
-from ceph_volume.util import prepare as prepare_utils
-from ceph_volume.util import encryption as encryption_utils
-from ceph_volume.systemd import systemctl
-from ceph_volume.api import lvm as api
-from .listing import direct_report
+from ceph_volume import objectstore
 
 
 logger = logging.getLogger(__name__)
 
 
-
-def get_osd_device_path(osd_lvs, device_type, dmcrypt_secret=None):
-    """
-    ``device_type`` can be one of ``db``, ``wal`` or ``block`` so that we can
-     query LVs on system and fallback to querying the uuid if that is not
-     present.
-
-    Return a path if possible, failing to do that a ``None``, since some of
-    these devices are optional.
-    """
-    osd_block_lv = None
-    for lv in osd_lvs:
-        if lv.tags.get('ceph.type') == 'block':
-            osd_block_lv = lv
-            break
-    if osd_block_lv:
-        is_encrypted = osd_block_lv.tags.get('ceph.encrypted', '0') == '1'
-        logger.debug('Found block device (%s) with encryption: %s', osd_block_lv.name, is_encrypted)
-        uuid_tag = 'ceph.%s_uuid' % device_type
-        device_uuid = osd_block_lv.tags.get(uuid_tag)
-        if not device_uuid:
-            return None
-
-    device_lv = None
-    for lv in osd_lvs:
-        if lv.tags.get('ceph.type') == device_type:
-            device_lv = lv
-            break
-    if device_lv:
-        if is_encrypted:
-            encryption_utils.luks_open(dmcrypt_secret, device_lv.lv_path, device_uuid)
-            return '/dev/mapper/%s' % device_uuid
-        return device_lv.lv_path
-
-    # this could be a regular device, so query it with blkid
-    physical_device = disk.get_device_from_partuuid(device_uuid)
-    if physical_device:
-        if is_encrypted:
-            encryption_utils.luks_open(dmcrypt_secret, physical_device, device_uuid)
-            return '/dev/mapper/%s' % device_uuid
-        return physical_device
-
-    raise RuntimeError('could not find %s with uuid %s' % (device_type, device_uuid))
-
-
-def activate_bluestore(osd_lvs, no_systemd=False, no_tmpfs=False):
-    for lv in osd_lvs:
-        if lv.tags.get('ceph.type') == 'block':
-            osd_block_lv = lv
-            break
-    else:
-        raise RuntimeError('could not find a bluestore OSD to activate')
-
-    is_encrypted = osd_block_lv.tags.get('ceph.encrypted', '0') == '1'
-    if is_encrypted and conf.dmcrypt_no_workqueue is None:
-        encryption_utils.set_dmcrypt_no_workqueue()
-    dmcrypt_secret = None
-    osd_id = osd_block_lv.tags['ceph.osd_id']
-    conf.cluster = osd_block_lv.tags['ceph.cluster_name']
-    osd_fsid = osd_block_lv.tags['ceph.osd_fsid']
-    configuration.load_ceph_conf_path(osd_block_lv.tags['ceph.cluster_name'])
-    configuration.load()
-
-    # mount on tmpfs the osd directory
-    osd_path = '/var/lib/ceph/osd/%s-%s' % (conf.cluster, osd_id)
-    if not system.path_is_mounted(osd_path):
-        # mkdir -p and mount as tmpfs
-        prepare_utils.create_osd_path(osd_id, tmpfs=not no_tmpfs)
-    # XXX This needs to be removed once ceph-bluestore-tool can deal with
-    # symlinks that exist in the osd dir
-    for link_name in ['block', 'block.db', 'block.wal']:
-        link_path = os.path.join(osd_path, link_name)
-        if os.path.exists(link_path):
-            os.unlink(os.path.join(osd_path, link_name))
-    # encryption is handled here, before priming the OSD dir
-    if is_encrypted:
-        osd_lv_path = '/dev/mapper/%s' % osd_block_lv.lv_uuid
-        lockbox_secret = osd_block_lv.tags['ceph.cephx_lockbox_secret']
-        encryption_utils.write_lockbox_keyring(osd_id, osd_fsid, lockbox_secret)
-        dmcrypt_secret = encryption_utils.get_dmcrypt_key(osd_id, osd_fsid)
-        encryption_utils.luks_open(dmcrypt_secret, osd_block_lv.lv_path, osd_block_lv.lv_uuid)
-    else:
-        osd_lv_path = osd_block_lv.lv_path
-
-    db_device_path = get_osd_device_path(osd_lvs, 'db', dmcrypt_secret=dmcrypt_secret)
-    wal_device_path = get_osd_device_path(osd_lvs, 'wal', dmcrypt_secret=dmcrypt_secret)
-
-    # Once symlinks are removed, the osd dir can be 'primed again. chown first,
-    # regardless of what currently exists so that ``prime-osd-dir`` can succeed
-    # even if permissions are somehow messed up
-    system.chown(osd_path)
-    prime_command = [
-        'ceph-bluestore-tool', '--cluster=%s' % conf.cluster,
-        'prime-osd-dir', '--dev', osd_lv_path,
-        '--path', osd_path, '--no-mon-config']
-
-    process.run(prime_command)
-    # always re-do the symlink regardless if it exists, so that the block,
-    # block.wal, and block.db devices that may have changed can be mapped
-    # correctly every time
-    process.run(['ln', '-snf', osd_lv_path, os.path.join(osd_path, 'block')])
-    system.chown(os.path.join(osd_path, 'block'))
-    system.chown(osd_path)
-    if db_device_path:
-        destination = os.path.join(osd_path, 'block.db')
-        process.run(['ln', '-snf', db_device_path, destination])
-        system.chown(db_device_path)
-        system.chown(destination)
-    if wal_device_path:
-        destination = os.path.join(osd_path, 'block.wal')
-        process.run(['ln', '-snf', wal_device_path, destination])
-        system.chown(wal_device_path)
-        system.chown(destination)
-
-    if no_systemd is False:
-        # enable the ceph-volume unit for this OSD
-        systemctl.enable_volume(osd_id, osd_fsid, 'lvm')
-
-        # enable the OSD
-        systemctl.enable_osd(osd_id)
-
-        # start the OSD
-        systemctl.start_osd(osd_id)
-    terminal.success("ceph-volume lvm activate successful for osd ID: %s" % osd_id)
-
-
 class Activate(object):
-
     help = 'Discover and mount the LVM device associated with an OSD ID and start the Ceph OSD'
 
-    def __init__(self, argv):
+    def __init__(self, argv, args=None):
+        self.objectstore = None
         self.argv = argv
-
-    @decorators.needs_root
-    def activate_all(self, args):
-        listed_osds = direct_report()
-        osds = {}
-        for osd_id, devices in listed_osds.items():
-            # the metadata for all devices in each OSD will contain
-            # the FSID which is required for activation
-            for device in devices:
-                fsid = device.get('tags', {}).get('ceph.osd_fsid')
-                if fsid:
-                    osds[fsid] = osd_id
-                    break
-        if not osds:
-            terminal.warning('Was unable to find any OSDs to activate')
-            terminal.warning('Verify OSDs are present with "ceph-volume lvm list"')
-            return
-        for osd_fsid, osd_id in osds.items():
-            if not args.no_systemd and systemctl.osd_is_active(osd_id):
-                terminal.warning(
-                    'OSD ID %s FSID %s process is active. Skipping activation' % (osd_id, osd_fsid)
-                )
-            else:
-                terminal.info('Activating OSD ID %s FSID %s' % (osd_id, osd_fsid))
-                self.activate(args, osd_id=osd_id, osd_fsid=osd_fsid)
-
-    @decorators.needs_root
-    def activate(self, args, osd_id=None, osd_fsid=None):
-        """
-        :param args: The parsed arguments coming from the CLI
-        :param osd_id: When activating all, this gets populated with an
-                       existing OSD ID
-        :param osd_fsid: When activating all, this gets populated with an
-                         existing OSD FSID
-        """
-        osd_id = osd_id if osd_id else args.osd_id
-        osd_fsid = osd_fsid if osd_fsid else args.osd_fsid
-
-        if osd_id and osd_fsid:
-            tags = {'ceph.osd_id': osd_id, 'ceph.osd_fsid': osd_fsid}
-        elif not osd_id and osd_fsid:
-            tags = {'ceph.osd_fsid': osd_fsid}
-        elif osd_id and not osd_fsid:
-            raise RuntimeError('could not activate osd.{}, please provide the '
-                               'osd_fsid too'.format(osd_id))
-        else:
-            raise RuntimeError('Please provide both osd_id and osd_fsid')
-        lvs = api.get_lvs(tags=tags)
-        if not lvs:
-            raise RuntimeError('could not find osd.%s with osd_fsid %s' %
-                               (osd_id, osd_fsid))
-
-        # This argument is only available when passed in directly or via
-        # systemd, not when ``create`` is being used
-        # placeholder when a new objectstore support will be added
-        if getattr(args, 'auto_detect_objectstore', False):
-            logger.info('auto detecting objectstore')
-            return activate_bluestore(lvs, args.no_systemd)
-
-        # explicit 'objectstore' flags take precedence
-        if getattr(args, 'bluestore', False):
-            activate_bluestore(lvs, args.no_systemd, getattr(args, 'no_tmpfs', False))
-        elif any('ceph.block_device' in lv.tags for lv in lvs):
-            activate_bluestore(lvs, args.no_systemd, getattr(args, 'no_tmpfs', False))
+        self.args = args
 
     def main(self):
         sub_command_help = dedent("""
@@ -256,6 +60,14 @@ class Activate(object):
             action='store_true',
             help='force bluestore objectstore activation',
         )
+        parser.add_argument(
+            '--objectstore',
+            dest='objectstore',
+            help='The OSD objectstore.',
+            default='bluestore',
+            choices=['bluestore', 'seastore'],
+            type=str,
+        )
         parser.add_argument(
             '--all',
             dest='activate_all',
@@ -273,11 +85,15 @@ class Activate(object):
             action='store_true',
             help='Do not use a tmpfs mount for OSD data dir'
         )
-        if len(self.argv) == 0:
+        if len(self.argv) == 0 and self.args is None:
             print(sub_command_help)
             return
-        args = parser.parse_args(self.argv)
-        if args.activate_all:
-            self.activate_all(args)
+        if self.args is None:
+            self.args = parser.parse_args(self.argv)
+        if self.args.bluestore:
+            self.args.objectstore = 'bluestore'
+        self.objectstore = objectstore.mapping['LVM'][self.args.objectstore](args=self.args)
+        if self.args.activate_all:
+            self.objectstore.activate_all()
         else:
-            self.activate(args)
+            self.objectstore.activate()
index 2118ce47aeeab5d271a12b7a2d107b2deb5db4ba..6cd3bc39817772c69b8a672f3a599c23cda05c95 100644 (file)
@@ -233,10 +233,18 @@ class Batch(object):
             help=('deploy standalone OSDs if rotational and non-rotational drives '
                   'are passed in DEVICES'),
         )
+        parser.add_argument(
+            '--objectstore',
+            dest='objectstore',
+            help='The OSD objectstore.',
+            default='bluestore',
+            choices=['bluestore', 'seastore'],
+            type=str,
+        )
         parser.add_argument(
             '--bluestore',
             action='store_true',
-            help='bluestore objectstore (default)',
+            help='bluestore objectstore (default). (DEPRECATED: use --objectstore instead)',
         )
         parser.add_argument(
             '--report',
@@ -323,6 +331,8 @@ class Batch(object):
             type=arg_validators.valid_osd_id
         )
         self.args = parser.parse_args(argv)
+        if self.args.bluestore:
+            self.args.objectstore = 'bluestore'
         self.parser = parser
         for dev_list in ['', 'db_', 'wal_']:
             setattr(self, '{}usable'.format(dev_list), [])
@@ -383,11 +393,6 @@ class Batch(object):
         if not self.args.devices:
             return self.parser.print_help()
 
-        # Default to bluestore here since defaulting it in add_argument may
-        # cause both to be True
-        if not self.args.bluestore:
-            self.args.bluestore = True
-
         if (self.args.auto and not self.args.db_devices and not
             self.args.wal_devices):
             self._sort_rotational_disks()
@@ -398,7 +403,7 @@ class Batch(object):
                                      self.args.db_devices,
                                      self.args.wal_devices)
 
-        plan = self.get_plan(self.args)
+        plan = self.get_deployment_layout()
 
         if self.args.report:
             self.report(plan)
@@ -425,43 +430,38 @@ class Batch(object):
         for osd in plan:
             args = osd.get_args(defaults)
             if self.args.prepare:
-                p = Prepare([])
-                p.safe_prepare(argparse.Namespace(**args))
+                p = Prepare([], args=argparse.Namespace(**args))
+                p.main()
             else:
-                c = Create([])
-                c.create(argparse.Namespace(**args))
-
-
-    def get_plan(self, args):
-        if args.bluestore:
-            plan = self.get_deployment_layout(args, args.devices, args.db_devices,
-                                              args.wal_devices)
-        return plan
+                c = Create([], args=argparse.Namespace(**args))
+                c.create()
 
-    def get_deployment_layout(self, args, devices, fast_devices=[],
-                              very_fast_devices=[]):
+    def get_deployment_layout(self):
         '''
         The methods here are mostly just organization, error reporting and
         setting up of (default) args. The heavy lifting code for the deployment
         layout can be found in the static get_*_osds and get_*_fast_allocs
         functions.
         '''
+        devices = self.args.devices
+        fast_devices = self.args.db_devices
+        very_fast_devices = self.args.wal_devices
         plan = []
         phys_devs, lvm_devs = separate_devices_from_lvs(devices)
         mlogger.debug(('passed data devices: {} physical,'
                        ' {} LVM').format(len(phys_devs), len(lvm_devs)))
 
-        plan.extend(get_physical_osds(phys_devs, args))
+        plan.extend(get_physical_osds(phys_devs, self.args))
 
-        plan.extend(get_lvm_osds(lvm_devs, args))
+        plan.extend(get_lvm_osds(lvm_devs, self.args))
 
         num_osds = len(plan)
         if num_osds == 0:
             mlogger.info('All data devices are unavailable')
             return plan
-        requested_osds = args.osds_per_device * len(phys_devs) + len(lvm_devs)
+        requested_osds = self.args.osds_per_device * len(phys_devs) + len(lvm_devs)
 
-        if args.bluestore:
+        if self.args.objectstore == 'bluestore':
             fast_type = 'block_db'
         fast_allocations = self.fast_allocations(fast_devices,
                                                  requested_osds,
@@ -491,7 +491,7 @@ class Batch(object):
             if fast_devices:
                 osd.add_fast_device(*fast_allocations.pop(),
                                     type_=fast_type)
-            if very_fast_devices and args.bluestore:
+            if very_fast_devices and self.args.objectstore == 'bluestore':
                 osd.add_very_fast_device(*very_fast_allocations.pop())
         return plan
 
index 198ba9417a1b5eef0dd1cc0c822a008d88511468..90bed61a3bd948f4b0b40e9c49034370af6bd787 100644 (file)
@@ -36,6 +36,13 @@ def rollback_osd(args, osd_id=None):
 
 
 common_args = {
+    '--objectstore': {
+        'dest': 'objectstore',
+        'help': 'The OSD objectstore.',
+        'default': 'bluestore',
+        'choices': ['bluestore', 'seastore'],
+        'type': str,
+    },
     '--data': {
         'help': 'OSD data path. A physical device or logical volume',
         'required': True,
@@ -86,7 +93,7 @@ common_args = {
 bluestore_args = {
     '--bluestore': {
         'action': 'store_true',
-        'help': 'Use the bluestore objectstore',
+        'help': 'Use the bluestore objectstore. (DEPRECATED: use --objectstore instead)',
     },
     '--block.db': {
         'dest': 'block_db',
index 631a21b239d2e5c3056973918441b062e7307ece..6a4d11b99bf5f5d2951e8ad78d89c35055cd4f98 100644 (file)
@@ -3,10 +3,8 @@ from textwrap import dedent
 import logging
 from ceph_volume.util import system
 from ceph_volume.util.arg_validators import exclude_group_options
-from ceph_volume import decorators, terminal
+from ceph_volume import decorators, terminal, objectstore
 from .common import create_parser, rollback_osd
-from .prepare import Prepare
-from .activate import Activate
 
 logger = logging.getLogger(__name__)
 
@@ -15,27 +13,29 @@ class Create(object):
 
     help = 'Create a new OSD from an LVM device'
 
-    def __init__(self, argv):
+    def __init__(self, argv, args=None):
+        self.objectstore = None
         self.argv = argv
+        self.args = args
 
     @decorators.needs_root
-    def create(self, args):
-        if not args.osd_fsid:
-            args.osd_fsid = system.generate_uuid()
-        prepare_step = Prepare([])
-        prepare_step.safe_prepare(args)
-        osd_id = prepare_step.osd_id
+    def create(self):
+        if not self.args.osd_fsid:
+            self.args.osd_fsid = system.generate_uuid()
+        self.objectstore = objectstore.mapping['LVM'][self.args.objectstore](args=self.args)
+        self.objectstore.safe_prepare()
+        osd_id = self.objectstore.osd_id
         try:
             # we try this for activate only when 'creating' an OSD, because a rollback should not
             # happen when doing normal activation. For example when starting an OSD, systemd will call
             # activate, which would never need to be rolled back.
-            Activate([]).activate(args)
+            self.objectstore.activate()
         except Exception:
             logger.exception('lvm activate was unable to complete, while creating the OSD')
             logger.info('will rollback OSD ID creation')
-            rollback_osd(args, osd_id)
+            rollback_osd(self.args, osd_id)
             raise
-        terminal.success("ceph-volume lvm create successful for: %s" % args.data)
+        terminal.success("ceph-volume lvm create successful for: %s" % self.args.data)
 
     def main(self):
         sub_command_help = dedent("""
@@ -69,9 +69,9 @@ class Create(object):
             print(sub_command_help)
             return
         exclude_group_options(parser, groups=['bluestore'], argv=self.argv)
-        args = parser.parse_args(self.argv)
-        # Default to bluestore here since defaulting it in add_argument may
-        # cause both to be True
-        if not args.bluestore:
-            args.bluestore = True
-        self.create(args)
+        if self.args is None:
+            self.args = parser.parse_args(self.argv)
+        if self.args.bluestore:
+            self.args.objectstore = 'bluestore'
+        self.objectstore = objectstore.mapping['LVM'][self.args.objectstore]
+        self.create()
index 85c8a14677121e2580713254e339f0d5bed990ea..18fc1df03d8d675da16c6fa41ce9614a50166520 100644 (file)
 from __future__ import print_function
-import json
 import logging
 from textwrap import dedent
-from ceph_volume.util import prepare as prepare_utils
-from ceph_volume.util import encryption as encryption_utils
-from ceph_volume.util import system, disk
-from ceph_volume.util.arg_validators import exclude_group_options
-from ceph_volume import conf, decorators, terminal
-from ceph_volume.api import lvm as api
-from .common import prepare_parser, rollback_osd
+from ceph_volume import objectstore
+from .common import prepare_parser
 
 
 logger = logging.getLogger(__name__)
 
 
-def prepare_dmcrypt(key, device, device_type, tags):
-    """
-    Helper for devices that are encrypted. The operations needed for
-    block, db, wal devices are all the same
-    """
-    if not device:
-        return ''
-    tag_name = 'ceph.%s_uuid' % device_type
-    uuid = tags[tag_name]
-    return encryption_utils.prepare_dmcrypt(key, device, uuid)
-
-def prepare_bluestore(block, wal, db, secrets, tags, osd_id, fsid):
-    """
-    :param block: The name of the logical volume for the bluestore data
-    :param wal: a regular/plain disk or logical volume, to be used for block.wal
-    :param db: a regular/plain disk or logical volume, to be used for block.db
-    :param secrets: A dict with the secrets needed to create the osd (e.g. cephx)
-    :param id_: The OSD id
-    :param fsid: The OSD fsid, also known as the OSD UUID
-    """
-    cephx_secret = secrets.get('cephx_secret', prepare_utils.create_key())
-    # encryption-only operations
-    if secrets.get('dmcrypt_key'):
-        # If encrypted, there is no need to create the lockbox keyring file because
-        # bluestore re-creates the files and does not have support for other files
-        # like the custom lockbox one. This will need to be done on activation.
-        # format and open ('decrypt' devices) and re-assign the device and journal
-        # variables so that the rest of the process can use the mapper paths
-        key = secrets['dmcrypt_key']
-        block = prepare_dmcrypt(key, block, 'block', tags)
-        wal = prepare_dmcrypt(key, wal, 'wal', tags)
-        db = prepare_dmcrypt(key, db, 'db', tags)
-
-    # create the directory
-    prepare_utils.create_osd_path(osd_id, tmpfs=True)
-    # symlink the block
-    prepare_utils.link_block(block, osd_id)
-    # get the latest monmap
-    prepare_utils.get_monmap(osd_id)
-    # write the OSD keyring if it doesn't exist already
-    prepare_utils.write_keyring(osd_id, cephx_secret)
-    # prepare the osd filesystem
-    prepare_utils.osd_mkfs_bluestore(
-        osd_id, fsid,
-        keyring=cephx_secret,
-        wal=wal,
-        db=db
-    )
-
-
 class Prepare(object):
 
     help = 'Format an LVM device and associate it with an OSD'
 
-    def __init__(self, argv):
+    def __init__(self, argv, args=None):
+        self.objectstore = None
         self.argv = argv
+        self.args = args
         self.osd_id = None
 
-    def get_ptuuid(self, argument):
-        uuid = disk.get_partuuid(argument)
-        if not uuid:
-            terminal.error('blkid could not detect a PARTUUID for device: %s' % argument)
-            raise RuntimeError('unable to use device')
-        return uuid
-
-    def setup_device(self, device_type, device_name, tags, size, slots):
-        """
-        Check if ``device`` is an lv, if so, set the tags, making sure to
-        update the tags with the lv_uuid and lv_path which the incoming tags
-        will not have.
-
-        If the device is not a logical volume, then retrieve the partition UUID
-        by querying ``blkid``
-        """
-        if device_name is None:
-            return '', '', tags
-        tags['ceph.type'] = device_type
-        tags['ceph.vdo'] = api.is_vdo(device_name)
-
-        try:
-            vg_name, lv_name = device_name.split('/')
-            lv = api.get_single_lv(filters={'lv_name': lv_name,
-                                            'vg_name': vg_name})
-        except ValueError:
-            lv = None
-
-        if lv:
-            lv_uuid = lv.lv_uuid
-            path = lv.lv_path
-            tags['ceph.%s_uuid' % device_type] = lv_uuid
-            tags['ceph.%s_device' % device_type] = path
-            lv.set_tags(tags)
-        elif disk.is_device(device_name):
-            # We got a disk, create an lv
-            lv_type = "osd-{}".format(device_type)
-            name_uuid = system.generate_uuid()
-            kwargs = {
-                'device': device_name,
-                'tags': tags,
-                'slots': slots
-            }
-            #TODO use get_block_db_size and co here to get configured size in
-            #conf file
-            if size != 0:
-                kwargs['size'] = size
-            lv = api.create_lv(
-                lv_type,
-                name_uuid,
-                **kwargs)
-            path = lv.lv_path
-            tags['ceph.{}_device'.format(device_type)] = path
-            tags['ceph.{}_uuid'.format(device_type)] = lv.lv_uuid
-            lv_uuid = lv.lv_uuid
-            lv.set_tags(tags)
-        else:
-            # otherwise assume this is a regular disk partition
-            name_uuid = self.get_ptuuid(device_name)
-            path = device_name
-            tags['ceph.%s_uuid' % device_type] = name_uuid
-            tags['ceph.%s_device' % device_type] = path
-            lv_uuid = name_uuid
-        return path, lv_uuid, tags
-
-    def prepare_data_device(self, device_type, osd_uuid):
-        """
-        Check if ``arg`` is a device or partition to create an LV out of it
-        with a distinct volume group name, assigning LV tags on it and
-        ultimately, returning the logical volume object.  Failing to detect
-        a device or partition will result in error.
-
-        :param arg: The value of ``--data`` when parsing args
-        :param device_type: Usually ``block``
-        :param osd_uuid: The OSD uuid
-        """
-        device = self.args.data
-        if disk.is_partition(device) or disk.is_device(device):
-            # we must create a vg, and then a single lv
-            lv_name_prefix = "osd-{}".format(device_type)
-            kwargs = {'device': device,
-                      'tags': {'ceph.type': device_type},
-                      'slots': self.args.data_slots,
-                     }
-            logger.debug('data device size: {}'.format(self.args.data_size))
-            if self.args.data_size != 0:
-                kwargs['size'] = self.args.data_size
-            return api.create_lv(
-                lv_name_prefix,
-                osd_uuid,
-                **kwargs)
-        else:
-            error = [
-                'Cannot use device ({}).'.format(device),
-                'A vg/lv path or an existing device is needed']
-            raise RuntimeError(' '.join(error))
-
-        raise RuntimeError('no data logical volume found with: {}'.format(device))
-
-    def safe_prepare(self, args=None):
-        """
-        An intermediate step between `main()` and `prepare()` so that we can
-        capture the `self.osd_id` in case we need to rollback
-
-        :param args: Injected args, usually from `lvm create` which compounds
-                     both `prepare` and `create`
-        """
-        if args is not None:
-            self.args = args
-
-        try:
-            vgname, lvname = self.args.data.split('/')
-            lv = api.get_single_lv(filters={'lv_name': lvname,
-                                            'vg_name': vgname})
-        except ValueError:
-            lv = None
-
-        if api.is_ceph_device(lv):
-            logger.info("device {} is already used".format(self.args.data))
-            raise RuntimeError("skipping {}, it is already prepared".format(self.args.data))
-        try:
-            self.prepare()
-        except Exception:
-            logger.exception('lvm prepare was unable to complete')
-            logger.info('will rollback OSD ID creation')
-            rollback_osd(self.args, self.osd_id)
-            raise
-        terminal.success("ceph-volume lvm prepare successful for: %s" % self.args.data)
-
-    def get_cluster_fsid(self):
-        """
-        Allows using --cluster-fsid as an argument, but can fallback to reading
-        from ceph.conf if that is unset (the default behavior).
-        """
-        if self.args.cluster_fsid:
-            return self.args.cluster_fsid
-        else:
-            return conf.ceph.get('global', 'fsid')
-
-    @decorators.needs_root
-    def prepare(self):
-        # FIXME we don't allow re-using a keyring, we always generate one for the
-        # OSD, this needs to be fixed. This could either be a file (!) or a string
-        # (!!) or some flags that we would need to compound into a dict so that we
-        # can convert to JSON (!!!)
-        secrets = {'cephx_secret': prepare_utils.create_key()}
-        cephx_lockbox_secret = ''
-        encrypted = 1 if self.args.dmcrypt else 0
-        cephx_lockbox_secret = '' if not encrypted else prepare_utils.create_key()
-
-        if encrypted:
-            secrets['dmcrypt_key'] = encryption_utils.create_dmcrypt_key()
-            secrets['cephx_lockbox_secret'] = cephx_lockbox_secret
-
-        cluster_fsid = self.get_cluster_fsid()
-
-        osd_fsid = self.args.osd_fsid or system.generate_uuid()
-        crush_device_class = self.args.crush_device_class
-        if crush_device_class:
-            secrets['crush_device_class'] = crush_device_class
-        # reuse a given ID if it exists, otherwise create a new ID
-        self.osd_id = prepare_utils.create_id(osd_fsid, json.dumps(secrets), osd_id=self.args.osd_id)
-        tags = {
-            'ceph.osd_fsid': osd_fsid,
-            'ceph.osd_id': self.osd_id,
-            'ceph.cluster_fsid': cluster_fsid,
-            'ceph.cluster_name': conf.cluster,
-            'ceph.crush_device_class': crush_device_class,
-            'ceph.osdspec_affinity': prepare_utils.get_osdspec_affinity()
-        }
-        if self.args.bluestore:
-            try:
-                vg_name, lv_name = self.args.data.split('/')
-                block_lv = api.get_single_lv(filters={'lv_name': lv_name,
-                                                      'vg_name': vg_name})
-            except ValueError:
-                block_lv = None
-
-            if not block_lv:
-                block_lv = self.prepare_data_device('block', osd_fsid)
-
-            tags['ceph.block_device'] = block_lv.lv_path
-            tags['ceph.block_uuid'] = block_lv.lv_uuid
-            tags['ceph.cephx_lockbox_secret'] = cephx_lockbox_secret
-            tags['ceph.encrypted'] = encrypted
-            tags['ceph.vdo'] = api.is_vdo(block_lv.lv_path)
-
-            wal_device, wal_uuid, tags = self.setup_device(
-                'wal',
-                self.args.block_wal,
-                tags,
-                self.args.block_wal_size,
-                self.args.block_wal_slots)
-            db_device, db_uuid, tags = self.setup_device(
-                'db',
-                self.args.block_db,
-                tags,
-                self.args.block_db_size,
-                self.args.block_db_slots)
-
-            tags['ceph.type'] = 'block'
-            block_lv.set_tags(tags)
-
-            prepare_bluestore(
-                block_lv.lv_path,
-                wal_device,
-                db_device,
-                secrets,
-                tags,
-                self.osd_id,
-                osd_fsid,
-            )
-
     def main(self):
         sub_command_help = dedent("""
         Prepare an OSD by assigning an ID and FSID, registering them with the
@@ -315,13 +48,12 @@ class Prepare(object):
             prog='ceph-volume lvm prepare',
             description=sub_command_help,
         )
-        if len(self.argv) == 0:
+        if len(self.argv) == 0 and self.args is None:
             print(sub_command_help)
             return
-        exclude_group_options(parser, argv=self.argv, groups=['bluestore'])
-        self.args = parser.parse_args(self.argv)
-        # Default to bluestore here since defaulting it in add_argument may
-        # cause both to be True
-        if not self.args.bluestore:
-            self.args.bluestore = True
-        self.safe_prepare()
+        if self.args is None:
+            self.args = parser.parse_args(self.argv)
+        if self.args.bluestore:
+            self.args.objectstore = 'bluestore'
+        self.objectstore = objectstore.mapping['LVM'][self.args.objectstore](args=self.args)
+        self.objectstore.safe_prepare()
index 17be57dfeaa8ee75f7cef51bd2c6907536ad35a3..9323b18f98034944caf3b14c99e907117e8de626 100644 (file)
@@ -1,95 +1,20 @@
 from __future__ import print_function
 import argparse
 import logging
-import os
 from textwrap import dedent
-from ceph_volume import process, conf, decorators, terminal
-from ceph_volume.util import system
-from ceph_volume.util import prepare as prepare_utils
-from .list import direct_report
+from ceph_volume import objectstore
 
 
 logger = logging.getLogger(__name__)
 
-def activate_bluestore(meta, tmpfs, systemd):
-    # find the osd
-    osd_id = meta['osd_id']
-    osd_uuid = meta['osd_uuid']
-
-    # mount on tmpfs the osd directory
-    osd_path = '/var/lib/ceph/osd/%s-%s' % (conf.cluster, osd_id)
-    if not system.path_is_mounted(osd_path):
-        # mkdir -p and mount as tmpfs
-        prepare_utils.create_osd_path(osd_id, tmpfs=tmpfs)
-
-    # XXX This needs to be removed once ceph-bluestore-tool can deal with
-    # symlinks that exist in the osd dir
-    for link_name in ['block', 'block.db', 'block.wal']:
-        link_path = os.path.join(osd_path, link_name)
-        if os.path.exists(link_path):
-            os.unlink(os.path.join(osd_path, link_name))
-
-    # Once symlinks are removed, the osd dir can be 'primed again. chown first,
-    # regardless of what currently exists so that ``prime-osd-dir`` can succeed
-    # even if permissions are somehow messed up
-    system.chown(osd_path)
-    prime_command = [
-        'ceph-bluestore-tool',
-        'prime-osd-dir',
-        '--path', osd_path,
-        '--no-mon-config',
-        '--dev', meta['device'],
-    ]
-    process.run(prime_command)
-
-    # always re-do the symlink regardless if it exists, so that the block,
-    # block.wal, and block.db devices that may have changed can be mapped
-    # correctly every time
-    prepare_utils.link_block(meta['device'], osd_id)
-
-    if 'device_db' in meta:
-        prepare_utils.link_db(meta['device_db'], osd_id, osd_uuid)
-
-    if 'device_wal' in meta:
-        prepare_utils.link_wal(meta['device_wal'], osd_id, osd_uuid)
-
-    system.chown(osd_path)
-    terminal.success("ceph-volume raw activate successful for osd ID: %s" % osd_id)
-
-
 class Activate(object):
 
     help = 'Discover and prepare a data directory for a (BlueStore) OSD on a raw device'
 
-    def __init__(self, argv):
+    def __init__(self, argv, args=None):
+        self.objectstore = None
         self.argv = argv
-        self.args = None
-
-    @decorators.needs_root
-    def activate(self, devs, start_osd_id, start_osd_uuid,
-                 tmpfs, systemd):
-        """
-        :param args: The parsed arguments coming from the CLI
-        """
-        assert devs or start_osd_id or start_osd_uuid
-        found = direct_report(devs)
-
-        activated_any = False
-        for osd_uuid, meta in found.items():
-            osd_id = meta['osd_id']
-            if start_osd_id is not None and str(osd_id) != str(start_osd_id):
-                continue
-            if start_osd_uuid is not None and osd_uuid != start_osd_uuid:
-                continue
-            logger.info('Activating osd.%s uuid %s cluster %s' % (
-                        osd_id, osd_uuid, meta['ceph_fsid']))
-            activate_bluestore(meta,
-                               tmpfs=tmpfs,
-                               systemd=systemd)
-            activated_any = True
-
-        if not activated_any:
-            raise RuntimeError('did not find any matching OSD to activate')
+        self.args = args
 
     def main(self):
         sub_command_help = dedent("""
@@ -126,7 +51,15 @@ class Activate(object):
             '--no-systemd',
             dest='no_systemd',
             action='store_true',
-            help='Skip creating and enabling systemd units and starting OSD services'
+            help='This argument has no effect, this is here for backward compatibility.'
+        )
+        parser.add_argument(
+            '--objectstore',
+            dest='objectstore',
+            help='The OSD objectstore.',
+            default='bluestore',
+            choices=['bluestore', 'seastore'],
+            type=str,
         )
         parser.add_argument(
             '--block.db',
@@ -147,20 +80,15 @@ class Activate(object):
         if not self.argv:
             print(sub_command_help)
             return
-        args = parser.parse_args(self.argv)
-        self.args = args
-        if not args.no_systemd:
-            terminal.error('systemd support not yet implemented')
-            raise SystemExit(1)
-
-        devs = [args.device]
-        if args.block_wal:
-            devs.append(args.block_wal)
-        if args.block_db:
-            devs.append(args.block_db)
-
-        self.activate(devs=devs,
-                      start_osd_id=args.osd_id,
-                      start_osd_uuid=args.osd_uuid,
-                      tmpfs=not args.no_tmpfs,
-                      systemd=not self.args.no_systemd)
+        self.args = parser.parse_args(self.argv)
+
+        devs = [self.args.device]
+        if self.args.block_wal:
+            devs.append(self.args.block_wal)
+        if self.args.block_db:
+            devs.append(self.args.block_db)
+        self.objectstore = objectstore.mapping['RAW'][self.args.objectstore](args=self.args)
+        self.objectstore.activate(devs=devs,
+                                  start_osd_id=self.args.osd_id,
+                                  start_osd_uuid=self.args.osd_uuid,
+                                  tmpfs=not self.args.no_tmpfs)
index 4863b9e18e05ae763e6041009c77449215948ffd..e3aea2c7250dc5689c50c70a936937c7a80849a3 100644 (file)
@@ -11,6 +11,14 @@ def create_parser(prog, description):
         formatter_class=argparse.RawDescriptionHelpFormatter,
         description=description,
     )
+    parser.add_argument(
+        '--objectstore',
+        dest='objectstore',
+        help='The OSD objectstore.',
+        default='bluestore',
+        choices=['bluestore', 'seastore'],
+        type=str,
+    ),
     parser.add_argument(
         '--data',
         required=True,
@@ -20,7 +28,8 @@ def create_parser(prog, description):
     parser.add_argument(
         '--bluestore',
         action='store_true',
-        help='Use BlueStore backend')
+        help='Use BlueStore backend. (DEPRECATED: use --objectstore instead)'
+    )
     parser.add_argument(
         '--crush-device-class',
         dest='crush_device_class',
index b3201a89dafce061f3915157274cf54f7e0d4b9c..e4308e55036755cc35c4e770d989ed3c3f0315c5 100644 (file)
@@ -1,62 +1,12 @@
 from __future__ import print_function
-import json
 import logging
 import os
 from textwrap import dedent
-from ceph_volume.util import prepare as prepare_utils
-from ceph_volume.util import encryption as encryption_utils
-from ceph_volume.util import disk
-from ceph_volume.util import system
-from ceph_volume import decorators, terminal
-from ceph_volume.devices.lvm.common import rollback_osd
+from ceph_volume import terminal, objectstore
 from .common import create_parser
 
 logger = logging.getLogger(__name__)
 
-def prepare_dmcrypt(key, device, device_type, fsid):
-    """
-    Helper for devices that are encrypted. The operations needed for
-    block, db, wal, devices are all the same
-    """
-    if not device:
-        return ''
-    kname = disk.lsblk(device)['KNAME']
-    mapping = 'ceph-{}-{}-{}-dmcrypt'.format(fsid, kname, device_type)
-    return encryption_utils.prepare_dmcrypt(key, device, mapping)
-
-def prepare_bluestore(block, wal, db, secrets, osd_id, fsid, tmpfs):
-    """
-    :param block: The name of the logical volume for the bluestore data
-    :param wal: a regular/plain disk or logical volume, to be used for block.wal
-    :param db: a regular/plain disk or logical volume, to be used for block.db
-    :param secrets: A dict with the secrets needed to create the osd (e.g. cephx)
-    :param id_: The OSD id
-    :param fsid: The OSD fsid, also known as the OSD UUID
-    """
-    cephx_secret = secrets.get('cephx_secret', prepare_utils.create_key())
-
-    if secrets.get('dmcrypt_key'):
-        key = secrets['dmcrypt_key']
-        block = prepare_dmcrypt(key, block, 'block', fsid)
-        wal = prepare_dmcrypt(key, wal, 'wal', fsid)
-        db = prepare_dmcrypt(key, db, 'db', fsid)
-
-    # create the directory
-    prepare_utils.create_osd_path(osd_id, tmpfs=tmpfs)
-    # symlink the block
-    prepare_utils.link_block(block, osd_id)
-    # get the latest monmap
-    prepare_utils.get_monmap(osd_id)
-    # write the OSD keyring if it doesn't exist already
-    prepare_utils.write_keyring(osd_id, cephx_secret)
-    # prepare the osd filesystem
-    prepare_utils.osd_mkfs_bluestore(
-        osd_id, fsid,
-        keyring=cephx_secret,
-        wal=wal,
-        db=db
-    )
-
 
 class Prepare(object):
 
@@ -65,65 +15,7 @@ class Prepare(object):
     def __init__(self, argv):
         self.argv = argv
         self.osd_id = None
-
-    def safe_prepare(self, args=None):
-        """
-        An intermediate step between `main()` and `prepare()` so that we can
-        capture the `self.osd_id` in case we need to rollback
-
-        :param args: Injected args, usually from `raw create` which compounds
-                     both `prepare` and `create`
-        """
-        if args is not None:
-            self.args = args
-        try:
-            self.prepare()
-        except Exception:
-            logger.exception('raw prepare was unable to complete')
-            logger.info('will rollback OSD ID creation')
-            rollback_osd(self.args, self.osd_id)
-            raise
-        dmcrypt_log = 'dmcrypt' if args.dmcrypt else 'clear'
-        terminal.success("ceph-volume raw {} prepare successful for: {}".format(dmcrypt_log, self.args.data))
-
-
-    @decorators.needs_root
-    def prepare(self):
-        secrets = {'cephx_secret': prepare_utils.create_key()}
-        encrypted = 1 if self.args.dmcrypt else 0
-        cephx_lockbox_secret = '' if not encrypted else prepare_utils.create_key()
-
-        if encrypted:
-            secrets['dmcrypt_key'] = os.getenv('CEPH_VOLUME_DMCRYPT_SECRET')
-            secrets['cephx_lockbox_secret'] = cephx_lockbox_secret # dummy value to make `ceph osd new` not complaining
-
-        osd_fsid = system.generate_uuid()
-        crush_device_class = self.args.crush_device_class
-        if crush_device_class:
-            secrets['crush_device_class'] = crush_device_class
-        tmpfs = not self.args.no_tmpfs
-        wal = ""
-        db = ""
-        if self.args.block_wal:
-            wal = self.args.block_wal
-        if self.args.block_db:
-            db = self.args.block_db
-
-        # reuse a given ID if it exists, otherwise create a new ID
-        self.osd_id = prepare_utils.create_id(
-            osd_fsid,
-            json.dumps(secrets),
-            osd_id=self.args.osd_id)
-
-        prepare_bluestore(
-            self.args.data,
-            wal,
-            db,
-            secrets,
-            self.osd_id,
-            osd_fsid,
-            tmpfs,
-        )
+        self.objectstore = None
 
     def main(self):
         sub_command_help = dedent("""
@@ -148,13 +40,13 @@ class Prepare(object):
             print(sub_command_help)
             return
         self.args = parser.parse_args(self.argv)
-        if not self.args.bluestore:
-            terminal.error('must specify --bluestore (currently the only supported backend)')
-            raise SystemExit(1)
+        if self.args.bluestore:
+            self.args.objectstore = 'bluestore'
         if self.args.dmcrypt and not os.getenv('CEPH_VOLUME_DMCRYPT_SECRET'):
             terminal.error('encryption was requested (--dmcrypt) but environment variable ' \
                            'CEPH_VOLUME_DMCRYPT_SECRET is not set, you must set ' \
                            'this variable to provide a dmcrypt secret.')
             raise SystemExit(1)
 
-        self.safe_prepare(self.args)
+        self.objectstore = objectstore.mapping['RAW'][self.args.objectstore](args=self.args)
+        self.objectstore.safe_prepare(self.args)
diff --git a/src/ceph-volume/ceph_volume/objectstore/__init__.py b/src/ceph-volume/ceph_volume/objectstore/__init__.py
new file mode 100644 (file)
index 0000000..f8bc2c5
--- /dev/null
@@ -0,0 +1,11 @@
+from . import lvmbluestore
+from . import rawbluestore
+
+mapping = {
+    'LVM': {
+        'bluestore': lvmbluestore.LvmBlueStore
+    },
+    'RAW': {
+        'bluestore': rawbluestore.RawBlueStore
+    }
+}
diff --git a/src/ceph-volume/ceph_volume/objectstore/baseobjectstore.py b/src/ceph-volume/ceph_volume/objectstore/baseobjectstore.py
new file mode 100644 (file)
index 0000000..822f293
--- /dev/null
@@ -0,0 +1,154 @@
+import logging
+import os
+import errno
+import time
+from ceph_volume import conf, terminal, process
+from ceph_volume.util import prepare as prepare_utils
+from ceph_volume.util import system, disk
+from typing import Dict, Any, List, Optional, TYPE_CHECKING
+
+if TYPE_CHECKING:
+    import argparse
+    from ceph_volume.api.lvm import Volume
+
+
+logger = logging.getLogger(__name__)
+
+
+class BaseObjectStore:
+    def __init__(self, args: "argparse.Namespace") -> None:
+        self.args: "argparse.Namespace" = args
+        # FIXME we don't allow re-using a keyring, we always generate one
+        # for the OSD, this needs to be fixed. This could either be a file (!)
+        # or a string (!!) or some flags that we would need to compound
+        # into a dict so that we can convert to JSON (!!!)
+        self.secrets = {'cephx_secret': prepare_utils.create_key()}
+        self.cephx_secret = self.secrets.get('cephx_secret',
+                                             prepare_utils.create_key())
+        self.encrypted = 0
+        self.tags: Dict[str, Any] = {}
+        self.osd_id: str = ''
+        self.osd_fsid = ''
+        self.block_lv: Optional["Volume"] = None
+        self.cephx_lockbox_secret = ''
+        self.objectstore: str = ''
+        self.osd_mkfs_cmd: List[str] = []
+        self.block_device_path = ''
+        if hasattr(self.args, 'dmcrypt'):
+            if self.args.dmcrypt:
+                self.encrypted = 1
+                self.cephx_lockbox_secret = prepare_utils.create_key()
+                self.secrets['cephx_lockbox_secret'] = \
+                    self.cephx_lockbox_secret
+
+    def get_ptuuid(self, argument: str) -> str:
+        uuid = disk.get_partuuid(argument)
+        if not uuid:
+            terminal.error('blkid could not detect a PARTUUID for device: %s' %
+                           argument)
+            raise RuntimeError('unable to use device')
+        return uuid
+
+    def get_osdspec_affinity(self) -> str:
+        return os.environ.get('CEPH_VOLUME_OSDSPEC_AFFINITY', '')
+
+    def pre_prepare(self) -> None:
+        raise NotImplementedError()
+
+    def prepare_data_device(self,
+                            device_type: str,
+                            osd_uuid: str) -> Optional["Volume"]:
+        raise NotImplementedError()
+
+    def safe_prepare(self, args: "argparse.Namespace") -> None:
+        raise NotImplementedError()
+
+    def add_objectstore_opts(self) -> None:
+        raise NotImplementedError()
+
+    def prepare_osd_req(self, tmpfs: bool = True) -> None:
+        # create the directory
+        prepare_utils.create_osd_path(self.osd_id, tmpfs=tmpfs)
+        # symlink the block
+        prepare_utils.link_block(self.block_device_path, self.osd_id)
+        # get the latest monmap
+        prepare_utils.get_monmap(self.osd_id)
+        # write the OSD keyring if it doesn't exist already
+        prepare_utils.write_keyring(self.osd_id, self.cephx_secret)
+
+    def prepare(self) -> None:
+        raise NotImplementedError()
+
+    def prepare_dmcrypt(self) -> None:
+        raise NotImplementedError()
+
+    def get_cluster_fsid(self) -> str:
+        """
+        Allows using --cluster-fsid as an argument, but can fallback to reading
+        from ceph.conf if that is unset (the default behavior).
+        """
+        if self.args.cluster_fsid:
+            return self.args.cluster_fsid
+        else:
+            return conf.ceph.get('global', 'fsid')
+
+    def get_osd_path(self) -> str:
+        return '/var/lib/ceph/osd/%s-%s/' % (conf.cluster, self.osd_id)
+
+    def build_osd_mkfs_cmd(self) -> List[str]:
+        self.supplementary_command = [
+            '--osd-data', self.osd_path,
+            '--osd-uuid', self.osd_fsid,
+            '--setuser', 'ceph',
+            '--setgroup', 'ceph'
+        ]
+        self.osd_mkfs_cmd = [
+            'ceph-osd',
+            '--cluster', conf.cluster,
+            '--osd-objectstore', self.objectstore,
+            '--mkfs',
+            '-i', self.osd_id,
+            '--monmap', self.monmap,
+        ]
+        if self.cephx_secret is not None:
+            self.osd_mkfs_cmd.extend(['--keyfile', '-'])
+        try:
+            self.add_objectstore_opts()
+        except NotImplementedError:
+            logger.info("No specific objectstore options to add.")
+
+        self.osd_mkfs_cmd.extend(self.supplementary_command)
+        return self.osd_mkfs_cmd
+
+    def osd_mkfs(self) -> None:
+        self.osd_path = self.get_osd_path()
+        self.monmap = os.path.join(self.osd_path, 'activate.monmap')
+        cmd = self.build_osd_mkfs_cmd()
+
+        system.chown(self.osd_path)
+        """
+        When running in containers the --mkfs on raw device sometimes fails
+        to acquire a lock through flock() on the device because systemd-udevd holds one temporarily.
+        See KernelDevice.cc and _lock() to understand how ceph-osd acquires the lock.
+        Because this is really transient, we retry up to 5 times and wait for 1 sec in-between
+        """
+        for retry in range(5):
+            _, _, returncode = process.call(cmd,
+                                            stdin=self.cephx_secret,
+                                            terminal_verbose=True,
+                                            show_command=True)
+            if returncode == 0:
+                break
+            else:
+                if returncode == errno.EWOULDBLOCK:
+                    time.sleep(1)
+                    logger.info('disk is held by another process, '
+                                'trying to mkfs again... (%s/5 attempt)' %
+                                retry)
+                    continue
+                else:
+                    raise RuntimeError('Command failed with exit code %s: %s' %
+                                       (returncode, ' '.join(cmd)))
+
+    def activate(self) -> None:
+        raise NotImplementedError()
diff --git a/src/ceph-volume/ceph_volume/objectstore/bluestore.py b/src/ceph-volume/ceph_volume/objectstore/bluestore.py
new file mode 100644 (file)
index 0000000..e9b0e95
--- /dev/null
@@ -0,0 +1,61 @@
+import logging
+import os
+from .baseobjectstore import BaseObjectStore
+from ceph_volume.util import system
+from typing import Optional, TYPE_CHECKING
+
+if TYPE_CHECKING:
+    import argparse
+
+logger = logging.getLogger(__name__)
+
+
+class BlueStore(BaseObjectStore):
+    def __init__(self, args: "argparse.Namespace") -> None:
+        super().__init__(args)
+        self.args: "argparse.Namespace" = args
+        self.objectstore = 'bluestore'
+        self.osd_id: str = ''
+        self.osd_fsid: str = ''
+        self.osd_path: str = ''
+        self.key: Optional[str] = None
+        self.block_device_path: str = ''
+        self.wal_device_path: str = ''
+        self.db_device_path: str = ''
+
+    def add_objectstore_opts(self) -> None:
+        """
+        Create the files for the OSD to function. A normal call will look like:
+
+            ceph-osd --cluster ceph --mkfs --mkkey -i 0 \
+                    --monmap /var/lib/ceph/osd/ceph-0/activate.monmap \
+                    --osd-data /var/lib/ceph/osd/ceph-0 \
+                    --osd-uuid 8d208665-89ae-4733-8888-5d3bfbeeec6c \
+                    --keyring /var/lib/ceph/osd/ceph-0/keyring \
+                    --setuser ceph --setgroup ceph
+
+        In some cases it is required to use the keyring, when it is passed
+        in as a keyword argument it is used as part of the ceph-osd command
+        """
+
+        if self.wal_device_path:
+            self.osd_mkfs_cmd.extend(
+                ['--bluestore-block-wal-path', self.wal_device_path]
+            )
+            system.chown(self.wal_device_path)
+
+        if self.db_device_path:
+            self.osd_mkfs_cmd.extend(
+                ['--bluestore-block-db-path', self.db_device_path]
+            )
+            system.chown(self.db_device_path)
+
+        if self.get_osdspec_affinity():
+            self.osd_mkfs_cmd.extend(['--osdspec-affinity',
+                                      self.get_osdspec_affinity()])
+
+    def unlink_bs_symlinks(self) -> None:
+        for link_name in ['block', 'block.db', 'block.wal']:
+            link_path = os.path.join(self.osd_path, link_name)
+            if os.path.exists(link_path):
+                os.unlink(os.path.join(self.osd_path, link_name))
diff --git a/src/ceph-volume/ceph_volume/objectstore/lvmbluestore.py b/src/ceph-volume/ceph_volume/objectstore/lvmbluestore.py
new file mode 100644 (file)
index 0000000..a76502e
--- /dev/null
@@ -0,0 +1,490 @@
+import json
+import logging
+import os
+from ceph_volume import conf, terminal, decorators, configuration, process
+from ceph_volume.api import lvm as api
+from ceph_volume.util import prepare as prepare_utils
+from ceph_volume.util import encryption as encryption_utils
+from ceph_volume.util import system, disk
+from ceph_volume.systemd import systemctl
+from ceph_volume.devices.lvm.common import rollback_osd
+from ceph_volume.devices.lvm.listing import direct_report
+from .bluestore import BlueStore
+from typing import Dict, Any, Optional, List, Tuple, TYPE_CHECKING
+
+if TYPE_CHECKING:
+    import argparse
+    from ceph_volume.api.lvm import Volume
+
+logger = logging.getLogger(__name__)
+
+
+class LvmBlueStore(BlueStore):
+    def __init__(self, args: "argparse.Namespace") -> None:
+        super().__init__(args)
+        self.tags: Dict[str, Any] = {}
+        self.block_lv: Optional["Volume"] = None
+
+    def pre_prepare(self) -> None:
+        if self.encrypted:
+            self.secrets['dmcrypt_key'] = encryption_utils.create_dmcrypt_key()
+
+        cluster_fsid = self.get_cluster_fsid()
+
+        self.osd_fsid = self.args.osd_fsid or system.generate_uuid()
+        crush_device_class = self.args.crush_device_class
+        if crush_device_class:
+            self.secrets['crush_device_class'] = crush_device_class
+        # reuse a given ID if it exists, otherwise create a new ID
+        self.osd_id = prepare_utils.create_id(self.osd_fsid,
+                                              json.dumps(self.secrets),
+                                              osd_id=self.args.osd_id)
+        self.tags = {
+            'ceph.osd_fsid': self.osd_fsid,
+            'ceph.osd_id': self.osd_id,
+            'ceph.cluster_fsid': cluster_fsid,
+            'ceph.cluster_name': conf.cluster,
+            'ceph.crush_device_class': crush_device_class,
+            'ceph.osdspec_affinity': self.get_osdspec_affinity()
+        }
+
+        try:
+            vg_name, lv_name = self.args.data.split('/')
+            self.block_lv = api.get_single_lv(filters={'lv_name': lv_name,
+                                                       'vg_name': vg_name})
+        except ValueError:
+            self.block_lv = None
+
+        if not self.block_lv:
+            self.block_lv = self.prepare_data_device('block', self.osd_fsid)
+        self.block_device_path = self.block_lv.__dict__['lv_path']
+
+        self.tags['ceph.block_device'] = self.block_lv.__dict__['lv_path']
+        self.tags['ceph.block_uuid'] = self.block_lv.__dict__['lv_uuid']
+        self.tags['ceph.cephx_lockbox_secret'] = self.cephx_lockbox_secret
+        self.tags['ceph.encrypted'] = self.encrypted
+        self.tags['ceph.vdo'] = api.is_vdo(self.block_lv.__dict__['lv_path'])
+
+    def prepare_data_device(self,
+                            device_type: str,
+                            osd_uuid: str) -> Optional["Volume"]:
+        """
+        Check if ``arg`` is a device or partition to create an LV out of it
+        with a distinct volume group name, assigning LV tags on it and
+        ultimately, returning the logical volume object.  Failing to detect
+        a device or partition will result in error.
+
+        :param arg: The value of ``--data`` when parsing args
+        :param device_type: Usually ``block``
+        :param osd_uuid: The OSD uuid
+        """
+
+        device = self.args.data
+        if disk.is_partition(device) or disk.is_device(device):
+            # we must create a vg, and then a single lv
+            lv_name_prefix = "osd-{}".format(device_type)
+            kwargs = {
+                'device': device,
+                'tags': {'ceph.type': device_type},
+                'slots': self.args.data_slots,
+                }
+            logger.debug('data device size: {}'.format(self.args.data_size))
+            if self.args.data_size != 0:
+                kwargs['size'] = self.args.data_size
+            return api.create_lv(
+                lv_name_prefix,
+                osd_uuid,
+                **kwargs)
+        else:
+            error = [
+                'Cannot use device ({}).'.format(device),
+                'A vg/lv path or an existing device is needed']
+            raise RuntimeError(' '.join(error))
+
+    def safe_prepare(self,
+                     args: Optional["argparse.Namespace"] = None) -> None:
+        """
+        An intermediate step between `main()` and `prepare()` so that we can
+        capture the `self.osd_id` in case we need to rollback
+
+        :param args: Injected args, usually from `lvm create` which compounds
+                     both `prepare` and `create`
+        """
+        if args is not None:
+            self.args = args
+
+        try:
+            vgname, lvname = self.args.data.split('/')
+            lv = api.get_single_lv(filters={'lv_name': lvname,
+                                            'vg_name': vgname})
+        except ValueError:
+            lv = None
+
+        if api.is_ceph_device(lv):
+            logger.info("device {} is already used".format(self.args.data))
+            raise RuntimeError("skipping {}, it is already prepared".format(
+                self.args.data))
+        try:
+            self.prepare()
+        except Exception:
+            logger.exception('lvm prepare was unable to complete')
+            logger.info('will rollback OSD ID creation')
+            rollback_osd(self.args, self.osd_id)
+            raise
+        terminal.success("ceph-volume lvm prepare successful for: %s" %
+                         self.args.data)
+
+    @decorators.needs_root
+    def prepare(self) -> None:
+        # 1/
+        # Need to be reworked (move it to the parent class + call super()? )
+        self.pre_prepare()
+
+        # 2/
+        self.wal_device_path, wal_uuid, tags = self.setup_device(
+            'wal',
+            self.args.block_wal,
+            self.tags,
+            self.args.block_wal_size,
+            self.args.block_wal_slots)
+        self.db_device_path, db_uuid, tags = self.setup_device(
+            'db',
+            self.args.block_db,
+            self.tags,
+            self.args.block_db_size,
+            self.args.block_db_slots)
+
+        self.tags['ceph.type'] = 'block'
+        self.block_lv.set_tags(self.tags)  # type: ignore
+
+        # 3/ encryption-only operations
+        if self.secrets.get('dmcrypt_key'):
+            self.prepare_dmcrypt()
+
+        # 4/ osd_prepare req
+        self.prepare_osd_req()
+
+        # 5/ bluestore mkfs
+        # prepare the osd filesystem
+        self.osd_mkfs()
+
+    def prepare_dmcrypt(self) -> None:
+        # If encrypted, there is no need to create the lockbox keyring file
+        # because bluestore re-creates the files and does not have support
+        # for other files like the custom lockbox one. This will need to be
+        # done on activation. Format and open ('decrypt' devices) and
+        # re-assign the device and journal variables so that the rest of the
+        # process can use the mapper paths
+        key = self.secrets['dmcrypt_key']
+
+        self.block_device_path = \
+            self.luks_format_and_open(key,
+                                      self.block_device_path,
+                                      'block',
+                                      self.tags)
+        self.wal_device_path = self.luks_format_and_open(key,
+                                                         self.wal_device_path,
+                                                         'wal',
+                                                         self.tags)
+        self.db_device_path = self.luks_format_and_open(key,
+                                                        self.db_device_path,
+                                                        'db',
+                                                        self.tags)
+
+    def luks_format_and_open(self,
+                             key: Optional[str],
+                             device: str,
+                             device_type: str,
+                             tags: Dict[str, Any]) -> str:
+        """
+        Helper for devices that are encrypted. The operations needed for
+        block, db, wal devices are all the same
+        """
+        if not device:
+            return ''
+        tag_name = 'ceph.%s_uuid' % device_type
+        uuid = tags[tag_name]
+        # format data device
+        encryption_utils.luks_format(
+            key,
+            device
+        )
+        encryption_utils.luks_open(
+            key,
+            device,
+            uuid
+        )
+
+        return '/dev/mapper/%s' % uuid
+
+    def setup_device(self,
+                     device_type: str,
+                     device_name: str,
+                     tags: Dict[str, Any],
+                     size: int,
+                     slots: int) -> Tuple[str, str, Dict[str, Any]]:
+        """
+        Check if ``device`` is an lv, if so, set the tags, making sure to
+        update the tags with the lv_uuid and lv_path which the incoming tags
+        will not have.
+
+        If the device is not a logical volume, then retrieve the partition UUID
+        by querying ``blkid``
+        """
+        if device_name is None:
+            return '', '', tags
+        tags['ceph.type'] = device_type
+        tags['ceph.vdo'] = api.is_vdo(device_name)
+
+        try:
+            vg_name, lv_name = device_name.split('/')
+            lv = api.get_single_lv(filters={'lv_name': lv_name,
+                                            'vg_name': vg_name})
+        except ValueError:
+            lv = None
+
+        if lv:
+            lv_uuid = lv.lv_uuid
+            path = lv.lv_path
+            tags['ceph.%s_uuid' % device_type] = lv_uuid
+            tags['ceph.%s_device' % device_type] = path
+            lv.set_tags(tags)
+        elif disk.is_device(device_name):
+            # We got a disk, create an lv
+            lv_type = "osd-{}".format(device_type)
+            name_uuid = system.generate_uuid()
+            kwargs = {
+                'device': device_name,
+                'tags': tags,
+                'slots': slots
+            }
+            # TODO use get_block_db_size and co here to get configured size in
+            # conf file
+            if size != 0:
+                kwargs['size'] = size
+            lv = api.create_lv(
+                lv_type,
+                name_uuid,
+                **kwargs)
+            path = lv.lv_path
+            tags['ceph.{}_device'.format(device_type)] = path
+            tags['ceph.{}_uuid'.format(device_type)] = lv.lv_uuid
+            lv_uuid = lv.lv_uuid
+            lv.set_tags(tags)
+        else:
+            # otherwise assume this is a regular disk partition
+            name_uuid = self.get_ptuuid(device_name)
+            path = device_name
+            tags['ceph.%s_uuid' % device_type] = name_uuid
+            tags['ceph.%s_device' % device_type] = path
+            lv_uuid = name_uuid
+        return path, lv_uuid, tags
+
+    def get_osd_device_path(self,
+                            osd_lvs: List["Volume"],
+                            device_type: str,
+                            dmcrypt_secret: Optional[str] =
+                            None) -> Optional[str]:
+        """
+        ``device_type`` can be one of ``db``, ``wal`` or ``block`` so that we
+        can query LVs on system and fallback to querying the uuid if that is
+        not present.
+
+        Return a path if possible, failing to do that a ``None``, since some of
+        these devices are optional.
+        """
+        osd_block_lv = None
+        for lv in osd_lvs:
+            if lv.tags.get('ceph.type') == 'block':
+                osd_block_lv = lv
+                break
+        if osd_block_lv:
+            is_encrypted = osd_block_lv.tags.get('ceph.encrypted', '0') == '1'
+            logger.debug('Found block device (%s) with encryption: %s',
+                         osd_block_lv.name, is_encrypted)
+            uuid_tag = 'ceph.%s_uuid' % device_type
+            device_uuid = osd_block_lv.tags.get(uuid_tag)
+            if not device_uuid:
+                return None
+
+        device_lv: Optional["Volume"] = None
+        for lv in osd_lvs:
+            if lv.tags.get('ceph.type') == device_type:
+                device_lv = lv
+                break
+        if device_lv:
+            if is_encrypted:
+                encryption_utils.luks_open(dmcrypt_secret,
+                                           device_lv.__dict__['lv_path'],
+                                           device_uuid)
+                return '/dev/mapper/%s' % device_uuid
+            return device_lv.__dict__['lv_path']
+
+        # this could be a regular device, so query it with blkid
+        physical_device = disk.get_device_from_partuuid(device_uuid)
+        if physical_device:
+            if is_encrypted:
+                encryption_utils.luks_open(dmcrypt_secret,
+                                           physical_device,
+                                           device_uuid)
+                return '/dev/mapper/%s' % device_uuid
+            return physical_device
+
+        raise RuntimeError('could not find %s with uuid %s' % (device_type,
+                                                               device_uuid))
+
+    def _activate(self,
+                  osd_lvs: List["Volume"],
+                  no_systemd: bool = False,
+                  no_tmpfs: bool = False) -> None:
+        for lv in osd_lvs:
+            if lv.tags.get('ceph.type') == 'block':
+                osd_block_lv = lv
+                break
+        else:
+            raise RuntimeError('could not find a bluestore OSD to activate')
+
+        is_encrypted = osd_block_lv.tags.get('ceph.encrypted', '0') == '1'
+        dmcrypt_secret = None
+        osd_id = osd_block_lv.tags['ceph.osd_id']
+        conf.cluster = osd_block_lv.tags['ceph.cluster_name']
+        osd_fsid = osd_block_lv.tags['ceph.osd_fsid']
+        configuration.load_ceph_conf_path(
+            osd_block_lv.tags['ceph.cluster_name'])
+        configuration.load()
+
+        # mount on tmpfs the osd directory
+        self.osd_path = '/var/lib/ceph/osd/%s-%s' % (conf.cluster, osd_id)
+        if not system.path_is_mounted(self.osd_path):
+            # mkdir -p and mount as tmpfs
+            prepare_utils.create_osd_path(osd_id, tmpfs=not no_tmpfs)
+
+        # XXX This needs to be removed once ceph-bluestore-tool can deal with
+        # symlinks that exist in the osd dir
+        self.unlink_bs_symlinks()
+
+        # encryption is handled here, before priming the OSD dir
+        if is_encrypted:
+            osd_lv_path = '/dev/mapper/%s' % osd_block_lv.__dict__['lv_uuid']
+            lockbox_secret = osd_block_lv.tags['ceph.cephx_lockbox_secret']
+            encryption_utils.write_lockbox_keyring(osd_id,
+                                                   osd_fsid,
+                                                   lockbox_secret)
+            dmcrypt_secret = encryption_utils.get_dmcrypt_key(osd_id, osd_fsid)
+            encryption_utils.luks_open(dmcrypt_secret,
+                                       osd_block_lv.__dict__['lv_path'],
+                                       osd_block_lv.__dict__['lv_uuid'])
+        else:
+            osd_lv_path = osd_block_lv.__dict__['lv_path']
+
+        db_device_path = \
+            self.get_osd_device_path(osd_lvs, 'db',
+                                     dmcrypt_secret=dmcrypt_secret)
+        wal_device_path = \
+            self.get_osd_device_path(osd_lvs,
+                                     'wal',
+                                     dmcrypt_secret=dmcrypt_secret)
+
+        # Once symlinks are removed, the osd dir can be 'primed again.
+        # chown first, regardless of what currently exists so that
+        # ``prime-osd-dir`` can succeed even if permissions are
+        # somehow messed up.
+        system.chown(self.osd_path)
+        prime_command = [
+            'ceph-bluestore-tool', '--cluster=%s' % conf.cluster,
+            'prime-osd-dir', '--dev', osd_lv_path,
+            '--path', self.osd_path, '--no-mon-config']
+
+        process.run(prime_command)
+        # always re-do the symlink regardless if it exists, so that the block,
+        # block.wal, and block.db devices that may have changed can be mapped
+        # correctly every time
+        process.run(['ln',
+                     '-snf',
+                     osd_lv_path,
+                     os.path.join(self.osd_path, 'block')])
+        system.chown(os.path.join(self.osd_path, 'block'))
+        system.chown(self.osd_path)
+        if db_device_path:
+            destination = os.path.join(self.osd_path, 'block.db')
+            process.run(['ln', '-snf', db_device_path, destination])
+            system.chown(db_device_path)
+            system.chown(destination)
+        if wal_device_path:
+            destination = os.path.join(self.osd_path, 'block.wal')
+            process.run(['ln', '-snf', wal_device_path, destination])
+            system.chown(wal_device_path)
+            system.chown(destination)
+
+        if no_systemd is False:
+            # enable the ceph-volume unit for this OSD
+            systemctl.enable_volume(osd_id, osd_fsid, 'lvm')
+
+            # enable the OSD
+            systemctl.enable_osd(osd_id)
+
+            # start the OSD
+            systemctl.start_osd(osd_id)
+        terminal.success("ceph-volume lvm activate successful for osd ID: %s" %
+                         osd_id)
+
+    @decorators.needs_root
+    def activate_all(self) -> None:
+        listed_osds = direct_report()
+        osds = {}
+        for osd_id, devices in listed_osds.items():
+            # the metadata for all devices in each OSD will contain
+            # the FSID which is required for activation
+            for device in devices:
+                fsid = device.get('tags', {}).get('ceph.osd_fsid')
+                if fsid:
+                    osds[fsid] = osd_id
+                    break
+        if not osds:
+            terminal.warning('Was unable to find any OSDs to activate')
+            terminal.warning('Verify OSDs are present with '
+                             '"ceph-volume lvm list"')
+            return
+        for osd_fsid, osd_id in osds.items():
+            if not self.args.no_systemd and systemctl.osd_is_active(osd_id):
+                terminal.warning(
+                    'OSD ID %s FSID %s process is active. '
+                    'Skipping activation' % (osd_id, osd_fsid)
+                )
+            else:
+                terminal.info('Activating OSD ID %s FSID %s' % (osd_id,
+                                                                osd_fsid))
+                self.activate(self.args, osd_id=osd_id, osd_fsid=osd_fsid)
+
+    @decorators.needs_root
+    def activate(self,
+                 args: Optional["argparse.Namespace"] = None,
+                 osd_id: Optional[str] = None,
+                 osd_fsid: Optional[str] = None) -> None:
+        """
+        :param args: The parsed arguments coming from the CLI
+        :param osd_id: When activating all, this gets populated with an
+                       existing OSD ID
+        :param osd_fsid: When activating all, this gets populated with an
+                         existing OSD FSID
+        """
+        osd_id = osd_id if osd_id else self.args.osd_id
+        osd_fsid = osd_fsid if osd_fsid else self.args.osd_fsid
+
+        if osd_id and osd_fsid:
+            tags = {'ceph.osd_id': osd_id, 'ceph.osd_fsid': osd_fsid}
+        elif not osd_id and osd_fsid:
+            tags = {'ceph.osd_fsid': osd_fsid}
+        elif osd_id and not osd_fsid:
+            raise RuntimeError('could not activate osd.{}, please provide the '
+                               'osd_fsid too'.format(osd_id))
+        else:
+            raise RuntimeError('Please provide both osd_id and osd_fsid')
+        lvs = api.get_lvs(tags=tags)
+        if not lvs:
+            raise RuntimeError('could not find osd.%s with osd_fsid %s' %
+                               (osd_id, osd_fsid))
+
+        self._activate(lvs, self.args.no_systemd, getattr(self.args,
+                                                          'no_tmpfs',
+                                                          False))
diff --git a/src/ceph-volume/ceph_volume/objectstore/rawbluestore.py b/src/ceph-volume/ceph_volume/objectstore/rawbluestore.py
new file mode 100644 (file)
index 0000000..5ac1661
--- /dev/null
@@ -0,0 +1,181 @@
+import logging
+import json
+import os
+from .bluestore import BlueStore
+from ceph_volume import terminal, decorators, conf, process
+from ceph_volume.util import system, disk
+from ceph_volume.util import prepare as prepare_utils
+from ceph_volume.util import encryption as encryption_utils
+from ceph_volume.devices.lvm.common import rollback_osd
+from ceph_volume.devices.raw.list import direct_report
+from typing import Any, Dict, List, Optional, TYPE_CHECKING
+
+if TYPE_CHECKING:
+    import argparse
+
+logger = logging.getLogger(__name__)
+
+
+class RawBlueStore(BlueStore):
+    def __init__(self, args: "argparse.Namespace") -> None:
+        super().__init__(args)
+        if hasattr(self.args, 'data'):
+            self.block_device_path = self.args.data
+        if hasattr(self.args, 'block_db'):
+            self.db_device_path = self.args.block_db
+        if hasattr(self.args, 'block_wal'):
+            self.wal_device_path = self.args.block_wal
+
+    def prepare_dmcrypt(self) -> None:
+        """
+        Helper for devices that are encrypted. The operations needed for
+        block, db, wal, devices are all the same
+        """
+        key = self.secrets['dmcrypt_key']
+
+        for device, device_type in [(self.block_device_path, 'block'),
+                                    (self.db_device_path, 'db'),
+                                    (self.wal_device_path, 'wal')]:
+
+            if device:
+                kname = disk.lsblk(device)['KNAME']
+                mapping = 'ceph-{}-{}-{}-dmcrypt'.format(self.osd_fsid,
+                                                         kname,
+                                                         device_type)
+                # format data device
+                encryption_utils.luks_format(
+                    key,
+                    device
+                )
+                encryption_utils.luks_open(
+                    key,
+                    device,
+                    mapping
+                )
+                self.__dict__[f'{device_type}_device_path'] = \
+                    '/dev/mapper/{}'.format(mapping)
+
+    def safe_prepare(self,
+                     args: Optional["argparse.Namespace"] = None) -> None:
+        """
+        An intermediate step between `main()` and `prepare()` so that we can
+        capture the `self.osd_id` in case we need to rollback
+
+        :param args: Injected args, usually from `raw create` which compounds
+                     both `prepare` and `create`
+        """
+        if args is not None:
+            self.args = args  # This should be moved (to __init__ ?)
+        try:
+            self.prepare()
+        except Exception:
+            logger.exception('raw prepare was unable to complete')
+            logger.info('will rollback OSD ID creation')
+            rollback_osd(self.args, self.osd_id)
+            raise
+        dmcrypt_log = 'dmcrypt' if hasattr(args, 'dmcrypt') else 'clear'
+        terminal.success("ceph-volume raw {} prepare "
+                         "successful for: {}".format(dmcrypt_log,
+                                                     self.args.data))
+
+    @decorators.needs_root
+    def prepare(self) -> None:
+        if self.encrypted:
+            self.secrets['dmcrypt_key'] = \
+                os.getenv('CEPH_VOLUME_DMCRYPT_SECRET')
+        self.osd_fsid = system.generate_uuid()
+        crush_device_class = self.args.crush_device_class
+        if crush_device_class:
+            self.secrets['crush_device_class'] = crush_device_class
+
+        tmpfs = not self.args.no_tmpfs
+        if self.args.block_wal:
+            self.wal = self.args.block_wal
+        if self.args.block_db:
+            self.db = self.args.block_db
+
+        # reuse a given ID if it exists, otherwise create a new ID
+        self.osd_id = prepare_utils.create_id(
+            self.osd_fsid, json.dumps(self.secrets))
+
+        if self.secrets.get('dmcrypt_key'):
+            self.prepare_dmcrypt()
+
+        self.prepare_osd_req(tmpfs=tmpfs)
+
+        # prepare the osd filesystem
+        self.osd_mkfs()
+
+    def _activate(self,
+                  meta: Dict[str, Any],
+                  tmpfs: bool) -> None:
+        # find the osd
+        osd_id = meta['osd_id']
+        osd_uuid = meta['osd_uuid']
+
+        # mount on tmpfs the osd directory
+        self.osd_path = '/var/lib/ceph/osd/%s-%s' % (conf.cluster, osd_id)
+        if not system.path_is_mounted(self.osd_path):
+            # mkdir -p and mount as tmpfs
+            prepare_utils.create_osd_path(osd_id, tmpfs=tmpfs)
+
+        # XXX This needs to be removed once ceph-bluestore-tool can deal with
+        # symlinks that exist in the osd dir
+
+        self.unlink_bs_symlinks()
+
+        # Once symlinks are removed, the osd dir can be 'primed again. chown
+        # first, regardless of what currently exists so that ``prime-osd-dir``
+        # can succeed even if permissions are somehow messed up
+        system.chown(self.osd_path)
+        prime_command = [
+            'ceph-bluestore-tool',
+            'prime-osd-dir',
+            '--path', self.osd_path,
+            '--no-mon-config',
+            '--dev', meta['device'],
+        ]
+        process.run(prime_command)
+
+        # always re-do the symlink regardless if it exists, so that the block,
+        # block.wal, and block.db devices that may have changed can be mapped
+        # correctly every time
+        prepare_utils.link_block(meta['device'], osd_id)
+
+        if 'device_db' in meta:
+            prepare_utils.link_db(meta['device_db'], osd_id, osd_uuid)
+
+        if 'device_wal' in meta:
+            prepare_utils.link_wal(meta['device_wal'], osd_id, osd_uuid)
+
+        system.chown(self.osd_path)
+        terminal.success("ceph-volume raw activate "
+                         "successful for osd ID: %s" % osd_id)
+
+    @decorators.needs_root
+    def activate(self,
+                 devs: List[str],
+                 start_osd_id: str,
+                 start_osd_uuid: str,
+                 tmpfs: bool) -> None:
+        """
+        :param args: The parsed arguments coming from the CLI
+        """
+        assert devs or start_osd_id or start_osd_uuid
+        found = direct_report(devs)
+
+        activated_any = False
+        for osd_uuid, meta in found.items():
+            osd_id = meta['osd_id']
+            if start_osd_id is not None and str(osd_id) != str(start_osd_id):
+                continue
+            if start_osd_uuid is not None and osd_uuid != start_osd_uuid:
+                continue
+            logger.info('Activating osd.%s uuid %s cluster %s' % (
+                osd_id, osd_uuid, meta['ceph_fsid']))
+            self._activate(meta,
+                           tmpfs=tmpfs)
+            activated_any = True
+
+        if not activated_any:
+            raise RuntimeError('did not find any matching OSD to activate')
index 75073c51aca258b73dd403220eca78ffdaa55fc2..51072da120a911ccb02c01895a7ab32140b048df 100644 (file)
@@ -54,14 +54,14 @@ class TestBatch(object):
                        devices=devs,
                        db_devices=[],
                        wal_devices=[],
-                       bluestore=True,
+                       objectstore='bluestore',
                        block_db_size="1G",
                        dmcrypt=True,
                        data_allocate_fraction=1.0,
                       )
         b = batch.Batch([])
-        plan = b.get_plan(args)
         b.args = args
+        plan = b.get_deployment_layout()
         b.report(plan)
 
     @pytest.mark.parametrize('format_', ['json', 'json-pretty'])
index e936cab895e389dae4975f95adbd25c0c6ca5934..8f49dac721bccf0816a0ddcaeeffaba49dec6c44 100644 (file)
@@ -5,7 +5,7 @@ from ceph_volume import terminal, decorators, process
 from ceph_volume.util.device import Device
 from ceph_volume.util import disk
 from ceph_volume.util.encryption import set_dmcrypt_no_workqueue
-from ceph_volume import process, conf
+
 
 def valid_osd_id(val):
     return str(int(val))
index 576c086170847d70cca03eb5946aaf9bb9023071..9c863b83d938f52ba044b8693569b64f3c8dc1b0 100644 (file)
@@ -4,11 +4,9 @@ but also a compounded ("single call") helper to do them in order. Some plugins
 may want to change some part of the process, while others might want to consume
 the single-call helper
 """
-import errno
 import os
 import logging
 import json
-import time
 from ceph_volume import process, conf, terminal
 from ceph_volume.util import system, constants, str_to_int, disk
 
@@ -379,82 +377,3 @@ def get_monmap(osd_id):
         '--keyring', bootstrap_keyring,
         'mon', 'getmap', '-o', monmap_destination
     ])
-
-
-def get_osdspec_affinity():
-    return os.environ.get('CEPH_VOLUME_OSDSPEC_AFFINITY', '')
-
-
-def osd_mkfs_bluestore(osd_id, fsid, keyring=None, wal=False, db=False):
-    """
-    Create the files for the OSD to function. A normal call will look like:
-
-          ceph-osd --cluster ceph --mkfs --mkkey -i 0 \
-                   --monmap /var/lib/ceph/osd/ceph-0/activate.monmap \
-                   --osd-data /var/lib/ceph/osd/ceph-0 \
-                   --osd-uuid 8d208665-89ae-4733-8888-5d3bfbeeec6c \
-                   --keyring /var/lib/ceph/osd/ceph-0/keyring \
-                   --setuser ceph --setgroup ceph
-
-    In some cases it is required to use the keyring, when it is passed in as
-    a keyword argument it is used as part of the ceph-osd command
-    """
-    path = '/var/lib/ceph/osd/%s-%s/' % (conf.cluster, osd_id)
-    monmap = os.path.join(path, 'activate.monmap')
-
-    system.chown(path)
-
-    base_command = [
-        'ceph-osd',
-        '--cluster', conf.cluster,
-        '--osd-objectstore', 'bluestore',
-        '--mkfs',
-        '-i', osd_id,
-        '--monmap', monmap,
-    ]
-
-    supplementary_command = [
-        '--osd-data', path,
-        '--osd-uuid', fsid,
-        '--setuser', 'ceph',
-        '--setgroup', 'ceph'
-    ]
-
-    if keyring is not None:
-        base_command.extend(['--keyfile', '-'])
-
-    if wal:
-        base_command.extend(
-            ['--bluestore-block-wal-path', wal]
-        )
-        system.chown(wal)
-
-    if db:
-        base_command.extend(
-            ['--bluestore-block-db-path', db]
-        )
-        system.chown(db)
-
-    if get_osdspec_affinity():
-        base_command.extend(['--osdspec-affinity', get_osdspec_affinity()])
-
-    command = base_command + supplementary_command
-
-    """
-    When running in containers the --mkfs on raw device sometimes fails
-    to acquire a lock through flock() on the device because systemd-udevd holds one temporarily.
-    See KernelDevice.cc and _lock() to understand how ceph-osd acquires the lock.
-    Because this is really transient, we retry up to 5 times and wait for 1 sec in-between
-    """
-    for retry in range(5):
-        _, _, returncode = process.call(command, stdin=keyring, terminal_verbose=True, show_command=True)
-        if returncode == 0:
-            break
-        else:
-            if returncode == errno.EWOULDBLOCK:
-                    time.sleep(1)
-                    logger.info('disk is held by another process, trying to mkfs again... (%s/5 attempt)' % retry)
-                    continue
-            else:
-                raise RuntimeError('Command failed with exit code %s: %s' % (returncode, ' '.join(command)))
-