# Sync status marker strings — presumably written to the replica log to
# record sync state (not referenced elsewhere in this chunk; confirm
# against callers).
SYNC_IN_PROGRESS = 'INPROGRESS'
SYNC_COMPLETED = 'DONESYNC'
class Syncer(object):
    """Template for syncing logs between two clusters.

    Subclasses set ``self.worker_cls`` in their constructor and override
    :meth:`prepare`, :meth:`generate_work`, and :meth:`complete` to
    customize what is synced.  :meth:`sync` drives the whole process:
    it fans work items out to a pool of worker processes over a
    multiprocessing queue and collects per-item results.
    """

    def __init__(self, type_, src, dest, daemon_id):
        self.type = type_
        self.src = src
        self.dest = dest
        self.src_conn = client.connection(src)
        self.dest_conn = client.connection(dest)
        self.daemon_id = daemon_id
        self.worker_cls = None  # filled in by subclass constructor

    def get_num_shards(self):
        """Return the number of log shards on the source for this sync type."""
        try:
            num_shards = client.num_log_shards(self.src_conn, self.type)
            log.debug('%d shards to check', num_shards)
            return num_shards
        except Exception:
            log.exception('finding number of shards failed')
            raise

    def prepare(self):
        """Setup any state required before syncing starts"""
        pass

    def generate_work(self):
        """Generate items to be placed in a queue for processing"""
        pass

    def complete(self):
        """Called when syncing completes successfully"""
        pass

    def get_worker_cls(self):
        """Return the subclass of Worker to run"""
        pass

    def sync(self, num_workers, log_lock_time, max_entries=None):
        """Run a full sync pass using ``num_workers`` worker processes.

        Work items come from generate_work(); each worker posts a
        (result, item) pair on the result queue.  complete() is only
        called if every item synced successfully.
        """
        self.prepare()

        # create the work and results queues
        workQueue = multiprocessing.Queue()
        resultQueue = multiprocessing.Queue()

        # create the worker processes
        processes = [self.worker_cls(workQueue,
                                     resultQueue,
                                     log_lock_time,
                                     self.src,
                                     self.dest,
                                     daemon_id=self.daemon_id,
                                     max_entries=max_entries)
                     for i in xrange(num_workers)]
        for process in processes:
            process.daemon = True
            process.start()

        log.info('Starting sync')
        # enqueue the items to be synced
        num_items = 0
        for item in self.generate_work():
            num_items += 1
            workQueue.put(item)

        # add a poison pill for each worker so it exits once the real
        # work is drained (this put(None) was missing, leaving workers
        # blocked on the queue forever)
        for i in xrange(num_workers):
            workQueue.put(None)

        # pull the results out as they are produced
        errors = []
        for i in xrange(num_items):
            result, item = resultQueue.get()
            if result == worker.RESULT_SUCCESS:
                log.debug('synced item %r', item)
            else:
                log.error('error syncing item %r', item)
                errors.append(item)
            log.info('%d/%d items processed', i + 1, num_items)

        # wait for the workers to finish cleanly
        for process in processes:
            process.join()

        if errors:
            log.error('Encountered errors syncing these %d items: %r',
                      len(errors), errors)
        else:
            self.complete()
class MetaSyncerInc(Syncer):
    """Incremental metadata sync; each work item is a metadata log shard."""

    def __init__(self, *args, **kwargs):
        super(MetaSyncerInc, self).__init__(*args, **kwargs)
        self.worker_cls = worker.MetadataWorkerIncremental

    def generate_work(self):
        # one work item per metadata log shard number
        shard_count = self.get_num_shards()
        return xrange(shard_count)
class DataSyncerFull(Syncer):
    """Full data sync; each work item is a data replica log shard."""

    def __init__(self, *args, **kwargs):
        super(DataSyncerFull, self).__init__(*args, **kwargs)
        self.worker_cls = worker.DataWorkerFull

    def prepare(self):
        # TODO we need some sort of a lock here to make sure that only
        # one client is getting a list of buckets to sync so that it's
        # consistent.
        num_data_shards = self.get_num_shards()
        log.debug('preparing full data sync across %d shards',
                  num_data_shards)
        # TODO(review): the pre-refactor code listed all buckets and added
        # an entry to the data replica log for each before syncing; that
        # step is garbled/missing here (it referenced an undefined
        # replica_log_output and left a dangling paren) and must be
        # restored for full data sync to be correct.

    def generate_work(self):
        # one work item per data replica log shard number
        return xrange(self.get_num_shards())

    def complete(self):
        # TODO: set replica log
        pass
class MetaSyncerFull(Syncer):
    """Full metadata sync; each work item is a (section, key) pair."""

    def __init__(self, *args, **kwargs):
        # bug fix: this called super(MetaSyncerInc, self), which raises
        # TypeError since MetaSyncerFull is not a subclass of MetaSyncerInc
        super(MetaSyncerFull, self).__init__(*args, **kwargs)
        self.worker_cls = worker.MetadataWorkerFull

    def prepare(self):
        """Record metadata sections and current shard markers before syncing."""
        try:
            self.sections = client.get_metadata_sections(self.src_conn)
        except client.HttpError as e:
            log.error('Error listing metadata sections: %s', e)
            raise

        # grab the latest shard markers and timestamps before we sync,
        # so complete() can record where the incremental sync should resume
        self.shard_info = []
        num_shards = self.get_num_shards()
        for shard_num in xrange(num_shards):
            info = client.get_log_info(self.src_conn, 'metadata', shard_num)
            # setting an empty marker returns an error
            if info['marker']:
                self.shard_info.append((shard_num, info['marker'],
                                        info['last_update']))

    def generate_work(self):
        for section in self.sections:
            try:
                # bug fix: yield each (section, key) pair individually so
                # workers receive single items, not one list per section
                for key in client.list_metadata_keys(self.src_conn, section):
                    yield (section, key)
            except client.NotFound:
                # no keys of this type exist
                continue
            except client.HttpError as e:
                # NOTE(review): this handler was garbled in the diff; the
                # message text is reconstructed — confirm against history
                log.error('Error listing metadata keys for section %s: %s',
                          section, e)
                raise

    def complete(self):
        # persist the markers captured in prepare() so incremental sync
        # picks up from the state that existed when the full sync began
        for shard_num, marker, timestamp in self.shard_info:
            client.set_worker_bound(self.dest_conn, 'metadata', shard_num,
                                    marker, timestamp, self.daemon_id)
            client.del_worker_bound(self.dest_conn, 'metadata', shard_num,
                                    self.daemon_id)