ts = int(time_from)
return ((int(ts / 60) * 60) + timo) - ts
-class TestSnapSchedules(CephFSTestCase):
+class TestSnapSchedulesHelper(CephFSTestCase):
CLIENTS_REQUIRED = 1
TEST_VOLUME_NAME = 'snap_vol'
result = json.loads(self._fs_cmd("volume", "ls"))
if len(result) == 0:
self.vol_created = True
- self.volname = TestSnapSchedules.TEST_VOLUME_NAME
+ self.volname = TestSnapSchedulesHelper.TEST_VOLUME_NAME
self._fs_cmd("volume", "create", self.volname)
else:
self.volname = result[0]['name']
self.config_set('mgr', 'mgr/snap_schedule/dump_on_update', True)
def setUp(self):
- super(TestSnapSchedules, self).setUp()
+ super(TestSnapSchedulesHelper, self).setUp()
self.volname = None
self.vol_created = False
self._create_or_reuse_test_volume()
if self.vol_created:
self._delete_test_volume()
self._disable_snap_schedule()
- super(TestSnapSchedules, self).tearDown()
+ super(TestSnapSchedulesHelper, self).tearDown()
def _schedule_to_timeout(self, schedule):
mult = schedule[-1]
def verify(self, dir_path, max_trials):
trials = 0
- snap_path = "{0}/.snap".format(dir_path)
+ snap_path = f'{dir_path}/.snap'
while (len(self.create_cbks) or len(self.remove_cbks)) and trials < max_trials:
snapshots = set(self.mount_a.ls(path=snap_path))
- log.info(f"snapshots: {snapshots}")
+ log.info(f'snapshots: {snapshots}')
added = snapshots - self.snapshots
- log.info(f"added: {added}")
+ log.info(f'added: {added}')
removed = self.snapshots - snapshots
- log.info(f"removed: {removed}")
+ log.info(f'removed: {removed}')
if added:
for cbk in list(self.create_cbks):
res = cbk(list(added))
# expected "scheduled" snapshot name
ts_name = (datetime.utcfromtimestamp(snap_sched_exec_epoch)
- + timedelta(seconds=wait_timo)).strftime(TestSnapSchedules.SNAPSHOT_TS_FORMAT)
+ + timedelta(seconds=wait_timo)).strftime(TestSnapSchedulesHelper.SNAPSHOT_TS_FORMAT)
return (wait_timo, ts_name)
def verify_schedule(self, dir_path, schedules, retentions=[]):
self.assertTrue(schedule in json_res['schedule'])
for retention in retentions:
self.assertTrue(retention in json_res['retention'])
-
+
+class TestSnapSchedules(TestSnapSchedulesHelper):
def remove_snapshots(self, dir_path):
snap_path = f'{dir_path}/.snap'
snap_path = f"{dir_path}/.snap"[1:]
snapshots = self.mount_a.ls(path=snap_path)
fs_count = len(snapshots)
- log.debug(f'snapshots: {snapshots}');
+ log.debug(f'snapshots: {snapshots}')
result = self.fs_snap_schedule_cmd('status', path=dir_path,
snap_schedule='1M', format='json')
# cleanup
self.fs_snap_schedule_cmd('remove', path=testdir, snap_schedule='1M')
self.remove_snapshots(testdir[1:])
- self.mount_a.run_shell(['rmdir', testdir[1:]])
+ self.mount_a.run_shell(['rmdir', testdir[1:]])
+
+class TestSnapSchedulesSnapdir(TestSnapSchedulesHelper):
+ def remove_snapshots(self, dir_path, sdn):
+ snap_path = f'{dir_path}/{sdn}'
+
+ snapshots = self.mount_a.ls(path=snap_path)
+ for snapshot in snapshots:
+ snapshot_path = os.path.join(snap_path, snapshot)
+ log.debug(f'removing snapshot: {snapshot_path}')
+ self.mount_a.run_shell(['rmdir', snapshot_path])
+
+ def get_snap_dir_name(self):
+ from tasks.cephfs.fuse_mount import FuseMount
+ from tasks.cephfs.kernel_mount import KernelMount
+
+ if isinstance(self.mount_a, KernelMount):
+ sdn = self.mount_a.client_config.get('snapdirname', '.snap')
+ elif isinstance(self.mount_a, FuseMount):
+ sdn = self.mount_a.client_config.get('client_snapdir', '.snap')
+ self.fs.set_ceph_conf('client', 'client snapdir', sdn)
+ self.mount_a.remount()
+ return sdn
+
+ def test_snap_dir_name(self):
+ """Test the correctness of snap directory name"""
+ self.mount_a.run_shell(['mkdir', '-p', TestSnapSchedulesSnapdir.TEST_DIRECTORY])
+
+ # set a schedule on the dir
+ self.fs_snap_schedule_cmd('add', path=TestSnapSchedulesSnapdir.TEST_DIRECTORY, snap_schedule='1M')
+ self.fs_snap_schedule_cmd('retention', 'add', path=TestSnapSchedulesSnapdir.TEST_DIRECTORY, retention_spec_or_period='1M')
+ exec_time = time.time()
+
+ timo, snap_sfx = self.calc_wait_time_and_snap_name(exec_time, '1M')
+ sdn = self.get_snap_dir_name()
+ log.info(f'expecting snap {TestSnapSchedulesSnapdir.TEST_DIRECTORY}/{sdn}/scheduled-{snap_sfx} in ~{timo}s...')
+
+ # verify snapshot schedule
+ self.verify_schedule(TestSnapSchedulesSnapdir.TEST_DIRECTORY, ['1M'], retentions=[{'M':1}])
+
+ # remove snapshot schedule
+ self.fs_snap_schedule_cmd('remove', path=TestSnapSchedulesSnapdir.TEST_DIRECTORY)
+
+ # remove all scheduled snapshots
+ self.remove_snapshots(TestSnapSchedulesSnapdir.TEST_DIRECTORY, sdn)
+
+ self.mount_a.run_shell(['rmdir', TestSnapSchedulesSnapdir.TEST_DIRECTORY])