From 91575727b37f4bb528bd8c1608d0b09e8b261d9d Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Fri, 17 Apr 2015 16:24:42 -0700 Subject: [PATCH] radosgw-sync: keep full-sync state Use a new replicalog construct to keep track of the full sync stage. Set state to 'init'. Generate a list of all the buckets that need to exist, update all these buckets as keys in the replicalog. Then set the state to 'full-sync'. Later we'll set the state to 'incremental'. Signed-off-by: Yehuda Sadeh --- radosgw_agent/client.py | 49 ++++++++------ radosgw_agent/sync_tool.py | 128 +++++++++++++++++++++++++++++++------ 2 files changed, 138 insertions(+), 39 deletions(-) diff --git a/radosgw_agent/client.py b/radosgw_agent/client.py index 846d5c0..0f20c30 100644 --- a/radosgw_agent/client.py +++ b/radosgw_agent/client.py @@ -414,7 +414,7 @@ def num_log_shards(connection, shard_type): def set_worker_bound(connection, type_, marker, timestamp, - daemon_id, id_, data=None, sync_type='incremental'): + daemon_id, id_, data=None, sync_type='incremental', key=None): """ :param sync_type: The type of synchronization that should be attempted by @@ -422,46 +422,55 @@ def set_worker_bound(connection, type_, marker, timestamp, """ if data is None: data = [] - key = _id_name(type_) boto.log.debug('set_worker_bound: data = %r', data) - return request( - connection, 'post', 'admin/replica_log', - params={ + params={ 'type': type_, - key: id_, 'marker': marker, 'time': timestamp, 'daemon_id': daemon_id, 'sync-type': sync_type, - }, + } + if id_ is not None: + params[_id_name(type_)] = id_ + if key is not None: + params['key'] = key + return request( + connection, 'post', 'admin/replica_log', + params=params, data=json.dumps(data), special_first_param='work_bound', ) -def del_worker_bound(connection, type_, daemon_id, id_): - key = _id_name(type_) +def del_worker_bound(connection, type_, daemon_id, id_, key = None): + params={ + 'type': type_, + 'daemon_id': daemon_id, + } + if id_ is not None: + params[_id_name(type_)] = id_ + if key is not None: + params['key'] = key return request( connection, 'delete', 'admin/replica_log', - params={ - 'type': type_, - key: id_, - 'daemon_id': daemon_id, - }, + params=params, special_first_param='work_bound', expect_json=False, ) -def get_worker_bound(connection, type_, id_, init_if_not_found=True): - key = _id_name(type_) +def get_worker_bound(connection, type_, id_, init_if_not_found=True, key = None): + params={ + 'type': type_, + } + if id_ is not None: + params[_id_name(type_)] = id_ + if key is not None: + params['key'] = key try: out = request( connection, 'get', 'admin/replica_log', - params={ - 'type': type_, - key: id_, - }, + params=params, special_first_param='bounds', ) dev_log.debug('get_worker_bound returned: %r', out) diff --git a/radosgw_agent/sync_tool.py b/radosgw_agent/sync_tool.py index 9ef1795..2a08d22 100644 --- a/radosgw_agent/sync_tool.py +++ b/radosgw_agent/sync_tool.py @@ -22,6 +22,7 @@ from radosgw_agent import config from radosgw_agent import worker from radosgw_agent.util import string from radosgw_agent.exceptions import SkipShard, SyncError, SyncTimedOut, SyncFailed, NotFound, BucketEmpty +from radosgw_agent.constants import DEFAULT_TIME log = logging.getLogger(__name__) @@ -732,12 +733,83 @@ class Object(object): print dump_json(entries) +class BucketState(object): + def __init__(self, bucket, shard, marker, bound): + self.bucket = bucket + self.shard = shard + self.marker = marker + self.bound = bound -class Zone(object): - def __init__(self, sync): +class ZoneFullSyncState(object): + def __init__(self, sync, zone): self.sync = sync + self.zone = zone + self.marker = {} + self.status = self.get_full_sync_status() + + + def get_full_sync_status(self): + cur_bound = client.get_worker_bound( + self.sync.dest_conn, + 'zone-full-sync', + None, + init_if_not_found=False) + + if not cur_bound: + self.set_state('init') + return self.marker + + self.marker = json.loads(cur_bound['marker']) + return self.marker + + def set_state(self, state): + self.marker['state'] = state + print json.dumps(self.marker) + client.set_worker_bound(self.sync.dest_conn, 'zone-full-sync', + json.dumps(self.marker), + DEFAULT_TIME, + self.sync.worker.daemon_id, + None) + + + +class BucketsIterator(object): + def __init__(self, sync, zone, bucket_name): + self.sync = sync + self.zone = zone + self.bucket_name = bucket_name + + self.explicit_bucket = bucket_name is not None + + self.fs_state_manager = ZoneFullSyncState(sync, zone) + self.fs_state = self.fs_state_manager.get_full_sync_status() + + def build_full_sync_work(self): + log.info('building full data sync work') + for bucket_name in client.get_bucket_list(self.sync.src_conn): + marker = '' + client.set_worker_bound(self.sync.dest_conn, + 'zone.full_data_sync', + marker, + DEFAULT_TIME, + self.sync.worker.daemon_id, + 0, + data=None, + key=bucket_name) + log.info('adding bucket to full sync work: {b}'.format(b=bucket_name)) + + + def iterate_dirty_buckets(self): + if not self.explicit_bucket: + cur_state = self.fs_state['state'] + if cur_state == 'init': + self.build_full_sync_work() + self.fs_state_manager.set_state('full-sync') + else: + assert False + else: + src_buckets = [self.bucket_name] - def iterate_diff(self, src_buckets): for b in src_buckets: buck = Bucket(b, -1, self.sync) @@ -759,21 +831,43 @@ class Zone(object): continue if marker != bound: - yield buck, buck.bucket_instance, shard, marker, bound + yield BucketState(buck, shard, marker, bound) + +class Zone(object): + def __init__(self, sync): + self.sync = sync - def sync_data(self, src_buckets): + def set_full_sync_bound_entry(self, bucket_name): + return client.set_worker_bound(self.sync_work.dest_conn, + 'zone-full-sync', + marker, + timestamp, + self.sync_work.worker.daemon_id, + self.shard_instance, + data=data) + + def iterate_diff(self, bi): + for bs in bi.iterate_dirty_buckets(): + yield bs + + def sync_data(self, bi): gens = {} objs_per_bucket = 10 concurrent_buckets = 2 while True: - for bucket, bucket_id, shard, marker, bound in self.iterate_diff(src_buckets): - print dump_json({'bucket': bucket.bucket, 'bucket_id': bucket_id, 'shard_id': shard.shard_id, 'marker': marker, 'bound': bound}) + for bs in self.iterate_diff(bi): + bucket = bs.bucket + shard = bs.shard + marker = bs.marker + bound = bs.bound + + print dump_json({'bucket': bucket.bucket, 'bucket_instance': bucket_instance, 'shard_id': bucket_state.shard.shard_id, 'marker': marker, 'bound': bound}) si = ShardIter(shard) - bucket_shard_id = bucket_id + ':' + str(shard.shard_id) + bucket_shard_id = bucket_instance + ':' + str(shard.shard_id) bucket.sync_meta() @@ -803,7 +897,6 @@ class Zone(object): yield obj - class SyncToolCommand(object): def __init__(self): @@ -945,7 +1038,7 @@ The commands are: target = '' if len(target) == 0: - self.zone.sync_data(client.get_bucket_list(self.src_conn)) + self.zone.sync_data(BucketsIterator(self.sync, self.zone, None)) # client.get_bucket_list(self.src_conn)) elif len(target) == 1: bucket = target[0] self.zone.sync_data([bucket]) @@ -967,21 +1060,18 @@ The commands are: parser.add_argument('bucket_name', nargs='?') args = parser.parse_args(self.remaining[1:]) - if not args.bucket_name: - src_buckets = client.get_bucket_list(self.src_conn) - else: - src_buckets = [args.bucket_name] + bi = BucketsIterator(self.sync, self.zone, args.bucket_name) + + for bs in bi.iterate_dirty_buckets(): + bucket = bs.bucket - for bucket, bucket_id, shard, marker, bound in self.zone.iterate_diff(src_buckets): - print dump_json({'bucket': bucket.bucket, 'bucket_id': bucket_id, 'shard_id': shard.shard_id, 'marker': marker, 'bound': bound}) + print dump_json({'bucket': bucket.bucket, 'bucket_instance': bucket.bucket_instance, 'shard_id': bs.shard.shard_id, 'marker': bs.marker, 'bound': bs.bound}) - si = ShardIter(shard) + si = ShardIter(bs.shard) for (obj, marker, op) in si.iterate_diff_objects(): print obj, marker - - def get_bucket_bounds(self, bucket): print dump_json({'src': src_buckets, 'dest': dest_buckets}) -- 2.47.3