snap_stats['fs_count'] = fs_count
snap_stats['db_count'] = db_count
+ log.debug(f'fs_count: {fs_count}')
+ log.debug(f'db_count: {db_count}')
+
return snap_stats
def verify_snap_stats(self, dir_path):
# remove snapshot schedule
self.fs_snap_schedule_cmd('remove', path="/bad-path")
+ def test_snap_schedule_for_number_of_snaps_retention(self):
+ """
+ Test that number of snaps retained are as per user spec.
+ """
+ total_snaps = 55
+ test_dir = '/' + TestSnapSchedules.TEST_DIRECTORY
+
+ self.mount_a.run_shell(['mkdir', '-p', test_dir[1:]])
+
+ # set a schedule on the dir
+ self.fs_snap_schedule_cmd('add', path=test_dir, snap_schedule='1M')
+ self.fs_snap_schedule_cmd('retention', 'add', path=test_dir,
+ retention_spec_or_period=f'{total_snaps}n')
+ exec_time = time.time()
+
+ timo_1, snap_sfx = self.calc_wait_time_and_snap_name(exec_time, '1M')
+
+ # verify snapshot schedule
+ self.verify_schedule(test_dir, ['1M'])
+
+ # we wait for total_snaps snaps to be taken
+ wait_time = timo_1 + total_snaps * 60 + 15
+ time.sleep(wait_time)
+
+ snap_stats = self.get_snap_stats(test_dir)
+ self.assertTrue(snap_stats['fs_count'] == total_snaps)
+ self.assertTrue(snap_stats['db_count'] >= total_snaps)
+
+ # remove snapshot schedule
+ self.fs_snap_schedule_cmd('remove', path=test_dir)
+
+ # remove all scheduled snapshots
+ self.remove_snapshots(test_dir[1:])
+
+ self.mount_a.run_shell(['rmdir', test_dir[1:]])
+
class TestSnapSchedulesSnapdir(TestSnapSchedulesHelper):
def remove_snapshots(self, dir_path, sdn):