import pytz
import json
import httplib2
+import threading
import xml.etree.ElementTree as ET
new_key = another_bucket.copy_key('new_key', bucket.name, objname)
eq(new_key.get_contents_as_string(), c[num_versions - 1])
-def _count_bucket_objs(bucket):
+def _count_bucket_versioned_objs(bucket):
k = []
for key in bucket.list_versions():
k.insert(0, key)
eq(len(result.deleted), 2)
eq(len(result.errors), 0)
- eq(_count_bucket_objs(bucket), 0)
+ eq(_count_bucket_versioned_objs(bucket), 0)
# now remove again, should all succeed due to idempotency
result = bucket.delete_keys(stored_keys)
eq(len(result.deleted), 2)
eq(len(result.errors), 0)
- eq(_count_bucket_objs(bucket), 0)
+ eq(_count_bucket_versioned_objs(bucket), 0)
@attr(resource='object')
@attr(method='delete')
result = bucket.delete_keys(stored_keys)
eq(len(result.deleted), 3)
eq(len(result.errors), 0)
- eq(_count_bucket_objs(bucket), 0)
+ eq(_count_bucket_versioned_objs(bucket), 0)
delete_markers = []
for o in result.deleted:
eq(len(result.deleted), 3)
eq(len(result.errors), 0)
- eq(_count_bucket_objs(bucket), 0)
+ eq(_count_bucket_versioned_objs(bucket), 0)
@attr(resource='object')
@attr(method='delete')
rmkeys = { bucket.new_key(keyname) }
- eq(_count_bucket_objs(bucket), 0)
+ eq(_count_bucket_versioned_objs(bucket), 0)
result = bucket.delete_keys(rmkeys)
eq(len(result.deleted), 1)
- eq(_count_bucket_objs(bucket), 1)
+ eq(_count_bucket_versioned_objs(bucket), 1)
delete_markers = []
for o in result.deleted:
k = bucket.new_key(keyname)
check_grants(k.get_acl().acl.grants, default_policy)
+
+def _do_create_object(bucket, objname, i):
+ k = bucket.new_key(objname)
+ k.set_contents_from_string('data {i}'.format(i=i))
+
+def _do_remove_ver(bucket, obj):
+ bucket.delete_key(obj.name, version_id = obj.version_id)
+
+def _do_create_versioned_obj_concurrent(bucket, objname, num):
+ t = []
+ for i in range(num):
+ thr = threading.Thread(target = _do_create_object, args=(bucket, objname, i))
+ thr.start()
+ t.append(thr)
+ return t
+
+def _do_clear_versioned_bucket_concurrent(bucket):
+ t = []
+ for o in bucket.list_versions():
+ thr = threading.Thread(target = _do_remove_ver, args=(bucket, o))
+ thr.start()
+ t.append(thr)
+ return t
+
+def _do_wait_completion(t):
+ for thr in t:
+ thr.join()
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='concurrent creation of objects, concurrent removal')
+@attr(assertion='works')
+@attr('versioning')
+def test_versioned_concurrent_object_create_concurrent_remove():
+ bucket = get_new_bucket()
+
+ check_configure_versioning_retry(bucket, True, "Enabled")
+
+ keyname = 'myobj'
+
+ num_objs = 5
+
+ for i in xrange(5):
+ t = _do_create_versioned_obj_concurrent(bucket, keyname, num_objs)
+ _do_wait_completion(t)
+
+ eq(_count_bucket_versioned_objs(bucket), num_objs)
+ eq(len(bucket.get_all_keys()), 1)
+
+ t = _do_clear_versioned_bucket_concurrent(bucket)
+ _do_wait_completion(t)
+
+ eq(_count_bucket_versioned_objs(bucket), 0)
+ eq(len(bucket.get_all_keys()), 0)
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='concurrent creation and removal of objects')
+@attr(assertion='works')
+@attr('versioning')
+def test_versioned_concurrent_object_create_and_remove():
+ bucket = get_new_bucket()
+
+ check_configure_versioning_retry(bucket, True, "Enabled")
+
+ keyname = 'myobj'
+
+ num_objs = 3
+
+ all_threads = []
+
+ for i in xrange(3):
+ t = _do_create_versioned_obj_concurrent(bucket, keyname, num_objs)
+ all_threads.append(t)
+
+ t = _do_clear_versioned_bucket_concurrent(bucket)
+ all_threads.append(t)
+
+
+ for t in all_threads:
+ _do_wait_completion(t)
+
+ t = _do_clear_versioned_bucket_concurrent(bucket)
+ _do_wait_completion(t)
+
+ eq(_count_bucket_versioned_objs(bucket), 0)
+ eq(len(bucket.get_all_keys()), 0)