set<unsigned> inflight;
unsigned max_success = 0;
unsigned min_failed = 0;
+std::condition_variable_any cv_inflight;
struct io_info {
unsigned i;
if (!min_failed || i < min_failed) {
min_failed = i;
}
+ if (inflight.empty()) {
+ cv_inflight.notify_all();
+ }
}
}
std::unique_ptr<std::thread> t;
std::atomic<bool> missed_eio{false};
- unsigned max = 1000;
- unsigned timeout = max * 10;
+ auto start_time = std::chrono::steady_clock::now();
+ const int max_run_seconds = 30;
unsigned long i = 1;
- for (; min_failed == 0 && i <= timeout; ++i) {
+
+ while (true) {
+ {
+ std::unique_lock l(my_lock);
+ if (min_failed != 0) {
+ break;
+ }
+ inflight.insert(i);
+ }
io_info *info = new io_info;
info->i = i;
info->c = Rados::aio_create_completion();
info->c->set_complete_callback((void*)info, pool_io_callback);
int r = test_data.m_ioctx.aio_write(test_data.m_oid, info->c, bl, bl.length(), 0);
+ if (r < 0) {
+ std::cout << "Race caught: aio_write returned " << r << " at index " << i << std::endl;
+ std::scoped_lock l(my_lock);
+ inflight.erase(i);
+ if (inflight.empty()) {
+ cv_inflight.notify_all();
+ }
+ break;
+ }
// Trigger EIO after 100 ops have been submitted
if (i == 100) {
"val": "true"
}})", test_data.m_pool_name),
{}, nullptr, nullptr));
-
- {
- std::scoped_lock lk(my_lock);
- missed_eio = (!min_failed && max_success == max);
- }
});
}
- std::this_thread::sleep_for(10ms);
- my_lock.lock();
- if (r < 0) {
- inflight.erase(i);
- break;
+ // Timeout to avoid infinite loop in case EIO never takes effect
+ // Check every 100 ops if we've exceeded max run time
+ if (i % 100 == 0) {
+ auto now = std::chrono::steady_clock::now();
+ if (std::chrono::duration_cast<std::chrono::seconds>(now - start_time).count() > max_run_seconds) {
+ // Stop loop if EIO never happened (Cluster issue?)
+ std::cout << "Timed out waiting for EIO to take effect after " << i << " ops" << std::endl;
+ missed_eio = true;
+ break;
+ }
+ }
+ i++;
+ }
+
+ {
+ std::unique_lock l(my_lock);
+ std::cout << "waiting for inflight ios to complete, count=" << inflight.size() << std::endl;
+ bool finished = cv_inflight.wait_for(l, std::chrono::seconds(60), []{
+ return inflight.empty();
+ });
+ if (!finished) {
+ GTEST_FAIL() << "timeout waiting for inflight ios to complete";
}
}
if (t && t->joinable()) {
t->join();
}
- // wait for ios to finish
- for (; !inflight.empty(); ++i) {
- cout << "waiting for " << inflight.size() << std::endl;
- my_lock.unlock();
- sleep(1);
- my_lock.lock();
- }
-
- if (!missed_eio) {
- my_lock.unlock();
+ std::scoped_lock l(my_lock);
+ if (missed_eio) {
GTEST_SKIP() << "eio flag missed all ios that already completed";
}
cout << "max_success " << max_success << ", min_failed " << min_failed << std::endl;