Also move implementation to utils and add unit tests.
import isodate
import nose
import operator
-import random
-import string
import socket
import ssl
import os
from nose.plugins.skip import SkipTest
from .utils import assert_raises
+from .utils import generate_random
from .utils import region_sync_meta
import AnonymousAuth
part_out = StringIO(part)
mp.upload_part_from_file(part_out, i+1)
-def generate_random(size, part_size=5*1024*1024):
- """
- Generate the specified number random data.
- (actually each MB is a repetition of the first KB)
- """
- chunk = 1024
- allowed = string.ascii_letters
- for x in range(0, size, part_size):
- strpart = ''.join([allowed[random.randint(0, len(allowed) - 1)] for _ in xrange(chunk)])
- s = ''
- left = size - x
- this_part_size = min(left, part_size)
- for y in range(this_part_size / chunk):
- s = s + strpart
- yield s
- if (x == size):
- return
-
def _multipart_upload(bucket, s3_key_name, size, part_size=5*1024*1024, do_list=None, headers=None, metadata=None):
"""
generate a multi-part upload for a random file of specifed size,
--- /dev/null
+from nose.tools import eq_ as eq
+
+import utils
+
+def test_generate():
+ FIVE_MB = 5 * 1024 * 1024
+ eq(len(''.join(utils.generate_random(0))), 0)
+ eq(len(''.join(utils.generate_random(1))), 1)
+ eq(len(''.join(utils.generate_random(FIVE_MB - 1))), FIVE_MB - 1)
+ eq(len(''.join(utils.generate_random(FIVE_MB))), FIVE_MB)
+ eq(len(''.join(utils.generate_random(FIVE_MB + 1))), FIVE_MB + 1)
+import random
import requests
+import string
import time
from nose.tools import eq_ as eq
excName = str(excClass)
raise AssertionError("%s not raised" % excName)
+def generate_random(size, part_size=5*1024*1024):
+ """
+ Generate the specified number random data.
+ (actually each MB is a repetition of the first KB)
+ """
+ chunk = 1024
+ allowed = string.ascii_letters
+ for x in range(0, size, part_size):
+ strpart = ''.join([allowed[random.randint(0, len(allowed) - 1)] for _ in xrange(chunk)])
+ s = ''
+ left = size - x
+ this_part_size = min(left, part_size)
+ for y in range(this_part_size / chunk):
+ s = s + strpart
+ s = s + strpart[:(this_part_size % chunk)]
+ yield s
+ if (x == size):
+ return
+
# syncs all the regions except for the one passed in
def region_sync_meta(targets, region):