]> git-server-git.apps.pok.os.sepia.ceph.com Git - s3-tests.git/commitdiff
s3tests: concurrent object versioning tests
authorYehuda Sadeh <yehuda@inktank.com>
Wed, 24 Dec 2014 00:26:47 +0000 (16:26 -0800)
committerYehuda Sadeh <yehuda@inktank.com>
Mon, 19 Jan 2015 23:08:29 +0000 (15:08 -0800)
test concurrent creation and removal

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
s3tests/functional/test_s3.py

index d2ae8621382d38535903046338e67bb65bec9a6d..3cbe74dd79cc439eafb50e00639ab30998d1e834 100644 (file)
@@ -21,6 +21,7 @@ import sha
 import pytz
 import json
 import httplib2
+import threading
 
 import xml.etree.ElementTree as ET
 
@@ -5570,7 +5571,7 @@ def test_versioning_copy_obj_version():
     new_key = another_bucket.copy_key('new_key', bucket.name, objname)
     eq(new_key.get_contents_as_string(), c[num_versions - 1])
 
-def _count_bucket_objs(bucket):
+def _count_bucket_versioned_objs(bucket):
     k = []
     for key in bucket.list_versions():
         k.insert(0, key)
@@ -5604,14 +5605,14 @@ def test_versioning_multi_object_delete():
         eq(len(result.deleted), 2)
         eq(len(result.errors), 0)
 
-        eq(_count_bucket_objs(bucket), 0)
+        eq(_count_bucket_versioned_objs(bucket), 0)
 
         # now remove again, should all succeed due to idempotency
         result = bucket.delete_keys(stored_keys)
         eq(len(result.deleted), 2)
         eq(len(result.errors), 0)
 
-        eq(_count_bucket_objs(bucket), 0)
+        eq(_count_bucket_versioned_objs(bucket), 0)
 
 @attr(resource='object')
 @attr(method='delete')
@@ -5642,7 +5643,7 @@ def test_versioning_multi_object_delete_with_marker():
        result = bucket.delete_keys(stored_keys)
         eq(len(result.deleted), 3)
         eq(len(result.errors), 0)
-        eq(_count_bucket_objs(bucket), 0)
+        eq(_count_bucket_versioned_objs(bucket), 0)
 
         delete_markers = []
         for o in result.deleted:
@@ -5657,7 +5658,7 @@ def test_versioning_multi_object_delete_with_marker():
         eq(len(result.deleted), 3)
         eq(len(result.errors), 0)
 
-        eq(_count_bucket_objs(bucket), 0)
+        eq(_count_bucket_versioned_objs(bucket), 0)
 
 @attr(resource='object')
 @attr(method='delete')
@@ -5673,11 +5674,11 @@ def test_versioning_multi_object_delete_with_marker_create():
 
         rmkeys = { bucket.new_key(keyname) }
 
-        eq(_count_bucket_objs(bucket), 0)
+        eq(_count_bucket_versioned_objs(bucket), 0)
 
         result = bucket.delete_keys(rmkeys)
         eq(len(result.deleted), 1)
-        eq(_count_bucket_objs(bucket), 1)
+        eq(_count_bucket_versioned_objs(bucket), 1)
 
         delete_markers = []
         for o in result.deleted:
@@ -5760,3 +5761,90 @@ def test_versioned_object_acl():
     k = bucket.new_key(keyname)
     check_grants(k.get_acl().acl.grants, default_policy)
 
+
+def _do_create_object(bucket, objname, i):
+    k = bucket.new_key(objname)
+    k.set_contents_from_string('data {i}'.format(i=i))
+
+def _do_remove_ver(bucket, obj):
+    bucket.delete_key(obj.name, version_id = obj.version_id)
+
+def _do_create_versioned_obj_concurrent(bucket, objname, num):
+    t = []
+    for i in range(num):
+        thr = threading.Thread(target = _do_create_object, args=(bucket, objname, i))
+        thr.start()
+        t.append(thr)
+    return t
+
+def _do_clear_versioned_bucket_concurrent(bucket):
+    t = []
+    for o in bucket.list_versions():
+        thr = threading.Thread(target = _do_remove_ver, args=(bucket, o))
+        thr.start()
+        t.append(thr)
+    return t
+
+def _do_wait_completion(t):
+    for thr in t:
+        thr.join()
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='concurrent creation of objects, concurrent removal')
+@attr(assertion='works')
+@attr('versioning')
+def test_versioned_concurrent_object_create_concurrent_remove():
+    bucket = get_new_bucket()
+
+    check_configure_versioning_retry(bucket, True, "Enabled")
+
+    keyname = 'myobj'
+
+    num_objs = 5
+
+    for i in xrange(5):
+        t = _do_create_versioned_obj_concurrent(bucket, keyname, num_objs)
+        _do_wait_completion(t)
+
+        eq(_count_bucket_versioned_objs(bucket), num_objs)
+        eq(len(bucket.get_all_keys()), 1)
+
+        t = _do_clear_versioned_bucket_concurrent(bucket)
+        _do_wait_completion(t)
+
+        eq(_count_bucket_versioned_objs(bucket), 0)
+        eq(len(bucket.get_all_keys()), 0)
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='concurrent creation and removal of objects')
+@attr(assertion='works')
+@attr('versioning')
+def test_versioned_concurrent_object_create_and_remove():
+    bucket = get_new_bucket()
+
+    check_configure_versioning_retry(bucket, True, "Enabled")
+
+    keyname = 'myobj'
+
+    num_objs = 3
+
+    all_threads = []
+
+    for i in xrange(3):
+        t = _do_create_versioned_obj_concurrent(bucket, keyname, num_objs)
+        all_threads.append(t)
+
+        t = _do_clear_versioned_bucket_concurrent(bucket)
+        all_threads.append(t)
+
+
+    for t in all_threads:
+        _do_wait_completion(t)
+
+    t = _do_clear_versioned_bucket_concurrent(bucket)
+    _do_wait_completion(t)
+
+    eq(_count_bucket_versioned_objs(bucket), 0)
+    eq(len(bucket.get_all_keys()), 0)