git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
pybind/cephfs: use depth-first, non-recursive approach for cloning
author Rishabh Dave <ridave@redhat.com>
Thu, 31 Jul 2025 12:59:45 +0000 (18:29 +0530)
committer Rishabh Dave <ridave@redhat.com>
Thu, 16 Apr 2026 12:41:08 +0000 (18:11 +0530)
Switch to non-recursive approach in cptree() (located in bulk_copy() in
async_cloner.py) to prevent it from crashing with "RecursionError:
maximum recursion depth exceeded".

Fixes: https://tracker.ceph.com/issues/72357
Signed-off-by: Rishabh Dave <ridave@redhat.com>
src/pybind/cephfs/cephfs.pyx

index a8652072d6bfe68df1a0ddd4b65c0602c8df1600..fef4ac6d760ff7e62181c8be05bcc73e687bd309 100644 (file)
@@ -39,6 +39,7 @@ AT_STATX_SYNC_TYPE  = 0x6000
 AT_STATX_SYNC_AS_STAT = 0x0000
 AT_STATX_FORCE_SYNC = 0x2000
 AT_STATX_DONT_SYNC = 0x4000
+# Passed as the dir fd to the *at() calls (openat(), mkdirat(), ...) when
+# there is no parent dir fd to resolve a path against.
+# NOTE(review): presumably mirrors CEPHFS_AT_FDCWD of libcephfs -- confirm.
+AT_FDCWD = -100
 cdef int AT_SYMLINK_NOFOLLOW_CDEF = AT_SYMLINK_NOFOLLOW
 CEPH_STATX_BASIC_STATS = 0x7ff
 cdef int CEPH_STATX_BASIC_STATS_CDEF = CEPH_STATX_BASIC_STATS
@@ -3155,6 +3156,56 @@ cdef class LibCephFS(object):
                          f'file at path {trash_path}: {e}')
                 raise
 
