From: Rishabh Dave Date: Thu, 31 Jul 2025 12:59:45 +0000 (+0530) Subject: pybind/cephfs: use depth-first, non-recursive approach for cloning X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=c12e7a56ac35df36f645b97bc680984b6af016bf;p=ceph.git pybind/cephfs: use depth-first, non-recursive approach for cloning Switch to non-recursive approach in cptree() (located in bulk_copy() in async_cloner.py) to prevent it from crashing with "RecursionError: maximum recursion depth exceeded". Fixes: https://tracker.ceph.com/issues/72357 Signed-off-by: Rishabh Dave --- diff --git a/src/pybind/cephfs/cephfs.pyx b/src/pybind/cephfs/cephfs.pyx index a8652072d6bf..fef4ac6d760f 100644 --- a/src/pybind/cephfs/cephfs.pyx +++ b/src/pybind/cephfs/cephfs.pyx @@ -39,6 +39,7 @@ AT_STATX_SYNC_TYPE = 0x6000 AT_STATX_SYNC_AS_STAT = 0x0000 AT_STATX_FORCE_SYNC = 0x2000 AT_STATX_DONT_SYNC = 0x4000 +AT_FDCWD = -100 cdef int AT_SYMLINK_NOFOLLOW_CDEF = AT_SYMLINK_NOFOLLOW CEPH_STATX_BASIC_STATS = 0x7ff cdef int CEPH_STATX_BASIC_STATS_CDEF = CEPH_STATX_BASIC_STATS @@ -3155,6 +3156,56 @@ cdef class LibCephFS(object): f'file at path {trash_path}: {e}') raise + def cptree(self, src_path, dst_path, should_sync_attrs=False, + cp_src_dir=True, should_cancel=False, suppress_errors=False): + ''' + Copy entire file hierarchy under src using depth-first (to prevent + excessive memory consumption) and non-recursive (to prevent hitting + Python's max recursion limit error) approach. + + If src is regfile, symlink or something else, copy it to dst and return. + ''' + if isinstance(src_path, str): + src_path = src_path.encode('utf-8') + else: + assert isinstance(src_path, bytes) + + if isinstance(dst_path, str): + dst_path = dst_path.encode('utf-8') + else: + assert isinstance(dst_path, bytes) + + if not should_cancel: + should_cancel = lambda: False + + # stx_b = statx buffer + stx_b = self.statx(src_path, CEPH_STATX_MODE, AT_SYMLINK_NOFOLLOW) + if stat.S_ISDIR(stx_b['mode']): + cptree_worker = CptreeWorker( + self, src_path, dst_path, should_sync_attrs, cp_src_dir, + should_cancel, suppress_errors) + + cptree_worker.start() + elif stat.S_ISREG(stx_b['mode']): + src_dir = os.path.dirname(src_path) + src_file_name = os.path.basename(src_path) + + src_fd = self.open(src_dir, os.O_RDONLY | os.O_DIRECTORY, 0o755) + dst_fd = self.open(dst_path, os.O_RDONLY | os.O_DIRECTORY, 0o755) + + copy_reg_file(self, src_fd, dst_fd, src_file_name, should_sync_attrs) + elif stat.S_ISLNK(stx_b['mode']): + src_dir = os.path.dirname(src_path) + src_link_name = os.path.basename(src_path) + + src_fd = self.open(src_dir, os.O_RDONLY | os.O_DIRECTORY, 0o755) + dst_fd = self.open(dst_path, os.O_RDONLY | os.O_DIRECTORY, 0o755) + + copy_sym_link(self, src_fd, dst_fd, src_link_name, should_sync_attrs) + else: + raise RuntimeError('expected a directory, regfile or symlink but ' + f'found something else. src = {self.src_path}') + class UnlinkTreeWorker: ''' @@ -3410,3 +3461,344 @@ class RmtreeDir: if not suppress_errors: raise + + +# following code includes cptree() and related helper methods. + +class CptreeWorker: + ''' + Contains code to non-recursively copy a file hierarchy present under a + given path with a depth-first approach. And while doing so, handle + case where a dir can't be copied due to permission issue. + ''' + + def __init__(self, fs, src_path, dst_path, should_sync_attrs=False, + cp_src_dir=False, should_cancel=lambda: False, + suppress_errors=False): + self.fs = fs + + # source and destination path passed by the user. + self.src_path = src_path + self.dst_path = dst_path + self._do_sanity_check_for_paths() + + # in case of subvolume snap clone, src's UUID dir is not copied, only + # its contents are copied to dst's UUID dir. This param is meant for + # indicating this. + self.cp_src_dir = cp_src_dir + + # set attrs on dst files to same as src files. + self.should_sync_attrs = should_sync_attrs + + self.should_cancel = should_cancel + self.suppress_errors = suppress_errors + + # Stack needed for traversing/copying file hierarchy in non-recursive, + # depth-first fashion. + self.stack = deque([]) + + # Current dir that is being copied. It should always be topmost entry of + # stack. + self.curr_dir = None + + def _do_sanity_check_for_paths(self): + if self.src_path == b'/': + raise PermissionError(1, 'can\'t copy dir into itself') + + # check if self.src_path is ancestor of self.dst_path. + src_path_comp = self.src_path + b'/' + if src_path_comp in self.dst_path: + raise PermissionError(1, 'can\'t copy dir into itself') + + def notify_parent_dir(self): + ''' + Add current dir's name to parent dir's "de_ignore_list". This is + necessary since parent dir can't be copied when current dir can't be + copied. + ''' + # ensure we are dealing with the dir at the top of the stack. + assert self.curr_dir is self.stack[-1] + + if len(self.stack) < 2: + return + + parent_dir = self.stack[-2] + parent_dir.add_to_de_ignore_list(self.curr_dir.name) + + def add_dir_to_stack(self, de_name): + ''' + Add new dir to stack and start traversing it. If it fails, add this + new dir to current dir's ignorelist since most likely we don't have + permissions for it. + ''' + # ensure we are dealing with the dir at the top of the stack. + assert self.curr_dir is self.stack[-1] + + try: + self.stack.append(CptreeDir(self.fs, de_name, de_name, + self.curr_dir, + should_sync_attrs=self.should_sync_attrs)) + return True + except Error as e: + if self.suppress_errors: + # add to ignore list, traversal should continue for current dir. + log.info(f'dir "{de_name}" couldn\'t be opened and therefore ' + 'it can\'t be removed. perhaps permissions for it ' + 'are not granted.') + self.curr_dir.add_to_de_ignore_list(de_name) + + return False + else: + raise + + def start(self): + # initiate stack with first entry + try: + if self.cp_src_dir: + src_dir_name = os.path.basename(self.src_path) + dst_path = os.path.join(self.dst_path, src_dir_name) + else: + dst_path = self.dst_path + self.stack.append(CptreeDir(self.fs, self.src_path, dst_path, + parent_dir=None, + should_sync_attrs=self.should_sync_attrs, + cp_src_dir=self.cp_src_dir)) + except Exception as e: + log.error('opening root dir of the file tree failed with exception ' + f'"{e}", exiting.') + if self.suppress_errors: + return + else: + raise + + while self.stack: + if self.should_cancel(): + raise OpCanceled('cptree') + + self.curr_dir = self.stack[-1] + finished_copying_curr_dir = True + + # de = directory entry + de = self.curr_dir.read_src_dir() + while de: + if self.should_cancel(): + raise OpCanceled('cptree') + + if de.is_dir(): + if self.add_dir_to_stack(de.d_name): + # since adding new dir to stack was successful, stop + # traversing current dir and start traversing the new + # dir that has been freshly added to the stack. + finished_copying_curr_dir = False + break + elif de.is_symbol_file(): + self.curr_dir.copy_sym_link(de.d_name) + else: + self.curr_dir.copy_reg_file(de.d_name) + + de = self.curr_dir.read_src_dir() + + if finished_copying_curr_dir: + # XXX if attrs are sync-ed during creation of dir on destination + # side, it's mtime would change as files are copied underneath + # it. to avoid this, first copy all files and then sync attrs. + self.curr_dir.sync_attrs() + + if self.curr_dir.has_any_fs_op_failed(): + self.notify_parent_dir() + self.stack.pop() + + +class CptreeDir: + ''' + Contains code for reading and copying dir entries and for handling cases + when it can't be done due to lack of permission. + + Although named CptreeDir, this class's objects operates over a pair of dir + -- src dir as well as dst dir. + ''' + + def __init__(self, fs, src_rel_path, dst_rel_path, parent_dir=None, + should_sync_attrs=False, cp_src_dir=True): + self.fs = fs + + self.src_rel_path = src_rel_path + self.dst_rel_path = dst_rel_path + + self.should_sync_attrs = should_sync_attrs + self.cp_src_dir = cp_src_dir + + # needed to open src dir fd and dst dir fd. + self.parent_dir_src_fd = parent_dir.src_fd if parent_dir else AT_FDCWD + self.parent_dir_dst_fd = parent_dir.dst_fd if parent_dir else AT_FDCWD + + self.copy_dir() + + self.src_fd = self.fs.openat(self.parent_dir_src_fd, self.src_rel_path, + os.O_RDONLY | os.O_DIRECTORY, 0o755) + + self.dst_fd = self.fs.openat(self.parent_dir_dst_fd, self.dst_rel_path, + os.O_RDONLY | os.O_DIRECTORY, 0o755) + + self.src_handle = self.fs.fdopendir(self.src_fd) + + # List of dir entries to be ignored instead of copying them. + self.de_ignore_list = [] + + # Indicates whether an error occured during call to readdir(). + self.has_readdir_failed = False + + def __str__(self): + return f'{self.src_rel_path}, {self.dst_rel_path}' + + def add_to_de_ignore_list(self, de_name): + self.de_ignore_list.append(de_name) + + def set_readdir_error(self): + self.has_readdir_failed = True + + def should_skip_d_name(self, de_name): + return de_name in self.de_ignore_list + + def has_any_fs_op_failed(self): + return self.has_readdir_failed or len(self.de_ignore_list) > 0 + + def read_src_dir(self): + ''' + Get sub-directory of current directory. + ''' + # Assuming True for now, if it's not empty it will be set to + # False by the following loop. + self.is_empty = True + + try: + # de = directory entry + de = self.fs.readdir(self.src_handle) + while de: + if de.d_name in (b'.', b'..'): + pass + elif de.d_name in self.de_ignore_list: + self.is_empty = False + log.debug( + 'readdir() has previously ' + f'failed for dir entry "{de.d_name}", avoiding ' + 'running readdir() on it again.') + else: + self.is_empty = False + return de + + de = self.fs.readdir(self.src_handle) + except Error as e: + # This is the tricky one: it's an error on this + # directory, not on a entry in this directory + log.error(f'Exception occured: "{e}"') + self.set_readdir_error() + + def copy_dir(self): + # XXX: cp_src_dir=False implies don't copy dir, copy only its contents + # instead. This is used for subvolume cloning. + if not self.cp_src_dir: + return + + # create dir on dst side. + self.fs.mkdirat(self.parent_dir_dst_fd, self.dst_rel_path, 0o755) + + # XXX: Dir's mtime gets updated when a file is copied underneath it. + # Therefore, call this method only after all files have been copied + # underneath the dir on dst side so that mtime on it and mtime on src + # dir is the same. + def sync_attrs(self): + ''' + Copy value of attributes from source side to the directory on + destination side. + ''' + if self.should_sync_attrs: + sync_attrs(self.fs, self.parent_dir_src_fd, self.parent_dir_dst_fd, + self.dst_rel_path) + + def copy_reg_file(self, de_name): + copy_reg_file(self.fs, self.src_fd, self.dst_fd, de_name, + self.should_sync_attrs) + + def copy_sym_link(self, de_name): + copy_sym_link(self.fs, self.src_fd, self.dst_fd, de_name, + self.should_sync_attrs) + + +def sync_attrs(fs, src_fd, dst_fd, de_name, src_stx_b=None): + if not src_stx_b: + flags = (CEPH_STATX_UID | CEPH_STATX_GID | CEPH_STATX_MODE | + CEPH_STATX_ATIME | CEPH_STATX_MTIME) + # src_stx_b = statx buffer for source path + src_stx_b = fs.statxat(src_fd, de_name, flags, AT_SYMLINK_NOFOLLOW) + + src_uid = src_stx_b["uid"] + src_gid = src_stx_b["gid"] + src_mode = src_stx_b["mode"] + src_timestamps = (time.mktime(src_stx_b["atime"].timetuple()), + time.mktime(src_stx_b["mtime"].timetuple())) + + try: + fs.chownat(dst_fd, de_name, src_uid, src_gid, AT_SYMLINK_NOFOLLOW) + fs.chmodat(dst_fd, de_name, src_mode, AT_SYMLINK_NOFOLLOW) + fs.utimensat(dst_fd, de_name, src_timestamps, AT_SYMLINK_NOFOLLOW) + except Exception as e: + log.error('Exception occurred while synchronizing attrs for ' + f'"{de_name}". Exception: ({e})') + raise e + + +def copy_reg_file(fs, src_fd, dst_fd, file_name, should_sync_attrs=False): + src_file_fd = dst_file_fd = None + try: + src_file_fd = fs.openat(src_fd, file_name, os.O_RDONLY, 0o755) + dst_file_fd = fs.openat(dst_fd, file_name, os.O_CREAT | os.O_TRUNC | + os.O_WRONLY, 0o755) + except Exception: + if src_file_fd: + fs.close(src_file_fd) + if dst_file_fd: + fs.close(dst_file_fd) + raise + + while True: + data = fs.read(src_file_fd, -1, 1 * 1024 * 1024) + if not len(data): + break + + written = 0 + while written < len(data): + written += fs.write(dst_file_fd, data[written:], -1) + + if should_sync_attrs: + sync_attrs(fs, src_fd, dst_fd, file_name) + + fs.fsync(dst_file_fd, 0) + fs.close(src_file_fd) + fs.close(dst_file_fd) + + +def copy_sym_link(fs, src_fd, dst_fd, de_name, should_sync_attrs=False): + flags = (CEPH_STATX_UID | CEPH_STATX_GID | CEPH_STATX_MODE | + CEPH_STATX_ATIME | CEPH_STATX_MTIME | CEPH_STATX_SIZE) + # src_stx_b = statx buffer for source path + src_stx_b = fs.statxat(src_fd, de_name, flags, AT_SYMLINK_NOFOLLOW) + size = src_stx_b['size'] + + src_stx_b = fs.statxat(src_fd, de_name, flags, AT_SYMLINK_NOFOLLOW) + try: + target = fs.readlinkat(src_fd, de_name, size) + except Exception as e: + log.info('Following exception occurred while reading ' + f'symlink: {e}') + raise + + try: + fs.symlinkat(target, dst_fd, de_name) + except Exception as e: + log.info('Following exception occurred while creating ' + f'symlink: {e}') + raise + + if should_sync_attrs: + sync_attrs(fs, src_fd, dst_fd, de_name, src_stx_b)