]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
os/newstore: move toward state-machine
authorSage Weil <sage@redhat.com>
Tue, 21 Apr 2015 00:10:19 +0000 (17:10 -0700)
committerSage Weil <sage@redhat.com>
Tue, 1 Sep 2015 17:39:39 +0000 (13:39 -0400)
Signed-off-by: Sage Weil <sage@redhat.com>
src/common/config_opts.h
src/os/newstore/NewStore.cc
src/os/newstore/NewStore.h

index 8d26707c79b365410db4f6dc11bdfadd794e729d..e9020757aba8fe4cdb47e95bf0f781d6866fa8a1 100644 (file)
@@ -794,7 +794,8 @@ OPTION(newstore_max_dir_size, OPT_U32, 1000000)
 OPTION(newstore_onode_map_size, OPT_U32, 1024)   // onodes per collection
 OPTION(newstore_backend, OPT_STR, "rocksdb")
 OPTION(newstore_fail_eio, OPT_BOOL, true)
-OPTION(newstore_sync_queue_transaction, OPT_BOOL, false)  // perform write synchronously from queue_transaction
+OPTION(newstore_sync_io, OPT_BOOL, false)  // perform initial io synchronously
+OPTION(newstore_sync_transaction, OPT_BOOL, false)  // perform kv txn synchronously
 OPTION(newstore_fsync_threads, OPT_INT, 16)  // num threads calling fsync
 OPTION(newstore_fsync_thread_timeout, OPT_INT, 30) // thread timeout value
 OPTION(newstore_fsync_thread_suicide_timeout, OPT_INT, 120) // suicide timeout value
index 246ff18699269caa734790ea4f2edc4e699614aa..25eb6412134e9a22985ec23f091af94fdb118505 100644 (file)
@@ -991,7 +991,7 @@ int NewStore::mount()
   if (r < 0)
     goto out_db;
 
-  r = _replay_wal();
+  r = _wal_replay();
   if (r < 0)
     goto out_aio;
 
@@ -2036,6 +2036,88 @@ NewStore::TransContext *NewStore::_txc_create(OpSequencer *osr)
   return txc;
 }
 
+void NewStore::_txc_state_proc(TransContext *txc)
+{
+  while (true) {
+    dout(10) << __func__ << " txc " << txc
+            << " " << txc->get_state_name() << dendl;
+    switch (txc->state) {
+    case TransContext::STATE_PREPARE:
+      if (!txc->aios.empty()) {
+       txc->state = TransContext::STATE_AIO_WAIT;
+       _txc_aio_submit(txc);
+       return;
+      }
+      // ** fall-thru **
+
+    case TransContext::STATE_AIO_WAIT:
+      if (!txc->fds.empty()) {
+       txc->state = TransContext::STATE_FSYNC_WAIT;
+       if (!g_conf->newstore_sync_io) {
+         _txc_queue_fsync(txc);
+         return;
+       }
+       _txc_do_sync_fsync(txc);
+      }
+      _txc_finish_io(txc);  // may trigger blocked txc's too
+      return;
+
+    case TransContext::STATE_IO_DONE:
+      assert(txc->osr->qlock.is_locked());  // see _txc_finish_io
+      txc->state = TransContext::STATE_KV_QUEUED;
+      if (!g_conf->newstore_sync_transaction) {
+       Mutex::Locker l(kv_lock);
+       db->submit_transaction(txc->t);
+       kv_queue.push_back(txc);
+       kv_cond.SignalOne();
+       return;
+      }
+      db->submit_transaction_sync(txc->t);
+      break;
+
+    case TransContext::STATE_KV_QUEUED:
+      txc->state = TransContext::STATE_KV_DONE;
+      _txc_finish_kv(txc);
+      // ** fall-thru **
+
+    case TransContext::STATE_KV_DONE:
+      if (txc->wal_txn) {
+       txc->state = TransContext::STATE_WAL_QUEUED;
+       wal_wq.queue(txc);
+       return;
+      }
+      txc->state = TransContext::STATE_FINISHING;
+      break;
+
+    case TransContext::STATE_WAL_APPLYING:
+      if (!txc->aios.empty()) {
+       txc->state = TransContext::STATE_WAL_AIO_WAIT;
+       _txc_aio_submit(txc);
+       return;
+      }
+      // ** fall-thru **
+
+    case TransContext::STATE_WAL_AIO_WAIT:
+      _wal_finish(txc);
+      return;
+
+    case TransContext::STATE_WAL_CLEANUP:
+      txc->state = TransContext::STATE_FINISHING;
+      // ** fall-thru **
+
+    case TransContext::TransContext::STATE_FINISHING:
+      _txc_finish(txc);
+      return;
+
+    default:
+      derr << __func__ << " unexpected txc " << txc
+          << " state " << txc->get_state_name() << dendl;
+      assert(0 == "unexpected txc state");
+      return;
+    }
+  }
+}
+
 void NewStore::_txc_process_fsync(fsync_item *i)
 {
   dout(20) << __func__ << " txc " << i->txc << dendl;
@@ -2049,12 +2131,12 @@ void NewStore::_txc_process_fsync(fsync_item *i)
   }
   VOID_TEMP_FAILURE_RETRY(::close(i->fd));
   if (i->txc->finish_fsync()) {
-    _txc_finish_fsync(i->txc);
+    _txc_finish_io(i->txc);
   }
   dout(20) << __func__ << " txc " << i->txc << " done" << dendl;
 }
 
