From 1214f9ecc283fe2f16ed30571931722df969bb15 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Wed, 6 May 2015 14:15:59 -0700 Subject: [PATCH] sync: multiple fixes Signed-off-by: Yehuda Sadeh --- radosgw_agent/client.py | 23 +++++++++++++++++++++++ radosgw_agent/sync_tool.py | 25 ++++++++++++++++++------- 2 files changed, 41 insertions(+), 7 deletions(-) diff --git a/radosgw_agent/client.py b/radosgw_agent/client.py index 0f20c30..0d6f8b8 100644 --- a/radosgw_agent/client.py +++ b/radosgw_agent/client.py @@ -495,6 +495,29 @@ def get_worker_bound(connection, type_, id_, init_if_not_found=True, key = None) out['retries'] = retries return out +def list_worker_bound(connection, type_, id_): + params={ + 'type': type_, + } + if id_ is not None: + params[_id_name(type_)] = id_ + params['list-keys'] = True + try: + out = request( + connection, 'get', 'admin/replica_log', + params=params, + special_first_param='bounds', + ) + dev_log.debug('get_worker_bound returned: %r', out) + except exc.NotFound: + dev_log.debug('no worker bound found for %s "%s"', + type_, id_) + # if no worker bounds have been set, start from the beginning + # returning fallback, default values + return None + + return out + class Zone(object): def __init__(self, zone_info): diff --git a/radosgw_agent/sync_tool.py b/radosgw_agent/sync_tool.py index 09af7cf..a045664 100644 --- a/radosgw_agent/sync_tool.py +++ b/radosgw_agent/sync_tool.py @@ -752,7 +752,7 @@ class ZoneFullSyncState(object): def get_full_sync_status(self): cur_bound = client.get_worker_bound( self.sync.dest_conn, - 'zone-full-sync', + 'zone.full_data_sync.meta', None, init_if_not_found=False) @@ -766,7 +766,7 @@ class ZoneFullSyncState(object): def set_state(self, state): self.marker['state'] = state print json.dumps(self.marker) - client.set_worker_bound(self.sync.dest_conn, 'zone-full-sync', + client.set_worker_bound(self.sync.dest_conn, 'zone.full_data_sync.meta', json.dumps(self.marker), DEFAULT_TIME, self.sync.worker.daemon_id, @@ -781,6 +781,14 @@ class ZoneFullSyncState(object): return hash_val % self.num_shards + def iterate_full_sync_buckets(self): + for shard_id in xrange(self.num_shards): + info = client.list_worker_bound(self.sync.dest_conn, 'zone.full_data_sync', shard_id) + if info is not None: + for k in info['keys']: + yield k + + class BucketsIterator(object): def __init__(self, sync, zone, bucket_name): @@ -811,13 +819,16 @@ class BucketsIterator(object): key=bucket_name) log.info('adding bucket to full sync work: {b}'.format(b=bucket_name)) - def iterate_dirty_buckets(self): if not self.explicit_bucket: cur_state = self.fs_state['state'] if cur_state == 'init': self.build_full_sync_work() self.fs_state_manager.set_state('full-sync') + cur_state = 'full-sync' + + if cur_state == 'full-sync': + src_buckets = self.fs_state_manager.iterate_full_sync_buckets() else: assert False else: @@ -858,7 +869,7 @@ class Zone(object): gens = {} if restart: - self.fs_state_manager.set_state('init') + bi.fs_state_manager.set_state('init') objs_per_bucket = 10 @@ -871,11 +882,11 @@ class Zone(object): marker = bs.marker bound = bs.bound - print dump_json({'bucket': bucket.bucket, 'bucket_instance': bucket_instance, 'shard_id': bucket_state.shard.shard_id, 'marker': marker, 'bound': bound}) + print dump_json({'bucket': bucket.bucket, 'bucket_instance': bucket.bucket_instance, 'shard_id': shard.shard_id, 'marker': marker, 'bound': bound}) si = ShardIter(shard) - bucket_shard_id = bucket_instance + ':' + str(shard.shard_id) + bucket_shard_id = bucket.bucket_instance + ':' + str(shard.shard_id) bucket.sync_meta() @@ -1052,7 +1063,7 @@ The commands are: self.zone.sync_data(BucketsIterator(self.sync, self.zone, None), args.restart) # client.get_bucket_list(self.src_conn)) elif len(target) == 1: bucket = target[0] - self.zone.sync_data([bucket]) + self.zone.sync_data(BucketsIterator(self.sync, self.zone, bucket), args.restart) log.info('sync bucket={b}'.format(b=bucket)) else: bucket = target[0] -- 2.47.3