if spec.get('error'):
return spec.get('message')
dg_name = spec.get('osdspec')
- for osd in spec.get('data', {}).get('osds', []):
- db_path = '-'
- wal_path = '-'
- block_db = osd.get('block.db', {}).get('path')
- block_wal = osd.get('block.wal', {}).get('path')
- block_data = osd.get('data', {}).get('path', '')
+ for osd in spec.get('data', []):
+ db_path = osd.get('block_db', '-')
+ wal_path = osd.get('block_wal', '-')
+ block_data = osd.get('data', '')
if not block_data:
continue
- if block_db:
- db_path = spec.get('data', {}).get('vg', {}).get('devices', [])
- if block_wal:
- wal_path = spec.get('data', {}).get('wal_vg', {}).get('devices', [])
table.add_row(('osd', dg_name, host, block_data, db_path, wal_path))
return table.get_string()
from orchestrator import raise_if_exception, Completion, ProgressReference
from orchestrator import InventoryHost, DaemonDescription, ServiceDescription
from orchestrator import OrchestratorValidationError
-from orchestrator.module import to_format, OrchestratorCli
+from orchestrator.module import to_format, OrchestratorCli, preview_table_osd
def _test_resource(data, resource_class, extra=None):
r = m._handle_command(None, cmd)
assert r == HandleCommandResult(
retval=-2, stdout='', stderr='No orchestrator configured (try `ceph orch set backend`)')
+
+
+def test_preview_table_osd_smoke():
+ data = [
+ {
+ 'service_type': 'osd',
+ 'data':
+ {
+ 'foo host':
+ [
+ {
+ 'osdspec': 'foo',
+ 'error': '',
+ 'data':
+ [
+ {
+ "block_db": "/dev/nvme0n1",
+ "block_db_size": "66.67 GB",
+ "data": "/dev/sdb",
+ "data_size": "300.00 GB",
+ "encryption": "None"
+ },
+ {
+ "block_db": "/dev/nvme0n1",
+ "block_db_size": "66.67 GB",
+ "data": "/dev/sdc",
+ "data_size": "300.00 GB",
+ "encryption": "None"
+ },
+ {
+ "block_db": "/dev/nvme0n1",
+ "block_db_size": "66.67 GB",
+ "data": "/dev/sdd",
+ "data_size": "300.00 GB",
+ "encryption": "None"
+ }
+ ]
+ }
+ ]
+ }
+ }
+ ]
+ preview_table_osd(data)