return hash_val % self.num_shards
- def iterate_full_sync_buckets(self):
+ def iterate_buckets(self):
for shard_id in xrange(self.num_shards):
info = client.list_worker_bound(self.sync.dest_conn, 'zone.full_data_sync', shard_id)
if info is not None:
yield k
+class SingleBucketSyncManager(object):
+ def __init__(self, bucket_name):
+ self.bucket_name = bucket_name
+
+ def iterate_buckets(self):
+ yield self.bucket_name
+
+
class BucketsIterator(object):
def __init__(self, sync, zone, bucket_name):
log.info('adding bucket to full sync work: {b}'.format(b=bucket_name))
def iterate_dirty_buckets(self):
+ manager = None
+
if not self.explicit_bucket:
cur_state = self.fs_state['state']
if cur_state == 'init':
cur_state = 'full-sync'
if cur_state == 'full-sync':
- src_buckets = self.fs_state_manager.iterate_full_sync_buckets()
+ manager = self.fs_state_manager
else:
+ # incremental sync goes here
assert False
else:
- src_buckets = [self.bucket_name]
+ manager = SingleBucketSyncManager(self.bucket_name)
- for b in src_buckets:
+ for b in manager.iterate_buckets():
print 'bucket=', b
try:
buck = Bucket(b, -1, self.sync)
# an empty bucket
continue
- if marker != bound:
+ if marker != bound: # need to sync!
yield BucketState(buck, shard, marker, bound)
class Zone(object):
def __init__(self, sync):
self.sync = sync
- def iterate_diff(self, bi):
- for bs in bi.iterate_dirty_buckets():
- yield bs
-
def sync_data(self, bi, restart):
gens = {}
if restart:
bi.fs_state_manager.set_state('init')
-
objs_per_bucket = 10
concurrent_buckets = 2
while True:
- for bs in self.iterate_diff(bi):
+ for bs in bi.iterate_dirty_buckets():
bucket = bs.bucket
shard = bs.shard
marker = bs.marker