"""
from ctypes import CDLL, c_char_p, c_size_t, c_void_p, c_int, c_long, c_uint, c_ulong, \
c_ushort, create_string_buffer, byref, Structure, pointer, c_char, POINTER, \
- c_uint8
+ c_uint8, c_int64
from ctypes.util import find_library
from collections import namedtuple
import errno
self.state = "mounted"
def statfs(self, path):
+ if not isinstance(path, basestring):
+ raise TypeError('path must be a string')
self.require_state("mounted")
statbuf = cephfs_statvfs()
ret = self.libcephfs.ceph_statfs(self.cluster, c_char_p(path), byref(statbuf))
if ret < 0:
raise make_ex(ret, "error in close")
+ def read(self, fd, offset, l):
+ self.require_state("mounted")
+ if not isinstance(offset, int):
+ raise TypeError('path must be an int')
+ if not isinstance(l, int):
+ raise TypeError('path must be an int')
+
+ buf = create_string_buffer(l)
+ ret = self.libcephfs.ceph_read(self.cluster, c_int(fd),
+ buf, c_int64(l), c_int64(offset))
+ if ret < 0:
+ raise make_ex(ret, "error in close")
+ return buf.value
+
+ def write(self, fd, buf, offset):
+ self.require_state("mounted")
+ if not isinstance(buf, basestring):
+ raise TypeError('buf must be a string')
+ if not isinstance(offset, int):
+ raise TypeError('offset must be an int')
+
+ ret = self.libcephfs.ceph_write(self.cluster, c_int(fd),
+ c_char_p(buf), c_int64(len(buf)),
+ c_int64(offset))
+ if ret < 0:
+ raise make_ex(ret, "error in close")
+ return ret
+
def getxattr(self, path, name):
if not isinstance(path, basestring):
raise TypeError('path must be a string')
if not isinstance(name, basestring):
raise TypeError('name must be a string')
+ self.require_state("mounted")
l = 255
buf = create_string_buffer(l)
actual_l = self.libcephfs.ceph_getxattr(self.cluster, path, name, buf, c_int(l))
def setup_module():
global cephfs
- cephfs = cephfs.LibCephfs(conffile='')
+ cephfs = cephfs.LibCephFS(conffile='')
cephfs.mount()
def teardown_module():
cephfs.shutdown()
def test_mount():
- cephfs.mount()
cephfs.shutdown()
assert_raise(cephfs.LibCephFSStateError, cephfs.statfs)
+ cephfs.mount()
def test_version():
cephfs.version()
cephfs.closedir(handler)
def test_xattr():
- assert_raise(InvalidValue, cephfs.setxattr, "/", "key", "value", 0)
+ assert_raise(cephfs.InvalidValue, cephfs.setxattr, "/", "key", "value", 0)
cephfs.setxattr("/", "user.key", "value", 0)
assert_equal("value", cephfs.getxattr("/", "user.key"))
cephfs.stat("/a/b")
def test_open():
- assert_raise(ObjectExists, cephfs.open, 'file-1', 'r')
+ assert_raise(cephfs.ObjectNotFound, cephfs.open, 'file-1', 'r')
+ assert_raise(cephfs.ObjectNotFound, cephfs.open, 'file-1', 'r+')
fd = cephfs.open('file-1', 'w')
cephfs.close(fd)
fd = cephfs.open('file-1', 'r')