return 0
+ def complete_ln(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ return self.complete_filenames(text, line, begidx, endidx)
+
+ ln_parser = argparse.ArgumentParser(
+ description='Add a hard link to an existing file or create a symbolic '
+ 'link to an existing file or directory.')
+ ln_parser.add_argument('target', type=str, action=path_to_bytes,
+ help='File/Directory of which link is '
+ 'to be created')
+ ln_parser.add_argument('link_name', type=str, action=path_to_bytes,
+ help='Link to target with the name link_name',
+ nargs='?')
+ ln_parser.add_argument('-s', '--symbolic', action='store_true',
+ help='Create symbolic link')
+ ln_parser.add_argument('-v', '--verbose', action='store_true',
+ help='Print name of each linked file')
+ ln_parser.add_argument('-f', '--force', action='store_true',
+ help='Force create link/symbolic link')
+
+ @with_argparser(ln_parser)
+ def do_ln(self, args):
+ if not is_file_exists(args.target) \
+ and not is_dir_exists(args.target):
+ set_exit_code_msg(errno.ENOENT,
+ msg=f"ln: failed to access "
+ f"'{args.target.decode('utf-8')}"
+ f"': No such file or directory")
+ return
+
+ is_a_dir = False
+ if is_dir_exists(args.target):
+ is_a_dir = True
+
+ target_last_char_slash = False
+ if args.target.decode('utf-8')[len(args.target) - 1] == '/':
+ target_last_char_slash = True
+
+ link_name = ''
+
+ if args.link_name is None:
+ if target_last_char_slash is True:
+ if is_dir_exists(args.target):
+ pass
+ else:
+ set_exit_code_msg(errno.ENOTDIR,
+ f"ln: failed to access "
+ f"'{args.target.decode('utf-8')}': "
+ f"Not a directory")
+ return
+ link_name = os.path.join(cephfs.getcwd(),
+ os.path.basename(
+ os.path.normpath(args.target)))
+ if (is_file_exists(link_name) or is_dir_exists(
+ link_name)) and not args.force:
+ set_exit_code_msg(errno.ENOENT,
+ msg=f"ln: failed to create link "
+ f"{link_name.decode('utf-8')}: "
+ f"exists")
+ return
+ else:
+ if is_dir_exists(args.link_name):
+ dest = args.link_name.decode('utf-8').rstrip('/')
+ dest_first_half = dest.encode('utf-8') + b'/'
+ if is_file_exists(args.target):
+ if target_last_char_slash is True:
+ set_exit_code_msg(errno.ENOTDIR,
+ "ln: failed to access "
+ f"'{args.target.decode('utf-8')}': "
+ "Not a directory")
+ return
+ dest_file = os.path.basename(os.path.normpath(args.target))
+ link_name = dest_first_half + dest_file
+
+ elif is_dir_exists(args.target):
+ dest_dir = os.path.basename(os.path.normpath(args.target))
+ link_name = dest_first_half + dest_dir
+
+ else:
+ # if the destination is not a file or a dir then:
+ # accept it as file so the end part of path cannot have
+ # a `/` succeeding it.
+ test_path = args.link_name.decode('utf-8')
+ if test_path[len(test_path) - 1] == '/':
+ set_exit_code_msg(errno.ENOENT, f"'{test_path}': "
+ f"No such file or "
+ f"directory")
+ return
+ else:
+ link_name = test_path.encode('utf-8')
+
+ if args.force:
+ try:
+ cephfs.lstat(os.path.join(b'', link_name))
+ if not is_a_dir or (is_a_dir and args.symbolic):
+ cephfs.unlink(link_name)
+ except libcephfs.ObjectNotFound:
+ pass
+
+ try:
+ if args.symbolic:
+ cephfs.symlink(args.target, link_name)
+ else:
+ if is_a_dir:
+ set_exit_code_msg(errno.EPERM,
+ f"ln: {args.target.decode('utf-8')}: "
+ "hard link not allowed for directory")
+ return
+ cephfs.link(args.target, link_name)
+ except libcephfs.Error as e:
+ set_exit_code_msg(msg=str(e))
+ return
+
+ if args.verbose:
+ poutput(f"{link_name.decode('utf-8')} ->"
+ f" {args.target.decode('utf-8')}")
+
def complete_ls(self, text, line, begidx, endidx):
"""
auto complete of file name.