from __future__ import absolute_import
-from ceph.deployment.service_spec import ServiceSpec
-from test_orchestrator import TestOrchestrator as _TestOrchestrator
-from tests import mock
+import json
import pytest
+import yaml
+from ceph.deployment.service_spec import ServiceSpec
from ceph.deployment import inventory
+
+from test_orchestrator import TestOrchestrator as _TestOrchestrator
+from tests import mock
+
from orchestrator import raise_if_exception, Completion, ProgressReference
-from orchestrator import InventoryHost, DaemonDescription
+from orchestrator import InventoryHost, DaemonDescription, ServiceDescription
from orchestrator import OrchestratorValidationError
+from orchestrator.module import to_format
def _test_resource(data, resource_class, extra=None):
ServiceSpec(service_type='nfs'),
])
completion.finalize(42)
- assert completion.result == [None, None, None]
\ No newline at end of file
+ assert completion.result == [None, None, None]
+
+
+def test_yaml():
+ y = """daemon_type: crash
+daemon_id: ubuntu
+hostname: ubuntu
+status: 1
+status_desc: starting
+---
+service_type: crash
+service_name: crash
+placement:
+ host_pattern: '*'
+status:
+ container_image_id: 74803e884bea289d2d2d3ebdf6d37cd560499e955595695b1390a89800f4e37a
+ container_image_name: docker.io/ceph/daemon-base:latest-master-devel
+ created: '2020-06-10T10:37:31.051288'
+ last_refresh: '2020-06-10T10:57:40.715637'
+ running: 1
+ size: 1
+"""
+ types = (DaemonDescription, ServiceDescription)
+
+ for y, cls in zip(y.split('---\n'), types):
+ data = yaml.safe_load(y)
+ object = cls.from_json(data)
+
+ assert to_format(object, 'yaml', False, cls) == y
+ assert to_format([object], 'yaml', True, cls) == y
+
+ j = json.loads(to_format(object, 'json', False, cls))
+ assert to_format(cls.from_json(j), 'yaml', False, cls) == y
\ No newline at end of file