def get_log_entries(self, shard_num, marker):
try:
- log_entries = client.get_log(self.src_conn, self.type,
- marker, self.max_entries,
- shard_num)
+ result = client.get_log(self.src_conn, self.type,
+ marker, self.max_entries,
+ shard_num)
+ last_marker = result['marker']
+ log_entries = result['entries']
if len(log_entries) == self.max_entries:
log.warn('shard %d log has fallen behind - log length >= %d',
shard_num)
except client.NotFound:
# no entries past this marker yet, but we my have retries
+ last_marker = ''
log_entries = []
- return log_entries
+ return last_marker, log_entries
def prepare(self):
self.init_num_shards()
self.shard_work = {}
for shard_num in xrange(self.num_shards):
marker, retries = self.get_worker_bound(shard_num)
- log_entries = self.get_log_entries(shard_num, marker)
- self.shard_info[shard_num] = marker
+ last_marker, log_entries = self.get_log_entries(shard_num, marker)
self.shard_work[shard_num] = log_entries, retries
+ self.shard_info[shard_num] = last_marker
self.prepared_at = time.time()
return self.shard_work.iteritems()
-
class MetaSyncerInc(IncrementalSyncer):
def __init__(self, *args, **kwargs):