raise IndexException(-e.args[0], e.args[1])
return None
+ def find_clone_entry_index(self, sink_path):
+ try:
+ for entry in list_one_entry_at_a_time(self.fs, self.path):
+ dname = entry.d_name
+ dpath = os.path.join(self.path, dname)
+ st = self.fs.lstat(dpath)
+ if stat.S_ISLNK(st.st_mode):
+ target_path = self.fs.readlink(dpath, CloneIndex.PATH_MAX)
+ if sink_path == target_path[:st.st_size]:
+ return dname
+ return None
+ except cephfs.Error as e:
+ raise IndexException(-e.args[0], e.args[1])
+
def create_clone_index(fs, vol_spec):
clone_index = CloneIndex(fs, vol_spec)
try: