import argparse
import contextlib
import logging
-import time
import yaml
import sys
help='seconds to wait for an individual object to sync before '
'assuming failure',
)
+ parser.add_argument(
+ '--prepare-error-delay',
+ type=check_positive_int,
+ default=10,
+ help='seconds to wait before retrying when preparing '
+ 'an incremental sync fails',
+ )
parser.add_argument(
'--rgw-data-log-window',
type=check_positive_int,
# due to rgw's window of data log updates during which the bucket index
# log may still be updated without the data log getting a new entry for
# the bucket
- meta_syncer.prepare()
+ sync.prepare_sync(meta_syncer, args.prepare_error_delay)
if not args.metadata_only:
- data_syncer.prepare()
+ sync.prepare_sync(data_syncer, args.prepare_error_delay)
if args.sync_scope == 'full':
log.info('syncing all metadata')
log.info('Finished full sync. Check logs to see any issues that '
'incremental sync will retry.')
else:
- while True:
- try:
- meta_syncer.sync(args.num_workers, args.lock_timeout)
- if not args.metadata_only:
- data_syncer.sync(args.num_workers, args.lock_timeout)
- except Exception as e:
- log.warn('error doing incremental sync, will try again: %s', e)
- # prepare data before sleeping due to rgw_log_bucket_window
- data_syncer.prepare()
- log.debug('waiting %d seconds until next sync',
- args.incremental_sync_delay)
- time.sleep(args.incremental_sync_delay)
- meta_syncer.prepare()
+ sync.incremental_sync(meta_syncer, data_syncer,
+ args.num_workers,
+ args.lock_timeout,
+ args.incremental_sync_delay,
+ args.metadata_only,
+ args.prepare_error_delay)
# radosgw-agent, so just use a constant value for the daemon id.
DAEMON_ID = 'radosgw-agent'
+def prepare_sync(syncer, error_delay):
+    """Attempt to prepare a syncer for running a sync.
+
+    Retries forever, sleeping ``error_delay`` seconds between attempts,
+    so the sync agent keeps running while radosgws are temporarily
+    unavailable.
+
+    :param syncer: object exposing a ``prepare()`` method
+    :param error_delay: seconds to wait before retrying
+    """
+    while True:
+        try:
+            syncer.prepare()
+            break
+        except Exception as e:
+            # Deliberately broad: any transient failure (network, remote
+            # gateway down, ...) should be logged and retried, never kill
+            # the agent. NOTE(review): log.warn is a deprecated alias of
+            # log.warning — consider updating in a follow-up.
+            log.warn('error preparing for sync, will retry: %s', e)
+            time.sleep(error_delay)
+
+def incremental_sync(meta_syncer, data_syncer, num_workers, lock_timeout,
+                     incremental_sync_delay, metadata_only, error_delay):
+    """Run a continuous incremental sync.
+
+    This will run forever, pausing ``incremental_sync_delay`` seconds
+    between sync passes.
+
+    :param meta_syncer: syncer used for metadata
+    :param data_syncer: syncer used for data; skipped when metadata_only
+    :param num_workers: passed through to ``syncer.sync()``
+    :param lock_timeout: passed through to ``syncer.sync()``
+    :param incremental_sync_delay: seconds to sleep between sync passes
+    :param metadata_only: if true, data is never synced or prepared
+    :param error_delay: seconds between retries when preparing fails,
+        forwarded to prepare_sync()
+    """
+    while True:
+        try:
+            meta_syncer.sync(num_workers, lock_timeout)
+            if not metadata_only:
+                data_syncer.sync(num_workers, lock_timeout)
+        except Exception as e:
+            # Broad catch keeps the daemon alive across transient errors;
+            # the next iteration of the loop retries the sync.
+            log.warn('error doing incremental sync, will try again: %s', e)
+
+        # prepare data before sleeping due to rgw_log_bucket_window
+        if not metadata_only:
+            prepare_sync(data_syncer, error_delay)
+        log.info('waiting %d seconds until next sync',
+                 incremental_sync_delay)
+        time.sleep(incremental_sync_delay)
+        # metadata prepare happens after the sleep so the data-log window
+        # note above applies only to the data syncer
+        prepare_sync(meta_syncer, error_delay)
+
class Syncer(object):
def __init__(self, src, dest, max_entries, *args, **kwargs):
self.src = src