from tasks.cephfs.cephfs_test_case import CephFSTestCase, needs_trimming
from tasks.cephfs.fuse_mount import FuseMount
import os
+from io import StringIO
log = logging.getLogger(__name__)
a fraction of second (0.5) by default when throttling condition is met.
"""
- max_caps_per_client = 500
- cap_acquisition_throttle = 250
+ subdir_count = 4
+ files_per_dir = 25
- self.config_set('mds', 'mds_max_caps_per_client', max_caps_per_client)
- self.config_set('mds', 'mds_session_cap_acquisition_throttle', cap_acquisition_throttle)
+ # throttle in a way so that two dir reads are already hitting it.
+ throttle_value = (files_per_dir * 3) // 2
- # Create 1500 files split across 6 directories, 250 each.
- for i in range(1, 7):
- self.mount_a.create_n_files("dir{0}/file".format(i), cap_acquisition_throttle, sync=True)
+ # activate throttling logic by setting max per client to a low value
+ self.config_set('mds', 'mds_max_caps_per_client', 1)
+ self.config_set('mds', 'mds_session_cap_acquisition_throttle', throttle_value)
- mount_a_client_id = self.mount_a.get_global_id()
+ # Create files split across {subdir_count} directories, {per_dir_count} in each dir
+ for i in range(1, subdir_count+1):
+ self.mount_a.create_n_files("dir{0}/file".format(i), files_per_dir, sync=True)
- # recursive readdir
- self.mount_a.run_shell_payload("find | wc")
+ mount_a_client_id = self.mount_a.get_global_id()
- # validate cap_acquisition decay counter after readdir to exceed throttle count i.e 250
- cap_acquisition_value = self.get_session(mount_a_client_id)['cap_acquisition']['value']
- self.assertGreaterEqual(cap_acquisition_value, cap_acquisition_throttle)
+ # recursive readdir. macOs wants an explicit directory for `find`.
+ proc = self.mount_a.run_shell_payload("find . | wc", stderr=StringIO())
+ # return code may be None if the command got interrupted
+ self.assertTrue(proc.returncode is None or proc.returncode == 0, proc.stderr.getvalue())
# validate the throttle condition to be hit atleast once
cap_acquisition_throttle_hit_count = self.perf_dump()['mds_server']['cap_acquisition_throttle']
self.assertGreaterEqual(cap_acquisition_throttle_hit_count, 1)
+ # validate cap_acquisition decay counter after readdir to NOT exceed the throttle value
+ # plus one batch that could have been taken immediately before querying
+ # assuming the batch is equal to the per dir file count.
+ cap_acquisition_value = self.get_session(mount_a_client_id)['cap_acquisition']['value']
+ self.assertLessEqual(cap_acquisition_value, files_per_dir + throttle_value)
+
+ # make sure that the throttle was reported in the events
+ def historic_ops_have_event(expected_event):
+ ops_dump = self.fs.rank_tell(['dump_historic_ops'])
+ # reverse the events and the ops assuming that later ops would be throttled
+ for op in reversed(ops_dump['ops']):
+ for ev in reversed(op.get('type_data', {}).get('events', [])):
+ if ev['event'] == expected_event:
+ return True
+ return False
+
+ self.assertTrue(historic_ops_have_event('cap_acquisition_throttle'))
+
def test_client_release_bug(self):
"""
When a client has a bug (which we will simulate) preventing it from releasing caps,