void rados_write_op_omap_clear(rados_write_op_t write_op)
void rados_write_op_set_flags(rados_write_op_t write_op, int flags)
void rados_write_op_setxattr(rados_write_op_t write_op, const char *name, const char *value, size_t value_len)
-
+ void rados_write_op_rmxattr(rados_write_op_t write_op, const char *name)
+
void rados_write_op_create(rados_write_op_t write_op, int exclusive, const char *category)
void rados_write_op_append(rados_write_op_t write_op, const char *buffer, size_t len)
void rados_write_op_write_full(rados_write_op_t write_op, const char *buffer, size_t len)
with nogil:
rados_write_op_setxattr(self.write_op, _xattr_name, _xattr_value, _xattr_value_len)
+ @requires(('xattr_name', str_type))
+ def rm_xattr(self, xattr_name):
+ """
+ Removes an extended attribute on from an object.
+ :param xattr_name: name of the xattr to remove
+ :type xattr_name: str
+ """
+ xattr_name = cstr(xattr_name, 'xattr_name')
+ cdef:
+ char *_xattr_name = xattr_name
+ with nogil:
+ rados_write_op_rmxattr(self.write_op, _xattr_name)
+
@requires(('to_write', bytes))
def append(self, to_write):
"""
from nose.tools import eq_ as eq, ok_ as ok, assert_raises
from rados import (Rados, Error, RadosStateError, Object, ObjectExists,
ObjectNotFound, ObjectBusy, requires, opt,
- LIBRADOS_ALL_NSPACES, WriteOpCtx, ReadOpCtx,
+ LIBRADOS_ALL_NSPACES, WriteOpCtx, ReadOpCtx, LIBRADOS_CREATE_EXCLUSIVE,
LIBRADOS_SNAP_HEAD, LIBRADOS_OPERATION_BALANCE_READS, LIBRADOS_OPERATION_SKIPRWLOCKS, MonitorLog)
import time
import threading
self.ioctx.operate_read_op(read_op, "hw")
eq(list(iter), [])
+ def test_xattrs_op(self):
+ xattrs = dict(a=b'1', b=b'2', c=b'3', d=b'a\0b', e=b'\0')
+ with WriteOpCtx() as write_op:
+ write_op.new(LIBRADOS_CREATE_EXCLUSIVE)
+ for key, value in xattrs.items():
+ write_op.set_xattr(key, value)
+ self.ioctx.operate_write_op(write_op, "abc")
+ eq(self.ioctx.get_xattr('abc', key), value)
+ stored_xattrs_1 = {}
+ for key, value in self.ioctx.get_xattrs('abc'):
+ stored_xattrs_1[key] = value
+ eq(stored_xattrs_1, xattrs)
+ for key in xattrs.keys():
+ write_op.rm_xattr(key)
+ self.ioctx.operate_write_op(write_op, "abc")
+ stored_xattrs_2 = {}
+ for key, value in self.ioctx.get_xattrs('abc'):
+ stored_xattrs_2[key] = value
+ eq(stored_xattrs_2, {})
+ write_op.remove()
+
def test_locator(self):
self.ioctx.set_locator_key("bar")
self.ioctx.write('foo', b'contents1')