From: ubuntu Date: Tue, 3 Sep 2013 16:43:56 +0000 (-0700) Subject: Enhancing exception handling X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d091aac448fcea2896354dc44f8d0c5182737e39;p=radosgw-agent.git Enhancing exception handling Previously, a sync worker receiving a ConnectionError exception would be effectively dead but the sync worker would still be waiting for more results. This patch adds a check that if all the worker processes have failed due to connection errors, then the syncer should exit. Signed-off-by: Joe Buck --- diff --git a/radosgw_agent/sync.py b/radosgw_agent/sync.py index 9855241..422ba8a 100644 --- a/radosgw_agent/sync.py +++ b/radosgw_agent/sync.py @@ -57,13 +57,22 @@ class Syncer: # pull the results out as they are produced errors = [] + connection_errors = [] for i in xrange(num_shards): + # if all processes error out, stop trying to process data + if len(connection_errors) == len(processes): + log.error('All {num_workers} incremental sync workers have failed.' + ' Ceasing to process shards'.format(num_workers=len(processes))) + break result, shard_num = resultQueue.get() if result == worker.RESULT_SUCCESS: log.debug('synced shard %d', shard_num) else: - log.error('error syncing shard %d', shard_num) + log.error('error on incremental sync of shard %d', shard_num) errors.append(shard_num) + if result == worker.RESULT_CONNECTION_ERROR: + connection_errors.append(shard_num) + log.info('%d/%d shards processed', i + 1, num_shards) if errors: log.error('Encountered errors syncing these %d shards: %s', @@ -127,14 +136,23 @@ class Syncer: # pull the results out as they are produced errors = [] + connection_errors = [] for i in xrange(len(meta_keys)): + # if all processes error out, stop trying to process data + if len(connection_errors) == len(processes): + log.error('All {num_workers} full sync workers have failed.' 
+                          ' Ceasing to process shards'.format(num_workers=len(processes)))
+                break
+
             log.info('%d/%d items synced', i, len(meta_keys))
             result, section, name = resultQueue.get()
             if result != worker.RESULT_SUCCESS:
-                log.error('error syncing %s %r', section, name)
+                log.error('error on full sync of %s %r', section, name)
                 errors.append((section, name))
             else:
                 log.debug('synced %s %r', section, name)
+            if result == worker.RESULT_CONNECTION_ERROR:
+                connection_errors.append((section, name))
         for process in processes:
             process.join()
         if errors:
diff --git a/radosgw_agent/worker.py b/radosgw_agent/worker.py
index 7913e4b..b418cd7 100644
--- a/radosgw_agent/worker.py
+++ b/radosgw_agent/worker.py
@@ -1,6 +1,7 @@
 from collections import namedtuple
 import logging
 import multiprocessing
+import requests
 import os
 import socket
 
@@ -11,6 +12,7 @@ log = logging.getLogger(__name__)
 
 RESULT_SUCCESS = 0
 RESULT_ERROR = 1
+RESULT_CONNECTION_ERROR = 2
 
 class Worker(multiprocessing.Process):
     """sync worker to run in its own process"""
@@ -155,12 +157,19 @@ class MetadataWorkerIncremental(MetadataWorker):
                     self.result_queue.put((RESULT_SUCCESS, shard_num))
                     continue
             except client.HttpError as e:
-                log.info('error locking shard %d log, assuming'
+                log.exception('error locking shard %d log, assuming'
                          ' it was processed by someone else and skipping: %s',
                          shard_num, e)
                 self.lock.unset_shard()
                 self.result_queue.put((RESULT_ERROR, shard_num))
                 continue
+            except requests.exceptions.ConnectionError as e:
+                log.exception('ConnectionError encountered. Bailing out of'
+                              ' processing loop for shard %d. %s',
+                              shard_num, e)
+                self.lock.unset_shard()
+                self.result_queue.put((RESULT_CONNECTION_ERROR, shard_num))
+                break
 
             result = RESULT_SUCCESS
             try:
@@ -172,6 +181,13 @@ class MetadataWorkerIncremental(MetadataWorker):
             except client.NotFound:
                 # if no worker bounds have been set, start from the beginning
                 marker, time = '', '1970-01-01 00:00:00'
+            except requests.exceptions.ConnectionError as e:
+                log.exception('ConnectionError encountered. Bailing out of'
+                              ' processing loop for shard %d. %s',
+                              shard_num, e)
+                self.lock.unset_shard()
+                self.result_queue.put((RESULT_CONNECTION_ERROR, shard_num))
+                break
             except Exception as e:
                 log.exception('error getting worker bound for shard %d',
                               shard_num)
@@ -180,6 +196,13 @@ class MetadataWorkerIncremental(MetadataWorker):
             try:
                 if result == RESULT_SUCCESS:
                     self.get_and_process_entries(marker, shard_num)
+            except requests.exceptions.ConnectionError as e:
+                log.exception('ConnectionError encountered. Bailing out of'
+                              ' processing loop for shard %d. %s',
+                              shard_num, e)
+                self.lock.unset_shard()
+                self.result_queue.put((RESULT_CONNECTION_ERROR, shard_num))
+                break
             except:
                 log.exception('syncing entries from %s for shard %d failed',
                               marker, shard_num)