rados_aio_release(completion);
}
-// Context structure for read/write ordering test
-struct ReadWriteOrderingContext {
- rados_ioctx_t ioctx;
- char* write_buf;
- char* read_buf;
- int current_value;
- int iteration;
- int max_iterations;
- rados_completion_t write_completion;
- rados_completion_t read_completion;
- bool test_failed;
- std::string error_msg;
-};
-
-// Callback for read completion - decode value, increment, and issue next write
-static void read_complete_callback(rados_completion_t c, void *arg) {
- ReadWriteOrderingContext* ctx = static_cast<ReadWriteOrderingContext*>(arg);
-
- // Check if read succeeded
- int ret = rados_aio_get_return_value(c);
- if (ret < 0) {
- ctx->test_failed = true;
- ctx->error_msg = "Read failed with error: " + std::to_string(ret);
- return;
- }
-
- // Decode the integer from the read buffer
- int read_value;
- memcpy(&read_value, ctx->read_buf, sizeof(int));
-
- std::cout << "Iteration " << ctx->iteration << ": Read value = " << read_value
- << ", Expected = " << ctx->current_value << std::endl;
-
- // Verify the value matches expected
- if (read_value != ctx->current_value) {
- ctx->test_failed = true;
- ctx->error_msg = "Read value mismatch: expected " +
- std::to_string(ctx->current_value) +
- " but got " + std::to_string(read_value);
- return;
- }
-
- // Increment for next iteration
- ctx->current_value++;
- ctx->iteration++;
-
- // If we haven't reached max iterations, issue next write
- if (ctx->iteration < ctx->max_iterations) {
- memcpy(ctx->write_buf, &ctx->current_value, sizeof(int));
-
- std::cout << "Iteration " << ctx->iteration << ": Writing value = " << ctx->current_value << std::endl;
-
- // Create new write completion
- int r = rados_aio_create_completion2(ctx, nullptr, &ctx->write_completion);
- if (r < 0) {
- ctx->test_failed = true;
- ctx->error_msg = "Failed to create write completion: " + std::to_string(r);
- return;
- }
-
- // Issue async write
- r = rados_aio_write(ctx->ioctx, "ordering_test_obj",
- ctx->write_completion, ctx->write_buf, sizeof(int), 0);
- if (r < 0) {
- ctx->test_failed = true;
- ctx->error_msg = "Write failed with error: " + std::to_string(r);
- rados_aio_release(ctx->write_completion);
- return;
- }
-
- // Immediately issue read with ORDER_READS_WRITES flag
- r = rados_aio_create_completion2(ctx, read_complete_callback, &ctx->read_completion);
- if (r < 0) {
- ctx->test_failed = true;
- ctx->error_msg = "Failed to create read completion: " + std::to_string(r);
- return;
- }
-
- rados_read_op_t read_op = rados_create_read_op();
- rados_read_op_read(read_op, 0, sizeof(int), ctx->read_buf, NULL, NULL);
- r = rados_aio_read_op_operate(read_op, ctx->ioctx, ctx->read_completion,
- "ordering_test_obj",
- LIBRADOS_OPERATION_ORDER_READS_WRITES);
- rados_release_read_op(read_op);
- if (r < 0) {
- ctx->test_failed = true;
- ctx->error_msg = "Read failed with error: " + std::to_string(r);
- rados_aio_release(ctx->read_completion);
- return;
- }
- }
-}
-
-// Shared implementation for read/write ordering test
-static void test_read_write_ordering_impl(rados_ioctx_t ioctx) {
- const int max_iterations = 20;
- char write_buf[sizeof(int)];
- char read_buf[sizeof(int)];
-
- // Initialize context
- ReadWriteOrderingContext ctx;
- ctx.ioctx = ioctx;
- ctx.write_buf = write_buf;
- ctx.read_buf = read_buf;
- ctx.current_value = 1;
- ctx.iteration = 0;
- ctx.max_iterations = max_iterations;
- ctx.test_failed = false;
-
- // Create the object with initial value 0 (without ordering flag, wait for completion)
- memcpy(write_buf, &ctx.current_value, sizeof(int));
- rados_completion_t create_completion;
- ASSERT_EQ(0, rados_aio_create_completion2(nullptr, nullptr, &create_completion));
- ASSERT_EQ(0, rados_aio_write(ioctx, "ordering_test_obj",
- create_completion, write_buf, sizeof(int), 0));
- {
- TestAlarm alarm;
- ASSERT_EQ(0, rados_aio_wait_for_complete(create_completion));
- }
- ASSERT_EQ(0, rados_aio_get_return_value(create_completion));
- rados_aio_release(create_completion);
-
- // Start the read/write loop with ordering
- ctx.current_value = 1; // Next value to write
- ctx.iteration = 1; // We've done iteration 0 (initial write)
-
- // Write value 1
- memcpy(write_buf, &ctx.current_value, sizeof(int));
- std::cout << "Iteration " << ctx.iteration << ": Writing value = " << ctx.current_value << std::endl;
- ASSERT_EQ(0, rados_aio_create_completion2(&ctx, nullptr, &ctx.write_completion));
- ASSERT_EQ(0, rados_aio_write(ioctx, "ordering_test_obj",
- ctx.write_completion, write_buf, sizeof(int), 0));
-
- // Immediately read with ORDER_READS_WRITES flag (should see value 1 due to ordering)
- ASSERT_EQ(0, rados_aio_create_completion2(&ctx, read_complete_callback,
- &ctx.read_completion));
-
- rados_read_op_t read_op = rados_create_read_op();
- rados_read_op_read(read_op, 0, sizeof(int), read_buf, NULL, NULL);
- ASSERT_EQ(0, rados_aio_read_op_operate(read_op, ioctx, ctx.read_completion,
- "ordering_test_obj",
- LIBRADOS_OPERATION_ORDER_READS_WRITES));
- rados_release_read_op(read_op);
-
- // Wait for all operations to complete
- // The callback chain will continue until max_iterations is reached
- {
- TestAlarm alarm;
- while (ctx.iteration < max_iterations && !ctx.test_failed) {
- usleep(10000); // Sleep 10ms between checks
- }
- }
-
- // Check for any errors during the test
- ASSERT_FALSE(ctx.test_failed) << "Test failed: " << ctx.error_msg;
-
- // Wait for final operations to complete
- if (ctx.iteration >= max_iterations) {
- TestAlarm alarm;
- if (ctx.write_completion) {
- rados_aio_wait_for_complete(ctx.write_completion);
- rados_aio_release(ctx.write_completion);
- }
- if (ctx.read_completion) {
- rados_aio_wait_for_complete(ctx.read_completion);
- rados_aio_release(ctx.read_completion);
- }
- }
-
- // Verify final value is 20
- char final_buf[sizeof(int)];
- rados_completion_t final_read;
- ASSERT_EQ(0, rados_aio_create_completion2(nullptr, nullptr, &final_read));
- ASSERT_EQ(0, rados_aio_read(ioctx, "ordering_test_obj",
- final_read, final_buf, sizeof(int), 0));
- {
- TestAlarm alarm;
- ASSERT_EQ(0, rados_aio_wait_for_complete(final_read));
- }
- ASSERT_EQ((int)sizeof(int), rados_aio_get_return_value(final_read));
-
- int final_value;
- memcpy(&final_value, final_buf, sizeof(int));
- ASSERT_EQ(max_iterations, final_value) << "Final value should be " << max_iterations;
-
- rados_aio_release(final_read);
-}
-
-TEST_P(LibRadosAio, ReadWriteOrdering) {
- AioTestData test_data;
- ASSERT_EQ("", test_data.init(GetParam()));
- test_read_write_ordering_impl(test_data.m_ioctx);
-}
-
-TEST_P(LibRadosAioEC, ReadWriteOrdering) {
- SKIP_IF_CRIMSON();
- AioTestDataEC test_data;
- auto [fast_ec, split_ops] = GetParam();
- ASSERT_EQ("", test_data.init(fast_ec, split_ops));
- test_read_write_ordering_impl(test_data.m_ioctx);
-}
-
INSTANTIATE_TEST_SUITE_P_REPLICA(LibRadosAio);
INSTANTIATE_TEST_SUITE_P_EC(LibRadosAioEC);