assert not results.success
assert params['error_msg'] in rs['results'][0]['msg']
assert rs['results'][0]['conflicting_share_id'] in params['conflicts']
+
+
+def test_apply_share_with_qos(thandler):
+ cluster = _cluster(
+ cluster_id='qoscluster',
+ auth_mode=smb.enums.AuthMode.USER,
+ user_group_settings=[
+ smb.resources.UserGroupSource(
+ source_type=smb.resources.UserGroupSourceType.EMPTY,
+ ),
+ ],
+ )
+ share = smb.resources.Share(
+ cluster_id='qoscluster',
+ share_id='qostest',
+ name='QoS Test Share',
+ cephfs=_cephfs(
+ volume='cephfs',
+ path='/',
+ qos=smb.resources.QoSConfig(
+ read_iops_limit=100,
+ write_iops_limit=200,
+ read_bw_limit=1048576,
+ write_bw_limit=2097152,
+ read_delay_max=20,
+ write_delay_max=30,
+ ),
+ ),
+ )
+ rg = thandler.apply([cluster, share])
+ assert rg.success, rg.to_simplified()
+
+ # Verify QoS settings were stored
+ share_dict = thandler.internal_store.data[
+ ('shares', 'qoscluster.qostest')
+ ]
+ assert share_dict['cephfs']['qos']['read_iops_limit'] == 100
+ assert share_dict['cephfs']['qos']['write_iops_limit'] == 200
+ assert share_dict['cephfs']['qos']['read_bw_limit'] == 1048576
+ assert share_dict['cephfs']['qos']['write_bw_limit'] == 2097152
+ assert share_dict['cephfs']['qos']['read_delay_max'] == 20
+ assert share_dict['cephfs']['qos']['write_delay_max'] == 30
data = yaml.safe_load_all(yaml_str)
with pytest.raises(ValueError, match="Comment cannot contain newlines"):
smb.resources.load(data)
+
+
+def test_share_with_qos():
+ import yaml
+
+ yaml_str = """
+resource_type: ceph.smb.share
+cluster_id: qoscluster
+share_id: qostest
+name: QoS Test Share
+cephfs:
+ volume: myvol
+ path: /qos
+ qos:
+ read_iops_limit: 100
+ write_iops_limit: 200
+ read_bw_limit: 1048576
+ write_bw_limit: 2097152
+ read_delay_max: 20
+ write_delay_max: 30
+"""
+ data = yaml.safe_load_all(yaml_str)
+ loaded = smb.resources.load(data)
+ assert loaded
+
+ share = loaded[0]
+ assert share.cephfs.qos is not None
+ assert share.cephfs.qos.read_iops_limit == 100
+ assert share.cephfs.qos.write_iops_limit == 200
+ assert share.cephfs.qos.read_bw_limit == 1048576
+ assert share.cephfs.qos.write_bw_limit == 2097152
+ assert share.cephfs.qos.read_delay_max == 20
+ assert share.cephfs.qos.write_delay_max == 30
+
+
+def test_share_update_qos():
+ share = smb.resources.Share(
+ cluster_id='qoscluster',
+ share_id='qostest',
+ name='QoS Test Share',
+ cephfs=smb.resources.CephFSStorage(
+ volume='myvol',
+ path='/qos',
+ qos=smb.resources.QoSConfig(
+ read_iops_limit=100,
+ write_iops_limit=200,
+ read_delay_max=5,
+ write_delay_max=30,
+ ),
+ ),
+ )
+
+ # Update with new QoS values
+ updated_cephfs = share.cephfs.update_qos(
+ read_bw_limit=1048576,
+ write_bw_limit=2097152,
+ read_iops_limit=300,
+ read_delay_max=15,
+ )
+
+ assert updated_cephfs.qos is not None
+ assert updated_cephfs.qos.read_iops_limit == 300 # new value
+ assert updated_cephfs.qos.write_iops_limit == 200 # preserved original
+ assert updated_cephfs.qos.read_bw_limit == 1048576 # new value
+ assert updated_cephfs.qos.write_bw_limit == 2097152 # new value
+ assert updated_cephfs.qos.read_delay_max == 15 # new value
+ assert updated_cephfs.qos.write_delay_max == 30 # preserved original
+
+ # Verify share with updated QoS works
+ data = share.to_simplified()
+ data.pop("resource_type", None)
+ updated_share = smb.resources.Share(**{**data, 'cephfs': updated_cephfs})
+ assert updated_share.cephfs.qos.read_bw_limit == 1048576
+ assert updated_share.cephfs.qos.read_delay_max == 15
+
+
+def test_share_qos_remove():
+ share = smb.resources.Share(
+ cluster_id='qoscluster',
+ share_id='qostest',
+ name='QoS Test Share',
+ cephfs=smb.resources.CephFSStorage(
+ volume='myvol',
+ path='/qos',
+ qos=smb.resources.QoSConfig(
+ read_iops_limit=100,
+ write_iops_limit=200,
+ read_delay_max=5,
+ write_delay_max=30,
+ ),
+ ),
+ )
+
+ # Disable QoS by setting all limits to 0
+ updated_cephfs = share.cephfs.update_qos(
+ read_iops_limit=0,
+ write_iops_limit=0,
+ read_bw_limit=0,
+ write_bw_limit=0,
+ read_delay_max=0,
+ write_delay_max=0,
+ )
+
+ # Verify QoS is completely removed
+ assert updated_cephfs.qos is None
+
+
+def test_share_qos_default_delay():
+ """Test that delay_max defaults to 30 when not specified"""
+ share = smb.resources.Share(
+ cluster_id='qoscluster',
+ share_id='qostest',
+ name='QoS Test Share',
+ cephfs=smb.resources.CephFSStorage(
+ volume='myvol',
+ path='/qos',
+ qos=smb.resources.QoSConfig(
+ read_iops_limit=100,
+ write_iops_limit=200
+ # delay_max not specified - should use defaults
+ ),
+ ),
+ )
+
+ assert share.cephfs.qos is not None
+ assert share.cephfs.qos.read_delay_max == 30 # Default value
+ assert share.cephfs.qos.write_delay_max == 30 # Default value
+
+
+def test_share_qos_max_allowed_delay():
+ """Test that delay_max values exceeding 300 will be capped to 300"""
+ share = smb.resources.Share(
+ cluster_id='qoscluster',
+ share_id='qostest',
+ name='QoS Test Share',
+ cephfs=smb.resources.CephFSStorage(
+ volume='myvol',
+ path='/qos',
+ qos=smb.resources.QoSConfig(
+ read_iops_limit=100,
+ write_iops_limit=200,
+ read_delay_max=30,
+ write_delay_max=30,
+ ),
+ ),
+ )
+
+ updated_cephfs = share.cephfs.update_qos(read_delay_max=350)
+
+ assert updated_cephfs.qos is not None
+ assert updated_cephfs.qos.read_delay_max == 300 # Capped value
+
+
+def test_share_qos_max_allowed_iops_and_bandwidth():
+ """Test that iops and bandwidth values exceeding limits will be capped to IOPS_LIMIT_MAX and BYTES_LIMIT_MAX"""
+ IOPS_LIMIT_MAX = 1_000_000
+ BYTES_LIMIT_MAX = 1 << 40 # 1 TB
+
+ share = smb.resources.Share(
+ cluster_id="qoscluster",
+ share_id="qostest",
+ name="QoS Test Share",
+ cephfs=smb.resources.CephFSStorage(
+ volume="myvol",
+ path="/qos",
+ qos=smb.resources.QoSConfig(
+ read_iops_limit=100,
+ write_iops_limit=200,
+ read_delay_max=30,
+ write_delay_max=30,
+ ),
+ ),
+ )
+
+ updated_cephfs = share.cephfs.update_qos(
+ read_iops_limit=1_500_000_000, # way above limit
+ write_bw_limit=2_000_000_000_000, # ~2 TB, way above limit
+ )
+
+ assert updated_cephfs.qos is not None
+ assert updated_cephfs.qos.read_iops_limit == IOPS_LIMIT_MAX # capped
+ assert updated_cephfs.qos.write_bw_limit == BYTES_LIMIT_MAX # capped
assert res == 0
body = body.strip()
assert 'value: |' in body
+
+
+def test_cmd_share_update_qos(tmodule):
+ cluster = _cluster(
+ cluster_id='qoscluster',
+ auth_mode=smb.enums.AuthMode.USER,
+ user_group_settings=[
+ smb.resources.UserGroupSource(
+ source_type=smb.resources.UserGroupSourceType.EMPTY,
+ ),
+ ],
+ )
+ share = smb.resources.Share(
+ cluster_id='qoscluster',
+ share_id='qostest',
+ name='QoS Test Share',
+ cephfs=smb.resources.CephFSStorage(
+ volume='cephfs',
+ path='/',
+ ),
+ )
+ rg = tmodule._handler.apply([cluster, share])
+ assert rg.success, rg.to_simplified()
+
+ # Test updating with positive values
+ res, body, status = tmodule.share_update_qos.command(
+ cluster_id='qoscluster',
+ share_id='qostest',
+ read_iops_limit=100,
+ write_iops_limit=200,
+ read_bw_limit=1048576,
+ write_bw_limit=2097152,
+ read_delay_max=20,
+ write_delay_max=30,
+ )
+ assert res == 0
+ bdata = json.loads(body)
+ assert bdata['success']
+ assert bdata['state'] == 'updated'
+
+ # Verify the QoS settings were updated
+ updated_shares = tmodule._handler.matching_resources(
+ ['ceph.smb.share.qoscluster.qostest']
+ )
+ assert len(updated_shares) == 1
+ updated_share = updated_shares[0]
+ assert updated_share.cephfs.qos is not None
+ assert updated_share.cephfs.qos.read_iops_limit == 100
+ assert updated_share.cephfs.qos.write_iops_limit == 200
+ assert updated_share.cephfs.qos.read_bw_limit == 1048576
+ assert updated_share.cephfs.qos.write_bw_limit == 2097152
+ assert updated_share.cephfs.qos.read_delay_max == 20
+ assert updated_share.cephfs.qos.write_delay_max == 30
+
+ # Test updating with None values (should remove QoS)
+ res, body, status = tmodule.share_update_qos.command(
+ cluster_id='qoscluster',
+ share_id='qostest',
+ read_iops_limit=0,
+ write_iops_limit=0,
+ read_bw_limit=0,
+ write_bw_limit=0,
+ read_delay_max=0,
+ write_delay_max=0,
+ )
+ assert res == 0
+ bdata = json.loads(body)
+ assert bdata['success']
+ assert bdata['state'] == 'updated'
+
+ # Verify QoS was removed
+ updated_shares = tmodule._handler.matching_resources(
+ ['ceph.smb.share.qoscluster.qostest']
+ )
+ updated_share = updated_shares[0]
+ assert updated_share.cephfs.qos is None
+
+ # Test updating with some values and keeping others
+ res, body, status = tmodule.share_update_qos.command(
+ cluster_id='qoscluster',
+ share_id='qostest',
+ read_iops_limit=500,
+ write_bw_limit=524288,
+ )
+ assert res == 0
+ bdata = json.loads(body)
+ assert bdata['success']
+ assert bdata['state'] == 'updated'
+
+ # Verify partial update
+ updated_shares = tmodule._handler.matching_resources(
+ ['ceph.smb.share.qoscluster.qostest']
+ )
+ updated_share = updated_shares[0]
+ assert updated_share.cephfs.qos is not None
+ assert updated_share.cephfs.qos.read_iops_limit == 500
+ assert updated_share.cephfs.qos.write_iops_limit is None
+ assert updated_share.cephfs.qos.read_bw_limit is None
+ assert updated_share.cephfs.qos.write_bw_limit == 524288
+ assert updated_share.cephfs.qos.read_delay_max == 30
+ assert updated_share.cephfs.qos.write_delay_max == 30