option(WITH_CEPHFS_SHELL "install cephfs-shell" OFF)
if(WITH_CEPHFS_SHELL)
- include(Distutils)
- distutils_install_module(cephfs-shell)
- if(WITH_TESTS)
- include(AddCephTest)
- add_tox_test(cephfs-shell)
- endif()
+ add_subdirectory(shell)
endif()
option(WITH_CEPHFS_TOP "install cephfs-top utility" ON)
+++ /dev/null
-#!/usr/bin/python3
-# coding = utf-8
-
-import argparse
-import os
-import os.path
-import sys
-import cephfs as libcephfs
-import shutil
-import traceback
-import colorama
-import fnmatch
-import math
-import re
-import shlex
-import stat
-import errno
-
-from cmd2 import Cmd
-from cmd2 import __version__ as cmd2_version
-from distutils.version import LooseVersion
-
-if sys.version_info.major < 3:
- raise RuntimeError("cephfs-shell is only compatible with python3")
-
-try:
- from cmd2 import with_argparser
-except ImportError:
- def with_argparser(argparser):
- import functools
-
- def argparser_decorator(func):
- @functools.wraps(func)
- def wrapper(thiz, cmdline):
- if isinstance(cmdline, list):
- arglist = cmdline
- else:
- # do not split if it's already a list
- arglist = shlex.split(cmdline, posix=False)
- # in case user quotes the command args
- arglist = [arg.strip('\'""') for arg in arglist]
- try:
- args = argparser.parse_args(arglist)
- except SystemExit:
- shell.exit_code = 1
- # argparse exits at seeing bad arguments
- return
- else:
- return func(thiz, args)
- argparser.prog = func.__name__[3:]
- if argparser.description is None and func.__doc__:
- argparser.description = func.__doc__
-
- return wrapper
-
- return argparser_decorator
-
-
-cephfs = None # holds CephFS Python bindings
-shell = None # holds instance of class CephFSShell
-exit_codes = {'Misc': 1,
- 'KeyboardInterrupt': 2,
- errno.EPERM: 3,
- errno.EACCES: 4,
- errno.ENOENT: 5,
- errno.EIO: 6,
- errno.ENOSPC: 7,
- errno.EEXIST: 8,
- errno.ENODATA: 9,
- errno.EINVAL: 10,
- errno.EOPNOTSUPP: 11,
- errno.ERANGE: 12,
- errno.EWOULDBLOCK: 13,
- errno.ENOTEMPTY: 14,
- errno.ENOTDIR: 15,
- errno.EDQUOT: 16,
- errno.EPIPE: 17,
- errno.ESHUTDOWN: 18,
- errno.ECONNABORTED: 19,
- errno.ECONNREFUSED: 20,
- errno.ECONNRESET: 21,
- errno.EINTR: 22}
-
-
-#########################################################################
-#
-# Following are methods are generically useful through class CephFSShell
-#
-#######################################################################
-
-
-def poutput(s, end='\n'):
- shell.poutput(s, end=end)
-
-
-def perror(msg, **kwargs):
- shell.perror(msg, **kwargs)
-
-
-def set_exit_code_msg(errcode='Misc', msg=''):
- """
- Set exit code and print error message
- """
- if isinstance(msg, libcephfs.Error):
- shell.exit_code = exit_codes[msg.get_error_code()]
- else:
- shell.exit_code = exit_codes[errcode]
- if msg:
- perror(msg)
-
-
-def mode_notation(mode):
- """
- """
- permission_bits = {'0': '---',
- '1': '--x',
- '2': '-w-',
- '3': '-wx',
- '4': 'r--',
- '5': 'r-x',
- '6': 'rw-',
- '7': 'rwx'}
- mode = str(oct(mode))
- notation = '-'
- if mode[2] == '4':
- notation = 'd'
- elif mode[2:4] == '12':
- notation = 'l'
- for i in mode[-3:]:
- notation += permission_bits[i]
- return notation
-
-
-def get_chunks(file_size):
- chunk_start = 0
- chunk_size = 0x20000 # 131072 bytes, default max ssl buffer size
- while chunk_start + chunk_size < file_size:
- yield chunk_start, chunk_size
- chunk_start += chunk_size
- final_chunk_size = file_size - chunk_start
- yield chunk_start, final_chunk_size
-
-
-def to_bytes(param):
- # don't convert as follows as it can lead unusable results like coverting
- # [1, 2, 3, 4] to '[1, 2, 3, 4]' -
- # str(param).encode('utf-8')
- if isinstance(param, bytes):
- return param
- elif isinstance(param, str):
- return bytes(param, encoding='utf-8')
- elif isinstance(param, list):
- return [i.encode('utf-8') if isinstance(i, str) else to_bytes(i) for
- i in param]
- elif isinstance(param, int) or isinstance(param, float):
- return str(param).encode('utf-8')
- elif param is None:
- return None
-
-
-def ls(path, opts=''):
- # opts tries to be like /bin/ls opts
- almost_all = 'A' in opts
- try:
- with cephfs.opendir(path) as d:
- while True:
- dent = cephfs.readdir(d)
- if dent is None:
- return
- elif almost_all and dent.d_name in (b'.', b'..'):
- continue
- yield dent
- except libcephfs.ObjectNotFound as e:
- set_exit_code_msg(msg=e)
-
-
-def glob(path, pattern):
- paths = []
- parent_dir = os.path.dirname(path)
- if parent_dir == b'':
- parent_dir = b'/'
- if path == b'/' or is_dir_exists(os.path.basename(path), parent_dir):
- for i in ls(path, opts='A'):
- if fnmatch.fnmatch(i.d_name, pattern):
- paths.append(os.path.join(path, i.d_name))
- return paths
-
-
-def locate_file(name, case_sensitive=True):
- dir_list = sorted(set(dirwalk(cephfs.getcwd())))
- if not case_sensitive:
- return [dname for dname in dir_list if name.lower() in dname.lower()]
- else:
- return [dname for dname in dir_list if name in dname]
-
-
-def get_all_possible_paths(pattern):
- complete_pattern = pattern[:]
- paths = []
- is_rel_path = not os.path.isabs(pattern)
- if is_rel_path:
- dir_ = cephfs.getcwd()
- else:
- dir_ = b'/'
- pattern = pattern[1:]
- patterns = pattern.split(b'/')
- paths.extend(glob(dir_, patterns[0]))
- patterns.pop(0)
- for pattern in patterns:
- for path in paths:
- paths.extend(glob(path, pattern))
- if is_rel_path:
- complete_pattern = os.path.join(cephfs.getcwd(), complete_pattern)
- return [path for path in paths if fnmatch.fnmatch(path, complete_pattern)]
-
-
-suffixes = ['B', 'K', 'M', 'G', 'T', 'P']
-
-
-def humansize(nbytes):
- i = 0
- while nbytes >= 1024 and i < len(suffixes) - 1:
- nbytes /= 1024.
- i += 1
- nbytes = math.ceil(nbytes)
- f = ('%d' % nbytes).rstrip('.')
- return '%s%s' % (f, suffixes[i])
-
-
-def style_listing(path, is_dir, is_symlink, ls_long=False):
- if not (is_dir or is_symlink):
- return path
- pretty = colorama.Style.BRIGHT
- if is_symlink:
- pretty += colorama.Fore.CYAN + path
- if ls_long:
- # Add target path
- pretty += ' -> ' + cephfs.readlink(path, size=255).decode('utf-8')
- elif is_dir:
- pretty += colorama.Fore.BLUE + path + '/'
- pretty += colorama.Style.RESET_ALL
- return pretty
-
-
-def print_long(path, is_dir, is_symlink, human_readable):
- info = cephfs.stat(path, follow_symlink=(not is_symlink))
- pretty = style_listing(os.path.basename(path.decode('utf-8')), is_dir, is_symlink, True)
- if human_readable:
- sizefmt = '\t {:10s}'.format(humansize(info.st_size))
- else:
- sizefmt = '{:12d}'.format(info.st_size)
- poutput(f'{mode_notation(info.st_mode)} {sizefmt} {info.st_uid} {info.st_gid} {info.st_mtime}'
- f' {pretty}')
-
-
-def word_len(word):
- """
- Returns the word length, minus any color codes.
- """
- if word[0] == '\x1b':
- return len(word) - 9
- return len(word)
-
-
-def is_dir_exists(path, dir_=b''):
- path_to_stat = os.path.join(dir_, path)
- try:
- return ((cephfs.stat(path_to_stat).st_mode & 0o0040000) != 0)
- except libcephfs.Error:
- return False
-
-
-def is_file_exists(path, dir_=b''):
- try:
- # if its not a directory, then its a file
- return ((cephfs.stat(os.path.join(dir_, path)).st_mode & 0o0040000) == 0)
- except libcephfs.Error:
- return False
-
-
-def print_list(words, termwidth=79):
- if not words:
- return
- words = [word.decode('utf-8') if isinstance(word, bytes) else word for word in words]
- width = max([word_len(word) for word in words]) + 2
- nwords = len(words)
- ncols = max(1, (termwidth + 1) // (width + 1))
- nrows = (nwords + ncols - 1) // ncols
- for row in range(nrows):
- for i in range(row, nwords, nrows):
- word = words[i]
- print_width = width
- if word[0] == '\x1b':
- print_width = print_width + 10
-
- poutput('%-*s' % (print_width, words[i]),
- end='\n' if i + nrows >= nwords else '')
-
-
-def copy_from_local(local_path, remote_path):
- stdin = -1
- file_ = None
- fd = None
- convert_to_bytes = False
- if local_path == b'-':
- file_ = sys.stdin
- convert_to_bytes = True
- else:
- try:
- file_ = open(local_path, 'rb')
- except PermissionError as e:
- set_exit_code_msg(e.errno, 'error: no permission to read local file {}'.format(
- local_path.decode('utf-8')))
- return
- stdin = 1
- try:
- fd = cephfs.open(remote_path, 'w', 0o666)
- except libcephfs.Error as e:
- set_exit_code_msg(msg=e)
- return
- progress = 0
- while True:
- data = file_.read(65536)
- if not data or len(data) == 0:
- break
- if convert_to_bytes:
- data = to_bytes(data)
- wrote = cephfs.write(fd, data, progress)
- if wrote < 0:
- break
- progress += wrote
- cephfs.close(fd)
- if stdin > 0:
- file_.close()
- poutput('')
-
-
-def copy_to_local(remote_path, local_path):
- fd = None
- if local_path != b'-':
- local_dir = os.path.dirname(local_path)
- dir_list = remote_path.rsplit(b'/', 1)
- if not os.path.exists(local_dir):
- os.makedirs(local_dir)
- if len(dir_list) > 2 and dir_list[1] == b'':
- return
- fd = open(local_path, 'wb+')
- file_ = cephfs.open(remote_path, 'r')
- file_size = cephfs.stat(remote_path).st_size
- if file_size <= 0:
- return
- progress = 0
- for chunk_start, chunk_size in get_chunks(file_size):
- file_chunk = cephfs.read(file_, chunk_start, chunk_size)
- progress += len(file_chunk)
- if fd:
- fd.write(file_chunk)
- else:
- poutput(file_chunk.decode('utf-8'))
- cephfs.close(file_)
- if fd:
- fd.close()
-
-
-def dirwalk(path):
- """
- walk a directory tree, using a generator
- """
- path = os.path.normpath(path)
- for item in ls(path, opts='A'):
- fullpath = os.path.join(path, item.d_name)
- src_path = fullpath.rsplit(b'/', 1)[0]
-
- yield os.path.normpath(fullpath)
- if is_dir_exists(item.d_name, src_path):
- for x in dirwalk(fullpath):
- yield x
-
-
-##################################################################
-#
-# Following methods are implementation for CephFS Shell commands
-#
-#################################################################
-
-class CephFSShell(Cmd):
-
- def __init__(self):
- super().__init__(use_ipython=False)
- self.working_dir = cephfs.getcwd().decode('utf-8')
- self.set_prompt()
- self.interactive = False
- self.umask = '2'
-
- def default(self, line):
- perror('Unrecognized command')
-
- def set_prompt(self):
- self.prompt = ('\033[01;33mCephFS:~' + colorama.Fore.LIGHTCYAN_EX
- + self.working_dir + colorama.Style.RESET_ALL
- + '\033[01;33m>>>\033[00m ')
-
- def create_argparser(self, command):
- try:
- argparse_args = getattr(self, 'argparse_' + command)
- except AttributeError:
- set_exit_code_msg()
- return None
- doc_lines = getattr(
- self, 'do_' + command).__doc__.expandtabs().splitlines()
- if '' in doc_lines:
- blank_idx = doc_lines.index('')
- usage = doc_lines[:blank_idx]
- description = doc_lines[blank_idx + 1:]
- else:
- usage = doc_lines
- description = []
- parser = argparse.ArgumentParser(
- prog=command,
- usage='\n'.join(usage),
- description='\n'.join(description),
- formatter_class=argparse.ArgumentDefaultsHelpFormatter
- )
- for args, kwargs in argparse_args:
- parser.add_argument(*args, **kwargs)
- return parser
-
- def complete_filenames(self, text, line, begidx, endidx):
- if not text:
- completions = [x.d_name.decode('utf-8') + '/' * int(x.is_dir())
- for x in ls(b".", opts='A')]
- else:
- if text.count('/') > 0:
- completions = [text.rsplit('/', 1)[0] + '/'
- + x.d_name.decode('utf-8') + '/'
- * int(x.is_dir()) for x in ls('/'
- + text.rsplit('/', 1)[0], opts='A')
- if x.d_name.decode('utf-8').startswith(
- text.rsplit('/', 1)[1])]
- else:
- completions = [x.d_name.decode('utf-8') + '/'
- * int(x.is_dir()) for x in ls(b".", opts='A')
- if x.d_name.decode('utf-8').startswith(text)]
- if len(completions) == 1 and completions[0][-1] == '/':
- dir_, file_ = completions[0].rsplit('/', 1)
- completions.extend([dir_ + '/' + x.d_name.decode('utf-8')
- + '/' * int(x.is_dir()) for x in
- ls('/' + dir_, opts='A')
- if x.d_name.decode('utf-8').startswith(file_)])
- return self.delimiter_complete(text, line, begidx, endidx, completions, '/')
- return completions
-
- def onecmd(self, line, **kwargs):
- """
- Global error catcher
- """
- try:
- res = Cmd.onecmd(self, line, **kwargs)
- if self.interactive:
- self.set_prompt()
- return res
- except ConnectionError as e:
- set_exit_code_msg(e.errno, f'***\n{e}')
- except KeyboardInterrupt:
- set_exit_code_msg('KeyboardInterrupt', 'Command aborted')
- except (libcephfs.Error, Exception) as e:
- if shell.debug:
- traceback.print_exc(file=sys.stdout)
- set_exit_code_msg(msg=e)
-
- class path_to_bytes(argparse.Action):
- def __call__(self, parser, namespace, values, option_string=None):
- values = to_bytes(values)
- setattr(namespace, self.dest, values)
-
- # TODO: move the necessary contents from here to `class path_to_bytes`.
- class get_list_of_bytes_path(argparse.Action):
- def __call__(self, parser, namespace, values, option_string=None):
- values = to_bytes(values)
-
- if values == b'.':
- values = cephfs.getcwd()
- else:
- for i in values:
- if i == b'.':
- values[values.index(i)] = cephfs.getcwd()
-
- setattr(namespace, self.dest, values)
-
- def complete_mkdir(self, text, line, begidx, endidx):
- """
- auto complete of file name.
- """
- return self.complete_filenames(text, line, begidx, endidx)
-
- class ModeAction(argparse.Action):
- def __init__(self, option_strings, dest, nargs=None, **kwargs):
- if nargs is not None and nargs != '?':
- raise ValueError("more than one modes not allowed")
- super().__init__(option_strings, dest, **kwargs)
-
- def __call__(self, parser, namespace, values, option_string=None):
- o_mode = 0
- res = None
- try:
- o_mode = int(values, base=8)
- except ValueError:
- res = re.match('((u?g?o?)|(a?))(=)(r?w?x?)', values)
- if res is None:
- parser.error("invalid mode: %s\n"
- "mode must be a numeric octal literal\n"
- "or ((u?g?o?)|(a?))(=)(r?w?x?)" %
- values)
- else:
- # we are supporting only assignment of mode and not + or -
- # as is generally available with the chmod command
- # eg.
- # >>> res = re.match('((u?g?o?)|(a?))(=)(r?w?x?)', 'go=')
- # >>> res.groups()
- # ('go', 'go', None, '=', '')
- val = res.groups()
-
- if val[3] != '=':
- parser.error("need assignment operator between user "
- "and mode specifiers")
- if val[4] == '':
- parser.error("invalid mode: %s\n"
- "mode must be combination of: r | w | x" %
- values)
- users = ''
- if val[2] is None:
- users = val[1]
- else:
- users = val[2]
-
- t_mode = 0
- if users == 'a':
- users = 'ugo'
-
- if 'r' in val[4]:
- t_mode |= 4
- if 'w' in val[4]:
- t_mode |= 2
- if 'x' in val[4]:
- t_mode |= 1
-
- if 'u' in users:
- o_mode |= (t_mode << 6)
- if 'g' in users:
- o_mode |= (t_mode << 3)
- if 'o' in users:
- o_mode |= t_mode
-
- if o_mode < 0:
- parser.error("invalid mode: %s\n"
- "mode cannot be negative" % values)
- if o_mode > 0o777:
- parser.error("invalid mode: %s\n"
- "mode cannot be greater than octal 0777" % values)
-
- setattr(namespace, self.dest, str(oct(o_mode)))
-
- mkdir_parser = argparse.ArgumentParser(
- description='Create the directory(ies), if they do not already exist.')
- mkdir_parser.add_argument('dirs', type=str,
- action=path_to_bytes,
- metavar='DIR_NAME',
- help='Name of new_directory.',
- nargs='+')
- mkdir_parser.add_argument('-m', '--mode', type=str,
- action=ModeAction,
- help='Sets the access mode for the new directory.')
- mkdir_parser.add_argument('-p', '--parent', action='store_true',
- help='Create parent directories as necessary. '
- 'When this option is specified, no error is'
- 'reported if a directory already exists.')
-
- @with_argparser(mkdir_parser)
- def do_mkdir(self, args):
- """
- Create directory.
- """
- for path in args.dirs:
- if args.mode:
- permission = int(args.mode, 8)
- else:
- permission = 0o777
- if args.parent:
- cephfs.mkdirs(path, permission)
- else:
- try:
- cephfs.mkdir(path, permission)
- except libcephfs.Error as e:
- set_exit_code_msg(e)
-
- def complete_put(self, text, line, begidx, endidx):
- """
- auto complete of file name.
- """
- index_dict = {1: self.path_complete}
- return self.index_based_complete(text, line, begidx, endidx, index_dict)
-
- put_parser = argparse.ArgumentParser(
- description='Copy a file/directory to Ceph File System from Local File System.')
- put_parser.add_argument('local_path', type=str, action=path_to_bytes,
- help='Path of the file in the local system')
- put_parser.add_argument('remote_path', type=str, action=path_to_bytes,
- help='Path of the file in the remote system')
- put_parser.add_argument('-f', '--force', action='store_true',
- help='Overwrites the destination if it already exists.')
-
- @with_argparser(put_parser)
- def do_put(self, args):
- """
- Copy a local file/directory to CephFS.
- """
- if args.local_path != b'-' and not os.path.isfile(args.local_path) \
- and not os.path.isdir(args.local_path):
- set_exit_code_msg(errno.ENOENT,
- msg=f"error: "
- f"{args.local_path.decode('utf-8')}: "
- f"No such file or directory")
- return
-
- if (is_file_exists(args.remote_path) or is_dir_exists(
- args.remote_path)) and not args.force:
- set_exit_code_msg(msg=f"error: file/directory "
- f"{args.remote_path.decode('utf-8')} "
- f"exists, use --force to overwrite")
- return
-
- root_src_dir = args.local_path
- root_dst_dir = args.remote_path
- if args.local_path == b'.' or args.local_path == b'./':
- root_src_dir = os.getcwdb()
- elif len(args.local_path.rsplit(b'/', 1)) < 2:
- root_src_dir = os.path.join(os.getcwdb(), args.local_path)
- else:
- p = args.local_path.split(b'/')
- if p[0] == b'.':
- root_src_dir = os.getcwdb()
- p.pop(0)
- while len(p) > 0:
- root_src_dir += b'/' + p.pop(0)
-
- if root_dst_dir == b'.':
- if args.local_path != b'-':
- root_dst_dir = root_src_dir.rsplit(b'/', 1)[1]
- if root_dst_dir == b'':
- root_dst_dir = root_src_dir.rsplit(b'/', 1)[0]
- a = root_dst_dir.rsplit(b'/', 1)
- if len(a) > 1:
- root_dst_dir = a[1]
- else:
- root_dst_dir = a[0]
- else:
- set_exit_code_msg(errno.EINVAL, 'error: no filename specified '
- 'for destination')
- return
-
- if root_dst_dir[-1] != b'/':
- root_dst_dir += b'/'
-
- if args.local_path == b'-' or os.path.isfile(root_src_dir):
- if args.local_path == b'-':
- root_src_dir = b'-'
- copy_from_local(root_src_dir, root_dst_dir)
- else:
- for src_dir, dirs, files in os.walk(root_src_dir):
- if isinstance(src_dir, str):
- src_dir = to_bytes(src_dir)
- dst_dir = src_dir.replace(root_src_dir, root_dst_dir, 1)
- dst_dir = re.sub(rb'\/+', b'/', cephfs.getcwd()
- + dst_dir)
- if args.force and dst_dir != b'/' and not is_dir_exists(
- dst_dir[:-1]) and not locate_file(dst_dir):
- try:
- cephfs.mkdirs(dst_dir, 0o777)
- except libcephfs.Error:
- pass
- if (not args.force) and dst_dir != b'/' and not is_dir_exists(
- dst_dir) and not os.path.isfile(root_src_dir):
- try:
- cephfs.mkdirs(dst_dir, 0o777)
- except libcephfs.Error:
- # TODO: perhaps, set retval to 1?
- pass
-
- for dir_ in dirs:
- dir_name = os.path.join(dst_dir, dir_)
- if not is_dir_exists(dir_name):
- try:
- cephfs.mkdirs(dir_name, 0o777)
- except libcephfs.Error:
- # TODO: perhaps, set retval to 1?
- pass
-
- for file_ in files:
- src_file = os.path.join(src_dir, file_)
- dst_file = re.sub(rb'\/+', b'/', b'/' + dst_dir + b'/' + file_)
- if (not args.force) and is_file_exists(dst_file):
- return
- copy_from_local(src_file, os.path.join(cephfs.getcwd(),
- dst_file))
-
- def complete_get(self, text, line, begidx, endidx):
- """
- auto complete of file name.
- """
- return self.complete_filenames(text, line, begidx, endidx)
-
- get_parser = argparse.ArgumentParser(
- description='Copy a file from Ceph File System to Local Directory.')
- get_parser.add_argument('remote_path', type=str, action=path_to_bytes,
- help='Path of the file in the remote system')
- get_parser.add_argument('local_path', type=str, action=path_to_bytes,
- help='Path of the file in the local system')
- get_parser.add_argument('-f', '--force', action='store_true',
- help='Overwrites the destination if it already exists.')
-
- @with_argparser(get_parser)
- def do_get(self, args):
- """
- Copy a file/directory from CephFS to given path.
- """
- if not is_file_exists(args.remote_path) and not \
- is_dir_exists(args.remote_path):
- set_exit_code_msg(errno.ENOENT, "error: no file/directory"
- " found at specified remote "
- "path")
- return
- if (os.path.isfile(args.local_path) or os.path.isdir(
- args.local_path)) and not args.force:
- set_exit_code_msg(msg=f"error: file/directory "
- f"{args.local_path.decode('utf-8')}"
- f" already exists, use --force to "
- f"overwrite")
- return
- root_src_dir = args.remote_path
- root_dst_dir = args.local_path
- fname = root_src_dir.rsplit(b'/', 1)
- if args.local_path == b'.':
- root_dst_dir = os.getcwdb()
- if args.remote_path == b'.':
- root_src_dir = cephfs.getcwd()
- if args.local_path == b'-':
- if args.remote_path == b'.' or args.remote_path == b'./':
- set_exit_code_msg(errno.EINVAL, 'error: no remote file name specified')
- return
- copy_to_local(root_src_dir, b'-')
- elif is_file_exists(args.remote_path):
- copy_to_local(root_src_dir, root_dst_dir)
- elif b'/' in root_src_dir and is_file_exists(fname[1], fname[0]):
- copy_to_local(root_src_dir, root_dst_dir)
- else:
- files = list(reversed(sorted(dirwalk(root_src_dir))))
- for file_ in files:
- dst_dirpath, dst_file = file_.rsplit(b'/', 1)
- if dst_dirpath in files:
- files.remove(dst_dirpath)
- dst_path = os.path.join(root_dst_dir, dst_dirpath, dst_file)
- dst_path = os.path.normpath(dst_path)
- if is_dir_exists(file_):
- try:
- os.makedirs(dst_path)
- except OSError:
- pass
- else:
- copy_to_local(file_, dst_path)
-
- return 0
-
- def complete_ls(self, text, line, begidx, endidx):
- """
- auto complete of file name.
- """
- return self.complete_filenames(text, line, begidx, endidx)
-
- ls_parser = argparse.ArgumentParser(
- description='Copy a file from Ceph File System from Local Directory.')
- ls_parser.add_argument('-l', '--long', action='store_true',
- help='Detailed list of items in the directory.')
- ls_parser.add_argument('-r', '--reverse', action='store_true',
- help='Reverse order of listing items in the directory.')
- ls_parser.add_argument('-H', action='store_true', help='Human Readable')
- ls_parser.add_argument('-a', '--all', action='store_true',
- help='Do not Ignore entries starting with .')
- ls_parser.add_argument('-S', action='store_true', help='Sort by file_size')
- ls_parser.add_argument('paths', help='Name of Directories',
- action=path_to_bytes, nargs='*', default=['.'])
-
- @with_argparser(ls_parser)
- def do_ls(self, args):
- """
- List all the files and directories in the current working directory
- """
- paths = args.paths
- for path in paths:
- values = []
- items = []
- try:
- if path.count(b'*') > 0:
- all_items = get_all_possible_paths(path)
- if len(all_items) == 0:
- continue
- path = all_items[0].rsplit(b'/', 1)[0]
- if path == b'':
- path = b'/'
- dirs = []
- for i in all_items:
- for item in ls(path):
- d_name = item.d_name
- if os.path.basename(i) == d_name:
- if item.is_dir():
- dirs.append(os.path.join(path, d_name))
- else:
- items.append(item)
- if dirs:
- paths.extend(dirs)
- else:
- poutput(path.decode('utf-8'), end=':\n')
- items = sorted(items, key=lambda item: item.d_name)
- else:
- if path != b'' and path != cephfs.getcwd() and len(paths) > 1:
- poutput(path.decode('utf-8'), end=':\n')
- items = sorted(ls(path), key=lambda item: item.d_name)
- if not args.all:
- items = [i for i in items if not i.d_name.startswith(b'.')]
- if args.S:
- items = sorted(items, key=lambda item: cephfs.stat(
- path + b'/' + item.d_name, follow_symlink=(
- not item.is_symbol_file())).st_size)
- if args.reverse:
- items = reversed(items)
- for item in items:
- filepath = item.d_name
- is_dir = item.is_dir()
- is_sym_lnk = item.is_symbol_file()
- try:
- if args.long and args.H:
- print_long(os.path.join(cephfs.getcwd(), path, filepath), is_dir,
- is_sym_lnk, True)
- elif args.long:
- print_long(os.path.join(cephfs.getcwd(), path, filepath), is_dir,
- is_sym_lnk, False)
- elif is_sym_lnk or is_dir:
- values.append(style_listing(filepath.decode('utf-8'), is_dir,
- is_sym_lnk))
- else:
- values.append(filepath)
- except libcephfs.Error as e:
- set_exit_code_msg(msg=e)
- if not args.long:
- print_list(values, shutil.get_terminal_size().columns)
- if path != paths[-1]:
- poutput('')
- except libcephfs.Error as e:
- set_exit_code_msg(msg=e)
-
- def complete_rmdir(self, text, line, begidx, endidx):
- """
- auto complete of file name.
- """
- return self.complete_filenames(text, line, begidx, endidx)
-
- rmdir_parser = argparse.ArgumentParser(description='Remove Directory.')
- rmdir_parser.add_argument('paths', help='Directory Path.', nargs='+',
- action=path_to_bytes)
- rmdir_parser.add_argument('-p', '--parent', action='store_true',
- help='Remove parent directories as necessary. '
- 'When this option is specified, no error '
- 'is reported if a directory has any '
- 'sub-directories, files')
-
- @with_argparser(rmdir_parser)
- def do_rmdir(self, args):
- self.do_rmdir_helper(args)
-
- def do_rmdir_helper(self, args):
- """
- Remove a specific Directory
- """
- is_pattern = False
- paths = args.paths
- for path in paths:
- if path.count(b'*') > 0:
- is_pattern = True
- all_items = get_all_possible_paths(path)
- if len(all_items) > 0:
- path = all_items[0].rsplit(b'/', 1)[0]
- if path == b'':
- path = b'/'
- dirs = []
- for i in all_items:
- for item in ls(path):
- d_name = item.d_name
- if os.path.basename(i) == d_name:
- if item.is_dir():
- dirs.append(os.path.join(path, d_name))
- paths.extend(dirs)
- continue
- else:
- is_pattern = False
-
- if args.parent:
- path = os.path.join(cephfs.getcwd(), path.rsplit(b'/')[0])
- files = list(sorted(set(dirwalk(path)), reverse=True))
- if not files:
- path = b'.'
- for filepath in files:
- try:
- cephfs.rmdir(os.path.normpath(filepath))
- except libcephfs.Error as e:
- perror(e)
- path = b'.'
- break
- else:
- path = os.path.normpath(os.path.join(cephfs.getcwd(), path))
- if not is_pattern and path != os.path.normpath(b''):
- try:
- cephfs.rmdir(path)
- except libcephfs.Error as e:
- set_exit_code_msg(msg=e)
-
- def complete_rm(self, text, line, begidx, endidx):
- """
- auto complete of file name.
- """
- return self.complete_filenames(text, line, begidx, endidx)
-
- rm_parser = argparse.ArgumentParser(description='Remove File.')
- rm_parser.add_argument('paths', help='File Path.', nargs='+',
- action=path_to_bytes)
-
- @with_argparser(rm_parser)
- def do_rm(self, args):
- """
- Remove a specific file
- """
- file_paths = args.paths
- for path in file_paths:
- if path.count(b'*') > 0:
- file_paths.extend([i for i in get_all_possible_paths(
- path) if is_file_exists(i)])
- else:
- try:
- cephfs.unlink(path)
- except libcephfs.Error as e:
- # NOTE: perhaps we need a better msg here
- set_exit_code_msg(msg=e)
-
- def complete_mv(self, text, line, begidx, endidx):
- """
- auto complete of file name.
- """
- return self.complete_filenames(text, line, begidx, endidx)
-
- mv_parser = argparse.ArgumentParser(description='Move File.')
- mv_parser.add_argument('src_path', type=str, action=path_to_bytes,
- help='Source File Path.')
- mv_parser.add_argument('dest_path', type=str, action=path_to_bytes,
- help='Destination File Path.')
-
- @with_argparser(mv_parser)
- def do_mv(self, args):
- """
- Rename a file or Move a file from source path to the destination
- """
- cephfs.rename(args.src_path, args.dest_path)
-
- def complete_cd(self, text, line, begidx, endidx):
- """
- auto complete of file name.
- """
- return self.complete_filenames(text, line, begidx, endidx)
-
- cd_parser = argparse.ArgumentParser(description='Change working directory')
- cd_parser.add_argument('path', type=str, help='Name of the directory.',
- action=path_to_bytes, nargs='?', default='/')
-
- @with_argparser(cd_parser)
- def do_cd(self, args):
- """
- Change working directory
- """
- cephfs.chdir(args.path)
- self.working_dir = cephfs.getcwd().decode('utf-8')
- self.set_prompt()
-
- def do_cwd(self, arglist):
- """
- Get current working directory.
- """
- poutput(cephfs.getcwd().decode('utf-8'))
-
- def complete_chmod(self, text, line, begidx, endidx):
- """
- auto complete of file name.
- """
- return self.complete_filenames(text, line, begidx, endidx)
-
- chmod_parser = argparse.ArgumentParser(description='Create Directory.')
- chmod_parser.add_argument('mode', type=str, action=ModeAction, help='Mode')
- chmod_parser.add_argument('paths', type=str, action=path_to_bytes,
- help='Name of the file', nargs='+')
-
- @with_argparser(chmod_parser)
- def do_chmod(self, args):
- """
- Change permission of a file
- """
- for path in args.paths:
- mode = int(args.mode, base=8)
- try:
- cephfs.chmod(path, mode)
- except libcephfs.Error as e:
- set_exit_code_msg(msg=e)
-
- def complete_cat(self, text, line, begidx, endidx):
- """
- auto complete of file name.
- """
- return self.complete_filenames(text, line, begidx, endidx)
-
- cat_parser = argparse.ArgumentParser(description='')
- cat_parser.add_argument('paths', help='Name of Files', action=path_to_bytes,
- nargs='+')
-
- @with_argparser(cat_parser)
- def do_cat(self, args):
- """
- Print contents of a file
- """
- for path in args.paths:
- if is_file_exists(path):
- copy_to_local(path, b'-')
- else:
- set_exit_code_msg(errno.ENOENT, '{}: no such file'.format(
- path.decode('utf-8')))
-
- umask_parser = argparse.ArgumentParser(description='Set umask value.')
- umask_parser.add_argument('mode', help='Mode', type=str, action=ModeAction,
- nargs='?', default='')
-
- @with_argparser(umask_parser)
- def do_umask(self, args):
- """
- Set Umask value.
- """
- if args.mode == '':
- poutput(self.umask.zfill(4))
- else:
- mode = int(args.mode, 8)
- self.umask = str(oct(cephfs.umask(mode))[2:])
-
- def complete_write(self, text, line, begidx, endidx):
- """
- auto complete of file name.
- """
- return self.complete_filenames(text, line, begidx, endidx)
-
- write_parser = argparse.ArgumentParser(description='Writes data into a file')
- write_parser.add_argument('path', type=str, action=path_to_bytes,
- help='Name of File')
-
- @with_argparser(write_parser)
- def do_write(self, args):
- """
- Write data into a file.
- """
-
- copy_from_local(b'-', args.path)
-
- def complete_lcd(self, text, line, begidx, endidx):
- """
- auto complete of file name.
- """
- index_dict = {1: self.path_complete}
- return self.index_based_complete(text, line, begidx, endidx, index_dict)
-
- lcd_parser = argparse.ArgumentParser(description='')
- lcd_parser.add_argument('path', type=str, action=path_to_bytes, help='Path')
-
- @with_argparser(lcd_parser)
- def do_lcd(self, args):
- """
- Moves into the given local directory
- """
- try:
- os.chdir(os.path.expanduser(args.path))
- except OSError as e:
- set_exit_code_msg(e.errno, "Cannot change to "
- f"{e.filename.decode('utf-8')}: {e.strerror}")
-
- def complete_lls(self, text, line, begidx, endidx):
- """
- auto complete of file name.
- """
- index_dict = {1: self.path_complete}
- return self.index_based_complete(text, line, begidx, endidx, index_dict)
-
- lls_parser = argparse.ArgumentParser(
- description='List files in local system.')
- lls_parser.add_argument('paths', help='Paths', action=path_to_bytes,
- nargs='*')
-
- @with_argparser(lls_parser)
- def do_lls(self, args):
- """
- Lists all files and folders in the current local directory
- """
- if not args.paths:
- print_list(os.listdir(os.getcwdb()))
- else:
- for path in args.paths:
- try:
- items = os.listdir(path)
- poutput("{}:".format(path.decode('utf-8')))
- print_list(items)
- except OSError as e:
- set_exit_code_msg(e.errno, f"{e.filename.decode('utf-8')}: "
- f"{e.strerror}")
- # Arguments to the with_argpaser decorator function are sticky.
- # The items in args.path do not get overwritten in subsequent calls.
- # The arguments remain in args.paths after the function exits and we
- # neeed to clean it up to ensure the next call works as expected.
- args.paths.clear()
-
- def do_lpwd(self, arglist):
- """
- Prints the absolute path of the current local directory
- """
- poutput(os.getcwd())
-
- def complete_df(self, text, line, begidx, endidx):
- """
- auto complete of file name.
- """
- return self.complete_filenames(text, line, begidx, endidx)
-
- df_parser = argparse.ArgumentParser(description='Show information about\
- the amount of available disk space')
- df_parser.add_argument('file', help='Name of the file', nargs='*',
- default=['.'], action=path_to_bytes)
-
- @with_argparser(df_parser)
- def do_df(self, arglist):
- """
- Display the amount of available disk space for file systems
- """
- header = True # Set to true for printing header only once
- if b'.' == arglist.file[0]:
- arglist.file = ls(b'.')
-
- for file in arglist.file:
- if isinstance(file, libcephfs.DirEntry):
- file = file.d_name
- if file == b'.' or file == b'..':
- continue
- try:
- statfs = cephfs.statfs(file)
- stat = cephfs.stat(file)
- block_size = (statfs['f_blocks'] * statfs['f_bsize']) // 1024
- available = block_size - stat.st_size
- use = 0
-
- if block_size > 0:
- use = (stat.st_size * 100) // block_size
-
- if header:
- header = False
- poutput('{:25s}\t{:5s}\t{:15s}{:10s}{}'.format(
- "1K-blocks", "Used", "Available", "Use%",
- "Stored on"))
-
- poutput('{:d}\t{:18d}\t{:8d}\t{:10s} {}'.format(block_size,
- stat.st_size, available, str(int(use)) + '%',
- file.decode('utf-8')))
- except libcephfs.OSError as e:
- set_exit_code_msg(e.get_error_code(), "could not statfs {}: {}".format(
- file.decode('utf-8'), e.strerror))
-
- locate_parser = argparse.ArgumentParser(
- description='Find file within file system')
- locate_parser.add_argument('name', help='name', type=str,
- action=path_to_bytes)
- locate_parser.add_argument('-c', '--count', action='store_true',
- help='Count list of items located.')
- locate_parser.add_argument(
- '-i', '--ignorecase', action='store_true', help='Ignore case')
-
- @with_argparser(locate_parser)
- def do_locate(self, args):
- """
- Find a file within the File System
- """
- if args.name.count(b'*') == 1:
- if args.name[0] == b'*':
- args.name += b'/'
- elif args.name[-1] == '*':
- args.name = b'/' + args.name
- args.name = args.name.replace(b'*', b'')
- if args.ignorecase:
- locations = locate_file(args.name, False)
- else:
- locations = locate_file(args.name)
- if args.count:
- poutput(len(locations))
- else:
- poutput((b'\n'.join(locations)).decode('utf-8'))
-
- def complete_du(self, text, line, begidx, endidx):
- """
- auto complete of file name.
- """
- return self.complete_filenames(text, line, begidx, endidx)
-
- du_parser = argparse.ArgumentParser(
- description='Disk Usage of a Directory')
- du_parser.add_argument('paths', type=str, action=get_list_of_bytes_path,
- help='Name of the directory.', nargs='*',
- default=[b'.'])
- du_parser.add_argument('-r', action='store_true',
- help='Recursive Disk usage of all directories.')
-
- @with_argparser(du_parser)
- def do_du(self, args):
- """
- Print disk usage of a given path(s).
- """
- def print_disk_usage(files):
- if isinstance(files, bytes):
- files = (files, )
-
- for f in files:
- try:
- st = cephfs.lstat(f)
-
- if stat.S_ISDIR(st.st_mode):
- dusage = int(cephfs.getxattr(f,
- 'ceph.dir.rbytes').decode('utf-8'))
- else:
- dusage = st.st_size
-
- # print path in local context
- f = os.path.normpath(f)
- if f[0] is ord('/'):
- f = b'.' + f
- poutput('{:10s} {}'.format(humansize(dusage),
- f.decode('utf-8')))
- except libcephfs.Error as e:
- set_exit_code_msg(msg=e)
- continue
-
- for path in args.paths:
- if args.r:
- print_disk_usage(sorted(set(dirwalk(path)).union({path})))
- else:
- print_disk_usage(path)
-
- quota_parser = argparse.ArgumentParser(
- description='Quota management for a Directory')
- quota_parser.add_argument('op', choices=['get', 'set'],
- help='Quota operation type.')
- quota_parser.add_argument('path', type=str, action=path_to_bytes,
- help='Name of the directory.')
- quota_parser.add_argument('--max_bytes', type=int, default=-1, nargs='?',
- help='Max cumulative size of the data under '
- 'this directory.')
- quota_parser.add_argument('--max_files', type=int, default=-1, nargs='?',
- help='Total number of files under this '
- 'directory tree.')
-
- @with_argparser(quota_parser)
- def do_quota(self, args):
- """
- Quota management.
- """
- if not is_dir_exists(args.path):
- set_exit_code_msg(errno.ENOENT, 'error: no such directory {}'.format(
- args.path.decode('utf-8')))
- return
-
- if args.op == 'set':
- if (args.max_bytes == -1) and (args.max_files == -1):
- set_exit_code_msg(errno.EINVAL, 'please specify either '
- '--max_bytes or --max_files or both')
- return
-
- if args.max_bytes >= 0:
- max_bytes = to_bytes(str(args.max_bytes))
- try:
- cephfs.setxattr(args.path, 'ceph.quota.max_bytes',
- max_bytes, os.XATTR_CREATE)
- poutput('max_bytes set to %d' % args.max_bytes)
- except libcephfs.Error as e:
- cephfs.setxattr(args.path, 'ceph.quota.max_bytes',
- max_bytes, os.XATTR_REPLACE)
- set_exit_code_msg(e.get_error_code(), 'max_bytes reset to '
- f'{args.max_bytes}')
-
- if args.max_files >= 0:
- max_files = to_bytes(str(args.max_files))
- try:
- cephfs.setxattr(args.path, 'ceph.quota.max_files',
- max_files, os.XATTR_CREATE)
- poutput('max_files set to %d' % args.max_files)
- except libcephfs.Error as e:
- cephfs.setxattr(args.path, 'ceph.quota.max_files',
- max_files, os.XATTR_REPLACE)
- set_exit_code_msg(e.get_error_code(), 'max_files reset to '
- f'{args.max_files}')
- elif args.op == 'get':
- max_bytes = '0'
- max_files = '0'
- try:
- max_bytes = cephfs.getxattr(args.path, 'ceph.quota.max_bytes')
- poutput('max_bytes: {}'.format(max_bytes.decode('utf-8')))
- except libcephfs.Error as e:
- set_exit_code_msg(e.get_error_code(), 'max_bytes is not set')
-
- try:
- max_files = cephfs.getxattr(args.path, 'ceph.quota.max_files')
- poutput('max_files: {}'.format(max_files.decode('utf-8')))
- except libcephfs.Error as e:
- set_exit_code_msg(e.get_error_code(), 'max_files is not set')
-
- snap_parser = argparse.ArgumentParser(description='Snapshot Management')
- snap_parser.add_argument('op', type=str,
- help='Snapshot operation: create or delete')
- snap_parser.add_argument('name', type=str, action=path_to_bytes,
- help='Name of snapshot')
- snap_parser.add_argument('dir', type=str, action=path_to_bytes,
- help='Directory for which snapshot '
- 'needs to be created or deleted')
-
- @with_argparser(snap_parser)
- def do_snap(self, args):
- """
- Snapshot management for the volume
- """
- # setting self.colors to None turns off colorizing and
- # perror emits plain text
- self.colors = None
-
- snapdir = '.snap'
- conf_snapdir = cephfs.conf_get('client_snapdir')
- if conf_snapdir is not None:
- snapdir = conf_snapdir
- snapdir = to_bytes(snapdir)
- if args.op == 'create':
- try:
- if is_dir_exists(args.dir):
- cephfs.mkdir(os.path.join(args.dir, snapdir, args.name), 0o755)
- else:
- set_exit_code_msg(errno.ENOENT, "'{}': no such directory".format(
- args.dir.decode('utf-8')))
- except libcephfs.Error as e:
- set_exit_code_msg(e.get_error_code(),
- "snapshot '{}' already exists".format(
- args.name.decode('utf-8')))
- elif args.op == 'delete':
- snap_dir = os.path.join(args.dir, snapdir, args.name)
- try:
- if is_dir_exists(snap_dir):
- newargs = argparse.Namespace(paths=[snap_dir], parent=False)
- self.do_rmdir_helper(newargs)
- else:
- set_exit_code_msg(errno.ENOENT, "'{}': no such snapshot".format(
- args.name.decode('utf-8')))
- except libcephfs.Error as e:
- set_exit_code_msg(e.get_error_code(), "error while deleting "
- "'{}'".format(snap_dir.decode('utf-8')))
- else:
- set_exit_code_msg(errno.EINVAL, "snapshot can only be created or "
- "deleted; check - help snap")
-
- def do_help(self, line):
- """
- Get details about a command.
- Usage: help <cmd> - for a specific command
- help all - for all the commands
- """
- if line == 'all':
- for k in dir(self):
- if k.startswith('do_'):
- poutput('-' * 80)
- super().do_help(k[3:])
- return
- parser = self.create_argparser(line)
- if parser:
- parser.print_help()
- else:
- super().do_help(line)
-
- def complete_stat(self, text, line, begidx, endidx):
- """
- auto complete of file name.
- """
- return self.complete_filenames(text, line, begidx, endidx)
-
- stat_parser = argparse.ArgumentParser(
- description='Display file or file system status')
- stat_parser.add_argument('paths', type=str, help='file paths',
- action=path_to_bytes, nargs='+')
-
- @with_argparser(stat_parser)
- def do_stat(self, args):
- """
- Display file or file system status
- """
- for path in args.paths:
- try:
- stat = cephfs.stat(path)
- atime = stat.st_atime.isoformat(' ')
- mtime = stat.st_mtime.isoformat(' ')
- ctime = stat.st_mtime.isoformat(' ')
-
- poutput("File: {}\nSize: {:d}\nBlocks: {:d}\nIO Block: {:d}\n"
- "Device: {:d}\tInode: {:d}\tLinks: {:d}\nPermission: "
- "{:o}/{}\tUid: {:d}\tGid: {:d}\nAccess: {}\nModify: "
- "{}\nChange: {}".format(path.decode('utf-8'),
- stat.st_size, stat.st_blocks,
- stat.st_blksize, stat.st_dev,
- stat.st_ino, stat.st_nlink,
- stat.st_mode,
- mode_notation(stat.st_mode),
- stat.st_uid, stat.st_gid, atime,
- mtime, ctime))
- except libcephfs.Error as e:
- set_exit_code_msg(msg=e)
-
- setxattr_parser = argparse.ArgumentParser(
- description='Set extended attribute for a file')
- setxattr_parser.add_argument('path', type=str, action=path_to_bytes, help='Name of the file')
- setxattr_parser.add_argument('name', type=str, help='Extended attribute name')
- setxattr_parser.add_argument('value', type=str, help='Extended attribute value')
-
- @with_argparser(setxattr_parser)
- def do_setxattr(self, args):
- """
- Set extended attribute for a file
- """
- val_bytes = to_bytes(args.value)
- name_bytes = to_bytes(args.name)
- try:
- cephfs.setxattr(args.path, name_bytes, val_bytes, os.XATTR_CREATE)
- poutput('{} is successfully set to {}'.format(args.name, args.value))
- except libcephfs.ObjectExists:
- cephfs.setxattr(args.path, name_bytes, val_bytes, os.XATTR_REPLACE)
- poutput('{} is successfully reset to {}'.format(args.name, args.value))
- except libcephfs.Error as e:
- set_exit_code_msg(msg=e)
-
- getxattr_parser = argparse.ArgumentParser(
- description='Get extended attribute set for a file')
- getxattr_parser.add_argument('path', type=str, action=path_to_bytes,
- help='Name of the file')
- getxattr_parser.add_argument('name', type=str, help='Extended attribute name')
-
- @with_argparser(getxattr_parser)
- def do_getxattr(self, args):
- """
- Get extended attribute for a file
- """
- try:
- poutput('{}'.format(cephfs.getxattr(args.path,
- to_bytes(args.name)).decode('utf-8')))
- except libcephfs.Error as e:
- set_exit_code_msg(msg=e)
-
- listxattr_parser = argparse.ArgumentParser(
- description='List extended attributes set for a file')
- listxattr_parser.add_argument('path', type=str, action=path_to_bytes,
- help='Name of the file')
-
- @with_argparser(listxattr_parser)
- def do_listxattr(self, args):
- """
- List extended attributes for a file
- """
- try:
- size, xattr_list = cephfs.listxattr(args.path)
- if size > 0:
- poutput('{}'.format(xattr_list.replace(b'\x00', b' ').decode('utf-8')))
- else:
- poutput('No extended attribute is set')
- except libcephfs.Error as e:
- set_exit_code_msg(msg=e)
-
-
-#######################################################
-#
-# Following are methods that get cephfs-shell started.
-#
-#####################################################
-
-def setup_cephfs():
- """
- Mounting a cephfs
- """
- global cephfs
- try:
- cephfs = libcephfs.LibCephFS(conffile='')
- cephfs.mount()
- except libcephfs.ObjectNotFound as e:
- print('couldn\'t find ceph configuration not found')
- sys.exit(e.get_error_code())
- except libcephfs.Error as e:
- print(e)
- sys.exit(e.get_error_code())
-
-
-def str_to_bool(val):
- """
- Return corresponding bool values for strings like 'true' or 'false'.
- """
- if not isinstance(val, str):
- return val
-
- val = val.replace('\n', '')
- if val.lower() in ['true', 'yes']:
- return True
- elif val.lower() in ['false', 'no']:
- return False
- else:
- return val
-
-
-def read_shell_conf(shell, shell_conf_file):
- import configparser
-
- sec = 'cephfs-shell'
- opts = []
- if LooseVersion(cmd2_version) >= LooseVersion("0.10.0"):
- for attr in shell.settables.keys():
- opts.append(attr)
- else:
- if LooseVersion(cmd2_version) <= LooseVersion("0.9.13"):
- # hardcoding options for 0.7.9 because -
- # 1. we use cmd2 v0.7.9 with teuthology and
- # 2. there's no way distinguish between a shell setting and shell
- # object attribute until v0.10.0
- opts = ['abbrev', 'autorun_on_edit', 'colors',
- 'continuation_prompt', 'debug', 'echo', 'editor',
- 'feedback_to_output', 'locals_in_py', 'prompt', 'quiet',
- 'timing']
- elif LooseVersion(cmd2_version) >= LooseVersion("0.9.23"):
- opts.append('allow_style')
- # no equivalent option was defined by cmd2.
- else:
- pass
-
- # default and only section in our conf file.
- cp = configparser.ConfigParser(default_section=sec, strict=False)
- cp.read(shell_conf_file)
- for opt in opts:
- if cp.has_option(sec, opt):
- setattr(shell, opt, str_to_bool(cp.get(sec, opt)))
-
-
-def get_shell_conffile_path(arg_conf=''):
- conf_filename = 'cephfs-shell.conf'
- env_var = 'CEPHFS_SHELL_CONF'
-
- arg_conf = '' if not arg_conf else arg_conf
- home_dir_conf = os.path.expanduser('~/.' + conf_filename)
- env_conf = os.environ[env_var] if env_var in os.environ else ''
-
- # here's the priority by which conf gets read.
- for path in (arg_conf, env_conf, home_dir_conf):
- if os.path.isfile(path):
- return path
- else:
- return ''
-
-
-def manage_args():
- main_parser = argparse.ArgumentParser(description='')
- main_parser.add_argument('-c', '--config', action='store',
- help='Path to Ceph configuration file.',
- type=str)
- main_parser.add_argument('-b', '--batch', action='store',
- help='Path to CephFS shell script/batch file'
- 'containing CephFS shell commands',
- type=str)
- main_parser.add_argument('-t', '--test', action='store',
- help='Test against transcript(s) in FILE',
- nargs='+')
- main_parser.add_argument('commands', nargs='*', help='Comma delimited '
- 'commands. The shell executes the given command '
- 'and quits immediately with the return value of '
- 'command. In case no commands are provided, the '
- 'shell is launched.', default=[])
-
- args = main_parser.parse_args()
- args.exe_and_quit = False # Execute and quit, don't launch the shell.
-
- if args.batch:
- if LooseVersion(cmd2_version) <= LooseVersion("0.9.13"):
- args.commands = ['load ' + args.batch, ',quit']
- else:
- args.commands = ['run_script ' + args.batch, ',quit']
- if args.test:
- args.commands.extend(['-t,'] + [arg + ',' for arg in args.test])
- if not args.batch and len(args.commands) > 0:
- args.exe_and_quit = True
-
- manage_sys_argv(args)
-
- return args
-
-
-def manage_sys_argv(args):
- exe = sys.argv[0]
- sys.argv.clear()
- sys.argv.append(exe)
- sys.argv.extend([i.strip() for i in ' '.join(args.commands).split(',')])
-
- setup_cephfs()
-
-
-def execute_cmd_args(args):
- """
- Launch a shell session if no arguments were passed, else just execute
- the given argument as a shell command and exit the shell session
- immediately at (last) command's termination with the (last) command's
- return value.
- """
- if not args.exe_and_quit:
- return shell.cmdloop()
- return execute_cmds_and_quit(args)
-
-
-def execute_cmds_and_quit(args):
- """
- Multiple commands might be passed separated by commas, feed onecmd()
- one command at a time.
- """
- # do_* methods triggered by cephfs-shell commands return None when they
- # complete running successfully. Until 0.9.6, shell.onecmd() returned this
- # value to indicate whether the execution of the commands should stop, but
- # since 0.9.7 it returns the return value of do_* methods only if it's
- # not None. When it is None it returns False instead of None.
- if LooseVersion(cmd2_version) <= LooseVersion("0.9.6"):
- stop_exec_val = None
- else:
- stop_exec_val = False
-
- args_to_onecmd = ''
- if len(args.commands) <= 1:
- args.commands = args.commands[0].split(' ')
- for cmdarg in args.commands:
- if ',' in cmdarg:
- args_to_onecmd += ' ' + cmdarg[0:-1]
- onecmd_retval = shell.onecmd(args_to_onecmd)
- # if the curent command failed, let's abort the execution of
- # series of commands passed.
- if onecmd_retval is not stop_exec_val:
- return onecmd_retval
- if shell.exit_code != 0:
- return shell.exit_code
-
- args_to_onecmd = ''
- continue
-
- args_to_onecmd += ' ' + cmdarg
- return shell.onecmd(args_to_onecmd)
-
-
-if __name__ == '__main__':
- args = manage_args()
-
- shell = CephFSShell()
- # TODO: perhaps, we should add an option to pass ceph.conf?
- read_shell_conf(shell, get_shell_conffile_path(args.config))
- # XXX: setting shell.exit_code to zero so that in case there are no errors
- # and exceptions, it is not set by any method or function of cephfs-shell
- # and return values from shell.cmdloop() or shell.onecmd() is not an
- # integer, we can treat it as the return value of cephfs-shell.
- shell.exit_code = 0
-
- retval = execute_cmd_args(args)
- sys.exit(retval if retval else shell.exit_code)
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-from setuptools import setup
-
-__version__ = '0.0.1'
-
-setup(
- name='cephfs-shell',
- version=__version__,
- description='Interactive shell for Ceph file system',
- keywords='cephfs, shell',
- scripts=['cephfs-shell'],
- install_requires=[
- 'cephfs',
- 'cmd2',
- 'colorama',
- ],
- classifiers=[
- 'Development Status :: 3 - Alpha',
- 'Environment :: Console',
- 'Intended Audience :: System Administrators',
- 'License :: OSI Approved :: GNU Lesser General Public License v2 or later (LGPLv2+)',
- 'Operating System :: POSIX :: Linux',
- 'Programming Language :: Python :: 3'
- ],
- license='LGPLv2+',
-)
--- /dev/null
+include(Distutils)
+distutils_install_module(cephfs-shell)
+
+if(WITH_TESTS)
+ include(AddCephTest)
+ add_tox_test(cephfs-shell)
+endif()
--- /dev/null
+#!/usr/bin/python3
+# coding = utf-8
+
+import argparse
+import os
+import os.path
+import sys
+import cephfs as libcephfs
+import shutil
+import traceback
+import colorama
+import fnmatch
+import math
+import re
+import shlex
+import stat
+import errno
+
+from cmd2 import Cmd
+from cmd2 import __version__ as cmd2_version
+from distutils.version import LooseVersion
+
+if sys.version_info.major < 3:
+ raise RuntimeError("cephfs-shell is only compatible with python3")
+
+try:
+ from cmd2 import with_argparser
+except ImportError:
+ def with_argparser(argparser):
+ import functools
+
+ def argparser_decorator(func):
+ @functools.wraps(func)
+ def wrapper(thiz, cmdline):
+ if isinstance(cmdline, list):
+ arglist = cmdline
+ else:
+ # do not split if it's already a list
+ arglist = shlex.split(cmdline, posix=False)
+ # in case user quotes the command args
+ arglist = [arg.strip('\'""') for arg in arglist]
+ try:
+ args = argparser.parse_args(arglist)
+ except SystemExit:
+ shell.exit_code = 1
+ # argparse exits at seeing bad arguments
+ return
+ else:
+ return func(thiz, args)
+ argparser.prog = func.__name__[3:]
+ if argparser.description is None and func.__doc__:
+ argparser.description = func.__doc__
+
+ return wrapper
+
+ return argparser_decorator
+
+
+cephfs = None # holds CephFS Python bindings
+shell = None # holds instance of class CephFSShell
+exit_codes = {'Misc': 1,
+ 'KeyboardInterrupt': 2,
+ errno.EPERM: 3,
+ errno.EACCES: 4,
+ errno.ENOENT: 5,
+ errno.EIO: 6,
+ errno.ENOSPC: 7,
+ errno.EEXIST: 8,
+ errno.ENODATA: 9,
+ errno.EINVAL: 10,
+ errno.EOPNOTSUPP: 11,
+ errno.ERANGE: 12,
+ errno.EWOULDBLOCK: 13,
+ errno.ENOTEMPTY: 14,
+ errno.ENOTDIR: 15,
+ errno.EDQUOT: 16,
+ errno.EPIPE: 17,
+ errno.ESHUTDOWN: 18,
+ errno.ECONNABORTED: 19,
+ errno.ECONNREFUSED: 20,
+ errno.ECONNRESET: 21,
+ errno.EINTR: 22}
+
+
+#########################################################################
+#
+# Following are methods are generically useful through class CephFSShell
+#
+#######################################################################
+
+
+def poutput(s, end='\n'):
+ shell.poutput(s, end=end)
+
+
+def perror(msg, **kwargs):
+ shell.perror(msg, **kwargs)
+
+
+def set_exit_code_msg(errcode='Misc', msg=''):
+ """
+ Set exit code and print error message
+ """
+ if isinstance(msg, libcephfs.Error):
+ shell.exit_code = exit_codes[msg.get_error_code()]
+ else:
+ shell.exit_code = exit_codes[errcode]
+ if msg:
+ perror(msg)
+
+
+def mode_notation(mode):
+ """
+ """
+ permission_bits = {'0': '---',
+ '1': '--x',
+ '2': '-w-',
+ '3': '-wx',
+ '4': 'r--',
+ '5': 'r-x',
+ '6': 'rw-',
+ '7': 'rwx'}
+ mode = str(oct(mode))
+ notation = '-'
+ if mode[2] == '4':
+ notation = 'd'
+ elif mode[2:4] == '12':
+ notation = 'l'
+ for i in mode[-3:]:
+ notation += permission_bits[i]
+ return notation
+
+
+def get_chunks(file_size):
+ chunk_start = 0
+ chunk_size = 0x20000 # 131072 bytes, default max ssl buffer size
+ while chunk_start + chunk_size < file_size:
+ yield chunk_start, chunk_size
+ chunk_start += chunk_size
+ final_chunk_size = file_size - chunk_start
+ yield chunk_start, final_chunk_size
+
+
+def to_bytes(param):
+ # don't convert as follows as it can lead unusable results like coverting
+ # [1, 2, 3, 4] to '[1, 2, 3, 4]' -
+ # str(param).encode('utf-8')
+ if isinstance(param, bytes):
+ return param
+ elif isinstance(param, str):
+ return bytes(param, encoding='utf-8')
+ elif isinstance(param, list):
+ return [i.encode('utf-8') if isinstance(i, str) else to_bytes(i) for
+ i in param]
+ elif isinstance(param, int) or isinstance(param, float):
+ return str(param).encode('utf-8')
+ elif param is None:
+ return None
+
+
+def ls(path, opts=''):
+ # opts tries to be like /bin/ls opts
+ almost_all = 'A' in opts
+ try:
+ with cephfs.opendir(path) as d:
+ while True:
+ dent = cephfs.readdir(d)
+ if dent is None:
+ return
+ elif almost_all and dent.d_name in (b'.', b'..'):
+ continue
+ yield dent
+ except libcephfs.ObjectNotFound as e:
+ set_exit_code_msg(msg=e)
+
+
+def glob(path, pattern):
+ paths = []
+ parent_dir = os.path.dirname(path)
+ if parent_dir == b'':
+ parent_dir = b'/'
+ if path == b'/' or is_dir_exists(os.path.basename(path), parent_dir):
+ for i in ls(path, opts='A'):
+ if fnmatch.fnmatch(i.d_name, pattern):
+ paths.append(os.path.join(path, i.d_name))
+ return paths
+
+
+def locate_file(name, case_sensitive=True):
+ dir_list = sorted(set(dirwalk(cephfs.getcwd())))
+ if not case_sensitive:
+ return [dname for dname in dir_list if name.lower() in dname.lower()]
+ else:
+ return [dname for dname in dir_list if name in dname]
+
+
+def get_all_possible_paths(pattern):
+ complete_pattern = pattern[:]
+ paths = []
+ is_rel_path = not os.path.isabs(pattern)
+ if is_rel_path:
+ dir_ = cephfs.getcwd()
+ else:
+ dir_ = b'/'
+ pattern = pattern[1:]
+ patterns = pattern.split(b'/')
+ paths.extend(glob(dir_, patterns[0]))
+ patterns.pop(0)
+ for pattern in patterns:
+ for path in paths:
+ paths.extend(glob(path, pattern))
+ if is_rel_path:
+ complete_pattern = os.path.join(cephfs.getcwd(), complete_pattern)
+ return [path for path in paths if fnmatch.fnmatch(path, complete_pattern)]
+
+
+suffixes = ['B', 'K', 'M', 'G', 'T', 'P']
+
+
+def humansize(nbytes):
+ i = 0
+ while nbytes >= 1024 and i < len(suffixes) - 1:
+ nbytes /= 1024.
+ i += 1
+ nbytes = math.ceil(nbytes)
+ f = ('%d' % nbytes).rstrip('.')
+ return '%s%s' % (f, suffixes[i])
+
+
+def style_listing(path, is_dir, is_symlink, ls_long=False):
+ if not (is_dir or is_symlink):
+ return path
+ pretty = colorama.Style.BRIGHT
+ if is_symlink:
+ pretty += colorama.Fore.CYAN + path
+ if ls_long:
+ # Add target path
+ pretty += ' -> ' + cephfs.readlink(path, size=255).decode('utf-8')
+ elif is_dir:
+ pretty += colorama.Fore.BLUE + path + '/'
+ pretty += colorama.Style.RESET_ALL
+ return pretty
+
+
+def print_long(path, is_dir, is_symlink, human_readable):
+ info = cephfs.stat(path, follow_symlink=(not is_symlink))
+ pretty = style_listing(os.path.basename(path.decode('utf-8')), is_dir, is_symlink, True)
+ if human_readable:
+ sizefmt = '\t {:10s}'.format(humansize(info.st_size))
+ else:
+ sizefmt = '{:12d}'.format(info.st_size)
+ poutput(f'{mode_notation(info.st_mode)} {sizefmt} {info.st_uid} {info.st_gid} {info.st_mtime}'
+ f' {pretty}')
+
+
+def word_len(word):
+ """
+ Returns the word length, minus any color codes.
+ """
+ if word[0] == '\x1b':
+ return len(word) - 9
+ return len(word)
+
+
+def is_dir_exists(path, dir_=b''):
+ path_to_stat = os.path.join(dir_, path)
+ try:
+ return ((cephfs.stat(path_to_stat).st_mode & 0o0040000) != 0)
+ except libcephfs.Error:
+ return False
+
+
+def is_file_exists(path, dir_=b''):
+ try:
+ # if its not a directory, then its a file
+ return ((cephfs.stat(os.path.join(dir_, path)).st_mode & 0o0040000) == 0)
+ except libcephfs.Error:
+ return False
+
+
+def print_list(words, termwidth=79):
+ if not words:
+ return
+ words = [word.decode('utf-8') if isinstance(word, bytes) else word for word in words]
+ width = max([word_len(word) for word in words]) + 2
+ nwords = len(words)
+ ncols = max(1, (termwidth + 1) // (width + 1))
+ nrows = (nwords + ncols - 1) // ncols
+ for row in range(nrows):
+ for i in range(row, nwords, nrows):
+ word = words[i]
+ print_width = width
+ if word[0] == '\x1b':
+ print_width = print_width + 10
+
+ poutput('%-*s' % (print_width, words[i]),
+ end='\n' if i + nrows >= nwords else '')
+
+
+def copy_from_local(local_path, remote_path):
+ stdin = -1
+ file_ = None
+ fd = None
+ convert_to_bytes = False
+ if local_path == b'-':
+ file_ = sys.stdin
+ convert_to_bytes = True
+ else:
+ try:
+ file_ = open(local_path, 'rb')
+ except PermissionError as e:
+ set_exit_code_msg(e.errno, 'error: no permission to read local file {}'.format(
+ local_path.decode('utf-8')))
+ return
+ stdin = 1
+ try:
+ fd = cephfs.open(remote_path, 'w', 0o666)
+ except libcephfs.Error as e:
+ set_exit_code_msg(msg=e)
+ return
+ progress = 0
+ while True:
+ data = file_.read(65536)
+ if not data or len(data) == 0:
+ break
+ if convert_to_bytes:
+ data = to_bytes(data)
+ wrote = cephfs.write(fd, data, progress)
+ if wrote < 0:
+ break
+ progress += wrote
+ cephfs.close(fd)
+ if stdin > 0:
+ file_.close()
+ poutput('')
+
+
+def copy_to_local(remote_path, local_path):
+ fd = None
+ if local_path != b'-':
+ local_dir = os.path.dirname(local_path)
+ dir_list = remote_path.rsplit(b'/', 1)
+ if not os.path.exists(local_dir):
+ os.makedirs(local_dir)
+ if len(dir_list) > 2 and dir_list[1] == b'':
+ return
+ fd = open(local_path, 'wb+')
+ file_ = cephfs.open(remote_path, 'r')
+ file_size = cephfs.stat(remote_path).st_size
+ if file_size <= 0:
+ return
+ progress = 0
+ for chunk_start, chunk_size in get_chunks(file_size):
+ file_chunk = cephfs.read(file_, chunk_start, chunk_size)
+ progress += len(file_chunk)
+ if fd:
+ fd.write(file_chunk)
+ else:
+ poutput(file_chunk.decode('utf-8'))
+ cephfs.close(file_)
+ if fd:
+ fd.close()
+
+
+def dirwalk(path):
+ """
+ walk a directory tree, using a generator
+ """
+ path = os.path.normpath(path)
+ for item in ls(path, opts='A'):
+ fullpath = os.path.join(path, item.d_name)
+ src_path = fullpath.rsplit(b'/', 1)[0]
+
+ yield os.path.normpath(fullpath)
+ if is_dir_exists(item.d_name, src_path):
+ for x in dirwalk(fullpath):
+ yield x
+
+
+##################################################################
+#
+# Following methods are implementation for CephFS Shell commands
+#
+#################################################################
+
+class CephFSShell(Cmd):
+
+ def __init__(self):
+ super().__init__(use_ipython=False)
+ self.working_dir = cephfs.getcwd().decode('utf-8')
+ self.set_prompt()
+ self.interactive = False
+ self.umask = '2'
+
+ def default(self, line):
+ perror('Unrecognized command')
+
+ def set_prompt(self):
+ self.prompt = ('\033[01;33mCephFS:~' + colorama.Fore.LIGHTCYAN_EX
+ + self.working_dir + colorama.Style.RESET_ALL
+ + '\033[01;33m>>>\033[00m ')
+
+ def create_argparser(self, command):
+ try:
+ argparse_args = getattr(self, 'argparse_' + command)
+ except AttributeError:
+ set_exit_code_msg()
+ return None
+ doc_lines = getattr(
+ self, 'do_' + command).__doc__.expandtabs().splitlines()
+ if '' in doc_lines:
+ blank_idx = doc_lines.index('')
+ usage = doc_lines[:blank_idx]
+ description = doc_lines[blank_idx + 1:]
+ else:
+ usage = doc_lines
+ description = []
+ parser = argparse.ArgumentParser(
+ prog=command,
+ usage='\n'.join(usage),
+ description='\n'.join(description),
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter
+ )
+ for args, kwargs in argparse_args:
+ parser.add_argument(*args, **kwargs)
+ return parser
+
+ def complete_filenames(self, text, line, begidx, endidx):
+ if not text:
+ completions = [x.d_name.decode('utf-8') + '/' * int(x.is_dir())
+ for x in ls(b".", opts='A')]
+ else:
+ if text.count('/') > 0:
+ completions = [text.rsplit('/', 1)[0] + '/'
+ + x.d_name.decode('utf-8') + '/'
+ * int(x.is_dir()) for x in ls('/'
+ + text.rsplit('/', 1)[0], opts='A')
+ if x.d_name.decode('utf-8').startswith(
+ text.rsplit('/', 1)[1])]
+ else:
+ completions = [x.d_name.decode('utf-8') + '/'
+ * int(x.is_dir()) for x in ls(b".", opts='A')
+ if x.d_name.decode('utf-8').startswith(text)]
+ if len(completions) == 1 and completions[0][-1] == '/':
+ dir_, file_ = completions[0].rsplit('/', 1)
+ completions.extend([dir_ + '/' + x.d_name.decode('utf-8')
+ + '/' * int(x.is_dir()) for x in
+ ls('/' + dir_, opts='A')
+ if x.d_name.decode('utf-8').startswith(file_)])
+ return self.delimiter_complete(text, line, begidx, endidx, completions, '/')
+ return completions
+
+ def onecmd(self, line, **kwargs):
+ """
+ Global error catcher
+ """
+ try:
+ res = Cmd.onecmd(self, line, **kwargs)
+ if self.interactive:
+ self.set_prompt()
+ return res
+ except ConnectionError as e:
+ set_exit_code_msg(e.errno, f'***\n{e}')
+ except KeyboardInterrupt:
+ set_exit_code_msg('KeyboardInterrupt', 'Command aborted')
+ except (libcephfs.Error, Exception) as e:
+ if shell.debug:
+ traceback.print_exc(file=sys.stdout)
+ set_exit_code_msg(msg=e)
+
+ class path_to_bytes(argparse.Action):
+ def __call__(self, parser, namespace, values, option_string=None):
+ values = to_bytes(values)
+ setattr(namespace, self.dest, values)
+
+ # TODO: move the necessary contents from here to `class path_to_bytes`.
+ class get_list_of_bytes_path(argparse.Action):
+ def __call__(self, parser, namespace, values, option_string=None):
+ values = to_bytes(values)
+
+ if values == b'.':
+ values = cephfs.getcwd()
+ else:
+ for i in values:
+ if i == b'.':
+ values[values.index(i)] = cephfs.getcwd()
+
+ setattr(namespace, self.dest, values)
+
+ def complete_mkdir(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ return self.complete_filenames(text, line, begidx, endidx)
+
+ class ModeAction(argparse.Action):
+ def __init__(self, option_strings, dest, nargs=None, **kwargs):
+ if nargs is not None and nargs != '?':
+ raise ValueError("more than one modes not allowed")
+ super().__init__(option_strings, dest, **kwargs)
+
+ def __call__(self, parser, namespace, values, option_string=None):
+ o_mode = 0
+ res = None
+ try:
+ o_mode = int(values, base=8)
+ except ValueError:
+ res = re.match('((u?g?o?)|(a?))(=)(r?w?x?)', values)
+ if res is None:
+ parser.error("invalid mode: %s\n"
+ "mode must be a numeric octal literal\n"
+ "or ((u?g?o?)|(a?))(=)(r?w?x?)" %
+ values)
+ else:
+ # we are supporting only assignment of mode and not + or -
+ # as is generally available with the chmod command
+ # eg.
+ # >>> res = re.match('((u?g?o?)|(a?))(=)(r?w?x?)', 'go=')
+ # >>> res.groups()
+ # ('go', 'go', None, '=', '')
+ val = res.groups()
+
+ if val[3] != '=':
+ parser.error("need assignment operator between user "
+ "and mode specifiers")
+ if val[4] == '':
+ parser.error("invalid mode: %s\n"
+ "mode must be combination of: r | w | x" %
+ values)
+ users = ''
+ if val[2] is None:
+ users = val[1]
+ else:
+ users = val[2]
+
+ t_mode = 0
+ if users == 'a':
+ users = 'ugo'
+
+ if 'r' in val[4]:
+ t_mode |= 4
+ if 'w' in val[4]:
+ t_mode |= 2
+ if 'x' in val[4]:
+ t_mode |= 1
+
+ if 'u' in users:
+ o_mode |= (t_mode << 6)
+ if 'g' in users:
+ o_mode |= (t_mode << 3)
+ if 'o' in users:
+ o_mode |= t_mode
+
+ if o_mode < 0:
+ parser.error("invalid mode: %s\n"
+ "mode cannot be negative" % values)
+ if o_mode > 0o777:
+ parser.error("invalid mode: %s\n"
+ "mode cannot be greater than octal 0777" % values)
+
+ setattr(namespace, self.dest, str(oct(o_mode)))
+
+ mkdir_parser = argparse.ArgumentParser(
+ description='Create the directory(ies), if they do not already exist.')
+ mkdir_parser.add_argument('dirs', type=str,
+ action=path_to_bytes,
+ metavar='DIR_NAME',
+ help='Name of new_directory.',
+ nargs='+')
+ mkdir_parser.add_argument('-m', '--mode', type=str,
+ action=ModeAction,
+ help='Sets the access mode for the new directory.')
+ mkdir_parser.add_argument('-p', '--parent', action='store_true',
+ help='Create parent directories as necessary. '
+ 'When this option is specified, no error is'
+ 'reported if a directory already exists.')
+
+ @with_argparser(mkdir_parser)
+ def do_mkdir(self, args):
+ """
+ Create directory.
+ """
+ for path in args.dirs:
+ if args.mode:
+ permission = int(args.mode, 8)
+ else:
+ permission = 0o777
+ if args.parent:
+ cephfs.mkdirs(path, permission)
+ else:
+ try:
+ cephfs.mkdir(path, permission)
+ except libcephfs.Error as e:
+ set_exit_code_msg(e)
+
+ def complete_put(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ index_dict = {1: self.path_complete}
+ return self.index_based_complete(text, line, begidx, endidx, index_dict)
+
+ put_parser = argparse.ArgumentParser(
+ description='Copy a file/directory to Ceph File System from Local File System.')
+ put_parser.add_argument('local_path', type=str, action=path_to_bytes,
+ help='Path of the file in the local system')
+ put_parser.add_argument('remote_path', type=str, action=path_to_bytes,
+ help='Path of the file in the remote system')
+ put_parser.add_argument('-f', '--force', action='store_true',
+ help='Overwrites the destination if it already exists.')
+
+ @with_argparser(put_parser)
+ def do_put(self, args):
+ """
+ Copy a local file/directory to CephFS.
+ """
+ if args.local_path != b'-' and not os.path.isfile(args.local_path) \
+ and not os.path.isdir(args.local_path):
+ set_exit_code_msg(errno.ENOENT,
+ msg=f"error: "
+ f"{args.local_path.decode('utf-8')}: "
+ f"No such file or directory")
+ return
+
+ if (is_file_exists(args.remote_path) or is_dir_exists(
+ args.remote_path)) and not args.force:
+ set_exit_code_msg(msg=f"error: file/directory "
+ f"{args.remote_path.decode('utf-8')} "
+ f"exists, use --force to overwrite")
+ return
+
+ root_src_dir = args.local_path
+ root_dst_dir = args.remote_path
+ if args.local_path == b'.' or args.local_path == b'./':
+ root_src_dir = os.getcwdb()
+ elif len(args.local_path.rsplit(b'/', 1)) < 2:
+ root_src_dir = os.path.join(os.getcwdb(), args.local_path)
+ else:
+ p = args.local_path.split(b'/')
+ if p[0] == b'.':
+ root_src_dir = os.getcwdb()
+ p.pop(0)
+ while len(p) > 0:
+ root_src_dir += b'/' + p.pop(0)
+
+ if root_dst_dir == b'.':
+ if args.local_path != b'-':
+ root_dst_dir = root_src_dir.rsplit(b'/', 1)[1]
+ if root_dst_dir == b'':
+ root_dst_dir = root_src_dir.rsplit(b'/', 1)[0]
+ a = root_dst_dir.rsplit(b'/', 1)
+ if len(a) > 1:
+ root_dst_dir = a[1]
+ else:
+ root_dst_dir = a[0]
+ else:
+ set_exit_code_msg(errno.EINVAL, 'error: no filename specified '
+ 'for destination')
+ return
+
+ if root_dst_dir[-1] != b'/':
+ root_dst_dir += b'/'
+
+ if args.local_path == b'-' or os.path.isfile(root_src_dir):
+ if args.local_path == b'-':
+ root_src_dir = b'-'
+ copy_from_local(root_src_dir, root_dst_dir)
+ else:
+ for src_dir, dirs, files in os.walk(root_src_dir):
+ if isinstance(src_dir, str):
+ src_dir = to_bytes(src_dir)
+ dst_dir = src_dir.replace(root_src_dir, root_dst_dir, 1)
+ dst_dir = re.sub(rb'\/+', b'/', cephfs.getcwd()
+ + dst_dir)
+ if args.force and dst_dir != b'/' and not is_dir_exists(
+ dst_dir[:-1]) and not locate_file(dst_dir):
+ try:
+ cephfs.mkdirs(dst_dir, 0o777)
+ except libcephfs.Error:
+ pass
+ if (not args.force) and dst_dir != b'/' and not is_dir_exists(
+ dst_dir) and not os.path.isfile(root_src_dir):
+ try:
+ cephfs.mkdirs(dst_dir, 0o777)
+ except libcephfs.Error:
+ # TODO: perhaps, set retval to 1?
+ pass
+
+ for dir_ in dirs:
+ dir_name = os.path.join(dst_dir, dir_)
+ if not is_dir_exists(dir_name):
+ try:
+ cephfs.mkdirs(dir_name, 0o777)
+ except libcephfs.Error:
+ # TODO: perhaps, set retval to 1?
+ pass
+
+ for file_ in files:
+ src_file = os.path.join(src_dir, file_)
+ dst_file = re.sub(rb'\/+', b'/', b'/' + dst_dir + b'/' + file_)
+ if (not args.force) and is_file_exists(dst_file):
+ return
+ copy_from_local(src_file, os.path.join(cephfs.getcwd(),
+ dst_file))
+
+ def complete_get(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ return self.complete_filenames(text, line, begidx, endidx)
+
+ get_parser = argparse.ArgumentParser(
+ description='Copy a file from Ceph File System to Local Directory.')
+ get_parser.add_argument('remote_path', type=str, action=path_to_bytes,
+ help='Path of the file in the remote system')
+ get_parser.add_argument('local_path', type=str, action=path_to_bytes,
+ help='Path of the file in the local system')
+ get_parser.add_argument('-f', '--force', action='store_true',
+ help='Overwrites the destination if it already exists.')
+
+ @with_argparser(get_parser)
+ def do_get(self, args):
+ """
+ Copy a file/directory from CephFS to given path.
+ """
+ if not is_file_exists(args.remote_path) and not \
+ is_dir_exists(args.remote_path):
+ set_exit_code_msg(errno.ENOENT, "error: no file/directory"
+ " found at specified remote "
+ "path")
+ return
+ if (os.path.isfile(args.local_path) or os.path.isdir(
+ args.local_path)) and not args.force:
+ set_exit_code_msg(msg=f"error: file/directory "
+ f"{args.local_path.decode('utf-8')}"
+ f" already exists, use --force to "
+ f"overwrite")
+ return
+ root_src_dir = args.remote_path
+ root_dst_dir = args.local_path
+ fname = root_src_dir.rsplit(b'/', 1)
+ if args.local_path == b'.':
+ root_dst_dir = os.getcwdb()
+ if args.remote_path == b'.':
+ root_src_dir = cephfs.getcwd()
+ if args.local_path == b'-':
+ if args.remote_path == b'.' or args.remote_path == b'./':
+ set_exit_code_msg(errno.EINVAL, 'error: no remote file name specified')
+ return
+ copy_to_local(root_src_dir, b'-')
+ elif is_file_exists(args.remote_path):
+ copy_to_local(root_src_dir, root_dst_dir)
+ elif b'/' in root_src_dir and is_file_exists(fname[1], fname[0]):
+ copy_to_local(root_src_dir, root_dst_dir)
+ else:
+ files = list(reversed(sorted(dirwalk(root_src_dir))))
+ for file_ in files:
+ dst_dirpath, dst_file = file_.rsplit(b'/', 1)
+ if dst_dirpath in files:
+ files.remove(dst_dirpath)
+ dst_path = os.path.join(root_dst_dir, dst_dirpath, dst_file)
+ dst_path = os.path.normpath(dst_path)
+ if is_dir_exists(file_):
+ try:
+ os.makedirs(dst_path)
+ except OSError:
+ pass
+ else:
+ copy_to_local(file_, dst_path)
+
+ return 0
+
+ def complete_ls(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ return self.complete_filenames(text, line, begidx, endidx)
+
+ ls_parser = argparse.ArgumentParser(
+ description='Copy a file from Ceph File System from Local Directory.')
+ ls_parser.add_argument('-l', '--long', action='store_true',
+ help='Detailed list of items in the directory.')
+ ls_parser.add_argument('-r', '--reverse', action='store_true',
+ help='Reverse order of listing items in the directory.')
+ ls_parser.add_argument('-H', action='store_true', help='Human Readable')
+ ls_parser.add_argument('-a', '--all', action='store_true',
+ help='Do not Ignore entries starting with .')
+ ls_parser.add_argument('-S', action='store_true', help='Sort by file_size')
+ ls_parser.add_argument('paths', help='Name of Directories',
+ action=path_to_bytes, nargs='*', default=['.'])
+
+ @with_argparser(ls_parser)
+ def do_ls(self, args):
+ """
+ List all the files and directories in the current working directory
+ """
+ paths = args.paths
+ for path in paths:
+ values = []
+ items = []
+ try:
+ if path.count(b'*') > 0:
+ all_items = get_all_possible_paths(path)
+ if len(all_items) == 0:
+ continue
+ path = all_items[0].rsplit(b'/', 1)[0]
+ if path == b'':
+ path = b'/'
+ dirs = []
+ for i in all_items:
+ for item in ls(path):
+ d_name = item.d_name
+ if os.path.basename(i) == d_name:
+ if item.is_dir():
+ dirs.append(os.path.join(path, d_name))
+ else:
+ items.append(item)
+ if dirs:
+ paths.extend(dirs)
+ else:
+ poutput(path.decode('utf-8'), end=':\n')
+ items = sorted(items, key=lambda item: item.d_name)
+ else:
+ if path != b'' and path != cephfs.getcwd() and len(paths) > 1:
+ poutput(path.decode('utf-8'), end=':\n')
+ items = sorted(ls(path), key=lambda item: item.d_name)
+ if not args.all:
+ items = [i for i in items if not i.d_name.startswith(b'.')]
+ if args.S:
+ items = sorted(items, key=lambda item: cephfs.stat(
+ path + b'/' + item.d_name, follow_symlink=(
+ not item.is_symbol_file())).st_size)
+ if args.reverse:
+ items = reversed(items)
+ for item in items:
+ filepath = item.d_name
+ is_dir = item.is_dir()
+ is_sym_lnk = item.is_symbol_file()
+ try:
+ if args.long and args.H:
+ print_long(os.path.join(cephfs.getcwd(), path, filepath), is_dir,
+ is_sym_lnk, True)
+ elif args.long:
+ print_long(os.path.join(cephfs.getcwd(), path, filepath), is_dir,
+ is_sym_lnk, False)
+ elif is_sym_lnk or is_dir:
+ values.append(style_listing(filepath.decode('utf-8'), is_dir,
+ is_sym_lnk))
+ else:
+ values.append(filepath)
+ except libcephfs.Error as e:
+ set_exit_code_msg(msg=e)
+ if not args.long:
+ print_list(values, shutil.get_terminal_size().columns)
+ if path != paths[-1]:
+ poutput('')
+ except libcephfs.Error as e:
+ set_exit_code_msg(msg=e)
+
+ def complete_rmdir(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ return self.complete_filenames(text, line, begidx, endidx)
+
+ rmdir_parser = argparse.ArgumentParser(description='Remove Directory.')
+ rmdir_parser.add_argument('paths', help='Directory Path.', nargs='+',
+ action=path_to_bytes)
+ rmdir_parser.add_argument('-p', '--parent', action='store_true',
+ help='Remove parent directories as necessary. '
+ 'When this option is specified, no error '
+ 'is reported if a directory has any '
+ 'sub-directories, files')
+
+ @with_argparser(rmdir_parser)
+ def do_rmdir(self, args):
+ self.do_rmdir_helper(args)
+
+ def do_rmdir_helper(self, args):
+ """
+ Remove a specific Directory
+ """
+ is_pattern = False
+ paths = args.paths
+ for path in paths:
+ if path.count(b'*') > 0:
+ is_pattern = True
+ all_items = get_all_possible_paths(path)
+ if len(all_items) > 0:
+ path = all_items[0].rsplit(b'/', 1)[0]
+ if path == b'':
+ path = b'/'
+ dirs = []
+ for i in all_items:
+ for item in ls(path):
+ d_name = item.d_name
+ if os.path.basename(i) == d_name:
+ if item.is_dir():
+ dirs.append(os.path.join(path, d_name))
+ paths.extend(dirs)
+ continue
+ else:
+ is_pattern = False
+
+ if args.parent:
+ path = os.path.join(cephfs.getcwd(), path.rsplit(b'/')[0])
+ files = list(sorted(set(dirwalk(path)), reverse=True))
+ if not files:
+ path = b'.'
+ for filepath in files:
+ try:
+ cephfs.rmdir(os.path.normpath(filepath))
+ except libcephfs.Error as e:
+ perror(e)
+ path = b'.'
+ break
+ else:
+ path = os.path.normpath(os.path.join(cephfs.getcwd(), path))
+ if not is_pattern and path != os.path.normpath(b''):
+ try:
+ cephfs.rmdir(path)
+ except libcephfs.Error as e:
+ set_exit_code_msg(msg=e)
+
+ def complete_rm(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ return self.complete_filenames(text, line, begidx, endidx)
+
+ rm_parser = argparse.ArgumentParser(description='Remove File.')
+ rm_parser.add_argument('paths', help='File Path.', nargs='+',
+ action=path_to_bytes)
+
+ @with_argparser(rm_parser)
+ def do_rm(self, args):
+ """
+ Remove a specific file
+ """
+ file_paths = args.paths
+ for path in file_paths:
+ if path.count(b'*') > 0:
+ file_paths.extend([i for i in get_all_possible_paths(
+ path) if is_file_exists(i)])
+ else:
+ try:
+ cephfs.unlink(path)
+ except libcephfs.Error as e:
+ # NOTE: perhaps we need a better msg here
+ set_exit_code_msg(msg=e)
+
+ def complete_mv(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ return self.complete_filenames(text, line, begidx, endidx)
+
+ mv_parser = argparse.ArgumentParser(description='Move File.')
+ mv_parser.add_argument('src_path', type=str, action=path_to_bytes,
+ help='Source File Path.')
+ mv_parser.add_argument('dest_path', type=str, action=path_to_bytes,
+ help='Destination File Path.')
+
+ @with_argparser(mv_parser)
+ def do_mv(self, args):
+ """
+ Rename a file or Move a file from source path to the destination
+ """
+ cephfs.rename(args.src_path, args.dest_path)
+
+ def complete_cd(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ return self.complete_filenames(text, line, begidx, endidx)
+
+ cd_parser = argparse.ArgumentParser(description='Change working directory')
+ cd_parser.add_argument('path', type=str, help='Name of the directory.',
+ action=path_to_bytes, nargs='?', default='/')
+
+ @with_argparser(cd_parser)
+ def do_cd(self, args):
+ """
+ Change working directory
+ """
+ cephfs.chdir(args.path)
+ self.working_dir = cephfs.getcwd().decode('utf-8')
+ self.set_prompt()
+
+ def do_cwd(self, arglist):
+ """
+ Get current working directory.
+ """
+ poutput(cephfs.getcwd().decode('utf-8'))
+
+ def complete_chmod(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ return self.complete_filenames(text, line, begidx, endidx)
+
+ chmod_parser = argparse.ArgumentParser(description='Create Directory.')
+ chmod_parser.add_argument('mode', type=str, action=ModeAction, help='Mode')
+ chmod_parser.add_argument('paths', type=str, action=path_to_bytes,
+ help='Name of the file', nargs='+')
+
+ @with_argparser(chmod_parser)
+ def do_chmod(self, args):
+ """
+ Change permission of a file
+ """
+ for path in args.paths:
+ mode = int(args.mode, base=8)
+ try:
+ cephfs.chmod(path, mode)
+ except libcephfs.Error as e:
+ set_exit_code_msg(msg=e)
+
+ def complete_cat(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ return self.complete_filenames(text, line, begidx, endidx)
+
+ cat_parser = argparse.ArgumentParser(description='')
+ cat_parser.add_argument('paths', help='Name of Files', action=path_to_bytes,
+ nargs='+')
+
+ @with_argparser(cat_parser)
+ def do_cat(self, args):
+ """
+ Print contents of a file
+ """
+ for path in args.paths:
+ if is_file_exists(path):
+ copy_to_local(path, b'-')
+ else:
+ set_exit_code_msg(errno.ENOENT, '{}: no such file'.format(
+ path.decode('utf-8')))
+
+ umask_parser = argparse.ArgumentParser(description='Set umask value.')
+ umask_parser.add_argument('mode', help='Mode', type=str, action=ModeAction,
+ nargs='?', default='')
+
+ @with_argparser(umask_parser)
+ def do_umask(self, args):
+ """
+ Set Umask value.
+ """
+ if args.mode == '':
+ poutput(self.umask.zfill(4))
+ else:
+ mode = int(args.mode, 8)
+ self.umask = str(oct(cephfs.umask(mode))[2:])
+
+ def complete_write(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ return self.complete_filenames(text, line, begidx, endidx)
+
+ write_parser = argparse.ArgumentParser(description='Writes data into a file')
+ write_parser.add_argument('path', type=str, action=path_to_bytes,
+ help='Name of File')
+
+ @with_argparser(write_parser)
+ def do_write(self, args):
+ """
+ Write data into a file.
+ """
+
+ copy_from_local(b'-', args.path)
+
+ def complete_lcd(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ index_dict = {1: self.path_complete}
+ return self.index_based_complete(text, line, begidx, endidx, index_dict)
+
+ lcd_parser = argparse.ArgumentParser(description='')
+ lcd_parser.add_argument('path', type=str, action=path_to_bytes, help='Path')
+
+ @with_argparser(lcd_parser)
+ def do_lcd(self, args):
+ """
+ Moves into the given local directory
+ """
+ try:
+ os.chdir(os.path.expanduser(args.path))
+ except OSError as e:
+ set_exit_code_msg(e.errno, "Cannot change to "
+ f"{e.filename.decode('utf-8')}: {e.strerror}")
+
+ def complete_lls(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ index_dict = {1: self.path_complete}
+ return self.index_based_complete(text, line, begidx, endidx, index_dict)
+
+ lls_parser = argparse.ArgumentParser(
+ description='List files in local system.')
+ lls_parser.add_argument('paths', help='Paths', action=path_to_bytes,
+ nargs='*')
+
+ @with_argparser(lls_parser)
+ def do_lls(self, args):
+ """
+ Lists all files and folders in the current local directory
+ """
+ if not args.paths:
+ print_list(os.listdir(os.getcwdb()))
+ else:
+ for path in args.paths:
+ try:
+ items = os.listdir(path)
+ poutput("{}:".format(path.decode('utf-8')))
+ print_list(items)
+ except OSError as e:
+ set_exit_code_msg(e.errno, f"{e.filename.decode('utf-8')}: "
+ f"{e.strerror}")
+ # Arguments to the with_argpaser decorator function are sticky.
+ # The items in args.path do not get overwritten in subsequent calls.
+ # The arguments remain in args.paths after the function exits and we
+ # neeed to clean it up to ensure the next call works as expected.
+ args.paths.clear()
+
+ def do_lpwd(self, arglist):
+ """
+ Prints the absolute path of the current local directory
+ """
+ poutput(os.getcwd())
+
+ def complete_df(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ return self.complete_filenames(text, line, begidx, endidx)
+
+ df_parser = argparse.ArgumentParser(description='Show information about\
+ the amount of available disk space')
+ df_parser.add_argument('file', help='Name of the file', nargs='*',
+ default=['.'], action=path_to_bytes)
+
+ @with_argparser(df_parser)
+ def do_df(self, arglist):
+ """
+ Display the amount of available disk space for file systems
+ """
+ header = True # Set to true for printing header only once
+ if b'.' == arglist.file[0]:
+ arglist.file = ls(b'.')
+
+ for file in arglist.file:
+ if isinstance(file, libcephfs.DirEntry):
+ file = file.d_name
+ if file == b'.' or file == b'..':
+ continue
+ try:
+ statfs = cephfs.statfs(file)
+ stat = cephfs.stat(file)
+ block_size = (statfs['f_blocks'] * statfs['f_bsize']) // 1024
+ available = block_size - stat.st_size
+ use = 0
+
+ if block_size > 0:
+ use = (stat.st_size * 100) // block_size
+
+ if header:
+ header = False
+ poutput('{:25s}\t{:5s}\t{:15s}{:10s}{}'.format(
+ "1K-blocks", "Used", "Available", "Use%",
+ "Stored on"))
+
+ poutput('{:d}\t{:18d}\t{:8d}\t{:10s} {}'.format(block_size,
+ stat.st_size, available, str(int(use)) + '%',
+ file.decode('utf-8')))
+ except libcephfs.OSError as e:
+ set_exit_code_msg(e.get_error_code(), "could not statfs {}: {}".format(
+ file.decode('utf-8'), e.strerror))
+
+ locate_parser = argparse.ArgumentParser(
+ description='Find file within file system')
+ locate_parser.add_argument('name', help='name', type=str,
+ action=path_to_bytes)
+ locate_parser.add_argument('-c', '--count', action='store_true',
+ help='Count list of items located.')
+ locate_parser.add_argument(
+ '-i', '--ignorecase', action='store_true', help='Ignore case')
+
+ @with_argparser(locate_parser)
+ def do_locate(self, args):
+ """
+ Find a file within the File System
+ """
+ if args.name.count(b'*') == 1:
+ if args.name[0] == b'*':
+ args.name += b'/'
+ elif args.name[-1] == '*':
+ args.name = b'/' + args.name
+ args.name = args.name.replace(b'*', b'')
+ if args.ignorecase:
+ locations = locate_file(args.name, False)
+ else:
+ locations = locate_file(args.name)
+ if args.count:
+ poutput(len(locations))
+ else:
+ poutput((b'\n'.join(locations)).decode('utf-8'))
+
+ def complete_du(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ return self.complete_filenames(text, line, begidx, endidx)
+
+ du_parser = argparse.ArgumentParser(
+ description='Disk Usage of a Directory')
+ du_parser.add_argument('paths', type=str, action=get_list_of_bytes_path,
+ help='Name of the directory.', nargs='*',
+ default=[b'.'])
+ du_parser.add_argument('-r', action='store_true',
+ help='Recursive Disk usage of all directories.')
+
+ @with_argparser(du_parser)
+ def do_du(self, args):
+ """
+ Print disk usage of a given path(s).
+ """
+ def print_disk_usage(files):
+ if isinstance(files, bytes):
+ files = (files, )
+
+ for f in files:
+ try:
+ st = cephfs.lstat(f)
+
+ if stat.S_ISDIR(st.st_mode):
+ dusage = int(cephfs.getxattr(f,
+ 'ceph.dir.rbytes').decode('utf-8'))
+ else:
+ dusage = st.st_size
+
+ # print path in local context
+ f = os.path.normpath(f)
+ if f[0] is ord('/'):
+ f = b'.' + f
+ poutput('{:10s} {}'.format(humansize(dusage),
+ f.decode('utf-8')))
+ except libcephfs.Error as e:
+ set_exit_code_msg(msg=e)
+ continue
+
+ for path in args.paths:
+ if args.r:
+ print_disk_usage(sorted(set(dirwalk(path)).union({path})))
+ else:
+ print_disk_usage(path)
+
+ quota_parser = argparse.ArgumentParser(
+ description='Quota management for a Directory')
+ quota_parser.add_argument('op', choices=['get', 'set'],
+ help='Quota operation type.')
+ quota_parser.add_argument('path', type=str, action=path_to_bytes,
+ help='Name of the directory.')
+ quota_parser.add_argument('--max_bytes', type=int, default=-1, nargs='?',
+ help='Max cumulative size of the data under '
+ 'this directory.')
+ quota_parser.add_argument('--max_files', type=int, default=-1, nargs='?',
+ help='Total number of files under this '
+ 'directory tree.')
+
+ @with_argparser(quota_parser)
+ def do_quota(self, args):
+ """
+ Quota management.
+ """
+ if not is_dir_exists(args.path):
+ set_exit_code_msg(errno.ENOENT, 'error: no such directory {}'.format(
+ args.path.decode('utf-8')))
+ return
+
+ if args.op == 'set':
+ if (args.max_bytes == -1) and (args.max_files == -1):
+ set_exit_code_msg(errno.EINVAL, 'please specify either '
+ '--max_bytes or --max_files or both')
+ return
+
+ if args.max_bytes >= 0:
+ max_bytes = to_bytes(str(args.max_bytes))
+ try:
+ cephfs.setxattr(args.path, 'ceph.quota.max_bytes',
+ max_bytes, os.XATTR_CREATE)
+ poutput('max_bytes set to %d' % args.max_bytes)
+ except libcephfs.Error as e:
+ cephfs.setxattr(args.path, 'ceph.quota.max_bytes',
+ max_bytes, os.XATTR_REPLACE)
+ set_exit_code_msg(e.get_error_code(), 'max_bytes reset to '
+ f'{args.max_bytes}')
+
+ if args.max_files >= 0:
+ max_files = to_bytes(str(args.max_files))
+ try:
+ cephfs.setxattr(args.path, 'ceph.quota.max_files',
+ max_files, os.XATTR_CREATE)
+ poutput('max_files set to %d' % args.max_files)
+ except libcephfs.Error as e:
+ cephfs.setxattr(args.path, 'ceph.quota.max_files',
+ max_files, os.XATTR_REPLACE)
+ set_exit_code_msg(e.get_error_code(), 'max_files reset to '
+ f'{args.max_files}')
+ elif args.op == 'get':
+ max_bytes = '0'
+ max_files = '0'
+ try:
+ max_bytes = cephfs.getxattr(args.path, 'ceph.quota.max_bytes')
+ poutput('max_bytes: {}'.format(max_bytes.decode('utf-8')))
+ except libcephfs.Error as e:
+ set_exit_code_msg(e.get_error_code(), 'max_bytes is not set')
+
+ try:
+ max_files = cephfs.getxattr(args.path, 'ceph.quota.max_files')
+ poutput('max_files: {}'.format(max_files.decode('utf-8')))
+ except libcephfs.Error as e:
+ set_exit_code_msg(e.get_error_code(), 'max_files is not set')
+
+ snap_parser = argparse.ArgumentParser(description='Snapshot Management')
+ snap_parser.add_argument('op', type=str,
+ help='Snapshot operation: create or delete')
+ snap_parser.add_argument('name', type=str, action=path_to_bytes,
+ help='Name of snapshot')
+ snap_parser.add_argument('dir', type=str, action=path_to_bytes,
+ help='Directory for which snapshot '
+ 'needs to be created or deleted')
+
+ @with_argparser(snap_parser)
+ def do_snap(self, args):
+ """
+ Snapshot management for the volume
+ """
+ # setting self.colors to None turns off colorizing and
+ # perror emits plain text
+ self.colors = None
+
+ snapdir = '.snap'
+ conf_snapdir = cephfs.conf_get('client_snapdir')
+ if conf_snapdir is not None:
+ snapdir = conf_snapdir
+ snapdir = to_bytes(snapdir)
+ if args.op == 'create':
+ try:
+ if is_dir_exists(args.dir):
+ cephfs.mkdir(os.path.join(args.dir, snapdir, args.name), 0o755)
+ else:
+ set_exit_code_msg(errno.ENOENT, "'{}': no such directory".format(
+ args.dir.decode('utf-8')))
+ except libcephfs.Error as e:
+ set_exit_code_msg(e.get_error_code(),
+ "snapshot '{}' already exists".format(
+ args.name.decode('utf-8')))
+ elif args.op == 'delete':
+ snap_dir = os.path.join(args.dir, snapdir, args.name)
+ try:
+ if is_dir_exists(snap_dir):
+ newargs = argparse.Namespace(paths=[snap_dir], parent=False)
+ self.do_rmdir_helper(newargs)
+ else:
+ set_exit_code_msg(errno.ENOENT, "'{}': no such snapshot".format(
+ args.name.decode('utf-8')))
+ except libcephfs.Error as e:
+ set_exit_code_msg(e.get_error_code(), "error while deleting "
+ "'{}'".format(snap_dir.decode('utf-8')))
+ else:
+ set_exit_code_msg(errno.EINVAL, "snapshot can only be created or "
+ "deleted; check - help snap")
+
+ def do_help(self, line):
+ """
+ Get details about a command.
+ Usage: help <cmd> - for a specific command
+ help all - for all the commands
+ """
+ if line == 'all':
+ for k in dir(self):
+ if k.startswith('do_'):
+ poutput('-' * 80)
+ super().do_help(k[3:])
+ return
+ parser = self.create_argparser(line)
+ if parser:
+ parser.print_help()
+ else:
+ super().do_help(line)
+
+ def complete_stat(self, text, line, begidx, endidx):
+ """
+ auto complete of file name.
+ """
+ return self.complete_filenames(text, line, begidx, endidx)
+
+ stat_parser = argparse.ArgumentParser(
+ description='Display file or file system status')
+ stat_parser.add_argument('paths', type=str, help='file paths',
+ action=path_to_bytes, nargs='+')
+
+ @with_argparser(stat_parser)
+ def do_stat(self, args):
+ """
+ Display file or file system status
+ """
+ for path in args.paths:
+ try:
+ stat = cephfs.stat(path)
+ atime = stat.st_atime.isoformat(' ')
+ mtime = stat.st_mtime.isoformat(' ')
+ ctime = stat.st_mtime.isoformat(' ')
+
+ poutput("File: {}\nSize: {:d}\nBlocks: {:d}\nIO Block: {:d}\n"
+ "Device: {:d}\tInode: {:d}\tLinks: {:d}\nPermission: "
+ "{:o}/{}\tUid: {:d}\tGid: {:d}\nAccess: {}\nModify: "
+ "{}\nChange: {}".format(path.decode('utf-8'),
+ stat.st_size, stat.st_blocks,
+ stat.st_blksize, stat.st_dev,
+ stat.st_ino, stat.st_nlink,
+ stat.st_mode,
+ mode_notation(stat.st_mode),
+ stat.st_uid, stat.st_gid, atime,
+ mtime, ctime))
+ except libcephfs.Error as e:
+ set_exit_code_msg(msg=e)
+
+ setxattr_parser = argparse.ArgumentParser(
+ description='Set extended attribute for a file')
+ setxattr_parser.add_argument('path', type=str, action=path_to_bytes, help='Name of the file')
+ setxattr_parser.add_argument('name', type=str, help='Extended attribute name')
+ setxattr_parser.add_argument('value', type=str, help='Extended attribute value')
+
+ @with_argparser(setxattr_parser)
+ def do_setxattr(self, args):
+ """
+ Set extended attribute for a file
+ """
+ val_bytes = to_bytes(args.value)
+ name_bytes = to_bytes(args.name)
+ try:
+ cephfs.setxattr(args.path, name_bytes, val_bytes, os.XATTR_CREATE)
+ poutput('{} is successfully set to {}'.format(args.name, args.value))
+ except libcephfs.ObjectExists:
+ cephfs.setxattr(args.path, name_bytes, val_bytes, os.XATTR_REPLACE)
+ poutput('{} is successfully reset to {}'.format(args.name, args.value))
+ except libcephfs.Error as e:
+ set_exit_code_msg(msg=e)
+
+ getxattr_parser = argparse.ArgumentParser(
+ description='Get extended attribute set for a file')
+ getxattr_parser.add_argument('path', type=str, action=path_to_bytes,
+ help='Name of the file')
+ getxattr_parser.add_argument('name', type=str, help='Extended attribute name')
+
+ @with_argparser(getxattr_parser)
+ def do_getxattr(self, args):
+ """
+ Get extended attribute for a file
+ """
+ try:
+ poutput('{}'.format(cephfs.getxattr(args.path,
+ to_bytes(args.name)).decode('utf-8')))
+ except libcephfs.Error as e:
+ set_exit_code_msg(msg=e)
+
+ listxattr_parser = argparse.ArgumentParser(
+ description='List extended attributes set for a file')
+ listxattr_parser.add_argument('path', type=str, action=path_to_bytes,
+ help='Name of the file')
+
+ @with_argparser(listxattr_parser)
+ def do_listxattr(self, args):
+ """
+ List extended attributes for a file
+ """
+ try:
+ size, xattr_list = cephfs.listxattr(args.path)
+ if size > 0:
+ poutput('{}'.format(xattr_list.replace(b'\x00', b' ').decode('utf-8')))
+ else:
+ poutput('No extended attribute is set')
+ except libcephfs.Error as e:
+ set_exit_code_msg(msg=e)
+
+
+#######################################################
+#
+# Following are methods that get cephfs-shell started.
+#
+#####################################################
+
+def setup_cephfs():
+ """
+ Mounting a cephfs
+ """
+ global cephfs
+ try:
+ cephfs = libcephfs.LibCephFS(conffile='')
+ cephfs.mount()
+ except libcephfs.ObjectNotFound as e:
+ print('couldn\'t find ceph configuration not found')
+ sys.exit(e.get_error_code())
+ except libcephfs.Error as e:
+ print(e)
+ sys.exit(e.get_error_code())
+
+
+def str_to_bool(val):
+ """
+ Return corresponding bool values for strings like 'true' or 'false'.
+ """
+ if not isinstance(val, str):
+ return val
+
+ val = val.replace('\n', '')
+ if val.lower() in ['true', 'yes']:
+ return True
+ elif val.lower() in ['false', 'no']:
+ return False
+ else:
+ return val
+
+
+def read_shell_conf(shell, shell_conf_file):
+ import configparser
+
+ sec = 'cephfs-shell'
+ opts = []
+ if LooseVersion(cmd2_version) >= LooseVersion("0.10.0"):
+ for attr in shell.settables.keys():
+ opts.append(attr)
+ else:
+ if LooseVersion(cmd2_version) <= LooseVersion("0.9.13"):
+ # hardcoding options for 0.7.9 because -
+ # 1. we use cmd2 v0.7.9 with teuthology and
+ # 2. there's no way distinguish between a shell setting and shell
+ # object attribute until v0.10.0
+ opts = ['abbrev', 'autorun_on_edit', 'colors',
+ 'continuation_prompt', 'debug', 'echo', 'editor',
+ 'feedback_to_output', 'locals_in_py', 'prompt', 'quiet',
+ 'timing']
+ elif LooseVersion(cmd2_version) >= LooseVersion("0.9.23"):
+ opts.append('allow_style')
+ # no equivalent option was defined by cmd2.
+ else:
+ pass
+
+ # default and only section in our conf file.
+ cp = configparser.ConfigParser(default_section=sec, strict=False)
+ cp.read(shell_conf_file)
+ for opt in opts:
+ if cp.has_option(sec, opt):
+ setattr(shell, opt, str_to_bool(cp.get(sec, opt)))
+
+
+def get_shell_conffile_path(arg_conf=''):
+ conf_filename = 'cephfs-shell.conf'
+ env_var = 'CEPHFS_SHELL_CONF'
+
+ arg_conf = '' if not arg_conf else arg_conf
+ home_dir_conf = os.path.expanduser('~/.' + conf_filename)
+ env_conf = os.environ[env_var] if env_var in os.environ else ''
+
+ # here's the priority by which conf gets read.
+ for path in (arg_conf, env_conf, home_dir_conf):
+ if os.path.isfile(path):
+ return path
+ else:
+ return ''
+
+
+def manage_args():
+ main_parser = argparse.ArgumentParser(description='')
+ main_parser.add_argument('-c', '--config', action='store',
+ help='Path to Ceph configuration file.',
+ type=str)
+ main_parser.add_argument('-b', '--batch', action='store',
+ help='Path to CephFS shell script/batch file'
+ 'containing CephFS shell commands',
+ type=str)
+ main_parser.add_argument('-t', '--test', action='store',
+ help='Test against transcript(s) in FILE',
+ nargs='+')
+ main_parser.add_argument('commands', nargs='*', help='Comma delimited '
+ 'commands. The shell executes the given command '
+ 'and quits immediately with the return value of '
+ 'command. In case no commands are provided, the '
+ 'shell is launched.', default=[])
+
+ args = main_parser.parse_args()
+ args.exe_and_quit = False # Execute and quit, don't launch the shell.
+
+ if args.batch:
+ if LooseVersion(cmd2_version) <= LooseVersion("0.9.13"):
+ args.commands = ['load ' + args.batch, ',quit']
+ else:
+ args.commands = ['run_script ' + args.batch, ',quit']
+ if args.test:
+ args.commands.extend(['-t,'] + [arg + ',' for arg in args.test])
+ if not args.batch and len(args.commands) > 0:
+ args.exe_and_quit = True
+
+ manage_sys_argv(args)
+
+ return args
+
+
+def manage_sys_argv(args):
+ exe = sys.argv[0]
+ sys.argv.clear()
+ sys.argv.append(exe)
+ sys.argv.extend([i.strip() for i in ' '.join(args.commands).split(',')])
+
+ setup_cephfs()
+
+
+def execute_cmd_args(args):
+ """
+ Launch a shell session if no arguments were passed, else just execute
+ the given argument as a shell command and exit the shell session
+ immediately at (last) command's termination with the (last) command's
+ return value.
+ """
+ if not args.exe_and_quit:
+ return shell.cmdloop()
+ return execute_cmds_and_quit(args)
+
+
+def execute_cmds_and_quit(args):
+ """
+ Multiple commands might be passed separated by commas, feed onecmd()
+ one command at a time.
+ """
+ # do_* methods triggered by cephfs-shell commands return None when they
+ # complete running successfully. Until 0.9.6, shell.onecmd() returned this
+ # value to indicate whether the execution of the commands should stop, but
+ # since 0.9.7 it returns the return value of do_* methods only if it's
+ # not None. When it is None it returns False instead of None.
+ if LooseVersion(cmd2_version) <= LooseVersion("0.9.6"):
+ stop_exec_val = None
+ else:
+ stop_exec_val = False
+
+ args_to_onecmd = ''
+ if len(args.commands) <= 1:
+ args.commands = args.commands[0].split(' ')
+ for cmdarg in args.commands:
+ if ',' in cmdarg:
+ args_to_onecmd += ' ' + cmdarg[0:-1]
+ onecmd_retval = shell.onecmd(args_to_onecmd)
+ # if the curent command failed, let's abort the execution of
+ # series of commands passed.
+ if onecmd_retval is not stop_exec_val:
+ return onecmd_retval
+ if shell.exit_code != 0:
+ return shell.exit_code
+
+ args_to_onecmd = ''
+ continue
+
+ args_to_onecmd += ' ' + cmdarg
+ return shell.onecmd(args_to_onecmd)
+
+
+if __name__ == '__main__':
+ args = manage_args()
+
+ shell = CephFSShell()
+ # TODO: perhaps, we should add an option to pass ceph.conf?
+ read_shell_conf(shell, get_shell_conffile_path(args.config))
+ # XXX: setting shell.exit_code to zero so that in case there are no errors
+ # and exceptions, it is not set by any method or function of cephfs-shell
+ # and return values from shell.cmdloop() or shell.onecmd() is not an
+ # integer, we can treat it as the return value of cephfs-shell.
+ shell.exit_code = 0
+
+ retval = execute_cmd_args(args)
+ sys.exit(retval if retval else shell.exit_code)
--- /dev/null
+# -*- coding: utf-8 -*-
+
+from setuptools import setup
+
+__version__ = '0.0.1'
+
+setup(
+ name='cephfs-shell',
+ version=__version__,
+ description='Interactive shell for Ceph file system',
+ keywords='cephfs, shell',
+ scripts=['cephfs-shell'],
+ install_requires=[
+ 'cephfs',
+ 'cmd2',
+ 'colorama',
+ ],
+ classifiers=[
+ 'Development Status :: 3 - Alpha',
+ 'Environment :: Console',
+ 'Intended Audience :: System Administrators',
+ 'License :: OSI Approved :: GNU Lesser General Public License v2 or later (LGPLv2+)',
+ 'Operating System :: POSIX :: Linux',
+ 'Programming Language :: Python :: 3'
+ ],
+ license='LGPLv2+',
+)
--- /dev/null
+[tox]
+envlist = py3
+skipsdist = true
+
+[testenv:py3]
+deps = flake8
+commands = flake8 --ignore=W503 --max-line-length=100 cephfs-shell
+++ /dev/null
-[tox]
-envlist = py3
-skipsdist = true
-
-[testenv:py3]
-deps = flake8
-commands = flake8 --ignore=W503 --max-line-length=100 cephfs-shell