From: Yehuda Sadeh Date: Wed, 24 Dec 2014 00:26:47 +0000 (-0800) Subject: s3tests: concurrent object versioning tests X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=460e3f1f1e2eb0e5c9581cf0f73d6bf2634a2089;p=s3-tests.git s3tests: concurrent object versioning tests test concurrent creation and removal Signed-off-by: Yehuda Sadeh --- diff --git a/s3tests/functional/test_s3.py b/s3tests/functional/test_s3.py index d2ae8621..3cbe74dd 100644 --- a/s3tests/functional/test_s3.py +++ b/s3tests/functional/test_s3.py @@ -21,6 +21,7 @@ import sha import pytz import json import httplib2 +import threading import xml.etree.ElementTree as ET @@ -5570,7 +5571,7 @@ def test_versioning_copy_obj_version(): new_key = another_bucket.copy_key('new_key', bucket.name, objname) eq(new_key.get_contents_as_string(), c[num_versions - 1]) -def _count_bucket_objs(bucket): +def _count_bucket_versioned_objs(bucket): k = [] for key in bucket.list_versions(): k.insert(0, key) @@ -5604,14 +5605,14 @@ def test_versioning_multi_object_delete(): eq(len(result.deleted), 2) eq(len(result.errors), 0) - eq(_count_bucket_objs(bucket), 0) + eq(_count_bucket_versioned_objs(bucket), 0) # now remove again, should all succeed due to idempotency result = bucket.delete_keys(stored_keys) eq(len(result.deleted), 2) eq(len(result.errors), 0) - eq(_count_bucket_objs(bucket), 0) + eq(_count_bucket_versioned_objs(bucket), 0) @attr(resource='object') @attr(method='delete') @@ -5642,7 +5643,7 @@ def test_versioning_multi_object_delete_with_marker(): result = bucket.delete_keys(stored_keys) eq(len(result.deleted), 3) eq(len(result.errors), 0) - eq(_count_bucket_objs(bucket), 0) + eq(_count_bucket_versioned_objs(bucket), 0) delete_markers = [] for o in result.deleted: @@ -5657,7 +5658,7 @@ def test_versioning_multi_object_delete_with_marker(): eq(len(result.deleted), 3) eq(len(result.errors), 0) - eq(_count_bucket_objs(bucket), 0) + eq(_count_bucket_versioned_objs(bucket), 0) @attr(resource='object') @attr(method='delete') @@ -5673,11 +5674,11 @@ def test_versioning_multi_object_delete_with_marker_create(): rmkeys = { bucket.new_key(keyname) } - eq(_count_bucket_objs(bucket), 0) + eq(_count_bucket_versioned_objs(bucket), 0) result = bucket.delete_keys(rmkeys) eq(len(result.deleted), 1) - eq(_count_bucket_objs(bucket), 1) + eq(_count_bucket_versioned_objs(bucket), 1) delete_markers = [] for o in result.deleted: @@ -5760,3 +5761,90 @@ def test_versioned_object_acl(): k = bucket.new_key(keyname) check_grants(k.get_acl().acl.grants, default_policy) + +def _do_create_object(bucket, objname, i): + k = bucket.new_key(objname) + k.set_contents_from_string('data {i}'.format(i=i)) + +def _do_remove_ver(bucket, obj): + bucket.delete_key(obj.name, version_id = obj.version_id) + +def _do_create_versioned_obj_concurrent(bucket, objname, num): + t = [] + for i in range(num): + thr = threading.Thread(target = _do_create_object, args=(bucket, objname, i)) + thr.start() + t.append(thr) + return t + +def _do_clear_versioned_bucket_concurrent(bucket): + t = [] + for o in bucket.list_versions(): + thr = threading.Thread(target = _do_remove_ver, args=(bucket, o)) + thr.start() + t.append(thr) + return t + +def _do_wait_completion(t): + for thr in t: + thr.join() + +@attr(resource='object') +@attr(method='put') +@attr(operation='concurrent creation of objects, concurrent removal') +@attr(assertion='works') +@attr('versioning') +def test_versioned_concurrent_object_create_concurrent_remove(): + bucket = get_new_bucket() + + check_configure_versioning_retry(bucket, True, "Enabled") + + keyname = 'myobj' + + num_objs = 5 + + for i in xrange(5): + t = _do_create_versioned_obj_concurrent(bucket, keyname, num_objs) + _do_wait_completion(t) + + eq(_count_bucket_versioned_objs(bucket), num_objs) + eq(len(bucket.get_all_keys()), 1) + + t = _do_clear_versioned_bucket_concurrent(bucket) + _do_wait_completion(t) + + eq(_count_bucket_versioned_objs(bucket), 0) + eq(len(bucket.get_all_keys()), 0) + +@attr(resource='object') +@attr(method='put') +@attr(operation='concurrent creation and removal of objects') +@attr(assertion='works') +@attr('versioning') +def test_versioned_concurrent_object_create_and_remove(): + bucket = get_new_bucket() + + check_configure_versioning_retry(bucket, True, "Enabled") + + keyname = 'myobj' + + num_objs = 3 + + all_threads = [] + + for i in xrange(3): + t = _do_create_versioned_obj_concurrent(bucket, keyname, num_objs) + all_threads.append(t) + + t = _do_clear_versioned_bucket_concurrent(bucket) + all_threads.append(t) + + + for t in all_threads: + _do_wait_completion(t) + + t = _do_clear_versioned_bucket_concurrent(bucket) + _do_wait_completion(t) + + eq(_count_bucket_versioned_objs(bucket), 0) + eq(len(bucket.get_all_keys()), 0)