out['retries'] = retries
return out
+def list_worker_bound(connection, type_, id_):
+ params={
+ 'type': type_,
+ }
+ if id_ is not None:
+ params[_id_name(type_)] = id_
+ params['list-keys'] = True
+ try:
+ out = request(
+ connection, 'get', 'admin/replica_log',
+ params=params,
+ special_first_param='bounds',
+ )
+ dev_log.debug('get_worker_bound returned: %r', out)
+ except exc.NotFound:
+ dev_log.debug('no worker bound found for %s "%s"',
+ type_, id_)
+ # if no worker bounds have been set, start from the beginning
+ # returning fallback, default values
+ return None
+
+ return out
+
class Zone(object):
def __init__(self, zone_info):
def get_full_sync_status(self):
cur_bound = client.get_worker_bound(
self.sync.dest_conn,
- 'zone-full-sync',
+ 'zone.full_data_sync.meta',
None,
init_if_not_found=False)
def set_state(self, state):
self.marker['state'] = state
print json.dumps(self.marker)
- client.set_worker_bound(self.sync.dest_conn, 'zone-full-sync',
+ client.set_worker_bound(self.sync.dest_conn, 'zone.full_data_sync.meta',
json.dumps(self.marker),
DEFAULT_TIME,
self.sync.worker.daemon_id,
return hash_val % self.num_shards
+ def iterate_full_sync_buckets(self):
+ for shard_id in xrange(self.num_shards):
+ info = client.list_worker_bound(self.sync.dest_conn, 'zone.full_data_sync', shard_id)
+ if info is not None:
+ for k in info['keys']:
+ yield k
+
+
class BucketsIterator(object):
def __init__(self, sync, zone, bucket_name):
key=bucket_name)
log.info('adding bucket to full sync work: {b}'.format(b=bucket_name))
-
def iterate_dirty_buckets(self):
if not self.explicit_bucket:
cur_state = self.fs_state['state']
if cur_state == 'init':
self.build_full_sync_work()
self.fs_state_manager.set_state('full-sync')
+ cur_state = 'full-sync'
+
+ if cur_state == 'full-sync':
+ src_buckets = self.fs_state_manager.iterate_full_sync_buckets()
else:
assert False
else:
gens = {}
if restart:
- self.fs_state_manager.set_state('init')
+ bi.fs_state_manager.set_state('init')
objs_per_bucket = 10
marker = bs.marker
bound = bs.bound
- print dump_json({'bucket': bucket.bucket, 'bucket_instance': bucket_instance, 'shard_id': bucket_state.shard.shard_id, 'marker': marker, 'bound': bound})
+ print dump_json({'bucket': bucket.bucket, 'bucket_instance': bucket.bucket_instance, 'shard_id': shard.shard_id, 'marker': marker, 'bound': bound})
si = ShardIter(shard)
- bucket_shard_id = bucket_instance + ':' + str(shard.shard_id)
+ bucket_shard_id = bucket.bucket_instance + ':' + str(shard.shard_id)
bucket.sync_meta()
self.zone.sync_data(BucketsIterator(self.sync, self.zone, None), args.restart) # client.get_bucket_list(self.src_conn))
elif len(target) == 1:
bucket = target[0]
- self.zone.sync_data([bucket])
+ self.zone.sync_data(BucketsIterator(self.sync, self.zone, bucket), args.restart)
log.info('sync bucket={b}'.format(b=bucket))
else:
bucket = target[0]