]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
FileStore: integrate WBThrottle
authorSamuel Just <sam.just@inktank.com>
Mon, 20 May 2013 20:44:57 +0000 (13:44 -0700)
committerSamuel Just <sam.just@inktank.com>
Tue, 21 May 2013 23:37:44 +0000 (16:37 -0700)
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/common/config_opts.h
src/os/FileStore.cc
src/os/FileStore.h

index 27593cb9f9df7730933de18f475d34bc9b14205d..27e2daceb3177d22d1a49072c324062434762850 100644 (file)
@@ -508,10 +508,6 @@ OPTION(filestore_btrfs_snap, OPT_BOOL, true)
 OPTION(filestore_btrfs_clone_range, OPT_BOOL, true)
 OPTION(filestore_fsync_flushes_journal_data, OPT_BOOL, false)
 OPTION(filestore_fiemap, OPT_BOOL, false)     // (try to) use fiemap
-OPTION(filestore_flusher, OPT_BOOL, true)
-OPTION(filestore_flusher_max_fds, OPT_INT, 512)
-OPTION(filestore_flush_min, OPT_INT, 65536)
-OPTION(filestore_sync_flush, OPT_BOOL, false)
 OPTION(filestore_journal_parallel, OPT_BOOL, false)
 OPTION(filestore_journal_writeahead, OPT_BOOL, false)
 OPTION(filestore_journal_trailing, OPT_BOOL, false)
