cmd = ['sync', 'status']
sync_status_output, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
assert(retcode == 0)
- match = re.search(r"oldest incremental change not applied:\s*([0-9T:\.\+\-Z]+)", sync_status_output)
- timestamp = match.group(1) if match else None
- if timestamp is not None:
- timestamp = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f%z").timestamp()
- return timestamp
+
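+    # return the epoch of the oldest unapplied incremental change, None when
+    # data sync reports it is caught up, or 0.0 when no progress can be read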
+ # extract the "data sync source:" section from the output
+ data_sync_match = re.search(r"data sync source:.*", sync_status_output, re.DOTALL)
+ if not data_sync_match:
+ # no data sync section found; return epoch 0 to signal no progress
+ return 0.0
+
+ data_sync_section = data_sync_match.group(0)
+
+ # 1) look for "oldest incremental change not applied" in the data sync section
+ match = re.search(r"oldest incremental change not applied:\s*([0-9T:\.\+\-Z]+)", data_sync_section)
+ if match:
+ return datetime.strptime(match.group(1), "%Y-%m-%dT%H:%M:%S.%f%z").timestamp()
+
+ # 2) if data is caught up with source, return None (no work pending)
+ if "data is caught up with source" in data_sync_section:
+ return None
+
+ # 3) data sync section exists but neither marker found; return epoch 0
+ # so the caller knows data sync is not making progress
+ return 0.0
def data_sync_making_progress(zone, time_window_sec=180, check_interval_sec=30):
deadline = time.time() + time_window_sec
log.info(f"delete bucket={bucket.name}")
primary_zone_client_conn.s3_client.delete_bucket(Bucket=bucket.name)
+def test_bucket_full_sync_when_the_bucket_is_deleted_in_the_meantime():
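+    """
+    Verify that bucket full sync completes even when the source bucket is
+    deleted while full sync is still in progress.
+
+    Flow: disable sync for a new bucket, upload enough objects to force
+    pagination during full sync, trim the bilog so incremental sync cannot
+    catch up, inject a listing delay on the secondary to stall full sync,
+    re-enable sync, delete all objects and the bucket on the primary while
+    the secondary is stalled, then remove the delay and verify that data
+    sync still reaches a checkpoint.
+    """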
+ num_objects_to_upload = (
+        3000  # must be more than 1000 to trigger pagination during full sync
+ )
+ bucket_full_sync_listing_inject_delay_sec = 100
+ bucket_full_sync_listing_inject_delay_pattern = "delay_bucket_full_sync_loop"
+
+ # get client connection
+ master_zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(master_zonegroup)
+ primary_zone_client_conn = zonegroup_conns.master_zone
+
+ # get cluster connections
+ primary_zone_cluster_conn = master_zonegroup.master_zone
+ secondary_zone_cluster_conn = None
+ for zg in realm.current_period.zonegroups:
+ for zone in zg.zones:
+ if zone.cluster != primary_zone_cluster_conn.cluster and zone != zg.master_zone:
+ secondary_zone_cluster_conn = zone
+ break
+ if secondary_zone_cluster_conn is not None:
+ break
+    else:  # for-else: no suitable secondary zone was found
+ raise SkipTest("test_bucket_full_sync_when_the_bucket_is_deleted_in_the_meantime is skipped. "
+ "Requires a secondary zone in a different cluster.")
+
+ bucket = primary_zone_client_conn.create_bucket(gen_bucket_name())
+ log.info(f"created bucket={bucket.name}")
+
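+    # with sync disabled, the uploads below are not replicated; re-enabling
+    # sync afterwards forces the secondary to start with a full sync pass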
+ log.info(f"disable sync for bucket={bucket.name}")
+ disable_bucket_sync(realm.meta_master_zone(), bucket.name)
+
+ try:
+ log.info(f"upload {num_objects_to_upload} objects to bucket={bucket.name}")
+        num_objects_uploaded = 0
+ for i in range(num_objects_to_upload):
+ if i % 100 == 0:
+ log.debug(f"uploaded {i} objects to bucket={bucket.name}...")
+ try:
+ primary_zone_client_conn.s3_client.put_object(
+ Bucket=bucket.name, Key=f"obj-{i:04d}", Body="..."
+ )
+                num_objects_uploaded += 1
+ except Exception as e:
+ log.debug(f"failed to upload object to bucket={bucket.name}: {e}")
+        log.info(
+            f"successfully uploaded {num_objects_uploaded} objects to bucket={bucket.name}"
+        )
+
+ log.info("trim bucket bilog to avoid any incremental sync happening")
+ primary_zone_cluster_conn.cluster.admin(["bilog", "trim", "--bucket", bucket.name])
+ log.info("set rgw_inject_delay_sec and rgw_inject_delay_pattern to slow down bucket full sync")
+ secondary_zone_cluster_conn.cluster.ceph_admin(
+ ["config", "set", "client.rgw", "rgw_inject_delay_sec", str(bucket_full_sync_listing_inject_delay_sec)]
+ )
+ secondary_zone_cluster_conn.cluster.ceph_admin(
+ ["config", "set", "client.rgw", "rgw_inject_delay_pattern", bucket_full_sync_listing_inject_delay_pattern]
+ )
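+    # (this assumes the rgw_inject_delay_sec/_pattern debug options make the
+    # code path matching the pattern sleep, stalling the full sync listing)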
+ log.info("enable bucket sync to initiate full sync")
+ enable_bucket_sync(realm.meta_master_zone(), bucket.name)
+
+        # Since incremental sync is not possible and full sync is stalled,
+ # we should see that the bucket's sync is stalled.
+ log.info("verify that bucket sync is stalled")
+ deadline = time.time() + bucket_full_sync_listing_inject_delay_sec
+ oldest_inc_change = None
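+        # the helper returns a timestamp while changes are pending, None when
+        # data sync is caught up, and 0.0 when no progress can be read; two
+        # identical back-to-back readings are treated as a stall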
+ while True:
+ if time.time() > deadline:
+                raise Exception("timed out waiting for bucket sync to stall")
+ new_reading = get_oldest_incremental_change_not_applied_epoch(
+ secondary_zone_cluster_conn
+ )
+            if new_reading is not None:
+                if oldest_inc_change == new_reading:
+                    # 2 back-to-back readings are the same: sync is stalled
+                    break
+                oldest_inc_change = new_reading
+ time.sleep(10)
+ log.info(
+ f"verified that bucket sync is stalled, oldest incremental change not applied epoch: {oldest_inc_change}"
+ )
+
+ # while bucket sync is stalled, delete all objects and the bucket.
+ log.info(f"delete {num_objects_to_upload} objects from bucket={bucket.name}")
+ for i in range(num_objects_to_upload):
+ primary_zone_client_conn.s3_client.delete_object(
+ Bucket=bucket.name,
+ Key=f"obj-{i:04d}",
+ )
+ log.info(f"delete bucket={bucket.name}")
+ primary_zone_client_conn.s3_client.delete_bucket(Bucket=bucket.name)
+
+ log.info(f"verify that bucket={bucket.name} is deleted on secondary zone")
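+        # the metadata checkpoint waits for metadata sync to catch up, which
+        # propagates the bucket deletion to the secondary zone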
+ for zg in realm.current_period.zonegroups:
+ zonegroup_meta_checkpoint(zg)
+
+ log.info(
+            "removing rgw_inject_delay_sec and rgw_inject_delay_pattern to allow bucket full sync to run normally to completion"
+ )
+ secondary_zone_cluster_conn.cluster.ceph_admin(
+ ["config", "rm", "client.rgw", "rgw_inject_delay_sec"]
+ )
+ secondary_zone_cluster_conn.cluster.ceph_admin(
+ ["config", "rm", "client.rgw", "rgw_inject_delay_pattern"]
+ )
+ time.sleep(
+ bucket_full_sync_listing_inject_delay_sec
+ ) # wait to make sure bucket sync loop resumes to normal pace
+
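+        # the core check: data sync must still reach a checkpoint even though
+        # the bucket disappeared mid full sync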
+ log.info("wait for data sync to complete")
+ zonegroup_data_checkpoint(zonegroup_conns)
+ except Exception as e:
+ log.error(f"test_bucket_full_sync_when_the_bucket_is_deleted_in_the_meantime failed: {e}")
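+        # best-effort cleanup: ignore individual failures so the original
+        # exception is re-raised below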
+ log.info(f"delete {num_objects_to_upload} objects from bucket={bucket.name}")
+ for i in range(num_objects_to_upload):
+ try:
+ primary_zone_client_conn.s3_client.delete_object(
+ Bucket=bucket.name,
+ Key=f"obj-{i:04d}",
+ )
+            except Exception:
+                pass
+ log.info(f"delete bucket={bucket.name}")
+ try:
+ primary_zone_client_conn.s3_client.delete_bucket(Bucket=bucket.name)
+        except Exception:
+            pass
+        log.info("reset rgw_inject_delay settings")
+ try:
+ secondary_zone_cluster_conn.cluster.ceph_admin(
+ ["config", "rm", "client.rgw", "rgw_inject_delay_sec"]
+ )
+ secondary_zone_cluster_conn.cluster.ceph_admin(
+ ["config", "rm", "client.rgw", "rgw_inject_delay_pattern"]
+ )
+        except Exception:
+            pass
+ raise