]> git-server-git.apps.pok.os.sepia.ceph.com Git - radosgw-agent.git/commitdiff
Enhancing exception handling wip-buck-fix-hang
author ubuntu <jbbuck@gmail.com>
Tue, 3 Sep 2013 16:43:56 +0000 (09:43 -0700)
committer Joe Buck <jbbuck@gmail.com>
Tue, 3 Sep 2013 22:18:00 +0000 (15:18 -0700)
Previously, a sync worker receiving
a ConnectionError exception would
be effectively dead, but the syncer
would still be waiting for more results.
This patch adds a check that if
all the worker processes have failed
due to connection errors,
then the syncer should exit.

Signed-off-by: Joe Buck <jbbuck@gmail.com>
radosgw_agent/sync.py
radosgw_agent/worker.py

index 98552411cd9feac350cd4f2da8ba14a0e084f614..422ba8a4bf98ae670db19c9dc9fb3c4bacd8ba8b 100644 (file)
@@ -57,13 +57,22 @@ class Syncer:
 
         # pull the results out as they are produced
         errors = []
+        connection_errors = []
         for i in xrange(num_shards):
+            # if all processes error out, stop trying to process data
+            if len(connection_errors) == len(processes):
+                log.error('All {num_workers} incremental sync workers have failed.'
+                          ' Ceasing to process shards'.format(num_workers=len(processes)))
+                break
             result, shard_num = resultQueue.get()
             if result == worker.RESULT_SUCCESS:
                 log.debug('synced shard %d', shard_num)
             else:
-                log.error('error syncing shard %d', shard_num)
+                log.error('error on incremental sync of shard %d', shard_num)
                 errors.append(shard_num)
+            if result == worker.RESULT_CONNECTION_ERROR:
+                connection_errors.append(shard_num)
+
             log.info('%d/%d shards processed', i + 1, num_shards)
         if errors:
             log.error('Encountered  errors syncing these %d shards: %s',
@@ -127,14 +136,23 @@ class Syncer:
 
         # pull the results out as they are produced
         errors = []
+        connection_errors = []
         for i in xrange(len(meta_keys)):
+            # if all processes error out, stop trying to process data
+            if len(connection_errors) == len(processes):
+                log.error('All {num_workers} full sync workers have failed.'
+                          ' Ceasing to process shards'.format(num_workers=len(processes)))
+                break
+
             log.info('%d/%d items synced', i, len(meta_keys))
             result, section, name = resultQueue.get()
             if result != worker.RESULT_SUCCESS:
-                log.error('error syncing %s %r', section, name)
+                log.error('error on full sync of %s %r', section, name)
                 errors.append((section, name))
             else:
                 log.debug('synced %s %r', section, name)
+            if result == worker.RESULT_CONNECTION_ERROR:
+                # Track the (section, name) pair — shard_num is not defined in
+                # this loop (it belongs to the incremental-sync path); using it
+                # here would raise NameError on the first connection error.
+                connection_errors.append((section, name))
         for process in processes:
             process.join()
         if errors:
index 7913e4b3b684b387f5fbaf4b6a3f1d3529cbf788..b418cd724f5fdb7bf2241e483e0e3eb599d6abcb 100644 (file)
@@ -1,6 +1,7 @@
 from collections import namedtuple
 import logging
 import multiprocessing
+import requests
 import os
 import socket
 
@@ -11,6 +12,7 @@ log = logging.getLogger(__name__)
 
 RESULT_SUCCESS = 0
 RESULT_ERROR = 1
+RESULT_CONNECTION_ERROR = 2
 
 class Worker(multiprocessing.Process):
     """sync worker to run in its own process"""
@@ -155,12 +157,19 @@ class MetadataWorkerIncremental(MetadataWorker):
                 self.result_queue.put((RESULT_SUCCESS, shard_num))
                 continue
             except client.HttpError as e:
-                log.info('error locking shard %d log, assuming'
+                log.exception('error locking shard %d log, assuming'
                          ' it was processed by someone else and skipping: %s',
                          shard_num, e)
                 self.lock.unset_shard()
                 self.result_queue.put((RESULT_ERROR, shard_num))
                 continue
+            except requests.exceptions.ConnectionError as e:
+                log.exception('ConnectionError encountered. Bailing out of'
+                              ' processing loop for shard %d. %s', 
+                              shard_num, e)
+                self.lock.unset_shard()
+                self.result_queue.put((RESULT_CONNECTION_ERROR, shard_num))
+                break
 
             result = RESULT_SUCCESS
             try:
@@ -172,6 +181,13 @@ class MetadataWorkerIncremental(MetadataWorker):
             except client.NotFound:
                 # if no worker bounds have been set, start from the beginning
                 marker, time = '', '1970-01-01 00:00:00'
+            except requests.exceptions.ConnectionError as e:
+                log.exception('ConnectionError encountered. Bailing out of'
+                              ' processing loop for shard %d. %s', 
+                              shard_num, e)
+                self.lock.unset_shard()
+                self.result_queue.put((RESULT_CONNECTION_ERROR, shard_num))
+                break
             except Exception as e:
                 log.exception('error getting worker bound for shard %d',
                               shard_num)
@@ -180,6 +196,13 @@ class MetadataWorkerIncremental(MetadataWorker):
             try:
                 if result == RESULT_SUCCESS:
                     self.get_and_process_entries(marker, shard_num)
+            except requests.exceptions.ConnectionError as e:
+                log.exception('ConnectionError encountered. Bailing out of'
+                              ' processing loop for shard %d. %s', 
+                              shard_num, e)
+                self.lock.unset_shard()
+                self.result_queue.put((RESULT_CONNECTION_ERROR, shard_num))
+                break
             except:
                 log.exception('syncing entries from %s for shard %d failed',
                               marker, shard_num)