cmd_determ = [CEPH_DENCODER, "type", type, "is_deterministic"]
determ_res = subprocess.run(cmd_determ, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Check if the command really failed: exit code 1 just means the type is not deterministic
- if determ_res.returncode != 0:
+ if determ_res.returncode != 0 and determ_res.returncode != 1:
error_message = determ_res.stderr.decode().strip()
debug_print(f"Error running command: {error_message}")
return 1
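# Hedged sketch (not part of this change): the probe above could be wrapped in a small
# helper, assuming exit code 1 simply reports "not deterministic" while any other
# non-zero code is a real failure. CEPH_DENCODER is the dencoder path defined above.
import subprocess

def is_type_deterministic(type_name):
    cmd = [CEPH_DENCODER, "type", type_name, "is_deterministic"]
    res = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if res.returncode == 0:
        return True   # encoding is deterministic
    if res.returncode == 1:
        return False  # valid answer: the type is not deterministic
    raise RuntimeError(res.stderr.decode().strip())  # genuine command failure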
version_name = version.name
_backward_compat[version_name] = {}
type_dir = archive_dir / version_name / "forward_incompat"
- if type_dir.exists() and type_dir.is_dir():
+ if type_dir.exists():
for type_entry in type_dir.iterdir():
if type_entry.is_dir():
type_name = type_entry.name
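# Hedged sketch of the scan above, assuming the truncated loop body records each
# per-type subdirectory of forward_incompat for the version being processed; the
# stored value is an assumption, only the directory layout comes from the code above.
from pathlib import Path

def collect_forward_incompat(archive_dir: Path, version_name: str) -> dict:
    incompat = {}
    type_dir = archive_dir / version_name / "forward_incompat"
    if type_dir.exists():
        for type_entry in type_dir.iterdir():
            if type_entry.is_dir():
                # mark this type as forward-incompatible for version_name
                incompat[type_entry.name] = True
    return incompat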
def process_batch(batch):
results = []
- with concurrent.futures.ThreadPoolExecutor() as executor:
+ max_workers = 15
+ with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [
executor.submit(
test_object_wrapper, batch_type, vdir, arversion, current_ver
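# Hedged completion of process_batch: the submit loop and result collection are
# truncated above, so unpacking (batch_type, vdir, arversion, current_ver) from each
# batch entry is an assumption about the batch layout. Assumes concurrent.futures is
# imported and test_object_wrapper is defined elsewhere in this script.
def process_batch_sketch(batch):
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor:
        futures = [
            executor.submit(test_object_wrapper, batch_type, vdir, arversion, current_ver)
            for batch_type, vdir, arversion, current_ver in batch
        ]
        for future in concurrent.futures.as_completed(futures):
            # result() re-raises any exception from the worker thread
            results.append(future.result())
    return results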
# Create a generator that processes batches asynchronously
def async_process_batches(task_batches):
- with concurrent.futures.ProcessPoolExecutor() as executor:
+ max_workers = 10
+ with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(process_batch, batch) for batch in task_batches]
for future in concurrent.futures.as_completed(futures):
yield future.result()
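# Hedged usage sketch for the generator above: drain batches as they complete,
# flatten the per-batch result lists, and count failures. Treating a truthy result
# from test_object_wrapper as a failed check is an assumption, not taken from the code.
def run_all_batches(task_batches):
    failures = 0
    for batch_results in async_process_batches(task_batches):
        for res in batch_results:
            if res:
                failures += 1
    return failures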