# pull the results out as they are produced
errors = []
+ connection_errors = []
for i in xrange(num_shards):
+ # if all processes error out, stop trying to process data
+ if len(connection_errors) == len(processes):
+ log.error('All {num_workers} incremental sync workers have failed.'
+ ' Ceasing to process shards'.format(num_workers=len(processes)))
+ break
result, shard_num = resultQueue.get()
if result == worker.RESULT_SUCCESS:
log.debug('synced shard %d', shard_num)
else:
- log.error('error syncing shard %d', shard_num)
+ log.error('error on incremental sync of shard %d', shard_num)
errors.append(shard_num)
+ if result == worker.RESULT_CONNECTION_ERROR:
+ connection_errors.append(shard_num)
+
log.info('%d/%d shards processed', i + 1, num_shards)
if errors:
log.error('Encountered errors syncing these %d shards: %s',
# pull the results out as they are produced
errors = []
+ connection_errors = []
for i in xrange(len(meta_keys)):
+ # if all processes error out, stop trying to process data
+ if len(connection_errors) == len(processes):
+ log.error('All {num_workers} full sync workers have failed.'
+ ' Ceasing to process shards'.format(num_workers=len(processes)))
+ break
+
log.info('%d/%d items synced', i, len(meta_keys))
result, section, name = resultQueue.get()
if result != worker.RESULT_SUCCESS:
- log.error('error syncing %s %r', section, name)
+ log.error('error on full sync of %s %r', section, name)
errors.append((section, name))
else:
log.debug('synced %s %r', section, name)
+ # track which items died on a connection error; this loop yields
+ # (result, section, name) -- there is no shard_num in scope here
+ if result == worker.RESULT_CONNECTION_ERROR:
+ connection_errors.append((section, name))
for process in processes:
process.join()
if errors:
from collections import namedtuple
import logging
import multiprocessing
import os
import socket
+
+import requests
RESULT_SUCCESS = 0
RESULT_ERROR = 1
+RESULT_CONNECTION_ERROR = 2
class Worker(multiprocessing.Process):
"""sync worker to run in its own process"""
self.result_queue.put((RESULT_SUCCESS, shard_num))
continue
except client.HttpError as e:
- log.info('error locking shard %d log, assuming'
+ log.exception('error locking shard %d log, assuming'
' it was processed by someone else and skipping: %s',
shard_num, e)
self.lock.unset_shard()
self.result_queue.put((RESULT_ERROR, shard_num))
continue
+ except requests.exceptions.ConnectionError as e:
+ log.exception('ConnectionError encountered. Bailing out of'
+ ' processing loop for shard %d. %s',
+ shard_num, e)
+ self.lock.unset_shard()
+ self.result_queue.put((RESULT_CONNECTION_ERROR, shard_num))
+ break
result = RESULT_SUCCESS
try:
except client.NotFound:
# if no worker bounds have been set, start from the beginning
marker, time = '', '1970-01-01 00:00:00'
+ except requests.exceptions.ConnectionError as e:
+ log.exception('ConnectionError encountered. Bailing out of'
+ ' processing loop for shard %d. %s',
+ shard_num, e)
+ self.lock.unset_shard()
+ self.result_queue.put((RESULT_CONNECTION_ERROR, shard_num))
+ break
except Exception as e:
log.exception('error getting worker bound for shard %d',
shard_num)
try:
if result == RESULT_SUCCESS:
self.get_and_process_entries(marker, shard_num)
+ except requests.exceptions.ConnectionError as e:
+ log.exception('ConnectionError encountered. Bailing out of'
+ ' processing loop for shard %d. %s',
+ shard_num, e)
+ self.lock.unset_shard()
+ self.result_queue.put((RESULT_CONNECTION_ERROR, shard_num))
+ break
except:
log.exception('syncing entries from %s for shard %d failed',
marker, shard_num)