-void NewStore::_txc_finish_fsync(TransContext *txc)
+void NewStore::_txc_finish_io(TransContext *txc)
 {
   dout(20) << __func__ << " " << txc << dendl;
 
@@ -2065,25 +2147,25 @@ void NewStore::_txc_finish_fsync(TransContext *txc)
 
   OpSequencer *osr = txc->osr.get();
   Mutex::Locker l(osr->qlock);
-  txc->state = TransContext::STATE_FSYNC_DONE;
+  txc->state = TransContext::STATE_IO_DONE;
 
   OpSequencer::q_list_t::iterator p = osr->q.iterator_to(*txc);
   while (p != osr->q.begin()) {
     --p;
-    if (p->state < TransContext::STATE_FSYNC_DONE) {
+    if (p->state < TransContext::STATE_IO_DONE) {
       dout(20) << __func__ << " " << txc << " blocked by " << &*p << " "
               << p->get_state_name() << dendl;
       return;
     }
-    if (p->state > TransContext::STATE_FSYNC_DONE) {
+    if (p->state > TransContext::STATE_IO_DONE) {
       ++p;
       break;
     }
   }
   do {
-    _txc_submit_kv(&*p++);
+    _txc_state_proc(&*p++);
   } while (p != osr->q.end() &&
-          p->state == TransContext::STATE_FSYNC_DONE);
+          p->state == TransContext::STATE_IO_DONE);
 }
 
 int NewStore::_txc_finalize(OpSequencer *osr, TransContext *txc)
@@ -2119,7 +2201,6 @@ int NewStore::_txc_finalize(OpSequencer *osr, TransContext *txc)
 void NewStore::_txc_queue_fsync(TransContext *txc)
 {
   dout(20) << __func__ << " txc " << txc << dendl;
-  txc->state = TransContext::STATE_FSYNC_QUEUED;
   fsync_wq.lock();
   for (list<fsync_item>::iterator p = txc->fds.begin();
        p != txc->fds.end();
@@ -2130,22 +2211,25 @@ void NewStore::_txc_queue_fsync(TransContext *txc)
   fsync_wq.unlock();
 }
 
-void NewStore::_txc_submit_kv(TransContext *txc)
+void NewStore::_txc_do_sync_fsync(TransContext *txc)
 {
   dout(20) << __func__ << " txc " << txc << dendl;
-  txc->state = TransContext::STATE_KV_QUEUED;
-
-  Mutex::Locker l(kv_lock);
-  db->submit_transaction(txc->t);
-  kv_queue.push_back(txc);
-  kv_cond.SignalOne();
+  for (list<fsync_item>::iterator p = txc->fds.begin();
+       p != txc->fds.end(); ++p) {
+    dout(30) << __func__ << " fsync " << p->fd << dendl;
+    int r = ::fdatasync(p->fd);
+    if (r < 0) {
+      r = -errno;
+      derr << __func__ << " fsync: " << cpp_strerror(r) << dendl;
+      assert(0 == "fsync error");
+    }
+    VOID_TEMP_FAILURE_RETRY(::close(p->fd));
+  }
 }
 
 void NewStore::_txc_finish_kv(TransContext *txc)
 {
   dout(20) << __func__ << " txc " << txc << dendl;
-  txc->osr->qlock.Lock();
-  txc->state = TransContext::STATE_KV_DONE;
 
   // warning: we're calling onreadable_sync inside the sequencer lock
   if (txc->onreadable_sync) {
@@ -2164,20 +2248,9 @@ void NewStore::_txc_finish_kv(TransContext *txc)
     finisher.queue(txc->oncommits.front());
     txc->oncommits.pop_front();
   }
-
-  if (txc->wal_txn) {
-    dout(20) << __func__ << " starting wal apply" << dendl;
-    txc->state = TransContext::STATE_WAL_QUEUED;
-    txc->osr->qlock.Unlock();
-    wal_wq.queue(txc);
-  } else {
-    txc->state = TransContext::STATE_FINISHING;
-    txc->osr->qlock.Unlock();
-    _txc_finish_apply(txc);
-  }
 }
 
-void NewStore::_txc_finish_apply(TransContext *txc)
+void NewStore::_txc_finish(TransContext *txc)
 {
   dout(20) << __func__ << " " << txc << " onodes " << txc->onodes << dendl;
   assert(txc->state == TransContext::STATE_FINISHING);
@@ -2250,23 +2323,7 @@ void NewStore::_aio_thread()
               << left << " aios left" << dendl;
       VOID_TEMP_FAILURE_RETRY(::close(aio->fd));
       if (left == 0) {
-       switch (txc->state) {
-       case TransContext::STATE_AIO_QUEUED:
-         txc->state = TransContext::STATE_AIO_DONE;
-         if (!txc->fds.empty()) {
-           _txc_queue_fsync(txc);
-         } else {
-           _txc_finish_fsync(txc);
-         }
-         break;
-
-       case TransContext::STATE_WAL_AIO_WAIT:
-         _wal_finish(txc);
-         break;
-
-       default:
-         assert(0 == "unexpected txc state on aio completion");
-       }
+       _txc_state_proc(txc);
       }
     }
   }
@@ -2298,18 +2355,7 @@ void NewStore::_kv_sync_thread()
               << " in " << dur << dendl;
       while (!kv_committing.empty()) {
        TransContext *txc = kv_committing.front();
-       if (txc->state == TransContext::STATE_WAL_CLEANUP) {
-         txc->osr->qlock.Lock();
-         txc->state = TransContext::STATE_FINISHING;
-         txc->osr->qlock.Unlock();
-         _txc_finish_apply(txc);
-       } else if (txc->state == TransContext::STATE_KV_QUEUED) {
-         _txc_finish_kv(txc);
-       } else {
-         derr << __func__ << " unexpected txc state " << txc->get_state_name()
-              << dendl;
-         assert(0);
-       }
+       _txc_state_proc(txc);
        kv_committing.pop_front();
       }
 
@@ -2340,16 +2386,10 @@ int NewStore::_wal_apply(TransContext *txc)
 
   txc->aios.clear();
   int r = _do_wal_transaction(wt, txc);
-  if (r < 0)
-    return r;
+  assert(r == 0);
 
-  if (!txc->aios.empty()) {
-    _txc_aio_submit(txc);
-    txc->state = TransContext::STATE_WAL_AIO_WAIT;
-    return 0;
-  } else {
-    return _wal_finish(txc);
-  }
+  _txc_state_proc(txc);
+  return 0;
 }
 
 int NewStore::_wal_finish(TransContext *txc)
