action='store_true', dest='quiet',
help='be less verbose',
)
+ parser.add_argument(
+ '--data',
+ action='store_true', dest='data',
+ help='sync data',
+ )
parser.add_argument(
'--src-access-key',
required='src_access_key' not in defaults,
resp = ''
if self.path.startswith('/metadata/full'):
try:
- TestHandler.syncer.sync_full(TestHandler.num_workers,
- TestHandler.lock_timeout)
+ TestHandler.syncer.metadata_sync_full(TestHandler.num_workers,
+ TestHandler.lock_timeout)
except Exception as e:
log.exception('error doing full sync')
status = 500
resp = str(e)
elif self.path.startswith('/metadata/incremental'):
try:
- TestHandler.syncer.sync_incremental(TestHandler.num_workers,
- TestHandler.lock_timeout,
- TestHandler.max_entries)
+ TestHandler.syncer.metadata_sync_incremental(TestHandler.num_workers,
+ TestHandler.lock_timeout,
+ TestHandler.max_entries)
except Exception as e:
log.exception('error doing incremental sync')
status = 500
args.src_secret_key, args.src_zone)
dest = client.Endpoint(args.dest_host, args.dest_port, args.dest_access_key,
args.dest_secret_key, args.dest_zone)
-    # TODO: check src and dest zone names and endpoints match the region map
-    syncer = sync.Syncer('metadata', src, dest, args.daemon_id)
+
+    # TODO: check src and dest zone names and endpoints match the region map
+    # Pick the sync type from the --data flag; construct the Syncer once
+    # instead of duplicating the call in each branch.
+    sync_type = 'data' if args.data else 'metadata'
+    syncer = sync.Syncer(sync_type, src, dest, args.daemon_id)
+    log.info('syncing %s', sync_type)
if args.test_server_host:
log.warn('TEST MODE - do not run unless you are testing this program')
return request(connection, 'get', '/admin/metadata')
 def list_metadata_keys(connection, section):
