from __future__ import print_function
from nose import SkipTest
+from nose.plugins.attrib import attr
from nose.tools import eq_ as eq, ok_ as ok, assert_raises
from rados import (Rados, Error, RadosStateError, Object, ObjectExists,
ObjectNotFound, ObjectBusy, NotConnected, requires, opt,
eq(set(['a' * 500]), self.list_non_default_pools())
self.rados.delete_pool('a' * 500)
+ @attr('tier')
def test_get_pool_base_tier(self):
self.rados.create_pool('foo')
try:
def test_blacklist_add(self):
self.rados.blacklist_add("1.2.3.4/123", 1)
+ @attr('stats')
def test_get_cluster_stats(self):
stats = self.rados.get_cluster_stats()
assert stats['kb'] > 0
def test_get_pool_name(self):
eq(self.ioctx.get_pool_name(), 'test_pool')
+ @attr('snap')
def test_create_snap(self):
assert_raises(ObjectNotFound, self.ioctx.remove_snap, 'foo')
self.ioctx.create_snap('foo')
self.ioctx.remove_snap('foo')
+ @attr('snap')
def test_list_snaps_empty(self):
eq(list(self.ioctx.list_snaps()), [])
+ @attr('snap')
def test_list_snaps(self):
snaps = ['snap1', 'snap2', 'snap3']
for snap in snaps:
listed_snaps = [snap.name for snap in self.ioctx.list_snaps()]
eq(snaps, listed_snaps)
+ @attr('snap')
def test_lookup_snap(self):
self.ioctx.create_snap('foo')
snap = self.ioctx.lookup_snap('foo')
eq(snap.name, 'foo')
+ @attr('snap')
def test_snap_timestamp(self):
self.ioctx.create_snap('foo')
snap = self.ioctx.lookup_snap('foo')
snap.get_timestamp()
+ @attr('snap')
def test_remove_snap(self):
self.ioctx.create_snap('foo')
(snap,) = self.ioctx.list_snaps()
self.ioctx.remove_snap('foo')
eq(list(self.ioctx.list_snaps()), [])
+ @attr('snap')
def test_snap_rollback(self):
self.ioctx.write("insnap", b"contents1")
self.ioctx.create_snap("snap1")
self.ioctx.remove_snap("snap1")
self.ioctx.remove_object("insnap")
+ @attr('snap')
def test_snap_read(self):
self.ioctx.write("insnap", b"contents1")
self.ioctx.create_snap("snap1")
r, _, _ = self.rados.mon_command(json.dumps(cmd), b'')
eq(r, 0)
+ @attr('thrash')
def test_aio_read(self):
# this is a list so that the local cb() can modify it
retval = [None]
eq(self.ioctx.alignment(), None)
+@attr('ec')
class TestIoctxEc(object):
def setUp(self):
eq(self.object.read(3), b'bar')
eq(self.object.read(3), b'baz')
+@attr('snap')
class TestIoCtxSelfManagedSnaps(object):
def setUp(self):
self.rados = Rados(conffile='')
e = json.loads(buf.decode("utf-8"))
assert('release' in e)
+ @attr('bench')
def test_osd_bench(self):
cmd = dict(prefix='bench', size=4096, count=8192)
ret, buf, err = self.rados.osd_command(0, json.dumps(cmd), b'',