These tests can now be run automatically more easily.
Fixes: #1653
Signed-off-by: Josh Durgin <josh.durgin@dreamhost.com>
pybind/rgw.py \
pybind/rbd.py
-if WITH_DEBUG
-dist_bin_SCRIPTS += test/ceph-pybind-test.py \
- test/ceph-pybind-rgw-test.py
-endif
-
# headers... and everything else we want to include in a 'make dist'
# that autotools doesn't magically identify.
noinst_HEADERS = \
+++ /dev/null
-#!/usr/bin/python
-
-import rgw
-import sys
-
-r = rgw.Rgw()
-
-xml = """<AccessControlPolicy xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
- <Owner>
- <ID>foo</ID>
- <DisplayName>MrFoo</DisplayName>
- </Owner>
- <AccessControlList>
- <Grant>
- <Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:type=\"CanonicalUser\">
- <ID>bar</ID>
- <DisplayName>display-name</DisplayName>
- </Grantee>
- <Permission>FULL_CONTROL</Permission>
- </Grant>
- </AccessControlList>
-</AccessControlPolicy>"""
-
-print "converting %s to binary..." % xml
-blob = r.acl_xml2bin(xml)
-print "got blob of length %d" % len(blob)
-
-xml2 = r.acl_bin2xml(blob)
-
-blob2 = r.acl_xml2bin(xml2)
-
-if (blob != blob2):
- raise "blob differed from blob2!"
-
-sys.exit(0)
+++ /dev/null
-#!/usr/bin/python
-
-import rados
-
-r = rados.Rados()
-r.conf_read_file();
-r.connect()
-v = r.version()
-print "rados version %s" % str(v)
-
-try:
- auid = 100
- r.create_pool("foo1", auid)
- print "created pool foo1 with auid %d" % auid
- r.delete_pool("foo1")
- print "deleted pool foo1"
-except rados.ObjectExists:
- print "pool foo1 already exists"
-
-try:
- r.create_pool("foo2")
- print "created pool foo2"
-except rados.ObjectExists:
- print "pool foo2 already exists"
-if r.pool_exists("foo2") != True:
- raise RuntimeError("we just created pool 'foo2', but it doesn't exist?")
-print "opening pool foo2"
-foo2_ioctx = r.open_ioctx("foo2")
-# give this pool to the anonymous AUID
-foo2_ioctx.change_auid(rados.ANONYMOUS_AUID)
-# well, actually, we want it back.
-foo2_ioctx.change_auid(rados.ADMIN_AUID)
-foo2_ioctx.close()
-# now delete
-print "deleting pool foo2"
-r.delete_pool("foo2")
-
-# create a pool and some objects
-try:
- r.create_pool("foo3")
-except rados.ObjectExists:
- pass
-foo3_ioctx = r.open_ioctx("foo3")
-foo3_ioctx.write("abc", "abc")
-foo3_ioctx.write("def", "def")
-abc_str = foo3_ioctx.read("abc")
-if (abc_str != "abc"):
- raise RuntimeError("error reading object abc: expected value abc, \
-got %s" % abc_str)
-b_str = foo3_ioctx.read("abc", 1, 1)
-if (b_str != "b"):
- raise RuntimeError("error reading object abc: expected value b, \
-got %s" % b_str)
-# write_full replaces the whole 'def' object
-foo3_ioctx.write_full("def", "d")
-def_str = foo3_ioctx.read("def")
-if (def_str != "d"):
- raise RuntimeError("error reading object def: expected value d, \
-got %s" % def_str)
-for obj in foo3_ioctx.list_objects():
- print str(obj)
-
-foo3_ioctx.write_full("ghi", "g\0h\0i")
-ghi_str = foo3_ioctx.read("ghi")
-if (ghi_str != "g\0h\0i"):
- raise RuntimeError("error reading object ghi: expected value g\\0h\\0\i, \
-got %s" % (ghi_str))
-
-# do some things with extended attributes
-foo3_ioctx.set_xattr("abc", "a", "1")
-foo3_ioctx.set_xattr("def", "b", "2")
-foo3_ioctx.set_xattr("abc", "c", "3")
-ret = foo3_ioctx.get_xattr("abc", "a")
-if (ret != "1"):
- raise RuntimeError("error: expected object abc to have a=1")
-ret = foo3_ioctx.get_xattr("def", "b")
-if (ret != "2"):
- raise RuntimeError("error: expected object def to have b=2")
-ret = foo3_ioctx.get_xattr("abc", "c")
-if (ret != "3"):
- raise RuntimeError("error: expected object abc to have c=3")
-found = {}
-for k,v in foo3_ioctx.get_xattrs("abc"):
- found[k] = v
-if (len(found) != 2):
- raise RuntimeError("error: expected two extended attributes on abc")
-if (found["a"] != "1"):
- raise RuntimeError("error: expected object abc to have a=1")
-if (found["c"] != "3"):
- raise RuntimeError("error: expected object abc to have c=3")
-
-foo3_ioctx.set_xattr("def", "zeroholder", "a\0b")
-ret = foo3_ioctx.get_xattr("def", "zeroholder")
-if (ret != "a\0b"):
- raise RuntimeError("error: set_xattr/get_xattr failed with " +
- "an extended attribute containing NULL")
-
-# create some snapshots and do stuff with them
-print "creating snap bjork"
-foo3_ioctx.create_snap("bjork")
-print "creating snap aardvark"
-foo3_ioctx.create_snap("aardvark")
-print "creating snap carnuba"
-foo3_ioctx.create_snap("carnuba")
-print "listing snaps..."
-for snap in foo3_ioctx.list_snaps():
- print str(snap)
-
-print "removing snap bjork"
-foo3_ioctx.remove_snap("bjork")
-foo3_ioctx.close()
-
-# remove foo3
-print "deleting foo3"
-r.delete_pool("foo3")
--- /dev/null
+from nose.tools import eq_ as eq, assert_raises
+from rados import Rados, ObjectExists, ObjectNotFound, ANONYMOUS_AUID, ADMIN_AUID
+
+class TestPool(object):
+
+ def setUp(self):
+ self.rados = Rados(conffile='')
+ self.rados.connect()
+
+ def tearDown(self):
+ self.rados.shutdown()
+
+ def test_create(self):
+ self.rados.create_pool('foo')
+ self.rados.delete_pool('foo')
+
+ def test_create_auid(self):
+ self.rados.create_pool('foo', 100)
+ assert self.rados.pool_exists('foo')
+ self.rados.delete_pool('foo')
+
+ def test_eexist(self):
+ self.rados.create_pool('foo')
+ assert_raises(ObjectExists, self.rados.create_pool, 'foo')
+
+class TestIoctx(object):
+
+ def setUp(self):
+ self.rados = Rados(conffile='')
+ self.rados.connect()
+ self.rados.create_pool('test_pool')
+ assert self.rados.pool_exists('test_pool')
+ self.ioctx = self.rados.open_ioctx('test_pool')
+
+ def tearDown(self):
+ self.ioctx.close()
+ self.rados.delete_pool('test_pool')
+ self.rados.shutdown()
+
+ def test_change_auid(self):
+ self.ioctx.change_auid(ANONYMOUS_AUID)
+ self.ioctx.change_auid(ADMIN_AUID)
+
+ def test_write(self):
+ self.ioctx.write('abc', 'abc')
+ eq(self.ioctx.read('abc'), 'abc')
+
+ def test_write_full(self):
+ self.ioctx.write('abc', 'abc')
+ eq(self.ioctx.read('abc'), 'abc')
+ self.ioctx.write_full('abc', 'd')
+ eq(self.ioctx.read('abc'), 'd')
+
+ def test_write_zeros(self):
+ self.ioctx.write('abc', 'a\0b\0c')
+ eq(self.ioctx.read('abc'), 'a\0b\0c')
+
+ def test_list_objects_empty(self):
+ eq(list(self.ioctx.list_objects()), [])
+
+ def test_list_objects(self):
+ self.ioctx.write('a', '')
+ self.ioctx.write('b', 'foo')
+ self.ioctx.write_full('c', 'bar')
+ object_names = [obj.key for obj in self.ioctx.list_objects()]
+ eq(sorted(object_names), ['a', 'b', 'c'])
+
+ def test_xattrs(self):
+ xattrs = dict(a='1', b='2', c='3', d='a\0b', e='\0')
+ self.ioctx.write('abc', '')
+ for key, value in xattrs.iteritems():
+ self.ioctx.set_xattr('abc', key, value)
+ eq(self.ioctx.get_xattr('abc', key), value)
+ stored_xattrs = {}
+ for key, value in self.ioctx.get_xattrs('abc'):
+ stored_xattrs[key] = value
+ eq(stored_xattrs, xattrs)
+
+ def test_create_snap(self):
+ assert_raises(ObjectNotFound, self.ioctx.remove_snap, 'foo')
+ self.ioctx.create_snap('foo')
+ self.ioctx.remove_snap('foo')
+
+ def test_list_snaps_empty(self):
+ eq(list(self.ioctx.list_snaps()), [])
+
+ def test_list_snaps(self):
+ snaps = ['snap1', 'snap2', 'snap3']
+ for snap in snaps:
+ self.ioctx.create_snap(snap)
+ listed_snaps = [snap.name for snap in self.ioctx.list_snaps()]
+ eq(snaps, listed_snaps)
+
+ def test_lookup_snap(self):
+ self.ioctx.create_snap('foo')
+ snap = self.ioctx.lookup_snap('foo')
+ eq(snap.name, 'foo')
+
+ def test_snap_timestamp(self):
+ self.ioctx.create_snap('foo')
+ snap = self.ioctx.lookup_snap('foo')
+ snap.get_timestamp()
+
+ def test_remove_snap(self):
+ self.ioctx.create_snap('foo')
+ (snap,) = self.ioctx.list_snaps()
+ eq(snap.name, 'foo')
+ self.ioctx.remove_snap('foo')
+ eq(list(self.ioctx.list_snaps()), [])