--- /dev/null
+# Copyright 2020, Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+
+from ansible.module_utils.basic import AnsibleModule
+try:
+ from ansible.module_utils.ca_common import exit_module, generate_ceph_cmd, is_containerized, exec_command
+except ImportError:
+ from module_utils.ca_common import exit_module, generate_ceph_cmd, is_containerized, exec_command
+import datetime
+import json
+
+
+ANSIBLE_METADATA = {
+ 'metadata_version': '1.1',
+ 'status': ['preview'],
+ 'supported_by': 'community'
+}
+
+DOCUMENTATION = '''
+---
+module: ceph_crush_rule
+short_description: Manage Ceph Crush Replicated/Erasure Rule
+version_added: "2.8"
+description:
+ - Manage Ceph Crush rule(s) creation, deletion and updates.
+options:
+ name:
+ description:
+ - name of the Ceph Crush rule.
+ required: true
+ cluster:
+ description:
+ - The ceph cluster name.
+ required: false
+ default: ceph
+ state:
+ description:
+ If 'present' is used, the module creates a rule if it doesn't
+ exist or update it if it already exists.
+ If 'absent' is used, the module will simply delete the rule.
+ If 'info' is used, the module will return all details about the
+ existing rule (json formatted).
+ required: false
+ choices: ['present', 'absent', 'info']
+ default: present
+ rule_type:
+ description:
+ - The ceph CRUSH rule type.
+ required: false
+ choices: ['replicated', 'erasure']
+ required: false
+ bucket_root:
+ description:
+ - The ceph bucket root for replicated rule.
+ required: false
+ bucket_type:
+ description:
+ - The ceph bucket type for replicated rule.
+ required: false
+ choices: ['osd', 'host', 'chassis', 'rack', 'row', 'pdu', 'pod', 'room', 'datacenter', 'zone', 'region', 'root']
+ device_class:
+ description:
+ - The ceph device class for replicated rule.
+ required: false
+ profile:
+ description:
+ - The ceph erasure profile for erasure rule.
+ required: false
+author:
+ - Dimitri Savineau <dsavinea@redhat.com>
+'''
+
+EXAMPLES = '''
+- name: create a Ceph Crush replicated rule
+ ceph_crush_rule:
+ name: foo
+ bucket_root: default
+ bucket_type: host
+ device_class: ssd
+ rule_type: replicated
+
+- name: create a Ceph Crush erasure rule
+ ceph_crush_rule:
+ name: foo
+ profile: bar
+ rule_type: erasure
+
+- name: get a Ceph Crush rule information
+ ceph_crush_rule:
+ name: foo
+ state: info
+
+- name: delete a Ceph Crush rule
+ ceph_crush_rule:
+ name: foo
+ state: absent
+'''
+
+RETURN = '''# '''
+
+
+def create_rule(module, container_image=None):
+ '''
+ Create a new crush replicated/erasure rule
+ '''
+
+ cluster = module.params.get('cluster')
+ name = module.params.get('name')
+ rule_type = module.params.get('rule_type')
+ bucket_root = module.params.get('bucket_root')
+ bucket_type = module.params.get('bucket_type')
+ device_class = module.params.get('device_class')
+ profile = module.params.get('profile')
+
+ if rule_type == 'replicated':
+ args = ['create-replicated', name, bucket_root, bucket_type]
+ if device_class:
+ args.append(device_class)
+ else:
+ args = ['create-erasure', name]
+ if profile:
+ args.append(profile)
+
+ cmd = generate_ceph_cmd(['osd', 'crush', 'rule'], args, cluster=cluster, container_image=container_image)
+
+ return cmd
+
+
+def get_rule(module, container_image=None):
+ '''
+ Get existing crush rule
+ '''
+
+ cluster = module.params.get('cluster')
+ name = module.params.get('name')
+
+ args = ['dump', name, '--format=json']
+
+ cmd = generate_ceph_cmd(['osd', 'crush', 'rule'], args, cluster=cluster, container_image=container_image)
+
+ return cmd
+
+
+def remove_rule(module, container_image=None):
+ '''
+ Remove a crush rule
+ '''
+
+ cluster = module.params.get('cluster')
+ name = module.params.get('name')
+
+ args = ['rm', name]
+
+ cmd = generate_ceph_cmd(['osd', 'crush', 'rule'], args, cluster=cluster, container_image=container_image)
+
+ return cmd
+
+
+def main():
+ module = AnsibleModule(
+ argument_spec=dict(
+ name=dict(type='str', required=True),
+ cluster=dict(type='str', required=False, default='ceph'),
+ state=dict(type='str', required=False, choices=['present', 'absent', 'info'], default='present'),
+ rule_type=dict(type='str', required=False, choices=['replicated', 'erasure']),
+ bucket_root=dict(type='str', required=False),
+ bucket_type=dict(type='str', required=False, choices=['osd', 'host', 'chassis', 'rack', 'row', 'pdu', 'pod',
+ 'room', 'datacenter', 'zone', 'region', 'root']),
+ device_class=dict(type='str', required=False),
+ profile=dict(type='str', required=False)
+ ),
+ supports_check_mode=True,
+ required_if=[
+ ('state', 'present', ['rule_type']),
+ ('rule_type', 'replicated', ['bucket_root', 'bucket_type']),
+ ('rule_type', 'erasure', ['profile'])
+ ]
+ )
+
+ # Gather module parameters in variables
+ name = module.params.get('name')
+ state = module.params.get('state')
+ rule_type = module.params.get('rule_type')
+
+ if module.check_mode:
+ module.exit_json(
+ changed=False,
+ stdout='',
+ stderr='',
+ rc=0,
+ start='',
+ end='',
+ delta='',
+ )
+
+ startd = datetime.datetime.now()
+ changed = False
+
+ # will return either the image name or None
+ container_image = is_containerized()
+
+ if state == "present":
+ rc, cmd, out, err = exec_command(module, get_rule(module, container_image=container_image))
+ if rc != 0:
+ rc, cmd, out, err = exec_command(module, create_rule(module, container_image=container_image))
+ changed = True
+ else:
+ rule = json.loads(out)
+ if (rule['type'] == 1 and rule_type == 'erasure') or (rule['type'] == 3 and rule_type == 'replicated'):
+ module.fail_json(msg="Can not convert crush rule {} to {}".format(name, rule_type), changed=False, rc=1)
+
+ elif state == "absent":
+ rc, cmd, out, err = exec_command(module, get_rule(module, container_image=container_image))
+ if rc == 0:
+ rc, cmd, out, err = exec_command(module, remove_rule(module, container_image=container_image))
+ changed = True
+ else:
+ rc = 0
+ out = "Crush Rule {} doesn't exist".format(name)
+
+ elif state == "info":
+ rc, cmd, out, err = exec_command(module, get_rule(module, container_image=container_image))
+
+ exit_module(module=module, out=out, rc=rc, cmd=cmd, err=err, startd=startd, changed=changed)
+
+
+if __name__ == '__main__':
+ main()
--- /dev/null
+from mock.mock import patch
+import os
+import pytest
+import ca_test_common
+import ceph_crush_rule
+
+fake_cluster = 'ceph'
+fake_container_binary = 'podman'
+fake_container_image = 'quay.ceph.io/ceph/daemon:latest'
+fake_name = 'foo'
+fake_bucket_root = 'default'
+fake_bucket_type = 'host'
+fake_device_class = 'ssd'
+fake_profile = 'default'
+fake_user = 'client.admin'
+fake_keyring = '/etc/ceph/{}.{}.keyring'.format(fake_cluster, fake_user)
+
+
+class TestCephCrushRuleModule(object):
+
+ @patch('ansible.module_utils.basic.AnsibleModule.fail_json')
+ def test_without_parameters(self, m_fail_json):
+ ca_test_common.set_module_args({})
+ m_fail_json.side_effect = ca_test_common.fail_json
+
+ with pytest.raises(ca_test_common.AnsibleFailJson) as result:
+ ceph_crush_rule.main()
+
+ result = result.value.args[0]
+ assert result['msg'] == 'missing required arguments: name'
+
+ @patch('ansible.module_utils.basic.AnsibleModule.fail_json')
+ def test_with_name_only(self, m_fail_json):
+ ca_test_common.set_module_args({
+ 'name': fake_name
+ })
+ m_fail_json.side_effect = ca_test_common.fail_json
+
+ with pytest.raises(ca_test_common.AnsibleFailJson) as result:
+ ceph_crush_rule.main()
+
+ result = result.value.args[0]
+ assert result['msg'] == 'state is present but all of the following are missing: rule_type'
+
+ @patch('ansible.module_utils.basic.AnsibleModule.exit_json')
+ def test_with_check_mode(self, m_exit_json):
+ ca_test_common.set_module_args({
+ 'name': fake_name,
+ 'rule_type': 'replicated',
+ 'bucket_root': fake_bucket_root,
+ 'bucket_type': fake_bucket_type,
+ '_ansible_check_mode': True
+ })
+ m_exit_json.side_effect = ca_test_common.exit_json
+
+ with pytest.raises(ca_test_common.AnsibleExitJson) as result:
+ ceph_crush_rule.main()
+
+ result = result.value.args[0]
+ assert not result['changed']
+ assert result['rc'] == 0
+ assert not result['stdout']
+ assert not result['stderr']
+
+ @patch('ansible.module_utils.basic.AnsibleModule.exit_json')
+ @patch('ansible.module_utils.basic.AnsibleModule.run_command')
+ def test_create_non_existing_replicated_rule(self, m_run_command, m_exit_json):
+ ca_test_common.set_module_args({
+ 'name': fake_name,
+ 'rule_type': 'replicated',
+ 'bucket_root': fake_bucket_root,
+ 'bucket_type': fake_bucket_type
+ })
+ m_exit_json.side_effect = ca_test_common.exit_json
+ get_rc = 2
+ get_stderr = 'Error ENOENT: unknown crush rule \'{}\''.format(fake_name)
+ get_stdout = ''
+ create_rc = 0
+ create_stderr = ''
+ create_stdout = ''
+ m_run_command.side_effect = [
+ (get_rc, get_stdout, get_stderr),
+ (create_rc, create_stdout, create_stderr)
+ ]
+
+ with pytest.raises(ca_test_common.AnsibleExitJson) as result:
+ ceph_crush_rule.main()
+
+ result = result.value.args[0]
+ assert result['changed']
+ assert result['cmd'] == ['ceph', '-n', fake_user, '-k', fake_keyring,
+ '--cluster', fake_cluster, 'osd', 'crush', 'rule',
+ 'create-replicated', fake_name, fake_bucket_root, fake_bucket_type]
+ assert result['rc'] == create_rc
+ assert result['stderr'] == create_stderr
+ assert result['stdout'] == create_stdout
+
+ @patch('ansible.module_utils.basic.AnsibleModule.exit_json')
+ @patch('ansible.module_utils.basic.AnsibleModule.run_command')
+ def test_create_existing_replicated_rule(self, m_run_command, m_exit_json):
+ ca_test_common.set_module_args({
+ 'name': fake_name,
+ 'rule_type': 'replicated',
+ 'bucket_root': fake_bucket_root,
+ 'bucket_type': fake_bucket_type
+ })
+ m_exit_json.side_effect = ca_test_common.exit_json
+ rc = 0
+ stderr = ''
+ stdout = '{{"rule_name":"{}","type":1,"steps":[{{"item_name":"{}"}},{{"type":"{}"}}]}}'.format(fake_name, fake_bucket_root, fake_bucket_type)
+ m_run_command.return_value = rc, stdout, stderr
+
+ with pytest.raises(ca_test_common.AnsibleExitJson) as result:
+ ceph_crush_rule.main()
+
+ result = result.value.args[0]
+ assert not result['changed']
+ assert result['cmd'] == ['ceph', '-n', fake_user, '-k', fake_keyring,
+ '--cluster', fake_cluster, 'osd', 'crush', 'rule',
+ 'dump', fake_name, '--format=json']
+ assert result['rc'] == 0
+ assert result['stderr'] == stderr
+ assert result['stdout'] == stdout
+
+ @patch('ansible.module_utils.basic.AnsibleModule.exit_json')
+ @patch('ansible.module_utils.basic.AnsibleModule.run_command')
+ def test_create_non_existing_replicated_rule_device_class(self, m_run_command, m_exit_json):
+ ca_test_common.set_module_args({
+ 'name': fake_name,
+ 'rule_type': 'replicated',
+ 'bucket_root': fake_bucket_root,
+ 'bucket_type': fake_bucket_type,
+ 'device_class': fake_device_class
+ })
+ m_exit_json.side_effect = ca_test_common.exit_json
+ get_rc = 2
+ get_stderr = 'Error ENOENT: unknown crush rule \'{}\''.format(fake_name)
+ get_stdout = ''
+ create_rc = 0
+ create_stderr = ''
+ create_stdout = ''
+ m_run_command.side_effect = [
+ (get_rc, get_stdout, get_stderr),
+ (create_rc, create_stdout, create_stderr)
+ ]
+
+ with pytest.raises(ca_test_common.AnsibleExitJson) as result:
+ ceph_crush_rule.main()
+
+ result = result.value.args[0]
+ assert result['changed']
+ assert result['cmd'] == ['ceph', '-n', fake_user, '-k', fake_keyring,
+ '--cluster', fake_cluster, 'osd', 'crush', 'rule',
+ 'create-replicated', fake_name, fake_bucket_root, fake_bucket_type, fake_device_class]
+ assert result['rc'] == create_rc
+ assert result['stderr'] == create_stderr
+ assert result['stdout'] == create_stdout
+
+ @patch('ansible.module_utils.basic.AnsibleModule.exit_json')
+ @patch('ansible.module_utils.basic.AnsibleModule.run_command')
+ def test_create_existing_replicated_rule_device_class(self, m_run_command, m_exit_json):
+ ca_test_common.set_module_args({
+ 'name': fake_name,
+ 'rule_type': 'replicated',
+ 'bucket_root': fake_bucket_root,
+ 'bucket_type': fake_bucket_type,
+ 'device_class': fake_device_class
+ })
+ m_exit_json.side_effect = ca_test_common.exit_json
+ rc = 0
+ stderr = ''
+ stdout = '{{"rule_name":"{}","type":1,"steps":[{{"item_name":"{}"}},{{"type":"{}"}}]}}'.format(fake_name, fake_bucket_root, fake_bucket_type)
+ m_run_command.return_value = rc, stdout, stderr
+
+ with pytest.raises(ca_test_common.AnsibleExitJson) as result:
+ ceph_crush_rule.main()
+
+ result = result.value.args[0]
+ assert not result['changed']
+ assert result['cmd'] == ['ceph', '-n', fake_user, '-k', fake_keyring,
+ '--cluster', fake_cluster, 'osd', 'crush', 'rule',
+ 'dump', fake_name, '--format=json']
+ assert result['rc'] == 0
+ assert result['stderr'] == stderr
+ assert result['stdout'] == stdout
+
+ @patch('ansible.module_utils.basic.AnsibleModule.exit_json')
+ @patch('ansible.module_utils.basic.AnsibleModule.run_command')
+ def test_create_non_existing_erasure_rule(self, m_run_command, m_exit_json):
+ ca_test_common.set_module_args({
+ 'name': fake_name,
+ 'rule_type': 'erasure',
+ 'profile': fake_profile
+ })
+ m_exit_json.side_effect = ca_test_common.exit_json
+ get_rc = 2
+ get_stderr = 'Error ENOENT: unknown crush rule \'{}\''.format(fake_name)
+ get_stdout = ''
+ create_rc = 0
+ create_stderr = ''
+ create_stdout = 'created rule {} at 1'.format(fake_name)
+ m_run_command.side_effect = [
+ (get_rc, get_stdout, get_stderr),
+ (create_rc, create_stdout, create_stderr)
+ ]
+
+ with pytest.raises(ca_test_common.AnsibleExitJson) as result:
+ ceph_crush_rule.main()
+
+ result = result.value.args[0]
+ assert result['changed']
+ assert result['cmd'] == ['ceph', '-n', fake_user, '-k', fake_keyring,
+ '--cluster', fake_cluster, 'osd', 'crush', 'rule',
+ 'create-erasure', fake_name, fake_profile]
+ assert result['rc'] == create_rc
+ assert result['stderr'] == create_stderr
+ assert result['stdout'] == create_stdout
+
+ @patch('ansible.module_utils.basic.AnsibleModule.exit_json')
+ @patch('ansible.module_utils.basic.AnsibleModule.run_command')
+ def test_create_existing_erasure_rule(self, m_run_command, m_exit_json):
+ ca_test_common.set_module_args({
+ 'name': fake_name,
+ 'rule_type': 'erasure',
+ 'profile': fake_profile
+ })
+ m_exit_json.side_effect = ca_test_common.exit_json
+ rc = 0
+ stderr = ''
+ stdout = '{{"type":3,"rule_name":"{}","steps":[{{"item_name":"default"}},{{"type":"host"}}]}}'.format(fake_name)
+ m_run_command.return_value = rc, stdout, stderr
+
+ with pytest.raises(ca_test_common.AnsibleExitJson) as result:
+ ceph_crush_rule.main()
+
+ result = result.value.args[0]
+ assert not result['changed']
+ assert result['cmd'] == ['ceph', '-n', fake_user, '-k', fake_keyring,
+ '--cluster', fake_cluster, 'osd', 'crush', 'rule',
+ 'dump', fake_name, '--format=json']
+ assert result['rc'] == 0
+ assert result['stderr'] == stderr
+ assert result['stdout'] == stdout
+
+ @patch('ansible.module_utils.basic.AnsibleModule.fail_json')
+ @patch('ansible.module_utils.basic.AnsibleModule.run_command')
+ def test_update_existing_replicated_rule(self, m_run_command, m_fail_json):
+ ca_test_common.set_module_args({
+ 'name': fake_name,
+ 'rule_type': 'replicated',
+ 'bucket_root': fake_bucket_root,
+ 'bucket_type': fake_bucket_type,
+ 'device_class': fake_device_class
+ })
+ m_fail_json.side_effect = ca_test_common.fail_json
+ rc = 0
+ stderr = ''
+ stdout = '{{"type":3,"rule_name":"{}","steps":[{{"item_name":"default"}},{{"type":"host"}}]}}'.format(fake_name)
+ m_run_command.return_value = rc, stdout, stderr
+
+ with pytest.raises(ca_test_common.AnsibleFailJson) as result:
+ ceph_crush_rule.main()
+
+ result = result.value.args[0]
+ print(result)
+ assert not result['changed']
+ assert result['msg'] == 'Can not convert crush rule {} to replicated'.format(fake_name)
+ assert result['rc'] == 1
+
+ @patch('ansible.module_utils.basic.AnsibleModule.fail_json')
+ @patch('ansible.module_utils.basic.AnsibleModule.run_command')
+ def test_update_existing_erasure_rule(self, m_run_command, m_fail_json):
+ ca_test_common.set_module_args({
+ 'name': fake_name,
+ 'rule_type': 'erasure',
+ 'profile': fake_profile
+ })
+ m_fail_json.side_effect = ca_test_common.fail_json
+ rc = 0
+ stderr = ''
+ stdout = '{{"type":1,"rule_name":"{}","steps":[{{"item_name":"default"}},{{"type":"host"}}]}}'.format(fake_name)
+ m_run_command.return_value = rc, stdout, stderr
+
+ with pytest.raises(ca_test_common.AnsibleFailJson) as result:
+ ceph_crush_rule.main()
+
+ result = result.value.args[0]
+ print(result)
+ assert not result['changed']
+ assert result['msg'] == 'Can not convert crush rule {} to erasure'.format(fake_name)
+ assert result['rc'] == 1
+
+ @patch('ansible.module_utils.basic.AnsibleModule.exit_json')
+ @patch('ansible.module_utils.basic.AnsibleModule.run_command')
+ def test_remove_non_existing_rule(self, m_run_command, m_exit_json):
+ ca_test_common.set_module_args({
+ 'name': fake_name,
+ 'state': 'absent'
+ })
+ m_exit_json.side_effect = ca_test_common.exit_json
+ rc = 2
+ stderr = 'Error ENOENT: unknown crush rule \'{}\''.format(fake_name)
+ stdout = ''
+ m_run_command.return_value = rc, stdout, stderr
+
+ with pytest.raises(ca_test_common.AnsibleExitJson) as result:
+ ceph_crush_rule.main()
+
+ result = result.value.args[0]
+ assert not result['changed']
+ assert result['cmd'] == ['ceph', '-n', fake_user, '-k', fake_keyring,
+ '--cluster', fake_cluster, 'osd', 'crush', 'rule',
+ 'dump', fake_name, '--format=json']
+ assert result['rc'] == 0
+ assert result['stderr'] == stderr
+ assert result['stdout'] == "Crush Rule {} doesn't exist".format(fake_name)
+
+ @patch('ansible.module_utils.basic.AnsibleModule.exit_json')
+ @patch('ansible.module_utils.basic.AnsibleModule.run_command')
+ def test_remove_existing_rule(self, m_run_command, m_exit_json):
+ ca_test_common.set_module_args({
+ 'name': fake_name,
+ 'state': 'absent'
+ })
+ m_exit_json.side_effect = ca_test_common.exit_json
+ get_rc = 0
+ get_stderr = ''
+ get_stdout = '{{"rule_name":"{}","steps":[{{"item_name":"{}"}},{{"type":"{}"}}]}}'.format(fake_name, fake_bucket_root, fake_bucket_type)
+ remove_rc = 0
+ remove_stderr = ''
+ remove_stdout = ''
+ m_run_command.side_effect = [
+ (get_rc, get_stdout, get_stderr),
+ (remove_rc, remove_stdout, remove_stderr)
+ ]
+
+ with pytest.raises(ca_test_common.AnsibleExitJson) as result:
+ ceph_crush_rule.main()
+
+ result = result.value.args[0]
+ assert result['changed']
+ assert result['cmd'] == ['ceph', '-n', fake_user, '-k', fake_keyring,
+ '--cluster', fake_cluster, 'osd', 'crush', 'rule',
+ 'rm', fake_name]
+ assert result['rc'] == remove_rc
+ assert result['stderr'] == remove_stderr
+ assert result['stdout'] == remove_stdout
+
+ @patch('ansible.module_utils.basic.AnsibleModule.exit_json')
+ @patch('ansible.module_utils.basic.AnsibleModule.run_command')
+ def test_get_non_existing_rule(self, m_run_command, m_exit_json):
+ ca_test_common.set_module_args({
+ 'name': fake_name,
+ 'state': 'info'
+ })
+ m_exit_json.side_effect = ca_test_common.exit_json
+ rc = 2
+ stderr = 'Error ENOENT: unknown crush rule \'{}\''.format(fake_name)
+ stdout = ''
+ m_run_command.return_value = rc, stdout, stderr
+
+ with pytest.raises(ca_test_common.AnsibleExitJson) as result:
+ ceph_crush_rule.main()
+
+ result = result.value.args[0]
+ assert not result['changed']
+ assert result['cmd'] == ['ceph', '-n', fake_user, '-k', fake_keyring,
+ '--cluster', fake_cluster, 'osd', 'crush', 'rule',
+ 'dump', fake_name, '--format=json']
+ assert result['rc'] == rc
+ assert result['stderr'] == stderr
+ assert result['stdout'] == stdout
+
+ @patch('ansible.module_utils.basic.AnsibleModule.exit_json')
+ @patch('ansible.module_utils.basic.AnsibleModule.run_command')
+ def test_get_existing_rule(self, m_run_command, m_exit_json):
+ ca_test_common.set_module_args({
+ 'name': fake_name,
+ 'state': 'info'
+ })
+ m_exit_json.side_effect = ca_test_common.exit_json
+ rc = 0
+ stderr = ''
+ stdout = '{{"rule_name":"{}","steps":[{{"item_name":"{}"}},{{"type":"{}"}}]}}'.format(fake_name, fake_bucket_root, fake_bucket_type)
+ m_run_command.return_value = rc, stdout, stderr
+
+ with pytest.raises(ca_test_common.AnsibleExitJson) as result:
+ ceph_crush_rule.main()
+
+ result = result.value.args[0]
+ assert not result['changed']
+ assert result['cmd'] == ['ceph', '-n', fake_user, '-k', fake_keyring,
+ '--cluster', fake_cluster, 'osd', 'crush', 'rule',
+ 'dump', fake_name, '--format=json']
+ assert result['rc'] == rc
+ assert result['stderr'] == stderr
+ assert result['stdout'] == stdout
+
+ @patch.dict(os.environ, {'CEPH_CONTAINER_BINARY': fake_container_binary})
+ @patch.dict(os.environ, {'CEPH_CONTAINER_IMAGE': fake_container_image})
+ @patch('ansible.module_utils.basic.AnsibleModule.exit_json')
+ @patch('ansible.module_utils.basic.AnsibleModule.run_command')
+ def test_with_container(self, m_run_command, m_exit_json):
+ ca_test_common.set_module_args({
+ 'name': fake_name,
+ 'state': 'info'
+ })
+ m_exit_json.side_effect = ca_test_common.exit_json
+ rc = 0
+ stderr = ''
+ stdout = '{{"rule_name":"{}","steps":[{{"item_name":"{}"}},{{"type":"{}"}}]}}'.format(fake_name, fake_bucket_root, fake_bucket_type)
+ m_run_command.return_value = rc, stdout, stderr
+
+ with pytest.raises(ca_test_common.AnsibleExitJson) as result:
+ ceph_crush_rule.main()
+
+ result = result.value.args[0]
+ assert not result['changed']
+ assert result['cmd'] == [fake_container_binary, 'run', '--rm', '--net=host',
+ '-v', '/etc/ceph:/etc/ceph:z',
+ '-v', '/var/lib/ceph/:/var/lib/ceph/:z',
+ '-v', '/var/log/ceph/:/var/log/ceph/:z',
+ '--entrypoint=ceph', fake_container_image,
+ '-n', fake_user, '-k', fake_keyring,
+ '--cluster', fake_cluster, 'osd', 'crush',
+ 'rule', 'dump', fake_name, '--format=json']
+ assert result['rc'] == rc
+ assert result['stderr'] == stderr
+ assert result['stdout'] == stdout