]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
os/newstore: basic aio support
authorSage Weil <sage@redhat.com>
Mon, 20 Apr 2015 17:34:04 +0000 (10:34 -0700)
committerSage Weil <sage@redhat.com>
Tue, 1 Sep 2015 17:39:39 +0000 (13:39 -0400)
Signed-off-by: Sage Weil <sage@redhat.com>
src/common/config_opts.h
src/os/fs/FS.h
src/os/newstore/NewStore.cc
src/os/newstore/NewStore.h

index ac913c2b0a68c955a35ba7868d6c069b5fd6bc41..8d26707c79b365410db4f6dc11bdfadd794e729d 100644 (file)
@@ -810,6 +810,9 @@ OPTION(newstore_overlay_max, OPT_INT, 32)
 OPTION(newstore_open_by_handle, OPT_BOOL, true)
 OPTION(newstore_o_direct, OPT_BOOL, true)
 OPTION(newstore_db_path, OPT_STR, "")
+OPTION(newstore_aio, OPT_BOOL, true)
+OPTION(newstore_aio_poll_ms, OPT_INT, 250)  // milliseconds
+OPTION(newstore_aio_max_queue_depth, OPT_INT, 64)
 
 OPTION(filestore_omap_backend, OPT_STR, "leveldb")
 
index a9d8100fafcc81585a0d8a553b0913f88e0e1cef..51c6363f13fcd9dabe6f1eadedc6ed24e2df8001 100644 (file)
 #ifndef CEPH_OS_FS_H
 #define CEPH_OS_FS_H
 
+#include <errno.h>
+#include <time.h>
+
+#include "acconfig.h"
+#ifdef HAVE_LIBAIO
+# include <libaio.h>
+#endif
+
 #include <string>
 
 #include "include/types.h"
+#include "common/Mutex.h"
+#include "common/Cond.h"
 
 class FS {
 public:
@@ -39,6 +49,79 @@ public:
                              int from_fd,
                              uint64_t from_offset, uint64_t from_len);
   virtual int zero(int fd, uint64_t offset, uint64_t length);
+
+  // -- aio --
+
+  struct aio_t {
+    struct iocb iocb;  // must be first element; see shenanigans in aio_queue_t
+    void *priv;
+    int fd;
+    vector<iovec> iov;
+
+    aio_t(void *p, int f) : priv(p), fd(f) {
+      memset(&iocb, 0, sizeof(iocb));
+    }
+
+    void pwritev(uint64_t offset) {
+      io_prep_pwritev(&iocb, fd, &iov[0], iov.size(), offset);
+    }
+  };
+
+  struct aio_queue_t {
+    int max_iodepth;
+    io_context_t ctx;
+
+    aio_queue_t(unsigned max_iodepth = 8)
+      : max_iodepth(max_iodepth),
+       ctx(0) {
+    }
+    ~aio_queue_t() {
+      assert(ctx == 0);
+    }
+
+    int init() {
+      assert(ctx == 0);
+      return io_setup(max_iodepth, &ctx);
+    }
+    void shutdown() {
+      if (ctx) {
+       int r = io_destroy(ctx);
+       assert(r == 0);
+       ctx = 0;
+      }
+    }
+
+    int submit(aio_t &aio) {
+      int attempts = 10;
+      iocb *piocb = &aio.iocb;
+      do {
+       int r = io_submit(ctx, 1, &piocb);
+       if (r < 0) {
+         if (r == -EAGAIN && attempts-- > 0) {
+           usleep(500);
+           continue;
+         }
+         return r;
+       }
+      } while (false);
+      return 0;
+    }
+
+    int get_next_completed(int timeout_ms, aio_t **paio) {
+      io_event event[1];
+      struct timespec t = {
+       timeout_ms / 1000,
+       (timeout_ms % 1000) * 1000 * 1000
+      };
+      int r = io_getevents(ctx, 1, 1, event, &t);
+      if (r <= 0) {
+       return r;
+      }
+      *paio = (aio_t *)event[0].obj;
+      return 1;
+    }
+  };
+
 };
 
 #endif
