int rados_ioctx_create(rados_t cluster, const char *pool_name, rados_ioctx_t *ioctx)
int rados_ioctx_create2(rados_t cluster, int64_t pool_id, rados_ioctx_t *ioctx)
void rados_ioctx_destroy(rados_ioctx_t io)
- int rados_ioctx_pool_set_auid(rados_ioctx_t io, uint64_t auid)
void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key)
void rados_ioctx_set_namespace(rados_ioctx_t io, const char * nspace)
if self.state != "open":
raise IoctxStateError("The pool is %s" % self.state)
- def change_auid(self, auid):
- """
- Attempt to change an io context's associated auid "owner."
-
- Requires that you have write permission on both the current and new
- auid.
-
- :raises: :class:`Error`
- """
- self.require_ioctx_open()
-
- cdef:
- uint64_t _auid = auid
-
- with nogil:
- ret = rados_ioctx_pool_set_auid(self.io, _auid)
- if ret < 0:
- raise make_ex(ret, "error changing auid of '%s' to %d"
- % (self.name, auid))
-
@requires(('loc_key', str_type))
def set_locator_key(self, loc_key):
"""
from nose.tools import eq_ as eq, ok_ as ok, assert_raises
from rados import (Rados, Error, RadosStateError, Object, ObjectExists,
ObjectNotFound, ObjectBusy, requires, opt,
- ANONYMOUS_AUID, ADMIN_AUID, LIBRADOS_ALL_NSPACES, WriteOpCtx, ReadOpCtx,
+ LIBRADOS_ALL_NSPACES, WriteOpCtx, ReadOpCtx,
LIBRADOS_SNAP_HEAD, LIBRADOS_OPERATION_BALANCE_READS, LIBRADOS_OPERATION_SKIPRWLOCKS, MonitorLog)
import time
import threading
finally:
self.rados.delete_pool(poolname)
- def test_create_auid(self):
- self.rados.create_pool('foo', 100)
- assert self.rados.pool_exists('foo')
- self.rados.delete_pool('foo')
-
def test_eexist(self):
self.rados.create_pool('foo')
assert_raises(ObjectExists, self.rados.create_pool, 'foo')
'num_objects_degraded': 0,
'num_rd': 0})
- def test_change_auid(self):
- self.ioctx.change_auid(ANONYMOUS_AUID)
- self.ioctx.change_auid(ADMIN_AUID)
-
def test_write(self):
self.ioctx.write('abc', b'abc')
eq(self.ioctx.read('abc'), b'abc')
'num_objects_degraded': 0,
'num_rd': 0})
- def test_change_auid(self):
- self.ioctx2.change_auid(ANONYMOUS_AUID)
- self.ioctx2.change_auid(ADMIN_AUID)
-
class TestObject(object):