+    def cptree(self, src_path, dst_path, should_sync_attrs=False,
+               cp_src_dir=True, should_cancel=False, suppress_errors=False):
+        '''
+        Copy entire file hierarchy under src using depth-first (to prevent
+        excessive memory consumption) and non-recursive (to prevent hitting
+        Python's max recursion limit error) approach.
+
+        If src is a regfile or a symlink, copy it into the dir at dst under
+        the same name and return. Any other file type is rejected.
+
+        :param src_path: str or bytes, path of the file/dir to be copied.
+        :param dst_path: str or bytes, path of the destination dir.
+        :param should_sync_attrs: if True, copy uid, gid, mode, atime and
+                                  mtime from src to dst.
+        :param cp_src_dir: used only when src is a dir. If True, src dir
+                           itself is created under dst; if False, only its
+                           contents are copied into dst.
+        :param should_cancel: callable that returns True when copying should
+                              be aborted. A falsy value means "never cancel".
+        :param suppress_errors: passed on to CptreeWorker, used only when src
+                                is a dir.
+        :raises RuntimeError: if src is not a dir, regfile or symlink.
+        '''
+        if isinstance(src_path, str):
+            src_path = src_path.encode('utf-8')
+        else:
+            assert isinstance(src_path, bytes)
+
+        if isinstance(dst_path, str):
+            dst_path = dst_path.encode('utf-8')
+        else:
+            assert isinstance(dst_path, bytes)
+
+        if not should_cancel:
+            should_cancel = lambda: False
+
+        # stx_b = statx buffer
+        stx_b = self.statx(src_path, CEPH_STATX_MODE, AT_SYMLINK_NOFOLLOW)
+        mode = stx_b['mode']
+
+        if stat.S_ISDIR(mode):
+            cptree_worker = CptreeWorker(
+                self, src_path, dst_path, should_sync_attrs, cp_src_dir,
+                should_cancel, suppress_errors)
+
+            cptree_worker.start()
+            return
+
+        if stat.S_ISREG(mode):
+            copy_fn = copy_reg_file
+        elif stat.S_ISLNK(mode):
+            copy_fn = copy_sym_link
+        else:
+            # report the local src_path, the path is not stored on self.
+            raise RuntimeError('expected a directory, regfile or symlink but '
+                               f'found something else. src = {src_path}')
+
+        src_dir = os.path.dirname(src_path)
+        src_de_name = os.path.basename(src_path)
+
+        # both dir fds are needed only for the duration of this single copy,
+        # close them even when the copy fails.
+        src_fd = self.open(src_dir, os.O_RDONLY | os.O_DIRECTORY, 0o755)
+        try:
+            dst_fd = self.open(dst_path, os.O_RDONLY | os.O_DIRECTORY, 0o755)
+            try:
+                copy_fn(self, src_fd, dst_fd, src_de_name, should_sync_attrs)
+            finally:
+                self.close(dst_fd)
+        finally:
+            self.close(src_fd)
+
 
 class UnlinkTreeWorker:
     '''
@@ -3410,3 +3461,344 @@ class RmtreeDir:
 
             if not suppress_errors:
                 raise
+
+
+# The following code contains the helper classes and functions used by
+# LibCephFS.cptree().
+
+class CptreeWorker:
+    '''
+    Contains code to non-recursively copy a file hierarchy present under a
+    given path with a depth-first approach. And while doing so, handle
+    case where a dir can't be copied due to permission issue.
+    '''
+
+    def __init__(self, fs, src_path, dst_path, should_sync_attrs=False,
+                 cp_src_dir=False, should_cancel=lambda: False,
+                 suppress_errors=False):
+        '''
+        :param fs: file system object on which all calls are issued.
+        :param src_path: bytes, path of the dir to be copied.
+        :param dst_path: bytes, path of the destination dir.
+        :param should_sync_attrs: set attrs on dst files to same as src files.
+        :param cp_src_dir: if True, create src dir itself under dst_path,
+                           else copy only its contents into dst_path.
+        :param should_cancel: callable, copying is aborted with OpCanceled
+                              once it returns True.
+        :param suppress_errors: if True, failure to open a dir is logged and
+                                the dir is skipped instead of raising.
+        :raises PermissionError: if src is "/" or if dst is src itself or
+                                 lies underneath src.
+        '''
+        self.fs = fs
+
+        # source and destination path passed by the user.
+        self.src_path = src_path
+        self.dst_path = dst_path
+        self._do_sanity_check_for_paths()
+
+        # in case of subvolume snap clone, src's UUID dir is not copied, only
+        # its contents are copied to dst's UUID dir. This param is meant for
+        # indicating this.
+        self.cp_src_dir = cp_src_dir
+
+        # set attrs on dst files to same as src files.
+        self.should_sync_attrs = should_sync_attrs
+
+        self.should_cancel = should_cancel
+        self.suppress_errors = suppress_errors
+
+        # Stack needed for traversing/copying file hierarchy in non-recursive,
+        # depth-first fashion.
+        self.stack = deque([])
+
+        # Current dir that is being copied. It should always be topmost entry of
+        # stack.
+        self.curr_dir = None
+
+    def _do_sanity_check_for_paths(self):
+        '''
+        Refuse to copy "/" and to copy a dir onto or underneath itself.
+        '''
+        if self.src_path == b'/':
+            raise PermissionError(1, 'can\'t copy dir into itself')
+
+        # Compare on path component boundary: dst must neither be src itself
+        # nor lie underneath it. A plain substring test would wrongly reject
+        # unrelated paths, e.g. src=/a/b and dst=/x/a/b/c.
+        src = self.src_path.rstrip(b'/')
+        dst = self.dst_path.rstrip(b'/')
+        if dst == src or dst.startswith(src + b'/'):
+            raise PermissionError(1, 'can\'t copy dir into itself')
+
+    def notify_parent_dir(self):
+        '''
+        Add current dir's name to parent dir's "de_ignore_list". This is
+        necessary since parent dir can't be copied when current dir can't be
+        copied.
+        '''
+        # ensure we are dealing with the dir at the top of the stack.
+        assert self.curr_dir is self.stack[-1]
+
+        if len(self.stack) < 2:
+            return
+
+        parent_dir = self.stack[-2]
+        # for every dir but the root one, src_rel_path is the dir entry name
+        # under which parent dir has seen this dir.
+        parent_dir.add_to_de_ignore_list(self.curr_dir.src_rel_path)
+
+    def add_dir_to_stack(self, de_name):
+        '''
+        Add new dir to stack and start traversing it. If it fails, add this
+        new dir to current dir's ignorelist since most likely we don't have
+        permissions for it.
+
+        Return True if the dir was added to the stack, False if it was
+        skipped (possible only when suppress_errors is set).
+        '''
+        # ensure we are dealing with the dir at the top of the stack.
+        assert self.curr_dir is self.stack[-1]
+
+        try:
+            self.stack.append(CptreeDir(self.fs, de_name, de_name,
+                                        self.curr_dir,
+                                        should_sync_attrs=self.should_sync_attrs))
+            return True
+        except Error:
+            if self.suppress_errors:
+                # add to ignore list, traversal should continue for current dir.
+                log.info(f'dir "{de_name}" couldn\'t be opened and therefore '
+                          'it can\'t be copied. perhaps permissions for it '
+                          'are not granted.')
+                self.curr_dir.add_to_de_ignore_list(de_name)
+
+                return False
+            else:
+                raise
+
+    def _close_dir(self, cptree_dir):
+        '''
+        Release dir handle and both dir fds held by the given CptreeDir.
+        '''
+        try:
+            self.fs.closedir(cptree_dir.src_handle)
+        finally:
+            try:
+                self.fs.close(cptree_dir.src_fd)
+            finally:
+                self.fs.close(cptree_dir.dst_fd)
+
+    def start(self):
+        '''
+        Copy the file tree under self.src_path to self.dst_path.
+
+        :raises OpCanceled: when self.should_cancel() returns True.
+        '''
+        # initiate stack with first entry
+        try:
+            if self.cp_src_dir:
+                src_dir_name = os.path.basename(self.src_path)
+                dst_path = os.path.join(self.dst_path, src_dir_name)
+            else:
+                dst_path = self.dst_path
+            self.stack.append(CptreeDir(self.fs, self.src_path, dst_path,
+                                        parent_dir=None,
+                                        should_sync_attrs=self.should_sync_attrs,
+                                        cp_src_dir=self.cp_src_dir))
+        except Exception as e:
+            log.error('opening root dir of the file tree failed with exception '
+                      f'"{e}", exiting.')
+            if self.suppress_errors:
+                return
+            else:
+                raise
+
+        try:
+            self._copy_dirs_on_stack()
+        finally:
+            # dirs are left on the stack only when copying was aborted by an
+            # error or a cancellation; don't leak their fds and handles.
+            while self.stack:
+                self._close_dir(self.stack.pop())
+
+    def _copy_dirs_on_stack(self):
+        '''
+        Keep copying the dir at the top of the stack until stack is empty.
+        '''
+        while self.stack:
+            if self.should_cancel():
+                raise OpCanceled('cptree')
+
+            self.curr_dir = self.stack[-1]
+            finished_copying_curr_dir = True
+
+            # de = directory entry
+            de = self.curr_dir.read_src_dir()
+            while de:
+                if self.should_cancel():
+                    raise OpCanceled('cptree')
+
+                if de.is_dir():
+                    if self.add_dir_to_stack(de.d_name):
+                        # since adding new dir to stack was successful, stop
+                        # traversing current dir and start traversing the new
+                        # dir that has been freshly added to the stack.
+                        finished_copying_curr_dir = False
+                        break
+                elif de.is_symbol_file():
+                    self.curr_dir.copy_sym_link(de.d_name)
+                else:
+                    self.curr_dir.copy_reg_file(de.d_name)
+
+                de = self.curr_dir.read_src_dir()
+
+            if finished_copying_curr_dir:
+                # XXX if attrs are sync-ed during creation of dir on destination
+                # side, it's mtime would change as files are copied underneath
+                # it. to avoid this, first copy all files and then sync attrs.
+                self.curr_dir.sync_attrs()
+
+                if self.curr_dir.has_any_fs_op_failed():
+                    self.notify_parent_dir()
+                self._close_dir(self.stack.pop())
+
+
+class CptreeDir:
+    '''
+    Contains code for reading and copying dir entries and for handling cases
+    when it can't be done due to lack of permission.
+
+    Although named CptreeDir, this class's objects operates over a pair of dir
+    -- src dir as well as dst dir.
+    '''
+
+    def __init__(self, fs, src_rel_path, dst_rel_path, parent_dir=None,
+                 should_sync_attrs=False, cp_src_dir=True):
+        '''
+        Create dst dir (unless cp_src_dir is False) and open src dir as well
+        as dst dir.
+
+        :param fs: file system object on which all calls are issued.
+        :param src_rel_path: path of src dir, relative to parent_dir's src
+                             dir if parent_dir is passed.
+        :param dst_rel_path: path of dst dir, relative to parent_dir's dst
+                             dir if parent_dir is passed.
+        :param parent_dir: CptreeDir of the parent dir, None for the root of
+                           the tree being copied.
+        :param should_sync_attrs: whether sync_attrs() and the copy helpers
+                                  should copy attrs from src to dst.
+        :param cp_src_dir: if False, dst dir is not created, it is only
+                           opened.
+        '''
+        self.fs = fs
+
+        self.src_rel_path = src_rel_path
+        self.dst_rel_path = dst_rel_path
+        # name under which the parent dir lists this dir.
+        self.name = src_rel_path
+
+        self.should_sync_attrs = should_sync_attrs
+        self.cp_src_dir = cp_src_dir
+
+        # needed to open src dir fd and dst dir fd.
+        self.parent_dir_src_fd = parent_dir.src_fd if parent_dir else AT_FDCWD
+        self.parent_dir_dst_fd = parent_dir.dst_fd if parent_dir else AT_FDCWD
+
+        self.copy_dir()
+
+        # Open everything or nothing: when a later step fails the caller never
+        # gets an object to clean up, so close the fds opened so far here.
+        self.src_fd = self.dst_fd = None
+        try:
+            self.src_fd = self.fs.openat(
+                self.parent_dir_src_fd, self.src_rel_path,
+                os.O_RDONLY | os.O_DIRECTORY, 0o755)
+
+            self.dst_fd = self.fs.openat(
+                self.parent_dir_dst_fd, self.dst_rel_path,
+                os.O_RDONLY | os.O_DIRECTORY, 0o755)
+
+            self.src_handle = self.fs.fdopendir(self.src_fd)
+        except Exception:
+            for fd in (self.dst_fd, self.src_fd):
+                if fd is not None:
+                    self.fs.close(fd)
+            raise
+
+        # List of dir entries to be ignored instead of copying them.
+        self.de_ignore_list = []
+
+        # Indicates whether an error occurred during call to readdir().
+        self.has_readdir_failed = False
+
+    def __str__(self):
+        return f'{self.src_rel_path}, {self.dst_rel_path}'
+
+    def add_to_de_ignore_list(self, de_name):
+        self.de_ignore_list.append(de_name)
+
+    def set_readdir_error(self):
+        self.has_readdir_failed = True
+
+    def should_skip_d_name(self, de_name):
+        return de_name in self.de_ignore_list
+
+    def has_any_fs_op_failed(self):
+        return self.has_readdir_failed or len(self.de_ignore_list) > 0
+
+    def read_src_dir(self):
+        '''
+        Return next entry of src dir that should be copied, skipping ".",
+        ".." and entries present in de_ignore_list. Return None when no
+        entry is left or when readdir() fails; the latter is recorded and
+        reported by has_any_fs_op_failed().
+        '''
+        # Assuming True for now, if it's not empty it will be set to
+        # False by the following loop.
+        self.is_empty = True
+
+        try:
+            # de = directory entry
+            de = self.fs.readdir(self.src_handle)
+            while de:
+                if de.d_name in (b'.', b'..'):
+                    pass
+                elif de.d_name in self.de_ignore_list:
+                    self.is_empty = False
+                    log.debug(
+                        'readdir() has previously '
+                        f'failed for dir entry "{de.d_name}", avoiding '
+                        'running readdir() on it again.')
+                else:
+                    self.is_empty = False
+                    return de
+
+                de = self.fs.readdir(self.src_handle)
+        except Error as e:
+            # This is the tricky one: it's an error on this
+            # directory, not on a entry in this directory
+            log.error(f'Exception occured: "{e}"')
+            self.set_readdir_error()
+
+    def copy_dir(self):
+        '''
+        Create the dir on dst side, unless cp_src_dir is False.
+        '''
+        # XXX: cp_src_dir=False implies don't copy dir, copy only its contents
+        # instead. This is used for subvolume cloning.
+        if not self.cp_src_dir:
+            return
+
+        # create dir on dst side.
+        self.fs.mkdirat(self.parent_dir_dst_fd, self.dst_rel_path, 0o755)
+
+    # XXX: Dir's mtime gets updated when a file is copied underneath it.
+    # Therefore, call this method only after all files have been copied
+    # underneath the dir on dst side so that mtime on it and mtime on src
+    # dir is the same.
+    def sync_attrs(self):
+        '''
+        Copy value of attributes from source side to the directory on
+        destination side.
+        '''
+        if not self.should_sync_attrs:
+            return
+
+        # src and dst dir can have different names (this is the case for the
+        # root of the tree), so attrs must be read using src dir's own name
+        # instead of letting the helper look up dst name on src side.
+        flags = (CEPH_STATX_UID | CEPH_STATX_GID | CEPH_STATX_MODE |
+                 CEPH_STATX_ATIME | CEPH_STATX_MTIME)
+        # src_stx_b = statx buffer for source dir
+        src_stx_b = self.fs.statxat(self.parent_dir_src_fd, self.src_rel_path,
+                                    flags, AT_SYMLINK_NOFOLLOW)
+
+        sync_attrs(self.fs, self.parent_dir_src_fd, self.parent_dir_dst_fd,
+                   self.dst_rel_path, src_stx_b)
+
+    def copy_reg_file(self, de_name):
+        '''
+        Copy regular file "de_name" from src dir to dst dir.
+        '''
+        copy_reg_file(self.fs, self.src_fd, self.dst_fd, de_name,
+                      self.should_sync_attrs)
+
+    def copy_sym_link(self, de_name):
+        '''
+        Copy symlink "de_name" from src dir to dst dir.
+        '''
+        copy_sym_link(self.fs, self.src_fd, self.dst_fd, de_name,
+                      self.should_sync_attrs)
+
+
+def sync_attrs(fs, src_fd, dst_fd, de_name, src_stx_b=None):
+    '''
+    Set uid, gid, mode, atime and mtime of "de_name" under dir dst_fd to the
+    values found in src_stx_b. When src_stx_b is not passed, it is obtained
+    by running statxat() on "de_name" under dir src_fd.
+
+    Symlinks are not followed on either side. Any failure while setting the
+    attrs is logged and re-raised.
+    '''
+    # stx_b = statx buffer for source path
+    stx_b = src_stx_b or fs.statxat(
+        src_fd, de_name,
+        (CEPH_STATX_UID | CEPH_STATX_GID | CEPH_STATX_MODE |
+         CEPH_STATX_ATIME | CEPH_STATX_MTIME),
+        AT_SYMLINK_NOFOLLOW)
+
+    owner_uid, owner_gid = stx_b["uid"], stx_b["gid"]
+    perm_mode = stx_b["mode"]
+    # (atime, mtime) as seconds since epoch
+    atime_mtime = tuple(time.mktime(stx_b[key].timetuple())
+                        for key in ("atime", "mtime"))
+
+    try:
+        fs.chownat(dst_fd, de_name, owner_uid, owner_gid, AT_SYMLINK_NOFOLLOW)
+        fs.chmodat(dst_fd, de_name, perm_mode, AT_SYMLINK_NOFOLLOW)
+        fs.utimensat(dst_fd, de_name, atime_mtime, AT_SYMLINK_NOFOLLOW)
+    except Exception as e:
+        log.error('Exception occurred while synchronizing attrs for '
+                  f'"{de_name}". Exception: ({e})')
+        raise e
+
+
+def copy_reg_file(fs, src_fd, dst_fd, file_name, should_sync_attrs=False):
+    '''
+    Copy contents of regular file "file_name" located in dir src_fd to a file
+    of the same name in dir dst_fd. The destination file is created if it
+    doesn't exist and truncated if it does.
+
+    If should_sync_attrs is set, attrs are copied from src file to dst file
+    after the data has been written. Both file fds are closed before
+    returning, regardless of whether the copy succeeded.
+    '''
+    src_file_fd = fs.openat(src_fd, file_name, os.O_RDONLY, 0o755)
+    try:
+        dst_file_fd = fs.openat(dst_fd, file_name, os.O_CREAT | os.O_TRUNC |
+                                os.O_WRONLY, 0o755)
+        try:
+            while True:
+                # copy in chunks of 1 MiB; offset -1 is passed to both
+                # read() and write().
+                data = fs.read(src_file_fd, -1, 1 * 1024 * 1024)
+                if not len(data):
+                    break
+
+                # write() reports how much it wrote, keep writing until the
+                # whole chunk is on dst side.
+                written = 0
+                while written < len(data):
+                    written += fs.write(dst_file_fd, data[written:], -1)
+
+            if should_sync_attrs:
+                sync_attrs(fs, src_fd, dst_fd, file_name)
+
+            fs.fsync(dst_file_fd, 0)
+        finally:
+            fs.close(dst_file_fd)
+    finally:
+        fs.close(src_file_fd)
+
+
+def copy_sym_link(fs, src_fd, dst_fd, de_name, should_sync_attrs=False):
+    '''
+    Create a symlink named "de_name" in dir dst_fd that has the same target
+    as the symlink "de_name" in dir src_fd.
+
+    If should_sync_attrs is set, attrs are copied from src symlink to dst
+    symlink as well. Failures of readlinkat() and symlinkat() are logged and
+    re-raised.
+    '''
+    flags = (CEPH_STATX_UID | CEPH_STATX_GID | CEPH_STATX_MODE |
+             CEPH_STATX_ATIME | CEPH_STATX_MTIME | CEPH_STATX_SIZE)
+    # src_stx_b = statx buffer for source path. It is fetched only once:
+    # "size" is passed to readlinkat() and the same buffer is reused for
+    # syncing attrs.
+    src_stx_b = fs.statxat(src_fd, de_name, flags, AT_SYMLINK_NOFOLLOW)
+    size = src_stx_b['size']
+
+    try:
+        target = fs.readlinkat(src_fd, de_name, size)
+    except Exception as e:
+        log.info('Following exception occurred while reading '
+                 f'symlink: {e}')
+        raise
+
+    try:
+        fs.symlinkat(target, dst_fd, de_name)
+    except Exception as e:
+        log.info('Following exception occurred while creating '
+                 f'symlink: {e}')
+        raise
+
+    if should_sync_attrs:
+        sync_attrs(fs, src_fd, dst_fd, de_name, src_stx_b)