index 9db13d368c8fb6cd9b2eddd4d977f5443015aaa5..7de2092308d919968a20210e37cbbdd2ff458ba6 100644 (file)
@@ -598,6 +598,9 @@ NewStore::NewStore(CephContext *cct, const string& path)
             cct->_conf->newstore_fsync_thread_timeout,
             cct->_conf->newstore_fsync_thread_suicide_timeout,
             &fsync_tp),
+    aio_thread(this),
+    aio_stop(false),
+    aio_queue(cct->_conf->newstore_aio_max_queue_depth),
     kv_sync_thread(this),
     kv_lock("NewStore::kv_lock"),
     kv_stop(false),
@@ -830,6 +833,29 @@ void NewStore::_close_db()
   db = NULL;
 }
 
+int NewStore::_aio_start()
+{
+  if (g_conf->newstore_aio) {
+    dout(10) << __func__ << dendl;
+    int r = aio_queue.init();
+    if (r < 0)
+      return r;
+    aio_thread.create();
+  }
+  return 0;
+}
+
+void NewStore::_aio_stop()
+{
+  if (g_conf->newstore_aio) {
+    dout(10) << __func__ << dendl;
+    aio_stop = true;
+    aio_thread.join();
+    aio_stop = false;
+    aio_queue.shutdown();
+  }
+}
+
 int NewStore::_open_collections()
 {
   KeyValueDB::Iterator it = db->get_iterator(PREFIX_COLL);
@@ -961,10 +987,14 @@ int NewStore::mount()
   if (r < 0)
     goto out_db;
 
-  r = _replay_wal();
+  r = _aio_start();
   if (r < 0)
     goto out_db;
 
+  r = _replay_wal();
+  if (r < 0)
+    goto out_aio;
+
   finisher.start();
   fsync_tp.start();
   wal_tp.start();
@@ -973,6 +1003,8 @@ int NewStore::mount()
   mounted = true;
   return 0;
 
+ out_aio:
+  _aio_stop();
  out_db:
   _close_db();
  out_frag:
@@ -994,6 +1026,8 @@ int NewStore::umount()
 
   dout(20) << __func__ << " stopping fsync_wq" << dendl;
   fsync_tp.stop();
+  dout(20) << __func__ << " stopping aio" << dendl;
+  _aio_stop();
   dout(20) << __func__ << " stopping kv thread" << dendl;
   _kv_stop();
   dout(20) << __func__ << " draining wal_wq" << dendl;
@@ -2198,6 +2232,34 @@ void NewStore::_osr_reap_done(OpSequencer *osr)
   }
 }
 
+void NewStore::_aio_thread()
+{
+  dout(10) << __func__ << " start" << dendl;
+  while (!aio_stop) {
+    dout(40) << __func__ << " polling" << dendl;
+    FS::aio_t *aio;
+    int r = aio_queue.get_next_completed(g_conf->newstore_aio_poll_ms, &aio);
+    if (r < 0) {
+      derr << __func__ << " got " << cpp_strerror(r) << dendl;
+    }
+    if (r == 1) {
+      TransContext *txc = static_cast<TransContext*>(aio->priv);
+      int left = txc->num_aio.dec();
+      dout(10) << __func__ << " finished aio on " << txc << ", "
+              << left << " left" << dendl;
+      if (left == 0) {
+       txc->state = TransContext::STATE_AIO_DONE;
+       if (!txc->fds.empty()) {
+         _txc_queue_fsync(txc);
+       } else {
+         _txc_finish_fsync(txc);
+       }
+      }
+    }
+  }
+  dout(10) << __func__ << " end" << dendl;
+}
+
 void NewStore::_kv_sync_thread()
 {
   dout(10) << __func__ << " start" << dendl;
@@ -2317,7 +2379,12 @@ int NewStore::_do_wal_transaction(wal_transaction_t& wt)
               << cpp_strerror(r) << dendl;
          return r;
        }
-       p->data.write_fd(fd);
+       r = p->data.write_fd(fd);
+       if (r < 0) {
+         derr << __func__ << " write_fd on " << fd << " got: "
+              << cpp_strerror(r) << dendl;
+         return r;
+       }
        sync_fds.push_back(fd);
       }
       break;
