from __future__ import print_function
import logging
+import argparse
from textwrap import dedent
from ceph_volume import objectstore
from .common import prepare_parser
-
+from typing import List, Optional
logger = logging.getLogger(__name__)
help = 'Format an LVM device and associate it with an OSD'
- def __init__(self, argv, args=None):
- self.objectstore = None
+ def __init__(self, argv: Optional[List[str]] = None, args: Optional[argparse.Namespace] = None) -> None:
+ self.objectstore: Optional[objectstore.baseobjectstore.BaseObjectStore] = None
self.argv = argv
self.args = args
self.osd_id = None
- def main(self):
+ def main(self) -> None:
sub_command_help = dedent("""
Prepare an OSD by assigning an ID and FSID, registering them with the
cluster with an ID and FSID, formatting and mounting the volume, and
prog='ceph-volume lvm prepare',
description=sub_command_help,
)
+ if self.argv is None:
+ self.argv = []
if len(self.argv) == 0 and self.args is None:
print(sub_command_help)
return
if self.args.bluestore:
self.args.objectstore = 'bluestore'
self.objectstore = objectstore.mapping['LVM'][self.args.objectstore](args=self.args)
- self.objectstore.safe_prepare()
+ if self.objectstore is not None:
+ self.objectstore.safe_prepare()
+ else:
+ raise RuntimeError('Unexpected error while setting objectore backend.')