import dateutil.parser
from itertools import combinations
+from itertools import zip_longest
from io import StringIO
import boto
    for a, b in zip(z1, z2):
        eq(a.name, b.name)
        eq(a.creation_date, b.creation_date)
+
+def get_bucket_shard_objects(zone, num_shards):
+    """
+    Get one object name for each shard of the bucket index log.
+    """
+    cmd = ['bucket', 'shard', 'objects'] + zone.zone_args()
+    cmd += ['--num-shards', str(num_shards)]
+    shardobjs_json, ret = zone.cluster.admin(cmd, read_only=True)
+    assert ret == 0
+    shardobjs = json.loads(shardobjs_json)
+    return shardobjs['objs']
+
+def write_most_shards(zone, bucket_name, num_shards):
+    """
+    Write one object to most (but not all) bucket index shards.
+    """
+    objs = get_bucket_shard_objects(zone.zone, num_shards)
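+    # shuffle, then drop at least one (roughly 10%) of the names so some
+    # shards see no new writes in this generation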
+    random.shuffle(objs)
+    del objs[-max(len(objs) // 10, 1):]
+    for obj in objs:
+        k = new_key(zone, bucket_name, obj)
+        k.set_contents_from_string('foo')
+
+def reshard_bucket(zone, bucket_name, num_shards):
+    """
+    Reshard a bucket to the given number of shards.
+    """
+    cmd = ['bucket', 'reshard'] + zone.zone_args()
+    cmd += ['--bucket', bucket_name]
+    cmd += ['--num-shards', str(num_shards)]
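+    # --yes-i-really-mean-it is presumably required because radosgw-admin
+    # guards manual resharding on multisite setups behind this override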
+    cmd += ['--yes-i-really-mean-it']
+    zone.cluster.admin(cmd)
+
+def get_obj_names(zone, bucket_name, maxobjs):
+    """
+    Get names of objects in a bucket.
+    """
+    cmd = ['bucket', 'list'] + zone.zone_args()
+    cmd += ['--bucket', bucket_name]
+    cmd += ['--max-entries', str(maxobjs)]
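+    # 'bucket list' returns at most --max-entries results, so callers must
+    # pass a maxobjs at least as large as the bucket's object count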
+    objs_json, _ = zone.cluster.admin(cmd, read_only=True)
+    objs = json.loads(objs_json)
+    return [o['name'] for o in objs]
+
+def bucket_keys_eq(zone1, zone2, bucket_name):
+    """
+    Ensure that two buckets have the same keys, but get the lists through
+    radosgw-admin rather than S3 so it can be used when radosgw isn't
+    running. Only works for buckets of up to 10,000 objects, since the tests
+    calling it don't need more and the output from bucket list doesn't have
+    an obvious marker with which to continue.
+    """
+    keys1 = get_obj_names(zone1, bucket_name, 10000)
+    keys2 = get_obj_names(zone2, bucket_name, 10000)
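+    # zip_longest pads the shorter listing with None, so a None on either
+    # side means that zone is missing a key the other one has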
+    for key1, key2 in zip_longest(keys1, keys2):
+        if key1 is None:
+            log.critical('key=%s is missing from zone=%s', key2,
+                         zone1.name)
+            assert False
+        if key2 is None:
+            log.critical('key=%s is missing from zone=%s', key1,
+                         zone2.name)
+            assert False
+        eq(key1, key2)
+
+@attr('bucket_reshard')
+def test_bucket_sync_run_basic_incremental():
+    """
+    Create several generations of objects, then run 'bucket sync run' to
+    ensure they're all processed.
+    """
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+    primary = zonegroup_conns.rw_zones[0]
+
+    # create a bucket, write objects to it, and wait for them to sync,
+    # ensuring we are in incremental sync
+    bucket = primary.create_bucket(gen_bucket_name())
+    log.debug('created bucket=%s', bucket.name)
+    zonegroup_meta_checkpoint(zonegroup)
+    write_most_shards(primary, bucket.name, 11)
+    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+    try:
+        # stop gateways in other zones so we can rely on bucket sync run
+        for secondary in zonegroup_conns.rw_zones[1:]:
+            secondary.zone.stop()
+
+        # build up multiple generations, each with some objects written to
+        # it
+        generations = [17, 19, 23, 29, 31, 37]
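+        # each reshard below starts a new generation of the bucket index
+        # log; the shard counts only ever grow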
+        for num_shards in generations:
+            reshard_bucket(primary.zone, bucket.name, num_shards)
+            write_most_shards(primary, bucket.name, num_shards)
+
+        # bucket sync run on every secondary
+        for secondary in zonegroup_conns.rw_zones[1:]:
+            cmd = ['bucket', 'sync', 'run'] + secondary.zone.zone_args()
+            cmd += ['--bucket', bucket.name, '--source-zone', primary.name]
+            secondary.zone.cluster.admin(cmd)
+
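+            # the secondary's gateway is down, so compare key listings
+            # through radosgw-admin rather than S3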
+            bucket_keys_eq(primary.zone, secondary.zone, bucket.name)
+
+    finally:
+        # Restart so bucket_checkpoint can actually fetch things from the
+        # secondaries. Put this in a finally block so they restart even on
+        # error.
+        for secondary in zonegroup_conns.rw_zones[1:]:
+            secondary.zone.start()
+
+    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)