check_obj_versions(bucket, objname, k, c)
+@attr(resource='object')
+@attr(method='create')
+@attr(operation='create object, then switch to versioning')
+@attr(assertion='behaves correctly')
+@attr('versioning')
+def test_versioning_obj_plain_null_version_removal():
+ bucket = get_new_bucket()
+ check_versioning(bucket, None)
+
+ content = 'fooz'
+ objname = 'testobj'
+
+ key = bucket.new_key(objname)
+ key.set_contents_from_string(content)
+
+ check_configure_versioning_retry(bucket, True, "Enabled")
+
+ bucket.delete_key(key, version_id='null')
+
+ e = assert_raises(boto.exception.S3ResponseError, key.get_contents_as_string)
+ eq(e.status, 404)
+ eq(e.reason, 'Not Found')
+ eq(e.error_code, 'NoSuchKey')
+
+
+ k = []
+ for key in bucket.list_versions():
+ k.insert(0, key)
+
+ eq(len(k), 0)
+
+@attr(resource='object')
+@attr(method='create')
+@attr(operation='create object, then switch to versioning')
+@attr(assertion='behaves correctly')
+@attr('versioning')
+def test_versioning_obj_plain_null_version_overwrite():
+ bucket = get_new_bucket()
+ check_versioning(bucket, None)
+
+ content = 'fooz'
+ objname = 'testobj'
+
+ key = bucket.new_key(objname)
+ key.set_contents_from_string(content)
+
+ check_configure_versioning_retry(bucket, True, "Enabled")
+
+ content2 = 'zzz'
+ key.set_contents_from_string(content2)
+
+ eq(key.get_contents_as_string(), content2)
+
+ version_id = None
+ for k in bucket.list_versions():
+ version_id = k.version_id
+ break
+
+ print 'version_id=', version_id
+ bucket.delete_key(key, version_id=version_id)
+
+ eq(key.get_contents_as_string(), content)
+
+ bucket.delete_key(key, version_id='null')
+ e = assert_raises(boto.exception.S3ResponseError, key.get_contents_as_string)
+ eq(e.status, 404)
+ eq(e.reason, 'Not Found')
+ eq(e.error_code, 'NoSuchKey')
+
+ k = []
+ for key in bucket.list_versions():
+ k.insert(0, key)
+
+ eq(len(k), 0)
+
+@attr(resource='object')
+@attr(method='create')
+@attr(operation='create object, then switch to versioning')
+@attr(assertion='behaves correctly')
+@attr('versioning')
+def test_versioning_obj_plain_null_version_overwrite_suspended():
+ bucket = get_new_bucket()
+ check_versioning(bucket, None)
+
+ content = 'fooz'
+ objname = 'testobj'
+
+ key = bucket.new_key(objname)
+ key.set_contents_from_string(content)
+
+ check_configure_versioning_retry(bucket, True, "Enabled")
+ check_configure_versioning_retry(bucket, False, "Suspended")
+
+ content2 = 'zzz'
+ key.set_contents_from_string(content2)
+
+ eq(key.get_contents_as_string(), content2)
+
+ version_id = None
+ for k in bucket.list_versions():
+ version_id = k.version_id
+ break
+
+ print 'version_id=', version_id
+ bucket.delete_key(key, version_id=version_id)
+
+ e = assert_raises(boto.exception.S3ResponseError, key.get_contents_as_string)
+ eq(e.status, 404)
+ eq(e.reason, 'Not Found')
+ eq(e.error_code, 'NoSuchKey')
+
+ k = []
+ for key in bucket.list_versions():
+ k.insert(0, key)
+
+ eq(len(k), 0)
+
+
+
@attr(resource='object')
@attr(method='create')
@attr(operation='suspend versioned bucket')