]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
os/newstore: use aio for wal writes, too
authorSage Weil <sage@redhat.com>
Mon, 20 Apr 2015 22:33:11 +0000 (15:33 -0700)
committerSage Weil <sage@redhat.com>
Tue, 1 Sep 2015 17:39:39 +0000 (13:39 -0400)
Signed-off-by: Sage Weil <sage@redhat.com>
src/os/newstore/NewStore.cc
src/os/newstore/NewStore.h

index 8dd10d6c5a9a5a4cab4f20a5f944e0c9ee330235..246ff18699269caa734790ea4f2edc4e699614aa 100644 (file)
@@ -2245,14 +2245,27 @@ void NewStore::_aio_thread()
     if (r == 1) {
       TransContext *txc = static_cast<TransContext*>(aio->priv);
       int left = txc->num_aio.dec();
-      dout(10) << __func__ << " finished aio on " << txc << ", "
-              << left << " left" << dendl;
+      dout(10) << __func__ << " finished aio on " << txc << " state "
+              << txc->get_state_name() << ", "
+              << left << " aios left" << dendl;
+      VOID_TEMP_FAILURE_RETRY(::close(aio->fd));
       if (left == 0) {
-       txc->state = TransContext::STATE_AIO_DONE;
-       if (!txc->fds.empty()) {
-         _txc_queue_fsync(txc);
-       } else {
-         _txc_finish_fsync(txc);
+       switch (txc->state) {
+       case TransContext::STATE_AIO_QUEUED:
+         txc->state = TransContext::STATE_AIO_DONE;
+         if (!txc->fds.empty()) {
+           _txc_queue_fsync(txc);
+         } else {
+           _txc_finish_fsync(txc);
+         }
+         break;
+
+       case TransContext::STATE_WAL_AIO_WAIT:
+         _wal_finish(txc);
+         break;
+
+       default:
+         assert(0 == "unexpected txc state on aio completion");
        }
       }
     }
@@ -2319,16 +2332,31 @@ wal_op_t *NewStore::_get_wal_op(TransContext *txc)
   return &txc->wal_txn->ops.back();
 }
 
-int NewStore::_apply_wal_transaction(TransContext *txc)
+int NewStore::_wal_apply(TransContext *txc)
 {
   wal_transaction_t& wt = *txc->wal_txn;
   dout(20) << __func__ << " txc " << txc << " seq " << wt.seq << dendl;
   txc->state = TransContext::STATE_WAL_APPLYING;
 
-  int r = _do_wal_transaction(wt);
+  txc->aios.clear();
+  int r = _do_wal_transaction(wt, txc);
   if (r < 0)
     return r;
 
+  if (!txc->aios.empty()) {
+    _txc_aio_submit(txc);
+    txc->state = TransContext::STATE_WAL_AIO_WAIT;
+    return 0;
+  } else {
+    return _wal_finish(txc);
+  }
+}
+
+int NewStore::_wal_finish(TransContext *txc)
+{
+  wal_transaction_t& wt = *txc->wal_txn;
+  dout(20) << __func__ << " txc " << " seq " << wt.seq << txc << dendl;
+
   string key;
   get_wal_key(wt.seq, &key);
   KeyValueDB::Transaction cleanup = db->get_transaction();
@@ -2345,7 +2373,8 @@ int NewStore::_apply_wal_transaction(TransContext *txc)
   return 0;
 }
 
