special_first_param='unlock',
expect_json=False)
-def get_meta_log(connection, shard_num, marker, max_entries):
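+# fetch entries from the metadata or data log for a single shard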
+def get_log(connection, log_type, shard_num, marker, max_entries):
return request(connection, 'get', '/admin/log',
params={
- 'type': 'metadata',
- 'id': shard_num,
- 'marker': marker,
- 'max-entries': max_entries,
- },
- )
-
-def get_data_log(connection, shard_num, marker, max_entries):
- return request(connection, 'get', '/admin/log',
- params={
- 'type': 'data',
+ 'type': log_type,
'id': shard_num,
'marker': marker,
'max-entries': max_entries,
                       },
                   )
+
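+# Return the max marker of a bucket instance's index log. The full data
+# sync records this so it knows how far the index log had advanced when
+# the sync started.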
+def get_bucket_index_marker(connection, bucket_instance):
+ out = request(
+ connection, 'get', '/admin/log',
+ params={
+ 'type': 'bucket-index',
+ 'bucket-instance': bucket_instance,
+ },
+ special_first_param='info',
+ )
+ return out['max_marker']
+
def num_log_shards(connection, shard_type):
out = request(connection, 'get', '/admin/log', dict(type=shard_type))
return out['num_objects']
-def set_worker_bound(connection, type_, shard_num, marker, timestamp,
- daemon_id, data=[]):
+def set_worker_bound(connection, type_, marker, timestamp,
+ daemon_id, data=None, shard_num=None,
+ bucket_instance=None):
+ if data is None:
+ data = []
+
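+    # bucket-index replica log entries are keyed by bucket instance;
+    # metadata and data entries are keyed by shard number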
+ if type_ == 'bucket-index':
+ key = 'bucket-index'
+ value = bucket_instance
+ else:
+ key = 'id'
+ value = shard_num
+
return request(
connection, 'post', '/admin/replica_log',
- params=dict(
- type=type_,
- id=shard_num,
- marker=marker,
- time=timestamp,
- daemon_id=daemon_id,
- ),
+ params={
+ 'type': type_,
+ key: value,
+ 'marker': marker,
+ 'time': timestamp,
+ 'daemon_id': daemon_id,
+ },
data=json.dumps(data),
special_first_param='work_bound',
)
import datetime
-import hashlib
import logging
import multiprocessing
super(DataSyncerFull, self).__init__(*args, **kwargs)
self.worker_cls = worker.DataWorkerFull
+ def shard_num_for_bucket(self, bucket_name, num_shards):
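+        # intended to match the string hash radosgw uses to assign a bucket
+        # to a data log shard, so shard numbers agree with the server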
+ bucket_name = bucket_name.encode('utf8')
+ hash_val = 0
+ for char in bucket_name:
+ c = ord(char)
+            hash_val = (hash_val + (c << 4) + (c >> 4)) * 11
+        return hash_val % num_shards
+
+ def get_bucket_instance(self, bucket_name):
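+        # a bucket instance is identified as '<bucket name>:<bucket id>'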
+ metadata = client.get_metadata(self.src_conn, 'bucket', bucket_name)
+ return bucket_name + ':' + metadata['data']['bucket']['bucket_id']
+
def prepare(self):
# TODO we need some sort of a lock here to make sure that only
# one client is getting a list of buckets to sync so that it's
# consistent.
- num_data_shards = self.get_num_shards()
+ self.num_shards = self.get_num_shards()
+
+ # get list of buckets before getting any markers to avoid inconsistency
+ buckets = client.get_bucket_list(self.src_conn)
+
+ # save data log markers for each shard
+ self.shard_info = []
+ for shard in xrange(self.num_shards):
+ info = client.get_log_info(self.src_conn, 'data', shard)
+ # setting an empty marker returns an error
+ if info['marker']:
+ self.shard_info.append((shard, info['marker'],
+ info['last_update']))
- # get the set of all buckets and then add an entry to the data replica
- # log for each
- buckets = client.get_bucket_list(self.src_conn)
+        # bucket index info doesn't include a timestamp, so just use the
+        # current time since it isn't important for correctness
now = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%SZ")
- shard_to_bucket_dict = dict()
+ self.bucket_info = []
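+        # record each bucket instance and the current position of its index log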
for bucket_name in buckets:
- bucket_hash = int(hashlib.md5(bucket_name).hexdigest(),16) % num_data_shards
- if bucket_hash not in shard_to_bucket_dict:
- shard_to_bucket_dict[bucket_hash] = list()
- # TODO: the docs for set_worker_bound suggest that the data payload should be a list of dicts, with each dict
- # haveing a 'bucket' and 'time' entry. I'm trying to stash the state of the bucket (NEEDSSYNC) and the
- # bucket name as the value. May need a different approach / delimiter
- shard_to_bucket_dict[bucket_hash].append({
- 'bucket': '{needssync}:{bucket_name}'.format(needssync=NEEDS_SYNC,bucket_name=bucket_name),
- 'time':now
- })
- for shard_num, entries in shard_to_bucket_dict.items():
- # this call returns a list of buckets that are "in progress" from before the time in the "now" variable
- # TODO: sort out if this is the proper usage of set_worker_bound
- replica_log_output = client.set_worker_bound(self.dest_conn, 'data', shard_num,
- 'buckets_in_shard_{n}'.format(n=shard_num),
- now, self.daemon_id,in_data=entries
- )
- log.debug('jbuck, set replica log output:\n{data}'.format(data=replica_log_output))
+ instance = self.get_bucket_instance(bucket_name)
+            marker = client.get_bucket_index_marker(self.src_conn, instance)
+ self.bucket_info.append((instance, marker, now))
def generate_work(self):
- return xrange(self.get_num_shards())
+ return self.bucket_info
def complete(self):
- # TODO: set replica log
- pass
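+        # push the markers recorded in prepare() to the destination's
+        # replica log so a later incremental sync can start from them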
+ for shard_num, marker, time in self.shard_info:
+ out = client.set_worker_bound(self.dest_conn, 'data', marker,
+ time, self.daemon_id,
+ shard_num=shard_num)
+            log.debug('set_worker_bound on data replica log returned:\n%s', out)
+
+ for bucket_instance, marker, time in self.bucket_info:
+ out = client.set_worker_bound(self.dest_conn, 'bucket-index', marker,
+ time, self.daemon_id,
+ bucket_instance=bucket_instance)
+ log.debug('set_worker_bound on bucket index replica log returned:\n%s', out)
class MetaSyncerFull(Syncer):
worker_bound_info = None
buckets_to_sync = []
try:
- worker_bound_info = client.get_worker_bound(self.dest_conn, 'data', shard_num)
+ worker_bound_info = client.get_worker_bound(self.dest_conn, 'data', shard_num=shard_num)
log.debug('data full sync shard {i} data log is {data}'.format(i=shard_num,data=worker_bound_info))
# determine whether there's a marker string with NEEDSSYNC and with
            # our daemon_id. If so, sync the bucket(s) that match
sync up to self.max_entries entries, returning number of entries
processed and the last marker of the entries processed.
"""
- log_entries = client.get_meta_log(self.source_conn, shard_num,
- marker, self.max_entries)
+ log_entries = client.get_log(self.source_conn, 'metadata', shard_num,
+ marker, self.max_entries)
log.info('shard %d has %d entries after %r', shard_num, len(log_entries),
marker)
if entries and not error_encountered:
try:
client.set_worker_bound(self.dest_conn, 'metadata',
- shard_num, entries[-1].marker,
+ entries[-1].marker,
entries[-1].timestamp,
- self.daemon_id)
+ self.daemon_id,
+ shard_num=shard_num)
return len(entries), entries[-1].marker
except:
log.exception('error setting worker bound for shard {shard_num},'
try:
marker, time = client.get_min_worker_bound(self.dest_conn,
'metadata',
- shard_num)
+ shard_num=shard_num)
log.debug('oldest marker and time for shard %d are: %r %r',
shard_num, marker, time)
except client.NotFound: