From: Joe Buck
Date: Tue, 27 Aug 2013 18:14:06 +0000 (-0700)
Subject: Getting code ready for handoff
X-Git-Tag: v1.1~25
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=1ce24c10cd5a0445121b7f7e17ee67c935de1a5a;p=radosgw-agent.git

Getting code ready for handoff

Signed-off-by: Joe Buck
---

diff --git a/radosgw_agent/cli.py b/radosgw_agent/cli.py
index 107fabc..f0a76c4 100644
--- a/radosgw_agent/cli.py
+++ b/radosgw_agent/cli.py
@@ -49,6 +49,11 @@ def parse_args():
         action='store_true', dest='quiet',
         help='be less verbose',
         )
+    parser.add_argument(
+        '--data',
+        action='store_true', dest='data',
+        help='sync data',
+        )
     parser.add_argument(
         '--src-access-key',
         required='src_access_key' not in defaults,
@@ -171,17 +176,17 @@ class TestHandler(BaseHTTPRequestHandler):
         resp = ''
         if self.path.startswith('/metadata/full'):
             try:
-                TestHandler.syncer.sync_full(TestHandler.num_workers,
-                                             TestHandler.lock_timeout)
+                TestHandler.syncer.metadata_sync_full(TestHandler.num_workers,
+                                                      TestHandler.lock_timeout)
             except Exception as e:
                 log.exception('error doing full sync')
                 status = 500
                 resp = str(e)
         elif self.path.startswith('/metadata/incremental'):
             try:
-                TestHandler.syncer.sync_incremental(TestHandler.num_workers,
-                                                    TestHandler.lock_timeout,
-                                                    TestHandler.max_entries)
+                TestHandler.syncer.metadata_sync_incremental(TestHandler.num_workers,
+                                                             TestHandler.lock_timeout,
+                                                             TestHandler.max_entries)
             except Exception as e:
                 log.exception('error doing incremental sync')
                 status = 500
@@ -226,8 +231,15 @@ def main():
                           args.src_secret_key, args.src_zone)
     dest = client.Endpoint(args.dest_host, args.dest_port, args.dest_access_key,
                            args.dest_secret_key, args.dest_zone)
-    # TODO: check src and dest zone names and endpoints match the region map
-    syncer = sync.Syncer('metadata', src, dest, args.daemon_id)
+
+    if args.data:
+        # TODO: check src and dest zone names and endpoints match the region map
+        syncer = sync.Syncer('data', src, dest, args.daemon_id)
+        log.info('syncing data')
+    else:
+        # TODO: check src and dest zone names and endpoints match the region map
+        syncer = sync.Syncer('metadata', src, dest, args.daemon_id)
+        log.info('syncing metadata')
 
     if args.test_server_host:
         log.warn('TEST MODE - do not run unless you are testing this program')
diff --git a/radosgw_agent/client.py b/radosgw_agent/client.py
index 90f22ff..f6d25a2 100644
--- a/radosgw_agent/client.py
+++ b/radosgw_agent/client.py
@@ -93,7 +93,32 @@ def get_metadata_sections(connection):
     return request(connection, 'get', '/admin/metadata')
 
 def list_metadata_keys(connection, section):
-    return request(connection, 'get', '/admin/metadata/' + section)
+    return request(connection, 'get', '/admin/metadata/' + section)
+
+def list_ops_for_client(connection, client_id, op_id):
+    return request(connection, 'get', '/admin/opstate/',
+                   params={
+                       'client_id': client_id,
+                       'op_id': op_id,
+                       }
+                   )
+
+def get_bucket_list(connection):
+    return list_metadata_keys(connection, 'bucket')
+
+def list_objects_in_bucket(connection, bucket_name):
+    # use the boto library to do this
+    bucket = connection.get_bucket(bucket_name)
+    return bucket.list()
+
+def sync_object_intra_region(connection, bucket_name, object_name, src_zone,
+                             client_id, op_id):
+    return request(connection, 'put',
+                   '/{bucket_name}/{object_name}'.format(bucket_name=bucket_name,
+                                                         object_name=object_name),
+                   params={
+                       'rgwx-source-zone': src_zone,
+                       'rgwx-client-id': client_id,
+                       'rgwx-op-id': op_id,
+                       },
+                   expect_json=False)
 
 def lock_shard(connection, lock_type, shard_num, zone_id,
                timeout, locker_id):
     return request(connection, 'post', '/admin/log',
@@ -128,6 +153,16 @@ def get_meta_log(connection, shard_num, marker, max_entries):
             },
         )
 
+def get_data_log(connection, shard_num, marker, max_entries):
+    return request(connection, 'get', '/admin/log',
+                   params={
+                       'type': 'data',
+                       'id': shard_num,
+                       'marker': marker,
+                       'max-entries': max_entries,
+                       },
+                   )
+
 def get_log_info(connection, log_type, shard_num):
     return request(
         connection, 'get', '/admin/log',
@@ -143,7 +178,7 @@ def num_log_shards(connection, shard_type):
     return out['num_objects']
 
 def set_worker_bound(connection, type_, shard_num, marker, timestamp,
-                     daemon_id):
+                     daemon_id, data=[]):
     return request(
         connection, 'post', '/admin/replica_log',
         params=dict(
@@ -153,7 +188,7 @@ def set_worker_bound(connection, type_, shard_num, marker, timestamp,
             time=timestamp,
             daemon_id=daemon_id,
             ),
-        data='[]',
+        data=json.dumps(data),
         special_first_param='work_bound',
         )
 
@@ -169,6 +204,16 @@ def del_worker_bound(connection, type_, shard_num, daemon_id):
         expect_json=False,
         )
 
+def get_worker_bound(connection, type_, shard_num):
+    return request(
+        connection, 'get', '/admin/replica_log',
+        params=dict(
+            type=type_,
+            id=shard_num,
+            ),
+        special_first_param='work_bound',
+        )
+
 def get_min_worker_bound(connection, type_, shard_num):
     out = request(
         connection, 'get', '/admin/replica_log',
diff --git a/radosgw_agent/sync.py b/radosgw_agent/sync.py
index 9855241..025c32f 100644
--- a/radosgw_agent/sync.py
+++ b/radosgw_agent/sync.py
@@ -1,4 +1,5 @@
 import datetime
+import hashlib
 import logging
 import multiprocessing
 
@@ -7,8 +8,11 @@ from radosgw_agent import client
 
 log = logging.getLogger(__name__)
 
-class Syncer:
+NEEDS_SYNC = 'NEEDSSYNC'
+SYNC_IN_PROGRESS = 'INPROGRESS'
+SYNC_COMPLETED = 'DONESYNC'
 
+class Syncer:
     def __init__(self, type_, src, dest, daemon_id):
         self._type = type_
         self.src = src
@@ -17,7 +21,10 @@ class Syncer:
         self.dest_conn = client.connection(dest)
         self.daemon_id = daemon_id
 
-    def sync_incremental(self, num_workers, log_lock_time, max_entries):
+    def data_sync_incremental(self, num_workers, log_lock_time, max_entries):
+        pass
+
+    def metadata_sync_incremental(self, num_workers, log_lock_time, max_entries):
         try:
             num_shards = client.num_log_shards(self.src_conn, self._type)
         except:
@@ -57,19 +64,122 @@ class Syncer:
 
         # pull the results out as they are produced
         errors = []
+        connection_errors = []
         for i in xrange(num_shards):
+            # if all processes error out, stop trying to process data
+            if len(connection_errors) == len(processes):
+                log.error('All {num_workers} incremental sync workers have failed.'
+                          ' Ceasing to process shards'.format(num_workers=len(processes)))
+                break
             result, shard_num = resultQueue.get()
             if result == worker.RESULT_SUCCESS:
                 log.debug('synced shard %d', shard_num)
             else:
-                log.error('error syncing shard %d', shard_num)
+                log.error('error on incremental sync of shard %d', shard_num)
                 errors.append(shard_num)
+                if result == worker.RESULT_CONNECTION_ERROR:
+                    connection_errors.append(shard_num)
+
             log.info('%d/%d shards processed', i + 1, num_shards)
 
         if errors:
             log.error('Encountered errors syncing these %d shards: %s',
                       len(errors), errors)
 
-    def sync_full(self, num_workers, log_lock_time):
+
+    def sync_incremental(self, num_workers, log_lock_time, max_entries):
+        if self._type == 'metadata':
+            self.metadata_sync_incremental(num_workers, log_lock_time,
+                                           max_entries)
+        elif self._type == 'data':
+            self.data_sync_incremental(num_workers, log_lock_time, max_entries)
+        else:
+            raise Exception('Unknown _type in sync.py: {_type}'.format(_type=self._type))
+
+    def data_sync_full(self, num_workers, log_lock_time):
+        # TODO we need some sort of a lock here to make sure that only
+        # one client is getting a list of buckets to sync so that it's
+        # consistent.
+
+        num_data_shards = client.num_log_shards(self.src_conn, 'data')
+        log.debug('There are {ns} data log shards'.format(ns=num_data_shards))
+
+        # get the set of all buckets and then add an entry to the data replica
+        # log for each
+        buckets = client.get_bucket_list(self.src_conn)
+        now = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%SZ")
+        shard_to_bucket_dict = dict()
+        for bucket_name in buckets:
+            bucket_hash = int(hashlib.md5(bucket_name).hexdigest(), 16) % num_data_shards
+            if bucket_hash not in shard_to_bucket_dict:
+                shard_to_bucket_dict[bucket_hash] = list()
+            # TODO: the docs for set_worker_bound suggest that the data
+            # payload should be a list of dicts, with each dict having a
+            # 'bucket' and 'time' entry. I'm trying to stash the state of the
+            # bucket (NEEDSSYNC) and the bucket name as the value. May need a
+            # different approach / delimiter
+            shard_to_bucket_dict[bucket_hash].append({
+                'bucket': '{needssync}:{bucket_name}'.format(needssync=NEEDS_SYNC, bucket_name=bucket_name),
+                'time': now
+                })
+        for shard_num, entries in shard_to_bucket_dict.items():
+            # this call returns a list of buckets that are "in progress" from
+            # before the time in the "now" variable
+            # TODO: sort out if this is the proper usage of set_worker_bound
+            replica_log_output = client.set_worker_bound(
+                self.dest_conn, 'data', shard_num,
+                'buckets_in_shard_{n}'.format(n=shard_num),
+                now, self.daemon_id, data=entries)
+            log.debug('set replica log output:\n{data}'.format(data=replica_log_output))
+
+        # create the work and results Queue
+        workQueue = multiprocessing.Queue()
+        resultQueue = multiprocessing.Queue()
+
+        # create the worker processes
+        if self._type == 'data':
+            worker_cls = worker.DataWorkerFull
+        else:
+            worker_cls = worker.MetadataWorkerFull
+        processes = [worker_cls(workQueue, resultQueue, log_lock_time, self.src,
+                                self.dest, daemon_id=self.daemon_id)
+                     for i in xrange(num_workers)]
+        for process in processes:
+            process.daemon = True
+            process.start()
+
+        start_time = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%SZ")
+        log.info('Starting full data sync at %s', start_time)
+
+        # enqueue the shards to be synced.
+        # the shards simply tell the worker tasks which data replica log
+        # shard to check
+        for shard_num in xrange(num_data_shards):
+            workQueue.put(shard_num)
+
+        # add a poison pill for each worker
+        for i in xrange(num_workers):
+            workQueue.put(None)
+
+        # pull the results out as they are produced
+        errors = []
+        connection_errors = []
+        for i in xrange(num_data_shards):
+            # if all processes error out, stop trying to process data
+            if len(connection_errors) == len(processes):
+                log.error('All {num_workers} full sync workers have failed.'
+                          ' Ceasing to process shards'.format(num_workers=len(processes)))
+                break
+            log.info('%d/%d shards synced', i, num_data_shards)
+            result, shard_num = resultQueue.get()
+            if result != worker.RESULT_SUCCESS:
+                log.error('error syncing shard %d', shard_num)
+                errors.append(shard_num)
+            else:
+                log.debug('synced shard %s', shard_num)
+            if result == worker.RESULT_CONNECTION_ERROR:
+                connection_errors.append(shard_num)
+
+        for process in processes:
+            process.join()
+        if errors:
+            log.error('Encountered errors syncing these %d entries: %s',
+                      len(errors), errors)
+
+    def metadata_sync_full(self, num_workers, log_lock_time):
         try:
             sections = client.get_metadata_sections(self.src_conn)
         except client.HttpError as e:
@@ -127,14 +237,23 @@ class Syncer:
 
         # pull the results out as they are produced
         errors = []
+        connection_errors = []
         for i in xrange(len(meta_keys)):
+            # if all processes error out, stop trying to process data
+            if len(connection_errors) == len(processes):
+                log.error('All {num_workers} full sync workers have failed.'
+                          ' Ceasing to process shards'.format(num_workers=len(processes)))
+                break
+
             log.info('%d/%d items synced', i, len(meta_keys))
             result, section, name = resultQueue.get()
             if result != worker.RESULT_SUCCESS:
-                log.error('error syncing %s %r', section, name)
+                log.error('error on full sync of %s %r', section, name)
                 errors.append((section, name))
             else:
                 log.debug('synced %s %r', section, name)
+            if result == worker.RESULT_CONNECTION_ERROR:
+                connection_errors.append((section, name))
         for process in processes:
             process.join()
         if errors:
@@ -146,3 +265,12 @@ class Syncer:
                                 marker, timestamp, self.daemon_id)
         client.del_worker_bound(self.dest_conn, 'metadata', shard_num,
                                 self.daemon_id)
+
+    def sync_full(self, num_workers, log_lock_time):
+        if self._type == 'metadata':
+            self.metadata_sync_full(num_workers, log_lock_time)
+        elif self._type == 'data':
+            self.data_sync_full(num_workers, log_lock_time)
+        else:
+            raise Exception('Unknown _type in sync.py: {_type}'.format(_type=self._type))
+
diff --git a/radosgw_agent/worker.py b/radosgw_agent/worker.py
index 7913e4b..e6410f0 100644
--- a/radosgw_agent/worker.py
+++ b/radosgw_agent/worker.py
@@ -1,6 +1,8 @@
 from collections import namedtuple
+import hashlib
 import logging
 import multiprocessing
+import requests
 import os
 import socket
 
@@ -11,6 +13,8 @@ log = logging.getLogger(__name__)
 
 RESULT_SUCCESS = 0
 RESULT_ERROR = 1
+RESULT_CONNECTION_ERROR = 2
+MAX_CONCURRENT_OPS = 5
 
 class Worker(multiprocessing.Process):
     """sync worker to run in its own process"""
@@ -49,6 +53,166 @@ def _meta_entry_from_json(entry):
         entry['timestamp'],
         )
 
+class DataWorker(Worker):
+
+    def __init__(self, *args, **kwargs):
+        super(DataWorker, self).__init__(*args, **kwargs)
+        self.type = 'data'
+
+    def get_new_op_id(self):
+        self.op_id = self.op_id + 1
+        return self.op_id
+
+    def sync_object(self, connection, bucket, key, src_zone, client_id):
+        op_id = self.get_new_op_id()
+        client.sync_object_intra_region(connection, bucket, key, src_zone,
+                                        client_id, op_id)
+        return op_id
+
+    # TODO
+    # use the op_id to keep track of on-going copies
+    def sync_data(self, bucket_name):
+        log.debug('syncing bucket {bucket}'.format(bucket=bucket_name))
+
+        objects = client.list_objects_in_bucket(self.source_conn, bucket_name)
+        counter = 0
+
+        # sync each object in the list.
+        # We only want to have X in flight at once, so use the op_id to track
+        # when individual operations finish.
+        inflight_ops = []
+        for key in objects:
+            counter = counter + 1
+            # sync each object
+            log.debug('syncing object {bucket}:{key}'.format(bucket=bucket_name, key=key))
+            op_id = self.sync_object(self.dest_conn, key.bucket.name, key.name,
+                                     self.source_zone, self.daemon_id)
+            inflight_ops.append(op_id)
+            # Do not progress to the next key until there are fewer than the
+            # maximum number of syncs in-flight
+            #while(len(inflight_ops) == MAX_CONCURRENT_OPS):
+                # check each of the inflight ops. If they've been completed,
+                # remove them from the in-flight list
+                #for op_id in inflight_ops:
+
+        # Once all the copy commands are issued, track op_ids until they're
+        # all done
+        for op_id in inflight_ops:
+            op_ids = client.list_ops_for_client(self.dest_conn, self.daemon_id,
+                                                op_id)
+            log.debug('op_ids: {op_ids}'.format(op_ids=op_ids))
+
+        log.debug('bucket {bucket} has {num_objects} objects'.format(
+            bucket=bucket_name, num_objects=counter))
+
+class DataWorkerIncremental(DataWorker):
+
+    def __init__(self, *args, **kwargs):
+        self.daemon_id = kwargs['daemon_id']
+        self.max_entries = kwargs['max_entries']
+        self.op_id = 0
+        super(DataWorkerIncremental, self).__init__(*args, **kwargs)
+
+    def get_and_process_entries(self, marker, shard_num):
+        pass
+
+    def _get_and_process_entries(self, marker, shard_num):
+        pass
+
+    def run(self):
+        pass
+
+class DataWorkerFull(DataWorker):
+
+    def __init__(self, *args, **kwargs):
+        self.daemon_id = kwargs['daemon_id']
+        self.op_id = 0
+        super(DataWorkerFull, self).__init__(*args, **kwargs)
+
+    def run(self):
+        self.prepare_lock()
+        num_data_shards = client.num_log_shards(self.source_conn, 'data')
+
+        while True:
+            shard_num = self.work_queue.get()
+            if shard_num is None:
+                log.info('No more entries in queue, exiting')
+                break
+
+            log.info('%s is processing shard %d', self.ident, shard_num)
+
+            # lock the log
+            try:
+                self.lock.set_shard(shard_num)
+                self.lock.acquire()
+            except client.NotFound:
+                self.lock.unset_shard()
+                self.result_queue.put((RESULT_SUCCESS, shard_num))
+                continue
+            except client.HttpError as e:
+                log.info('error locking shard %d log, assuming'
+                         ' it was processed by someone else and skipping: %s',
+                         shard_num, e)
+                self.lock.unset_shard()
+                self.result_queue.put((RESULT_ERROR, shard_num))
+                continue
+
+            # set a marker in the replica log
+            worker_bound_info = None
+            buckets_to_sync = []
+            try:
+                worker_bound_info = client.get_worker_bound(self.dest_conn,
+                                                            'data', shard_num)
+                log.debug('data full sync shard {i} data log is {data}'.format(
+                    i=shard_num, data=worker_bound_info))
+                # determine whether there's a marker string with NEEDSSYNC
+                # and with our daemon_id. If so, sync the bucket(s) that match
+            except Exception as e:
+                log.exception('could not get worker bound for shard_num %d: %s',
+                              shard_num, e)
+                self.lock.unset_shard()
+                self.result_queue.put((RESULT_ERROR, shard_num))
+                continue
+
+            # set the default result
+            result = RESULT_SUCCESS
+
+            log_bucket_name = ""
+#            try:
+#                for bucket_name in buckets_to_sync:
+#                    log.info('bucket %s is processed as part of shard %d',
+#                             bucket_name, shard_num)
+#                    self.sync_data(bucket_name)
+#
+#                result = RESULT_SUCCESS
+#            except Exception as e:
+#                log.exception('could not sync shard_num %d failed on bucket %s: %s',
+#                              shard_num, log_bucket_name, e)
+#                self.lock.unset_shard()
+#                self.result_queue.put((RESULT_ERROR, shard_num))
+#                continue
+#
+#            # TODO this may need to do a set that omits the synced buckets
+#            # and then delete the entries for the synced buckets?
+#            # The replica log usage is still pretty ill-defined
+#            # remove the data replica log entry. If we have gotten to this
+#            # part, all the pertinent info should be valid.
+#            try:
+#                client.del_worker_bound(self.dest_conn, 'data', shard_num,
+#                                        self.daemon_id)
+#            except Exception as e:
+#                log.exception('could not delete worker bound for shard_num %d: %s',
+#                              shard_num, e)
+#                self.lock.unset_shard()
+#                self.result_queue.put((RESULT_ERROR, shard_num))
+#                continue
+
+            # TODO
+            # update the bucket index log (trim it)
+
+            # finally, unlock the log
+            try:
+                self.lock.release_and_clear()
+            except lock.LockBroken as e:
+                log.warn('work may be duplicated: %s', e)
+            except:
+                log.exception('error unlocking data log, continuing anyway '
+                              'since lock will timeout')
+
+            self.result_queue.put((result, shard_num))
+
 class MetadataWorker(Worker):
 
     def __init__(self, *args, **kwargs):
@@ -155,12 +319,19 @@ class MetadataWorkerIncremental(MetadataWorker):
                 self.result_queue.put((RESULT_SUCCESS, shard_num))
                 continue
             except client.HttpError as e:
-                log.info('error locking shard %d log, assuming'
+                log.exception('error locking shard %d log, assuming'
                          ' it was processed by someone else and skipping: %s',
                          shard_num, e)
                 self.lock.unset_shard()
                 self.result_queue.put((RESULT_ERROR, shard_num))
                 continue
+            except requests.exceptions.ConnectionError as e:
+                log.exception('ConnectionError encountered. Bailing out of'
+                              ' processing loop for shard %d. %s',
+                              shard_num, e)
+                self.lock.unset_shard()
+                self.result_queue.put((RESULT_CONNECTION_ERROR, shard_num))
+                break
 
             result = RESULT_SUCCESS
             try:
@@ -172,6 +343,13 @@ class MetadataWorkerIncremental(MetadataWorker):
             except client.NotFound:
                 # if no worker bounds have been set, start from the beginning
                 marker, time = '', '1970-01-01 00:00:00'
+            except requests.exceptions.ConnectionError as e:
+                log.exception('ConnectionError encountered. Bailing out of'
+                              ' processing loop for shard %d. %s',
+                              shard_num, e)
+                self.lock.unset_shard()
+                self.result_queue.put((RESULT_CONNECTION_ERROR, shard_num))
+                break
             except Exception as e:
                 log.exception('error getting worker bound for shard %d',
                               shard_num)
@@ -180,6 +358,13 @@ class MetadataWorkerIncremental(MetadataWorker):
             try:
                 if result == RESULT_SUCCESS:
                     self.get_and_process_entries(marker, shard_num)
+            except requests.exceptions.ConnectionError as e:
+                log.exception('ConnectionError encountered. Bailing out of'
+                              ' processing loop for shard %d. %s',
+                              shard_num, e)
+                self.lock.unset_shard()
+                self.result_queue.put((RESULT_CONNECTION_ERROR, shard_num))
+                break
             except:
                 log.exception('syncing entries from %s for shard %d failed',
                               marker, shard_num)
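
For whoever picks this up at handoff: below is a minimal standalone sketch
(separate from the patch, not agent code) of the bucket-to-shard assignment
that data_sync_full performs before seeding the data replica log. Each bucket
is mapped to a data log shard via md5(bucket_name) mod num_data_shards, and a
'NEEDSSYNC:<bucket>' entry is queued per shard. The helper names, bucket list,
shard count, and timestamp here are invented for illustration.

    import hashlib
    from collections import defaultdict

    NEEDS_SYNC = 'NEEDSSYNC'

    def shard_for_bucket(bucket_name, num_data_shards):
        # md5 is used (rather than Python's hash()) so the mapping is
        # stable across processes and runs
        digest = hashlib.md5(bucket_name.encode('utf-8')).hexdigest()
        return int(digest, 16) % num_data_shards

    def build_shard_map(buckets, num_data_shards, timestamp):
        # group 'NEEDSSYNC:<bucket>' entries by the shard they hash to,
        # mirroring the payload data_sync_full passes to set_worker_bound
        shard_to_entries = defaultdict(list)
        for name in buckets:
            shard_to_entries[shard_for_bucket(name, num_data_shards)].append({
                'bucket': '{state}:{name}'.format(state=NEEDS_SYNC, name=name),
                'time': timestamp,
            })
        return shard_to_entries

    if __name__ == '__main__':
        # illustrative inputs only
        shard_map = build_shard_map(['photos', 'logs', 'backups'], 128,
                                    '2013-08-27 18:14:06Z')
        for shard, entries in sorted(shard_map.items()):
            print('shard {0}: {1}'.format(shard, entries))

Run on its own, this prints one line per shard that has buckets pending sync;
a DataWorkerFull that pulls such a shard number off the work queue would then
look for the NEEDSSYNC entries via get_worker_bound and sync those buckets.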