-int NewStore::_do_wal_transaction(wal_transaction_t& wt)
+int NewStore::_do_wal_transaction(wal_transaction_t& wt,
+                                 TransContext *txc)
 {
   vector<int> sync_fds;
   sync_fds.reserve(wt.ops.size());
@@ -2372,20 +2401,29 @@ int NewStore::_do_wal_transaction(wal_transaction_t& wt)
        int fd = _open_fid(p->fid, flags);
        if (fd < 0)
          return fd;
-       int r = ::lseek64(fd, p->offset, SEEK_SET);
-       if (r < 0) {
-         r = -errno;
-         derr << __func__ << " lseek64 on " << fd << " got: "
-              << cpp_strerror(r) << dendl;
-         return r;
-       }
-       r = p->data.write_fd(fd);
-       if (r < 0) {
-         derr << __func__ << " write_fd on " << fd << " got: "
-              << cpp_strerror(r) << dendl;
-         return r;
-       }
-       if (!(flags & O_DSYNC)) {
+#ifdef HAVE_LIBAIO
+       if (g_conf->newstore_aio && txc && (flags & O_DIRECT)) {
+         txc->aios.push_back(FS::aio_t(txc, fd));
+         FS::aio_t& aio = txc->aios.back();
+         p->data.prepare_iov(&aio.iov);
+         aio.pwritev(p->offset);
+         dout(2) << __func__ << " prepared aio " << &aio << dendl;
+       } else
+#endif
+       {
+         int r = ::lseek64(fd, p->offset, SEEK_SET);
+         if (r < 0) {
+           r = -errno;
+           derr << __func__ << " lseek64 on " << fd << " got: "
+                << cpp_strerror(r) << dendl;
+           return r;
+         }
+         r = p->data.write_fd(fd);
+         if (r < 0) {
+           derr << __func__ << " write_fd on " << fd << " got: "
+                << cpp_strerror(r) << dendl;
+           return r;
+         }
          sync_fds.push_back(fd);
        }
       }
@@ -2403,6 +2441,7 @@ int NewStore::_do_wal_transaction(wal_transaction_t& wt)
               << cpp_strerror(r) << dendl;
          return r;
        }
+       // FIXME: do aio fdatasync?
        sync_fds.push_back(fd);
       }
       break;
@@ -2469,7 +2508,7 @@ int NewStore::_replay_wal()
       return -EIO;
     }
     dout(20) << __func__ << " replay " << it->key() << dendl;
-    int r = _do_wal_transaction(wt);
+    int r = _do_wal_transaction(wt, NULL);  // don't bother with aio here
     if (r < 0)
       return r;
     cleanup->rmkey(PREFIX_WAL, it->key());
