: ctrlr(nullptr),
ns(nullptr),
aio_stop(false),
+ queue_empty(1),
queue_lock("NVMEDevice::queue_lock"),
aio_thread(this),
inflight_ops(0),
{
dout(1) << __func__ << dendl;
- aio_stop = true;
+ {
+ Mutex::Locker l(queue_lock);
+ aio_stop = true;
+ queue_cond.Signal();
+ }
aio_thread.join();
aio_stop = false;
name.clear();
while (!aio_stop) {
dout(40) << __func__ << " polling" << dendl;
t = nullptr;
- {
+ if (!queue_empty.read()) {
Mutex::Locker l(queue_lock);
if (!task_queue.empty()) {
t = task_queue.front();
task_queue.pop();
}
+ queue_empty.inc();
+ } else if (!inflight_ops.read()) {
+ Mutex::Locker l(queue_lock);
+ if (queue_empty.read())
+ queue_cond.Wait();
}
if (t) {
t->return_code = r;
Mutex::Locker l(t->ctx->lock);
t->ctx->cond.Signal();
+ } else {
+ inflight_ops.inc();
}
- inflight_ops.inc();
break;
}
case IOCommand::FLUSH_COMMAND:
t->return_code = r;
Mutex::Locker l(t->ctx->lock);
t->ctx->cond.Signal();
+ } else {
+ inflight_ops.inc();
}
- inflight_ops.inc();
break;
}
}
{
Mutex::Locker l(queue_lock);
task_queue.push(t);
+ if (queue_empty.read()) {
+ queue_empty.dec();
+ queue_cond.Signal();
+ }
}
{
Mutex::Locker l(ioc.lock);
Mutex::Locker l(queue_lock);
// Only need to push the first entry
task_queue.push(t);
+ if (queue_empty.read()) {
+ queue_empty.dec();
+ queue_cond.Signal();
+ }
}
int NVMEDevice::aio_write(
{
Mutex::Locker l(queue_lock);
task_queue.push(t);
+ if (queue_empty.read()) {
+ queue_empty.dec();
+ queue_cond.Signal();
+ }
}
{