got = key6.get_contents_as_string()
eq(got, data)
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='test copy object of a multipart upload')
+@attr(assertion='successful')
+def test_object_copy_versioning_multipart_upload():
+ bucket = get_new_bucket()
+ check_configure_versioning_retry(bucket, True, "Enabled")
+ key_name="srcmultipart"
+ content_type='text/bla'
+ objlen = 30 * 1024 * 1024
+ (upload, data) = _multipart_upload(bucket, key_name, objlen, headers={'Content-Type': content_type}, metadata={'foo': 'bar'})
+ upload.complete_upload()
+ key = bucket.get_key(key_name)
+
+ # copy object in the same bucket
+ key2 = bucket.copy_key('dstmultipart', bucket.name, key.name, src_version_id = key.version_id)
+ key2 = bucket.get_key(key2.name)
+ eq(key2.metadata['foo'], 'bar')
+ eq(key2.content_type, content_type)
+ eq(key2.size, key.size)
+ got = key2.get_contents_as_string()
+ eq(got, data)
+
+ # second copy
+ key3 = bucket.copy_key('dstmultipart2', bucket.name, key2.name, src_version_id = key2.version_id)
+ key3 = bucket.get_key(key3.name)
+ eq(key3.metadata['foo'], 'bar')
+ eq(key3.content_type, content_type)
+ eq(key3.size, key.size)
+ got = key3.get_contents_as_string()
+ eq(got, data)
+
+ # copy to another versioned bucket
+ bucket2 = get_new_bucket()
+ check_configure_versioning_retry(bucket2, True, "Enabled")
+ key4 = bucket2.copy_key('dstmultipart3', bucket.name, key.name, src_version_id = key.version_id)
+ key4 = bucket2.get_key(key4.name)
+ eq(key4.metadata['foo'], 'bar')
+ eq(key4.content_type, content_type)
+ eq(key4.size, key.size)
+ got = key4.get_contents_as_string()
+ eq(got, data)
+
+ # copy to another non versioned bucket
+ bucket3 = get_new_bucket()
+ key5 = bucket3.copy_key('dstmultipart4', bucket.name, key.name, src_version_id = key.version_id)
+ key5 = bucket3.get_key(key5.name)
+ eq(key5.metadata['foo'], 'bar')
+ eq(key5.content_type, content_type)
+ eq(key5.size, key.size)
+ got = key5.get_contents_as_string()
+ eq(got, data)
+
+ # copy from a non versioned bucket
+ key6 = bucket.copy_key('dstmultipart5', bucket3.name, key5.name)
+ key6 = bucket3.get_key(key6.name)
+ eq(key6.metadata['foo'], 'bar')
+ eq(key6.content_type, content_type)
+ eq(key6.size, key.size)
+ got = key6.get_contents_as_string()
+ eq(got, data)
+
def transfer_part(bucket, mp_id, mp_keyname, i, part):
"""Transfer a part of a multipart upload. Designed to be run in parallel.
"""