@@ -2362,9 +2402,7 @@ int NewStore::_wal_finish(TransContext *txc)
   KeyValueDB::Transaction cleanup = db->get_transaction();
   cleanup->rmkey(PREFIX_WAL, key);
 
-  txc->osr->qlock.Lock();
   txc->state = TransContext::STATE_WAL_CLEANUP;
-  txc->osr->qlock.Unlock();
 
   Mutex::Locker l(kv_lock);
   db->submit_transaction(cleanup);
@@ -2490,7 +2528,7 @@ int NewStore::_do_wal_transaction(wal_transaction_t& wt,
   return 0;
 }
 
-int NewStore::_replay_wal()
+int NewStore::_wal_replay()
 {
   dout(10) << __func__ << " start" << dendl;
   KeyValueDB::Iterator it = db->get_iterator(PREFIX_WAL);
@@ -2539,7 +2577,7 @@ int NewStore::queue_transactions(
     tls, &onreadable, &ondisk, &onreadable_sync);
   int r;
 
-  // throttle wal work
+  // throttle on wal work
   wal_wq.throttle(g_conf->newstore_wal_max_ops,
                  g_conf->newstore_wal_max_bytes);
 
@@ -2557,54 +2595,22 @@ int NewStore::queue_transactions(
     dout(5) << __func__ << " new " << *osr << "/" << osr->parent << dendl;
   }
 
+  // prepare
   TransContext *txc = _txc_create(osr);
+  txc->onreadable = onreadable;
+  txc->onreadable_sync = onreadable_sync;
+  txc->oncommit = ondisk;
 
-  // XXX do it sync for now; this is not crash safe
   for (list<Transaction*>::iterator p = tls.begin(); p != tls.end(); ++p) {
     (*p)->set_osr(osr);
-    _do_transaction(*p, txc, handle);
+    _txc_add_transaction(txc, *p);
   }
 
-  txc->onreadable = onreadable;
-  txc->onreadable_sync = onreadable_sync;
-  txc->oncommit = ondisk;
-
   r = _txc_finalize(osr, txc);
   assert(r == 0);
 
-  if (g_conf->newstore_sync_queue_transaction) {
-    // do it syncrhonously.  for example, if we have a *very* fast backend.
-
-    // sync
-    txc->state = TransContext::STATE_FSYNC_FSYNCING;
-    for (list<fsync_item>::iterator p = txc->fds.begin();
-        p != txc->fds.end(); ++p) {
-      dout(30) << __func__ << " fsync " << p->fd << dendl;
-      int r = ::fdatasync(p->fd);
-      if (r < 0) {
-       r = -errno;
-       derr << __func__ << " fsync: " << cpp_strerror(r) << dendl;
-       return r;
-      }
-      VOID_TEMP_FAILURE_RETRY(::close(p->fd));
-    }
-
-    txc->state = TransContext::STATE_KV_COMMITTING;
-    db->submit_transaction_sync(txc->t);
-
-    _txc_finish_kv(txc);
-  } else {
-    // async path
-    if (!txc->aios.empty()) {
-      _txc_aio_submit(txc);
-      txc->state = TransContext::STATE_AIO_QUEUED;
-    } else if (!txc->fds.empty()) {
-      _txc_queue_fsync(txc);
-    } else {
-      _txc_finish_fsync(txc);
-    }
-  }
-
+  // execute (start)
+  _txc_state_proc(txc);
   return 0;
 }
 
@@ -2617,7 +2623,7 @@ void NewStore::_txc_aio_submit(TransContext *txc)
        p != txc->aios.end();
        ++p) {
     FS::aio_t& aio = *p;
-    dout(20) << __func__ << " submitting aio " << &aio << dendl;
+    dout(20) << __func__ << " aio " << &aio << " fd " << aio.fd << dendl;
     for (vector<iovec>::iterator q = aio.iov.begin(); q != aio.iov.end(); ++q)
       dout(30) << __func__ << "  iov " << (void*)q->iov_base
               << " len " << q->iov_len << dendl;
@@ -2631,9 +2637,7 @@ void NewStore::_txc_aio_submit(TransContext *txc)
   }
 }
 
-int NewStore::_do_transaction(Transaction *t,
-                             TransContext *txc,
-                             ThreadPool::TPHandle *handle)
+int NewStore::_txc_add_transaction(TransContext *txc, Transaction *t)
 {
   Transaction::iterator i = t->begin();
   int pos = 0;
index 93e547566b3e1332645598cb5d8c5c994cce15e5..961b55cce1e3386adbfb475708c15941aca15ca4 100644 (file)
@@ -145,11 +145,9 @@ public:
   struct TransContext {
     typedef enum {
       STATE_PREPARE,
-      STATE_AIO_QUEUED,
-      STATE_AIO_DONE,
-      STATE_FSYNC_QUEUED,
-      STATE_FSYNC_FSYNCING,
-      STATE_FSYNC_DONE,
+      STATE_FSYNC_WAIT,
+      STATE_AIO_WAIT,
+      STATE_IO_DONE,
       STATE_KV_QUEUED,
       STATE_KV_COMMITTING,
       STATE_KV_DONE,
@@ -167,11 +165,9 @@ public:
     const char *get_state_name() {
       switch (state) {
       case STATE_PREPARE: return "prepare";
-      case STATE_FSYNC_QUEUED: return "fsync_queued";
-      case STATE_FSYNC_FSYNCING: return "fsync_fsyncing";
-      case STATE_FSYNC_DONE: return "fsync_done";
-      case STATE_AIO_QUEUED: return "aio_queued";
-      case STATE_AIO_DONE: return "aio_done";
+      case STATE_FSYNC_WAIT: return "fsync_wait";
+      case STATE_AIO_WAIT: return "aio_wait";
+      case STATE_IO_DONE: return "io_done";
       case STATE_KV_QUEUED: return "kv_queued";
       case STATE_KV_COMMITTING: return "kv_committing";
       case STATE_KV_DONE: return "kv_done";
@@ -558,14 +554,16 @@ private:
   int _clean_fid_tail(TransContext *txc, const fragment_t& f);
 
   TransContext *_txc_create(OpSequencer *osr);
+  int _txc_add_transaction(TransContext *txc, Transaction *t);
   int _txc_finalize(OpSequencer *osr, TransContext *txc);
+  void _txc_state_proc(TransContext *txc);
   void _txc_aio_submit(TransContext *txc);
+  void _txc_do_sync_fsync(TransContext *txc);
   void _txc_queue_fsync(TransContext *txc);
   void _txc_process_fsync(fsync_item *i);
-  void _txc_finish_fsync(TransContext *txc);
-  void _txc_submit_kv(TransContext *txc);
+  void _txc_finish_io(TransContext *txc);
   void _txc_finish_kv(TransContext *txc);
-  void _txc_finish_apply(TransContext *txc);
+  void _txc_finish(TransContext *txc);
 
   void _osr_reap_done(OpSequencer *osr);
 
@@ -588,9 +586,7 @@ private:
   int _wal_apply(TransContext *txc);
   int _wal_finish(TransContext *txc);
   int _do_wal_transaction(wal_transaction_t& wt, TransContext *txc);
-  void _wait_object_wal(OnodeRef onode);
-  int _replay_wal();
-  friend class C_ApplyWAL;
+  int _wal_replay();
 
 public:
   NewStore(CephContext *cct, const string& path);