if (x == size):
return
-def _multipart_upload(bucket, s3_key_name, size, part_size=5*1024*1024, do_list=None, headers=None, metadata=None, resend_parts=[]):
+def _multipart_upload(bucket, s3_key_name, size, part_size=5*1024*1024, do_list=None, headers=None, metadata=None, storage_class=None, resend_parts=[]):
"""
generate a multi-part upload for a random file of specifed size,
if requested, generate a list of the parts
return the upload descriptor
"""
+
+ if storage_class is not None:
+ if not headers:
+ headers = {}
+ headers['X-Amz-Storage-Class'] = storage_class
+
upload = bucket.initiate_multipart_upload(s3_key_name, headers=headers, metadata=metadata)
s = ''
for i, part in enumerate(generate_random(size, part_size)):
return upload
-def _populate_key(keyname, size=7*1024*1024, bucket=None, storage_class=None):
+def _populate_key(bucket, keyname, size=7*1024*1024, storage_class=None):
if bucket is None:
bucket = get_new_bucket()
key = bucket.new_key(keyname)
data_str = str(generate_random(size, size).next())
data = StringIO(data_str)
key.set_contents_from_file(fp=data)
- return (bucket, key, data_str)
+ return (key, data_str)
def _create_key_with_random_content(keyname, size=7*1024*1024, bucket=None):
- bucket, key, _ = _populate_key(keyname, size, bucket)
+ bucket = get_new_bucket()
+ key, _ = _populate_key(bucket, keyname, size)
return (bucket, key)
@attr(resource='object')
eq(e.reason.lower(), 'bad request') # some proxies vary the case
eq(e.error_code, 'InvalidPart')
+def verify_object(bucket, k, data=None, storage_class=None):
+ if storage_class:
+ eq(k.storage_class, storage_class)
+
+ if data:
+ read_data = k.get_contents_as_string()
+
+ equal = data == read_data # avoid spamming log if data not equal
+ eq(equal, True)
+
+def copy_object_storage_class(src_bucket, src_key, dest_bucket, dest_key, storage_class):
+ query_args=None
+
+ if dest_key.version_id:
+ query_arg='versionId={v}'.format(v=dest_key.version_id)
+
+ headers = {}
+ headers['X-Amz-Copy-Source'] = '/{bucket}/{object}'.format(bucket=src_bucket.name, object=src_key.name)
+ if src_key.version_id:
+ headers['X-Amz-Copy-Source-Version-Id'] = src_key.version_id
+ headers['X-Amz-Storage-Class'] = storage_class
+
+ res = dest_bucket.connection.make_request('PUT', dest_bucket.name, dest_key.name,
+ query_args=query_args, headers=headers)
+ eq(res.status, 200)
+
@attr(resource='object')
@attr(method='put')
@attr(operation='test create object with storage class')
@attr('fails_on_aws')
def test_object_storage_class():
sc = configured_storage_classes()
- if len(sc) < 3:
+ if len(sc) < 2:
raise SkipTest
bucket = get_new_bucket()
for storage_class in sc:
kname = 'foo-' + storage_class
- k, _, data = _populate_key(kname, bucket=bucket, size=9*1024*1024, storage_class=storage_class)
+ k, data = _populate_key(bucket, kname, size=9*1024*1024, storage_class=storage_class)
- read_key = bucket.get_key(kname)
+ verify_object(bucket, k, data, storage_class)
- eq(read_key.storage_class, storage_class)
- read_data = read_key.get_contents_as_string()
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='test create multipart object with storage class')
+@attr('storage_class')
+@attr('fails_on_aws')
+def test_object_storage_class_multipart():
+ sc = configured_storage_classes()
+ if len(sc) < 2:
+ raise SkipTest
+
+ bucket = get_new_bucket()
+ size = 11 * 1024 * 1024
+
+ for storage_class in sc:
+ key = "mymultipart-" + storage_class
+ (upload, data) = _multipart_upload(bucket, key, size, storage_class=storage_class)
+ upload.complete_upload()
+ key2 = bucket.get_key(key)
+ eq(key2.size, size)
+ eq(key2.storage_class, storage_class)
+
+def _do_test_object_modify_storage_class(obj_write_func, size):
+ sc = configured_storage_classes()
+ if len(sc) < 2:
+ raise SkipTest
+
+ bucket = get_new_bucket()
+
+ for storage_class in sc:
+ kname = 'foo-' + storage_class
+ k, data = obj_write_func(bucket, kname, size, storage_class=storage_class)
+
+ verify_object(bucket, k, data, storage_class)
+
+ for new_storage_class in sc:
+ if new_storage_class == storage_class:
+ continue
+
+ copy_object_storage_class(bucket, k, bucket, k, new_storage_class)
+ verify_object(bucket, k, data, storage_class)
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='test changing objects storage class')
+@attr('storage_class')
+@attr('fails_on_aws')
+def test_object_modify_storage_class():
+ _do_test_object_modify_storage_class(_populate_key, size=9*1024*1024)
+
+
+def _populate_multipart_upload(bucket, kname, size, storage_class=None):
+ (upload, data) = _multipart_upload(bucket, kname, size, storage_class=storage_class)
+ upload.complete_upload()
+
+ k = bucket.get_key(kname)
+
+ return (k, data)
+
+@attr(resource='object')
+@attr(method='put')
+@attr(operation='test changing objects storage class')
+@attr('storage_class')
+@attr('fails_on_aws')
+def test_object_modify_storage_class_multipart():
+ do_test_object_modify_storage_class(_populate_multipart_upload, size=11*1024*1024)
- equal = data == read_data # avoid spamming log if data not equal
- eq(equal, True)
def _simple_http_req_100_cont(host, port, is_secure, method, resource):
"""