index 24f7f1eff0806262355759b5504b03f3ad141eee..dca8a1bbfea4d0e289d42f9ec336cbb9470aac84 100644 (file)
@@ -353,6 +353,7 @@ int FileStore::lfn_unlink(coll_t cid, const hobject_t& o,
       if (g_conf->filestore_debug_inject_read_err) {
        debug_obj_on_delete(o);
       }
+      wbthrottle.clear_object(o); // should be only non-cache ref
       fdcache.clear(o);
     } else {
       /* Ensure that replay of this op doesn't result in the object_map
@@ -388,6 +389,7 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, const cha
   stop(false), sync_thread(this),
   fdcache_lock("fdcache_lock"),
   fdcache(g_ceph_context),
+  wbthrottle(g_ceph_context),
   default_osr("default"),
   op_queue_len(0), op_queue_bytes(0),
   op_throttle_lock("FileStore::op_throttle_lock"),
@@ -395,22 +397,17 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, const cha
   op_tp(g_ceph_context, "FileStore::op_tp", g_conf->filestore_op_threads, "filestore_op_threads"),
   op_wq(this, g_conf->filestore_op_thread_timeout,
        g_conf->filestore_op_thread_suicide_timeout, &op_tp),
-  flusher_queue_len(0), flusher_thread(this),
   logger(NULL),
   read_error_lock("FileStore::read_error_lock"),
   m_filestore_btrfs_clone_range(g_conf->filestore_btrfs_clone_range),
   m_filestore_btrfs_snap (g_conf->filestore_btrfs_snap ),
   m_filestore_commit_timeout(g_conf->filestore_commit_timeout),
   m_filestore_fiemap(g_conf->filestore_fiemap),
-  m_filestore_flusher (g_conf->filestore_flusher ),
   m_filestore_fsync_flushes_journal_data(g_conf->filestore_fsync_flushes_journal_data),
   m_filestore_journal_parallel(g_conf->filestore_journal_parallel ),
   m_filestore_journal_trailing(g_conf->filestore_journal_trailing),
   m_filestore_journal_writeahead(g_conf->filestore_journal_writeahead),
   m_filestore_fiemap_threshold(g_conf->filestore_fiemap_threshold),
-  m_filestore_sync_flush(g_conf->filestore_sync_flush),
-  m_filestore_flusher_max_fds(g_conf->filestore_flusher_max_fds),
-  m_filestore_flush_min(g_conf->filestore_flush_min),
   m_filestore_max_sync_interval(g_conf->filestore_max_sync_interval),
   m_filestore_min_sync_interval(g_conf->filestore_min_sync_interval),
   m_filestore_fail_eio(g_conf->filestore_fail_eio),
@@ -1068,6 +1065,7 @@ int FileStore::_detect_fs()
 #if defined(__linux__)
   if (st.f_type == BTRFS_SUPER_MAGIC) {
     dout(0) << "mount detected btrfs" << dendl;      
+    wbthrottle.set_fs(WBThrottle::BTRFS);
     btrfs = true;
 
     btrfs_stable_commits = btrfs && m_filestore_btrfs_snap;
@@ -1779,7 +1777,6 @@ int FileStore::mount()
   journal_start();
 
   op_tp.start();
-  flusher_thread.create();
   op_finisher.start();
   ondisk_finisher.start();
 
@@ -1817,11 +1814,9 @@ int FileStore::umount()
   lock.Lock();
   stop = true;
   sync_cond.Signal();
-  flusher_cond.Signal();
   lock.Unlock();
   sync_thread.join();
   op_tp.stop();
-  flusher_thread.join();
 
   journal_stop();
 
@@ -1969,6 +1964,7 @@ void FileStore::op_queue_release_throttle(Op *o)
 
 void FileStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle)
 {
+  wbthrottle.throttle();
   // inject a stall?
   if (g_conf->filestore_inject_stall) {
     int orig = g_conf->filestore_inject_stall;
@@ -2952,38 +2948,7 @@ int FileStore::_write(coll_t cid, const hobject_t& oid,
     r = bl.length();
 
   // flush?
-  {
-    bool should_flush = (ssize_t)len >= m_filestore_flush_min;
-    bool local_flush = false;
-#ifdef HAVE_SYNC_FILE_RANGE
-    bool async_done = false;
-    if (!should_flush ||
-       !m_filestore_flusher ||
-       !(async_done = queue_flusher(**fd, offset, len, replica))) {
-      if (should_flush && m_filestore_sync_flush) {
-       ::sync_file_range(**fd, offset, len, SYNC_FILE_RANGE_WRITE);
-       local_flush = true;
-      }
-    }
-    // TODOSAM: this will be fixed in a subsequent patch
-    //Both lfn_close() and possible posix_fadvise() done by flusher
-    //if (async_done) fd = -1;
-#else
-    // no sync_file_range; (maybe) flush inline and close.
-    if (should_flush && m_filestore_sync_flush) {
-      ::fdatasync(**fd);
-      local_flush = true;
-    }
-#endif
-    if (local_flush && replica && m_filestore_replica_fadvise) {
-      int fa_r = posix_fadvise(**fd, offset, len, POSIX_FADV_DONTNEED);
-      if (fa_r) {
-       dout(0) << "posic_fadvise failed: " << cpp_strerror(fa_r) << dendl;
-      } else {
-       dout(10) << "posix_fadvise performed after local flush" << dendl;
-      }
-    }
-  }
+  wbthrottle.queue_wb(fd, oid, offset, len, replica);
   lfn_close(fd);
 
  out:
@@ -3277,90 +3242,6 @@ int FileStore::_clone_range(coll_t cid, const hobject_t& oldoid, const hobject_t
   return r;
 }
 
-
-bool FileStore::queue_flusher(int fd, uint64_t off, uint64_t len, bool replica)
-{
-  bool queued;
-  lock.Lock();
-  if (flusher_queue_len < m_filestore_flusher_max_fds) {
-    flusher_queue.push_back(sync_epoch);
-    flusher_queue.push_back(fd);
-    flusher_queue.push_back(off);
-    flusher_queue.push_back(len);
-    flusher_queue.push_back(replica);
-    flusher_queue_len++;
-    flusher_cond.Signal();
-    dout(10) << "queue_flusher ep " << sync_epoch << " fd " << fd << " " << off << "~" << len
-            << " qlen " << flusher_queue_len
-            << dendl;
-    queued = true;
-  } else {
-    dout(10) << "queue_flusher ep " << sync_epoch << " fd " << fd << " " << off << "~" << len
-            << " qlen " << flusher_queue_len 
-            << " hit flusher_max_fds " << m_filestore_flusher_max_fds
-            << ", skipping async flush" << dendl;
-    queued = false;
-  }
-  lock.Unlock();
-  return queued;
-}
-
-void FileStore::flusher_entry()
-{
-  lock.Lock();
-  dout(20) << "flusher_entry start" << dendl;
-  while (true) {
-    if (!flusher_queue.empty()) {
-#ifdef HAVE_SYNC_FILE_RANGE
-      list<uint64_t> q;
-      q.swap(flusher_queue);
-
-      int num = flusher_queue_len;  // see how many we're taking, here
-
-      lock.Unlock();
-      while (!q.empty()) {
-       uint64_t ep = q.front();
-       q.pop_front();
-       int fd = q.front();
-       q.pop_front();
-       uint64_t off = q.front();
-       q.pop_front();
-       uint64_t len = q.front();
-       q.pop_front();
-       bool replica = q.front();
-       q.pop_front();
-       if (!stop && ep == sync_epoch) {
-         dout(10) << "flusher_entry flushing+closing " << fd << " ep " << ep << dendl;
-         ::sync_file_range(fd, off, len, SYNC_FILE_RANGE_WRITE);
-         if (replica && m_filestore_replica_fadvise) {
-           int fa_r = posix_fadvise(fd, off, len, POSIX_FADV_DONTNEED);
-           if (fa_r) {
-             dout(0) << "posic_fadvise failed: " << cpp_strerror(fa_r) << dendl;
-           } else {
-             dout(10) << "posix_fadvise performed after local flush" << dendl;
-           }
-         }
-       } else 
-         dout(10) << "flusher_entry JUST closing " << fd << " (stop=" << stop << ", ep=" << ep
-                  << ", sync_epoch=" << sync_epoch << ")" << dendl;
-       // TODOSAM: this will be replaced in a subsequent patch
-       //lfn_close(fd);
-      }
-      lock.Lock();
-      flusher_queue_len -= num;   // they're definitely closed, forget
-#endif
-    } else {
-      if (stop)
-       break;
-      dout(20) << "flusher_entry sleeping" << dendl;
-      flusher_cond.Wait(lock);
-      dout(20) << "flusher_entry awoke" << dendl;
-    }
-  }
-  dout(20) << "flusher_entry finish" << dendl;
-  lock.Unlock();
-}
-
 class SyncEntryTimeout : public Context {
 public:
   SyncEntryTimeout(int commit_timeo) 
@@ -3553,6 +3434,7 @@ void FileStore::sync_entry()
       logger->tinc(l_os_commit_len, dur);
 
       apply_manager.commit_finish();
+      wbthrottle.clear();
 
       logger->set(l_os_committing, 0);
 
@@ -4919,9 +4801,6 @@ const char** FileStore::get_tracked_conf_keys() const
     "filestore_queue_max_bytes",
     "filestore_queue_committing_max_ops",
     "filestore_queue_committing_max_bytes",
-    "filestore_flusher",
-    "filestore_flusher_max_fds",
-    "filestore_sync_flush",
     "filestore_commit_timeout",
     "filestore_dump_file",
     "filestore_kill_at",
@@ -4941,9 +4820,6 @@ void FileStore::handle_conf_change(const struct md_config_t *conf,
       changed.count("filestore_queue_max_bytes") ||
       changed.count("filestore_queue_committing_max_ops") ||
       changed.count("filestore_queue_committing_max_bytes") ||
-      changed.count("filestore_flusher") ||
-      changed.count("filestore_flusher_max_fds") ||
-      changed.count("filestore_flush_min") ||
       changed.count("filestore_kill_at") ||
       changed.count("filestore_fail_eio") ||
       changed.count("filestore_replica_fadvise")) {
@@ -4954,10 +4830,6 @@ void FileStore::handle_conf_change(const struct md_config_t *conf,
     m_filestore_queue_max_bytes = conf->filestore_queue_max_bytes;
     m_filestore_queue_committing_max_ops = conf->filestore_queue_committing_max_ops;
     m_filestore_queue_committing_max_bytes = conf->filestore_queue_committing_max_bytes;
-    m_filestore_flusher = conf->filestore_flusher;
-    m_filestore_flusher_max_fds = conf->filestore_flusher_max_fds;
-    m_filestore_flush_min = conf->filestore_flush_min;
-    m_filestore_sync_flush = conf->filestore_sync_flush;
     m_filestore_kill_at.set(conf->filestore_kill_at);
     m_filestore_fail_eio = conf->filestore_fail_eio;
     m_filestore_replica_fadvise = conf->filestore_replica_fadvise;
index 00249f274c11744e0c261bda07a53fa79b939a56..78668dd92a41afce674ec1354ce55db46b2ffe3e 100644 (file)
@@ -41,6 +41,7 @@ using namespace __gnu_cxx;
 #include "ObjectMap.h"
 #include "SequencerPosition.h"
 #include "FDCache.h"
+#include "WBThrottle.h"
 
 #include "include/uuid.h"
 
@@ -201,6 +202,8 @@ private:
 
   Mutex fdcache_lock;
   FDCache fdcache;
+  WBThrottle wbthrottle;
+
   Sequencer default_osr;
   deque<OpSequencer*> op_queue;
   uint64_t op_queue_len, op_queue_bytes;
@@ -253,24 +256,8 @@ private:
   void _journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk);
   friend class C_JournaledAhead;
 
-  // flusher thread
-  Cond flusher_cond;
-  list<uint64_t> flusher_queue;
-  int flusher_queue_len;
-  void flusher_entry();
-  struct FlusherThread : public Thread {
-    FileStore *fs;
-    FlusherThread(FileStore *f) : fs(f) {}
-    void *entry() {
-      fs->flusher_entry();
-      return 0;
-    }
-  } flusher_thread;
-  bool queue_flusher(int fd, uint64_t off, uint64_t len, bool replica);
-
   int open_journal();
 
-
   PerfCounters *logger;
 
 public:
@@ -514,15 +501,11 @@ private:
   bool m_filestore_btrfs_snap;
   float m_filestore_commit_timeout;
   bool m_filestore_fiemap;
-  bool m_filestore_flusher;
   bool m_filestore_fsync_flushes_journal_data;
   bool m_filestore_journal_parallel;
   bool m_filestore_journal_trailing;
   bool m_filestore_journal_writeahead;
   int m_filestore_fiemap_threshold;
-  bool m_filestore_sync_flush;
-  int m_filestore_flusher_max_fds;
-  int m_filestore_flush_min;
   double m_filestore_max_sync_interval;
   double m_filestore_min_sync_interval;
   bool m_filestore_fail_eio;