Create a C-style string from a Python string
:param str val: Python string
+ :param encoding: Encoding to use
:rtype: c_char_p
"""
if val is None:
return c_char_p(val.encode(encoding))
+def decode_cstr(addr, size=-1, encoding="utf-8"):
+ """
+ Decode a C-style string into a Python string.
+
+ Return None if a the C string is a NULL pointer.
+
+ :param c_char_p addr: C-style string
+ :param int: String size (assume NUL-terminated if size is -1)
+ :param encoding: Encoding to use
+ :rtype: str or None
+ """
+ if not addr:
+ # NULL pointer
+ return None
+
+ return ctypes.string_at(addr, size).decode(encoding)
+
+
class Rados(object):
"""librados python wrapper"""
def require_state(self, *args):
# cretargs was allocated with fixed length; collapse return
# list to eliminate any missing args
- retargs = [a for a in cretargs if a is not None]
+ retargs = [decode_cstr(a) for a in cretargs if a is not None]
self.parsed_args = args
return retargs
(self.cluster, cstr(option), ret_buf,
c_size_t(length)))
if (ret == 0):
- return ret_buf.value.decode("utf-8")
+ return decode_cstr(ret_buf)
elif (ret == -errno.ENAMETOOLONG):
length = length * 2
elif (ret == -errno.ENOENT):
if ret != 0:
raise make_ex(ret, "error calling ping_monitor")
- return my_outstr
+ return decode_cstr(my_outstr)
def connect(self, timeout=0):
"""
else:
break
- return [name for name in c_names.raw.decode("utf-8").split('\0') if len(name) > 0]
+ return [decode_cstr(name) for name in c_names.raw.split(b'\0') if len(name) > 0]
def get_fsid(self):
"""
# copy returned memory (ctypes makes a copy, not a reference)
my_outbuf = outbufp.contents[:(outbuflen.value)]
- my_outs = outsp.contents[:(outslen.value)]
+ my_outs = decode_cstr(outsp.contents, outslen.value)
# free callee's allocations
if outbuflen.value:
# copy returned memory (ctypes makes a copy, not a reference)
my_outbuf = outbufp.contents[:(outbuflen.value)]
- my_outs = outsp.contents[:(outslen.value)]
+ my_outs = decode_cstr(outsp.contents, outslen.value)
# free callee's allocations
if outbuflen.value:
# copy returned memory (ctypes makes a copy, not a reference)
my_outbuf = outbufp.contents[:(outbuflen.value)]
- my_outs = outsp.contents[:(outslen.value)]
+ my_outs = decode_cstr(outsp.contents, outslen.value)
# free callee's allocations
if outbuflen.value:
raise make_ex(ret, "error iterating over the omap")
if key_.value is None:
raise StopIteration()
- key = key_.value.decode("utf-8")
+ key = decode_cstr(key_)
val = None
if val_.value is not None:
val = ctypes.string_at(val_, len_)
if ret < 0:
raise StopIteration()
- key = None if key_.value is None else key_.value.decode("utf-8")
- locator = None if locator_.value is None else locator_.value.decode("utf-8")
- nspace = None if nspace_.value is None else nspace_.value.decode("utf-8")
+ key = decode_cstr(key_)
+ locator = decode_cstr(locator_)
+ nspace = decode_cstr(nspace_)
return Object(self.ioctx, key, locator, nspace)
def __del__(self):
in '%s'" % self.oid)
if name_.value is None:
raise StopIteration()
- name = ctypes.string_at(name_).decode("utf-8")
+ name = decode_cstr(name_)
val = ctypes.string_at(val_, len_)
return (name, val)
elif (ret != -errno.ERANGE):
raise make_ex(ret, "rados_snap_get_name error")
name_len = name_len * 2
- snap = Snap(self.ioctx, name.value.decode("utf-8"), snap_id)
+ snap = Snap(self.ioctx, decode_cstr(name), snap_id)
self.cur_snap = self.cur_snap + 1
return snap
import errno
import sys
-from rados import cstr
+from rados import cstr, decode_cstr
ANONYMOUS_AUID = 0xffffffffffffffff
ADMIN_AUID = 0
elif ret != -errno.ERANGE:
raise make_ex(ret, 'error listing images')
- return [name for name in c_names.raw.decode("utf-8").split('\0') if len(name) > 0]
+ return [decode_cstr(name) for name in c_names.raw.split(b'\0') if len(name) > 0]
def remove(self, ioctx, name):
"""
cookies = [cookie.decode("utf-8") for cookie in c_cookies.raw[:cookies_size.value - 1].split(b'\0')]
addrs = [addr.decode("utf-8") for addr in c_addrs.raw[:addrs_size.value - 1].split(b'\0')]
return {
- 'tag' : c_tag.value.decode("utf-8"),
+ 'tag' : decode_cstr(c_tag),
'exclusive' : exclusive.value == 1,
'lockers' : list(zip(clients, cookies, addrs)),
}
yield {
'id' : self.snaps[i].id,
'size' : self.snaps[i].size,
- 'name' : self.snaps[i].name.decode("utf-8"),
+ 'name' : decode_cstr(self.snaps[i].name),
}
def __del__(self):