@@ -2557,25 +2596,8 @@ int NewStore::queue_transactions(
   } else {
     // async path
     if (!txc->aios.empty()) {
+      _txc_aio_submit(txc);
       txc->state = TransContext::STATE_AIO_QUEUED;
-      dout(20) << __func__ << " submitting " << txc->num_aio.read() << " aios"
-              << dendl;
-      for (list<FS::aio_t>::iterator p = txc->aios.begin();
-          p != txc->aios.end();
-          ++p) {
-       FS::aio_t& aio = *p;
-       dout(20) << __func__ << " submitting aio " << &aio << dendl;
-       for (vector<iovec>::iterator q = aio.iov.begin(); q != aio.iov.end(); ++q)
-         dout(30) << __func__ << "  iov " << (void*)q->iov_base
-                  << " len " << q->iov_len << dendl;
-       dout(30) << " fd " << aio.fd << " offset " << lseek64(aio.fd, 0, SEEK_CUR)
-                << dendl;
-       int r = aio_queue.submit(*p);
-       if (r) {
-         derr << " aio submit got " << cpp_strerror(r) << dendl;
-         assert(r == 0);
-       }
-      }
     } else if (!txc->fds.empty()) {
       _txc_queue_fsync(txc);
     } else {
@@ -2586,6 +2608,29 @@ int NewStore::queue_transactions(
   return 0;
 }
 
+void NewStore::_txc_aio_submit(TransContext *txc)
+{
+  int num = txc->aios.size();
+  dout(10) << __func__ << " submitting " << num << " aios" << dendl;
+  txc->num_aio.set(num);
+  for (list<FS::aio_t>::iterator p = txc->aios.begin();
+       p != txc->aios.end();
+       ++p) {
+    FS::aio_t& aio = *p;
+    dout(20) << __func__ << " submitting aio " << &aio << dendl;
+    for (vector<iovec>::iterator q = aio.iov.begin(); q != aio.iov.end(); ++q)
+      dout(30) << __func__ << "  iov " << (void*)q->iov_base
+              << " len " << q->iov_len << dendl;
+    dout(30) << " fd " << aio.fd << " offset " << lseek64(aio.fd, 0, SEEK_CUR)
+            << dendl;
+    int r = aio_queue.submit(*p);
+    if (r) {
+      derr << " aio submit got " << cpp_strerror(r) << dendl;
+      assert(r == 0);
+    }
+  }
+}
+
 int NewStore::_do_transaction(Transaction *t,
                              TransContext *txc,
                              ThreadPool::TPHandle *handle)
@@ -3193,12 +3238,10 @@ int NewStore::_do_write(TransContext *txc,
 #ifdef HAVE_LIBAIO
     if (g_conf->newstore_aio && (flags & O_DIRECT)) {
       txc->aios.push_back(FS::aio_t(txc, fd));
-      txc->num_aio.inc();
       FS::aio_t& aio = txc->aios.back();
       bl.prepare_iov(&aio.iov);
       txc->aio_bl.append(bl);
       aio.pwritev(x_offset);
-
       dout(2) << __func__ << " prepared aio " << &aio << dendl;
     } else
 #endif
@@ -3209,8 +3252,6 @@ int NewStore::_do_write(TransContext *txc,
        derr << __func__ << " bl.write_fd error: " << cpp_strerror(r) << dendl;
        goto out;
       }
-    }
-    if (!(flags & O_DSYNC)) {
       txc->sync_fd(fd);
     }
     r = 0;
@@ -3245,7 +3286,6 @@ int NewStore::_do_write(TransContext *txc,
 #ifdef HAVE_LIBAIO
     if (g_conf->newstore_aio && (flags & O_DIRECT)) {
       txc->aios.push_back(FS::aio_t(txc, fd));
-      txc->num_aio.inc();
       FS::aio_t& aio = txc->aios.back();
       bl.prepare_iov(&aio.iov);
       txc->aio_bl.append(bl);
index f96f85270c594a5a6b16b7c396eac81cc0b7d06e..93e547566b3e1332645598cb5d8c5c994cce15e5 100644 (file)
@@ -155,6 +155,7 @@ public:
       STATE_KV_DONE,
       STATE_WAL_QUEUED,
       STATE_WAL_APPLYING,
+      STATE_WAL_AIO_WAIT,
       STATE_WAL_CLEANUP,   // remove wal kv record
       STATE_WAL_DONE,
       STATE_FINISHING,
@@ -176,6 +177,7 @@ public:
       case STATE_KV_DONE: return "kv_done";
       case STATE_WAL_QUEUED: return "wal_queued";
       case STATE_WAL_APPLYING: return "wal_applying";
+      case STATE_WAL_AIO_WAIT: return "wal_aio_wait";
       case STATE_WAL_CLEANUP: return "wal_cleanup";
       case STATE_WAL_DONE: return "wal_done";
       case STATE_FINISHING: return "finishing";
@@ -424,7 +426,7 @@ public:
       return i;
     }
     void _process(TransContext *i, ThreadPool::TPHandle &handle) {
-      store->_apply_wal_transaction(i);
+      store->_wal_apply(i);
       i->osr->wal_apply_lock.Unlock();
     }
     void _clear() {
@@ -557,6 +559,7 @@ private:
 
   TransContext *_txc_create(OpSequencer *osr);
   int _txc_finalize(OpSequencer *osr, TransContext *txc);
+  void _txc_aio_submit(TransContext *txc);
   void _txc_queue_fsync(TransContext *txc);
   void _txc_process_fsync(fsync_item *i);
   void _txc_finish_fsync(TransContext *txc);
@@ -582,8 +585,9 @@ private:
   }
 
   wal_op_t *_get_wal_op(TransContext *txc);
-  int _apply_wal_transaction(TransContext *txc);
-  int _do_wal_transaction(wal_transaction_t& wt);
+  int _wal_apply(TransContext *txc);
+  int _wal_finish(TransContext *txc);
+  int _do_wal_transaction(wal_transaction_t& wt, TransContext *txc);
   void _wait_object_wal(OnodeRef onode);
   int _replay_wal();
   friend class C_ApplyWAL;