import boto.s3.tagging
# configure logging for the tests module
-log = logging.getLogger(__name__)
+class LogWrapper:
+ def __init__(self):
+ self.log = logging.getLogger(__name__)
+ self.errors = 0
+
+ def info(self, msg, *args, **kwargs):
+ try:
+ self.log.info(msg, *args, **kwargs)
+ except BlockingIOError:
+ self.errors += 1
+
+ def warning(self, msg, *args, **kwargs):
+ try:
+ self.log.warning(msg, *args, **kwargs)
+ except BlockingIOError:
+ self.errors += 1
+
+ def error(self, msg, *args, **kwargs):
+ try:
+ self.log.error(msg, *args, **kwargs)
+ except BlockingIOError:
+ self.errors += 1
+
+ def __del__(self):
+ if self.errors > 0:
+ self.log.error("%d logs were lost", self.errors)
+
+
+log = LogWrapper()
TOPIC_SUFFIX = "_topic"
NOTIFICATION_SUFFIX = "_notif"
self.addr = addr
self.lock = threading.Lock()
ThreadingHTTPServer.__init__(self, addr, HTTPPostHandler)
- log.info('http server created on %s', str(self.addr))
+ log.info('http server created on %s', self.addr)
self.proc = threading.Thread(target=self.run)
self.proc.start()
retries = 0
assert_equal(result[1], 0)
# create objects in the bucket (async)
- number_of_objects = 100
+ number_of_objects = 20
client_threads = []
start_time = time.time()
for i in range(number_of_objects):