@@ -2481,7 +2548,27 @@ int NewStore::queue_transactions(
     _txc_finish_kv(txc);
   } else {
     // async path
-    if (!txc->fds.empty()) {
+    if (!txc->aios.empty()) {
+      txc->state = TransContext::STATE_AIO_QUEUED;
+      dout(20) << __func__ << " submitting " << txc->num_aio.read() << " aios"
+              << dendl;
+      for (list<FS::aio_t>::iterator p = txc->aios.begin();
+          p != txc->aios.end();
+          ++p) {
+       FS::aio_t& aio = *p;
+       dout(20) << __func__ << " submitting aio " << &aio << dendl;
+       for (vector<iovec>::iterator q = aio.iov.begin(); q != aio.iov.end(); ++q)
+         dout(30) << __func__ << "  iov " << (void*)q->iov_base
+                  << " len " << q->iov_len << dendl;
+       dout(30) << " fd " << aio.fd << " offset " << lseek64(aio.fd, 0, SEEK_CUR)
+                << dendl;
+       int r = aio_queue.submit(*p);
+       if (r) {
+         derr << " aio submit got " << cpp_strerror(r) << dendl;
+         assert(r == 0);
+       }
+      }
+    } else if (!txc->fds.empty()) {
       _txc_queue_fsync(txc);
     } else {
       _txc_finish_fsync(txc);
@@ -3063,6 +3150,7 @@ int NewStore::_do_write(TransContext *txc,
       o->onode.size == 0 ||
       o->onode.data_map.empty()) {
     _do_overlay_clear(txc, o);
+    uint64_t x_offset;
     if (o->onode.data_map.empty()) {
       // create
       fragment_t &f = o->onode.data_map[0];
@@ -3073,7 +3161,7 @@ int NewStore::_do_write(TransContext *txc,
        r = fd;
        goto out;
       }
-      ::lseek64(fd, offset, SEEK_SET);
+      x_offset = offset;
       dout(20) << __func__ << " create " << f.fid << " writing "
               << offset << "~" << length << dendl;
     } else {
@@ -3087,17 +3175,32 @@ int NewStore::_do_write(TransContext *txc,
       }
       ::ftruncate(fd, f.length);  // in case there is trailing crap
       f.length = (offset + length) - f.offset;
-      ::lseek64(fd, offset - f.offset, SEEK_SET);
+      x_offset = offset - f.offset;
       dout(20) << __func__ << " append " << f.fid << " writing "
               << (offset - f.offset) << "~" << length << dendl;
     }
     if (offset + length > o->onode.size) {
       o->onode.size = offset + length;
     }
-    r = bl.write_fd(fd);
-    if (r < 0) {
-      derr << __func__ << " bl.write_fd error: " << cpp_strerror(r) << dendl;
-      goto out;
+#ifdef HAVE_LIBAIO
+    if (g_conf->newstore_aio && (flags & O_DIRECT)) {
+      txc->aios.push_back(FS::aio_t(txc, fd));
+      txc->num_aio.inc();
+      FS::aio_t& aio = txc->aios.back();
+      bl.prepare_iov(&aio.iov);
+      txc->aio_bl.append(bl);
+      aio.pwritev(x_offset);
+
+      dout(2) << __func__ << " prepared aio " << &aio << dendl;
+    } else
+#endif
+    {
+      ::lseek64(fd, x_offset, SEEK_SET);
+      r = bl.write_fd(fd);
+      if (r < 0) {
+       derr << __func__ << " bl.write_fd error: " << cpp_strerror(r) << dendl;
+       goto out;
+      }
     }
     txc->sync_fd(fd);
     r = 0;
@@ -3128,12 +3231,26 @@ int NewStore::_do_write(TransContext *txc,
     dout(20) << __func__ << " replace old fid " << op->fid
             << " with new fid " << f.fid
             << ", writing " << offset << "~" << length << dendl;
-    r = bl.write_fd(fd);
-    if (r < 0) {
-      derr << __func__ << " bl.write_fd error: " << cpp_strerror(r) << dendl;
-      goto out;
+
+#ifdef HAVE_LIBAIO
+    if (g_conf->newstore_aio && (flags & O_DIRECT)) {
+      txc->aios.push_back(FS::aio_t(txc, fd));
+      txc->num_aio.inc();
+      FS::aio_t& aio = txc->aios.back();
+      bl.prepare_iov(&aio.iov);
+      txc->aio_bl.append(bl);
+      aio.pwritev(0);
+      dout(2) << __func__ << " prepared aio " << &aio << dendl;
+    } else
+#endif
+    {
+      r = bl.write_fd(fd);
+      if (r < 0) {
+       derr << __func__ << " bl.write_fd error: " << cpp_strerror(r) << dendl;
+       goto out;
+      }
+      txc->sync_fd(fd);
     }
-    txc->sync_fd(fd);
     r = 0;
     goto out;
   }
index 35b73770d63fc5c5b88194000fee04bbec66790d..f96f85270c594a5a6b16b7c396eac81cc0b7d06e 100644 (file)
@@ -15,6 +15,8 @@
 #ifndef CEPH_OSD_NEWSTORE_H
 #define CEPH_OSD_NEWSTORE_H
 
+#include "acconfig.h"
+
 #include <unistd.h>
 
 #include "include/assert.h"
@@ -143,6 +145,8 @@ public:
   struct TransContext {
     typedef enum {
       STATE_PREPARE,
+      STATE_AIO_QUEUED,
+      STATE_AIO_DONE,
       STATE_FSYNC_QUEUED,
       STATE_FSYNC_FSYNCING,
       STATE_FSYNC_DONE,
@@ -165,6 +169,8 @@ public:
       case STATE_FSYNC_QUEUED: return "fsync_queued";
       case STATE_FSYNC_FSYNCING: return "fsync_fsyncing";
       case STATE_FSYNC_DONE: return "fsync_done";
+      case STATE_AIO_QUEUED: return "aio_queued";
+      case STATE_AIO_DONE: return "aio_done";
       case STATE_KV_QUEUED: return "kv_queued";
       case STATE_KV_COMMITTING: return "kv_committing";
       case STATE_KV_DONE: return "kv_done";
@@ -194,6 +200,10 @@ public:
     wal_transaction_t *wal_txn; ///< wal transaction (if any)
     unsigned num_fsyncs_completed;
 
+    list<FS::aio_t> aios;
+    bufferlist aio_bl;  // just a pile of refs
+    atomic_t num_aio;
+
     Mutex lock;
     Cond cond;
 
@@ -207,6 +217,7 @@ public:
        onreadable_sync(NULL),
        wal_txn(NULL),
        num_fsyncs_completed(0),
+       num_aio(0),
        lock("NewStore::TransContext::lock") {
       //cout << "txc new " << this << std::endl;
     }
@@ -373,7 +384,9 @@ public:
   public:
     WALWQ(NewStore *s, time_t ti, time_t sti, ThreadPool *tp)
       : ThreadPool::WorkQueue<TransContext>("NewStore::WALWQ", ti, sti, tp),
-       store(s) {
+       store(s),
+       ops(0),
+       bytes(0) {
     }
     bool _empty() {
       return wal_queue.empty();
@@ -445,6 +458,15 @@ public:
     }
   };
 
+  struct AioCompletionThread : public Thread {
+    NewStore *store;
+    AioCompletionThread(NewStore *s) : store(s) {}
+    void *entry() {
+      store->_aio_thread();
+      return NULL;
+    }
+  };
+
   // --------------------------------------------------------
   // members
 private:
@@ -479,6 +501,10 @@ private:
   ThreadPool fsync_tp;
   FsyncWQ fsync_wq;
 
+  AioCompletionThread aio_thread;
+  bool aio_stop;
+  FS::aio_queue_t aio_queue;
+
   KVSyncThread kv_sync_thread;
   Mutex kv_lock;
   Cond kv_cond, kv_sync_cond;
@@ -540,6 +566,10 @@ private:
 
   void _osr_reap_done(OpSequencer *osr);
 
+  void _aio_thread();
+  int _aio_start();
+  void _aio_stop();
+
   void _kv_sync_thread();
   void _kv_stop() {
     {