From: Josh Durgin Date: Fri, 13 Sep 2013 20:01:34 +0000 (-0700) Subject: Set up full data sync X-Git-Tag: v1.1~20 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=393a672517da68c584e3f4c4183c20eaf720129e;p=radosgw-agent.git Set up full data sync Get data log markers and store them after all buckets are synced. Also get bucket-index markers for each bucket, and update them in the bucket index replica log once syncing is complete. The bucket-index replica log updates should be moved to happen after each bucket once the actual object copying is implemented. Signed-off-by: Josh Durgin --- diff --git a/radosgw_agent/client.py b/radosgw_agent/client.py index e9c4756..8254e75 100644 --- a/radosgw_agent/client.py +++ b/radosgw_agent/client.py @@ -152,20 +152,10 @@ def unlock_shard(connection, lock_type, shard_num, zone_id, locker_id): special_first_param='unlock', expect_json=False) -def get_meta_log(connection, shard_num, marker, max_entries): +def get_log(connection, log_type, shard_num, marker, max_entries): return request(connection, 'get', '/admin/log', params={ - 'type': 'metadata', - 'id': shard_num, - 'marker': marker, - 'max-entries': max_entries, - }, - ) - -def get_data_log(connection, shard_num, marker, max_entries): - return request(connection, 'get', '/admin/log', - params={ - 'type': 'data', + 'type': log_type, 'id': shard_num, 'marker': marker, 'max-entries': max_entries, @@ -182,21 +172,44 @@ def get_log_info(connection, log_type, shard_num): special_first_param='info', ) + +def get_bucket_index_marker(connection, bucket_instance): + out = request( + connection, 'get', '/admin/log', + params={ + 'type': 'bucket-index', + 'bucket-instance': bucket_instance, + }, + special_first_param='info', + ) + return out['max_marker'] + def num_log_shards(connection, shard_type): out = request(connection, 'get', '/admin/log', dict(type=shard_type)) return out['num_objects'] -def set_worker_bound(connection, type_, shard_num, marker, 
timestamp, - daemon_id, data=[]): +def set_worker_bound(connection, type_, marker, timestamp, + daemon_id, data=None, shard_num=None, + bucket_instance=None): + if data is None: + data = [] + + if type_ == 'bucket-index': + key = 'bucket-index' + value = bucket_instance + else: + key = 'id' + value = shard_num + return request( connection, 'post', '/admin/replica_log', - params=dict( - type=type_, - id=shard_num, - marker=marker, - time=timestamp, - daemon_id=daemon_id, - ), + params={ + 'type': type_, + key: value, + 'marker': marker, + 'time': timestamp, + 'daemon_id': daemon_id, + }, data=json.dumps(data), special_first_param='work_bound', ) diff --git a/radosgw_agent/sync.py b/radosgw_agent/sync.py index 2f9714d..9e5a504 100644 --- a/radosgw_agent/sync.py +++ b/radosgw_agent/sync.py @@ -1,5 +1,4 @@ import datetime -import hashlib import logging import multiprocessing @@ -110,44 +109,61 @@ class DataSyncerFull(Syncer): super(DataSyncerFull, self).__init__(*args, **kwargs) self.worker_cls = worker.DataWorkerFull + def shard_num_for_bucket(self, bucket_name, num_shards): + bucket_name = bucket_name.encode('utf8') + hash_val = 0 + for char in bucket_name: + c = ord(char) + hash_val = (hash_val + (c << 4) + (c >> 4)) * 11; + return hash_val % num_shards; + + def get_bucket_instance(self, bucket_name): + metadata = client.get_metadata(self.src_conn, 'bucket', bucket_name) + return bucket_name + ':' + metadata['data']['bucket']['bucket_id'] + def prepare(self): # TODO we need some sort of a lock here to make sure that only # one client is getting a list of buckets to sync so that it's # consistent. 
- num_data_shards = self.get_num_shards() + self.num_shards = self.get_num_shards() + + # get list of buckets before getting any markers to avoid inconsistency + buckets = client.get_bucket_list(self.src_conn) + + # save data log markers for each shard + self.shard_info = [] + for shard in xrange(self.num_shards): + info = client.get_log_info(self.src_conn, 'data', shard) + # setting an empty marker returns an error + if info['marker']: + self.shard_info.append((shard, info['marker'], + info['last_update'])) - # get the set of all buckets and then add an entry to the data replica - # log for each - buckets = client.get_bucket_list(self.src_conn) + # bucket index info doesn't include a timestamp, so just use + # local time since it isn't important for correctness now = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%SZ") - shard_to_bucket_dict = dict() + self.bucket_info = [] for bucket_name in buckets: - bucket_hash = int(hashlib.md5(bucket_name).hexdigest(),16) % num_data_shards - if bucket_hash not in shard_to_bucket_dict: - shard_to_bucket_dict[bucket_hash] = list() - # TODO: the docs for set_worker_bound suggest that the data payload should be a list of dicts, with each dict - # haveing a 'bucket' and 'time' entry. I'm trying to stash the state of the bucket (NEEDSSYNC) and the - # bucket name as the value. 
May need a different approach / delimiter - shard_to_bucket_dict[bucket_hash].append({ - 'bucket': '{needssync}:{bucket_name}'.format(needssync=NEEDS_SYNC,bucket_name=bucket_name), - 'time':now - }) - for shard_num, entries in shard_to_bucket_dict.items(): - # this call returns a list of buckets that are "in progress" from before the time in the "now" variable - # TODO: sort out if this is the proper usage of set_worker_bound - replica_log_output = client.set_worker_bound(self.dest_conn, 'data', shard_num, - 'buckets_in_shard_{n}'.format(n=shard_num), - now, self.daemon_id,in_data=entries - ) - log.debug('jbuck, set replica log output:\n{data}'.format(data=replica_log_output)) + instance = self.get_bucket_instance(bucket_name) + marker = client.get_bucket_index_marker('bucket-instance', instance) + self.bucket_info.append((instance, marker, now)) def generate_work(self): - return xrange(self.get_num_shards()) + return self.bucket_info def complete(self): - # TODO: set replica log - pass + for shard_num, marker, time in self.shard_info: + out = client.set_worker_bound(self.dest_conn, 'data', marker, + time, self.daemon_id, + shard_num=shard_num) + log.debug('jbuck, set replica log output:\n%s', out) + + for bucket_instance, marker, time in self.bucket_info: + out = client.set_worker_bound(self.dest_conn, 'bucket-index', marker, + time, self.daemon_id, + bucket_instance=bucket_instance) + log.debug('set_worker_bound on bucket index replica log returned:\n%s', out) class MetaSyncerFull(Syncer): diff --git a/radosgw_agent/worker.py b/radosgw_agent/worker.py index 318f5bb..46fd89d 100644 --- a/radosgw_agent/worker.py +++ b/radosgw_agent/worker.py @@ -155,7 +155,7 @@ class DataWorkerFull(DataWorker): worker_bound_info = None buckets_to_sync = [] try: - worker_bound_info = client.get_worker_bound(self.dest_conn, 'data', shard_num) + worker_bound_info = client.get_worker_bound(self.dest_conn, 'data', shard_num=shard_num) log.debug('data full sync shard {i} data log is 
{data}'.format(i=shard_num,data=worker_bound_info)) # determine whether there's a marker string with NEEDSSYNC and with # our daemon_id. If so, sync it the bucket(s) that match @@ -261,8 +261,8 @@ class MetadataWorkerIncremental(MetadataWorker): sync up to self.max_entries entries, returning number of entries processed and the last marker of the entries processed. """ - log_entries = client.get_meta_log(self.source_conn, shard_num, - marker, self.max_entries) + log_entries = client.get_log(self.source_conn, 'metadata', shard_num, + marker, self.max_entries) log.info('shard %d has %d entries after %r', shard_num, len(log_entries), marker) @@ -284,9 +284,10 @@ class MetadataWorkerIncremental(MetadataWorker): if entries and not error_encountered: try: client.set_worker_bound(self.dest_conn, 'metadata', - shard_num, entries[-1].marker, + entries[-1].marker, entries[-1].timestamp, - self.daemon_id) + self.daemon_id, + shard_num=shard_num) return len(entries), entries[-1].marker except: log.exception('error setting worker bound for shard {shard_num},' @@ -336,7 +337,7 @@ class MetadataWorkerIncremental(MetadataWorker): try: marker, time = client.get_min_worker_bound(self.dest_conn, 'metadata', - shard_num) + shard_num=shard_num) log.debug('oldest marker and time for shard %d are: %r %r', shard_num, marker, time) except client.NotFound: