]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
os/bluestore/BlockDevice: fix racy aio_{wake,wait}() on IOContext
authorSage Weil <sage@redhat.com>
Fri, 5 May 2017 18:11:31 +0000 (14:11 -0400)
committerSage Weil <sage@redhat.com>
Fri, 5 May 2017 18:39:30 +0000 (14:39 -0400)
Thread 1 (_do_read)                 Thread 2 (_aio_thread)
queues aio
ioc->aio_wait()
  locks ioc->lock
  num_waiting++
                                    finishes aios
                                    ioc->aio_wake
                                      if (num_waiting)
                                        lock ioc->lock (blocks)
  num_running == 0, continue
  ioc->unlock
                                        ioc->lock taken
do_read destroys ioc
                                        use after free, may block forever

The last bit of the race may vary; the key thing is that thread 2 is
taking a lock that thread 1 can destroy; thread 2 doesn't have it pinned
in memory.

Fix this by simplifying the aio_wake, aio_wait.  Since it is mutually
exclusive with a callback completion, we can avoid calling it at all when
a callback in present, and focus on keeping it simple.

Avoid use-after-free by making sure the last decrement happens under
the lock in the aio_wake/wait case.

Signed-off-by: Sage Weil <sage@redhat.com>
src/os/bluestore/BlockDevice.cc
src/os/bluestore/BlockDevice.h
src/os/bluestore/KernelDevice.cc
src/os/bluestore/NVMEDevice.cc

index f0d6b4d7384393640b6c29c3333323f4edbb0fbc..751db6d8a2c85c8b3cdac5c81ffcbcc758de0e41 100644 (file)
@@ -34,14 +34,12 @@ void IOContext::aio_wait()
 {
   std::unique_lock<std::mutex> l(lock);
   // see _aio_thread for waker logic
-  ++num_waiting;
   while (num_running.load() > 0 || num_reading.load() > 0) {
     dout(10) << __func__ << " " << this
             << " waiting for " << num_running.load() << " aios and/or "
             << num_reading.load() << " readers to complete" << dendl;
     cond.wait(l);
   }
-  --num_waiting;
   dout(20) << __func__ << " " << this << " done" << dendl;
 }
 
index 157add61fcc11c1f2247a2fd5606586f302a1e34..3057c00e0e8d1ebde00c3753d6a817d7d36f486c 100644 (file)
@@ -44,7 +44,6 @@ struct IOContext {
   std::atomic_int num_pending = {0};
   std::atomic_int num_running = {0};
   std::atomic_int num_reading = {0};
-  std::atomic_int num_waiting = {0};
 
   explicit IOContext(CephContext* cct, void *p)
     : cct(cct), priv(p)
@@ -61,10 +60,10 @@ struct IOContext {
   void aio_wait();
 
   void aio_wake() {
-    if (num_waiting.load()) {
-      std::lock_guard<std::mutex> l(lock);
-      cond.notify_all();
-    }
+    std::lock_guard<std::mutex> l(lock);
+    cond.notify_all();
+    --num_running;
+    assert(num_running == 0);
   }
 };
 
index 3c4e2fc0d684214c479512fa67e6841c499c5ae0..2092d88f736338e9de7d4636edaba2a9e9a2eee0 100644 (file)
@@ -365,18 +365,18 @@ void KernelDevice::_aio_thread()
                 << " aios left" << dendl;
        assert(r >= 0);
 
-       int left = --ioc->num_running;
-       // NOTE: once num_running is decremented we can no longer
-       // trust aio[] values; they my be freed (e.g., by BlueFS::_fsync)
-       if (left == 0) {
-         // check waiting count before doing callback (which may
-         // destroy this ioc).  and avoid ref to ioc after aio_wake()
-         // in case that triggers destruction.
-         void *priv = ioc->priv;
-         if (priv) {
-           aio_callback(aio_callback_priv, priv);
-         } else {
+       // NOTE: once num_running and we either call the callback or
+       // call aio_wake we cannot touch ioc or aio[] as the caller
+       // may free it.
+       if (ioc->priv) {
+         if (--ioc->num_running == 0) {
+           aio_callback(aio_callback_priv, ioc->priv);
+         }
+       } else {
+         if (ioc->num_running == 1) {
            ioc->aio_wake();
+         } else {
+           --ioc->num_running;
          }
        }
       }
index df7a4026a33bcbd6c143916e92105ecee298e68a..5f5264395c425b60e707c6f7528dc5dccbf31930 100644 (file)
@@ -811,11 +811,15 @@ void io_complete(void *t, const struct spdk_nvme_cpl *completion)
              << queue->queue_op_seq - queue->completed_op_seq << dendl;
     // check waiting count before doing callback (which may
     // destroy this ioc).
-    if (!--ctx->num_running) {
-      if (task->device->aio_callback && ctx->priv) {
+    if (ctx->priv) {
+      if (!--ctx->num_running) {
         task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
-      } else {
+      }
+    } else {
+      if (ctx->num_running == 1) {
        ctx->aio_wake();
+      } else {
+       --ctx->num_running;
       }
     }
     task->release_segs(queue);
@@ -828,11 +832,15 @@ void io_complete(void *t, const struct spdk_nvme_cpl *completion)
     task->release_segs(queue);
     // read submitted by AIO
     if(!task->return_code) {
-      if (!--ctx->num_running) {
-        if (task->device->aio_callback && ctx->priv) {
+      if (ctx->priv) {
+       if (!--ctx->num_running) {
           task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
-        } else {
+       }
+      } else {
+       if (ctx->num_running == 1) {
          ctx->aio_wake();
+       } else {
+         --ctx->num_running;
        }
       }
       delete task;