-    return request(connection, 'get', '/admin/metadata/' + section)
+    # List every metadata key stored under the given metadata section.
+    return request(connection, 'get', '/admin/metadata/' + section)
+
+def list_ops_for_client(connection, client_id, op_id):
+    # Query the opstate admin resource for the state of a single
+    # operation belonging to the given client.
+    query = {
+        'client_id': client_id,
+        'op_id': op_id,
+    }
+    return request(connection, 'get', '/admin/opstate/', params=query)
+
+def get_bucket_list(connection):
+    # Buckets are exposed as metadata keys under the 'bucket' section.
+    return list_metadata_keys(connection, 'bucket')
+
+def list_objects_in_bucket(connection, bucket_name):
+    # use the boto library to do this (not the REST admin API).
+    # NOTE(review): presumably returns an iterable of boto Key objects --
+    # confirm against the boto bucket.list() docs.
+    bucket = connection.get_bucket(bucket_name)
+    return bucket.list()
+
+def sync_object_intra_region(connection, bucket_name, object_name, src_zone, client_id, op_id):
+    # Ask the destination gateway to pull one object directly from the
+    # source zone; client_id + op_id let the copy be tracked later via
+    # the opstate API (see list_ops_for_client).
+    # NOTE(review): 'rgwx-*' look like header names but are sent as query
+    # params here -- confirm against the radosgw cross-zone copy API.
+    return request(connection, 'put', '/{bucket_name}/{object_name}'.format(bucket_name=bucket_name,object_name=object_name),
+                   params={
+                       'rgwx-source-zone': src_zone,
+                       'rgwx-client-id': client_id,
+                       'rgwx-op-id': op_id,
+                   },
+                   expect_json=False)
def lock_shard(connection, lock_type, shard_num, zone_id, timeout, locker_id):
return request(connection, 'post', '/admin/log',
},
)
+def get_data_log(connection, shard_num, marker, max_entries):
+    # Fetch up to max_entries entries from one shard of the data log,
+    # starting after the given marker.
+    return request(connection, 'get', '/admin/log',
+                   params={
+                       'type': 'data',
+                       'id': shard_num,
+                       'marker': marker,
+                       'max-entries': max_entries,
+                   },
+                   )
+
def get_log_info(connection, log_type, shard_num):
return request(
connection, 'get', '/admin/log',
return out['num_objects']
def set_worker_bound(connection, type_, shard_num, marker, timestamp,
- daemon_id):
+ daemon_id, data=[]):
return request(
connection, 'post', '/admin/replica_log',
params=dict(
time=timestamp,
daemon_id=daemon_id,
),
- data='[]',
+ data=json.dumps(data),
special_first_param='work_bound',
)
expect_json=False,
)
+def get_worker_bound(connection, type_, shard_num):
+    # Read the current work bound (replica-log position) for one shard
+    # of the given log type ('metadata' or 'data').
+    return request(
+        connection, 'get', '/admin/replica_log',
+        params=dict(
+            type=type_,
+            id=shard_num,
+        ),
+        special_first_param='work_bound',
+    )
+
def get_min_worker_bound(connection, type_, shard_num):
out = request(
connection, 'get', '/admin/replica_log',
import datetime
+import hashlib
import logging
import multiprocessing
log = logging.getLogger(__name__)
-class Syncer:
+NEEDS_SYNC = 'NEEDSSYNC'
+SYNC_IN_PROGRESS = 'INPROGRESS'
+SYNC_COMPLETED = 'DONESYNC'
+class Syncer:
def __init__(self, type_, src, dest, daemon_id):
self._type = type_
self.src = src
self.dest_conn = client.connection(dest)
self.daemon_id = daemon_id
- def sync_incremental(self, num_workers, log_lock_time, max_entries):
+    def data_sync_incremental(self, num_workers, log_lock_time, max_entries):
+        # TODO: incremental data sync is not implemented yet; this stub
+        # exists so sync_incremental() can dispatch on self._type without
+        # raising AttributeError.
+        pass
+
+ def metadata_sync_incremental(self, num_workers, log_lock_time, max_entries):
try:
num_shards = client.num_log_shards(self.src_conn, self._type)
except:
# pull the results out as they are produced
errors = []
+ connection_errors = []
for i in xrange(num_shards):
+ # if all processes error out, stop trying to process data
+ if len(connection_errors) == len(processes):
+ log.error('All {num_workers} incremental sync workers have failed.'
+ ' Ceasing to process shards'.format(num_workers=len(processes)))
+ break
result, shard_num = resultQueue.get()
if result == worker.RESULT_SUCCESS:
log.debug('synced shard %d', shard_num)
else:
- log.error('error syncing shard %d', shard_num)
+ log.error('error on incremental sync of shard %d', shard_num)
errors.append(shard_num)
+ if result == worker.RESULT_CONNECTION_ERROR:
+ connection_errors.append(shard_num)
+
log.info('%d/%d shards processed', i + 1, num_shards)
if errors:
log.error('Encountered errors syncing these %d shards: %s',
len(errors), errors)
- def sync_full(self, num_workers, log_lock_time):
+
+    def sync_incremental(self, num_workers, log_lock_time, max_entries):
+        """Run one incremental sync pass, dispatching on the syncer type."""
+        handlers = {
+            'metadata': self.metadata_sync_incremental,
+            'data': self.data_sync_incremental,
+        }
+        try:
+            handler = handlers[self._type]
+        except KeyError:
+            raise Exception('Unknown _type in sync.py: {_type}'.format(_type=self._type))
+        handler(num_workers, log_lock_time, max_entries)
+
+    def data_sync_full(self, num_workers, log_lock_time):
+        """Replicate every bucket from the source zone.
+
+        Seeds the data replica log with a NEEDSSYNC entry per bucket
+        (grouped by the data-log shard its name hashes to), then fans
+        the shards out to DataWorkerFull processes.
+        """
+        # TODO we need some sort of a lock here to make sure that only
+        # one client is getting a list of buckets to sync so that it's
+        # consistent.
+
+        num_data_shards = client.num_log_shards(self.src_conn, 'data')
+        log.debug('There are {ns} data log shards'.format(ns=num_data_shards))
+
+        # get the set of all buckets and then add an entry to the data replica
+        # log for each
+        buckets = client.get_bucket_list(self.src_conn)
+        now = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%SZ")
+        shard_to_bucket_dict = dict()
+        for bucket_name in buckets:
+            bucket_hash = int(hashlib.md5(bucket_name).hexdigest(), 16) % num_data_shards
+            # TODO: the docs for set_worker_bound suggest that the data payload
+            # should be a list of dicts, with each dict having a 'bucket' and
+            # 'time' entry. The bucket state (NEEDSSYNC) is stashed in the
+            # bucket value; may need a different approach / delimiter.
+            shard_to_bucket_dict.setdefault(bucket_hash, []).append({
+                'bucket': '{needssync}:{bucket_name}'.format(needssync=NEEDS_SYNC, bucket_name=bucket_name),
+                'time': now,
+            })
+        for shard_num, entries in shard_to_bucket_dict.items():
+            # this call returns a list of buckets that are "in progress" from
+            # before the time in the "now" variable
+            # TODO: sort out if this is the proper usage of set_worker_bound
+            # (the keyword is data=, matching set_worker_bound's signature;
+            # it was previously passed as the nonexistent in_data=, which
+            # raised TypeError)
+            replica_log_output = client.set_worker_bound(
+                self.dest_conn, 'data', shard_num,
+                'buckets_in_shard_{n}'.format(n=shard_num),
+                now, self.daemon_id, data=entries,
+            )
+            log.debug('set replica log output:\n{data}'.format(data=replica_log_output))
+
+        # create the work and results Queue
+        workQueue = multiprocessing.Queue()
+        resultQueue = multiprocessing.Queue()
+
+        # create the worker processes; this method only performs data sync,
+        # so always use the data full-sync worker
+        processes = [worker.DataWorkerFull(workQueue, resultQueue, log_lock_time,
+                                           self.src, self.dest,
+                                           daemon_id=self.daemon_id)
+                     for i in xrange(num_workers)]
+        for process in processes:
+            process.daemon = True
+            process.start()
+
+        start_time = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%SZ")
+        log.info('Starting full data sync at %s', start_time)
+
+        # enqueue the shards to be synced.
+        # the shards simply tell the worker tasks which data replica log
+        # shard to check
+        for shard_num in xrange(num_data_shards):
+            workQueue.put(shard_num)
+
+        # add a poison pill for each worker
+        for i in xrange(num_workers):
+            workQueue.put(None)
+
+        # pull the results out as they are produced
+        errors = []
+        connection_errors = []
+        for i in xrange(num_data_shards):
+            # if every worker has hit a connection error, nothing is left
+            # alive to drain the queue, so stop waiting for results
+            if len(connection_errors) == len(processes):
+                log.error('All {num_workers} full sync workers have failed.'
+                          ' Ceasing to process shards'.format(num_workers=len(processes)))
+                break
+            result, shard_num = resultQueue.get()
+            if result != worker.RESULT_SUCCESS:
+                log.error('error syncing shard %d', shard_num)
+                errors.append(shard_num)
+                # connection errors are a subset of errors, tracked
+                # separately so total worker failure can be detected
+                # (this check was previously in the success branch,
+                # where it could never fire)
+                if result == worker.RESULT_CONNECTION_ERROR:
+                    connection_errors.append(shard_num)
+            else:
+                log.debug('synced shard %s', shard_num)
+            log.info('%d/%d shards processed', i + 1, num_data_shards)
+
+        for process in processes:
+            process.join()
+        if errors:
+            log.error('Encountered errors syncing these %d entries: %s',
+                      len(errors), errors)
+
+ def metadata_sync_full(self, num_workers, log_lock_time):
try:
sections = client.get_metadata_sections(self.src_conn)
except client.HttpError as e:
# pull the results out as they are produced
errors = []
+ connection_errors = []
for i in xrange(len(meta_keys)):
+ # if all processes error out, stop trying to process data
+ if len(connection_errors) == len(processes):
+ log.error('All {num_workers} full sync workers have failed.'
+ ' Ceasing to process shards'.format(num_workers=len(processes)))
+ break
+
log.info('%d/%d items synced', i, len(meta_keys))
result, section, name = resultQueue.get()
if result != worker.RESULT_SUCCESS:
- log.error('error syncing %s %r', section, name)
+ log.error('error on full sync of %s %r', section, name)
errors.append((section, name))
else:
log.debug('synced %s %r', section, name)
+ if result == worker.RESULT_CONNECTION_ERROR:
+ connection_errors.append(shard_num)
for process in processes:
process.join()
if errors:
marker, timestamp, self.daemon_id)
client.del_worker_bound(self.dest_conn, 'metadata', shard_num,
self.daemon_id)
+
+    def sync_full(self, num_workers, log_lock_time):
+        """Run a full sync, dispatching on the syncer type."""
+        if self._type not in ('metadata', 'data'):
+            raise Exception('Unknown _type in sync.py: {_type}'.format(_type=self._type))
+        sync_fn = self.metadata_sync_full if self._type == 'metadata' else self.data_sync_full
+        sync_fn(num_workers, log_lock_time)
+
from collections import namedtuple
+import hashlib
import logging
import multiprocessing
+import requests
import os
import socket
RESULT_SUCCESS = 0
RESULT_ERROR = 1
+RESULT_CONNECTION_ERROR = 2
+MAX_CONCURRENT_OPS = 5
class Worker(multiprocessing.Process):
"""sync worker to run in its own process"""
entry['timestamp'],
)
+class DataWorker(Worker):
+    """Base class for data-sync workers.
+
+    Tracks per-worker operation ids and copies objects between zones
+    via the destination gateway.
+    """
+
+    def __init__(self, *args, **kwargs):
+        super(DataWorker, self).__init__(*args, **kwargs)
+        self.type = 'data'
+        # op ids must exist before get_new_op_id() is called; previously
+        # only the subclasses initialized this, so instantiating (or
+        # subclassing) DataWorker without doing so raised AttributeError
+        self.op_id = 0
+
+    def get_new_op_id(self):
+        # monotonically increasing, unique per worker process
+        self.op_id = self.op_id + 1
+        return self.op_id
+
+    def sync_object(self, connection, bucket, key, src_zone, client_id):
+        """Kick off an intra-region copy of one object; return its op id."""
+        op_id = self.get_new_op_id()
+        client.sync_object_intra_region(connection, bucket, key, src_zone, client_id, op_id)
+        return op_id
+
+    # TODO use the op_id to keep track of on-going copies, so at most
+    # MAX_CONCURRENT_OPS are in flight at once
+    def sync_data(self, bucket_name):
+        """Copy every object in the named bucket from the source zone."""
+        log.debug('syncing bucket {bucket}'.format(bucket=bucket_name))
+
+        objects = client.list_objects_in_bucket(self.source_conn, bucket_name)
+        counter = 0
+
+        # sync each object in the list.
+        # We only want to have X in flight at once, so use the op_id to
+        # track when individual operations finish.
+        inflight_ops = []
+        for key in objects:
+            counter = counter + 1
+            log.debug('syncing object {bucket}:{key}'.format(bucket=bucket_name, key=key))
+            op_id = self.sync_object(self.dest_conn, key.bucket.name, key.name, self.source_zone, self.daemon_id)
+            inflight_ops.append(op_id)
+
+        # Once all the copy commands are issued, track op_ids until
+        # they're all done
+        for op_id in inflight_ops:
+            op_ids = client.list_ops_for_client(self.dest_conn, self.daemon_id, op_id)
+            log.debug('op_ids: {op_ids}'.format(op_ids=op_ids))
+
+        log.debug('bucket {bucket} has {num_objects} object'.format(
+            bucket=bucket_name, num_objects=counter))
+
+class DataWorkerIncremental(DataWorker):
+    # TODO: incremental data sync is not implemented yet; every method
+    # below is a placeholder.
+
+    def __init__(self, *args, **kwargs):
+        # daemon_id and max_entries are required keyword arguments.
+        # NOTE(review): they are also forwarded to the base class via
+        # **kwargs -- confirm Worker.__init__ tolerates them.
+        self.daemon_id = kwargs['daemon_id']
+        self.max_entries = kwargs['max_entries']
+        self.op_id = 0
+        super(DataWorkerIncremental, self).__init__(*args, **kwargs)
+
+    def get_and_process_entries(self, marker, shard_num):
+        # TODO: not yet implemented
+        pass
+
+    def _get_and_process_entries(self, marker, shard_num):
+        # TODO: not yet implemented
+        pass
+
+    def run(self):
+        # TODO: not yet implemented
+        pass
+
+class DataWorkerFull(DataWorker):
+    """Worker process for a full data sync.
+
+    Pulls data-log shard numbers off the work queue, locks each shard,
+    reads its replica-log work bound and (eventually) syncs the buckets
+    recorded there, reporting one result code per shard.
+    """
+
+    def __init__(self, *args, **kwargs):
+        # daemon_id is a required keyword argument.
+        # NOTE(review): it is also forwarded to the base class via
+        # **kwargs -- confirm Worker.__init__ tolerates it.
+        self.daemon_id = kwargs['daemon_id']
+        self.op_id = 0
+        super(DataWorkerFull, self).__init__(*args, **kwargs)
+
+    def run(self):
+        self.prepare_lock()
+        # TODO: this shard count is currently unused (shard numbers arrive
+        # via the work queue); the request is kept for now but could be
+        # dropped to save a round trip
+        num_data_shards = client.num_log_shards(self.source_conn, 'data')
+
+        while True:
+            shard_num = self.work_queue.get()
+            if shard_num is None:
+                log.info('No more entries in queue, exiting')
+                break
+
+            log.info('%s is processing shard %d', self.ident, shard_num)
+
+            # lock the log
+            try:
+                self.lock.set_shard(shard_num)
+                self.lock.acquire()
+            except client.NotFound:
+                # no log exists for this shard, so there is nothing to sync
+                self.lock.unset_shard()
+                self.result_queue.put((RESULT_SUCCESS, shard_num))
+                continue
+            except client.HttpError as e:
+                log.info('error locking shard %d log, assuming'
+                         ' it was processed by someone else and skipping: %s',
+                         shard_num, e)
+                self.lock.unset_shard()
+                self.result_queue.put((RESULT_ERROR, shard_num))
+                continue
+
+            # read the work bound from the replica log (the old comment and
+            # error message said "set", but this is a get)
+            worker_bound_info = None
+            buckets_to_sync = []
+            try:
+                worker_bound_info = client.get_worker_bound(self.dest_conn, 'data', shard_num)
+                log.debug('data full sync shard {i} data log is {data}'.format(i=shard_num, data=worker_bound_info))
+                # determine whether there's a marker string with NEEDSSYNC and with
+                # our daemon_id. If so, sync it the bucket(s) that match
+            except Exception as e:
+                log.exception('could not get worker bound for shard_num %d: %s',
+                              shard_num, e)
+                self.lock.unset_shard()
+                self.result_queue.put((RESULT_ERROR, shard_num))
+                continue
+
+            # set the default result
+            result = RESULT_SUCCESS
+
+            log_bucket_name = ""
+#            try:
+#                for bucket_name in buckets_to_sync:
+#                    log.info('bucket %s is processed as part of shard %d', bucket_name, shard_num)
+#                    self.sync_data(bucket_name)
+#
+#                result = RESULT_SUCCESS
+#            except Exception as e:
+#                log.exception('could not sync shard_num %d failed on bucket %s: %s',
+#                              shard_num, log_bucket_name, e)
+#                self.lock.unset_shard()
+#                self.result_queue.put((RESULT_ERROR, shard_num))
+#                continue
+#
+#            # TODO this may need to do a set the omits the synced buckets and then delete the entries for the
+#            # synced buckets? The replica log usage is still pretty ill-defined
+#            # remove the data replica log entry. If we have gotten to this part, all
+#            # the pertinent info should be valid.
+#            try:
+#                client.del_worker_bound(self.dest_conn, 'data', shard_num,
+#                                        self.daemon_id)
+#            except Exception as e:
+#                log.exception('could not delete worker bound for shard_num %d: %s',
+#                              shard_num, e)
+#                self.lock.unset_shard()
+#                self.result_queue.put((RESULT_ERROR, shard_num))
+#                continue
+
+            # TODO
+            # update the bucket index log (trim it)
+
+            # finally, unlock the log
+            try:
+                self.lock.release_and_clear()
+            except lock.LockBroken as e:
+                log.warn('work may be duplicated: %s', e)
+            except:
+                log.exception('error unlocking data log, continuing anyway '
+                              'since lock will timeout')
+
+            self.result_queue.put((result, shard_num))
+
class MetadataWorker(Worker):
def __init__(self, *args, **kwargs):
self.result_queue.put((RESULT_SUCCESS, shard_num))
continue
except client.HttpError as e:
- log.info('error locking shard %d log, assuming'
+ log.exception('error locking shard %d log, assuming'
' it was processed by someone else and skipping: %s',
shard_num, e)
self.lock.unset_shard()
self.result_queue.put((RESULT_ERROR, shard_num))
continue
+ except requests.exceptions.ConnectionError as e:
+ log.exception('ConnectionError encountered. Bailing out of'
+ ' processing loop for shard %d. %s',
+ shard_num, e)
+ self.lock.unset_shard()
+ self.result_queue.put((RESULT_CONNECTION_ERROR, shard_num))
+ break
result = RESULT_SUCCESS
try:
except client.NotFound:
# if no worker bounds have been set, start from the beginning
marker, time = '', '1970-01-01 00:00:00'
+ except requests.exceptions.ConnectionError as e:
+ log.exception('ConnectionError encountered. Bailing out of'
+ ' processing loop for shard %d. %s',
+ shard_num, e)
+ self.lock.unset_shard()
+ self.result_queue.put((RESULT_CONNECTION_ERROR, shard_num))
+ break
except Exception as e:
log.exception('error getting worker bound for shard %d',
shard_num)
try:
if result == RESULT_SUCCESS:
self.get_and_process_entries(marker, shard_num)
+ except requests.exceptions.ConnectionError as e:
+ log.exception('ConnectionError encountered. Bailing out of'
+ ' processing loop for shard %d. %s',
+ shard_num, e)
+ self.lock.unset_shard()
+ self.result_queue.put((RESULT_CONNECTION_ERROR, shard_num))
+ break
except:
log.exception('syncing entries from %s for shard %d failed',
marker, shard_num)